anomaly

sqllocks_spindle.streaming.anomaly

Anomaly injection for the Spindle streaming engine.

Three anomaly types (matching the taxonomy from the design spec):

  • PointAnomaly: a single row with an extreme column value
  • ContextualAnomaly: a normal value placed in the wrong context
  • CollectiveAnomaly: a cluster of rows that is abnormal together

Combine them in an AnomalyRegistry and call AnomalyRegistry.inject on any DataFrame before streaming.

Classes

Anomaly

Bases: ABC

Base class for anomaly definitions.

Attributes:
anomaly_type abstractmethod property

Short label used in the _spindle_anomaly_type column.

fraction abstractmethod property

Fraction of eligible rows (or groups) to mark as anomalous.

Methods:
inject(df, rng) abstractmethod

Inject anomalies into df and return a modified copy.
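
The interface described above can be pictured with a small stand-in. This is a minimal sketch, not the library's code: AnomalySketch and NegateAmount are hypothetical names, and it assumes that df is a pandas DataFrame and rng is a numpy.random.Generator, neither of which this page states.

```python
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd


class AnomalySketch(ABC):
    """Hypothetical stand-in for the Anomaly base class (not the library's code)."""

    @property
    @abstractmethod
    def anomaly_type(self) -> str:
        """Short label written to the _spindle_anomaly_type column."""

    @property
    @abstractmethod
    def fraction(self) -> float:
        """Fraction of eligible rows (or groups) to mark as anomalous."""

    @abstractmethod
    def inject(self, df: pd.DataFrame, rng: np.random.Generator) -> pd.DataFrame:
        """Return a modified copy of df; the input is left untouched."""


class NegateAmount(AnomalySketch):
    """Toy subclass: flips the sign of `amount` on a random fraction of rows."""

    @property
    def anomaly_type(self) -> str:
        return "negated_amount"

    @property
    def fraction(self) -> float:
        return 0.5

    def inject(self, df, rng):
        out = df.copy()  # work on a copy so the caller's frame is unchanged
        n = max(1, int(round(len(out) * self.fraction)))
        picked = rng.choice(out.index.to_numpy(), size=n, replace=False)
        out.loc[picked, "amount"] = -out.loc[picked, "amount"]
        return out


df = pd.DataFrame({"amount": [10.0, 20.0, 30.0, 40.0]})
result = NegateAmount().inject(df, np.random.default_rng(0))
```

Because every member is abstract, the base class itself cannot be instantiated; only subclasses that supply anomaly_type, fraction, and inject can.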

PointAnomaly dataclass

Bases: Anomaly

Single-row anomaly: a column value that is far outside the normal range.

Example: an order total of $99,999 when the average is $50.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Short label, used in _spindle_anomaly_type. | required |
| column | str | Column to modify. | required |
| multiplier_range | tuple[float, float] | (min, max) multiplier applied to the column mean to produce the anomalous value. | (10.0, 100.0) |
| fraction | float | Fraction of rows to mark (default 0.01 = 1%). | 0.01 |
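
To make the parameters concrete, here is a minimal sketch of what a point-anomaly injection could look like. It is not the library's implementation: inject_point_sketch is a hypothetical helper, and it assumes a pandas DataFrame, a numpy.random.Generator, a multiplier drawn uniformly from multiplier_range, and that at least one row is always picked. Labelling is left out to keep the sketch short.

```python
import numpy as np
import pandas as pd


def inject_point_sketch(df, column, rng, multiplier_range=(10.0, 100.0), fraction=0.01):
    """Illustrative only: overwrite a random subset of rows with scaled column means."""
    out = df.copy()
    out[column] = out[column].astype(float)
    n = max(1, int(round(len(out) * fraction)))  # assumption: never fewer than one row
    picked = rng.choice(out.index.to_numpy(), size=n, replace=False)
    mean = out[column].mean()  # computed before any value is overwritten
    low, high = multiplier_range
    # Assumption: one uniformly drawn multiplier per anomalous row.
    out.loc[picked, column] = mean * rng.uniform(low, high, size=n)
    return out, picked


rng = np.random.default_rng(42)
df = pd.DataFrame({"total_amount": np.full(100, 50.0)})
out, picked = inject_point_sketch(df, "total_amount", rng, fraction=0.02)
```

With a column mean of 50 and the default range, the two picked rows end up between 500 and 5,000 while every other row keeps its original value.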

ContextualAnomaly dataclass

Bases: Anomaly

Normal value placed in the wrong context.

Example: a winter coat sold in July, or a NULL shipping address on a delivered order.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Short label, used in _spindle_anomaly_type. | required |
| column | str | Column whose value will be replaced with an anomalous value. | required |
| condition_column | str | Column used to identify "normal" rows to corrupt. | required |
| normal_values | list[Any] | Values of condition_column that make a row eligible. | required |
| anomalous_values | list[Any] | Replacement values to write into column. | required |
| fraction | float | Fraction of eligible rows to corrupt (default 0.01). | 0.01 |
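
The interplay of condition_column, normal_values, and anomalous_values can be sketched as follows. This is an assumption-laden illustration rather than the library's code: inject_contextual_sketch is a hypothetical helper, rng is assumed to be a numpy.random.Generator, and replacements are assumed to be drawn uniformly with repetition allowed. Labelling is omitted.

```python
import numpy as np
import pandas as pd


def inject_contextual_sketch(df, column, condition_column, normal_values,
                             anomalous_values, rng, fraction=0.01):
    """Illustrative only: corrupt `column` on a sample of the eligible rows."""
    out = df.copy()
    # A row is eligible when its condition_column value is one of normal_values.
    eligible = out.index[out[condition_column].isin(normal_values)].to_numpy()
    if len(eligible) == 0:
        return out, eligible
    n = max(1, int(round(len(eligible) * fraction)))  # assumption: at least one row
    picked = rng.choice(eligible, size=n, replace=False)
    # Draw list positions rather than values so the original Python objects are kept.
    positions = rng.integers(0, len(anomalous_values), size=n)
    out.loc[picked, column] = [anomalous_values[i] for i in positions]
    return out, picked


rng = np.random.default_rng(7)
df = pd.DataFrame({
    "order_month": [1, 6, 7, 8, 12, 7],
    "product_category": ["Swimwear"] * 6,
})
out, picked = inject_contextual_sketch(
    df, "product_category", "order_month",
    normal_values=[6, 7, 8],
    anomalous_values=["Winter Coats", "Heavy Jackets"],
    rng=rng, fraction=0.5,
)
```

Only the four summer rows are candidates here, so the January and December rows can never be touched regardless of the seed.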

CollectiveAnomaly dataclass

Bases: Anomaly

A sequence of rows that is abnormal together.

Example: 47 orders from the same customer within 10 minutes (velocity fraud). All rows in the affected group have their timestamps compressed into a short window and are labelled as anomalous.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Short label, used in _spindle_anomaly_type. | required |
| group_column | str | Column whose distinct values define groups (e.g. customer_id). | required |
| timestamp_column | str | Datetime column to compress into a short window. | required |
| window_seconds | float | Duration of the burst window (default 600 s = 10 min). | 600.0 |
| fraction | float | Fraction of groups to corrupt (default 0.005). | 0.005 |
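
One way to picture the timestamp compression is the sketch below. It is not the library's implementation: inject_collective_sketch is a hypothetical helper, rng is assumed to be a numpy.random.Generator, the burst is assumed to start at the group's earliest timestamp, and offsets are whole seconds for simplicity. Labelling is omitted.

```python
import numpy as np
import pandas as pd


def inject_collective_sketch(df, group_column, timestamp_column, rng,
                             window_seconds=600.0, fraction=0.005):
    """Illustrative only: squeeze whole groups into a short burst window."""
    out = df.copy()
    groups = out[group_column].unique()
    n = max(1, int(round(len(groups) * fraction)))  # assumption: at least one group
    picked = rng.choice(groups, size=n, replace=False)
    for group in picked:
        mask = out[group_column] == group
        # Assumption: the burst begins at the group's earliest timestamp.
        start = out.loc[mask, timestamp_column].min()
        offsets = rng.integers(0, int(window_seconds) + 1, size=int(mask.sum()))
        burst = start + pd.to_timedelta(offsets, unit="s")
        out.loc[mask, timestamp_column] = burst.to_numpy()
    return out, picked


rng = np.random.default_rng(3)
df = pd.DataFrame({
    "customer_id": [1, 1, 1, 2, 2, 2, 3, 3, 3],
    "order_ts": pd.date_range("2024-01-01", periods=9, freq="D"),
})
out, picked = inject_collective_sketch(df, "customer_id", "order_ts", rng, fraction=0.34)
```

Note that fraction applies to groups, not rows: with three customers and fraction=0.34, exactly one customer has all of its orders pulled into a 10-minute window.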

AnomalyRegistry

Registry of anomaly definitions applied during streaming.

Example::

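    from sqllocks_spindle.streaming.anomaly import (
        AnomalyRegistry,
        ContextualAnomaly,
        PointAnomaly,
    )

    registry = AnomalyRegistry([
        PointAnomaly("extreme_total", column="total_amount"),
        ContextualAnomaly(
            "winter_in_summer",
            column="product_category",
            condition_column="order_month",
            normal_values=[6, 7, 8],
            anomalous_values=["Winter Coats", "Heavy Jackets"],
        ),
    ])

    # Apply to any DataFrame before streaming.
    # df and rng are supplied by the caller; they are not defined in this snippet.
    df_labelled = registry.inject(df, rng)

Methods: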
add(anomaly)

Register an anomaly and return self for chaining.

inject(df, rng)

Apply all registered anomalies to df and return a labelled copy.

Label columns _spindle_is_anomaly and _spindle_anomaly_type are always present in the returned DataFrame, even when the registry is empty.
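
The chaining behaviour of add and the always-present label columns can be illustrated with a small stand-in. This is a sketch under assumptions, not the library's code: RegistrySketch and ZeroFirstRow are hypothetical, each anomaly is assumed to set its own labels, and False / None are assumed as the label values for normal rows.

```python
import numpy as np
import pandas as pd


class RegistrySketch:
    """Hypothetical stand-in for AnomalyRegistry (not the library's code)."""

    def __init__(self, anomalies=None):
        self._anomalies = list(anomalies or [])

    def add(self, anomaly):
        self._anomalies.append(anomaly)
        return self  # returning self enables registry.add(a).add(b)

    def inject(self, df, rng):
        out = df.copy()
        # Label columns are created up front, so they exist even with no anomalies.
        out["_spindle_is_anomaly"] = False
        out["_spindle_anomaly_type"] = None  # assumption: None marks normal rows
        for anomaly in self._anomalies:
            out = anomaly.inject(out, rng)
        return out


class ZeroFirstRow:
    """Toy anomaly: zeroes `amount` in the first row and labels that row."""

    anomaly_type = "zeroed"

    def inject(self, df, rng):
        out = df.copy()
        first = out.index[0]
        out.loc[first, "amount"] = 0.0
        out.loc[first, "_spindle_is_anomaly"] = True
        out.loc[first, "_spindle_anomaly_type"] = self.anomaly_type
        return out


rng = np.random.default_rng(0)
df = pd.DataFrame({"amount": [10.0, 20.0, 30.0]})
empty = RegistrySketch().inject(df, rng)
labelled = RegistrySketch().add(ZeroFirstRow()).inject(df, rng)
```

Creating the label columns before applying any anomaly means downstream consumers can rely on a stable schema whether or not anything was injected.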