Beats¶
The beats
source receives data from Elastic Beats agents.
BeatsSourceConfigBuilder.<BeatsMessage>builder()
.withAddress("0.0.0.0", 123)
.withClientInactivityTimeout(Duration.ofSeconds(42))
.withExceptionHandler(exceptionHandler)
.withDecoder(identity)
source:
beats:
address: "0.0.0.0:514"
client_inactivity_timeout: "PT1S"
Properties¶
address
¶
Sets the listening address in host:port
format.
Required.
Type: object
Fields of address
:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
clientInactivityTimeout
¶
Time after which inactive connections will be closed.
Type: object
Default value: 30s
decoder
¶
Decoder to be applied to Beats data payloads received. Examples are decoders that convert incoming data to string or to byte arrays.
Required.
Type: object
exceptionHandler
¶
Custom exception handler enabling interception of all errors related to this source.
Type: object
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-beats-api</artifactId>
<version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-beats-api', version: '1.4.0'
Usage Examples¶
// Print beats messages to stdout but only if "some.key" is present in message metadata.
stream
.consumeFromSource(BeatsSourceConfigBuilder.<BeatsMessage>builder()
.withAddress("0.0.0.0", 5044)
.withClientInactivityTimeout(100, TimeUnit.SECONDS)
.withDecoder(Function.identity())
)
.processWith((VoltStreamFunction<BeatsMessage, String>) (beatsMessage, consumer, context) -> {
if (beatsMessage.getMap().containsKey("some.key")) {
consumer.consume(beatsMessage.getMessage());
}
})
.terminateWithSink(Sinks.stdout());