Consuming and Publishing Celery Tasks in C++ via AMQP

Celery is an asynchronous task queue based on distributed message passing. It is written in Python, but the protocol can be implemented in any language. However, there is currently no C++ client that is able to publish (send) and consume (receive) tasks. Such a client is needed when your project is written in a combination of Python and C++ and you would like to process tasks in both languages. In this post, I describe a way of interoperating between Python and C++ workers via the AMQP back-end (RabbitMQ).

Introduction

I expect you to have a basic understanding of Celery. If you have no experience with it, I recommend going through the official tutorial, which explains everything you will need to follow this blog post (choosing and installing a message broker, installing Celery, creating tasks, and starting a worker that executes the tasks).

Let’s start with the motivation. When your project is either entirely written in Python, or you do not need to send and process tasks in another language, the situation is easy: just use Celery and its Python worker. However, many projects are written in multiple languages, in which case you have to find a way of interoperating with Celery in them. For PHP, you can use celery-php, or for Node.js, you can use node-celery. One of the other languages that are commonly used together with Python is C++ because of its raw computing speed and support for proper parallel execution with threads. For C++ though, you are out of luck as there is currently no client able to interoperate with Celery.

In this post, I describe a way of sending and processing Celery tasks in C++ when using the AMQP back-end, implemented by the RabbitMQ message broker. This will allow you to send tasks from Python and process them in C++ or vice versa.

Alright, let’s dig in. First, we will take a look at the easier, Python part. Then, we will move to the harder, C++ part.

Python Part

We will implement a simple hello task that accepts two arguments, name (string) and age (int), and prints them to the standard output. After you understand the protocol, it is easy to realize more complex tasks.

Publishing Tasks

First, we need to set up a Celery application. We do this in a tasks.py module:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest:guest@localhost:5672//')

I assume that you have a running RabbitMQ server on localhost, accessible via the guest account. Otherwise, you have to adjust the broker URI above.

Then, we define the task:

@app.task(ignore_result=True)
def hello(name, age):
	print('Hello {}. You are {} years old.'.format(name, age))

We will ignore task results, as result handling is out of scope for this post.

To publish a task (request its execution in a worker), run the Python interpreter and call hello.delay():

from tasks import hello

hello.delay('Petr', 30)

Note that we have to call hello via delay(). If we called just hello('Petr', 30), the task would be executed synchronously in the current process, which we do not want.

Consuming and Executing Tasks

To execute tasks, we need to have a worker running. Celery comes with a pre-packaged worker that you can start by executing the following command from your shell:

$ celery -A tasks worker --loglevel=info

To stop the worker, press Ctrl-C.

Try to publish a few tasks from another shell and watch them being executed by the worker.

C++ Part

For communication with our RabbitMQ server, we will use SimpleAmqpClient. It is a C++ wrapper over the lower-level rabbitmq-c C library by the same author. According to its README, it is known to work under Linux, Windows, and Mac OS X.

Publishing Tasks

First, we need to figure out the format of task messages. Luckily, the format is documented here. We will use Version 2, which is the current default. If you want to check message creation in Celery source code, go to line 354 in the amqp.py module. To illustrate, I have included a screenshot of a sample message sent by Celery, taken from the RabbitMQ Web Management:



Great. Let’s start by creating a connection to our message broker (RabbitMQ):

#include <SimpleAmqpClient/SimpleAmqpClient.h>

auto channel = AmqpClient::Channel::Create(
	/*host*/"localhost",
	/*port*/5672,
	/*username*/"guest",
	/*password*/"guest",
	/*vhost*/"/"
);

The value returned from Create() is a channel handle wrapped in boost::shared_ptr. Beware that the channel is not thread-safe, so if you have a multithreaded application, each thread requires its own channel.

By the way, technically, in AMQP (the protocol we are using), a single connection can contain multiple channels, where channels can be thought of as “lightweight connections that share a single TCP connection”. However, SimpleAmqpClient only lets you create channels, not connections. I have included this note here to prevent confusion.

Next, we create the body of the message. We will assume that Celery uses the JSON serializer, which is the default. If you use a different serializer, you will need to adjust the code below. When using the JSON serializer, the body is a three-item array, where the first item contains positional arguments (args), the second item contains keyword arguments (kwargs), and the third item contains various other data, like callbacks. To pass task arguments, we will only need the first item:

auto body = json{
	{"Petr", 30},   // args
	json::object(), // kwargs
	json::object()  // other data
};

For serializing and deserializing JSON in C++, I use the nlohmann/json library with the using json = nlohmann::json; convenience type alias.

Now that we have a body, we can create the message:

auto msg = AmqpClient::BasicMessage::Create(body.dump());

The dump() method serializes the JSON into a std::string.

Celery requires the following two attributes to be set:

msg->ContentType("application/json");
msg->ContentEncoding("utf-8");

The first one tells it to use the JSON serializer. The second one specifies the encoding of the message (we will assume UTF-8).

Next, Celery requires two headers: id and task. The former can be any unique string you want, although Celery uses UUIDs. The latter has to be the name of the task, as registered in the Python part:

msg->HeaderTable({
	{"id", "3149beef-be66-4b0e-ba47-2fc46e4edac3"},
	{"task", "tasks.hello"}
});

In a real-world scenario, you would probably want to generate a random ID ;-).
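One way to do that with nothing but the standard library is sketched below. The generate_task_id() helper is hypothetical (it is part of neither Celery nor SimpleAmqpClient) and returns a string in the canonical version-4 UUID format. Keep in mind that std::mt19937_64 is not a cryptographically secure generator and that the sketch seeds it with a single value from std::random_device, so it assumes that task IDs only need to be unique in practice, not unguessable.

```cpp
#include <random>
#include <string>

// Hypothetical helper: returns a random version-4 UUID in the canonical
// 8-4-4-4-12 hexadecimal form, e.g. to be used as the "id" header.
std::string generate_task_id() {
	static const char hex[] = "0123456789abcdef";
	std::random_device rd;
	std::mt19937_64 gen(rd());
	std::uniform_int_distribution<int> dist(0, 15);

	// 'x' is replaced with a random hex digit, 'y' with a variant digit.
	// The '4' marks the UUID version and is kept as it is.
	std::string id = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
	for (auto& c : id) {
		if (c == 'x') {
			c = hex[dist(gen)];
		} else if (c == 'y') {
			// The variant bits are 10xx, i.e. one of 8, 9, a, or b.
			c = hex[8 + (dist(gen) & 0x3)];
		}
	}
	return id;
}
```

The returned string can then be used as the value of the id header instead of the hard-coded one.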

Finally, we send the message:

channel->BasicPublish("celery", "celery", msg);

The first two arguments are the target exchange and the routing key, respectively. If you are unsure what these terms mean, read the AMQP description. We use the default Celery values. Since the celery exchange is a so-called direct exchange, the routing key determines the queue to which the message is delivered: the message goes to the queue that is bound to the exchange under exactly that key. If your application uses custom message routing, you will need to adjust these two arguments.
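If the way a direct exchange picks the target queue is still unclear, the following toy model may help. It is only an illustration that runs without a broker, not code from RabbitMQ or SimpleAmqpClient: the DirectExchange class and the reports queue in the usage note are made up for this example. The model assumes the standard direct-exchange behavior, where a message is delivered to the queues whose binding key equals the routing key.

```cpp
#include <map>
#include <string>
#include <vector>

// Toy model of a direct exchange (an illustration only, not broker code).
class DirectExchange {
public:
	// Binds a queue to the exchange under the given binding key.
	void bind(const std::string& queue, const std::string& binding_key) {
		bindings_.emplace(binding_key, queue);
	}

	// Returns the queues that would receive a message published with the
	// given routing key: those whose binding key matches it exactly.
	std::vector<std::string> route(const std::string& routing_key) const {
		std::vector<std::string> queues;
		auto range = bindings_.equal_range(routing_key);
		for (auto it = range.first; it != range.second; ++it) {
			queues.push_back(it->second);
		}
		return queues;
	}

private:
	std::multimap<std::string, std::string> bindings_; // binding key -> queue
};
```

With the default Celery setup, this corresponds to calling bind("celery", "celery"), after which route("celery") yields just the celery queue. A message published with a routing key that has no binding, such as route("reports") before any reports queue is bound, yields no queue at all.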

You can verify that the message has been sent by starting the Python worker. If everything went well, you should see the task being executed. Great! Let’s see how we can execute the tasks in C++.

Consuming and Executing Tasks

Just like when we implemented task sending, we first need a connection to our message broker (RabbitMQ):

auto channel = AmqpClient::Channel::Create(
	// Use the same arguments as before.
);

To start consuming messages, we need to call channel->BasicConsume():

auto consumer_tag = channel->BasicConsume(
	/*queue*/"celery",
	/*consumer_tag*/"",
	/*no_local*/true,
	/*no_ack*/false,
	/*exclusive*/false,
	/*message_prefetch_count*/1
);

It takes a lot of arguments, so let me explain the most important ones:

  • queue: The name of the queue from which we want to receive messages. We use the default Celery queue.
  • consumer_tag: The name of the consumer. When we pass the empty string, the library will generate a tag for us and return it.
  • no_ack: Setting this to false disables automatic acknowledgements. Instead, we acknowledge messages manually after we have successfully processed the tasks they represent. With automatic acknowledgements, a message would be acknowledged right after it was received, even if the execution of its task later failed.
  • exclusive: Setting this to false allows other workers to consume messages from the queue (we may not be the only worker that is running).
  • message_prefetch_count: The maximum number of unacknowledged messages that the broker will deliver to us at a time. We always want to receive just a single message (i.e. no buffering).

To receive a message, call channel->BasicConsumeMessage():

auto envelope = channel->BasicConsumeMessage(consumer_tag);
auto message = envelope->Message();

We pass it the consumer tag that the library generated for us. The method blocks until a message is received.

Next, we need to parse task arguments from the body of the message. Again, we assume that Celery used the default serializer (JSON):

auto body = json::parse(message->Body());
auto name = body[0][0].get<std::string>();
auto age = body[0][1].get<int>();

Now that we have the arguments, nothing prevents us from processing the message

std::cout << "Hello " << name << ". You are " << age << " years old.\n";

and acknowledging it as we have successfully finished the execution of its task:

channel->BasicAck(envelope);

Finally, when the worker ends, we should tell the broker that we are no longer consuming from the queue by cancelling the consumer that we registered via channel->BasicConsume():

channel->BasicCancel(consumer_tag);

You can now build the worker, run it, and send a task to it. It should be properly executed. However, it would be better if the worker kept receiving and executing tasks in a loop. This brings us to the last point we will address.

Receiving Tasks in a Loop and Stopping the Worker

Receiving tasks in a loop is easy: just add a while (true) loop. The hard part is how to gracefully stop the worker. We will use signal handling for that.

First, we need a global flag that tells the worker whether it should keep running or stop:

#include <csignal>
volatile std::sig_atomic_t keep_running = 1;

The volatile keyword instructs the compiler not to optimize away accesses to this variable, because we will modify the flag asynchronously from a signal handler. Since the loop that reads the flag never calls the signal handler directly, the optimizer could otherwise assume that the flag never changes and read it only once, which is undesirable. The type of the variable is sig_atomic_t. This is an integral type that can be accessed as an atomic entity even in the presence of asynchronous interrupts made by signals.

Next, we define the signal handler:

extern "C" void signal_handler(int signal) {
	if (signal == SIGINT || signal == SIGTERM) {
		keep_running = 0;
	}
}

When we receive either SIGINT (sent when you press Ctrl-C in the terminal) or SIGTERM, we clear the flag, which tells the worker that it should stop running.

As a side note, extern "C" is needed because signal handlers are expected to have C linkage and, in general, only use the features from the common subset of C and C++. It is implementation-defined if a function with C++ linkage can be used as a signal handler.

Signal handling is set up via std::signal():

std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);

When either of these two signals is received, our signal handler gets called.

Finally, we have to change the way we receive tasks:

while (keep_running) {
	AmqpClient::Envelope::ptr_t envelope;
	auto message_delivered = channel->BasicConsumeMessage(
		consumer_tag,
		envelope,
		/*timeout*/1000/*ms*/
	);
	if (!message_delivered) {
		continue;
	}

	// The same message-processing code as before.
	// ...
}

To explain, we keep looping until the user signals that we should stop. Since the originally used call to channel->BasicConsumeMessage() was blocking, we need a different one in order to periodically check the keep_running variable. We use an overloaded version of this method that accepts a timeout (in milliseconds) after which the call returns. When the timeout expires without a message being delivered, the call returns false, which causes the loop to start its next iteration and re-check the flag.
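The stopping logic can also be tried out in isolation, without a broker. The following sketch reuses the flag and the signal handler shown earlier, but replaces message consumption with a hypothetical run_until_stopped() function that merely counts loop iterations and sends SIGINT to its own process via std::raise() after a given number of them. It is not the worker itself, just a small experiment showing that the loop ends once the handler clears the flag.

```cpp
#include <csignal>

volatile std::sig_atomic_t keep_running = 1;

extern "C" void signal_handler(int signal) {
	if (signal == SIGINT || signal == SIGTERM) {
		keep_running = 0;
	}
}

// Hypothetical stand-in for the worker loop. Instead of consuming messages,
// it counts iterations and simulates Ctrl-C after `raise_after` of them.
// Returns the number of iterations that were executed before stopping.
// Note: `raise_after` has to be positive, otherwise the loop never ends.
int run_until_stopped(int raise_after) {
	keep_running = 1;
	std::signal(SIGINT, signal_handler);
	std::signal(SIGTERM, signal_handler);

	int iterations = 0;
	while (keep_running) {
		++iterations;
		if (iterations == raise_after) {
			// The handler runs before std::raise() returns, so the flag
			// is already cleared when the loop condition is checked again.
			std::raise(SIGINT);
		}
	}
	return iterations;
}
```

Calling run_until_stopped(3) finishes the third iteration and then leaves the loop. In the real worker, the same thing happens after the current task has been processed or the consume timeout has expired.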

What We Have Skipped

The following topics were out of scope for the present post:

  • Use of other message brokers. Apart from AMQP (RabbitMQ), Celery supports Redis, Amazon SQS, and Apache Zookeeper.
  • Use of other serializers, such as pickle, YAML, or msgpack.
  • Use of result back-ends. If you want to keep task results, you have to use a result back-end.
  • Error handling. In real-world scenarios, you need to be prepared that the broker may be unavailable, there may be connection errors, etc.
  • Concurrent processing. Unlike the Celery worker, which is able to process tasks concurrently, our sample worker deals with tasks synchronously, one by one.
  • Other message attributes and settings, such as persistent delivery modes (if you do not want to lose messages when the broker crashes or is restarted).

Complete Source Code

The complete source code for both parts (Python and C++), including detailed comments, is available on GitHub.
