Kafka API Reference
The Kafka client provides a high-level interface for working with Apache Kafka.
Core Components
KafkaClient
The main client class for interacting with Kafka.
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        group_id="my-group",  # Optional
    ),
)
Methods
- `connect()` - Establish a connection to Kafka
- `close()` - Close the connection
- `send(message)` - Send a message to the configured topic
- `__aiter__()` - Async iterator for receiving messages (iterate over the client directly)
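To make the method contract concrete without a running broker, here is a minimal sketch. `InMemoryClient` and `round_trip` are hypothetical names introduced for illustration only. They are not part of zephcast and say nothing about how `KafkaClient` is implemented. The stand-in only mirrors the four methods listed above, so the calling pattern (connect, send, iterate, close) can be exercised in a unit test.

```python
import asyncio


class InMemoryClient:
    """Hypothetical stand-in with the same four methods; not part of zephcast."""

    def __init__(self, stream_name):
        self.stream_name = stream_name
        self._queue = None  # created in connect(), inside the running event loop

    async def connect(self):
        self._queue = asyncio.Queue()

    async def close(self):
        self._queue = None

    async def send(self, message):
        if self._queue is None:
            raise RuntimeError("call connect() before send()")
        await self._queue.put(message)

    def __aiter__(self):
        # Returning self lets callers write `async for message in client`.
        return self

    async def __anext__(self):
        if self._queue is None:
            raise StopAsyncIteration
        # Waits until a message is available.
        return await self._queue.get()


async def round_trip(client, payload):
    """Connect, send one message, read one message back, and always close."""
    await client.connect()
    try:
        await client.send(payload)
        async for message in client:
            return message  # stop after the first message
    finally:
        await client.close()


if __name__ == "__main__":
    print(asyncio.run(round_trip(InMemoryClient("my-topic"), "Hello Kafka!")))
```

Because `round_trip` relies only on the documented methods, passing a connected-capable `KafkaClient` instead of the stand-in should follow the same flow, assuming a reachable broker. The type of the received message may differ from what was sent.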
Configuration Options
- `stream_name` - Topic name
- `config` - KafkaConfig object with the following options:
    - `bootstrap_servers` - Kafka broker addresses
    - `group_id` - Consumer group ID (optional)
    - `security_protocol` - Security protocol (optional)
    - `sasl_mechanism` - SASL mechanism (optional)
    - `sasl_plain_username` - SASL username (optional)
    - `sasl_plain_password` - SASL password (optional)
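The optional SASL settings are easiest to see together. The sketch below assembles the options listed above into a dictionary of keyword arguments and reads credentials from the environment rather than hard-coding them. Several things here are assumptions: the helper name `sasl_config_kwargs`, the `KAFKA_*` environment variable names, and the values `"SASL_SSL"` and `"PLAIN"`. Those two values are standard Kafka settings, but check which ones your zephcast version accepts.

```python
import os


def sasl_config_kwargs(env=None):
    """Build keyword arguments for a SASL-authenticated connection.

    Hypothetical helper: the environment variable names are illustrative.
    """
    env = os.environ if env is None else env
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group_id": env.get("KAFKA_GROUP_ID", "my-group"),
        # Assumed values; they follow Kafka's standard names for these settings.
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # Keep credentials out of source control by reading them at runtime.
        "sasl_plain_username": env.get("KAFKA_USERNAME", ""),
        "sasl_plain_password": env.get("KAFKA_PASSWORD", ""),
    }
```

If `KafkaConfig` accepts these options as keyword arguments, as it does for `bootstrap_servers` and `group_id` in the example above, the result can be passed as `KafkaConfig(**sasl_config_kwargs())`.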
Usage Example
```python
import asyncio

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig


async def kafka_example():
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092"
        )
    )

    await client.connect()
    try:
        # Publish one message to the configured topic.
        await client.send("Hello Kafka!")

        # Iterate over the client directly to receive messages.
        async for message in client:
            print(f"Received: {message}")
            break  # stop after the first message
    finally:
        # Close the connection even if sending or receiving fails.
        await client.close()


asyncio.run(kafka_example())
```