The Single Consumer Client

The SingleConsumer client class is used when you want to consume messages but don’t need to coordinate consumer instances amongst themselves. It’s importable via the kiel.clients module and provides a consume() method capable of starting at the beginning or end of a topic, or at a given datetime or timedelta.

from kiel import clients
from tornado import gen, ioloop

consumer = clients.SingleConsumer(
    ["kafka01", "kafka02"],
    deserializer=None,
    max_wait_time=1000,  # in milliseconds
    min_bytes=1,
    max_bytes=(1024 * 1024),
)

@gen.coroutine
def run():
    yield consumer.connect()
    msgs = yield consumer.consume("example.topic")
    for msg in msgs:
        print(msg)

# drive the coroutine with tornado's IOLoop
ioloop.IOLoop.current().run_sync(run)

The only required constructor parameter is the list of bootstrap broker hosts.

Where to Start

In addition to the topic to consume, the consume() method takes an optional start parameter denoting where in the topic’s history to begin.

Note

The start parameter is honored in only two cases:

  • when consuming from a topic for the first time
  • when an “offset out of range” error is encountered

There are four different possible kinds of values:

  • SingleConsumer.END (default)

    This denotes the tail end of the topic; the consume() call will return messages once some are available.

  • SingleConsumer.BEGINNING

    The very beginning of a topic (often offset 0). Useful for re-processing topics.

  • datetime

    Starts consuming a topic at roughly the point it had reached at the given time (interpreted as UTC).

  • timedelta

    Starts consuming a topic at roughly the point it had reached at a time relative to now (see the sketch below).

Warning

The time-based options rely on epoch seconds and are vulnerable to clock skew between brokers and client servers.
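
For example, a minimal sketch of the time-based start values (the topic name here is illustrative, and the timedelta is assumed to be interpreted relative to the current time):

import datetime

from kiel import clients
from tornado import gen

consumer = clients.SingleConsumer(["kafka01"])

@gen.coroutine
def replay():
    yield consumer.connect()

    # start at roughly the point the topic was at two hours ago
    msgs = yield consumer.consume(
        "example.topic", start=datetime.timedelta(hours=2)
    )

    # or, start at roughly where the topic was at a specific UTC time:
    # msgs = yield consumer.consume(
    #     "example.topic", start=datetime.datetime(2016, 3, 1, 12, 0)
    # )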

The Deserializer

The JSON Default

By default json.loads is used as the deserializer. This works in conjunction with the default json.dumps serializer on the Producer class:

import random

from kiel import clients
from tornado import gen

producer = clients.Producer(["kafka01"])
consumer = clients.SingleConsumer(["kafka01"])

@gen.coroutine
def produce():
    yield producer.connect()
    while True:
        # the dict is serialized to JSON automatically by the producer
        yield producer.produce(
            "example.colors", {"color": random.choice(["blue", "red"])}
        )

@gen.coroutine
def consume():
    yield consumer.connect()
    while True:
        msgs = yield consumer.consume("example.colors")
        # each msg arrives already deserialized from JSON into a dict
        for msg in msgs:
            print(msg["color"])

Customizing

Deserialization can be customized via the deserializer constructor parameter. The given callable is passed a message’s raw value as its single argument.

A trivial example where messages are rot-13 encoded:

import codecs

from kiel import clients
from tornado import gen


def deserialize(value):
    # rot_13 is a text transform, so decode raw bytes to text first
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    return codecs.decode(value, "rot_13")

consumer = clients.SingleConsumer(["kafka01"], deserializer=deserialize)

@gen.coroutine
def consume():
    yield consumer.connect()
    while True:
        msgs = yield consumer.consume("example.colors")
        for msg in msgs:
            # each msg is now the rot-13 decoded string itself
            print(msg)

Limiting Responses

Max and Min Bytes

The size window of responses can be controlled via the min_bytes and max_bytes constructor arguments. These direct the Kafka brokers not to respond until at least min_bytes of data is available, and to construct responses no greater than max_bytes.

Note

The max_bytes directive isn’t exact: it only limits the data in the partition clauses of responses, so there will still be some protocol overhead. The Kafka protocol does not recognize an overall “max bytes” setting but has a per-partition maximum, which the consumer calculates as max_bytes / number of partitions (e.g. a max_bytes of 10 MiB spread over 5 partitions yields a 2 MiB limit per partition).

This can be helpful for consumers that start from the beginning of a large topic and must throttle the otherwise massive initial responses.

from kiel import clients
from tornado import gen

consumer = clients.SingleConsumer(
    ["kafka01"],
    min_bytes=1024,
    max_bytes=(10 * 1024 * 1024)
)

@gen.coroutine
def start_from_beginning():
    yield consumer.connect()

    # start is honored on the first consume() only; subsequent calls
    # pick up where the previous ones left off
    msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)
    while msgs:
        # process msgs, etc.
        msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)

Response Wait Time

The max_wait_time constructor argument can be used to tell brokers how long the consumer is willing to wait for data. If max_wait_time is reached before data is available, the broker responds with a retriable “timeout” error code and the consume() call returns an empty list.
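
A minimal sketch of a polling loop that tolerates these empty results (the topic name is illustrative):

from kiel import clients
from tornado import gen

# wait at most half a second before brokers respond, even if empty-handed
consumer = clients.SingleConsumer(["kafka01"], max_wait_time=500)

@gen.coroutine
def poll():
    yield consumer.connect()
    while True:
        msgs = yield consumer.consume("example.topic")
        if not msgs:
            # max_wait_time elapsed with no new data, just poll again
            continue
        for msg in msgs:
            print(msg)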

Compression

Kafka bakes compression into the wire protocol itself, so the consumer classes take care of decompression for you.

Warning

Naturally, if you produce messages with a compression scheme that has external dependencies (i.e. any non-gzip scheme), your consumers must also have those dependencies installed!
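
For instance, a sketch of a snappy-compressed pipeline; the compression keyword on the Producer is an assumption here (consult the producer documentation for the exact option), and the point is that the consuming side needs no configuration at all, only the snappy bindings:

from kiel import clients
from tornado import gen

# NOTE: the compression kwarg is assumed, see the producer docs
producer = clients.Producer(["kafka01"], compression="snappy")

# nothing compression-specific on the consuming side, but the
# python-snappy package must be installed for decompression to work
consumer = clients.SingleConsumer(["kafka01"])

@gen.coroutine
def consume():
    yield consumer.connect()
    msgs = yield consumer.consume("example.topic")
    for msg in msgs:
        print(msg)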