Kafka Client Guide
The Kafka client in ZephCast provides a high-level interface for working with Apache Kafka.
Features
- Asynchronous message publishing and consumption
- Consumer group support
- SSL/SASL authentication
- Automatic reconnection
- Configurable batch sizes and compression
Basic Usage

import asyncio

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig


async def kafka_example():
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092"
        )
    )
    await client.connect()

    # Publish one message to the topic
    await client.send("Hello Kafka!")

    # Read a single message back, then stop iterating
    async for message in client:
        print(f"Received: {message}")
        break

    await client.close()


asyncio.run(kafka_example())
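The connect/close pairing in the example above can be wrapped so that cleanup happens even when the body raises. The following is a minimal sketch, not part of ZephCast: `managed` and `FakeClient` are hypothetical names introduced for illustration, and the sketch only assumes an object with awaitable `connect()` and `close()` methods. Whether `KafkaClient` already supports `async with` on its own is not stated in this guide.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed(client):
    """Connect on entry; once connected, always close on exit."""
    await client.connect()
    try:
        yield client
    finally:
        await client.close()


class FakeClient:
    """Hypothetical stand-in with the same connect/send/close shape."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def send(self, message):
        self.events.append(f"send:{message}")

    async def close(self):
        self.events.append("close")


async def demo():
    fake = FakeClient()
    async with managed(fake) as client:
        await client.send("Hello Kafka!")
    return fake.events


async def demo_with_error():
    fake = FakeClient()
    try:
        async with managed(fake):
            raise RuntimeError("processing failed")
    except RuntimeError:
        pass  # the error still propagates; close() has already run
    return fake.events
```

With a real client, the same wrapper would be used as `async with managed(KafkaClient(...)) as client:`, assuming the client exposes the methods shown in the example above.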
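Consumer Groups

import asyncio

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig


async def consumer_group_example():
    consumer = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092",
            # Clients sharing this group_id split the topic's partitions between them
            group_id="my-group"
        )
    )
    await consumer.connect()
    try:
        # Iterates until the task is cancelled or the stream ends
        async for message in consumer:
            print(f"Consumer received: {message}")
    finally:
        # Release the connection even if iteration is interrupted
        await consumer.close()


asyncio.run(consumer_group_example())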
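Within a consumer group, Kafka gives each partition of a topic to exactly one member at a time, so adding consumers spreads the load until there are more consumers than partitions. The toy function below illustrates that idea with a simple round-robin deal. It is a conceptual sketch only: `assign_round_robin` is a hypothetical helper, not the assignor that Kafka or ZephCast actually runs.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, like cards."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Six partitions shared by two group members
two_members = assign_round_robin([0, 1, 2, 3, 4, 5], ["a", "b"])

# More members than partitions: the extra member receives nothing
three_members = assign_round_robin([0, 1], ["a", "b", "c"])
```

The second case shows why running more consumers than partitions in one group leaves some of them idle.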
Security Configuration

SSL/TLS

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        security_protocol="SSL",
        ssl_cafile="/path/to/ca.pem",      # CA certificate used to verify the broker
        ssl_certfile="/path/to/cert.pem",  # client certificate
        ssl_keyfile="/path/to/key.pem"     # client private key
    )
)
SASL

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        security_protocol="SASL_SSL",  # SASL authentication over a TLS connection
        sasl_mechanism="PLAIN",
        # Placeholder credentials; load real ones from a secret store or environment
        sasl_plain_username="user",
        sasl_plain_password="password"
    )
)
Performance Tuning

Batch Configuration

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        batch_size=16384,  # 16 KiB
        linger_ms=10,      # milliseconds
        compression_type="snappy"
    )
)
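To build intuition for how a size limit and a linger time interact, the sketch below simulates batching in plain Python. It assumes `batch_size` and `linger_ms` carry the same meaning as the Kafka producer settings of similar name (a batch is sent once it is full or once it has waited long enough); this guide does not confirm how ZephCast applies them. `simulate_batches` is a hypothetical helper and the model is deliberately simplified.

```python
def simulate_batches(messages, batch_size, linger_ms):
    """Group (arrival_ms, size_bytes) messages into batches.

    Simplified model: the open batch is closed when the next message would
    not fit within batch_size, or when the next message arrives linger_ms
    or more after the batch was opened.
    """
    batches = []
    current = []
    current_bytes = 0
    opened_at = None
    for arrival_ms, size in messages:
        if current:
            expired = arrival_ms - opened_at >= linger_ms
            full = current_bytes + size > batch_size
            if expired or full:
                batches.append(current)
                current, current_bytes = [], 0
        if not current:
            opened_at = arrival_ms  # first message opens a new batch
        current.append((arrival_ms, size))
        current_bytes += size
    if current:
        batches.append(current)
    return batches


# Three 6000-byte messages in quick succession, then a small late one
messages = [(0, 6000), (2, 6000), (4, 6000), (30, 100)]
batches = simulate_batches(messages, batch_size=16384, linger_ms=10)
batch_lengths = [len(batch) for batch in batches]
```

In this run the third message does not fit in the first batch, and the last message arrives after the linger window, so the four messages end up in three batches. A larger `batch_size` or `linger_ms` trades latency for fewer, larger sends.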
Consumer Configuration

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        fetch_max_bytes=52428800,          # 50 MiB
        max_partition_fetch_bytes=1048576  # 1 MiB
    )
)
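The byte values above are easier to reason about in MiB. The short calculation below converts them and shows how many partitions could be filled to the per-partition limit within a single fetch. It assumes the two options behave like the Kafka consumer settings of the same name (an overall cap per fetch and a cap per partition), which this guide does not spell out; `to_mib` is a hypothetical helper.

```python
MIB = 1024 * 1024


def to_mib(num_bytes):
    """Convert a byte count to mebibytes."""
    return num_bytes / MIB


fetch_max_bytes = 52428800
max_partition_fetch_bytes = 1048576

fetch_mib = to_mib(fetch_max_bytes)                # 50.0
partition_mib = to_mib(max_partition_fetch_bytes)  # 1.0

# Partitions that can each return a full per-partition chunk in one fetch
full_partitions_per_fetch = fetch_max_bytes // max_partition_fetch_bytes  # 50
```

These values also match Kafka's default consumer settings for `fetch.max.bytes` and `max.partition.fetch.bytes`.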
Error Handling

import asyncio

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
from kafka.errors import KafkaError


async def error_handling_example():
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092"
        )
    )
    try:
        await client.connect()
        await client.send("test message")
    except KafkaError as e:
        print(f"Kafka error: {e}")
    finally:
        # Runs whether or not an error occurred
        await client.close()


asyncio.run(error_handling_example())
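The example above reports an error and gives up. Where a failure is likely to be transient, a retry with exponential backoff is a common complement. The helper below is a generic sketch, not a ZephCast feature: `retry_async` and its parameters are hypothetical, and the flaky operation stands in for a call such as `client.send(...)`.

```python
import asyncio


async def retry_async(operation, attempts=3, base_delay=0.1, retry_on=(Exception,)):
    """Await operation() up to `attempts` times, doubling the delay after each failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = []


async def flaky_send():
    """Hypothetical operation that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return "sent"


async def always_failing():
    raise ConnectionError("broker unavailable")


result = asyncio.run(retry_async(flaky_send, attempts=3, base_delay=0))
```

With a real client this might be called as `await retry_async(lambda: client.send("test message"), retry_on=(KafkaError,))`, so that only Kafka errors are retried and anything else propagates immediately.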
Best Practices
- Always use consumer groups in production
- Configure appropriate batch sizes for your use case
- Use compression for large messages or high throughput
- Implement proper error handling and retries
- Monitor consumer lag and broker health
- Use SSL/SASL in production environments
- Configure appropriate timeout values
- Implement proper cleanup in finally blocks
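For the monitoring item in the list above, consumer lag is usually measured per partition as the distance between the newest offset on the broker and the offset the group has committed. The sketch below shows only that arithmetic. `consumer_lag` is a hypothetical helper, the offsets are made-up sample values, and how to obtain real offsets through ZephCast is not covered in this guide. Treating a partition with no commit as offset 0 is an assumption of this sketch.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Return per-partition lag: latest broker offset minus committed offset.

    Partitions without a committed offset are treated as committed at 0
    (an assumption of this sketch). Lag is never reported as negative.
    """
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


# Sample values: partition 0 is 20 messages behind, partition 1 is caught up
lag = consumer_lag(
    end_offsets={0: 120, 1: 80},
    committed_offsets={0: 100, 1: 80},
)
total_lag = sum(lag.values())
```

A total that keeps growing over time suggests the group cannot keep up with the producers and may need more consumers or faster processing.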