Skip to Content

Kafka (and Confluent)

This connector is tested for Apache Kafka (free version of Kafka) and Confluent (a commercial version of Kafka)

In an application, right-click on Sources -> select Create source -> type any Source name and select KafkaSource in the Source type field -> click Create.

  • host - host name
  • port - port
  • username - user’s name
  • password - user’s password
  • sourceOptions - JSON object with source-level Kafka properties

SSL authentication configuration

Use sourceOptions (in JSON-object format) to configure authentication. In the options you can refer to the username and **password fields using ${usernae} and ${password} variables. Below is an example of SSL authentication.

{"sasl.mechanism":"PLAIN","security.protocol":"SASL_SSL","sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${username}\" password=\"${password}\";"}

Sending data to Kafka

To publish messages to a Kafka topic use LoadTableFromSource (aka Load) operation. You can load a single JSON message from a JSONInMemorySource source or multiple JSON messages from a JSONCollectionSource source. In the Target section, select a KafkaSource-typed source as **Source, leave Space blank, enter the topic name in the Table field, specify Kafka operation-level properties as JSON object in Target source options (e.g. {"acks":"all"}).

The Kafka connection properties will be constructed according to the following rules:

  • First, the following default properties are added:
    • bootstrap.servers set to the concatenation of host and port specified in the source as follows: <source.host>:<source.port>
    • key.serializer is set to org.apache.kafka.common.serialization.Serializer
    • value.serializer is set to org.apache.kafka.common.serialization.Serializer
    • value.deserializer is set to org.apache.kafka.common.serialization.Deserializer
    • key.deserializer is set to org.apache.kafka.common.serialization.Deserializer
    • acks is set to all
  • Then the source-level properties specified in the sourceOptions field of the source are added, possibly overwriting properties added at the previous step
  • Then the operation-level properties specified in the Target source options field of the Load operation are added, possibly overwriting properties added at the previous step

The properties will overwrite the connection-level properties specified in the KafkaSource-typed source if there is any overlap.

Reading data from Kafka

To read data from a queue, use MessageConsumerService operation.

Last updated on