Skip to content

stream_emit

sqllocks_spindle.simulation.stream_emit

Stream emitter — wraps SpindleStreamer with envelope, replay, and schema tracking.

Classes

StreamEmitConfig dataclass

Configuration for :class:StreamEmitter.

Parameters:

Name Type Description Default
rate_per_sec float

Base events-per-second target.

10.0
jitter_ms float

Maximum random jitter added between events (milliseconds).

0.0
burst_windows list[BurstWindow]

List of :class:BurstWindow definitions.

list()
out_of_order_probability float

Fraction of events to deliver out of order.

0.0
replay_enabled bool

Whether to re-emit events from a sliding window.

False
replay_window_minutes float

Length of the replay window.

5.0
replay_probability float

Per-tick probability of triggering a replay burst.

0.05
replay_burst_size int

Number of events re-emitted per replay trigger.

10
topics list[str]

List of topic names (one per table, or explicit mapping).

list()
envelope_source str

Value for the envelope source field.

'spindle'
envelope_schema_version str

Default schema version string.

'1.0'
sink_type str

Sink identifier — "console", "file", "eventhub", "eventstream".

'console'
sink_connection dict[str, Any]

Connection info dict (keys depend on sink_type).

dict()
max_events int | None

Stop after this many events (None = no limit).

None
realtime bool

Pace emission in real time (vs. fast-as-possible).

False
seed int

Random seed.

42

StreamEmitResult dataclass

Result of a :meth:StreamEmitter.emit run.

Attributes:

Name Type Description
events_sent int

Total primary events emitted.

replay_events_sent int

Events re-emitted via the replay window.

topics_used set[str]

Set of topic strings written to.

elapsed_seconds float

Wall-clock duration.

schema_versions dict[str, str]

Mapping of topic -> schema version used.

StreamEmitter

Emit generated tables as enveloped streaming events.

Builds on :class:~sqllocks_spindle.streaming.streamer.SpindleStreamer but adds:

  • Event envelopes — each event is wrapped in a CloudEvents-style envelope with id, time, topic, schema_version, and correlation_id.
  • Replay window — a sliding buffer of recently emitted events that can be re-sent to simulate consumer-side replays / redeliveries.
  • Schema version tracking — each topic carries a schema version string so downstream consumers can detect schema evolution.

Parameters:

Name Type Description Default
tables dict[str, DataFrame]

Pre-generated dict[table_name, DataFrame].

required
config StreamEmitConfig | None

:class:StreamEmitConfig.

None
sink StreamWriter | None

Optional explicit :class:StreamWriter. If None, one is built from config.sink_type / config.sink_connection.

None

Example::

from sqllocks_spindle.simulation import StreamEmitter, StreamEmitConfig

cfg = StreamEmitConfig(rate_per_sec=50, topics=["orders", "returns"])
result = StreamEmitter(tables=gen_result.tables, config=cfg).emit()
Methods:
emit(tables=None, config=None)

Emit all rows from tables (or the instance tables) as enveloped events.

Parameters:

Name Type Description Default
tables dict[str, DataFrame] | None

Override tables for this call.

None
config StreamEmitConfig | None

Override config for this call.

None

Returns:

Type Description
StreamEmitResult

class:StreamEmitResult with run statistics.