envelope
sqllocks_spindle.streaming.envelope
¶
Event envelope dataclass and factory for Spindle streaming.
Classes¶
EventEnvelope
dataclass
¶
Standard event envelope for Spindle streaming events.
All events emitted through Eventstream (or any other sink) are wrapped in this envelope to provide consistent metadata across domains and tables.
Attributes:
| Name | Type | Description |
|---|---|---|
schema_version |
str
|
Envelope schema version (default |
event_type |
str
|
Type of the event, typically |
event_time |
str
|
ISO-8601 timestamp of when the event occurred. |
correlation_id |
str
|
Unique identifier for tracing (auto-generated UUID). |
tenant_id |
str | None
|
Optional tenant identifier for multi-tenant scenarios. |
payload |
dict[str, Any]
|
The actual event data (row dict). |
metadata |
dict[str, Any]
|
Additional metadata key-value pairs. |
EnvelopeFactory
¶
Create :class:EventEnvelope instances from raw row dicts.
Provides a consistent way to wrap generated data rows into event envelopes with proper correlation IDs, timestamps, and domain context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
default_tenant_id
|
str | None
|
Optional default tenant ID applied to all envelopes unless overridden per call. |
None
|
timestamp_column
|
str | None
|
Name of the column to use as |
None
|
Example::
factory = EnvelopeFactory(default_tenant_id="acme-corp")
envelope = factory.create_envelope(
row_dict={"order_id": 1, "amount": 99.99, "order_date": "2025-01-15"},
table_name="order",
event_type="retail.order.created",
)
print(EnvelopeFactory.to_json(envelope))
Methods:¶
create_envelope(row_dict, table_name, event_type, schema_version='1.0', tenant_id=None, metadata=None)
¶
Create an :class:EventEnvelope from a raw row dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
row_dict
|
dict[str, Any]
|
The data payload (a single row as a dict). |
required |
table_name
|
str
|
Name of the source table (added to metadata). |
required |
event_type
|
str
|
Event type string (e.g. |
required |
schema_version
|
str
|
Envelope schema version (default |
'1.0'
|
tenant_id
|
str | None
|
Override tenant ID for this envelope. Falls back to the factory default if None. |
None
|
metadata
|
dict[str, Any] | None
|
Additional metadata to merge into the envelope. |
None
|
Returns:
| Type | Description |
|---|---|
EventEnvelope
|
A populated :class: |
to_dict(envelope)
staticmethod
¶
Convert an :class:EventEnvelope to a plain dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
envelope
|
EventEnvelope
|
The envelope to convert. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A JSON-serialisable dict. |
to_json(envelope)
staticmethod
¶
Convert an :class:EventEnvelope to a JSON string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
envelope
|
EventEnvelope
|
The envelope to convert. |
required |
Returns:
| Type | Description |
|---|---|
str
|
A compact JSON string. |