Mqtt¶
The mqtt sink connects to an MQTT broker and publishes messages to a specified topic.
It supports configuring basic attributes such as the broker’s host and port and the topic filter.
It also supports basic authentication, TLS, and OAuth.
MqttSinkConfigBuilder.builder()
    .withAddress("mqtt.example.com", 1883)
    .withIdentifier("Tests")
    .withWebsocketBuilder(websocket -> {
        websocket.withSubpath("/topic");
    })
    .withAuthBuilder(auth -> {
        auth.withUsername("admin");
        auth.withPassword("admin");
    })
sink:
  mqtt:
    address: "mqtt.example.com:1883"
    identifier: "Tests"
    websocket:
      subpath: /topic
Properties¶
identifier
¶
Type: string
address
¶
Sets the address of the MQTT broker to connect to, in host:port format. When the port is omitted, it defaults to 1883.
Required.
Type: object
Default value: :1883
Fields of address:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
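The host:port handling described for address can be illustrated with a minimal sketch. The class BrokerAddress and its parse method are hypothetical names used only for illustration; they are not part of the plugin, and the plugin's own parser may behave differently (for example, this sketch does not handle bracketed IPv6 literals).

```java
// Hypothetical helper, not part of the plugin: splits a "host:port" string
// and applies the documented default port 1883 when no port is given.
final class BrokerAddress {
    static final int DEFAULT_PORT = 1883;

    final String host;
    final int port;

    BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static BrokerAddress parse(String value) {
        int colon = value.lastIndexOf(':');
        if (colon < 0) {
            // No port given at all: fall back to the default.
            return new BrokerAddress(value, DEFAULT_PORT);
        }
        String host = value.substring(0, colon);
        String portText = value.substring(colon + 1);
        // A trailing colon with nothing after it also falls back to the default.
        int port = portText.isEmpty() ? DEFAULT_PORT : Integer.parseInt(portText);
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return new BrokerAddress(host, port);
    }
}
```

With this sketch, both "mqtt.example.com:1883" and "mqtt.example.com" resolve to the same host and port.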
websocket
¶
Configuration for the WebSocket transport used by MQTT clients.
Type: object
Fields of websocket:
websocket.subpath
¶
The path to use when connecting to the WebSocket endpoint.
Type: string
ssl
¶
Secure transport configuration.
Type: object
Fields of ssl:
ssl.keystoreFile
¶
Type: string
ssl.keystorePassword
¶
Type: string
ssl.keyPassword
¶
Type: string
ssl.pemEncodedFile
¶
Type: string
ssl.truststoreFile
¶
Type: string
ssl.truststorePassword
¶
Type: string
ssl.insecure
¶
Disables certificate checking and hostname validation. Typically used for debugging connection issues.
Type: boolean
ssl.ignoreHostnameValidation
¶
Type: boolean
auth
¶
If not specified, no authentication is used.
Type: object
Fields of auth:
auth.username
¶
The username to present to the broker.
Type: string
auth.password
¶
The password to present to the broker.
Type: string
connect
¶
General connection options, such as keep alive and user properties, to pass to the client.
Type: object
Fields of connect:
connect.userProperties
¶
List of key-value pairs that will be passed to the MQTT client as user properties.
Type: array
connect.keepAlive
¶
Sets the keep alive property. The value must be in the range 0 to 2^16 - 1 (unsigned short).
Type: object
Unit: unsigned short
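The unsigned short range for connect.keepAlive works out to 0 through 65535. A minimal sketch of that bounds check follows; the class KeepAlive and its validate method are hypothetical and only illustrate the arithmetic, not how the plugin validates the value.

```java
// Hypothetical range check, not part of the plugin.
final class KeepAlive {
    // 2^16 - 1, the largest value an unsigned 16-bit integer can hold.
    static final int MAX = (1 << 16) - 1;

    static int validate(int value) {
        if (value < 0 || value > MAX) {
            throw new IllegalArgumentException(
                "keepAlive must be between 0 and " + MAX + ", got " + value);
        }
        return value;
    }
}
```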
reconnect
¶
Configuration for an exponential backoff reconnect strategy.
Type: object
Fields of reconnect:
reconnect.initialDelay
¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay
¶
Upper bound on the reconnect delay. Whatever value the backoff strategy arrives at is capped at the specified maxDelay.
Type: object
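To show how initialDelay and maxDelay interact, here is a minimal sketch of a capped exponential backoff. The class ReconnectBackoff is hypothetical, and the doubling factor is an assumption: the growth factor the plugin actually uses is not documented here.

```java
import java.time.Duration;

// Hypothetical illustration of capped exponential backoff, not plugin code.
final class ReconnectBackoff {
    // Returns the delay before reconnect attempt number `attempt` (0-based).
    // Assumption: the delay doubles after each attempt.
    static Duration delayForAttempt(Duration initialDelay, Duration maxDelay, int attempt) {
        Duration delay = initialDelay;
        for (int i = 0; i < attempt; i++) {
            if (delay.compareTo(maxDelay) >= 0) {
                break; // already at or above the cap, no need to keep growing
            }
            delay = delay.multipliedBy(2);
        }
        // Whatever value was reached, never exceed maxDelay.
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }
}
```

With an initial delay of 1 second and a maximum of 10 seconds, this sketch yields 1, 2, 4, and 8 seconds for the first four attempts and 10 seconds for every attempt after that.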
oauth
¶
Configures the OAuth details.
Type: object
Fields of oauth:
oauth.scopes
¶
Type: array
oauth.tokenUrl
¶
Type: string
oauth.clientId
¶
Type: string
oauth.clientSecret
¶
Type: string
oauth.grantType
¶
Type: string
Default value: client_credentials
oauth.username
¶
Type: string
oauth.password
¶
Type: string
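For orientation, the default grantType of client_credentials corresponds to the standard OAuth 2.0 client credentials grant, in which the client posts a form-encoded request to the token URL. The sketch below builds such a form body; the class TokenRequestSketch is hypothetical, and whether the plugin sends the client credentials in the body (as here) or in an HTTP Basic header is not documented in this section, so treat it as an assumption.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of a client_credentials token request body.
final class TokenRequestSketch {
    static String formBody(String clientId, String clientSecret, List<String> scopes) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("grant_type", "client_credentials");
        params.put("client_id", clientId);
        params.put("client_secret", clientSecret);
        if (!scopes.isEmpty()) {
            // OAuth 2.0 expresses multiple scopes as one space-delimited string.
            params.put("scope", String.join(" ", scopes));
        }
        // Encode as application/x-www-form-urlencoded key=value pairs.
        return params.entrySet().stream()
            .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
            .collect(Collectors.joining("&"));
    }

    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```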
retry
¶
Type: object
Fields of retry:
retry.retries
¶
Number of retry attempts after a request failure.
Type: number
Default value: 3
retry.backoffDelay
¶
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
retry.maxBackoffDelay
¶
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
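The default values PT0.2S and PT3S are ISO-8601 durations (200 milliseconds and 3 seconds), which java.time.Duration.parse can read. The sketch below shows how the three retry settings could combine; the class RetrySketch is hypothetical, and the doubling of the delay between attempts is an assumption, since the growth rule between backoffDelay and maxBackoffDelay is not stated here.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical retry loop, not the plugin's implementation.
final class RetrySketch {
    static <T> T callWithRetries(Supplier<T> operation, int retries,
                                 Duration backoffDelay, Duration maxBackoffDelay) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        Duration delay = backoffDelay;
        for (int attempt = 0; ; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (attempt >= retries) {
                    throw e; // retries exhausted, surface the last failure
                }
                sleep(delay);
                // Assumption: double the delay, capped at maxBackoffDelay.
                Duration doubled = delay.multipliedBy(2);
                delay = doubled.compareTo(maxBackoffDelay) > 0 ? maxBackoffDelay : doubled;
            }
        }
    }

    private static void sleep(Duration d) {
        try {
            Thread.sleep(d.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", ie);
        }
    }
}
```

With the documented defaults, a call would look like callWithRetries(op, 3, Duration.parse("PT0.2S"), Duration.parse("PT3S")), which allows one initial attempt plus up to three retries.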
exceptionHandler
¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.4.0'
Usage Examples¶
stream
    .withName("Send data to Mqtt")
    .consumeFromSource(Sources.collection(SOURCE_MESSAGE))
    // Wrap each string as an MQTT publish message with at-least-once delivery (QoS 1).
    .processWith(message -> MqttPublishMessage
        .builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
        .withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
        .build())
    .terminateWithSink(
        MqttSinkConfigBuilder.builder()
            .withAddress("mqtt.example.com", 1883)
            // TLS with hostname validation turned off.
            .withSslBuilder(ssl -> {
                ssl.withIgnoreHostnameValidation(true);
            })
            .withOauthBuilder(oauth -> {
                oauth.withClientId("admin");
                oauth.withTokenUrl("...");
            })
    );
This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using TLS with hostname validation disabled and OAuth.
sink:
  mqtt:
    address: "mqtt.example.com:1883"
    auth:
      username: "admin"
      password: "admin"
    ssl:
      ignoreHostnameValidation: true
    websocket:
      subpath: /topic
This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using WebSockets, TLS with hostname validation disabled, and username/password authentication.