Laravel: Event Dispatcher
Event Dispatcher является одним из самых простых и в то же время одним из самых полезных компонентов Laravel. Почему простым, потому что в общем случае dispatcher просто хранит массив вида событие ⇒ слушатели и в момент, когда вы вызываете dispatch(событие), достает эти слушатели и по очереди их запускает. А почему полезным, потому что события предоставляют точки расширения логики без изменения кода. Таким образом вы уменьшаете каплинг (связанность) вашего приложения, что является хорошим преимуществом на пути масштабирования функционала в будущем.
Давайте для начала напишем свой небольшой dispatcher, чтобы посмотреть, как в общем случае устроен механизм:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Illuminate\Support\Str;
final class EventDispatcher
{
private $listeners = [];
/**
* @param string $event
* @param string|callable $listener
*/
public function listen(string $event, $listener): void
{
$this->listeners[$event][] = $this->makeListener($listener);
}
/**
* @param object $event
*/
public function dispatch(object $event): void
{
$eventName = get_class($event);
if (!isset($this->listeners[$eventName])) {
return;
}
$listeners = $this->listeners[$eventName];
foreach ($listeners as $listener) {
$listener($event);
}
}
/**
* @param $listener
*
* @return callable
*/
private function makeListener($listener): callable
{
if (\is_string($listener)) {
[$class, $method] = Str::parseCallback($listener, 'handle');
return [new $class, $method];
}
if (\is_callable($listener)) {
return $listener;
}
throw new RuntimeException(sprintf('Listener must be class-string or callable, got %s', gettype($listener)));
}
}
final class TestEvent
{
public $payload;
}
final class TestEventListener
{
/**
* @param object $event
* @return mixed|void
*/
public function handle(TestEvent $event)
{
$event->payload[] = [__CLASS__, __METHOD__];
}
}
$dispatcher = new EventDispatcher();
$dispatcher->listen(TestEvent::class, function (TestEvent $event) {
$event->payload[] = [__FUNCTION__];
});
$dispatcher->listen(TestEvent::class, TestEventListener::class);
$dispatcher->dispatch($event = new TestEvent);
dd(
$event,
// TestEvent {#10
// +payload: array:2 [
// 0 => array:1 [
// 0 => "{closure}"
// ]
// 1 => array:2 [
// 0 => "TestEventListener"
// 1 => "TestEventListener::handle"
// ]
// ]
// }
);
В первую очередь, нам нужен метод, с помощью которого мы сможем зарегистрировать наши слушатели. Это метод listen:
public function listen(string $event, $listener): void
{
$this->listeners[$event][] = $this->makeListener($listener);
}
Для названия события, которое является ключом массива, добавляем слушатели, предварительно их создав. Мы позволяем регистрировать два вида слушателей: класс-строка или замыкание. Причем класс-строку можно зарегистрировать с кастомным методом. Например: TestEventListener::class.'@on', где on - название метода внутри класса TestEventListener, который будет обрабатывать событие. По умолчанию будет использоваться метод handle. В ларавеле это работает точно так же, но об этом позже.
private function makeListener($listener): callable
{
if (\is_string($listener)) {
[$class, $method] = Str::parseCallback($listener, 'handle');
return [new $class, $method];
}
if (\is_callable($listener)) {
return $listener;
}
throw new RuntimeException(sprintf('Listener must be class-string or callable, got %s', gettype($listener)));
}
Приводим объект к замыканию, применив трюк с массивом, где первый элемент — объект, а второй — метод этого объекта. Теперь вы можете вызвать массив в качестве замыкания.
Регистрируем наши слушатели и диспатчим событие:
$dispatcher = new EventDispatcher();
$dispatcher->listen(TestEvent::class, function (TestEvent $event) {
$event->payload[] = [__FUNCTION__];
});
$dispatcher->listen(TestEvent::class, TestEventListener::class);
$dispatcher->dispatch($event = new TestEvent);
В момент диспатча происходит следующее:
- Для начала проверяем, есть ли для брошенного события слушатели: для этого берем полное имя класса события и ищем по нему в массиве listeners;
- Если не нашли, просто выходим;
- Если нашли, обходим их и запускаем как замыкания, передав в качестве аргумента объект события.
public function dispatch(object $event): void
{
$eventName = get_class($event);
if (!isset($this->listeners[$eventName])) {
return;
}
$listeners = $this->listeners[$eventName];
foreach ($listeners as $listener) {
$listener($event);
}
}
Это и есть основная способность диспатчера, никакой магии.
Теперь разберем, как это работает в Laravel и что еще умеет диспатчер там.
Для начала вспомним, что для регистрации событий используется сервис-провайдер, но не обычный сервис-провайдер, а так называемый EventServiceProvider. Отнаследовав его, вы должны зарегистрировать слушатели в свойстве $listen или в свойстве $subscribe. Отличие заключается в том, что у подписчика (subscriber) есть метод subscribe, куда фреймворк передаст Dispatcher, чтобы вы зарегистрировали события:
final class UserStatesSubscriber
{
public function (Dispatcher $dispatcher)
{
$dispatcher->listen(
UserCreated::class,
self::class . '@whenUserCreated',
);
}
}
Такая возможность позволяет вам логически группировать несколько событий в одном месте, однако на моей практике подписчики в ларавеле не снискали популярность. Тем не менее, в итоге подписчик разделяется на несколько слушателей. Так работает в ларавеле, так работает и в симфони.
Передаем в метод listen событие(-я) и слушатель:
public function listen($events, $listener)
{
foreach ((array) $events as $event) {
if (Str::contains($event, '*')) {
$this->setupWildcardListen($event, $listener);
} else {
$this->listeners[$event][] = $this->makeListener($listener);
}
}
}
Вы можете прослушивать несколько событий сразу, для этого используется wildcard, например: 'user_states.*'. В этом случае вы будете ловить в этом слушателе все события вида user_states.create, user_states.change, user_states.ban и так далее. Так или иначе добавляем событие в массив в качестве ключа и в качестве значения — объект слушателя, созданный контейнером. Если вы захотите передать объект сами, вам необходимо будет использовать метод __invoke, чтобы его можно было вызвать как функцию:
public function makeListener($listener, $wildcard = false)
{
if (is_string($listener)) {
return $this->createClassListener($listener, $wildcard);
}
return function ($event, $payload) use ($listener, $wildcard) {
if ($wildcard) {
return $listener($event, $payload);
}
return $listener(...array_values($payload));
};
}
Внутри диспатчера ларавел активно использует контейнер. Разумеется, это большой минус ларавел, так как при желании стянуть себе на проект их диспатчер, вы потянете и его (контейнер) тоже. Разработчики ларавел явно не слышали про Common-Closure принцип по отношению к пакетам или почему-то его не любят. По теме принципов хорошего дизайна пакетов советую почитать issue на гитхабе к пакету infection от одного из его авторов.
С замыканиями все понятно, давайте посмотрим, как ларавел создает слушатель из класса-строки:
public function createClassListener($listener, $wildcard = false)
{
return function ($event, $payload) use ($listener, $wildcard) {
if ($wildcard) {
return call_user_func($this->createClassCallable($listener), $event, $payload);
}
return call_user_func_array(
$this->createClassCallable($listener), $payload
);
};
}
Нам необходимо создать класс как замыкание, чтобы привести слушатели к одному виду, для этого диспатчер вызывает метод createClassCallable:
protected function createClassCallable($listener)
{
[$class, $method] = $this->parseClassCallable($listener);
if ($this->handlerShouldBeQueued($class)) {
return $this->createQueuedHandlerCallable($class, $method);
}
return [$this->container->make($class), $method];
}
Парсим строку (помним, что мы зарегистрировали слушатель как строку), получая класс и метод. Напомню, как выглядит регистрация слушателя:
use Illuminate\Foundation\Support\Providers\EventServiceProvider as LaravelEventServiceProvider;
final class EventServiceProvider extends LaravelEventServiceProvider
{
protected $listen = [
UserRegistered::class => [
SendEmailVerificationNotification::class,
AnotherUserRegisteredListener::class . '@on',
],
];
}
Если вы не указали метод с помощью конструкции '@on', будет использован метод handle.
Смотрим, нужно ли отложить выполнение слушателя (да, слушатель можно превратить в джобу, если вы вдруг не знали):
if ($this->handlerShouldBeQueued($class)) {
return $this->createQueuedHandlerCallable($class, $method);
}
И тут можно обнаружить интересные косяки диспатчера. Чтобы вы лучше поняли проблему, о которой я расскажу, давайте посмотрим на то, как можно слушатель превратить в джобу:
final class SomeEventListener implements ShouldQueue
{
public $connection = 'rabbitmq';
public $queue = 'queued_events';
public function __construct() {}
public function handle(SomeEvent $event) {}
}
Мы помечаем слушатель интерфейсом-маркером ShouldQueue, увидев который, диспатчер поймет, что слушатель необходимо отложить, то есть запустить в очереди. Вы можете настроить то, в какую очередь попадет ваш слушатель и какой драйвер будет использоваться: база данных, кролик, sqs, beanstalkd и так далее. Например, локально вы используете один драйвер (например, базу данных), а на проде — другой. В случае ларавеля это очень удобно, и за это ребят стоит похвалить, так как вам нет нужды поднимать локально брокеры вроде кролика или кафки, потому что все настройки актуальны для всех драйверов, и поэтому локально хватит и базы данных. Однако... вы не сможете переопределить соединение в зависимости от окружения или от чего-либо еще, потому что единственный способ определить соединение — это свойство класса. И нет, даже если вы определите его в конструкторе, где вы смогли бы использовать хелперы вроде config или контекстный биндинг, диспатчер все равно его не увидит. Давайте разберемся, почему:
protected function handlerShouldBeQueued($class)
{
try {
return (new ReflectionClass($class))->implementsInterface(
ShouldQueue::class
);
} catch (Exception $e) {
return false;
}
}
Имплементит ли наш слушатель интерфейс ShouldQueue? Да, значит, откладываем его:
protected function createQueuedHandlerCallable($class, $method)
{
return function () use ($class, $method) {
$arguments = array_map(function ($a) {
return is_object($a) ? clone $a : $a;
}, func_get_args());
if ($this->handlerWantsToBeQueued($class, $arguments)) {
$this->queueHandler($class, $method, $arguments);
}
};
}
А может, слушатель все же не хочет выполняться вообще (handlerWantsToBeQueued)? Да, пусть название метода (shouldQueue) не сбивает вас с толку. Если вы подумали, что результат метода влияет на то, как будет исполняться слушатель — синхронно или асинхронно, — то вы ошибаетесь, отрицательный результат приведет к тому, что слушатель не будет выполнен вообще. Именование очень странное, как минимум понятнее было бы назови они его shouldBeExecuted или как-то так:
protected function handlerWantsToBeQueued($class, $arguments)
{
$instance = $this->container->make($class);
if (method_exists($instance, 'shouldQueue')) {
return $instance->shouldQueue($arguments[0]);
}
return true;
}
Обратите внимание на следующую строку:
$instance = $this->container->make($class);
Абсолютно для всех слушателей, которые хотят отложиться, мы в этом методе всегда создаем полноценный объект с помощью контейнера и вызываем метод shouldQueue. Ну, вызвали и вызвали, а проблема в чем? А проблема в методе queueHandler, вызываемом следом, если слушатель все же надо отложить:
protected function queueHandler($class, $method, $arguments)
{
[$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);
$connection = $this->resolveQueue()->connection(
$listener->connection ?? null
);
$queue = method_exists($listener, 'viaQueue')
? $listener->viaQueue()
: $listener->queue ?? null;
isset($listener->delay)
? $connection->laterOn($queue, $listener->delay, $job)
: $connection->pushOn($queue, $job);
}
В самом его начале диспатчер вызывает метод createListenerAndJob, чтобы создать слушатель и джобу для него, однако:
protected function createListenerAndJob($class, $method, $arguments)
{
$listener = (new ReflectionClass($class))->newInstanceWithoutConstructor();
return [$listener, $this->propagateListenerOptions(
$listener, new CallQueuedListener($class, $method, $arguments)
)];
}
Однако он зачем-то снова создает инстанс слушателя, но уже с помощью рефлексии и без конструктора. Без конструктора, Карл! Теперь вы понимаете, почему у вас не получится переопределять соединение слушателя, используя конструктор? Зато у вас получится переопределить очередь, куда попадет слушатель, вам достаточно определить метод viaQueue(). Но это, во-первых, чрезвычайно идиотское решение, а во-вторых, вы можете себе представить ситуацию, когда вам надо будет сменить очередь? Чаще всего вам потребуется сменить именно соединение, а не очередь. Я заводил PR на эту тему, на что мне ответили, что у них нет в планах делать так много изменений в диспатчере, но предложили сделать PR с реализацией метода viaConnection (facepalm). Поэтому придется или переопределять диспатчер, или использовать джобы.
Итак, если слушатель надо отложить, диспатчер создает джобу CallQueuedListener. Это специальная джоба-обертка конкретно для слушателей, она принимает класс слушателя, метод и аргументы (объект события). После того, как джоба будет запущена, выполнится метод handle:
public function handle(Container $container)
{
$this->prepareData();
$handler = $this->setJobInstanceIfNecessary(
$this->job, $container->make($this->class)
);
$handler->{$this->method}(...array_values($this->data));
}
Рассериализуем данные (объект события), создадим инстанс слушателя и выполним метод (handle или любой другой, который вы определили в сервис-провайдере), передав туда объект события. Таким образом, повесив интерфейс ShouldQueue, вы сделали свой синхронный слушатель асинхронным, и это чрезвычайно полезная возможность диспатчера ларавеля, несмотря на проблемы, которые я описал выше.
Вернемся немного назад, туда, где создавалась джоба:
protected function queueHandler($class, $method, $arguments)
{
[$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);
$connection = $this->resolveQueue()->connection(
$listener->connection ?? null
);
$queue = method_exists($listener, 'viaQueue')
? $listener->viaQueue()
: $listener->queue ?? null;
isset($listener->delay)
? $connection->laterOn($queue, $listener->delay, $job)
: $connection->pushOn($queue, $job);
}
Чтобы джобу куда-то запушить, необходимо определить соединение, для этого в диспатчере есть queueResolver, сетится он в провайдере в момент бутстрапа приложения. Используем его, чтобы определить драйвер. Если вы не указали у слушателя свойство connection, будет использован драйвер по умолчанию, указанный в .env переменной QUEUE_CONNECTION. Если указано свойство delay, откладываем джобу, если нет — пушим ее в очередь, после чего она выполнится сразу, как только до нее доберется воркер (демон).
В качестве примера возьмем DatabaseQueue. Для начала мы должны распарсить джобу:
protected function createObjectPayload($job, $queue)
{
$payload = $this->withCreatePayloadHooks($queue, [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\[email protected]',
'maxTries' => $job->tries ?? null,
'delay' => $this->getJobRetryDelay($job),
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => $job,
'command' => $job,
],
]);
return array_merge($payload, [
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
}
Узнаете структура массива? Если вы хоть раз запускали очередь через базу и смотрели в таблицу jobs, вам это должно быть знакомо. После этого вставляем джобу в базу:
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
{
return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
));
}
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->currentTime(),
'payload' => $payload,
];
}
Дальше уже воркер достанет эти записи и будет их исполнять. Подробнее про очереди и джобы поговорим в отдельной статье, а пока вернемся к слушателям.
Итак, слушатели мы зарегистрировали, что дальше? Дальше мы бросаем событие и смотрим, что происходит:
Если событие помечено интерфейсом ShouldBroadcast, значит, его надо послать по веб-сокетам.
if ($this->shouldBroadcast($payload)) {
$this->broadcastEvent($payload[0]);
}
Достаем все слушатели для нашего события и вызываем каждый по очереди. Однако если вы передадите третьим аргументом флаг true, вызов слушателей будет идти до тех пор, пока кто-то из них вернет не nullable ответ. Если вернулся ответ, то диспатчер выйдет из цикла, а следующие слушатели выполнены не будут. Также чтобы остановить вызов слушателей, вы можете вернуть в каком-либо из них в качестве ответа false. Во всех остальных случаях диспатчер будет копить ответы от слушателей и вернет их одним массивом после того, как все они будут выполнены.
$responses = [];
foreach ($this->getListeners($event) as $listener) {
$response = $listener($event, $payload);
if ($halt && ! is_null($response)) {
return $response;
}
if ($response === false) {
break;
}
$responses[] = $response;
}
return $halt ? null : $responses;
На днях был предложен интересный PR, который позволит вам диспатчить событие внутри транзакции и не бояться, что оно исполнится, если транзакция зафейлится, так как в этом случае диспатчер создаст коллбэк у DatabaseTransactionsManager, который будет исполнен только в случае успешного завершения транзакции. PR хоть и полезный, но в духе разработчиков фреймворка сделан из рук вон плохо: например, автор PR предлагает помечать такие слушатели свойством $dispatchAfterCommit, что, конечно, лишает вас возможности управлять этим свойством в рантайме и добавляет много магии в происходящее. И к тому же вы не сможете использовать эту фичу на версиях фреймворка младше 8. Но можно обойтись своим решением:
final class ExtendedEventDispatcher extends Dispatcher
{
/**
* @psalm-param class-string|string $target
* @param object $event
* @param string $target
*
* @return void
*/
public function dispatchAfter(object $event, string $target = TransactionCommitted::class): void
{
$this->listen($target, function (object $commited) use ($event): void {
$this->dispatch($event);
});
}
}
Не забудем заменить диспатчер на свой:
final class AppServiceProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton('events', function ($app) {
return (new ExtendedEventDispatcher($app))->setQueueResolver(function () use ($app) {
return $app->make(QueueFactoryContract::class);
});
});
}
}
Можно пользоваться. На этом обзор диспатчера можно закончить.
Комментарии