Streaming One-to-One
Each input produces exactly one output. These compose freely; the planner fuses adjacent stages into a single composed stage when possible.
Fixture
Examples in this chapter run against:
{
"users": [{"id":1,"name":"Ada"},{"id":2,"name":"Bob"}],
"xs": [1, 2, 3, 4, 5],
"prices":[100, 105, 102, 110, 108, 115]
}
map
- Signature:
Array<A> -> Array<B>(withf: A -> B) - Demand law:
MapLike— preserves pull, forcesWhole.
QUERY: $.users.map(u => u.name)
OUT: ["Ada","Bob"]
QUERY: $.xs.map(@ * 2)
OUT: [2, 4, 6, 8, 10]
QUERY: $.users.map(@.name.upper())
OUT: ["ADA","BOB"]
map is the workhorse. The lambda may use any of the three forms.
enumerate
- Signature:
Array<A> -> Array<{index: Number, value: A}> - Behavior: Pair each element with its zero-based index. Output is a
record
{index, value}per element.
QUERY: $.xs.enumerate()
OUT: [{"index":0,"value":1},{"index":1,"value":2},{"index":2,"value":3},{"index":3,"value":4},{"index":4,"value":5}]
QUERY: $.users.map(@.name).enumerate()
OUT: [{"index":0,"value":"Ada"},{"index":1,"value":"Bob"}]
pairwise
- Signature:
Array<A> -> Array<[A, A]> - Behavior: Yield consecutive pairs
[xs[0], xs[1]],[xs[1], xs[2]], …
QUERY: [1,2,3,4].pairwise()
OUT: [[1,2],[2,3],[3,4]]
QUERY: $.xs.pairwise().map(p => p[1] - p[0])
OUT: [1, 1, 1, 1]
lag(n=1) and lead(n=1)
- Signature:
Array<Number> -> Array<Number | null> - Behavior: Shift by
npositions; out-of-range positions becomenull. - Numeric: Output values are returned as floats regardless of input numeric type.
QUERY: $.xs.lag()
OUT: [null, 1.0, 2.0, 3.0, 4.0]
QUERY: $.xs.lead()
OUT: [2.0, 3.0, 4.0, 5.0, null]
QUERY: $.xs.lag(2)
OUT: [null, null, 1.0, 2.0, 3.0]
diff_window(n=1)
- Signature:
Array<Number> -> Array<Number | null> - Behavior:
xs[i] - xs[i - n], withnulluntil lag is satisfied.
QUERY: $.prices.diff_window()
OUT: [null, 5.0, -3.0, 8.0, -2.0, 7.0]
pct_change(n=1)
- Signature:
Array<Number> -> Array<Number | null> - Behavior:
(xs[i] - xs[i-n]) / xs[i-n]— relative change.
QUERY: [100.0, 110.0, 121.0].pct_change()
OUT: [null, 0.1, 0.09999999999999998]
cummax and cummin
- Signature:
Array<Number> -> Array<Number> - Behavior: Running max / min up to and including the current position.
QUERY: $.prices.cummax()
OUT: [100.0, 105.0, 105.0, 110.0, 110.0, 115.0]
QUERY: $.prices.cummin()
OUT: [100.0, 100.0, 100.0, 100.0, 100.0, 100.0]
zscore
- Signature:
Array<Number> -> Array<Number> - Behavior: Standardise:
(x - mean) / stddev. Two passes (one for stats, one for transform); not strictly streaming, but presented as a one-to-one stage at the user surface.
QUERY: [1.0, 2.0, 3.0, 4.0, 5.0].zscore()
OUT: [-1.414213562373095, -0.7071067811865475, 0.0, 0.7071067811865475, 1.414213562373095]
accumulate
See Barriers — accumulate is a barrier because it requires
a custom reducer over the full input.
Practical examples
DOC: {"prices":[100, 105, 102, 110, 108, 115]}
# Apply tax to every price
QUERY: $.prices.map(@ * 1.08)
OUT: [108.0, 113.4, 110.16000000000001, 118.80000000000001, 116.64000000000001, 124.2]
# Day-over-day deltas
QUERY: [100,105,102,110,108].pairwise().map(p => p[1] - p[0])
OUT: [5, -3, 8, -2]
# Running max ("high-water mark")
QUERY: $.prices.cummax()
OUT: [100.0, 105.0, 105.0, 110.0, 110.0, 115.0]
# Lag-1 to compare current vs previous
QUERY: $.prices.lag()
OUT: [null, 100.0, 105.0, 102.0, 110.0, 108.0]