Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 92 additions & 20 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Laudis\Neo4j;

use Bolt\error\ConnectException;
use Laudis\Neo4j\Common\DriverSetupManager;
use Laudis\Neo4j\Contracts\ClientInterface;
use Laudis\Neo4j\Contracts\DriverInterface;
Expand All @@ -23,7 +24,7 @@
use Laudis\Neo4j\Databags\Statement;
use Laudis\Neo4j\Databags\SummarizedResult;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\ConnectionPoolException;
use Laudis\Neo4j\Types\CypherList;

/**
Expand Down Expand Up @@ -100,22 +101,84 @@ private function getSession(?string $alias = null): SessionInterface
return $this->boundSessions[$alias] = $this->startSession($alias, $this->defaultSessionConfiguration);
}

/**
* Executes an operation with automatic retry on alternative drivers when connection exceptions occur.
*
* @template T
*
* @param callable(SessionInterface): T $operation The operation to execute
* @param string|null $alias The driver alias to use
*
* @throws ConnectionPoolException When all available drivers have been exhausted
*
* @return T The result of the operation
*/
private function executeWithRetry(callable $operation, ?string $alias = null)
{
$alias ??= $this->driverSetups->getDefaultAlias();
$attemptedDrivers = [];
$lastException = null;

while (true) {
try {
$driver = $this->driverSetups->getDriver($this->defaultSessionConfiguration, $alias);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is sound, but the execution will have a bug. The driver setups will return the same driver every time. We will need to adjust the DriverSetupManager as well. Maybe we can add a reset method to the class for a specific alias, so it verifies the connections again and finds a new driver with a verified connection? If we go that route, we don't need any complex retry logic; catch the exception, reset the drivers in the driver setup manager, and try again.


$driverHash = spl_object_hash($driver);
if (in_array($driverHash, $attemptedDrivers, true)) {
throw $lastException ?? new ConnectionPoolException('No available drivers');
}
$attemptedDrivers[] = $driverHash;

$session = $driver->createSession($this->defaultSessionConfiguration);

return $operation($session);
} catch (ConnectionPoolException|ConnectException $e) {
$lastException = $e;
}
}
}

public function runStatements(iterable $statements, ?string $alias = null): CypherList
{
$runner = $this->getRunner($alias);
if ($runner instanceof SessionInterface) {
return $runner->runStatements($statements, $this->defaultTransactionConfiguration);
$alias ??= $this->driverSetups->getDefaultAlias();

if (array_key_exists($alias, $this->boundTransactions)
&& count($this->boundTransactions[$alias]) > 0) {
$runner = $this->getRunner($alias);
if ($runner instanceof TransactionInterface) {
return $runner->runStatements($statements);
}
}

return $runner->runStatements($statements);
if (array_key_exists($alias, $this->boundSessions)) {
$session = $this->boundSessions[$alias];

return $session->runStatements($statements, $this->defaultTransactionConfiguration);
}

return $this->executeWithRetry(
function (SessionInterface $session) use ($statements) {
return $session->runStatements($statements, $this->defaultTransactionConfiguration);
},
$alias
);
}

public function beginTransaction(?iterable $statements = null, ?string $alias = null, ?TransactionConfiguration $config = null): UnmanagedTransactionInterface
{
$session = $this->getSession($alias);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

return $session->beginTransaction($statements, $config);
if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->beginTransaction($statements, $config);
}

return $this->executeWithRetry(
function (SessionInterface $session) use ($statements, $config) {
return $session->beginTransaction($statements, $config);
},
$alias
);
}

public function getDriver(?string $alias): DriverInterface
Expand All @@ -130,27 +193,36 @@ private function startSession(?string $alias, SessionConfiguration $configuratio

public function writeTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
{
$accessMode = $this->defaultSessionConfiguration->getAccessMode();
if ($accessMode === null || $accessMode === AccessMode::WRITE()) {
$session = $this->getSession($alias);
} else {
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
$session = $this->startSession($alias, $sessionConfig);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->writeTransaction($tsxHandler, $config);
}

return $session->writeTransaction($tsxHandler, $this->getTsxConfig($config));
return $this->executeWithRetry(
function (SessionInterface $session) use ($tsxHandler, $config) {
return $session->writeTransaction($tsxHandler, $config);
},
$alias
);
}

public function readTransaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
{
if ($this->defaultSessionConfiguration->getAccessMode() === AccessMode::READ()) {
$session = $this->getSession($alias);
} else {
$sessionConfig = $this->defaultSessionConfiguration->withAccessMode(AccessMode::WRITE());
$session = $this->startSession($alias, $sessionConfig);
$alias ??= $this->driverSetups->getDefaultAlias();
$config = $this->getTsxConfig($config);

if (array_key_exists($alias, $this->boundSessions)) {
return $this->boundSessions[$alias]->readTransaction($tsxHandler, $config);
}

return $session->readTransaction($tsxHandler, $this->getTsxConfig($config));
return $this->executeWithRetry(
function (SessionInterface $session) use ($tsxHandler, $config) {
return $session->readTransaction($tsxHandler, $config);
},
$alias
);
}

public function transaction(callable $tsxHandler, ?string $alias = null, ?TransactionConfiguration $config = null)
Expand Down
6 changes: 2 additions & 4 deletions src/Neo4j/Neo4jConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Enum\RoutingRoles;
use Laudis\Neo4j\Exception\ConnectionPoolException;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
use Psr\SimpleCache\CacheInterface;

use function random_int;

use RuntimeException;

use function sprintf;
use function str_replace;
use function time;
Expand Down Expand Up @@ -165,7 +163,7 @@ public function acquire(SessionConfiguration $config): Generator
}

if ($table === null) {
throw new RuntimeException(sprintf('Cannot connect to host: "%s". Hosts tried: "%s"', $this->data->getUri()->getHost(), implode('", "', $triedAddresses)), previous: $latestError);
throw new ConnectionPoolException(sprintf('Cannot connect to host: "%s". Hosts tried: "%s"', $this->data->getUri()->getHost(), implode('", "', $triedAddresses)), previous: $latestError);
}

$server = $this->getNextServer($table, $config->getAccessMode());
Expand Down
3 changes: 2 additions & 1 deletion src/Neo4j/Neo4jDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\ConnectionPoolException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -92,7 +93,7 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
$config ??= SessionConfiguration::default();
try {
GeneratorHelper::getReturnFromGenerator($this->pool->acquire($config));
} catch (ConnectException $e) {
} catch (ConnectException|ConnectionPoolException $e) {
$this->pool->getLogger()?->log(LogLevel::WARNING, 'Could not connect to server on URI '.$this->parsedUrl->__toString(), ['error' => $e]);

return false;
Expand Down
122 changes: 122 additions & 0 deletions tests/Unit/ClientExceptionHandlingTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Neo4j PHP Client and Driver package.
*
* (c) Nagels <https://nagels.tech>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Laudis\Neo4j\Tests\Unit;

use Laudis\Neo4j\Client;
use Laudis\Neo4j\Common\DriverSetupManager;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Databags\TransactionConfiguration;
use PHPUnit\Framework\TestCase;
use RuntimeException;

final class ClientExceptionHandlingTest extends TestCase
{
public function testClientRunStatementWithFailingDriver(): void
{
$driverSetupManager = $this->createMock(DriverSetupManager::class);
$sessionConfig = SessionConfiguration::default();
$transactionConfig = TransactionConfiguration::default();

$driverSetupManager->method('getDriver')
->willThrowException(new RuntimeException(
'Cannot connect to any server on alias: default with Uris: (\'neo4j://node1:7687\', \'neo4j://node2:7687\')'
));

$client = new Client($driverSetupManager, $sessionConfig, $transactionConfig);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Cannot connect to any server on alias: default');
$client->run('RETURN 1 as n');
}

public function testClientWriteTransactionWithFailingDriver(): void
{
$driverSetupManager = $this->createMock(DriverSetupManager::class);
$sessionConfig = SessionConfiguration::default();
$transactionConfig = TransactionConfiguration::default();

$driverSetupManager->method('getDriver')
->willThrowException(new RuntimeException(
'Cannot connect to any server on alias: default with Uris: (\'neo4j://node1:7687\', \'neo4j://node2:7687\')'
));

$client = new Client($driverSetupManager, $sessionConfig, $transactionConfig);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Cannot connect to any server');

$client->writeTransaction(function () {
return 'test';
});
}

public function testClientReadTransactionWithFailingDriver(): void
{
$driverSetupManager = $this->createMock(DriverSetupManager::class);
$sessionConfig = SessionConfiguration::default();
$transactionConfig = TransactionConfiguration::default();

$driverSetupManager->method('getDriver')
->willThrowException(new RuntimeException(
'Cannot connect to any server on alias: default with Uris: (\'neo4j://node1:7687\', \'neo4j://node2:7687\')'
));

$client = new Client($driverSetupManager, $sessionConfig, $transactionConfig);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Cannot connect to any server');

$client->readTransaction(function () {
return 'test';
});
}

public function testClientBeginTransactionWithFailingDriver(): void
{
$driverSetupManager = $this->createMock(DriverSetupManager::class);
$sessionConfig = SessionConfiguration::default();
$transactionConfig = TransactionConfiguration::default();

$driverSetupManager->method('getDriver')
->willThrowException(new RuntimeException(
'Cannot connect to any server on alias: default with Uris: (\'neo4j://node1:7687\', \'neo4j://node2:7687\')'
));

$client = new Client($driverSetupManager, $sessionConfig, $transactionConfig);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Cannot connect to any server');

$client->beginTransaction();
}

public function testClientExceptionIncludesFailedAliasInfo(): void
{
$driverSetupManager = $this->createMock(DriverSetupManager::class);
$sessionConfig = SessionConfiguration::default();
$transactionConfig = TransactionConfiguration::default();

$driverSetupManager->method('getDriver')
->willThrowException(new RuntimeException(
'Cannot connect to any server on alias: secondary with Uris: (\'neo4j://node4:7687\', \'neo4j://node5:7687\')'
));

$client = new Client($driverSetupManager, $sessionConfig, $transactionConfig);

$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('Cannot connect to any server on alias: secondary');

$client->run('RETURN 1 as n', [], 'secondary');
}
}
Loading
Loading