kiel.connection

class kiel.connection.Connection(host, port)

Bases: object

This class represents a single connection to a single broker host.

Does not guard against exceptions raised while connecting; those are expected to be handled by the cluster object.

The main use of this class is the send() method, used to send protocol request classes over the wire.

Note

This is the only class where the correlation_id should be used. These IDs are used to correlate requests and responses over a single connection and are meaningless outside said connection.

connect(*args, **kwargs)

Connects to the broker host and fires the read_loop callback.

The socket is wrapped in a tornado iostream.IOStream to take advantage of its handy async methods.

close()

Sets the closing attribute to True and calls close() on the underlying stream.

socket_error_handling(*args, **kwds)

Helper context manager for handling errors during IOStream operations.

Handles the StreamClosedError case by setting the closing flag, and logs any unexpected exceptions with a failure message.
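
The behaviour described above can be sketched with contextlib.contextmanager. This is a minimal illustration, not kiel's actual implementation: StreamClosedError here is a local stand-in for tornado.iostream.StreamClosedError, SketchConnection and the failure_message parameter are hypothetical, and the choice to suppress the unexpected exception after logging it is an assumption the description does not confirm.

```python
import logging
from contextlib import contextmanager

log = logging.getLogger("connection_sketch")


class StreamClosedError(Exception):
    """Local stand-in for tornado.iostream.StreamClosedError (assumption)."""


class SketchConnection:
    """Hypothetical object carrying only the ``closing`` flag."""

    def __init__(self):
        self.closing = False

    @contextmanager
    def socket_error_handling(self, failure_message):
        try:
            yield
        except StreamClosedError:
            # A closed stream is an expected case: just record it.
            self.closing = True
        except Exception:
            # Anything else is unexpected: log it with the given message.
            # Suppressing it afterwards is an assumption of this sketch.
            log.exception(failure_message)


closed = SketchConnection()
with closed.socket_error_handling("failed to write"):
    raise StreamClosedError()

unexpected = SketchConnection()
with unexpected.socket_error_handling("failed to write"):
    raise ValueError("boom")
```

After the first block, closed.closing is True; after the second, the ValueError has been logged and unexpected.closing is still False.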

send(message)

Sends a serialized request to the broker and returns a pending future.

If an error occurs while writing, whether immediately or asynchronously, the abort() method is called.

The returned Future is stored in the self.pending dictionary, keyed on correlation id, so that clients can write

response = yield conn.send(message)

and expect the correctly correlated response (or a raised exception) regardless of when the broker responds.
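
The correlation scheme can be illustrated without a broker. The sketch below is an assumption-laden stand-in: PendingRegistry and its register() and resolve() methods are hypothetical names, and concurrent.futures.Future from the standard library takes the place of the Tornado future that the real class returns.

```python
from concurrent.futures import Future


class PendingRegistry:
    """Hypothetical stand-in for the ``pending`` dictionary on a connection."""

    def __init__(self):
        self.pending = {}

    def register(self, correlation_id):
        # Called when a request is sent: remember a future for its response.
        future = Future()
        self.pending[correlation_id] = future
        return future

    def resolve(self, correlation_id, response):
        # Called when a response arrives: hand it to the matching future.
        future = self.pending.pop(correlation_id)
        future.set_result(response)


registry = PendingRegistry()
first = registry.register(1)
second = registry.register(2)

# Responses may arrive in any order; the correlation id pairs them up.
registry.resolve(2, "response-2")
registry.resolve(1, "response-1")
```

Each caller receives the response for its own request even though the second response arrived first.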

read_loop(*args, **kwargs)

Infinite loop that reads messages off of the socket while not closed.

When a message is received its corresponding pending Future is set to have the message as its result.

This is never used directly and is fired as a separate callback on the I/O loop via the connect() method.
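
The shape of such a loop can be sketched with the standard library. Note the substitutions: asyncio stands in for Tornado's I/O loop, an asyncio.Queue stands in for the socket, and the conn dictionary with its "closing", "incoming" and "pending" keys is purely hypothetical.

```python
import asyncio


async def read_loop(conn):
    # Keep reading until the connection is flagged as closing.
    while not conn["closing"]:
        correlation_id, response = await conn["incoming"].get()
        future = conn["pending"].pop(correlation_id, None)
        if future is not None and not future.done():
            # Deliver the response to whoever is waiting on this id.
            future.set_result(response)


async def demo():
    loop = asyncio.get_running_loop()
    conn = {"closing": False, "incoming": asyncio.Queue(), "pending": {}}

    # A pending request with correlation id 42.
    future = loop.create_future()
    conn["pending"][42] = future

    # Fire the loop as a separate task, as connect() is described to do.
    reader = asyncio.ensure_future(read_loop(conn))

    await conn["incoming"].put((42, "metadata response"))
    result = await future

    conn["closing"] = True
    reader.cancel()
    return result


result = asyncio.run(demo())
```

The caller never touches read_loop() directly; it only awaits the future that the loop resolves.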

abort()

Aborts a connection and puts all pending futures into an error state.

If sys.exc_info() is set (i.e. this is being called in an exception handler) then pending futures will have that exc info set. Otherwise a BrokerConnectionError is used.
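
The choice between the two error sources can be sketched as follows. This is a simplified illustration under stated assumptions: abort_pending is a hypothetical free function, BrokerConnectionError is redefined locally rather than imported from kiel, and standard-library futures with set_exception() replace the Tornado futures and full exc info used by the real method.

```python
import sys
from concurrent.futures import Future


class BrokerConnectionError(Exception):
    """Local stand-in for kiel's exception of the same name (assumption)."""


def abort_pending(pending):
    # Inside an ``except`` block, sys.exc_info() holds the active exception;
    # outside of one, it returns (None, None, None).
    exc = sys.exc_info()[1]
    if exc is None:
        exc = BrokerConnectionError("connection aborted")
    for future in pending.values():
        if not future.done():
            future.set_exception(exc)
    pending.clear()


# Called outside an exception handler: the fallback error is used.
outside = {1: Future()}
plain = outside[1]
abort_pending(outside)

# Called inside an exception handler: the active exception is propagated.
inside = {2: Future()}
handled = inside[2]
try:
    raise OSError("socket reset")
except OSError:
    abort_pending(inside)
```

Here plain.exception() is a BrokerConnectionError, while handled.exception() is the original OSError.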

read_message(*args, **kwargs)

Constructs a response class instance from bytes on the stream.

Works by leveraging the IOStream.read_bytes() method. Steps:

  1. First, the size of the entire payload is read.
  2. Then the correlation id is read, so that the response can be matched to the corresponding pending Future.
  3. The API of the response is looked up via the correlation id.
  4. The corresponding response class’s deserialize() method is used to decode the raw payload.
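
The four steps above can be sketched synchronously with struct and io.BytesIO in place of IOStream.read_bytes(). Several details are assumptions for illustration: the 4-byte big-endian size and correlation id fields, the size prefix covering the correlation id, and the hypothetical api_by_correlation_id and deserializers lookup tables (the real class calls deserialize() on a response class).

```python
import io
import struct


def read_message(stream, api_by_correlation_id, deserializers):
    # 1. Read the size prefix (assumed: 4-byte big-endian signed integer).
    (size,) = struct.unpack("!i", stream.read(4))
    body = stream.read(size)

    # 2. Read the correlation id (assumed: first 4 bytes of the body).
    (correlation_id,) = struct.unpack("!i", body[:4])

    # 3. Look up which API this response belongs to.
    api = api_by_correlation_id.pop(correlation_id)

    # 4. Decode the remaining raw payload with the matching deserializer.
    return correlation_id, deserializers[api](body[4:])


payload = b"hello"
frame = struct.pack("!ii", 4 + len(payload), 7) + payload

correlation_id, response = read_message(
    io.BytesIO(frame),
    {7: "metadata"},
    {"metadata": lambda raw: raw.decode("ascii")},
)
```

The returned correlation id is what lets the caller find the pending Future to resolve with the decoded response.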