Skip to content

sqllocks_spindle

sqllocks_spindle

Spindle by SQLLocks — Multi-domain synthetic data generator for Microsoft Fabric.

Classes

Spindle

Main entry point for Spindle data generation.

Methods:
estimate_memory(domain=None, schema=None, scale=None, scale_overrides=None)

Estimate RAM usage in bytes per table and total.

generate(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, on_progress=None, enforce_correlations=True, fidelity_profile=None)

Generate synthetic data.

Parameters:

Name Type Description Default
domain

A Domain instance (e.g., RetailDomain()) with built-in schema.

None
schema str | Path | dict | SpindleSchema | None

Path to .spindle.json, raw dict, or parsed SpindleSchema.

None
scale str | None

Scale preset name (small, medium, large, xlarge).

None
scale_overrides dict[str, int] | None

Override row counts for specific tables.

None
seed int | None

Random seed for reproducibility.

None
on_progress Callable[[str, int, int], None] | None

Optional callback(table_name, tables_done, tables_total).

None
enforce_correlations bool

If True (default), apply GaussianCopula post-pass when correlated_columns metadata is present in the schema.

True
fidelity_profile

Optional DatasetProfile. When provided, returns a (GenerationResult, FidelityReport) tuple instead of GenerationResult.

None
generate_stream(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, on_progress=None)

Generate synthetic data and yield each table as it completes.

Same signature as generate(). Yields (table_name, DataFrame) tuples in dependency order, allowing callers to write table N to a store while table N+1 is still being generated.

Example::

for table_name, df in spindle.generate_stream(domain=RetailDomain(), scale="medium"):
    writer.write(table_name, df)
describe(domain=None, schema=None)

Parse and return schema without generating data.

ChunkedSpindle

Generate billion-row datasets in bounded memory.

Uses a two-pass approach: 1. Parent tables generated fully in-memory (typically small). 2. Child tables generated in chunks of chunk_size rows.

Example::

cs = ChunkedSpindle()
result = cs.generate_chunked(
    domain=FinancialDomain(),
    scale="warehouse",
    chunk_size=1_000_000,
)

# Parent tables are immediately available
for name, df in result.parent_tables.items():
    print(f"{name}: {len(df)} rows")

# Child tables stream via iterator
for table_name in result.child_table_names:
    for chunk in result.iter_chunks(table_name):
        writer.write(chunk)
Methods:
generate_chunked(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, chunk_size=1000000, target_table=None, target_count=None)

Generate data with chunked child tables.

Parameters:

Name Type Description Default
domain

A Domain instance.

None
schema Any

Path to .spindle.json, raw dict, or parsed SpindleSchema.

None
scale str | None

Scale preset name.

None
scale_overrides dict[str, int] | None

Override row counts for specific tables.

None
seed int | None

Random seed for reproducibility.

None
chunk_size int

Rows per chunk for child tables.

1000000
target_table str | None

Anchor table name — derive all other table counts proportionally from this table's target_count.

None
target_count int | None

Number of rows for the anchor table. Required when target_table is provided.

None

Returns:

Type Description
ChunkedGenerationResult

ChunkedGenerationResult with parent tables materialized and

ChunkedGenerationResult

child tables available via iter_chunks().

ChunkedGenerationResult dataclass

Result of a chunked generation run.

Parent tables are fully materialized (small). Child tables are available only via iter_chunks() to keep memory bounded.

Methods:
iter_chunks(table_name)

Yield DataFrames of chunk_size rows for a child table.

Must be called in dependency order. Each table can only be iterated once.

write_with(writer, **kwargs)

Convenience: write parent tables, then stream child chunks through a writer.

The writer must implement either
  • write_table(table_name, df, **kwargs) for individual DataFrames, or
  • stage_chunk(table_name, chunk_df, idx) + copy_into(table_name) for bulk writers.

MultiWriter

Write generated data to multiple Fabric stores concurrently.

Fans out to all configured writers using ThreadPoolExecutor. Each writer runs concurrently. Partial failures are captured without aborting other stores.

Example::

from sqllocks_spindle.fabric import (
    EventhouseWriter, WarehouseBulkWriter,
    LakehouseFilesWriter, MultiWriter,
)

mw = MultiWriter(
    eventhouse=EventhouseWriter(...),
    lakehouse=LakehouseFilesWriter(...),
)
result = mw.write(tables)
print(result.summary())
Methods:
write(tables, **kwargs)

Write tables to all configured stores concurrently.

Parameters:

Name Type Description Default
tables dict[str, DataFrame]

Mapping of table_name -> DataFrame.

required
**kwargs Any

Extra args forwarded to each writer's write_all().

{}

Returns:

Type Description
MultiWriteResult

MultiWriteResult aggregating per-store results.

MultiWriteResult dataclass

Aggregated result from writing to multiple stores.

LakehouseWriteResult dataclass

Result of a Lakehouse write_all() operation.

CdmEntityMap

Optional mapping from source table names to CDM entity names.

If a table is not in the map, it defaults to PascalCase of the table name.

CdmMapper

Export tables as a Microsoft CDM folder (model.json + data files).

The output folder structure is compatible with Fabric CDM connectors, Dataverse, Power Platform, and Azure Data Lake Storage CDM folders.

Usage::

from sqllocks_spindle.transform import CdmMapper, CdmEntityMap

mapper = CdmMapper()
mapper.write_cdm_folder(
    tables=result.tables,
    output_dir="./cdm",
    domain_name="SpindleRetail",
    entity_map=CdmEntityMap({"customer": "Contact", "order": "SalesOrder"}),
)
Methods:
write_cdm_folder(tables, output_dir, domain_name='SpindleOutput', entity_map=None, fmt='csv')

Write a CDM folder to disk.

Parameters:

Name Type Description Default
tables dict[str, DataFrame]

Dict of table_name → DataFrame.

required
output_dir str | Path

Root directory for CDM folder output.

required
domain_name str

The CDM model name (appears in model.json).

'SpindleOutput'
entity_map CdmEntityMap | None

Optional mapping of table names to CDM entity names.

None
fmt str

Data file format — "csv" (default) or "parquet".

'csv'

Returns:

Type Description
list[Path]

List of written file paths.

to_model_json(tables, domain_name='SpindleOutput', entity_map=None, fmt='csv')

Generate a model.json manifest dict without writing to disk.

Useful for in-memory CDM metadata generation or Fabric notebook use.

DimSpec dataclass

Specification for building one dimension table.

FactSpec dataclass

Specification for building one fact table.

StarSchemaMap

Describes how to transform a domain result into a star schema.

StarSchemaResult dataclass

Result of a star schema transform.

Methods:
all_tables()

Return all tables (dimensions + date_dim + facts) as a flat dict.

StarSchemaTransform

Transform a 3NF GenerationResult into a star schema using a StarSchemaMap.

Methods:
transform(tables, schema_map)

Apply the star schema transform.

Parameters:

Name Type Description Default
tables dict[str, DataFrame]

Dict of table_name → DataFrame (from GenerationResult.tables).

required
schema_map StarSchemaMap

Mapping spec defining dims and facts.

required

Returns:

Type Description
StarSchemaResult

StarSchemaResult with dimensions, facts, and date_dim.

Anomaly

Bases: ABC

Base class for anomaly definitions.

Attributes
anomaly_type abstractmethod property

Short label used in the _spindle_anomaly_type column.

fraction abstractmethod property

Fraction of eligible rows (or groups) to mark as anomalous.

Methods:
inject(df, rng) abstractmethod

Inject anomalies into df and return a modified copy.

AnomalyRegistry

Registry of anomaly definitions applied during streaming.

Example::

registry = AnomalyRegistry([
    PointAnomaly("extreme_total", column="total_amount"),
    ContextualAnomaly(
        "winter_in_summer",
        column="product_category",
        condition_column="order_month",
        normal_values=[6, 7, 8],
        anomalous_values=["Winter Coats", "Heavy Jackets"],
    ),
])

# Apply to any DataFrame before streaming
df_labelled = registry.inject(df, rng)
Methods:
add(anomaly)

Register an anomaly and return self for chaining.

inject(df, rng)

Apply all registered anomalies to df and return a labelled copy.

Label columns _spindle_is_anomaly and _spindle_anomaly_type are always present in the returned DataFrame, even when the registry is empty.

BurstWindow dataclass

A time window where event rate is multiplied by a factor.

Parameters:

Name Type Description Default
start_offset_seconds float

Seconds from stream start when the burst begins.

required
duration_seconds float

How long the burst lasts.

required
multiplier float

Rate multiplier during the burst (e.g. 10.0 = 10x normal rate).

required

Example::

# 10x rate burst starting at t=30s, lasting 60 seconds
BurstWindow(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
Methods:
is_active(elapsed)

Return True if the burst window is active at the given elapsed time.

CollectiveAnomaly dataclass

Bases: Anomaly

A sequence of rows that is abnormal together.

Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in the affected group have their timestamp compressed into a short window and are labelled as anomalous.

Parameters:

Name Type Description Default
name str

Short label, used in _spindle_anomaly_type.

required
group_column str

Column whose distinct values define groups (e.g. customer_id).

required
timestamp_column str

Datetime column to compress into a short window.

required
window_seconds float

Duration of the burst window (default 600 s = 10 min).

600.0
fraction float

Fraction of groups to corrupt (default 0.005).

0.005

ConsoleSink

Bases: StreamWriter

Prints events as JSON Lines to stdout.

Useful for local development and quick debugging. No external dependencies.

Parameters:

Name Type Description Default
indent int | None

JSON indent level (None for compact single-line output, 2 for pretty-printed).

None
prefix str

Optional string prepended before each JSON line.

''
file

Output file object (defaults to sys.stdout). Injectable for testing.

None

ContextualAnomaly dataclass

Bases: Anomaly

Normal value placed in the wrong context.

Example: a winter coat sold in July, or a NULL shipping address on a delivered order.

Parameters:

Name Type Description Default
name str

Short label, used in _spindle_anomaly_type.

required
column str

Column whose value will be replaced with an anomalous value.

required
condition_column str

Column used to identify "normal" rows to corrupt.

required
normal_values list[Any]

Values of condition_column that make a row eligible.

required
anomalous_values list[Any]

Replacement values to write into column.

required
fraction float

Fraction of eligible rows to corrupt (default 0.01).

0.01

FileSink

Bases: StreamWriter

Writes events as JSON Lines (one JSON object per line) to a file.

Parameters:

Name Type Description Default
path str | Path

Output file path. Parent directories are created automatically.

required
mode str

File open mode — "a" (append, default) or "w" (overwrite).

'a'

PointAnomaly dataclass

Bases: Anomaly

Single-row anomaly: a column value that is far outside the normal range.

Example: an order total of $99,999 when the average is $50.

Parameters:

Name Type Description Default
name str

Short label, used in _spindle_anomaly_type.

required
column str

Column to modify.

required
multiplier_range tuple[float, float]

(min, max) multiplier applied to the column mean to produce the anomalous value.

(10.0, 100.0)
fraction float

Fraction of rows to mark (default 0.01 = 1%).

0.01

SpindleStreamer

Stream generated synthetic data row-by-row through a :class:StreamWriter.

Workflow
  1. Generate a full batch via the Spindle engine (or accept pre-generated tables).
  2. Convert each row to an event dict, adding _spindle_table and _spindle_seq metadata fields.
  3. Optionally inject anomalies via an :class:~sqllocks_spindle.streaming.anomaly.AnomalyRegistry.
  4. Optionally reorder a fraction of events to simulate out-of-order arrival.
  5. Emit in batches through the configured :class:StreamWriter sink. Rate-limiting (token bucket + Poisson inter-arrivals) is applied only when StreamConfig(realtime=True).

Parameters:

Name Type Description Default
domain

A Domain instance (e.g. RetailDomain()). Mutually exclusive with tables.

None
tables dict[str, DataFrame] | None

Pre-generated dict[table_name, DataFrame] — skips generation.

None
sink StreamWriter | None

:class:StreamWriter to emit events through. Defaults to :class:~sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink.

None
config StreamConfig | None

:class:StreamConfig controlling rate, bursts, OOO, etc.

None
anomaly_registry

Optional :class:~sqllocks_spindle.streaming.anomaly.AnomalyRegistry.

None
scale str

Scale preset for generation (ignored when tables is provided).

'small'
seed int | None

Random seed.

42

Example::

from sqllocks_spindle import RetailDomain
from sqllocks_spindle.streaming import SpindleStreamer, StreamConfig, FileSink

result = SpindleStreamer(
    domain=RetailDomain(),
    sink=FileSink("events.jsonl", mode="w"),
    config=StreamConfig(max_events=500),
).stream("order")

print(result)
Methods:
stream(table, timestamp_column=None)

Stream all rows of table through the configured sink.

Parameters:

Name Type Description Default
table str

Name of the table to stream.

required
timestamp_column str | None

Column to use as _spindle_event_time. If None, the first datetime-typed column is auto-detected.

None

Returns:

Type Description
StreamResult

class:StreamResult with run statistics.

stream_all(timestamp_column=None)

Stream every table in generation order.

Returns:

Type Description
list[StreamResult]

List of :class:StreamResult, one per table.

StreamConfig dataclass

Configuration for a Spindle streaming run.

Parameters:

Name Type Description Default
events_per_second float

Base target throughput.

10.0
duration_seconds float | None

Stop after this many wall-clock seconds (None = no limit).

None
max_events int | None

Stop after this many events (None = no limit).

None
out_of_order_fraction float

Fraction of events to reorder (0.0–1.0).

0.0
out_of_order_max_delay_slots int

Maximum slot positions an OOO event is delayed.

10
burst_windows list[BurstWindow]

List of :class:BurstWindow definitions.

list()
time_pattern TimePattern | None

Optional :class:TimePattern for time-of-day multipliers.

None
label_anomalies bool

Keep _spindle_is_anomaly / _spindle_anomaly_type columns in emitted events (default True).

True
batch_size int

Events per :meth:StreamWriter.send_batch call.

100
realtime bool

If True, rate-limit emissions using a token bucket + Poisson inter-arrival times. If False (default), emit as fast as possible (useful for bulk loading and unit tests).

False
Methods:
get_rate_multiplier(elapsed, wall_hour=0, wall_dow=0)

Return the composite rate multiplier at a given moment.

Parameters:

Name Type Description Default
elapsed float

Seconds since stream start.

required
wall_hour int

Current wall-clock hour (0–23).

0
wall_dow int

Current wall-clock day-of-week (0=Monday).

0

StreamResult dataclass

Result of a streaming run.

Attributes:

Name Type Description
table str

Name of the streamed table.

events_sent int

Total events transmitted to the sink.

anomaly_count int

Number of events flagged as anomalous.

out_of_order_count int

Number of events deliberately reordered.

elapsed_seconds float

Wall-clock duration of the streaming run.

events_per_second_actual float

Measured throughput.

StreamWriter

Bases: ABC

Abstract base for all streaming sinks.

Concrete implementations provide the actual transport layer: :class:~sqllocks_spindle.streaming.sinks.console_sink.ConsoleSink, :class:~sqllocks_spindle.streaming.sinks.file_sink.FileSink, :class:~sqllocks_spindle.streaming.sinks.eventhub_sink.EventHubSink, :class:~sqllocks_spindle.streaming.sinks.kafka_sink.KafkaSink.

Subclasses must implement :meth:send_batch. :meth:send has a default implementation that wraps a single event in a list.

Methods:
send(event)

Send a single event.

send_batch(events) abstractmethod

Send a batch of events.

Parameters:

Name Type Description Default
events list[dict[str, Any]]

List of event dicts to transmit.

required
close()

Close any open connections or file handles (no-op by default).

TimePattern dataclass

Time-of-day and day-of-week rate multipliers.

Applied on top of the base rate when simulating real-world temporal patterns. E.g., e-commerce has high traffic 7pm–10pm and lower traffic at 3am.

Parameters:

Name Type Description Default
hour_multipliers dict[int, float]

Mapping of hour (0–23) to multiplier.

dict()
dow_multipliers dict[int, float]

Mapping of day-of-week (0=Monday … 6=Sunday) to multiplier.

dict()
Methods:
get_multiplier(hour, dow)

Return the composite multiplier for a given hour and day-of-week.

business_hours() classmethod

Higher traffic 8am–6pm weekdays, very low on weekends.

retail_peak() classmethod

E-commerce pattern: evenings peak, weekends elevated.

StreamingMultiWriter

Stream generated data to multiple sinks concurrently.

For each (table_name, DataFrame) batch yielded by generate_stream(), converts rows to event dicts and fans out to all registered sinks in parallel via ThreadPoolExecutor.

Usage::

from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter

smw = StreamingMultiWriter(
    console=ConsoleSink(),
    file=FileSink("events.jsonl"),
)
result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
print(result.summary())

Parameters:

Name Type Description Default
max_workers int | None

Number of parallel threads per batch (default: number of sinks).

None
batch_size int

Max rows per send_batch call (default: 100).

100
stop_on_sink_error bool

If True, abort streaming when any sink errors. Default False — partial failures are captured and streaming continues.

False
**sinks StreamWriter

Keyword arguments of sink_name=StreamWriter pairs.

{}
Methods:
add_sink(name, sink)

Add a sink and return self for chaining.

remove_sink(name)

Remove a sink by name.

stream(generator)

Stream (table_name, DataFrame) batches to all sinks in parallel.

Parameters:

Name Type Description Default
generator Iterator[tuple[str, DataFrame]]

Iterator yielding (table_name, DataFrame) tuples. Typically Spindle.generate_stream().

required

Returns:

Type Description
StreamingMultiWriteResult

StreamingMultiWriteResult with per-sink stats.

stream_table(table_name, df)

Stream a single DataFrame to all sinks. Returns {sink_name: success}.

StreamingMultiWriteResult dataclass

Aggregated result from streaming to multiple sinks.

SinkResult dataclass

Per-sink streaming result.

ContinueEngine

Generate incremental deltas (inserts, updates, deletes) from existing data.

Methods:
continue_from(existing, schema=None, config=None)

Generate incremental changes from existing data.

Parameters

existing: Either a GenerationResult (from Spindle.generate()) or a plain dict[str, pd.DataFrame]. schema: Optional SpindleSchema. If existing is a GenerationResult the schema is extracted automatically. config: ContinueConfig controlling insert/update/delete volumes and state-transition rules. Defaults are used when None.

ContinueConfig dataclass

Configuration for incremental generation.

Attributes:

Name Type Description
insert_count int

Number of new rows to generate per anchor table.

update_fraction float

Fraction of existing rows to update (0.0 - 1.0).

delete_fraction float

Fraction of existing rows to soft-delete (0.0 - 1.0).

state_transitions dict[str, dict[str, dict[str, float]]]

Per-column Markov transition probabilities. Format: {"table.column": {"current_state": {"next_state": probability}}} Example::

{"order.status": {
    "pending": {"shipped": 0.7, "cancelled": 0.3},
    "shipped": {"delivered": 0.9, "returned": 0.1},
}}
timestamp_column str

Name of the delta-timestamp metadata column.

delta_type_column str

Name of the delta-type metadata column.

seed int | None

Optional random seed for reproducibility.

DeltaResult dataclass

Result of incremental generation.

TimeTravelEngine

Generate monthly point-in-time snapshots showing data evolution.

Methods:
generate(domain, config=None, scale='small')

Generate monthly snapshots of a domain's data.

Month 0 is the initial dataset. Each subsequent month applies: - New entity growth (inserts) at growth_rate * seasonality multiplier - Status transitions (updates) at update_fraction rate - Churn (soft deletes) at churn_rate

Returns a TimeTravelResult with N+1 snapshots (month 0 through month N).

TimeTravelConfig dataclass

Configuration for time-travel snapshot generation.

TimeTravelResult dataclass

Result of time-travel generation.

Methods:
get_snapshot(month)

Get snapshot by month index.

to_partitioned_dfs()

Return all snapshots combined with a _snapshot_date column.

DataMasker

Replace PII in real data with synthetic values preserving distributions.

Methods:
mask(tables, config=None)

Mask PII columns across all tables.

Parameters

tables: Mapping of table name to DataFrame. config: Optional masking configuration. Defaults are sensible.

Returns

MaskResult with masked DataFrames and statistics.

DataProfiler

Analyse one or more DataFrames and produce profiles.

Methods:
profile_dataframe(df, table_name='table')

Profile a single DataFrame.

profile_dataset(tables)

Profile a dict of DataFrames and detect cross-table relationships.

profile(df, table_name='table')

Alias for profile_dataframe(). Profile a single DataFrame.

from_csv(path, table_name=None, sample_rows=None, **kwargs) classmethod

Profile a CSV file.

Parameters:

Name Type Description Default
path str | Path

Path to the CSV file.

required
table_name str | None

Name for the table profile. Defaults to the filename stem.

None
sample_rows int | None

If set, sample this many rows before profiling.

None
**kwargs

Passed to DataProfiler constructor (fit_threshold, top_n_values, etc.).

{}

ExportedProfile dataclass

A portable profile that can be imported into any domain.

Attributes:

Name Type Description
name str

Profile identifier (e.g. "default", "high_volume").

description str

Human-readable description of what this profile represents.

source_domain str

Name of the domain this profile was exported from (or "inferred" when created via :meth:ProfileIO.from_dataframe).

distributions dict[str, dict[str, float]]

Mapping of "table.column" keys to value→weight dicts.

ratios dict[str, float]

Mapping of ratio names to float multipliers.

metadata dict[str, Any]

Arbitrary extra information (row counts, column types, etc.).

LakehouseProfiler

Profile Fabric Lakehouse Delta tables and return TableProfile objects.

Parameters:

Name Type Description Default
workspace_id str

Fabric workspace GUID.

required
lakehouse_id str

Fabric lakehouse GUID.

required
token_provider Any | None

A callable returning an Azure access token string. Defaults to DefaultAzureCredential when azure-identity is installed.

None
default_sample_rows int | None

Row limit for profiling. Pass None to scan entire table.

100000
Methods:
profile_table(table_name, sample_rows='default')

Profile a single Delta table.

profile_all(sample_rows='default')

Profile all tables in the lakehouse.

detect_foreign_keys(table_names=None, overlap_threshold=0.9, sample_rows='default', full_scan=False)

Sampled cross-table FK detection (advisory). ADR-009 / STORY-016.

Reads each table's columns (sampled by default) and runs the proven DataProfiler._detect_foreign_keys_advisory core (naming *_id plus value-overlap >= overlap_threshold) across every table pair. Detected FKs are advisory and reported with the measured overlap; a declared star_map / RelationshipDef remains authoritative and overrides (resolved by the caller, not here).

Parameters:

Name Type Description Default
table_names list[str] | None

Tables to scan. Defaults to all tables in the lakehouse.

None
overlap_threshold float

Minimum child-to-parent value overlap to report a FK (default 0.9, configurable per ADR-009).

0.9
sample_rows int | None | str

Per-table row cap used when reading key columns. "default" uses self.default_sample_rows; None reads the full table. Ignored when full_scan=True.

'default'
full_scan bool

Read entire tables (no sampling) to confirm a sampled result (ADR-009 full-scan option).

False

Returns:

Type Description
dict[str, dict[str, dict[str, Any]]]

``{child_table: {col_name: {"parent_table": str, "overlap": float,

dict[str, dict[str, dict[str, Any]]]

"advisory": True, "full_scan": bool}}}`` for every detected FK.

reconcile_declared_foreign_keys(detected, declared) staticmethod

Declared FKs override detected advisory FKs (ADR-009 / STORY-017).

A declared star_map / RelationshipDef is AUTHORITATIVE: where a declaration exists for a (child_table, child_col) it wins over any detected FK, even a high-overlap one. Detected FKs that a declaration overrode are REPORTED (not silently dropped) for transparency.

Parameters:

Name Type Description Default
detected dict[str, dict[str, dict[str, Any]]]

the output of :meth:detect_foreign_keys.

required
declared Any

iterable of (child_table, child_col, parent_table) tuples, or dicts with those keys.

required

Returns:

Type Description
dict[str, Any]

{"foreign_keys": <resolved map>, "overridden": [<reports>]}.

dict[str, Any]

Resolved declared entries carry advisory=False, declared=True.

MaskConfig dataclass

Configuration for data masking.

ProfileIO

Export, import, and list domain profiles.

All public methods are stateless — no configuration is stored on the instance. Instantiate with ProfileIO() and call methods directly.

Example::

io = ProfileIO()
io.export_profile(RetailDomain(), Path("retail_profile.json"))
io.import_profile(Path("retail_profile.json"), HealthcareDomain(), save_as="from_retail")
io.list_profiles(RetailDomain())
Methods:
export_profile(domain, output_path, profile_name='default')

Export a domain's active profile to a standalone JSON file.

Parameters:

Name Type Description Default
domain Any

A :class:~sqllocks_spindle.domains.base.Domain instance whose _profile dict will be serialised.

required
output_path str | Path

Destination file path (created if it does not exist).

required
profile_name str

Label stored in the exported metadata.

'default'

Returns:

Type Description
Path

The resolved :class:Path the profile was written to.

import_profile(profile_path, target_domain, save_as=None)

Import an exported profile into a target domain's profiles/ directory.

The imported file is converted to the standard domain profile format (i.e. metadata is stripped; only name, description, distributions, and ratios are kept).

Parameters:

Name Type Description Default
profile_path str | Path

Path to an exported profile JSON file.

required
target_domain Any

The domain instance to import into.

required
save_as str | None

Override the profile name (and filename). When None the name is taken from the file's "name" field.

None

Returns:

Type Description
str

The name the profile was saved as.

list_profiles(domain)

List all profiles available for a domain.

Parameters:

Name Type Description Default
domain Any

A :class:~sqllocks_spindle.domains.base.Domain instance.

required

Returns:

Type Description
list[dict[str, str | int]]

A list of dicts with keys name, description,

list[dict[str, str | int]]

distributions (count), and ratios (count).

from_dataframe(df, table_name='table', name='inferred')

Create a profile by inferring distributions from a DataFrame.

Categorical columns (object dtype or low cardinality) are converted into normalised distribution weights. High-cardinality columns are skipped.

Parameters:

Name Type Description Default
df DataFrame

The source DataFrame.

required
table_name str

Prefix for distribution keys ("table_name.column").

'table'
name str

Name to assign to the resulting profile.

'inferred'

Returns:

Name Type Description
An ExportedProfile

class:ExportedProfile ready to be serialised or imported.

SchemaBuilder

Convert a DatasetProfile into a SpindleSchema.

Methods:
build(profile, domain_name='inferred', fit_threshold=0.8, correlation_threshold=0.5, include_anomaly_registry=False)

Build a complete SpindleSchema from a dataset profile.

ProfileRegistry

Manages named, tagged profiles under a configurable root directory.

Directory layout::

<root>/
  <system>/
    <table>/
      <profile_name>.json
  _index.json          ← auto-maintained index
Methods:
save(profile)

Save a profile to disk and update the index.

load(identity)

Load a profile by identity (system/table/name).

delete(identity)

Delete a profile from disk and index.

list_all()

Return all index entries sorted by identity.

search(query=None, system=None, table=None, tags=None)

Filter index entries by query string, system, table, and/or tags.

add_tags(identity, tags)

Add tags to a profile (in-place, no duplicates).

remove_tags(identity, tags)

Remove tags from a profile.

import_from_dir(source_dir, overwrite=False)

Import all *.json profile files from a directory tree.

Returns a list of imported identity strings.

diff(identity_a, identity_b)

Compare two profiles column by column.

Returns a dict with keys: added, removed, changed.

reindex()

Rebuild _index.json from all .json files on disk. Returns count.

save_from_dataset_profile(dataset_profile, system, name, tags=None, description='', config=None)

Convert a DatasetProfile into registry profiles via the SafeProfile mapper.

STORY-014 (ADR-001): the per-column stats are now built through SafeProfile.from_dataset_profile (the canonical safe-and-correct mapper), NOT the old hand-read of non-existent .min/.max/ .top_values attributes (the B2 attribute-mismatch bug). So registry profiles carry the SAFE statistic set (dtype/null_rate/cardinality/mean/ std/quantiles/bounds/categorical_weights/categorical_histogram), with no raw values. The RegistryProfile wrapper (system/table/name/tags/ description/source_rows) is unchanged, so the registry read side (load/diff/tag/reindex) is unaffected; no on-disk format break, no sidecar.

Legacy registry files (old min/max/top_values columns) still load as-is. config is forwarded to the SafeProfile mapper (e.g. k, sensitive).

One RegistryProfile is created per table. Returns the saved profiles.

validate(identity, result, sample_rows=500)

Compare a GenerationResult against a stored profile.

Reconstructs an approximate reference DataFrame from stored column statistics and runs FidelityComparator against the new generation. Returns a FidelityReport. Requires scipy.

RegistryProfile dataclass

A named, tagged data profile stored in the registry.

Identity format: <system>/<table>/<profile_name> e.g. salesforce/customer/prod-2026Q2

Attributes
identity property

Fully-qualified profile identity: system/table/name.

Functions:

get_preset(name)

Get a built-in preset by name.

list_presets()

List all built-in presets.