Skip to content

Commit 511ca5f

Browse files
committed
Removed burden from Connectors to implement retry().
This major change moves retry() from ConnectionContext into ImportConnector, supervising the retreies from within Porter and removing the burden from Connector and AsyncConnector implementations to implement this feature themselves.
1 parent 1e0599b commit 511ca5f

24 files changed

+330
-343
lines changed

src/Connector/AsyncConnector.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
namespace ScriptFUSION\Porter\Connector;
55

6-
use Amp\Promise;
7-
86
/**
97
* Provides a method for fetching data from a remote source asynchronously.
108
*/
@@ -16,7 +14,7 @@ interface AsyncConnector
1614
* @param ConnectionContext $context Runtime connection settings and methods.
1715
* @param string $source Source.
1816
*
19-
* @return Promise Data.
17+
* @return \Closure Closure that returns a Promise or raw data.
2018
*/
21-
public function fetchAsync(ConnectionContext $context, string $source): Promise;
19+
public function fetchAsync(ConnectionContext $context, string $source): \Closure;
2220
}

src/Connector/CachingConnector.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
/**
1313
* Wraps a connector to cache fetched data using PSR-6-compliant objects.
14+
*
15+
* TODO: Async support
1416
*/
1517
class CachingConnector implements Connector, ConnectorWrapper
1618
{

src/Connector/ConnectionContext.php

Lines changed: 2 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,16 @@
33

44
namespace ScriptFUSION\Porter\Connector;
55

6-
use Amp\Coroutine;
7-
use Amp\Promise;
8-
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\FetchExceptionHandler;
9-
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\StatelessFetchExceptionHandler;
10-
116
/**
12-
* Specifies runtime connection settings and provides utility methods.
7+
* Specifies runtime connection options.
138
*/
149
final class ConnectionContext
1510
{
1611
private $mustCache;
1712

18-
/**
19-
* User-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
20-
*
21-
* @var FetchExceptionHandler
22-
*/
23-
private $fetchExceptionHandler;
24-
25-
/**
26-
* Resource-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
27-
*
28-
* @var FetchExceptionHandler
29-
*/
30-
private $resourceFetchExceptionHandler;
31-
32-
private $maxFetchAttempts;
33-
34-
public function __construct(bool $mustCache, FetchExceptionHandler $fetchExceptionHandler, int $maxFetchAttempts)
13+
public function __construct(bool $mustCache)
3514
{
3615
$this->mustCache = $mustCache;
37-
$this->fetchExceptionHandler = $fetchExceptionHandler;
38-
$this->maxFetchAttempts = $maxFetchAttempts;
3916
}
4017

4118
/**
@@ -47,95 +24,4 @@ public function mustCache(): bool
4724
{
4825
return $this->mustCache;
4926
}
50-
51-
/**
52-
* Retries the specified callback up to a predefined number of times. If it throws a recoverable exception, the
53-
* resource fetch exception handler is invoked, if set, and then the predefined fetch exception handler.
54-
*
55-
* @param callable $callback Callback.
56-
*
57-
* @return mixed The result of the callback invocation.
58-
*/
59-
public function retry(callable $callback)
60-
{
61-
return \ScriptFUSION\Retry\retry(
62-
$this->maxFetchAttempts,
63-
$callback,
64-
$this->createExceptionHandler()
65-
);
66-
}
67-
68-
/**
69-
* Closes over the specified async generator function with a static factory method that invokes it as a coroutine
70-
* and retries it up to the predefined number of fetch attempts. If it throws a recoverable exception, the resource
71-
* fetch exception handler is invoked, if set, and then the predefined fetch exception handler.
72-
*
73-
* @param \Closure $asyncGenerator Async generator function.
74-
*
75-
* @return Promise The result returned by the async function.
76-
*/
77-
public function retryAsync(\Closure $asyncGenerator): Promise
78-
{
79-
return \ScriptFUSION\Retry\retryAsync(
80-
$this->maxFetchAttempts,
81-
static function () use ($asyncGenerator): Coroutine {
82-
return new Coroutine($asyncGenerator());
83-
},
84-
$this->createExceptionHandler()
85-
);
86-
}
87-
88-
private function createExceptionHandler(): \Closure
89-
{
90-
$userHandlerCloned = $providerHandlerCloned = false;
91-
92-
return function (\Exception $exception) use (&$userHandlerCloned, &$providerHandlerCloned): void {
93-
// Throw exception instead of retrying, if unrecoverable.
94-
if (!$exception instanceof RecoverableConnectorException) {
95-
throw $exception;
96-
}
97-
98-
// Call provider's exception handler, if defined.
99-
if ($this->resourceFetchExceptionHandler) {
100-
self::invokeHandler($this->resourceFetchExceptionHandler, $exception, $providerHandlerCloned);
101-
}
102-
103-
// Call user's exception handler.
104-
self::invokeHandler($this->fetchExceptionHandler, $exception, $userHandlerCloned);
105-
};
106-
}
107-
108-
/**
109-
* Invokes the specified fetch exception handler, cloning it if required.
110-
*
111-
* @param FetchExceptionHandler $handler Fetch exception handler.
112-
* @param \Exception $exception Exception to pass to the handler.
113-
* @param bool $cloned False if handler requires cloning, true if handler has already been cloned.
114-
*/
115-
private static function invokeHandler(FetchExceptionHandler &$handler, \Exception $exception, bool &$cloned): void
116-
{
117-
if (!$cloned && !$handler instanceof StatelessFetchExceptionHandler) {
118-
$handler = clone $handler;
119-
$handler->initialize();
120-
$cloned = true;
121-
}
122-
123-
$handler($exception);
124-
}
125-
126-
/**
127-
* Sets an exception handler to be called when a recoverable exception is thrown by Connector::fetch().
128-
*
129-
* The handler is intended to be set by provider resources only once and is called before the user-defined handler.
130-
*
131-
* @param FetchExceptionHandler $resourceFetchExceptionHandler Exception handler.
132-
*/
133-
public function setResourceFetchExceptionHandler(FetchExceptionHandler $resourceFetchExceptionHandler): void
134-
{
135-
if ($this->resourceFetchExceptionHandler !== null) {
136-
throw new \LogicException('Cannot set resource fetch exception handler: already set!');
137-
}
138-
139-
$this->resourceFetchExceptionHandler = $resourceFetchExceptionHandler;
140-
}
14127
}

src/Connector/ConnectionContextFactory.php

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/Connector/ImportConnector.php

Lines changed: 102 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,123 @@
55

66
use Amp\Promise;
77
use ScriptFUSION\Porter\Cache\CacheUnavailableException;
8-
use ScriptFUSION\Porter\Connector\FetchExceptionHandler\FetchExceptionHandler;
8+
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableConnectorException;
9+
use ScriptFUSION\Porter\Connector\Recoverable\RecoverableExceptionHandler;
10+
use ScriptFUSION\Porter\Connector\Recoverable\StatelessRecoverableExceptionHandler;
911

1012
/**
1113
* Connector whose lifecycle is synchronised with an import operation. Ensures correct ConnectionContext is delivered
1214
* to the wrapped connector.
1315
*
1416
* Do not store references to this connector that would prevent it expiring when an import operation ends.
17+
*
18+
* @internal Do not create instances of this class in client code.
1519
*/
1620
final class ImportConnector implements ConnectorWrapper
1721
{
1822
private $connector;
1923

20-
private $context;
24+
private $connectionContext;
25+
26+
/**
27+
* User-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
28+
*
29+
* @var RecoverableExceptionHandler
30+
*/
31+
private $userReh;
32+
33+
/**
34+
* Resource-defined exception handler called when a recoverable exception is thrown by Connector::fetch().
35+
*
36+
* @var RecoverableExceptionHandler
37+
*/
38+
private $resourceReh;
39+
40+
private $maxFetchAttempts;
2141

2242
/**
2343
* @param Connector|AsyncConnector $connector Wrapped connector.
24-
* @param ConnectionContext $context Connection context.
44+
* @param ConnectionContext $connectionContext Connection context.
45+
* @param RecoverableExceptionHandler $recoverableExceptionHandler
46+
* @param int $maxFetchAttempts
2547
*/
26-
public function __construct($connector, ConnectionContext $context)
27-
{
28-
if ($context->mustCache() && !$connector instanceof CachingConnector) {
48+
public function __construct(
49+
$connector,
50+
ConnectionContext $connectionContext,
51+
RecoverableExceptionHandler $recoverableExceptionHandler,
52+
int $maxFetchAttempts
53+
) {
54+
if ($connectionContext->mustCache() && !$connector instanceof CachingConnector) {
2955
throw CacheUnavailableException::createUnsupported();
3056
}
3157

3258
$this->connector = clone $connector;
33-
$this->context = $context;
59+
$this->connectionContext = $connectionContext;
60+
$this->userReh = $recoverableExceptionHandler;
61+
$this->maxFetchAttempts = $maxFetchAttempts;
3462
}
3563

36-
public function fetch($source)
64+
public function fetch(string $source)
3765
{
38-
return $this->connector->fetch($this->context, $source);
66+
return \ScriptFUSION\Retry\retry(
67+
$this->maxFetchAttempts,
68+
function () use ($source) {
69+
return $this->connector->fetch($this->connectionContext, $source);
70+
},
71+
$this->createExceptionHandler()
72+
);
3973
}
4074

4175
public function fetchAsync(string $source): Promise
4276
{
43-
return $this->connector->fetchAsync($this->context, $source);
77+
return \ScriptFUSION\Retry\retryAsync(
78+
$this->maxFetchAttempts,
79+
function () use ($source): Promise {
80+
return \Amp\call($this->connector->fetchAsync($this->connectionContext, $source));
81+
},
82+
$this->createExceptionHandler()
83+
);
84+
}
85+
86+
private function createExceptionHandler(): \Closure
87+
{
88+
$userHandlerCloned = $resourceHandlerCloned = false;
89+
90+
return function (\Exception $exception) use (&$userHandlerCloned, &$resourceHandlerCloned): void {
91+
// Throw exception instead of retrying, if unrecoverable.
92+
if (!$exception instanceof RecoverableConnectorException) {
93+
throw $exception;
94+
}
95+
96+
// Call resource's exception handler, if defined.
97+
if ($this->resourceReh) {
98+
self::invokeHandler($this->resourceReh, $exception, $resourceHandlerCloned);
99+
}
100+
101+
// Call user's exception handler.
102+
self::invokeHandler($this->userReh, $exception, $userHandlerCloned);
103+
};
104+
}
105+
106+
/**
107+
* Invokes the specified fetch exception handler, cloning it if required.
108+
*
109+
* @param RecoverableExceptionHandler $handler Fetch exception handler.
110+
* @param \Exception $exception Exception to pass to the handler.
111+
* @param bool $cloned False if handler requires cloning, true if handler has already been cloned.
112+
*/
113+
private static function invokeHandler(
114+
RecoverableExceptionHandler &$handler,
115+
\Exception $exception,
116+
bool &$cloned
117+
): void {
118+
if (!$cloned && !$handler instanceof StatelessRecoverableExceptionHandler) {
119+
$handler = clone $handler;
120+
$handler->initialize();
121+
$cloned = true;
122+
}
123+
124+
$handler($exception);
44125
}
45126

46127
/**
@@ -70,12 +151,19 @@ public function findBaseConnector()
70151
}
71152

72153
/**
73-
* Sets the exception handler to be called when a recoverable exception is thrown by Connector::fetch().
154+
* Sets an exception handler to be called when a recoverable exception is thrown by Connector::fetch().
74155
*
75-
* @param FetchExceptionHandler $exceptionHandler Fetch exception handler.
156+
* The handler is intended to be set by provider resources, once only, and is called before the user-defined
157+
* handler.
158+
*
159+
* @param RecoverableExceptionHandler $recoverableExceptionHandler Recoverable exception handler.
76160
*/
77-
public function setExceptionHandler(FetchExceptionHandler $exceptionHandler): void
161+
public function setRecoverableExceptionHandler(RecoverableExceptionHandler $recoverableExceptionHandler): void
78162
{
79-
$this->context->setResourceFetchExceptionHandler($exceptionHandler);
163+
if ($this->resourceReh !== null) {
164+
throw new \LogicException('Cannot set resource\'s recoverable exception handler: already set!');
165+
}
166+
167+
$this->resourceReh = $recoverableExceptionHandler;
80168
}
81169
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Connector;
5+
6+
use ScriptFUSION\Porter\Specification\ImportSpecification;
7+
use ScriptFUSION\StaticClass;
8+
9+
final class ImportConnectorFactory
10+
{
11+
use StaticClass;
12+
13+
/**
14+
* @param Connector|AsyncConnector $connector
15+
* @param ImportSpecification $specification
16+
*
17+
* @return ImportConnector
18+
*/
19+
public static function create($connector, ImportSpecification $specification): ImportConnector
20+
{
21+
return new ImportConnector(
22+
$connector,
23+
new ConnectionContext($specification->mustCache()),
24+
$specification->getRecoverableExceptionHandler(),
25+
$specification->getMaxFetchAttempts()
26+
);
27+
}
28+
}

src/Connector/FetchExceptionHandler/ExponentialSleepFetchExceptionHandler.php renamed to src/Connector/Recoverable/ExponentialSleepRecoverableExceptionHandler.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
<?php
22
declare(strict_types=1);
33

4-
namespace ScriptFUSION\Porter\Connector\FetchExceptionHandler;
4+
namespace ScriptFUSION\Porter\Connector\Recoverable;
55

66
use ScriptFUSION\Retry\ExceptionHandler\ExponentialBackoffExceptionHandler;
77

88
/**
99
* Sleeps for an exponentially increasing series of delays specified in microseconds.
1010
*/
11-
class ExponentialSleepFetchExceptionHandler implements FetchExceptionHandler
11+
class ExponentialSleepRecoverableExceptionHandler implements RecoverableExceptionHandler
1212
{
1313
private $initialDelay;
1414

0 commit comments

Comments
 (0)