Compound procedures are defined as Java classes that extend the class VoltCompoundProcedure. Within the run method of the class, you define stages, which will be executed sequentially, with each stage having access to the results of any transactional procedures invoked in the previous stage:
public static class MyCompoundProc extends VoltCompoundProcedure {
    public long run() {
        newStageList(this::myFirstStage)
            .then(this::MySecondStage)
            .then(this::MyLastStage)
            .build();
Within each stage you can queue one or more transactional stored procedures, using the queueProcedureCall() method. When the stage finishes executing, the queued procedures are submitted to the appropriate partitions for execution. All procedures are initiated asynchronously and in parallel. Once all of the queued procedures complete, the next stage is executed, receiving as its argument the results from the stored procedures, in the order that the queueProcedureCall() methods were invoked.
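The ordering rule above (calls run in parallel, but results arrive in the order they were queued) can be tried out without a database. The following is a minimal plain-Java sketch, not VoltDB code: the class StageOrderSketch and its queue, runStage, and slow methods are hypothetical stand-ins invented for illustration, and the standard CompletableFuture class takes the place of the real asynchronous procedure calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class StageOrderSketch {
    // Hypothetical stand-in for one stage's queue; not part of the VoltDB API.
    private final List<Supplier<String>> queued = new ArrayList<>();

    // Models queueProcedureCall(): remember the call, do not run it yet.
    public void queue(Supplier<String> call) {
        queued.add(call);
    }

    // Models the end of a stage: start every queued call in parallel,
    // then collect the results in the order the calls were queued.
    public List<String> runStage() {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> call : queued) {
            futures.add(CompletableFuture.supplyAsync(call));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // wait for this call; order follows the queue
        }
        queued.clear();
        return results;
    }

    // Simulated procedure that sleeps for a while before answering.
    public static String slow(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    public static void main(String[] args) {
        StageOrderSketch stage = new StageOrderSketch();
        stage.queue(() -> slow("first", 200)); // queued first, longer delay
        stage.queue(() -> slow("second", 10)); // queued second, shorter delay
        System.out.println(stage.runStage());  // prints [first, second]
    }
}
```

Although the second call has the shorter delay, the result list still follows the queue order, which is the property the next stage relies on when it indexes into its argument array.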
Any parameters to the compound procedure (in the case of topics, the fields of the topic record) are received as arguments to the run method. Each stage then receives an array of ClientResponse objects, matching the number and order of the procedures queued in the previous stage. So if the first stage queues two procedures, the second stage will receive a two-element array, with the first element being the client response from the first procedure and the second element being the client response from the second procedure.
Let's look at an example. Say an application processes messages from IoT devices via a topic. The system needs to look up the device by its ID to verify it exists and identify the account associated with it, then look up the account information, and finally record the activity to the account log and pass on the enhanced message information to a new topic. Figure 12.1, “Business Logic for Processing Device Messages” shows the business logic the application must support.
This process can easily be written as a compound procedure with three stages, plus a fourth to complete the procedure:
1. deviceLookup: Use the ID in the topic input to look up the device record.
2. accountLookup: Based on the response from stage 1, look up the account associated with the device. If there is no response (that is, no record for the device), stop the procedure with the abortProcedure method.
3. extendMessage: Use the account information to create an extended message record, which is then inserted into the appropriate topic stream and recorded into the account log.
4. finish: Complete the procedure, reporting either success or failure.
The following example shows Java code that implements the compound procedure described above. Note the use of instance variables (fields of the class) to save state across the stages (in this case, the original topic fields plus the device and account information necessary in the last stage).
package devices;

import org.voltdb.VoltCompoundProcedure;
import org.voltdb.VoltTable;
import org.voltdb.client.ClientResponse;

public class ProcessMessage extends VoltCompoundProcedure {

    // State saved across the stages
    private long deviceID, accountID;
    private String deviceName, accountName, message;

    public long run(long id, String messagetext) {
        this.deviceID = id;
        this.message = messagetext;
        newStageList(this::deviceLookup)
            .then(this::accountLookup)
            .then(this::extendMessage)
            .then(this::finish)
            .build();
        return 0;
    }

    // stage 1
    private void deviceLookup(ClientResponse[] none) {
        queueProcedureCall("DEVICE.select", deviceID);
    }

    // stage 2
    private void accountLookup(ClientResponse[] resp) {
        VoltTable deviceRec = resp[0].getResults()[0];
        // Verify device exists. advanceRow() returns false for an empty
        // table; otherwise it positions the table on its first row.
        if (!deviceRec.advanceRow()) {
            abortProcedure("No such device. " + deviceID);
            return;
        }
        deviceName = deviceRec.getString("name");
        accountID = deviceRec.getLong("account");
        queueProcedureCall("ACCOUNT.select", accountID);
    }

    // stage 3
    private void extendMessage(ClientResponse[] resp) {
        VoltTable accountRec = resp[0].getResults()[0];
        // Position on the first row; abort if the account is missing
        if (!accountRec.advanceRow()) {
            abortProcedure("No such account. " + accountID);
            return;
        }
        accountName = accountRec.getString("name");
        // Queue two separate procedures
        queueProcedureCall("ACCOUNT_LOG.insert", accountID, deviceID, message);
        queueProcedureCall("ExtendedMessage.insert", deviceID, deviceName,
                           accountID, accountName, message);
    }

    // stage 4
    private void finish(ClientResponse[] resp) {
        Long results = 1L;
        completeProcedure(results);
    }
}
Some important points to note about this example:
The compound procedure receives any input parameters as arguments to the run method.
You define the order for the stages in the run method, by chaining the newStageList, then, and build calls.
You can interrupt the compound procedure at any point by calling the abortProcedure method.
You can queue more than one procedure call. However, they are not initiated until the current stage completes. Once the stage completes, all procedures are initiated at the same time (not sequentially). It is also important to note that the procedures are called asynchronously, to avoid blocking threads while waiting for a response. So there is no guarantee as to the order in which the transactions queued in a single stage are executed. The only way to ensure two procedures are processed sequentially is to queue them in separate stages.
Finally, you must explicitly complete the compound procedure, either by calling completeProcedure to report success or by calling abortProcedure to report failure.
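To experiment with the control flow of the device-message example (sequential stages, early abort, and explicit completion) without a running database, a small plain-Java model may help. Everything in this sketch is an assumption made for illustration: the class StagePipelineSketch, its in-memory maps, and the device and account values are hypothetical, and the stage runner only imitates the behavior described in this section rather than using the VoltCompoundProcedure API. In particular, treating a procedure that never completes as an error is a choice of the model, not documented VoltDB behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class StagePipelineSketch {
    // Hypothetical in-memory data standing in for the DEVICE and ACCOUNT tables.
    static final Map<Long, Long> DEVICE_ACCOUNT = Map.of(7L, 42L);
    static final Map<Long, String> ACCOUNT_NAME = Map.of(42L, "Acme");

    private final long deviceId;
    private Long accountId;     // state saved across stages
    private String accountName; // state saved across stages
    private String outcome;     // stays null until the procedure completes or aborts

    public StagePipelineSketch(long deviceId) {
        this.deviceId = deviceId;
    }

    // Stage 1: look up the device; the return value models the queued call's result.
    private Object deviceLookup(Object none) {
        return DEVICE_ACCOUNT.get(deviceId);
    }

    // Stage 2: abort if the device is unknown, otherwise look up the account.
    private Object accountLookup(Object previous) {
        if (previous == null) {
            outcome = "ABORTED: no such device " + deviceId; // models abortProcedure
            return null;
        }
        accountId = (Long) previous;
        return ACCOUNT_NAME.get(accountId);
    }

    // Stage 3: save the account name; a real stage would queue the two inserts here.
    private Object extendMessage(Object previous) {
        accountName = (String) previous;
        return "logged";
    }

    // Stage 4: complete explicitly, as the last stage must.
    private Object finish(Object previous) {
        outcome = "OK: " + accountName; // models completeProcedure
        return null;
    }

    public String run() {
        List<UnaryOperator<Object>> stages = new ArrayList<>();
        stages.add(this::deviceLookup);
        stages.add(this::accountLookup);
        stages.add(this::extendMessage);
        stages.add(this::finish);

        Object previous = null;
        for (UnaryOperator<Object> stage : stages) {
            previous = stage.apply(previous); // each stage sees the previous result
            if (outcome != null) {
                break; // completed or aborted: remaining stages are skipped
            }
        }
        if (outcome == null) {
            throw new IllegalStateException("procedure never completed");
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(new StagePipelineSketch(7L).run());  // prints OK: Acme
        System.out.println(new StagePipelineSketch(99L).run()); // prints ABORTED: no such device 99
    }
}
```

The second call shows why the abort check belongs in the stage that consumes the lookup result: the missing device is only visible once stage 1's call has returned, and aborting there prevents the later stages from running against incomplete state.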