The Producer Client

The producer client is used to produce messages to any number of Kafka topics. The Producer class can be imported from the kiel.clients module. Its API is very simple: instantiate the class with a list of bootstrap brokers plus the serializer/key maker combination that fits the use case, yield to the connect() method, and then produce messages via the produce() method:
from kiel import clients
from tornado import gen

producer = clients.Producer(
    ["kafka01", "kafka02"],
    key_maker=None,
    partitioner=None,
    serializer=None,
    compression=None,
    batch_size=1,
    required_acks=1,
    ack_timeout=500,  # milliseconds
)

@gen.coroutine
def run():
    yield producer.connect()
    yield producer.produce("example.topic", {"my": "message"})

The only required constructor parameter is the list of bootstrap broker hosts. These are used to retrieve broker and topic metadata; each bootstrap host is attempted one at a time, in order, until a metadata request succeeds.

Which Message Goes Where

In any non-trivial setup the producer will need to be configured with a partitioning strategy, i.e. a way of determining which Kafka topic partition each message should be sent to. This is done by specifying The Key Maker and The Partitioner.

The Key Maker

The “key maker” is a function that takes a message as its single argument and returns a “key” value. This key value is then used by The Partitioner.

For example, messages tracking user activity might have something like a user_id field that would be handy to key off of:

def key_maker(msg):
    return msg.get("user_id")

This key value would be stored in Kafka along with the full message value.

Warning

It’s important to make the key_maker function as resilient as possible (e.g. the example above uses .get() rather than indexing). If it indexed into the message instead and a message came through without a user_id, the produce() call would fail with a KeyError.

If no key maker is given, the default function generates None for all messages.
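
For reference, the default behavior amounts to something like this sketch (illustrative only, not kiel’s actual internal code):

def default_key_maker(msg):
    # every message gets a None key; partition choice is left
    # entirely to the partitioner
    return None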

The Partitioner

The “partitioner” is a function that takes two arguments, the key value of a message (as generated by The Key Maker) and a list of the partition numbers of the target Kafka topic, and is expected to return one of those partition numbers.

A simple example that expects a numeric key value and uses the modulo operator:

def partitioner(key, partitions):
    return partitions[key % len(partitions)]

A modulo strategy used along with an incrementing key value is a good way to spread messages across partitions evenly.

Note

The number of partitions for a topic can change over time. If you rely on messages with the same key always winding up in the same place, you may want to look into a consistent hashing strategy (e.g. with the hash_ring module).

If no partitioner is given, the default function chooses a random partition.
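
The default behavior is roughly equivalent to the following sketch (illustrative only, not kiel’s actual internal code):

import random

def default_partitioner(key, partitions):
    # the key is ignored entirely; any partition is fair game
    return random.choice(partitions)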

Modulo Strategy Example

This strategy spreads messages as evenly as possible, using a module-level counter as the key so that the messages themselves aren’t polluted with a counter field.

from tornado import gen
from kiel import clients

counter = 0

def key_maker(msg):
    global counter

    counter += 1
    return counter

def partitioner(key, partitions):
    return partitions[key % len(partitions)]

@gen.coroutine
def produce():
    p = clients.Producer(
        ["kafka01"], key_maker=key_maker, partitioner=partitioner
    )

    yield p.connect()

    while True:
        yield p.produce("example.topic", {"how": "now", "brown": "cow"}

Consistent Hashing Example

This strategy attempts to consistently choose the same partition based on the key value (in this case a user_id).

from hash_ring import HashRing
from tornado import gen
from kiel import clients

# this could be replaced with operator.itemgetter("user_id")
def key_maker(msg):
    return msg["user_id"]

def partitioner(key, partitions):
    ring = HashRing(partitions)
    return ring.get_node(key)

p = clients.Producer(
    ["kafka01"], key_maker=key_maker, partitioner=partitioner
)

# meanwhile in some handler somewhere...
@gen.coroutine
def get(self):
    # do some stuff...
    yield p.produce("activity.logins", {"user_id": self.user_id})

Compression and Serialization

There are two options that determine what exactly gets copied into Kafka: the serializer function and the compression choice.

The Serializer

This simple function takes a single message object and returns a string representation of the message.

Messages don’t need to be dictionaries, but they do need to be serializable in order to be sent on to Kafka.

Custom message example:

import json

from tornado import gen
from kiel import clients


class Thing(object):

    def __init__(self, stuff):
        self.stuff = stuff

    # etc...

    def serialize(self):
        return json.dumps({"stuff": self.stuff})


def serializer(msg):
    return msg.serialize()

p = clients.Producer(["broker01"], serializer=serializer)


@gen.coroutine
def produce():
    yield p.connect()

    thing = Thing(stuff="foo")

    yield p.produce("example.things", thing)

Note

The default serializer is a json_serializer that merely calls json.dumps(msg). Note that this assumes messages are JSON serializable (i.e. dictionaries).
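
In other words, the default amounts to something like this sketch (illustrative only, not kiel’s exact internal code):

import json

def json_serializer(msg):
    # assumes msg is a dictionary (or anything else json.dumps accepts)
    return json.dumps(msg)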

Compression Choices

There are three compression options available in total: no compression (the default), gzip, and snappy.

The gzip and snappy options are specified via special constants found in the kiel.constants module:

from kiel import clients, constants

# with gzip
p = clients.Producer(["kafka01"], compression=constants.GZIP)

# with snappy
p = clients.Producer(["kafka01"], compression=constants.SNAPPY)

The gzip option has no extra dependencies since the python standard library includes a gzip module. The snappy option, however, requires python-snappy to be installed (which in turn requires the snappy library and the cffi module).
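
If snappy availability varies between environments, one option is to fall back to gzip when python-snappy can’t be imported. A minimal sketch (the try/except guard is illustrative, not part of kiel):

from kiel import clients, constants

try:
    import snappy  # provided by the python-snappy package
    compression = constants.SNAPPY
except ImportError:
    compression = constants.GZIP

p = clients.Producer(["kafka01"], compression=compression)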

By default no compression scheme is used.

Note

If you use the snappy compression option, any consumer clients of your messages must also have the snappy dependencies installed.

Batch Size and ACKs

Options relating to batching and ACKs (i.e. how many brokers must ACKnowledge a message before the request returns) can have a big effect on throughput. No two systems are alike, so the best strategy is to start with a baseline and tweak the options until a happy throughput/latency ratio is reached.

Batch Size

This simple integer option determines how many messages to gather before “flushing” them to the brokers. Kafka allows a single request to contain an arbitrary number of messages targeting any number of topics/partitions.

import random
from tornado import gen
from kiel import clients, constants

# send batches of 10 messages, gzip'ed
p = clients.Producer(["kafka01"], batch_size=10, compression=constants.GZIP)

@gen.coroutine
def send():
    yield p.connect()

    while True:
        yield p.produce(
            "topics.colors", {"color": random.choice(["red", "blue", "white"])}
        )

Batching is very useful in conjunction with the Compression Choices, as sets of messages sent to the same partition are compressed together, which is much more efficient.

Warning

Brokers limit the maximum size of accepted requests (via the message.max.bytes option, see the broker config docs). At this time the producer isn’t smart enough to split up oversized requests, but it will log an error message each time the error response is received from the broker.

Required ACKs

The required_acks option determines how “durable” the storage of a message is. There are two useful values: 1 and -1.

1: This more or less means “consider the message committed once the target broker has it”. It can increase throughput but at a greater risk of data loss if brokers go down.

-1: This value tells kafka to not consider a message “committed” until all in-sync replicas acknowledge it.
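
For example, a producer favoring durability over raw throughput might be configured like so (broker hostnames are placeholders):

from kiel import clients

# wait for all in-sync replicas to acknowledge each message
p = clients.Producer(["kafka01", "kafka02"], required_acks=-1)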

Warning

Up until Kafka 0.9 it has been possible to use other values for this option, but that is changing: in future versions of Kafka a “required acks” value greater than 1 will cause an exception. See KIP-1 for details.

ACK Timeout

The ack_timeout value tells the receiving brokers how long they can wait for other acknowledgements. The timeout is not exact; from the protocol docs:

  1. it does not include network latency,
  2. the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included
  3. we will not terminate a local write so if the local write time exceeds this timeout it will not be respected
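
As a rough illustration, a producer that waits on all in-sync replicas but caps that wait at one second might look like this (the values are placeholders to tune for your own setup):

from kiel import clients

p = clients.Producer(
    ["kafka01"],
    required_acks=-1,  # wait on all in-sync replicas...
    ack_timeout=1000,  # ...but for at most 1000 milliseconds
)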