Skip to Content
DocsOperationsMessageConsumerService

MessageConsumerService

MessageConsumerService operation is a service operation. It will start a background job that will be running until your explicitly stop it or any error occurs. The job will listen for new messages and execute the Scenario for each new message.

  • Source is the name of a message based source. Find a list of such source supported below.
  • Source options are optional and depend on the source. See below.
  • Scenario defines a scenario to execute for each new message in the source
  • Scenario parameters are optional scenario parameters in JSON format
  • Parallel readers specifies the number of parallel thread listening for messages from the source.
  • Target table will contain the id of the started job in the job_id column. Use StopJob operation to stop the job.

MessageConsumerService operation support the following types of sources in the source section:

  • RabbitMQSource
  • KafkaSource

RabbitMQSource-typed source

Queue name must contain the name of a RabbitMQ queue.

Configure the scenario specified in the Scenario field of MessageConsumerService operation to receive a new message from the RabbitMQ queue. Navigate to the scenario in the app navigation tree -> right-click on it and select Properties -> Add a new Input table with the following properties:

  • Name must be message
  • Source must be a source of JSONInMemorySource type
  • Table is any name of your choice. You will use in the scenario to get the RabbitMQ message.
  • Space and Model should be blank
  • Tmp Table is up to you

Kafka-typed source

In Source options you can specify optional operation-level connection properties as JSON Object, e.g. {"auto.offset.reset":"earliest", "enable.auto.commit":"true"} The Kafka connection properties will be constructed according to the following rules:

  • First, the following default properties are added:
    • bootstrap.servers set to the concatenation of host and port specified in the source as follows: <source.host>:<source.port>
    • value.deserializer is set to org.apache.kafka.common.serialization.Deserializer
    • key.deserializer is set to org.apache.kafka.common.serialization.Deserializer
    • group.id is set to randomly generated UUID
  • Then the source-level properties specified in the sourceOptions field of the source are added, possibly overwriting properties added at the previous step
  • Then the operation-level properties specified in the Source options field of the Load operation are added, possibly overwriting properties added at the previous step

Queue name must contain the name of a Kafka topic.

Configure the scenario specified in the Scenario field of MessageConsumerService operation to receive a new event from the Kafka topic. Navigate to the scenario in the app navigation tree -> right-click on it and select Properties -> Add a new Input table with the following properties:

  • Name must be value
  • Source must be a source of JSONInMemorySource type
  • Table is any name of your choice. You will use in the scenario to get the RabbitMQ message.
  • Space and Model should be blank
  • Tmp Table is up to you
Last updated on