Streaming

Real-time data streaming between services and clients.

Kinotic services can push multiple values to a caller over time. Instead of returning a single response, a streaming method returns an RxJS Observable that emits each value as it becomes available.

Defining a Streaming Method

Any service method that returns an Observable is treated as a streaming endpoint.

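import { Publish } from '@kinotic-ai/core'
import { Observable, interval } from 'rxjs'
import { map } from 'rxjs/operators'

// Shape of each emitted value (not defined in the original snippet; assumed here)
interface SensorReading {
    sensorId: string
    value: number
    timestamp: Date
}

@Publish('com.example')
class SensorService {
    // Emits one simulated reading per second. interval() never completes,
    // so the stream runs until the caller unsubscribes.
    streamReadings(sensorId: string): Observable<SensorReading> {
        return interval(1000).pipe(
            map(() => ({
                sensorId,
                value: Math.random() * 100, // placeholder value in [0, 100)
                timestamp: new Date()
            }))
        )
    }
}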

Consuming a Stream

Callers receive an Observable and subscribe to process values as they arrive.

// `sensorService` is assumed to be a reference to SensorService obtained elsewhere
const stream$ = sensorService.streamReadings('sensor-1')

const subscription = stream$.subscribe({
    next: (reading) => console.log('Reading:', reading.value),
    error: (err) => console.error('Stream error:', err),
    complete: () => console.log('Stream ended')
})

// Later, once readings are no longer needed, stop listening
subscription.unsubscribe()
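
The subscribe-then-unsubscribe pattern above can be wrapped in a small helper. What follows is a library-free sketch, not part of Kinotic or RxJS: `collectFirst`, `ObserverLike`, `SubscriptionLike`, `SubscribableLike`, and `fakeSource` are hypothetical names, and the interfaces only mirror the `next`/`error`/`complete` handlers and the `unsubscribe()` call shown above. An RxJS Observable should satisfy `SubscribableLike` structurally, and RxJS also ships a `take` operator for the same purpose.

```typescript
// Hypothetical structural types mirroring the observer shape used above.
interface ObserverLike<T> {
    next: (value: T) => void
    error: (err: unknown) => void
    complete: () => void
}
interface SubscriptionLike {
    unsubscribe(): void
}
interface SubscribableLike<T> {
    subscribe(observer: ObserverLike<T>): SubscriptionLike
}

// Hypothetical helper: resolve with the first `count` values, then unsubscribe.
// Resolves early with fewer values if the stream completes first.
function collectFirst<T>(source: SubscribableLike<T>, count: number): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        const values: T[] = []
        const state = { finished: false }
        let subscription: SubscriptionLike | undefined

        // Settle the promise once and release the stream if we already hold it.
        const finish = (settle: () => void): void => {
            if (state.finished) return
            state.finished = true
            subscription?.unsubscribe()
            settle()
        }

        if (count <= 0) {
            resolve(values)
            return
        }

        subscription = source.subscribe({
            next: (value) => {
                if (state.finished) return // ignore anything after we are done
                values.push(value)
                if (values.length >= count) finish(() => resolve(values))
            },
            error: (err) => finish(() => reject(err)),
            complete: () => finish(() => resolve(values))
        })

        // A source may emit synchronously inside subscribe(), before the
        // subscription exists; release it here in that case.
        if (state.finished) subscription.unsubscribe()
    })
}

// Stand-in source for demonstration: emits 1 to 5 synchronously, then completes.
let released = false
const fakeSource: SubscribableLike<number> = {
    subscribe(observer) {
        for (let i = 1; i <= 5; i++) observer.next(i)
        observer.complete()
        return { unsubscribe: () => { released = true } }
    }
}

collectFirst(fakeSource, 3).then((values) => console.log(values)) // logs 1, 2, 3
```

Because `interval`-based streams such as `streamReadings` never complete on their own, a bounded helper like this (or an operator such as `take`) is one way to make sure the subscription is always released.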

Use Cases

  • Real-time data feeds -- IoT sensors, live metrics, dashboards
  • Notifications and alerts -- Push updates to connected clients
  • Change data capture -- React to data changes as they happen
  • Large result sets -- Stream results incrementally instead of loading everything at once
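
To make the "large result sets" case concrete, here is a minimal, library-free sketch of emitting rows one page at a time rather than as a single response. `emitInPages` and `PageObserver` are hypothetical names introduced for illustration, not Kinotic or RxJS APIs; the observer shape simply mirrors the `next` and `complete` handlers used when consuming a stream.

```typescript
// Hypothetical observer shape: `next` receives one page, `complete` signals the end.
interface PageObserver<T> {
    next: (page: T[]) => void
    complete: () => void
}

// Hypothetical helper: push `rows` to the observer one page at a time,
// then signal completion once every row has been sent.
function emitInPages<T>(rows: readonly T[], pageSize: number, observer: PageObserver<T>): void {
    if (!Number.isInteger(pageSize) || pageSize < 1) {
        throw new RangeError('pageSize must be a positive integer')
    }
    for (let start = 0; start < rows.length; start += pageSize) {
        observer.next(rows.slice(start, start + pageSize))
    }
    observer.complete()
}

// Usage: collect the pages to see how five rows are split with a page size of 2.
const pages: number[][] = []
let completed = false
emitInPages([1, 2, 3, 4, 5], 2, {
    next: (page) => pages.push(page),
    complete: () => { completed = true }
})
console.log(pages) // three pages: [1, 2], [3, 4], [5]
```

In a real service the same loop would feed whatever Observable the streaming method returns, and each page would typically be loaded from its source just before it is emitted, so the caller can start processing early pages while later ones are still being fetched.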
Copyright © 2026