The SingleConsumer client class is used when you want to consume messages but don’t need to coordinate consumer instances amongst themselves. It’s importable via the kiel.clients module and provides a consume() method capable of starting at the beginning or end of a topic, or at a point given by a datetime or timedelta.
from kiel import clients
from tornado import gen


consumer = clients.SingleConsumer(
    ["kafka01", "kafka02"],
    deserializer=None,
    max_wait_time=1000,  # in milliseconds
    min_bytes=1,
    max_bytes=(1024 * 1024),
)


@gen.coroutine
def run():
    yield consumer.connect()

    msgs = yield consumer.consume("example.topic")
    for msg in msgs:
        print(msg)
The only required constructor parameter is the list of bootstrap broker hosts.
Other than the topic to consume, the consume() method also takes an optional start parameter denoting where in the topic’s history to begin.
Note
The start parameter is honored in only two cases.
There are four different possible kinds of values:
SingleConsumer.END (default)
    Denotes the tail end of the topic; the consume() call will return messages once some are available.
SingleConsumer.BEGINNING
    The very beginning of a topic (often 0). Useful for re-processing topics.
datetime
    Starts consuming a topic at roughly the point it was at the given time (in UTC).
timedelta
    Starts consuming a topic at roughly the point it was at the given relative time.
Warning
The time-based options rely on epoch seconds and are vulnerable to clock skew between brokers and client servers.
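As a rough illustration of the time-based options, here is a minimal sketch (using the same placeholder topic and broker as the earlier snippets) that replays roughly the last hour of a topic by passing a UTC datetime as the start value; a timedelta is passed the same way:

import datetime

from kiel import clients
from tornado import gen


consumer = clients.SingleConsumer(["kafka01"])


@gen.coroutine
def replay_last_hour():
    yield consumer.connect()

    # a UTC datetime from roughly an hour ago
    an_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)

    msgs = yield consumer.consume("example.topic", start=an_hour_ago)
    for msg in msgs:
        print(msg)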
By default json.loads is used as the deserializer. This works in conjunction with the default serializer on the Producer class:
import random

from kiel import clients
from tornado import gen


producer = clients.Producer(["kafka01"])
consumer = clients.SingleConsumer(["kafka01"])


@gen.coroutine
def produce():
    yield producer.connect()

    while True:
        yield producer.produce(
            "example.colors", {"color": random.choice(["blue", "red"])}
        )


@gen.coroutine
def consume():
    yield consumer.connect()

    while True:
        msgs = yield consumer.consume("example.colors")
        for msg in msgs:
            print(msg["color"])
Deserialization can be customized via the deserializer constructor parameter.
The given callable will be passed a message’s value as its single argument.
A trivial example where messages are rot-13 encoded:
import codecs

from kiel import clients
from tornado import gen


def deserialize(value):
    return codecs.decode(value, "rot_13")


consumer = clients.SingleConsumer(["kafka01"], deserializer=deserialize)


@gen.coroutine
def consume():
    yield consumer.connect()

    while True:
        msgs = yield consumer.consume("example.colors")
        for msg in msgs:
            print(msg)
The size window of responses can be controlled via the min_bytes and max_bytes constructor arguments. These direct the Kafka brokers to not respond until at least min_bytes of data is present and to construct responses no greater than max_bytes.
Note
The max_bytes directive isn’t exact, as it only limits the data in the partition clauses of responses; there will still be other overhead. The Kafka protocol does not recognize an overall “max bytes” setting but has a per-partition maximum, which the consumer calculates as max_bytes / number of partitions.
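As a rough worked example, a max_bytes of 1 MB (1024 * 1024) on a topic with 8 partitions means each partition clause in a response is capped at roughly 128 KB.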
This can be helpful for consumers that start from the beginning of a large topic and must throttle the otherwise massive initial responses.
from kiel import clients
from tornado import gen


consumer = clients.SingleConsumer(
    ["kafka01"],
    min_bytes=1024,
    max_bytes=(10 * 1024 * 1024),
)


@gen.coroutine
def start_from_beginning():
    yield consumer.connect()

    msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)
    while msgs:
        # process msgs, etc.
        msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)
The max_wait_time constructor argument (in milliseconds) can be used to tell brokers how long the consumer is willing to wait for data. If max_wait_time is reached before data is available, the broker will respond with a retriable “timeout” error code and the consume() call will return an empty list.
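A minimal sketch of handling that case (reusing the placeholder topic from the earlier snippets) is a polling loop that simply treats an empty result as "no data yet" and asks again:

from kiel import clients
from tornado import gen


# brokers will wait at most two seconds before responding, even with no data
consumer = clients.SingleConsumer(["kafka01"], max_wait_time=2000)


@gen.coroutine
def poll():
    yield consumer.connect()

    while True:
        msgs = yield consumer.consume("example.topic")
        if not msgs:
            # max_wait_time elapsed with no new data; just ask again
            continue
        for msg in msgs:
            print(msg)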
Kafka bakes compression into the wire protocol itself, so the consumer classes take care of decompression for you.
Warning
Naturally, if you’re using compression schemes with external dependencies (i.e. non-gzip schemes) when producing messages, your consumers must also have those dependencies installed!
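For example, if producers publish snappy-compressed messages, the consumer hosts will typically also need the snappy bindings installed (commonly the python-snappy package); gzip needs nothing extra since it ships with the standard library.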