anomaly
sqllocks_spindle.streaming.anomaly
Anomaly injection for the Spindle streaming engine.

Three anomaly types (matching the taxonomy from the design spec):

- `PointAnomaly`: a single row with an extreme column value
- `ContextualAnomaly`: a normal value placed in the wrong context
- `CollectiveAnomaly`: a cluster of rows that is abnormal together

Combine them in an `AnomalyRegistry` and call `AnomalyRegistry.inject` on any DataFrame before streaming.
Classes
Anomaly
Bases: ABC
Base class for anomaly definitions.
PointAnomaly (dataclass)
Bases: Anomaly
Single-row anomaly: a column value that is far outside the normal range.
Example: an order total of $99,999 when the average is $50.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in | required |
| `column` | `str` | Column to modify. | required |
| `multiplier_range` | `tuple[float, float]` |  | `(10.0, 100.0)` |
| `fraction` | `float` | Fraction of rows to mark (default 0.01 = 1%). | `0.01` |
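The select-and-scale step can be sketched in plain Python. This is a minimal sketch, not Spindle's implementation: it assumes rows are dicts rather than DataFrame rows, draws one multiplier per marked row uniformly from `multiplier_range`, and writes the anomaly `name` into a hypothetical `anomaly` label column.

```python
import random
from typing import Any, Optional

LABEL = "anomaly"  # hypothetical label column, not Spindle's actual name


def inject_point(
    rows: list[dict[str, Any]],
    name: str,
    column: str,
    multiplier_range: tuple[float, float] = (10.0, 100.0),
    fraction: float = 0.01,
    rng: Optional[random.Random] = None,
) -> list[dict[str, Any]]:
    """Return labelled copies of rows with a random subset scaled far out of range."""
    rng = rng if rng is not None else random.Random()
    out = [{**r, LABEL: None} for r in rows]  # copy so the input is not mutated
    n = round(len(out) * fraction)
    lo, hi = multiplier_range
    for i in rng.sample(range(len(out)), n):
        out[i][column] *= rng.uniform(lo, hi)  # one multiplier per marked row
        out[i][LABEL] = name
    return out


# 200 ordinary orders at $50; mark 5% of them.
rows = [{"order_id": i, "total_amount": 50.0} for i in range(200)]
labelled = inject_point(rows, "extreme_total", "total_amount",
                        fraction=0.05, rng=random.Random(0))
```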
ContextualAnomaly (dataclass)
Bases: Anomaly
Normal value placed in the wrong context.
Example: a winter coat sold in July, or a NULL shipping address on a delivered order.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in | required |
| `column` | `str` | Column whose value will be replaced with an anomalous value. | required |
| `condition_column` | `str` | Column used to identify "normal" rows to corrupt. | required |
| `normal_values` | `list[Any]` | Values of | required |
| `anomalous_values` | `list[Any]` | Replacement values to write into | required |
| `fraction` | `float` | Fraction of eligible rows to corrupt (default 0.01). | `0.01` |
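A minimal sketch of contextual injection, under the same assumptions as a plain-Python illustration (dict rows, a hypothetical `anomaly` label column, not Spindle's own code): only rows whose `condition_column` value appears in `normal_values` are eligible, and each selected row receives a value drawn from `anomalous_values`.

```python
import random
from typing import Any, Optional

LABEL = "anomaly"  # hypothetical label column, not Spindle's actual name


def inject_contextual(
    rows: list[dict[str, Any]],
    name: str,
    column: str,
    condition_column: str,
    normal_values: list[Any],
    anomalous_values: list[Any],
    fraction: float = 0.01,
    rng: Optional[random.Random] = None,
) -> list[dict[str, Any]]:
    """Write out-of-context values into a random subset of 'normal' rows."""
    rng = rng if rng is not None else random.Random()
    out = [{**r, LABEL: None} for r in rows]
    # Only rows whose context matches normal_values are eligible.
    eligible = [i for i, r in enumerate(out) if r[condition_column] in normal_values]
    for i in rng.sample(eligible, round(len(eligible) * fraction)):
        out[i][column] = rng.choice(anomalous_values)
        out[i][LABEL] = name
    return out


# Ten swimwear orders per month; corrupt 20% of the summer (June to August) orders.
rows = [{"order_month": m, "product_category": "Swimwear"}
        for m in range(1, 13) for _ in range(10)]
labelled = inject_contextual(
    rows, "winter_in_summer", "product_category", "order_month",
    normal_values=[6, 7, 8],
    anomalous_values=["Winter Coats", "Heavy Jackets"],
    fraction=0.2, rng=random.Random(0),
)
```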
CollectiveAnomaly (dataclass)
Bases: Anomaly
A sequence of rows that is abnormal together.
Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in each affected group have their timestamps compressed into a short window and are labelled as anomalous.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Short label, used in | required |
| `group_column` | `str` | Column whose distinct values define groups (e.g. `customer_id`). | required |
| `timestamp_column` | `str` | Datetime column to compress into a short window. | required |
| `window_seconds` | `float` | Duration of the burst window (default 600 s = 10 min). | `600.0` |
| `fraction` | `float` | Fraction of groups to corrupt (default 0.005). | `0.005` |
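The burst compression can be sketched as follows. Assumptions, not taken from Spindle: rows are dicts, each chosen group keeps its earliest timestamp as the window start, and offsets from that start are scaled linearly so the whole group fits inside `window_seconds` while row order is preserved.

```python
import random
from datetime import datetime, timedelta
from typing import Any, Optional

LABEL = "anomaly"  # hypothetical label column, not Spindle's actual name


def inject_collective(
    rows: list[dict[str, Any]],
    name: str,
    group_column: str,
    timestamp_column: str,
    window_seconds: float = 600.0,
    fraction: float = 0.005,
    rng: Optional[random.Random] = None,
) -> list[dict[str, Any]]:
    """Squeeze every row of a random subset of groups into one short burst."""
    rng = rng if rng is not None else random.Random()
    out = [{**r, LABEL: None} for r in rows]
    groups = sorted({r[group_column] for r in out})  # sorted for reproducibility
    for g in rng.sample(groups, round(len(groups) * fraction)):
        members = [r for r in out if r[group_column] == g]
        start = min(r[timestamp_column] for r in members)
        span = (max(r[timestamp_column] for r in members) - start).total_seconds()
        # scale < 1 compresses the timeline; groups already inside the window keep their spacing.
        scale = min(1.0, window_seconds / span) if span > 0 else 1.0
        for r in members:
            offset = (r[timestamp_column] - start).total_seconds()
            r[timestamp_column] = start + timedelta(seconds=offset * scale)
            r[LABEL] = name
    return out


# Four customers, each with ten orders spaced one day apart; burst half of them.
base = datetime(2024, 1, 1)
rows = [{"customer_id": c, "order_ts": base + timedelta(days=d)}
        for c in range(4) for d in range(10)]
labelled = inject_collective(rows, "velocity_burst", "customer_id", "order_ts",
                             window_seconds=600.0, fraction=0.5, rng=random.Random(0))
```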
AnomalyRegistry
Registry of anomaly definitions applied during streaming.
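Example:

```python
import numpy as np
from sqllocks_spindle.streaming.anomaly import AnomalyRegistry, ContextualAnomaly, PointAnomaly

registry = AnomalyRegistry([
    PointAnomaly("extreme_total", column="total_amount"),
    ContextualAnomaly(
        "winter_in_summer",
        column="product_category",
        condition_column="order_month",
        normal_values=[6, 7, 8],
        anomalous_values=["Winter Coats", "Heavy Jackets"],
    ),
])

# `df` is any DataFrame with total_amount, product_category and order_month columns.
# Assumption: `rng` is a NumPy Generator; check inject's signature for the accepted type.
rng = np.random.default_rng(42)
# Apply to any DataFrame before streaming
df_labelled = registry.inject(df, rng)
```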
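To show how a registry can chain independent definitions, here is a toy stand-in written without Spindle. `AnomalySketch`, `ScaleAnomaly`, `SimpleRegistry`, the `apply(rows, rng)` method, and the `anomaly` label column are hypothetical names invented for this sketch; Spindle's actual base-class interface is not documented in this section.

```python
import random
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

Row = dict[str, Any]
LABEL = "anomaly"  # hypothetical label column


class AnomalySketch(ABC):
    """Hypothetical base class: each anomaly returns corrupted, labelled copies of rows."""

    @abstractmethod
    def apply(self, rows: list[Row], rng: random.Random) -> list[Row]:
        ...


@dataclass
class ScaleAnomaly(AnomalySketch):
    """Toy point anomaly: multiply `column` on a random fraction of rows."""
    name: str
    column: str
    multiplier_range: tuple[float, float] = (10.0, 100.0)
    fraction: float = 0.01

    def apply(self, rows: list[Row], rng: random.Random) -> list[Row]:
        out = [dict(r) for r in rows]
        for i in rng.sample(range(len(out)), round(len(out) * self.fraction)):
            out[i][self.column] *= rng.uniform(*self.multiplier_range)
            out[i][LABEL] = self.name
        return out


class SimpleRegistry:
    """Applies each registered anomaly, in order, to the same set of rows."""

    def __init__(self, anomalies: list[AnomalySketch]) -> None:
        self.anomalies = list(anomalies)

    def inject(self, rows: list[Row], rng: random.Random) -> list[Row]:
        out = [{**r, LABEL: None} for r in rows]  # every row starts unlabelled
        for anomaly in self.anomalies:
            out = anomaly.apply(out, rng)
        return out


registry = SimpleRegistry([
    ScaleAnomaly("extreme_total", column="total_amount", fraction=0.1),
    ScaleAnomaly("extreme_quantity", column="quantity", fraction=0.1),
])
rows = [{"total_amount": 50.0, "quantity": 1.0} for _ in range(100)]
labelled = registry.inject(rows, random.Random(7))
```

Applying anomalies in list order keeps each definition independent; the trade-off is that a later anomaly overwrites the label of a row an earlier one already touched. How Spindle's registry resolves such overlaps is not stated here.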