kiel.clients.consumer
kiel.clients.consumer.BaseConsumer(brokers, deserializer=None, max_wait_time=1000, min_bytes=1, max_bytes=1048576)
Bases: kiel.clients.client.Client
Base class for consumers. Provides consume() but no partition allocation.
Allows the deserializer to be customized; the default is a JSON deserializer.
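As an illustration of a custom deserializer, here is a minimal sketch. It assumes the deserializer is a plain callable that receives the raw message value as bytes and returns the decoded object; both function names below are hypothetical and not part of kiel.

```python
import json


def utf8_json_deserializer(raw):
    """Decode raw message bytes as UTF-8 and parse them as JSON."""
    return json.loads(raw.decode("utf-8"))


def csv_line_deserializer(raw):
    """Hypothetical alternative: split a comma-separated line into fields."""
    return raw.decode("utf-8").strip().split(",")


print(utf8_json_deserializer(b'{"id": 7}'))
print(csv_line_deserializer(b"a,b,c\n"))
```

A callable like these would presumably be passed through the deserializer keyword argument shown in the constructor signature.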
allocation
Property meant to denote which topics and partitions this consumer should be aware of.
This is left to subclasses to implement, as it is one of the main behavioral differences between a single consumer and a grouped consumer.
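The split between base class and subclass might look roughly like the sketch below. The class names are hypothetical stand-ins, and the mapping of topic name to a list of partition ids is an assumed shape, not something the documentation specifies.

```python
class SketchBaseConsumer:
    """Stand-in for the base class; not kiel's actual implementation."""

    @property
    def allocation(self):
        # The base class has no opinion on which partitions it owns.
        raise NotImplementedError("subclasses decide the allocation")


class StaticConsumer(SketchBaseConsumer):
    """Hypothetical subclass with a fixed topic -> partition ids mapping."""

    def __init__(self, assignments):
        self._assignments = dict(assignments)

    @property
    def allocation(self):
        return self._assignments


consumer = StaticConsumer({"events": [0, 1, 2]})
print(consumer.allocation)
```

A grouped consumer would compute this mapping dynamically instead of fixing it at construction time.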
determine_offsets(*args, **kwargs)
Subclass coroutine function for setting values in self.offsets.
Kafka offers a simple “offset” API as well as a more involved set of offset fetch and commit APIs. Which ones to use, and how, is left to the subclasses.
consume(*args, **kwargs)
Fetches from a given topic and returns a list of deserialized values.
If the given topic is not known to have synced offsets, a call to determine_offsets() is made first.
If a topic is unknown entirely, the cluster’s heal() method is called and the check is retried.
Since error codes and deserialization are taken care of by handle_fetch_response(), this method merely yields to wait on the deserialized results and returns a flattened list.
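The control flow described for consume() can be sketched as follows. This is a simplified illustration using asyncio rather than whatever coroutine machinery the library itself uses; the class, the synced_offsets set, and the fetch() helper are hypothetical, and the heal-and-retry step for unknown topics is left out.

```python
import asyncio
from itertools import chain


class SketchConsumer:
    """Hypothetical stand-in that mimics the documented consume() flow."""

    def __init__(self, batches):
        # topic -> list of already-deserialized result lists, one per response
        self.batches = batches
        self.offsets = {}
        self.synced_offsets = set()

    async def determine_offsets(self, topic):
        # A real subclass would query Kafka; here we start at offset 0.
        self.offsets[topic] = {0: 0}
        self.synced_offsets.add(topic)

    async def fetch(self, topic):
        # Stand-in for sending fetch requests and handling each response.
        return self.batches.get(topic, [])

    async def consume(self, topic):
        if topic not in self.synced_offsets:
            await self.determine_offsets(topic)
        per_response = await self.fetch(topic)
        # Flatten the per-response lists into a single list of values.
        return list(chain.from_iterable(per_response))


consumer = SketchConsumer({"events": [[{"id": 1}], [{"id": 2}, {"id": 3}]]})
result = asyncio.run(consumer.consume("events"))
print(result)
```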
handle_fetch_response(response)
Handler for responses from the message “fetch” API.
Messages returned with the “no error” code are deserialized and collected, and the full resulting list is returned.
A retriable error code will cause the cluster “heal” flag to be set.
An error indicating that the offset used for the partition was out of range will cause the offending topic’s offsets to be redetermined on the next call to consume().
Note
This class and its subclasses assume that fetch requests are made on one topic at a time, so this handler only deals with the first topic returned.
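The three outcomes described for the fetch handler can be sketched as a simple dispatch on each partition's error code. The numeric codes are from the Kafka protocol, but the choice of which ones count as retriable here is an assumption, and the Partition tuple, SketchHandler class, and attribute names are all hypothetical.

```python
from collections import namedtuple

# Kafka protocol error codes (a subset).
NO_ERROR = 0
OFFSET_OUT_OF_RANGE = 1
# Assumed retriable subset: unknown topic/partition, leader not available,
# not leader for partition.
RETRIABLE = {3, 5, 6}

# Hypothetical, simplified view of one partition in a fetch response.
Partition = namedtuple("Partition", "partition_id error_code values")


class SketchHandler:
    def __init__(self):
        self.heal_needed = False
        self.synced_offsets = {"events"}

    def handle(self, topic_name, partitions):
        messages = []
        for part in partitions:
            if part.error_code == NO_ERROR:
                messages.extend(part.values)
            elif part.error_code == OFFSET_OUT_OF_RANGE:
                # Forget the topic so offsets are redetermined next time.
                self.synced_offsets.discard(topic_name)
            elif part.error_code in RETRIABLE:
                self.heal_needed = True
        return messages


handler = SketchHandler()
collected = handler.handle(
    "events",
    [Partition(0, 0, ["a"]), Partition(1, 1, []), Partition(2, 6, [])],
)
print(collected, handler.heal_needed, handler.synced_offsets)
```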
deserialize_messages(topic_name, partition)
Calls the deserializer on each Message value and gives the result.
If an error is encountered when deserializing, it is logged and the offending message is skipped.
After each successful deserialization, the self.offsets entry for the particular topic/partition pair is incremented.
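The skip-on-error and offset bookkeeping described above could look something like this standalone sketch. The function signature is hypothetical (it takes the offsets mapping and raw values directly instead of reading them from self and a partition object), and it follows the description literally by incrementing the offset only after a successful deserialization.

```python
import json
import logging

log = logging.getLogger(__name__)


def deserialize_messages(offsets, topic_name, partition_id, raw_values, deserializer):
    """Deserialize each raw value, skipping and logging any failures."""
    results = []
    for raw in raw_values:
        try:
            value = deserializer(raw)
        except Exception:
            log.exception(
                "Error deserializing message from %s/%s", topic_name, partition_id
            )
            continue
        results.append(value)
        # Only successful messages advance the stored offset in this sketch.
        offsets[topic_name][partition_id] += 1
    return results


offsets = {"events": {0: 10}}
values = deserialize_messages(
    offsets, "events", 0, [b'{"a": 1}', b"not json", b'{"a": 2}'], json.loads
)
print(values, offsets)
```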