sqllocks_spindle
¶
Spindle by SQLLocks — Multi-domain synthetic data generator for Microsoft Fabric.
Classes¶
Spindle
¶
Main entry point for Spindle data generation.
Methods:¶
estimate_memory(domain=None, schema=None, scale=None, scale_overrides=None)
¶
Estimate RAM usage in bytes per table and total.
generate(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, on_progress=None, enforce_correlations=True, fidelity_profile=None)
¶
Generate synthetic data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | | A Domain instance (e.g., RetailDomain()) with built-in schema. | None |
| schema | str \| Path \| dict \| SpindleSchema \| None | Path to .spindle.json, raw dict, or parsed SpindleSchema. | None |
| scale | str \| None | Scale preset name (small, medium, large, xlarge). | None |
| scale_overrides | dict[str, int] \| None | Override row counts for specific tables. | None |
| seed | int \| None | Random seed for reproducibility. | None |
| on_progress | Callable[[str, int, int], None] \| None | Optional callback(table_name, tables_done, tables_total). | None |
| enforce_correlations | bool | If True (default), apply GaussianCopula post-pass when correlated_columns metadata is present in the schema. | True |
| fidelity_profile | | Optional DatasetProfile. When provided, returns a (GenerationResult, FidelityReport) tuple instead of GenerationResult. | None |
generate_stream(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, on_progress=None)
¶
Generate synthetic data and yield each table as it completes.
Takes the same arguments as generate(), except enforce_correlations and fidelity_profile. Yields (table_name, DataFrame)
tuples in dependency order, allowing callers to write table N to a
store while table N+1 is still being generated.
Example::
    for table_name, df in spindle.generate_stream(domain=RetailDomain(), scale="medium"):
        writer.write(table_name, df)
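The dependency-ordered yielding described above can be pictured with a small stand-alone sketch. It does not call Spindle: the `DEPENDENCIES` mapping, `build_table` and `stream_tables` are hypothetical names, and rows are plain dicts rather than DataFrames. It only illustrates why a generator lets a caller consume a parent table before its children have been built.

```python
from graphlib import TopologicalSorter
from typing import Iterator

# Hypothetical schema: each table maps to the tables it depends on (its parents).
DEPENDENCIES = {
    "customer": set(),
    "product": set(),
    "order": {"customer"},
    "order_line": {"order", "product"},
}


def build_table(name: str, rows: int) -> list[dict]:
    """Stand-in for real generation: returns `rows` trivial records."""
    return [{"table": name, "id": i} for i in range(rows)]


def stream_tables(deps: dict[str, set[str]], rows: int = 3) -> Iterator[tuple[str, list[dict]]]:
    """Yield (table_name, rows) one table at a time, parents before children."""
    for name in TopologicalSorter(deps).static_order():
        yield name, build_table(name, rows)


# Collect only the order in which tables become available.
order_seen = [name for name, _ in stream_tables(DEPENDENCIES)]
```

Because each table is yielded as soon as it is built, the consumer can hand it to a writer while the generator moves on to the next one.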
describe(domain=None, schema=None)
¶
Parse and return schema without generating data.
ChunkedSpindle
¶
Generate billion-row datasets in bounded memory.
Uses a two-pass approach:
1. Parent tables generated fully in-memory (typically small).
2. Child tables generated in chunks of chunk_size rows.
Example::
    cs = ChunkedSpindle()
    result = cs.generate_chunked(
        domain=FinancialDomain(),
        scale="warehouse",
        chunk_size=1_000_000,
    )
    # Parent tables are immediately available
    for name, df in result.parent_tables.items():
        print(f"{name}: {len(df)} rows")
    # Child tables stream via iterator
    for table_name in result.child_table_names:
        for chunk in result.iter_chunks(table_name):
            writer.write(chunk)
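As a rough illustration of the two-pass idea (not ChunkedSpindle's actual code), the sketch below materializes a small list of parent keys and then yields child rows in bounded chunks. The helper name `iter_child_chunks` and the row layout are assumptions made for this example.

```python
import random
from typing import Iterator


def iter_child_chunks(parent_ids: list[int], total_rows: int, chunk_size: int,
                      seed: int = 0) -> Iterator[list[dict]]:
    """Yield child rows in lists of at most `chunk_size`; only one chunk is alive at a time."""
    rng = random.Random(seed)
    produced = 0
    while produced < total_rows:
        n = min(chunk_size, total_rows - produced)
        # Each child row references a parent key that already exists (pass 1 output).
        yield [
            {"child_id": produced + i, "parent_id": rng.choice(parent_ids)}
            for i in range(n)
        ]
        produced += n


# Pass 1: the (small) parent table is fully materialized.
parent_ids = list(range(10))
# Pass 2: the child table is consumed chunk by chunk.
chunks = list(iter_child_chunks(parent_ids, total_rows=25, chunk_size=10))
chunk_sizes = [len(chunk) for chunk in chunks]
```

Peak memory here is governed by `chunk_size` rather than by the total child row count, which is the property the class description relies on.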
Methods:¶
generate_chunked(domain=None, schema=None, scale=None, scale_overrides=None, seed=None, chunk_size=1000000, target_table=None, target_count=None)
¶
Generate data with chunked child tables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | | A Domain instance. | None |
| schema | Any | Path to .spindle.json, raw dict, or parsed SpindleSchema. | None |
| scale | str \| None | Scale preset name. | None |
| scale_overrides | dict[str, int] \| None | Override row counts for specific tables. | None |
| seed | int \| None | Random seed for reproducibility. | None |
| chunk_size | int | Rows per chunk for child tables. | 1000000 |
| target_table | str \| None | Anchor table name; all other table counts are derived proportionally from this table's target_count. | None |
| target_count | int \| None | Number of rows for the anchor table. Required when target_table is provided. | None |
Returns:
| Type | Description |
|---|---|
| ChunkedGenerationResult | ChunkedGenerationResult with parent tables materialized and child tables available via iter_chunks(). |
ChunkedGenerationResult
dataclass
¶
Result of a chunked generation run.
Parent tables are fully materialized (small). Child tables are available
only via iter_chunks() to keep memory bounded.
Methods:¶
iter_chunks(table_name)
¶
Yield DataFrames of chunk_size rows for a child table.
Must be called in dependency order. Each table can only be iterated once.
write_with(writer, **kwargs)
¶
Convenience: write parent tables, then stream child chunks through a writer.
The writer must implement either:
- write_table(table_name, df, **kwargs) for individual DataFrames, or
- stage_chunk(table_name, chunk_df, idx) + copy_into(table_name) for bulk writers.
MultiWriter
¶
Write generated data to multiple Fabric stores concurrently.
Fans out to all configured writers using ThreadPoolExecutor. Each writer runs concurrently. Partial failures are captured without aborting other stores.
Example::
    from sqllocks_spindle.fabric import (
        EventhouseWriter, WarehouseBulkWriter,
        LakehouseFilesWriter, MultiWriter,
    )
    mw = MultiWriter(
        eventhouse=EventhouseWriter(...),
        lakehouse=LakehouseFilesWriter(...),
    )
    result = mw.write(tables)
    print(result.summary())
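The fan-out behaviour described above (concurrent writers, partial failures captured) can be sketched with the standard library alone. This is an illustrative pattern, not MultiWriter's implementation; `fan_out`, `good_writer` and `bad_writer` are hypothetical, and writers are modelled as plain callables.

```python
from concurrent.futures import ThreadPoolExecutor


def fan_out(writers: dict, tables: dict) -> dict:
    """Run every writer concurrently; record an error string instead of raising."""
    results = {}
    with ThreadPoolExecutor(max_workers=max(1, len(writers))) as pool:
        futures = {name: pool.submit(fn, tables) for name, fn in writers.items()}
        for name, future in futures.items():
            try:
                results[name] = {"ok": True, "value": future.result()}
            except Exception as exc:  # a failing store must not abort the others
                results[name] = {"ok": False, "error": str(exc)}
    return results


def good_writer(tables: dict) -> int:
    return len(tables)  # pretend every table was written


def bad_writer(tables: dict) -> int:
    raise RuntimeError("store unavailable")


outcome = fan_out(
    {"lakehouse": good_writer, "eventhouse": bad_writer},
    {"customer": [], "order": []},
)
```

Catching the exception per future is what keeps one failing store from hiding the results of the others.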
Methods:¶
write(tables, **kwargs)
¶
Write tables to all configured stores concurrently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tables | dict[str, DataFrame] | Mapping of table_name -> DataFrame. | required |
| **kwargs | Any | Extra args forwarded to each writer's write_all(). | {} |
Returns:
| Type | Description |
|---|---|
| MultiWriteResult | MultiWriteResult aggregating per-store results. |
MultiWriteResult
dataclass
¶
Aggregated result from writing to multiple stores.
LakehouseWriteResult
dataclass
¶
Result of a Lakehouse write_all() operation.
CdmEntityMap
¶
Optional mapping from source table names to CDM entity names.
If a table is not in the map, it defaults to PascalCase of the table name.
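A minimal sketch of the fallback rule, assuming table names are snake_case and that "PascalCase" means capitalizing each underscore-separated part. The exact conversion CdmEntityMap applies is not shown in this reference, and `pascal_case` and `resolve_entity_name` are hypothetical helpers.

```python
def pascal_case(table_name: str) -> str:
    """Assumed conversion: 'order_line' -> 'OrderLine'."""
    parts = table_name.replace("-", "_").split("_")
    return "".join(part[:1].upper() + part[1:] for part in parts if part)


def resolve_entity_name(table_name: str, entity_map: dict[str, str]) -> str:
    """Use the explicit mapping when present, otherwise fall back to PascalCase."""
    return entity_map.get(table_name, pascal_case(table_name))


mapping = {"customer": "Contact", "order": "SalesOrder"}
mapped = resolve_entity_name("customer", mapping)
fallback = resolve_entity_name("order_line", mapping)
```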
CdmMapper
¶
Export tables as a Microsoft CDM folder (model.json + data files).
The output folder structure is compatible with Fabric CDM connectors, Dataverse, Power Platform, and Azure Data Lake Storage CDM folders.
Usage::
    from sqllocks_spindle.transform import CdmMapper, CdmEntityMap
    mapper = CdmMapper()
    mapper.write_cdm_folder(
        tables=result.tables,
        output_dir="./cdm",
        domain_name="SpindleRetail",
        entity_map=CdmEntityMap({"customer": "Contact", "order": "SalesOrder"}),
    )
Methods:¶
write_cdm_folder(tables, output_dir, domain_name='SpindleOutput', entity_map=None, fmt='csv')
¶
Write a CDM folder to disk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tables | dict[str, DataFrame] | Dict of table_name → DataFrame. | required |
| output_dir | str \| Path | Root directory for CDM folder output. | required |
| domain_name | str | The CDM model name (appears in model.json). | 'SpindleOutput' |
| entity_map | CdmEntityMap \| None | Optional mapping of table names to CDM entity names. | None |
| fmt | str | Data file format: "csv" (default) or "parquet". | 'csv' |
Returns:
| Type | Description |
|---|---|
| list[Path] | List of written file paths. |
to_model_json(tables, domain_name='SpindleOutput', entity_map=None, fmt='csv')
¶
Generate a model.json manifest dict without writing to disk.
Useful for in-memory CDM metadata generation or Fabric notebook use.
DimSpec
dataclass
¶
Specification for building one dimension table.
FactSpec
dataclass
¶
Specification for building one fact table.
StarSchemaMap
¶
Describes how to transform a domain result into a star schema.
StarSchemaResult
dataclass
¶
StarSchemaTransform
¶
Transform a 3NF GenerationResult into a star schema using a StarSchemaMap.
Methods:¶
transform(tables, schema_map)
¶
Apply the star schema transform.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tables | dict[str, DataFrame] | Dict of table_name → DataFrame (from GenerationResult.tables). | required |
| schema_map | StarSchemaMap | Mapping spec defining dims and facts. | required |
Returns:
| Type | Description |
|---|---|
| StarSchemaResult | StarSchemaResult with dimensions, facts, and date_dim. |
Anomaly
¶
Bases: ABC
Base class for anomaly definitions.
AnomalyRegistry
¶
Registry of anomaly definitions applied during streaming.
Example::
    registry = AnomalyRegistry([
        PointAnomaly("extreme_total", column="total_amount"),
        ContextualAnomaly(
            "winter_in_summer",
            column="product_category",
            condition_column="order_month",
            normal_values=[6, 7, 8],
            anomalous_values=["Winter Coats", "Heavy Jackets"],
        ),
    ])
    # Apply to any DataFrame before streaming
    df_labelled = registry.inject(df, rng)
BurstWindow
dataclass
¶
A time window where event rate is multiplied by a factor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| start_offset_seconds | float | Seconds from stream start when the burst begins. | required |
| duration_seconds | float | How long the burst lasts. | required |
| multiplier | float | Rate multiplier during the burst (e.g. 10.0 = 10x normal rate). | required |
Example::
    # 10x rate burst starting at t=30s, lasting 60 seconds
    BurstWindow(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)
CollectiveAnomaly
dataclass
¶
Bases: Anomaly
A sequence of rows that is abnormal together.
Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in the affected group have their timestamp compressed into a short window and are labelled as anomalous.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Short label, used in | required |
| group_column | str | Column whose distinct values define groups (e.g. customer_id). | required |
| timestamp_column | str | Datetime column to compress into a short window. | required |
| window_seconds | float | Duration of the burst window (default 600 s = 10 min). | 600.0 |
| fraction | float | Fraction of groups to corrupt (default 0.005). | 0.005 |
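To make the timestamp compression concrete, here is a stand-alone sketch on plain dict rows. It is not CollectiveAnomaly's code: `compress_groups`, the `is_anomaly` label column, and the "at least one group" rule are assumptions chosen so that a tiny input still shows the effect.

```python
import random
from datetime import datetime, timedelta


def compress_groups(rows, group_column, timestamp_column,
                    window_seconds=600.0, fraction=0.005, seed=0):
    """Return copies of rows; a fraction of groups get their timestamps squeezed into one window."""
    rng = random.Random(seed)
    groups = sorted({row[group_column] for row in rows})
    # Assumption: always corrupt at least one group so small inputs are affected.
    n_pick = max(1, round(len(groups) * fraction))
    picked = set(rng.sample(groups, n_pick))
    # Copy each row and attach a (hypothetical) label column.
    out = [dict(row, is_anomaly=row[group_column] in picked) for row in rows]
    for group in picked:
        members = [r for r in out if r[group_column] == group]
        start = min(r[timestamp_column] for r in members)
        for r in members:
            r[timestamp_column] = start + timedelta(seconds=rng.uniform(0, window_seconds))
    return out


base = datetime(2024, 1, 1)
# Two customers, five orders each, one order per day.
orders = [
    {"customer_id": c, "order_ts": base + timedelta(days=d)}
    for c in ("A", "B") for d in range(5)
]
result = compress_groups(orders, "customer_id", "order_ts", fraction=0.5)
```

After the call, one customer's five orders all fall within a ten-minute span while the other customer's orders keep their original daily spacing.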
ConsoleSink
¶
Bases: StreamWriter
Prints events as JSON Lines to stdout.
Useful for local development and quick debugging. No external dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| indent | int \| None | JSON indent level ( | None |
| prefix | str | Optional string prepended before each JSON line. | '' |
| file | | Output file object (defaults to | None |
ContextualAnomaly
dataclass
¶
Bases: Anomaly
Normal value placed in the wrong context.
Example: a winter coat sold in July, or a NULL shipping address on a delivered order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Short label, used in | required |
| column | str | Column whose value will be replaced with an anomalous value. | required |
| condition_column | str | Column used to identify "normal" rows to corrupt. | required |
| normal_values | list[Any] | Values of | required |
| anomalous_values | list[Any] | Replacement values to write into | required |
| fraction | float | Fraction of eligible rows to corrupt (default 0.01). | 0.01 |
FileSink
¶
Bases: StreamWriter
Writes events as JSON Lines (one JSON object per line) to a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Output file path. Parent directories are created automatically. | required |
| mode | str | File open mode: | 'a' |
PointAnomaly
dataclass
¶
Bases: Anomaly
Single-row anomaly: a column value that is far outside the normal range.
Example: an order total of $99,999 when the average is $50.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Short label, used in | required |
| column | str | Column to modify. | required |
| multiplier_range | tuple[float, float] | | (10.0, 100.0) |
| fraction | float | Fraction of rows to mark (default 0.01 = 1%). | 0.01 |
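The reference does not describe multiplier_range, so the following is only a guess at the mechanics: it assumes a selected value is multiplied by a factor drawn uniformly from that range. `inject_point_anomalies` is a hypothetical helper operating on a plain list, not the PointAnomaly class.

```python
import random


def inject_point_anomalies(values, multiplier_range=(10.0, 100.0), fraction=0.01, seed=0):
    """Return (new_values, flags); flagged entries are scaled by an assumed uniform factor."""
    rng = random.Random(seed)
    n_pick = int(len(values) * fraction)
    picked = set(rng.sample(range(len(values)), n_pick))
    low, high = multiplier_range
    new_values = [
        v * rng.uniform(low, high) if i in picked else v
        for i, v in enumerate(values)
    ]
    flags = [i in picked for i in range(len(values))]
    return new_values, flags


# 1,000 order totals of $50; 1% of them become extreme outliers.
totals, is_anomaly = inject_point_anomalies([50.0] * 1000)
```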
SpindleStreamer
¶
Stream generated synthetic data row-by-row through a StreamWriter.
Workflow¶
- Generate a full batch via the Spindle engine (or accept pre-generated tables).
- Convert each row to an event dict, adding _spindle_table and _spindle_seq metadata fields.
- Optionally inject anomalies via an AnomalyRegistry (sqllocks_spindle.streaming.anomaly.AnomalyRegistry).
- Optionally reorder a fraction of events to simulate out-of-order arrival.
- Emit in batches through the configured StreamWriter sink.

Rate-limiting (token bucket + Poisson inter-arrivals) is applied only when StreamConfig(realtime=True).
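Two of the workflow steps lend themselves to a short stand-alone sketch: attaching the metadata fields to each row, and drawing Poisson inter-arrival gaps for a target rate. This is not SpindleStreamer's code. `to_events` and `poisson_gaps` are hypothetical, the sequence is assumed to start at 0, and the token bucket is left out.

```python
import random


def to_events(table: str, rows: list[dict]) -> list[dict]:
    """Copy each row into an event dict and add the two metadata fields."""
    return [
        {**row, "_spindle_table": table, "_spindle_seq": seq}
        for seq, row in enumerate(rows)  # assumption: sequence numbers start at 0
    ]


def poisson_gaps(n: int, events_per_second: float, seed: int = 0) -> list[float]:
    """Exponential gaps (seconds) between events of a Poisson process with the given rate."""
    rng = random.Random(seed)
    return [rng.expovariate(events_per_second) for _ in range(n)]


events = to_events("order", [{"order_id": 1}, {"order_id": 2}])
gaps = poisson_gaps(5000, events_per_second=10.0)
mean_gap = sum(gaps) / len(gaps)  # close to 1 / events_per_second
```

In a real-time loop, a sender would sleep for each gap before emitting the next event, which produces irregular but rate-correct arrivals.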
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | | A Domain instance (e.g. | None |
| tables | dict[str, DataFrame] \| None | Pre-generated | None |
| sink | StreamWriter \| None | :class: | None |
| config | StreamConfig \| None | :class: | None |
| anomaly_registry | | Optional :class: | None |
| scale | str | Scale preset for generation (ignored when tables is provided). | 'small' |
| seed | int \| None | Random seed. | 42 |
Example::
    from sqllocks_spindle import RetailDomain
    from sqllocks_spindle.streaming import SpindleStreamer, StreamConfig, FileSink
    result = SpindleStreamer(
        domain=RetailDomain(),
        sink=FileSink("events.jsonl", mode="w"),
        config=StreamConfig(max_events=500),
    ).stream("order")
    print(result)
Methods:¶
stream(table, timestamp_column=None)
¶
Stream all rows of table through the configured sink.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | Name of the table to stream. | required |
| timestamp_column | str \| None | Column to use as | None |
Returns:
| Type | Description |
|---|---|
| StreamResult | class: |
stream_all(timestamp_column=None)
¶
Stream every table in generation order.
Returns:
| Type | Description |
|---|---|
| list[StreamResult] | List of :class: |
StreamConfig
dataclass
¶
Configuration for a Spindle streaming run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| events_per_second | float | Base target throughput. | 10.0 |
| duration_seconds | float \| None | Stop after this many wall-clock seconds ( | None |
| max_events | int \| None | Stop after this many events ( | None |
| out_of_order_fraction | float | Fraction of events to reorder (0.0–1.0). | 0.0 |
| out_of_order_max_delay_slots | int | Maximum number of slot positions by which an out-of-order event is delayed. | 10 |
| burst_windows | list[BurstWindow] | List of :class: | list() |
| time_pattern | TimePattern \| None | Optional :class: | None |
| label_anomalies | bool | Keep | True |
| batch_size | int | Events per :meth: | 100 |
| realtime | bool | If | False |
Methods:¶
get_rate_multiplier(elapsed, wall_hour=0, wall_dow=0)
¶
Return the composite rate multiplier at a given moment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| elapsed | float | Seconds since stream start. | required |
| wall_hour | int | Current wall-clock hour (0–23). | 0 |
| wall_dow | int | Current wall-clock day-of-week (0=Monday). | 0 |
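How the composite multiplier is combined is not spelled out here. One plausible reading, sketched below, multiplies every active burst multiplier with the hour and day-of-week multipliers and treats missing entries as 1.0. `Burst`, `Pattern` and `rate_multiplier` are hypothetical stand-ins for BurstWindow, TimePattern and get_rate_multiplier.

```python
from dataclasses import dataclass, field


@dataclass
class Burst:  # hypothetical stand-in for BurstWindow
    start_offset_seconds: float
    duration_seconds: float
    multiplier: float


@dataclass
class Pattern:  # hypothetical stand-in for TimePattern
    hour_multipliers: dict = field(default_factory=dict)
    dow_multipliers: dict = field(default_factory=dict)


def rate_multiplier(elapsed, bursts, pattern=None, wall_hour=0, wall_dow=0):
    """Assumed rule: product of all active burst, hour, and day-of-week multipliers."""
    m = 1.0
    for b in bursts:
        # A burst is active from its start offset up to (not including) its end.
        if b.start_offset_seconds <= elapsed < b.start_offset_seconds + b.duration_seconds:
            m *= b.multiplier
    if pattern is not None:
        m *= pattern.hour_multipliers.get(wall_hour, 1.0)
        m *= pattern.dow_multipliers.get(wall_dow, 1.0)
    return m


bursts = [Burst(start_offset_seconds=30, duration_seconds=60, multiplier=10.0)]
pattern = Pattern(hour_multipliers={20: 2.0}, dow_multipliers={5: 0.5})
before_burst = rate_multiplier(10, bursts)
during_burst = rate_multiplier(45, bursts)
combined = rate_multiplier(45, bursts, pattern, wall_hour=20, wall_dow=5)
```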
StreamResult
dataclass
¶
Result of a streaming run.
Attributes:
| Name | Type | Description |
|---|---|---|
| table | str | Name of the streamed table. |
| events_sent | int | Total events transmitted to the sink. |
| anomaly_count | int | Number of events flagged as anomalous. |
| out_of_order_count | int | Number of events deliberately reordered. |
| elapsed_seconds | float | Wall-clock duration of the streaming run. |
| events_per_second_actual | float | Measured throughput. |
StreamWriter
¶
Bases: ABC
Abstract base for all streaming sinks.
Concrete implementations provide the actual transport layer:
- ConsoleSink (sqllocks_spindle.streaming.sinks.console_sink)
- FileSink (sqllocks_spindle.streaming.sinks.file_sink)
- EventHubSink (sqllocks_spindle.streaming.sinks.eventhub_sink)
- KafkaSink (sqllocks_spindle.streaming.sinks.kafka_sink)

Subclasses must implement send_batch. send has a default
implementation that wraps a single event in a list.
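The contract above (abstract send_batch, default send that wraps one event in a list) follows a common abstract-base-class pattern. The sketch below shows that pattern with hypothetical classes `SketchWriter` and `ListSink`. The real StreamWriter signatures are not given in this reference, so the argument types are assumptions.

```python
from abc import ABC, abstractmethod


class SketchWriter(ABC):
    """Hypothetical mirror of the described contract, not the library class."""

    @abstractmethod
    def send_batch(self, events: list[dict]) -> None:
        """Transport a batch of events; concrete sinks must provide this."""

    def send(self, event: dict) -> None:
        # Default behaviour: a single event is just a batch of one.
        self.send_batch([event])


class ListSink(SketchWriter):
    """Toy sink that keeps events in memory, handy for tests."""

    def __init__(self) -> None:
        self.received: list[dict] = []

    def send_batch(self, events: list[dict]) -> None:
        self.received.extend(events)


sink = ListSink()
sink.send({"id": 1})
sink.send_batch([{"id": 2}, {"id": 3}])
```

With this shape, a custom sink only has to implement one method to support both single-event and batched sends.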
TimePattern
dataclass
¶
Time-of-day and day-of-week rate multipliers.
Applied on top of the base rate when simulating real-world temporal patterns. E.g., e-commerce has high traffic 7pm–10pm and lower traffic at 3am.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hour_multipliers | dict[int, float] | Mapping of hour (0–23) to multiplier. | dict() |
| dow_multipliers | dict[int, float] | Mapping of day-of-week (0=Monday … 6=Sunday) to multiplier. | dict() |
StreamingMultiWriter
¶
Stream generated data to multiple sinks concurrently.
For each (table_name, DataFrame) batch yielded by generate_stream(),
converts rows to event dicts and fans out to all registered sinks in
parallel via ThreadPoolExecutor.
Usage::
    from sqllocks_spindle.streaming import ConsoleSink, FileSink, StreamingMultiWriter
    smw = StreamingMultiWriter(
        console=ConsoleSink(),
        file=FileSink("events.jsonl"),
    )
    result = smw.stream(spindle.generate_stream(domain=domain, scale="small", seed=42))
    print(result.summary())
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_workers | int \| None | Number of parallel threads per batch (default: number of sinks). | None |
| batch_size | int | Max rows per send_batch call (default: 100). | 100 |
| stop_on_sink_error | bool | If True, abort streaming when any sink errors. Default False: partial failures are captured and streaming continues. | False |
| **sinks | StreamWriter | Keyword arguments of | {} |
Methods:¶
add_sink(name, sink)
¶
Add a sink and return self for chaining.
remove_sink(name)
¶
Remove a sink by name.
stream(generator)
¶
Stream (table_name, DataFrame) batches to all sinks in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| generator | Iterator[tuple[str, DataFrame]] | Iterator yielding (table_name, DataFrame) tuples. Typically | required |
Returns:
| Type | Description |
|---|---|
| StreamingMultiWriteResult | StreamingMultiWriteResult with per-sink stats. |
stream_table(table_name, df)
¶
Stream a single DataFrame to all sinks. Returns {sink_name: success}.
StreamingMultiWriteResult
dataclass
¶
Aggregated result from streaming to multiple sinks.
SinkResult
dataclass
¶
Per-sink streaming result.
ContinueEngine
¶
Generate incremental deltas (inserts, updates, deletes) from existing data.
Methods:¶
continue_from(existing, schema=None, config=None)
¶
Generate incremental changes from existing data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| existing | | Either a GenerationResult (from Spindle.generate()) or a plain dict[str, pd.DataFrame]. | required |
| schema | | Optional SpindleSchema. If existing is a GenerationResult, the schema is extracted automatically. | None |
| config | | ContinueConfig controlling insert/update/delete volumes and state-transition rules. Defaults are used when None. | None |
ContinueConfig
dataclass
¶
Configuration for incremental generation.
Attributes:
| Name | Type | Description |
|---|---|---|
| insert_count | int | Number of new rows to generate per anchor table. |
| update_fraction | float | Fraction of existing rows to update (0.0 - 1.0). |
| delete_fraction | float | Fraction of existing rows to soft-delete (0.0 - 1.0). |
| state_transitions | dict[str, dict[str, dict[str, float]]] | Per-column Markov transition probabilities. Format: |
| timestamp_column | str | Name of the delta-timestamp metadata column. |
| delta_type_column | str | Name of the delta-type metadata column. |
| seed | int \| None | Optional random seed for reproducibility. |
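The exact state_transitions format is cut off above. Judging only from the type dict[str, dict[str, dict[str, float]]], a plausible layout is column -> current state -> next state -> probability. The sketch below samples one Markov step under that assumption; `next_state` is a hypothetical helper.

```python
import random

# Assumed layout: column -> current state -> {next state: probability}.
state_transitions = {
    "status": {
        "pending": {"shipped": 1.0},
        "shipped": {"shipped": 0.2, "delivered": 0.8},
    }
}


def next_state(current: str, column_rules: dict, rng: random.Random) -> str:
    """Sample the next value for one cell; states without rules stay unchanged."""
    options = column_rules.get(current)
    if not options:
        return current
    states = list(options)
    weights = [options[s] for s in states]
    return rng.choices(states, weights=weights, k=1)[0]


rng = random.Random(0)
rules = state_transitions["status"]
after_pending = next_state("pending", rules, rng)      # only one possible outcome
after_delivered = next_state("delivered", rules, rng)  # no rule, so unchanged
after_shipped = next_state("shipped", rules, rng)
```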
DeltaResult
dataclass
¶
Result of incremental generation.
TimeTravelEngine
¶
Generate monthly point-in-time snapshots showing data evolution.
Methods:¶
generate(domain, config=None, scale='small')
¶
Generate monthly snapshots of a domain's data.
Month 0 is the initial dataset. Each subsequent month applies:
- New entity growth (inserts) at growth_rate * seasonality multiplier
- Status transitions (updates) at update_fraction rate
- Churn (soft deletes) at churn_rate
Returns a TimeTravelResult with N+1 snapshots (month 0 through month N).
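The month-by-month evolution can be illustrated with row counts alone. The sketch below is a simplification, not TimeTravelEngine: it assumes growth and churn are both computed from the previous month's active rows, ignores status updates, and uses a hypothetical `simulate_snapshots` helper.

```python
def simulate_snapshots(initial_rows, months, growth_rate, churn_rate, seasonality=None):
    """Return months + 1 count snapshots (month 0 through month N)."""
    seasonality = seasonality or {}
    snapshots = [{"month": 0, "active": initial_rows, "deleted": 0}]
    for month in range(1, months + 1):
        prev = snapshots[-1]
        # Assumption: inserts scale with last month's active rows and a per-month multiplier.
        inserts = round(prev["active"] * growth_rate * seasonality.get(month, 1.0))
        churned = round(prev["active"] * churn_rate)  # soft deletes
        snapshots.append({
            "month": month,
            "active": prev["active"] + inserts - churned,
            "deleted": prev["deleted"] + churned,
        })
    return snapshots


snapshots = simulate_snapshots(1000, months=3, growth_rate=0.10, churn_rate=0.02)
```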
TimeTravelConfig
dataclass
¶
Configuration for time-travel snapshot generation.
TimeTravelResult
dataclass
¶
DataMasker
¶
Replace PII in real data with synthetic values preserving distributions.
DataProfiler
¶
Analyse one or more DataFrames and produce profiles.
Methods:¶
profile_dataframe(df, table_name='table')
¶
Profile a single DataFrame.
profile_dataset(tables)
¶
Profile a dict of DataFrames and detect cross-table relationships.
profile(df, table_name='table')
¶
Alias for profile_dataframe(). Profile a single DataFrame.
from_csv(path, table_name=None, sample_rows=None, **kwargs)
classmethod
¶
Profile a CSV file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str \| Path | Path to the CSV file. | required |
| table_name | str \| None | Name for the table profile. Defaults to the filename stem. | None |
| sample_rows | int \| None | If set, sample this many rows before profiling. | None |
| **kwargs | | Passed to DataProfiler constructor (fit_threshold, top_n_values, etc.). | {} |
ExportedProfile
dataclass
¶
A portable profile that can be imported into any domain.
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Profile identifier (e.g. |
| description | str | Human-readable description of what this profile represents. |
| source_domain | str | Name of the domain this profile was exported from (or |
| distributions | dict[str, dict[str, float]] | Mapping of |
| ratios | dict[str, float] | Mapping of ratio names to float multipliers. |
| metadata | dict[str, Any] | Arbitrary extra information (row counts, column types, etc.). |
LakehouseProfiler
¶
Profile Fabric Lakehouse Delta tables and return TableProfile objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workspace_id | str | Fabric workspace GUID. | required |
| lakehouse_id | str | Fabric lakehouse GUID. | required |
| token_provider | Any \| None | A callable returning an Azure access token string. Defaults to DefaultAzureCredential when azure-identity is installed. | None |
| default_sample_rows | int \| None | Row limit for profiling. Pass None to scan entire table. | 100000 |
Methods:¶
profile_table(table_name, sample_rows='default')
¶
Profile a single Delta table.
profile_all(sample_rows='default')
¶
Profile all tables in the lakehouse.
detect_foreign_keys(table_names=None, overlap_threshold=0.9, sample_rows='default', full_scan=False)
¶
Sampled cross-table FK detection (advisory). ADR-009 / STORY-016.
Reads each table's columns (sampled by default) and runs the proven
DataProfiler._detect_foreign_keys_advisory core (naming *_id plus
value-overlap >= overlap_threshold) across every table pair. Detected
FKs are advisory and reported with the measured overlap; a declared
star_map / RelationshipDef remains authoritative and overrides
(resolved by the caller, not here).
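The two signals named above (a *_id column name plus value overlap at or above the threshold) can be sketched without any Fabric access. This is an illustrative heuristic only: `detect_fk` is a hypothetical helper, and overlap is assumed to mean the share of distinct child values that also occur among the parent's key values.

```python
def detect_fk(child_columns: dict, parent_keys: dict, overlap_threshold: float = 0.9) -> dict:
    """Return {col: {"parent_table", "overlap", "advisory"}} for likely foreign keys."""
    found = {}
    for col, values in child_columns.items():
        if not col.endswith("_id"):  # naming signal
            continue
        distinct = {v for v in values if v is not None}
        if not distinct:
            continue
        for table, keys in parent_keys.items():
            overlap = len(distinct & keys) / len(distinct)  # value-overlap signal
            best = found.get(col)
            if overlap >= overlap_threshold and (best is None or overlap > best["overlap"]):
                found[col] = {"parent_table": table, "overlap": overlap, "advisory": True}
    return found


order_columns = {
    "customer_id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 99],  # 9 of 10 values exist in customer
    "amount": [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
}
parents = {"customer": set(range(1, 51)), "product": set(range(1000, 1010))}
detected = detect_fk(order_columns, parents)
strict = detect_fk(order_columns, parents, overlap_threshold=0.95)
```

Because the result is advisory, a caller would still let any declared relationship take precedence over what this heuristic reports.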
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_names | list[str] \| None | Tables to scan. Defaults to all tables in the lakehouse. | None |
| overlap_threshold | float | Minimum child-to-parent value overlap to report a FK (default 0.9, configurable per ADR-009). | 0.9 |
| sample_rows | int \| None \| str | Per-table row cap used when reading key columns. | 'default' |
| full_scan | bool | Read entire tables (no sampling) to confirm a sampled result (ADR-009 full-scan option). | False |
Returns:
| Type | Description |
|---|---|
| dict[str, dict[str, dict[str, Any]]] | ``{child_table: {col_name: {"parent_table": str, "overlap": float, "advisory": True, "full_scan": bool}}}`` for every detected FK. |
reconcile_declared_foreign_keys(detected, declared)
staticmethod
¶
Declared FKs override detected advisory FKs (ADR-009 / STORY-017).
A declared star_map / RelationshipDef is AUTHORITATIVE: where a
declaration exists for a (child_table, child_col) it wins over any
detected FK, even a high-overlap one. Detected FKs that a declaration
overrode are REPORTED (not silently dropped) for transparency.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| detected | dict[str, dict[str, dict[str, Any]]] | the output of :meth: | required |
| declared | Any | iterable of | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Resolved declared entries carry |
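The precedence rule (a declaration wins, and the detection it displaced is reported) can be sketched as follows. The shapes are assumptions, since the reference truncates them: declarations are modelled as (child_table, child_col, parent_table) tuples, and the output keys `resolved` and `overridden` are hypothetical.

```python
def reconcile(detected: dict, declared: list) -> dict:
    """Apply declared FKs over detected ones and list every detection that was displaced."""
    # Copy the per-table dicts so the caller's `detected` mapping is not modified.
    resolved = {table: dict(cols) for table, cols in detected.items()}
    overridden = []
    for child_table, child_col, parent_table in declared:
        previous = resolved.get(child_table, {}).get(child_col)
        if previous is not None:
            # Report, rather than silently drop, the detection being replaced.
            overridden.append(
                {"child_table": child_table, "child_col": child_col, "detected": previous}
            )
        resolved.setdefault(child_table, {})[child_col] = {
            "parent_table": parent_table,
            "declared": True,
        }
    return {"resolved": resolved, "overridden": overridden}


detected_fks = {
    "order": {"customer_id": {"parent_table": "client", "overlap": 0.97, "advisory": True}}
}
declared_fks = [("order", "customer_id", "customer"), ("order", "store_id", "store")]
outcome_fk = reconcile(detected_fks, declared_fks)
```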
MaskConfig
dataclass
¶
Configuration for data masking.
ProfileIO
¶
Export, import, and list domain profiles.
All public methods are stateless — no configuration is stored on the
instance. Instantiate with ProfileIO() and call methods directly.
Example::
    io = ProfileIO()
    io.export_profile(RetailDomain(), Path("retail_profile.json"))
    io.import_profile(Path("retail_profile.json"), HealthcareDomain(), save_as="from_retail")
    io.list_profiles(RetailDomain())
Methods:¶
export_profile(domain, output_path, profile_name='default')
¶
Export a domain's active profile to a standalone JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | Any | A :class: | required |
| output_path | str \| Path | Destination file path (created if it does not exist). | required |
| profile_name | str | Label stored in the exported metadata. | 'default' |
Returns:
| Type | Description |
|---|---|
| Path | The resolved :class: |
import_profile(profile_path, target_domain, save_as=None)
¶
Import an exported profile into a target domain's profiles/ directory.
The imported file is converted to the standard domain profile format
(i.e. metadata is stripped; only name, description,
distributions, and ratios are kept).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| profile_path | str \| Path | Path to an exported profile JSON file. | required |
| target_domain | Any | The domain instance to import into. | required |
| save_as | str \| None | Override the profile name (and filename). When None the name is taken from the file's | None |
Returns:
| Type | Description |
|---|---|
| str | The name the profile was saved as. |
list_profiles(domain)
¶
List all profiles available for a domain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | Any | A :class: | required |
Returns:
| Type | Description |
|---|---|
| list[dict[str, str \| int]] | A list of dicts with keys |
from_dataframe(df, table_name='table', name='inferred')
¶
Create a profile by inferring distributions from a DataFrame.
Categorical columns (object dtype or low cardinality) are converted into normalised distribution weights. High-cardinality columns are skipped.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The source DataFrame. | required |
| table_name | str | Prefix for distribution keys ( | 'table' |
| name | str | Name to assign to the resulting profile. | 'inferred' |
Returns:
| Type | Description |
|---|---|
| ExportedProfile | An ExportedProfile |
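Turning a categorical column into normalised weights amounts to counting values and dividing by the total. The sketch below shows that step on plain lists. It is not from_dataframe itself: the `table.column` key format and the `max_cardinality` cutoff are assumptions, and `infer_distributions` is a hypothetical helper.

```python
from collections import Counter


def infer_distributions(columns: dict, table_name: str = "table",
                        max_cardinality: int = 20) -> dict:
    """Map 'table.column' to {value: weight}; weights in each column sum to 1."""
    distributions = {}
    for col, values in columns.items():
        counts = Counter(v for v in values if v is not None)
        if not counts or len(counts) > max_cardinality:
            continue  # skip empty or high-cardinality columns
        total = sum(counts.values())
        distributions[f"{table_name}.{col}"] = {
            str(value): count / total for value, count in counts.items()
        }
    return distributions


order_data = {
    "status": ["shipped", "shipped", "pending", "cancelled"],
    "order_id": list(range(100)),  # high cardinality, so it is skipped
}
weights = infer_distributions(order_data, table_name="order")
```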
SchemaBuilder
¶
ProfileRegistry
¶
Manages named, tagged profiles under a configurable root directory.
Directory layout::
    <root>/
        <system>/
            <table>/
                <profile_name>.json
        _index.json   ← auto-maintained index
Methods:¶
save(profile)
¶
Save a profile to disk and update the index.
load(identity)
¶
Load a profile by identity (system/table/name).
delete(identity)
¶
Delete a profile from disk and index.
list_all()
¶
Return all index entries sorted by identity.
search(query=None, system=None, table=None, tags=None)
¶
Filter index entries by query string, system, table, and/or tags.
add_tags(identity, tags)
¶
Add tags to a profile (in-place, no duplicates).
remove_tags(identity, tags)
¶
Remove tags from a profile.
import_from_dir(source_dir, overwrite=False)
¶
Import all *.json profile files from a directory tree.
Returns a list of imported identity strings.
diff(identity_a, identity_b)
¶
Compare two profiles column by column.
Returns a dict with keys: added, removed, changed.
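A column-by-column comparison with the three result keys named above could look like the sketch below. It treats each profile as a plain mapping of column name to statistics. That shape, the sorted-list values, and the `diff_columns` helper are assumptions, since the reference only states the key names.

```python
def diff_columns(profile_a: dict, profile_b: dict) -> dict:
    """Compare two {column: stats} mappings and bucket the differences."""
    cols_a, cols_b = set(profile_a), set(profile_b)
    return {
        "added": sorted(cols_b - cols_a),      # only in the second profile
        "removed": sorted(cols_a - cols_b),    # only in the first profile
        "changed": sorted(
            col for col in cols_a & cols_b if profile_a[col] != profile_b[col]
        ),
    }


old = {"id": {"dtype": "int"}, "status": {"dtype": "str", "null_rate": 0.0}, "legacy": {"dtype": "str"}}
new = {"id": {"dtype": "int"}, "status": {"dtype": "str", "null_rate": 0.1}, "region": {"dtype": "str"}}
delta = diff_columns(old, new)
```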
reindex()
¶
Rebuild _index.json from all .json files on disk. Returns count.
save_from_dataset_profile(dataset_profile, system, name, tags=None, description='', config=None)
¶
Convert a DatasetProfile into registry profiles via the SafeProfile mapper.
STORY-014 (ADR-001): the per-column stats are now built through
SafeProfile.from_dataset_profile (the canonical safe-and-correct
mapper), NOT the old hand-read of non-existent .min/.max/
.top_values attributes (the B2 attribute-mismatch bug). So registry
profiles carry the SAFE statistic set (dtype/null_rate/cardinality/mean/
std/quantiles/bounds/categorical_weights/categorical_histogram), with no
raw values. The RegistryProfile wrapper (system/table/name/tags/
description/source_rows) is unchanged, so the registry read side
(load/diff/tag/reindex) is unaffected; no on-disk format break, no sidecar.
Legacy registry files (old min/max/top_values columns) still load as-is.
config is forwarded to the SafeProfile mapper (e.g. k, sensitive).
One RegistryProfile is created per table. Returns the saved profiles.
validate(identity, result, sample_rows=500)
¶
Compare a GenerationResult against a stored profile.
Reconstructs an approximate reference DataFrame from stored column statistics and runs FidelityComparator against the new generation. Returns a FidelityReport. Requires scipy.
RegistryProfile
dataclass
¶
Functions:¶
get_preset(name)
¶
Get a built-in preset by name.
list_presets()
¶
List all built-in presets.