kiel.clients.single

class kiel.clients.single.SingleConsumer(brokers, deserializer=None, max_wait_time=1000, min_bytes=1, max_bytes=1048576)[source]

Bases: kiel.clients.consumer.BaseConsumer

Usable consumer class for isolated-consumer use cases.

By “isolated” consumer, that means that the consumer runs independently of other consumers and does not need to apportion work among others.

Uses the basic offset api to determine topic/partition offsets.

BEGINNING = -2

special offset api value for ‘beginning offset’

END = -1

special offset api value for ‘very latest offset’

allocation

For single consumers the allocation is all topics and partitions.

determine_offsets(*args, **kwargs)[source]

Sends OffsetRequests to the cluster for a given topic and start point.

The start parameter can be any of of datetime.datetime, datetime.timedelta or one of SingleConsumer.BEGINNING or SingleConsumer.END. The value is translated into epoch seconds if need be and used for the “time” parameter for the offset requests.

An offset request is sent to each of the leader brokers for the given topic.

handle_offset_response(response)[source]

Handles responses from the offset api and sets self.offsets values.

A succesful response will update the topic/partition pair entry in the self.offsets structure.

A retriable error code response will cause the cluster’s heal() method to be called at the end of processing and the offending topic’s offsets to be re-evaluated on the next consume() call.

wind_down(*args, **kwargs)[source]

The single consumer keeps little to no state so wind down is a no-op.

kiel.clients.single.start_to_timestamp(start)[source]

Helper method for translating “start” values into offset api values.

Valid values are instances of datetime.datetime, datetime.timedelta or one of SingleConsumer.BEGINNING or SingleConsumer.END.