
Sources

Sources are key components of a streaming pipeline responsible for ingesting data to be processed. They determine how data is introduced into the pipeline, supporting a wide range of data origins including files, Kafka topics, and standard input. Each source can be configured to meet specific requirements, such as data formats, connection details, and polling intervals.

The following are the sources currently available for VoltSP. (See Custom Sources, Sinks, and Processors for information on defining your own custom source.)

  • Beats Source: Receives data from Elastic Beats agents, supporting multiple versions and handling batch windows.
  • File Source: Reads data items from a specified file, treating each line as a separate item. This is ideal for streaming data stored in text files.
  • Generator Source: Produces data items algorithmically at a specified rate, suitable for synthetic data generation or simulation purposes.
  • Iterable Source: Generates a stream of data items from an iterable collection, useful for testing or scenarios involving pre-defined in-memory data.
  • Kafka Source: Consumes data from Apache Kafka topics, enabling seamless integration with real-time data streams.
  • Network Source: Receives byte data from a network via UDP or TCP protocols.
  • Stdin Source: Reads data items from standard input, making it useful for interactive scenarios or debugging purposes.
  • MQTT Source: Consumes data from an MQTT broker by subscribing to specified topics. Supports configurable Quality of Service (QoS) settings.

Each source type offers flexibility in how data is ingested, with a variety of configuration options to support different data origins and processing needs. The following sections provide detailed documentation for each source, including configuration examples, API details, and usage guidelines.

Transactional Semantics

An essential aspect of any source is its transactional semantics. For example, when using a Kafka source, VoltSP commits a message only after it has been processed and sent to a downstream sink. If the sink offers the same guarantee (as the Kafka and VoltDB sinks do), the pipeline as a whole loses no data and provides at-least-once processing semantics.
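The commit-after-delivery rule can be illustrated with a small, self-contained sketch. It is not VoltSP code: the `Sink` interface, `processFrom`, and the in-memory "log" are hypothetical stand-ins, used only to show why committing after the sink acknowledges a write prevents data loss.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: these types are hypothetical and are not part of the VoltSP API.
class AtLeastOnceSketch {

    /** A sink that may fail; returning normally means the write was acknowledged. */
    interface Sink {
        void write(String message) throws Exception;
    }

    /**
     * Processes messages in order starting at committedOffset and returns the new
     * committed offset. The offset advances only after the sink acknowledges a write,
     * so a failure leaves the failed message uncommitted and eligible for redelivery.
     */
    static int processFrom(List<String> log, int committedOffset, Sink sink) {
        int offset = committedOffset;
        while (offset < log.size()) {
            try {
                sink.write(log.get(offset).toUpperCase()); // "process", then send downstream
            } catch (Exception e) {
                break; // do not commit; the message is read again on the next run
            }
            offset++; // commit only after a successful write
        }
        return offset;
    }

    /** First attempt fails on the second message; the retry resumes from the last commit. */
    static List<String> demo() {
        List<String> log = List.of("a", "b", "c");
        List<String> delivered = new ArrayList<>();
        boolean[] failOnce = {true};
        Sink flaky = message -> {
            if (message.equals("B") && failOnce[0]) {
                failOnce[0] = false;
                throw new Exception("simulated sink failure");
            }
            delivered.add(message);
        };
        int committed = processFrom(log, 0, flaky);     // stops with offset 1 committed
        committed = processFrom(log, committed, flaky); // resumes at offset 1
        delivered.add("committed=" + committed);
        return delivered;
    }
}
```

In this sketch, `AtLeastOnceSketch.demo()` delivers every message despite the simulated failure. If a failure instead occurred after the sink had applied a write but before the offset advanced, the message would be delivered twice on retry, which is why the guarantee is at-least-once rather than exactly-once.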

Configuring Sources

There are three approaches to configuring sources, as described in the following sections:

Java Code Configuration

Sources can be configured directly in Java code. For example, the following code hardcodes the address of the Kafka bootstrap server:

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("statically.configured.address")
    .withGroupId(UUID.randomUUID().toString())
    .withTopicNames("some-topic-name")
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);

Environment Variable Interpolation

To avoid hardcoding parameters, configuration values can be provided externally and interpolated using environment variables. For example:

KafkaStreamSourceConfigurator.aConsumer()
    .withBootstrapServers("${bootstrap.servers}");
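To make the idea of placeholder interpolation concrete, here is a minimal sketch of how a `${name}` token could be replaced with an externally supplied value. The `PlaceholderSketch` class and its `resolve` method are hypothetical and do not reflect VoltSP's actual implementation; in particular, where VoltSP looks up the values and how it handles missing ones are assumptions here.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a hypothetical resolver, not VoltSP's implementation.
class PlaceholderSketch {

    // Matches ${name} and captures the name between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} in template with its value; fails fast if a name is undefined. */
    static String resolve(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = values.get(name);
            if (value == null) {
                throw new IllegalArgumentException("No value provided for " + name);
            }
            // quoteReplacement keeps '$' and '\' in the value from being treated specially
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

For example, `PlaceholderSketch.resolve("${bootstrap.servers}", Map.of("bootstrap.servers", "kafka-1:9092"))` returns `kafka-1:9092`. Failing fast on an undefined name is a design choice of this sketch, so that a missing setting surfaces at startup rather than as a connection error later.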

Helm-Based Auto-Configuration

Most sources also support automatic configuration when running in Kubernetes. When you provide Helm values under streaming.pipeline.configuration in the documented format, VoltSP injects them into the source, which reduces the amount of code needed to set it up.

Taking the Kafka source as an example, some parameters can be provided by Helm and therefore omitted in the code:

KafkaStreamSourceConfigurator.aConsumer()
    // Omitting .withBootstrapServers(...) and others
    .withKeyDeserializer(ByteBufferDeserializer.class)
    .withValueDeserializer(ByteBufferDeserializer.class)
    .withStartingOffset(KafkaStartingOffset.EARLIEST);

streaming:
  pipeline:
    configuration:
      source:
        kafka:
          groupId: groupA
          topicNames:
            - topicA
            - topicB
          bootstrapServers:
            - serverA
            - serverB

This approach allows for a flexible combination of code-based, environment-based, and Helm-based configuration to suit different deployment scenarios and requirements. See the section on Helm Configuration Options for alternative ways to configure source properties in Helm.

Security

Connector Credentials TLS mTLS OAuth2 *
VoltDB
Kafka
MQTT

* VoltSP supports the OAuth 2.0 Client Credentials grant type with JWT (JSON Web Token) access tokens.

Example of Kafka mTLS configuration:

    KafkaStreamSslConfiguration sslConfiguration = KafkaStreamSslConfiguration.builder()
        .withTruststore("/temp/truststore.jks", "pass1")
        .withKeystore("/temp/keystore.jks", "pass2", "keypass3")
        .build();

    KafkaStreamSourceConfigurator.aConsumer()
        .withGroupId("my-group")
        .withTopicNames("topicA,topicB")
        .withBootstrapServers("serverA:9092,serverB:9092")
        .withSSL(sslConfiguration);

Example of MQTT OAuth 2.0 configuration:

Java:

    OAuthConfigurator oauth = 
        new OAuthConfigurator("http://localhost:8080/token", "test_client", "test_secret")
            .withScopes("test_client_access");

    MqttSourceConfigurator.aSource()
        .withAddress("mqtt.example.com:1883")
        .withTopicFilter("sensors/#")
        .withGroupName("group1")
        .withQos(MqttMessageQoS.AT_LEAST_ONCE)
        .withOAuth(oauth);

Config file:

streaming:
  pipeline:
    configuration:
      source:
        mqtt:
          address: mqtt.example.com:1883
          topicFilter: sensors/#
          groupName: group1
          qos: AT_LEAST_ONCE
          oauth:
            token-url: http://localhost:8080/token
            client-id: test_client
            client-secret: test_secret
            scopes: test_client_access
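
For context on what the token URL, client ID, client secret, and scopes are used for, the sketch below builds the form body of a Client Credentials token request as described in RFC 6749, section 4.4. It is an illustration only: `ClientCredentialsSketch` is a hypothetical class, and whether VoltSP sends the client credentials in the request body (as here) or in an HTTP Basic Authorization header is an assumption, since the specification permits both.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative only: a hypothetical helper, not part of the VoltSP API.
class ClientCredentialsSketch {

    /**
     * Builds the application/x-www-form-urlencoded body that a client would POST
     * to the token URL to obtain an access token.
     */
    static String tokenRequestBody(String clientId, String clientSecret, String scope) {
        Map<String, String> form = new LinkedHashMap<>(); // preserves insertion order
        form.put("grant_type", "client_credentials");
        form.put("client_id", clientId);
        form.put("client_secret", clientSecret);
        form.put("scope", scope);
        return form.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }
}
```

With the values from the example above, `tokenRequestBody("test_client", "test_secret", "test_client_access")` yields `grant_type=client_credentials&client_id=test_client&client_secret=test_secret&scope=test_client_access`. The authorization server's response carries the access token that the client then presents to the broker.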