Asynchronous processing


1 Introduction

Temma offers Asynk, a simple way of performing asynchronous processing. This system is based on the use of the dependency injection component, extending it so that calls to objects managed by the component are processed asynchronously.

Example:

// synchronous call via loader
$this->_loader->MyObject->myMethod($param1, $param2);

// identical asynchronous call
$this->_loader->asynk->MyObject->myMethod($param1, $param2);

As with the dependency injection component, it is possible to use array notation to execute an object placed in a namespace:

// asynchronous call
$this->_loader->asynk['\My\Name\Space\MyObject']->myMethod($param1, $param2);
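
As an illustration, here is a minimal sketch of a complete asynchronous call. The \MyApp\MailSender class and its sendWelcome() method are hypothetical; the only requirements are that the object can be loaded by the dependency injection component and that its parameters can be serialized, since the method is executed later by a separate process and its return value is therefore not available to the caller.

<?php

namespace MyApp;

// Hypothetical business object: any class loadable through the
// dependency injection component can be used as an asynchronous target.
class MailSender {
    public function sendWelcome(string $email, string $name) : void {
        // This code runs later, in the worker/crontab/xinetd process,
        // not in the process that queued the call.
        mail($email, 'Welcome', "Hello $name, welcome aboard!");
    }
}

The call is then queued from a controller (or any object with access to the loader) like any other Asynk call:

// queue the task; the method will be executed asynchronously
$this->_loader->asynk['\MyApp\MailSender']->sendWelcome('user@example.com', 'Alice');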

Tasks can be executed in several different ways:

  • by Temma workers (programs running in the background)
  • by a Temma script run by crontab every minute
  • by a Temma script run by xinetd each time a task is launched

For worker or crontab execution, tasks can be retrieved from a message queue (Beanstalkd or AWS SQS) or a database (MySQL or Redis). For xinetd execution, only database storage (MySQL or Redis) is possible.

If a message queue is used, task data can be stored in one of two ways: either directly in the queue messages, or in a separate database (MySQL or Redis). Using a database can be useful if tasks require more data than the queue can store (64 KB for Beanstalkd, 256 KB for AWS SQS).

Beanstalkd is the recommended message queue for running tasks with Asynk.
You can also use an Amazon SQS queue, but this requires workers to connect regularly to check whether any jobs are waiting, and each request made is billed.

The simplest solution is to store tasks in a database and execute them via crontab. For fast, near-real-time execution of tasks, we recommend adding execution by an xinetd server.


2 Configuration

2.1 Principle

Configuration is based on an extended x-asynk configuration in the etc/temma.php file, containing two parameters:

  • transport: the name of the data source used to transmit tasks.
    Behavior will differ depending on the type of data source:
    • undefined: processing by crontab or worker with MySQL or Redis storage
    • socket: processed by xinetd with MySQL or Redis storage
    • Beanstalk: processed by the Beanstalkd message queue
    • SQS: processed by the AWS SQS message queue
  • storage: the name of the data source that will store messages until they are processed.
    Here again, the behavior depends on the type of source:
    • MySQL: storage in a MySQL database.
    • Redis: storage in a Redis database.
    • Beanstalk: when processed by a Beanstalkd message queue.
    • SQS: when processed by an AWS SQS message queue.

Depending on the value, either parameter may be optional.

Here are the possible combinations:

transport    storage    Processing
(undefined)  MySQL      Processing with crontab or workers
(undefined)  Redis      Processing with crontab or workers
socket       MySQL      Processed by xinetd (+ optional crontab)
socket       Redis      Processed by xinetd (+ optional crontab)
Beanstalk    Beanstalk  Processing via workers, on tasks stored in the message queue
SQS          SQS        Processing via workers, on tasks stored in the message queue
Beanstalk    MySQL      Processing via workers, on tasks stored in a database (useful if data is too large to be stored in the queue)
Beanstalk    Redis      Processing via workers, on tasks stored in a database
SQS          MySQL      Processing via workers, on tasks stored in a database
SQS          Redis      Processing via workers, on tasks stored in a database

Asynk writes log messages using the Temma/Asynk log class.

Here's an example configuration file:

<?php

return [
    'application' => [
        // data sources
        'dataSources' => [
            // MySQL database
            'db'  => 'mysql://user:pwd@host/database',
            // Amazon SQS message queue
            'sqs' => 'sqs://AKXYZ:PWD@QUEUE_URL',
        ]
    ],
    // log message thresholds
    'loglevels' => [
        'Temma/Base'  => 'ERR',
        'Temma/Web'   => 'WARN',
        'Temma/Asynk' => 'NOTE',
    ],
    // Asynk configuration
    'x-asynk' => [
        // transport: SQS queue
        'transport' => 'sqs',
        // storage: MySQL database
        'storage'   => 'db'
    ],
];

2.2 MySQL storage

If you choose to store your tasks in a MySQL database, you'll need to create the table containing the tasks.

Here's the query for creating this table:

CREATE TABLE Task (
    id             INT UNSIGNED NOT NULL AUTO_INCREMENT,
    dateCreation   DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    dateUpdate     DATETIME ON UPDATE CURRENT_TIMESTAMP,
    status         ENUM('waiting', 'reserved', 'processing', 'error') NOT NULL DEFAULT 'waiting',
    token          CHAR(16) CHARACTER SET ascii COLLATE ascii_general_ci,
    target         TINYTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    action         TINYTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    data           MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
    PRIMARY KEY (id),
    INDEX status (status),
    INDEX token (token)
);

The primary key (id field) can be of type BIGINT if you need to manage more than 4 billion tasks.
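
If the table already exists, the column type can be changed afterwards with a standard MySQL statement, for example:

ALTER TABLE Task MODIFY id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT;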

If the table is stored in a different database from the one used by the connection, if it is named something other than Task, or if its fields have other names, this can be specified in the x-asynk extended configuration in etc/temma.php:

<?php

return [
    'x-asynk' => [
        'base'   => 'asynk_db',
        'table'  => 'asynk_tasks',
        'id'     => 'task_id',
        'status' => 'task_status',
        'token'  => 'task_token',
        'target' => 'task_target',
        'action' => 'task_action',
        'data'   => 'task_data',
    ]
];

  • base: Name of the database containing the table.
  • table: Name of the table.
  • id: Name of the field containing the primary key.
  • status: Name of the field containing the status.
  • token: Name of the field containing the reservation token.
  • target: Name of the field containing the target object.
  • action: Name of the field containing the target method.
  • data: Name of the field containing the serialized parameters.

It is also possible to use a custom DAO:

<?php

return [
    'x-asynk' => [
        'dao' => '\MyApp\AsynkDao'
    ]
];

3 Crontab processing

3.1 Introduction to crontab

Crontab processing is very simple to set up. The cron daemon is installed (or easily installable) on all Unix systems, and it does not depend on any other software such as a message queue.

Crontab processing runs every minute, which can introduce a slight delay before tasks are executed. If you need latency-free processing, you can add xinetd processing (see below).


3.2 Temma configuration for crontab

The storage used to save tasks must be declared in the extended x-asynk configuration.

Example file etc/temma.php, with Redis storage:

<?php

return [
    'application' => [
        'dataSources' => [
            'ndb' => 'redis://localhost'
        ]
    ],
    'x-asynk' => [
        'storage'   => 'ndb'
    ],
];

3.3 Crontab configuration

There are two ways of configuring the crontab:

  • Modify the contents of the file etc/asynk/crontab to adapt the path to the root of your project.
    Then copy this file to /etc/cron.d/ (under a suitable name), for example with the command:
    sudo cp /path/to/project/etc/asynk/crontab /etc/cron.d/my_project
  • Or add the following line (adapting the path) to the crontab of the desired user:
    * * * * *    cd /path/to/project/; bin/comma AsynkWorker crontab
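
Note that with the first method, a file placed in /etc/cron.d/ must also specify the user account that runs the command. The www-data user below is only an example; use whichever account your project runs under:

* * * * *    www-data    cd /path/to/project/; bin/comma AsynkWorker crontab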

4 Xinetd processing

4.1 Introducing xinetd

Xinetd is a “super-daemon” that listens on several network ports. Each time it receives an incoming connection, it launches the associated program and handles the network exchanges for it.
For Asynk management, xinetd listens on port 11137 by default.

When Asynk uses xinetd, asynchronous tasks are processed immediately. However, there are two things to bear in mind:

  • If you need to handle a very large number of simultaneous tasks, xinetd will show its limits. In this case, we recommend using a message queue (Beanstalkd or SQS).
  • Xinetd may occasionally be unable to handle certain tasks (because the daemon is not running or is saturated). It is therefore advisable to combine xinetd processing with crontab processing (see above), so that tasks missed by xinetd are picked up by the crontab.

4.2 Temma configuration for xinetd

In the configuration, a socket data source is required, which Asynk will use to connect to xinetd. The xinetd server can be on the local server or on a remote machine. By default, the connection port used by Asynk is 11137.

Example file etc/temma.php, with MySQL storage:

<?php

return [
    'application' => [
        'dataSources' => [
            'sock' => 'tcp://localhost:11137',
            'db'   => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'transport' => 'sock',
        'storage'   => 'db'
    ],
];

4.3 Xinetd configuration

To configure xinetd, edit the file etc/asynk/xinetd to adapt the path, then copy it to the /etc/xinetd.d/ directory (under a suitable name), for example with the command:

sudo cp /path/to/project/etc/asynk/xinetd /etc/xinetd.d/my_project


4.4 Crontab configuration (optional)

Using the crontab in addition to xinetd ensures that all tasks are processed, even if xinetd isn't running or is saturated.

There are two ways of configuring the crontab:

  • Modify the contents of the file etc/asynk/crontab to adapt the path to the root of your project.
    Then copy this file to /etc/cron.d/ (under a suitable name), for example with the command:
    sudo cp /path/to/project/etc/asynk/crontab /etc/cron.d/my_project
  • Or add the following line (adapting the path) to the crontab of the desired user:
    * * * * *    cd /path/to/project/; bin/comma AsynkWorker crontab

5 Processing by workers

5.1 Introduction to workers

Workers are programs that run in the background. You can run as many workers as you like. If only one worker is running, it can be considered a processing daemon.

Workers connect to the data source (message queue or database) to retrieve the tasks to be performed. They retrieve tasks one by one, and process them sequentially. If you have a large number of tasks to process, it's best to have several workers running in parallel, otherwise tasks may pile up faster than they can be processed.

It's important to ensure that a minimum number of workers are running at all times, so that tasks do not go unprocessed. You can use a process supervisor such as Supervisord, which automatically restarts workers whenever fewer than the configured number of instances are running.
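
For a quick test, a worker can also be started by hand from the root of the project, using the same command that Supervisor runs (see section 5.3 below); it should then keep running in the foreground and process tasks until it is stopped:

cd /path/to/project/; bin/comma AsynkWorker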


5.2 Temma configuration for workers

Temma configuration must contain information on the storage and, where applicable, the transport of tasks.

Example file etc/temma.php, with Beanstalkd transport and MySQL storage:

<?php

return [
    'application' => [
        'dataSources' => [
            'beanstalk' => 'beanstalk://localhost:11300',
            'db'        => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'transport' => 'beanstalk',
        'storage'   => 'db'
    ],
];

Another example of an etc/temma.php file, with SQS transport and storage:

<?php

return [
    'application' => [
        'dataSources' => [
            'sqs' => 'sqs://AKXYZ:PWD@sqs.eu-west-3.amazonaws.com/123456789012/queue_name'
        ]
    ],
    'x-asynk' => [
        'transport' => 'sqs',
        'storage'   => 'sqs'
    ],
];

By default, polling workers wait 60 seconds between two connections to retrieve pending tasks. This applies to workers connecting to an Amazon SQS queue or to a MySQL or Redis database; it does not apply to Beanstalkd queues.

This delay can be modified using the loopDelay parameter in the x-asynk extended configuration:

<?php

return [
    'application' => [
        'dataSources' => [
            'db'        => 'mysql://user:password@localhost'
        ]
    ],
    'x-asynk' => [
        'storage'   => 'db',
        // 90-second delay between two checks
        'loopDelay' => 90
    ],
];

5.3 Supervisor configuration

The use of Supervisor is optional, but it can be useful to ensure that workers are running, and that they are restarted if a problem occurs.

Copy the file etc/asynk/supervisor.conf to /etc/supervisor/conf.d/asynk.conf with the following command:

sudo cp /path/to/project/etc/asynk/supervisor.conf /etc/supervisor/conf.d/asynk.conf

Then modify the following parameters in the file (an indicative sketch of the result is shown after the list):

  • command: Enter the correct path to the root of your project.
    Example: command=/path/to/project/bin/comma AsynkWorker
  • numprocs: Set the number of workers to be executed at the same time.
    Example: numprocs=5
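
As an indication, the resulting file could look like the sketch below. Only the command and numprocs values come from the instructions above; the program name and the other directives (process_name, autostart, autorestart) are ordinary Supervisor settings given here as assumptions, not a copy of the file shipped in etc/asynk/supervisor.conf:

; hypothetical /etc/supervisor/conf.d/asynk.conf
[program:asynk]
; path adapted to the root of your project
command=/path/to/project/bin/comma AsynkWorker
; number of workers running in parallel
numprocs=5
; required by Supervisor when numprocs > 1
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true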

Then force Supervisor to take this configuration into account:

sudo supervisorctl reread
sudo supervisorctl update