kiel.clients.producer

class kiel.clients.producer.Producer(brokers, serializer=None, key_maker=None, partitioner=None, batch_size=1, compression=None, required_acks=-1, ack_timeout=500)

Bases: kiel.clients.client.Client

Client class used to “produce” messages to Kafka topics.

Allows customizing the serializer, key_maker and partitioner functions. By default, a JSON serializer is used, along with a no-op key maker and a partitioner that chooses a partition at random.
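To make the three hooks concrete, here is a minimal standalone sketch of what such functions might look like. The call signatures used here (serializer(message), key_maker(message), partitioner(key, partitions)) are assumptions for illustration and are not taken from this page; the function names are hypothetical.

```python
import json
import random


def json_serializer(message):
    """Serialize a message to a JSON string (mirrors the described default)."""
    return json.dumps(message)


def noop_key_maker(message):
    """No-op key maker: no key is derived from the message."""
    return None


def random_partitioner(key, partitions):
    """Ignore the key and pick one of the available partitions at random."""
    return random.choice(partitions)


# Hypothetical custom replacements: key on an "order_id" field and map
# that key onto a partition deterministically.
def order_id_key_maker(message):
    return message["order_id"]


def modulo_partitioner(key, partitions):
    return partitions[key % len(partitions)]
```

With functions like these, messages that share an order_id would always land on the same partition, whereas the random partitioner spreads messages without regard to the key.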

unsent_count

Property representing the total number of pending messages waiting to be sent.

produce(*args, **kwargs)

Primary method that queues messages up to be flushed to the brokers.

Performs sanity checks to make sure we’re not closing and that the topic given is known.

If the topic given is not known, the heal() method on the cluster is called and the check is performed again.

Depending on the batch_size attribute, this call may not actually send any requests; it may merely keep the pending messages in the unsent structure.
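The batching behavior can be pictured with a toy model. This is a sketch and not the library's implementation: the class name ToyBatchingProducer, the sent_batches list, and the "flush once the pending count reaches batch_size" threshold are all assumptions made for illustration.

```python
import collections


class ToyBatchingProducer:
    """Toy stand-in that queues messages and flushes in batches."""

    def __init__(self, batch_size=1):
        self.batch_size = batch_size
        # topic -> list of pending messages
        self.unsent = collections.defaultdict(list)
        # Stands in for requests actually sent to brokers.
        self.sent_batches = []

    @property
    def unsent_count(self):
        # Total number of pending messages across all topics.
        return sum(len(msgs) for msgs in self.unsent.values())

    def produce(self, topic, message):
        self.unsent[topic].append(message)
        # Assumed threshold: flush once enough messages are pending.
        if self.unsent_count >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.unsent_count:
            return
        batch = {t: list(msgs) for t, msgs in self.unsent.items() if msgs}
        self.unsent.clear()
        self.sent_batches.append(batch)
```

With batch_size=3, the first two produce() calls in this model only grow unsent; the third triggers a flush that empties it.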

queue_retries(topic, msgs)

Re-inserts the given messages into the unsent structure.

This also sets the flag to denote that a cluster “heal” is necessary.

flush(*args, **kwargs)

Transforms the unsent structure to produce requests and sends them.

The first step is to order the pending messages in unsent by partition leader. If a message’s partition leader is not a known broker, the message is queued up to be retried and the flag denoting that a cluster heal() call is needed is set.

Once the legitimate messages are ordered, instances of ProduceRequest are created for each broker and sent.
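The grouping step described above can be sketched as a pure function. The data shapes here (tuples of topic, partition and message, a leaders mapping, a set of known broker ids) and the name group_by_leader are hypothetical and chosen for illustration; the library's internal structures may differ.

```python
from collections import defaultdict


def group_by_leader(pending, leaders, known_brokers):
    """Split pending messages into per-broker groups and a retry list.

    pending:       list of (topic, partition, message) tuples
    leaders:       dict mapping (topic, partition) -> broker id
    known_brokers: set of broker ids currently known
    Returns (by_broker, to_retry, heal_needed).
    """
    by_broker = defaultdict(list)
    to_retry = []
    for topic, partition, message in pending:
        leader = leaders.get((topic, partition))
        if leader not in known_brokers:
            # Leader unknown: hold the message back for a retry.
            to_retry.append((topic, partition, message))
            continue
        by_broker[leader].append((topic, partition, message))
    # Any held-back message means the cluster metadata needs healing.
    heal_needed = bool(to_retry)
    return dict(by_broker), to_retry, heal_needed
```

Each key of by_broker would then correspond to one produce request, while to_retry holds the messages to re-insert into the unsent structure.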

handle_produce_response(response)

Handler for produce api responses, discards or retries as needed.

For the “no error” result, the corresponding messages are discarded from the sent structure.

For retriable error codes the affected messages are queued up to be retried.

Warning

For fatal error codes, the error is logged and no further action is taken. The affected messages are not retried and are effectively overwritten by the next call to produce().
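The three outcomes can be summarized in a small dispatch sketch. It is illustrative only: the handle_result helper and the sent and retry_queue structures are hypothetical, and which codes the library treats as retriable is an assumption. The numeric values are Kafka protocol error codes (0 for no error, 6 for "not leader for partition", 7 for "request timed out").

```python
import logging

log = logging.getLogger(__name__)

NO_ERROR = 0
# Assumed retriable set, for illustration only.
RETRIABLE_CODES = {6, 7}


def handle_result(error_code, key, sent, retry_queue):
    """Apply one per-partition result to the sent and retry structures."""
    if error_code == NO_ERROR:
        # Success: the messages are no longer needed.
        sent.pop(key, None)
        return "discarded"
    if error_code in RETRIABLE_CODES:
        # Transient failure: move the messages back for another attempt.
        retry_queue.extend(sent.pop(key, []))
        return "retried"
    # Fatal: log only. The messages stay where they are and are not retried.
    log.error("fatal error code %s for %s; not retrying", error_code, key)
    return "fatal"
```

In this model a fatal result leaves the entry in sent untouched, which matches the warning above: nothing retries it, and later activity simply replaces it.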

wind_down(*args, **kwargs)

Flushes the unsent messages so that none are lost when closing down.