• Блог
  • Очереди в MODX из коробки

Привет, друзья! Уверен, немногие из вас знают, что в MODX есть очереди. Да-да. Просто они скрываются за псевдонимом modRegistry. Я в своё время узнал об этом сервисе из статьи "Использование modRegistry". И даже использую его в AdminTools. Хотя в то время я не особо понимал, почему для временного хранения нужно использовать именно modRegistry, если гораздо проще для этого использовать механизм кэширования. Никаких тебе коннектов и сабскрайбов. Но я старался не отставать от авторитетных товарищей, несмотря на то, что и они не давали ответа на этот вопрос.

А ответ, на самом деле, простой — modRegistry в первую очередь предназначен для работы с очередями сообщений, а для временного хранения лучше использовать кэш. Но что такое очереди и для чего они нужны?

Очереди

Основное предназначение очередей — отложить выполнение задач, на обработку которых тратится много времени, для последующей обработки в другом потоке, где время выполнения не критично. Это может быть отправка писем, операции импорта, сложные задачи расчётов и т.п. Если выполнять такие задачи на фронте, то пользователь столкнётся с проблемой долгого ответа от сервера. А если задача очень сложная, то она может не закончить выполнение из-за ограничения в 30 секунд вэб-сервера. Поэтому такие задачи стараются вынести из потока пользователя.

Одноразовые или редкие задачи выполняют в консоли вручную. А для повторяющихся используют планировщик заданий Cron, который может запускать указанные файлы с определённой периодичностью.

modRegistry

Если заглянуть в документацию, то мы увидим сухое описание реализаций абстрации регистра и несколько примеров, которые совершенно не раскрывают основное предназначение сервиса. Смысл примера с пивом скрыт даже от любителей этого напитка, а пример с сессией показывает вариант временного хранения, с которым не хуже справится кэш-менеджер. Кроме того, реализация не соответствует задаче — данные сохраняются для пользователя, а по условию задачи он ещё не залогинен.

Но именно функционал временного хранения часто используется в самом MODX. В регистре хранятся состояния объектов ExtJs. Такие как активные вкладки в разных интерфейсах, размеры и положения окон и другая служебная информация. Все эти данные хранятся в файловой системе (core/cache/registry/state/). А ещё этот механизм используется в интерфейсе нового ресурса. Некоторые, наверно, обращали внимание, что когда у ещё несохранённого документа меняешь шаблон, MODX перегружает страницу и в адресную строку добавляет параметр reload с каким-то хешем. А главное — на странице все введённые данные сохраняются. Это результат работы modRegistry. Перед перезагрузкой страницы данные сохраняются в очередь с ключом, указанным в параметре reload, а затем достаются при запросе страницы. В данном случае информация хранится в базе данных.

Руководство по работе с очередями в MODX

Некоторые приложения для MODX используют свои очереди. В качестве примера можно привести Tickets, Sendex. Есть отдельный пакет simpleQueue. Некоторые разработчики только ради очереди ставят Tickets. Уверен, после данной статьи они перестанут это делать. Итак, давайте вместе докопаемся до подробностей.

Инициализация очереди

Есть абстрактный уровень и две реализации — для файловой системы (modFileRegister) и для базы данных (modDbRegister). Их назвали регистрами, но я бы обозвал их менеджерами очередей. Ибо многие уже знакомы с очередями и процесс познания пойдёт значительно легче.

Первое, что нужно сделать, это загрузить сервис. Следом выбрать одну из реализаций.

<?php
// 1. Загружаем сервис
$modx->getService('registry', 'registry.modRegistry');
// 2a. Регистрируем файловый менеджер очередей
$modx->registry->addRegister('food', 'registry.modFileRegister', array('directory' => 'food'));
// 2b. Регистрируем менеджер очередей базы данных
$modx->registry->addRegister('food', 'registry.modDbRegister', array('directory' => 'food'));
// 3. Зарегистрированный менеджер доступен так
$modx->registry->food;

Это пример из документации. Я бы немного оптимизировал код:

<?php
// 1. Загружаем сервис
$registry = $modx->getService('registry', 'registry.modRegistry');
// 2a. Регистрируем файловый менеджер очередей
$foodQM = $registry->getRegister('food', 'registry.modFileRegister');
// 2b. Регистрируем менеджер очередей базы данных
$foodQM = $registry->getRegister('food', 'registry.modDbRegister');
// 3. Зарегистрированный менеджер доступен так
$foodQM;

Тут нужно пояснить.

  • Вместо метода addRegister() лучше использовать метод getRegister(). Он возвращает существующий объект менеджера очередей с указанным именем или создаёт новый, в случае его отсутствия, и также его вернёт. Хотя объект $modx->registry->food тоже будет доступен.
  • Также я удалил третий аргумент в виде массива параметров array('directory' => 'food'). Параметр 'directory' актуален только для файлового менеджера (для базы данных-то он зачем?) и позволяет изменить название папки, если вы не хотите, чтобы она называлась также как и очередь. Т.е.
  • // Будет создана папка core/cache/registry/food/
    $foodQM = $registry->getRegister('food', 'registry.modFileRegister');
    // Будет создана папка core/cache/registry/meal/
    $foodQM = $registry->getRegister('food', 'registry.modFileRegister', ['directory' => 'meal']);  
    

Словами это можно описать так — мы создали очередь «food», для работы с которой предназначен менеджер $foodQM.

Соединение с очередью

После инициализации менеджера очередей документация указывает, что необходимо подключиться к очереди. Если что-то пойдёт не так, то вернётся значение false.

$connected = $modx->registry->food->connect();

В нашем случае короче

$connected = $foodQM->connect();

Но заглянув в код могу сказать следующее, что для DB менеджера это делать вообще не нужно. Этот метод состоит из одной строчки

public function connect(array $attributes = array()) {
    return true;
}

А для файлового менеджера этот метод проверяет существование папки с указанным именем очереди, и если такой нет, то создаёт её. Так что лучше этот метод всё-таки вызывать. Хотя, честно говоря, можно было бы сделать также, как и для DB менеджера, у которого очередь создаётся (если её ещё нет) при инициализации менеджера в конструкторе класса. И тогда одним методом меньше, а голове легче. А ещё лучше сделать ленивую инициализацию в методе connect(), который вызывать при записи сообщений в очередь.

Темы и подписки

Все сообщения в очереди группируются по темам (топикам по-забугорному). В документации для очереди «food» создаётся тема "/beer/". Глядя на слэши интуиция подсказывает, что тема является частью пути. Конечно, не очень кашерно заставлять разработчика писать слэши. Тем более, что для БД они не нужны. А для файловой очереди их можно добавить где-то внутри менеджера. К слову сказать, они и добавляются, если их нет. Но из-за некорректной проверки без слэшей работать не будет. А всего-то нужно две строчки поменять местами. Просто запомните.

Итак, тема — это путь. Но путь — это понятие файловой структуры. А значит темы — это (в нашем случае) подпапки каталога «food». В них будут создаваться файлы сообщений очереди. А что касательно менеджера очередей для базы данных, то там немного другая структура. В базе данных есть 3 таблицы —

  • modx_register_queues — для очередей. Если сравнивать с файловым менеджером — это первый уровень папок.
  • modx_register_topics — для тем. Привязана к таблице очередей. Это второй уровень папок.
  • modx_register_messages — для сообщений. Привязана к таблице тем. По аналогии с файловым менеджером — это файлы.

Чтобы работать с очередями необходимо подписаться на нужные темы:

$foodQM->subscribe("/beer/");

Подписаться можно на несколько тем, но делать это придётся последовательно. Метод subscribe() добавляет указанную тему в массив тем, хранящихся в открытом свойстве $subscriptions (пример использования этого свойства в конце статьи). Но важно помнить — без подписки ни отправить сообщение в очередь, ни прочитать не получиться.

При необходимости можно отписаться от ненужной темы:

$foodQM->unsubscribe("/beer/");

Отправка сообщений в очередь

Отправлять можно как одно сообщение, так и несколько. Причём сообщение может быть как именованным, так и нет. Имя (ключ сообщения) задаётся в ключе массива. Если он не задан, то имя будет сформировано автоматически в виде временной отметки. Рассмотрим пример для файлового менеджера очередей. Создадим сообщение без имени (ключа).

$foodQM->send("/beer/", "It's Miller Time!");

Будет создан файл registry/food/beer/20200725T094538-000.msg.php (дата и время будут другие). Расшифруем — очередь food, тема beer, сообщение 20200725T094538-000. В конец файла добавляется трехзначная нумерация для уникальности, так как при создании нескольких сообщений временная метка будет одинаковая.

Если заглянуть в файл, то увидим возврат нашего сообщения

<?php
return "It's Miller Time!";

Все просто. Создать несколько сообщений не сложнее.

# 1. Сообщения без имени (ключей). 
$foodQM->send("/beer/", array("beer1", "beer2", "beer3"));
// Будут созданы 3 файла - registry/food/beer/20200725T094538-000.msg.php
//                         registry/food/beer/20200725T094538-001.msg.php
//                         registry/food/beer/20200725T094538-002.msg.php

# 2. Сообщения с ключами. 
$foodQM->send("/beer/", array("Heineken" => "not so good", "Pabst Blue Ribbon" => "rocks"));
// Будут созданы 2 файла - registry/food/beer/Heineken.msg.php
//                         registry/food/beer/Pabst Blue Ribbon.msg.php

Для сообщений в БД это работает также, только имя (указанное или автоматически сгенерированное) записывается в поле `id` таблицы сообщений и в имени нет дополнительной нумерации -000. За неё отвечает автоинкрементное поле `id`.

Я бы советовал для очередей использовать базу данных. Она больше подходит для такого рода операций.

Кроме того, третим аргументом в метод send() можно передать массив параметров:

  • delay — количество секунд, через которое сообщение будет доступно для чтения.
  • ttl — время жизни сообщения в секундах.
  • kill — если указать true, то процесс чтения прерывается и следующие сообщения обработаны не будут.

Передавать можно не только строки, но и массивы, что гораздо востребованней. Как видите, ничего сложного. Но у меня есть один вопрос к разработчикам — зачем нужны подписки, если при отправке всё равно нужно указать тему? Моё мнение — это нелогично и можно было бы убрать требование подписки.

Чтение сообщений из очереди

Чтобы на фронте быстро вернуть запрос пользователю, все сложные задачи помещаются в очередь для последующей обработки в Cron. А для этого нужно создать файл с доступом к очереди сообщений и настроить периодичность его запуска. Давайте разберёмся, как зачитывать сообщения из очереди.

<?php
// 1. Загружаем сервис
$registry = $modx->getService('registry', 'registry.modRegistry');
// 2a. Регистрируем или файловый менеджер очередей
$foodQM = $registry->getRegister('food', 'registry.modFileRegister');
// 2b. Или менеджер очередей базы данных
$foodQM = $registry->getRegister('food', 'registry.modDbRegister');
// 3. Обязательно подписываемся на нужные темы
$foodQM->subscribe("/beer/");
// 4. Получаем сообщения из подписанных тем.
$messages = $foodQM->read([
    'poll_limit' => 1,
    'msg_limit' => 10
]);

Результатом будет индексированный массив без ключей.

array:5 [
  0 => "beer1"
  1 => "beer2"
  2 => "beer3"
  3 => "not so good"
  4 => "rocks"
]

Без ключей сложно разобраться в этом списке. Чтобы они появились, нужно передать опцию include_keys.

$messages = $foodQM->read([
    'poll_limit' => 1,
    'msg_limit' => 10,
    'include_keys' => true,
]);

// Результат
array:5 [
  "20200725T094538-000" => "beer1"
  "20200725T094538-001" => "beer2"
  "20200725T094538-002" => "beer3"
  "Heineken" => "not so good"
  "Pabst Blue Ribbon" => "rocks"
]

Так значительно лучше. Но у метода read() опций достаточно много. Вот их полный список:

  • poll_limit — количество итераций чтения. По-умолчанию, стоит значение 0 и менеджер запускает бесконечный цикл, которым можно управлять следующими опциями. Не знаю, зачем это нужно. Ибо на фронте всё подвиснет, а в кроне и так указан цикл запуска скрипта. Поэтому, самое лучшее — ставить 1. В документации написано также, но значение по-умолчанию почему-то 0.
  • poll_interval — количество секунд между запросами, если в предыдущей опции указано больше 1 или 0.
  • time_limit — время работы цикла запросов в секундах. Опять же, если poll_limit != 1. Если не указан, то берётся из php.ini.
  • msg_limit — максимальное количество сообщений для чтения из очереди. По умолчанию 5 сообщений. Зависит от частоты Cron. Если раз в минуту, то можно попробовать и 100. Всё зависит от сложности задачи.
  • remove_read — указывает, следует ли удалять сообщение при прочтении. При работе с очередями как правило записи удаляются. Если что-то пошло не так, то задачу нужно опять вернуть в очередь. По-умолчанию, true.
  • include_keys — указывает, должна ли операция чтения включать ключи сообщения. Если false, только сообщения возвращаются в простом индексированном массиве. По-умолчанию, false.

Очистка очереди сообщений

Очередь можно очистить одним заросом. При этом сама очередь и тема останутся, будут удалены только сообщения. И что интересно, в данном случае подписываться на тему не обязательно.

//$foodQM->subscribe("/beer/");  // подписка не нужна.
$foodQM->clear("/beer/");

Удаление очереди и темы

Для файловой реализации готового решения нет. Нужно вручную удалить или ненужную папку темы или сразу папку очереди из каталога регистра core/cache/registry/. У реализации для БД есть решение через xPDO —

// Удалить тему. Будут удалены и зависимые сообщения.
$topic = $modx->getObject('registry.db.modDbRegisterTopic', ['name' => '/beer/']);
$topic->remove();
// Удалить очередь вместе с темами.
$queue = $modx->getObject('registry.db.modDbRegisterQueue', ['name' => 'food']);
$queue->remove();

Пример

С теорией закончили, давайте пощупаем сервис на практике. Возьмём пример более реальный. Задача: при добавлении нового комментария к статье на сайте нужно уведомить автора статьи, модераторов и подписанных пользователей. Если комментарий к другому комментарию, то уведомить ещё и его автора. Таким образом, у нас будет 4 темы.

// Можно создать метод в классе комментария или процессора 
$registry = $modx->getService('registry', 'registry.modRegistry');
$mailQM = $registry->getRegister('mail', 'registry.modDbRegister');
// Тема для автора статьи
$mailQM->subscribe("/author/");
// Тема для модераторов
$mailQM->subscribe("/moderators/");
// Тема для подписчиков
$mailQM->subscribe("/subscrubers/");
// Тема для автора комментария
$mailQM->subscribe("/comment_author/");

// Теперь нужно добавить сообщения в очередь
$mailQM->send("/author/", [['user' => $author->id, 'comment' => $comment->id]]); // Массив в массиве, чтобы было не 2 сообщения с id автора и комментария, а одно с массивом данных.  
$mailQM->send("/comment_author/", [['user' => $commentAuthor->id, 'comment' => $comment->id]]);
foreach($moderators as $moderator) {
    $mailQM->send("/moderators/", [['user' => $moderator['id'], 'comment' => $comment->id]]); 
}
foreach($subscrubers as $subscruber) {
    $mailQM->send("/subscrubers/", [['user' => $subscruber['id'], 'comment' => $comment->id]]); 
}

Код условный. Определения некоторых переменных опущены, как и некоторые проверки. Чисто для понимания идеи. Вот бы ещё убрать подписки, которые не несут никакой смысловой нагрузки, и код сократится в 2 раза.

Комментарий пользователя без задержек появляется на странице. А уже на сервере в фоновом режиме тихо и спокойно скрипт обработки очереди будет разгребать данные задачи.

Зачитываем все топики очереди «mail».

$registry = $modx->getService('registry', 'registry.modRegistry');
$mailQM = $registry->getRegister('mail', 'registry.modDbRegister');

// Зачитываем сообщения по очереди, а не все сразу, чтобы использовать разные шаблоны для ответа
foreach(["/author/", "/comment_author/", "/moderators/", "/subscrubers/"] as $topic) {
    // Подписываемся только на одну тему через свойство, без методов subscribe() и unsubscribe().
    $mailQM->subscriptions = [$topic];
    $messages = $mailQM->read([
        'poll_limit' => 1,
        'msg_limit' => 10,
        'remove_read' => true,
    ]);
    if (empty($messages)) {
        continue;
    }
    
    // Так как комментарий один и тот же, в целях оптимизации лучше получить его до цикла.
    $comment = $modx->getObject('CommentClass', ['id' => $messages[0]['comment']]);
    $tpl = $modx->getChunk('tpl.mail.' . trim($topic, '/'), $comment->toArray());
    foreach($messages as $key => $message) {
        $user = $modx->getObject('modUser', ['id' => $message['user']]);
        if ($modx->user->sendEmail($tpl)) {
            unset($messages[$key]);
        }
    }
    if (!empty($messages)) {
        $modx->log(modX::LOG_LEVEL_ERROR, 'Не всем пользователям темы ' . $topic .  ' удалось отправить письма!');
        ... Добавляем опять в очередь необработанные соообщения
    }
}

Вот как-то так. Данная тема требует определённой квалификации и знаний. И мне кажется, что это решение легче освоить, чем писать свою реализацию очередей с нуля.

P.S. С видео я пока притормозил. Времени на них уходит минимум в 2 раза больше, чем на статью, а просмотров в разы меньше. Или MODX не очень популярная тема на Youtube, или я лыжи плохо обул. Так что ухожу в творческий отпуск. Буду ждать музу.

1   3486

Комментарии ()

  1. Семён 01 августа 2020 # +1
    Очень познавательно, спасибо!:v
    1. Александр 25 августа 2020 # +1
      Статья огонь! Спасибо, Серега.

      Или MODX не очень популярная тема на Youtube, или я лыжи плохо обул
      Как по мне, такие темы лучше прочитать, чем прослушать))
      1. Сергей Шлоков 26 августа 2020 # 0
        А мне интересно ещё и в живую смотреть как работает код.:r

      Вы должны авторизоваться, чтобы оставлять комментарии.

      Выделите опечатку и нажмите Ctrl + Enter, чтобы отправить сообщение об ошибке.