Consumer Groups
Consumer groups allow multiple consumers to work together to process messages from a stream. Each message is delivered to only one consumer in the group.
Overview
Consumer groups are supported by all three message brokers in ZephCast:
- Kafka: Native consumer groups
- RabbitMQ: Competing consumers pattern
- Redis: Stream consumer groups
Kafka Consumer Groups
import asyncio

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig


async def kafka_consumer_group():
    # Clients that share the same group_id form one consumer group.
    consumers = [
        KafkaClient(
            stream_name="my-topic",
            config=KafkaConfig(
                bootstrap_servers="localhost:9092",
                group_id="my-group"
            )
        )
        for _ in range(3)
    ]
    await asyncio.gather(*(consumer.connect() for consumer in consumers))
    try:
        async def consume(client, consumer_id):
            async for message in client:
                print(f"Consumer {consumer_id} received: {message}")

        # Run all consumers concurrently.
        await asyncio.gather(*(
            consume(consumer, i)
            for i, consumer in enumerate(consumers)
        ))
    finally:
        # Always close the clients, even if a consumer raised.
        await asyncio.gather(*(consumer.close() for consumer in consumers))
RabbitMQ Competing Consumers
import asyncio

from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig


async def rabbitmq_competing_consumers():
    # Clients that read from the same queue_name compete for its messages.
    consumers = [
        RabbitClient(
            stream_name="task-queue",
            config=RabbitConfig(
                queue_name="shared-queue",
                rabbitmq_url="amqp://guest:guest@localhost:5672/"
            )
        )
        for _ in range(3)
    ]
    await asyncio.gather(*(consumer.connect() for consumer in consumers))
    try:
        async def consume(client, consumer_id):
            async for message in client:
                print(f"Worker {consumer_id} processing: {message}")
                await asyncio.sleep(1)  # Simulate work

        # Run all workers concurrently.
        await asyncio.gather(*(
            consume(consumer, i)
            for i, consumer in enumerate(consumers)
        ))
    finally:
        # Always close the clients, even if a worker raised.
        await asyncio.gather(*(consumer.close() for consumer in consumers))
Redis Stream Consumer Groups
import asyncio

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig


async def redis_consumer_group():
    # All clients join the same consumer_group; each needs a unique consumer_name.
    consumers = [
        RedisClient(
            stream_name="my-stream",
            config=RedisConfig(
                consumer_group="my-group",
                consumer_name=f"consumer-{i}",
                redis_url="redis://localhost:6379"
            )
        )
        for i in range(3)
    ]
    await asyncio.gather(*(consumer.connect() for consumer in consumers))
    try:
        async def consume(client, consumer_id):
            async for message in client:
                print(f"Consumer {consumer_id} received: {message}")
                # Acknowledge after processing so the message is not left pending.
                await client.ack(message)

        # Run all consumers concurrently.
        await asyncio.gather(*(
            consume(consumer, i)
            for i, consumer in enumerate(consumers)
        ))
    finally:
        # Always close the clients, even if a consumer raised.
        await asyncio.gather(*(consumer.close() for consumer in consumers))
Best Practices
Scaling
- Start with a small number of consumers
- Monitor processing throughput
- Add consumers gradually as needed
- Consider message ordering requirements
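As a rough starting point for sizing a group, the number of consumers can be estimated from measured throughput. The sketch below is not part of ZephCast; `consumers_needed` and its parameters are hypothetical names, and the rates are assumed to come from your own monitoring. The optional cap reflects that, in Kafka, consumers beyond the partition count sit idle.

```python
import math


def consumers_needed(arrival_rate, per_consumer_rate, max_parallelism=None):
    """Estimate how many consumers keep up with the incoming message rate.

    arrival_rate:       messages per second entering the stream (measured)
    per_consumer_rate:  messages per second one consumer can process (measured)
    max_parallelism:    optional upper bound, e.g. the Kafka partition count
    """
    if per_consumer_rate <= 0:
        raise ValueError("per_consumer_rate must be positive")
    needed = max(1, math.ceil(arrival_rate / per_consumer_rate))
    if max_parallelism is not None:
        # Extra consumers beyond the available parallelism would be idle.
        needed = min(needed, max_parallelism)
    return needed


if __name__ == "__main__":
    print(consumers_needed(900, 250))                     # uncapped estimate
    print(consumers_needed(900, 250, max_parallelism=3))  # capped by partitions
```

Treat the result as a first guess and adjust it gradually while watching throughput.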
Message Processing
- Implement idempotent processing
- Handle message failures gracefully
- Consider using dead letter queues
- Implement proper acknowledgment
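Idempotent processing can be sketched as a wrapper that remembers which message IDs have already been handled. This is an illustration under assumptions, not ZephCast API: `IdempotentProcessor` is a hypothetical helper, each message is assumed to carry a unique ID, and the in-memory set would need to be replaced by a durable store in a real deployment.

```python
import asyncio


class IdempotentProcessor:
    """Runs a handler at most once per message ID (hypothetical helper)."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # assumption: in-memory only; use a durable store in practice

    async def process(self, message_id, payload):
        """Return True if the handler ran, False if the message was a duplicate."""
        if message_id in self._seen:
            return False
        await self._handler(payload)
        # Record the ID only after the handler succeeds, so a failed
        # message can still be processed when it is redelivered.
        self._seen.add(message_id)
        return True


async def demo_idempotent():
    handled = []

    async def handler(payload):
        handled.append(payload)

    processor = IdempotentProcessor(handler)
    results = [
        await processor.process("msg-1", "a"),
        await processor.process("msg-1", "a"),  # redelivered duplicate
        await processor.process("msg-2", "b"),
    ]
    return results, handled


if __name__ == "__main__":
    print(asyncio.run(demo_idempotent()))
```

Acknowledging the message after `process` returns, rather than before, keeps a crash from silently dropping work.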
Monitoring
- Track consumer lag
- Monitor processing rates
- Set up alerts for stuck consumers
- Track message processing times
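Consumer lag is the distance between the newest message in a partition and the position the group has committed. The following sketch computes it from two plain dictionaries; `consumer_lag` and `lagging_partitions` are hypothetical helpers, and the offsets are assumed to be fetched separately from the broker's admin or monitoring tooling.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Return per-partition lag as {partition: messages_behind}.

    end_offsets:        {partition: offset of the next message to be written}
    committed_offsets:  {partition: last offset committed by the group}
    Partitions with no committed offset are treated as starting from 0.
    """
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition, 0)
        lag[partition] = max(end - committed, 0)
    return lag


def lagging_partitions(lag, threshold):
    """Return the partitions whose lag exceeds the alert threshold."""
    return sorted(p for p, behind in lag.items() if behind > threshold)


if __name__ == "__main__":
    lag = consumer_lag({0: 100, 1: 250, 2: 40}, {0: 100, 1: 50})
    print(lag)
    print(lagging_partitions(lag, threshold=100))
```

A partition whose lag keeps growing while others stay flat is a common sign of a stuck consumer.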
Error Handling
- Implement retry logic
- Use dead letter exchanges/queues
- Log failed messages
- Monitor error rates
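Retry logic and dead-lettering can be combined in one small helper. The sketch below is a generic pattern rather than a ZephCast feature: `process_with_retry` is a hypothetical function, and the `dead_letters` list stands in for a real dead letter queue or exchange.

```python
import asyncio


async def process_with_retry(handler, message, dead_letters,
                             max_attempts=3, base_delay=0.1):
    """Try the handler up to max_attempts times with exponential backoff.

    Returns True on success. After the final failure the message and the
    error are appended to dead_letters and False is returned.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await handler(message)
            return True
        except Exception as exc:
            if attempt == max_attempts:
                # Out of attempts: park the message for later inspection.
                dead_letters.append((message, repr(exc)))
                return False
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return False  # only reached when max_attempts < 1


async def demo_retry():
    dead_letters = []

    async def always_fails(message):
        raise ValueError(f"cannot handle {message}")

    ok = await process_with_retry(always_fails, "task-1", dead_letters,
                                  base_delay=0.01)
    return ok, dead_letters


if __name__ == "__main__":
    print(asyncio.run(demo_retry()))
```

Catching the broad `Exception` is a simplification; in practice it helps to retry only errors that are likely to be transient and dead-letter the rest immediately.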
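Common Issues
Message Ordering
With consumer groups, ordering is guaranteed only within a single partition (Kafka) or a single stream (Redis). Because consumers in a group run in parallel, messages handled by different consumers may finish out of order. If ordering is critical:
- Use a single consumer
- Use partition keys so that related messages land in the same partition (Kafka)
- Use a separate queue for each sequence that must stay ordered (RabbitMQ)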
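The effect of a partition key can be illustrated without a broker: hashing the key decides the partition, so every message with the same key ends up in the same partition and keeps its relative order. This is a sketch only. `partition_for` and `route` are hypothetical names, and CRC32 stands in for the hash function a real Kafka client uses.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a key to a partition with a stable hash.

    zlib.crc32 is used because Python's built-in hash() for strings is
    randomized per process and would not give a repeatable mapping.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Group (key, payload) events by partition, preserving arrival order."""
    partitions = defaultdict(list)
    for key, payload in events:
        partitions[partition_for(key, num_partitions)].append((key, payload))
    return partitions


if __name__ == "__main__":
    events = [
        ("order-1", "created"),
        ("order-2", "created"),
        ("order-1", "paid"),
        ("order-1", "shipped"),
    ]
    for partition, items in sorted(route(events, 4).items()):
        print(partition, items)
```

Ordering across different keys is still not guaranteed, so the key should identify the unit that must stay ordered, such as an order or user ID.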
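Rebalancing
When consumers join or leave the group, the broker redistributes work among the current members (in Kafka, partitions are reassigned), and in-flight messages may be delivered again: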
- Implement graceful shutdown
- Handle duplicate messages
- Use appropriate session timeouts
- Monitor rebalancing events
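Graceful shutdown usually means finishing the message in hand before leaving the group, so that it is not redelivered to another consumer. The sketch below shows the pattern with plain `asyncio` primitives; an `asyncio.Queue` stands in for the broker, and `worker` and `demo_shutdown` are hypothetical names rather than ZephCast API.

```python
import asyncio


async def worker(queue, stop, processed):
    """Process messages until stop is set, never abandoning one midway."""
    while not stop.is_set():
        try:
            # Wake up periodically so the stop flag is checked even when idle.
            message = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        await asyncio.sleep(0.01)  # simulate work on the message
        processed.append(message)
        queue.task_done()          # stands in for acknowledging the message


async def demo_shutdown():
    queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)

    stop = asyncio.Event()
    processed = []
    task = asyncio.create_task(worker(queue, stop, processed))

    await queue.join()  # wait until every queued message is handled
    stop.set()          # request shutdown
    await task          # the worker exits at its next loop check
    return processed


if __name__ == "__main__":
    print(asyncio.run(demo_shutdown()))
```

In a long-running service the stop event would typically be set from a signal handler, for example with `loop.add_signal_handler(signal.SIGTERM, stop.set)` on Unix, followed by closing the client.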
Performance
To optimize performance:
- Tune batch sizes
- Configure appropriate timeouts
- Use connection pooling
- Monitor resource usage
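Batching trades a little latency for fewer round trips. A minimal sketch of grouping messages into fixed-size batches follows; `in_batches` is a hypothetical helper, and the right batch size depends on message size and on the broker settings in use.

```python
from itertools import islice


def in_batches(iterable, size):
    """Yield lists of up to `size` items; the last batch may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while batch := list(islice(iterator, size)):
        yield batch


if __name__ == "__main__":
    for batch in in_batches(range(5), 2):
        # Each batch could be written or acknowledged in a single call.
        print(batch)
```

Larger batches raise throughput but also increase the amount of work that is repeated if a batch fails partway through.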