stream_emit
sqllocks_spindle.simulation.stream_emit
Stream emitter — wraps SpindleStreamer with envelope, replay, and schema tracking.
Classes
StreamEmitConfig
dataclass
Configuration for :class:`StreamEmitter`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rate_per_sec` | `float` | Base events-per-second target. | `10.0` |
| `jitter_ms` | `float` | Maximum random jitter added between events (milliseconds). | `0.0` |
| `burst_windows` | `list[BurstWindow]` | List of :class: | `list()` |
| `out_of_order_probability` | `float` | Fraction of events to deliver out of order. | `0.0` |
| `replay_enabled` | `bool` | Whether to re-emit events from a sliding window. | `False` |
| `replay_window_minutes` | `float` | Length of the replay window. | `5.0` |
| `replay_probability` | `float` | Per-tick probability of triggering a replay burst. | `0.05` |
| `replay_burst_size` | `int` | Number of events re-emitted per replay trigger. | `10` |
| `topics` | `list[str]` | List of topic names (one per table, or explicit mapping). | `list()` |
| `envelope_source` | `str` | Value for the envelope | `'spindle'` |
| `envelope_schema_version` | `str` | Default schema version string. | `'1.0'` |
| `sink_type` | `str` | Sink identifier: | `'console'` |
| `sink_connection` | `dict[str, Any]` | Connection info dict (keys depend on | `dict()` |
| `max_events` | `int \| None` | Stop after this many events ( | `None` |
| `realtime` | `bool` | Pace emission in real time (vs. fast-as-possible). | `False` |
| `seed` | `int` | Random seed. | `42` |
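As a rough illustration of how `rate_per_sec` and `jitter_ms` might combine into a per-event delay, the sketch below assumes the base interval is `1 / rate_per_sec` and that jitter is drawn uniformly from `[0, jitter_ms]`. The helper `next_delay_seconds` is hypothetical and is not part of `sqllocks_spindle`; the library's actual pacing logic may differ.

```python
import random


def next_delay_seconds(rate_per_sec: float, jitter_ms: float, rng: random.Random) -> float:
    """Hypothetical helper: base inter-event interval plus uniform jitter.

    Assumes jitter is drawn uniformly from [0, jitter_ms]; the real
    emitter may use a different distribution.
    """
    if rate_per_sec <= 0:
        raise ValueError("rate_per_sec must be positive")
    base = 1.0 / rate_per_sec                      # seconds between events
    jitter = rng.uniform(0.0, jitter_ms) / 1000.0  # milliseconds -> seconds
    return base + jitter


# A seeded generator keeps the sequence of delays reproducible,
# mirroring the role of the `seed` parameter.
rng = random.Random(42)
delays = [next_delay_seconds(10.0, 50.0, rng) for _ in range(5)]
```

With `rate_per_sec=10.0` and `jitter_ms=50.0`, every delay in this sketch falls between 0.10 and 0.15 seconds.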
StreamEmitResult
dataclass
Result of a :meth:`StreamEmitter.emit` run.
Attributes:
| Name | Type | Description |
|---|---|---|
| `events_sent` | `int` | Total primary events emitted. |
| `replay_events_sent` | `int` | Events re-emitted via the replay window. |
| `topics_used` | `set[str]` | Set of topic strings written to. |
| `elapsed_seconds` | `float` | Wall-clock duration. |
| `schema_versions` | `dict[str, str]` | Mapping of topic -> schema version used. |
StreamEmitter
Emit generated tables as enveloped streaming events.
Builds on :class:`~sqllocks_spindle.streaming.streamer.SpindleStreamer` but adds:
- Event envelopes: each event is wrapped in a CloudEvents-style envelope with id, time, topic, schema_version, and correlation_id.
- Replay window: a sliding buffer of recently emitted events that can be re-sent to simulate consumer-side replays / redeliveries.
- Schema version tracking: each topic carries a schema version string so downstream consumers can detect schema evolution.
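The envelope and replay-window ideas above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: `wrap_event` and `ReplayBuffer` are hypothetical names, the `data` key for the payload is an assumption, and the eviction and sampling strategy is only one plausible reading of "sliding buffer".

```python
import random
import uuid
from collections import deque
from datetime import datetime, timedelta, timezone


def wrap_event(row, topic, schema_version="1.0", correlation_id=None, now=None):
    """Hypothetical helper: wrap a row in an envelope with the documented fields."""
    now = now or datetime.now(timezone.utc)
    return {
        "id": str(uuid.uuid4()),
        "time": now.isoformat(),
        "topic": topic,
        "schema_version": schema_version,
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "data": row,  # assumption: the payload key name is not given in the docs
    }


class ReplayBuffer:
    """Hypothetical sliding window of recently emitted envelopes."""

    def __init__(self, window_minutes):
        self.window = timedelta(minutes=window_minutes)
        self._events = deque()  # (timestamp, envelope), oldest first

    def _evict(self, now):
        # Drop envelopes that have aged out of the window.
        cutoff = now - self.window
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()

    def add(self, envelope, now):
        self._events.append((now, envelope))
        self._evict(now)

    def sample(self, burst_size, rng, now):
        # Pick up to burst_size envelopes still inside the window.
        self._evict(now)
        pool = [env for _, env in self._events]
        return rng.sample(pool, min(burst_size, len(pool)))


rng = random.Random(42)
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
buffer = ReplayBuffer(window_minutes=5.0)
for i in range(20):
    ts = start + timedelta(minutes=i)  # one event per simulated minute
    buffer.add(wrap_event({"order_id": i}, topic="orders", now=ts), now=ts)

replayed = buffer.sample(burst_size=10, rng=rng, now=start + timedelta(minutes=19))
```

In this run only the events from the last five simulated minutes remain in the buffer, so the replay burst is capped by the window contents and not by `burst_size`.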
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tables` | `dict[str, DataFrame]` | Pre-generated | required |
| `config` | `StreamEmitConfig \| None` | :class: | `None` |
| `sink` | `StreamWriter \| None` | Optional explicit :class: | `None` |
Example::

    from sqllocks_spindle.simulation import StreamEmitter, StreamEmitConfig

    cfg = StreamEmitConfig(rate_per_sec=50, topics=["orders", "returns"])
    result = StreamEmitter(tables=gen_result.tables, config=cfg).emit()

Methods:
emit(tables=None, config=None)
Emit all rows from tables (or the instance tables) as enveloped events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tables` | `dict[str, DataFrame] \| None` | Override tables for this call. | `None` |
| `config` | `StreamEmitConfig \| None` | Override config for this call. | `None` |
Returns:
| Type | Description |
|---|---|
| `StreamEmitResult` | class: |