Quick Start Guide
This guide will help you get started with ZephCast by walking through basic examples for each supported message broker.
Basic Concepts
ZephCast provides a unified interface for working with different message brokers. The main concepts are:
- Client: A connection to a message broker
- Stream: A named channel for sending/receiving messages
- Producer: Sends messages to a stream
- Consumer: Receives messages from a stream
Async Iterator Pattern
All async clients in ZephCast implement the async iterator pattern, which provides a clean and Pythonic way to work with message streams:
# Using async context manager for automatic connection management
async with client:
# Using async iterator for message consumption
async for message in client:
print(f"Received: {message}")
This pattern is equivalent to:
# Manual connection management
await client.connect()
try:
# Manual message consumption
async for message in client.receive():
print(f"Received: {message}")
finally:
await client.close()
Kafka Example
Here's a complete example of using ZephCast with Kafka:
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def kafka_example():
    """Send three messages to a Kafka topic, then consume until the last one arrives."""
    # Describe the broker connection, then bind a client to the topic
    config = KafkaConfig(bootstrap_servers="localhost:9092")
    client = KafkaClient(stream_name="my-topic", config=config)

    async with client:
        # Publish the payloads in order
        for payload in ("Hello Kafka!", "Message 2", "Message 3"):
            await client.send(payload)

        # Consume until the final payload we published is seen
        async for message in client:
            print(f"Received from Kafka: {message}")
            if message == "Message 3":
                break


# Run the example
asyncio.run(kafka_example())
RabbitMQ Example
Here's how to use ZephCast with RabbitMQ:
import asyncio
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def rabbitmq_example():
    """Send three messages through RabbitMQ, then consume until the last one arrives."""
    # Describe the queue and broker URL, then bind a client to the routing key
    config = RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    )
    client = RabbitClient(stream_name="my-routing-key", config=config)

    async with client:
        # Publish the payloads in order
        for payload in ("Hello RabbitMQ!", "Message 2", "Message 3"):
            await client.send(payload)

        # Consume until the final payload we published is seen
        async for message in client:
            print(f"Received from RabbitMQ: {message}")
            if message == "Message 3":
                break


# Run the example
asyncio.run(rabbitmq_example())
Redis Example
Here's how to use ZephCast with Redis Streams:
import asyncio
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def redis_example():
    """Send three messages to a Redis stream, then consume until the last one arrives."""
    # Describe the Redis connection, then bind a client to the stream
    config = RedisConfig(redis_url="redis://localhost:6379")
    client = RedisClient(stream_name="my-stream", config=config)

    async with client:
        # Publish the payloads in order
        for payload in ("Hello Redis!", "Message 2", "Message 3"):
            await client.send(payload)

        # Consume until the final payload we published is seen
        async for message in client:
            print(f"Received from Redis: {message}")
            if message == "Message 3":
                break


# Run the example
asyncio.run(redis_example())
Consumer Groups Example
Here's an example of using consumer groups with Kafka:
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def consumer_group_example(timeout=5.0):
    """Publish ten messages and consume them with three clients in one group.

    Nothing in ``consume()`` breaks out of its ``async for`` loop, so the
    consumption phase is bounded by ``timeout``. Without that bound the
    ``gather`` over the consumers would not return for as long as the streams
    keep iterating, and the cleanup in ``finally`` would only run if the
    program were interrupted.

    Args:
        timeout: Maximum number of seconds to let the consumers run before
            they are cancelled and closed.
    """
    # Create multiple consumers in the same group
    consumers = [
        KafkaClient(
            stream_name="my-topic",
            config=KafkaConfig(
                bootstrap_servers="localhost:9092",
                group_id="my-group",
            ),
        )
        for _ in range(3)
    ]

    # Connect all consumers
    await asyncio.gather(*(consumer.connect() for consumer in consumers))
    try:
        # Create a producer
        producer = KafkaClient(
            stream_name="my-topic",
            config=KafkaConfig(
                bootstrap_servers="localhost:9092",
            ),
        )
        async with producer:
            # Send some messages
            for i in range(10):
                await producer.send(f"Message {i}")

        # Process messages with multiple consumers
        async def consume(client, consumer_id):
            async for message in client:
                print(f"Consumer {consumer_id} received: {message}")

        # Run consumers concurrently, but only for a bounded time:
        # wait_for cancels the gathered consumers once the timeout elapses.
        try:
            await asyncio.wait_for(
                asyncio.gather(*(
                    consume(consumer, i)
                    for i, consumer in enumerate(consumers)
                )),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Expected: this is how the example stops consuming.
            pass
    finally:
        # Clean up
        await asyncio.gather(*(consumer.close() for consumer in consumers))


# Run the example
asyncio.run(consumer_group_example())
Next Steps
- Learn about Configuration Options
- See Advanced Usage
- Check out the API Reference