Performance Optimization
This guide covers performance optimization techniques for ZephCast applications: batching, connection pooling, compression, consumer tuning, memory management, and monitoring.
Batch Processing
Kafka Batch Publishing
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def batch_publish():
    """Publish 10,000 messages to Kafka and let the producer batch them.

    The producer is configured with a 16KB ``batch_size``, a 10 ms
    ``linger_ms`` window and snappy compression; the loop below simply sends
    one message at a time and leaves the grouping to the client.
    """
    producer_config = KafkaConfig(
        bootstrap_servers="localhost:9092",
        batch_size=16 * 1024,  # 16KB
        linger_ms=10,  # Wait up to 10ms to batch messages
        compression_type="snappy",
    )
    client = KafkaClient(stream_name="my-topic", config=producer_config)

    await client.connect()
    try:
        # No manual batching: messages will be automatically batched.
        for index in range(10000):
            await client.send(f"Message {index}")
    finally:
        await client.close()
RabbitMQ Batch Publishing
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def batch_publish():
    """Publish 1,000 messages to RabbitMQ through a single ``send_batch`` call.

    The client is configured with ``batch_size=100`` and a 1-second
    ``batch_timeout``.
    """
    rabbit_config = RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        batch_size=100,
        batch_timeout=1.0,  # seconds
    )
    client = RabbitClient(stream_name="my-routing-key", config=rabbit_config)

    await client.connect()
    try:
        await client.send_batch([f"Message {n}" for n in range(1000)])
    finally:
        await client.close()
Connection Pooling
Redis Connection Pool
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
# Redis client with a connection pool: at least 2, at most 10 connections.
client = RedisClient(
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        min_connections=2,
        max_connections=10,
    ),
    stream_name="my-stream",
)
RabbitMQ Channel Pool
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# RabbitMQ client whose channel pool holds 5 channels.
client = RabbitClient(
    config=RabbitConfig(
        channel_pool_size=5,
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        queue_name="my-queue",
    ),
    stream_name="my-routing-key",
)
Message Compression
Kafka Compression
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
# Kafka producer with message compression.
# A compression level only applies to codecs that support levels (gzip, lz4,
# zstd); snappy has no levels, so the level is demonstrated with gzip here.
client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        compression_type="gzip",  # or "snappy", "lz4", "zstd"
        compression_level=6,  # valid range depends on the codec; omit for snappy
    ),
)
Custom Message Compression
import zlib
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
async def compress_and_send():
    """Compress a text payload with zlib and publish the bytes to RabbitMQ.

    What goes on the wire is the raw ``zlib.compress`` output, so consumers
    must ``zlib.decompress`` (and decode) it to recover the text. Very small
    payloads like this demo string may not shrink; compression pays off for
    larger messages.
    """
    client = RabbitClient(
        config=RabbitConfig(
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
            queue_name="my-queue",
        ),
        stream_name="my-routing-key",
    )

    await client.connect()
    try:
        payload = "Large message content".encode()
        await client.send(zlib.compress(payload))
    finally:
        await client.close()
Consumer Optimization
Parallel Processing
import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
async def parallel_processing(max_concurrency=10):
    """Consume messages from Kafka and process them concurrently.

    At most ``max_concurrency`` processing tasks are in flight at once. When
    the window is full, the loop waits for *any* task to finish (a sliding
    window) instead of waiting for a whole batch, so one slow message does not
    hold up the others.

    Args:
        max_concurrency: Maximum number of messages processed at the same
            time. Defaults to 10, the limit used by the original example.

    Raises:
        Exception: An error raised by ``process_message`` is re-raised after
            the remaining in-flight tasks have settled and the client has been
            closed.
    """
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092",
            group_id="my-group",
        ),
    )
    await client.connect()

    async def process_message(message):
        # Simulate async processing
        await asyncio.sleep(0.1)
        return f"Processed: {message}"

    pending = set()
    try:
        async for message in client:
            pending.add(asyncio.create_task(process_message(message)))
            # Limit concurrent tasks: once the window is full, wait until at
            # least one finishes before pulling more messages (backpressure).
            if len(pending) >= max_concurrency:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    task.result()  # re-raise a processing error promptly
        # If the stream ends, wait for the tail and surface any failure.
        await asyncio.gather(*pending)
    finally:
        # Let in-flight tasks settle without raising a second time; otherwise a
        # failed task would re-raise here and client.close() would be skipped.
        await asyncio.gather(*pending, return_exceptions=True)
        await client.close()
Prefetch Settings
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig
# RabbitMQ consumer that allows up to 100 unacknowledged messages at a time.
client = RabbitClient(
    config=RabbitConfig(
        prefetch_count=100,  # Number of unacked messages
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        queue_name="my-queue",
    ),
    stream_name="my-routing-key",
)
Memory Management
Message Size Limits
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig
# Kafka producer with explicit size limits, in bytes.
client = KafkaClient(
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        max_request_size=1024 * 1024,  # 1MB (1,048,576 bytes)
        # NOTE(review): in Kafka, message.max.bytes is a broker/topic setting;
        # confirm how KafkaConfig applies it on the client side.
        message_max_bytes=1_000_000,
    ),
    stream_name="my-topic",
)
Stream Trimming (Redis)
from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig
# Redis stream client that bounds the stream length.
# NOTE(review): approximate_trimming presumably maps to Redis' "MAXLEN ~"
# (cheaper, may keep slightly more entries) — confirm against RedisConfig.
client = RedisClient(
    config=RedisConfig(
        approximate_trimming=True,
        max_stream_length=1000,  # Keep only last 1000 messages
        redis_url="redis://localhost:6379",
    ),
    stream_name="my-stream",
)
Monitoring
Performance Metrics
import time

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig


async def monitor_performance():
    """Publish 10,000 messages, printing the cumulative publish rate every 1,000.

    Elapsed time is measured with ``time.perf_counter``, a monotonic
    high-resolution clock, so the reported rate is not distorted by system
    clock adjustments (which can affect ``time.time``).
    """
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(bootstrap_servers="localhost:9092"),
    )
    await client.connect()
    try:
        # Monitor publishing performance
        start_time = time.perf_counter()
        for i in range(10000):
            await client.send(f"Message {i}")
            sent = i + 1  # messages published so far
            if sent % 1000 == 0:
                elapsed = time.perf_counter() - start_time
                rate = sent / elapsed
                print(f"Publishing rate: {rate:.2f} messages/second")
    finally:
        await client.close()
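Best Practices
- Use appropriate batch sizes, tuning linger/timeout settings to balance throughput against latency
- Enable message compression for large or highly compressible payloads
- Configure connection and channel pools so connections are reused
- Set prefetch counts to match how many messages a consumer can handle at once
- Monitor memory usage, and cap message sizes and stream lengths
- Use parallel processing wisely: bound the number of in-flight tasks
- Implement backpressure so producers cannot outpace consumers
- Monitor performance metrics such as publish and consume rates
- Use appropriate timeouts for network operations
- Clean up resources properly (close clients in a `finally` block)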