warehouse_bulk_writer
sqllocks_spindle.fabric.warehouse_bulk_writer
Bulk writer for Fabric Warehouse using COPY INTO from staged Parquet.
Pattern: Stage Parquet to OneLake Files → COPY INTO → cleanup. Uses workspace identity (no SAS tokens needed within the same workspace).
Requires the fabric-sql and parquet extras::
    pip install sqllocks-spindle[fabric-sql,parquet]
Classes
BulkWriteResult (dataclass)
Result of a bulk warehouse write operation.
WarehouseBulkWriter
Bulk writer for Microsoft Fabric Warehouse using COPY INTO.
Stages data as Parquet files on a Lakehouse Files path, then uses
COPY INTO to load them into Warehouse tables. This is significantly
faster than row-by-row INSERT for large datasets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| connection_string | str | pyodbc connection string for the Warehouse endpoint. | required |
| staging_lakehouse_path | str | abfss:// path to a Lakehouse Files area for staging. | required |
| auth_method | str | Authentication method for the Warehouse connection. | 'cli' |
| schema_name | str | SQL schema to create tables in. | 'dbo' |
| local_staging_dir | str \| Path \| None | Local directory for staging Parquet files before upload (when not running inside a Fabric Notebook). If None, uses a temp directory. | None |
Example::
    from sqllocks_spindle.fabric.warehouse_bulk_writer import WarehouseBulkWriter

    writer = WarehouseBulkWriter(
        connection_string="Driver={ODBC Driver 18 for SQL Server};Server=...",
        staging_lakehouse_path="abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files",
        auth_method="cli",
    )
    # chunked_result is a ChunkedGenerationResult produced elsewhere
    result = writer.write_chunked(chunked_result)
    print(result.summary())
Methods:
create_table(table_name, sample_df)
Create the Warehouse table from a sample DataFrame's schema.
Drops the table first if it exists.
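As a rough illustration of deriving DDL from a sample DataFrame's schema, the sketch below maps dtype names (as `df.dtypes.astype(str)` would report them) to SQL column types and builds the DROP and CREATE statements. The mapping table and the `build_create_table_sql` helper are hypothetical; the library's actual type mapping is not documented here and may differ.

```python
# Hypothetical dtype-to-SQL mapping; the library's real mapping may differ.
DTYPE_TO_SQL = {
    "int64": "BIGINT",
    "int32": "INT",
    "float64": "FLOAT",
    "bool": "BIT",
    "datetime64[ns]": "DATETIME2(6)",
    "object": "VARCHAR(8000)",
}


def build_create_table_sql(schema_name, table_name, column_dtypes):
    """Return (drop_sql, create_sql) for a mapping of column name -> dtype name."""
    qualified = f"[{schema_name}].[{table_name}]"
    # Unknown dtypes fall back to a wide VARCHAR (an assumption of this sketch).
    column_defs = ",\n".join(
        f"    [{name}] {DTYPE_TO_SQL.get(dtype, 'VARCHAR(8000)')}"
        for name, dtype in column_dtypes.items()
    )
    drop_sql = f"DROP TABLE IF EXISTS {qualified};"
    create_sql = f"CREATE TABLE {qualified} (\n{column_defs}\n);"
    return drop_sql, create_sql


drop_sql, create_sql = build_create_table_sql(
    "dbo", "orders", {"order_id": "int64", "amount": "float64", "note": "object"}
)
print(drop_sql)
print(create_sql)
```

Running the DROP before the CREATE mirrors the "drops the table first" behavior described above, so repeated runs start from an empty table.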
stage_chunk(table_name, chunk_df, chunk_idx)
Write a chunk as Parquet and stage it on OneLake for COPY INTO.
- Inside a Fabric Notebook: writes directly via notebookutils.
- Outside Fabric (local dev): writes to a temp file then uploads via ADLS Gen2 SDK (azure-storage-file-datalake), so COPY INTO always reads from the correct abfss:// path.
Returns the abfss:// remote path.
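The two staging branches above hinge on knowing whether the code runs inside a Fabric Notebook and on producing a stable remote path per chunk. The sketch below shows one way this could look. Both the detection rule (checking whether `notebookutils` is importable) and the directory layout (`run_id/table/chunk_NNNNN.parquet`) are assumptions made for illustration, not the library's documented behavior.

```python
import importlib.util


def running_in_fabric_notebook():
    """Guess the environment by checking whether notebookutils is importable.

    This detection rule is an assumption of the sketch.
    """
    return importlib.util.find_spec("notebookutils") is not None


def remote_chunk_path(staging_root, run_id, table_name, chunk_idx):
    """Build the abfss:// path for one staged chunk (hypothetical layout)."""
    root = staging_root.rstrip("/")
    return f"{root}/{run_id}/{table_name}/chunk_{chunk_idx:05d}.parquet"


path = remote_chunk_path(
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/",
    "run42",
    "orders",
    3,
)
print(path)
print("inside Fabric Notebook:", running_in_fabric_notebook())
```

Keeping every chunk of a table under one directory is what would make a single wildcard COPY INTO per table possible.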
copy_into(table_name)
Execute COPY INTO from staged Parquet files into the Warehouse table.
Returns the number of rows loaded (as reported by the command).
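For orientation, a minimal sketch of the kind of statement this step could issue is shown below. It only builds the SQL text; the `build_copy_into_sql` helper, the wildcard layout, and the choice of options are assumptions, and the exact URL form and options the service accepts should be checked against the Fabric COPY INTO reference.

```python
def build_copy_into_sql(schema_name, table_name, staged_dir):
    """Build a COPY INTO statement that loads every Parquet file in staged_dir.

    Hypothetical helper: the real writer may add further options.
    """
    source = f"{staged_dir.rstrip('/')}/*.parquet"
    return (
        f"COPY INTO [{schema_name}].[{table_name}]\n"
        f"FROM '{source}'\n"
        "WITH (FILE_TYPE = 'PARQUET');"
    )


sql = build_copy_into_sql(
    "dbo",
    "orders",
    "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/run42/orders",
)
print(sql)
```

With pyodbc, a statement like this would typically be run through `cursor.execute(sql)` on a connection opened from the writer's connection string.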
cleanup_staging(table_name=None)
Remove staged Parquet files from OneLake.
If table_name is specified, clean only that table's staging files. Otherwise, clean the entire run's staging directory.
write_chunked(chunked_result)
Write a ChunkedGenerationResult to the Warehouse.
Creates tables, stages all chunks as Parquet, executes COPY INTO for each table, then cleans up staging files.
write_table_chunks(table_name, chunks)
Write an arbitrary iterator of DataFrames to a single Warehouse table.
Useful for manual chunked writes outside of ChunkedGenerationResult.
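One simple way to produce such an iterator is to slice a large DataFrame into fixed-size row ranges. The sketch below computes the ranges with the standard library only; `iter_row_ranges` is a hypothetical helper, not part of the package.

```python
def iter_row_ranges(n_rows, chunk_size):
    """Yield (start, stop) row ranges that cover n_rows in chunk_size steps."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)


ranges = list(iter_row_ranges(2_500_000, 1_000_000))
print(ranges)  # [(0, 1000000), (1000000, 2000000), (2000000, 2500000)]
```

Assuming `df` is a pandas DataFrame, a generator such as `(df.iloc[a:b] for a, b in iter_row_ranges(len(df), 1_000_000))` could then be passed as the `chunks` argument.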
write_all(tables, **kwargs)
Write all tables. This is a protocol-compatible alias for write_tables().
Conforms to the SpindleWriter protocol so WarehouseBulkWriter can be used with MultiStoreWriter.
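The alias pattern can be illustrated with Python's structural typing. In the sketch below, `WriterLike` is a hypothetical stand-in for the SpindleWriter protocol (whose real definition is not shown in this page), and `DemoWriter` is a toy class that forwards `write_all` to `write_tables` in the way described above.

```python
from typing import Any, Mapping, Protocol, runtime_checkable


@runtime_checkable
class WriterLike(Protocol):
    """Hypothetical stand-in for the SpindleWriter protocol."""

    def write_all(self, tables: Mapping[str, Any], **kwargs: Any) -> Any: ...


class DemoWriter:
    """Toy writer: write_all simply forwards to write_tables."""

    def write_tables(self, tables, chunk_size=1_000_000):
        # Pretend to load each table and report its row count.
        return {name: len(rows) for name, rows in tables.items()}

    def write_all(self, tables, **kwargs):
        return self.write_tables(tables, **kwargs)


writer = DemoWriter()
counts = writer.write_all({"orders": [1, 2, 3]}, chunk_size=2)
print(isinstance(writer, WriterLike), counts)
```

Because the check is structural, any class exposing a `write_all` method satisfies the protocol without inheriting from it, which is what lets differently implemented writers sit behind one multi-store dispatcher.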
write_tables(tables, chunk_size=1000000)
Write a dict of DataFrames to the Warehouse via COPY INTO.
For each table: DROP/CREATE, stage all chunks as Parquet, then execute a single wildcard COPY INTO per table. Tables are loaded concurrently via ThreadPoolExecutor (capped at 30 to stay below Fabric's 32-concurrent-query limit).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tables | dict[str, DataFrame] | Mapping of table_name -> DataFrame. | required |
| chunk_size | int | Rows per Parquet chunk file (default 1M). | 1000000 |
Returns:
| Type | Description |
|---|---|
| BulkWriteResult | BulkWriteResult with per-table row counts and any errors. |
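The capped concurrency described for write_tables can be sketched with the standard library as follows. The `load_tables_concurrently` helper and the `load_one` callback are hypothetical; only the cap of 30 workers and the collection of per-table row counts and errors come from the description above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Cap taken from the description above: stay below 32 concurrent queries.
MAX_CONCURRENT_LOADS = 30


def load_tables_concurrently(tables, load_one):
    """Run load_one(name, data) per table; return (rows_loaded, errors)."""
    rows_loaded, errors = {}, {}
    if not tables:
        return rows_loaded, errors
    max_workers = min(len(tables), MAX_CONCURRENT_LOADS)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(load_one, name, data): name
            for name, data in tables.items()
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                rows_loaded[name] = future.result()
            except Exception as exc:  # record the failure, keep other tables going
                errors[name] = str(exc)
    return rows_loaded, errors


def fake_load(name, data):
    """Stand-in for DROP/CREATE, staging, and COPY INTO of one table."""
    if name == "broken":
        raise RuntimeError("load failed")
    return len(data)


rows_loaded, errors = load_tables_concurrently(
    {"a": [1, 2], "b": [1], "broken": []}, fake_load
)
print(rows_loaded, errors)
```

Collecting errors per table instead of raising on the first failure matches a result object that reports both row counts and errors.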