Tutorial 13: Medallion Architecture
Build a complete Bronze/Silver/Gold lakehouse pipeline with synthetic data -- from messy landing zone to curated star schema.
Prerequisites
- Python 3.10 or later
- pip install sqllocks-spindle
- Completed Tutorial 10: Fabric Lakehouse
- Basic understanding of the medallion architecture pattern (Bronze = raw, Silver = validated, Gold = analytics-ready)
What You'll Learn
- How to generate medium-scale retail data and write it to a bronze landing zone
- How to inject chaos (schema drift, orphan FKs) to simulate real-world messy upstream data
- How to run validation gates to catch data quality issues
- How to quarantine bad records and promote clean data to silver
- How to transform silver data into a gold star schema with surrogate keys and a date dimension
- How to organize the full medallion folder structure
Step 1: Generate Retail Data at Medium Scale
Generate a full retail dataset -- customers, products, stores, orders, and order lines -- at medium scale. Real medallion pipelines ingest messy upstream data, so Spindle gives us realistic relational data with proper FK relationships to build and test each tier.
from sqllocks_spindle import Spindle, RetailDomain
spindle = Spindle()
result = spindle.generate(domain=RetailDomain(), scale="medium", seed=42)
print(result.summary())
print(f"\nFK integrity errors: {len(result.verify_integrity())}")
At medium scale this produces approximately 1.9 million rows across 9 tables, with zero FK integrity errors.
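To make "zero FK integrity errors" concrete, here is a minimal sketch of what a foreign key integrity check does, written in plain Python rather than with Spindle. The helper name find_orphans and the sample rows are hypothetical and only illustrate the idea; they are not how verify_integrity() is implemented.

```python
def find_orphans(child_rows, fk_col, parent_rows, pk_col):
    """Return child rows whose foreign key has no matching parent key."""
    parent_keys = {row[pk_col] for row in parent_rows}
    return [row for row in child_rows if row[fk_col] not in parent_keys]


# Hypothetical sample data: two customers, three orders.
customers = [{"customer_id": 1}, {"customer_id": 2}]
orders = [
    {"order_id": 10, "customer_id": 1},
    {"order_id": 11, "customer_id": 2},
    {"order_id": 12, "customer_id": 99},  # references a customer that does not exist
]

orphans = find_orphans(orders, "customer_id", customers, "customer_id")
print(f"FK integrity errors: {len(orphans)}")
```

A freshly generated dataset should yield an empty list for every parent/child pair; the chaos injected in Step 2 is what makes such a check start failing.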
Step 2: Bronze -- Write Raw Parquet with Chaos Injection
The bronze layer should contain data exactly as it arrived -- warts and all. First write the clean generated data to Parquet, then inject chaos to simulate real-world upstream issues. Testing your pipeline against only clean data gives you false confidence.
from pathlib import Path
from sqllocks_spindle.chaos.config import ChaosConfig
from sqllocks_spindle.chaos.engine import ChaosEngine
# Set up medallion directory structure
base_dir = Path("medallion_demo")
bronze_dir = base_dir / "bronze" / "retail"
silver_dir = base_dir / "silver" / "retail"
gold_dir = base_dir / "gold" / "retail"
for d in [bronze_dir, silver_dir, gold_dir]:
    d.mkdir(parents=True, exist_ok=True)
# Write clean data as raw Parquet (the "as-generated" landing)
paths = result.to_parquet(bronze_dir)
print(f"Bronze: wrote {len(paths)} Parquet files to {bronze_dir}/")
for p in paths:
    print(f" {p.name} ({p.stat().st_size / 1024:.1f} KB)")
Now inject chaos. The ChaosConfig controls intensity and which categories fire. Here we enable schema drift (extra/missing columns) and referential chaos (orphan FK values):
chaos_cfg = ChaosConfig(
    enabled=True,
    intensity="moderate",
    seed=99,
    warmup_days=0,
    chaos_start_day=0,
    categories={
        "value": {"enabled": False, "weight": 0.0},
        "schema": {"enabled": True, "weight": 0.15},
        "referential": {"enabled": True, "weight": 0.20},
        "temporal": {"enabled": False, "weight": 0.0},
        "file": {"enabled": False, "weight": 0.0},
        "volume": {"enabled": False, "weight": 0.0},
    },
)
chaos = ChaosEngine(chaos_cfg)
# Apply schema chaos to each table
bronze_tables = {}
for table_name, df in result.tables.items():
    corrupted = chaos.drift_schema(df.copy(), day=10)
    bronze_tables[table_name] = corrupted
    extra_cols = len(corrupted.columns) - len(df.columns)
    print(f" {table_name}: {len(corrupted)} rows, {extra_cols:+d} columns from schema drift")
# Inject referential chaos across the full table set
bronze_tables = chaos.inject_referential_chaos(bronze_tables, day=10)
print("\nBronze layer chaos injection complete (schema drift + orphan FKs).")
The chaos engine adds extra columns to some tables and introduces orphan foreign key values that reference non-existent parent rows -- exactly the kinds of issues you encounter with real upstream data.
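The role of the seed in chaos injection can be illustrated with a small standalone sketch. It assumes nothing about ChaosEngine internals: inject_orphans, its rate parameter, and the orphan_base offset are hypothetical names chosen for this example, and the rows are plain dictionaries rather than DataFrames.

```python
import random


def inject_orphans(rows, fk_col, rate, seed, orphan_base=1_000_000):
    """Return a copy of rows where roughly `rate` of the FK values point at
    keys that cannot exist in the parent table (values >= orphan_base)."""
    rng = random.Random(seed)  # a private, seeded generator keeps runs reproducible
    corrupted = []
    for row in rows:
        new_row = dict(row)  # never mutate the caller's rows
        if rng.random() < rate:
            new_row[fk_col] = orphan_base + rng.randrange(1000)
        corrupted.append(new_row)
    return corrupted


# Hypothetical orders referencing customer ids 0..4.
orders = [{"order_id": i, "customer_id": i % 5} for i in range(100)]

run_a = inject_orphans(orders, "customer_id", rate=0.2, seed=99)
run_b = inject_orphans(orders, "customer_id", rate=0.2, seed=99)
orphan_count = sum(1 for r in run_a if r["customer_id"] >= 1_000_000)

print("same seed, same corruption:", run_a == run_b)
print("orphaned rows:", orphan_count)
```

Because the corruption is a deterministic function of the seed, a failing pipeline run can be replayed exactly while you debug the downstream gates.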
Step 3: Silver -- Clean and Validate with ValidationGates
The silver layer is your "validated, conformed" tier. Every record that makes it past the gates is trustworthy. Run the corrupted bronze data through Spindle's GateRunner with built-in validation gates.
from sqllocks_spindle.validation.gates import (
    GateRunner, ValidationContext, GateResult
)
context = ValidationContext(
    tables=bronze_tables,
    schema=result.schema,
)
runner = GateRunner(gates=[
    "referential_integrity",
    "schema_conformance",
    "null_constraint",
    "unique_constraint",
])
gate_results = runner.run_all(context)
summary = GateRunner.summary(gate_results)
print(f"Gates run: {summary['total_gates']}")
print(f"Passed: {summary['passed']}")
print(f"Failed: {summary['failed']}")
print(f"Total errors: {summary['total_errors']}")
for gr in gate_results:
    status = "PASS" if gr.passed else "FAIL"
    print(f" [{status}] {gr.gate_name}: {len(gr.errors)} errors, {len(gr.warnings)} warnings")
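As a rough mental model of the gate pattern, each gate can be treated as a function that inspects the tables and returns a list of error messages, and the summary is a roll-up of those lists. The sketch below is plain Python and is not Spindle's GateRunner; null_gate, unique_gate, and run_gates are hypothetical helpers, and only the summary keys are borrowed from the output printed above.

```python
def null_gate(tables, pk_cols):
    """Report every row whose primary key is missing."""
    errors = []
    for name, rows in tables.items():
        pk = pk_cols[name]
        for i, row in enumerate(rows):
            if row.get(pk) is None:
                errors.append(f"{name}[{i}]: null {pk}")
    return errors


def unique_gate(tables, pk_cols):
    """Report every row whose non-null primary key was already seen."""
    errors = []
    for name, rows in tables.items():
        pk = pk_cols[name]
        seen = set()
        for i, row in enumerate(rows):
            key = row.get(pk)
            if key is not None and key in seen:
                errors.append(f"{name}[{i}]: duplicate {pk}={key}")
            seen.add(key)
    return errors


def run_gates(gates, tables, pk_cols):
    """Run each gate and roll the results up into a summary dict."""
    results = {name: gate(tables, pk_cols) for name, gate in gates.items()}
    failed = sum(1 for errs in results.values() if errs)
    return {
        "total_gates": len(results),
        "passed": len(results) - failed,
        "failed": failed,
        "total_errors": sum(len(errs) for errs in results.values()),
    }


# Hypothetical table with one null primary key and no duplicates.
tables = {"customer": [{"customer_id": 1}, {"customer_id": None}, {"customer_id": 3}]}
pk_cols = {"customer": "customer_id"}

summary = run_gates(
    {"null_constraint": null_gate, "unique_constraint": unique_gate},
    tables,
    pk_cols,
)
print(summary)
```

Keeping gates as independent functions means a failure in one check never hides the results of another, which is what makes a per-gate PASS/FAIL report possible.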
Now quarantine bad records and promote clean data to silver. Drop chaos-injected extra columns to restore schema conformance, and isolate rows with null primary keys using the QuarantineManager.
from sqllocks_spindle.validation.quarantine import QuarantineManager
quarantine_dir = base_dir / "quarantine"
qm = QuarantineManager(domain="retail")
silver_tables = {}
for table_name, bronze_df in bronze_tables.items():
    original_df = result.tables[table_name]
    original_cols = set(original_df.columns)
    bronze_cols = set(bronze_df.columns)
    # Drop chaos-injected extra columns
    extra_cols = bronze_cols - original_cols
    clean_df = bronze_df.drop(columns=list(extra_cols), errors="ignore")
    # Quarantine rows with null PKs
    pk_cols = result.schema.tables[table_name].primary_key
    if pk_cols:
        bad_mask = clean_df[pk_cols].isna().any(axis=1)
        if bad_mask.sum() > 0:
            bad_rows = clean_df[bad_mask]
            qm.quarantine_dataframe(
                bad_rows, quarantine_dir, run_id="bronze_v1",
                table_name=table_name, reason="Null primary key",
                gate_name="null_constraint",
            )
            clean_df = clean_df[~bad_mask]
    silver_tables[table_name] = clean_df
    print(f" {table_name}: {len(bronze_df)} bronze -> {len(clean_df)} silver rows")
# Write silver to Parquet
for name, df in silver_tables.items():
    df.to_parquet(silver_dir / f"{name}.parquet", index=False)
print(f"\nSilver: wrote {len(silver_tables)} cleaned tables to {silver_dir}/")
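The quarantine step boils down to partitioning rows into a clean set and a rejected set, and attaching enough metadata to each rejected row to investigate it later. The following sketch shows that split in plain Python; split_null_pk and the metadata field names are assumptions made for illustration and do not describe the files QuarantineManager actually writes.

```python
from datetime import datetime, timezone


def split_null_pk(rows, pk_col, run_id):
    """Partition rows into (clean, quarantined) based on a null primary key."""
    clean, quarantined = [], []
    for row in rows:
        if row.get(pk_col) is None:
            quarantined.append({
                "record": row,  # keep the rejected row intact
                "reason": "Null primary key",
                "gate_name": "null_constraint",
                "run_id": run_id,
                "quarantined_at": datetime.now(timezone.utc).isoformat(),
            })
        else:
            clean.append(row)
    return clean, quarantined


# Hypothetical bronze rows, one of them missing its key.
bronze_rows = [
    {"customer_id": 1, "name": "Ada"},
    {"customer_id": None, "name": "Unknown"},
    {"customer_id": 3, "name": "Grace"},
]

clean, quarantined = split_null_pk(bronze_rows, "customer_id", run_id="bronze_v1")
print(f"{len(bronze_rows)} bronze -> {len(clean)} silver, {len(quarantined)} quarantined")
```

Every input row ends up in exactly one of the two outputs, so bronze row counts can always be reconciled against silver plus quarantine.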
Step 4: Gold -- Transform to Star Schema
Use StarSchemaTransform to convert the cleaned silver tables into a dimensional model with surrogate keys, a date dimension, and fact/dim separation. A model of this shape is well suited to Power BI and other analytics queries.
from sqllocks_spindle import (
    StarSchemaTransform, StarSchemaMap, DimSpec, FactSpec
)
schema_map = StarSchemaMap(
    dims={
        "dim_customer": DimSpec(
            source="customer", sk="sk_customer", nk="customer_id"
        ),
        "dim_product": DimSpec(
            source="product", sk="sk_product", nk="product_id"
        ),
        "dim_store": DimSpec(
            source="store", sk="sk_store", nk="store_id"
        ),
    },
    facts={
        "fact_order": FactSpec(
            primary="order",
            fk_map={
                "customer_id": "dim_customer",
                "store_id": "dim_store",
            },
            date_cols=["order_date"],
        ),
    },
    generate_date_dim=True,
)
transform = StarSchemaTransform()
star = transform.transform(silver_tables, schema_map)
print(star.summary())
This produces dimension tables (dim_customer, dim_product, dim_store, dim_date) and a fact_order table with surrogate keys replacing natural keys.
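Surrogate key replacement is easier to follow on a tiny example. The sketch below assigns sequential surrogate keys to a dimension and swaps the natural key in the fact rows for the matching surrogate. It is plain Python under stated assumptions: build_dimension and build_fact are hypothetical helpers, and StarSchemaTransform may number or order its keys differently.

```python
def build_dimension(rows, nk, sk):
    """Assign a sequential surrogate key (starting at 1) to each source row.
    Returns the dimension rows and a natural-key -> surrogate-key lookup."""
    dim, lookup = [], {}
    for i, row in enumerate(rows, start=1):
        dim.append({sk: i, **row})
        lookup[row[nk]] = i
    return dim, lookup


def build_fact(rows, fk_col, sk_col, lookup):
    """Replace the natural-key column in each fact row with its surrogate key."""
    fact = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != fk_col}
        new_row[sk_col] = lookup[row[fk_col]]
        fact.append(new_row)
    return fact


# Hypothetical silver rows.
customers = [
    {"customer_id": "C-17", "name": "Ada"},
    {"customer_id": "C-42", "name": "Grace"},
]
orders = [{"order_id": 1, "customer_id": "C-42", "total": 30.0}]

dim_customer, sk_lookup = build_dimension(customers, nk="customer_id", sk="sk_customer")
fact_order = build_fact(orders, "customer_id", "sk_customer", sk_lookup)
print(fact_order)
```

Note that the lookup raises KeyError for a fact row whose natural key is missing from the dimension, which is one reason orphan foreign keys need to be resolved before the gold tier.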
# Write gold star schema tables to Parquet
gold_tables = star.all_tables()
for name, df in gold_tables.items():
    df.to_parquet(gold_dir / f"{name}.parquet", index=False)
    print(f" {name}: {len(df):,} rows x {len(df.columns)} cols")
print(f"\nGold: wrote {len(gold_tables)} star schema tables to {gold_dir}/")
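For readers who have not built a date dimension before, the sketch below shows the general idea: one row per calendar day with precomputed attributes that reports can group by. The column names and the YYYYMMDD integer key are common conventions assumed for this example; the dim_date that Spindle generates may use different columns.

```python
from datetime import date, timedelta


def build_date_dim(start, end):
    """Return one row per calendar day from start to end, inclusive."""
    rows = []
    day = start
    while day <= end:
        rows.append({
            "date_key": int(day.strftime("%Y%m%d")),  # e.g. 20240101
            "date": day,
            "year": day.year,
            "quarter": (day.month - 1) // 3 + 1,
            "month": day.month,
            "day_of_week": day.isoweekday(),  # Monday = 1 ... Sunday = 7
            "is_weekend": day.isoweekday() >= 6,
        })
        day += timedelta(days=1)
    return rows


# Hypothetical range covering the first week of 2024.
dim_date = build_date_dim(date(2024, 1, 1), date(2024, 1, 7))
print(f"dim_date: {len(dim_date)} rows")
```

In practice the range would span the minimum and maximum values of the fact table's date columns, such as order_date, so that every fact row can join to a date row.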
Step 5: Inspect the Final Folder Structure
The completed medallion pipeline produces a clean folder layout:
medallion_demo/
    bronze/retail/    (9 raw Parquet files)
    silver/retail/    (9 cleaned Parquet files)
    gold/retail/      (5 star schema files: dim_customer, dim_product, dim_store, dim_date, fact_order)
    quarantine/       (bad records with full metadata for investigation)
print("--- Medallion Directory Structure ---")
for tier in ["bronze", "silver", "gold", "quarantine"]:
    tier_path = base_dir / tier
    if tier_path.exists():
        files = list(tier_path.rglob("*"))
        data_files = [f for f in files if f.is_file() and not f.name.startswith(".")]
        print(f" {tier}/ ({len(data_files)} files)")
        for f in sorted(data_files)[:5]:
            print(f" {f.relative_to(base_dir)}")
Run It Yourself
- Notebook: F01_medallion_architecture.ipynb
Related
- Chaos guide -- full reference for all 6 chaos categories and intensity presets
- Validation guide -- all 8 built-in validation gates and the quarantine workflow
- Star Schema guide -- DimSpec, FactSpec, and date dimension configuration
Next Step
Tutorial 14: Scenario Packs -- run pre-built YAML-defined end-to-end data generation workflows across 11 industry verticals.