streaming
sqllocks_spindle.streaming
Spindle Phase 2 — Streaming engine.
Classes
BurstWindow (dataclass)
A time window during which the event rate is multiplied by a factor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `start_offset_seconds` | `float` | Seconds from stream start when the burst begins. | required |
| `duration_seconds` | `float` | How long the burst lasts, in seconds. | required |
| `multiplier` | `float` | Rate multiplier during the burst (e.g. `10.0` = 10x the normal rate). | required |
Example:
```python
from sqllocks_spindle.streaming import BurstWindow
# 10x rate burst starting at t=30s, lasting 60 seconds
BurstWindow(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
```
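Read literally, a burst raises the rate only while the elapsed time falls inside its window. The sketch below models that with a hypothetical stand-in dataclass (`Burst`) and helper (`burst_factor`); treating the window as the half-open interval `[start, start + duration)` is an assumption, not documented behaviour.

```python
from dataclasses import dataclass


@dataclass
class Burst:
    """Hypothetical stand-in mirroring BurstWindow's three fields."""
    start_offset_seconds: float
    duration_seconds: float
    multiplier: float


def burst_factor(window: Burst, elapsed: float) -> float:
    """Return the window's multiplier while active, else 1.0 (half-open window assumed)."""
    end = window.start_offset_seconds + window.duration_seconds
    if window.start_offset_seconds <= elapsed < end:
        return window.multiplier
    return 1.0


burst = Burst(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
base_rate = 10.0  # events per second
for t in (0, 30, 89.9, 90):
    print(t, base_rate * burst_factor(burst, t))
```

With a base of 10 events/second, the sketch yields 100 events/second from t=30 up to (but not including) t=90.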
StreamConfig (dataclass)
Configuration for a Spindle streaming run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `events_per_second` | `float` | Base target throughput, in events per second. | `10.0` |
| `duration_seconds` | `float \| None` | Stop after this many wall-clock seconds. | `None` |
| `max_events` | `int \| None` | Stop after this many events. | `None` |
| `out_of_order_fraction` | `float` | Fraction of events to reorder (0.0–1.0). | `0.0` |
| `out_of_order_max_delay_slots` | `int` | Maximum number of slot positions an out-of-order event is delayed. | `10` |
| `burst_windows` | `list[BurstWindow]` | List of `BurstWindow` objects. | `list()` |
| `time_pattern` | `TimePattern \| None` | Optional `TimePattern`. | `None` |
| `label_anomalies` | `bool` | Keep … | `True` |
| `batch_size` | `int` | Events per `send_batch` call. | `100` |
| `realtime` | `bool` | If `True`, apply rate limiting (token bucket + Poisson inter-arrivals); otherwise events are emitted without rate limiting. | `False` |
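When `realtime=True`, the `SpindleStreamer` workflow notes that pacing combines a token bucket with Poisson inter-arrivals. A minimal sketch of the Poisson half only, assuming the instantaneous rate is `events_per_second` times the current rate multiplier: in a Poisson process the gaps between events are exponentially distributed with mean `1 / rate`. `next_gap` is a hypothetical helper and the token bucket is not shown.

```python
import random


def next_gap(events_per_second: float, multiplier: float, rng: random.Random) -> float:
    """Draw one exponential inter-arrival gap, in seconds (hypothetical helper)."""
    rate = events_per_second * multiplier  # assumed: effective rate = base x multiplier
    if rate <= 0:
        raise ValueError("rate must be positive")
    return rng.expovariate(rate)  # mean gap = 1 / rate


rng = random.Random(42)
gaps = [next_gap(10.0, 1.0, rng) for _ in range(10_000)]
print(f"mean gap: {sum(gaps) / len(gaps):.3f} s (expected about 0.100 s)")
```

A real-time loop would sleep for each drawn gap before sending the next event; during a 10x burst the mean gap shrinks to about 0.01 s.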
Methods:
get_rate_multiplier(elapsed, wall_hour=0, wall_dow=0)
Return the composite rate multiplier at a given moment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `elapsed` | `float` | Seconds since stream start. | required |
| `wall_hour` | `int` | Current wall-clock hour (0–23). | `0` |
| `wall_dow` | `int` | Current wall-clock day of week (0 = Monday). | `0` |
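The docstring does not say how the factors are combined into a "composite" multiplier. A minimal sketch, assuming the composite is the product of every active burst multiplier, the hour multiplier, and the day-of-week multiplier, with unmapped hours and days defaulting to 1.0. `composite_multiplier` and its tuple-based burst representation are hypothetical.

```python
def composite_multiplier(
    elapsed: float,
    bursts: list[tuple[float, float, float]],  # (start_offset_seconds, duration_seconds, multiplier)
    hour_multipliers: dict[int, float],
    dow_multipliers: dict[int, float],
    wall_hour: int = 0,
    wall_dow: int = 0,
) -> float:
    """Assumed combination rule: multiply all active factors together."""
    factor = 1.0
    for start, duration, mult in bursts:
        if start <= elapsed < start + duration:  # burst active at this moment
            factor *= mult
    factor *= hour_multipliers.get(wall_hour, 1.0)  # missing hour -> neutral
    factor *= dow_multipliers.get(wall_dow, 1.0)  # missing day -> neutral
    return factor


# 10x burst active at t=45s, evening peak (hour 20 -> 2.0), Saturday (dow 5 -> 1.5)
m = composite_multiplier(45.0, [(30, 60, 10.0)], {20: 2.0}, {5: 1.5}, wall_hour=20, wall_dow=5)
print(m)  # 30.0
```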
TimePattern (dataclass)
Time-of-day and day-of-week rate multipliers.
These multipliers are applied on top of the base rate to simulate real-world temporal patterns. For example, e-commerce traffic is high from 7 pm to 10 pm and low around 3 am.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `hour_multipliers` | `dict[int, float]` | Mapping of hour (0–23) to multiplier. | `dict()` |
| `dow_multipliers` | `dict[int, float]` | Mapping of day of week (0 = Monday … 6 = Sunday) to multiplier. | `dict()` |
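To make the e-commerce example concrete, here is a minimal sketch of an `hour_multipliers` mapping with an evening peak (hours 19, 20 and 21) and a 3 am trough, plus an estimate of daily volume at a base rate of 10 events/second. The multiplier values are illustrative, and treating unmapped hours as 1.0 is an assumption; `hourly_rate` is a hypothetical helper.

```python
BASE_RATE = 10.0  # events per second

# Illustrative pattern: 3x traffic 19:00-21:59, 0.2x during 03:00-03:59
hour_multipliers = {19: 3.0, 20: 3.0, 21: 3.0, 3: 0.2}


def hourly_rate(hour: int) -> float:
    """Events per second during `hour`, assuming unmapped hours use 1.0."""
    return BASE_RATE * hour_multipliers.get(hour, 1.0)


daily_events = sum(hourly_rate(h) * 3600 for h in range(24))
print(f"peak rate: {hourly_rate(20)} eps, trough: {hourly_rate(3)} eps")
print(f"estimated events per day: {daily_events:,.0f}")  # about 1,051,200
```

The arithmetic is 20 hours at 1.0, 3 hours at 3.0 and 1 hour at 0.2, i.e. 29.2 hour-equivalents of 36,000 events each.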
StreamWriter
Bases: ABC
Abstract base for all streaming sinks.
Concrete implementations provide the actual transport layer: `sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink`, `sqllocks_spindle.streaming.sinks.file_sink.FileSink`, `sqllocks_spindle.streaming.sinks.eventhub_sink.EventHubSink`, and `sqllocks_spindle.streaming.sinks.kafka_sink.KafkaSink`.
Subclasses must implement `send_batch`. `send` has a default implementation that wraps a single event in a list.
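A minimal sketch of the sink contract described above. It uses a local stand-in (`StreamWriterSketch`) rather than the real base class because the exact method signatures are not shown here; the `events: list[dict]` argument shape and the in-memory `ListSink` are assumptions for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any


class StreamWriterSketch(ABC):
    """Hypothetical stand-in mirroring the documented contract."""

    @abstractmethod
    def send_batch(self, events: list[dict[str, Any]]) -> None:
        ...

    def send(self, event: dict[str, Any]) -> None:
        # Documented default: wrap a single event in a list.
        self.send_batch([event])


class ListSink(StreamWriterSketch):
    """Hypothetical sink that collects events in memory (handy in tests)."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    def send_batch(self, events: list[dict[str, Any]]) -> None:
        self.events.extend(events)


sink = ListSink()
sink.send({"order_id": 1})
sink.send_batch([{"order_id": 2}, {"order_id": 3}])
print(len(sink.events))  # 3
```

Because only `send_batch` is abstract, a new transport needs just that one method to support both single-event and batched sends.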
StreamingMultiWriter
Stream generated data to multiple sinks concurrently.
For each `(table_name, DataFrame)` batch yielded by `generate_stream()`, the writer converts the rows to event dicts and fans them out to all registered sinks in parallel via a `ThreadPoolExecutor`.
Usage:
```python
from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter

# Assumes `spindle` (a Spindle engine instance) and `domain` are already defined.
smw = StreamingMultiWriter(
    console=ConsoleSink(),
    file=FileSink("events.jsonl"),
)
result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
print(result.summary())
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_workers` | `int \| None` | Number of parallel threads per batch (default: number of sinks). | `None` |
| `batch_size` | `int` | Maximum rows per `send_batch` call. | `100` |
| `stop_on_sink_error` | `bool` | If `True`, abort streaming when any sink raises an error. If `False` (the default), partial failures are captured and streaming continues. | `False` |
| `**sinks` | `StreamWriter` | Keyword arguments mapping sink names to `StreamWriter` instances. | `{}` |
Methods:
add_sink(name, sink)
Add a sink and return self for chaining.
remove_sink(name)
Remove a sink by name.
stream(generator)
Stream (table_name, DataFrame) batches to all sinks in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `generator` | `Iterator[tuple[str, DataFrame]]` | Iterator yielding `(table_name, DataFrame)` tuples, typically the output of `generate_stream()`. | required |
Returns:
| Type | Description |
|---|---|
| `StreamingMultiWriteResult` | `StreamingMultiWriteResult` with per-sink stats. |
stream_table(table_name, df)
Stream a single DataFrame to all sinks. Returns {sink_name: success}.
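The parallel fan-out behind `stream_table` can be pictured with `concurrent.futures`. A minimal sketch, assuming sinks are modelled as plain `send_batch` callables, rows are sent in chunks of `batch_size`, and an exception marks that sink as failed without stopping the others (mirroring `stop_on_sink_error=False`). `fan_out` and both sinks are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

Event = dict[str, Any]
SendBatch = Callable[[list[Event]], None]


def fan_out(
    events: list[Event],
    sinks: dict[str, SendBatch],
    batch_size: int = 100,
    max_workers: Optional[int] = None,
) -> dict[str, bool]:
    """Send `events` to every sink in parallel and return {sink_name: success}."""

    def run(send_batch: SendBatch) -> bool:
        try:
            # Chunk the events so each call receives at most `batch_size` rows.
            for start in range(0, len(events), batch_size):
                send_batch(events[start : start + batch_size])
            return True
        except Exception:
            return False  # record the failure; other sinks keep streaming

    workers = max_workers or max(1, len(sinks))  # default: one thread per sink
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {name: pool.submit(run, send) for name, send in sinks.items()}
        return {name: future.result() for name, future in futures.items()}


received: list[Event] = []


def broken_sink(batch: list[Event]) -> None:
    raise ConnectionError("sink unavailable")


rows = [{"id": i} for i in range(250)]
result = fan_out(rows, {"memory": received.extend, "broken": broken_sink})
print(result)  # {'memory': True, 'broken': False}
```

Each sink gets its own thread, so a slow or failing transport does not block the others within a batch.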
StreamingMultiWriteResult (dataclass)
Aggregated result from streaming to multiple sinks.
SinkResult (dataclass)
Per-sink streaming result.
Anomaly
Bases: ABC
Base class for anomaly definitions.
AnomalyRegistry
Registry of anomaly definitions applied during streaming.
Example:
```python
from sqllocks_spindle.streaming import AnomalyRegistry, ContextualAnomaly, PointAnomaly

registry = AnomalyRegistry([
    PointAnomaly("extreme_total", column="total_amount"),
    ContextualAnomaly(
        "winter_in_summer",
        column="product_category",
        condition_column="order_month",
        normal_values=[6, 7, 8],
        anomalous_values=["Winter Coats", "Heavy Jackets"],
    ),
])
# Apply to any DataFrame before streaming.
# `df` must contain the referenced columns; `rng` is a random generator.
df_labelled = registry.inject(df, rng)
```
CollectiveAnomaly (dataclass)
Bases: Anomaly
A sequence of rows that is abnormal together.
Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in the affected group have their timestamp compressed into a short window and are labelled as anomalous.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in … | required |
| `group_column` | `str` | Column whose distinct values define groups (e.g. `customer_id`). | required |
| `timestamp_column` | `str` | Datetime column to compress into a short window. | required |
| `window_seconds` | `float` | Duration of the burst window (default 600 s = 10 min). | `600.0` |
| `fraction` | `float` | Fraction of groups to corrupt (default 0.005). | `0.005` |
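A minimal sketch of the timestamp compression described above, using plain dicts instead of a DataFrame. It assumes groups are sampled at random according to `fraction` (with at least one group chosen), that each chosen group's rows are re-timed to evenly spaced points inside `window_seconds` starting at the group's earliest timestamp, and that an `is_anomaly` flag marks them. The real anchoring and labelling scheme may differ; `compress_groups` is hypothetical.

```python
import random
from datetime import datetime, timedelta
from typing import Any, Optional

Row = dict[str, Any]


def compress_groups(
    rows: list[Row],
    group_column: str,
    timestamp_column: str,
    window_seconds: float = 600.0,
    fraction: float = 0.005,
    rng: Optional[random.Random] = None,
) -> list[Row]:
    """Squeeze the timestamps of a random subset of groups into one short window."""
    rng = rng or random.Random()
    out = [dict(row, is_anomaly=False) for row in rows]  # copy rows, add a label
    groups = sorted({row[group_column] for row in out})
    if not groups:
        return out
    k = max(1, round(len(groups) * fraction))  # assumption: corrupt at least one group
    for group in rng.sample(groups, k):
        members = sorted(
            (row for row in out if row[group_column] == group),
            key=lambda row: row[timestamp_column],
        )
        start = members[0][timestamp_column]
        step = window_seconds / len(members)  # evenly spaced inside the window
        for i, row in enumerate(members):
            row[timestamp_column] = start + timedelta(seconds=i * step)
            row["is_anomaly"] = True
    return out


base = datetime(2024, 7, 1)
orders = [
    {"customer_id": cust, "order_ts": base + timedelta(hours=12 * i)}
    for cust in ("c1", "c2")
    for i in range(5)
]
labelled = compress_groups(orders, "customer_id", "order_ts", fraction=0.5, rng=random.Random(7))
flagged = [row for row in labelled if row["is_anomaly"]]
span = max(r["order_ts"] for r in flagged) - min(r["order_ts"] for r in flagged)
print(len(flagged), span)  # 5 0:08:00
```

Five orders originally spread over two days end up within eight minutes of each other, which is the velocity-fraud signature.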
ContextualAnomaly (dataclass)
Bases: Anomaly
Normal value placed in the wrong context.
Example: a winter coat sold in July, or a NULL shipping address on a delivered order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in … | required |
| `column` | `str` | Column whose value will be replaced with an anomalous value. | required |
| `condition_column` | `str` | Column used to identify "normal" rows to corrupt. | required |
| `normal_values` | `list[Any]` | Values of `condition_column` that mark a row as eligible for corruption. | required |
| `anomalous_values` | `list[Any]` | Replacement values to write into `column`. | required |
| `fraction` | `float` | Fraction of eligible rows to corrupt (default 0.01). | `0.01` |
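A minimal sketch of contextual corruption on plain dicts, assuming eligible rows are those whose `condition_column` value is in `normal_values`, that each eligible row is corrupted independently with probability `fraction`, and that replacements are drawn uniformly from `anomalous_values`. The `is_anomaly` flag and `inject_contextual` are hypothetical.

```python
import random
from typing import Any, Optional


def inject_contextual(
    rows: list[dict[str, Any]],
    column: str,
    condition_column: str,
    normal_values: list[Any],
    anomalous_values: list[Any],
    fraction: float = 0.01,
    rng: Optional[random.Random] = None,
) -> list[dict[str, Any]]:
    """Replace `column` with an out-of-context value on a fraction of eligible rows."""
    rng = rng or random.Random()
    eligible = set(normal_values)
    out = []
    for row in rows:
        row = dict(row, is_anomaly=False)  # copy and add a label
        if row[condition_column] in eligible and rng.random() < fraction:
            row[column] = rng.choice(anomalous_values)
            row["is_anomaly"] = True
        out.append(row)
    return out


orders = [{"order_month": m, "product_category": "T-Shirts"} for m in range(1, 13)]
labelled = inject_contextual(
    orders,
    column="product_category",
    condition_column="order_month",
    normal_values=[6, 7, 8],
    anomalous_values=["Winter Coats", "Heavy Jackets"],
    fraction=1.0,  # corrupt every eligible row so the effect is visible
    rng=random.Random(0),
)
print([r["order_month"] for r in labelled if r["is_anomaly"]])  # [6, 7, 8]
```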
PointAnomaly (dataclass)
Bases: Anomaly
Single-row anomaly: a column value that is far outside the normal range.
Example: an order total of $99,999 when the average is $50.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in … | required |
| `column` | `str` | Column to modify. | required |
| `multiplier_range` | `tuple[float, float]` | `(low, high)` range of the multiplier applied to the original value. | `(10.0, 100.0)` |
| `fraction` | `float` | Fraction of rows to mark (default 0.01 = 1%). | `0.01` |
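A minimal sketch of point-anomaly injection, assuming each row is selected independently with probability `fraction` and the selected value is multiplied by a factor drawn uniformly from `multiplier_range`. `inject_point` and the `is_anomaly` flag are hypothetical.

```python
import random
from typing import Any, Optional


def inject_point(
    rows: list[dict[str, Any]],
    column: str,
    multiplier_range: tuple[float, float] = (10.0, 100.0),
    fraction: float = 0.01,
    rng: Optional[random.Random] = None,
) -> list[dict[str, Any]]:
    """Scale `column` by a large random factor on roughly `fraction` of rows."""
    rng = rng or random.Random()
    low, high = multiplier_range
    out = []
    for row in rows:
        row = dict(row, is_anomaly=False)  # copy and add a label
        if rng.random() < fraction:
            row[column] = row[column] * rng.uniform(low, high)  # assumed uniform draw
            row["is_anomaly"] = True
        out.append(row)
    return out


orders = [{"total_amount": 50.0} for _ in range(10_000)]
result = inject_point(orders, "total_amount", rng=random.Random(1))
spiked = [r["total_amount"] for r in result if r["is_anomaly"]]
print(len(spiked), min(spiked), max(spiked))
```

With a $50 baseline and the default range, spiked totals land between $500 and $5,000, and about 1% of rows are flagged.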
ConsoleSink
Bases: StreamWriter
Prints events as JSON Lines to stdout.
Useful for local development and quick debugging. No external dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `indent` | `int \| None` | JSON indent level. | `None` |
| `prefix` | `str` | Optional string prepended to each JSON line. | `''` |
| `file` | | Output file object (defaults to `sys.stdout`). | `None` |
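A sketch of the output format, assuming each event is serialized with `json.dumps` (with `indent` passed through), preceded by `prefix`, and written to `file` or `sys.stdout`. `print_events` is hypothetical, and using `default=str` for non-JSON values such as timestamps is an assumption. In this sketch a non-`None` indent spreads one event over several lines, so the output is then no longer strict JSON Lines.

```python
import io
import json
import sys
from typing import Any, Optional, TextIO


def print_events(
    events: list[dict[str, Any]],
    indent: Optional[int] = None,
    prefix: str = "",
    file: Optional[TextIO] = None,
) -> None:
    """Write one JSON document per event, each preceded by `prefix`."""
    out = file if file is not None else sys.stdout
    for event in events:
        # default=str turns datetimes and similar values into strings
        out.write(prefix + json.dumps(event, indent=indent, default=str) + "\n")


buf = io.StringIO()
print_events([{"id": 1}, {"id": 2}], prefix="[order] ", file=buf)
print(buf.getvalue(), end="")
# [order] {"id": 1}
# [order] {"id": 2}
```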
FileSink
Bases: StreamWriter
Writes events as JSON Lines (one JSON object per line) to a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Output file path. Parent directories are created automatically. | required |
| `mode` | `str` | File open mode: `'a'` (append) or `'w'` (overwrite). | `'a'` |
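A minimal sketch of the behaviour described above: JSON Lines output, automatic creation of parent directories, and `'a'` versus `'w'` modes. `write_jsonl` is hypothetical, and serializing non-JSON values such as timestamps with `str` is an assumption.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Union


def write_jsonl(path: Union[str, Path], events: list[dict[str, Any]], mode: str = "a") -> None:
    """Append (or overwrite) one JSON object per line."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)  # create parent directories
    with path.open(mode, encoding="utf-8") as fh:
        for event in events:
            fh.write(json.dumps(event, default=str) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out" / "events.jsonl"
    write_jsonl(target, [{"id": 1}], mode="w")  # 'w' starts a fresh file
    write_jsonl(target, [{"id": 2}])  # default 'a' appends
    lines = target.read_text(encoding="utf-8").splitlines()
    print([json.loads(line) for line in lines])  # [{'id': 1}, {'id': 2}]
```

The append default means repeated runs accumulate events in the same file unless `mode="w"` is passed.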
SpindleStreamer
Stream generated synthetic data row by row through a `StreamWriter`.
Workflow
- Generate a full batch via the Spindle engine (or accept pre-generated tables).
- Convert each row to an event dict, adding `_spindle_table` and `_spindle_seq` metadata fields.
- Optionally inject anomalies via an `AnomalyRegistry` (`sqllocks_spindle.streaming.anomaly.AnomalyRegistry`).
- Optionally reorder a fraction of events to simulate out-of-order arrival.
- Emit in batches through the configured `StreamWriter` sink.

Rate limiting (token bucket + Poisson inter-arrivals) is applied only when `StreamConfig(realtime=True)`.
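The event-conversion and out-of-order steps of the workflow above can be sketched on plain dicts. This assumes `_spindle_seq` is a 0-based running counter (the actual start value is not stated) and models out-of-order arrival by pushing each selected event 1 to `out_of_order_max_delay_slots` positions later, selecting events independently with probability `out_of_order_fraction`. `to_events` and `reorder` are hypothetical helpers.

```python
import random
from typing import Any

Event = dict[str, Any]


def to_events(table: str, rows: list[dict[str, Any]]) -> list[Event]:
    """Copy each row and attach the documented metadata fields (0-based seq assumed)."""
    return [dict(row, _spindle_table=table, _spindle_seq=seq) for seq, row in enumerate(rows)]


def reorder(
    events: list[Event], fraction: float, max_delay_slots: int, rng: random.Random
) -> tuple[list[Event], int]:
    """Push roughly `fraction` of events later by 1..max_delay_slots positions."""
    keys: list[float] = []
    delayed = 0
    for i in range(len(events)):
        if rng.random() < fraction:
            # +0.5 sorts the delayed event just after the event d slots ahead of it
            keys.append(i + rng.randint(1, max_delay_slots) + 0.5)
            delayed += 1
        else:
            keys.append(float(i))
    order = sorted(range(len(events)), key=keys.__getitem__)
    return [events[i] for i in order], delayed


events = to_events("order", [{"order_id": n} for n in range(20)])
arrived, delayed = reorder(events, fraction=0.2, max_delay_slots=10, rng=random.Random(3))
print(delayed, [e["_spindle_seq"] for e in arrived])
```

Keeping `_spindle_seq` intact on every event lets a consumer detect and measure the reordering after the fact.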
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | | A Domain instance (e.g. `RetailDomain()`). | `None` |
| `tables` | `dict[str, DataFrame] \| None` | Pre-generated tables keyed by table name. | `None` |
| `sink` | `StreamWriter \| None` | `StreamWriter` sink that receives the events. | `None` |
| `config` | `StreamConfig \| None` | `StreamConfig` for the run. | `None` |
| `anomaly_registry` | | Optional `AnomalyRegistry`. | `None` |
| `scale` | `str` | Scale preset for generation (ignored when `tables` is provided). | `'small'` |
| `seed` | `int \| None` | Random seed. | `42` |
Example:
```python
from sqllocks_spindle import RetailDomain
from sqllocks_spindle.streaming import SpindleStreamer, StreamConfig, FileSink

# Stream up to 500 "order" events into a freshly overwritten JSONL file
result = SpindleStreamer(
    domain=RetailDomain(),
    sink=FileSink("events.jsonl", mode="w"),
    config=StreamConfig(max_events=500),
).stream("order")
print(result)
```
Methods:
stream(table, timestamp_column=None)
Stream all rows of `table` through the configured sink.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `table` | `str` | Name of the table to stream. | required |
| `timestamp_column` | `str \| None` | Column to use as … | `None` |
Returns:
| Type | Description |
|---|---|
| `StreamResult` | A `StreamResult` for the run. |
stream_all(timestamp_column=None)
Stream every table in generation order.
Returns:
| Type | Description |
|---|---|
| `list[StreamResult]` | List of `StreamResult` objects, one per table. |
StreamResult (dataclass)
Result of a streaming run.
Attributes:
| Name | Type | Description |
|---|---|---|
| `table` | `str` | Name of the streamed table. |
| `events_sent` | `int` | Total events transmitted to the sink. |
| `anomaly_count` | `int` | Number of events flagged as anomalous. |
| `out_of_order_count` | `int` | Number of events deliberately reordered. |
| `elapsed_seconds` | `float` | Wall-clock duration of the streaming run. |
| `events_per_second_actual` | `float` | Measured throughput, in events per second. |