
streamer

sqllocks_spindle.streaming.streamer

SpindleStreamer: the main public API for Phase 2 streaming.

Classes

StreamResult dataclass

Result of a streaming run.

Attributes:

  table (str): Name of the streamed table.
  events_sent (int): Total events transmitted to the sink.
  anomaly_count (int): Number of events flagged as anomalous.
  out_of_order_count (int): Number of events deliberately reordered.
  elapsed_seconds (float): Wall-clock duration of the streaming run.
  events_per_second_actual (float): Measured throughput, in events per second.
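The attributes above can be mirrored by a small dataclass, which also shows how the throughput figure plausibly relates to the other fields. This is a sketch only: StreamResultSketch and from_run are hypothetical names, and the assumption that events_per_second_actual equals events_sent divided by elapsed_seconds is not stated on this page.

```python
from dataclasses import dataclass


@dataclass
class StreamResultSketch:
    """Hypothetical stand-in that mirrors the documented StreamResult fields."""

    table: str
    events_sent: int
    anomaly_count: int
    out_of_order_count: int
    elapsed_seconds: float
    events_per_second_actual: float

    @classmethod
    def from_run(
        cls,
        table: str,
        events_sent: int,
        anomaly_count: int,
        out_of_order_count: int,
        elapsed_seconds: float,
    ) -> "StreamResultSketch":
        # Assumption: throughput = events sent / wall-clock seconds.
        # A zero-length run reports 0.0 instead of raising ZeroDivisionError.
        rate = events_sent / elapsed_seconds if elapsed_seconds > 0 else 0.0
        return cls(table, events_sent, anomaly_count, out_of_order_count,
                   elapsed_seconds, rate)


result = StreamResultSketch.from_run("order", 500, 12, 25, elapsed_seconds=2.0)
print(result.events_per_second_actual)  # 250.0
```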

SpindleStreamer

Stream generated synthetic data row by row through a StreamWriter sink.

Workflow:
  1. Generate a full batch via the Spindle engine (or accept pre-generated tables).
  2. Convert each row to an event dict, adding _spindle_table and _spindle_seq metadata fields.
  3. Optionally inject anomalies via an AnomalyRegistry (sqllocks_spindle.streaming.anomaly.AnomalyRegistry).
  4. Optionally reorder a fraction of events to simulate out-of-order arrival.
  5. Emit events in batches through the configured StreamWriter sink. Rate limiting (a token bucket plus Poisson inter-arrival times) is applied only when StreamConfig(realtime=True).
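Steps 2, 4, and the batching part of step 5 can be sketched in plain Python as below. The helper names (rows_to_events, reorder_fraction, batched) are hypothetical, rows are plain dicts rather than DataFrame rows, and swapping disjoint adjacent pairs is just one way to produce out-of-order arrival; the strategy Spindle actually uses is not documented here.

```python
import random
from typing import Any, Dict, Iterator, List, Tuple

Event = Dict[str, Any]


def rows_to_events(table: str, rows: List[Dict[str, Any]]) -> List[Event]:
    """Step 2: copy each row and tag it with the table name and a sequence number."""
    return [
        {**row, "_spindle_table": table, "_spindle_seq": seq}
        for seq, row in enumerate(rows)
    ]


def reorder_fraction(events: List[Event], fraction: float,
                     rng: random.Random) -> Tuple[List[Event], int]:
    """Step 4 (one possible strategy): swap disjoint adjacent pairs.

    Roughly `fraction` of the events end up out of order; returns the
    reordered list and the exact number of displaced events.
    """
    out = list(events)
    pair_starts = range(0, len(out) - 1, 2)  # disjoint pairs (0,1), (2,3), ...
    n_pairs = min(int(len(out) * fraction) // 2, len(pair_starts))
    for i in rng.sample(pair_starts, n_pairs):
        out[i], out[i + 1] = out[i + 1], out[i]
    return out, 2 * n_pairs


def batched(events: List[Event], batch_size: int) -> Iterator[List[Event]]:
    """Step 5: yield consecutive batches for the sink (no rate limiting here)."""
    for start in range(0, len(events), batch_size):
        yield events[start:start + batch_size]


rows = [{"order_id": i} for i in range(10)]
events = rows_to_events("order", rows)
shuffled, ooo_count = reorder_fraction(events, 0.2, random.Random(42))
batches = list(batched(shuffled, 4))
print(ooo_count, [len(b) for b in batches])  # 2 [4, 4, 2]
```

Because only disjoint pairs are swapped, the returned count is exact, which is the kind of figure StreamResult.out_of_order_count reports.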

Parameters:

  domain: A Domain instance (e.g. RetailDomain()). Mutually exclusive with tables. Default: None.
  tables (dict[str, DataFrame] | None): Pre-generated tables as dict[table_name, DataFrame]; skips generation. Default: None.
  sink (StreamWriter | None): StreamWriter to emit events through. Default: None, which uses ConsoleSink (sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink).
  config (StreamConfig | None): StreamConfig controlling rate, bursts, out-of-order (OOO) delivery, etc. Default: None.
  anomaly_registry: Optional AnomalyRegistry (sqllocks_spindle.streaming.anomaly.AnomalyRegistry). Default: None.
  scale (str): Scale preset for generation (ignored when tables is provided). Default: 'small'.
  seed (int | None): Random seed. Default: 42.
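The rate and burst settings on config feed step 5 of the workflow, which names a token bucket with Poisson inter-arrivals. A minimal simulated sketch of that combination follows: exponential gaps produce Poisson arrivals, and the bucket delays events once the burst allowance is spent. TokenBucket, send_schedule, and their parameters are hypothetical and are not the StreamConfig field names; nothing here actually sleeps.

```python
import random
from typing import List


class TokenBucket:
    """Token bucket that refills at `rate` tokens/s and holds at most `capacity`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full, so an initial burst is allowed
        self.last = 0.0

    def acquire(self, now: float) -> float:
        """Take one token at time `now`; return the time the event may be sent.

        A negative balance is a debt that later refills repay.
        """
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= 1.0
        if self.tokens >= 0:
            return now
        return now + (-self.tokens) / self.rate


def send_schedule(n_events: int, events_per_second: float, burst: int,
                  rng: random.Random) -> List[float]:
    """Simulated send times (seconds): Poisson arrivals shaped by a token bucket."""
    bucket = TokenBucket(rate=events_per_second, capacity=burst)
    t = 0.0
    times = []
    for _ in range(n_events):
        # Exponential gaps with mean 1/rate yield a Poisson arrival process.
        t += rng.expovariate(events_per_second)
        t = bucket.acquire(t)  # push the event later if the bucket is empty
        times.append(t)
    return times


bucket = TokenBucket(rate=2.0, capacity=2)
print([bucket.acquire(0.0) for _ in range(4)])  # [0.0, 0.0, 0.5, 1.0]

times = send_schedule(100, events_per_second=50.0, burst=10, rng=random.Random(0))
print(f"{len(times)} events over {times[-1]:.2f} s")
```

In a real-time run, each event would be held until its computed send time (for example with time.sleep) before reaching the sink; per the workflow above, no rate limiting applies unless StreamConfig(realtime=True).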

Example::

    from sqllocks_spindle import RetailDomain
    from sqllocks_spindle.streaming import SpindleStreamer, StreamConfig, FileSink

    # Stream at most 500 events from the "order" table into events.jsonl.
    result = SpindleStreamer(
        domain=RetailDomain(),
        sink=FileSink("events.jsonl", mode="w"),
        config=StreamConfig(max_events=500),
    ).stream("order")

    print(result)

Methods:
stream(table, timestamp_column=None)

Stream all rows of table through the configured sink.

Parameters:

  table (str): Name of the table to stream. Required.
  timestamp_column (str | None): Column to use as _spindle_event_time. If None, the first datetime-typed column is auto-detected. Default: None.

Returns:

  StreamResult: A StreamResult with run statistics.
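The timestamp_column fallback (first datetime-typed column) might look like the following pandas sketch. detect_timestamp_column and to_events are hypothetical helpers, the ISO-8601 string format for _spindle_event_time is an assumption, and what the library does when no datetime column exists is not documented here (this sketch simply omits the field).

```python
from typing import Any, Dict, List, Optional

import pandas as pd


def detect_timestamp_column(df: pd.DataFrame,
                            timestamp_column: Optional[str] = None) -> Optional[str]:
    """Return the explicit column, else the first datetime-typed column, else None."""
    if timestamp_column is not None:
        if timestamp_column not in df.columns:
            raise KeyError(f"column {timestamp_column!r} not found")
        return timestamp_column
    for name in df.columns:
        # Matches both naive and timezone-aware datetime64 columns.
        if pd.api.types.is_datetime64_any_dtype(df[name]):
            return name
    return None


def to_events(df: pd.DataFrame,
              timestamp_column: Optional[str] = None) -> List[Dict[str, Any]]:
    """Convert rows to dicts, adding an ISO-8601 _spindle_event_time when possible."""
    col = detect_timestamp_column(df, timestamp_column)
    records = df.to_dict(orient="records")
    if col is not None:
        for rec in records:
            # Assumes the chosen column holds datetime values (pd.Timestamp).
            rec["_spindle_event_time"] = rec[col].isoformat()
    return records


orders = pd.DataFrame({
    "order_id": [1, 2],
    "status": ["new", "paid"],
    "order_date": pd.to_datetime(["2024-01-01 09:30", "2024-01-02 14:00"]),
})
events = to_events(orders)
print(events[0]["_spindle_event_time"])  # 2024-01-01T09:30:00
```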

stream_all(timestamp_column=None)

Stream every table in generation order.

Returns:

  list[StreamResult]: List of StreamResult objects, one per table.
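Assuming "generation order" means parent tables are generated before the child tables that reference them, stream_all behaves roughly like a topological walk over foreign-key dependencies. The sketch below uses the standard library's graphlib; the table names, the dependency map, and stream_all_sketch are hypothetical, and timestamp_column is left out for brevity.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Set, TypeVar

R = TypeVar("R")

# Hypothetical schema: each table maps to the tables it references via foreign keys.
RETAIL_DEPENDENCIES: Dict[str, Set[str]] = {
    "customer": set(),
    "product": set(),
    "order": {"customer"},
    "order_line": {"order", "product"},
}


def stream_all_sketch(stream_one: Callable[[str], R],
                      dependencies: Dict[str, Set[str]]) -> List[R]:
    """Call stream_one for every table, parents before children."""
    order = TopologicalSorter(dependencies).static_order()
    return [stream_one(table) for table in order]


# With the real API, stream_one would be something like a bound streamer.stream.
visited = stream_all_sketch(lambda table: table, RETAIL_DEPENDENCIES)
print(visited)
```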