Kafka Consumer

What it does

Starts a workflow when a Kafka message is consumed from a configured topic and consumer group. This trigger is event-driven (no incoming HTTP request required).

Inputs / config

  • No runtime inputs.
  • Config fields:
    • credentials_id (required): Kafka credential id (bootstrap servers, optional SASL, TLS).
    • topic (required): topic to consume from.
    • consumer_group (required): consumer group id for offset tracking.
    • enabled (optional, default true): enable/disable consumption without deleting the node.
    • commit_mode (optional, default after_success): after_success (commit offsets only after the workflow run succeeds) or always (commit regardless of run outcome).
    • poll_timeout_ms (optional, default 1000): maximum time each poll waits for records before the consume loop refreshes.
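The commit_mode behavior can be sketched as follows. This is an illustrative outline only, not Zerq's internal consume loop; the handle and commit callables are hypothetical stand-ins for running the workflow and committing the record's offset.

```python
# Sketch of commit_mode semantics (illustrative, not Zerq's actual code).
# `handle` runs the workflow for one record; `commit` commits that
# record's offset back to Kafka.
def process_record(record, commit_mode, handle, commit):
    try:
        handle(record)
        succeeded = True
    except Exception:
        succeeded = False
    # "always": commit whether or not the workflow run succeeded.
    # "after_success": commit only on success, so a failed record is
    # redelivered on the next poll (at-least-once semantics).
    if commit_mode == "always" or (commit_mode == "after_success" and succeeded):
        commit(record)
    return succeeded
```

Under after_success, a record whose workflow run fails is not committed and will be consumed again, which is why downstream actions should tolerate duplicates.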

Outputs

  • triggered_at (string, RFC3339): UTC time when the consumed message triggered the run.
  • trigger (string): always kafka_consumer.
  • topic (string): Kafka topic where the message was consumed.
  • partition (integer): Kafka partition id.
  • offset (integer): Kafka offset.
  • key (string): Kafka message key.
  • message (object or string): message payload (JSON object if parseable, otherwise the raw string).
  • headers (object): Kafka record headers (string map).
  • run_id (string): deterministic run identifier for this consumed message.
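An illustrative output payload for a single consumed record. All values, including the shape of run_id, are made up for the example:

```json
{
  "triggered_at": "2024-01-15T09:30:00Z",
  "trigger": "kafka_consumer",
  "topic": "events.orders",
  "partition": 3,
  "offset": 42871,
  "key": "order-1001",
  "message": { "order_id": 1001, "status": "created" },
  "headers": { "source": "checkout-service" },
  "run_id": "example-run-id"
}
```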

Multi-pod behavior

In multi-pod deployments, Zerq relies on Kafka consumer-group coordination:

  • Pods using the same consumer_group share partitions.
  • For a given partition, only one group member processes records at a time.
  • Delivery semantics remain at-least-once, so downstream actions should be idempotent when duplicates matter.
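Because delivery is at-least-once, duplicate-sensitive downstream actions can dedupe on the deterministic run_id. A minimal in-memory sketch, assuming a hypothetical action callable; a real multi-pod deployment would need a shared store (for example a database unique constraint) rather than a per-process set:

```python
# Idempotency sketch: skip payloads whose run_id was already processed.
# The in-memory set is for illustration only; pods do not share memory,
# so production dedupe needs shared storage.
_seen_run_ids = set()

def handle_trigger(payload, action):
    run_id = payload["run_id"]
    if run_id in _seen_run_ids:
        return False  # duplicate delivery; already handled
    action(payload)
    # Record the run_id only after the action succeeds, so a failed
    # delivery can be retried on redelivery.
    _seen_run_ids.add(run_id)
    return True
```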

Registration lifecycle

Kafka consumer trigger registrations are maintained in-memory with an event-driven model:

  • On service startup, Zerq performs a one-time bootstrap load of all published, workflow-enabled proxies.
  • After startup, registrations are updated immediately when proxies/workflows are created, updated, enabled/disabled, or deleted.
  • There is no recurring 60-second full-database reconciliation loop.

Example

{
  "id": "kafka_consumer_1",
  "type": "kafka_consumer",
  "data": {
    "config": {
      "credentials_id": "cred_kafka_prod",
      "topic": "events.orders",
      "consumer_group": "zerq-orders-consumer",
      "enabled": true,
      "commit_mode": "after_success",
      "poll_timeout_ms": 1000
    }
  }
}