multi_writer
sqllocks_spindle.streaming.multi_writer
StreamingMultiWriter — concurrent fan-out streaming to multiple sinks.
Accepts a generate_stream() iterator and fans each (table, DataFrame) batch to all configured StreamWriter sinks in parallel using ThreadPoolExecutor.
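The fan-out pattern can be sketched with the standard library alone. This is an illustrative model, not Spindle's code. `RecordingSink`, its `send_batch(events)` signature, and `fan_out` are hypothetical, and a batch is modeled as a list of event dicts rather than a DataFrame. Every sink receives the same batch, and each delivery runs on its own worker thread.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class RecordingSink:
    """Hypothetical stand-in for a StreamWriter: remembers every batch it receives."""

    def __init__(self) -> None:
        self.batches: list[list[dict]] = []

    def send_batch(self, events: list[dict]) -> None:
        self.batches.append(list(events))


def fan_out(events: list[dict], sinks: dict[str, RecordingSink]) -> list[str]:
    """Send the same batch to every sink concurrently; return sink names as they finish."""
    finished: list[str] = []
    with ThreadPoolExecutor(max_workers=len(sinks) or 1) as pool:
        # One task per sink, each delivering the full batch to its own sink.
        futures = {pool.submit(sink.send_batch, events): name for name, sink in sinks.items()}
        for future in as_completed(futures):
            future.result()  # re-raise a sink error in the calling thread
            finished.append(futures[future])
    return finished


sinks = {"console": RecordingSink(), "file": RecordingSink()}
finished = fan_out([{"id": 1}, {"id": 2}], sinks)
```

Completion order is not guaranteed, so callers should not rely on the order of `finished`.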
Classes
SinkResult
dataclass
Per-sink streaming result.
StreamingMultiWriteResult
dataclass
¶
Aggregated result from streaming to multiple sinks.
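The relationship between a per-sink result and the aggregate can be sketched with two dataclasses. The real fields of `SinkResult` and `StreamingMultiWriteResult` are not documented on this page, so `SinkResultSketch`, `MultiResultSketch`, and their fields (`events_sent`, `errors`, `ok`, `total_events`) are hypothetical. Only the existence of a `summary()` method is taken from the usage example below.

```python
from dataclasses import dataclass, field


@dataclass
class SinkResultSketch:
    """Hypothetical per-sink stats; the real SinkResult fields may differ."""

    sink_name: str
    events_sent: int = 0
    errors: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.errors


@dataclass
class MultiResultSketch:
    """Hypothetical aggregate keyed by sink name."""

    sinks: dict[str, SinkResultSketch] = field(default_factory=dict)

    @property
    def total_events(self) -> int:
        return sum(r.events_sent for r in self.sinks.values())

    def summary(self) -> str:
        # One line per sink: event count plus error status.
        lines = []
        for name, r in self.sinks.items():
            status = "ok" if r.ok else f"{len(r.errors)} error(s)"
            lines.append(f"{name}: {r.events_sent} events, {status}")
        return "\n".join(lines)


result = MultiResultSketch({
    "console": SinkResultSketch("console", events_sent=200),
    "file": SinkResultSketch("file", events_sent=150, errors=["disk full"]),
})
```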
StreamingMultiWriter
Stream generated data to multiple sinks concurrently.
For each (table_name, DataFrame) batch yielded by generate_stream(),
converts rows to event dicts and fans out to all registered sinks in
parallel via ThreadPoolExecutor.
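The row-to-event conversion step can be sketched as follows. With pandas, `DataFrame.to_dict(orient="records")` yields one dict per row. Here the rows are already plain dicts, so the sketch runs without pandas. The `rows_to_events` helper and the `"_table"` key that tags each event with its source table are assumptions for illustration. The actual event shape is not specified in this reference.

```python
from typing import Iterable


def rows_to_events(table_name: str, rows: Iterable[dict]) -> list[dict]:
    """Turn one table's rows into event dicts tagged with a hypothetical "_table" key."""
    events = []
    for row in rows:
        event = dict(row)             # copy so the source row is not mutated
        event["_table"] = table_name  # assumption: events carry their table name
        events.append(event)
    return events


rows = [{"customer_id": 1, "name": "Ada"}, {"customer_id": 2, "name": "Lin"}]
events = rows_to_events("customer", rows)
```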
Usage:

```python
from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter

# `spindle` and `domain` are assumed to be defined earlier (not shown here).
smw = StreamingMultiWriter(
    console=ConsoleSink(),
    file=FileSink("events.jsonl"),
)
result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
print(result.summary())
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_workers` | `int \| None` | Number of parallel threads per batch (default: number of sinks). | `None` |
| `batch_size` | `int` | Maximum number of rows per `send_batch` call. | `100` |
| `stop_on_sink_error` | `bool` | If `True`, abort streaming when any sink raises an error. If `False` (the default), partial failures are captured and streaming continues. | `False` |
| `**sinks` | `StreamWriter` | Keyword arguments mapping sink names to `StreamWriter` instances, e.g. `console=ConsoleSink()`. | `{}` |
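How the three constructor options could interact is sketched below with the standard library. This is a hypothetical model, not the library's code. Sinks are plain callables that take one batch, `chunks` and `deliver` are illustrative names, and errors are collected as strings per sink. Events are split into `batch_size` slices, `max_workers=None` falls back to one thread per sink, and `stop_on_sink_error` chooses between re-raising and recording a failure.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

Send = Callable[[list], None]  # hypothetical sink: called once per batch


def chunks(events: list, batch_size: int) -> list:
    """Split events into consecutive slices of at most batch_size items."""
    return [events[i:i + batch_size] for i in range(0, len(events), batch_size)]


def deliver(events: list, sinks: dict[str, Send], batch_size: int = 100,
            max_workers: Optional[int] = None,
            stop_on_sink_error: bool = False) -> dict[str, list[str]]:
    """Send events to every sink in batch_size chunks; return {sink_name: [error messages]}."""
    errors: dict[str, list[str]] = {name: [] for name in sinks}
    workers = max_workers or len(sinks) or 1  # None falls back to one thread per sink
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch in chunks(events, batch_size):
            futures = {name: pool.submit(send, batch) for name, send in sinks.items()}
            for name, future in futures.items():
                try:
                    future.result()
                except Exception as exc:
                    if stop_on_sink_error:
                        raise  # abort the whole stream on the first sink failure
                    errors[name].append(str(exc))  # record the failure and keep going
    return errors


sizes: list[int] = []


def good(batch: list) -> None:
    sizes.append(len(batch))


def bad(batch: list) -> None:
    raise RuntimeError("sink offline")


errs = deliver([{"i": i} for i in range(250)], {"good": good, "bad": bad}, batch_size=100)

try:
    deliver([{"i": 0}], {"bad": bad}, stop_on_sink_error=True)
    aborted = False
except RuntimeError:
    aborted = True
```

With 250 events and `batch_size=100`, the healthy sink sees batches of 100, 100, and 50 rows, while the failing sink's three errors are recorded instead of stopping the run.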
Methods:
add_sink(name, sink)
Add a sink and return self for chaining.
remove_sink(name)
Remove a sink by name.
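Because `add_sink` returns `self`, calls can be chained, for example `StreamingMultiWriter().add_sink("console", ConsoleSink()).add_sink("file", FileSink("events.jsonl"))`. The hypothetical `SinkRegistry` below models that contract without the library, using strings in place of sink objects. How `remove_sink` treats an unknown name is not documented, so ignoring it here is an assumption.

```python
from typing import Any


class SinkRegistry:
    """Hypothetical class mirroring the documented add_sink/remove_sink contract."""

    def __init__(self, **sinks: Any) -> None:
        self.sinks: dict[str, Any] = dict(sinks)

    def add_sink(self, name: str, sink: Any) -> "SinkRegistry":
        self.sinks[name] = sink
        return self  # returning self is what makes chaining possible

    def remove_sink(self, name: str) -> None:
        # Assumption: an unknown name is ignored; the real method's behavior is not documented.
        self.sinks.pop(name, None)


registry = (
    SinkRegistry(console="console-sink")
    .add_sink("file", "file-sink")
    .add_sink("lake", "lake-sink")
)
registry.remove_sink("lake")
registry.remove_sink("missing")  # no-op under the assumption above
```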
stream(generator)
Stream (table_name, DataFrame) batches to all sinks in parallel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `generator` | `Iterator[tuple[str, DataFrame]]` | Iterator yielding `(table_name, DataFrame)` tuples, typically the iterator returned by `generate_stream()`. | required |
Returns:

| Type | Description |
|---|---|
| `StreamingMultiWriteResult` | Aggregated result with per-sink statistics. |
stream_table(table_name, df)
Stream a single DataFrame to all sinks. Returns {sink_name: success}.
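The `{sink_name: success}` return contract can be sketched as follows. This sketch assumes that a sink's exception is caught and reported as `False` rather than raised. The page does not say how `stream_table` handles errors. `stream_table` here is a standalone hypothetical function, sinks are callables `send(table_name, rows)`, and rows are a list of dicts instead of a DataFrame.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def stream_table(table_name: str, rows: list[dict],
                 sinks: dict[str, Callable[[str, list[dict]], None]]) -> dict[str, bool]:
    """Send one table's rows to every sink in parallel; map each sink name to success."""

    def attempt(send: Callable[[str, list[dict]], None]) -> bool:
        try:
            send(table_name, rows)
            return True
        except Exception:
            return False  # assumption: a failing sink is reported, not raised

    with ThreadPoolExecutor(max_workers=len(sinks) or 1) as pool:
        futures = {name: pool.submit(attempt, send) for name, send in sinks.items()}
        return {name: f.result() for name, f in futures.items()}


def broken(table_name: str, rows: list[dict]) -> None:
    raise OSError("cannot write")


outcome = stream_table("customer", [{"id": 1}], {"ok": lambda t, r: None, "broken": broken})
```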