Skip to content

Commit ab37b28

Browse files
committed
Fixed failed job wrong queue name
1 parent d50f1c7 commit ab37b28

File tree

1 file changed

+25
-14
lines changed

1 file changed

+25
-14
lines changed

src/PubSubQueue.php

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ public function getHandler($subscriber)
9999
*
100100
* @return mixed
101101
*/
102-
public function push($job, $data = '', $queue = null)
102+
public function push($job, $data = '', $topicName = null)
103103
{
104-
return $this->pushRaw($this->createPayload($job, $queue, $data), $queue);
104+
return $this->pushRaw($this->createPayload($job, $topicName, $data), $topicName);
105105
}
106106

107107
/**
@@ -113,10 +113,9 @@ public function push($job, $data = '', $queue = null)
113113
*
114114
* @return array
115115
*/
116-
public function pushRaw($payload, $subscriber = null, array $options = [])
116+
public function pushRaw($payload, $topicName = null, array $options = [])
117117
{
118-
$topic = $this->getTopic($subscriber, true);
119-
118+
$topic = $this->getTopic($topicName, true);
120119
$publish = ['data' => $payload];
121120

122121
if (!empty($options)) {
@@ -159,7 +158,6 @@ public function pop($subscriber = null)
159158
{
160159
$this->subscriber = $subscriber;
161160
$topic = $this->getTopic($this->getQueue($subscriber));
162-
163161
$subscription = $topic->subscription($subscriber);
164162
$messages = $subscription->pull([
165163
'returnImmediately' => true,
@@ -220,12 +218,13 @@ public function acknowledge(Message $message, $queue = null)
220218
*
221219
* @return mixed
222220
*/
223-
public function acknowledgeAndPublish(Message $message, $queue = null, $options = [], $delay = 0)
221+
public function acknowledgeAndPublish(Message $message, $topic = null, $options = [], $delay = 0)
224222
{
225223
if (isset($options['attempts'])) {
226224
$options['attempts'] = (string) $options['attempts'];
227225
}
228-
$topic = $this->getTopic($this->getQueue($queue));
226+
$topic = $this->getTopic($topic);
227+
229228
$subscription = $topic->subscription($this->subscriber);
230229

231230
$subscription->acknowledge($message);
@@ -250,10 +249,9 @@ public function acknowledgeAndPublish(Message $message, $queue = null, $options
250249
*
251250
* @throws \Illuminate\Queue\InvalidPayloadException
252251
*/
253-
protected function createPayload($job, $queue, $data = '')
252+
protected function createPayload($job, $topicName, $data = '')
254253
{
255-
$payload = parent::createPayload($job, $queue, $data);
256-
254+
$payload = parent::createPayload($job, $topicName, $data);
257255
return base64_encode($payload);
258256
}
259257

@@ -276,18 +274,31 @@ protected function createPayloadArray($job, $queue, $data = '')
276274
* Get the current topic.
277275
*
278276
* @param string $queue
279-
* @param string $create
280277
*
281278
* @return \Google\Cloud\PubSub\Topic
282279
*/
283-
public function getTopic($queue, $create = false)
280+
public function getTopic($queue)
284281
{
285-
$queue = $this->getQueue($queue);
286282
$topic = $this->pubsub->topic($queue);
287283

288284
return $topic;
289285
}
290286

287+
/**
288+
* Get the current topic using subscriber.
289+
*
290+
* @param string $queue
291+
*
292+
* @return \Google\Cloud\PubSub\Topic
293+
*/
294+
public function getTopicUsingSubscriber($subscriberName)
295+
{
296+
$topicName = $this->getQueue($subscriberName);
297+
$topic = $this->pubsub->topic($topicName);
298+
299+
return $topic;
300+
}
301+
291302
/**
292303
* Create a new subscription to a topic.
293304
*

0 commit comments

Comments
 (0)