Redis API Reference
The Redis client provides a high-level interface for working with Redis Streams.
Core Components
RedisClient
The main client class for interacting with Redis Streams.
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

client = RedisClient(
    stream_name="my-stream",
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        consumer_group="my-group"  # Optional
    )
)
Methods
- `connect()` - Establish a connection to Redis
- `close()` - Close the connection
- `send(message)` - Add a message to the stream
- `__aiter__()` - Async iterator for receiving messages (iterate directly over the client)
- `ack(message)` - Acknowledge a message (when using consumer groups)
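Because `connect()` and `close()` always come as a pair, they lend themselves to an async context manager. The following is a minimal sketch under stated assumptions, not something this page documents as part of zephcast: `connected` is a hypothetical helper, and `StubClient` is a stand-in used only so the snippet runs without a Redis server. The sketch assumes nothing beyond awaitable `connect()` and `close()` methods, as listed above.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def connected(client):
    """Hypothetical helper: connect on entry, always close on exit."""
    await client.connect()
    try:
        yield client
    finally:
        # Runs even if the body raises, mirroring a try/finally block.
        await client.close()


class StubClient:
    """Stand-in for RedisClient so the sketch runs without a server."""

    def __init__(self):
        self.calls = []

    async def connect(self):
        self.calls.append("connect")

    async def close(self):
        self.calls.append("close")

    async def send(self, message):
        self.calls.append(f"send:{message}")


async def demo():
    stub = StubClient()
    async with connected(stub) as client:
        await client.send("hello")
    return stub.calls


calls = asyncio.run(demo())
```

With a real client, the same helper would presumably be used as `async with connected(RedisClient(...)) as client:`.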
Configuration Options
- `stream_name` - Stream name
- `config` - RedisConfig object with the following options:
  - `redis_url` - Redis connection URL
  - `consumer_group` - Consumer group name (optional)
  - `consumer_name` - Consumer name (optional)
  - `max_stream_length` - Maximum stream length (optional)
  - `auto_ack` - Automatic acknowledgment (optional)
  - `max_connections` - Connection pool size (optional)
  - `min_connections` - Minimum connections in pool (optional)
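To illustrate how the options above fit together, the sketch below collects them from environment variables into a keyword dictionary. The `ZEPHCAST_*` variable names, the helper `config_kwargs_from_env`, and the value types (integers for the size options, a boolean for `auto_ack`) are all assumptions made for illustration; this page does not specify them.

```python
import os

# Assumed types for the documented option names. The reference above
# does not state them, so treat this mapping as a guess.
_OPTION_PARSERS = {
    "redis_url": str,
    "consumer_group": str,
    "consumer_name": str,
    "max_stream_length": int,
    "auto_ack": lambda raw: raw.strip().lower() in {"1", "true", "yes"},
    "max_connections": int,
    "min_connections": int,
}


def config_kwargs_from_env(env=None, prefix="ZEPHCAST_"):
    """Hypothetical helper: build RedisConfig keyword arguments from env vars."""
    env = os.environ if env is None else env
    kwargs = {}
    for option, parse in _OPTION_PARSERS.items():
        raw = env.get(prefix + option.upper())
        if raw is not None:
            # Unset options are left out so the library defaults apply.
            kwargs[option] = parse(raw)
    return kwargs


# A plain dict stands in for os.environ to keep the example deterministic.
example = config_kwargs_from_env({
    "ZEPHCAST_REDIS_URL": "redis://localhost:6379",
    "ZEPHCAST_CONSUMER_GROUP": "my-group",
    "ZEPHCAST_AUTO_ACK": "false",
    "ZEPHCAST_MAX_STREAM_LENGTH": "1000",
})
```

The result could then be passed as `RedisConfig(**example)`, assuming the constructor accepts all of these names as keyword arguments, as the examples on this page show for `redis_url`, `consumer_group`, `consumer_name`, and `auto_ack`.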
Usage Example
import asyncio

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig


async def redis_example():
    client = RedisClient(
        stream_name="my-stream",
        config=RedisConfig(
            redis_url="redis://localhost:6379"
        )
    )
    await client.connect()
    try:
        await client.send("Hello Redis!")
        async for message in client:
            print(f"Received: {message}")
            break
    finally:
        await client.close()


asyncio.run(redis_example())
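The example above stops after the first message, but the loop would wait indefinitely if nothing arrived. One way to bound the wait, sketched here, is to pull items from the async iterator through `asyncio.wait_for`. `receive_with_timeout` and `StubStream` are hypothetical names, and the only assumption about the client is that it supports `async for`, as documented above. A timeout cancels the pending read; whether the real client tolerates that cancellation is not covered on this page.

```python
import asyncio


async def receive_with_timeout(source, limit, timeout):
    """Hypothetical helper: collect up to `limit` messages, giving up
    when the next message takes longer than `timeout` seconds."""
    received = []
    iterator = source.__aiter__()
    while len(received) < limit:
        try:
            message = await asyncio.wait_for(iterator.__anext__(), timeout)
        except asyncio.TimeoutError:
            break  # Nothing arrived in time.
        except StopAsyncIteration:
            break  # The source ended on its own.
        received.append(message)
    return received


class StubStream:
    """Stand-in for a connected client: yields two messages, then stalls."""

    def __aiter__(self):
        return self._messages()

    async def _messages(self):
        yield "first"
        yield "second"
        await asyncio.sleep(3600)  # Simulate an idle stream.


messages = asyncio.run(
    receive_with_timeout(StubStream(), limit=5, timeout=0.05)
)
```

With a connected client in place of `StubStream()`, the call would read `await receive_with_timeout(client, limit=1, timeout=5.0)`.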
Consumer Group Example
```python
import asyncio

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig


async def consumer_group_example():
    client = RedisClient(
        stream_name="my-stream",
        config=RedisConfig(
            redis_url="redis://localhost:6379",
            consumer_group="my-group",
            consumer_name="consumer-1",
            auto_ack=False
        )
    )
    await client.connect()
    try:
        async for message in client:
            try:
                print(f"Processing: {message}")
                # Acknowledge only after processing succeeds.
                await client.ack(message)
            except Exception as e:
                print(f"Processing failed: {e}")
                # Message will be redelivered
                continue
    finally:
        await client.close()


asyncio.run(consumer_group_example())
```