Ныряем в кроличью нору

RabbitMQ является чуть ли не самым популярным решением для реализации очередей на сегодняшний день. И в то же время одним из самых сложных с точки зрения разработки и эксплуатации. Если вы поищите статьи по теме, с большой доли вероятности найдете материалы, в которых не погружаются дальше декларации простой очереди и публикации и получения сообщений из нее. И как только вы столкнетесь с проблемами, связанными с гарантиями доставки, реализацией отложенных сообщений, переполнения очередей, получения сообщений батчами, вам придется погрузиться в документацию или книги в поисках ответов на эти вопросы. Данная статья целиком основана на моем опыте работы с кроликом и содержит ответы на большинство популярных задач, решаемых этим брокером очередей. Во всех примерах будет использоваться PHP, однако то же самое почти с теми же терминами и определениями можно сделать на любом другом языке программирования.

Разворачиваем RabbitMQ

Я использую docker, поэтому ниже приведу конфигурацию для docker-compose, чтобы развернуть кролик с помощью него:

version: '3.0'

services:
  rabbitmq:
    image: rabbitmq:3.7.7-management-alpine
    restart: always
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - "127.0.0.1:5673:5672"
      - "127.0.0.1:15673:15672"

Клиент

В качестве клиента я буду использовать bunny/bunny, поэтому устанавливаем его с помощью composer командой composer req bunny/bunny и подключаемся к кролику:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Client;

$client = new Client([
    'host'      => 'localhost:5673', // или rabbitmq, если php у вас тоже в докере
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();

Начнем с определений

Публикация

Когда вы публикуете сообщения, вы публикуете их в обменник, а не очередь. Даже если вы явно не создали обменник, вы будете использовать специальный встроенный обменник с названием AMQP default. Его можно найти по адресу http://localhost:15673/#/exchanges в списке обменников.

exchanges

Если вы нажмете на него, то в расширенном обзоре можете увидеть следующее описание: The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

Другими словами, для простых ситуаций вы можете не создавать обменник и привязывать его к очереди, а только создать очередь и начинать публиковать в нее сообщения. Так делают большинство библиотек в составе фреймворков – например, Laravel. Давайте попробуем это сделать:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Client;

$client = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();
$channel = $client->channel();

$channel->queueDeclare('simple', durable: true);
$channel->publish('{"hello": "world"}', routingKey: 'simple');

После этого вернитесь в дашборд кролика на вкладку с очередями, вы должны увидеть следующее:
queues

Получение сообщений

Получать сообщения вы будете уже напрямую из очереди, а не обменника. Зачем тогда нужен обменник, спросите вы? Обменники позволяют организовать мощный и гибкий роутинг между очередями, но к этому мы вернемся чуть позже. Итак, чтобы получать сообщения, у вас есть два варианта: с помощью команд basic.get или basic.consume. Я бы не хотел в этой статье подробнее объяснять разницу между ними, отмечу лишь, что второй способ предпочтительнее, а почему – можно прочитать в данной статье. Попробуем получить сообщение первым способом:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Client;

$client = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();
$channel = $client->channel();

$msg = $channel->get('simple');
if (null !== $msg) {
    $channel->ack($msg);
    var_dump($msg->content);
}

Получив сообщение, вы должны его либо подтвердить, вызвав команду ack, либо отвергнуть командами nack или reject. Отличие между nack и reject состоит в том, что nack'нуть вы можете сразу несколько сообщений, а reject'нуть только одно. В общем случае вы всегда можете использовать nack. Когда вы вызвали ack, кролик удаляет сообщение из очереди. Если же вы вызвали nack, то кролик либо его вернет обратно в очередь, если вы вместе с nack передали опцию requeue=true, либо будет действовать в зависимости от логики, заложенной в вашей очереди – например, переместит его в dead-letter очередь, о чем подробнее будет рассказано дальше, либо так же удалит.

Теперь получим сообщение вторым способом:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;

$client = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();
$channel = $client->channel();

$channel->qos(prefetchCount: 1);

$channel->consume(function (Message $message, Channel $channel): void {
    var_dump($message->content);
    $channel->ack($message);
}, 'simple');

$client->run();

Используя второй способ, вы регистрируете консьюмер, который также отражается в информации об очереди в пункте Consumers:

queue-inspect

Обратите внимание на поле Prefetch count. Оно говорит кролику, что консьюмеру надо отдать только одно сообщение до его подтверждения консьюмером. Таким образом вы можете, во-первых, гарантировать, что ваши сообщения никогда не пропадут, так как кролик будет давать следующее сообщение только после подтверждения обработки предыдущего командами ack или nack, а во-вторых, балансировать нагрузку между консьюмерами, чтобы одному консьюмеру не выпало больше сообщений, чем другому. Поэтому настоятельно советую следить за этим числом и никогда не указывать 0, потому что тогда вашему консьюмеру будет передаваться неограниченное количество сообщений, что в случае нагрузок может привести к деградации производительности консьюмера и его падению, в результате чего вы либо потеряете сообщение, либо обработаете его повторно. Это число задается вызовом команды basic.qos: $channel->qos(prefetchCount: 1).

Также в конфигурации консьюмера вы можете увидеть поле Ack required. Оно говорит кролику, что ему необходимо ожидать подтверждения обработки сообщений от консьюмера. Отключение этой опции приведет к увеличению производительности, поскольку тогда кролик не будет ждать подтверждения, но в то же время вы рискуете потерять сообщение, если ваш консьюмер упадет на его обработке, так как тогда оно не вернется обратно в очередь. Поэтому всегда используйте режим подтверждения сообщений консьюмером, только если не уверены, что вам это не нужно.

А теперь серьезно

Все это были достаточно игрушечные примеры, которые не только не реализуют надежную работу с сообщениями, но и местами используют не самые эффективные способы доставки и получения. Основные проблемы начинаются, когда вы от монолитной архитектуры приложения уходите в микросервисную, что накладывает некоторые ограничения на использование специфичных для определенного стека инструментов. Например, вам будет трудно настроить взаимодействие с помощью кролика между приложением на go и приложением на Laravel, если вы будете использовать компонент Queue, который имеет собственный неудобный для межсервисного использования формат сообщения.

Поэтому вам придется использовать голый клиент для работы с кроликом и другой подход к работе с сообщениями. Давайте поставим перед собой задачу: мы должны обрабатывать большое количество событий, в которых в том числе могут быть заинтересованы другие сервисы. И сформулируем некоторые нефункциональные требования к реализации:

  1. Если консьюмер не может обработать сообщение в данный момент, он должен отложить его на определенное время, приступив к следующему;
  2. Если после N ретраев консьюмер все же не смог обработать сообщение, оно должно переместиться в специальную очередь;
  3. Поскольку событий будет очень много, консьюмеры должны быть достаточно производительными, чтобы справляться с нагрузками, однако все же необходимо зафиксировать размер очереди, чтобы не упираться в диск;
  4. Мы должны уметь обрабатывать батчи сообщений.

И все это необходимо реализовать с помощью кролика без привлечения сторонних инструментов вроде базы данных или кафки.

Начнем с декларации обменника

<?php declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();
$channel->exchangeDeclare('events', exchangeType: 'direct', durable: true);

Наш обменник будет иметь тип direct, что значит, что сообщения будут попадать только в те очереди, которые имеют биндинг к этому обменнику по ключу роутинга, совпадающему с ключом роутинга в сообщении. На этом работа паблишера заканчивается. В распределенной системе, где в событиях могут быть заинтересованы разные сервисы, паблишеру и подписчикам не надо знать друг о друге, и тем более паблишеру не надо знать о том, какие очереди существуют. Его дело – публиковать события в обменник, где ключом роутинга будет название события. Зная названия событий, подписчики могут создать очереди и начинать получать сообщения из них.

Например, в нашей системе будет существовать событие payment_succeeded и три сервиса, заинтересованных в его получении: сервис аналитики, сервис нотификаций, который будет отправлять сообщения пользователям после успешной оплаты, и сервис подписок, предоставляющий услуги после успешного завершения оплаты. Каждый из них должен получить свою копию сообщения и реализовывать свои стратегии повторов и хранения сообщений в случае ошибок.

Сервис аналитики

Поскольку сервис аналитики будет очень нагруженным, нам допустимо потерять часть сообщений, и поэтому мы можем ограничить размер очереди.

<?php declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();

$channel->queueDeclare('events.analytics-service', durable: true, arguments: [
    'x-max-length' => 100_000_000,
    'x-overflow' => 'reject-publish',
]);
$channel->queueBind('events.analytics-service', 'events', 'payment_succeeded');

Итак, мы создали очередь events.analytics-service и с помощью специальных аргументов x-max-length и x-overflow сказали кролику, что наша очередь ограничена миллионом сообщений, а все остальные сообщения кролик должен откидывать. И после чего привязали нашу очередь к обменнику events по роутинг ключу payment_succeeded.

Теперь мы можем опубликовать сообщение:

$channel->publish('{"paymentId": 1}', exchange: 'events', routingKey: 'payment_succeeded');

Идем смотреть на результат в дашборд кролика:
events.analytics-service

На вкладке Features у очереди events.analytics-service вы можете заметить, что появились два новых лейбла, которые как раз говорят об использовании x-max-length и x-overflow аргументов.

Сервис подписок

К этому сервису предъявляются куда более серьезные требования: например, мы должны обработать все события без исключения, а те, что так и не удалось обработать даже спустя определенное количество ретраев, должны перемещаться в специальную очередь для полуавтоматического разбора. Опишем топологию, которая будет соответствовать этим требованиям:

<?php declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();

$channel->exchangeDeclare('subscriptions-service', durable: true);

$channel->queueDeclare('subscriptions-service.failed', durable: true);
$channel->queueBind('subscriptions-service.failed', 'subscriptions-service', 'failed');

$channel->queueDeclare('events.subscriptions-service', durable: true, arguments: [
    'x-dead-letter-exchange' => 'subscriptions-service',
    'x-dead-letter-routing-key' => 'failed',
]);
$channel->queueBind('events.subscriptions-service', 'events', 'payment_succeeded');
$channel->queueBind('events.subscriptions-service', 'subscriptions-service', 'events.subscriptions-service');

$channel->queueDeclare('subscriptions-service.delayed.6000', durable: true, arguments: [
    'x-dead-letter-exchange' => 'subscriptions-service',
    'x-dead-letter-routing-key' => 'events.subscriptions-service',
    'x-message-ttl' => 6000,
]);
$channel->queueBind('subscriptions-service.delayed.6000', 'subscriptions-service', 'delay.6000');

Кода много, поэтому разберем по пунктам, что здесь происходит:

  • Для начала создадим отдельный обменник для этого сервиса:
$channel->exchangeDeclare('subscriptions-service', durable: true);
  • Создадим и привяжем к этому обменнику очередь для тех сообщений, которые не получится обработать за вменяемое количество ретраев:
$channel->queueDeclare('subscriptions-service.failed', durable: true);
$channel->queueBind('subscriptions-service.failed', 'subscriptions-service', 'failed');
  • Теперь создадим нашу основную очередь и в аргументах укажем обменник и роутинг ключ для очереди, куда надо отправлять сообщения, которые будут nack'нуты с флагом requeue=false:
$channel->queueDeclare('events.subscriptions-service', durable: true, arguments: [
    'x-dead-letter-exchange' => 'subscriptions-service',
    'x-dead-letter-routing-key' => 'failed',
]);

Другими словами, если ваш консьюмер вызовет $channel->nack($message, requeue: false), то сообщение из очереди events.subscriptions-service будет перемещено кроликом в очередь subscriptions-service.failed автоматически.

  • Привяжем нашу основную очередь к двум обменникам: обменнику с событиями, то есть к events, и к обменнику нашего сервиса, то есть к subscriptions-service:
$channel->queueBind('events.subscriptions-service', 'events', 'payment_succeeded');
$channel->queueBind('events.subscriptions-service', 'subscriptions-service', 'events.subscriptions-service');
  • И напоследок создадим очередь для отложенных сообщений:

$channel->queueDeclare('subscriptions-service.delayed.6000', durable: true, arguments: [
    'x-dead-letter-exchange' => 'subscriptions-service',
    'x-dead-letter-routing-key' => 'events.subscriptions-service',
    'x-message-ttl' => 6000,
]);
$channel->queueBind('subscriptions-service.delayed.6000', 'subscriptions-service', 'delay.6000');

В аргументах x-dead-letter-exchange и x-dead-letter-routing-key мы указываем обменник и роутинг ключ, по которым надо будет отправить сообщение после того, как x-message-ttl (6 секунд) истечет, то есть наше сообщение должно будет вернуться в основную очередь – в events.subscriptions-service. Также создаем биндинг с обменником нашего сервиса.

Теперь рассмотрим, как мог бы выглядеть консьюмер для такой очереди:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;

$client = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();
$channel = $client->channel();

$channel->qos(prefetchCount: 1);

$channel->consume(function (Message $message, Channel $channel): void {
    try {
        // handle logic here
        $channel->ack($message);
    } catch (\Throwable) {
        $retryCount = $message->headers['retryCount'] ?? 1;
        if ($retryCount >= 5) {
            $channel->nack($message, requeue: false);
        } else {
            $channel->ack($message);
            $channel->publish($message->content, ['retryCount' => $retryCount + 1] + $message->headers, exchange: 'subscriptions-service', routingKey: 'delay.6000');
        }
    }
}, 'events.subscriptions-service');

$client->run();
  • Устанавливаем prefetch-count в одно сообщение, чтобы кролик не давал нам больше одного за ack:
$channel->qos(prefetchCount: 1);
  • Если никаких проблем при обработке сообщения не произошло, спокойно ack'аем его и переходим к следующему:

    try {
    // handle logic here
    $channel->ack($message);
    } ...
  • Если же произошла ошибка, нам необходимо посмотреть, сколько уже ретраев было выполнено:
$retryCount = $message->headers['retryCount'] ?? 1;
  • Если их больше пяти (число 5 здесь для упрощения; в реальном приложении его лучше вынести в конфигурацию либо в заголовки сообщения), нам необходимо nack'нуть сообщение с флагом requeue=false, чтобы сообщение отправилось в очередь subscriptions-service.failed:
if ($retryCount >= 5) {
    $channel->nack($message, requeue: false);
}

После чего вам достаточно настроить алерты через прометеус + графану, чтобы не забывать доставать сообщения из этой очереди и разбираться, что же пошло не так.

  • Если же мы пока еще не исчерпали лимит попыток, ack'аем текущее сообщение и создаем новое на его основе, которое отправим в специальную очередь для отложенных сообщений, где оно будет томиться 6 секунд, прежде чем кролик его переложит обратно в нашу очередь events.subscriptions-service:
} else {
   $channel->ack($message);
   $channel->publish($message->content, ['retryCount' => $retryCount + 1] + $message->headers, exchange: 'subscriptions-service', routingKey: 'delay.6000');
}

Не забываем увеличивать количество попыток и сохранять его в заголовках сообщения. Обратите внимание, что нам не надо читать сообщения из очереди subscriptions-service.delayed.6000, поскольку сообщения оттуда автоматически будут перемещаться в нашу основную очередь, из которой мы уже читаем.

Сервис нотификаций

И последний сервис из нашего списка, к которому, кроме надежности, предъявляется еще такое требование как возможность получения сообщений батчами, так как сервис отправки уведомлений, которым пользуется этот сервис, не любит частые обращения к своему API и просит отправлять уведомлениями батчами по 100 штук. Поскольку реализовать делэи в этом случае будет непросто, мы обойдемся только dead-letter очередью, куда будем складывать батчи, которые не удалось отправить. Опишем топологию:

<?php declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();

$channel->exchangeDeclare('notifications-service', durable: true);

$channel->queueDeclare('notifications-service.failed', durable: true);
$channel->queueBind('notifications-service.failed', 'notifications-service', 'failed');

$channel->queueDeclare('events.notifications-service', durable: true, arguments: [
    'x-dead-letter-exchange' => 'notifications-service',
    'x-dead-letter-routing-key' => 'failed',
]);
$channel->queueBind('events.notifications-service', 'events', 'payment_succeeded');
$channel->queueBind('events.notifications-service', 'notifications-service', 'events.notifications-service');

На самом деле в кролике нет нативного способа получить батч сообщений за раз. Для этого вам необходимо собирать сообщения на стороне приложения, а после этого акнуть их всех, когда достаточного размера батч соберется и вы его обработаете. Для этого нам необходимо использовать комбинацию prefetch count + multiple флаг у ack или nack операций:

<?php declare(strict_types=1);

require_once __DIR__.'/vendor/autoload.php';

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;

$client = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$client->connect();
$channel = $client->channel();

$channel->qos(prefetchCount: 100);

$messages = [];

$channel->consume(function (Message $message, Channel $channel) use (&$messages): void {
    $messages[] = $message;
    if (\count($messages) >= 100) {
        try {
            //

            $channel->ack($message, multiple: true);
        } catch (\Throwable) {
            $channel->nack($message, multiple: true, requeue: false);
        } finally {
            $messages = [];
        }
    }
}, 'events.notifications-service');
  • Устанавливаем prefetch-count в размер нашего батча:
$channel->qos(prefetchCount: 100);
  • Как только мы набрали батч в 100 сообщений, обрабатываем его и акаем сразу все сообщения, отправленные вплоть до последнего, передав команде флаг multiple: true:
$channel->ack($message, multiple: true);
  • Если же произошла ошибка, накаем все сообщения вплоть до последнего, передав флаг multiple: true, а также флаг requeue: false, иначе наши сообщения вернутся в очередь обратно и сразу будут получены консьюмером снова:
$channel->nack($message, multiple: true, requeue: false);

Обратите внимание на статистику нашей очереди. Пока мы набираем батч, все наши сообщения отображаются в графе Unacked:
batch

Если вдруг ваш консьюмер упадет, не отправив ни ack, ни nack, сообщения просто вернутся обратно в очередь, что нас вполне устраивает.

Подытожим

Мы смогли настроить настоящую продакшн-топологию очередей, использующую всю мощь и возможности кролика. На самом деле от топологии зависит многое: надежность, своевременность получения сообщений, гарантии доставки, эффективность самого кролика. Кроме того, топология является иммутабельной и не позволяет изменять свойства очередей после создания, поэтому, создав топологию один раз, вам придется с ней жить постоянно либо тяжело и долго (зависит от объема сообщений и сложности топологии) мигрировать на другую.

Также на эффективности работы с кроликом сказывается то, как вы с ним работаете. Например, кролик не любит (впрочем, как и многие другие сервисы), когда вы часто и много к нему подключаетесь, поэтому все общение с кроликом реализовано посредством каналов. Однако и их частое создание определенно не идет кролику на пользу, поэтому если у вас нет возможности использовать постоянное подключение к кролику, то есть если вы используете PHP, тогда ставьте перед кроликом amqpproxy, который за вас будет держать пул соединений к кролику.

Кроме того, если вы используете фреймворки по типу Laravel, обратите внимание, как в них устроена работа с кроликом. Например, как я уже ранее говорил, кролик предпочитает отдавать сообщения через basic.consume, а не basic.get, и именно с помощью basic.get реализовано получение сообщений в популярном пакете для работы с кроликом для Laravel. Справедливости ради, в этом пакете реализована возможность получать сообщения и через basic.consume, однако не все о ней знают.

И напоследок, не надо декларировать обменник и очереди каждый раз, когда вы публикуете сообщение, как, опять же, сделано во многих фреймворках для PHP. Сделайте это один раз, описав топологию в виде консольной команды либо в конфигурации, как это сделано в операторах kubernetes для кролика.

loader
Комментарии
К этому посту больше нельзя оставлять новые комментарии
Логические задачи с собеседований