
Kafka

The kafka source consumes data from an Apache Kafka topic. It allows configuration of consumer properties, such as topic names, bootstrap servers, poll timeout, starting offsets, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.

KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    // Comma-separated topic names and broker addresses
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    // Where to start reading when the group has no valid committed offset
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    // Offset commit retry limits
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));

The equivalent configuration in YAML, with placeholder entries under properties:

source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
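The YAML example writes durations as ISO-8601 strings, while the Java example passes java.time.Duration values. Assuming the connector reads those strings with the same ISO-8601 rules as java.time.Duration.parse (an assumption, not confirmed by this page), both examples configure identical timeouts, as this small check shows:

```java
import java.time.Duration;

class DurationEquivalence {
    public static void main(String[] args) {
        // ISO-8601 strings from the YAML example (assumed to follow Duration.parse semantics)
        Duration yamlPoll = Duration.parse("PT0.25S");
        Duration yamlCommit = Duration.parse("PT10S");

        // Values passed to the Java DSL in the same example
        Duration javaPoll = Duration.ofMillis(250);
        Duration javaCommit = Duration.ofSeconds(10);

        System.out.println(yamlPoll.equals(javaPoll));     // true
        System.out.println(yamlCommit.equals(javaCommit)); // true
    }
}
```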

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers.

Type: array
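In the Java DSL the same list is passed as one comma-separated string, such as "serverA:9092,serverB:9092". The sketch below uses a hypothetical parse helper and HostPort record, neither of which is part of the connector API, to show how such a string breaks down into host/port pairs:

```java
import java.util.ArrayList;
import java.util.List;

class BootstrapServers {
    // One host/port pair from the bootstrap list (hypothetical type for illustration)
    record HostPort(String host, int port) {}

    // Splits a comma-separated bootstrap string such as "serverA:9092,serverB:9092"
    static List<HostPort> parse(String servers) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("serverA:9092,serverB:9092"));
        // [HostPort[host=serverA, port=9092], HostPort[host=serverB, port=9092]]
    }
}
```

The helper splits each entry on its last colon and needs Java 16 or later for the record; it does not handle bracketed IPv6 addresses.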

topicNames

The Kafka topics to subscribe to.

Type: array

groupId

A unique string that identifies the consumer group this consumer belongs to.

Type: string

startingOffset

What to do when there is no initial offset in Kafka or the current offset no longer exists:

- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.

Equivalent to the Kafka auto.offset.reset consumer setting.

Type: object

Supported values: earliest, latest, none.

Default value: earliest
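The reset rules above can be modeled as a small decision function. This is a minimal sketch of the auto.offset.reset semantics, using a hypothetical local StartingOffset enum and plain offset numbers rather than the connector's KafkaStartingOffset type or a real consumer. The IllegalStateException for none is a stand-in; the Kafka client raises its own exception types.

```java
import java.util.Locale;
import java.util.OptionalLong;

class StartingOffsetSketch {
    // Hypothetical local enum mirroring the documented values (not the connector's KafkaStartingOffset)
    enum StartingOffset { EARLIEST, LATEST, NONE }

    // Chooses where to start reading one partition.
    // committed: the group's committed offset, if any.
    // logStart..logEnd: offsets the broker still retains (logEnd is the next offset to be written).
    static long resolve(StartingOffset policy, OptionalLong committed, long logStart, long logEnd) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset; // a valid committed offset is used as-is; the policy is not consulted
            }
        }
        // No committed offset, or it no longer exists on the broker: apply the reset policy
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                // Stand-in for the exception the Kafka client would raise
                throw new IllegalStateException("No valid committed offset and startingOffset is NONE");
        }
    }

    // The auto.offset.reset value each policy corresponds to
    static String toAutoOffsetReset(StartingOffset policy) {
        return policy.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(resolve(StartingOffset.EARLIEST, OptionalLong.empty(), 100, 500)); // 100
        System.out.println(resolve(StartingOffset.LATEST, OptionalLong.empty(), 100, 500));   // 500
        System.out.println(resolve(StartingOffset.LATEST, OptionalLong.of(42), 100, 500));    // 500 (offset 42 no longer exists)
        System.out.println(resolve(StartingOffset.NONE, OptionalLong.of(250), 100, 500));     // 250
        System.out.println(toAutoOffsetReset(StartingOffset.EARLIEST));                        // earliest
    }
}
```

The key point the sketch captures is that the policy only applies when the group has no usable committed offset; a valid committed offset always takes precedence.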

pollTimeout

The maximum time a single poll blocks the receiving thread while waiting for records.

Type: object

Default value: 1s

maxCommitTimeout

The maximum timeout for retrying offset commits.

Type: object

Default value: 10s

maxCommitRetries

The maximum number of retries when committing offsets.

Type: number

Default value: 3
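This page does not say exactly how maxCommitRetries and maxCommitTimeout interact. One plausible reading, sketched below purely as an assumption, is that a failed commit is retried up to maxCommitRetries times and retrying stops early once maxCommitTimeout has elapsed. The commitAttempt callback is a hypothetical stand-in for a real offset commit.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class CommitRetrySketch {
    // Assumed semantics: one initial attempt plus up to maxCommitRetries retries,
    // abandoned early once the total time spent exceeds maxCommitTimeout.
    static boolean commitWithRetries(BooleanSupplier commitAttempt, int maxCommitRetries, Duration maxCommitTimeout) {
        long deadline = System.nanoTime() + maxCommitTimeout.toNanos();
        for (int attempt = 0; attempt <= maxCommitRetries; attempt++) {
            if (commitAttempt.getAsBoolean()) {
                return true; // commit succeeded
            }
            if (System.nanoTime() - deadline >= 0) {
                break; // out of time, even if retries remain
            }
        }
        return false; // gave up: retries exhausted or timeout reached
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical commit that fails twice, then succeeds on the third call
        BooleanSupplier flaky = () -> ++calls[0] >= 3;
        System.out.println(commitWithRetries(flaky, 3, Duration.ofSeconds(10))); // true
        System.out.println(calls[0]);                                            // 3
    }
}
```

A real implementation would typically also wait between attempts; the sketch omits backoff to keep the retry and timeout bounds easy to see.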

properties

Additional Kafka client properties, as key/value pairs, that are added to the consumer configuration or override existing values, for advanced configurations.

Type: object
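A minimal sketch of the add-or-override behavior, assuming the connector applies the properties map after the client settings it derives from its typed options. The derived values and the client.id value are illustrative; group.id, max.poll.records, and client.id are standard Kafka consumer property names.

```java
import java.util.Map;
import java.util.Properties;

class PropertyOverrides {
    // Applies the free-form properties map on top of the settings derived from typed options
    // (assumed order), so each entry is either added or replaces a value with the same key.
    static Properties merge(Properties derived, Map<String, String> overrides) {
        Properties merged = new Properties();
        merged.putAll(derived);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical settings derived from groupId plus an assumed existing max.poll.records value
        Properties derived = new Properties();
        derived.setProperty("group.id", "my-group");
        derived.setProperty("max.poll.records", "500");

        Properties merged = merge(derived, Map.of(
                "max.poll.records", "100",       // overrides the existing value
                "client.id", "volt-kafka-source" // adds a new key (illustrative value)
        ));

        System.out.println(merged.getProperty("group.id"));         // my-group
        System.out.println(merged.getProperty("max.poll.records")); // 100
        System.out.println(merged.getProperty("client.id"));        // volt-kafka-source
    }
}
```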

keyDeserializer

The deserializer that converts record keys from raw bytes.

Type: object

valueDeserializer

The deserializer that converts record values from raw bytes.

Type: object
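A deserializer turns the raw bytes of a record key or value into an object. The sketch below uses a local RecordDeserializer interface shaped like the Kafka client's Deserializer<T>.deserialize(String topic, byte[] data) method, so it runs without the Kafka library. How the connector accepts a deserializer through keyDeserializer and valueDeserializer is not shown here and is not assumed.

```java
import java.nio.charset.StandardCharsets;

class DeserializerSketch {
    // Local stand-in shaped like Kafka's Deserializer<T>.deserialize(String topic, byte[] data)
    interface RecordDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Decodes record bytes as UTF-8 text; a null payload (e.g. a record without a key) stays null
    static final RecordDeserializer<String> UTF8_STRING =
            (topic, data) -> data == null ? null : new String(data, StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] key = "order-17".getBytes(StandardCharsets.UTF_8);
        System.out.println(UTF8_STRING.deserialize("topicA", key));  // order-17
        System.out.println(UTF8_STRING.deserialize("topicA", null)); // null
    }
}
```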

Java dependency management

Add one of the following declarations (Maven or Gradle) to your build to access the Java configuration DSL for this plugin.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-connectors-api</artifactId>
    <version>1.4.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-connectors-api', version: '1.4.0'

Usage Examples

KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    // Start from the latest offset when the group has no valid committed offset
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    // SSL: truststore (path, password) and keystore (path, keystore password, key password)
    .withSSL(KafkaStreamSslConfiguration.builder()
        .withTruststore("/path/to/truststore.jks", "truststore-password")
        .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
        .build())
    // Invoked when consuming fails; here it only logs the error message
    .withExceptionHandler((kafkaConsumer, context, ex) -> {
        System.err.println("Error while consuming data: " + ex.getMessage());
    });

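This configuration starts from the latest offset when no committed offset is available, connects to the brokers over SSL using the given truststore and keystore, and logs consumption errors to standard error through a custom exception handler.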