Examples¶
This page provides practical examples of using ZephCast in different scenarios.
Basic Usage¶
Simple Producer/Consumer¶
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def basic_example():
client = KafkaClient(
stream_name="my-topic",
config=KafkaConfig(
bootstrap_servers="localhost:9092"
)
)
try:
await client.connect()
await client.send("Hello, World!")
async for message in client:
print(f"Received: {message}")
break
finally:
await client.close()
asyncio.run(basic_example())
Advanced Patterns¶
Fan-out Pattern¶
import asyncio
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def fan_out_example():
producer = RabbitClient(
stream_name="notifications",
config=RabbitConfig(
exchange_name="notifications",
exchange_type="fanout",
rabbitmq_url="amqp://guest:guest@localhost:5672/"
)
)
consumers = [
RabbitClient(
stream_name="notifications",
config=RabbitConfig(
queue_name=f"consumer-{i}",
exchange_name="notifications",
exchange_type="fanout",
rabbitmq_url="amqp://guest:guest@localhost:5672/"
)
)
for i in range(3)
]
try:
await producer.connect()
await asyncio.gather(*(consumer.connect() for consumer in consumers))
await producer.send("Broadcast message")
async def consume(client, consumer_id):
async for message in client:
print(f"Consumer {consumer_id} received: {message}")
break
await asyncio.gather(*(
consume(consumer, i)
for i, consumer in enumerate(consumers)
))
finally:
await producer.close()
await asyncio.gather(*(consumer.close() for consumer in consumers))
asyncio.run(fan_out_example())
Pub/Sub Pattern with Redis¶
import asyncio
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def pub_sub_example():
publisher = RedisClient(
stream_name="news-feed",
config=RedisConfig(
redis_url="redis://localhost:6379"
)
)
subscribers = [
RedisClient(
stream_name="news-feed",
config=RedisConfig(
consumer_group=f"group-{i}",
redis_url="redis://localhost:6379"
)
)
for i in range(2)
]
try:
await publisher.connect()
await asyncio.gather(*(sub.connect() for sub in subscribers))
await publisher.send("Breaking news!")
async def subscribe(client, sub_id):
async for message in client:
print(f"Subscriber {sub_id} received: {message}")
break
await asyncio.gather(*(
subscribe(sub, i)
for i, sub in enumerate(subscribers)
))
finally:
await publisher.close()
await asyncio.gather(*(sub.close() for sub in subscribers))
asyncio.run(pub_sub_example())
Error Handling¶
Retry Pattern¶
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def retry_example():
client = KafkaClient(
stream_name="my-topic",
config=KafkaConfig(
bootstrap_servers="localhost:9092"
)
)
max_retries = 3
retry_delay = 1 # seconds
for attempt in range(max_retries):
try:
await client.connect()
await client.send("Test message")
break
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"Attempt {attempt + 1} failed: {e}")
await asyncio.sleep(retry_delay * (attempt + 1))
await client.close()
asyncio.run(retry_example())
Integration Patterns¶
Message Transformation¶
import asyncio
import json
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def transform_example():
source = KafkaClient(
stream_name="raw-data",
config=KafkaConfig(
bootstrap_servers="localhost:9092"
)
)
destination = KafkaClient(
stream_name="processed-data",
config=KafkaConfig(
bootstrap_servers="localhost:9092"
)
)
try:
await source.connect()
await destination.connect()
async for message in source:
try:
data = json.loads(message)
transformed = {
"id": data["id"],
"timestamp": data["timestamp"],
"value": data["value"] * 2
}
await destination.send(json.dumps(transformed))
except json.JSONDecodeError:
print(f"Invalid JSON: {message}")
except KeyError as e:
print(f"Missing key: {e}")
finally:
await source.close()
await destination.close()
asyncio.run(transform_example())