RealtimeTrigger RealtimeTrigger

yaml
type: "io.kestra.plugin.kafka.RealtimeTrigger"

Consume a message in real-time from a Kafka topic and create one execution per message.

If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.kafka.Trigger instead.

Examples

Consume a message from a Kafka topic in real time.

yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
      keyDeserializer: STRING
      valueDeserializer: AVRO
    groupId: kafkaConsumerGroupId

Properties

groupId

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka consumer group ID.

Using a consumer group, we will fetch only records that haven't been consumed yet.

keyDeserializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

The deserializer used for the key.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

properties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required: ✔️

Kafka connection properties.

The bootstrap.servers property is a minimal required configuration to connect to a Kafka topic. This property can reference any valid Consumer Configs or Producer Configs as key-value pairs. If you want to pass a truststore or a keystore, you must provide a base64 encoded string for ssl.keystore.location and ssl.truststore.location.

valueDeserializer

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️
  • Default: STRING
  • Possible Values:
    • STRING
    • INTEGER
    • FLOAT
    • DOUBLE
    • LONG
    • SHORT
    • BYTE_ARRAY
    • BYTE_BUFFER
    • BYTES
    • UUID
    • VOID
    • AVRO
    • JSON

The deserializer used for the value.

Possible values are: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON.

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions in order to limit the flow trigger.

partitions

  • Type: array
  • SubType: integer
  • Dynamic: ✔️
  • Required:

Topic partitions to consume messages from.

Manually assign a list of partitions to the consumer.

serdeProperties

  • Type: object
  • SubType: string
  • Dynamic: ✔️
  • Required:
  • Default: {}

Serializer configuration

Configuration that will be passed to serializer or deserializer. The avro.use.logical.type.converters is always passed when you have any values set to true.

since

  • Type: string
  • Dynamic: ✔️
  • Required:

Timestamp of a message to start consuming messages from.

By default, we consume all messages from the topics with no consumer group or depending on the configuration of the auto.offset.reset property. However, you can provide an arbitrary start time. This property is ignored if a consumer group is used. It must be a valid ISO 8601 date.

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).

topic

  • Type: object
  • Dynamic:
  • Required:

Kafka topic(s) to consume messages from.

It can be a string or a list of strings to consume from one or multiple topics.

topicPattern

  • Type: string
  • Dynamic: ✔️
  • Required:

Kafka topic pattern to consume messages from.

Consumer will subscribe to all topics matching the specified pattern to get dynamically assigned partitions.

Outputs

headers

  • Type: array
  • SubType: String_
  • Required:

key

  • Type: object
  • Required:

offset

  • Type: integer
  • Required:

partition

  • Type: integer
  • Required:

timestamp

  • Type: string
  • Required:
  • Format: date-time

topic

  • Type: string
  • Required:

value

  • Type: object
  • Required:

Definitions

org.apache.commons.lang3.tuple.Pair_java.lang.String.java.lang.String_

Was this page helpful?