kiel.clients.consumer

class kiel.clients.consumer.BaseConsumer(brokers, deserializer=None, max_wait_time=1000, min_bytes=1, max_bytes=1048576)

Bases: kiel.clients.client.Client

Base class for consumers. Provides consume() but no partition allocation.

Allows for customizing the deserializer used. The default is a JSON deserializer.
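
As a rough illustration of what a custom deserializer might look like, the sketch below assumes the deserializer is a plain callable that receives a raw message value (bytes or str) and returns a Python object. Both function names are hypothetical and not part of kiel.

```python
import json


def json_deserializer(raw):
    """Decode a raw message value as JSON, in the spirit of the documented default."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


def csv_line_deserializer(raw):
    """Hypothetical alternative: split a comma-separated line into its fields."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return raw.strip().split(",")
```

Under that assumption, a callable such as csv_line_deserializer would be passed through the deserializer keyword argument of the constructor.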

allocation

Property meant to denote which topics and partitions this consumer should be aware of.

This is left to subclasses to implement, as it is one of the main behavioral differences between a single consumer and a grouped consumer.
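
A minimal sketch of how a subclass might supply this property is shown below. It uses a stand-in base class rather than kiel itself, and the return shape (a dict mapping topic names to lists of partition ids) is an assumption made for illustration.

```python
class SketchConsumer:
    """Stand-in for a consumer base class; not kiel's actual implementation."""

    @property
    def allocation(self):
        # Left to subclasses, mirroring the description above.
        raise NotImplementedError


class FixedAllocationConsumer(SketchConsumer):
    """Hypothetical subclass that always reports a fixed topic/partition mapping."""

    def __init__(self, topics):
        # topics: dict of topic name -> iterable of partition ids (assumed shape)
        self._topics = {name: sorted(parts) for name, parts in topics.items()}

    @property
    def allocation(self):
        return self._topics
```

A grouped consumer would presumably compute this mapping dynamically instead of fixing it at construction time.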

determine_offsets(*args, **kwargs)

Subclass coroutine function for setting values in self.offsets.

Kafka offers a simple “offset” API as well as a more involved set of offset fetch and commit APIs. Determining which ones to use, and how, is left to the subclasses.

consume(*args, **kwargs)

Fetches from a given topic and returns a list of deserialized values.

If the given topic is not known to have synced offsets, a call to determine_offsets() is made first.

If a topic is unknown entirely, the cluster’s heal() method is called and the check is retried.

Since error codes and deserialization are taken care of by handle_fetch_response, this method merely yields to wait on the deserialized results and returns a flattened list.
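
The control flow described above can be sketched roughly as follows. This is not kiel's code: it uses asyncio coroutines in place of the library's own coroutine style, a stand-in class with canned fetch results, and hypothetical attribute names (known_topics, synced_offsets, fetch_results). The order of the checks and the empty-list result for a topic that stays unknown are also assumptions.

```python
import asyncio


class SketchConsumer:
    """Stand-in consumer illustrating the consume() flow; not kiel's implementation."""

    def __init__(self, known_topics, fetch_results):
        self.known_topics = set(known_topics)
        self.synced_offsets = set()
        # topic name -> list of per-partition lists of already-deserialized values
        self.fetch_results = fetch_results
        self.heal_calls = 0

    async def heal(self):
        # Pretend that healing refreshes metadata and discovers every topic with data.
        self.heal_calls += 1
        self.known_topics.update(self.fetch_results)

    async def determine_offsets(self, topic):
        self.synced_offsets.add(topic)

    async def consume(self, topic):
        if topic not in self.known_topics:
            await self.heal()
            if topic not in self.known_topics:
                return []  # assumed behavior for a topic that stays unknown
        if topic not in self.synced_offsets:
            await self.determine_offsets(topic)
        per_partition = self.fetch_results.get(topic, [])
        # Flatten the per-partition results into a single list.
        return [value for values in per_partition for value in values]
```

For example, asyncio.run(SketchConsumer([], {"events": [[1, 2], [3]]}).consume("events")) triggers one heal, syncs offsets, and returns the flattened values.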

handle_fetch_response(response)

Handler for responses from the message “fetch” API.

Messages returned with the “no error” code are deserialized and collected, and the full resulting list is returned.

A retriable error code will cause the cluster “heal” flag to be set.

An error indicating that the offset used for the partition was out of range will cause the offending topic’s offsets to be redetermined on the next call to consume().

Note

This class and its subclasses assume that fetch requests are made on one topic at a time, so this handler only deals with the first topic returned.
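
The error handling described for this handler can be sketched as a dispatch on each partition's error code. The numeric codes below come from the Kafka protocol; which codes kiel treats as retriable, and the PartitionResult and FetchState types, are assumptions made for illustration.

```python
from collections import namedtuple

# Numeric error codes as defined by the Kafka protocol.
NO_ERROR = 0
OFFSET_OUT_OF_RANGE = 1
# Assumed retriable set: unknown topic/partition, leader not available, not leader.
RETRIABLE = {3, 5, 6}

# Hypothetical shape of one partition's portion of a fetch response.
PartitionResult = namedtuple("PartitionResult", "partition_id error_code messages")


class FetchState:
    """Hypothetical holder for the flags the handler is described as updating."""

    def __init__(self):
        self.heal_needed = False
        self.synced_topics = set()


def handle_partitions(state, topic_name, partitions):
    """Collect messages from healthy partitions and record error conditions."""
    collected = []
    for part in partitions:
        if part.error_code == NO_ERROR:
            collected.extend(part.messages)
        elif part.error_code == OFFSET_OUT_OF_RANGE:
            # Mark the topic as unsynced so offsets are redetermined next time.
            state.synced_topics.discard(topic_name)
        elif part.error_code in RETRIABLE:
            state.heal_needed = True
    return collected
```

Only one topic is handled per call here, matching the single-topic assumption stated in the note above.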

deserialize_messages(topic_name, partition)

Calls the deserializer on each Message value and gives the result.

If an error is encountered when deserializing, it is logged and the offending message is skipped.

After each successful deserialization the self.offsets entry for the particular topic/partition pair is incremented.
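
A self-contained sketch of this skip-and-advance behavior follows. It takes the description literally (the offset is incremented only after a successful deserialization) and uses a plain function with hypothetical parameters rather than kiel's method signature; json.loads stands in for the configured deserializer.

```python
import json
import logging

log = logging.getLogger(__name__)


def deserialize_messages(offsets, topic_name, partition_id, raw_values,
                         deserializer=json.loads):
    """Yield deserialized values, skipping failures and advancing the offset on success."""
    for raw in raw_values:
        try:
            value = deserializer(raw)
        except Exception:
            # Log the failure and move on to the next message.
            log.exception(
                "Error deserializing message from %s/%s", topic_name, partition_id
            )
            continue
        offsets[topic_name][partition_id] += 1
        yield value
```

Given offsets = {"events": {0: 5}} and the raw values '{"a": 1}', 'not json', and '[2]', the generator produces two values and leaves the stored offset at 7.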