Kafka¶
The kafka source consumes data from one or more Apache Kafka topics. It allows configuration of consumer properties
such as topic names, bootstrap servers, poll timeout, starting offsets, and more.
This source is suitable for integrating real-time data from Kafka into your streaming pipeline.
KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));
source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
Properties¶
bootstrapServers¶
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers.
Type: array
topicNames¶
The names of the Kafka topics to consume from.
Type: array
groupId¶
A unique string that identifies the consumer group this consumer belongs to.
Type: string
startingOffset¶
What to do when there is no initial offset in Kafka or the current offset no longer exists:
- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.
Equivalent to the Kafka auto.offset.reset setting.
Type: object
Supported values: earliest, latest, none.
Default value: earliest
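The reset rules above can be sketched as a small decision function. This is an illustrative model of the auto.offset.reset behavior described here, not the connector's code: the class StartingOffsetSketch, the Policy enum, and the resolve method are hypothetical names, and the offsets are plain numbers rather than values read from a broker.

```java
import java.util.OptionalLong;

// Illustrative model only: none of these names belong to the connector API.
class StartingOffsetSketch {
    enum Policy { EARLIEST, LATEST, NONE }

    // Returns the offset a consumer would start from for one partition.
    // earliest/latest are the bounds of the offsets still available in the log.
    static long resolve(OptionalLong committed, Policy policy, long earliest, long latest) {
        // A committed offset is used as long as it still exists in the log.
        if (committed.isPresent()
                && committed.getAsLong() >= earliest
                && committed.getAsLong() <= latest) {
            return committed.getAsLong();
        }
        // Otherwise the configured policy decides.
        switch (policy) {
            case EARLIEST: return earliest;
            case LATEST:   return latest;
            default:
                throw new IllegalStateException("No usable offset for the consumer's group");
        }
    }

    public static void main(String[] args) {
        // No committed offset, EARLIEST policy: start at the oldest available record.
        System.out.println(resolve(OptionalLong.empty(), Policy.EARLIEST, 5L, 42L));
        // A valid committed offset is used regardless of the policy.
        System.out.println(resolve(OptionalLong.of(17L), Policy.LATEST, 5L, 42L));
    }
}
```

The policy only matters when no usable committed offset exists; a consumer group that has already committed offsets resumes from them.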
pollTimeout¶
The maximum time a single poll blocks the receiving thread while waiting for records.
Type: object
Default value: 1s
maxCommitTimeout¶
Sets the maximum timeout for commit retries.
Type: object
Default value: 10s
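The YAML example on this page writes durations as strings such as "PT0.25S" and "PT10S", while the Java example passes Duration objects. Assuming the YAML values follow the ISO-8601 format understood by java.time.Duration.parse (which the examples suggest but this page does not state), the two notations line up as in the sketch below. The class and method names are hypothetical.

```java
import java.time.Duration;

// Illustrative helper: shows how ISO-8601 duration strings relate to java.time.Duration.
class DurationFormatSketch {
    // Parses a duration string as it appears in the YAML example.
    static Duration fromConfig(String text) {
        return Duration.parse(text);
    }

    // Renders a Duration in the same ISO-8601 form.
    static String toConfig(Duration duration) {
        return duration.toString();
    }

    public static void main(String[] args) {
        System.out.println(fromConfig("PT0.25S").equals(Duration.ofMillis(250))); // true
        System.out.println(toConfig(Duration.ofSeconds(10)));                     // PT10S
    }
}
```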
maxCommitRetries¶
Configures the maximum number of retries for committing offsets.
Type: number
Default value: 3
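One way to picture how maxCommitRetries and maxCommitTimeout could interact is a bounded retry loop with an overall deadline. The sketch below is an assumption about the semantics, not the connector's implementation: it treats maxCommitRetries as the number of retries after the first attempt and maxCommitTimeout as a total time budget. CommitRetrySketch and commitWithRetries are hypothetical names, and the commit itself is modeled as a BooleanSupplier.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Illustrative only: a generic bounded-retry loop, not the connector's commit logic.
class CommitRetrySketch {
    // Returns the number of attempts used on success, or -1 if retries or time ran out.
    static int commitWithRetries(BooleanSupplier commit, int maxRetries, Duration maxTimeout) {
        long deadline = System.nanoTime() + maxTimeout.toNanos();
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (commit.getAsBoolean()) {
                return attempt;
            }
            // Stop early once the overall time budget is spent.
            if (System.nanoTime() - deadline >= 0) {
                break;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated commit fails twice, then succeeds on the third attempt.
        int attempts = commitWithRetries(() -> ++calls[0] >= 3, 3, Duration.ofSeconds(10));
        System.out.println(attempts); // 3
    }
}
```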
properties¶
Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.
Type: object
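The "adds or overrides" behavior can be illustrated with a simple map merge. In this sketch the keys are standard Kafka consumer settings (group.id, auto.offset.reset, max.poll.records), but the class ClientPropertiesSketch, the method effective, and the assumption that entries under properties take precedence over values derived from the dedicated options are illustrative, not confirmed by this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: models "add or override" as a map merge where overrides win.
class ClientPropertiesSketch {
    static Map<String, String> effective(Map<String, String> derived, Map<String, String> overrides) {
        Map<String, String> result = new LinkedHashMap<>(derived);
        result.putAll(overrides); // existing keys are replaced, new keys are added
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> derived = Map.of(
                "group.id", "my-group",
                "auto.offset.reset", "earliest");
        Map<String, String> overrides = Map.of(
                "auto.offset.reset", "latest",   // overrides a derived value
                "max.poll.records", "200");      // adds a new client property
        System.out.println(effective(derived, overrides).get("auto.offset.reset")); // latest
    }
}
```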
keyDeserializer¶
Type: object
valueDeserializer¶
Type: object
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-connectors-api</artifactId>
<version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-connectors-api', version: '1.4.0'
Usage Examples¶
KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSSL(KafkaStreamSslConfiguration.builder()
        .withTruststore("/path/to/truststore.jks", "truststore-password")
        .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
        .build())
    .withExceptionHandler((kafkaConsumer, context, ex) -> {
        System.err.println("Error while consuming data: " + ex.getMessage());
    });
This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication and custom error handling.
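For readers familiar with the plain Kafka client, the truststore and keystore values in the example above correspond conceptually to the standard Kafka SSL client properties. The sketch below builds those properties with java.util.Properties. The property keys are standard Kafka client settings, but the class SslPropertiesSketch, the method sslProperties, and the idea that the connector sets exactly these keys are assumptions made for illustration.

```java
import java.util.Properties;

// Illustrative only: maps truststore/keystore values to standard Kafka SSL client keys.
class SslPropertiesSketch {
    static Properties sslProperties(String truststorePath, String truststorePassword,
                                    String keystorePath, String keystorePassword,
                                    String keyPassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("ssl.keystore.location", keystorePath);
        props.setProperty("ssl.keystore.password", keystorePassword);
        props.setProperty("ssl.key.password", keyPassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = sslProperties(
                "/path/to/truststore.jks", "truststore-password",
                "/path/to/keystore.jks", "keystore-password", "key-password");
        System.out.println(props.getProperty("security.protocol")); // SSL
    }
}
```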