Skip to content

Add support to set queue options, either globally using queue.php or by implementing an interface #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

bohemima
Copy link

@bohemima bohemima commented Jul 23, 2025

Description

This PR adds support to set queue options, both globally and per job.

    'connections' => [
        // ... other connections
        'roadrunner' => [
            'driver' => 'roadrunner',
            'queue' => env('RR_QUEUE', 'default'),
            'retry_after' => (int) env('RR_QUEUE_RETRY_AFTER', 90),
            'after_commit' => false,
            'options' => [
                'driver' => Driver::Kafka,
                'delay' => (int) env('RR_QUEUE_DELAY', 0),
                'priority' => (int) env('RR_QUEUE_PRIORITY', 0),
                'auto_ack' => (bool) env('RR_QUEUE_AUTO_ACK', true),
                'topic' => env('RR_QUEUE_KAFKA_TOPIC', 'default-topic-for-jobs'),
            ]
        ],
    ],

Or by implementing HasQueueOptions interface on the job:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Spiral\RoadRunnerLaravel\Queue\Contract\HasQueueOptions;

class TestJob implements ShouldQueue, HasQueueOptions
{
    use Queueable;

    public function __construct() {}

    public function handle(): void
    {
        // Do work
    }

    function queueOptions(): array
    {
        return [
            'topic' => 'some-other-topic',
        ];
    }
}

Fixes #158

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I wrote unit tests for my code (if tests is required for my changes)
  • I have made changes in CHANGELOG.md file

Summary by CodeRabbit

  • New Features
    • Added support for setting queue options globally or per job, allowing greater flexibility in queue configuration.
  • Documentation
    • Updated changelog to reflect new queue options feature.

@bohemima bohemima requested a review from butschster as a code owner July 23, 2025 11:11
Copy link

coderabbitai bot commented Jul 23, 2025

Walkthrough

A new interface for queue options was introduced, and the RoadRunner queue system was enhanced to allow setting queue options globally or per job. The connector and queue classes were updated to handle these options, enabling dynamic configuration such as Kafka topic selection. The changelog was updated to reflect these changes.

Changes

File(s) Change Summary
CHANGELOG.md Added entry describing support for setting queue options globally or per job.
src/Queue/Contract/HasQueueOptions.php Introduced new HasQueueOptions interface with queueOptions() method.
src/Queue/RoadRunnerConnector.php Modified to pass optional 'options' from config to queue constructor.
src/Queue/RoadRunnerQueue.php Enhanced to support per-job and global queue options, added new methods and updated signatures.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant Connector as RoadRunnerConnector
    participant Queue as RoadRunnerQueue
    participant Job as Job (implements HasQueueOptions?)
    participant RRQueue as RoadRunner Queue Driver

    App->>Connector: connect(config)
    Connector->>Queue: new RoadRunnerQueue(..., config['options'] ?? [])
    App->>Queue: push(job)
    Queue->>Job: queueOptions() (if HasQueueOptions)
    Queue->>RRQueue: getQueue(queue, mergedOptions)
    RRQueue-->>Queue: QueueInterface
    Queue->>RRQueue: dispatch job with options
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~45 minutes

Assessment against linked issues

Objective Addressed Explanation
Support setting Kafka topic options in RoadRunner Laravel queue (#158)

Assessment against linked issues: Out-of-scope changes

No out-of-scope changes detected.

Poem

In the warren where queues now hop,
Options can be set, on the fly or atop.
Kafka’s topic, a breeze to choose,
With interfaces new, there’s much less to lose.
Each job can whisper its own request,
The burrow’s dispatch is now truly blessed!
🐇✨

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 PHPStan (2.1.17)

Note: Using configuration file /phpstan.neon.dist.
Path /fixes does not exist

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/Queue/RoadRunnerQueue.php (1)

147-161: Consider documenting the auto_ack default behavior

The auto_ack parameter defaults to false, which means messages won't be automatically acknowledged. This could lead to message redelivery if not handled properly by the job processor.

Consider adding a comment explaining this behavior or documenting it in the configuration file to ensure developers are aware of the implications:

 private function getQueueOptions(array $overrides = []): OptionsInterface
 {
     $config = array_merge($this->defaultOptions, $overrides);
+    // Note: auto_ack defaults to false, requiring manual acknowledgment of messages
     $options = new Options(
         $config['delay'] ?? 0,
         $config['priority'] ?? 0,
         $config['auto_ack'] ?? false
     );
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 47c04da and 3e5149d.

📒 Files selected for processing (4)
  • CHANGELOG.md (1 hunks)
  • src/Queue/Contract/HasQueueOptions.php (1 hunks)
  • src/Queue/RoadRunnerConnector.php (1 hunks)
  • src/Queue/RoadRunnerQueue.php (6 hunks)
🔇 Additional comments (2)
CHANGELOG.md (1)

10-10: LGTM!

The changelog entry is properly formatted and clearly describes the new feature.

src/Queue/RoadRunnerConnector.php (1)

25-30: LGTM!

The implementation correctly passes the options configuration to the RoadRunnerQueue constructor with a sensible default (empty array) when options are not configured.

Comment on lines 5 to 8
interface HasQueueOptions
{
function queueOptions(): array;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add explicit visibility modifier and return type declaration

While interface methods are implicitly public in PHP, it's a best practice to explicitly declare the visibility modifier. Additionally, add a return type declaration for better type safety.

 interface HasQueueOptions
 {
-    function queueOptions(): array;
+    public function queueOptions(): array;
 }

Also consider adding PHPDoc to document the expected structure of the returned array:

interface HasQueueOptions
{
    /**
     * Get queue options for this job.
     * 
     * @return array{
     *     driver?: string,
     *     delay?: int,
     *     priority?: int,
     *     auto_ack?: bool,
     *     topic?: string
     * }
     */
    public function queueOptions(): array;
}
🤖 Prompt for AI Agents
In src/Queue/Contract/HasQueueOptions.php lines 5 to 8, explicitly add the
public visibility modifier to the queueOptions method and declare its return
type as array. Additionally, include a PHPDoc block above the method to document
the expected structure of the returned array, specifying optional keys like
driver, delay, priority, auto_ack, and topic with their respective types for
better clarity and type safety.

@butschster
Copy link
Contributor

Maybe it's better to use OptionsInterfaces as return type for queueOptions

@bohemima
Copy link
Author

Maybe it's better to use OptionsInterfaces as return type for queueOptions

The reason to return array is so we can merge it with the global options

@butschster
Copy link
Contributor

butschster commented Jul 26, 2025

Maybe it's better to use OptionsInterfaces as return type for queueOptions

The reason to return array is so we can merge it with the global options

There is merge method in Options and KafkaOptions https://github.com/roadrunner-php/jobs/blob/4.x/src/KafkaOptions.php#L133. We can build object from default config and merge it into provided by Job::queueOptions

What do you think?

@butschster butschster self-assigned this Jul 26, 2025
@butschster butschster added the type:enhancement New feature or request label Jul 26, 2025
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/Queue/RoadRunnerQueue.php (1)

159-162: Address the unused parameter warning.

The static analysis correctly identifies that the $queue parameter is unused. Since this method throws an exception and doesn't use the parameter, consider removing it or adding a comment explaining why it's kept for interface compliance.

-    public function pop($queue = null): void
+    public function pop($queue = null): void // @phpmd-ignore-param $queue - kept for interface compliance

Or if the parameter isn't required by the interface:

-    public function pop($queue = null): void
+    public function pop(): void
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b82fdb5 and 9fe8378.

📒 Files selected for processing (2)
  • src/Queue/Contract/HasQueueOptions.php (1 hunks)
  • src/Queue/RoadRunnerQueue.php (5 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
src/Queue/RoadRunnerQueue.php (2)
src/Queue/RoadRunnerConnector.php (1)
  • connect (19-31)
src/Queue/Contract/HasQueueOptions.php (1)
  • queueOptions (9-9)
🪛 PHPMD (2.15.0)
src/Queue/RoadRunnerQueue.php

159-159: Avoid unused parameters such as '$queue'. (Unused Code Rules)

(UnusedFormalParameter)

🔇 Additional comments (13)
src/Queue/Contract/HasQueueOptions.php (1)

7-10: LGTM! Good design choice with OptionsInterface return type.

The interface is well-designed with proper type safety. Returning OptionsInterface instead of a plain array (as discussed in PR comments) is the correct approach as it provides better type safety and leverages existing merge capabilities in the Options classes.

src/Queue/RoadRunnerQueue.php (12)

15-20: LGTM! Proper imports for queue options functionality.

The new imports are correctly added to support the queue options feature, including the necessary classes from RoadRunner Jobs and the new HasQueueOptions interface.


28-30: LGTM! Constructor properly extended for default options.

The constructor correctly accepts the new $defaultOptions parameter with appropriate default value and readonly visibility modifier.


39-39: LGTM! Proper integration of job-specific options in push method.

The push method correctly retrieves job-specific options using getJobOverrideOptions and passes them to pushRaw. This enables per-job queue option customization as intended.


45-45: LGTM! Options parameter properly passed to getQueue.

The pushRaw method correctly passes the options array to getQueue for processing.


55-64: LGTM! Enhanced queue connection with options and auto-resume.

The getQueue method properly:

  • Merges options using getQueueOptions
  • Checks queue readiness using stats
  • Auto-resumes queue if not ready

This improves reliability and supports the new options functionality.


66-80: LGTM! Well-implemented queue options handling with driver-specific support.

The getQueueOptions method:

  • Properly merges default and override options
  • Creates appropriate Options instance with sensible defaults
  • Supports driver-specific options (Kafka with topic)
  • Uses match expression for clean driver selection

82-96: LGTM! Proper stats retrieval with fallback.

The getStats method correctly:

  • Handles null queue parameter with default
  • Iterates through stats to find matching pipeline
  • Returns empty Stat as fallback

121-121: LGTM! Consistent job options handling in later method.

The later method correctly applies the same job options pattern as the push method, ensuring consistency across delayed job scheduling.


132-134: LGTM! Proper options handling in laterRaw method.

The laterRaw method correctly accepts and passes options to getQueue, maintaining the options chain for delayed jobs.


150-157: LGTM! Improved availableAt method with better type hints.

The method is now properly documented and handles different delay types correctly. The protected visibility is appropriate for potential inheritance.


164-169: LGTM! Proper queue size calculation using stats.

The size method correctly uses the new getStats method and calculates total queue size by combining active and delayed jobs.


98-112: Confirm presence of Options::toArray()
I searched the local codebase and didn’t find a toArray() method on any Options class. If Options comes from an external dependency (e.g. Spiral\RoadRunner\Jobs\Options), please:

  • Verify that the imported Options class defines toArray().
  • If it does not, add or polyfill that method (or adjust this call) to prevent a fatal error at runtime.

File needing attention:
• src/Queue/RoadRunnerQueue.php (lines 98–112)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

How to set topic when using Kafka as a queue in RR?
2 participants