RabbitMQ Client Guide
The RabbitMQ client in ZephCast provides a high-level interface for working with RabbitMQ, supporting various messaging patterns.
Features
- Asynchronous message publishing and consumption
- Exchange types (direct, fanout, topic, headers)
- Queue bindings and routing
- Message persistence
- Dead letter exchanges
- Publisher confirms
- Consumer acknowledgments
Basic Usage
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def rabbitmq_example():
    """Send one message through RabbitMQ and read one message back.

    Connects a client configured for the queue "my-queue" with the routing
    key "my-routing-key", publishes a single message, prints the first
    message yielded by the client, and then closes the client.
    """
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
        ),
    )

    await client.connect()
    try:
        await client.send("Hello RabbitMQ!")

        async for message in client:
            print(f"Received: {message}")
            break  # One message is enough for this example.
    finally:
        # Always release the connection, even if sending or receiving fails.
        await client.close()
Exchange Types
Direct Exchange
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# Direct exchange: the client uses queue "my-queue" with the direct exchange
# "my-exchange"; stream_name supplies the routing key.
client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        exchange_name="my-exchange",
        exchange_type="direct",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    ),
)
Fanout Exchange
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# Fanout exchange: the client uses queue "my-queue" with the fanout exchange
# "my-fanout"; an empty stream_name is passed because no routing key is needed.
client = RabbitClient(
    stream_name="",  # Routing key not used in fanout
    config=RabbitConfig(
        queue_name="my-queue",
        exchange_name="my-fanout",
        exchange_type="fanout",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    ),
)
Topic Exchange
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# Topic exchange: the client uses queue "user-events" with the topic exchange
# "events". In RabbitMQ topic bindings, "*" matches exactly one
# dot-separated word.
client = RabbitClient(
    stream_name="user.created.*",  # Topic pattern
    config=RabbitConfig(
        queue_name="user-events",
        exchange_name="events",
        exchange_type="topic",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    ),
)
Message Persistence
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# Declare both the exchange and the queue as durable.
# NOTE(review): in RabbitMQ, durability of the exchange/queue only preserves
# their definitions across a broker restart; messages must also be published
# as persistent (delivery mode 2) to be kept. Confirm that zephcast publishes
# persistent messages when these flags are set.
client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        exchange_name="my-exchange",
        exchange_durable=True,
        queue_durable=True,
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    ),
)
Dead Letter Exchange
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# Configure a dead letter exchange ("dlx") and dead letter routing key
# ("failed") for "my-queue".
# NOTE(review): presumably these map onto the queue's x-dead-letter-exchange
# and x-dead-letter-routing-key arguments - confirm against zephcast.
client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        exchange_name="my-exchange",
        dead_letter_exchange="dlx",
        dead_letter_routing_key="failed",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    ),
)
Publisher Confirms
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def publisher_confirms_example():
    """Publish one message with publisher confirms enabled.

    Connects a client whose config sets ``publisher_confirms=True``, sends a
    single message, and prints a notice when the value returned by
    ``client.send`` is truthy. The client is closed whether or not the send
    succeeds.
    """
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            publisher_confirms=True,
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
        ),
    )

    await client.connect()
    try:
        # Send with confirmation; this example treats a truthy result as the
        # broker's confirmation.
        confirmation = await client.send("Important message")
        if confirmation:
            print("Message confirmed by broker")
    finally:
        await client.close()
Consumer Acknowledgments
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def consumer_acks_example():
    """Consume messages with manual acknowledgments.

    Connects a client whose config sets ``auto_ack=False``, then for each
    message yielded by the client prints it and acknowledges it. If handling
    raises, the message is negatively acknowledged with ``requeue=True``.
    The client is closed when iteration ends or an error escapes the loop.
    """
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            auto_ack=False,
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
        ),
    )

    await client.connect()
    try:
        async for message in client:
            try:
                print(f"Processing: {message}")
                await client.ack(message)
            except Exception:
                # Negative acknowledge on failure.
                # NOTE(review): with requeue=True a message that always fails
                # is presumably redelivered indefinitely; consider
                # requeue=False together with a dead letter exchange.
                await client.nack(message, requeue=True)
    finally:
        await client.close()
Best Practices
- Use appropriate exchange types for your messaging pattern
- Enable message persistence for important data
- Use publisher confirms for critical messages
- Implement proper consumer acknowledgments
- Configure dead letter exchanges for failed messages
- Set appropriate QoS prefetch values
- Monitor queue lengths and consumer health
- Use SSL in production environments
- Implement proper connection and channel cleanup