
simulation

sqllocks_spindle.simulation

Spindle simulation layer: file-drop, SCD2 file-drop, stream-emit, hybrid, financial-stream, IoT telemetry, operational-log, workflow, and clickstream simulators.

Classes

ClickstreamConfig dataclass

Configuration for ClickstreamSimulator.

Parameters:

  users (int, default 1000): Number of distinct users to simulate.
  duration_hours (float, default 24.0): Total simulation window in hours.
  avg_sessions_per_user (float, default 2.5): Average number of sessions each user starts.
  avg_pages_per_session (float, default 5.0): Average page views for non-bounced sessions.
  bounce_rate (float, default 0.35): Fraction of human sessions that are single-page bounces.
  funnel_enabled (bool, default True): Whether to track conversion funnel progression.
  funnel_stages (list[str], default ['landing', 'product', 'cart', 'checkout', 'confirmation']): Ordered list of funnel stage names.
  funnel_drop_rate (float, default 0.4): Per-stage probability of a user abandoning.
  bot_traffic_enabled (bool, default True): Whether to inject bot/crawler sessions.
  bot_fraction (float, default 0.15): Fraction of all sessions that come from bots.
  bot_pages_per_session (int, default 50): Number of pages a bot crawls per session.
  page_pool (list[str], default ['/', '/products', '/products/{id}', '/cart', '/checkout', '/about', '/contact', '/blog', '/blog/{slug}', '/search', '/account', '/faq']): URL templates available for page views.
  referrer_sources (list[str], default ['direct', 'google', 'bing', 'facebook', 'twitter', 'email', 'reddit']): Traffic sources for session attribution.
  device_types (list[str], default ['desktop', 'mobile', 'tablet']): Device type labels for human sessions.
  seed (int, default 42): Random seed for reproducibility.
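
As a rough way to read funnel_stages and funnel_drop_rate together: if a funnel session survives each stage transition independently with probability 1 - funnel_drop_rate (an assumption; the simulator's exact drop logic is not documented here), the expected share of entrants reaching stage k is (1 - funnel_drop_rate)**k. The sketch below computes that closed form and cross-checks it with a seeded Monte Carlo run. Both helpers are hypothetical and are not part of sqllocks_spindle.

```python
import random

STAGES = ["landing", "product", "cart", "checkout", "confirmation"]


def expected_reach(stages: list[str], drop_rate: float) -> dict[str, float]:
    """Closed-form share of entrants reaching each stage (hypothetical model).

    Assumes an independent `drop_rate` chance of abandoning before every
    stage after the first.
    """
    return {stage: (1.0 - drop_rate) ** k for k, stage in enumerate(stages)}


def simulate_reach(stages: list[str], drop_rate: float, n: int, seed: int) -> dict[str, float]:
    """Monte Carlo estimate of the same shares using a seeded RNG."""
    rng = random.Random(seed)
    counts = dict.fromkeys(stages, 0)
    for _ in range(n):
        for k, stage in enumerate(stages):
            if k > 0 and rng.random() < drop_rate:
                break  # this entrant abandons before reaching `stage`
            counts[stage] += 1
    return {stage: counts[stage] / n for stage in stages}


print({s: round(p, 4) for s, p in expected_reach(STAGES, 0.4).items()})
# {'landing': 1.0, 'product': 0.6, 'cart': 0.36, 'checkout': 0.216, 'confirmation': 0.1296}
print(simulate_reach(STAGES, 0.4, n=20_000, seed=42))
```

Under this model, only about 13% of funnel entrants reach confirmation with the default settings.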

ClickstreamResult dataclass

Result of ClickstreamSimulator.run().

Attributes:

  sessions (DataFrame): One row per session with metadata.
  page_views (DataFrame): One row per page view with timestamps and dwell times.
  funnels (DataFrame): Funnel progression records per session.
  stats (dict[str, Any]): Summary statistics dictionary.

ClickstreamSimulator

Generate realistic web clickstream data.

Produces sessions, page views, conversion funnels, and bot traffic for a configurable number of users over a simulation window.

Parameters:

  config (ClickstreamConfig | None, default None): ClickstreamConfig instance; defaults are used if None.

Example::

    from sqllocks_spindle.simulation import ClickstreamConfig, ClickstreamSimulator

    cfg = ClickstreamConfig(users=500, duration_hours=12)
    sim = ClickstreamSimulator(cfg)
    result = sim.run()
    print(result.stats)

Methods:

  run(): Execute the simulation and return a ClickstreamResult.

FileDropConfig dataclass

Configuration for file-drop simulation.

Parameters:

  domain (str, default 'default'): Domain name used in path construction (e.g. "retail").
  base_path (str, default 'Files/landing'): Root directory for landing files. Maps to Files/landing in a Fabric lakehouse.
  cadence (str, default 'daily'): Drop cadence: "daily", "hourly", or "every_15m".
  date_range_start (str, default ''): Inclusive start date as YYYY-MM-DD.
  date_range_end (str, default ''): Inclusive end date as YYYY-MM-DD.
  partitioning (str, default 'dt=YYYY-MM-DD'): Partition folder template.
  formats (list[str], default ['parquet']): File formats to write ("parquet", "csv", "jsonl").
  file_naming (str, default '{domain}_{entity}_{dt}_{seq}.{ext}'): File naming template. Placeholders: {domain}, {entity}, {dt}, {seq}, {ext}.
  entities (list[str], default []): Restrict the simulation to these table names. An empty list means all tables.
  manifest_enabled (bool, default True): Write a _manifest.json per partition folder.
  done_flag_enabled (bool, default True): Write a _done sentinel per partition folder.
  lateness_enabled (bool, default True): Inject late-arriving rows (data from previous days).
  lateness_probability (float, default 0.1): Per-row probability of being marked late.
  max_days_late (int, default 3): Maximum staleness for late rows, in days.
  duplicates_enabled (bool, default False): Inject duplicate rows.
  duplicate_probability (float, default 0.02): Per-row probability of duplication.
  backfill_enabled (bool, default False): Re-drop historical partitions.
  max_days_back (int, default 0): How far back, in days, a backfill can reach.
  seed (int, default 42): Random seed for reproducibility.
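
To make cadence, partitioning, and file_naming concrete, here is a minimal sketch that enumerates time slots for an inclusive date range and expands the documented {domain}, {entity}, {dt}, {seq}, {ext} placeholders. The folder layout (base_path/domain/entity/dt=...), the zero-padded sequence number, and both helper functions are assumptions for illustration; FileDropSimulator may lay files out differently.

```python
from datetime import date, datetime, timedelta
from pathlib import PurePosixPath

# Slot length for each documented cadence value.
CADENCE_STEP = {
    "daily": timedelta(days=1),
    "hourly": timedelta(hours=1),
    "every_15m": timedelta(minutes=15),
}


def time_slots(start: str, end: str, cadence: str) -> list[datetime]:
    """Slot start times from `start` 00:00 up to the end of the inclusive `end` date."""
    step = CADENCE_STEP[cadence]
    current = datetime.combine(date.fromisoformat(start), datetime.min.time())
    stop = datetime.combine(date.fromisoformat(end), datetime.min.time()) + timedelta(days=1)
    slots = []
    while current < stop:
        slots.append(current)
        current += step
    return slots


def landing_path(base_path: str, domain: str, entity: str, slot: datetime,
                 seq: int, ext: str,
                 file_naming: str = "{domain}_{entity}_{dt}_{seq}.{ext}") -> PurePosixPath:
    """Hypothetical layout: base_path/domain/entity/dt=YYYY-MM-DD/<file name>."""
    dt = slot.strftime("%Y-%m-%d")
    name = file_naming.format(domain=domain, entity=entity, dt=dt,
                              seq=f"{seq:04d}", ext=ext)
    return PurePosixPath(base_path, domain, entity, f"dt={dt}", name)


slots = time_slots("2024-01-01", "2024-01-31", "daily")
print(len(slots))  # 31
print(landing_path("Files/landing", "retail", "orders", slots[0], 1, "parquet"))
# Files/landing/retail/orders/dt=2024-01-01/retail_orders_2024-01-01_0001.parquet
```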

FileDropResult dataclass

Result of a file-drop simulation run.

Attributes:

  files_written (list[Path]): All data file paths written.
  manifest_paths (list[Path]): Paths to _manifest.json files.
  done_flag_paths (list[Path]): Paths to _done sentinel files.
  stats (dict[str, Any]): Per-entity statistics dict.

FileDropSimulator

Simulate an upstream source dropping files on a cadence over a date range.

For each simulated time slot, the simulator:

  1. Slices the rows belonging to that slot (by temporal column or round-robin).
  2. Writes partitioned data files to disk.
  3. Optionally writes a manifest and a done-flag.
  4. Optionally injects late arrivals, duplicates, and backfills.

Parameters:

  tables (dict[str, DataFrame], required): Mapping of table_name -> DataFrame (the tables of a sqllocks_spindle.engine.generator.GenerationResult).
  config (FileDropConfig, required): FileDropConfig controlling paths, cadence, and data-quality anomalies.

Example::

    from sqllocks_spindle.simulation import FileDropSimulator, FileDropConfig

    # gen_result: a GenerationResult produced earlier by the Spindle generator
    cfg = FileDropConfig(
        domain="retail",
        date_range_start="2024-01-01",
        date_range_end="2024-01-31",
    )
    result = FileDropSimulator(tables=gen_result.tables, config=cfg).run()

Methods:

  run(): Execute the file-drop simulation and return a FileDropResult.
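
Step 3 of the procedure above (manifest and done-flag) follows a common landing-zone handshake. The data files are written first, then a manifest describing them, and finally an empty sentinel that tells consumers the partition is complete. Below is a minimal standard-library sketch of that ordering. The manifest fields (partition, files, row_counts, total_rows) and the finalize_partition helper are hypothetical; Spindle's actual _manifest.json schema is not documented here.

```python
import json
import tempfile
from pathlib import Path


def finalize_partition(partition_dir: Path, row_counts: dict[str, int]) -> tuple[Path, Path]:
    """Write _manifest.json, then _done, for an already-populated folder.

    row_counts maps data file name -> number of rows in that file.
    The manifest layout is illustrative only.
    """
    manifest = {
        "partition": partition_dir.name,
        "files": sorted(row_counts),
        "row_counts": row_counts,
        "total_rows": sum(row_counts.values()),
    }
    manifest_path = partition_dir / "_manifest.json"
    manifest_path.write_text(json.dumps(manifest, indent=2))
    # The sentinel is written last, so a consumer that waits for _done
    # never sees a partition whose manifest is still missing.
    done_path = partition_dir / "_done"
    done_path.touch()
    return manifest_path, done_path


with tempfile.TemporaryDirectory() as tmp:
    part = Path(tmp) / "dt=2024-01-01"
    part.mkdir()
    (part / "retail_orders_2024-01-01_0001.csv").write_text("order_id\n1\n2\n")
    m, d = finalize_partition(part, {"retail_orders_2024-01-01_0001.csv": 2})
    print(json.loads(m.read_text())["total_rows"], d.exists())  # 2 True
```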

FinancialStreamConfig dataclass

Configuration for FinancialStreamSimulator.

Parameters:

  duration_hours (float, default 24.0): Total simulation window in hours.
  reversal_enabled (bool, default True): Whether to generate transaction reversals.
  reversal_probability (float, default 0.03): Fraction of transactions that get reversed.
  reversal_delay_hours_max (float, default 48.0): Maximum delay between original transaction and its reversal.
  fraud_burst_enabled (bool, default True): Whether to generate fraud burst events.
  fraud_burst_probability (float, default 0.01): Per-hour probability of a fraud burst occurring.
  fraud_burst_count (int, default 15): Number of transactions per fraud burst.
  fraud_burst_amount_range (tuple[float, float], default (500.0, 10000.0)): (min, max) dollar range for fraud transactions.
  settlement_enabled (bool, default True): Whether to generate settlement batch records.
  settlement_batch_hours (float, default 4.0): Settlement runs every N hours.
  settlement_success_rate (float, default 0.98): Fraction of settlements that succeed.
  seed (int, default 42): Random seed for reproducibility.
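
The per-hour and per-batch parameters combine simply. Over duration_hours, fraud bursts behave roughly like one Bernoulli trial per hour (expected count duration_hours * fraud_burst_probability, i.e. 0.24 with the defaults), and settlement windows tile the timeline every settlement_batch_hours (6 windows in 24 hours with the defaults). The sketch below illustrates that reading; the Burst type and both helpers are hypothetical and may not match FinancialStreamSimulator's internals.

```python
import random
from dataclasses import dataclass


@dataclass
class Burst:
    hour: int             # simulated hour in which the burst starts
    amounts: list[float]  # one amount per fraudulent transaction


def plan_fraud_bursts(duration_hours: float, probability: float, count: int,
                      amount_range: tuple[float, float], seed: int) -> list[Burst]:
    """One Bernoulli trial per simulated hour; each hit yields `count` amounts."""
    rng = random.Random(seed)
    low, high = amount_range
    bursts = []
    for hour in range(int(duration_hours)):
        if rng.random() < probability:
            amounts = [round(rng.uniform(low, high), 2) for _ in range(count)]
            bursts.append(Burst(hour, amounts))
    return bursts


def settlement_windows(duration_hours: float, batch_hours: float) -> list[tuple[float, float]]:
    """Consecutive [start, end) windows covering the simulation span."""
    windows = []
    start = 0.0
    while start < duration_hours:
        windows.append((start, min(start + batch_hours, duration_hours)))
        start += batch_hours
    return windows


print(settlement_windows(24.0, 4.0)[:2])  # [(0.0, 4.0), (4.0, 8.0)]
print(len(plan_fraud_bursts(24.0, 0.01, 15, (500.0, 10000.0), seed=42)))
```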

FinancialStreamResult dataclass

Result of a FinancialStreamSimulator.run() execution.

Attributes:

  transactions (DataFrame): Original transactions plus any generated reversals and fraud events.
  reversals (DataFrame): DataFrame containing only reversal records.
  fraud_events (DataFrame): DataFrame containing only fraud burst records.
  settlements (DataFrame): DataFrame containing settlement batch results.
  stats (dict[str, Any]): Summary statistics for the simulation run.

FinancialStreamSimulator

Generate financial transaction stream anomalies.

Layers reversals, fraud bursts, and settlement events on top of pre-generated transaction and account DataFrames.

Parameters:

  transactions_df (DataFrame | None, default None): Base transactions DataFrame. Expected to contain at least the transaction_id, account_id, amount, and transaction_time columns.
  accounts_df (DataFrame | None, default None): Accounts DataFrame. Expected to contain at least account_id.
  config (FinancialStreamConfig | None, default None): FinancialStreamConfig instance; defaults are used if None.

Example::

    from sqllocks_spindle.simulation import FinancialStreamConfig, FinancialStreamSimulator

    # txns, accounts: pre-generated transactions and accounts DataFrames
    cfg = FinancialStreamConfig(duration_hours=24, seed=99)
    sim = FinancialStreamSimulator(txns, accounts, cfg)
    result = sim.run()
    print(result.stats)

Methods:

  run(): Execute the simulation and return a FinancialStreamResult.
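
Reversals can be layered on any base table that has the documented transaction_id, account_id, amount, and transaction_time columns. To stay dependency-free, the sketch below uses plain dicts instead of DataFrames. The negated amount, the REV- id prefix, the reversal_of back-reference, and the uniform delay are assumptions about how a reversal might be modelled, not Spindle's confirmed schema.

```python
import random
from datetime import datetime, timedelta


def make_reversals(transactions: list[dict], probability: float,
                   max_delay_hours: float, seed: int) -> list[dict]:
    """Reverse roughly `probability` of transactions after a random delay (hypothetical)."""
    rng = random.Random(seed)
    reversals = []
    for txn in transactions:
        if rng.random() >= probability:
            continue
        delay = timedelta(hours=rng.uniform(0.0, max_delay_hours))
        reversals.append({
            "transaction_id": f"REV-{txn['transaction_id']}",
            "account_id": txn["account_id"],
            "amount": -txn["amount"],               # offsets the original amount
            "transaction_time": txn["transaction_time"] + delay,
            "reversal_of": txn["transaction_id"],   # hypothetical link column
        })
    return reversals


base = datetime(2024, 1, 1)
txns = [
    {"transaction_id": i, "account_id": i % 10, "amount": 25.0 + i,
     "transaction_time": base + timedelta(minutes=i)}
    for i in range(1000)
]
revs = make_reversals(txns, probability=0.03, max_delay_hours=48.0, seed=42)
print(len(revs))
```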

HybridConfig dataclass

Configuration for HybridSimulator.

Specifies which tables (or all) go to the streaming path, which go to the micro-batch file-drop path, and how the two are linked.

Parameters:

  stream_to (str, default 'eventhouse'): Streaming target: "eventhouse", "lakehouse", or "both". Informational only; the actual sink is determined by the embedded stream_config.
  micro_batch_to (str, default 'lakehouse_files'): Batch target, e.g. "lakehouse_files" (the default).
  stream_tables (list[str], default []): Tables routed to the stream path. An empty list means all tables.
  batch_tables (list[str], default []): Tables routed to the file-drop path. An empty list means all tables.
  stream_config (StreamEmitConfig, default StreamEmitConfig()): Full StreamEmitConfig for the streaming side.
  file_drop_config (FileDropConfig, default FileDropConfig()): Full FileDropConfig for the batch side.
  link_strategy (str, default 'correlation_id'): How stream events and batch files are correlated. "correlation_id" stamps every event and manifest with the same run-level id; "natural_keys" relies on matching business keys across both paths.
  concurrent (bool, default False): Run the stream and batch phases in parallel threads.
  seed (int, default 42): Random seed (propagated to child configs if they use defaults).

HybridResult dataclass

Result of a HybridSimulator.run() invocation.

Attributes:

  file_drop_result (FileDropResult | None): Result from the batch (file-drop) phase.
  stream_result (StreamEmitResult | None): Result from the streaming phase.
  correlation_id (str): Run-level correlation id linking both outputs.
  link_strategy (str): The linking strategy that was applied.

HybridSimulator

Run file-drop and stream-emit simulations together, linked by correlation.

The simulator:

  1. Splits the input tables into batch tables and stream tables based on HybridConfig.batch_tables and HybridConfig.stream_tables. By default, all tables go to both paths.
  2. Stamps a shared correlation_id into every stream envelope and every batch manifest so downstream pipelines can join the two.
  3. Optionally runs both phases concurrently in threads.

Parameters:

  tables (dict[str, DataFrame], required): Pre-generated dict[table_name, DataFrame].
  config (HybridConfig | None, default None): HybridConfig instance.
  sink (StreamWriter | None, default None): Optional explicit StreamWriter for the streaming side.

Example::

    from sqllocks_spindle.simulation import (
        HybridSimulator, HybridConfig, FileDropConfig, StreamEmitConfig,
    )

    # gen_result: a GenerationResult produced earlier by the Spindle generator
    cfg = HybridConfig(
        file_drop_config=FileDropConfig(
            domain="retail",
            date_range_start="2024-01-01",
            date_range_end="2024-01-07",
        ),
        stream_config=StreamEmitConfig(rate_per_sec=20),
        concurrent=True,
    )
    result = HybridSimulator(tables=gen_result.tables, config=cfg).run()

Methods:

  run(): Execute both simulation phases and return a HybridResult with the combined results.
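
Steps 1 and 2 of the HybridSimulator procedure can be sketched independently of Spindle: route tables with the "empty list means all" rule, mint one run-level id, and stamp it into both the stream envelopes and the batch manifest. The helper names and the uuid4-based id are assumptions for illustration; HybridSimulator's real envelope and manifest formats likely carry more fields.

```python
import uuid


def route_tables(all_tables: list[str], stream_tables: list[str],
                 batch_tables: list[str]) -> tuple[list[str], list[str]]:
    """Apply the documented rule: an empty selection means every table."""
    stream = [t for t in all_tables if not stream_tables or t in stream_tables]
    batch = [t for t in all_tables if not batch_tables or t in batch_tables]
    return stream, batch


def stamp(records: list[dict], correlation_id: str) -> list[dict]:
    """Return copies of records carrying the shared correlation_id (hypothetical helper)."""
    return [{**r, "correlation_id": correlation_id} for r in records]


correlation_id = str(uuid.uuid4())  # one id per run, shared by both paths
stream, batch = route_tables(["orders", "customers", "returns"], ["orders"], [])
events = stamp([{"topic": "orders", "data": {"order_id": 1}}], correlation_id)
manifest = stamp([{"partition": "dt=2024-01-01", "files": ["orders.parquet"]}], correlation_id)[0]
print(stream, batch)  # ['orders'] ['orders', 'customers', 'returns']
```

A downstream pipeline can then join stream events to batch manifests on correlation_id alone, which is what the "correlation_id" link_strategy describes.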

IoTTelemetryConfig dataclass

Configuration for IoTTelemetrySimulator.

Parameters:

  fleet_size (int, default 50): Number of active devices in the simulation.
  duration_hours (float, default 24.0): Total simulation window in hours.
  reading_interval_seconds (float, default 60.0): Nominal seconds between consecutive readings.
  drift_enabled (bool, default True): Whether to apply gradual sensor drift.
  drift_rate (float, default 0.001): Per-reading drift magnitude (additive random-walk step size).
  drift_probability (float, default 0.1): Fraction of sensors that will experience drift.
  missing_enabled (bool, default True): Whether to inject missing (NaN) readings.
  missing_probability (float, default 0.05): Per-reading chance of a value being set to NaN.
  alert_storm_enabled (bool, default True): Whether to generate alert storm bursts.
  alert_storm_probability (float, default 0.02): Per-hour chance of an alert storm starting.
  alert_storm_duration_minutes (float, default 15.0): How long each alert storm lasts.
  alert_storm_rate_multiplier (float, default 10.0): Alert frequency multiplier during a storm.
  battery_drain_enabled (bool, default True): Whether to simulate battery drain over time.
  battery_drain_rate (float, default 0.1): Battery percentage lost per hour.
  seed (int, default 42): Random seed for reproducibility.
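
The drift, missing-value, and battery parameters describe simple per-reading processes. The sketch below perturbs a single sensor's series under two assumptions: drift is an additive random walk with step size drift_rate (as the parameter description states), and battery drain is linear in elapsed hours, floored at zero. Whether a sensor drifts at all would be decided per sensor with drift_probability. The perturb_series function is hypothetical, not Spindle's implementation.

```python
import math
import random


def perturb_series(values: list[float], interval_s: float, *, drifting: bool,
                   drift_rate: float, missing_p: float,
                   start_battery: float, drain_per_hour: float,
                   rng: random.Random) -> tuple[list[float], list[float]]:
    """Return (readings, battery_levels) for one sensor (hypothetical helper)."""
    offset = 0.0
    readings, battery = [], []
    for i, v in enumerate(values):
        if drifting:
            offset += rng.choice((-drift_rate, drift_rate))  # one random-walk step
        readings.append(math.nan if rng.random() < missing_p else v + offset)
        hours = i * interval_s / 3600.0
        battery.append(max(0.0, start_battery - drain_per_hour * hours))
    return readings, battery


rng = random.Random(42)
flat = [20.0] * 1441  # 24 h of 60 s readings, including both endpoints
vals, batt = perturb_series(flat, 60.0, drifting=True, drift_rate=0.001,
                            missing_p=0.05, start_battery=100.0,
                            drain_per_hour=0.1, rng=rng)
print(round(batt[-1], 6))  # 97.6
```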

IoTTelemetryResult dataclass

Result of IoTTelemetrySimulator.run().

Attributes:

  readings (DataFrame): Modified readings DataFrame with drift and missing values applied.
  alerts (DataFrame): Generated alert events, including alert-storm bursts.
  fleet_status (DataFrame): Per-device status snapshot at the end of the simulation.
  stats (dict[str, Any]): Summary statistics dictionary.

IoTTelemetrySimulator

Simulate realistic IoT telemetry anomalies on top of generated data.

Takes pre-generated readings and devices DataFrames (from the IoT domain generator) and applies configurable anomaly layers: sensor drift, missing readings, alert storms, and battery drain.

Parameters:

  readings_df (DataFrame | None, default None): Base readings DataFrame. Expected to contain at least device_id (or sensor_id), a numeric value column, and a reading_time (or created_at) timestamp column.
  devices_df (DataFrame | None, default None): Devices DataFrame. Expected to contain at least device_id and, optionally, battery_level.
  config (IoTTelemetryConfig | None, default None): IoTTelemetryConfig instance.

Example::

    from sqllocks_spindle.simulation import IoTTelemetrySimulator

    # readings, devices: DataFrames from the IoT domain generator
    sim = IoTTelemetrySimulator(readings_df=readings, devices_df=devices)
    result = sim.run()
    result.readings.head()

Methods:

  run(): Execute all enabled simulation layers and return an IoTTelemetryResult.
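
Alert storms combine three config values: a per-hour start probability, a duration, and a rate multiplier. The minimal sketch below turns them into a per-minute alert-rate timeline. Two assumptions are not documented behavior: the baseline rate is expressed per minute, and overlapping storms simply keep the multiplied rate. The alert_rate_timeline helper is hypothetical.

```python
import random


def alert_rate_timeline(duration_hours: float, base_rate_per_min: float,
                        storm_probability: float, storm_minutes: float,
                        multiplier: float, seed: int) -> list[float]:
    """Expected alerts per minute for each minute of the simulation."""
    rng = random.Random(seed)
    total_minutes = int(duration_hours * 60)
    in_storm = [False] * total_minutes
    for hour in range(int(duration_hours)):
        if rng.random() < storm_probability:
            start = hour * 60 + rng.randrange(60)  # storm begins somewhere in this hour
            for m in range(start, min(start + int(storm_minutes), total_minutes)):
                in_storm[m] = True
    return [base_rate_per_min * (multiplier if s else 1.0) for s in in_storm]


rates = alert_rate_timeline(24.0, 0.5, 0.02, 15.0, 10.0, seed=42)
n_storm = sum(r > 0.5 for r in rates)
print(len(rates), n_storm)
```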

OperationalLogConfig dataclass

Configuration for operational log simulation.

Parameters:

  service_count (int, default 5): Number of services; the first N entries of the built-in default service list are used.
  services (list[dict[str, Any]], default []): Override service definitions.
  duration_hours (float, default 24.0): Time span of the simulation, in hours.
  start_time (str, default '2024-01-01T00:00:00'): Simulation start timestamp (ISO format).
  events_per_hour (float, default 100.0): Base event rate per service per hour.
  latency_mean_ms (float, default 50.0): Mean request latency in milliseconds (log-normal distribution).
  latency_std_ms (float, default 30.0): Standard deviation of the log-normal latency, in milliseconds.
  latency_spike_enabled (bool, default True): Inject latency spikes.
  latency_spike_probability (float, default 0.05): Per-hour chance of a latency spike.
  latency_spike_multiplier (float, default 10.0): Multiplier applied to latency during spikes.
  latency_spike_duration_minutes (float, default 15.0): Duration of each spike window, in minutes.
  outage_enabled (bool, default True): Inject outage storms.
  outage_probability (float, default 0.02): Per-hour chance of an outage starting.
  outage_duration_minutes (float, default 30.0): How long each outage lasts, in minutes.
  outage_error_rate (float, default 0.8): Fraction of requests that fail during an outage.
  trace_enabled (bool, default True): Generate distributed trace/span IDs.
  trace_depth_mean (float, default 3.5): Mean number of spans per trace.
  error_burst_enabled (bool, default True): Inject error bursts.
  error_burst_probability (float, default 0.03): Per-hour chance of an error burst.
  error_burst_count (int, default 50): Number of errors per burst.
  seed (int, default 42): Random seed for reproducibility.
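
latency_mean_ms and latency_std_ms parameterize a log-normal latency distribution. If they are read as the mean m and standard deviation s of the latency itself (an assumption; they could instead be parameters of the underlying normal), the underlying normal's parameters follow from the standard moment identities: sigma^2 = ln(1 + (s/m)^2) and mu = ln(m) - sigma^2 / 2. The sketch applies those identities with the standard library's random.lognormvariate and scales samples by latency_spike_multiplier inside a spike window. Both helpers are hypothetical.

```python
import math
import random
import statistics


def lognormal_params(mean_ms: float, std_ms: float) -> tuple[float, float]:
    """Convert a target mean/std of latency X into (mu, sigma) of ln(X)."""
    sigma2 = math.log(1.0 + (std_ms / mean_ms) ** 2)
    mu = math.log(mean_ms) - sigma2 / 2.0
    return mu, math.sqrt(sigma2)


def sample_latency(rng: random.Random, mu: float, sigma: float,
                   in_spike: bool, spike_multiplier: float) -> float:
    """One request latency in ms; spike windows scale it by the multiplier."""
    latency = rng.lognormvariate(mu, sigma)
    return latency * spike_multiplier if in_spike else latency


mu, sigma = lognormal_params(50.0, 30.0)
rng = random.Random(42)
normal = [sample_latency(rng, mu, sigma, False, 10.0) for _ in range(100_000)]
print(statistics.fmean(normal), statistics.stdev(normal))  # close to 50 and 30
```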

OperationalLogResult dataclass

Result of an operational log simulation.

Attributes:

  logs (DataFrame): All log events, sorted by timestamp.
  traces (DataFrame): Distributed trace records (trace_id, spans).
  service_health (DataFrame): Per-service health summary.
  stats (dict[str, Any]): Aggregate statistics.

OperationalLogSimulator

Generate synthetic operational / observability log data.

Produces structured log events with distributed tracing, latency distributions, error bursts, and outage simulation.

Methods:

  run(): Execute the operational log simulation.

SCD2FileDropConfig dataclass

Configuration for SCD2 file-drop simulation.

Parameters:

  domain (str, default 'default'): Domain name used in path construction (e.g. "retail").
  base_path (str, default 'Files/landing'): Root directory for landing files.
  business_key_column (str, default 'id'): Column that identifies the business entity.
  scd2_columns (list[str], default []): Columns to track for changes.
  effective_date_column (str, default 'valid_from'): Name of the valid-from column.
  end_date_column (str, default 'valid_to'): Name of the valid-to column.
  is_current_column (str, default 'is_current'): Name of the is-current flag column.
  initial_load_date (str, default '2024-01-01'): Date string for the initial snapshot (YYYY-MM-DD).
  num_delta_days (int, default 30): Number of daily delta files to generate.
  daily_change_rate (float, default 0.05): Fraction of records that change per day.
  daily_new_rate (float, default 0.02): Fraction of new records per day (relative to the initial count).
  formats (list[str], default ['parquet']): File formats to write ("parquet", "csv", "jsonl").
  manifest_enabled (bool, default True): Write a _manifest.json alongside each drop.
  seed (int, default 42): Random seed for reproducibility.

SCD2FileDropResult dataclass

Result of an SCD2 file-drop simulation run.

Attributes:

  initial_load_path (Path): Path to the initial full-load file.
  delta_paths (list[Path]): Paths to daily delta files.
  manifest_paths (list[Path]): Paths to _manifest.json files.
  stats (dict[str, Any]): Aggregate statistics for the simulation run.

SCD2FileDropSimulator

Simulate an upstream source landing SCD2-versioned files over time.

Generates an initial full snapshot and then daily delta files containing INSERT rows (new entities) and UPDATE rows (changed entities with valid_from / valid_to / is_current tracking).

Parameters:

Name Type Description Default
tables dict[str, DataFrame]

Mapping of entity_name -> DataFrame from a generation result.

required
config SCD2FileDropConfig

:class:SCD2FileDropConfig controlling paths and simulation parameters.

required

Example::

from sqllocks_spindle.simulation.scd2_file_drops import (
    SCD2FileDropSimulator,
    SCD2FileDropConfig,
)

cfg = SCD2FileDropConfig(
    domain="retail",
    business_key_column="customer_id",
    scd2_columns=["status", "address", "tier"],
)
result = SCD2FileDropSimulator(tables=gen_result.tables, config=cfg).run()
Methods:
run()

Execute the SCD2 file-drop simulation and return results.
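
Consumers of these drops typically merge each delta into a versioned dimension. The sketch below shows one day's type 2 merge, assuming the documented default column names (valid_from, valid_to, is_current) and a delta that carries full rows for new and changed keys. Two conventions are chosen for this example only and are not confirmed Spindle output: an open-ended valid_to of None, and closing the old version with valid_to equal to the new valid_from.

```python
from copy import deepcopy


def apply_scd2_delta(dimension: list[dict], delta: list[dict], key: str,
                     tracked: list[str], load_date: str) -> list[dict]:
    """Close changed current versions and append new ones (hypothetical helper)."""
    result = deepcopy(dimension)  # leave the caller's rows untouched
    current = {row[key]: row for row in result if row["is_current"]}
    for incoming in delta:
        old = current.get(incoming[key])
        if old is not None and all(old[c] == incoming[c] for c in tracked):
            continue  # no tracked column changed, so nothing to version
        if old is not None:
            old["valid_to"] = load_date
            old["is_current"] = False
        new_row = {**incoming, "valid_from": load_date, "valid_to": None, "is_current": True}
        result.append(new_row)
        current[incoming[key]] = new_row
    return result


dim = [
    {"customer_id": 1, "tier": "silver", "valid_from": "2024-01-01", "valid_to": None, "is_current": True},
    {"customer_id": 2, "tier": "gold", "valid_from": "2024-01-01", "valid_to": None, "is_current": True},
]
delta = [{"customer_id": 1, "tier": "gold"}, {"customer_id": 3, "tier": "bronze"}]
merged = apply_scd2_delta(dim, delta, "customer_id", ["tier"], "2024-01-02")
print(len(merged), sum(r["is_current"] for r in merged))  # 4 3
```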

StateDefinition dataclass

A single state in a business workflow.

Parameters:

  name (str, required): Unique name for this state.
  is_initial (bool, default False): Whether entities can start in this state.
  is_terminal (bool, default False): Whether this state ends the workflow (no outgoing transitions).
  metadata (dict[str, Any], default {}): Arbitrary key-value pairs attached to the state.

TransitionRule dataclass

A directed edge in the workflow state machine.

Parameters:

  from_state (str, required): Source state name.
  to_state (str, required): Destination state name.
  probability (float, default 1.0): Relative weight for this transition (normalised per source state).
  dwell_hours_mean (float, default 1.0): Mean hours spent in from_state before this transition fires.
  dwell_hours_std (float, default 0.5): Standard deviation of the dwell-time normal distribution.
  min_dwell_hours (float, default 0.1): Hard floor on the sampled dwell time.
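
"Normalised per source state" means the weights of all rules leaving a state are divided by their sum. Python's random.choices performs that normalisation implicitly when given relative weights. The sketch below also samples a dwell time from a normal distribution and clamps it at min_dwell_hours. Rule is a hypothetical stand-in that mirrors a few TransitionRule fields so the example runs without Spindle installed.

```python
import random
from collections import Counter
from dataclasses import dataclass


@dataclass
class Rule:  # hypothetical stand-in mirroring a few TransitionRule fields
    from_state: str
    to_state: str
    probability: float = 1.0
    dwell_hours_mean: float = 1.0
    dwell_hours_std: float = 0.5
    min_dwell_hours: float = 0.1


def normalised_weights(rules: list[Rule], state: str) -> dict[str, float]:
    """Divide each outgoing weight by the total for its source state."""
    outgoing = [r for r in rules if r.from_state == state]
    total = sum(r.probability for r in outgoing)
    return {r.to_state: r.probability / total for r in outgoing}


def next_transition(rules: list[Rule], state: str, rng: random.Random) -> tuple[str, float]:
    """Pick a destination by relative weight and sample a floored dwell time."""
    outgoing = [r for r in rules if r.from_state == state]
    rule = rng.choices(outgoing, weights=[r.probability for r in outgoing])[0]
    dwell = max(rule.min_dwell_hours, rng.gauss(rule.dwell_hours_mean, rule.dwell_hours_std))
    return rule.to_state, dwell


rules = [
    Rule("open", "resolved", probability=3.0),
    Rule("open", "escalated", probability=1.0, dwell_hours_mean=0.05),
]
print(normalised_weights(rules, "open"))  # {'resolved': 0.75, 'escalated': 0.25}
print(next_transition(rules, "open", random.Random(7)))
```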

WorkflowConfig dataclass

Configuration for WorkflowSimulator.

Parameters:

  states (list[StateDefinition], default []): List of state definitions for the workflow.
  transitions (list[TransitionRule], default []): List of transition rules connecting states.
  entity_count (int, default 100): Number of entities to simulate.
  entity_prefix (str, default 'entity'): Prefix for generated entity IDs.
  start_time (str, default '2024-01-01T00:00:00'): ISO-format start timestamp for the simulation.
  max_transitions_per_entity (int, default 20): Maximum transitions per entity; a safety limit that prevents infinite loops in cyclic workflows.
  anomaly_enabled (bool, default True): Whether to inject anomalous transitions.
  anomaly_skip_probability (float, default 0.02): Chance of skipping a state in the workflow.
  anomaly_stuck_probability (float, default 0.01): Chance of an entity getting stuck (no further transitions).
  anomaly_backward_probability (float, default 0.01): Chance of transitioning backward to a previous state.
  seed (int, default 42): Random seed for reproducibility.

WorkflowResult dataclass

Result of WorkflowSimulator.run().

Attributes:

  events (DataFrame): All transition events, sorted by (entity_id, transitioned_at).
  entity_summary (DataFrame): Per-entity summary with final state and timing.
  state_distribution (dict[str, int]): Count of entities in each final state.
  stats (dict[str, Any]): Aggregate statistics dictionary.

WorkflowSimulator

Simulate business-process event streams using a state machine.

Entities are created in an initial state and transition through the workflow according to probabilistic TransitionRule definitions until they reach a terminal state, get stuck (anomaly), or hit the per-entity transition safety limit.

Parameters:

  config (WorkflowConfig, required): WorkflowConfig describing the workflow graph and simulation parameters.

Example::

    from sqllocks_spindle.simulation import (
        WorkflowConfig, WorkflowSimulator, get_preset_workflow,
    )

    states, transitions = get_preset_workflow("order_fulfillment")
    cfg = WorkflowConfig(states=states, transitions=transitions, entity_count=500)
    sim = WorkflowSimulator(config=cfg)
    result = sim.run()
    result.events.head()

Methods:

  run(): Execute the full simulation and return a WorkflowResult.
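
The loop described above (transition until a terminal state or the safety limit) can be sketched in a few lines. The three-state graph, the walk helper, and the from_state/to_state event keys are hypothetical; only entity_id and transitioned_at echo names used by WorkflowResult.events. Anomaly injection (skip, stuck, backward) is deliberately left out to keep the core loop visible.

```python
import random
from datetime import datetime, timedelta

# Hypothetical workflow: relative weights per source state, plus terminal states.
GRAPH = {
    "placed": [("shipped", 0.9), ("cancelled", 0.1)],
    "shipped": [("delivered", 1.0)],
}
TERMINAL = {"delivered", "cancelled"}


def walk(entity_id: str, start: datetime, rng: random.Random,
         max_transitions: int = 20, mean_dwell_h: float = 1.0) -> list[dict]:
    """Walk one entity until it hits a terminal/dead-end state or the limit."""
    state, t = "placed", start
    events = []
    for _ in range(max_transitions):
        if state in TERMINAL or state not in GRAPH:
            break
        targets, weights = zip(*GRAPH[state])
        nxt = rng.choices(targets, weights=weights)[0]
        t += timedelta(hours=max(0.1, rng.gauss(mean_dwell_h, 0.5)))  # floored dwell
        events.append({"entity_id": entity_id, "from_state": state,
                       "to_state": nxt, "transitioned_at": t})
        state = nxt
    return events


rng = random.Random(42)
history = [walk(f"entity_{i}", datetime(2024, 1, 1), rng) for i in range(200)]
print(sum(ev[-1]["to_state"] == "delivered" for ev in history))
```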

StreamEmitConfig dataclass

Configuration for StreamEmitter.

Parameters:

  rate_per_sec (float, default 10.0): Base events-per-second target.
  jitter_ms (float, default 0.0): Maximum random jitter added between events (milliseconds).
  burst_windows (list[BurstWindow], default []): List of BurstWindow definitions.
  out_of_order_probability (float, default 0.0): Fraction of events to deliver out of order.
  replay_enabled (bool, default False): Whether to re-emit events from a sliding window.
  replay_window_minutes (float, default 5.0): Length of the replay window.
  replay_probability (float, default 0.05): Per-tick probability of triggering a replay burst.
  replay_burst_size (int, default 10): Number of events re-emitted per replay trigger.
  topics (list[str], default []): List of topic names (one per table, or explicit mapping).
  envelope_source (str, default 'spindle'): Value for the envelope source field.
  envelope_schema_version (str, default '1.0'): Default schema version string.
  sink_type (str, default 'console'): Sink identifier: "console", "file", "eventhub", or "eventstream".
  sink_connection (dict[str, Any], default {}): Connection info (keys depend on sink_type).
  max_events (int | None, default None): Stop after this many events (None means no limit).
  realtime (bool, default False): Pace emission in real time (vs. as fast as possible).
  seed (int, default 42): Random seed.
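
The replay parameters describe a sliding buffer of recent events from which bursts are re-sent. The minimal sketch below uses collections.deque. It assumes each event has an event-time datetime and that a replay re-sends the most recent replay_burst_size events still inside replay_window_minutes. The ReplayBuffer class and the "replayed" marker key are hypothetical, not StreamEmitter's actual implementation.

```python
import random
from collections import deque
from datetime import datetime, timedelta


class ReplayBuffer:
    """Keep events from the last `window_minutes` and replay a burst on demand."""

    def __init__(self, window_minutes: float, burst_size: int) -> None:
        self.window = timedelta(minutes=window_minutes)
        self.burst_size = burst_size
        self._buf: deque[tuple[datetime, dict]] = deque()

    def add(self, event_time: datetime, event: dict) -> None:
        self._buf.append((event_time, event))
        # Evict anything older than the window, relative to the newest event.
        while self._buf and event_time - self._buf[0][0] > self.window:
            self._buf.popleft()

    def replay(self) -> list[dict]:
        """Copies of the newest `burst_size` buffered events, flagged as replays."""
        recent = list(self._buf)[-self.burst_size:]
        return [{**event, "replayed": True} for _, event in recent]


rng = random.Random(42)
buf = ReplayBuffer(window_minutes=5.0, burst_size=10)
start = datetime(2024, 1, 1)
sent, replayed = [], []
for i in range(600):  # one event per second for 10 minutes
    t = start + timedelta(seconds=i)
    event = {"id": i, "time": t.isoformat()}
    sent.append(event)
    buf.add(t, event)
    if rng.random() < 0.05:  # replay_probability, evaluated once per tick
        replayed.extend(buf.replay())
print(len(sent), len(replayed))
```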

StreamEmitResult dataclass

Result of a StreamEmitter.emit() run.

Attributes:

  events_sent (int): Total primary events emitted.
  replay_events_sent (int): Events re-emitted via the replay window.
  topics_used (set[str]): Set of topic strings written to.
  elapsed_seconds (float): Wall-clock duration, in seconds.
  schema_versions (dict[str, str]): Mapping of topic -> schema version used.

StreamEmitter

Emit generated tables as enveloped streaming events.

Builds on sqllocks_spindle.streaming.streamer.SpindleStreamer but adds:

  • Event envelopes: each event is wrapped in a CloudEvents-style envelope with id, time, topic, schema_version, and correlation_id.
  • Replay window: a sliding buffer of recently emitted events that can be re-sent to simulate consumer-side replays / redeliveries.
  • Schema version tracking: each topic carries a schema version string so downstream consumers can detect schema evolution.
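
An envelope carrying the fields listed above (id, time, topic, schema_version, correlation_id) plus the configured source can be sketched as a plain dict. The defaults mirror envelope_source and envelope_schema_version from StreamEmitConfig. The exact key names, nesting, and the "data" payload key Spindle uses are not documented here, so treat this layout and the make_envelope helper as assumptions.

```python
import json
import uuid
from datetime import datetime, timezone


def make_envelope(row: dict, topic: str, correlation_id: str,
                  source: str = "spindle", schema_version: str = "1.0") -> dict:
    """Wrap one table row in a CloudEvents-style envelope (illustrative layout)."""
    return {
        "id": str(uuid.uuid4()),                          # unique per event
        "source": source,
        "time": datetime.now(timezone.utc).isoformat(),   # emission time, UTC
        "topic": topic,
        "schema_version": schema_version,
        "correlation_id": correlation_id,                 # shared across a run
        "data": row,
    }


run_id = str(uuid.uuid4())
env = make_envelope({"order_id": 1, "total": 19.99}, "orders", run_id)
print(json.dumps(env, indent=2))
```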

Parameters:

  tables (dict[str, DataFrame], required): Pre-generated dict[table_name, DataFrame].
  config (StreamEmitConfig | None, default None): StreamEmitConfig instance.
  sink (StreamWriter | None, default None): Optional explicit StreamWriter. If None, one is built from config.sink_type and config.sink_connection.

Example::

    from sqllocks_spindle.simulation import StreamEmitter, StreamEmitConfig

    # gen_result: a GenerationResult produced earlier by the Spindle generator
    cfg = StreamEmitConfig(rate_per_sec=50, topics=["orders", "returns"])
    result = StreamEmitter(tables=gen_result.tables, config=cfg).emit()

Methods:

  emit(tables=None, config=None): Emit all rows from tables (or the instance's tables) as enveloped events.

      Parameters:

        tables (dict[str, DataFrame] | None, default None): Override tables for this call.
        config (StreamEmitConfig | None, default None): Override config for this call.

      Returns:

        StreamEmitResult: A StreamEmitResult with run statistics.

Functions:

  get_preset_workflow(name): Return (states, transitions) for a named preset workflow.

      Available presets: order_fulfillment, support_ticket, employee_onboarding.

      Raises:

        KeyError: If name is not a recognised preset.