

sqllocks_spindle.streaming

Spindle Phase 2 — Streaming engine.

Classes

BurstWindow dataclass

A time window during which the event rate is multiplied by a factor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start_offset_seconds` | `float` | Seconds from stream start when the burst begins. | required |
| `duration_seconds` | `float` | How long the burst lasts, in seconds. | required |
| `multiplier` | `float` | Rate multiplier during the burst (e.g. `10.0` = 10x the normal rate). | required |

Example:

```python
from sqllocks_spindle.streaming import BurstWindow

# 10x rate burst starting at t=30s, lasting 60 seconds
BurstWindow(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
```

Methods:
is_active(elapsed)

Return True if the burst window is active at the given elapsed time.
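The docstring does not say whether the end of the window is inclusive. The sketch below assumes a half-open interval `[start_offset_seconds, start_offset_seconds + duration_seconds)`; `BurstWindowSketch` is a hypothetical stand-in that mirrors the documented fields, not the library class.

```python
from dataclasses import dataclass


@dataclass
class BurstWindowSketch:
    """Hypothetical stand-in mirroring BurstWindow's documented fields."""

    start_offset_seconds: float
    duration_seconds: float
    multiplier: float

    def is_active(self, elapsed: float) -> bool:
        # Assumed half-open interval: active at the start instant,
        # inactive from the end instant onward.
        end = self.start_offset_seconds + self.duration_seconds
        return self.start_offset_seconds <= elapsed < end


burst = BurstWindowSketch(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
print([burst.is_active(t) for t in (0, 30, 89.9, 90)])  # [False, True, True, False]
```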

StreamConfig dataclass

Configuration for a Spindle streaming run.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `events_per_second` | `float` | Base target throughput, in events per second. | `10.0` |
| `duration_seconds` | `float \| None` | Stop after this many wall-clock seconds (`None` = no limit). | `None` |
| `max_events` | `int \| None` | Stop after this many events (`None` = no limit). | `None` |
| `out_of_order_fraction` | `float` | Fraction of events to reorder (0.0–1.0). | `0.0` |
| `out_of_order_max_delay_slots` | `int` | Maximum number of slot positions an out-of-order (OOO) event is delayed. | `10` |
| `burst_windows` | `list[BurstWindow]` | List of `BurstWindow` definitions. | `list()` |
| `time_pattern` | `TimePattern \| None` | Optional `TimePattern` for time-of-day and day-of-week multipliers. | `None` |
| `label_anomalies` | `bool` | Keep the `_spindle_is_anomaly` and `_spindle_anomaly_type` columns in emitted events. | `True` |
| `batch_size` | `int` | Events per `StreamWriter.send_batch` call. | `100` |
| `realtime` | `bool` | If `True`, rate-limit emissions using a token bucket plus Poisson inter-arrival times. If `False` (default), emit as fast as possible (useful for bulk loading and unit tests). | `False` |
Methods:
get_rate_multiplier(elapsed, wall_hour=0, wall_dow=0)

Return the composite rate multiplier at a given moment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `elapsed` | `float` | Seconds since stream start. | required |
| `wall_hour` | `int` | Current wall-clock hour (0–23). | `0` |
| `wall_dow` | `int` | Current wall-clock day of week (0 = Monday). | `0` |

TimePattern dataclass

Time-of-day and day-of-week rate multipliers.

Applied on top of the base rate when simulating real-world temporal patterns. E.g., e-commerce has high traffic 7pm–10pm and lower traffic at 3am.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hour_multipliers` | `dict[int, float]` | Mapping of hour (0–23) to multiplier. | `dict()` |
| `dow_multipliers` | `dict[int, float]` | Mapping of day of week (0 = Monday … 6 = Sunday) to multiplier. | `dict()` |
Methods:
get_multiplier(hour, dow)

Return the composite multiplier for a given hour and day-of-week.

business_hours() classmethod

Higher traffic 8am–6pm weekdays, very low on weekends.

retail_peak() classmethod

E-commerce pattern: evenings peak, weekends elevated.
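A sketch of how `get_multiplier` might compose the two mappings, assuming the hour and day-of-week factors multiply and that a missing key means 1.0. `TimePatternSketch` is hypothetical, and the example values are illustrative, not the ones `retail_peak()` actually uses.

```python
from dataclasses import dataclass, field


@dataclass
class TimePatternSketch:
    """Hypothetical stand-in mirroring TimePattern's documented fields."""

    hour_multipliers: dict[int, float] = field(default_factory=dict)
    dow_multipliers: dict[int, float] = field(default_factory=dict)

    def get_multiplier(self, hour: int, dow: int) -> float:
        # Assumed composition: hour factor times day-of-week factor,
        # with 1.0 for any hour or day not present in the mapping.
        return self.hour_multipliers.get(hour, 1.0) * self.dow_multipliers.get(dow, 1.0)


# Evening peak (19:00 to 21:59), a quiet 3am, and busier weekends.
evening_peak = TimePatternSketch(
    hour_multipliers={3: 0.1, **{h: 3.0 for h in range(19, 22)}},
    dow_multipliers={5: 1.5, 6: 1.5},
)
print(evening_peak.get_multiplier(20, 5))  # 4.5 (Saturday evening)
print(evening_peak.get_multiplier(3, 1))   # 0.1 (Tuesday, 3am)
```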

StreamWriter

Bases: ABC

Abstract base for all streaming sinks.

Concrete implementations provide the actual transport layer: `sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink`, `sqllocks_spindle.streaming.sinks.file_sink.FileSink`, `sqllocks_spindle.streaming.sinks.eventhub_sink.EventHubSink`, and `sqllocks_spindle.streaming.sinks.kafka_sink.KafkaSink`.

Subclasses must implement `send_batch`. `send` has a default implementation that wraps a single event in a list.

Methods:
send(event)

Send a single event.

send_batch(events) abstractmethod

Send a batch of events.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `events` | `list[dict[str, Any]]` | List of event dicts to transmit. | required |
close()

Close any open connections or file handles (no-op by default).
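The contract above is small enough to sketch. `StreamWriterSketch` reproduces the documented behaviour (abstract `send_batch`, `send` wrapping one event in a list, no-op `close`) so the example runs without the package; in real code you would subclass `StreamWriter` itself. `ListSink` is a hypothetical in-memory sink of the kind that is handy in unit tests.

```python
from abc import ABC, abstractmethod
from typing import Any


class StreamWriterSketch(ABC):
    """Hypothetical stand-in reproducing the documented StreamWriter contract."""

    def send(self, event: dict[str, Any]) -> None:
        # Documented default: wrap the single event in a list.
        self.send_batch([event])

    @abstractmethod
    def send_batch(self, events: list[dict[str, Any]]) -> None:
        """Transmit a batch of event dicts."""

    def close(self) -> None:
        """No-op by default; override to release connections or file handles."""


class ListSink(StreamWriterSketch):
    """Hypothetical in-memory sink that records every batch it receives."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []
        self.batches = 0

    def send_batch(self, events: list[dict[str, Any]]) -> None:
        self.batches += 1
        self.events.extend(events)


sink = ListSink()
sink.send({"id": 1})                     # becomes a one-element batch
sink.send_batch([{"id": 2}, {"id": 3}])
sink.close()
print(sink.batches, len(sink.events))    # 2 3
```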

StreamingMultiWriter

Stream generated data to multiple sinks concurrently.

For each (table_name, DataFrame) batch yielded by generate_stream(), converts rows to event dicts and fans out to all registered sinks in parallel via ThreadPoolExecutor.

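Usage:

```python
from sqllocks_spindle import RetailDomain
from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter

# `spindle` is assumed to be an existing Spindle engine instance.
domain = RetailDomain()

smw = StreamingMultiWriter(
    console=ConsoleSink(),
    file=FileSink("events.jsonl"),
)
result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
print(result.summary())
```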

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `max_workers` | `int \| None` | Number of parallel threads per batch (default: number of sinks). | `None` |
| `batch_size` | `int` | Maximum rows per `send_batch` call. | `100` |
| `stop_on_sink_error` | `bool` | If `True`, abort streaming when any sink raises an error. If `False` (default), partial failures are captured and streaming continues. | `False` |
| `**sinks` | `StreamWriter` | Keyword arguments of `sink_name=StreamWriter` pairs. | `{}` |
Methods:
add_sink(name, sink)

Add a sink and return self for chaining.

remove_sink(name)

Remove a sink by name.

stream(generator)

Stream (table_name, DataFrame) batches to all sinks in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `generator` | `Iterator[tuple[str, DataFrame]]` | Iterator yielding `(table_name, DataFrame)` tuples, typically from `Spindle.generate_stream()`. | required |

Returns:

| Type | Description |
| --- | --- |
| `StreamingMultiWriteResult` | `StreamingMultiWriteResult` with per-sink stats. |

stream_table(table_name, df)

Stream a single DataFrame to all sinks. Returns {sink_name: success}.

StreamingMultiWriteResult dataclass

Aggregated result from streaming to multiple sinks.

SinkResult dataclass

Per-sink streaming result.

Anomaly

Bases: ABC

Base class for anomaly definitions.

Attributes:
anomaly_type abstractmethod property

Short label used in the _spindle_anomaly_type column.

fraction abstractmethod property

Fraction of eligible rows (or groups) to mark as anomalous.

Methods:
inject(df, rng) abstractmethod

Inject anomalies into df and return a modified copy.

AnomalyRegistry

Registry of anomaly definitions applied during streaming.

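Example:

```python
import numpy as np
from sqllocks_spindle.streaming import AnomalyRegistry, ContextualAnomaly, PointAnomaly

registry = AnomalyRegistry([
    PointAnomaly("extreme_total", column="total_amount"),
    ContextualAnomaly(
        "winter_in_summer",
        column="product_category",
        condition_column="order_month",
        normal_values=[6, 7, 8],
        anomalous_values=["Winter Coats", "Heavy Jackets"],
    ),
])

# Apply to any DataFrame before streaming. `df` must contain the referenced
# columns; `rng` is assumed here to be a NumPy Generator.
rng = np.random.default_rng(42)
df_labelled = registry.inject(df, rng)
```
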
Methods:
add(anomaly)

Register an anomaly and return self for chaining.

inject(df, rng)

Apply all registered anomalies to df and return a labelled copy.

Label columns _spindle_is_anomaly and _spindle_anomaly_type are always present in the returned DataFrame, even when the registry is empty.

CollectiveAnomaly dataclass

Bases: Anomaly

A sequence of rows that is abnormal together.

Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in each affected group have their timestamps compressed into a short window and are labelled as anomalous.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Short label, used in `_spindle_anomaly_type`. | required |
| `group_column` | `str` | Column whose distinct values define groups (e.g. `customer_id`). | required |
| `timestamp_column` | `str` | Datetime column to compress into a short window. | required |
| `window_seconds` | `float` | Duration of the burst window, in seconds (600 s = 10 min). | `600.0` |
| `fraction` | `float` | Fraction of groups to corrupt. | `0.005` |

ContextualAnomaly dataclass

Bases: Anomaly

Normal value placed in the wrong context.

Example: a winter coat sold in July, or a NULL shipping address on a delivered order.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Short label, used in `_spindle_anomaly_type`. | required |
| `column` | `str` | Column whose value will be replaced with an anomalous value. | required |
| `condition_column` | `str` | Column used to identify "normal" rows to corrupt. | required |
| `normal_values` | `list[Any]` | Values of `condition_column` that make a row eligible. | required |
| `anomalous_values` | `list[Any]` | Replacement values to write into `column`. | required |
| `fraction` | `float` | Fraction of eligible rows to corrupt. | `0.01` |

PointAnomaly dataclass

Bases: Anomaly

Single-row anomaly: a column value that is far outside the normal range.

Example: an order total of $99,999 when the average is $50.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Short label, used in `_spindle_anomaly_type`. | required |
| `column` | `str` | Column to modify. | required |
| `multiplier_range` | `tuple[float, float]` | `(min, max)` multiplier applied to the column mean to produce the anomalous value. | `(10.0, 100.0)` |
| `fraction` | `float` | Fraction of rows to mark (0.01 = 1%). | `0.01` |

ConsoleSink

Bases: StreamWriter

Prints events as JSON Lines to stdout.

Useful for local development and quick debugging. No external dependencies.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `indent` | `int \| None` | JSON indent level (`None` for compact single-line output, `2` for pretty-printed). | `None` |
| `prefix` | `str` | Optional string prepended to each JSON line. | `''` |
| `file` |  | Output file object (defaults to `sys.stdout`). Injectable for testing. | `None` |

FileSink

Bases: StreamWriter

Writes events as JSON Lines (one JSON object per line) to a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| Path` | Output file path. Parent directories are created automatically. | required |
| `mode` | `str` | File open mode: `"a"` (append, default) or `"w"` (overwrite). | `'a'` |

SpindleStreamer

Stream generated synthetic data row by row through a `StreamWriter`.

Workflow:

1. Generate a full batch via the Spindle engine (or accept pre-generated tables).
2. Convert each row to an event dict, adding `_spindle_table` and `_spindle_seq` metadata fields.
3. Optionally inject anomalies via an `AnomalyRegistry` (`sqllocks_spindle.streaming.anomaly.AnomalyRegistry`).
4. Optionally reorder a fraction of events to simulate out-of-order arrival.
5. Emit in batches through the configured `StreamWriter` sink. Rate limiting (token bucket + Poisson inter-arrival times) is applied only when `StreamConfig(realtime=True)`.
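The workflow names the two pacing mechanisms without saying how they interact. The sketch below assumes one reasonable combination: exponential gaps (the inter-arrival times of a Poisson process) space out events, and a token bucket refilled at the same rate caps sustained throughput. `TokenBucket`, `poisson_gaps`, and `paced_emit` are hypothetical names; `clock` and `sleep` are injectable so the logic can be exercised without real waiting.

```python
import random
import time
from collections.abc import Callable, Iterable, Iterator
from typing import Any


class TokenBucket:
    """Refills `rate` tokens per second, holding at most `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n: float = 1.0) -> bool:
        current = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (current - self.last) * self.rate)
        self.last = current
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


def poisson_gaps(rate: float, rng: random.Random) -> Iterator[float]:
    """Inter-arrival gaps of a Poisson process are Exponential(rate)."""
    while True:
        yield rng.expovariate(rate)


def paced_emit(
    events: Iterable[Any],
    rate: float,
    emit: Callable[[Any], None],
    rng: random.Random,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    bucket = TokenBucket(rate, capacity=max(1.0, rate), clock=clock)
    gaps = poisson_gaps(rate, rng)
    for event in events:
        sleep(next(gaps))                 # random, memoryless spacing
        while not bucket.try_acquire():
            sleep(1.0 / rate)             # wait for the bucket to refill
        emit(event)


# Simulated clock: sleep() advances time instantly instead of blocking.
sim_time = [0.0]


def fake_sleep(seconds: float) -> None:
    sim_time[0] += seconds


sent: list[int] = []
paced_emit(range(100), rate=50.0, emit=sent.append, rng=random.Random(1),
           clock=lambda: sim_time[0], sleep=fake_sleep)
print(len(sent), round(sim_time[0], 2), "simulated seconds")
```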

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `domain` |  | A `Domain` instance (e.g. `RetailDomain()`). Mutually exclusive with `tables`. | `None` |
| `tables` | `dict[str, DataFrame] \| None` | Pre-generated `dict[table_name, DataFrame]`; skips generation. | `None` |
| `sink` | `StreamWriter \| None` | `StreamWriter` to emit events through. Defaults to `sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink`. | `None` |
| `config` | `StreamConfig \| None` | `StreamConfig` controlling rate, bursts, out-of-order delivery, etc. | `None` |
| `anomaly_registry` |  | Optional `sqllocks_spindle.streaming.anomaly.AnomalyRegistry`. | `None` |
| `scale` | `str` | Scale preset for generation (ignored when `tables` is provided). | `'small'` |
| `seed` | `int \| None` | Random seed. | `42` |

Example:

```python
from sqllocks_spindle import RetailDomain
from sqllocks_spindle.streaming import FileSink, SpindleStreamer, StreamConfig

result = SpindleStreamer(
    domain=RetailDomain(),
    sink=FileSink("events.jsonl", mode="w"),
    config=StreamConfig(max_events=500),
).stream("order")

print(result)
```

Methods:
stream(table, timestamp_column=None)

Stream all rows of table through the configured sink.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `str` | Name of the table to stream. | required |
| `timestamp_column` | `str \| None` | Column to use as `_spindle_event_time`. If `None`, the first datetime-typed column is auto-detected. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `StreamResult` | `StreamResult` with run statistics. |

stream_all(timestamp_column=None)

Stream every table in generation order.

Returns:

| Type | Description |
| --- | --- |
| `list[StreamResult]` | List of `StreamResult`, one per table. |

StreamResult dataclass

Result of a streaming run.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `table` | `str` | Name of the streamed table. |
| `events_sent` | `int` | Total events transmitted to the sink. |
| `anomaly_count` | `int` | Number of events flagged as anomalous. |
| `out_of_order_count` | `int` | Number of events deliberately reordered. |
| `elapsed_seconds` | `float` | Wall-clock duration of the streaming run, in seconds. |
| `events_per_second_actual` | `float` | Measured throughput, in events per second. |