warehouse_bulk_writer

sqllocks_spindle.fabric.warehouse_bulk_writer

Bulk writer for Fabric Warehouse using COPY INTO from staged Parquet.

Pattern: Stage Parquet to OneLake Files → COPY INTO → cleanup. Uses workspace identity (no SAS tokens needed within the same workspace).

Requires the fabric-sql and parquet extras:

pip install sqllocks-spindle[fabric-sql,parquet]

Classes

BulkWriteResult dataclass

Result of a bulk warehouse write operation.

WarehouseBulkWriter

Bulk writer for Microsoft Fabric Warehouse using COPY INTO.

Stages data as Parquet files on a Lakehouse Files path, then uses COPY INTO to load them into Warehouse tables. This is significantly faster than row-by-row INSERT for large datasets.

Parameters:

connection_string (str, required)
    pyodbc connection string for the Warehouse endpoint.

staging_lakehouse_path (str, required)
    abfss:// path to a Lakehouse Files area for staging. Example: abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<lakehouse-id>/Files

auth_method (str, default 'cli')
    Authentication method for the Warehouse connection ("cli", "msi", "spn").

schema_name (str, default 'dbo')
    SQL schema to create tables in.

local_staging_dir (str | Path | None, default None)
    Local directory for staging Parquet files before upload (when not running inside a Fabric Notebook). If None, uses a temp directory.

Example:

writer = WarehouseBulkWriter(
    connection_string="Driver={ODBC Driver 18 for SQL Server};Server=...",
    staging_lakehouse_path="abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files",
    auth_method="cli",
)
result = writer.write_chunked(chunked_result)
print(result.summary())

Methods:

create_table(table_name, sample_df)

Create the Warehouse table from a sample DataFrame's schema.

Drops the table first if it exists.
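
A minimal sketch of the drop-then-create step described above, assuming a simple lookup from pandas dtype names to Warehouse column types. The `DTYPE_TO_SQL` table, the `build_create_table_sql` helper, and the fallback to VARCHAR(8000) are illustrative assumptions, not the library's actual implementation. The helper takes a plain dict of column names to dtype strings instead of a DataFrame so that it runs without dependencies.

```python
# Hypothetical helper: not part of sqllocks_spindle.
# Assumed mapping from pandas dtype names to Warehouse column types.
DTYPE_TO_SQL = {
    "int64": "BIGINT",
    "int32": "INT",
    "float64": "FLOAT",
    "bool": "BIT",
    "datetime64[ns]": "DATETIME2(6)",
    "object": "VARCHAR(8000)",
}


def build_create_table_sql(schema_name, table_name, dtypes):
    """Return (drop_sql, create_sql) for the given column-name -> dtype mapping."""
    full_name = f"[{schema_name}].[{table_name}]"
    # Unknown dtypes fall back to a wide VARCHAR (an assumption of this sketch).
    columns = ",\n".join(
        f"    [{col}] {DTYPE_TO_SQL.get(dtype, 'VARCHAR(8000)')}"
        for col, dtype in dtypes.items()
    )
    drop_sql = f"DROP TABLE IF EXISTS {full_name};"
    create_sql = f"CREATE TABLE {full_name} (\n{columns}\n);"
    return drop_sql, create_sql


drop_sql, create_sql = build_create_table_sql(
    "dbo", "orders", {"order_id": "int64", "amount": "float64", "note": "object"}
)
print(drop_sql)
print(create_sql)
```

Because the table is dropped first, calling create_table on an existing table discards its current rows.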

stage_chunk(table_name, chunk_df, chunk_idx)

Write a chunk as Parquet and stage it on OneLake for COPY INTO.

  • Inside a Fabric Notebook: writes directly via notebookutils.
  • Outside Fabric (local dev): writes to a temp file then uploads via ADLS Gen2 SDK (azure-storage-file-datalake), so COPY INTO always reads from the correct abfss:// path.

Returns the abfss:// remote path.
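
To illustrate the returned remote path, here is a small sketch that builds one under an assumed layout of `<staging root>/<run id>/<table>/chunk_NNNNN.parquet`. The `staged_chunk_path` helper, the `run_id` component, and the file naming are hypothetical; the library may lay out its staging directory differently.

```python
import posixpath


def staged_chunk_path(staging_root, run_id, table_name, chunk_idx):
    """Build the remote path for one staged chunk (assumed layout, for illustration)."""
    file_name = f"chunk_{chunk_idx:05d}.parquet"
    # rstrip avoids a doubled slash when the root already ends with "/".
    return posixpath.join(staging_root.rstrip("/"), run_id, table_name, file_name)


root = "abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<lakehouse-id>/Files"
path = staged_chunk_path(root, "run_001", "orders", 3)
print(path)
```

Keeping one subdirectory per table under a per-run directory would let a single wildcard cover all chunks of a table and make per-table cleanup a single directory delete.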

copy_into(table_name)

Execute COPY INTO from staged Parquet files into the Warehouse table.

Returns the number of rows loaded (as reported by the command).
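
The statement issued by this step might look like the output of the sketch below. `build_copy_into_sql` is a hypothetical helper, and the exact options the library passes are not documented here; only `FILE_TYPE = 'PARQUET'` is assumed, and the source URL is passed through unchanged.

```python
def build_copy_into_sql(schema_name, table_name, source_url):
    """Return a COPY INTO statement that loads Parquet files from source_url."""
    # Double any single quotes so the URL stays a valid T-SQL string literal.
    safe_url = source_url.replace("'", "''")
    return (
        f"COPY INTO [{schema_name}].[{table_name}]\n"
        f"FROM '{safe_url}'\n"
        "WITH (FILE_TYPE = 'PARQUET');"
    )


sql = build_copy_into_sql(
    "dbo",
    "orders",
    "abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<lakehouse-id>/Files/run_001/orders/*.parquet",
)
print(sql)
```

When a statement like this is executed through pyodbc, the loaded row count would typically be read from `cursor.rowcount` after `cursor.execute(sql)`; whether the library does exactly that is an assumption.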

cleanup_staging(table_name=None)

Remove staged Parquet files from OneLake.

If table_name is specified, clean only that table's staging files. Otherwise, clean the entire run's staging directory.

write_chunked(chunked_result)

Write a ChunkedGenerationResult to the Warehouse.

Creates tables, stages all chunks as Parquet, executes COPY INTO for each table, then cleans up staging files.

write_table_chunks(table_name, chunks)

Write an arbitrary iterator of DataFrames to a single Warehouse table.

Useful for manual chunked writes outside of ChunkedGenerationResult.

write_all(tables, **kwargs)

Write all tables. This is a protocol-compatible alias for write_tables().

Conforms to the SpindleWriter protocol so WarehouseBulkWriter can be used with MultiStoreWriter.
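
The alias pattern can be sketched with `typing.Protocol`. `SupportsWriteAll` below is a hypothetical stand-in for the SpindleWriter protocol, whose real definition is not shown on this page, and `DemoWriter` is a toy class rather than WarehouseBulkWriter itself.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsWriteAll(Protocol):
    """Hypothetical stand-in for the SpindleWriter protocol."""

    def write_all(self, tables: dict[str, Any], **kwargs: Any) -> Any: ...


class DemoWriter:
    """Toy writer whose write_all simply forwards to write_tables."""

    def write_tables(self, tables: dict[str, Any], chunk_size: int = 1_000_000) -> dict[str, int]:
        # Stand-in for the real load: report the row count of each table.
        return {name: len(rows) for name, rows in tables.items()}

    def write_all(self, tables: dict[str, Any], **kwargs: Any) -> dict[str, int]:
        # Keyword arguments such as chunk_size pass straight through.
        return self.write_tables(tables, **kwargs)


writer = DemoWriter()
counts = writer.write_all({"orders": [1, 2, 3]}, chunk_size=2)
print(isinstance(writer, SupportsWriteAll), counts)
```

Any object exposing a compatible `write_all` satisfies such a protocol structurally, which is what would allow a multi-store writer to treat different backends uniformly.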

write_tables(tables, chunk_size=1000000)

Write a dict of DataFrames to the Warehouse via COPY INTO.

For each table: DROP/CREATE, stage all chunks as Parquet, then execute a single wildcard COPY INTO per table. Tables are loaded concurrently via ThreadPoolExecutor (capped at 30 to stay below Fabric's 32-concurrent-query limit).
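
The concurrent per-table load can be sketched as follows. Only the use of ThreadPoolExecutor and the cap of 30 come from the description above; `load_tables_concurrently`, `fake_load`, and the way errors are collected are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS = 30  # cap taken from the description above


def load_tables_concurrently(table_names, load_one):
    """Run load_one(name) for each table; return (row_counts, errors)."""
    rows, errors = {}, {}
    if not table_names:
        return rows, errors
    workers = min(MAX_WORKERS, len(table_names))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(load_one, name): name for name in table_names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                rows[name] = future.result()
            except Exception as exc:
                # Record the failure and keep going so one table does not stop the rest.
                errors[name] = str(exc)
    return rows, errors


def fake_load(name):
    # Hypothetical stand-in for "create, stage, COPY INTO" on one table.
    if name == "bad":
        raise ValueError("load failed")
    return len(name)


rows, errors = load_tables_concurrently(["orders", "customers", "bad"], fake_load)
print(rows, errors)
```

Capping the pool size at the number of tables avoids creating idle threads when only a few tables are written.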

Parameters:

tables (dict[str, DataFrame], required)
    Mapping of table_name -> DataFrame.

chunk_size (int, default 1000000)
    Rows per Parquet chunk file.

Returns:

BulkWriteResult
    BulkWriteResult with per-table row counts and any errors.
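
To make the effect of chunk_size concrete, the sketch below computes the row ranges that would each become one Parquet chunk file. `chunk_bounds` is a hypothetical helper, not a function of the library; with pandas, each pair could be applied as `df.iloc[start:stop]`.

```python
def chunk_bounds(total_rows, chunk_size=1_000_000):
    """Yield (start, stop) row offsets for chunks of at most chunk_size rows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, total_rows, chunk_size):
        # The final chunk is shorter when total_rows is not a multiple of chunk_size.
        yield start, min(start + chunk_size, total_rows)


bounds = list(chunk_bounds(2_500_000))
print(bounds)
```

A table of 2.5 million rows with the default chunk_size therefore yields three chunks, the last holding the remaining 500,000 rows.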