kiel.clients.grouped

class kiel.clients.grouped.GroupedConsumer(brokers, group, zk_hosts, deserializer=None, partition_allocator=None, autocommit=True, max_wait_time=1000, min_bytes=1, max_bytes=1048576)[source]

Bases: kiel.clients.consumer.BaseConsumer

Consumer class with coordinated resource allocation among like members.

Uses an instance of a PartitionAllocator to determine which topics and partitions to consume. Whenever the allocation is rebalanced, each consumed topic will have its partition offsets re-determined.

Constructed similarly to the SingleConsumer class except for extra paramters group, zk_hosts, partition_allocator and autocommit.

allocation

Proxy property for the topics/partitions determined by the allocator.

connect(*args, **kwargs)[source]

Overriding connect() that handles the allocator and coordinator.

Simple augmentation of the base class method that starts the allocator and calls determine_coordinator().

consume(*args, **kwargs)[source]

Overriding consume() that handles committing offsets.

This is where the autocommit flag comes into play. If the flag is set we call commit_offsets() here right off the bat.

determine_coordinator(*args, **kwargs)[source]

Determines the ID of the broker that coordinates the group.

Uses the “consumer metadata” api to do its thing. All brokers contain coordinator metadata so each broker in the cluster is tried until one works.

handle_group_coordinator_response(response)[source]

Handler for consumer metadata api responses.

These responses are relatively simple and successful ones merely list the ID, host and port of the coordinator.

Returns True if the coordinator was deterimend, False if not.

determine_offsets(*args, **kwargs)[source]

Fetches offsets for a given topic via the “offset fetch” api.

Simple matter of sending an OffsetFetchRequest to the coordinator broker.

Note

The start argument is actually ignored, it exists so that the signature remains consistent with the other consumer classes.

handle_offset_fetch_response(response)[source]

Handler for offset fetch api responses.

Sets the corresponding entry in the self.offsets structure for successful partition responses.

Raises a NoOffsetsError exception if a fatal, non-retriable error is encountered.

Returns True if the operation should be retried, False if not.

commit_offsets(*args, **kwargs)[source]

Notifies Kafka that the consumer’s messages have been processed.

Uses the “v0” version of the offset commit request to maintain compatability with clusters running 0.8.1.

handle_offset_commit_response(response)[source]

Handles responses from the “offset commit” api.

For successful responses the affected topics are dropped from the set of topics that need commits.

In the special case of an offset_metadata_too_large error code the commit is retried with a blank metadata string.

wind_down(*args, **kwargs)[source]

Winding down calls stop() on the allocator.

kiel.clients.grouped.naive_allocator(members, partitions)[source]

Default allocator with a round robin approach.

In this algorithm, each member of the group is cycled over and given a partition until there are no partitions left. This assumes roughly equal capacity for each member and aims for even distribution of partition counts.

Does not take into account incidental clustering of partitions within the same topic.