Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
"ext-json": "*",
"symfony/framework-bundle": "^4|^5",
"nesbot/carbon": "^1|^2",
"ramsey/uuid": "^3.7|^4.1"
"ramsey/uuid": "^3.7|^4.1",
"symfony/serializer": "^4|^5",
"symfony/property-access": "^4|^5"
},
"require-dev": {
"phpunit/phpunit": "^8",
Expand Down
2,675 changes: 1,386 additions & 1,289 deletions composer.lock

Large diffs are not rendered by default.

88 changes: 88 additions & 0 deletions src/Mcfedr/QueueManagerBundle/Controller/PubSubController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Mcfedr\QueueManagerBundle\Controller;

use Google\Auth\AccessToken;
use Mcfedr\QueueManagerBundle\Exception\UnrecoverableJobExceptionInterface;
use Mcfedr\QueueManagerBundle\Model\PubSubData;
use Mcfedr\QueueManagerBundle\Model\PubSubMessage;
use Mcfedr\QueueManagerBundle\Queue\PubSubJob;
use Mcfedr\QueueManagerBundle\Runner\JobExecutor;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\Routing\Annotation\Route;
use Symfony\Component\Serializer\SerializerInterface;

class PubSubController extends AbstractController
{
public const DEFAULT_AUDIENCE = 'default_audience';
/**
* @var JobExecutor
*/
private $jobExecutor;
/**
* @var AccessToken
*/
private $accessToken;
/**
* @var SerializerInterface
*/
private $serializer;

public function __construct(JobExecutor $jobExecutor, AccessToken $accessToken, SerializerInterface $serializer)
{
$this->jobExecutor = $jobExecutor;
$this->accessToken = $accessToken;
$this->serializer = $serializer;
}

/**
* @Route("/pubsub/{queue}", name="pubsub", methods={"POST"})
*/
public function pubsub(Request $request, string $queue)
{
set_time_limit(0);

$headers = getallheaders();
if (!($auth = $request->headers->get('Authorization')) && isset($headers['Authorization'])) {
$auth = $headers['Authorization'];
}

if (!$auth) {
throw new AccessDeniedHttpException('Authorization header not provided.');
}
$jwt = str_replace('Bearer ', '', $auth);

$payload = $this->accessToken->verify($jwt);
$audience = $this->getParameter("mcfedr_queue_manager.{$queue}.audience");
if (!$payload || !isset($payload['aud']) || ($payload['aud'] !== $audience && self::DEFAULT_AUDIENCE !== $audience)) {
throw new AccessDeniedHttpException('Could not verify token!');
}

$message = $this->serializer->deserialize($request->getContent(), PubSubMessage::class, 'json')->getMessage();

if (!$message || !isset($message['data'])) {
return new Response();
}

/** @var PubSubData $data */
$data = $this->serializer->deserialize(base64_decode($message['data'], true), PubSubData::class, 'json');

try {
$this->jobExecutor->executeJob(new PubSubJob(
$data->getName(),
$data->getArguments(),
null,
$data->getRetryCount()
));
} catch (UnrecoverableJobExceptionInterface $e) {
return new Response();
}

return new Response();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Mcfedr\QueueManagerBundle\DependencyInjection;

use Mcfedr\QueueManagerBundle\Controller\PubSubController;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

Expand Down Expand Up @@ -70,6 +71,7 @@ public function getConfigTreeBuilder(): TreeBuilder
->children()
->scalarNode('topic')->end()
->scalarNode('subscription')->end()
->scalarNode('audience')->defaultValue(PubSubController::DEFAULT_AUDIENCE)->end()
->end()
->end()
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ private function createManager(ContainerBuilder $container, array $config, strin
$container->setDefinition($pubSubClientName, $pubSubClient);
$bindings[PubSubClient::class.' $pubSubClient'] = new Reference($pubSubClientName);
}
foreach ($mergedOptions['pub_sub_queues'] as $queueName => $pub_sub_queue) {
$container->setParameter("mcfedr_queue_manager.{$queueName}.audience", $pub_sub_queue['audience'] ?? null);
}

break;
}
Expand Down
53 changes: 53 additions & 0 deletions src/Mcfedr/QueueManagerBundle/Model/PubSubData.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

declare(strict_types=1);

namespace Mcfedr\QueueManagerBundle\Model;

class PubSubData
{
/**
* @var string
*/
private $name;

/**
* @var array
*/
private $arguments;

/**
* @var int
*/
private $retryCount;

public function getName(): string
{
return $this->name;
}

public function setName(string $name): void
{
$this->name = $name;
}

public function getArguments(): array
{
return $this->arguments;
}

public function setArguments(array $arguments): void
{
$this->arguments = $arguments;
}

public function getRetryCount(): int
{
return $this->retryCount;
}

public function setRetryCount(int $retryCount): void
{
$this->retryCount = $retryCount;
}
}
25 changes: 25 additions & 0 deletions src/Mcfedr/QueueManagerBundle/Model/PubSubMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Mcfedr\QueueManagerBundle\Model;

class PubSubMessage
{
/**
* @var array
*/
private $message;

public function getMessage(): array
{
return $this->message;
}

public function setMessage(array $message): self
{
$this->message = $message;

return $this;
}
}
2 changes: 1 addition & 1 deletion src/Mcfedr/QueueManagerBundle/Queue/JobBatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,6 @@ public function getRetries(): array

public function getOption(string $option): ?string
{
return isset($this->options[$option]) ? $this->options[$option] : null;
return $this->options[$option] ?? null;
}
}
10 changes: 10 additions & 0 deletions src/Mcfedr/QueueManagerBundle/Resources/config/routes.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8" ?>
<routes xmlns="http://symfony.com/schema/routing"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/routing
https://symfony.com/schema/routing/routing-1.0.xsd">

<route id="pubsub" path="/pubsub/{queue}"
controller="Mcfedr\QueueManagerBundle\Controller\PubSubController::pubsub"
methods="POST"/>
</routes>
12 changes: 12 additions & 0 deletions src/Mcfedr/QueueManagerBundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ services:
mcfedr_periodic_queue_driver.worker:
public: true
alias: Mcfedr\QueueManagerBundle\Worker\PeriodicWorker

Google\Auth\AccessToken:
public: true

Mcfedr\QueueManagerBundle\Controller\PubSubController:
public: true
autowire: true
arguments:
$accessToken: '@Google\Auth\AccessToken'
calls:
- setContainer: [ '@Psr\Container\ContainerInterface' ]
tags: [ 'container.service_subscriber' ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);

namespace Mcfedr\QueueManagerBundle\Tests\Controller;

use Google\Auth\AccessToken;
use Symfony\Bundle\FrameworkBundle\KernelBrowser;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;

/**
* @internal
*/
final class ControllerTest extends WebTestCase
{
/** @var KernelBrowser */
protected $client;

protected function setUp(): void
{
self::ensureKernelShutdown();
$this->client = static::createClient([], ['HTTPS' => true]);
$this->client->followRedirects(false);
}

/**
* @dataProvider queueDataProvider()
*/
public function testController($queue, $audience): void
{
$accessTokenMock = $this->getMockBuilder(AccessToken::class)->onlyMethods(['verify'])->getMock();
$accessTokenMock->method('verify')->willReturn(['aud' => $audience]);
static::$container->set(AccessToken::class, $accessTokenMock);
$this->client->request('POST', "/pubsub/{$queue}", [], [], ['HTTP_AUTHORIZATION' => 'Bearer token'], '{"message":{"data":"eyJuYW1lIjoid29ya2VyX3dpdGhfYV9uYW1lIiwiYXJndW1lbnRzIjpbeyJmb28iOiAiYmFyIn1dLCJyZXRyeUNvdW50IjowfQ==","messageId":"1748167746376305","message_id":"1748167746376305","publishTime":"2020-11-19T15:10:18.439Z","publish_time":"2020-11-19T15:10:18.439Z"},"subscription":"projects/project/subscriptions/subscriptions"}');

static::assertEquals(200, $this->client->getResponse()->getStatusCode());
}

public function testControllerBadAudience(): void
{
$accessTokenMock = $this->getMockBuilder(AccessToken::class)->onlyMethods(['verify'])->getMock();
$accessTokenMock->method('verify')->willReturn(['aud' => 'audience1']);
static::$container->set(AccessToken::class, $accessTokenMock);
$this->client->request('POST', '/pubsub/default', [], [], ['HTTP_AUTHORIZATION' => 'Bearer token'], '{"message":{"data":"eyJuYW1lIjoid29ya2VyX3dpdGhfYV9uYW1lIiwiYXJndW1lbnRzIjpbeyJmb28iOiAiYmFyIn1dLCJyZXRyeUNvdW50IjowfQ==","messageId":"1748167746376305","message_id":"1748167746376305","publishTime":"2020-11-19T15:10:18.439Z","publish_time":"2020-11-19T15:10:18.439Z"},"subscription":"projects/project/subscriptions/subscriptions"}');

static::assertEquals(403, $this->client->getResponse()->getStatusCode());
}

public function testControllerNoAudienceConfigured(): void
{
$accessTokenMock = $this->getMockBuilder(AccessToken::class)->onlyMethods(['verify'])->getMock();
$accessTokenMock->method('verify')->willReturn(['aud' => 'audience1']);
static::$container->set(AccessToken::class, $accessTokenMock);
$this->client->request('POST', '/pubsub/no_audience_queue', [], [], ['HTTP_AUTHORIZATION' => 'Bearer token'], '{"message":{"data":"eyJuYW1lIjoid29ya2VyX3dpdGhfYV9uYW1lIiwiYXJndW1lbnRzIjpbeyJmb28iOiAiYmFyIn1dLCJyZXRyeUNvdW50IjowfQ==","messageId":"1748167746376305","message_id":"1748167746376305","publishTime":"2020-11-19T15:10:18.439Z","publish_time":"2020-11-19T15:10:18.439Z"},"subscription":"projects/project/subscriptions/subscriptions"}');

static::assertEquals(200, $this->client->getResponse()->getStatusCode());
}

public function testControllerNoAudienceFromPubSub(): void
{
$accessTokenMock = $this->getMockBuilder(AccessToken::class)->onlyMethods(['verify'])->getMock();
$accessTokenMock->method('verify')->willReturn(['aud' => 'https://example.com']);
static::$container->set(AccessToken::class, $accessTokenMock);
$this->client->request('POST', '/pubsub/no_audience_queue', [], [], ['HTTP_AUTHORIZATION' => 'Bearer token'], '{"message":{"data":"eyJuYW1lIjoid29ya2VyX3dpdGhfYV9uYW1lIiwiYXJndW1lbnRzIjpbeyJmb28iOiAiYmFyIn1dLCJyZXRyeUNvdW50IjowfQ==","messageId":"1748167746376305","message_id":"1748167746376305","publishTime":"2020-11-19T15:10:18.439Z","publish_time":"2020-11-19T15:10:18.439Z"},"subscription":"projects/project/subscriptions/subscriptions"}');

static::assertEquals(200, $this->client->getResponse()->getStatusCode());
}

public function queueDataProvider()
{
return [
['default', 'audience'],
['other_queue', 'audience1'],
];
}
}
13 changes: 13 additions & 0 deletions tests/config_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ framework:
session:
storage_id: session.storage.mock_file
secret: 'fake_secret'
router:
resource: "%kernel.project_dir%/tests/routing.yml"
strict_requirements: ~
utf8: true

doctrine:
dbal:
Expand Down Expand Up @@ -74,6 +78,14 @@ mcfedr_queue_manager:
default_topic: 'projects/project/topics/test-topic'
pub_sub_queues:
default:
audience: 'audience'
topic: 'projects/project/topics/test-topic'
subscription: 'test_sub'
other_queue:
audience: 'audience1'
topic: 'projects/project/topics/test-topic'
subscription: 'test_sub'
no_audience_queue:
topic: 'projects/project/topics/test-topic'
subscription: 'test_sub'
realgcp:
Expand All @@ -83,6 +95,7 @@ mcfedr_queue_manager:
default_topic: '%env(TEST_TOPIC)%'
pub_sub_queues:
default:
audience: 'audience'
topic: '%env(TEST_TOPIC)%'
subscription: '%env(TEST_SUB)%'

Expand Down
3 changes: 3 additions & 0 deletions tests/routing.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mcfedr_queue_manager:
resource: '@McfedrQueueManagerBundle/Resources/config/routes.xml'

4 changes: 4 additions & 0 deletions tests/routing_anno.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
mcfedr_queue_manager:
resource: "@McfedrQueueManagerBundle/Controller/"
type: annotation
prefix: /