sinks

sqllocks_spindle.engine.sinks

Classes

Sink

Bases: Protocol

Protocol all sinks must implement.

Methods:
open(schema)

Called once before any chunks arrive. Create directories/tables/connections.

write_chunk(table, arrays)

Write one chunk of data for a table. Called once per table per chunk.

close()

Flush, commit, teardown. Called once after all chunks.
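The lifecycle above (open once, write_chunk once per table per chunk, close once) can be sketched with a small custom sink. This is a minimal illustration, not the library's implementation: the local Sink protocol, the RowCountSink class, the run() driver, and all type hints are assumptions made for the example, and plain lists stand in for the arrays so the block has no third-party dependencies.

```python
from typing import Any, Iterable, Mapping, Protocol, Sequence


class Sink(Protocol):
    """Local stand-in mirroring the three documented methods (types are assumed)."""

    def open(self, schema: Any) -> None: ...
    def write_chunk(self, table: str, arrays: Mapping[str, Sequence[Any]]) -> None: ...
    def close(self) -> None: ...


class RowCountSink:
    """Hypothetical sink that only counts the rows it receives per table."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}
        self.closed = False

    def open(self, schema: Any) -> None:
        # Called once before any chunk arrives: reset state.
        self.counts = {}
        self.closed = False

    def write_chunk(self, table: str, arrays: Mapping[str, Sequence[Any]]) -> None:
        # Assumes every column in a chunk has the same length.
        n_rows = len(next(iter(arrays.values()), ()))
        self.counts[table] = self.counts.get(table, 0) + n_rows

    def close(self) -> None:
        # Called once after all chunks: nothing to flush here.
        self.closed = True


def run(sink: Sink, schema: Any, chunks: Iterable[Mapping[str, Mapping[str, Sequence[Any]]]]) -> None:
    """Hypothetical driver: open, then one write_chunk per table per chunk, then close."""
    sink.open(schema)
    try:
        for chunk in chunks:
            for table, arrays in chunk.items():
                sink.write_chunk(table, arrays)
    finally:
        # Closing in a finally block is a design choice of this sketch.
        sink.close()


sink = RowCountSink()
run(sink, schema=None, chunks=[
    {"customer": {"id": [1, 2, 3]}, "order": {"id": [10, 11]}},
    {"customer": {"id": [4]}, "order": {"id": [12, 13, 14]}},
])
print(sink.counts)  # {'customer': 4, 'order': 5}
```

Because Sink is a Protocol, RowCountSink does not need to inherit from it; having the three methods is enough for a type checker to accept it.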

FabricConnectionProfile dataclass

Auth profile shared by all Fabric-backed sinks.

MemorySink

Accumulates all chunks in memory as DataFrames.

Not thread-safe: each MemorySink instance must receive its write_chunk() calls from a single thread. It is safe to use with SinkRegistry, because the registry calls each sink once per chunk dispatch.

Methods:
result()

Return all accumulated data as one DataFrame per table.
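The accumulate-then-combine pattern described above can be sketched as follows. ListMemorySink is a hypothetical stand-in, not the real MemorySink: it keeps columns as plain lists instead of DataFrames so the example runs without pandas, and the shape of the arrays argument (column name to values) is an assumption.

```python
from collections import defaultdict
from typing import Any, Mapping, Sequence


class ListMemorySink:
    """Hypothetical in-memory sink that stores columns as lists, not DataFrames."""

    def __init__(self) -> None:
        self._chunks: dict[str, list[Mapping[str, Sequence[Any]]]] = defaultdict(list)

    def open(self, schema: Any) -> None:
        self._chunks.clear()

    def write_chunk(self, table: str, arrays: Mapping[str, Sequence[Any]]) -> None:
        # Store the chunk as-is; concatenation is deferred until result().
        self._chunks[table].append(arrays)

    def close(self) -> None:
        pass  # nothing to flush for an in-memory sink

    def result(self) -> dict[str, dict[str, list[Any]]]:
        # One combined column mapping per table, chunks in arrival order.
        combined: dict[str, dict[str, list[Any]]] = {}
        for table, chunks in self._chunks.items():
            columns: dict[str, list[Any]] = {}
            for chunk in chunks:
                for name, values in chunk.items():
                    columns.setdefault(name, []).extend(values)
            combined[table] = columns
        return combined


mem = ListMemorySink()
mem.open(schema=None)
mem.write_chunk("customer", {"id": [1, 2], "name": ["a", "b"]})
mem.write_chunk("customer", {"id": [3], "name": ["c"]})
mem.close()
print(mem.result()["customer"])  # {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}
```

The sketch shares one mutable dictionary across calls without locking, which is the reason a sink of this shape should be written to from a single thread.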

ParquetSink

Write chunks as partitioned Parquet files under output_dir/{table}/part-NNNNNN.parquet.
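The file layout above can be illustrated with a small path helper. part_path is a hypothetical function written for this example, and the six-digit zero padding is inferred from the NNNNNN placeholder; whether the library numbers parts from 0 or 1 is not stated here, so the index is left to the caller.

```python
from pathlib import Path


def part_path(output_dir: str, table: str, index: int) -> Path:
    """Build output_dir/{table}/part-NNNNNN.parquet for a given part index."""
    if index < 0:
        raise ValueError("index must be non-negative")
    # :06d zero-pads the index to six digits, matching the NNNNNN placeholder.
    return Path(output_dir) / table / f"part-{index:06d}.parquet"


print(part_path("out", "customer", 3).as_posix())  # out/customer/part-000003.parquet
```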

LakehouseSink

Delegate sink that writes chunks to a Fabric Lakehouse Files landing zone.

Thin wrapper around LakehouseFilesWriter. Accumulates numpy array chunks as DataFrames per table, then flushes via write_all() on close.

Works locally (base_path is a filesystem path) or against OneLake (base_path is an abfss:// URL). When using an abfss:// path the azure-storage-file-datalake and azure-identity packages must be installed (pip install sqllocks-spindle[fabric-files]).

Parameters:

base_path (str | Path | None, default: None): Root path for the landing zone. Passed through to sqllocks_spindle.fabric.LakehouseFilesWriter.

format (str, default: 'parquet'): Output format: "parquet", "csv", or "jsonl".
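Since the extra Azure packages are only needed for abfss:// paths, a pre-flight check along the following lines may be useful before constructing the sink. Both helper functions are hypothetical and not part of the library; the example URL uses placeholder workspace and lakehouse names.

```python
from importlib.util import find_spec


def is_onelake_path(base_path: str) -> bool:
    """True when base_path is an abfss:// URL rather than a local filesystem path."""
    return str(base_path).lower().startswith("abfss://")


def missing_fabric_files_packages() -> list[str]:
    """Return the pip names of the Azure packages that cannot be imported."""
    required = {
        "azure-storage-file-datalake": "azure.storage.filedatalake",
        "azure-identity": "azure.identity",
    }
    missing = []
    for dist_name, module_name in required.items():
        try:
            found = find_spec(module_name) is not None
        except ModuleNotFoundError:
            # Raised when a parent package (here "azure") is absent.
            found = False
        if not found:
            missing.append(dist_name)
    return missing


# Placeholder URL: "my-workspace" and "my_lakehouse" are illustrative names.
path = "abfss://my-workspace@onelake.dfs.fabric.microsoft.com/my_lakehouse.Lakehouse/Files/landing"
if is_onelake_path(path) and missing_fabric_files_packages():
    print("Install the extras first: pip install sqllocks-spindle[fabric-files]")
```

The check works on strings on purpose: wrapping an abfss:// URL in pathlib.Path would collapse the double slash on POSIX systems.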

WarehouseSink

Delegate sink that writes chunks to a Fabric Warehouse via COPY INTO.

Thin wrapper around sqllocks_spindle.fabric.WarehouseBulkWriter. Accumulates numpy array chunks as DataFrames per table, then flushes via write_tables() on close.

Requires pip install sqllocks-spindle[fabric-sql,parquet].

Parameters:

connection_string (str, required): pyodbc connection string for the Warehouse endpoint.

staging_lakehouse_path (str, required): abfss:// path to a Lakehouse Files area for staging Parquet files before COPY INTO.

auth_method (str, default: 'cli'): "cli", "msi", or "spn".

schema_name (str, default: 'dbo'): SQL schema for target tables.

client_id (str | None, default: None): Service principal client ID (required when auth_method="spn").

client_secret (str | None, default: None): Service principal secret (required when auth_method="spn").

tenant_id (str | None, default: None): Azure AD tenant ID (required when auth_method="spn").

chunk_size (int, default: 1000000): Rows per staged Parquet file.
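The rule that client_id, client_secret, and tenant_id are required when auth_method="spn" can be checked up front. The function below is a hypothetical helper sketched from the parameter descriptions above; it does not show how the library itself validates its arguments.

```python
from typing import Optional

# Values taken from the auth_method description for WarehouseSink.
WAREHOUSE_AUTH_METHODS = ("cli", "msi", "spn")


def check_warehouse_auth(
    auth_method: str = "cli",
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    tenant_id: Optional[str] = None,
) -> None:
    """Raise ValueError if the auth settings contradict the documented rules."""
    if auth_method not in WAREHOUSE_AUTH_METHODS:
        raise ValueError(f"auth_method must be one of {WAREHOUSE_AUTH_METHODS}, got {auth_method!r}")
    if auth_method == "spn":
        credentials = {
            "client_id": client_id,
            "client_secret": client_secret,
            "tenant_id": tenant_id,
        }
        missing = [name for name, value in credentials.items() if not value]
        if missing:
            raise ValueError(f"auth_method='spn' requires: {', '.join(missing)}")


check_warehouse_auth("cli")  # passes: no service principal fields needed
try:
    check_warehouse_auth("spn", client_id="00000000-0000-0000-0000-000000000000")
except ValueError as exc:
    print(exc)  # auth_method='spn' requires: client_secret, tenant_id
```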

KQLSink

Delegate sink that writes chunks to a Fabric Eventhouse (KQL database).

Thin wrapper around sqllocks_spindle.fabric.EventhouseWriter. Accumulates numpy array chunks as DataFrames per table, then ingests via write() on close.

Requires pip install sqllocks-spindle[fabric-kusto].

Parameters:

cluster_uri (str, required): Eventhouse URI, e.g. https://my-eventhouse.z0.kusto.fabric.microsoft.com.

database (str, required): KQL database name.

auth_method (str, default: 'cli'): "cli", "msi", "spn", or "fabric".

table_prefix (str, default: ''): Optional prefix prepended to every KQL table name.

batch_size (int, default: 10000): Rows per ingestion batch.

client_id (str | None, default: None): Service principal client ID (required when auth_method="spn").

client_secret (str | None, default: None): Service principal secret (required when auth_method="spn").

tenant_id (str | None, default: None): Azure AD tenant ID (required when auth_method="spn").
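To make the effect of table_prefix and batch_size concrete, the sketch below shows one plausible reading of both parameters: the prefix is concatenated directly onto the table name, and rows are split into consecutive batches of at most batch_size. Both helpers are hypothetical and the exact behaviour of the underlying writer is an assumption.

```python
from typing import Any, Iterator, Sequence


def kql_table_name(table: str, table_prefix: str = "") -> str:
    """Prepend the prefix to the table name (plain concatenation is assumed)."""
    return f"{table_prefix}{table}"


def iter_batches(rows: Sequence[Any], batch_size: int = 10_000) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of rows, each holding at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


rows = list(range(25))
print(kql_table_name("orders", "stg_"))                      # stg_orders
print([len(b) for b in iter_batches(rows, batch_size=10)])   # [10, 10, 5]
```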

SQLDatabaseSink

Delegate sink that writes chunks to a Fabric SQL Database (or Azure SQL / Warehouse).

Thin wrapper around sqllocks_spindle.fabric.FabricSqlDatabaseWriter. Accumulates numpy array chunks as DataFrames per table, then flushes via write() on close.

Requires pip install sqllocks-spindle[fabric-sql].

Parameters:

connection_string (str, required): pyodbc connection string for the SQL endpoint.

auth_method (str, default: 'cli'): "cli", "msi", "spn", "sql", "device-code", or "fabric".

schema_name (str, default: 'dbo'): Target SQL schema.

mode (str, default: 'create_insert'): Write mode: "create_insert", "insert_only", "truncate_insert", or "append".

batch_size (int, default: 5000): Rows per INSERT batch.

staging_lakehouse_path (str | None, default: None): Optional abfss:// staging path to enable COPY INTO for Fabric Warehouse targets (faster bulk load).

client_id (str | None, default: None): Service principal client ID (required when auth_method="spn").

client_secret (str | None, default: None): Service principal secret (required when auth_method="spn").

tenant_id (str | None, default: None): Azure AD tenant ID (required when auth_method="spn").
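One way to keep these settings together and catch typos in mode or auth_method early is a small options object. SqlSinkOptions is a hypothetical helper, not a library class; its field names and defaults are copied from the parameter list above, and the connection string shown is a placeholder.

```python
from dataclasses import asdict, dataclass
from typing import Optional

# Allowed values as listed in the parameter descriptions.
SQL_MODES = ("create_insert", "insert_only", "truncate_insert", "append")
SQL_AUTH_METHODS = ("cli", "msi", "spn", "sql", "device-code", "fabric")


@dataclass(frozen=True)
class SqlSinkOptions:
    """Hypothetical container for SQLDatabaseSink keyword arguments."""

    connection_string: str
    auth_method: str = "cli"
    schema_name: str = "dbo"
    mode: str = "create_insert"
    batch_size: int = 5000
    staging_lakehouse_path: Optional[str] = None

    def __post_init__(self) -> None:
        if self.mode not in SQL_MODES:
            raise ValueError(f"mode must be one of {SQL_MODES}, got {self.mode!r}")
        if self.auth_method not in SQL_AUTH_METHODS:
            raise ValueError(f"auth_method must be one of {SQL_AUTH_METHODS}, got {self.auth_method!r}")
        if self.batch_size <= 0:
            raise ValueError("batch_size must be positive")
        path = self.staging_lakehouse_path
        if path is not None and not path.startswith("abfss://"):
            raise ValueError("staging_lakehouse_path must be an abfss:// path")


# Placeholder connection string; server and database names are illustrative.
options = SqlSinkOptions(
    connection_string="Driver={ODBC Driver 18 for SQL Server};Server=<server>;Database=<database>;Encrypt=yes",
    mode="truncate_insert",
)
kwargs = asdict(options)  # could then be unpacked as keyword arguments
print(kwargs["mode"], kwargs["batch_size"])  # truncate_insert 5000
```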