From 5e4a4453b6f06b6db3d754a71d093285b159a606 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 12:31:23 +0200 Subject: [PATCH 1/7] feat: enhance RoadRunnerQueue and RoadRunnerConnector with options handling --- src/Queue/RoadRunnerConnector.php | 20 ++++++++++++++++++++ src/Queue/RoadRunnerQueue.php | 4 +++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/Queue/RoadRunnerConnector.php b/src/Queue/RoadRunnerConnector.php index 86cffbd..70c8f4b 100644 --- a/src/Queue/RoadRunnerConnector.php +++ b/src/Queue/RoadRunnerConnector.php @@ -10,6 +10,10 @@ use Spiral\Goridge\RPC\RPC; use Spiral\RoadRunner\Environment; use Spiral\RoadRunner\Jobs\Jobs; +use Spiral\RoadRunner\Jobs\KafkaOptions; +use Spiral\RoadRunner\Jobs\Options; +use Spiral\RoadRunner\Jobs\OptionsInterface; +use Spiral\RoadRunner\Jobs\Queue\Driver; final class RoadRunnerConnector implements ConnectorInterface { @@ -25,7 +29,23 @@ public function connect(array $config): Queue return new RoadRunnerQueue( new Jobs($rpc), $rpc, + $this->getOptions($config['options'] ?? []), $config['queue'], ); } + + private function getOptions(array $config): OptionsInterface + { + $options = new Options( + $config['delay'] ?? 0, + $config['priority'] ?? 0, + $config['auto_ack'] ?? false + ); + + return match ($config['driver'] ?? null) { + Driver::Kafka => KafkaOptions::from($options) + ->withTopic($config['topic'] ?? 'default'), + default => $options, + }; + } } diff --git a/src/Queue/RoadRunnerQueue.php b/src/Queue/RoadRunnerQueue.php index 23d3ff7..6253249 100644 --- a/src/Queue/RoadRunnerQueue.php +++ b/src/Queue/RoadRunnerQueue.php @@ -12,6 +12,7 @@ use RoadRunner\Jobs\DTO\V1\Stats; use Spiral\Goridge\RPC\RPCInterface; use Spiral\RoadRunner\Jobs\Jobs; +use Spiral\RoadRunner\Jobs\OptionsInterface; use Spiral\RoadRunner\Jobs\QueueInterface; final class RoadRunnerQueue extends Queue implements QueueContract @@ -19,6 +20,7 @@ final class RoadRunnerQueue extends Queue implements QueueContract public function __construct( private readonly Jobs $jobs, private readonly RPCInterface $rpc, + private readonly OptionsInterface $options, private readonly string $default = 'default', ) {} @@ -103,7 +105,7 @@ private function laterRaw( private function getQueue(?string $queue = null): QueueInterface { - $queue = $this->jobs->connect($queue ?? $this->default); + $queue = $this->jobs->connect($queue ?? $this->default, $this->options); if (!$this->getStats($queue->getName())->getReady()) { $queue->resume(); From c26cf5660177eae7c5bacf0372f1edfa2237cc99 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 13:02:21 +0200 Subject: [PATCH 2/7] feat: implement HasQueueOptions interface and refactor RoadRunnerQueue for options handling --- src/Queue/Contract/HasQueueOptions.php | 8 +++++ src/Queue/RoadRunnerConnector.php | 21 +----------- src/Queue/RoadRunnerQueue.php | 44 ++++++++++++++++++++++---- 3 files changed, 46 insertions(+), 27 deletions(-) create mode 100644 src/Queue/Contract/HasQueueOptions.php diff --git a/src/Queue/Contract/HasQueueOptions.php b/src/Queue/Contract/HasQueueOptions.php new file mode 100644 index 0000000..9e8ce13 --- /dev/null +++ b/src/Queue/Contract/HasQueueOptions.php @@ -0,0 +1,8 @@ +getOptions($config['options'] ?? []), $config['queue'], + $config['options'] ?? [], ); } - - private function getOptions(array $config): OptionsInterface - { - $options = new Options( - $config['delay'] ?? 0, - $config['priority'] ?? 0, - $config['auto_ack'] ?? false - ); - - return match ($config['driver'] ?? null) { - Driver::Kafka => KafkaOptions::from($options) - ->withTopic($config['topic'] ?? 'default'), - default => $options, - }; - } } diff --git a/src/Queue/RoadRunnerQueue.php b/src/Queue/RoadRunnerQueue.php index 6253249..255f202 100644 --- a/src/Queue/RoadRunnerQueue.php +++ b/src/Queue/RoadRunnerQueue.php @@ -12,16 +12,20 @@ use RoadRunner\Jobs\DTO\V1\Stats; use Spiral\Goridge\RPC\RPCInterface; use Spiral\RoadRunner\Jobs\Jobs; +use Spiral\RoadRunner\Jobs\KafkaOptions; +use Spiral\RoadRunner\Jobs\Options; use Spiral\RoadRunner\Jobs\OptionsInterface; +use Spiral\RoadRunner\Jobs\Queue\Driver; use Spiral\RoadRunner\Jobs\QueueInterface; +use Spiral\RoadRunnerLaravel\Queue\Contract\HasQueueOptions; final class RoadRunnerQueue extends Queue implements QueueContract { public function __construct( private readonly Jobs $jobs, private readonly RPCInterface $rpc, - private readonly OptionsInterface $options, private readonly string $default = 'default', + private readonly array $defaultOptions = [], ) {} public function push($job, $data = '', $queue = null): string @@ -31,13 +35,13 @@ public function push($job, $data = '', $queue = null): string $this->createPayload($job, $queue, $data), $queue, null, - fn($payload, $queue) => $this->pushRaw($payload, $queue), + fn($payload, $queue) => $this->pushRaw($payload, $queue, $this->getJobOverrideOptions($job)), ); } public function pushRaw($payload, $queue = null, array $options = []): string { - $queue = $this->getQueue($queue); + $queue = $this->getQueue($queue, $options); $task = $queue->dispatch( $queue @@ -54,7 +58,7 @@ public function later($delay, $job, $data = '', $queue = null): string $this->createPayload($job, $queue, $data), $queue, $delay, - fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue), + fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)), ); } @@ -90,8 +94,9 @@ private function laterRaw( \DateTimeInterface|\DateInterval|int $delay, array $payload, ?string $queue = null, + array $options = [] ): string { - $queue = $this->getQueue($queue); + $queue = $this->getQueue($queue, $options); $task = $queue->dispatch( $queue @@ -103,9 +108,9 @@ private function laterRaw( return $task->getId(); } - private function getQueue(?string $queue = null): QueueInterface + private function getQueue(?string $queue = null, array $options = []): QueueInterface { - $queue = $this->jobs->connect($queue ?? $this->default, $this->options); + $queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options)); if (!$this->getStats($queue->getName())->getReady()) { $queue->resume(); @@ -129,4 +134,29 @@ private function getStats(?string $queue = null): Stat return new Stat(); } + + private function getJobOverrideOptions(string|object $job): array + { + if ($job instanceof HasQueueOptions) { + return $job->queueOptions(); + } + + return []; + } + + private function getQueueOptions(array $overrides = []): OptionsInterface + { + $config = array_merge($this->defaultOptions, $overrides); + $options = new Options( + $config['delay'] ?? 0, + $config['priority'] ?? 0, + $config['auto_ack'] ?? false + ); + + return match ($config['driver'] ?? null) { + Driver::Kafka => KafkaOptions::from($options) + ->withTopic($config['topic'] ?? 'default'), + default => $options, + }; + } } From 3e5149db9115e665040034d7b3d77ea0579dc882 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 13:10:32 +0200 Subject: [PATCH 3/7] update CHANGELOG to include changes --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5b1fa..167c72c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog][keepachangelog] and this project adher ### Added - gRPC client support +- Added support to set queue options globally or per job [#158] ## Unreleased From 6bd9276826bc5303891b410f423033f5f8768999 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 13:25:26 +0200 Subject: [PATCH 4/7] fix: change queueOptions method visibility to public in HasQueueOptions interface --- src/Queue/Contract/HasQueueOptions.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/Contract/HasQueueOptions.php b/src/Queue/Contract/HasQueueOptions.php index 9e8ce13..941ffd2 100644 --- a/src/Queue/Contract/HasQueueOptions.php +++ b/src/Queue/Contract/HasQueueOptions.php @@ -4,5 +4,5 @@ interface HasQueueOptions { - function queueOptions(): array; + public function queueOptions(): array; } From 70048d1eca862a2a081e482d2cb8d1f062e96b53 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 13:28:34 +0200 Subject: [PATCH 5/7] feat: handle cases where $job is a string --- src/Queue/RoadRunnerQueue.php | 137 ++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 64 deletions(-) diff --git a/src/Queue/RoadRunnerQueue.php b/src/Queue/RoadRunnerQueue.php index 255f202..7aa1af6 100644 --- a/src/Queue/RoadRunnerQueue.php +++ b/src/Queue/RoadRunnerQueue.php @@ -26,7 +26,8 @@ public function __construct( private readonly RPCInterface $rpc, private readonly string $default = 'default', private readonly array $defaultOptions = [], - ) {} + ) { + } public function push($job, $data = '', $queue = null): string { @@ -51,40 +52,75 @@ public function pushRaw($payload, $queue = null, array $options = []): string return $task->getId(); } - public function later($delay, $job, $data = '', $queue = null): string + private function getQueue(?string $queue = null, array $options = []): QueueInterface { - return $this->enqueueUsing( - $job, - $this->createPayload($job, $queue, $data), - $queue, - $delay, - fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)), - ); + $queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options)); + + if (!$this->getStats($queue->getName())->getReady()) { + $queue->resume(); + } + + return $queue; } - public function pop($queue = null): void + private function getQueueOptions(array $overrides = []): OptionsInterface { - throw new \BadMethodCallException('Pop is not supported'); + $config = array_merge($this->defaultOptions, $overrides); + $options = new Options( + $config['delay'] ?? 0, + $config['priority'] ?? 0, + $config['auto_ack'] ?? false + ); + + return match ($config['driver'] ?? null) { + Driver::Kafka => KafkaOptions::from($options) + ->withTopic($config['topic'] ?? 'default'), + default => $options, + }; } - public function size($queue = null): int + private function getStats(?string $queue = null): Stat { - $stats = $this->getStats($queue); + $queue ??= $this->default; - return $stats->getActive() + $stats->getDelayed(); + $stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats(); + + /** @var Stat $stat */ + foreach ($stats as $stat) { + if ($stat->getPipeline() === $queue) { + return $stat; + } + } + + return new Stat(); } - /** - * Get the "available at" UNIX timestamp. - * @param mixed $delay - */ - protected function availableAt($delay = 0): int + private function getJobOverrideOptions(string|object $job): array { - $delay = $this->parseDateInterval($delay); + if (is_string($job) && class_exists($job)) { + $job = app($job); - return $delay instanceof \DateTimeInterface - ? Carbon::parse($delay)->diffInSeconds() - : $delay; + if ($job instanceof HasQueueOptions) { + return $job->queueOptions(); + } + } + + if ($job instanceof HasQueueOptions) { + return $job->queueOptions(); + } + + return []; + } + + public function later($delay, $job, $data = '', $queue = null): string + { + return $this->enqueueUsing( + $job, + $this->createPayload($job, $queue, $data), + $queue, + $delay, + fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)), + ); } /** @@ -108,55 +144,28 @@ private function laterRaw( return $task->getId(); } - private function getQueue(?string $queue = null, array $options = []): QueueInterface - { - $queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options)); - - if (!$this->getStats($queue->getName())->getReady()) { - $queue->resume(); - } - - return $queue; - } - - private function getStats(?string $queue = null): Stat + /** + * Get the "available at" UNIX timestamp. + * @param mixed $delay + */ + protected function availableAt($delay = 0): int { - $queue ??= $this->default; - - $stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats(); - - /** @var Stat $stat */ - foreach ($stats as $stat) { - if ($stat->getPipeline() === $queue) { - return $stat; - } - } + $delay = $this->parseDateInterval($delay); - return new Stat(); + return $delay instanceof \DateTimeInterface + ? Carbon::parse($delay)->diffInSeconds() + : $delay; } - private function getJobOverrideOptions(string|object $job): array + public function pop($queue = null): void { - if ($job instanceof HasQueueOptions) { - return $job->queueOptions(); - } - - return []; + throw new \BadMethodCallException('Pop is not supported'); } - private function getQueueOptions(array $overrides = []): OptionsInterface + public function size($queue = null): int { - $config = array_merge($this->defaultOptions, $overrides); - $options = new Options( - $config['delay'] ?? 0, - $config['priority'] ?? 0, - $config['auto_ack'] ?? false - ); + $stats = $this->getStats($queue); - return match ($config['driver'] ?? null) { - Driver::Kafka => KafkaOptions::from($options) - ->withTopic($config['topic'] ?? 'default'), - default => $options, - }; + return $stats->getActive() + $stats->getDelayed(); } } From b82fdb514a95c1cd50046124fbff9ccea4b9ed0d Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Wed, 23 Jul 2025 13:31:57 +0200 Subject: [PATCH 6/7] feat: update options handling in RoadRunnerQueue to use default values from OptionsInterface --- src/Queue/RoadRunnerQueue.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Queue/RoadRunnerQueue.php b/src/Queue/RoadRunnerQueue.php index 7aa1af6..6c5bf0a 100644 --- a/src/Queue/RoadRunnerQueue.php +++ b/src/Queue/RoadRunnerQueue.php @@ -13,6 +13,7 @@ use Spiral\Goridge\RPC\RPCInterface; use Spiral\RoadRunner\Jobs\Jobs; use Spiral\RoadRunner\Jobs\KafkaOptions; +use Spiral\RoadRunner\Jobs\KafkaOptionsInterface; use Spiral\RoadRunner\Jobs\Options; use Spiral\RoadRunner\Jobs\OptionsInterface; use Spiral\RoadRunner\Jobs\Queue\Driver; @@ -67,14 +68,14 @@ private function getQueueOptions(array $overrides = []): OptionsInterface { $config = array_merge($this->defaultOptions, $overrides); $options = new Options( - $config['delay'] ?? 0, - $config['priority'] ?? 0, - $config['auto_ack'] ?? false + $config['delay'] ?? OptionsInterface::DEFAULT_DELAY, + $config['priority'] ?? OptionsInterface::DEFAULT_PRIORITY, + $config['auto_ack'] ?? OptionsInterface::DEFAULT_AUTO_ACK, ); return match ($config['driver'] ?? null) { Driver::Kafka => KafkaOptions::from($options) - ->withTopic($config['topic'] ?? 'default'), + ->withTopic($config['topic'] ?? ($this->defaultOptions['topic'] ?? '')), default => $options, }; } From 9fe83780ec65ee036ef0487ec628930c46e55e01 Mon Sep 17 00:00:00 2001 From: Henrik Malmberg Date: Mon, 28 Jul 2025 08:53:50 +0200 Subject: [PATCH 7/7] fix: update queueOptions method return type to OptionsInterface in HasQueueOptions --- src/Queue/Contract/HasQueueOptions.php | 4 +++- src/Queue/RoadRunnerQueue.php | 10 ++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Queue/Contract/HasQueueOptions.php b/src/Queue/Contract/HasQueueOptions.php index 941ffd2..85f7c64 100644 --- a/src/Queue/Contract/HasQueueOptions.php +++ b/src/Queue/Contract/HasQueueOptions.php @@ -2,7 +2,9 @@ namespace Spiral\RoadRunnerLaravel\Queue\Contract; +use Spiral\RoadRunner\Jobs\OptionsInterface; + interface HasQueueOptions { - public function queueOptions(): array; + public function queueOptions(): OptionsInterface; } diff --git a/src/Queue/RoadRunnerQueue.php b/src/Queue/RoadRunnerQueue.php index 6c5bf0a..1718420 100644 --- a/src/Queue/RoadRunnerQueue.php +++ b/src/Queue/RoadRunnerQueue.php @@ -13,7 +13,6 @@ use Spiral\Goridge\RPC\RPCInterface; use Spiral\RoadRunner\Jobs\Jobs; use Spiral\RoadRunner\Jobs\KafkaOptions; -use Spiral\RoadRunner\Jobs\KafkaOptionsInterface; use Spiral\RoadRunner\Jobs\Options; use Spiral\RoadRunner\Jobs\OptionsInterface; use Spiral\RoadRunner\Jobs\Queue\Driver; @@ -100,14 +99,13 @@ private function getJobOverrideOptions(string|object $job): array { if (is_string($job) && class_exists($job)) { $job = app($job); - - if ($job instanceof HasQueueOptions) { - return $job->queueOptions(); - } } if ($job instanceof HasQueueOptions) { - return $job->queueOptions(); + $options = $job->queueOptions(); + if ($options instanceof Options) { + return $options->toArray(); + } } return [];