Tumbling-time-window¶
The tumbling-time-window creates a-per-worker tumbling time window. In this implementation a window aggregate is stored only in the machine's memory.
The aggregate is emitted configured sink via a emit point.
Based on VoltAggregateBuilder each event is transformed into a request.
If eventTimeSupplier is provided, it is used to source a time of the event.
Lastly a key can be provided so incoming event is routed to the right local window and aggregated.
Note that the window is a local, per-worker keyed aggregate, not a machine-wide keyed aggregate.
In case of downstream system error this tumbling-time-window will re-process events.
stream
.aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
.withWindowSpan(Duration.ofHours(1))
.withKeyExtractor(Event::getUserId)
.withEventTimeSupplier(Event::getTimestamp)
.withAggregateDefinition(builder -> builder
.count()
.sum("amount", Event::getAmount)
.min("amount", Event::getAmount)
.max("amount", Event::getAmount)
.distinct("id", (EventToStringExtractor<Event>) Event::getId)
)
.withExceptionHandler((records, context, throwable) -> {
// handle error
})
)
.emit(TumblingWindowTrigger.atEndOfWindow)
...
emitter:
tumbling-time-window:
windowSpan: "1h"
keyExtractor: "com.example.Event::getUserId"
eventTimeSupplier: "com.example.Event::getTimestamp"
aggregateDefinition:
from: "com.example.Event"
builder:
- type: count
- type: sum (amount)
- type: min (amount)
- type: max (amount)
- type: distinct (id)
# exceptionHandler: "com.example.MyExceptionHandler"
triggers:
- trigger: atEndOfWindow
...
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-window-api</artifactId>
<version>1.7.1</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-window-api', version: '1.7.1'
Properties¶
windowSpan¶
Defines window time span, after which the window is closed. Required.
Type: object
keyExtractor¶
The key is optional, if set the window will aggregate only events with the same key value. The key has to conform to Java equal and hashCode contract.
Type: object
aggregateDefinition¶
The aggregate is defined by the provided builder Required.
Type: object
eventTimeSupplier¶
If provided, the time will be sourced from the event. Otherwise time will be taken from local machine when the event is processed.
Type: object
latenessConfig¶
Configuration settings for the event lateness
Type: object
Fields of latenessConfig:
latenessConfig.delayWindowClose¶
This is an optional configuration.
It is only applied to time based windows that source time from event. Windows that are based on processing time and count based windows are not affected by lateness.
This configuration is used to specify the delay for closing a window when events are late. It is recommended to keep its value relatively small - for example 1-10 seconds, just to handle disruptions in event arrival, given that the upstream system is still available. This configuration should not be used to mitigate upstream systems downtime.
Once the window is closed, any late events are routed to exception handler.
Type: object
Default value: 0s
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this emitter.
Type: object
Available Triggers¶
atEndOfWindow¶
Triggered after window has been closed.
atEventAggregate¶
Triggered after each event has been aggregated. An emitted aggregate is a snapshot of the aggregate after the event has been applied.
atAggregateError¶
Triggered every time the aggregate fails with an error and an event has not been aggregated.
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.