Skip to content

Kafka

The kafka sink sends data to an Apache Kafka topic. It supports a variety of configurations such as specifying the topic name, bootstrap servers, key and value serializers, SSL configuration, and more. This sink is ideal for integrating with Kafka for real-time data streaming.

KafkaStreamSinkConfigurator.accepting(String.class)
     .withTopicName("my-topic")
     .withBootstrapServers("kafka.example.com:9092")
     .withSchemaRegistry("http://registry.example.com")
     .withProperty("key1", "value1")
     .withHeaders(string -> List.of(new RecordHeader("encoding", ...))
     .withSSL(
         KafkaStreamSslConfiguration.builder()
             .withTruststore("/path/to/truststore.jks", "truststore-password")
             .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
             .build()
     );
sink:
    kafka:
        topicName: "my-topic"
        bootstrapServers: "kafka.example.com:9092"
        schemaRegistry: "http://registry.example.com"
        properties:
          key1: value1

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Type: string

topicName

Sets the Kafka topic name where data will be sent. Type: string

properties

Adds or overrides a specific Kafka client property, allowing customization for advanced configurations. Type: object

ssl

SSL configuration. Type: object

Fields of ssl:

ssl.enable

Type: boolean

ssl.protocols

Type: object

Supported values: tlsv1, tlsv1_1, tlsv1_2, tlsv1_3.

ssl.identificationAlgorithm

Type: string

ssl.truststoreFile

Type: string

ssl.truststorePassword

Type: string

ssl.truststoreType

Type: string

ssl.keystoreFile

Type: string

ssl.keystoreType

Type: string

valueSerializer

Type: string

keySerializer

Type: string

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-connectors-api</artifactId>
    <version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-connectors-api', version: '1.4.0'

Usage Examples

KafkaStreamSinkConfigurator.accepting(String.class)
   .withTopicName("my-topic")
   .withBootstrapServers("kafka.example.com:9092")
   .withSchemaRegistry("http://registry.example.com")
   .withProperty("key1", "value1")
   .withHeaders(string -> List.of(new RecordHeader("encoding", ...))
   .withSSL(
       KafkaStreamSslConfiguration.builder()
           .withTruststore("/path/to/truststore.jks", "truststore-password")
           .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
           .build()
       );

This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication.