Sources¶
Sources are key components of a streaming pipeline responsible for ingesting data to be processed. They determine how data is introduced into the pipeline, supporting a wide range of data origins including files, Kafka topics, and standard input. Each source can be configured to meet specific requirements, such as data formats, connection details, and polling intervals.
The following are the sources currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom source.)
- Beats Source: Receives data from Elastic Beats agents, supporting multiple versions and handling batch windows.
- File Source: Reads data items from a specified file, treating each line as a separate item. This is ideal for streaming data stored in text files.
- Generator Source: Produces data items algorithmically at a specified rate, suitable for synthetic data generation or simulation purposes.
- Iterable Source: Generates a stream of data items from an iterable collection, useful for testing or scenarios involving pre-defined in-memory data.
- Kafka Source: Consumes data from Apache Kafka topics, enabling seamless integration with real-time data streams.
- MQTT Source: Consumes data from an MQTT broker by subscribing to specified topics. Supports configurable Quality of Service (QoS) settings.
- Network Source: Receives byte data from a network via UDP or TCP protocols.
- Stdin Source: Reads data items from standard input, making it useful for interactive scenarios or debugging purposes.
Each source type offers flexibility in how data is ingested, with a variety of configuration options to support different data origins and processing needs. The following sections provide detailed documentation for each source, including configuration examples, API details, and usage guidelines.
Transactional Semantics¶
An essential aspect of any source is its transactional semantics. For example, when using a Kafka source, VoltSP guarantees that no messages will be committed unless they have been processed and sent to a downstream sink. If the sink also supports such guarantees (as the Kafka and VoltDB sinks do), the entire pipeline ensures no data loss and achieves at-least-once processing semantics.
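In practice this means offsets are committed only after the corresponding messages have been acknowledged downstream: if the pipeline fails mid-flight, the uncommitted messages are simply re-read on restart. Duplicates are therefore possible after a failure, but data loss is not, which is why the guarantee is at-least-once rather than exactly-once.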
Configuring Sources¶
There are three approaches to configuring sources, described in the following sections: direct Java code configuration, environment variable interpolation, and Helm-based auto-configuration.
Java Code Configuration¶
Sources can be configured directly in Java code. For example, the following code hard-codes the address of the Kafka bootstrap server:
KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("statically.configured.address")
    .withGroupId(UUID.randomUUID().toString())
    .withTopicNames("some-topic-name")
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);
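Note that generating the group ID with UUID.randomUUID() puts each pipeline instance in its own Kafka consumer group, so every instance receives all messages on the topic. Use a fixed group ID when multiple instances should share the topic's partitions and divide the work.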
Environment Variable Interpolation¶
To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:
KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("${bootstrap.servers}");
Helm-Based Auto-Configuration¶
Most sources also support automatic configuration when running in Kubernetes. Providing Helm values under streaming.pipeline.configuration that match the documented format will inject the provided values and reduce the amount of code needed for setting up sources.
Taking the Kafka source as an example, some parameters can be provided by Helm and therefore omitted from the code:
KafkaStreamSourceConfigurator.aConsumer()
    // Omitting .withBootstrapServers(...) and others
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);
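The omitted parameters are then supplied as Helm values: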
streaming:
  pipeline:
    configuration:
      source:
        kafka:
          groupId: groupA
          topicNames:
            - topicA
            - topicB
          bootstrapServers:
            - serverA
            - serverB
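The same values can also be passed on the Helm command line instead of a values file. A minimal sketch, assuming a release named my-pipeline and a chart reference voltsp/voltsp (both placeholders; Helm's {a,b} syntax supplies list values):

helm upgrade --install my-pipeline voltsp/voltsp \
    --set streaming.pipeline.configuration.source.kafka.groupId=groupA \
    --set "streaming.pipeline.configuration.source.kafka.topicNames={topicA,topicB}" \
    --set "streaming.pipeline.configuration.source.kafka.bootstrapServers={serverA,serverB}"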
This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure source properties in Helm.
Security¶
| Connector | Credentials | TLS | mTLS | OAuth2* |
|---|---|---|---|---|
| VoltDB | ✓ | ✓ | | |
| Kafka | ✓ | ✓ | ✓ | |
| MQTT | ✓ | ✓ | ✓ | ✓ |

* VoltSP supports the OAuth 2.0 Client Credentials grant type with JWT (JSON Web Token) access tokens.
Example of Kafka mTLS configuration:
KafkaStreamSslConfiguration sslConfiguration = KafkaStreamSslConfiguration.builder()
    .withTruststore("/temp/truststore.jks", "pass1")
    .withKeystore("/temp/keystore.jks", "pass2", "keypass3")
    .build();

KafkaStreamSourceConfigurator.aConsumer()
    .withGroupId("my-group")
    .withTopicNames("topicA,topicB")
    .withBootstrapServers("serverA:9092,serverB:9092")
    .withSSL(sslConfiguration);
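In Kubernetes deployments, note that the truststore and keystore files referenced by these paths must be present on the pod's filesystem, for example by mounting them from a Secret.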
Example of MQTT OAuth 2.0 configuration:
Java:
OAuthConfigurator oauth =
    new OAuthConfigurator("http://localhost:8080/token", "test_client", "test_secret")
        .withScopes("test_client_access");

MqttSourceConfigurator.aSource()
    .withAddress("mqtt.example.com:1883")
    .withTopicFilter("sensors/#")
    .withGroupName("group1")
    .withQos(MqttMessageQoS.AT_LEAST_ONCE)
    .withOAuth(oauth);
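The # in the topic filter is the standard MQTT multi-level wildcard, so this subscription matches every topic under sensors/.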
Config file:
streaming:
  pipeline:
    configuration:
      source:
        mqtt:
          address: mqtt.example.com:1883
          topicFilter: sensors/#
          groupName: group1
          qos: AT_LEAST_ONCE
          oauth:
            token-url: http://localhost:8080/token
            client-id: test_client
            client-secret: test_secret
            scopes: test_client_access