diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f5b1fa..167c72c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog][keepachangelog] and this project adher ### Added - gRPC client support +- Added support to set queue options globally or per job [#158] ## Unreleased diff --git a/src/Queue/Contract/HasQueueOptions.php b/src/Queue/Contract/HasQueueOptions.php new file mode 100644 index 0000000..85f7c64 --- /dev/null +++ b/src/Queue/Contract/HasQueueOptions.php @@ -0,0 +1,10 @@ +createPayload($job, $queue, $data), $queue, null, - fn($payload, $queue) => $this->pushRaw($payload, $queue), + fn($payload, $queue) => $this->pushRaw($payload, $queue, $this->getJobOverrideOptions($job)), ); } public function pushRaw($payload, $queue = null, array $options = []): string { - $queue = $this->getQueue($queue); + $queue = $this->getQueue($queue, $options); $task = $queue->dispatch( $queue @@ -45,40 +52,74 @@ public function pushRaw($payload, $queue = null, array $options = []): string return $task->getId(); } - public function later($delay, $job, $data = '', $queue = null): string + private function getQueue(?string $queue = null, array $options = []): QueueInterface { - return $this->enqueueUsing( - $job, - $this->createPayload($job, $queue, $data), - $queue, - $delay, - fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue), - ); + $queue = $this->jobs->connect($queue ?? $this->default, $this->getQueueOptions($options)); + + if (!$this->getStats($queue->getName())->getReady()) { + $queue->resume(); + } + + return $queue; } - public function pop($queue = null): void + private function getQueueOptions(array $overrides = []): OptionsInterface { - throw new \BadMethodCallException('Pop is not supported'); + $config = array_merge($this->defaultOptions, $overrides); + $options = new Options( + $config['delay'] ?? OptionsInterface::DEFAULT_DELAY, + $config['priority'] ?? OptionsInterface::DEFAULT_PRIORITY, + $config['auto_ack'] ?? OptionsInterface::DEFAULT_AUTO_ACK, + ); + + return match ($config['driver'] ?? null) { + Driver::Kafka => KafkaOptions::from($options) + ->withTopic($config['topic'] ?? ($this->defaultOptions['topic'] ?? '')), + default => $options, + }; } - public function size($queue = null): int + private function getStats(?string $queue = null): Stat { - $stats = $this->getStats($queue); + $queue ??= $this->default; - return $stats->getActive() + $stats->getDelayed(); + $stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats(); + + /** @var Stat $stat */ + foreach ($stats as $stat) { + if ($stat->getPipeline() === $queue) { + return $stat; + } + } + + return new Stat(); } - /** - * Get the "available at" UNIX timestamp. - * @param mixed $delay - */ - protected function availableAt($delay = 0): int + private function getJobOverrideOptions(string|object $job): array { - $delay = $this->parseDateInterval($delay); + if (is_string($job) && class_exists($job)) { + $job = app($job); + } - return $delay instanceof \DateTimeInterface - ? Carbon::parse($delay)->diffInSeconds() - : $delay; + if ($job instanceof HasQueueOptions) { + $options = $job->queueOptions(); + if ($options instanceof Options) { + return $options->toArray(); + } + } + + return []; + } + + public function later($delay, $job, $data = '', $queue = null): string + { + return $this->enqueueUsing( + $job, + $this->createPayload($job, $queue, $data), + $queue, + $delay, + fn($payload, $queue) => $this->laterRaw($delay, $payload, $queue, $this->getJobOverrideOptions($job)), + ); } /** @@ -88,8 +129,9 @@ private function laterRaw( \DateTimeInterface|\DateInterval|int $delay, array $payload, ?string $queue = null, + array $options = [] ): string { - $queue = $this->getQueue($queue); + $queue = $this->getQueue($queue, $options); $task = $queue->dispatch( $queue @@ -101,30 +143,28 @@ private function laterRaw( return $task->getId(); } - private function getQueue(?string $queue = null): QueueInterface + /** + * Get the "available at" UNIX timestamp. + * @param mixed $delay + */ + protected function availableAt($delay = 0): int { - $queue = $this->jobs->connect($queue ?? $this->default); - - if (!$this->getStats($queue->getName())->getReady()) { - $queue->resume(); - } + $delay = $this->parseDateInterval($delay); - return $queue; + return $delay instanceof \DateTimeInterface + ? Carbon::parse($delay)->diffInSeconds() + : $delay; } - private function getStats(?string $queue = null): Stat + public function pop($queue = null): void { - $queue ??= $this->default; - - $stats = $this->rpc->call('jobs.Stat', new Stats(), Stats::class)->getStats(); + throw new \BadMethodCallException('Pop is not supported'); + } - /** @var Stat $stat */ - foreach ($stats as $stat) { - if ($stat->getPipeline() === $queue) { - return $stat; - } - } + public function size($queue = null): int + { + $stats = $this->getStats($queue); - return new Stat(); + return $stats->getActive() + $stats->getDelayed(); } }