VoltSP YAML Configuration Language¶
VoltSP provides a declarative YAML configuration language for defining streaming data pipelines without writing Java code. This document describes the structure and options available in the YAML configuration format.
Basic Structure¶
A VoltSP pipeline configuration consists of the following main sections:
version: 1 # Required: Configuration version (must be 1)
name: "pipeline-name" # Required: Pipeline name
source: { } # Required: Source configuration
pipeline: { } # Optional: Processing steps to apply
sink: { } # Required: Sink configuration
logging: { } # Optional: Logging configuration
Configuration Sections¶
Version¶
Must be 1. This field is required.
version: 1
Name¶
Pipeline name that will be visible in the logs as well as metrics. This field is required.
name: "my-pipeline"
Source Configuration¶
The source section defines where the pipeline gets its data. You must specify exactly one source type. Available source types include:

- file: Read from a local file
- stdin: Read from standard input
- collection: Read from a static collection of elements defined inline
- kafka: Read from a Kafka topic
- network: Read from network (supports UDP and TCP)
- beats: Receive data from a Filebeat agent.
Each source type has its own configuration parameters.
Pipeline Configuration¶
The pipeline section defines processing configuration and any data transformations. It includes:

- parallelism: Optional value specifying pipeline parallelism
- processors: Optional array of processor configurations
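As an illustration only, a pipeline section combining both options could look like the following; the parallelism value and the JavaScript expression are arbitrary placeholders using the processor syntax described under Processor Types below:

pipeline:
  parallelism: 2                 # Number of parallel processing threads (illustrative)
  processors:
    - javascript:
        code: "message.trim()"   # Illustrative transformation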
Sink Configuration¶
The sink section defines where the pipeline outputs its data. You must specify exactly one sink type. Available sink types include:

- voltdb: Output to VoltDB
- kafka: Output to Kafka
- file: Output to a file
- directory: Output to files in a directory (if parallelism is greater than one, each thread will output data to a separate file)
- stdout: Output to standard output of the VoltSP process
- network: Output to network (supports UDP and TCP)
- blackhole: Discard all output
- elasticsearch: Output to Elasticsearch
- syslog: Output to Syslog using TCP and the RFC 3164 message format.
Logging Configuration¶
Not yet implemented
The optional logging section configures logging behavior:

logging:
  globalLevel: "DEBUG"        # Global log level
  loggers:                    # Per-logger configuration
    "org.myapp": "TRACE"
    "org.thirdparty": "WARN"
Source Types¶
File Source¶
Reads data from a file:
source:
  file:
    path: "input.txt"       # Required: Path to input file
    delimiter: "\n"         # Optional: Record delimiter
Stdin Source¶
Reads data from standard input:
source:
  stdin: {}
Collection Source¶
Reads from a static collection of strings:
source:
  collection:
    elements:               # Required: Array of strings
      - "element1"
      - "element2"
Kafka Source¶
Reads from Kafka topics:
source:
  kafka:
    servers: "host1:9092,host2:9092"   # Required: Kafka bootstrap servers
    topic: "my-topic"                  # Required: Topic name
    consumer_group: "my-group"         # Required: Consumer group ID
    starting_offset: "LATEST"          # Required: Starting offset (LATEST/EARLIEST)
Network Source¶
Reads from network:
source:
  network:
    address: "12345"        # Required: Port number or address:port
    type: "UDP"             # Required: UDP or TCP
    decoder: "line"         # Required: Decoder type (none/identity/line/bytes)
Beats Source¶
Reads from Elastic Beats:
source:
  beats:
    address: "0.0.0.0"      # Required: Listen address
    port: 5044              # Required: Listen port
    idleTimeout: "PT30S"    # Optional: Connection idle timeout (ISO8601 duration)
Sink Types¶
VoltDB Sink¶
Outputs to VoltDB:
sink:
  voltdb:
    procedure: "MyStoredProc"   # Required: Stored procedure name
    host: "voltdb-host"         # Required: VoltDB host
    port: 21212                 # Required: VoltDB port
    retries: 3                  # Optional: Number of retries
Kafka Sink¶
Outputs to Kafka:
sink:
  kafka:
    servers: "host1:9092,host2:9092"   # Required: Kafka bootstrap servers
    topic: "output-topic"              # Required: Topic name
File Sink¶
Outputs to a single file:
sink:
  file:
    path: "output.txt"      # Required: Output file path
Directory Sink¶
Outputs to multiple files in a directory:
sink:
  directory:
    path: "/output/dir"     # Required: Output directory path
Stdout Sink¶
Outputs to standard output:
sink:
  stdout: {}
Network Sink¶
Outputs to network:
sink:
  network:
    type: "UDP"             # Required: UDP or TCP
    address: "host:port"    # Required: Target address
Elasticsearch Sink¶
Outputs to Elasticsearch:
sink:
  elasticsearch:
    host: "es-host"               # Required: Elasticsearch host
    indexName: "my-index"         # Required: Index name
    port: 9200                    # Required: Elasticsearch port
    username: "user"              # Required: Username
    password: "pass"              # Required: Password
    payloadSizeInBytes: 5242880   # Required: Maximum payload size
    requestParameters:            # Optional: Additional request parameters
      timeout: "30s"
    requestHeaders:               # Optional: Additional request headers
      Content-Type: "application/json"
Syslog Sink¶
Outputs to Syslog:
sink:
  syslog:
    host: "syslog-host"     # Required: Syslog host
    port: 514               # Required: Syslog port
    facility: "USER"        # Optional: Syslog facility
    severity: "NOTICE"      # Optional: Syslog severity
    hostname: "my-host"     # Optional: Source hostname
    tag: "my-app"           # Optional: Message tag
Processor Types¶
Processors can be written in multiple languages and are defined in the pipeline's processors array. Each processor must specify its language and code:
pipeline:
  processors:
    - javascript:
        code: "message.toUpperCase()"
    - python:
        code: |
          import re
          def process(message):
              return message.lower()
          process(message)
    - ruby:
        code: |
          message.reverse
Complete Examples¶
Simple File Processing Pipeline¶
version: 1
name: "file-processor"
source:
  file:
    path: "input.txt"
pipeline:
  parallelism: 1
  processors:
    - javascript:
        code: |
          message.toUpperCase();
sink:
  file:
    path: "output.txt"
Kafka to VoltDB Pipeline¶
version: 1
name: "kafka-to-voltdb"
source:
  kafka:
    servers: "kafka1:9092,kafka2:9092"
    topic: "incoming-data"
    consumer_group: "processor-group"
    starting_offset: "LATEST"
pipeline:
  parallelism: 4
  processors:
    - javascript:
        code: |
          // Transform message
          JSON.parse(message)
sink:
  voltdb:
    host: "voltdb-host"
    port: 21212
    procedure: "ProcessData"
logging:
  globalLevel: "INFO"
  loggers:
    org.voltdb: "DEBUG"
Network to Network Pipeline¶
version: 1
name: "network-relay"
source:
  network:
    type: "UDP"
    address: "12345"
    decoder: "line"
pipeline:
  parallelism: 1
sink:
  network:
    type: "UDP"
    address: "target-host:54321"