The producer client is used to produce messages to any number of Kafka topics. The Producer class can be imported via the kiel.clients module. The API for the Producer class is very simple: mostly a matter of instantiating the class with a list of bootstrap brokers plus the serializer/key maker combo that fits the use case, then yielding to the connect() method and producing via the produce() method:
from kiel import clients
from tornado import gen
producer = clients.Producer(
    ["kafka01", "kafka02"],
    key_maker=None,
    partitioner=None,
    serializer=None,
    compression=None,
    batch_size=1,
    required_acks=1,
    ack_timeout=500,  # milliseconds
)
@gen.coroutine
def run():
    yield producer.connect()
    yield producer.produce("example.topic", {"my": "message"})
The only required constructor parameter is the list of bootstrap broker hosts. These are used to retrieve broker and topic metadata; each bootstrap host is attempted one at a time, in order, until a metadata request succeeds.
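The try-each-host-in-order behavior can be pictured with a short sketch (an illustration of the behavior described above, not kiel's actual implementation):

```python
# Illustrative sketch of the bootstrap fallback behavior; not kiel's
# actual implementation.
def first_metadata(hosts, fetch_metadata):
    """Try each bootstrap host in order; return the first successful
    metadata response."""
    for host in hosts:
        try:
            return fetch_metadata(host)
        except ConnectionError:
            continue  # host unreachable, try the next one
    raise ConnectionError("no bootstrap host returned metadata")

# simulate the first host being down
def fake_fetch(host):
    if host == "kafka01":
        raise ConnectionError("connection refused")
    return {"responding_broker": host}

result = first_metadata(["kafka01", "kafka02"], fake_fetch)
```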
In any non-trivial setup the producer will need to be configured with a partitioning strategy; that is, it will need to know how to route messages to their intended Kafka topic partition. This is done by specifying a key maker and a partitioner.
The “key maker” is a function that takes a message as its single argument and returns a “key” value. This key value is then used by the partitioner.
For example, messages tracking user activity might have something like a user_id field that would be handy to key off of:
def key_maker(msg):
    return msg.get("user_id")
This key value would be stored in Kafka along with the full message value.
Warning
It’s important that the key_maker function be as resilient as possible (e.g. the example uses .get() rather than indexing). Had it indexed into the message instead, a message arriving without a user_id would cause the produce() call to fail with a KeyError.
If no key maker is given, the default function generates None for all messages.
The “partitioner” is a function that takes two arguments: the key value of a message, as generated by the key maker, and a list of the partition numbers of the target Kafka topic. It is expected to return one of those partition numbers.
A simple example that expects a numeric key value and uses the modulo operator:
def partitioner(key, partitions):
    return partitions[key % len(partitions)]
A modulo strategy used along with an incrementing key value is a good way to spread messages across partitions evenly.
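The even spread is easy to verify: with an incrementing key, the modulo partitioner above cycles through the partitions round-robin, so 100 consecutive keys over four partitions land exactly 25 in each (a standalone demonstration using the partitioner from the example above):

```python
from collections import Counter

def partitioner(key, partitions):
    return partitions[key % len(partitions)]

# simulate 100 messages with an incrementing key over four partitions
counts = Counter(partitioner(key, [0, 1, 2, 3]) for key in range(100))
# each partition receives the same number of messages
```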
Note
The number of partitions for a topic can change over time. If you rely on messages with the same key always winding up in the same place, you may want to look into a consistent hashing strategy (e.g. with the hash_ring module).
If no partitioner is given, the default function chooses a random partition.
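The default's behavior is equivalent to something like the following (a sketch for illustration, not the actual implementation):

```python
import random

# Behaviorally equivalent to the default partitioner: ignore the key
# entirely and pick a partition uniformly at random.
def random_partitioner(key, partitions):
    return random.choice(partitions)

choice = random_partitioner(None, [0, 1, 2])
```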
This strategy spreads messages across partitions as evenly as possible, using a standalone counter variable as the key so that the messages themselves aren’t polluted with a counter field.
from tornado import gen

from kiel import clients

counter = 0

def key_maker(msg):
    global counter
    counter += 1
    return counter

def partitioner(key, partitions):
    return partitions[key % len(partitions)]

@gen.coroutine
def produce():
    p = clients.Producer(
        ["kafka01"], key_maker=key_maker, partitioner=partitioner
    )
    yield p.connect()
    while True:
        yield p.produce("example.topic", {"how": "now", "brown": "cow"})
This strategy attempts to consistently choose the same partition based on the key value (in this case a user_id).
from hash_ring import HashRing
from tornado import gen

from kiel import clients

# this could be simplified to a simple operator.itemgetter("user_id")
def key_maker(msg):
    return msg["user_id"]

def partitioner(key, partitions):
    ring = HashRing(partitions)
    return ring.get_node(key)

p = clients.Producer(
    ["kafka01"], key_maker=key_maker, partitioner=partitioner
)

# meanwhile, in a method on some handler class somewhere...
@gen.coroutine
def get(self):
    # do some stuff...
    yield p.produce("activity.logins", {"user_id": self.user_id})
There are two options that determine what exactly gets copied into Kafka: the serializer function and the compression choice.
The serializer is a simple function that takes a single message object and returns a string representation of the message.
Messages don’t need to be dictionaries, but they do need to be serializable in order to be passed on to Kafka.
Custom message example:
import json

from tornado import gen

from kiel import clients

class Thing(object):
    def __init__(self, stuff):
        self.stuff = stuff
        # etc...

    def serialize(self):
        return json.dumps({"stuff": self.stuff})

def serializer(msg):
    return msg.serialize()

p = clients.Producer(["broker01"], serializer=serializer)

@gen.coroutine
def produce():
    yield p.connect()
    thing = Thing(stuff="foo")
    yield p.produce("example.things", thing)
Note
The default serializer is a json_serializer that merely calls json.dumps(msg). Note that this assumes messages are JSON serializable (i.e. dictionaries).
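The default's behavior amounts to the following one-liner (behaviorally equivalent sketch, not kiel's actual source):

```python
import json

# Behaviorally equivalent to the default json serializer described
# in the note above.
def json_serializer(msg):
    return json.dumps(msg)

payload = json_serializer({"my": "message"})
```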
There are three compression options available: no compression (the default), gzip, and snappy. The latter two are specified via special constants found in the kiel.constants module:
from kiel import clients, constants
# with gzip
p = clients.Producer(["kafka01"], compression=constants.GZIP)
# with snappy
p = clients.Producer(["kafka01"], compression=constants.SNAPPY)
The gzip option has no dependencies, as the Python standard library includes a gzip module. The snappy option, however, requires python-snappy to be installed (which in turn requires the snappy library and the cffi module).
By default no compression scheme is used.
Note
If you use the snappy compression option, any consumer clients of your messages must also have the snappy dependencies installed.
Options relating to batching and ACKs (i.e. how many brokers must ACKnowledge a message before the produce call returns) can have a big effect on throughput. No two systems are alike, so the best strategy is to start with a baseline and tweak the options until a happy throughput/latency ratio is reached.
This simple integer option determines how many messages to collect before “flushing” them to the brokers in a single request. Kafka allows a single request to contain an arbitrary number of messages targeting any number of topics/partitions.
import random

from tornado import gen

from kiel import clients, constants

# send batches of 10 messages, gzip'ed
p = clients.Producer(["kafka01"], batch_size=10, compression=constants.GZIP)

@gen.coroutine
def send():
    yield p.connect()
    while True:
        yield p.produce(
            "topics.colors", {"color": random.choice(["red", "blue", "white"])}
        )
Batching is very useful in conjunction with compression, as sets of messages sent to the same partition are compressed together, which is much more efficient.
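The efficiency gain is easy to see with the standard library's gzip module: compressing ten similar JSON messages together yields far fewer bytes than compressing each one on its own (an illustrative measurement only, unrelated to kiel's actual wire format):

```python
import gzip
import json

# ten similar JSON messages, as a batch might contain
messages = [json.dumps({"color": "red", "seq": i}).encode("utf-8")
            for i in range(10)]

# total size when each message is compressed individually
individual = sum(len(gzip.compress(m)) for m in messages)
# size when the whole batch is compressed at once
batched = len(gzip.compress(b"".join(messages)))
# the batched payload is much smaller than the sum of the parts
```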
Warning
Brokers limit the maximum size of accepted requests (via the message.max.bytes option, see the broker config docs). At this time the producer isn’t smart enough to split up over-large requests, but will continually log an error message each time the error response is received from the broker.
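Until the producer handles this itself, a rough client-side guard can help: check the serialized size of a batch against the broker's limit before producing. The helper below is hypothetical, and the 1,000,000-byte figure is an assumption mirroring Kafka's historical default for message.max.bytes; verify against your broker's configuration.

```python
import json

# Hypothetical guard; the limit is an assumption (Kafka's historical
# default for message.max.bytes) -- check your broker config. Protocol
# framing overhead is ignored here, so leave some headroom.
MAX_BYTES = 1000000

def batch_too_large(messages, limit=MAX_BYTES):
    size = sum(len(json.dumps(m).encode("utf-8")) for m in messages)
    return size > limit
```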
The required_acks option determines how “durable” the storage of a message is. There are two useful values: 1 and -1.

1: This more or less means “consider the message committed once the target broker has it”. It can increase throughput, but at a greater risk of data loss if brokers go down.

-1: This value tells Kafka to not consider a message “committed” until all in-sync replicas acknowledge it.
Warning
Up until Kafka 0.9 it has been possible to use other values for this option, but that is changing; in future versions of Kafka a “required acks” value > 1 will cause an exception. See KIP-1 for details.
The ack_timeout value tells the receiving brokers how long they can wait for other acknowledgements. The timeout is not exact; from the protocol docs:
- it does not include network latency,
- the timer begins at the beginning of the processing of this request, so if many requests are queued due to server overload that wait time will not be included,
- we will not terminate a local write, so if the local write time exceeds this timeout it will not be respected.