Redis Client Guide¶
The Redis client in ZephCast provides a high-level interface for working with Redis Streams.
Features¶
- Asynchronous message publishing and consumption
- Consumer groups
- Stream trimming
- Message acknowledgment
- Connection pooling
- SSL/TLS support
Basic Usage¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def redis_example():
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
redis_url="redis://localhost:6379"
)
)
await client.connect()
await client.send("Hello Redis!")
async for message in client:
print(f"Received: {message}")
break
await client.close()
Consumer Groups¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def consumer_group_example():
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
consumer_group="my-group",
consumer_name="consumer-1",
redis_url="redis://localhost:6379"
)
)
await client.connect()
async for message in client:
print(f"Consumer received: {message}")
await client.ack(message)
await client.close()
Stream Management¶
Stream Trimming¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def stream_trimming_example():
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
redis_url="redis://localhost:6379",
max_stream_length=1000
)
)
await client.connect()
# Stream will be automatically trimmed
for i in range(2000):
await client.send(f"Message {i}")
await client.close()
Message Acknowledgment¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
async def ack_example():
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
consumer_group="my-group",
auto_ack=False,
redis_url="redis://localhost:6379"
)
)
await client.connect()
try:
async for message in client:
try:
print(f"Processing: {message}")
# Acknowledge success
await client.ack(message)
except Exception:
# Message will be redelivered
continue
finally:
await client.close()
Connection Management¶
Connection Pooling¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
redis_url="redis://localhost:6379",
max_connections=10,
min_connections=2
)
)
SSL/TLS Configuration¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
redis_url="rediss://localhost:6379", # Note: rediss:// for SSL
ssl_ca_certs="/path/to/ca.pem",
ssl_certfile="/path/to/cert.pem",
ssl_keyfile="/path/to/key.pem"
)
)
Error Handling¶
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
from redis.exceptions import RedisError
async def error_handling_example():
client = RedisClient(
stream_name="my-stream",
config=RedisConfig(
redis_url="redis://localhost:6379"
)
)
try:
await client.connect()
await client.send("test message")
except RedisError as e:
print(f"Redis error: {e}")
finally:
await client.close()
Best Practices¶
- Use consumer groups for scalable message processing
- Configure appropriate stream length limits
- Implement proper message acknowledgment
- Use connection pooling for better performance
- Monitor stream length and memory usage
- Use SSL/TLS in production environments
- Implement proper error handling and retries
- Clean up old messages using XDEL or XTRIM
- Monitor consumer group lag
- Use appropriate connection timeouts