Skip to content

Performance Optimization

This guide covers performance optimization techniques for ZephCast applications.

Batch Processing

Kafka Batch Publishing

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

async def batch_publish():
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092",
            batch_size=16384,  # 16KB
            linger_ms=10,      # Wait up to 10ms to batch messages
            compression_type="snappy"
        )
    )

    await client.connect()

    try:
        # Messages will be automatically batched
        for i in range(10000):
            await client.send(f"Message {i}")
    finally:
        await client.close()

RabbitMQ Batch Publishing

from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

async def batch_publish():
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            rabbitmq_url="amqp://guest:guest@localhost:5672/",
            batch_size=100,
            batch_timeout=1.0  # seconds
        )
    )

    await client.connect()

    try:
        messages = [f"Message {i}" for i in range(1000)]
        await client.send_batch(messages)
    finally:
        await client.close()

Connection Pooling

Redis Connection Pool

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

client = RedisClient(
    stream_name="my-stream",
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        max_connections=10,
        min_connections=2
    )
)

RabbitMQ Channel Pool

from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        channel_pool_size=5
    )
)

Message Compression

Kafka Compression

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        compression_type="snappy",  # or "gzip", "lz4", "zstd"
        compression_level=6
    )
)

Custom Message Compression

import zlib
from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

async def compress_and_send():
    client = RabbitClient(
        stream_name="my-routing-key",
        config=RabbitConfig(
            queue_name="my-queue",
            rabbitmq_url="amqp://guest:guest@localhost:5672/"
        )
    )

    await client.connect()

    try:
        message = "Large message content"
        compressed = zlib.compress(message.encode())

        await client.send(compressed)
    finally:
        await client.close()

Consumer Optimization

Parallel Processing

import asyncio
from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

async def parallel_processing():
    client = KafkaClient(
        stream_name="my-topic",
        config=KafkaConfig(
            bootstrap_servers="localhost:9092",
            group_id="my-group"
        )
    )

    await client.connect()

    async def process_message(message):
        # Simulate async processing
        await asyncio.sleep(0.1)
        return f"Processed: {message}"

    try:
        tasks = []
        async for message in client:
            task = asyncio.create_task(process_message(message))
            tasks.append(task)

            # Limit concurrent tasks
            if len(tasks) >= 10:
                await asyncio.gather(*tasks)
                tasks = []
    finally:
        if tasks:
            await asyncio.gather(*tasks)
        await client.close()

Prefetch Settings

from zephcast.aio.rabbit.client import RabbitClient
from zephcast.aio.rabbit.config import RabbitConfig

client = RabbitClient(
    stream_name="my-routing-key",
    config=RabbitConfig(
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/",
        prefetch_count=100  # Number of unacked messages
    )
)

Memory Management

Message Size Limits

from zephcast.aio.kafka.client import KafkaClient
from zephcast.aio.kafka.config import KafkaConfig

client = KafkaClient(
    stream_name="my-topic",
    config=KafkaConfig(
        bootstrap_servers="localhost:9092",
        max_request_size=1048576,  # 1MB
        message_max_bytes=1000000
    )
)

Stream Trimming (Redis)

from zephcast.aio.redis.client import RedisClient
from zephcast.aio.redis.config import RedisConfig

client = RedisClient(
    stream_name="my-stream",
    config=RedisConfig(
        redis_url="redis://localhost:6379",
        max_stream_length=1000,  # Keep only last 1000 messages
        approximate_trimming=True
    )
)

Monitoring

Performance Metrics

import time
from zephcast.kafka.async_client import AsyncKafkaClient

async def monitor_performance():
    client = AsyncKafkaClient(
        stream_name="my-topic",
        bootstrap_servers="localhost:9092"
    )

    await client.connect()

    try:
        # Monitor publishing performance
        start_time = time.time()
        message_count = 0

        for i in range(10000):
            await client.send(f"Message {i}")
            message_count += 1

            if message_count % 1000 == 0:
                elapsed = time.time() - start_time
                rate = message_count / elapsed
                print(f"Publishing rate: {rate:.2f} messages/second")
    finally:
        await client.close()

Best Practices

  1. Use appropriate batch sizes
  2. Enable message compression
  3. Configure connection pools
  4. Set proper prefetch counts
  5. Monitor memory usage
  6. Use parallel processing wisely
  7. Implement backpressure
  8. Monitor performance metrics
  9. Use appropriate timeouts
  10. Clean up resources properly