How VoltSP Window Aggregation Works
VoltSP Streams
A stream represents a chain of operators, each feeding the next, that create an event at a source, transform or filter it, and consume it at a sink. At no stage can an event be consumed outside the operators defined in the pipeline, and once an event has been consumed by a sink, the framework starts processing the next event from the source. Such a stream cannot be part of a larger transformation graph within a single pipeline. A larger graph can only be built by running multiple pipelines and orchestrating them manually, so that they understand how the streams they define depend on each other.
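To make the one-event-at-a-time model concrete, here is a minimal plain-Java sketch of a linear stream. It is not the VoltSP API: LinearStream, then() and run() are hypothetical names. Each operator either passes a (possibly transformed) event on or filters it out. The next event is read from the source only after the current one has reached the sink or been dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of a linear stream, not the VoltSP API.
final class LinearStream<T> {
    private final Iterator<T> source;
    private final Consumer<T> sink;
    // Each operator returns the (possibly transformed) event, or empty to filter it out.
    private final List<Function<T, Optional<T>>> operators = new ArrayList<>();

    LinearStream(Iterator<T> source, Consumer<T> sink) {
        this.source = source;
        this.sink = sink;
    }

    LinearStream<T> then(Function<T, Optional<T>> operator) {
        operators.add(operator);
        return this;
    }

    // Processes one event at a time: the next event is read only after
    // the current one has been consumed by the sink or filtered out.
    void run() {
        while (source.hasNext()) {
            Optional<T> event = Optional.of(source.next());
            for (Function<T, Optional<T>> operator : operators) {
                event = event.flatMap(operator);
                if (event.isEmpty()) {
                    break;
                }
            }
            event.ifPresent(sink);
        }
    }
}

final class LinearStreamDemo {
    static List<Integer> run() {
        List<Integer> consumed = new ArrayList<>();
        new LinearStream<>(List.of(1, 2, 3, 4).iterator(), consumed::add)
                .then(x -> Optional.of(x * 10))                                  // transform
                .then(x -> x > 15 ? Optional.of(x) : Optional.<Integer>empty())  // filter
                .run();
        return consumed;
    }
}
```

Running LinearStreamDemo.run() yields [20, 30, 40]: the first event becomes 10 and is filtered out before it reaches the sink.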
Data streams
Since version 1.4, VoltSP supports data streams. A data stream is expected to emit events at a defined trigger. It does not define when events are emitted or how many are emitted. Instead, it defines a trigger, or a group of triggers, associated with the emission of new events. A data stream can accept events of any type I and emit events of either the same type or a different type R, as reflected in the type parameters of VoltDataEmitter<I, R, T>:
import org.voltdb.stream.api.ExecutionContext;
import org.voltdb.stream.api.extension.VoltDataTrigger;
import org.voltdb.stream.api.extension.VoltStreamSourceConfigurator;
import org.voltdb.stream.api.pipeline.VoltStreamSink;

public interface VoltDataEmitter<I, R, T extends VoltDataTrigger> extends VoltStreamSink<I> {

    /**
     * For a given trigger, the emitter configures a source, which must later be connected to a sink,
     * creating a new child stream.
     * This method is guaranteed to be called after this emitter is fully configured - see {@link #configure(ExecutionContext)}.
     * @param trigger the trigger point
     * @return the source for the trigger point
     */
    VoltStreamSourceConfigurator<R> configureSourceFor(T trigger);

    /**
     * Called once all child streams created through {@link #configureSourceFor} have been configured.
     * All operators in the child streams are guaranteed to be configured after this emitter has been configured - see {@link #configure(ExecutionContext)}.
     * This callback lets the emitter finalize its own configuration.
     * @param context the execution context
     */
    default void configuredAllChildStreams(ExecutionContext context) {}
}
See also org.voltdb.stream.api.extension.VoltDataEmitterConfigurator.
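The Javadoc of VoltDataEmitter fixes a call order. First the emitter is configured. Then a source is configured for each trigger, and each of those sources becomes a child stream. Only after every child stream is configured is configuredAllChildStreams called. The sketch below models that ordering with hypothetical stand-in types (SketchEmitter, EmitterLifecycle) and a string log in place of the real VoltSP interfaces and ExecutionContext.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for VoltDataEmitter; records lifecycle calls in a log.
interface SketchEmitter<T> {
    void configure(List<String> log);

    void configureSourceFor(T trigger, List<String> log);

    default void configuredAllChildStreams(List<String> log) {
        log.add("emitter finalized");
    }
}

// Hypothetical driver that honours the ordering described in the Javadoc.
final class EmitterLifecycle {
    static <T> List<String> wire(SketchEmitter<T> emitter, List<T> triggers) {
        List<String> log = new ArrayList<>();
        emitter.configure(log);                                   // 1. emitter fully configured
        for (T trigger : triggers) {
            emitter.configureSourceFor(trigger, log);             // 2. one source per trigger
            log.add("child stream configured for " + trigger);    // 3. child operators configured
        }
        emitter.configuredAllChildStreams(log);                   // 4. all child streams done
        return log;
    }

    static List<String> demo() {
        SketchEmitter<String> emitter = new SketchEmitter<String>() {
            @Override
            public void configure(List<String> log) {
                log.add("emitter configured");
            }

            @Override
            public void configureSourceFor(String trigger, List<String> log) {
                log.add("source for " + trigger);
            }
        };
        return wire(emitter, List.of("atEndOfWindow"));
    }
}
```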
Data streams are well suited to implementing caching, buffering, deduplication, calls to external systems and similar mechanisms, because a data stream can be referenced in a pipeline. At a defined trigger, a data stream becomes the source of a child stream. Such a child stream must be terminated either by a sink or by another emitter, which creates yet another child stream.
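Buffering fits this model naturally. The following hypothetical sketch (BatchingEmitter is not a VoltSP class) acts as the sink of a parent stream and accepts events of type I. At its trigger, here every batchSize events, it emits a value of a different type, R = List<I>, into a child stream, which is modeled as a plain Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffering emitter: consumes I, emits List<I> to a child stream at its trigger.
final class BatchingEmitter<I> implements Consumer<I> {
    private final int batchSize;
    private final Consumer<List<I>> childStream;
    private final List<I> buffer = new ArrayList<>();

    BatchingEmitter(int batchSize, Consumer<List<I>> childStream) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.childStream = childStream;
    }

    // Acts as the parent stream's sink.
    @Override
    public void accept(I event) {
        buffer.add(event);
        if (buffer.size() == batchSize) {                // the trigger point
            childStream.accept(List.copyOf(buffer));     // the child stream receives one batch
            buffer.clear();
        }
    }
}

final class BatchingEmitterDemo {
    static List<List<String>> run() {
        List<List<String>> batches = new ArrayList<>();
        BatchingEmitter<String> emitter = new BatchingEmitter<>(2, batches::add);
        for (String event : List.of("a", "b", "c", "d", "e")) {
            emitter.accept(event);
        }
        // "e" stays buffered until the next trigger
        return batches;
    }
}
```

BatchingEmitterDemo.run() returns [[a, b], [c, d]]. A deduplicating or caching emitter would follow the same shape and differ only in what it keeps between triggers.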
Window aggregation
This is an experimental feature, available for preview only.
A window aggregation is a special emitter that accepts events of any type I and always emits one or more org.voltdb.stream.api.pipeline.window.VoltAggregate instances, depending on the trigger and window type used.
An aggregation must be defined for a window when the pipeline is declared.
A custom aggregation is defined with org.voltdb.stream.api.pipeline.window.VoltAggregateBuilder:
package org.voltdb.stream.api.pipeline.window;

import java.util.function.Function;

public interface VoltAggregateBuilder<I> {
    VoltAggregateBuilder<I> count();

    VoltAggregateBuilder<I> sum(String name, Function<I, Long> extractor);
    VoltAggregateBuilder<I> min(String name, Function<I, Long> extractor);
    VoltAggregateBuilder<I> max(String name, Function<I, Long> extractor);

    VoltAggregateBuilder<I> distinct(String name, EventToLongExtractor<I> extractor);
    VoltAggregateBuilder<I> distinct(String name, EventToIntegerExtractor<I> extractor);
    VoltAggregateBuilder<I> distinct(String name, EventToStringExtractor<I> extractor);

    VoltAggregateBuilder<I> last(String name, EventToLongExtractor<I> extractor);
    VoltAggregateBuilder<I> last(String name, EventToIntegerExtractor<I> extractor);
    VoltAggregateBuilder<I> last(String name, EventToStringExtractor<I> extractor);

    VoltAggregateBuilder<I> first(String name, EventToLongExtractor<I> extractor);
    VoltAggregateBuilder<I> first(String name, EventToIntegerExtractor<I> extractor);
    VoltAggregateBuilder<I> first(String name, EventToStringExtractor<I> extractor);
}
An aggregate is represented as org.voltdb.stream.api.pipeline.window.VoltAggregate:
package org.voltdb.stream.api.pipeline.window;

import java.time.Instant;
import java.util.Set;

public interface VoltAggregate {
    <T> T getKey(Class<T> clazz);
    <T> T getAggId(Class<T> clazz);
    Instant emitTimestamp();
    int count();
    long min(String name);
    long max(String name);
    long sum(String name);
    <T> T first(String name, Class<T> clazz);
    <T> T last(String name, Class<T> clazz);
    <T> Set<T> distinct(String name, Class<T> clazz);
}
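The following simplified, hypothetical accumulator (SimpleAggregate is not part of VoltSP) illustrates the builder and aggregate methods by defining aggregations by name and then reading them back. It assumes that min and max keep the smallest and largest extracted value, distinct keeps the set of extracted values, and last keeps the value from the most recently aggregated event. sum and first would follow the same pattern and are omitted for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical accumulator mirroring a subset of VoltAggregateBuilder / VoltAggregate.
final class SimpleAggregate<I> {
    // Definitions registered by name (builder side).
    private final Map<String, Function<I, Long>> minFns = new LinkedHashMap<>();
    private final Map<String, Function<I, Long>> maxFns = new LinkedHashMap<>();
    private final Map<String, Function<I, ?>> distinctFns = new LinkedHashMap<>();
    private final Map<String, Function<I, ?>> lastFns = new LinkedHashMap<>();

    // Accumulated state (aggregate side).
    private int count;
    private final Map<String, Long> mins = new HashMap<>();
    private final Map<String, Long> maxs = new HashMap<>();
    private final Map<String, Set<Object>> distincts = new HashMap<>();
    private final Map<String, Object> lasts = new HashMap<>();

    SimpleAggregate<I> min(String name, Function<I, Long> extractor) { minFns.put(name, extractor); return this; }
    SimpleAggregate<I> max(String name, Function<I, Long> extractor) { maxFns.put(name, extractor); return this; }
    SimpleAggregate<I> distinct(String name, Function<I, ?> extractor) { distinctFns.put(name, extractor); return this; }
    SimpleAggregate<I> last(String name, Function<I, ?> extractor) { lastFns.put(name, extractor); return this; }

    // Applies one event to every registered aggregation.
    void add(I event) {
        count++;
        minFns.forEach((name, f) -> mins.merge(name, f.apply(event), Long::min));
        maxFns.forEach((name, f) -> maxs.merge(name, f.apply(event), Long::max));
        distinctFns.forEach((name, f) -> distincts.computeIfAbsent(name, k -> new HashSet<>()).add(f.apply(event)));
        lastFns.forEach((name, f) -> lasts.put(name, f.apply(event)));
    }

    int count() { return count; }
    long min(String name) { return mins.get(name); }
    long max(String name) { return maxs.get(name); }

    <T> Set<T> distinct(String name, Class<T> clazz) {
        Set<T> result = new HashSet<>();
        for (Object value : distincts.getOrDefault(name, Set.of())) {
            result.add(clazz.cast(value));
        }
        return result;
    }

    <T> T last(String name, Class<T> clazz) { return clazz.cast(lasts.get(name)); }
}

final class SimpleAggregateDemo {
    record Call(String caller, long orderValue) {}

    static SimpleAggregate<Call> run() {
        SimpleAggregate<Call> agg = new SimpleAggregate<Call>()
                .min("order", Call::orderValue)
                .max("order", Call::orderValue)
                .distinct("caller", Call::caller)
                .last("caller", Call::caller);
        for (Call call : List.of(new Call("a", 5), new Call("b", 2), new Call("a", 9))) {
            agg.add(call);
        }
        return agg;
    }
}
```

After the three demo events, the sketch reports a count of 3, a minimum of 2, a maximum of 9, the distinct callers {a, b} and the last caller "a".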
Time Tumbling Window
Local Time Tumbling Window
A tumbling time window defines fixed time buckets within which events are aggregated. No two windows can be open at the same time. The first window always begins at 00:00:00 of the day on which the pipeline is started. A window configuration accepts a duration (the window span), an event time supplier and an exception handler. If no event time supplier is provided, the framework uses the current processing time, or an approximation of it.
This local window is not persisted. It operates locally and consumes every event that the source reads within a single worker thread. The window is not partitioned, which is why the emitted aggregate has no key.
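The alignment rule reduces to simple arithmetic. Let a be 00:00:00 of the start day and s the window span. An event at time t then falls into the window that starts at a + floor((t - a) / s) * s. The sketch below is minimal and assumes UTC for "00:00:00", because the time zone is not stated here. TumblingTimeWindows.windowStart is a hypothetical helper, not a VoltSP method.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: which tumbling window does an event belong to?
final class TumblingTimeWindows {
    // Assumes windows are aligned to midnight UTC of the day the pipeline started.
    static Instant windowStart(Instant eventTime, LocalDate startDay, Duration span) {
        Instant anchor = startDay.atStartOfDay(ZoneOffset.UTC).toInstant();
        long spanMillis = span.toMillis();
        long offsetMillis = Duration.between(anchor, eventTime).toMillis();
        // floorDiv keeps the arithmetic correct for events before the anchor, too.
        long index = Math.floorDiv(offsetMillis, spanMillis);
        return anchor.plusMillis(index * spanMillis);
    }
}
```

For an event at 10:42 UTC on the start day, a one-hour span places it in the window starting at 10:00, and a 45-minute span places it in the window starting at 10:30 (14 × 45 minutes after midnight).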
Trigger points
The time tumbling window defines three trigger points, listed in org.voltdb.stream.plugin.window.api.tumbling.TumblingWindowTrigger:
package org.voltdb.stream.plugin.window.api.tumbling;

import org.voltdb.stream.api.extension.VoltDataTrigger;

public enum TumblingWindowTrigger implements VoltDataTrigger {
    /**
     * Triggered after the window has been closed.
     */
    atEndOfWindow,
    /**
     * Triggered after each event has been aggregated.
     * The emitted aggregate is a snapshot of the aggregate after the event has been applied.
     */
    atEventAggregate,
    /**
     * Triggered every time the aggregation fails with an error and an event has not been aggregated.
     */
    atAggregateError
}
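A toy model makes the difference between the three triggers easy to see. In this hypothetical sketch, WindowTrigger only mirrors the constant names and ToyWindow is not VoltSP code. The aggregate is a running sum, negative values are assumed to make aggregation fail, and the window is closed by an explicit call.

```java
import java.util.ArrayList;
import java.util.List;

// Local mirror of the TumblingWindowTrigger constant names.
enum WindowTrigger { atEndOfWindow, atEventAggregate, atAggregateError }

// Toy window whose aggregate is a running sum; records every trigger that fires.
final class ToyWindow {
    record Fired(WindowTrigger trigger, long sum) {}

    private final List<Fired> fired = new ArrayList<>();
    private long sum;

    void onEvent(long value) {
        if (value < 0) {
            // Assumed failure case: the event is rejected and not aggregated.
            fired.add(new Fired(WindowTrigger.atAggregateError, sum));
            return;
        }
        sum += value;
        // Snapshot of the aggregate after this event has been applied.
        fired.add(new Fired(WindowTrigger.atEventAggregate, sum));
    }

    void closeWindow() {
        fired.add(new Fired(WindowTrigger.atEndOfWindow, sum));
        sum = 0; // the next window starts empty
    }

    List<Fired> fired() {
        return fired;
    }
}
```

Feeding 1, 2 and -1 and then closing the window fires atEventAggregate twice (snapshots 1 and 3), then atAggregateError (sum unchanged at 3), then atEndOfWindow with the final sum 3.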
Example
A time tumbling window aggregation can be defined as follows:
public record Order(long orderId, long value) {}

public record Event(
        String phoneNumber,
        String calledNumber,
        Instant eventTime,
        Duration duration,
        Order order) {}

public record Report(int reportId,
                     Set<String> callerNumbers,
                     Set<String> calledNumbers,
                     int count,
                     long minOrderValue,
                     long maxOrderValue) {}

public static class WindowTimeProcessorPipeline implements VoltPipeline {

    // Events that could not be aggregated, and the errors that caused it
    // (filled by the exception handler below).
    private final List<Event> failedEvents = Collections.synchronizedList(new ArrayList<>());
    private final List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void define(VoltStreamBuilder stream) {
        var windowSpan = Duration.ofHours(1);
        var aggregator = stream
                .withName("Aggregator")
                // EventSupplier (not shown here) generates Event records
                .consumeFromSource(Sources.generate(new EventSupplier(), 10, 20))
                .aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
                        .withWindowSpan(windowSpan)
                        .withExceptionHandler((records, context, throwable) -> {
                            failedEvents.addAll((Collection<Event>) records);
                            errors.add(throwable);
                        })
                        // window boundaries follow the event time, not the processing time
                        .withEventTimeSupplier(Event::eventTime)
                        .withAggregateDefinition(builder -> {
                            // the casts select the distinct/last overload for the extractor type
                            builder.count()
                                    .min("order", event -> event.order().value())
                                    .max("order", event -> event.order().value())
                                    .distinct("caller", (EventToStringExtractor<Event>) Event::phoneNumber)
                                    .distinct("callee", (EventToStringExtractor<Event>) Event::calledNumber)
                                    // constant value handed through to the aggregate consumer
                                    .last("reportId", (EventToIntegerExtractor<Event>) event -> 234);
                        })
                );

        aggregator
                .emit("at-the-end", TumblingWindowTrigger.atEndOfWindow)
                .processWith(aggregate -> new Report(aggregate.last("reportId", Integer.class),
                        aggregate.distinct("caller", String.class),
                        aggregate.distinct("callee", String.class),
                        aggregate.count(),
                        aggregate.min("order"),
                        aggregate.max("order"))
                )
                // ReportConsumer (not shown here) is the sink of the child stream
                .terminateWithSink(new ReportConsumer());
    }
}
Or, in YAML (this variant reads plain strings rather than Event records):
version: 1
name: WindowTimeProcessorPipeline
source:
  collection:
    elements:
      - "Lorem ipsum dolor sit amet"
      - "consectetur adipiscing elit"
      - "ed do eiusmod tempor incididunt ut labore et dolore magna aliqua"
processors:
  - javascript:
      script: |
        function process(line) {
          // returns a java.lang.String
          return line;
        }
emitter:
  tumbling-time-window:
    windowSpan: "1h"
    aggregateDefinition:
      from: "java.lang.String"
      builder:
        - type: count
        - type: min (length)
        - type: max (length)
        - type: distinct (toString)
    triggers:
      - trigger: atEndOfWindow
sink:
  stdout: {}
Because the event time is taken from each event (withEventTimeSupplier(Event::eventTime)), a window remains open and keeps accepting events until the system observes an event whose time falls beyond the window's time frame.
In this example, a window therefore closes and emits an aggregate for every hour of event time. For each window it calculates:
1. the minimum order value observed,
2. the maximum order value observed,
3. the distinct set of caller numbers,
4. the distinct set of called numbers,
5. a constant (reportId, always 234) set in the aggregate definition and read back by the aggregate consumer,
6. the number of aggregated events.
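The closing rule can be sketched in plain Java. This is a hypothetical model (EventTimeWindow is not a VoltSP class). It computes a count, minimum and maximum per window and emits a window's result only when an event that belongs to a later window arrives. How VoltSP treats late events is not described here, so the sketch simply folds them into the open window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical event-time tumbling window; closes when an event of a later window arrives.
final class EventTimeWindow {
    record Result(Instant windowStart, int count, long min, long max) {}

    private final Instant anchor;
    private final long spanMillis;
    private final List<Result> emitted = new ArrayList<>();
    private Instant currentStart; // null while no window is open
    private int count;
    private long min;
    private long max;

    EventTimeWindow(Instant anchor, Duration span) {
        this.anchor = anchor;
        this.spanMillis = span.toMillis();
    }

    void accept(Instant eventTime, long value) {
        long offset = Duration.between(anchor, eventTime).toMillis();
        Instant start = anchor.plusMillis(Math.floorDiv(offset, spanMillis) * spanMillis);
        if (currentStart != null && start.isAfter(currentStart)) {
            // An event beyond the open window's time frame closes it.
            emitted.add(new Result(currentStart, count, min, max));
            currentStart = null;
        }
        if (currentStart == null) {
            currentStart = start;
            count = 0;
            min = Long.MAX_VALUE;
            max = Long.MIN_VALUE;
        }
        // Assumption: late events (start before currentStart) are folded into the open window.
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    List<Result> emitted() {
        return emitted;
    }
}
```

With one-hour windows, events at 10:05 and 10:40 keep the 10:00 window open. The event at 11:01 closes it, emitting a count of 2, a minimum of 2 and a maximum of 5.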
Count Tumbling Window
Local Count Tumbling Window
A count tumbling window is a sibling of the time tumbling window, but instead of closing after a time span, it emits an aggregate after observing N events.
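For comparison, a hypothetical count-based variant closes its window on the N-th event rather than on event time. CountWindow is not a VoltSP class. In this sketch, events after the last full window stay pending until enough further events arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based tumbling window computing count, min and max.
final class CountWindow {
    record Result(int count, long min, long max) {}

    private final int maxEventsPerWindow;
    private final List<Result> emitted = new ArrayList<>();
    private int count;
    private long min;
    private long max;

    CountWindow(int maxEventsPerWindow) {
        if (maxEventsPerWindow <= 0) {
            throw new IllegalArgumentException("maxEventsPerWindow must be positive");
        }
        this.maxEventsPerWindow = maxEventsPerWindow;
    }

    void accept(long value) {
        if (count == 0) { // first event of a new window
            min = Long.MAX_VALUE;
            max = Long.MIN_VALUE;
        }
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
        if (count == maxEventsPerWindow) {
            emitted.add(new Result(count, min, max)); // end of window reached
            count = 0;
        }
    }

    List<Result> emitted() {
        return emitted;
    }
}
```

With a window of 3 events and the values 4, 1, 9, 2, 8, one window is emitted (count 3, minimum 1, maximum 9), and the values 2 and 8 remain pending.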
Example
A count tumbling window aggregation can be defined as follows:
public record Order(long orderId, long value) {}

public record Event(
        String phoneNumber,
        String calledNumber,
        Instant eventTime,
        Duration duration,
        Order order) {}

public record Report(int reportId,
                     Set<String> callerNumbers,
                     Set<String> calledNumbers,
                     int count,
                     long minOrderValue,
                     long maxOrderValue) {}

public static class WindowCountProcessorPipeline implements VoltPipeline {

    // Events that could not be aggregated, and the errors that caused it
    // (filled by the exception handler below).
    private final List<Event> failedEvents = Collections.synchronizedList(new ArrayList<>());
    private final List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void define(VoltStreamBuilder stream) {
        var windowCount = 500;
        var aggregator = stream
                .withName("Aggregator")
                // EventSupplier (not shown here) generates Event records
                .consumeFromSource(Sources.generate(new EventSupplier(), 10, 20))
                .aggregateWithWindow(Windows.<Event>tumblingCountWindow()
                        // the window closes after windowCount events
                        .withMaxEventsPerWindow(windowCount)
                        .withExceptionHandler((records, context, throwable) -> {
                            failedEvents.addAll((Collection<Event>) records);
                            errors.add(throwable);
                        })
                        .withAggregateDefinition(builder -> {
                            // the casts select the distinct/last overload for the extractor type
                            builder.count()
                                    .min("order", event -> event.order().value())
                                    .max("order", event -> event.order().value())
                                    .distinct("caller", (EventToStringExtractor<Event>) Event::phoneNumber)
                                    .distinct("callee", (EventToStringExtractor<Event>) Event::calledNumber)
                                    // constant value handed through to the aggregate consumer
                                    .last("reportId", (EventToIntegerExtractor<Event>) event -> 234);
                        })
                );

        aggregator
                .emit("at-the-end", TumblingWindowTrigger.atEndOfWindow)
                .processWith(aggregate -> new Report(aggregate.last("reportId", Integer.class),
                        aggregate.distinct("caller", String.class),
                        aggregate.distinct("callee", String.class),
                        aggregate.count(),
                        aggregate.min("order"),
                        aggregate.max("order"))
                )
                // ReportConsumer (not shown here) is the sink of the child stream
                .terminateWithSink(new ReportConsumer());
    }
}
Or, in YAML (this variant reads plain strings rather than Event records):
version: 1
name: WindowCountProcessorPipeline
source:
  collection:
    elements:
      - "Lorem ipsum dolor sit amet"
      - "consectetur adipiscing elit"
      - "ed do eiusmod tempor incididunt ut labore et dolore magna aliqua"
processors:
  - javascript:
      script: |
        function process(line) {
          // returns a java.lang.String
          return line;
        }
emitter:
  tumbling-count-window:
    maxEventsPerWindow: 500
    aggregateDefinition:
      from: "java.lang.String"
      builder:
        - type: count
        - type: min (length)
        - type: max (length)
        - type: distinct (toString)
    triggers:
      - trigger: atEndOfWindow
sink:
  stdout: {}
In this example, a window closes and emits an aggregate after every 500 observed events. For each window it calculates:
1. the minimum order value observed,
2. the maximum order value observed,
3. the distinct set of caller numbers,
4. the distinct set of called numbers,
5. a constant (reportId, always 234) set in the aggregate definition and read back by the aggregate consumer,
6. the number of aggregated events.