Laravel: Queue

28.12.2020 в 19:45
5450
+2

Давайте начнем с определения круга задач, которые решают очереди. Например, у вас есть интернет-магазин, где пользователи активно заказывают, а вы активно контролируете различные метрики: частота заказов, отказов, просмотр тех или иных товаров и так далее. Когда пользователь оплачивает заказ, в этой цепочке почти всегда есть платежная система.

И почти всегда платежная система с вашей системой интегрируется посредством отправки коллбэков после успешной/неуспешной оплаты. Получив коллбэк, вы должны сделать все необходимые действия: сформировать чек, отправить письмо на почту пользователю, отправить письмо на почту отделу, который займется сборкой и отправкой заказа, и так далее. Делать все это синхронно в одном потоке неправильно по нескольким причинам:

  1. Это медленно.
  2. Одно действие влияет на всю цепочку: ошибка при отправке письма на почту повлияет на формирование и отправку чека (если вы не предусмотрели обработку ошибок). Привет, налоговая.
  3. Отсутствие повторного воспроизведения. Если вам не удалось отправить письмо, то сообщение в очереди можно заретраить (повторить), это встроено в механизм очередей. Без очередей ретраить вам придется вручную.

Или другой пример: менеджер хочет скачать отчет по заказам за всю прошлую неделю. Сделать синхронно вы не только не должны, но и не сможете: если отчет большой, то браузер просто обрубит слишком длительное соединение с сервером (это можно настраивать, но не надо). Для этого используем очередь: получаем действие менеджера, отправляем задачу в очередь, формируем отчет, а менеджеру отвечаем, что ему на почту упадет ссылка на отчет, также он (отчет) будет выведен в списке отчетов где-нибудь в админке.

На самом деле таких задач может быть много. А в нагруженной системе такими задачами становятся практически все. Если вы переживаете за интерактив с пользователем, в этом случае вы можете использовать веб-сокеты. Тогда вы сразу после исполнения задачи сможете уведомить пользователя о том, что вы что-то сделали.

Laravel со своей стороны предлагает полезный и удобный в настройке механизм очередей. Хотелось бы сказать, что он также удобный и в использовании, и вообще компонент просто класс, но это не так. Про минусы и плюсы будем говорить между делом, а сейчас сосредоточимся на работе очередей.

С чего начинается очередь

Если вы работали с очередями и до фреймворка, то наверняка слышали про rabbitmq, activemq, kafka, redis, beanstalkd и другие брокеры. Так вот, Laravel предлагает единый интерфейс для всех брокеров, вам достаточно указать имя драйвера, и ваше сообщение будет обрабатываться этим брокером. Это реально удобно.

Итак, чтобы ваше сообщение обрабатывалось в очереди, оно должно соответствовать некоторым правилам, диктующимся фреймворком:

  1. Джоба (именно так называются такие сообщения) должна имплементить интерфейс ShouldQueue. Это интерфейс-маркер, у него не методов;
  2. В свойствах queue, connection нужно определить название очереди и драйвер (кафка, редис, кролик и так далее). А можно этого не делать, тогда возьмется драйвер по умолчанию, который указан в переменной QUEUE_CONNECTION из енва. А очередью будет default очередь;
  3. Вся логика должна храниться в методе handle.

У этого подхода есть много минусов. Поскольку джоба является одновременно и сообщением, и обработчиком, вы не можете использовать конструктор для внедрения зависимостей, поскольку сами же и создаете инстанс джобы. В этом случае ларавел позволяет внедрять зависимости через метод handle, но проблема в том, что в ларавел нет контекстного биндинга для методов, поэтому если у вас в приложении у одного интерфейса две реализации и именно в джобу надо прокинуть другую реализацию, а не ту, которую знает сервис-контейнер, придется прокидывать именно реализацию, а не интерфейс, что снижает гибкость кода. На самом деле, я немного приврал, такая возможность все же есть, если обратиться к документации, но в этом случае вам придется указывать все зависимости метода, а не одну конкретную. Лично меня такой подход не устраивает, но кому какое дело.

Второй недостаток этого подхода (хотя стоит признать, что подход тут не причем, причем тут только разработчики фреймворка) — это конфигурация. Она указывается в самой джобе. Внутри класса джобы. Джоба, которая часть бизнес-кода. Джоба, которая может выполниться синхронно или асинхронно, должна содержать в себе настройки инфраструктуры. Это вполне в духе ларавеля, который проповедует "простоту" использования.

Третий, на мой взгляд, самый важный недостаток — это self-handle поведение джобы. Как только вы начинаете использовать такую джобу, вы увеличиваете каплинг в вашем коде, потому что диспатчите конкретную джобу. Если вам понадобится выполнить еще одну отложенную задачу в ответ на какое-либо действие системы, вам придется задиспатчить еще одну джобу рядом. А потом еще одну. И еще одну. Мало того, что вы увеличиваете связанность кода, так еще и нарушаете open-closed. Поэтому в своих проектах я отказался от джобов, ведь можно так же использовать и EventDispatcher, о котором было рассказано в прошлой части обзора. Он чуть менее настраиваемый и гибкий, но он дает возможность добавлять обработчики к одному сообщению, не затрагивая сам код. Такой подход использует мессенджер Symfony.

Допустим, вы создали джобу. Теперь ее нужно задиспатчить, то есть отправить в очередь. Для этого вы можете заинжектить Illuminate\Contracts\Bus\Dispatcher и вызвать у него метод dispatch:

public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        }

        return $this->dispatchNow($command);
    }

Dispatcher проверяет, есть ли у него queueResolver и должна ли джоба обрабатываться в очереди (имплементит ли она интерфейс ShouldQueue). Если нет, джоба выполняется немедленно. Нас интересует метод dispatchToQueue:

public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection);

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

Нам необходимо получить свойство connection, в котором определен драйвер очереди, и после этого получить объект этой очереди. Теперь мы должны разобраться, что из себя представляет queueResolver. Для этого обратимся к BusServiceProvider:

$this->app->singleton(Dispatcher::class, function ($app) {
        return new Dispatcher($app, function ($connection = null) use ($app) {
          return $app[QueueFactoryContract::class]->connection($connection);
       });
 });

queueResolver - это замыкание, внутри которого мы получаем объект для интерфейса Illuminate\Contracts\Queue\Factory. Этим объектом является QueueManager. У него мы вызываем метод connection:

public function connection($name = null)
    {
        $name = $name ?: $this->getDefaultDriver();

        if (! isset($this->connections[$name])) {
            $this->connections[$name] = $this->resolve($name);

            $this->connections[$name]->setContainer($this->app);
        }

        return $this->connections[$name];
    }

Метод connection должен вернуть нам инстанс интерфейса Illuminate\Contracts\Queue\Queue. Этими инстансами могут быть: DatabaseQueue, RedisQueue, SyncQueue, SqsQueue, RabbitMQQueue и другие. Если вы захотите написать собственный драйвер очереди, Queue — это то, что вам необходимо будет сделать, чтобы управлять вашей очередью. Так, например, вы должны будете реализовать метод для получения сообщения из очереди, для записи сообщения в очередь и многое другое.

Итак, мы должны определить соединение и достать для него объект очереди. Для этого фреймворк ищет среди доступных соединений нужное или берет по умолчанию, которое указано в queue.php:default и енве под переменной QUEUE_CONNECTION. Когда находит, то использует так называемый коннектор, чтобы создать объект очереди. Коннектор - это объект, который должен реализовать интерфейс Illuminate\Queue\Connectors\ConnectorInterface с одним единственным методом connect. Этот метод должен вернуть нам объект очереди (напоминаю, что он должен имплементить интерфейс Illuminate\Contracts\Queue\Queue). Давайте посмотрим на DatabaseConnector:

class DatabaseConnector implements ConnectorInterface
{
    /**
     * Database connections.
     *
     * @var \Illuminate\Database\ConnectionResolverInterface
     */
    protected $connections;

    /**
     * Create a new connector instance.
     *
     * @param  \Illuminate\Database\ConnectionResolverInterface  $connections
     * @return void
     */
    public function __construct(ConnectionResolverInterface $connections)
    {
        $this->connections = $connections;
    }

    /**
     * Establish a queue connection.
     *
     * @param  array  $config
     * @return \Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
        return new DatabaseQueue(
            $this->connections->connection($config['connection'] ?? null),
            $config['table'],
            $config['queue'],
            $config['retry_after'] ?? 60
        );
    }
}

Кстати, тут есть забавный баг. Обычно все очереди не только имплементят интерфейс Queue, но и наследуются от абстрактного класса Illuminate\Queue\Queue. Что тут может пойти не так? А то, что у интерфейса Queue нет метода setContainer, который вызывается, я напомню, вот тут:

public function connection($name = null)
    {
        $name = $name ?: $this->getDefaultDriver();

        if (! isset($this->connections[$name])) {
            $this->connections[$name] = $this->resolve($name);

            $this->connections[$name]->setContainer($this->app); // <--- setContainer()??
        }

        return $this->connections[$name];
    }

Но на уровне контракта закреплен именно интерфейс, а не абстрактный класс. То есть абстрактный класс выступает тут не в роли типа, а в роли обычного класса-родителя, который вы обязаны наследовать вместе с имплементацией интерфейса. Ну или вы можете реализовать метод setContainer, который вам может быть вообще не нужен.

Давайте еще раз: у нас есть ConnectorInterface, который вы должны реализовать и вернуть из него объект, реализующий интерфейс Illuminate\Contracts\Queue\Queue. Ни о каком абстрактном классе Queue в этой цепочке речи не идет. Однако Laravel имеет наглость без проверки на наличие вызывать у вашего объекта метод setContainer, что делает дыру в вашем сервисе и в целом нарушает контракт. Почему бы просто не оставить только абстрактный класс Queue с абстрактными методами и методом setContainer? Потому что это Laravel. Одним словом, помните об этом, когда будете писать свой драйвер для очереди: или наследуйте абстрактный класс Illuminate\Queue\Queue, или определяйте метод setContainer.

Метод getConnector должен вернуть нам тот самый объект для интерфейса ConnectorInterface:

protected function getConnector($driver)
    {
        if (! isset($this->connectors[$driver])) {
            throw new InvalidArgumentException("No connector for [$driver].");
        }

        return call_user_func($this->connectors[$driver]);
    }

Тут снова вызывается замыкание. Значит, надо посмотреть, как добавить в QueueManager коннектор. Заглянем в QueueServiceProvider:

protected function registerDatabaseConnector($manager)
    {
        $manager->addConnector('database', function () {
            return new DatabaseConnector($this->app['db']);
        });
    }

Итак, достали коннектор и вызвали у него метод connect:

protected function resolve($name)
    {
        $config = $this->getConfig($name);

        return $this->getConnector($config['driver'])
                        ->connect($config)
                        ->setConnectionName($name);
    }

Получив очередь, возвращаем ее и мы снова оказываемся в Dispatcher:

public function dispatchToQueue($command)
    {
        $connection = $command->connection ?? null;

        $queue = call_user_func($this->queueResolver, $connection); // тут мы уже получили Queue

        if (! $queue instanceof Queue) {
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
        }

        if (method_exists($command, 'queue')) {
            return $command->queue($queue, $command);
        }

        return $this->pushCommandToQueue($queue, $command);
    }

Нас интересует метод pushCommandToQueue:

protected function pushCommandToQueue($queue, $command)
    {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }

В принципе, неважно, какой метод будет вызван, нам нужно только увидеть, как сообщение попадает в очередь. Рассмотрим метод push для двух очередей — базы данных и кролика. Начнем с базы данных:

public function push($job, $data = '', $queue = null)
    {
        return $this->pushToDatabase($queue, $this->createPayload(
            $job, $this->getQueue($queue), $data
        ));
    }

Для начала нам нужно создать само сообщение, делает это абстрактный класс Queue:

protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'uuid' => (string) Str::uuid(),
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'maxExceptions' => $job->maxExceptions ?? null,
            'delay' => $this->getJobRetryDelay($job),
            'timeout' => $job->timeout ?? null,
            'timeoutAt' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize(clone $job),
            ],
        ]);
    }

Если вы хотя бы раз копались в таблице с очередями, то сразу должны узнать структуру полей. При обработке нашего сообщения очередью будет использоваться Illuminate\Queue\CallQueuedHandler@call. Примерно также ларавел поступает со слушателями, о которых мы говорили в прошлом обзоре.

Сохраняем в базу джобу и возвращаем ее id:

protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
            $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
        ));
    }

На этом цикл жизни джобы "прерывается", дальше она будет доступна уже в демоне вашей очереди, который и вызовет код из метода handle.

Давайте для разнообразия посмотрим, как поступает очередь кролика:

public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw($this->createPayload($job, $queue, $data), $queue, []);
    }

    /**
     * {@inheritdoc}
     *
     * @throws AMQPProtocolChannelException
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        [$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options);

        $this->declareDestination($destination, $exchange, $exchangeType);

        [$message, $correlationId] = $this->createMessage($payload, $attempts);

        $this->channel->basic_publish($message, $exchange, $destination, true, false);

        return $correlationId;
    }

Если вы использовали кролик, то уже знакомы с понятиями exchange, routingKey и другими. В общем и целом, разбираем сообщение, получая оттуда exchange, queue, routingKey и само сообщение. Также мы получаем correlationId, который вернет метод dispatch. Если вы вдруг в базе сохраняете айдишники сообщений, то помните, что RabbitMQQueue айдишники возвращает в виде строки из 32-х символов:

protected function createPayloadArray($job, $queue, $data = '')
    {
        return array_merge(parent::createPayloadArray($job, $queue, $data), [
            'id' => $this->getRandomId(),
        ]);
    }

    protected function getRandomId(): string
    {
        return Str::random(32);
    }

Ну а бдшная очередь вернет какой-нибудь инкрементный айдишник, поэтому вам нужно будет использовать общий тип данных, чтобы сохранять айдишники от любых брокеров в одном поле.

Отправляем сообщение в очередь:

$this->channel->basic_publish($message, $exchange, $destination, true, false);

Собственно, вот и все, джоба пропала с радаров и это произошло быстрее, чем если бы мы выполняли ее синхронно.

Демон

Теперь начинается наиболее интересная часть обработки сообщения. Из документации вы должны помнить, что чтобы запустить обработку сообщений, необходимо вызвать команду queue:work rabbitmq —queue=some_queue. В качестве аргумента вы передаете название драйвера (в нашем случае это rabbitmq), а в качестве опций передаете название очереди. Есть еще много других опций, все можно посмотреть тут. Если не передать название драйвера, возьмется драйвер по умолчанию, а очередью будет default в случае отсутствия опции queue. По умолчанию очередь запускается в режиме демона:

protected function runWorker($connection, $queue)
    {
        $this->worker->setCache($this->cache);

        return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
            $connection, $queue, $this->gatherWorkerOptions()
        );
    }

Если передать опцию once, то выполнится только одна следующая джоба. Нас интересует метод daemon:

while (true) {
   $job = $this->getNextJob(
         $this->manager->connection($connectionName), $queue
    );

    if ($job) {
       $jobsProcessed++;

       $this->runJob($job, $connectionName, $options);
    } else {
        $this->sleep($options->sleep);
     }

Я удалил все лишнее, чтобы вы увидели самое важное, что происходит внутри демона, а именно: запускается бесконечный цикл, внутри которого мы достаем сообщение из нужной очереди и по одному начинаем обрабатывать.

Метод getNextJob обращается к методу pop интерфейса Queue, который мы рассмотрели чуть выше:

$popJobCallback = function ($queue) use ($connection) {
            return $connection->pop($queue);
        };

        try {
            if (isset(static::$popCallbacks[$this->name])) {
                return (static::$popCallbacks[$this->name])($popJobCallback, $queue);
            }

            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $popJobCallback($queue))) {
                    return $job;
                }
            }
        } catch (Throwable $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }

На самом деле в опцию queue консольного демона можно передать несколько очередей через запятую по их приоритету, поэтому демон разбивает их с помощью explode. В этом случае пока первая очередь не опустеет, до остальных обработка не дойдет.

Метод pop должен вернуть или объект, реализующий интерфейс Illuminate\Contracts\Queue\Job, или null, если сообщений не осталось. Так же, как и в случае с Queue, ваша джоба может не только реализовать интерфейс Job, но и унаследовать абстрактный класс Illuminate\Queue\Jobs\Job.

Теперь нам нужно запустить джобу, для этого вызываем метод fire, этот метод реализован у абстрактного класса Illuminate\Queue\Jobs\Job:

public function fire()
    {
        $payload = $this->payload();

        [$class, $method] = JobName::parse($payload['job']);

        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

Строка $this->payload() вызывает метод getRawBody вашей джобы, чтобы та вернула джсон строку сообщения, поскольку в разных брокерах сообщения могут храниться по-разному. Ну, например, посмотрим на pop метод кролика:

public function pop($queue = null)
    {
        try {
            $queue = $this->getQueue($queue);

            /** @var AMQPMessage|null $message */
            if ($message = $this->channel->basic_get($queue)) {
                return $this->currentJob = new RabbitMQJob(
                    $this->container,
                    $this,
                    $message,
                    $this->connectionName,
                    $queue
                );
            }
        } catch (AMQPProtocolChannelException $exception) {
            if ($exception->amqp_reply_code === 404) {
                $this->channel = $this->connection->channel();

                return null;
            }

            throw $exception;
        }

        return null;
    }

В качестве сообщения джоба принимает AMQPMessage, а джоба базы данных получает stdClass, именно поэтому getRawBody — это абстрактный метод, чтобы можно было правильно получить единый формат сообщения.

Вернемся к методу fire. Распарсим джобу, чтобы получить название обработчика и метода:

[$class, $method] = JobName::parse($payload['job']);

($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);

Как я уже говорил, джоба оборачивается в обработчик, а не исполняется напрямую. Этим обработчиком является CallQueuedHandler. В нем мы вызываем метод call, куда передаем джобу и данные.

В методе call мы рассериализовываем джобу и выполняем:

public function call(Job $job, array $data)
    {
        try {
            $command = $this->setJobInstanceIfNecessary(
                $job, unserialize($data['command'])
            );
        } catch (ModelNotFoundException $e) {
            return $this->handleModelNotFound($job, $e);
        }

        $this->dispatchThroughMiddleware($job, $command);

        if (! $job->hasFailed() && ! $job->isReleased()) {
            $this->ensureNextJobInChainIsDispatched($command);
        }

        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }

Если наша джоба имеет трейт InteractsWithQueue, мы добавляем в нее инстанс \Illuminate\Contracts\Queue\Job. Это может быть DatabaseJob, RedisJob, RabbitMQJob и другие. Они позволяет внутри вашей джобы управлять поведением очереди: зарелизить ваше сообщение вручную, удалить, зафейлить и так далее.

Рассериализовав, мы получаем наш исходный объект джобы, который ранее отправили в очередь. Теперь нам нужно вызвать у нее метод handle:

protected function dispatchThroughMiddleware(Job $job, $command)
    {
        return (new Pipeline($this->container))->send($command)
                ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
                ->then(function ($command) use ($job) {
                    return $this->dispatcher->dispatchNow(
                        $command, $this->resolveHandler($job, $command)
                    );
                });
    }

У джобы могут быть миддлвары, через которые можно пропустить вашу джобу до того, как она обработается. Каждая миддлвара получит в метод handle/__invoke инстанс вашей джобы и замыкание $next. В конце вызываем метод dispatchNow нашего диспатчера:

public function dispatchNow($command, $handler = null)
    {
        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                return $handler->handle($command);
            };
        } else {
            $callback = function ($command) {
                return $this->container->call([$command, 'handle']);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }

А вот и заветная строка:

$callback = function ($command) {
    return $this->container->call([$command, 'handle']);
};

Теперь джоба считается выполненной (если, конечно, не было исключений или что-то еще).

События

Еще фреймворк предоставляет возможность слушать события из очереди: перед обработкой, после обработки, во время обработки, во время исключения и так далее. Как их зарегистрировать, написано в документации. Событиями управляет QueueManager:

public function before($callback)
{
     $this->app['events']->listen(Events\JobProcessing::class, $callback);
}

Дальше уже Worker в момент выполнения джобы диспатчит различные события, на которые можно подписаться и что-то сделать. Например, можно сделать мониторинг выполнения джоб: в какой очереди, в каком драйвере, какая джоба выполняется, с какими данными, не было ли исключения и так далее.

Вместо заключения

Queue — действительно очень сложный компонент. Если закрыть глаза не некоторые баги и недостатки, то это удобный и очень гибкий компонент. Для многих проектов его хватит даже для реализации сложной бизнес-логики. Он постоянно развивается, в него добавляются новые возможности, поэтому сразу взять и понять, как он работает, — сложно. Поэтому давайте я еще раз попробую вкратце описать схему работы компонента так, будто мы создаем собственный драйвер очереди.

Допустим, мы создали сообщение и хотим его отправить в нашу очередь. Для этого сначала мы должны реализовать ConnectorInterface, который создаст объект интерфейса Queue. В общем случае ConnectorInterface представляет из себя фабрику: получая настройки подключения, он передает их объекту очереди и возвращает его диспатчеру фреймворка. Наш Queue обязан реализовать некоторые методы, которыми будет управлять фреймворк, самые важные из них — это отправить сообщение в очередь и достать одно сообщение из очереди. Отправлять сообщение в очередь будет диспатчер, а доставать — консольный демон. Перед отправкой сообщения в очередь вы должны его сериализовать в формате, с которым работает фреймворк. Отправив сообщение в очередь, вы должны запустить консольный демон, куда передать в качестве соединения название вашего драйвера. Тогда фреймворк снова создаст объект вашей очереди и на этот раз вызовет метод pop, в котором вы должны вернуть объекта интерфейса Job или null, если сообщений не осталось. Job не только имеет возможность исполнить ваше сообщение, но и управлять различными циклами жизни сообщения: релиз сообщения, удаление сообщение, обработка ошибок. Скажем, если вы реализуете драйвер для кролика, то при релизе должны ackнуть сообщение, чтобы оно не попало обратно в очередь. При вызове метода fire абстрактный класс Job вызывает CallQueuedHandler@call, где мы снова обращаемся к диспатчеру, только уже к методу dispatchNow. В нем вызывается метод handle.

Итак, при написании собственного драйвера вы должны будете реализовать ConnectorInterface для подключения к очереди, интерфейс Queue для отправки сообщения в очередь и получения сообщения из очереди и интерфейс Job для запуска джобы.

Пожалуй, в одном из следующих обзоров мы реализуем собственный драйвер, ну а пока на этом все.

loader
28.12.2020 в 19:45
5450
+2
Комментарии
Новый комментарий

Логические задачи с собеседований