Tiny query-plan runner used as a deterministic reference engine for testing.
Row-stream stage with open/next/close.
Stateless row-local computation.
Stateful expr with reset/push/pop/value.
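The three roles above can be sketched in a few lines of Python. This is only an illustration of the shape of each contract, not the engine's actual code: every class and function name here (`Operator`, `ListScan`, `KeepIf`, `Field`, `RunningSum`, `run`) is hypothetical, and the assumption that `next` signals end-of-stream with `None` is mine.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

Row = Dict[str, Any]


class Operator(ABC):
    """Row-stream stage: open() once, next() until it returns None, then close()."""

    @abstractmethod
    def open(self) -> None: ...

    @abstractmethod
    def next(self) -> Optional[Row]: ...

    @abstractmethod
    def close(self) -> None: ...


class ListScan(Operator):
    """Hypothetical leaf stage that streams rows from an in-memory list."""

    def __init__(self, rows: Iterable[Row]) -> None:
        self._rows = list(rows)
        self._it: Optional[Iterator[Row]] = None

    def open(self) -> None:
        self._it = iter(self._rows)

    def next(self) -> Optional[Row]:
        return next(self._it, None)

    def close(self) -> None:
        self._it = None


class KeepIf(Operator):
    """Hypothetical filter stage: pulls from its child and skips rejected rows."""

    def __init__(self, child: Operator, predicate: Callable[[Row], bool]) -> None:
        self._child = child
        self._predicate = predicate

    def open(self) -> None:
        self._child.open()

    def next(self) -> Optional[Row]:
        while (row := self._child.next()) is not None:
            if self._predicate(row):
                return row
        return None

    def close(self) -> None:
        self._child.close()


class Field:
    """Stateless, row-local computation: the result depends only on the given row."""

    def __init__(self, name: str) -> None:
        self._name = name

    def eval(self, row: Row) -> Any:
        return row[self._name]


class RunningSum:
    """Stateful expression: push() adds a row, pop() removes one that left the window."""

    def __init__(self, arg: Field) -> None:
        self._arg = arg
        self._total = 0

    def reset(self) -> None:
        self._total = 0

    def push(self, row: Row) -> None:
        self._total += self._arg.eval(row)

    def pop(self, row: Row) -> None:
        self._total -= self._arg.eval(row)

    def value(self) -> Any:
        return self._total


def run(op: Operator) -> List[Row]:
    """Drive a stage through its full lifecycle and collect the rows it emits."""
    op.open()
    try:
        out: List[Row] = []
        while (row := op.next()) is not None:
            out.append(row)
        return out
    finally:
        op.close()
```

Having both `push` and `pop` on the stateful side is what would let a sliding window update its aggregate incrementally instead of recomputing it from scratch for every row.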
source = CsvScan(Path("data.csv"))
typed = Map("time", ToTimestamp(Select("@timestamp")), source=source)
filtered = Filter(child=typed, predicate=Eq(Select("dim"), Literal("a")))
ordered = Sort(child=filtered, keys=(SortKey(Select("time")),))
win = WindowedAggregate(
    child=ordered,
    window=Minutes(5),
    key=Select("time"),
    fn=(Map("sum", child=SumExpr(ToInt(Select("val")))),),
)
win_rate = WindowedAggregate(
    child=ordered,
    window=Minutes(5),
    key=Select("time"),
    fn=(Map("rate", child=RateExpr(ToInt(Select("counter_val")), Select("time"), Minutes(5))),),
)
bucketed = GroupAggregate(
    child=win_rate,
    key=(Map("bucket", StepCeil(Minutes(5), ToTimestamp(Literal("2024-01-01T00:00:00Z")), Select("time"))),),
    fn=(Map("rate", child=LastExpr(Select("rate"))),),
)
result = Sort(child=bucketed, keys=(SortKey(Select("bucket")),))

uv run walle-tester .cases/tbucket_wf.py data=<csv> start=<iso> end=<iso> window=<min> step_sz=<min>
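The `bucket` key in the plan above aligns each row's time to a fixed step grid measured from a start timestamp. The sketch below shows one way such a ceil-to-step computation could work. It is a guess at the semantics, not the engine's `StepCeil`: the function name `step_ceil` is hypothetical, and it assumes that a time already on the grid maps to itself and that any other time rounds up to the next boundary.

```python
from datetime import datetime, timedelta, timezone


def step_ceil(step: timedelta, origin: datetime, t: datetime) -> datetime:
    """Round t up to the nearest origin + k * step for an integer k.

    Assumption: times exactly on the grid are returned unchanged.
    """
    offset = t - origin
    # timedelta // timedelta is floor division yielding an int;
    # negating before and after turns it into a ceiling division.
    k = -((-offset) // step)
    return origin + k * step


# Example inputs: a 5-minute grid anchored at midnight UTC.
origin = datetime(2024, 1, 1, tzinfo=timezone.utc)
step = timedelta(minutes=5)
bucket = step_ceil(step, origin, origin + timedelta(minutes=7, seconds=30))
```

Under these assumptions, every row whose time falls in the half-open interval just before a boundary shares that boundary as its bucket, so taking the last value per bucket yields one sample per step.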
Run unit tests: uv run test