streamer
sqllocks_spindle.streaming.streamer
SpindleStreamer — main public API for Phase 2 streaming.
Classes
StreamResult (dataclass)
Result of a streaming run.
Attributes:
| Name | Type | Description |
|---|---|---|
| table | `str` | Name of the streamed table. |
| events_sent | `int` | Total events transmitted to the sink. |
| anomaly_count | `int` | Number of events flagged as anomalous. |
| out_of_order_count | `int` | Number of events deliberately reordered. |
| elapsed_seconds | `float` | Wall-clock duration of the streaming run. |
| events_per_second_actual | `float` | Measured throughput. |
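Throughput can be derived from the other fields. Below is a minimal sketch, assuming `events_per_second_actual` is simply `events_sent / elapsed_seconds`; the docs say only "measured throughput". `StreamResultSketch` and `make_result` are hypothetical names, not part of sqllocks_spindle.

```python
from dataclasses import dataclass


@dataclass
class StreamResultSketch:
    """Hypothetical stand-in mirroring the documented StreamResult fields."""

    table: str
    events_sent: int
    anomaly_count: int
    out_of_order_count: int
    elapsed_seconds: float
    events_per_second_actual: float


def make_result(table, events_sent, anomaly_count, out_of_order_count, elapsed_seconds):
    # Assumption: measured throughput = events sent / wall-clock seconds (0.0 if no time elapsed).
    eps = events_sent / elapsed_seconds if elapsed_seconds > 0 else 0.0
    return StreamResultSketch(
        table, events_sent, anomaly_count, out_of_order_count, elapsed_seconds, eps
    )


result = make_result("order", 500, 3, 25, 2.0)
print(result.events_per_second_actual)  # 250.0
```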
SpindleStreamer
Stream generated synthetic data row-by-row through a `StreamWriter`.
Workflow
- Generate a full batch via the Spindle engine (or accept pre-generated tables).
- Convert each row to an event dict, adding `_spindle_table` and `_spindle_seq` metadata fields.
- Optionally inject anomalies via an `AnomalyRegistry` (from `sqllocks_spindle.streaming.anomaly`).
- Optionally reorder a fraction of events to simulate out-of-order arrival.
- Emit in batches through the configured `StreamWriter` sink.

Rate-limiting (token bucket + Poisson inter-arrivals) is applied only when `StreamConfig(realtime=True)`.
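The conversion and reordering steps of the workflow can be sketched with plain dicts. This assumes rows arrive as dicts and that `_spindle_seq` is a zero-based per-table counter; `rows_to_events` and `delay_fraction` are hypothetical helpers. Reordering is modelled here as delaying a random subset of events to the end of the stream, which is only one way to simulate late arrival and not necessarily what the library does.

```python
import random


def rows_to_events(table, rows):
    """Copy each row dict and attach the table name and a per-table sequence number."""
    events = []
    for seq, row in enumerate(rows):
        event = dict(row)  # copy so the caller's rows are not mutated
        event["_spindle_table"] = table
        event["_spindle_seq"] = seq
        events.append(event)
    return events


def delay_fraction(events, fraction, rng):
    """Move a random `fraction` of events to the end to simulate late arrival.

    Returns (reordered_events, delayed_count). The final event is never picked,
    so each delayed event really does arrive after one with a higher sequence number.
    """
    if len(events) < 2:
        return list(events), 0
    k = min(int(len(events) * fraction), len(events) - 1)
    delayed = set(rng.sample(range(len(events) - 1), k))
    on_time = [e for i, e in enumerate(events) if i not in delayed]
    late = [e for i, e in enumerate(events) if i in delayed]
    return on_time + late, k


rows = [{"order_id": i} for i in range(10)]
events = rows_to_events("order", rows)
reordered, moved = delay_fraction(events, 0.2, random.Random(42))
print(moved)  # 2
```

Returning the delayed count alongside the events makes it easy to populate a field like `out_of_order_count` without re-scanning the stream.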
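Realtime pacing combines two standard techniques: exponential inter-arrival gaps (which make arrivals a Poisson process) and a token bucket that caps bursts. The sketch below runs on a simulated clock so it finishes instantly. `TokenBucket`, `paced_send_times`, `rate`, and `burst` are hypothetical names; the library's actual configuration options may differ.

```python
import random


class TokenBucket:
    """Token bucket on a caller-supplied clock: `rate` tokens/s, at most `capacity` stored."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = 0.0

    def acquire(self, now: float) -> float:
        """Spend one token and return the time (>= now) at which it could be spent."""
        now = max(now, self.last)  # never move the clock backwards
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return now
        # Not enough tokens: wait exactly until one more accumulates, then spend it.
        wait = (1 - self.tokens) / self.rate
        self.tokens = 0.0
        self.last = now + wait
        return self.last


def paced_send_times(n: int, rate: float, burst: float, rng: random.Random) -> list[float]:
    """Poisson arrivals (exponential gaps, mean 1/rate) throttled by a token bucket."""
    bucket = TokenBucket(rate, burst)
    arrival = 0.0
    sends = []
    for _ in range(n):
        arrival += rng.expovariate(rate)  # next arrival time of a Poisson process
        sends.append(bucket.acquire(arrival))
    return sends


times = paced_send_times(5, rate=100.0, burst=1, rng=random.Random(7))
# A realtime loop would time.sleep() until each entry of `times` before sending.
```

Keeping the clock injectable separates the pacing arithmetic from actual sleeping, so the limiter can be checked without waiting in real time.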
Parameters:
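| Name | Type | Description | Default |
|---|---|---|---|
| domain | | A Domain instance (e.g. `RetailDomain()`). | `None` |
| tables | `dict[str, DataFrame] \| None` | Pre-generated tables to stream instead of generating a batch. | `None` |
| sink | `StreamWriter \| None` | `StreamWriter` sink that receives the emitted events. | `None` |
| config | `StreamConfig \| None` | `StreamConfig` controlling the run (e.g. `max_events`, `realtime`). | `None` |
| anomaly_registry | | Optional `AnomalyRegistry` used to inject anomalies. | `None` |
| scale | `str` | Scale preset for generation (ignored when `tables` is provided). | `'small'` |
| seed | `int \| None` | Random seed. | `42` |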
Example:

```python
from sqllocks_spindle import RetailDomain
from sqllocks_spindle.streaming import SpindleStreamer, StreamConfig, FileSink

result = SpindleStreamer(
    domain=RetailDomain(),
    sink=FileSink("events.jsonl", mode="w"),
    config=StreamConfig(max_events=500),
).stream("order")
print(result)
```
Methods:
stream(table, timestamp_column=None)
Stream all rows of `table` through the configured sink.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | `str` | Name of the table to stream. | required |
| timestamp_column | `str \| None` | Column to use as … | `None` |
Returns:
| Type | Description |
|---|---|
| `StreamResult` | `StreamResult` summarizing the streaming run. |
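The batching step of `stream()` can be illustrated with a stand-in sink. `ListSink`, its `send_batch` method, the `batch_size` parameter, and the treatment of `max_events` as a hard cap on events sent are all assumptions for illustration, not the real `StreamWriter` interface.

```python
import time
from itertools import islice


class ListSink:
    """Hypothetical in-memory sink: records every batch it is handed."""

    def __init__(self):
        self.batches = []

    def send_batch(self, events):
        self.batches.append(list(events))


def stream_events(events, sink, batch_size=100, max_events=None):
    """Send events to `sink` in batches; return (events_sent, elapsed_seconds)."""
    start = time.perf_counter()
    events = iter(events)
    if max_events is not None:
        events = islice(events, max_events)  # assumed: max_events is a hard cap
    sent = 0
    while True:
        batch = list(islice(events, batch_size))  # pull up to batch_size events
        if not batch:
            break
        sink.send_batch(batch)
        sent += len(batch)
    return sent, time.perf_counter() - start


sink = ListSink()
sent, elapsed = stream_events(({"i": i} for i in range(250)), sink, batch_size=100, max_events=230)
print(sent, [len(b) for b in sink.batches])  # 230 [100, 100, 30]
```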
stream_all(timestamp_column=None)
Stream every table in generation order.
Returns:
| Type | Description |
|---|---|
| `list[StreamResult]` | List of `StreamResult` objects, one per streamed table. |
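The docs do not define "generation order". Assuming it means parent tables come before the tables that reference them, `stream_all` reduces to one `stream` call per table in topological order. A sketch using the standard-library `graphlib`; the dependency map, table names, and `stream_one` callback are hypothetical.

```python
from graphlib import TopologicalSorter


def generation_order(dependencies):
    """Order tables so each parent precedes its children (assumed ordering rule)."""
    # `dependencies` maps each table to the set of tables it references.
    return list(TopologicalSorter(dependencies).static_order())


def stream_all_sketch(dependencies, stream_one):
    """Call `stream_one(table)` for every table in dependency order; collect the results."""
    return [stream_one(table) for table in generation_order(dependencies)]


deps = {
    "customer": set(),
    "product": set(),
    "order": {"customer"},
    "order_line": {"order", "product"},
}
results = stream_all_sketch(deps, lambda table: (table, 0))
```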