Ныряем в кроличью нору: йинещбоос кодяроп меуритнараг

Давайте честно, проектировать надежные событийно-ориентированные системы сложно. Во-первых, ни один брокер сообщений не гарантирует вам exactly-once доставку сообщения, и вам придется добиваться этого собственными силами. Во-вторых, без хорошего понимания работы вашего брокера – а RabbitMQ – сложный брокер – вы не сможете настроить надежную обработку сообщений. Кроме того, есть еще одно важное свойство, проявляемое в событийно-ориентированных системах, которое не надо игнорировать, – это порядок сообщений. События всегда происходят в определенном порядке и должны в том же порядке быть обработаны консьюмерами, иначе ваша система может начать вести себя неожиданно.

Как происходит так, что порядок сообщений может быть нарушен? Представим, что у нас большой поток событий (пример из прошлой статьи) и сервис подписок, который выдает подписки на наш продукт, основываясь на событиях платежа. Чтобы справляться с нагрузками, мы естественно поднимаем много консьюмеров. Это позволяет параллельно обрабатывать большое количество событий, но приводит к проблеме несогласованности данных, потому что нагрузка на консьюмеры может быть разной, как и обработка сообщений по длительности. По-другому это называют гонкой данных: два консьюмера одновременно берут два противоречивых события (например, события об успешном и отмененном платежах) и мы начинаем зависить от порядка их выполнения. Мы хотим, чтобы событие об успешном платеже обработалось раньше, чем событие об отмененном, иначе мы ошибочно выдадим подписку пользователю бесплатно.

Первое, что приходит на ум: давайте запускать один консьюмер. В небольшой системе это было бы рабочим решением (до поры, до времени), но не в нашей: мы должны уметь горизонтально масштабироваться. Другой безумный вариант, который я встречал в интернете, предлагает создавать очередь на пользователя. Разумеется, это совершенно нерабочий вариант, но в нем есть рациональное зерно, а именно – мы должны масштабироваться не количеством консьюмеров, а количеством очередей. Осталось придумать, как это сделать. И тут на помощь к нам приходят алгоритмы, а именно алгоритм согласованного хэширования. Это довольно популярный алгоритм, используемый в балансировке трафика, sticky сессиях, различных структурах данных (например, map из go) и так далее. Суть алгоритма заключается в поиске для заданного ключа определенного места из ограниченного числа нод/бакетов/etc. На псевдоязыке его можно описать следующим образом:

nodes = 10
key = "some key"
hash = hash(key)
hash % nodes

Таким образом, мы для одного и того же ключа всегда будем находить конкретную ноду. Кроме того, мы всегда будем получать число в пределах десяти, благодаря делению по модулю.

И для RabbitMQ уже существует плагин, который добавляет новый тип обменника с поддержкой согласованного хэширования.

Разворачиваем RabbitMQ

В этот раз нам понадобится написать Dockerfile, чтобы включить нужный нам плагин:

FROM rabbitmq:3.11.6-management-alpine

RUN set eux; \
    rabbitmq-plugins enable --offline rabbitmq_consistent_hash_exchange

И конфигурация для docker-compose:

version: '3.0'

services:
  rabbitmq:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - "127.0.0.1:5673:5672"
      - "127.0.0.1:15673:15672"

Описываем топологию

Клиент я буду использовать тот же самый, что был в прошлой статье, поэтому перейдем сразу к описанию топологии:

<?php

declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();

$channel->exchangeDeclare('events', durable: true);
$channel->exchangeDeclare('subscriptions', exchangeType: 'x-consistent-hash', durable: true, arguments: [
    'hash-header' => 'x-message-id',
]);

$channel->exchangeBind('subscriptions', 'events', 'payment.completed');
$channel->exchangeBind('subscriptions', 'events', 'payment.canceled');
$channel->exchangeBind('subscriptions', 'events', 'payment.refunded');

for ($i = 1; $i <= 10; $i++) {
     $channel->queueDeclare("subscriptions.events.$i", durable: true);
     $channel->queueBind("subscriptions.events.$i", 'subscriptions', $i);
}
  1. Для начала мы создаем общий обменник для всех событий:
$channel->exchangeDeclare('events', durable: true);
  1. Создаем обменник со специальным типом x-consistent-hash и в аргументах указываем ключ, на основе которого необходимо брать хэш-функцию. У нас это будет заголовок x-message-id:
$channel->exchangeDeclare('subscriptions', exchangeType: 'x-consistent-hash', durable: true, arguments: [
    'hash-header' => 'x-message-id',
]);
  1. Далее создаем биндинг нашего обменника к обменнику с событиями только по тем событиям, что интересны нашему сервису подписок:
$channel->exchangeBind('subscriptions', 'events', 'payment.completed');
$channel->exchangeBind('subscriptions', 'events', 'payment.canceled');
$channel->exchangeBind('subscriptions', 'events', 'payment.refunded');
  1. И в конце создаем 10 очередей к нашему обменнику:
for ($i = 1; $i <= 10; $i++) {
     $channel->queueDeclare("subscriptions.events.$i", durable: true);
     $channel->queueBind("subscriptions.events.$i", 'subscriptions', $i);
}

Посмотрим, что получилось с обменниками:

exchanges

И очередями:
queues

Публикуем сообщения

Публиковать сообщения необходимо в обменник events, дальше оттуда сообщения отправятся в зависимости от роутинг ключа в обменник subscriptions, откуда кролик их распределит по очередям в зависимости от результата хэширования заголовка x-message-id.

<?php

declare(strict_types=1);

use Bunny\Client;

require_once __DIR__.'/vendor/autoload.php';

$bunny = new Client([
    'host'      => 'localhost:5673',
    'vhost'     => '/',
    'user'      => 'guest',
    'password'  => 'guest',
]);

$bunny->connect();

$channel = $bunny->channel();

$channel->publish('{"id": 1}', headers: ['x-message-id' => 1], exchange: 'events', routingKey: 'payment.completed');
$channel->publish('{"id": 1}', headers: ['x-message-id' => 1], exchange: 'events', routingKey: 'payment.canceled');
$channel->publish('{"id": 1}', headers: ['x-message-id' => 1], exchange: 'events', routingKey: 'payment.refunded');

$channel->publish('{"id": 2}', headers: ['x-message-id' => 2], exchange: 'events', routingKey: 'payment.completed');
$channel->publish('{"id": 2}', headers: ['x-message-id' => 2], exchange: 'events', routingKey: 'payment.canceled');
$channel->publish('{"id": 2}', headers: ['x-message-id' => 2], exchange: 'events', routingKey: 'payment.refunded');

Посмотрим, что получилось в дашборде кролика:
consistent

На первый взгляд, распределилось поровну, давайте посмотрим, так ли это:
queue

Как видим, сообщения с одним x-message-id попали в одну очередь. Теперь мы можем запускать по одному консьюмеру на очередь и быть уверены в порядке обработки наших сообщений.

Запускаем консьюмеры

Осталась последняя проблема: как гарантировать, что в один момент времени будет запущен только один консьюмер? С одной стороны, ответ очевидный: просто не запускайте больше одного. С другой стороны, об этом можно легко забыть или не знать, поэтому ограничивать надо гарантиями, а не договоренностями. И на этот случай у RabbitMQ есть два способа:

  1. Запустить консьюмер с флагом exclusive:
$channel = $bunny->channel();

$channel->qos(0, 1);

$channel->run(function (\Bunny\Message $message, \Bunny\Channel $channel): void {
   //
}, 'subscriptions.events.1', exclusive: true);

$bunny->run();

В этом случае второй консьюмер получит ошибку ACCESS_REFUSED, и поэтому на стороне приложения надо будет ее обработать, чтобы не войти в цикл попыток подключиться.

  1. Задекларировать очередь со специальным аргументом x-single-active-consumer:
$channel->queueDeclare('subscriptions.events.1', durable: true, arguments: [
    'x-single-active-consumer' => true,
]);

В этом случае RabbitMQ сам будет обрабатывать множественные подключения к одной очереди и будет держать остальных консьюмеров подключенными, чтобы при падении первого сразу передать сообщения другому. Но поскольку очереди в кролике иммутабельные, такой вариант не подходит, если очередь уже была создана ранее. Также он не подойдет, если вам все же захочется потом запустить несколько консьюмеров в случае аварийных ситуаций, поэтому я предпочитаю первый вариант.

Таким образом, с помощью RabbitMQ мы обеспечили себе не только возможность горизонтально масштабироваться, но и сохранять порядок сообщений.

loader
Комментарии
К этому посту больше нельзя оставлять новые комментарии
Логические задачи с собеседований