kiel.zookeeper.allocator
kiel.zookeeper.allocator.PartitionAllocator(zk_hosts, group_name, consumer_name, allocator_fn, on_rebalance=None)
Bases: object
Helper class that uses Zookeeper to allocate partitions among consumers.
Uses a Party instance to represent the group membership and a SharedSet instance to handle the set of partitions to be allocated.
The allocator_fn argument is a callable that is passed a sorted list of members and partitions whenever either one changes.
Note
It is incredibly important that the allocator_fn be stable! All of the instances of the allocator must agree on what partitions go where or all hell will break loose.
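A minimal sketch of what a stable allocator_fn could look like. It assumes that members is a sorted list of consumer names, that partitions is a sorted list of "topic:partition_id" strings, and that the function returns a mapping of member name to {topic: [partition IDs]}. None of these shapes is confirmed by this page, and round_robin_allocator is a hypothetical name.

```python
import collections


def round_robin_allocator(members, partitions):
    """Deal partitions out to members in order (hypothetical example).

    Assumes `members` is a sorted list of consumer names and `partitions`
    is a sorted list of "topic:partition_id" strings. Both shapes are
    assumptions made for this sketch.
    """
    if not members:
        return {}
    mapping = {member: collections.defaultdict(list) for member in members}
    for index, partition in enumerate(partitions):
        # Split on the last colon so topic names may contain colons.
        topic, partition_id = partition.rsplit(":", 1)
        owner = members[index % len(members)]
        mapping[owner][topic].append(int(partition_id))
    # Return plain dicts so the result has no default-creating behavior.
    return {member: dict(topics) for member, topics in mapping.items()}
```

Because the result depends only on the two sorted inputs, every instance that sees the same members and partitions computes the same allocation, which is the stability property the note asks for.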
allocation
Property representing the topics allocated for a specific consumer.
members_path
Property representing the znode path of the member Party.
partition_path
Property representing the znode path of the SharedSet.
start(seed_partitions)
Connects to Zookeeper and collects member and partition data.
Leverages the create_attempt() and wait_on_event() helper functions in order to bridge the gap between threaded async and tornado async.
Returns a Future instance once done so that coroutine methods may yield to it.
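The general idea of bridging a thread-driven callback into coroutine code can be sketched as follows. This is not kiel's wait_on_event(): it uses the standard library's asyncio in place of tornado, and wait_on_event_sketch is a hypothetical name. It only illustrates waiting on a threading event without blocking the event loop.

```python
import asyncio
import threading


async def wait_on_event_sketch(event, timeout=None):
    """Await a threading.Event without blocking the event loop.

    Hypothetical stand-in, not kiel's wait_on_event(). The blocking
    Event.wait() call runs in the loop's default thread pool.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, event.wait, timeout)


async def main():
    members_collected = threading.Event()
    # Simulate a callback that fires on another thread after a short delay.
    timer = threading.Timer(0.05, members_collected.set)
    timer.start()
    was_set = await wait_on_event_sketch(members_collected, timeout=2)
    timer.join()
    return was_set


result = asyncio.run(main())
```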
stop()
Signals the Party that this member has left and closes connections.
This method returns a Future so that it can be yielded to in coroutines.
handle_connection_change(state)
Handler for changes to the kazoo client's connection state.
Responsible for updating the connected threading event such that it is only set if/when the kazoo connection is live.
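A minimal sketch of such a handler, written without kazoo so that it runs on its own. The three string constants stand in for kazoo's KazooState values, and ConnectionTracker is a hypothetical class used only for illustration.

```python
import threading

# Stand-ins for kazoo's KazooState constants (assumed here so the
# sketch does not need kazoo installed).
CONNECTED, SUSPENDED, LOST = "CONNECTED", "SUSPENDED", "LOST"


class ConnectionTracker:
    """Hypothetical holder for the `connected` threading event."""

    def __init__(self):
        self.connected = threading.Event()

    def handle_connection_change(self, state):
        # Only a live connection sets the event; a suspended or lost
        # connection clears it so that waiters block until reconnection.
        if state == CONNECTED:
            self.connected.set()
        else:
            self.connected.clear()
```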
on_group_members_change(new_members)
Callback for when membership of the Party changes.
Sets the self.members attribute if membership actually changed, calling rebalance() if so.
Sets the members_collected threading event when done.
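The callback logic described above can be sketched as below. MembershipSketch is a hypothetical class, rebalance() is reduced to a counter, and sorting the incoming members is an assumption made so that comparisons do not depend on ordering.

```python
import threading


class MembershipSketch:
    """Hypothetical illustration of the membership callback."""

    def __init__(self):
        self.members = []
        self.members_collected = threading.Event()
        self.rebalance_count = 0

    def rebalance(self):
        # Placeholder: a real allocator would call allocator_fn here.
        self.rebalance_count += 1

    def on_group_members_change(self, new_members):
        new_members = sorted(new_members)  # assumption: order-insensitive
        if new_members != self.members:
            self.members = new_members
            self.rebalance()
        # Signal that membership data has been seen at least once,
        # whether or not it changed.
        self.members_collected.set()
```

Rebalancing only on an actual change avoids recomputing the allocation when the same membership is reported more than once.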
on_partition_change(new_partitions)
Callback for when data in the SharedSet changes.
If new_partitions is None it means we’re the first to use the SharedSet so we populate it with our known partitions.
If the data has been altered in any way the self.partitions attribute is updated and rebalance() called.
Sets the partitions_collected threading event when done.
add_partitions(partitions)
Ensures that the SharedSet contains the given partitions.
The partitions argument should be a dictionary keyed on topic names whose values are lists of associated partition IDs.
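An example of the expected argument shape, together with one way such a dictionary might be flattened into set members. The "topic:partition_id" string format and the flatten_partitions helper are assumptions for illustration; this page does not say how the SharedSet stores its entries.

```python
# Topic names mapped to lists of partition IDs, as described above.
partitions = {"events": [0, 1, 2], "logs": [0]}


def flatten_partitions(partitions):
    """Flatten {topic: [ids]} into a set of "topic:id" strings.

    Hypothetical helper; the string format is an assumption.
    """
    return {
        "{}:{}".format(topic, partition_id)
        for topic, partition_ids in partitions.items()
        for partition_id in partition_ids
    }


flattened = flatten_partitions(partitions)
```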