Error Handling
Proper error handling is crucial for building reliable messaging applications. This guide covers error handling strategies for ZephCast.
Common Error Types
Connection Errors
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
from kafka.errors import KafkaConnectionError
async def handle_connection_error():
    """Try to connect a Kafka client and report a connection failure.

    A ``KafkaConnectionError`` raised by ``connect()`` is printed rather
    than propagated. The client is closed in every case.
    """
    kafka_config = KafkaConfig(bootstrap_servers="localhost:9092")
    client = KafkaClient(stream_name="my-topic", config=kafka_config)

    try:
        await client.connect()
    except KafkaConnectionError as exc:
        # Implement retry logic or fallback
        print(f"Connection failed: {exc}")
    finally:
        # Runs whether or not connect() succeeded.
        await client.close()
Message Publishing Errors
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
from aio_pika.exceptions import MessageError
async def handle_publish_error():
    """Publish one message through RabbitMQ and report a publish failure.

    A ``MessageError`` raised by ``send()`` is printed rather than
    propagated. The client is closed once the attempt has finished.
    """
    rabbit_config = RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    )
    client = RabbitClient(stream_name="my-routing-key", config=rabbit_config)

    # connect() sits outside the try block, so a connection failure
    # propagates to the caller instead of being handled here.
    await client.connect()
    try:
        await client.send("My message")
    except MessageError as exc:
        print(f"Failed to publish message: {exc}")
        ...  # placeholder for application-specific recovery
    finally:
        await client.close()
Consumer Errors
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
from redis.exceptions import RedisError
async def handle_consumer_error():
    """Consume a Redis stream, acknowledging or rejecting each message.

    Each message is passed to ``process_message`` and acknowledged on
    success. A ``ProcessingError`` causes the message to be nack'ed and
    consumption continues; a ``RedisError`` is printed and ends the loop.
    The client is closed when consumption stops.

    ``process_message`` and ``ProcessingError`` are not defined in this
    snippet; they are expected to be supplied by the application.
    """
    redis_config = RedisConfig(redis_url="redis://localhost:6379")
    client = RedisClient(stream_name="my-stream", config=redis_config)

    await client.connect()
    try:
        async for msg in client:
            try:
                process_message(msg)
                await client.ack(msg)
            except ProcessingError:
                # Processing failed: reject this message, keep consuming.
                await client.nack(msg)
            except RedisError as exc:
                # Transport-level failure: report it and stop consuming.
                print(f"Redis error: {exc}")
                break
    finally:
        await client.close()
Retry Strategies
Exponential Backoff
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def exponential_backoff(func, max_retries=3, base_delay=1, *, retry_on=(Exception,)):
    """Await ``func()`` until it succeeds, doubling the delay between attempts.

    Args:
        func: Zero-argument callable returning a fresh awaitable on every
            call (an awaitable can only be awaited once).
        max_retries: Total number of attempts, including the first one.
            Must be at least 1.
        base_delay: Delay in seconds before the second attempt; each later
            delay is twice the previous one.
        retry_on: Tuple of exception classes that trigger a retry. Any other
            exception propagates immediately. Defaults to ``(Exception,)``.

    Returns:
        Whatever the first successful ``await func()`` returns.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The exception from the final attempt, re-raised unchanged
            once all attempts are used up.
    """
    # Without this guard a non-positive max_retries would skip the loop
    # entirely and silently return None without ever calling func.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await func()
        except retry_on as e:
            if attempt == max_retries - 1:
                # Out of attempts: surface the last failure to the caller.
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {delay} seconds...")
            await asyncio.sleep(delay)
async def publish_with_retry():
    """Publish one Kafka message, retrying through ``exponential_backoff``.

    The client is connected before the retry loop and closed afterwards,
    whether or not the send eventually succeeds.
    """
    kafka_config = KafkaConfig(bootstrap_servers="localhost:9092")
    client = KafkaClient(stream_name="my-topic", config=kafka_config)

    def send_important():
        # Called once per attempt, so every retry issues a fresh send().
        return client.send("Important message")

    await client.connect()
    try:
        await exponential_backoff(send_important)
    finally:
        await client.close()
Circuit Breaker
import time
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
class CircuitBreakerOpenError(Exception):
    """Raised by ``CircuitBreaker.call`` while the circuit is open."""


class CircuitBreaker:
    """Stop calling a failing operation until a cool-down period has passed.

    The breaker counts consecutive failures. Once the count reaches
    ``failure_threshold`` the circuit opens and calls are rejected without
    running the operation. After ``reset_timeout`` seconds the circuit
    closes again and the failure count starts from zero.
    """

    def __init__(self, failure_threshold=3, reset_timeout=60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            reset_timeout: Seconds the circuit stays open after the most
                recent failure.
        """
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        # time.monotonic() reading taken at the most recent failure.
        self.last_failure_time = 0
        self.is_open = False

    async def call(self, func):
        """Await ``func()`` through the breaker.

        Args:
            func: Zero-argument callable returning an awaitable.

        Returns:
            The result of ``await func()``.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the reset
                timeout has not elapsed. It subclasses ``Exception``, so
                handlers written for the generic exception still match.
            Exception: Whatever ``func`` raises, re-raised unchanged.
        """
        if self.is_open:
            # A monotonic clock keeps the elapsed-time check immune to
            # wall-clock adjustments (NTP corrections, manual changes).
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.is_open = False
                self.failure_count = 0
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func()
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            if self.failure_count >= self.failure_threshold:
                self.is_open = True
            # Bare raise re-raises the active exception as-is.
            raise
        # Any success clears the run of consecutive failures.
        self.failure_count = 0
        return result
async def publish_with_circuit_breaker():
    """Publish one RabbitMQ message through a ``CircuitBreaker``.

    The client is connected before the guarded send and closed afterwards,
    whether or not the send succeeds.
    """
    rabbit_config = RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    )
    client = RabbitClient(stream_name="my-routing-key", config=rabbit_config)
    breaker = CircuitBreaker()

    def send_test_message():
        # The breaker invokes this to obtain the awaitable it guards.
        return client.send("Test message")

    await client.connect()
    try:
        await breaker.call(send_test_message)
    finally:
        await client.close()
Dead Letter Handling
RabbitMQ Dead Letter Exchange
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def setup_dead_letter():
    """Consume from a main queue, rejecting failed messages without requeue.

    Two clients are created: one for the main queue, configured with a
    dead letter exchange (``dlx``) and routing key (``failed``), and one
    bound to ``dead-letter-queue`` on that exchange. Messages that fail
    processing are nack'ed with ``requeue=False``.
    NOTE(review): presumably the broker then routes them to the dead
    letter queue; confirm against the RabbitMQ/ZephCast configuration.

    ``process_message`` is not defined in this snippet; it is expected to
    be supplied by the application.
    """
    main_client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            exchange_name="my-exchange",
            dead_letter_exchange="dlx",
            dead_letter_routing_key="failed",
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
        ),
    )
    dlq_client = RabbitClient(
        stream_name="failed",
        config=RabbitConfig(
            queue_name="dead-letter-queue",
            exchange_name="dlx",
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
        ),
    )

    # Each client gets its own try/finally so that a failure while
    # connecting or closing one of them can never leak the other.
    await main_client.connect()
    try:
        await dlq_client.connect()
        try:
            async for message in main_client:
                try:
                    process_message(message)
                    await main_client.ack(message)
                except Exception:
                    # Reject without requeue instead of retrying in place.
                    await main_client.nack(message, requeue=False)
        finally:
            await dlq_client.close()
    finally:
        await main_client.close()
Best Practices
- Always implement proper error handling
- Use appropriate retry strategies
- Set up dead letter queues/exchanges
- Implement circuit breakers for external services
- Log errors with context
- Monitor error rates and types
- Set appropriate timeouts
- Clean up resources in finally blocks
- Handle both expected and unexpected errors
- Implement graceful degradation