v2.5 Symfony Messenger Integration: CQRS and Async Message Processing

API Platform provides an integration with the Symfony Messenger Component.

This feature makes it convenient to implement the Command Query Responsibility Segregation (CQRS) pattern. It also makes it easy to send messages through the web API that will be consumed asynchronously.

Many transports are supported to dispatch messages to async consumers, including RabbitMQ, Apache Kafka, Amazon SQS and Google Pub/Sub.

Installing Symfony Messenger

To enable Messenger support, install the library:

 $ docker-compose exec php composer require messenger

Dispatching a Resource through the Message Bus

Set the messenger attribute to true, and API Platform will automatically dispatch the API Resource instance as a message using the message bus provided by the Messenger Component:

```php
<?php

// api/src/Entity/ResetPasswordRequest.php

namespace App\Entity;

use ApiPlatform\Core\Annotation\ApiResource;
use Symfony\Component\Validator\Constraints as Assert;

/**
 * @ApiResource(
 *     messenger=true,
 *     collectionOperations={
 *         "post"={"status"=202}
 *     },
 *     itemOperations={},
 *     output=false
 * )
 */
final class ResetPasswordRequest
{
    /**
     * @var string
     * @Assert\NotBlank
     */
    public $username;
}
```

```yaml
# api/config/api_platform/resources.yaml
resources:
  App\Entity\ResetPasswordRequest:
    collectionOperations:
      post:
        status: 202
    itemOperations: []
    attributes:
      messenger: true
      output: false
```

Because the messenger attribute is true, when a POST is handled by API Platform, the corresponding instance of the ResetPasswordRequest will be dispatched.

For this example, only the POST operation is enabled. We use the status attribute to configure API Platform to return a 202 Accepted HTTP status code. It indicates that the request has been received and will be processed later, without returning an immediate result to the client. Finally, the output attribute is set to false, so the HTTP response generated by API Platform will be empty and the serialization process will be skipped.

Note: using messenger=true or messenger="input" triggers a dedicated Messenger data persister. Therefore, any other built-in data persisters won’t be called (e.g. Doctrine data persister). If you want to send a message after saving an object (like a Doctrine entity), prefer creating a custom data persister to save your object and dispatch your own message manually.
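
The approach described in the note can be sketched as a data persister that decorates a built-in one. This is only an illustration under stated assumptions: the names NotifyingDataPersister and ResourceSaved are hypothetical, the decoration of the Doctrine data persister is assumed to be configured in the service container, and the resource is assumed not to set the messenger attribute (otherwise the dedicated Messenger data persister would be used instead).

```php
<?php

// api/src/DataPersister/NotifyingDataPersister.php (hypothetical file)

namespace App\DataPersister;

use ApiPlatform\Core\DataPersister\ContextAwareDataPersisterInterface;
use Symfony\Component\Messenger\MessageBusInterface;

/**
 * Hypothetical message describing an object that has just been saved.
 * It lives in the same file only to keep the sketch short.
 */
final class ResourceSaved
{
    public $resource;

    public function __construct(object $resource)
    {
        $this->resource = $resource;
    }
}

final class NotifyingDataPersister implements ContextAwareDataPersisterInterface
{
    private $decorated;
    private $bus;

    public function __construct(ContextAwareDataPersisterInterface $decorated, MessageBusInterface $bus)
    {
        $this->decorated = $decorated;
        $this->bus = $bus;
    }

    public function supports($data, array $context = []): bool
    {
        // Handle exactly what the decorated persister handles.
        return $this->decorated->supports($data, $context);
    }

    public function persist($data, array $context = [])
    {
        // Save first (e.g. through the Doctrine data persister)...
        $result = $this->decorated->persist($data, $context);
        $saved = \is_object($result) ? $result : $data;

        // ...then dispatch our own message manually.
        $this->bus->dispatch(new ResourceSaved($saved));

        return $saved;
    }

    public function remove($data, array $context = [])
    {
        return $this->decorated->remove($data, $context);
    }
}
```

A handler for ResourceSaved can then be registered like any other message handler.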

Registering a Message Handler

To process the message that will be dispatched, a handler must be created:

```php
<?php

// api/src/Handler/ResetPasswordRequestHandler.php

namespace App\Handler;

use App\Entity\ResetPasswordRequest;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class ResetPasswordRequestHandler implements MessageHandlerInterface
{
    public function __invoke(ResetPasswordRequest $forgotPassword)
    {
        // do something with the resource
    }
}
```

That’s all!

By default, the handler will process your message synchronously. If you want it to be consumed asynchronously (e.g. by a worker machine), configure a transport and the consumer.
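
As a rough sketch of what such a configuration might look like, the following uses Symfony's PHP configuration format. The file name, the transport name async and the MESSENGER_TRANSPORT_DSN environment variable are assumptions chosen for illustration; adapt them to your project and to the transport you use.

```php
<?php

// api/config/packages/messenger.php (hypothetical file)

use App\Entity\ResetPasswordRequest;
use Symfony\Component\DependencyInjection\Loader\Configurator\ContainerConfigurator;

return static function (ContainerConfigurator $container): void {
    $container->extension('framework', [
        'messenger' => [
            'transports' => [
                // "async" is an arbitrary transport name; the DSN is read
                // from an environment variable (assumed to be defined).
                'async' => '%env(MESSENGER_TRANSPORT_DSN)%',
            ],
            'routing' => [
                // Messages of this class are sent to the transport instead
                // of being handled during the HTTP request.
                ResetPasswordRequest::class => 'async',
            ],
        ],
    ]);
};
```

With a configuration along these lines, a worker can consume the messages using Symfony's messenger:consume console command, for example `bin/console messenger:consume async`.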

Accessing the Data Returned by the Handler

API Platform automatically uses the Symfony\Component\Messenger\Stamp\HandledStamp when it is set. This means that if you use a synchronous handler, the data returned by the __invoke method replaces the original data.

When multiple handlers are registered, the return value of the last handler is used as output. If no handler returns a value, make sure the resource configuration disables the output with output=false. Handler ordering can be configured using the priority attribute of the Messenger handler tag.
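
To illustrate, here is a minimal sketch of a synchronous handler whose return value becomes the data used by API Platform. The class name NormalizeUsernameHandler is hypothetical, and the sketch assumes that the resource is handled synchronously and does not set output=false.

```php
<?php

// api/src/Handler/NormalizeUsernameHandler.php (hypothetical file)

namespace App\Handler;

use App\Entity\ResetPasswordRequest;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class NormalizeUsernameHandler implements MessageHandlerInterface
{
    public function __invoke(ResetPasswordRequest $request): ResetPasswordRequest
    {
        // Normalize the submitted value.
        $request->username = strtolower(trim((string) $request->username));

        // With a synchronous handler, this return value is recorded in a
        // HandledStamp and replaces the original data.
        return $request;
    }
}
```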

Detecting Removals

When a DELETE operation occurs, API Platform automatically adds an ApiPlatform\Core\Bridge\Symfony\Messenger\RemoveStamp “stamp” instance to the “envelope”. To differentiate typical persist calls (create and update) from removal calls, check for the presence of this stamp using a custom “middleware”.
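
Such a middleware might look like the following sketch. The class name RemovalLoggingMiddleware and the choice to log removals through a PSR-3 logger are assumptions made for illustration; the middleware is also assumed to be registered on the message bus in the Messenger configuration.

```php
<?php

// api/src/Messenger/RemovalLoggingMiddleware.php (hypothetical file)

namespace App\Messenger;

use ApiPlatform\Core\Bridge\Symfony\Messenger\RemoveStamp;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

final class RemovalLoggingMiddleware implements MiddlewareInterface
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // The stamp is only present when the message comes from a DELETE operation.
        if (null !== $envelope->last(RemoveStamp::class)) {
            $this->logger->info('Resource removal dispatched.', [
                'class' => \get_class($envelope->getMessage()),
            ]);
        }

        // Always pass the envelope on to the next middleware.
        return $stack->next()->handle($envelope, $stack);
    }
}
```

Handlers that need to tell removals apart from persists could rely on a similar check, for instance by having the middleware route removals to a different message class.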

Using Messenger with an Input Object

Set the messenger attribute to input, and API Platform will automatically dispatch the given Input as a message instead of the Resource. To do so, it adds a default DataTransformer (see the input/output documentation) that handles the given input.

```php
<?php

// api/src/Entity/User.php

namespace App\Entity;

use ApiPlatform\Core\Annotation\ApiResource;
use App\Dto\ResetPasswordRequest;

/**
 * @ApiResource(
 *     collectionOperations={
 *         "post"={"status"=202}
 *     },
 *     itemOperations={},
 *     messenger="input",
 *     input=ResetPasswordRequest::class,
 *     output=false
 * )
 */
final class User
{
}
```

Where ResetPasswordRequest would be:

```php
<?php

// api/src/Dto/ResetPasswordRequest.php

namespace App\Dto;

use Symfony\Component\Validator\Constraints as Assert;

final class ResetPasswordRequest
{
    /**
     * @var string
     * @Assert\NotBlank
     */
    public $var;
}
```

For this example, only the POST operation is enabled on /users. As above, we use the status attribute to configure API Platform to return a 202 Accepted HTTP status code. It indicates that the request has been received and will be processed later, without returning an immediate result to the client. Finally, the output attribute is set to false, so the HTTP response generated by API Platform will be empty and the serialization process will be skipped.

In this case, when a POST request is issued on /users, the message handler will receive an App\Dto\ResetPasswordRequest object instead of a User, because we specified it as input and set messenger="input":

```php
<?php

// api/src/Handler/ResetPasswordRequestHandler.php

namespace App\Handler;

use App\Dto\ResetPasswordRequest;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class ResetPasswordRequestHandler implements MessageHandlerInterface
{
    public function __invoke(ResetPasswordRequest $forgotPassword)
    {
        // do something with the resource
    }
}
```

Copyright © 2023 Kévin Dunglas
