diff --git a/composer.json b/composer.json
index 3f7af79..bdb6052 100644
--- a/composer.json
+++ b/composer.json
@@ -26,6 +26,7 @@
"require": {
"php": ">=8.3",
"php-amqplib/php-amqplib": "^3.7",
+ "dragonmantank/cron-expression": "^3.0",
"utopia-php/console": "0.0.*",
"utopia-php/framework": "0.33.*",
"utopia-php/telemetry": "*",
diff --git a/composer.lock b/composer.lock
index 91313d9..cc1d3ee 100644
--- a/composer.lock
+++ b/composer.lock
@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
- "content-hash": "65969e3acad5417806c2128aef929e66",
+ "content-hash": "92017a561ec855898b19a7d78453a115",
"packages": [
{
"name": "brick/math",
@@ -143,6 +143,70 @@
],
"time": "2025-08-20T19:15:30+00:00"
},
+ {
+ "name": "dragonmantank/cron-expression",
+ "version": "v3.6.0",
+ "source": {
+ "type": "git",
+ "url": "https://github.com/dragonmantank/cron-expression.git",
+ "reference": "d61a8a9604ec1f8c3d150d09db6ce98b32675013"
+ },
+ "dist": {
+ "type": "zip",
+ "url": "https://api.github.com/repos/dragonmantank/cron-expression/zipball/d61a8a9604ec1f8c3d150d09db6ce98b32675013",
+ "reference": "d61a8a9604ec1f8c3d150d09db6ce98b32675013",
+ "shasum": ""
+ },
+ "require": {
+ "php": "^8.2|^8.3|^8.4|^8.5"
+ },
+ "replace": {
+ "mtdowling/cron-expression": "^1.0"
+ },
+ "require-dev": {
+ "phpstan/extension-installer": "^1.4.3",
+ "phpstan/phpstan": "^1.12.32|^2.1.31",
+ "phpunit/phpunit": "^8.5.48|^9.0"
+ },
+ "type": "library",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "3.x-dev"
+ }
+ },
+ "autoload": {
+ "psr-4": {
+ "Cron\\": "src/Cron/"
+ }
+ },
+ "notification-url": "https://packagist.org/downloads/",
+ "license": [
+ "MIT"
+ ],
+ "authors": [
+ {
+ "name": "Chris Tankersley",
+ "email": "chris@ctankersley.com",
+ "homepage": "https://github.com/dragonmantank"
+ }
+ ],
+ "description": "CRON for PHP: Calculate the next or previous run date and determine if a CRON expression is due",
+ "keywords": [
+ "cron",
+ "schedule"
+ ],
+ "support": {
+ "issues": "https://github.com/dragonmantank/cron-expression/issues",
+ "source": "https://github.com/dragonmantank/cron-expression/tree/v3.6.0"
+ },
+ "funding": [
+ {
+ "url": "https://github.com/dragonmantank",
+ "type": "github"
+ }
+ ],
+ "time": "2025-10-31T18:51:33+00:00"
+ },
{
"name": "google/protobuf",
"version": "v4.33.2",
@@ -4540,5 +4604,5 @@
"platform-dev": {
"ext-redis": "*"
},
- "plugin-api-version": "2.6.0"
+ "plugin-api-version": "2.9.0"
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 6b230a3..b00f81e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,7 @@ services:
- swoole
- swoole-amqp
- swoole-redis-cluster
+ - swoole-redis-streams
- workerman
swoole:
@@ -35,6 +36,17 @@ services:
redis-cluster-0:
condition: service_healthy
+ swoole-redis-streams:
+ container_name: swoole-redis-streams
+ build: ./tests/Queue/servers/SwooleRedisStreams/.
+ command: php /usr/src/code/tests/Queue/servers/SwooleRedisStreams/worker.php
+ volumes:
+ - ./vendor:/usr/src/code/vendor
+ - ./src:/usr/src/code/src
+ - ./tests:/usr/src/code/tests
+ depends_on:
+ - redis
+
swoole-amqp:
container_name: swoole-amqp
build: ./tests/Queue/servers/AMQP/.
@@ -61,8 +73,6 @@ services:
redis:
container_name: redis
image: "redis:alpine"
- ports:
- - "6379:6379"
redis-cluster-0:
image: docker.io/bitnamilegacy/redis-cluster:7.4
diff --git a/phpstan.neon b/phpstan.neon
index 6852b4c..7e8118f 100644
--- a/phpstan.neon
+++ b/phpstan.neon
@@ -6,4 +6,12 @@ parameters:
- tests
scanDirectories:
- - vendor/swoole
\ No newline at end of file
+ - vendor/swoole
+
+ # phpredis stubs don't include Redis Streams methods (xAdd, xGroup, etc.)
+ # These methods exist at runtime but PHPStan's stubs are incomplete
+ # See: https://github.com/phpredis/phpredis-stubs/issues
+ ignoreErrors:
+ -
+ message: '#Call to an undefined method RedisCluster::(x(Add|Group|ReadGroup|Ack|Pending|Claim|AutoClaim|Del|Len|Trim|Info|Range|RevRange)|eval)\(\)#'
+ path: src/Queue/Connection/RedisStreamCluster.php
\ No newline at end of file
diff --git a/phpunit.xml b/phpunit.xml
index 1b8f40d..e004d23 100644
--- a/phpunit.xml
+++ b/phpunit.xml
@@ -12,5 +12,8 @@
./tests/Queue/E2E/Adapter
+
+ ./tests/Queue/Unit
+
\ No newline at end of file
diff --git a/src/Queue/Broker/RedisStreams.php b/src/Queue/Broker/RedisStreams.php
new file mode 100644
index 0000000..a4da641
--- /dev/null
+++ b/src/Queue/Broker/RedisStreams.php
@@ -0,0 +1,931 @@
+consumerId = 'worker-' . \uniqid();
+ }
+
+ /**
+ * Set the consumer ID for this broker instance.
+ *
+ * @param string $consumerId
+ * @return void
+ */
+ public function setConsumerId(string $consumerId): void
+ {
+ $this->consumerId = $consumerId;
+ }
+
+ /**
+ * Get the consumer ID for this broker instance.
+ *
+ * @return string
+ */
+ public function getConsumerId(): string
+ {
+ return $this->consumerId;
+ }
+
+
+ /**
+ * @inheritDoc
+ */
+ public function enqueue(Queue $queue, array $payload): bool
+ {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure consumer group exists
+ $this->ensureConsumerGroup($streamKey, $groupName);
+
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $payload,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ $result = $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ return $result !== false;
+ }
+
+ /**
+ * Enqueue a job to be processed after a delay.
+ *
+ * @param Queue $queue
+ * @param array $payload
+ * @param int $delaySeconds Seconds to delay before processing
+ * @return bool
+ */
+ public function enqueueDelayed(Queue $queue, array $payload, int $delaySeconds): bool
+ {
+ if ($delaySeconds < 0) {
+ throw new \InvalidArgumentException('Delay seconds must be non-negative');
+ }
+
+ $delayedKey = $this->getDelayedKey($queue);
+
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $payload,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ // Score is the timestamp when the job should be processed (in milliseconds)
+ $executeAt = (int)(\microtime(true) * 1000) + ($delaySeconds * 1000);
+
+ $encodedFields = \json_encode($fields);
+ if ($encodedFields === false) {
+ throw new \RuntimeException('Failed to encode field data: ' . \json_last_error_msg());
+ }
+
+ $result = $this->connection->sortedSetAdd($delayedKey, $executeAt, $encodedFields);
+
+ return $result >= 0;
+ }
+
+ /**
+ * Enqueue a job to be processed at a specific time.
+ *
+ * @param Queue $queue
+ * @param array $payload
+ * @param int $timestamp Unix timestamp when the job should be processed
+ * @return bool
+ */
+ public function enqueueAt(Queue $queue, array $payload, int $timestamp): bool
+ {
+ $delaySeconds = \max(0, $timestamp - \time());
+ return $this->enqueueDelayed($queue, $payload, $delaySeconds);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function retry(Queue $queue, ?int $limit = null): void
+ {
+ $dlqKey = $this->getDlqKey($queue);
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure group exists
+ $this->ensureConsumerGroup($streamKey, $groupName);
+
+ // Read from DLQ stream
+ $entries = $this->connection->streamRange($dlqKey, '-', '+', $limit ?? 100);
+
+ $processed = 0;
+ $idsToDelete = [];
+
+ foreach ($entries as $entryId => $fields) {
+ if ($limit !== null && $processed >= $limit) {
+ break;
+ }
+
+ // Reset retry count and re-add to main stream
+ $fields['retry_count'] = '0';
+ unset($fields['error'], $fields['failed_at']);
+
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+ $idsToDelete[] = $entryId;
+ $processed++;
+ }
+
+ // Delete retried entries from DLQ
+ if (!empty($idsToDelete)) {
+ $this->connection->streamDel($dlqKey, $idsToDelete);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
+ {
+ if ($failedJobs) {
+ return $this->connection->streamLen($this->getDlqKey($queue));
+ }
+
+ $streamSize = $this->connection->streamLen($this->getStreamKey($queue));
+ $delayedSize = $this->connection->sortedSetSize($this->getDelayedKey($queue));
+
+ return $streamSize + $delayedSize;
+ }
+
+
+ /**
+ * @inheritDoc
+ */
+ public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
+ {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+ $delayedKey = $this->getDelayedKey($queue);
+
+ // Ensure consumer groups exist
+ $this->ensureConsumerGroup($streamKey, $groupName);
+ $this->ensureConsumerGroup($dlqKey, $groupName);
+
+ while (!$this->closed) {
+ try {
+ // 1. Process due scheduled jobs
+ $this->processScheduledJobs($queue);
+
+ // 2. Process due delayed jobs
+ $this->processDelayedJobs($queue, $delayedKey, $streamKey);
+
+ // 3. Claim abandoned messages from crashed consumers
+ $this->claimAbandonedMessages($streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+
+ // 4. Read new messages from stream
+ $entries = $this->connection->streamReadGroup(
+ $groupName,
+ $this->consumerId,
+ [$streamKey],
+ 1,
+ self::BLOCK_TIMEOUT_MS
+ );
+
+ if ($entries === false || empty($entries)) {
+ continue;
+ }
+
+ foreach ($entries[$streamKey] ?? [] as $entryId => $fields) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ } catch (\RedisException $e) {
+ if ($this->closed) {
+ break;
+ }
+ throw $e;
+ }
+ }
+ }
+
+ /**
+ * Consume from multiple queues simultaneously.
+ *
+ * @param Queue[] $queues Array of queues to consume from
+ * @param callable $messageCallback Receives (Message $message, Queue $queue)
+ * @param callable $successCallback Receives (Message $message, Queue $queue)
+ * @param callable $errorCallback Receives (Message $message, Queue $queue, \Throwable $error)
+ * @return void
+ */
+ public function consumeMultiple(array $queues, callable $messageCallback, callable $successCallback, callable $errorCallback): void
+ {
+ $streamKeys = [];
+ $queueMap = [];
+
+ foreach ($queues as $queue) {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+
+ // Ensure consumer groups exist
+ $this->ensureConsumerGroup($streamKey, $groupName);
+ $this->ensureConsumerGroup($this->getDlqKey($queue), $groupName);
+
+ $streamKeys[] = $streamKey;
+ $queueMap[$streamKey] = $queue;
+ }
+
+ while (!$this->closed) {
+ try {
+ // Process scheduled and delayed jobs for all queues
+ foreach ($queues as $queue) {
+ $this->processScheduledJobs($queue);
+ $this->processDelayedJobs($queue, $this->getDelayedKey($queue), $this->getStreamKey($queue));
+
+ // Claim abandoned messages
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+ $this->claimAbandonedMessages($streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+
+ // Read from each queue individually with its own consumer group
+ // Note: Redis XREADGROUP requires a single consumer group, so we can't
+ // read from multiple streams with different groups in one call
+ foreach ($queues as $queue) {
+ $streamKey = $this->getStreamKey($queue);
+ $groupName = $this->getGroupName($queue);
+ $dlqKey = $this->getDlqKey($queue);
+
+ $entries = $this->connection->streamReadGroup(
+ $groupName,
+ $this->consumerId,
+ [$streamKey],
+ 1,
+ 0 // Non-blocking to check all queues quickly
+ );
+
+ if ($entries === false || empty($entries)) {
+ continue;
+ }
+
+ foreach ($entries[$streamKey] ?? [] as $entryId => $fields) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ }
+
+ // Brief sleep to prevent tight loop when all queues are empty
+ \usleep(10000); // 10ms
+ } catch (\RedisException $e) {
+ if ($this->closed) {
+ break;
+ }
+ throw $e;
+ }
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function close(): void
+ {
+ $this->closed = true;
+ }
+
+
+ /**
+ * Register a recurring schedule.
+ *
+ * @param Queue $queue
+ * @param Schedule $schedule
+ * @return bool
+ */
+ public function schedule(Queue $queue, Schedule $schedule): bool
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ // Store schedule definition
+ $encodedSchedule = \json_encode($schedule->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ $stored = $this->connection->hashSet($schedulesKey, $schedule->id, $encodedSchedule);
+
+ if (!$stored) {
+ return false;
+ }
+
+ // Calculate and store next run time
+ $nextRun = $schedule->getNextRunTime();
+ if ($nextRun !== null) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $schedule->id);
+ }
+
+ return true;
+ }
+
+ /**
+ * Remove a recurring schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function unschedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $this->connection->hashDel($schedulesKey, $scheduleId);
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ return true;
+ }
+
+ /**
+ * Get a schedule by ID.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return Schedule|null
+ */
+ public function getSchedule(Queue $queue, string $scheduleId): ?Schedule
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $data = $this->connection->hashGet($schedulesKey, $scheduleId);
+
+ if ($data === false) {
+ return null;
+ }
+
+ $decodedData = \json_decode($data, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ throw new \RuntimeException('Failed to decode schedule data: ' . \json_last_error_msg());
+ }
+
+ return Schedule::fromArray($decodedData);
+ }
+
+ /**
+ * Get all schedules for a queue.
+ *
+ * @param Queue $queue
+ * @return Schedule[]
+ */
+ public function getSchedules(Queue $queue): array
+ {
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $all = $this->connection->hashGetAll($schedulesKey);
+
+ $schedules = [];
+ foreach ($all as $id => $data) {
+ $decodedData = \json_decode($data, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Skip corrupted schedule data rather than failing completely
+ continue;
+ }
+ $schedules[$id] = Schedule::fromArray($decodedData);
+ }
+
+ return $schedules;
+ }
+
+ /**
+ * Pause a schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function pauseSchedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedule = $this->getSchedule($queue, $scheduleId);
+ if ($schedule === null) {
+ return false;
+ }
+
+ $paused = $schedule->pause();
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $encodedSchedule = \json_encode($paused->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ // Update schedule and remove from next execution queue
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedSchedule);
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ return true;
+ }
+
+ /**
+ * Resume a paused schedule.
+ *
+ * @param Queue $queue
+ * @param string $scheduleId
+ * @return bool
+ */
+ public function resumeSchedule(Queue $queue, string $scheduleId): bool
+ {
+ $schedule = $this->getSchedule($queue, $scheduleId);
+ if ($schedule === null) {
+ return false;
+ }
+
+ $resumed = $schedule->resume();
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+
+ $encodedSchedule = \json_encode($resumed->toArray());
+ if ($encodedSchedule === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+
+ // Update schedule and add next execution time
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedSchedule);
+
+ $nextRun = $resumed->getNextRunTime();
+ if ($nextRun !== null) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $scheduleId);
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Get stream information.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getStreamInfo(Queue $queue): array
+ {
+ return $this->connection->streamInfo($this->getStreamKey($queue));
+ }
+
+ /**
+ * Get consumer group information.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getGroupInfo(Queue $queue): array
+ {
+ $groups = $this->connection->streamGroupInfo($this->getStreamKey($queue));
+ $groupName = $this->getGroupName($queue);
+
+ foreach ($groups as $group) {
+ if (($group['name'] ?? '') === $groupName) {
+ return $group;
+ }
+ }
+
+ return [];
+ }
+
+ /**
+ * Get consumers information for the queue's consumer group.
+ *
+ * @param Queue $queue
+ * @return array
+ */
+ public function getConsumersInfo(Queue $queue): array
+ {
+ return $this->connection->streamConsumersInfo(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue)
+ );
+ }
+
+ /**
+ * Get the consumer lag (messages waiting to be delivered).
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getLag(Queue $queue): int
+ {
+ $groupInfo = $this->getGroupInfo($queue);
+ return $groupInfo['lag'] ?? 0;
+ }
+
+ /**
+ * Get count of delayed jobs.
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getDelayedCount(Queue $queue): int
+ {
+ return $this->connection->sortedSetSize($this->getDelayedKey($queue));
+ }
+
+ /**
+ * Get pending message count (messages being processed).
+ *
+ * @param Queue $queue
+ * @return int
+ */
+ public function getPendingCount(Queue $queue): int
+ {
+ $pending = $this->connection->streamPendingSummary(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue)
+ );
+
+ return $pending[0] ?? 0;
+ }
+
+ /**
+ * Get messages from stream (for replay/history).
+ *
+ * @param Queue $queue
+ * @param string $start Start ID ('-' for minimum)
+ * @param string $end End ID ('+' for maximum)
+ * @param int|null $count Max messages
+ * @return Message[]
+ */
+ public function getMessages(Queue $queue, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ $entries = $this->connection->streamRange($this->getStreamKey($queue), $start, $end, $count);
+
+ $messages = [];
+ foreach ($entries as $id => $fields) {
+ $data = \json_decode($fields['data'] ?? '{}', true);
+ if ($data === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Skip corrupted message data
+ continue;
+ }
+ $data['streamId'] = $id;
+ $messages[] = new Message($data);
+ }
+
+ return $messages;
+ }
+
+ /**
+ * Get a specific message by ID.
+ *
+ * @param Queue $queue
+ * @param string $id Stream entry ID
+ * @return Message|null
+ */
+ public function getMessage(Queue $queue, string $id): ?Message
+ {
+ $entries = $this->connection->streamRange($this->getStreamKey($queue), $id, $id, 1);
+
+ if (empty($entries)) {
+ return null;
+ }
+
+ $fields = \reset($entries);
+ $data = \json_decode($fields['data'] ?? '{}', true);
+ if ($data === null && \json_last_error() !== JSON_ERROR_NONE) {
+ throw new \RuntimeException('Failed to decode message data: ' . \json_last_error_msg());
+ }
+ $data['streamId'] = $id;
+
+ return new Message($data);
+ }
+
+ /**
+ * Manually trim the stream.
+ *
+ * @param Queue $queue
+ * @param int $maxLen Maximum length to keep
+ * @return int Number of entries trimmed
+ */
+ public function trimStream(Queue $queue, int $maxLen): int
+ {
+ // Use exact trimming (not approximate) for manual trim operations
+ return $this->connection->streamTrim($this->getStreamKey($queue), $maxLen, false);
+ }
+
+ /**
+ * Delete a consumer from the consumer group.
+ *
+ * @param Queue $queue
+ * @param string $consumerId
+ * @return int Number of pending messages that were owned by the consumer
+ */
+ public function deleteConsumer(Queue $queue, string $consumerId): int
+ {
+ return $this->connection->streamDeleteConsumer(
+ $this->getStreamKey($queue),
+ $this->getGroupName($queue),
+ $consumerId
+ );
+ }
+
+
+ /**
+ * Process a single stream entry.
+ */
+ private function processEntry(
+ string $entryId,
+ array $fields,
+ string $streamKey,
+ string $groupName,
+ string $dlqKey,
+ Queue $queue,
+ callable $messageCallback,
+ callable $successCallback,
+ callable $errorCallback
+ ): void {
+ $messageData = \json_decode($fields['data'] ?? '{}', true);
+ $messageData['timestamp'] = (int)($messageData['timestamp'] ?? \time());
+ $messageData['streamId'] = $entryId;
+ $retryCount = (int)($fields['retry_count'] ?? 0);
+
+ $message = new Message($messageData);
+
+ // Update stats
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.total");
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.processing");
+
+ try {
+ $messageCallback($message);
+
+ // Acknowledge the message
+ $this->connection->streamAck($streamKey, $groupName, $entryId);
+
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.success");
+ $successCallback($message);
+ } catch (\Throwable $th) {
+ // Acknowledge the failed message to remove from pending
+ $this->connection->streamAck($streamKey, $groupName, $entryId);
+
+ if ($retryCount < $this->maxRetries) {
+ // Re-add to stream with incremented retry count
+ $fields['retry_count'] = (string)($retryCount + 1);
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+ } else {
+ // Move to DLQ
+ $fields['error'] = $th->getMessage();
+ $fields['failed_at'] = (string)\time();
+ $this->connection->streamAdd($dlqKey, $fields, '*', $this->maxStreamLength);
+ $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed");
+ }
+
+ $errorCallback($message, $th);
+ } finally {
+ $this->connection->decrement("{$queue->namespace}.stats.{$queue->name}.processing");
+ }
+ }
+
+ /**
+ * Ensure consumer group exists.
+ */
+ private function ensureConsumerGroup(string $streamKey, string $groupName): void
+ {
+ $this->connection->streamCreateGroup($streamKey, $groupName, '0', true);
+ }
+
+ /**
+ * Claim abandoned messages from crashed consumers.
+ */
+ private function claimAbandonedMessages(
+ string $streamKey,
+ string $groupName,
+ string $dlqKey,
+ Queue $queue,
+ callable $messageCallback,
+ callable $successCallback,
+ callable $errorCallback
+ ): void {
+ $result = $this->connection->streamAutoClaim(
+ $streamKey,
+ $groupName,
+ $this->consumerId,
+ $this->claimIdleTimeMs,
+ '0-0',
+ 10
+ );
+
+ if (empty($result) || empty($result[1])) {
+ return;
+ }
+
+ // Process claimed messages
+ foreach ($result[1] as $entryId => $fields) {
+ if (\is_array($fields)) {
+ $this->processEntry($entryId, $fields, $streamKey, $groupName, $dlqKey, $queue, $messageCallback, $successCallback, $errorCallback);
+ }
+ }
+ }
+
+ /**
+ * Process delayed jobs that are now due.
+ */
+ private function processDelayedJobs(Queue $queue, string $delayedKey, string $streamKey): void
+ {
+ $now = (int)(\microtime(true) * 1000);
+
+ // Only check periodically
+ if ($now - $this->lastDelayedCheck < self::DELAYED_CHECK_INTERVAL_MS) {
+ return;
+ }
+ $this->lastDelayedCheck = $now;
+
+ // Get jobs that are now due (read without removing to prevent job loss on crash)
+ $dueJobs = $this->connection->sortedSetRangeByScore($delayedKey, 0, (float)$now, 100);
+
+ foreach ($dueJobs as $member => $score) {
+ // In zRangeByScore with scores, member is the value and score is the key when WITHSCORES is used
+ // But without WITHSCORES option, we get a simple array of members
+ $jobData = is_string($member) ? $member : $score;
+ $fields = \json_decode($jobData, true);
+
+ if ($fields && \json_last_error() === JSON_ERROR_NONE) {
+ // Add to stream first - if this fails, job stays in delayed set for retry
+ $streamId = $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ // Only remove from delayed set after successful add to prevent job loss
+ if ($streamId !== false) {
+ $this->connection->sortedSetRemove($delayedKey, $jobData);
+ }
+ }
+ }
+ }
+
+ /**
+ * Process scheduled jobs that are now due.
+ */
+ private function processScheduledJobs(Queue $queue): void
+ {
+ $now = (int)(\microtime(true) * 1000);
+
+ // Only check periodically
+ if ($now - $this->lastScheduleCheck < self::SCHEDULE_CHECK_INTERVAL_MS) {
+ return;
+ }
+ $this->lastScheduleCheck = $now;
+
+ $schedulesKey = $this->getSchedulesKey($queue);
+ $nextKey = $this->getScheduleNextKey($queue);
+ $streamKey = $this->getStreamKey($queue);
+
+ // Get schedules that are due (read without removing to prevent duplicate processing)
+ $dueScheduleIds = $this->connection->sortedSetRangeByScore($nextKey, 0, (float)$now, 100);
+
+ foreach ($dueScheduleIds as $scheduleId) {
+ $scheduleData = $this->connection->hashGet($schedulesKey, $scheduleId);
+ if ($scheduleData === false) {
+ // Schedule was deleted, remove from next run queue
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+ continue;
+ }
+
+ $decodedData = \json_decode($scheduleData, true);
+ if ($decodedData === null && \json_last_error() !== JSON_ERROR_NONE) {
+ // Invalid JSON, remove from queue to prevent infinite loop
+ $this->connection->sortedSetRemove($nextKey, $scheduleId);
+ continue;
+ }
+
+ $schedule = Schedule::fromArray($decodedData);
+
+ // Remove from next run queue first (atomic operation)
+ $removed = $this->connection->sortedSetRemove($nextKey, $scheduleId);
+
+ // Skip if another consumer already processed this (removed = 0)
+ if ($removed === 0) {
+ continue;
+ }
+
+ // Skip if not active (paused, max runs reached, etc.)
+ if (!$schedule->isActive()) {
+ continue;
+ }
+
+ // Enqueue the job
+ $messageData = [
+ 'pid' => \uniqid(more_entropy: true),
+ 'queue' => $queue->name,
+ 'timestamp' => \time(),
+ 'payload' => $schedule->payload,
+ 'schedule_id' => $schedule->id,
+ ];
+
+ $encodedData = \json_encode($messageData);
+ if ($encodedData === false) {
+ throw new \RuntimeException('Failed to encode message data: ' . \json_last_error_msg());
+ }
+
+ $fields = [
+ 'data' => $encodedData,
+ 'retry_count' => '0',
+ ];
+
+ $this->connection->streamAdd($streamKey, $fields, '*', $this->maxStreamLength);
+
+ // Update run count
+ $updated = $schedule->incrementRunCount();
+ $encodedUpdated = \json_encode($updated->toArray());
+ if ($encodedUpdated === false) {
+ throw new \RuntimeException('Failed to encode schedule data: ' . \json_last_error_msg());
+ }
+ $this->connection->hashSet($schedulesKey, $scheduleId, $encodedUpdated);
+
+ // Calculate and store next run time
+ $nextRun = $updated->getNextRunTime(\time());
+ if ($nextRun !== null && $updated->isActive()) {
+ $this->connection->sortedSetAdd($nextKey, (float)($nextRun * 1000), $scheduleId);
+ }
+ }
+ }
+
+
+ private function getStreamKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.stream.{$queue->name}";
+ }
+
+ private function getDlqKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.stream.{$queue->name}.dlq";
+ }
+
+ private function getDelayedKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.delayed.{$queue->name}";
+ }
+
+ private function getGroupName(Queue $queue): string
+ {
+ return "{$queue->namespace}.group.{$queue->name}";
+ }
+
+ private function getSchedulesKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.schedules.{$queue->name}";
+ }
+
+ private function getScheduleNextKey(Queue $queue): string
+ {
+ return "{$queue->namespace}.schedule.next.{$queue->name}";
+ }
+}
diff --git a/src/Queue/Connection/RedisStream.php b/src/Queue/Connection/RedisStream.php
new file mode 100644
index 0000000..acce640
--- /dev/null
+++ b/src/Queue/Connection/RedisStream.php
@@ -0,0 +1,463 @@
+getRedis();
+
+ if ($maxLen !== null) {
+ // Use exact MAXLEN (approximate=false) by default for reliable trimming
+ // Note: approximate trimming with phpredis may not trim immediately
+ return $redis->xAdd($stream, $id, $fields, $maxLen, $approximate);
+ }
+
+ return $redis->xAdd($stream, $id, $fields);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamCreateGroup(string $stream, string $group, string $id = '0', bool $mkstream = true): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('CREATE', $stream, $group, $id, $mkstream);
+ // phpredis may return false instead of throwing on BUSYGROUP
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ if ($error !== null && str_contains($error, 'BUSYGROUP')) {
+ return true;
+ }
+ return false;
+ }
+ return (bool)$result;
+ } catch (\RedisException $e) {
+ // Group already exists - BUSYGROUP error
+ if (str_contains($e->getMessage(), 'BUSYGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDestroyGroup(string $stream, string $group): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('DESTROY', $stream, $group);
+ // phpredis may return false instead of throwing on errors
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ // Stream doesn't exist or group doesn't exist - treat as success (already destroyed)
+ if ($error !== null && (
+ str_contains($error, 'NOGROUP') ||
+ str_contains($error, 'no such key') ||
+ str_contains($error, 'key to exist')
+ )) {
+ return true;
+ }
+ return false;
+ }
+ // Result of 0 means the group didn't exist - that's fine, it's "destroyed"
+ return true;
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDeleteConsumer(string $stream, string $group, string $consumer): int
+ {
+ try {
+ return $this->getRedis()->xGroup('DELCONSUMER', $stream, $group, $consumer);
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return 0;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamReadGroup(
+ string $group,
+ string $consumer,
+ array $streams,
+ int $count = 1,
+ int $block = 0,
+ bool $noack = false
+ ): array|false {
+ $streamIds = [];
+ foreach ($streams as $stream) {
+ $streamIds[$stream] = '>'; // Read only new messages
+ }
+
+ // Build options array for xReadGroup
+ $options = [];
+ if ($noack) {
+ $options['NOACK'] = true;
+ }
+
+ $redis = $this->getRedis();
+
+ // phpredis doesn't support NOACK in xReadGroup directly, so we need to use rawCommand
+ if ($noack) {
+ // Build the command manually for NOACK support
+ $command = ['XREADGROUP', 'GROUP', $group, $consumer];
+ if ($count > 0) {
+ $command[] = 'COUNT';
+ $command[] = (string)$count;
+ }
+ if ($block > 0) {
+ $command[] = 'BLOCK';
+ $command[] = (string)$block;
+ }
+ $command[] = 'NOACK';
+ $command[] = 'STREAMS';
+
+ foreach ($streamIds as $stream => $id) {
+ $command[] = $stream;
+ }
+ foreach ($streamIds as $stream => $id) {
+ $command[] = $id;
+ }
+
+ try {
+ $result = $redis->rawCommand(...$command);
+ return $result ?: false;
+ } catch (\RedisException $e) {
+ return false;
+ }
+ }
+
+ $result = $redis->xReadGroup(
+ $group,
+ $consumer,
+ $streamIds,
+ $count,
+ $block > 0 ? $block : null
+ );
+
+ return $result ?: false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAck(string $stream, string $group, string|array $ids): int
+ {
+ $ids = is_array($ids) ? $ids : [$ids];
+ return $this->getRedis()->xAck($stream, $group, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPendingSummary(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xPending($stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPending(
+ string $stream,
+ string $group,
+ string $start = '-',
+ string $end = '+',
+ int $count = 100,
+ ?string $consumer = null
+ ): array {
+ try {
+ if ($consumer !== null) {
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count, $consumer) ?: [];
+ }
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ array $ids,
+ bool $justId = false
+ ): array {
+ try {
+ $options = $justId ? ['JUSTID'] : [];
+ return $this->getRedis()->xClaim($stream, $group, $consumer, $minIdleTime, $ids, $options) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAutoClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ string $start = '0-0',
+ int $count = 100
+ ): array {
+ try {
+ $result = $this->getRedis()->xAutoClaim($stream, $group, $consumer, $minIdleTime, $start, $count);
+ return $result ?: ['0-0', [], []];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return ['0-0', [], []];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDel(string $stream, array $ids): int
+ {
+ if (empty($ids)) {
+ return 0;
+ }
+ return $this->getRedis()->xDel($stream, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamLen(string $stream): int
+ {
+ return $this->getRedis()->xLen($stream);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamTrim(string $stream, int $maxLen, bool $approximate = true): int
+ {
+ return $this->getRedis()->xTrim($stream, $maxLen, $approximate);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('STREAM', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamGroupInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('GROUPS', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamConsumersInfo(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xInfo('CONSUMERS', $stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRange(string $stream, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRange($stream, $start, $end, $count) ?: [];
+ }
+ return $this->getRedis()->xRange($stream, $start, $end) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRevRange(string $stream, string $end = '+', string $start = '-', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRevRange($stream, $end, $start, $count) ?: [];
+ }
+ return $this->getRedis()->xRevRange($stream, $end, $start) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetAdd(string $key, float $score, string $member): int
+ {
+ return $this->getRedis()->zAdd($key, $score, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetPopByScore(string $key, float $min, float $max, int $limit = 100): array
+ {
+ $redis = $this->getRedis();
+
+ // Limit to prevent Lua stack overflow (unpack has ~8000 item limit)
+ if ($limit > 5000) {
+ $limit = 5000;
+ }
+
+ // Use Lua script for atomic pop by score
+ $script = <<<'LUA'
+local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3])
+if #members > 0 then
+ redis.call('ZREM', KEYS[1], unpack(members))
+end
+return members
+LUA;
+
+ $result = $redis->eval($script, [$key, (string)$min, (string)$max, (string)$limit], 1);
+ return $result ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRangeByScore(string $key, float $min, float $max, ?int $limit = null): array
+ {
+ $options = [];
+ if ($limit !== null) {
+ $options['limit'] = [0, $limit];
+ }
+ return $this->getRedis()->zRangeByScore($key, (string)$min, (string)$max, $options) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetSize(string $key): int
+ {
+ return $this->getRedis()->zCard($key);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRemove(string $key, string $member): int
+ {
+ return $this->getRedis()->zRem($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetScore(string $key, string $member): float|false
+ {
+ return $this->getRedis()->zScore($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashSet(string $key, string $field, string $value): bool
+ {
+ return $this->getRedis()->hSet($key, $field, $value) !== false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGet(string $key, string $field): string|false
+ {
+ return $this->getRedis()->hGet($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGetAll(string $key): array
+ {
+ return $this->getRedis()->hGetAll($key) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashDel(string $key, string $field): int
+ {
+ return $this->getRedis()->hDel($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashExists(string $key, string $field): bool
+ {
+ return $this->getRedis()->hExists($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashLen(string $key): int
+ {
+ return $this->getRedis()->hLen($key);
+ }
+}
diff --git a/src/Queue/Connection/RedisStreamCluster.php b/src/Queue/Connection/RedisStreamCluster.php
new file mode 100644
index 0000000..d297708
--- /dev/null
+++ b/src/Queue/Connection/RedisStreamCluster.php
@@ -0,0 +1,422 @@
+getRedis();
+
+ if ($maxLen !== null) {
+ // Use exact MAXLEN (approximate=false) by default for reliable trimming
+ return $redis->xAdd($stream, $id, $fields, $maxLen, $approximate);
+ }
+
+ return $redis->xAdd($stream, $id, $fields);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamCreateGroup(string $stream, string $group, string $id = '0', bool $mkstream = true): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('CREATE', $stream, $group, $id, $mkstream);
+ // phpredis may return false instead of throwing on BUSYGROUP
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ if ($error !== null && str_contains($error, 'BUSYGROUP')) {
+ return true;
+ }
+ return false;
+ }
+ return (bool)$result;
+ } catch (\RedisException $e) {
+ // Group already exists - BUSYGROUP error
+ if (str_contains($e->getMessage(), 'BUSYGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDestroyGroup(string $stream, string $group): bool
+ {
+ $redis = $this->getRedis();
+
+ try {
+ $result = $redis->xGroup('DESTROY', $stream, $group);
+ // phpredis may return false instead of throwing on errors
+ if ($result === false) {
+ $error = $redis->getLastError();
+ $redis->clearLastError();
+ // Stream doesn't exist or group doesn't exist - treat as success (already destroyed)
+ if ($error !== null && (
+ str_contains($error, 'NOGROUP') ||
+ str_contains($error, 'no such key') ||
+ str_contains($error, 'key to exist')
+ )) {
+ return true;
+ }
+ return false;
+ }
+ // Result of 0 means the group didn't exist - that's fine, it's "destroyed"
+ return true;
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return true;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDeleteConsumer(string $stream, string $group, string $consumer): int
+ {
+ try {
+ return $this->getRedis()->xGroup('DELCONSUMER', $stream, $group, $consumer);
+ } catch (\RedisException $e) {
+ // Group doesn't exist
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return 0;
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamReadGroup(
+ string $group,
+ string $consumer,
+ array $streams,
+ int $count = 1,
+ int $block = 0,
+ bool $noack = false
+ ): array|false {
+ $streamIds = [];
+ foreach ($streams as $stream) {
+ $streamIds[$stream] = '>'; // Read only new messages
+ }
+
+ $result = $this->getRedis()->xReadGroup(
+ $group,
+ $consumer,
+ $streamIds,
+ $count,
+ $block > 0 ? $block : null
+ );
+
+ return $result ?: false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAck(string $stream, string $group, string|array $ids): int
+ {
+ $ids = is_array($ids) ? $ids : [$ids];
+ return $this->getRedis()->xAck($stream, $group, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPendingSummary(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xPending($stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamPending(
+ string $stream,
+ string $group,
+ string $start = '-',
+ string $end = '+',
+ int $count = 100,
+ ?string $consumer = null
+ ): array {
+ try {
+ if ($consumer !== null) {
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count, $consumer) ?: [];
+ }
+ return $this->getRedis()->xPending($stream, $group, $start, $end, $count) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ array $ids,
+ bool $justId = false
+ ): array {
+ try {
+ $options = $justId ? ['JUSTID'] : [];
+ return $this->getRedis()->xClaim($stream, $group, $consumer, $minIdleTime, $ids, $options) ?: [];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return [];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamAutoClaim(
+ string $stream,
+ string $group,
+ string $consumer,
+ int $minIdleTime,
+ string $start = '0-0',
+ int $count = 100
+ ): array {
+ try {
+ $result = $this->getRedis()->xAutoClaim($stream, $group, $consumer, $minIdleTime, $start, $count);
+ return $result ?: ['0-0', [], []];
+ } catch (\RedisException $e) {
+ if (str_contains($e->getMessage(), 'NOGROUP')) {
+ return ['0-0', [], []];
+ }
+ throw $e;
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamDel(string $stream, array $ids): int
+ {
+ if (empty($ids)) {
+ return 0;
+ }
+ return $this->getRedis()->xDel($stream, $ids);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamLen(string $stream): int
+ {
+ return $this->getRedis()->xLen($stream);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamTrim(string $stream, int $maxLen, bool $approximate = true): int
+ {
+ return $this->getRedis()->xTrim($stream, $maxLen, $approximate);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('STREAM', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamGroupInfo(string $stream): array
+ {
+ try {
+ return $this->getRedis()->xInfo('GROUPS', $stream) ?: [];
+ } catch (\RedisException $e) {
+ // Stream doesn't exist yet
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamConsumersInfo(string $stream, string $group): array
+ {
+ try {
+ return $this->getRedis()->xInfo('CONSUMERS', $stream, $group) ?: [];
+ } catch (\RedisException $e) {
+ return [];
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRange(string $stream, string $start = '-', string $end = '+', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRange($stream, $start, $end, $count) ?: [];
+ }
+ return $this->getRedis()->xRange($stream, $start, $end) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function streamRevRange(string $stream, string $end = '+', string $start = '-', ?int $count = null): array
+ {
+ if ($count !== null) {
+ return $this->getRedis()->xRevRange($stream, $end, $start, $count) ?: [];
+ }
+ return $this->getRedis()->xRevRange($stream, $end, $start) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetAdd(string $key, float $score, string $member): int
+ {
+ return $this->getRedis()->zAdd($key, $score, $member);
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * Note: In cluster mode, Lua scripts must use keys from the same hash slot.
+ * This works because we use a single key.
+ */
+ public function sortedSetPopByScore(string $key, float $min, float $max, int $limit = 100): array
+ {
+ $redis = $this->getRedis();
+
+ // Use Lua script for atomic pop by score
+ $script = <<<'LUA'
+local members = redis.call('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', 0, ARGV[3])
+if #members > 0 then
+ redis.call('ZREM', KEYS[1], unpack(members))
+end
+return members
+LUA;
+
+ $result = $redis->eval($script, [$key, (string)$min, (string)$max, (string)$limit], 1);
+ return $result ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRangeByScore(string $key, float $min, float $max, ?int $limit = null): array
+ {
+ $options = [];
+ if ($limit !== null) {
+ $options['limit'] = [0, $limit];
+ }
+ return $this->getRedis()->zRangeByScore($key, (string)$min, (string)$max, $options) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetSize(string $key): int
+ {
+ return $this->getRedis()->zCard($key);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetRemove(string $key, string $member): int
+ {
+ return $this->getRedis()->zRem($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sortedSetScore(string $key, string $member): float|false
+ {
+ return $this->getRedis()->zScore($key, $member);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashSet(string $key, string $field, string $value): bool
+ {
+ return $this->getRedis()->hSet($key, $field, $value) !== false;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGet(string $key, string $field): string|false
+ {
+ return $this->getRedis()->hGet($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashGetAll(string $key): array
+ {
+ return $this->getRedis()->hGetAll($key) ?: [];
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashDel(string $key, string $field): int
+ {
+ return $this->getRedis()->hDel($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashExists(string $key, string $field): bool
+ {
+ return $this->getRedis()->hExists($key, $field);
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function hashLen(string $key): int
+ {
+ return $this->getRedis()->hLen($key);
+ }
+}
diff --git a/src/Queue/Schedule.php b/src/Queue/Schedule.php
new file mode 100644
index 0000000..0d521f5
--- /dev/null
+++ b/src/Queue/Schedule.php
@@ -0,0 +1,278 @@
+isActive()) {
+ return null;
+ }
+
+ $now = time();
+ $baseTime = $lastRun ?? $now;
+
+ // Apply startAt constraint
+ if ($this->startAt !== null && $baseTime < $this->startAt) {
+ $baseTime = $this->startAt;
+ }
+
+ if ($this->cron !== null) {
+ // Cron-based schedule
+ $cronExpression = new CronExpression($this->cron);
+ $nextRun = $cronExpression->getNextRunDate(\DateTime::createFromFormat('U', (string)$baseTime))->getTimestamp();
+ } else {
+ // Interval-based schedule
+ if ($lastRun === null) {
+ // First run - use startAt or now
+ $nextRun = $this->startAt ?? $now;
+ } else {
+ // Subsequent runs - add interval
+ $nextRun = $lastRun + $this->interval;
+
+ // If we've passed the next run time, schedule for next interval from now
+ if ($nextRun < $now) {
+ $elapsed = $now - $lastRun;
+ $intervals = (int)ceil($elapsed / $this->interval);
+ $nextRun = $lastRun + ($intervals * $this->interval);
+ }
+ }
+ }
+
+ // Check endAt constraint
+ if ($this->endAt !== null && $nextRun > $this->endAt) {
+ return null;
+ }
+
+ return $nextRun;
+ }
+
+ /**
+ * Check if the schedule is still active (not exceeded limits).
+ *
+ * @return bool
+ */
+ public function isActive(): bool
+ {
+ if ($this->paused) {
+ return false;
+ }
+
+ // Check max runs
+ if ($this->maxRuns !== null && $this->runCount >= $this->maxRuns) {
+ return false;
+ }
+
+ // Check end time
+ if ($this->endAt !== null && time() > $this->endAt) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Check if the schedule is paused.
+ *
+ * @return bool
+ */
+ public function isPaused(): bool
+ {
+ return $this->paused;
+ }
+
+ /**
+ * Get the current run count.
+ *
+ * @return int
+ */
+ public function getRunCount(): int
+ {
+ return $this->runCount;
+ }
+
+ /**
+ * Increment the run count and return a new instance.
+ *
+ * @return self
+ */
+ public function incrementRunCount(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount + 1,
+ $this->paused
+ );
+ }
+
+ /**
+ * Create a paused copy of this schedule.
+ *
+ * @return self
+ */
+ public function pause(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount,
+ true
+ );
+ }
+
+ /**
+ * Create a resumed copy of this schedule.
+ *
+ * @return self
+ */
+ public function resume(): self
+ {
+ return new self(
+ $this->id,
+ $this->payload,
+ $this->cron,
+ $this->interval,
+ $this->startAt,
+ $this->endAt,
+ $this->maxRuns,
+ $this->runCount,
+ false
+ );
+ }
+
+ /**
+ * Serialize to array for storage.
+ *
+ * @return array
+ */
+ public function toArray(): array
+ {
+ return [
+ 'id' => $this->id,
+ 'payload' => $this->payload,
+ 'cron' => $this->cron,
+ 'interval' => $this->interval,
+ 'startAt' => $this->startAt,
+ 'endAt' => $this->endAt,
+ 'maxRuns' => $this->maxRuns,
+ 'runCount' => $this->runCount,
+ 'paused' => $this->paused,
+ ];
+ }
+
+ /**
+ * Deserialize from array.
+ *
+ * @param array $data
+ * @return self
+ */
+ public static function fromArray(array $data): self
+ {
+ return new self(
+ $data['id'],
+ $data['payload'],
+ $data['cron'] ?? null,
+ $data['interval'] ?? null,
+ $data['startAt'] ?? null,
+ $data['endAt'] ?? null,
+ $data['maxRuns'] ?? null,
+ $data['runCount'] ?? 0,
+ $data['paused'] ?? false,
+ );
+ }
+
+ /**
+ * Get a human-readable description of the schedule.
+ *
+ * @return string
+ */
+ public function getDescription(): string
+ {
+ if ($this->cron !== null) {
+ return "Cron: {$this->cron}";
+ }
+
+ $interval = $this->interval;
+
+ // Try to find the largest clean unit that divides evenly
+ if ($interval >= 86400 && $interval % 86400 === 0) {
+ $days = (int)($interval / 86400);
+ return "Every {$days} day" . ($days !== 1 ? 's' : '');
+ }
+ if ($interval >= 3600 && $interval % 3600 === 0) {
+ $hours = (int)($interval / 3600);
+ return "Every {$hours} hour" . ($hours !== 1 ? 's' : '');
+ }
+ if ($interval >= 60 && $interval % 60 === 0) {
+ $minutes = (int)($interval / 60);
+ return "Every {$minutes} minute" . ($minutes !== 1 ? 's' : '');
+ }
+
+ return "Every {$interval} second" . ($interval !== 1 ? 's' : '');
+ }
+}
diff --git a/src/Queue/StreamConnection.php b/src/Queue/StreamConnection.php
new file mode 100644
index 0000000..ab8d2e1
--- /dev/null
+++ b/src/Queue/StreamConnection.php
@@ -0,0 +1,332 @@
+getPublisher();
+
+ // Enqueue a delayed job
+ $result = $publisher->enqueueDelayed($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'delayed job'
+ ], 1);
+
+ $this->assertTrue($result);
+
+ // Check delayed count
+ $delayedCount = $publisher->getDelayedCount($this->getQueue());
+ $this->assertGreaterThanOrEqual(1, $delayedCount);
+
+ // Wait for the job to be processed
+ // Worker's consume loop has a 2s block timeout + 1s delayed check interval
+ sleep(5);
+
+ // Delayed count should be back to 0
+ $delayedCount = $publisher->getDelayedCount($this->getQueue());
+ $this->assertEquals(0, $delayedCount);
+ }
+
+ /**
+ * Test scheduled job enqueueing.
+ */
+ public function testScheduledJobs(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue a job scheduled for 2 seconds from now
+ $result = $publisher->enqueueAt($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'scheduled job'
+ ], time() + 2);
+
+ $this->assertTrue($result);
+
+ sleep(3);
+ }
+
+ /**
+ * Test stream observability.
+ */
+ public function testObservability(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue a job first
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_string',
+ 'value' => 'observability test'
+ ]);
+
+ sleep(1);
+
+ // Test getStreamInfo
+ $info = $publisher->getStreamInfo($this->getQueue());
+ $this->assertIsArray($info);
+
+ // Test getGroupInfo
+ $groupInfo = $publisher->getGroupInfo($this->getQueue());
+ $this->assertIsArray($groupInfo);
+
+ // Test getConsumersInfo
+ $consumers = $publisher->getConsumersInfo($this->getQueue());
+ $this->assertIsArray($consumers);
+
+ // Test getQueueSize
+ $size = $publisher->getQueueSize($this->getQueue());
+ $this->assertIsInt($size);
+
+ // Test getLag
+ $lag = $publisher->getLag($this->getQueue());
+ $this->assertIsInt($lag);
+ }
+
+ /**
+ * Test message history/replay functionality.
+ */
+ public function testMessageHistory(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue several jobs
+ for ($i = 0; $i < 5; $i++) {
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_number',
+ 'value' => $i
+ ]);
+ }
+
+ sleep(1);
+
+ // Get message history
+ $messages = $publisher->getMessages($this->getQueue(), '-', '+', 10);
+ $this->assertIsArray($messages);
+ }
+
+ /**
+ * Test schedule management.
+ */
+ public function testScheduleManagement(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $schedule = new \Utopia\Queue\Schedule(
+ id: 'e2e-test-schedule',
+ payload: ['type' => 'test_string', 'value' => 'scheduled'],
+ interval: 300
+ );
+
+ // Create schedule
+ $result = $publisher->schedule($this->getQueue(), $schedule);
+ $this->assertTrue($result);
+
+ // Retrieve schedule
+ $retrieved = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals('e2e-test-schedule', $retrieved->id);
+
+ // Pause schedule
+ $result = $publisher->pauseSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $paused = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($paused->isPaused());
+
+ // Resume schedule
+ $result = $publisher->resumeSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $resumed = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertFalse($resumed->isPaused());
+
+ // List schedules
+ $schedules = $publisher->getSchedules($this->getQueue());
+ $this->assertArrayHasKey('e2e-test-schedule', $schedules);
+
+ // Remove schedule
+ $result = $publisher->unschedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertTrue($result);
+
+ $deleted = $publisher->getSchedule($this->getQueue(), 'e2e-test-schedule');
+ $this->assertNull($deleted);
+ }
+
+ /**
+ * Test cron schedule.
+ */
+ public function testCronSchedule(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $schedule = new \Utopia\Queue\Schedule(
+ id: 'e2e-cron-schedule',
+ payload: ['type' => 'test_string', 'value' => 'cron job'],
+ cron: '*/5 * * * *'
+ );
+
+ $result = $publisher->schedule($this->getQueue(), $schedule);
+ $this->assertTrue($result);
+
+ $retrieved = $publisher->getSchedule($this->getQueue(), 'e2e-cron-schedule');
+ $this->assertEquals('*/5 * * * *', $retrieved->cron);
+
+ // Cleanup
+ $publisher->unschedule($this->getQueue(), 'e2e-cron-schedule');
+ }
+
+ /**
+ * Test stream trimming.
+ */
+ public function testStreamTrimming(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Enqueue many messages
+ for ($i = 0; $i < 20; $i++) {
+ $publisher->enqueue($this->getQueue(), [
+ 'type' => 'test_number',
+ 'value' => $i
+ ]);
+ }
+
+ sleep(1);
+
+ // Trim the stream
+ $trimmed = $publisher->trimStream($this->getQueue(), 5);
+ $this->assertGreaterThan(0, $trimmed);
+ }
+
+ /**
+ * Test pending count.
+ */
+ public function testPendingCount(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $pending = $publisher->getPendingCount($this->getQueue());
+ $this->assertIsInt($pending);
+ $this->assertGreaterThanOrEqual(0, $pending);
+ }
+
+ /**
+ * Test queue size with failed jobs.
+ */
+ public function testQueueSizeWithFailedJobs(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ $failedSize = $publisher->getQueueSize($this->getQueue(), true);
+ $this->assertIsInt($failedSize);
+ $this->assertGreaterThanOrEqual(0, $failedSize);
+ }
+
+ /**
+ * Test consumer ID management.
+ */
+ public function testConsumerIdManagement(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Get default consumer ID
+ $defaultId = $publisher->getConsumerId();
+ $this->assertStringStartsWith('worker-', $defaultId);
+
+ // Set custom consumer ID
+ $publisher->setConsumerId('e2e-test-consumer');
+ $this->assertEquals('e2e-test-consumer', $publisher->getConsumerId());
+ }
+
+ /**
+ * Test various payload types are preserved.
+ */
+ public function testPayloadTypes(): void
+ {
+ /** @var RedisStreams $publisher */
+ $publisher = $this->getPublisher();
+
+ // Test complex nested payload
+ $complexPayload = [
+ 'type' => 'test_assoc',
+ 'value' => [
+ 'string' => 'test',
+ 'number' => 123,
+ 'float' => 1.23,
+ 'bool' => true,
+ 'null' => null,
+ 'array' => [1, 2, 3],
+ 'nested' => [
+ 'deep' => 'value'
+ ]
+ ]
+ ];
+
+ $result = $publisher->enqueue($this->getQueue(), $complexPayload);
+ $this->assertTrue($result);
+
+ sleep(1);
+
+ // Verify messages can be retrieved
+ $messages = $publisher->getMessages($this->getQueue(), '-', '+', 1);
+ $this->assertNotEmpty($messages);
+ }
+}
diff --git a/tests/Queue/Unit/RedisStreamConnectionTest.php b/tests/Queue/Unit/RedisStreamConnectionTest.php
new file mode 100644
index 0000000..a809b56
--- /dev/null
+++ b/tests/Queue/Unit/RedisStreamConnectionTest.php
@@ -0,0 +1,507 @@
+connection = new RedisStream('redis', 6379);
+ $this->testPrefix = 'test-' . uniqid() . '-';
+ }
+
+ protected function tearDown(): void
+ {
+ // Clean up test keys
+ $this->cleanupTestKeys();
+ $this->connection->close();
+ }
+
+ private function cleanupTestKeys(): void
+ {
+ $redis = new \Redis();
+ $redis->connect('redis', 6379);
+
+ $keys = $redis->keys($this->testPrefix . '*');
+ if (!empty($keys)) {
+ $redis->del($keys);
+ }
+
+ $redis->close();
+ }
+
+
+ public function testStreamAdd(): void
+ {
+ $stream = $this->testPrefix . 'stream';
+
+ $id = $this->connection->streamAdd($stream, ['field1' => 'value1', 'field2' => 'value2']);
+
+ $this->assertIsString($id);
+ $this->assertMatchesRegularExpression('/^\d+-\d+$/', $id);
+ }
+
+ public function testStreamAddWithMaxLen(): void
+ {
+ $stream = $this->testPrefix . 'stream-maxlen';
+
+ // Add 10 entries with maxlen of 5 (exact trimming by default)
+ for ($i = 0; $i < 10; $i++) {
+ $this->connection->streamAdd($stream, ['index' => (string)$i], '*', 5);
+ }
+
+ $len = $this->connection->streamLen($stream);
+ // With exact trimming (default), should have exactly 5
+ $this->assertEquals(5, $len);
+ }
+
+ public function testStreamCreateGroup(): void
+ {
+ $stream = $this->testPrefix . 'stream-group';
+ $group = 'test-group';
+
+ // Create group (also creates stream with MKSTREAM)
+ $result = $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->assertTrue($result);
+
+ // Creating same group again should return true (BUSYGROUP handled)
+ $result = $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->assertTrue($result);
+ }
+
+ public function testStreamDestroyGroup(): void
+ {
+ $stream = $this->testPrefix . 'stream-destroy';
+ $group = 'test-group';
+
+ // Create and then destroy
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $result = $this->connection->streamDestroyGroup($stream, $group);
+ $this->assertTrue($result);
+
+ // Destroying non-existent group should return true (NOGROUP handled)
+ $result = $this->connection->streamDestroyGroup($stream, 'non-existent');
+ $this->assertTrue($result);
+ }
+
+ public function testStreamReadGroupAndAck(): void
+ {
+ $stream = $this->testPrefix . 'stream-read';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Create group and add message
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $messageId = $this->connection->streamAdd($stream, ['data' => 'test-message']);
+
+ // Read message
+ $result = $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ $this->assertIsArray($result);
+ $this->assertArrayHasKey($stream, $result);
+ $this->assertNotEmpty($result[$stream]);
+
+ // Get the entry ID from result
+ $entryId = array_key_first($result[$stream]);
+
+ // Acknowledge
+ $ackCount = $this->connection->streamAck($stream, $group, $entryId);
+ $this->assertEquals(1, $ackCount);
+ }
+
+ public function testStreamPendingSummary(): void
+ {
+ $stream = $this->testPrefix . 'stream-pending';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'message1']);
+ $this->connection->streamAdd($stream, ['data' => 'message2']);
+
+ // Read without acknowledging
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 2, 100);
+
+ // Check pending
+ $pending = $this->connection->streamPendingSummary($stream, $group);
+
+ $this->assertIsArray($pending);
+ $this->assertEquals(2, $pending[0]); // 2 pending messages
+ }
+
+ public function testStreamPendingDetails(): void
+ {
+ $stream = $this->testPrefix . 'stream-pending-detail';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'message1']);
+
+ // Read without acknowledging
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ // Get pending details
+ $pending = $this->connection->streamPending($stream, $group, '-', '+', 10);
+
+ $this->assertIsArray($pending);
+ $this->assertCount(1, $pending);
+ $this->assertEquals($consumer, $pending[0][1]); // Consumer name
+ }
+
+ public function testStreamClaim(): void
+ {
+ $stream = $this->testPrefix . 'stream-claim';
+ $group = 'test-group';
+ $consumer1 = 'consumer-1';
+ $consumer2 = 'consumer-2';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $messageId = $this->connection->streamAdd($stream, ['data' => 'claim-test']);
+
+ // Consumer 1 reads
+ $this->connection->streamReadGroup($group, $consumer1, [$stream], 1, 100);
+
+ // Consumer 2 claims (with 0 idle time for testing)
+ $claimed = $this->connection->streamClaim($stream, $group, $consumer2, 0, [$messageId]);
+
+ $this->assertIsArray($claimed);
+ $this->assertNotEmpty($claimed);
+ }
+
+ public function testStreamAutoClaim(): void
+ {
+ $stream = $this->testPrefix . 'stream-autoclaim';
+ $group = 'test-group';
+ $consumer1 = 'consumer-1';
+ $consumer2 = 'consumer-2';
+
+ // Setup
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'autoclaim-test']);
+
+ // Consumer 1 reads
+ $this->connection->streamReadGroup($group, $consumer1, [$stream], 1, 100);
+
+ // Consumer 2 auto-claims (with 0 idle time for testing)
+ $result = $this->connection->streamAutoClaim($stream, $group, $consumer2, 0, '0-0', 10);
+
+ $this->assertIsArray($result);
+ $this->assertCount(3, $result); // [next_id, claimed_entries, deleted_ids]
+ }
+
+ public function testStreamDel(): void
+ {
+ $stream = $this->testPrefix . 'stream-del';
+
+ $id1 = $this->connection->streamAdd($stream, ['data' => 'message1']);
+ $id2 = $this->connection->streamAdd($stream, ['data' => 'message2']);
+
+ $this->assertEquals(2, $this->connection->streamLen($stream));
+
+ $deleted = $this->connection->streamDel($stream, [$id1]);
+ $this->assertEquals(1, $deleted);
+ $this->assertEquals(1, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamLen(): void
+ {
+ $stream = $this->testPrefix . 'stream-len';
+
+ $this->assertEquals(0, $this->connection->streamLen($stream));
+
+ $this->connection->streamAdd($stream, ['data' => '1']);
+ $this->connection->streamAdd($stream, ['data' => '2']);
+ $this->connection->streamAdd($stream, ['data' => '3']);
+
+ $this->assertEquals(3, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamTrim(): void
+ {
+ $stream = $this->testPrefix . 'stream-trim';
+
+ // Add 10 entries
+ for ($i = 0; $i < 10; $i++) {
+ $this->connection->streamAdd($stream, ['index' => (string)$i]);
+ }
+
+ $this->assertEquals(10, $this->connection->streamLen($stream));
+
+ // Trim to 5
+ $trimmed = $this->connection->streamTrim($stream, 5, false);
+ $this->assertEquals(5, $trimmed);
+ $this->assertEquals(5, $this->connection->streamLen($stream));
+ }
+
+ public function testStreamInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-info';
+
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+
+ $info = $this->connection->streamInfo($stream);
+
+ $this->assertIsArray($info);
+ $this->assertArrayHasKey('length', $info);
+ $this->assertEquals(1, $info['length']);
+ }
+
+ public function testStreamGroupInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-group-info';
+ $group = 'test-group';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+
+ $groups = $this->connection->streamGroupInfo($stream);
+
+ $this->assertIsArray($groups);
+ $this->assertCount(1, $groups);
+ $this->assertEquals($group, $groups[0]['name']);
+ }
+
+ public function testStreamConsumersInfo(): void
+ {
+ $stream = $this->testPrefix . 'stream-consumers-info';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ $consumers = $this->connection->streamConsumersInfo($stream, $group);
+
+ $this->assertIsArray($consumers);
+ $this->assertCount(1, $consumers);
+ $this->assertEquals($consumer, $consumers[0]['name']);
+ }
+
+ public function testStreamRange(): void
+ {
+ $stream = $this->testPrefix . 'stream-range';
+
+ $id1 = $this->connection->streamAdd($stream, ['index' => '1']);
+ $id2 = $this->connection->streamAdd($stream, ['index' => '2']);
+ $id3 = $this->connection->streamAdd($stream, ['index' => '3']);
+
+ // Get all
+ $entries = $this->connection->streamRange($stream, '-', '+');
+ $this->assertCount(3, $entries);
+
+ // Get with count
+ $entries = $this->connection->streamRange($stream, '-', '+', 2);
+ $this->assertCount(2, $entries);
+
+ // Get specific range
+ $entries = $this->connection->streamRange($stream, $id2, $id2);
+ $this->assertCount(1, $entries);
+ $this->assertEquals('2', $entries[$id2]['index']);
+ }
+
+ public function testStreamRevRange(): void
+ {
+ $stream = $this->testPrefix . 'stream-revrange';
+
+ $this->connection->streamAdd($stream, ['index' => '1']);
+ $this->connection->streamAdd($stream, ['index' => '2']);
+ $this->connection->streamAdd($stream, ['index' => '3']);
+
+ $entries = $this->connection->streamRevRange($stream, '+', '-', 2);
+
+ $this->assertCount(2, $entries);
+ // First entry should be the latest (index 3)
+ $firstEntry = reset($entries);
+ $this->assertEquals('3', $firstEntry['index']);
+ }
+
+ public function testStreamDeleteConsumer(): void
+ {
+ $stream = $this->testPrefix . 'stream-del-consumer';
+ $group = 'test-group';
+ $consumer = 'test-consumer';
+
+ $this->connection->streamCreateGroup($stream, $group, '0', true);
+ $this->connection->streamAdd($stream, ['data' => 'test']);
+ $this->connection->streamReadGroup($group, $consumer, [$stream], 1, 100);
+
+ // Delete consumer
+ $pending = $this->connection->streamDeleteConsumer($stream, $group, $consumer);
+ $this->assertIsInt($pending);
+
+ // Consumer should be gone
+ $consumers = $this->connection->streamConsumersInfo($stream, $group);
+ $this->assertEmpty($consumers);
+ }
+
+
+ public function testSortedSetAdd(): void
+ {
+ $key = $this->testPrefix . 'zset-add';
+
+ $result = $this->connection->sortedSetAdd($key, 1.0, 'member1');
+ $this->assertEquals(1, $result);
+
+ // Adding same member updates score, returns 0
+ $result = $this->connection->sortedSetAdd($key, 2.0, 'member1');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testSortedSetPopByScore(): void
+ {
+ $key = $this->testPrefix . 'zset-pop';
+
+ $this->connection->sortedSetAdd($key, 100, 'a');
+ $this->connection->sortedSetAdd($key, 200, 'b');
+ $this->connection->sortedSetAdd($key, 300, 'c');
+ $this->connection->sortedSetAdd($key, 400, 'd');
+
+ // Pop scores 0-250
+ $popped = $this->connection->sortedSetPopByScore($key, 0, 250, 10);
+
+ $this->assertCount(2, $popped);
+ $this->assertContains('a', $popped);
+ $this->assertContains('b', $popped);
+
+ // Verify they're removed
+ $this->assertEquals(2, $this->connection->sortedSetSize($key));
+ }
+
+ public function testSortedSetRangeByScore(): void
+ {
+ $key = $this->testPrefix . 'zset-range';
+
+ $this->connection->sortedSetAdd($key, 100, 'a');
+ $this->connection->sortedSetAdd($key, 200, 'b');
+ $this->connection->sortedSetAdd($key, 300, 'c');
+
+ $members = $this->connection->sortedSetRangeByScore($key, 150, 350);
+
+ $this->assertCount(2, $members);
+ $this->assertContains('b', $members);
+ $this->assertContains('c', $members);
+ }
+
+ public function testSortedSetSize(): void
+ {
+ $key = $this->testPrefix . 'zset-size';
+
+ $this->assertEquals(0, $this->connection->sortedSetSize($key));
+
+ $this->connection->sortedSetAdd($key, 1, 'a');
+ $this->connection->sortedSetAdd($key, 2, 'b');
+
+ $this->assertEquals(2, $this->connection->sortedSetSize($key));
+ }
+
+ public function testSortedSetRemove(): void
+ {
+ $key = $this->testPrefix . 'zset-remove';
+
+ $this->connection->sortedSetAdd($key, 1, 'member');
+
+ $result = $this->connection->sortedSetRemove($key, 'member');
+ $this->assertEquals(1, $result);
+
+ $result = $this->connection->sortedSetRemove($key, 'non-existent');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testSortedSetScore(): void
+ {
+ $key = $this->testPrefix . 'zset-score';
+
+ $this->connection->sortedSetAdd($key, 123.456, 'member');
+
+ $score = $this->connection->sortedSetScore($key, 'member');
+ $this->assertEquals(123.456, $score);
+
+ $score = $this->connection->sortedSetScore($key, 'non-existent');
+ $this->assertFalse($score);
+ }
+
+
+ public function testHashSet(): void
+ {
+ $key = $this->testPrefix . 'hash-set';
+
+ $result = $this->connection->hashSet($key, 'field1', 'value1');
+ $this->assertTrue($result);
+ }
+
+ public function testHashGet(): void
+ {
+ $key = $this->testPrefix . 'hash-get';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $value = $this->connection->hashGet($key, 'field1');
+ $this->assertEquals('value1', $value);
+
+ $value = $this->connection->hashGet($key, 'non-existent');
+ $this->assertFalse($value);
+ }
+
+ public function testHashGetAll(): void
+ {
+ $key = $this->testPrefix . 'hash-getall';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+ $this->connection->hashSet($key, 'field2', 'value2');
+
+ $all = $this->connection->hashGetAll($key);
+
+ $this->assertEquals(['field1' => 'value1', 'field2' => 'value2'], $all);
+ }
+
+ public function testHashDel(): void
+ {
+ $key = $this->testPrefix . 'hash-del';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $result = $this->connection->hashDel($key, 'field1');
+ $this->assertEquals(1, $result);
+
+ $result = $this->connection->hashDel($key, 'non-existent');
+ $this->assertEquals(0, $result);
+ }
+
+ public function testHashExists(): void
+ {
+ $key = $this->testPrefix . 'hash-exists';
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+
+ $this->assertTrue($this->connection->hashExists($key, 'field1'));
+ $this->assertFalse($this->connection->hashExists($key, 'non-existent'));
+ }
+
+ public function testHashLen(): void
+ {
+ $key = $this->testPrefix . 'hash-len';
+
+ $this->assertEquals(0, $this->connection->hashLen($key));
+
+ $this->connection->hashSet($key, 'field1', 'value1');
+ $this->connection->hashSet($key, 'field2', 'value2');
+
+ $this->assertEquals(2, $this->connection->hashLen($key));
+ }
+}
diff --git a/tests/Queue/Unit/RedisStreamsBrokerTest.php b/tests/Queue/Unit/RedisStreamsBrokerTest.php
new file mode 100644
index 0000000..a16c107
--- /dev/null
+++ b/tests/Queue/Unit/RedisStreamsBrokerTest.php
@@ -0,0 +1,429 @@
+connection = new RedisStream('redis', 6379);
+ $this->broker = new RedisStreams($this->connection, 1000, 3, 1000);
+ $this->testNamespace = 'test-' . uniqid();
+ $this->queue = new Queue('test-queue', $this->testNamespace);
+ }
+
+ protected function tearDown(): void
+ {
+ // Clean up test keys
+ $this->cleanupTestKeys();
+ $this->connection->close();
+ }
+
+ private function cleanupTestKeys(): void
+ {
+ $redis = new \Redis();
+ $redis->connect('redis', 6379);
+
+ $keys = $redis->keys($this->testNamespace . '*');
+ if (!empty($keys)) {
+ $redis->del($keys);
+ }
+
+ $redis->close();
+ }
+
+
+ public function testEnqueue(): void
+ {
+ $result = $this->broker->enqueue($this->queue, ['task' => 'test', 'data' => 123]);
+
+ $this->assertTrue($result);
+ $this->assertGreaterThan(0, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testEnqueueMultiple(): void
+ {
+ for ($i = 0; $i < 5; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $this->assertEquals(5, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testGetQueueSize(): void
+ {
+ $this->assertEquals(0, $this->broker->getQueueSize($this->queue));
+
+ $this->broker->enqueue($this->queue, ['test' => 1]);
+ $this->broker->enqueue($this->queue, ['test' => 2]);
+
+ $this->assertEquals(2, $this->broker->getQueueSize($this->queue));
+ }
+
+ public function testGetQueueSizeFailedJobs(): void
+ {
+ // Initially no failed jobs
+ $this->assertEquals(0, $this->broker->getQueueSize($this->queue, true));
+ }
+
+ public function testConsumerId(): void
+ {
+ // Default consumer ID
+ $defaultId = $this->broker->getConsumerId();
+ $this->assertStringStartsWith('worker-', $defaultId);
+
+ // Set custom consumer ID
+ $this->broker->setConsumerId('custom-worker-123');
+ $this->assertEquals('custom-worker-123', $this->broker->getConsumerId());
+ }
+
+
+ public function testEnqueueDelayed(): void
+ {
+ $result = $this->broker->enqueueDelayed($this->queue, ['task' => 'delayed'], 60);
+
+ $this->assertTrue($result);
+ $this->assertEquals(1, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testEnqueueAt(): void
+ {
+ $futureTime = time() + 3600;
+ $result = $this->broker->enqueueAt($this->queue, ['task' => 'scheduled'], $futureTime);
+
+ $this->assertTrue($result);
+ $this->assertEquals(1, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testGetDelayedCount(): void
+ {
+ $this->assertEquals(0, $this->broker->getDelayedCount($this->queue));
+
+ $this->broker->enqueueDelayed($this->queue, ['task' => 1], 60);
+ $this->broker->enqueueDelayed($this->queue, ['task' => 2], 120);
+
+ $this->assertEquals(2, $this->broker->getDelayedCount($this->queue));
+ }
+
+ public function testQueueSizeIncludesDelayed(): void
+ {
+ $this->broker->enqueue($this->queue, ['immediate' => true]);
+ $this->broker->enqueueDelayed($this->queue, ['delayed' => true], 60);
+
+ // Total size should include both immediate and delayed
+ $this->assertEquals(2, $this->broker->getQueueSize($this->queue));
+ }
+
+
+ public function testScheduleCron(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-cron-schedule',
+ payload: ['type' => 'cron-job'],
+ cron: '*/5 * * * *'
+ );
+
+ $result = $this->broker->schedule($this->queue, $schedule);
+ $this->assertTrue($result);
+
+ // Verify schedule was stored
+ $retrieved = $this->broker->getSchedule($this->queue, 'test-cron-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals('test-cron-schedule', $retrieved->id);
+ $this->assertEquals('*/5 * * * *', $retrieved->cron);
+ }
+
+ public function testScheduleInterval(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-interval-schedule',
+ payload: ['type' => 'interval-job'],
+ interval: 300
+ );
+
+ $result = $this->broker->schedule($this->queue, $schedule);
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'test-interval-schedule');
+ $this->assertNotNull($retrieved);
+ $this->assertEquals(300, $retrieved->interval);
+ }
+
+ public function testUnschedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'to-remove',
+ payload: ['remove' => true],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+ $this->assertNotNull($this->broker->getSchedule($this->queue, 'to-remove'));
+
+ $result = $this->broker->unschedule($this->queue, 'to-remove');
+ $this->assertTrue($result);
+
+ $this->assertNull($this->broker->getSchedule($this->queue, 'to-remove'));
+ }
+
+ public function testGetSchedules(): void
+ {
+ $this->broker->schedule($this->queue, new Schedule('sched-1', ['a' => 1], interval: 60));
+ $this->broker->schedule($this->queue, new Schedule('sched-2', ['b' => 2], interval: 120));
+ $this->broker->schedule($this->queue, new Schedule('sched-3', ['c' => 3], cron: '0 * * * *'));
+
+ $schedules = $this->broker->getSchedules($this->queue);
+
+ $this->assertCount(3, $schedules);
+ $this->assertArrayHasKey('sched-1', $schedules);
+ $this->assertArrayHasKey('sched-2', $schedules);
+ $this->assertArrayHasKey('sched-3', $schedules);
+ }
+
+ public function testPauseSchedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'pausable',
+ payload: ['pause' => 'test'],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+
+ $result = $this->broker->pauseSchedule($this->queue, 'pausable');
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'pausable');
+ $this->assertTrue($retrieved->isPaused());
+ $this->assertFalse($retrieved->isActive());
+ }
+
+ public function testResumeSchedule(): void
+ {
+ $schedule = new Schedule(
+ id: 'resumable',
+ payload: ['resume' => 'test'],
+ interval: 60
+ );
+
+ $this->broker->schedule($this->queue, $schedule);
+ $this->broker->pauseSchedule($this->queue, 'resumable');
+
+ $result = $this->broker->resumeSchedule($this->queue, 'resumable');
+ $this->assertTrue($result);
+
+ $retrieved = $this->broker->getSchedule($this->queue, 'resumable');
+ $this->assertFalse($retrieved->isPaused());
+ $this->assertTrue($retrieved->isActive());
+ }
+
+ public function testPauseNonExistentSchedule(): void
+ {
+ $result = $this->broker->pauseSchedule($this->queue, 'non-existent');
+ $this->assertFalse($result);
+ }
+
+ public function testResumeNonExistentSchedule(): void
+ {
+ $result = $this->broker->resumeSchedule($this->queue, 'non-existent');
+ $this->assertFalse($result);
+ }
+
+
+ public function testGetStreamInfo(): void
+ {
+ // Add some messages first
+ $this->broker->enqueue($this->queue, ['test' => 1]);
+ $this->broker->enqueue($this->queue, ['test' => 2]);
+
+ $info = $this->broker->getStreamInfo($this->queue);
+
+ $this->assertIsArray($info);
+ $this->assertArrayHasKey('length', $info);
+ $this->assertEquals(2, $info['length']);
+ }
+
+ public function testGetStreamInfoEmpty(): void
+ {
+ // Empty stream (doesn't exist yet)
+ $info = $this->broker->getStreamInfo($this->queue);
+ $this->assertIsArray($info);
+ }
+
+ public function testGetGroupInfo(): void
+ {
+ // Need to trigger group creation by enqueueing
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ $groupInfo = $this->broker->getGroupInfo($this->queue);
+
+ $this->assertIsArray($groupInfo);
+ if (!empty($groupInfo)) {
+ $this->assertArrayHasKey('name', $groupInfo);
+ }
+ }
+
+ public function testGetConsumersInfo(): void
+ {
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ $consumers = $this->broker->getConsumersInfo($this->queue);
+ $this->assertIsArray($consumers);
+ }
+
+ public function testGetLag(): void
+ {
+ $lag = $this->broker->getLag($this->queue);
+ $this->assertIsInt($lag);
+ $this->assertGreaterThanOrEqual(0, $lag);
+ }
+
+ public function testGetPendingCount(): void
+ {
+ $pending = $this->broker->getPendingCount($this->queue);
+ $this->assertIsInt($pending);
+ $this->assertGreaterThanOrEqual(0, $pending);
+ }
+
+ public function testGetMessages(): void
+ {
+ $this->broker->enqueue($this->queue, ['msg' => 1]);
+ $this->broker->enqueue($this->queue, ['msg' => 2]);
+ $this->broker->enqueue($this->queue, ['msg' => 3]);
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 10);
+
+ $this->assertIsArray($messages);
+ $this->assertCount(3, $messages);
+
+ foreach ($messages as $message) {
+ $this->assertInstanceOf(Message::class, $message);
+ }
+ }
+
+ public function testGetMessagesWithLimit(): void
+ {
+ for ($i = 0; $i < 10; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 5);
+
+ $this->assertCount(5, $messages);
+ }
+
+ public function testGetMessage(): void
+ {
+ $this->broker->enqueue($this->queue, ['specific' => 'message']);
+
+ // Get all messages to find the ID
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $this->assertNotEmpty($messages);
+
+ $firstMessage = $messages[0];
+ $streamId = $firstMessage->getPayload()['streamId'] ?? null;
+
+ // Skip if streamId is not available in payload
+ if ($streamId) {
+ $retrieved = $this->broker->getMessage($this->queue, $streamId);
+ $this->assertNotNull($retrieved);
+ }
+ }
+
+ public function testGetMessageNonExistent(): void
+ {
+ $message = $this->broker->getMessage($this->queue, '0-0');
+ $this->assertNull($message);
+ }
+
+ public function testTrimStream(): void
+ {
+ // Add 20 messages
+ for ($i = 0; $i < 20; $i++) {
+ $this->broker->enqueue($this->queue, ['index' => $i]);
+ }
+
+ $initialSize = $this->broker->getQueueSize($this->queue) - $this->broker->getDelayedCount($this->queue);
+ $this->assertEquals(20, $initialSize);
+
+ // Trim to 10 (uses exact trimming, not approximate)
+ $trimmed = $this->broker->trimStream($this->queue, 10);
+
+ // Verify stream was trimmed
+ $finalSize = $this->broker->getQueueSize($this->queue) - $this->broker->getDelayedCount($this->queue);
+ $this->assertLessThanOrEqual(10, $finalSize);
+ $this->assertGreaterThanOrEqual(0, $trimmed);
+ }
+
+ public function testDeleteConsumer(): void
+ {
+ $this->broker->enqueue($this->queue, ['test' => true]);
+
+ // This will create the consumer group
+ $pending = $this->broker->deleteConsumer($this->queue, 'non-existent-consumer');
+ $this->assertIsInt($pending);
+ }
+
+
+ public function testMessageFormat(): void
+ {
+ $payload = ['key' => 'value', 'nested' => ['a' => 1, 'b' => 2]];
+ $this->broker->enqueue($this->queue, $payload);
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $this->assertCount(1, $messages);
+
+ $message = $messages[0];
+ $this->assertInstanceOf(Message::class, $message);
+
+ $messagePayload = $message->getPayload();
+ $this->assertEquals('value', $messagePayload['key']);
+ $this->assertEquals(['a' => 1, 'b' => 2], $messagePayload['nested']);
+ }
+
+ public function testMessageTimestamp(): void
+ {
+ $beforeEnqueue = time();
+ $this->broker->enqueue($this->queue, ['test' => true]);
+ $afterEnqueue = time();
+
+ $messages = $this->broker->getMessages($this->queue, '-', '+', 1);
+ $message = $messages[0];
+
+ $timestamp = $message->getTimestamp();
+ $this->assertGreaterThanOrEqual($beforeEnqueue, $timestamp);
+ $this->assertLessThanOrEqual($afterEnqueue, $timestamp);
+ }
+
+
+ public function testRetryEmptyDLQ(): void
+ {
+ // Should not throw when DLQ is empty
+ $this->broker->retry($this->queue);
+ $this->assertTrue(true);
+ }
+
+ public function testRetryWithLimit(): void
+ {
+ // Should not throw
+ $this->broker->retry($this->queue, 5);
+ $this->assertTrue(true);
+ }
+}
diff --git a/tests/Queue/Unit/ScheduleTest.php b/tests/Queue/Unit/ScheduleTest.php
new file mode 100644
index 0000000..4bfc81b
--- /dev/null
+++ b/tests/Queue/Unit/ScheduleTest.php
@@ -0,0 +1,450 @@
+ 'cleanup'],
+ cron: '*/5 * * * *'
+ );
+
+ $this->assertEquals('test-cron', $schedule->id);
+ $this->assertEquals(['task' => 'cleanup'], $schedule->payload);
+ $this->assertEquals('*/5 * * * *', $schedule->cron);
+ $this->assertNull($schedule->interval);
+ $this->assertTrue($schedule->isActive());
+ }
+
+ public function testIntervalScheduleCreation(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-interval',
+ payload: ['task' => 'sync'],
+ interval: 300
+ );
+
+ $this->assertEquals('test-interval', $schedule->id);
+ $this->assertEquals(300, $schedule->interval);
+ $this->assertNull($schedule->cron);
+ $this->assertTrue($schedule->isActive());
+ }
+
+ public function testInvalidScheduleNoCronOrInterval(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Either cron or interval must be specified');
+
+ new Schedule(
+ id: 'invalid',
+ payload: []
+ );
+ }
+
+ public function testInvalidScheduleBothCronAndInterval(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Cannot specify both cron and interval');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ cron: '* * * * *',
+ interval: 60
+ );
+ }
+
+ public function testInvalidCronExpression(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Invalid cron expression');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ cron: 'not a valid cron'
+ );
+ }
+
+ public function testInvalidIntervalZero(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Interval must be greater than 0');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ interval: 0
+ );
+ }
+
+ public function testIntervalNextRunTime(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60
+ );
+
+ $now = time();
+ $nextRun = $schedule->getNextRunTime();
+
+ // First run should be now or very close to now
+ $this->assertLessThanOrEqual($now + 1, $nextRun);
+
+ // Second run should be 60 seconds after first
+ $nextRunAfterFirst = $schedule->getNextRunTime($nextRun);
+ $this->assertEquals($nextRun + 60, $nextRunAfterFirst);
+ }
+
+ public function testCronNextRunTime(): void
+ {
+ // Every minute
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ cron: '* * * * *'
+ );
+
+ $now = time();
+ $nextRun = $schedule->getNextRunTime();
+
+ // Should be within the next minute
+ $this->assertGreaterThan($now, $nextRun);
+ $this->assertLessThanOrEqual($now + 60, $nextRun);
+ }
+
+ public function testStartAtConstraint(): void
+ {
+ $futureTime = time() + 3600; // 1 hour from now
+
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ startAt: $futureTime
+ );
+
+ $nextRun = $schedule->getNextRunTime();
+
+ // Should not run before startAt
+ $this->assertGreaterThanOrEqual($futureTime, $nextRun);
+ }
+
+ public function testEndAtConstraint(): void
+ {
+ $pastTime = time() - 3600; // 1 hour ago
+
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ endAt: $pastTime
+ );
+
+ // Schedule should not be active since endAt has passed
+ $this->assertFalse($schedule->isActive());
+ $this->assertNull($schedule->getNextRunTime());
+ }
+
+ public function testMaxRunsConstraint(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60,
+ maxRuns: 3
+ );
+
+ $this->assertTrue($schedule->isActive());
+
+ // Simulate 3 runs
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(1, $schedule->getRunCount());
+
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(2, $schedule->getRunCount());
+
+ $schedule = $schedule->incrementRunCount();
+ $this->assertEquals(3, $schedule->getRunCount());
+
+ // Should no longer be active after max runs
+ $this->assertFalse($schedule->isActive());
+ $this->assertNull($schedule->getNextRunTime());
+ }
+
+ public function testPauseAndResume(): void
+ {
+ $schedule = new Schedule(
+ id: 'test',
+ payload: [],
+ interval: 60
+ );
+
+ $this->assertTrue($schedule->isActive());
+ $this->assertFalse($schedule->isPaused());
+
+ // Pause
+ $paused = $schedule->pause();
+ $this->assertTrue($paused->isPaused());
+ $this->assertFalse($paused->isActive());
+ $this->assertNull($paused->getNextRunTime());
+
+ // Resume
+ $resumed = $paused->resume();
+ $this->assertFalse($resumed->isPaused());
+ $this->assertTrue($resumed->isActive());
+ $this->assertNotNull($resumed->getNextRunTime());
+ }
+
+ public function testToArrayAndFromArray(): void
+ {
+ $schedule = new Schedule(
+ id: 'test-serialization',
+ payload: ['key' => 'value', 'nested' => ['a' => 1]],
+ cron: '0 9 * * *',
+ startAt: 1704067200,
+ endAt: 1735689600,
+ maxRuns: 100
+ );
+
+ $array = $schedule->toArray();
+
+ $this->assertEquals('test-serialization', $array['id']);
+ $this->assertEquals(['key' => 'value', 'nested' => ['a' => 1]], $array['payload']);
+ $this->assertEquals('0 9 * * *', $array['cron']);
+ $this->assertEquals(1704067200, $array['startAt']);
+ $this->assertEquals(1735689600, $array['endAt']);
+ $this->assertEquals(100, $array['maxRuns']);
+
+ // Reconstruct from array
+ $reconstructed = Schedule::fromArray($array);
+
+ $this->assertEquals($schedule->id, $reconstructed->id);
+ $this->assertEquals($schedule->payload, $reconstructed->payload);
+ $this->assertEquals($schedule->cron, $reconstructed->cron);
+ $this->assertEquals($schedule->startAt, $reconstructed->startAt);
+ $this->assertEquals($schedule->endAt, $reconstructed->endAt);
+ $this->assertEquals($schedule->maxRuns, $reconstructed->maxRuns);
+ }
+
+ public function testGetDescription(): void
+ {
+ $cronSchedule = new Schedule(
+ id: 'cron',
+ payload: [],
+ cron: '0 9 * * *'
+ );
+ $this->assertStringContainsString('Cron:', $cronSchedule->getDescription());
+
+ $secondsSchedule = new Schedule(
+ id: 'seconds',
+ payload: [],
+ interval: 30
+ );
+ $this->assertStringContainsString('second', $secondsSchedule->getDescription());
+
+ $minutesSchedule = new Schedule(
+ id: 'minutes',
+ payload: [],
+ interval: 300
+ );
+ $this->assertStringContainsString('minute', $minutesSchedule->getDescription());
+
+ $hoursSchedule = new Schedule(
+ id: 'hours',
+ payload: [],
+ interval: 7200
+ );
+ $this->assertStringContainsString('hour', $hoursSchedule->getDescription());
+
+ $daysSchedule = new Schedule(
+ id: 'days',
+ payload: [],
+ interval: 172800
+ );
+ $this->assertStringContainsString('day', $daysSchedule->getDescription());
+ }
+
+ public function testInvalidIntervalNegative(): void
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('Interval must be greater than 0');
+
+ new Schedule(
+ id: 'invalid',
+ payload: [],
+ interval: -10
+ );
+ }
+
+ public function testCronVariousExpressions(): void
+ {
+ // Daily at midnight
+ $daily = new Schedule('daily', [], cron: '0 0 * * *');
+ $this->assertEquals('0 0 * * *', $daily->cron);
+
+ // Every hour
+ $hourly = new Schedule('hourly', [], cron: '0 * * * *');
+ $this->assertEquals('0 * * * *', $hourly->cron);
+
+ // Weekdays at 9am
+ $weekdays = new Schedule('weekdays', [], cron: '0 9 * * 1-5');
+ $this->assertEquals('0 9 * * 1-5', $weekdays->cron);
+
+ // Every 15 minutes
+ $quarter = new Schedule('quarter', [], cron: '*/15 * * * *');
+ $this->assertEquals('*/15 * * * *', $quarter->cron);
+ }
+
+ public function testIntervalCatchUp(): void
+ {
+ $schedule = new Schedule(
+ id: 'catch-up',
+ payload: [],
+ interval: 60
+ );
+
+ // Simulate a last run 5 minutes ago
+ $lastRun = time() - 300;
+ $nextRun = $schedule->getNextRunTime($lastRun);
+
+ // Should catch up - next run should be in the future or very close
+ $this->assertGreaterThanOrEqual(time() - 1, $nextRun);
+ }
+
+ public function testImmutableOperations(): void
+ {
+ $original = new Schedule(
+ id: 'immutable-test',
+ payload: ['data' => 'value'],
+ interval: 60
+ );
+
+ // Increment should return new instance
+ $incremented = $original->incrementRunCount();
+ $this->assertEquals(0, $original->getRunCount());
+ $this->assertEquals(1, $incremented->getRunCount());
+
+ // Pause should return new instance
+ $paused = $original->pause();
+ $this->assertFalse($original->isPaused());
+ $this->assertTrue($paused->isPaused());
+
+ // Resume should return new instance
+ $resumed = $paused->resume();
+ $this->assertTrue($paused->isPaused());
+ $this->assertFalse($resumed->isPaused());
+ }
+
+ public function testFromArrayWithDefaults(): void
+ {
+ $minimal = [
+ 'id' => 'minimal',
+ 'payload' => ['test' => true],
+ 'interval' => 60,
+ ];
+
+ $schedule = Schedule::fromArray($minimal);
+
+ $this->assertEquals('minimal', $schedule->id);
+ $this->assertNull($schedule->cron);
+ $this->assertEquals(60, $schedule->interval);
+ $this->assertNull($schedule->startAt);
+ $this->assertNull($schedule->endAt);
+ $this->assertNull($schedule->maxRuns);
+ $this->assertEquals(0, $schedule->getRunCount());
+ $this->assertFalse($schedule->isPaused());
+ }
+
+ public function testFromArrayPreservesState(): void
+ {
+ $data = [
+ 'id' => 'stateful',
+ 'payload' => [],
+ 'interval' => 60,
+ 'runCount' => 5,
+ 'paused' => true,
+ ];
+
+ $schedule = Schedule::fromArray($data);
+
+ $this->assertEquals(5, $schedule->getRunCount());
+ $this->assertTrue($schedule->isPaused());
+ }
+
+ public function testEndAtInFuture(): void
+ {
+ $futureEndAt = time() + 3600; // 1 hour from now
+
+ $schedule = new Schedule(
+ id: 'future-end',
+ payload: [],
+ interval: 60,
+ endAt: $futureEndAt
+ );
+
+ $this->assertTrue($schedule->isActive());
+ $nextRun = $schedule->getNextRunTime();
+ $this->assertNotNull($nextRun);
+ $this->assertLessThan($futureEndAt, $nextRun);
+ }
+
+ public function testNextRunRespectsFutureEndAt(): void
+ {
+ // endAt is 30 seconds from now
+ $endAt = time() + 30;
+
+ $schedule = new Schedule(
+ id: 'end-soon',
+ payload: [],
+ interval: 60, // 60 second interval
+ endAt: $endAt
+ );
+
+ // First run is now
+ $firstRun = $schedule->getNextRunTime();
+ $this->assertNotNull($firstRun);
+
+ // Next run would be 60s after first run, which is after endAt
+ $secondRun = $schedule->getNextRunTime($firstRun);
+ $this->assertNull($secondRun); // Should be null because it would exceed endAt
+ }
+
+ public function testComplexPayload(): void
+ {
+ $complexPayload = [
+ 'string' => 'value',
+ 'number' => 42,
+ 'float' => 3.14,
+ 'bool' => true,
+ 'null' => null,
+ 'array' => [1, 2, 3],
+ 'nested' => [
+ 'deep' => [
+ 'value' => 'found'
+ ]
+ ]
+ ];
+
+ $schedule = new Schedule(
+ id: 'complex',
+ payload: $complexPayload,
+ interval: 60
+ );
+
+ $this->assertEquals($complexPayload, $schedule->payload);
+
+ // Verify serialization preserves structure
+ $array = $schedule->toArray();
+ $reconstructed = Schedule::fromArray($array);
+
+ $this->assertEquals($complexPayload, $reconstructed->payload);
+ }
+}
diff --git a/tests/Queue/servers/SwooleRedisStreams/Dockerfile b/tests/Queue/servers/SwooleRedisStreams/Dockerfile
new file mode 100644
index 0000000..eb30cec
--- /dev/null
+++ b/tests/Queue/servers/SwooleRedisStreams/Dockerfile
@@ -0,0 +1,5 @@
+FROM phpswoole/swoole:php8.3-alpine
+
+RUN apk add autoconf build-base
+
+RUN docker-php-ext-enable redis
diff --git a/tests/Queue/servers/SwooleRedisStreams/worker.php b/tests/Queue/servers/SwooleRedisStreams/worker.php
new file mode 100644
index 0000000..499ed2e
--- /dev/null
+++ b/tests/Queue/servers/SwooleRedisStreams/worker.php
@@ -0,0 +1,33 @@
+job()->inject('message')->action(handleRequest(...));
+
+$server
+ ->error()
+ ->inject('error')
+ ->action(function ($th) {
+ echo $th->getMessage() . PHP_EOL;
+ });
+
+$server->workerStart()->action(function () {
+ echo 'Worker Started (Redis Streams)' . PHP_EOL;
+});
+
+$server->workerStop()->action(function () {
+ echo 'Worker Stopped (Redis Streams)' . PHP_EOL;
+});
+
+$server->start();