Skip to content

Add support to set queue options, either globally using queue.php or by implementing an interface #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 8, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog][keepachangelog] and this project adher
### Added

- gRPC client support
- Added support to set queue options globally or per job [#158]

## Unreleased

Expand Down
8 changes: 8 additions & 0 deletions src/Queue/Contract/HasQueueOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Spiral\RoadRunnerLaravel\Queue\Contract;

interface HasQueueOptions
{
function queueOptions(): array;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add explicit visibility modifier and return type declaration

While interface methods are implicitly public in PHP, it's a best practice to explicitly declare the visibility modifier. Additionally, add a return type declaration for better type safety.

 interface HasQueueOptions
 {
-    function queueOptions(): array;
+    public function queueOptions(): array;
 }

Also consider adding PHPDoc to document the expected structure of the returned array:

interface HasQueueOptions
{
    /**
     * Get queue options for this job.
     * 
     * @return array{
     *     driver?: string,
     *     delay?: int,
     *     priority?: int,
     *     auto_ack?: bool,
     *     topic?: string
     * }
     */
    public function queueOptions(): array;
}
🤖 Prompt for AI Agents
In src/Queue/Contract/HasQueueOptions.php lines 5 to 8, explicitly add the
public visibility modifier to the queueOptions method and declare its return
type as array. Additionally, include a PHPDoc block above the method to document
the expected structure of the returned array, specifying optional keys like
driver, delay, priority, auto_ack, and topic with their respective types for
better clarity and type safety.

1 change: 1 addition & 0 deletions src/Queue/RoadRunnerConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public function connect(array $config): Queue
new Jobs($rpc),
$rpc,
$config['queue'],
$config['options'] ?? [],
);
}
}
44 changes: 38 additions & 6 deletions src/Queue/RoadRunnerQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@
use RoadRunner\Jobs\DTO\V1\Stats;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner\Jobs\KafkaOptions;
use Spiral\RoadRunner\Jobs\Options;
use Spiral\RoadRunner\Jobs\OptionsInterface;
use Spiral\RoadRunner\Jobs\Queue\Driver;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunnerLaravel\Queue\Contract\HasQueueOptions;

final class RoadRunnerQueue extends Queue implements QueueContract
{
public function __construct(
private readonly Jobs $jobs,
private readonly RPCInterface $rpc,
private readonly string $default = 'default',
private readonly array $defaultOptions = [],
) {}

public function push($job, $data = '', $queue = null): string
Expand All @@ -29,13 +35,13 @@ public function push($job, $data = '', $queue = null): string
$this->createPayload($job, $queue, $data),
$queue,
null,
fn($payload, $queue) => $this->pushRaw($payload, $queue),
fn($payload, $queue) => $this->pushRaw($payload, $queue, $this->getJobOverrideOptions($job)),
);
}

public function pushRaw($payload, $queue = null, array $options = []): string
{
$queue = $this->getQueue($queue);
$queue = $this->getQueue($queue, $options);

$task = $queue->dispatch(
$queue
Expand All @@ -52,7 +58,7 @@ public function later($delay, $job, $data = '', $queue = null): string
$this->createPayload($job, $queue, $data),
$queue,
$delay,
fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue),
fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)),
);
}

Expand Down Expand Up @@ -88,8 +94,9 @@ private function laterRaw(
\DateTimeInterface|\DateInterval|int $delay,
array $payload,
?string $queue = null,
array $options = []
): string {
$queue = $this->getQueue($queue);
$queue = $this->getQueue($queue, $options);

$task = $queue->dispatch(
$queue
Expand All @@ -101,9 +108,9 @@ private function laterRaw(
return $task->getId();
}

private function getQueue(?string $queue = null): QueueInterface
private function getQueue(?string $queue = null, array $options = []): QueueInterface
{
$queue = $this->jobs->connect($queue ?? $this->default);
$queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options));

if (!$this->getStats($queue->getName())->getReady()) {
$queue->resume();
Expand All @@ -127,4 +134,29 @@ private function getStats(?string $queue = null): Stat

return new Stat();
}

private function getJobOverrideOptions(string|object $job): array
{
if ($job instanceof HasQueueOptions) {
return $job->queueOptions();
}

return [];
}

private function getQueueOptions(array $overrides = []): OptionsInterface
{
$config = array_merge($this->defaultOptions, $overrides);
$options = new Options(
$config['delay'] ?? 0,
$config['priority'] ?? 0,
$config['auto_ack'] ?? false
);

return match ($config['driver'] ?? null) {
Driver::Kafka => KafkaOptions::from($options)
->withTopic($config['topic'] ?? 'default'),
default => $options,
};
}
}