kiel.zookeeper.allocator

class kiel.zookeeper.allocator.PartitionAllocator(zk_hosts, group_name, consumer_name, allocator_fn, on_rebalance=None)[source]

Bases: object

Helper class that uses Zookeeper to allocate partitions among consumers.

Uses a Party instance to represent the group membership and a SharedSet instance to handle the set of partitions to be allocated.

The allocator_fn argument is a callable that is passed a sorted list of members and partitions whenever change to either happens.

Note

It is incredibly important that the allocator_fn be stable! All all of the instances of the allocator must agree on what partitions go where or all hell will break loose.

allocation

Property representing the topics allocated for a specific consumer.

members_path

Property representing the znode path of the member Party.

partition_path

Property representing the znode path of the SharedSet.

start(seed_partitions)[source]

Connects to zookeeper and collects member and partition data.

Leverages the create_attempt() and wait_on_event() helper functions in order to bridge the gap between threaded async and tornado async.

Returns a Future instance once done so that coroutine methods may yield to it.

stop()[source]

Signals the Party that this member has left and closes connections.

This method returns a Future so that it can be yielded to in coroutines.

connect()[source]

Establishes the kazoo connection and registers the connection handler.

handle_connection_change(state)[source]

Handler for changes to the kazoo client’s connection’s state.

Responsible for updating the connected threading event such that it is only set if/when the kazoo connection is live.

on_group_members_change(new_members)[source]

Callback for when membership of the Party changes.

Sets the self.members attribute if membership actually changed, calling rebalance() if so.

Sets the members_collected threading event when done.

on_partition_change(new_partitions)[source]

Callback for when data in the SharedSet changes.

If new_partitions is None it means we’re the first to use the SharedSet so we populate it with our known partitions.

If the data has been altered in any way the self.partitions attribute is updated and rebalance() called.

Sets the partitions_collected threading event when done.

add_partitions(partitions)[source]

Ensures that the SharedSet contains the given partitions.

The partitions argument should be a dictionary keyed on topic names who’s values are lists of associated partition IDs.

remove_partitions(old_partitions)[source]

Ensures that the SharedSet does not contain the given partitions.

The partitions argument should be a dictionary keyed on topic names who’s values are lists of associated partition IDs.

rebalance()[source]

Callback fired when membership or partition data changes.

The allocator_fn is called on the new self.members and self.partitions lists to determine the mapping of members to partitions.

If an on_rebalance callback is configured it is called once done.

kiel.zookeeper.allocator.create_attempter(f)[source]

Helper method for methods that call others and use Future directly.

Returns a wrapper function that will set the given Future‘s exception state if the inner function call fails.