Voltdb-bulk

The voltdb-bulk operator differs from a typical sink in that it enables two-way communication with VoltDB. It sends requests to a VoltDB procedure and forwards the responses to be processed by subsequent components of the stream.

The voltdb-bulk operator processes requests in batches, enabling high-throughput procedure calls with at-least-once delivery guarantees.

Partitioning

Requests that hash to the same partition are batched and sent directly to the site responsible for that partition. The VoltDB site thread reads the requests one by one and executes the procedure invocation for each.
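The grouping step described above can be illustrated with a minimal sketch. The class name PartitionBatcher and the method partitionFor are hypothetical, and the hash used here is only a stand-in: VoltDB's actual partition hashing is internal and is not reproduced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: groups requests by a stand-in partition function.
// This is NOT VoltDB's real partitioning algorithm.
class PartitionBatcher {
    private final int partitionCount;

    PartitionBatcher(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Hypothetical mapping from a partition key to a partition id.
    int partitionFor(Object partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    // Builds one batch per partition, keeping arrival order inside each batch.
    Map<Integer, List<String>> batchByPartition(List<String> partitionKeys) {
        Map<Integer, List<String>> batches = new HashMap<>();
        for (String key : partitionKeys) {
            batches.computeIfAbsent(partitionFor(key), p -> new ArrayList<>()).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        PartitionBatcher batcher = new PartitionBatcher(4);
        System.out.println(batcher.batchByPartition(List.of("alice", "bob", "alice", "carol")));
    }
}
```

Requests with the same key always land in the same batch, which is what allows a batch to be sent to a single site.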

After processing, responses (or errors) are serialized and inserted into the VoltSP stream. The worker thread picks them up as regular events. Even if a VoltSP worker is unavailable, the event flow is preserved, and batches continue regardless of procedure execution delays.

VoltSP can finish processing the current batch and start consuming a new batch of data while VoltDB is still executing the procedure; the two phases are decoupled. Even if the called procedure is slow, VoltSP processing is not disrupted.
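As a rough analogy for this decoupling, the sketch below hands a batch off asynchronously and lets the response arrive later on a queue. The names DetachedPhases, submit, and responses are hypothetical, and the queue merely stands in for the stream that carries procedure responses; this is not how VoltSP is implemented internally.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: sending a batch and receiving its response are separate steps.
class DetachedPhases {
    // Hypothetical stand-in for the stream that carries procedure responses.
    final BlockingQueue<String> responses = new LinkedBlockingQueue<>();

    // Hands the batch off and returns immediately; the response is queued later.
    CompletableFuture<Void> submit(List<String> batch) {
        return CompletableFuture
            .supplyAsync(() -> "processed " + batch.size() + " rows")
            .thenAccept(responses::add);
    }

    public static void main(String[] args) throws InterruptedException {
        DetachedPhases phases = new DetachedPhases();
        phases.submit(List.of("a", "b"));
        // The caller is free to start on the next batch here.
        System.out.println(phases.responses.take());
    }
}
```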

Handling duplicates

If a failure occurs while sending data to VoltDB, the batch may be retried, which can produce duplicates. Requests are saved with a batchId and a unique index to ensure each row’s uniqueness. The target procedure must deduplicate incoming requests if the business logic relies on the absence of duplicates. This is similar to an application sending the same request twice through a VoltDB client.
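A minimal sketch of the deduplication idea, assuming the procedure can see both the batchId and the row index: the pair is treated as an idempotency key, and a row is applied only the first time its key is seen. The class RequestDeduplicator is hypothetical, and the in-memory set stands in for what would more likely be a keyed table in VoltDB.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: remembers which (batchId, index) pairs were already applied.
class RequestDeduplicator {
    private final Set<String> seen = new HashSet<>();

    // Returns true the first time a key is seen, false for a retried duplicate.
    boolean firstTime(String batchId, int index) {
        return seen.add(batchId + "#" + index);
    }

    public static void main(String[] args) {
        RequestDeduplicator dedup = new RequestDeduplicator();
        System.out.println(dedup.firstTime("batch-1", 0)); // first delivery
        System.out.println(dedup.firstTime("batch-1", 0)); // retried duplicate
    }
}
```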

Reading responses

For each procedure (e.g. MyProcedure.java), VoltSP automatically creates an associated VoltDB stream named myprocedure_sq. Since VoltDB streams require a topic defined in deployment.xml, you must add it manually. For example:

<?xml version="1.0"?>
<deployment>
    ...
    <topics enabled="true">
        <topic name="myprocedure_topic" />
    </topics>
</deployment>

Once the topic is configured, procedure responses are fetched, decoded, and made available in the child stream.

Handling Avro encoding

VoltDB procedures can accept and return Avro payloads. The first parameter must be the partition key; subsequent parameters may be Avro GenericRecord instances or POJOs.

To enable Avro serialization, both VoltSP and VoltDB must be able to retrieve schemas from a remote schema registry.

.consumeFromSource(...)
.terminateWithSink(BulkProcedureVoltSinkConfigBuilder.builder()
    .withProcedureName(value)
    .withServers(value)
    .withBatchSize(value)
    .withFlushInterval(value)
    .withClient(builder -> builder
        .withRetry(builder -> builder
            .withRetries(value)
            .withBackoffDelay(value)
            .withMaxBackoffDelay(value)
        )
        .withMaxTransactionsPerSecond(value)
        .withMaxOutstandingTransactions(value)
        .withRequestTimeout(value)
        .withAuthUser(value)
        .withAuthPassword(value)
        .withTrustStoreFile(value)
        .withTrustStorePassword(value)
    )
    .withExceptionHandler(value)
)

sink:
  voltdb-bulk:
    procedureName: value
    servers: value
    batchSize: value
    flushInterval: value
    client:
      retry:
        retries: value
        backoffDelay: value
        maxBackoffDelay: value
      maxTransactionsPerSecond: value
      maxOutstandingTransactions: value
      requestTimeout: value
      authUser: value
      authPassword: value
      trustStoreFile: value
      trustStorePassword: value
    exceptionHandler: value

Properties

procedureName

The name of the stored procedure in VoltDB to be invoked for bulk operations. Required.

Type: string

servers

A set of host and port addresses for connecting to the VoltDB cluster. Only one address is sufficient for cluster topology discovery. Required.

Type: array

batchSize

The maximum number of records to include in a single batch before inserting data into VoltDB. Higher values can improve throughput but will increase memory usage.

Type: number

Default value: 100000

flushInterval

The time interval after which the batch is flushed to VoltDB, even if the desired batch size has not been reached.

Type: object

Default value: 1s
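The interplay between batchSize and flushInterval can be sketched as a "flush on size or on age" rule. The class SizeOrTimeBatcher is hypothetical and simplified: it checks the age of the batch only when a record arrives, whereas a real operator would presumably also flush on a timer when no records arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: flushes when the buffer is full or the interval has elapsed.
class SizeOrTimeBatcher {
    private final int batchSize;
    private final long flushIntervalMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;

    SizeOrTimeBatcher(int batchSize, long flushIntervalMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Adds a record; returns the flushed batch, or an empty list if nothing was flushed.
    List<String> add(String record, long nowMillis) {
        buffer.add(record);
        boolean full = buffer.size() >= batchSize;
        boolean stale = nowMillis - lastFlushMillis >= flushIntervalMillis;
        if (!full && !stale) {
            return List.of();
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        lastFlushMillis = nowMillis;
        return batch;
    }

    public static void main(String[] args) {
        SizeOrTimeBatcher batcher = new SizeOrTimeBatcher(3, 1000, 0);
        System.out.println(batcher.add("a", 10));
        System.out.println(batcher.add("b", 20));
        System.out.println(batcher.add("c", 30));
    }
}
```

A large batchSize with a short flushInterval keeps latency bounded under light load while still allowing big batches under heavy load.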

client

Configuration settings for the VoltDB client, including authentication, retry policies, and performance limits.

Type: object

Fields of client:

client.retry

Configuration for retrying failed operations, including the number of retries and backoff delays.

Type: object

Fields of client.retry:

client.retry.retries

Number of retry attempts after a request failure.

Type: number

Default value: 3

client.retry.backoffDelay

Initial delay before the first retry attempt.

Type: object

Default value: PT0.2S

client.retry.maxBackoffDelay

Maximum delay between consecutive retry attempts.

Type: object

Default value: PT3S
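To show how the three retry settings could interact, here is a sketch that computes a delay schedule. It assumes the delay doubles after each attempt and is capped at maxBackoffDelay; the growth factor is an assumption made for illustration, since only the initial and maximum delays are documented. The class BackoffSchedule is hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: ASSUMES doubling backoff, which the documentation does not confirm.
class BackoffSchedule {
    // Returns the delay to wait before each retry attempt, in order.
    static List<Duration> delays(int retries, Duration backoffDelay, Duration maxBackoffDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = backoffDelay;
        for (int i = 0; i < retries; i++) {
            // Cap every delay at the configured maximum.
            result.add(next.compareTo(maxBackoffDelay) > 0 ? maxBackoffDelay : next);
            next = next.multipliedBy(2);
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: 3 retries, PT0.2S initial delay, PT3S maximum delay.
        System.out.println(delays(3, Duration.ofMillis(200), Duration.ofSeconds(3)));
    }
}
```

Under this doubling assumption, the default settings would give delays of 0.2 s, 0.4 s, and 0.8 s, so the 3 s cap only comes into play when more retries are configured.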

client.maxTransactionsPerSecond

The maximum number of transactions allowed per second to control the rate of operations.

Type: number

client.maxOutstandingTransactions

The maximum number of outstanding transactions allowed, limiting concurrent operations.

Type: number
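Conceptually, a cap on outstanding transactions behaves like a counting semaphore: a permit is taken when a request is sent and returned when its response arrives. The sketch below illustrates that idea only. The class OutstandingLimiter is hypothetical, and whether the real client blocks, queues, or rejects requests once the limit is reached is not covered here.

```java
import java.util.concurrent.Semaphore;

// Illustrative only: models the limit as a pool of permits.
class OutstandingLimiter {
    private final Semaphore permits;

    OutstandingLimiter(int maxOutstandingTransactions) {
        this.permits = new Semaphore(maxOutstandingTransactions);
    }

    // Returns true if a new transaction may be sent right now.
    boolean tryBegin() {
        return permits.tryAcquire();
    }

    // Called when a response (or error) arrives for an in-flight transaction.
    void complete() {
        permits.release();
    }

    public static void main(String[] args) {
        OutstandingLimiter limiter = new OutstandingLimiter(2);
        System.out.println(limiter.tryBegin());
        System.out.println(limiter.tryBegin());
        System.out.println(limiter.tryBegin()); // limit reached
    }
}
```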

client.requestTimeout

The timeout duration for client requests to VoltDB, after which a request is considered failed.

Type: object

client.authUser

The username for authenticating with the VoltDB cluster.

Type: string

client.authPassword

The password for authenticating with the VoltDB cluster.

Type: string

client.trustStoreFile

The path to the trust store file used for secure connections to VoltDB.

Type: string

client.trustStorePassword

The trust store password.

Type: string

exceptionHandler

A custom exception handler to process errors that occur during the execution of bulk operations.

Type: object

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-volt-api</artifactId>
    <version>1.4.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.4.0'

Usage Examples

BulkProcedureVoltSinkConfigBuilder.builder()
    .addToServers("localhost", 12122)
    .withProcedureName("runMe")
    .withBatchSize(100)
    .withFlushInterval(10, TimeUnit.SECONDS)
    .withClientBuilder(builder -> {
        builder.withMaxOutstandingTransactions(42000);
        builder.withMaxTransactionsPerSecond(23);
        builder.withRequestTimeout(Duration.ofSeconds(5));
        builder.withAuthUser("admin");
        builder.withAuthPassword("admin123");
        builder.withTrustStoreFile("c:/Users32/trust.me");
        builder.withTrustStorePassword("got2have");

        builder.withRetryBuilder(retryBuilder -> {
            retryBuilder.withRetries(4);
            retryBuilder.withBackoffDelay(Duration.ofSeconds(2));
            retryBuilder.withMaxBackoffDelay(Duration.ofSeconds(11));
        });
    })
    .withExceptionHandler(exceptionHandler)

sink:
  voltdb-bulk:
    servers: localhost:12122
    procedureName: runMe
    batchSize: 100
    flushInterval: 10s
    client:
      maxOutstandingTransactions: 42000
      maxTransactionsPerSecond: 23
      requestTimeout: PT5S
      authUser: admin
      authPassword: admin123
      trustStoreFile: c:/Users32/trust.me
      trustStorePassword: got2have
      retry:
        retries: 4
        backoffDelay: 2s
        maxBackoffDelay: 11s