multi_writer

sqllocks_spindle.streaming.multi_writer

StreamingMultiWriter — concurrent fan-out streaming to multiple sinks.

Accepts a generate_stream() iterator and fans each (table, DataFrame) batch out to all configured StreamWriter sinks in parallel using a ThreadPoolExecutor.

Classes

SinkResult dataclass

Per-sink streaming result.

StreamingMultiWriteResult dataclass

Aggregated result from streaming to multiple sinks.

StreamingMultiWriter

Stream generated data to multiple sinks concurrently.

For each (table_name, DataFrame) batch yielded by generate_stream(), converts rows to event dicts and fans out to all registered sinks in parallel via ThreadPoolExecutor.

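Usage:

from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter

# `spindle` (a Spindle instance) and `domain` are assumed to be created earlier.

# Register one sink per keyword argument; the keyword becomes the sink name.
smw = StreamingMultiWriter(
    console=ConsoleSink(),
    file=FileSink("events.jsonl"),
)

# Fan every generated batch out to both sinks, then print the aggregated result.
result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
print(result.summary())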
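The fan-out step described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `ListSink` and `fan_out` are hypothetical names, and the only assumption taken from this page is that a sink exposes a `send_batch` method.

```python
from concurrent.futures import ThreadPoolExecutor


class ListSink:
    """Hypothetical stand-in sink that stores events in memory."""

    def __init__(self):
        self.events = []

    def send_batch(self, events):
        self.events.extend(events)


def fan_out(sinks, events, max_workers=None):
    """Send the same events to every sink in parallel; return {name: success}."""
    if not sinks:
        return {}
    # Assumption: one thread per sink unless max_workers is given.
    workers = max_workers or len(sinks)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            name: pool.submit(sink.send_batch, events)
            for name, sink in sinks.items()
        }
    # Leaving the with-block waits for every submitted call to finish.
    return {name: fut.exception() is None for name, fut in futures.items()}


sinks = {"a": ListSink(), "b": ListSink()}
status = fan_out(
    sinks,
    [{"table": "customer", "id": 1}, {"table": "customer", "id": 2}],
)
print(status)
```

Because each sink gets its own future, a failure in one sink shows up as `False` for that name without affecting the others.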

Parameters:

max_workers (int | None, default: None)
    Number of parallel threads per batch. When None, defaults to the number of sinks.

batch_size (int, default: 100)
    Maximum rows per send_batch call.

stop_on_sink_error (bool, default: False)
    If True, abort streaming when any sink errors. If False, partial failures are captured and streaming continues.

**sinks (StreamWriter, default: {})
    Keyword arguments of the form sink_name=StreamWriter.

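To make the interaction between batch_size and stop_on_sink_error concrete, here is a rough sketch for a single sink. The helper names (`chunk`, `make_flaky_send`, `send_in_batches`) are hypothetical, and the choice to stop by breaking out of the loop rather than raising is an assumption; the page only says streaming is aborted.

```python
def chunk(rows, batch_size=100):
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def make_flaky_send(fail_on_call):
    """Hypothetical sink callable that raises on its Nth call."""
    state = {"calls": 0}

    def send(batch):
        state["calls"] += 1
        if state["calls"] == fail_on_call:
            raise RuntimeError("sink unavailable")

    return send


def send_in_batches(send, rows, batch_size=100, stop_on_sink_error=False):
    """Return (rows_sent, errors) after sending rows in chunks."""
    sent, errors = 0, []
    for batch in chunk(rows, batch_size):
        try:
            send(batch)
            sent += len(batch)
        except Exception as exc:
            # Capture the failure; continue unless asked to stop.
            errors.append(str(exc))
            if stop_on_sink_error:
                break
    return sent, errors


rows = [{"id": i} for i in range(250)]
batch_sizes = [len(b) for b in chunk(rows, batch_size=100)]
tolerant = send_in_batches(make_flaky_send(2), rows, batch_size=100)
strict = send_in_batches(
    make_flaky_send(2), rows, batch_size=100, stop_on_sink_error=True
)
print(batch_sizes, tolerant, strict)
```

In the tolerant run the second batch is lost but the third is still delivered; in the strict run nothing is sent after the first failure.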
Methods:
add_sink(name, sink)

Add a sink and return self for chaining.

remove_sink(name)

Remove a sink by name.
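Returning self from add_sink allows calls to be chained. The pattern can be sketched as follows; `SinkRegistry` is a hypothetical stand-in, and treating removal of an unknown name as a no-op is an assumption, since the page does not say how remove_sink handles a missing sink.

```python
class SinkRegistry:
    """Hypothetical stand-in showing the chaining pattern only."""

    def __init__(self, **sinks):
        self._sinks = dict(sinks)

    def add_sink(self, name, sink):
        self._sinks[name] = sink
        return self  # returning self enables chained calls

    def remove_sink(self, name):
        # Assumption: removing an unknown name is silently ignored.
        self._sinks.pop(name, None)

    @property
    def names(self):
        return list(self._sinks)


registry = SinkRegistry(console=object())
registry.add_sink("file", object()).add_sink("queue", object())
registry.remove_sink("console")
names = registry.names
print(names)
```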

stream(generator)

Stream (table_name, DataFrame) batches to all sinks in parallel.

Parameters:

generator (Iterator[tuple[str, DataFrame]], required)
    Iterator yielding (table_name, DataFrame) tuples. Typically Spindle.generate_stream().

Returns:

StreamingMultiWriteResult
    Aggregated result with per-sink stats.
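One way to picture an aggregated result with per-sink stats is a pair of dataclasses. The sketch below is illustrative only: `SinkStats`, `MultiWriteStats`, and their fields are hypothetical and are not the actual attributes of SinkResult or StreamingMultiWriteResult, which this page does not list.

```python
from dataclasses import dataclass, field


@dataclass
class SinkStats:
    """Hypothetical per-sink record: what was sent and what failed."""

    name: str
    events_sent: int = 0
    errors: list = field(default_factory=list)

    @property
    def success(self):
        return not self.errors


@dataclass
class MultiWriteStats:
    """Hypothetical aggregate keyed by sink name."""

    sinks: dict = field(default_factory=dict)

    def summary(self):
        lines = []
        for name, stats in sorted(self.sinks.items()):
            state = "ok" if stats.success else f"{len(stats.errors)} error(s)"
            lines.append(f"{name}: {stats.events_sent} events, {state}")
        return "\n".join(lines)


stats = MultiWriteStats(
    sinks={
        "console": SinkStats("console", events_sent=250),
        "file": SinkStats("file", events_sent=100, errors=["disk full"]),
    }
)
report = stats.summary()
print(report)
```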

stream_table(table_name, df)

Stream a single DataFrame to all sinks. Returns {sink_name: success}.
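The row-to-event conversion and the {sink_name: success} return shape can be sketched without pandas by passing columns and rows directly. With a real DataFrame, `df.to_dict(orient="records")` yields the same list-of-dicts shape. All names below are hypothetical, and adding a "table" key to each event is an assumption about the event format.

```python
class MemorySink:
    """Hypothetical sink that keeps events in a list."""

    def __init__(self):
        self.events = []

    def send_batch(self, events):
        self.events.extend(events)


class BrokenSink:
    """Hypothetical sink that always fails."""

    def send_batch(self, events):
        raise ConnectionError("endpoint unreachable")


def rows_to_events(table_name, columns, rows):
    """Turn each row into a dict keyed by column name."""
    # Assumption: each event carries its source table under "table".
    return [{"table": table_name, **dict(zip(columns, row))} for row in rows]


def stream_table_sketch(sinks, table_name, columns, rows):
    """Send one table to every sink; return {sink_name: success}."""
    events = rows_to_events(table_name, columns, rows)
    status = {}
    for name, sink in sinks.items():
        try:
            sink.send_batch(events)
            status[name] = True
        except Exception:
            status[name] = False
    return status


memory = MemorySink()
status = stream_table_sketch(
    {"memory": memory, "broken": BrokenSink()},
    "customer",
    ["id", "name"],
    [(1, "Ada"), (2, "Grace")],
)
print(status)
```

The sketch loops over sinks sequentially to keep the focus on the data shapes; the class described here dispatches to sinks in parallel.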