Skip to content

Redis API Reference

The Redis client provides a high-level interface for working with Redis Streams.

Core Components

RedisClient

The main client class for interacting with Redis Streams.

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

client = RedisClient(
    stream_name="my-stream",
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        consumer_group="my-group"  # Optional
    )
)

Methods

  • connect() - Establish connection to Redis
  • close() - Close the connection
  • send(message) - Add a message to the stream
  • __aiter__() - Async iterator for receiving messages (directly iterate over the client)
  • ack(message) - Acknowledge a message (when using consumer groups)

Configuration Options

  • stream_name - Stream name
  • config - RedisConfig object with the following options:
  • redis_url - Redis connection URL
  • consumer_group - Consumer group name (optional)
  • consumer_name - Consumer name (optional)
  • max_stream_length - Maximum stream length (optional)
  • auto_ack - Automatic acknowledgment (optional)
  • max_connections - Connection pool size (optional)
  • min_connections - Minimum connections in pool (optional)

Usage Example

import asyncio
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

async def redis_example():
    client = RedisClient(
        stream_name="my-stream",
        config=RedisConfig(
            redis_url="redis://localhost:6379"
        )
    )

    await client.connect()

    try:
        await client.send("Hello Redis!")

        async for message in client:
            print(f"Received: {message}")
            break

    finally:
        await client.close()

asyncio.run(redis_example())

Consumer Group Example

```python from zephcast.aio.redis.client import RedisClient from zephcast.aio.redis.config import RedisConfig

async def consumer_group_example(): client = RedisClient( stream_name="my-stream", config=RedisConfig( redis_url="redis://localhost:6379", consumer_group="my-group", consumer_name="consumer-1", auto_ack=False ) )

await client.connect()

try:
    async for message in client:
        try:
            print(f"Processing: {message}")
            await client.ack(message)
        except Exception as e:
            print(f"Processing failed: {e}")
            # Message will be redelivered
            continue

finally:
    await client.close()