kiel.cluster

class kiel.cluster.Cluster(bootstrap_hosts)[source]

Bases: object

Class representing a Kafka cluster.

Manages a dict of Connection objects, one for each known broker, keyed by broker ID.

Also keeps metadata information for topics, their partitions, and the partition leader brokers.

__getitem__(broker_id)[source]

Proxies to the __getitem__ of the underlying conns dictionary.

Allows for the client to say

self.cluster[broker_id].send()

and such.
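The container-style access described here, together with the __contains__ and __iter__ proxies documented next, can be illustrated with a minimal sketch. FakeConnection and ClusterSketch are hypothetical stand-ins written for this example, not kiel classes; only the idea of delegating to a conns dictionary comes from the documentation.

```python
class FakeConnection:
    """Hypothetical stand-in for a broker connection."""

    def __init__(self, host):
        self.host = host
        self.sent = []

    def send(self, message):
        # Record the message instead of touching the network.
        self.sent.append(message)


class ClusterSketch:
    """Illustrative only: proxies container access to a conns dict."""

    def __init__(self):
        self.conns = {}  # broker ID -> connection

    def __getitem__(self, broker_id):
        return self.conns[broker_id]

    def __contains__(self, broker_id):
        return broker_id in self.conns

    def __iter__(self):
        return iter(self.conns)


cluster = ClusterSketch()
cluster.conns[1] = FakeConnection("kafka01:9092")
cluster.conns[2] = FakeConnection("kafka02:9092")

cluster[1].send("ping")        # goes through __getitem__
has_broker = 2 in cluster      # goes through __contains__
broker_ids = sorted(cluster)   # goes through __iter__
```

Delegating all three methods to the same dictionary keeps lookups, membership tests, and iteration consistent with each other.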

__contains__(broker_id)[source]

Proxies the __contains__ method of the conns dictionary.

Allows for the client to test if a broker is present via

broker_id in self.cluster

__iter__()[source]

Proxies the __iter__ method of the conns dictionary.

In effect allows for iterating over known broker_id values:

for broker_id in self.cluster:

get_leader(topic, partition_id)[source]

Returns the leader broker ID for a given topic/partition combo.
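A rough sketch of such a lookup, assuming the leader information is stored as a nested mapping from topic name to partition ID to broker ID. The LeaderTable class and the exact layout of leaders are assumptions made for illustration.

```python
import collections


class LeaderTable:
    """Hypothetical holder for topic/partition leader metadata."""

    def __init__(self):
        # topic name -> {partition ID -> leader broker ID}
        self.leaders = collections.defaultdict(dict)

    def get_leader(self, topic, partition_id):
        # Raises KeyError if the partition is not known.
        return self.leaders[topic][partition_id]


table = LeaderTable()
table.leaders["events"][0] = 3
table.leaders["events"][1] = 1

leader = table.get_leader("events", 1)
```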

start(*args, **kwargs)[source]

Establishes connections to the brokers in a cluster as well as gathers topic/partition metadata.

Cycles through each bootstrap host and attempts to send a metadata request. Once a metadata request succeeds, the heal() method is called.
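The bootstrap cycle can be sketched roughly as follows. This is a synchronous simplification: request_metadata and heal are hypothetical callables standing in for the real network request and the heal() method, and raising RuntimeError when every host fails is an assumption of the sketch, not documented behavior.

```python
def start(bootstrap_hosts, request_metadata, heal):
    """Try each bootstrap host until one metadata request succeeds."""
    for host in bootstrap_hosts:
        try:
            response = request_metadata(host)
        except ConnectionError:
            continue  # this host is unreachable, try the next one
        heal(response)
        return host
    # Placeholder failure mode, chosen for this sketch only.
    raise RuntimeError("no bootstrap host answered the metadata request")


def fake_request(host):
    # Pretend the first host is down and the others are healthy.
    if host == "down:9092":
        raise ConnectionError(host)
    return {"brokers": [1, 2], "source": host}


healed = []
used = start(["down:9092", "up:9092", "other:9092"], fake_request, healed.append)
```

Stopping at the first successful host means later bootstrap hosts are never contacted; the rest of the cluster is discovered from the metadata response.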

heal(*args, **kwargs)[source]

Syncs the state of the cluster with metadata retrieved from a broker.

If no response argument is given, a call to get_metadata() fetches fresh information.

As a first step this will cull any closing/aborted connections from the cluster. This is followed by repeated calls to process_brokers() and process_topics() until both signal that there are no missing brokers or topics.
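The cull-then-repeat flow might look like the sketch below. It is a synchronous simplification under several assumptions: connections are plain dicts with a "closing" flag, the three helpers are passed in as hypothetical callables, and fresh metadata is fetched before each retry (the documentation does not say how the retry obtains new data).

```python
def heal(conns, response, get_metadata, process_brokers, process_topics):
    """Return the number of processing rounds needed to settle."""
    # First step: cull connections that are closing or were aborted.
    for broker_id in [b for b, c in conns.items() if c["closing"]]:
        del conns[broker_id]

    if response is None:
        response = get_metadata()

    rounds = 1
    while True:
        missing_brokers = process_brokers(response["brokers"])
        missing_topics = process_topics(response["topics"])
        if not missing_brokers and not missing_topics:
            return rounds
        # Assumption: refresh the metadata before trying again.
        response = get_metadata()
        rounds += 1


conns = {1: {"closing": False}, 2: {"closing": True}}
responses = iter([{"brokers": [1], "topics": ["events"]}])
# First pass reports a missing topic, second pass reports none.
topic_results = iter([{"events"}, set()])

rounds = heal(
    conns,
    {"brokers": [1], "topics": []},
    get_metadata=lambda: next(responses),
    process_brokers=lambda brokers: set(),
    process_topics=lambda topics: next(topic_results),
)
```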

get_metadata(*args, **kwargs)[source]

Retrieves metadata from a broker in the cluster, optionally limited to a set of topics.

Each connection in the cluster is tried until one works. If no connection in the cluster responds, a NoBrokersError is raised.
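A minimal sketch of the try-each-connection behavior. Connections are modeled as plain callables, and NoBrokersError is redefined locally as a placeholder for the exception named above; neither reflects the library's actual types.

```python
class NoBrokersError(Exception):
    """Local placeholder for the error named in the documentation."""


def get_metadata(conns, topics=None):
    """conns maps broker ID -> callable that takes a list of topics."""
    for broker_id, conn in conns.items():
        try:
            return conn(topics or [])
        except ConnectionError:
            continue  # this broker did not respond, try the next one
    raise NoBrokersError("no connection in the cluster responded")


def dead(topics):
    raise ConnectionError("broker unreachable")


def alive(topics):
    return {"topics": list(topics)}


result = get_metadata({1: dead, 2: alive}, topics=["events"])
```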

process_brokers(*args, **kwargs)[source]

Syncs the self.conns connection dictionary with the given broker metadata, returning a set of broker IDs that were in the metadata but had failing connections.

Known connections that are not present in the given metadata will have abort() called on them.
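The reconciliation can be sketched as below. FakeConnection, the unreachable parameter, and the broker ID to host mapping are hypothetical; removing an aborted connection from the dictionary is also an assumption, since the documentation only says abort() is called.

```python
class FakeConnection:
    """Hypothetical connection that can be told to fail on connect."""

    def __init__(self, host, reachable=True):
        self.host = host
        self.reachable = reachable
        self.aborted = False

    def connect(self):
        if not self.reachable:
            raise ConnectionError(self.host)

    def abort(self):
        self.aborted = True


def process_brokers(conns, response_brokers, unreachable=()):
    """conns: broker ID -> connection; response_brokers: broker ID -> host."""
    failed = set()
    # Abort known connections that the metadata no longer lists.
    for broker_id in list(conns):
        if broker_id not in response_brokers:
            conns.pop(broker_id).abort()
    # Connect to brokers that the metadata lists but we do not hold yet.
    for broker_id, host in response_brokers.items():
        if broker_id in conns:
            continue
        conn = FakeConnection(host, reachable=host not in unreachable)
        try:
            conn.connect()
        except ConnectionError:
            failed.add(broker_id)
            continue
        conns[broker_id] = conn
    return failed


stale = FakeConnection("old:9092")
conns = {1: FakeConnection("a:9092"), 9: stale}
failed = process_brokers(
    conns,
    {1: "a:9092", 2: "b:9092", 3: "c:9092"},
    unreachable={"c:9092"},
)
```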

process_topics(response_topics)[source]

Syncs the cluster’s topic/partition metadata with a given response. Returns a set of topic names that were either missing data or had unknown leader IDs.

Works by iterating over the topic metadata entries and their partitions, checking each for error codes and for a connection matching the leader ID.

Once complete, the self.topics and self.leaders dictionaries are set with the newly validated information.
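The validation pass might be sketched like this. The dictionary field names (name, error_code, partitions, partition_id, leader) are hypothetical, and the sketch returns the topics and leaders mappings alongside the missing set only to stay self-contained, whereas the method described above stores them on the instance.

```python
def process_topics(response_topics, known_brokers):
    """Return (missing topic names, topics mapping, leaders mapping)."""
    topics = {}    # topic name -> sorted list of valid partition IDs
    leaders = {}   # topic name -> {partition ID -> leader broker ID}
    missing = set()
    for topic in response_topics:
        name = topic["name"]
        if topic["error_code"] != 0:
            missing.add(name)  # topic-level error, skip its partitions
            continue
        partition_leaders = {}
        for partition in topic["partitions"]:
            has_error = partition["error_code"] != 0
            unknown_leader = partition["leader"] not in known_brokers
            if has_error or unknown_leader:
                missing.add(name)
                continue
            partition_leaders[partition["partition_id"]] = partition["leader"]
        topics[name] = sorted(partition_leaders)
        leaders[name] = partition_leaders
    return missing, topics, leaders


response = [
    {"name": "events", "error_code": 0, "partitions": [
        {"partition_id": 0, "error_code": 0, "leader": 1},
        {"partition_id": 1, "error_code": 0, "leader": 7},  # no such broker
    ]},
    {"name": "ghost", "error_code": 3, "partitions": []},  # non-zero error
]
missing, topics, leaders = process_topics(response, known_brokers={1, 2})
```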

stop()[source]

Simple method that calls close() on each connection.