
Tumbling-time-window

The tumbling-time-window emitter creates a per-worker tumbling time window. In this implementation, the window aggregate is held only in the local machine's memory. The aggregate is emitted to the configured sink via an emit point.

Based on the VoltAggregateBuilder definition, each event is transformed into an aggregation request. If an eventTimeSupplier is provided, it is used to obtain the time of the event. Finally, an optional key can be provided so that each incoming event is routed to the matching local window and aggregated there.

Note that the window is a local, per-worker keyed aggregate, not a machine-wide keyed aggregate.
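To make the per-worker scope concrete, here is a minimal conceptual sketch in plain Java. It is not the plugin's implementation. `LocalWorker` and `PerWorkerDemo` are hypothetical names, and the sketch only assumes that each worker keeps its own in-memory map from key to aggregate, as described above. If events for the same key are processed by two workers, each worker ends up with a separate partial aggregate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical worker: keeps its own local map of key -> running count.
// Nothing is shared between workers, so aggregates are per-worker only.
class LocalWorker {
    private final Map<String, Long> countsByKey = new HashMap<>();

    // Route the event to the local aggregate for its key and update it.
    void accept(String key) {
        countsByKey.merge(key, 1L, Long::sum);
    }

    long count(String key) {
        return countsByKey.getOrDefault(key, 0L);
    }
}

class PerWorkerDemo {
    public static void main(String[] args) {
        LocalWorker workerA = new LocalWorker();
        LocalWorker workerB = new LocalWorker();

        // Events for user "u1" happen to be processed by two different workers.
        workerA.accept("u1");
        workerA.accept("u1");
        workerB.accept("u1");

        // Each worker sees only its own share; no component holds a total of 3.
        System.out.println("worker A, u1: " + workerA.count("u1")); // worker A, u1: 2
        System.out.println("worker B, u1: " + workerB.count("u1")); // worker B, u1: 1
    }
}
```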

If a downstream system fails, the tumbling-time-window re-processes events.

stream
  .aggregateWithWindow(Windows.<Event>tumblingTimeWindow()
      .withWindowSpan(Duration.ofHours(1))
      .withKeyExtractor(Event::getUserId)
      .withEventTimeSupplier(Event::getTimestamp)
      .withAggregateDefinition(builder -> builder
          .count()
          .sum("amount", Event::getAmount)
          .min("amount", Event::getAmount)
          .max("amount", Event::getAmount)
          .distinct("id", (EventToStringExtractor<Event>) Event::getId)
      )
      .withExceptionHandler((records, context, throwable) -> {
          // handle error
      })
  )
  .emit(TumblingWindowTrigger.atEndOfWindow)
  ...

The same configuration in YAML:

emitter:
  tumbling-time-window:
    windowSpan: "1h"
    keyExtractor: "com.example.Event::getUserId"
    eventTimeSupplier: "com.example.Event::getTimestamp"
    aggregateDefinition:
      from: "com.example.Event"
      builder:
        - type: count
        - type: sum (amount)
        - type: min (amount)
        - type: max (amount)
        - type: distinct (id)
    # exceptionHandler: "com.example.MyExceptionHandler"
  triggers:
    - trigger: atEndOfWindow
      ...

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-window-api</artifactId>
    <version>1.7.1</version>
</dependency>

Or, with Gradle:

implementation group: 'org.voltdb', name: 'volt-stream-plugin-window-api', version: '1.7.1'

Properties

windowSpan

Defines the time span of the window. The window is closed once this span has elapsed. Required.

Type: object
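A tumbling window with a fixed span divides time into consecutive, non-overlapping intervals. The sketch below computes the `[start, end)` interval that a timestamp falls into. It assumes that windows are aligned to the Unix epoch, which this page does not state. `TumblingWindowBounds` is a hypothetical helper and is not part of the plugin.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: assigns a timestamp to its tumbling window [start, end).
// Assumes epoch-aligned windows; the plugin's actual alignment is not documented here.
class TumblingWindowBounds {
    static Instant windowStart(Instant eventTime, Duration span) {
        long spanMillis = span.toMillis();
        long t = eventTime.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(t - Math.floorMod(t, spanMillis));
    }

    static Instant windowEnd(Instant eventTime, Duration span) {
        return windowStart(eventTime, span).plus(span);
    }

    public static void main(String[] args) {
        Duration span = Duration.ofHours(1); // same span as withWindowSpan(Duration.ofHours(1))
        Instant event = Instant.parse("2024-05-01T10:17:05Z");
        System.out.println(windowStart(event, span)); // 2024-05-01T10:00:00Z
        System.out.println(windowEnd(event, span));   // 2024-05-01T11:00:00Z
    }
}
```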

keyExtractor

Optional. If set, each window aggregates only events that share the same key value. The key must honor the Java equals and hashCode contract.

Type: object
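Any type with value-based `equals` and `hashCode` can act as a grouping key in this sense. For example, a Java record (Java 16+) derives both methods from its components, so it can serve as a composite key. `UserRegionKey` is hypothetical. Whether you would return it from a lambda such as `e -> new UserRegionKey(...)` passed to `withKeyExtractor` depends on your own `Event` type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical composite key. A record's equals() and hashCode() are derived
// from its components, so equal field values mean equal keys.
record UserRegionKey(String userId, String region) {}

class CompositeKeyDemo {
    public static void main(String[] args) {
        Map<UserRegionKey, Long> counts = new HashMap<>();
        counts.merge(new UserRegionKey("u1", "eu"), 1L, Long::sum);
        counts.merge(new UserRegionKey("u1", "eu"), 1L, Long::sum); // equal key: same entry
        counts.merge(new UserRegionKey("u1", "us"), 1L, Long::sum); // different key: new entry
        System.out.println(counts.size()); // 2
    }
}
```

A key class that keeps the default identity-based `equals` treats every instance as distinct. Under such a key, each event would likely end up in its own group.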

aggregateDefinition

Defines the aggregate through the provided builder. Required.

Type: object
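To build intuition for what the builder in the example computes, here is a plain-Java equivalent of its `count`, `sum`, `min`, `max` and `distinct` operations over the events of a single window. It uses `LongSummaryStatistics` and is not the plugin's API. `SampleEvent` is a hypothetical record (Java 16+). `distinct` is modeled as the set of unique ids because this page does not say exactly what the plugin's `distinct` aggregate emits.

```java
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical event type for this illustration only.
record SampleEvent(String id, String userId, long amount) {}

class AggregateEquivalents {
    // count, sum, min and max of "amount", computed in a single pass
    static LongSummaryStatistics amountStats(List<SampleEvent> window) {
        return window.stream().mapToLong(SampleEvent::amount).summaryStatistics();
    }

    // distinct over "id", modeled here as the set of unique values
    static Set<String> distinctIds(List<SampleEvent> window) {
        return window.stream().map(SampleEvent::id).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<SampleEvent> window = List.of(
            new SampleEvent("e1", "u1", 10),
            new SampleEvent("e2", "u1", 25),
            new SampleEvent("e2", "u1", 5)); // repeated id

        LongSummaryStatistics stats = amountStats(window);
        System.out.println(stats.getCount());           // 3
        System.out.println(stats.getSum());             // 40
        System.out.println(stats.getMin());             // 5
        System.out.println(stats.getMax());             // 25
        System.out.println(distinctIds(window).size()); // 2
    }
}
```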

eventTimeSupplier

Optional. If provided, the time is sourced from the event itself. Otherwise, the local machine's time at the moment the event is processed is used.

Type: object
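The following sketch shows this selection rule. `TimeSource` and `TimedEvent` are hypothetical (the record needs Java 16+). The sketch assumes the event timestamp is a `java.time.Instant` and takes processing time from an injectable `Clock`, so `Clock.fixed` makes the fallback easy to test.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical event type; assumes the timestamp is an Instant.
record TimedEvent(String id, Instant timestamp) {}

// Sketch of the documented rule, not the plugin's code.
class TimeSource<E> {
    private final Optional<Function<E, Instant>> eventTimeSupplier;
    private final Clock clock;

    TimeSource(Optional<Function<E, Instant>> eventTimeSupplier, Clock clock) {
        this.eventTimeSupplier = eventTimeSupplier;
        this.clock = clock;
    }

    // Event time if a supplier is configured, otherwise processing time from the clock.
    Instant timeOf(E event) {
        return eventTimeSupplier.map(f -> f.apply(event)).orElseGet(clock::instant);
    }

    public static void main(String[] args) {
        Clock processingClock = Clock.fixed(Instant.parse("2024-05-01T12:00:00Z"), ZoneOffset.UTC);
        TimedEvent event = new TimedEvent("e1", Instant.parse("2024-05-01T10:17:05Z"));

        Function<TimedEvent, Instant> supplier = TimedEvent::timestamp;
        TimeSource<TimedEvent> byEventTime = new TimeSource<>(Optional.of(supplier), processingClock);
        TimeSource<TimedEvent> byProcessingTime = new TimeSource<>(Optional.empty(), processingClock);

        System.out.println(byEventTime.timeOf(event));      // 2024-05-01T10:17:05Z
        System.out.println(byProcessingTime.timeOf(event)); // 2024-05-01T12:00:00Z
    }
}
```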

latenessConfig

Configuration settings for handling late events.

Type: object

Fields of latenessConfig:

latenessConfig.delayWindowClose

This is an optional configuration.

It applies only to time-based windows that source time from the event. Windows based on processing time, and count-based windows, are not affected by lateness.

This setting specifies how long to delay closing a window so that late events can still be aggregated. Keep the value relatively small, for example 1 to 10 seconds. It is meant only to absorb short disruptions in event arrival while the upstream system is still available. Do not use it to compensate for upstream system downtime.

Once the window is closed, any late events are routed to the exception handler.

Type: object

Default value: 0s
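One way to picture `delayWindowClose` is the hypothetical check below. This page does not specify what decides that a window is closed. The sketch therefore assumes it is the highest event time observed so far, and that a window `[start, end)` closes once that time reaches `end + delayWindowClose`. Epoch-aligned windows are also an assumption, and `LatenessCheck` is not part of the plugin. With a 1-hour span and a 5-second delay, an event stamped 10:59:59 is still accepted after an 11:00:03 event has been seen. It is rejected once an 11:00:06 event has moved time past 11:00:05.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of delayWindowClose. Assumes a window [start, end) closes once the
// highest event time observed so far reaches end + delayWindowClose.
class LatenessCheck {
    private final long spanMillis;
    private final Duration delayWindowClose;
    private Instant maxEventTimeSeen = Instant.MIN;

    LatenessCheck(Duration span, Duration delayWindowClose) {
        this.spanMillis = span.toMillis();
        this.delayWindowClose = delayWindowClose;
    }

    // Returns true if the event can still be aggregated. Returns false if its window has
    // already closed; per the docs, such an event goes to the exception handler.
    boolean accept(Instant eventTime) {
        long t = eventTime.toEpochMilli();
        Instant windowEnd = Instant.ofEpochMilli(t - Math.floorMod(t, spanMillis) + spanMillis);
        boolean windowClosed = !maxEventTimeSeen.isBefore(windowEnd.plus(delayWindowClose));
        if (eventTime.isAfter(maxEventTimeSeen)) {
            maxEventTimeSeen = eventTime; // advance the observed event time
        }
        return !windowClosed;
    }

    public static void main(String[] args) {
        LatenessCheck check = new LatenessCheck(Duration.ofHours(1), Duration.ofSeconds(5));
        System.out.println(check.accept(Instant.parse("2024-05-01T10:59:58Z"))); // true
        System.out.println(check.accept(Instant.parse("2024-05-01T11:00:03Z"))); // true
        System.out.println(check.accept(Instant.parse("2024-05-01T10:59:59Z"))); // true: within the delay
        System.out.println(check.accept(Instant.parse("2024-05-01T11:00:06Z"))); // true
        System.out.println(check.accept(Instant.parse("2024-05-01T10:59:59Z"))); // false: closed at 11:00:05
    }
}
```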

exceptionHandler

Custom exception handler that intercepts all errors related to this emitter.

Type: object

Available Triggers

atEndOfWindow

Triggered after the window has been closed.

atEventAggregate

Triggered after each event has been aggregated. An emitted aggregate is a snapshot of the aggregate after the event has been applied.

atAggregateError

Triggered every time aggregation fails with an error and the event has not been aggregated.
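The difference between `atEndOfWindow` and `atEventAggregate` can be shown with a simple count aggregate over a window that receives three events. This is only a conceptual model. `TriggerDemo` is hypothetical and does not use the plugin's trigger API.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model (not the plugin API) of the values each trigger would emit
// for a count aggregate over a single window.
class TriggerDemo {
    // atEventAggregate: one snapshot after each event is applied.
    static List<Long> atEventAggregate(int events) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        for (int i = 0; i < events; i++) {
            count++;
            emitted.add(count);
        }
        return emitted;
    }

    // atEndOfWindow: the final aggregate, emitted once when the window closes.
    static List<Long> atEndOfWindow(int events) {
        return List.of((long) events);
    }

    public static void main(String[] args) {
        System.out.println(atEventAggregate(3)); // [1, 2, 3]
        System.out.println(atEndOfWindow(3));    // [3]
    }
}
```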

JSON Schema

You can validate or explore the configuration using its JSON Schema.