Skip to content

Mqtt

The mqtt sink connects to an MQTT broker to publish messages to specified topic. It supports configuration of basic attributes like the broker’s host and port and topic filter. Additionally it supports basic authentication, TLS as well as OAuth.

MqttSinkConfigBuilder.builder()
     .withAddress("mqtt.example.com", 1883)
     .withIdentifier("Tests")
     .withWebsocketBuilder(websocket -> {
             websocket.withSubpath("/topic");
     })
     .withAuthBuilder(auth -> {
         auth.withUsername("admin");
         auth.withPassword("admin");
     })
sink:
   mqtt:
     address: "mqtt.example.com:1883"
     identifier: "Tests"
     websocket:
         subpath: /topic

Properties

identifier

Type: string

address

Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883. Required.

Type: object

Default value: :1883

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

websocket

Configuration for a WebSocket transport to use by MQTT clients. Type: object

Fields of websocket:

websocket.subpath

The path that should be used when connecting to websocket endpoint. Type: string

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.keystoreFile

Type: string

ssl.keystorePassword

Type: string

ssl.keyPassword

Type: string

ssl.pemEncodedFile

Type: string

ssl.truststoreFile

Type: string

ssl.truststorePassword

Type: string

ssl.insecure

Disables certificate checking and hostname validation. Typically used for debugging connection issues. Type: boolean

ssl.ignoreHostnameValidation

Type: boolean

auth

If not specified no authentication is used. Type: object

Fields of auth:

auth.username

The username to present to the broker. Type: string

auth.password

The password to present to the broker. Type: string

connect

General connection options such as keep alive and user properties to pass to client. Type: object

Fields of connect:

connect.userProperties

List of key value paris that will be passed to MQTT client as user properties Type: array

connect.keepAlive

Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short) Type: object

Unit: unsigned short

reconnect

Configuration for an exponential backoff reconnect strategy. Type: object

Fields of reconnect:

reconnect.initialDelay

The initial delay before attempting to reconnect. Type: object

reconnect.maxDelay

Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified. Type: object

oauth

Configures the OAuth details. Type: object

Fields of oauth:

oauth.scopes

Type: array

oauth.tokenUrl

Type: string

oauth.clientId

Type: string

oauth.clientSecret

Type: string

oauth.grantType

Type: string

Default value: client_credentials

oauth.username

Type: string

oauth.password

Type: string

retry

Type: object

Fields of retry:

retry.retries

Number of retry attempts after a request failure. Type: number

Default value: 3

retry.backoffDelay

Initial delay before the first retry attempt. Type: object

Default value: PT0.2S

retry.maxBackoffDelay

Maximum delay between consecutive retry attempts. Type: object

Default value: PT3S

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink. Type: object

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.4.0'

Usage Examples

stream
    .withName("Send data to Mqtt")
    .consumeFromSource(Sources.collection(SOURCE_MESSAGE))
    .processWith(message -> MqttPublishMessage
            .builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
            .withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
            .build())
    .terminateWithSink(
            MqttSinkConfigBuilder.builder()
                    .withAddress("mqtt.example.com", 1883)
                    .withSslBuilder(ssl -> {
                        ssl.withIgnoreHostnameValidation(true);
                    })
                    .withOauthBuilder(oauth -> {
                        oauth.withClientId("admin");
                        oauth.withTokenUrl("...");
                    })
    );

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and oauth.

sink:
   mqtt:
       address: "mqtt.example.com:1883"
       auth:
           username: "admin
           password: "admin
       ssl:
           ignoreHostnameValidation: true
       websocket:
           subpath: /topic

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and authentication.