Kafka Consumer
What it does
Starts a workflow when a Kafka message is consumed from a configured topic and consumer group. This trigger is event-driven (no incoming HTTP request required).
Inputs / config
- No runtime inputs.
- Config fields:
  - `credentials_id` (required): Kafka credential id (bootstrap servers, optional SASL, TLS).
  - `topic` (required): topic to consume from.
  - `consumer_group` (required): consumer group id for offset tracking.
  - `enabled` (optional, default `true`): enable/disable consumption without deleting the node.
  - `commit_mode` (optional, default `after_success`): `after_success` or `always`.
  - `poll_timeout_ms` (optional, default `1000`): consume window before loop refresh.
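
The two `commit_mode` values differ only in when the consumer offset is committed relative to the workflow run. A minimal sketch of the distinction, assuming a confluent-kafka consumer with auto-commit disabled; `run_workflow` and `COMMIT_MODE` are placeholders for illustration, not Zerq APIs:

```python
from confluent_kafka import Consumer

COMMIT_MODE = "after_success"  # or "always"

def run_workflow(msg) -> None:
    """Placeholder for dispatching the workflow run."""
    ...

consumer = Consumer({
    "bootstrap.servers": "broker:9092",   # from the referenced Kafka credential
    "group.id": "zerq-orders-consumer",   # consumer_group
    "enable.auto.commit": False,          # offsets are committed manually below
})
consumer.subscribe(["events.orders"])     # topic

while True:
    msg = consumer.poll(1.0)              # poll_timeout_ms / 1000
    if msg is None or msg.error():
        continue
    if COMMIT_MODE == "always":
        consumer.commit(message=msg)      # commit first: a failed run is not redelivered
        run_workflow(msg)
    else:                                 # "after_success" (default)
        run_workflow(msg)                 # commit only once the run succeeds,
        consumer.commit(message=msg)      # so failures are redelivered
```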
Outputs
| Field | Type | Description |
|---|---|---|
| triggered_at | string (RFC3339) | UTC time when the consumed message triggered the run |
| trigger | string | Always `kafka_consumer` |
| partition | integer | Kafka partition id |
| topic | string | Kafka topic the message was consumed from |
| offset | integer | Kafka offset |
| key | string | Kafka message key |
| message | object or string | Message payload (JSON object if parseable, else raw string) |
| headers | object | Kafka record headers (string map) |
| run_id | string | Deterministic run identifier for this consumed message |
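
An illustrative output payload for one consumed message. All values are made up, and the `run_id` shape shown here (a topic/partition/offset composite) is only a plausible guess; the exact format is not specified above:

```json
{
  "triggered_at": "2024-05-14T09:30:00Z",
  "trigger": "kafka_consumer",
  "topic": "events.orders",
  "partition": 3,
  "offset": 18245,
  "key": "order-10042",
  "message": { "order_id": "10042", "status": "created" },
  "headers": { "content-type": "application/json" },
  "run_id": "kafka_consumer_1:events.orders:3:18245"
}
```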
Multi-pod behavior
In multi-pod deployments, Zerq relies on Kafka consumer-group coordination:
- Pods using the same `consumer_group` share partitions.
- For a given partition, only one group member processes records at a time.
- Delivery semantics remain at-least-once, so downstream actions should be idempotent when duplicates matter.
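
When a downstream action must not run twice, a dedupe check keyed on `run_id` (or the topic/partition/offset triple) is one way to absorb duplicates. A minimal sketch, assuming a shared Redis instance is available; the key prefix, TTL, and `perform_side_effect` are illustrative, not part of Zerq:

```python
import redis

r = redis.Redis(host="localhost", port=6379)

def handle_trigger(payload: dict) -> None:
    # run_id is deterministic per consumed message, so redeliveries
    # of the same record map to the same dedupe key.
    dedupe_key = f"zerq:dedupe:{payload['run_id']}"
    # SET NX returns None when the key already exists, i.e. a duplicate.
    first_delivery = r.set(dedupe_key, 1, nx=True, ex=86_400)  # 24h TTL
    if not first_delivery:
        return  # at-least-once duplicate; already processed
    perform_side_effect(payload)  # placeholder for the real action

def perform_side_effect(payload: dict) -> None:
    """Placeholder for the non-idempotent downstream action."""
    ...
```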
Registration lifecycle
Kafka consumer trigger registrations are maintained in-memory with an event-driven model:
- On service startup, Zerq performs a one-time bootstrap load of all published, workflow-enabled proxies.
- After startup, registrations are updated immediately when proxies/workflows are created, updated, enabled/disabled, or deleted.
- There is no recurring 60-second full-database reconciliation loop.
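
A rough sketch of that lifecycle, with hypothetical names throughout (`load_published_proxies`, the event strings, and the registry shape are assumptions, not Zerq internals):

```python
# In-memory trigger registry: bootstrap once at startup,
# then mutate only in response to change events.
registry: dict[str, dict] = {}

def load_published_proxies() -> list[dict]:
    return []  # placeholder for the one-time database query

def bootstrap() -> None:
    # One-time startup load of all published, workflow-enabled proxies.
    for proxy in load_published_proxies():
        registry[proxy["id"]] = proxy

def on_proxy_event(event: str, proxy: dict) -> None:
    # Applied immediately on create/update/enable/disable/delete;
    # no periodic full-database reconciliation pass is involved.
    if event == "deleted" or not proxy.get("enabled", True):
        registry.pop(proxy["id"], None)
    else:
        registry[proxy["id"]] = proxy
```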
Example
```json
{
  "id": "kafka_consumer_1",
  "type": "kafka_consumer",
  "data": {
    "config": {
      "credentials_id": "cred_kafka_prod",
      "topic": "events.orders",
      "consumer_group": "zerq-orders-consumer",
      "enabled": true,
      "commit_mode": "after_success",
      "poll_timeout_ms": 1000
    }
  }
}
```