Drupal 9, 10 Queue Worker with Batch Processing and Guzzle API

Kushal Bansal
Aug 5, 2023

In this blog post, we will explore Drupal's Queue Worker API together with batch processing and the Guzzle HTTP client, using a custom module named ‘Custom Queue Worker’. The examples target Drupal 9.4 and later, including Drupal 10. We will fetch data from an external API endpoint with Guzzle and process it in chunks using batch operations. The Queue Worker API moves the work out of the page request and into the background, and Guzzle handles the HTTP requests, which keeps large-scale data updates manageable.

Example Module: ‘Custom Queue Worker’

To demonstrate this, we will create a custom module called ‘Custom Queue Worker’ that fetches data from an external API and processes it using batch operations.

Step 1: Module Setup

Let’s set up the module:

  1. Create a new folder named ‘custom_queue_worker’ inside the ‘modules/custom’ directory of your Drupal installation.
  2. Inside the ‘custom_queue_worker’ folder, create two files:
    a. ‘custom_queue_worker.info.yml’: This file contains the module metadata.
    b. ‘custom_queue_worker.module’: This file holds the module’s PHP code.
  3. Add the following code to ‘custom_queue_worker.info.yml’:
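name: 'Custom Queue Worker'
type: module
description: 'Demonstrates Drupal 9.4 Queue Worker with Batch Processing and Guzzle API Integration.'
core_version_requirement: ^9 || ^10
package: Custom

No ‘dependencies’ entry is needed here. The queue system and the HTTP client are services provided by Drupal core, not modules, and listing them as module dependencies would prevent the module from being enabled.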

Step 2: Create the Services Configuration

Next, let’s define the service configuration for the ‘Custom Queue Worker’ module.

  1. Create a new file named ‘custom_queue_worker.services.yml’ inside the ‘custom_queue_worker’ module folder.
  2. Add the following code to ‘custom_queue_worker.services.yml’:
services:
  custom_queue_worker.http_client:
    class: GuzzleHttp\Client
    factory: ['@http_client_factory', 'fromOptions']
    arguments:
      - base_uri: 'https://api.example.com'

  custom_queue_worker.batch_builder:
    class: Drupal\Core\Batch\BatchBuilder
    shared: false

The queue worker itself does not get a service entry. Queue workers are plugins, and Drupal discovers them through the @QueueWorker annotation on the class in ‘src/Plugin/QueueWorker’.

Step 3: Implement Batch Processing with Guzzle API Integration

Next, let’s implement the batch processing with Guzzle API integration:

  1. Add the following code to ‘custom_queue_worker.module’:
<?php

/**
 * @file
 * Hooks and callbacks for the Custom Queue Worker module.
 */

use Drupal\Core\Routing\RouteMatchInterface;

/**
 * Implements hook_help().
 */
function custom_queue_worker_help($route_name, RouteMatchInterface $route_match) {
  // ... (same as before) ...
}

// Note: hook_menu() no longer exists in Drupal 8 and later. Routes such as
// '/example/fetch_data' are declared in custom_queue_worker.routing.yml.

/**
 * Batch processing callback.
 */
function custom_queue_worker_run_batch() {
  // ... (same as before) ...
}

Step 4: Create the Queue Worker

Now, let’s create the Queue Worker with batch processing and Guzzle API integration:

  1. Create a new file named ‘CustomQueueWorkerQueueWorker.php’ inside the ‘src/Plugin/QueueWorker’ directory within the ‘custom_queue_worker’ module folder.
  2. Add the following code to ‘CustomQueueWorkerQueueWorker.php’:
<?php

namespace Drupal\custom_queue_worker\Plugin\QueueWorker;

use Drupal\Core\Batch\BatchBuilder;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\Exception\GuzzleException;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Fetches a range of API data and processes it with the Batch API.
 *
 * @QueueWorker(
 *   id = "custom_queue_worker_queue",
 *   title = @Translation("Custom Queue Worker Queue"),
 *   cron = {"time" = 60}
 * )
 */
class CustomQueueWorkerQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The HTTP client.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * The batch builder.
   *
   * @var \Drupal\Core\Batch\BatchBuilder
   */
  protected $batchBuilder;

  /**
   * Constructs a new CustomQueueWorkerQueueWorker object.
   *
   * @param array $configuration
   *   The plugin configuration.
   * @param string $plugin_id
   *   The plugin ID.
   * @param mixed $plugin_definition
   *   The plugin definition.
   * @param \GuzzleHttp\ClientInterface $http_client
   *   The Guzzle HTTP client.
   * @param \Drupal\Core\Batch\BatchBuilder $batch_builder
   *   The batch builder.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, ClientInterface $http_client, BatchBuilder $batch_builder) {
    // Plugins must pass their definition on to the parent constructor.
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->httpClient = $http_client;
    $this->batchBuilder = $batch_builder;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('custom_queue_worker.http_client'),
      $container->get('custom_queue_worker.batch_builder')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    $start = $data['start'];
    $end = $data['end'];

    $api_url = '/api/data/' . $start . '/' . $end;

    try {
      $response = $this->httpClient->request('GET', $api_url);
      $api_data = json_decode((string) $response->getBody(), TRUE);

      if ($response->getStatusCode() === 200 && is_array($api_data)) {
        // Clone the injected builder so operations added for earlier queue
        // items are not carried over into this batch.
        $batch_builder = clone $this->batchBuilder;
        $batch_builder
          ->setTitle(t('Processing API data from @start to @end', ['@start' => $start, '@end' => $end]))
          ->setFinishCallback([$this, 'batchFinished'])
          ->setInitMessage(t('Batch processing is starting.'))
          ->setProgressMessage(t('Processed @current out of @total items.'))
          ->setErrorMessage(t('An error occurred during batch processing.'))
          ->addOperation([$this, 'processAPIData'], [$api_data]);

        // Queue workers run without a browser, so the batch cannot use the
        // progress page. One way to handle this is to run it in a single
        // pass as a non-progressive batch.
        batch_set($batch_builder->toArray());
        $batch = &batch_get();
        $batch['progressive'] = FALSE;
        batch_process();
      }
    }
    catch (GuzzleException $e) {
      // The item is removed from the queue once processItem() returns.
      // Rethrow the exception instead if the item should be retried.
      \Drupal::logger('custom_queue_worker')->error('API request failed: @message', ['@message' => $e->getMessage()]);
    }
  }

  /**
   * Batch process API data.
   *
   * @param array $api_data
   *   The API data.
   * @param array|\ArrayAccess $context
   *   The batch context.
   */
  public function processAPIData(array $api_data, &$context) {
    // Number of items to process in each pass.
    $batch_size = 10;

    // On the first pass, initialise the counters kept in the sandbox.
    if (!isset($context['sandbox']['processed_items'])) {
      $context['sandbox']['processed_items'] = 0;
      $context['sandbox']['total_items'] = count($api_data);
    }

    $total = $context['sandbox']['total_items'];
    if ($total === 0) {
      // Nothing to process; avoid dividing by zero below.
      $context['finished'] = 1;
      return;
    }

    $start = $context['sandbox']['processed_items'];
    $end = min($start + $batch_size, $total);

    // Your logic to process items $start to $end - 1 of the API data here,
    // for example via array_slice($api_data, $start, $end - $start).
    // Example: Save data to entities, update nodes, etc.

    // Update batch progress.
    $context['sandbox']['processed_items'] = $end;
    $context['message'] = t('Processed @current out of @total items.', [
      '@current' => $end,
      '@total' => $total,
    ]);

    // Report the completed fraction; a value below 1 triggers another pass.
    $context['finished'] = $end / $total;
  }

  /**
   * Batch finished callback.
   *
   * @param bool $success
   *   TRUE if the batch processed successfully, otherwise FALSE.
   * @param array $results
   *   The batch results.
   * @param array $operations
   *   The batch operations that remained unprocessed.
   */
  public function batchFinished($success, array $results, array $operations) {
    // drupal_set_message() was removed in Drupal 9; use the messenger service.
    if ($success) {
      \Drupal::messenger()->addStatus(t('Batch processing completed successfully.'));
    }
    else {
      \Drupal::messenger()->addError(t('Batch processing encountered errors.'));
    }
  }

}
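
The sandbox pattern used by the batch operation can be tried outside Drupal. The following is a minimal sketch in plain PHP, not the Batch API itself: the function names and the 'seen' result key are made up for illustration, and the loop only imitates how a batch runner keeps calling an operation until it reports a finished value of 1.

```php
<?php

/**
 * Processes one chunk of $items per call, tracking progress in the sandbox.
 *
 * Mirrors the shape of a batch operation callback. The function name and the
 * 'seen' result key are illustrative assumptions.
 */
function example_process_chunk(array $items, array &$context, int $chunk_size = 10): void {
  // First call: initialise the counters that later calls rely on.
  if (!isset($context['sandbox']['processed_items'])) {
    $context['sandbox']['processed_items'] = 0;
    $context['sandbox']['total_items'] = count($items);
  }

  $total = $context['sandbox']['total_items'];
  if ($total === 0) {
    // Nothing to do; avoid dividing by zero below.
    $context['finished'] = 1;
    return;
  }

  $start = $context['sandbox']['processed_items'];
  foreach (array_slice($items, $start, $chunk_size) as $item) {
    // Stand-in for real work such as saving an entity.
    $context['results']['seen'][] = $item;
    $context['sandbox']['processed_items']++;
  }

  // A value below 1 asks the runner to call the operation again.
  $context['finished'] = $context['sandbox']['processed_items'] / $total;
}

/**
 * Imitates a batch runner: repeats the operation until it reports 1.
 */
function example_run_operation(array $items, int $chunk_size = 10): array {
  $context = ['sandbox' => [], 'results' => [], 'finished' => 1];
  $calls = 0;
  do {
    example_process_chunk($items, $context, $chunk_size);
    $calls++;
  } while ($context['finished'] < 1);

  return [
    'calls' => $calls,
    'processed' => count($context['results']['seen'] ?? []),
  ];
}

$summary = example_run_operation(range(1, 25));
printf("%d calls, %d items\n", $summary['calls'], $summary['processed']);
```

With 25 items and a chunk size of 10, the operation is called three times. The important detail is that the counters are initialised on the first call, because the sandbox starts out empty.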

Step 5: Enable the Module

To run our example, enable the ‘Custom Queue Worker’ module through the Drupal admin interface or by using Drush: drush en custom_queue_worker.

Step 6: Run the Fetch Data Example

To run the example, follow these steps:

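  1. Visit the URL ‘/example/fetch_data’ on your Drupal site while logged in as an administrator. This route is not defined by the code shown in the earlier steps; it needs an entry in ‘custom_queue_worker.routing.yml’ and a controller that adds items to the ‘custom_queue_worker_queue’ queue.
  2. A message will be displayed, indicating that “Fetch Data Example with Guzzle API Integration is processing in the background. You will be notified once it completes.”
  3. The queued items are processed on the next cron run, for up to 60 seconds per run as set by the ‘cron’ key in the plugin annotation. To process them immediately, run drush queue:run custom_queue_worker_queue. Errors are written to the Drupal log (Reports > Recent log messages).
  4. The batch process will handle the data received from the external API, allowing you to process and save data to entities or update nodes, depending on the specific use case.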
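
For the worker to have anything to process, something has to add items to the queue. The sketch below shows one possible way to build the ‘start’ and ‘end’ payloads that processItem() reads. The helper name, the totals, and the assumption that ‘end’ is exclusive are all hypothetical; the real API's paging rules may differ. The Drupal calls are left as comments so the helper can run on its own.

```php
<?php

/**
 * Splits a record range into queue item payloads.
 *
 * Each payload carries the 'start' and 'end' keys that processItem() reads.
 * Treating 'end' as exclusive is an assumption about the example API.
 */
function example_build_queue_items(int $total, int $page_size): array {
  if ($page_size < 1) {
    throw new InvalidArgumentException('Page size must be at least 1.');
  }

  $items = [];
  for ($start = 0; $start < $total; $start += $page_size) {
    $items[] = [
      'start' => $start,
      'end' => min($start + $page_size, $total),
    ];
  }
  return $items;
}

// Inside Drupal (for example in a controller behind /example/fetch_data),
// each payload would be handed to the queue named in the plugin annotation:
//
//   $queue = \Drupal::queue('custom_queue_worker_queue');
//   foreach (example_build_queue_items(250, 100) as $item) {
//     $queue->createItem($item);
//   }

print json_encode(example_build_queue_items(250, 100)) . "\n";
```

Keeping each payload small means a single failed API request only affects one range, and cron can stop between items when its time limit is reached.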

Conclusion:

In this blog post, we explored Drupal 9.4 Queue Worker with Batch Processing and Guzzle API Integration. Using the Guzzle HTTP client, we fetched data from an external API endpoint and processed it in chunks using batch operations. The Queue Worker API moved the work into the background, so Drupal can handle large-scale data updates without affecting the main request/response cycle.

With the ‘Custom Queue Worker’ module as a practical example, we demonstrated how Drupal’s flexible architecture and powerful APIs facilitate seamless integration of external services, unlocking the full potential of your Drupal-powered websites and applications.

As you explore and experiment with Drupal’s capabilities, remember that the possibilities are endless. Whether it’s integrating with external APIs, handling complex data processing, or delivering exceptional user experiences, Drupal empowers you to build robust, scalable, and future-proof solutions. Happy coding!

Kushal Bansal

I am a Software Developer. I enjoy working in frontend and backend technologies. https://www.drupal.org/u/kushal-bansal