
class kiel.cluster.Cluster(bootstrap_hosts)[source]

Bases: object

Class representing a Kafka cluster.

Maintains a dict of Connection objects, one for each known broker, keyed by broker ID.

Also keeps metadata information for topics, their partitions, and the partition leader brokers.


Proxies to the __getitem__ of the underlying conns dictionary.

Allows for the client to say

self.cluster[broker_id]

and such.


Proxies the __contains__ method of the conns dictionary.

Allows for the client to test if a broker is present via

broker_id in self.cluster

Proxies the __iter__ method of the conns dictionary.

In effect allows for iterating over known broker_id values:

for broker_id in self.cluster:
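The three proxy methods described above can be illustrated with a small stand-in class. This is a sketch rather than kiel's implementation: `SketchCluster` and the string placeholders for connections are hypothetical, and only the `conns` dictionary name comes from the text.

```python
class SketchCluster:
    """Hypothetical stand-in that forwards dict-style access to ``conns``."""

    def __init__(self):
        # broker ID -> connection (plain strings stand in for Connection objects)
        self.conns = {}

    def __getitem__(self, broker_id):
        return self.conns[broker_id]

    def __contains__(self, broker_id):
        return broker_id in self.conns

    def __iter__(self):
        return iter(self.conns)


cluster = SketchCluster()
cluster.conns = {1: "conn-to-broker-1", 2: "conn-to-broker-2"}

known_ids = sorted(cluster)   # iteration yields broker IDs
first_conn = cluster[1]       # lookup by broker ID
has_three = 3 in cluster      # membership test
```

Forwarding these three methods lets calling code treat the cluster itself as a read-only mapping of broker IDs to connections.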

get_leader(topic, partition_id)[source]

Returns the leader broker ID for a given topic/partition combo.
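A minimal sketch of such a lookup, assuming the leader information is held as a nested mapping of topic name to partition ID to broker ID. The nested layout, the module-level `leaders` table and the sample data are assumptions made for illustration.

```python
# Hypothetical leader table: topic name -> {partition ID -> leader broker ID}.
leaders = {
    "events": {0: 1, 1: 2},
    "logs": {0: 2},
}


def get_leader(topic, partition_id):
    """Return the leader broker ID for a topic/partition pair."""
    return leaders[topic][partition_id]


leader_id = get_leader("events", 1)
```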

start(*args, **kwargs)[source]

Establishes connections to the brokers in a cluster and gathers topic/partition metadata.

Cycles through each bootstrap host and attempts to send a metadata request. Once a metadata request succeeds, the heal() method is called.
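The bootstrap cycle can be sketched as a simple loop. The version below is synchronous and self-contained, so it is only an approximation of the real method: `request_metadata` and `heal` are hypothetical callables passed in for illustration, the host names are made up, and the text does not say what happens when every host fails, so the generic `RuntimeError` is an assumption.

```python
def start(bootstrap_hosts, request_metadata, heal):
    """Try each bootstrap host in turn; heal with the first successful response.

    ``request_metadata`` and ``heal`` are hypothetical callables standing in
    for the real network request and the heal() method.
    """
    for host in bootstrap_hosts:
        try:
            response = request_metadata(host)
        except ConnectionError:
            continue  # this host is unreachable, move on to the next one
        heal(response)
        return host
    # Assumption: the source does not specify the failure behaviour.
    raise RuntimeError("no bootstrap host answered the metadata request")


def fake_request(host):
    # Pretend only the second host is reachable.
    if host != "kafka02:9092":
        raise ConnectionError(host)
    return {"brokers": [], "topics": []}


healed_with = []
used_host = start(["kafka01:9092", "kafka02:9092"], fake_request, healed_with.append)
```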

heal(*args, **kwargs)[source]

Syncs the state of the cluster with metadata retrieved from a broker.

If no response argument is given, a call to get_metadata() fetches fresh information.

As a first step this will cull any closing/aborted connections from the cluster. This is followed by repeated calls to process_brokers() and process_topics() until both signal that there are no missing brokers or topics.
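The cull-then-repeat flow can be sketched as follows. This is an illustrative, synchronous approximation: the callables are passed in as hypothetical stand-ins for the methods of the same name, connections are modelled as dicts with a `closing` flag, and fetching fresh metadata between rounds is an assumption about how the loop makes progress.

```python
def heal(conns, get_metadata, process_brokers, process_topics, response=None):
    """Sketch of the heal loop; the callables are hypothetical stand-ins."""
    if response is None:
        response = get_metadata()

    # Step 1: cull connections that are closing or were aborted.
    for broker_id in [b for b, c in conns.items() if c["closing"]]:
        del conns[broker_id]

    # Step 2: re-sync until no brokers or topics are reported missing.
    rounds = 1
    while True:
        missing_brokers = process_brokers(response)
        missing_topics = process_topics(response)
        if not missing_brokers and not missing_topics:
            return rounds
        response = get_metadata()  # assumption: retry with fresh metadata
        rounds += 1


conns = {1: {"closing": False}, 2: {"closing": True}}
topic_results = iter([{"events"}, set()])  # first round reports a missing topic

rounds = heal(
    conns,
    get_metadata=lambda: {"brokers": [], "topics": []},
    process_brokers=lambda response: set(),
    process_topics=lambda response: next(topic_results),
)
```

In this run the closing connection to broker 2 is dropped first, and the loop needs a second round because the first one reports a missing topic.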

get_metadata(*args, **kwargs)[source]

Retrieves metadata from a broker in the cluster, optionally limited to a set of topics.

Each connection in the cluster is tried until one works. If no connection in the cluster responds, a NoBrokersError is raised.
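A minimal sketch of the try-each-connection behavior described above. Connections are modelled as plain callables and `NoBrokersError` is redefined locally so the example is self-contained; both are simplifications for illustration, not the library's actual types.

```python
class NoBrokersError(Exception):
    """Local stand-in for the error named in the text."""


def get_metadata(conns, topics=None):
    """Ask each connection in turn; return the first successful response."""
    for conn in conns.values():
        try:
            return conn(topics or [])
        except ConnectionError:
            continue  # try the next connection
    raise NoBrokersError("no connection in the cluster responded")


def dead_conn(topics):
    raise ConnectionError("broker unavailable")


def live_conn(topics):
    return {"topics": list(topics)}


response = get_metadata({1: dead_conn, 2: live_conn}, topics=["events"])

try:
    get_metadata({1: dead_conn})
    all_failed = False
except NoBrokersError:
    all_failed = True
```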

process_brokers(*args, **kwargs)[source]

Syncs the self.conns connection dictionary with the given broker metadata, returning a set of broker IDs that were in the metadata but had failing connections.

Known connections that are not present in the given metadata will have abort() called on them.
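The synchronization described above might look roughly like this. The sketch assumes broker metadata arrives as dicts with `id`, `host` and `port` keys and that a `connect` callable opens a connection; those names, along with `FakeConn`, are hypothetical and chosen only to make the example runnable.

```python
class FakeConn:
    """Hypothetical connection that records whether abort() was called."""

    def __init__(self):
        self.aborted = False

    def abort(self):
        self.aborted = True


def process_brokers(conns, broker_metadata, connect):
    """Sync ``conns`` with broker metadata; return IDs whose connection failed."""
    failed = set()
    seen = set()
    for broker in broker_metadata:
        broker_id = broker["id"]
        seen.add(broker_id)
        if broker_id in conns:
            continue  # already connected to this broker
        try:
            conns[broker_id] = connect(broker["host"], broker["port"])
        except ConnectionError:
            failed.add(broker_id)
    # Abort connections to brokers that the metadata no longer mentions.
    for broker_id in set(conns) - seen:
        conns[broker_id].abort()
    return failed


def connect(host, port):
    if host == "bad-host":
        raise ConnectionError(host)
    return FakeConn()


stale = FakeConn()
conns = {9: stale}
metadata = [
    {"id": 1, "host": "kafka01", "port": 9092},
    {"id": 2, "host": "bad-host", "port": 9092},
]
failed = process_brokers(conns, metadata, connect)
```

The stale connection is only aborted here, not removed from the dictionary, which matches the text: removal of closing or aborted connections is described as the first step of heal().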


Syncs the cluster’s topic/partition metadata with a given response. Returns a set of topic names that were either missing data or had unknown leader IDs.

Works by iterating over each topic's metadata and its partitions, checking for error codes and for a connection matching the leader ID.

Once complete, the self.topics and self.leaders dictionaries are set with the newly validated information.
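The validation pass can be sketched as below. The metadata layout (dicts with `name`, `error_code`, `partitions`, `id` and `leader` keys) is a hypothetical shape chosen for illustration, and the function returns the new dictionaries instead of setting attributes so that it stays self-contained. An error code of 0 is treated as "no error", as in the Kafka protocol.

```python
def process_topics(conns, topic_metadata):
    """Return (topics, leaders, missing) built from hypothetical metadata dicts."""
    topics, leaders, missing = {}, {}, set()
    for topic in topic_metadata:
        name = topic["name"]
        if topic["error_code"] != 0:
            missing.add(name)  # the topic itself reported an error
            continue
        topics[name] = []
        leaders[name] = {}
        for partition in topic["partitions"]:
            leader_id = partition["leader"]
            if partition["error_code"] != 0 or leader_id not in conns:
                missing.add(name)  # bad partition or unknown leader
                continue
            topics[name].append(partition["id"])
            leaders[name][partition["id"]] = leader_id
    return topics, leaders, missing


conns = {1: "conn-1"}
metadata = [
    {"name": "events", "error_code": 0, "partitions": [
        {"id": 0, "leader": 1, "error_code": 0},
        {"id": 1, "leader": 7, "error_code": 0},  # leader 7 has no connection
    ]},
    {"name": "logs", "error_code": 3, "partitions": []},
]
topics, leaders, missing = process_topics(conns, metadata)
```

Here "events" keeps its one valid partition but is still reported as missing because partition 1 points at an unknown leader, and "logs" is reported because the topic itself carries an error code.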


Simple method that calls close() on each connection.