Skip to content

Kafka API Reference

The Kafka client provides a high-level interface for working with Apache Kafka.

Core Components

KafkaClient

The main client class for interacting with Kafka.

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        group_id="my-group"  # Optional
    )
)

Methods

  • connect() - Establish connection to Kafka
  • close() - Close the connection
  • send(message) - Send a message to the configured topic
  • __aiter__() - Async iterator for receiving messages (directly iterate over the client)

Configuration Options

  • stream_name - Topic name
  • config - KafkaConfig object with the following options:
  • bootstrap_servers - Kafka broker addresses
  • group_id - Consumer group ID (optional)
  • security_protocol - Security protocol (optional)
  • sasl_mechanism - SASL mechanism (optional)
  • sasl_plain_username - SASL username (optional)
  • sasl_plain_password - SASL password (optional)

Usage Example

```python import asyncio from zephcast.aio.kafka.client import KafkaClient from zephcast.aio.kafka.config import KafkaConfig

async def kafka_example(): client = KafkaClient( stream_name="my-topic", config=KafkaConfig( bootstrap_servers="localhost:9092" ) )

await client.connect()

try:
    await client.send("Hello Kafka!")

    async for message in client:
        print(f"Received: {message}")
        break

finally:
    await client.close()

asyncio.run(kafka_example())