Voltdb-bulk
The voltdb-bulk operator differs from a typical sink in that it enables two-way communication with VoltDB.
It sends requests to a VoltDB procedure and forwards the responses to subsequent components of the stream for processing.
The voltdb-bulk operator processes requests in batches, enabling high-throughput procedure calls with at-least-once delivery guarantees.
Partitioning
Requests that hash to the same partition are batched and sent directly to the site responsible for that partition. The VoltDB site thread reads the requests one by one and executes the procedure invocation for each.
After processing, responses (or errors) are serialized and inserted into the VoltSP stream, where the VoltSP worker thread picks them up as regular events. Even if a VoltSP worker is unavailable, the event flow is preserved, and batching continues regardless of procedure execution delays.
VoltSP can finish processing the current batch and start consuming a new batch of data while VoltDB is still executing the procedure; these phases are decoupled. Even if the called procedure is slow, VoltSP processing is not disturbed.
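To make the batching step concrete, here is a minimal sketch of grouping requests into per-partition batches before they are sent. It is not VoltSP's implementation: the `PartitionBatcher` class, the `Request` record, and the `partitionFor` function are hypothetical, and `Math.floorMod(key.hashCode(), partitionCount)` is only a stand-in for VoltDB's own hashing of partition keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: groups requests by partition so that each batch
// can be sent to the single site that owns that partition.
public class PartitionBatcher {

    // A request carries its partition key plus an opaque payload.
    record Request(String partitionKey, String payload) {}

    // Stand-in hash; VoltDB's real partitioning function is different.
    static int partitionFor(String partitionKey, int partitionCount) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    // Returns partition id -> requests that hash to that partition,
    // preserving arrival order within each partition.
    static Map<Integer, List<Request>> groupByPartition(List<Request> requests, int partitionCount) {
        Map<Integer, List<Request>> batches = new LinkedHashMap<>();
        for (Request r : requests) {
            int partition = partitionFor(r.partitionKey(), partitionCount);
            batches.computeIfAbsent(partition, p -> new ArrayList<>()).add(r);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Request> requests = List.of(
                new Request("alice", "a1"),
                new Request("bob", "b1"),
                new Request("alice", "a2"));
        groupByPartition(requests, 4).forEach((partition, batch) ->
                System.out.println("partition " + partition + " -> " + batch));
    }
}
```

Because every request with the same key lands in the same batch, each batch can be handed to exactly one site without cross-partition coordination.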
Handling duplicates
In case of a failure while sending data to VoltDB, the batch may be retried, which can cause duplicates.
Requests are saved with a batchId and a unique index to ensure each row’s uniqueness. If the business logic relies on the absence of duplicates, the target procedure must deduplicate incoming requests. This is similar to an application sending the same request twice through a VoltDB client.
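The deduplication rule above can be sketched as idempotent handling keyed by the pair (batchId, index). This is a hypothetical, in-memory illustration: in a real VoltDB procedure the "already seen" check would be a lookup against a table with a unique index on those two columns, and the `DedupingHandler` class and `RequestId` record are names invented for this example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of idempotent request handling keyed by (batchId, index).
// An in-memory set stands in for a table with a unique index on both columns.
public class DedupingHandler {

    record RequestId(long batchId, int index) {}

    private final Set<RequestId> seen = new HashSet<>();
    private long appliedCount = 0;

    // Applies the request once; returns false if it was a retried duplicate.
    public boolean handle(long batchId, int index, String payload) {
        if (!seen.add(new RequestId(batchId, index))) {
            return false; // same (batchId, index) already applied: skip it
        }
        appliedCount++; // business logic would run here
        return true;
    }

    public long appliedCount() {
        return appliedCount;
    }

    public static void main(String[] args) {
        DedupingHandler handler = new DedupingHandler();
        handler.handle(7, 0, "first");
        handler.handle(7, 1, "second");
        handler.handle(7, 0, "first"); // batch 7 retried after a failure
        System.out.println("applied " + handler.appliedCount() + " requests"); // applied 2 requests
    }
}
```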
Reading responses
For each procedure (e.g. MyProcedure.java), VoltSP automatically creates an associated VoltDB stream named myprocedure_sq. Since VoltDB streams require a topic defined in deployment.xml, you must add the topic manually.
For example:
<?xml version="1.0"?>
<deployment>
...
<topics enabled="true">
<topic name="myprocedure_topic" />
</topics>
</deployment>
Once the topic is configured, procedure responses are fetched, decoded, and made available in the child stream.
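The stream naming in the example above (MyProcedure.java becomes myprocedure_sq) can be expressed as a small helper, which may be handy when scripting schema or deployment checks. The rule is inferred from that single example and is an assumption: `StreamNames.streamNameFor` is hypothetical, not a VoltSP API, and VoltSP may normalize names differently (for example, for packaged classes). It deliberately does not derive the topic name, which you configure yourself.

```java
import java.util.Locale;

// Hypothetical helper mirroring the documented example
// (MyProcedure.java -> myprocedure_sq); not a VoltSP API.
public class StreamNames {

    // Strips an optional ".java" suffix, lowercases, and appends "_sq".
    static String streamNameFor(String procedureFileOrClass) {
        String name = procedureFileOrClass.endsWith(".java")
                ? procedureFileOrClass.substring(0, procedureFileOrClass.length() - ".java".length())
                : procedureFileOrClass;
        return name.toLowerCase(Locale.ROOT) + "_sq";
    }

    public static void main(String[] args) {
        System.out.println(streamNameFor("MyProcedure.java")); // myprocedure_sq
    }
}
```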
Handling Avro encoding
VoltDB procedures can accept and return Avro payloads. The first parameter must be the partition key; subsequent parameters may be Avro GenericRecord instances or POJOs.
To enable Avro serialization, both VoltSP and VoltDB must be able to retrieve schemas from a remote schema registry.
.consumeFromSource(...)
.terminateWithSink(BulkProcedureVoltSinkConfigBuilder.builder()
    .withProcedureName(value)
    .withServers(value)
    .withBatchSize(value)
    .withFlushInterval(value)
    .withClient(builder -> builder
        .withRetry(retryBuilder -> retryBuilder
            .withRetries(value)
            .withBackoffDelay(value)
            .withMaxBackoffDelay(value)
        )
        .withMaxTransactionsPerSecond(value)
        .withMaxOutstandingTransactions(value)
        .withRequestTimeout(value)
        .withAuthUser(value)
        .withAuthPassword(value)
        .withTrustStoreFile(value)
        .withTrustStorePassword(value)
    )
    .withExceptionHandler(value)
)
sink:
  voltdb-bulk:
    procedureName: value
    servers: value
    batchSize: value
    flushInterval: value
    client:
      retry:
        retries: value
        backoffDelay: value
        maxBackoffDelay: value
      maxTransactionsPerSecond: value
      maxOutstandingTransactions: value
      requestTimeout: value
      authUser: value
      authPassword: value
      trustStoreFile: value
      trustStorePassword: value
    exceptionHandler: value
Properties
procedureName
The name of the stored procedure in VoltDB to be invoked for bulk operations. Required.
Type: string
servers
A set of host and port addresses for connecting to the VoltDB cluster. A single address is sufficient for cluster topology discovery. Required.
Type: array
batchSize
The maximum number of records to include in a single batch before inserting data into VoltDB. Higher values can improve throughput but will increase memory usage.
Type: number
Default value: 100000
flushInterval
The time interval after which the current batch is flushed to VoltDB, even if the desired batch size has not been reached.
Type: object
Default value: 1s
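The interaction between batchSize and flushInterval (flush when the batch is full or when the interval has elapsed, whichever comes first) can be sketched as follows. This is an illustration under assumptions, not VoltSP's batching code: the `SizeOrTimeBatcher` class is hypothetical, the clock is passed in explicitly so the behaviour is deterministic, and the interval is only checked when a record arrives (a real sink would also flush from a timer).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical size-or-time batcher illustrating batchSize and flushInterval.
public class SizeOrTimeBatcher<T> {
    private final int batchSize;
    private final Duration flushInterval;
    private final Consumer<List<T>> flusher;
    private List<T> buffer = new ArrayList<>();
    private Instant batchStart;

    public SizeOrTimeBatcher(int batchSize, Duration flushInterval, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
        this.flusher = flusher;
    }

    // Adds a record; flushes if the batch is full or the interval has elapsed.
    public void add(T record, Instant now) {
        if (buffer.isEmpty()) {
            batchStart = now; // the interval is measured from the batch's first record
        }
        buffer.add(record);
        boolean full = buffer.size() >= batchSize;
        boolean expired = Duration.between(batchStart, now).compareTo(flushInterval) >= 0;
        if (full || expired) {
            flush();
        }
    }

    // Hands the current batch to the flusher and starts a new one.
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(buffer);
            buffer = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        SizeOrTimeBatcher<String> batcher = new SizeOrTimeBatcher<>(3, Duration.ofSeconds(1), sent::add);
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        batcher.add("a", t0);
        batcher.add("b", t0.plusMillis(100));
        batcher.add("c", t0.plusMillis(200));  // size limit reached -> flush [a, b, c]
        batcher.add("d", t0.plusMillis(300));
        batcher.add("e", t0.plusMillis(1500)); // 1.2s since "d" -> flush [d, e]
        System.out.println(sent); // [[a, b, c], [d, e]]
    }
}
```

Larger batches amortize per-call overhead, which is why the default batchSize is high; flushInterval bounds the latency a record can wait when traffic is light.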
client
Configuration settings for the VoltDB client, including authentication, retry policies, and performance limits.
Type: object
Fields of client:
client.retry
Configuration for retrying failed operations, including the number of retries and backoff delays.
Type: object
Fields of client.retry:
client.retry.retries
Number of retry attempts after a request failure.
Type: number
Default value: 3
client.retry.backoffDelay
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
client.retry.maxBackoffDelay
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
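The three retry settings combine into a delay schedule. The documentation does not state how the delay grows between backoffDelay and maxBackoffDelay; the sketch below assumes exponential doubling capped at maxBackoffDelay, which is a common policy but an assumption here, and `RetrySchedule.delays` is a hypothetical helper rather than part of the VoltDB client.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the wait before each retry attempt,
// ASSUMING the delay doubles each time and is capped at maxBackoffDelay.
public class RetrySchedule {

    static List<Duration> delays(int retries, Duration backoffDelay, Duration maxBackoffDelay) {
        List<Duration> result = new ArrayList<>();
        Duration next = backoffDelay;
        for (int attempt = 0; attempt < retries; attempt++) {
            // never wait longer than the configured maximum
            Duration capped = next.compareTo(maxBackoffDelay) > 0 ? maxBackoffDelay : next;
            result.add(capped);
            next = capped.multipliedBy(2); // doubling the capped value avoids overflow
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: retries=3, backoffDelay=PT0.2S, maxBackoffDelay=PT3S
        System.out.println(delays(3, Duration.parse("PT0.2S"), Duration.parse("PT3S")));
        // [PT0.2S, PT0.4S, PT0.8S]
        // Values from the usage example: retries=4, backoffDelay=2s, maxBackoffDelay=11s
        System.out.println(delays(4, Duration.ofSeconds(2), Duration.ofSeconds(11)));
        // [PT2S, PT4S, PT8S, PT11S]
    }
}
```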
client.maxTransactionsPerSecond
The maximum number of transactions allowed per second to control the rate of operations.
Type: number
client.maxOutstandingTransactions
The maximum number of outstanding transactions allowed, limiting concurrent operations.
Type: number
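A cap like maxOutstandingTransactions bounds how many requests are in flight at once. The sketch below shows that idea with `java.util.concurrent.Semaphore`; it illustrates the concept only and is not how the VoltDB client enforces the limit. The `OutstandingLimiter` class and the simulated calls are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a maxOutstandingTransactions-style cap:
// a caller must acquire a permit before sending and releases it on completion.
public class OutstandingLimiter {
    private final Semaphore permits;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peakInFlight = new AtomicInteger();

    public OutstandingLimiter(int maxOutstandingTransactions) {
        this.permits = new Semaphore(maxOutstandingTransactions);
    }

    // Blocks while the cap is reached, then runs the call asynchronously.
    public CompletableFuture<Void> submit(Runnable call, ExecutorService executor) throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        peakInFlight.accumulateAndGet(now, Math::max); // record the highest concurrency seen
        return CompletableFuture.runAsync(call, executor).whenComplete((ok, err) -> {
            inFlight.decrementAndGet();
            permits.release(); // free the slot even if the call failed
        });
    }

    public int peakInFlight() {
        return peakInFlight.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        OutstandingLimiter limiter = new OutstandingLimiter(2);
        CompletableFuture<?>[] calls = new CompletableFuture<?>[10];
        for (int i = 0; i < calls.length; i++) {
            calls[i] = limiter.submit(() -> sleepQuietly(20), executor); // simulated procedure call
        }
        CompletableFuture.allOf(calls).join();
        executor.shutdown();
        System.out.println("peak in flight: " + limiter.peakInFlight()); // never exceeds 2
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Even with eight worker threads available, at most two simulated calls run concurrently, because the ninth-and-later submissions block in `acquire()` until an earlier call completes.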
client.requestTimeout
The timeout duration for client requests to VoltDB, after which a request is considered failed.
Type: object
client.authUser
The username for authenticating with the VoltDB cluster.
Type: string
client.authPassword
The password for authenticating with the VoltDB cluster.
Type: string
client.trustStoreFile
The path to the trust store file used for secure connections to VoltDB.
Type: string
client.trustStorePassword
The trust store password.
Type: string
exceptionHandler
A custom exception handler to process errors that occur during the execution of bulk operations.
Type: object
Java dependency management
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-volt-api</artifactId>
<version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-volt-api', version: '1.4.0'
Usage Examples
BulkProcedureVoltSinkConfigBuilder.builder()
    .addToServers("localhost", 12122)
    .withProcedureName("runMe")
    .withBatchSize(100)
    .withFlushInterval(10, TimeUnit.SECONDS)
    .withClientBuilder(builder -> {
        builder.withMaxOutstandingTransactions(42000);
        builder.withMaxTransactionsPerSecond(23);
        builder.withRequestTimeout(Duration.ofSeconds(5));
        builder.withAuthUser("admin");
        builder.withAuthPassword("admin123");
        builder.withTrustStoreFile("c:/Users32/trust.me");
        builder.withTrustStorePassword("got2have");
        builder.withRetryBuilder(retryBuilder -> {
            retryBuilder.withRetries(4);
            retryBuilder.withBackoffDelay(Duration.ofSeconds(2));
            retryBuilder.withMaxBackoffDelay(Duration.ofSeconds(11));
        });
    })
    .withExceptionHandler(exceptionHandler)
sink:
  voltdb-bulk:
    servers: localhost:12122
    procedureName: runMe
    batchSize: 100
    flushInterval: 10s
    client:
      maxOutstandingTransactions: 42000
      maxTransactionsPerSecond: 23
      requestTimeout: PT5S
      authUser: admin
      authPassword: admin123
      trustStoreFile: c:/Users32/trust.me
      trustStorePassword: got2have
      retry:
        retries: 4
        backoffDelay: 2s
        maxBackoffDelay: 11s