Table of Contents
- Active(SP) Stream Data Processing
- See Active(SP) in Action
- How VoltSP Stream Processing Works
- Developing VoltSP Pipelines
- Helm Configuration Options
- Running VoltSP Pipelines
- Sinks
- Sources
- Transactional Semantics
- Configuring Sources
- Beats Source
- Kafka
- Network
- Iterable Source
- File Source
- Stdin Source
- MQTT Source
- Custom Sources, Sinks, and Processors
- Command Line Interface
- VoltSP YAML Configuration Language
Sources
Sources are key components of a streaming pipeline responsible for ingesting data to be processed. They determine how data is introduced into the pipeline, supporting a wide range of data origins including files, Kafka topics, and standard input. Each source can be configured to meet specific requirements, such as data formats, connection details, and polling intervals.
The following are the sources currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom source.)
- Beats Source: Receives data from Elastic Beats agents, supporting multiple versions and handling batch windows.
- File Source: Reads data items from a specified file, treating each line as a separate item. This is ideal for streaming data stored in text files.
- Generator Source: Produces data items algorithmically at a specified rate, suitable for synthetic data generation or simulation purposes.
- Iterable Source: Generates a stream of data items from an iterable collection, useful for testing or scenarios involving pre-defined in-memory data.
- Kafka Source: Consumes data from Apache Kafka topics, enabling seamless integration with real-time data streams.
- Network Source: Receives byte data from a network via UDP or TCP protocols.
- Stdin Source: Reads data items from standard input, making it useful for interactive scenarios or debugging purposes.
- MQTT Source: Consumes data from an MQTT broker by subscribing to specified topics. Supports configurable Quality of Service (QoS) settings.
Each source type offers flexibility in how data is ingested, with a variety of configuration options to support different data origins and processing needs. The following sections provide detailed documentation for each source, including configuration examples, API details, and usage guidelines.
Transactional Semantics
An essential aspect of any source is its transactional semantics. For example, when using a Kafka source, VoltSP guarantees that no messages will be committed unless they have been processed and sent to a downstream sink. If the sink also supports such guarantees (as the Kafka and VoltDB sinks do), the entire pipeline ensures no data loss and achieves at-least-once processing semantics.
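The guarantee described above can be illustrated with a simplified plain-Java sketch (this is a conceptual model, not VoltSP internals): the source's offset is committed only after every item in a batch has been processed and handed to the sink, so a failure before the commit causes redelivery rather than data loss.

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastOnceSketch {
    static List<String> sink = new ArrayList<>();
    static int committedOffset = 0; // messages before this offset are never redelivered

    // Processes a batch; the offset commit happens strictly after
    // every item has been written to the sink.
    static void processBatch(List<String> batch) {
        for (String message : batch) {
            sink.add(message.toUpperCase()); // "processing" + sink write
        }
        committedOffset += batch.size();     // commit only after success
    }

    public static void main(String[] args) {
        processBatch(List.of("a", "b"));
        System.out.println(committedOffset); // prints 2
    }
}
```

If an exception were thrown mid-batch, `committedOffset` would stay unchanged and the whole batch would be re-read on restart, which is exactly why the resulting semantics are at-least-once rather than exactly-once.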
Configuring Sources
There are three approaches to configuring sources, as described in the following sections:
Java Code Configuration
Sources can be configured directly using Java code. For example, the following Java hardcodes the address of the Kafka bootstrap server:
KafkaStreamSourceConfigurator.aConsumer()
.withBootstrapServers("statically.configured.address")
.withGroupId(UUID.randomUUID().toString())
.withTopicNames("some-topic-name")
.withKeyDeserializer(ByteBufferDeserializer.class)
.withValueDeserializer(ByteBufferDeserializer.class)
.withStartingOffset(KafkaStartingOffset.EARLIEST);
Environment Variable Interpolation
To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:
KafkaStreamSourceConfigurator.aConsumer()
.withBootstrapServers("${bootstrap.servers}");
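The same placeholder syntax can be applied to other parameters as well. For example (the placeholder names below are illustrative, not predefined keys):

```java
KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("${bootstrap.servers}")
    .withGroupId("${group.id}")
    .withTopicNames("${topic.names}");
```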
Helm-Based Auto-Configuration
Most sources also support automatic configuration when running in Kubernetes. Providing Helm values under streaming.pipeline.configuration that match the documented format will inject the provided values and reduce the amount of code needed for setting up sources.
Taking the Kafka source as an example, some parameters can be provided by Helm and therefore omitted in the code:
KafkaStreamSourceConfigurator.aConsumer()
// Omitting .withBootstrapServers(...) and others
.withKeyDeserializer(ByteBufferDeserializer.class)
.withValueDeserializer(ByteBufferDeserializer.class)
.withStartingOffset(KafkaStartingOffset.EARLIEST);
streaming:
  pipeline:
    configuration:
      source:
        kafka:
          groupId: groupA
          topicNames:
            - topicA
            - topicB
          bootstrapServers:
            - serverA
            - serverB
This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure source properties in Helm.
Beats Source
Overview
The beats source receives data from Elastic Beats agents. It supports configuration of connection properties and client timeouts.
Configuration Options
Helm Configuration (YAML)
source:
  beats:
    address: "0.0.0.0:514"
    client_inactivity_timeout: "PT1S"
Java Configuration
BeatsSourceConfigurator.consume()
.withAddress("0.0.0.0:514")
.withConnectionIdleTimeout(Duration.ofSeconds(1))
.withExceptionHandler(handler);
API Details
Methods
- withAddress(String host, int port): Sets the listening address and port.
- withAddress(String address): Sets the listening address in "host:port" format.
- withConnectionIdleTimeout(Duration timeout): Sets the client inactivity timeout.
- withExceptionHandler(ExceptionHandler handler): Configures custom error handling.
- withDecoder(Function<BeatsMessage, T> decoder): Configures custom message decoding.
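Combining these methods, a fuller configuration might look like the following sketch. (The decoder lambda is a hypothetical placeholder; the actual mapping depends on your pipeline's item type.)

```java
BeatsSourceConfigurator.consume()
    .withAddress("0.0.0.0", 514)
    .withConnectionIdleTimeout(Duration.ofSeconds(1))
    // Hypothetical decoder: turn each BeatsMessage into a String item
    .withDecoder(beatsMessage -> beatsMessage.toString());
```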
Kafka
Overview
The kafka source consumes data from an Apache Kafka topic. It allows configuration of consumer properties, such as topic names, bootstrap servers, poll timeout, starting offsets, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.
Configuration Options
Helm Configuration (YAML)
source:
  kafka:
    groupId: "my-group"
    topicNames:
      - "topicA"
      - "topicB"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
Java Configuration
KafkaStreamSourceConfigurator.aConsumer()
.withGroupId("my-group")
.withTopicNames("topicA,topicB")
.withBootstrapServers("serverA:9092,serverB:9092")
.withStartingOffset(KafkaStartingOffset.EARLIEST)
.withPollTimeout(Duration.ofMillis(250))
.withMaxCommitRetries(3)
.withMaxCommitTimeout(Duration.ofSeconds(10));
API Details
Methods
- withGroupId(String groupId): Sets the consumer group ID.
- withTopicNames(String topicNames): Specifies the Kafka topics to consume from.
- withBootstrapServers(String bootstrapServers): Configures the Kafka cluster's bootstrap servers.
- withStartingOffset(KafkaStartingOffset startingOffsets): Sets the starting offset for the consumer (EARLIEST, LATEST, NONE).
- withPollTimeout(Duration pollTimeout): Defines the poll timeout duration for Kafka consumer polling.
- withMaxCommitRetries(int retries): Configures the maximum number of retries for committing offsets.
- withMaxCommitTimeout(Duration timeout): Sets the maximum timeout for commit retries.
- withProperty(String key, String value), withProperty(String key, long value), withProperty(String key, int value), withProperty(String key, boolean value): Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.
- withSSL(KafkaStreamSslConfiguration sslConfigurator): Configures SSL settings for secure communication with Kafka.
- withKeyDeserializer(Class<? extends Deserializer<KEY>> deserializerClass): Sets the deserializer for the Kafka message key.
- withValueDeserializer(Class<? extends Deserializer<VALUE>> deserializerClass), withValueDeserializer(Class<? extends Deserializer<?>> deserializerClass, Class<VALUE> deserializedType): Sets the deserializer for the Kafka message value.
- withSchemaRegistryUrl(String schemaUrl): Configures the schema registry URL for Avro serialization.
- withExceptionHandler(KafkaSourceExceptionHandler exceptionHandler): Sets a custom exception handler for handling Kafka consumer errors.
KafkaStreamSslConfiguration.Builder allows configuring SSL/TLS for secure communication:
- withTruststore(), withTruststore(String truststoreFile, String truststorePassword): Configures the truststore for verifying server certificates. When deployed to a Kubernetes cluster, the no-argument version of this method retrieves the necessary configuration from the environment.
- withKeystore(), withKeystore(String keyStoreFile, String keyStorePassword, String keyPassword): Configures the keystore for client certificate authentication. When deployed to a Kubernetes cluster, the no-argument version of this method retrieves the necessary configuration from the environment.
- withSSLProtocols(TLSProtocols protocol): Sets the SSL protocol version.
- withSSLEnabledProtocols(TLSProtocols... protocols): Configures multiple SSL protocols for enhanced security.
- build(): Creates the KafkaStreamSslConfiguration instance.
Features
- Auto-configuration: Automatically detects and applies configurations from source.kafka in the Helm YAML.
- Customizable: Supports a wide range of configuration options, including SSL settings, poll timeout, and starting offsets.
- Error Handling: Allows custom exception handling using KafkaSourceExceptionHandler to manage consumer errors gracefully.
- Flexible Deserialization: Supports custom deserializers for both keys and values, making it adaptable to different Kafka data formats.
Usage Example
KafkaStreamSourceConfigurator.aConsumer()
.withGroupId("my-group")
.withTopicNames("topicA,topicB")
.withBootstrapServers("serverA:9092,serverB:9092")
.withStartingOffset(KafkaStartingOffset.LATEST)
.withPollTimeout(Duration.ofMillis(500))
.withMaxCommitRetries(5)
.withMaxCommitTimeout(Duration.ofSeconds(15))
.withSSL(KafkaStreamSslConfiguration.builder()
.withTruststore("/path/to/truststore.jks", "truststore-password")
.withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
.build())
.withExceptionHandler((kafkaConsumer, context, ex) -> {
System.err.println("Error while consuming data: " + ex.getMessage());
});
This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication and custom error handling.
Network
Overview
The network source receives byte data via UDP or TCP protocols.
Configuration Options
Helm Configuration (YAML)
source:
  network:
    type: udp
    address: "0.0.0.0:34567"
    socketOptions:
      SO_RCVBUF: 65536
      SO_TIMEOUT: 1000
Java Configuration
new NetworkStreamSourceConfigurator<String>()
.withType(NetworkType.UDP)
.withAddress("0.0.0.0:34567")
.withDecoder(Decoders.toLinesDecoder())
.withSocketServerConfiguration(new NetworkSocketServerConfiguration()
.withOption(SocketOption.SO_RCVBUF, "65536"));
API Details
Methods
- withType(NetworkType type): Specifies the network protocol type (UDP or TCP).
- withAddress(String host, int port): Sets the listening address and port.
- withAddress(String address): Sets the listening address in "host:port" format.
- withDecoder(Decoder<T> decoder): Sets a custom decoder for received data.
- withSocketServerConfiguration(NetworkSocketServerConfiguration config): Configures socket options.
- withExceptionHandler(ExceptionHandler exceptionHandler): Adds a custom exception handler for source failures.
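Conceptually, a decoder maps the raw bytes received from the socket to typed data items. As a stand-alone illustration (plain Java, independent of the VoltSP Decoder interface), a newline decoder splits each received buffer into one item per line:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LineDecoderSketch {
    // A stand-in for a newline decoder: splits a received byte buffer
    // into one String item per line.
    static final Function<byte[], List<String>> TO_LINES =
        bytes -> Arrays.asList(new String(bytes, StandardCharsets.UTF_8).split("\n"));

    public static void main(String[] args) {
        byte[] datagram = "reading1\nreading2".getBytes(StandardCharsets.UTF_8);
        System.out.println(TO_LINES.apply(datagram)); // prints [reading1, reading2]
    }
}
```

This mirrors what a line-oriented decoder such as Decoders.toLinesDecoder() would produce for the same input.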
Features
- Auto-configuration: Automatically detects and applies configurations from source.network in the Helm YAML.
- Custom Error Handling: Provides an option to specify a custom exception handler to manage errors during data reception.
Usage Example
new NetworkStreamSourceConfigurator<String>()
.withType(NetworkType.TCP)
.withAddress("0.0.0.0:8080")
.withDecoder(Decoders.toLinesDecoder())
.withExceptionHandler((context, exception) -> {
System.err.println("Error while receiving data: " + exception.getMessage());
});
This configuration allows the source to receive data from the network using the specified protocol and port, with support for handling reception errors.
Iterable Source
Overview
The iterable source generates a stream of data items from an iterable collection. It is useful for testing or for scenarios where data is pre-defined and available in-memory.
Configuration Options
Java Configuration
IterableSource.iterate("item1", "item2", "item3");
File Source
Overview
The file source reads data items from a specified file, where each line is treated as a new data item. It is useful for streaming data stored in text files.
It requires parallelism to be set to 1, as only one thread can read lines from the file.
Configuration Options
Java Configuration
FileSource.withNewlineDelimiter("/path/to/input.txt");
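As a plain-Java illustration of the behavior (independent of the VoltSP API), the source reads the file line by line, emitting each line as one data item:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class FileSourceSketch {
    // Reads a file the way the file source would: one item per line.
    static List<String> readItems(Path file) throws IOException {
        return Files.readAllLines(file);
    }

    public static void main(String[] args) throws IOException {
        Path input = Files.createTempFile("input", ".txt");
        Files.writeString(input, "event1\nevent2\nevent3\n");
        System.out.println(readItems(input)); // prints [event1, event2, event3]
        Files.delete(input);
    }
}
```

The single-reader constraint noted above corresponds to the fact that only one thread can advance through the file sequentially.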
Stdin Source
Overview
The stdin source reads data items from standard input (stdin), treating each line as a separate data item. It is useful for interactive scenarios or debugging where input is provided manually.
Configuration Options
Java Configuration
StdinSource source = new StdinSource();
MQTT Source
Overview
The mqtt source connects to an MQTT broker to consume published messages from specified topics. It supports configuration of the broker's host and port, topic filter, shared subscription group name, and message Quality of Service (QoS) level. Consumed messages are represented by the MqttPublishMessage record, which encapsulates MQTT-specific attributes such as the payload, retain flag, message expiry, payload format, and user properties. The MQTT source uses shared subscriptions to distribute incoming messages among all the workers.
Configuration Options
Helm Configuration (YAML)
source:
  mqtt:
    host: "mqtt.example.com"
    port: 1883
    topicFilter: "sensors/#"
    groupName: "group1"
    qos: "AT_LEAST_ONCE"
Java Configuration
MqttSourceConfigurator.aSource()
.withHost("mqtt.example.com")
.withPort(1883)
.withTopicFilter("sensors/#")
.withGroupName("group1")
.withQos(MqttMessageQoS.AT_LEAST_ONCE);
API Details
Methods
- withHost(String host): Sets the hostname of the MQTT broker.
- withPort(int port): Configures the port on which the MQTT broker is listening (default: 1883).
- withTopicFilter(String topicFilter): Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions.
- withGroupName(String groupName): Sets the shared subscription group name for managing shared subscriptions.
- withQos(MqttMessageQoS qos): Configures the message Quality of Service level. Available options: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE.
Features
- Auto-configuration: The MQTT source automatically applies configuration values defined under source.mqtt in Helm or environment-based configurations.
- MQTT-Specific Message Format: Incoming messages are encapsulated in the MqttPublishMessage record, which provides rich metadata such as payload format, retain flag, and custom user properties.
Usage Example
MqttSourceConfigurator.aSource()
.withHost("mqtt.example.com")
.withPort(1883)
.withTopicFilter("devices/+/data")
.withGroupName("device-group")
.withQos(MqttMessageQoS.AT_LEAST_ONCE);
This configuration connects to an MQTT broker at mqtt.example.com on port 1883, subscribes to topics matching devices/+/data, and processes messages with an at-least-once QoS level.