
VoltSP YAML Configuration Language

VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.

Basic Structure

A VoltSP pipeline configuration requires the following main sections:

version: 1               # Required: Configuration version (must be 1)
name: "pipeline-name"    # Required: Pipeline name
source: { }              # Required: Source configuration
pipeline: { }            # Optional: Processing steps to apply
sink: { }                # Required: Sink configuration
logging: { }             # Optional: Logging configuration

Configuration Sections

Version

Must be 1. This field is required.

version: 1

Name

The pipeline name, which is visible in logs as well as in metrics. This field is required.

name: "my-pipeline"

Source Configuration

The source section defines where the pipeline gets its data. You must specify exactly one source type. Available source types include:

  • file: Read from a local file
  • stdin: Read from standard input
  • collection: Read from a static collection of elements defined inline
  • kafka: Read from a Kafka topic
  • network: Read from the network (supports UDP and TCP)
  • beats: Receive data from an Elastic Beats agent such as Filebeat

Each source type has its own configuration parameters.

Pipeline Configuration

The pipeline section defines processing configuration and any data transformations. It includes:

  • parallelism: Optional value specifying pipeline parallelism
  • processors: Optional array of processor configurations
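
For example, a pipeline section that runs four parallel instances and applies a single JavaScript processor might look like the following sketch (the processor code itself is purely illustrative):

pipeline:
  parallelism: 4
  processors:
    - javascript:
        code: "message.trim()"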

Sink Configuration

The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. Available sink types include:

  • voltdb: Output to VoltDB
  • kafka: Output to Kafka
  • file: Output to a file
  • directory: Output to files in a directory (if parallelism is greater than one, each thread writes to a separate file)
  • stdout: Output to standard output of the VoltSP process
  • network: Output to the network (supports UDP and TCP)
  • blackhole: Discard all output
  • elasticsearch: Output to Elasticsearch
  • syslog: Output to syslog over TCP using the RFC 3164 message format

Logging Configuration

Note: not yet implemented.

The optional logging section configures logging behavior:

logging:
  globalLevel: "DEBUG"        # Global log level
  loggers:                    # Per-logger configuration
    "org.myapp": "TRACE"
    "org.thirdparty": "WARN"

Source Types

File Source

Reads data from a file:

source:
  file:
    path: "input.txt"       # Required: Path to input file
    delimiter: "\n"         # Optional: Record delimiter

Stdin Source

Reads data from standard input:

source:
  stdin: {}

Collection Source

Reads from a static collection of strings:

source:
  collection:
    elements:              # Required: Array of strings
      - "element1"
      - "element2"

Kafka Source

Reads from Kafka topics:

source:
  kafka:
    servers: "host1:9092,host2:9092"   # Required: Kafka bootstrap servers
    topic: "my-topic"                  # Required: Topic name
    consumer_group: "my-group"         # Required: Consumer group ID
    starting_offset: "LATEST"          # Required: Starting offset (LATEST/EARLIEST)

Network Source

Reads data from the network:

source:
  network:
    address: "12345"                   # Required: Port number or address:port
    type: "UDP"                        # Required: UDP or TCP
    decoder: "line"                    # Required: Decoder type (none/identity/line/bytes)

Beats Source

Reads from Elastic Beats:

source:
  beats:
    address: "0.0.0.0"                # Required: Listen address
    port: 5044                        # Required: Listen port
    idleTimeout: "PT30S"              # Optional: Connection idle timeout (ISO8601 duration)

Sink Types

VoltDB Sink

Outputs to VoltDB:

sink:
  voltdb:
    procedure: "MyStoredProc"         # Required: Stored procedure name
    host: "voltdb-host"              # Required: VoltDB host
    port: 21212                      # Required: VoltDB port
    retries: 3                       # Optional: Number of retries

Kafka Sink

Outputs to Kafka:

sink:
  kafka:
    servers: "host1:9092,host2:9092"  # Required: Kafka bootstrap servers
    topic: "output-topic"             # Required: Topic name

File Sink

Outputs to a single file:

sink:
  file:
    path: "output.txt"                # Required: Output file path

Directory Sink

Outputs to multiple files in a directory:

sink:
  directory:
    path: "/output/dir"               # Required: Output directory path

Stdout Sink

Outputs to standard output:

sink:
  stdout: {}

Network Sink

Outputs to the network:

sink:
  network:
    type: "UDP"                       # Required: UDP or TCP
    address: "host:port"              # Required: Target address

Elasticsearch Sink

Outputs to Elasticsearch:

sink:
  elasticsearch:
    host: "es-host"                   # Required: Elasticsearch host
    indexName: "my-index"             # Required: Index name
    port: 9200                        # Required: Elasticsearch port
    username: "user"                  # Required: Username
    password: "pass"                  # Required: Password
    payloadSizeInBytes: 5242880      # Required: Maximum payload size
    requestParameters:                # Optional: Additional request parameters
      timeout: "30s"
    requestHeaders:                   # Optional: Additional request headers
      Content-Type: "application/json"

Syslog Sink

Outputs to Syslog:

sink:
  syslog:
    host: "syslog-host"              # Required: Syslog host
    port: 514                        # Required: Syslog port
    facility: "USER"                 # Optional: Syslog facility
    severity: "NOTICE"               # Optional: Syslog severity
    hostname: "my-host"              # Optional: Source hostname
    tag: "my-app"                    # Optional: Message tag

Processor Types

Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:

pipeline:
  processors:
    - javascript:
        code: "message.toUpperCase()"
    - python:
        code: |
          import re
          def process(message):
              return message.lower()
          process(message)
    - ruby:
        code: |
          message.reverse

Complete Examples

Simple File Processing Pipeline

version: 1
name: "file-processor"

source:
  file:
    path: "input.txt"

pipeline:
  parallelism: 1
  processors:
    - javascript:
        code: |
          message.toUpperCase();

sink:
  file:
    path: "output.txt"

Kafka to VoltDB Pipeline

version: 1
name: "kafka-to-voltdb"

source:
  kafka:
    servers: "kafka1:9092,kafka2:9092"
    topic: "incoming-data"
    consumer_group: "processor-group"
    starting_offset: "LATEST"

pipeline:
  parallelism: 4
  processors:
    - javascript:
        code: |
          // Transform message
          JSON.parse(message)

sink:
  voltdb:
    host: "voltdb-host"
    port: 21212
    procedure: "ProcessData"

logging:
  globalLevel: "INFO"
  loggers:
    org.voltdb: "DEBUG"

Network to Network Pipeline

version: 1
name: "network-relay"

source:
  network:
    type: "UDP"
    address: "12345"
    decoder: "line"

pipeline:
  parallelism: 1

sink:
  network:
    type: "UDP"
    address: "target-host:54321"