Skip to content

Quick Start Guide

This guide will help you get started with ZephCast by walking through basic examples for each supported message broker.

Basic Concepts

ZephCast provides a unified interface for working with different message brokers. The main concepts are:

  • Client: A connection to a message broker
  • Stream: A named channel for sending/receiving messages
  • Producer: Sends messages to a stream
  • Consumer: Receives messages from a stream

Async Iterator Pattern

All async clients in ZephCast implement the async iterator pattern, which provides a clean and Pythonic way to work with message streams:

# Using async context manager for automatic connection management
async with client:
    # Using async iterator for message consumption
    async for message in client:
        print(f"Received: {message}")

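This pattern is equivalent to the following, where `async with` takes the place of the explicit `connect()`/`close()` calls and iterating over the client takes the place of iterating over `client.receive()`: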

# Manual connection management
await client.connect()
try:
    # Manual message consumption
    async for message in client.receive():
        print(f"Received: {message}")
finally:
    await client.close()

Kafka Example

Here's a complete example of using ZephCast with Kafka:

import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

async def kafka_example():
    """Send three messages to the Kafka topic "my-topic" and read them back.

    A single client, configured with the bootstrap server
    ``localhost:9092``, is used both to send and to receive. Received
    messages are printed until one equal to ``"Message 3"`` is seen.
    """
    kafka_config = KafkaConfig(bootstrap_servers="localhost:9092")
    kafka = KafkaClient(stream_name="my-topic", config=kafka_config)

    # The last entry doubles as the sentinel that ends the receive loop.
    outgoing = ("Hello Kafka!", "Message 2", "Message 3")
    final_message = outgoing[-1]

    # The context manager handles connection management for the client.
    async with kafka:
        # Send the messages one at a time, in order.
        for payload in outgoing:
            await kafka.send(payload)

        # Iterate over incoming messages and print each one.
        async for received in kafka:
            print(f"Received from Kafka: {received}")
            # Stop once the final message sent above has come back.
            if received == final_message:
                break

# Run the example
asyncio.run(kafka_example())

RabbitMQ Example

Here's how to use ZephCast with RabbitMQ:

import asyncio
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

async def rabbitmq_example():
    """Send three messages through RabbitMQ and read them back.

    The client is created with ``stream_name="my-routing-key"`` and a config
    naming the queue ``my-queue`` on ``amqp://guest:guest@localhost:5672/``.
    Received messages are printed until one equal to ``"Message 3"`` is seen.
    """
    rabbit_config = RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
    )
    rabbit = RabbitClient(stream_name="my-routing-key", config=rabbit_config)

    # The last entry doubles as the sentinel that ends the receive loop.
    outgoing = ("Hello RabbitMQ!", "Message 2", "Message 3")
    final_message = outgoing[-1]

    # The context manager handles connection management for the client.
    async with rabbit:
        # Send the messages one at a time, in order.
        for payload in outgoing:
            await rabbit.send(payload)

        # Iterate over incoming messages and print each one.
        async for received in rabbit:
            print(f"Received from RabbitMQ: {received}")
            # Stop once the final message sent above has come back.
            if received == final_message:
                break

# Run the example
asyncio.run(rabbitmq_example())

Redis Example

Here's how to use ZephCast with Redis Streams:

import asyncio
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

async def redis_example():
    """Send three messages to the Redis stream "my-stream" and read them back.

    A single client, configured with ``redis://localhost:6379``, is used both
    to send and to receive. Received messages are printed until one equal to
    ``"Message 3"`` is seen.
    """
    redis_config = RedisConfig(redis_url="redis://localhost:6379")
    redis = RedisClient(stream_name="my-stream", config=redis_config)

    # The last entry doubles as the sentinel that ends the receive loop.
    outgoing = ("Hello Redis!", "Message 2", "Message 3")
    final_message = outgoing[-1]

    # The context manager handles connection management for the client.
    async with redis:
        # Send the messages one at a time, in order.
        for payload in outgoing:
            await redis.send(payload)

        # Iterate over incoming messages and print each one.
        async for received in redis:
            print(f"Received from Redis: {received}")
            # Stop once the final message sent above has come back.
            if received == final_message:
                break

# Run the example
asyncio.run(redis_example())

Consumer Groups Example

Here's an example of using consumer groups with Kafka:

import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

async def consumer_group_example(consume_timeout: float = 10.0):
    """Fan ten messages out to three Kafka consumers that share one group.

    Three clients are created with ``group_id="my-group"`` and connected. A
    separate producer client then sends ``Message 0`` .. ``Message 9`` to
    "my-topic", and the three consumers read concurrently, each printing the
    messages it receives together with its index.

    Args:
        consume_timeout: Seconds the consumers are allowed to read before
            they are cancelled. The consume loops have no exit condition of
            their own (the single-client examples in this guide have to
            ``break`` out of the same kind of loop), so without this bound
            the coroutine would wait on them indefinitely and never reach
            the cleanup step.
    """
    # Create multiple consumers in the same group
    consumers = [
        KafkaClient(
            stream_name="my-topic",
            config=KafkaConfig(
                bootstrap_servers="localhost:9092",
                group_id="my-group",
            ),
        )
        for _ in range(3)
    ]

    # Connect all consumers
    await asyncio.gather(*(consumer.connect() for consumer in consumers))

    try:
        # Create a producer (no group_id: it is only used for sending here)
        producer = KafkaClient(
            stream_name="my-topic",
            config=KafkaConfig(
                bootstrap_servers="localhost:9092"
            ),
        )

        async with producer:
            # Send some messages
            for i in range(10):
                await producer.send(f"Message {i}")

            # Process messages with multiple consumers
            async def consume(client, consumer_id):
                async for message in client:
                    print(f"Consumer {consumer_id} received: {message}")

            # Run consumers concurrently, but only for a bounded time.
            # On timeout, wait_for cancels the gather, which in turn cancels
            # every consume() task that is still waiting for messages.
            try:
                await asyncio.wait_for(
                    asyncio.gather(*(
                        consume(consumer, i)
                        for i, consumer in enumerate(consumers)
                    )),
                    timeout=consume_timeout,
                )
            except asyncio.TimeoutError:
                # Expected way out: the consumers were still listening when
                # the time budget ran out. Errors raised by a consumer are
                # not caught here and propagate normally.
                pass
    finally:
        # Clean up
        await asyncio.gather(*(consumer.close() for consumer in consumers))

# Run the example
asyncio.run(consumer_group_example())

Next Steps