kiel.clients.grouped
¶kiel.clients.grouped.
GroupedConsumer
(brokers, group, zk_hosts, deserializer=None, partition_allocator=None, autocommit=True, max_wait_time=1000, min_bytes=1, max_bytes=1048576)[source]¶Bases: kiel.clients.consumer.BaseConsumer
Consumer class with coordinated resource allocation among like members.
Uses an instance of a PartitionAllocator
to determine which topics and
partitions to consume. Whenever the allocation is rebalanced, each
consumed topic will have its partition offsets re-determined.
Constructed similarly to the SingleConsumer
class except for extra
paramters group
, zk_hosts
, partition_allocator
and
autocommit
.
allocation
¶Proxy property for the topics/partitions determined by the allocator.
connect
(*args, **kwargs)[source]¶Overriding connect()
that handles the allocator and coordinator.
Simple augmentation of the base class method that starts the allocator
and calls determine_coordinator()
.
consume
(*args, **kwargs)[source]¶Overriding consume()
that handles committing offsets.
This is where the autocommit
flag comes into play. If the flag
is set we call commit_offsets()
here right off the bat.
determine_coordinator
(*args, **kwargs)[source]¶Determines the ID of the broker that coordinates the group.
Uses the “consumer metadata” api to do its thing. All brokers contain coordinator metadata so each broker in the cluster is tried until one works.
handle_group_coordinator_response
(response)[source]¶Handler for consumer metadata api responses.
These responses are relatively simple and successful ones merely list the ID, host and port of the coordinator.
Returns True
if the coordinator was deterimend, False
if not.
determine_offsets
(*args, **kwargs)[source]¶Fetches offsets for a given topic via the “offset fetch” api.
Simple matter of sending an OffsetFetchRequest to the coordinator broker.
Note
The start
argument is actually ignored, it exists so that the
signature remains consistent with the other consumer classes.
handle_offset_fetch_response
(response)[source]¶Handler for offset fetch api responses.
Sets the corresponding entry in the self.offsets
structure for
successful partition responses.
Raises a NoOffsetsError
exception if a fatal, non-retriable error
is encountered.
Returns True
if the operation should be retried, False
if not.
commit_offsets
(*args, **kwargs)[source]¶Notifies Kafka that the consumer’s messages have been processed.
Uses the “v0” version of the offset commit request to maintain compatability with clusters running 0.8.1.
handle_offset_commit_response
(response)[source]¶Handles responses from the “offset commit” api.
For successful responses the affected topics are dropped from the set of topics that need commits.
In the special case of an offset_metadata_too_large
error code
the commit is retried with a blank metadata string.
kiel.clients.grouped.
naive_allocator
(members, partitions)[source]¶Default allocator with a round robin approach.
In this algorithm, each member of the group is cycled over and given a partition until there are no partitions left. This assumes roughly equal capacity for each member and aims for even distribution of partition counts.
Does not take into account incidental clustering of partitions within the same topic.