Skip to content
Open
6 changes: 6 additions & 0 deletions src/Basic/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Laudis\Neo4j\Contracts\AuthenticateInterface;
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\DriverFactory;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
Expand Down Expand Up @@ -44,6 +45,11 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return $this->driver->verifyConnectivity($config);
}

public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
return $this->driver->getServerInfo($config);
}

public static function create(string|UriInterface $uri, ?DriverConfiguration $configuration = null, ?AuthenticateInterface $authenticate = null): self
{
$driver = DriverFactory::create($uri, $configuration, $authenticate, SummarizedResultFormatter::create());
Expand Down
49 changes: 37 additions & 12 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class BoltConnection implements ConnectionInterface
*/
private array $subscribedResults = [];
private bool $inTransaction = false;
/** @var array<string, bool> Track if this connection was ever used for a query */
private array $connectionUsed = [
'reader' => false,
'writer' => false,
];

/**
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
Expand Down Expand Up @@ -199,13 +204,6 @@ public function reset(): void
$this->subscribedResults = [];
}

private function prepareForBegin(): void
{
if (in_array($this->getServerState(), ['STREAMING', 'TX_STREAMING'], true)) {
$this->discardUnconsumedResults();
}
}

/**
* Begins a transaction.
*
Expand Down Expand Up @@ -254,6 +252,12 @@ public function run(
?AccessMode $mode,
?iterable $tsxMetadata,
): array {
if ($mode === AccessMode::WRITE()) {
$this->connectionUsed['writer'] = true;
} else {
$this->connectionUsed['reader'] = true;
}

if ($this->isInTransaction()) {
$extra = [];
} else {
Expand Down Expand Up @@ -323,15 +327,28 @@ public function close(): void
{
try {
if ($this->isOpen()) {
if ($this->isStreaming()) {
if ($this->isStreaming() && (($this->connectionUsed['reader'] ?? false) || ($this->connectionUsed['writer'] ?? false))) {
$this->discardUnconsumedResults();
}
$message = $this->messageFactory->createGoodbyeMessage();
$message->send();

unset($this->boltProtocol); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
if (($this->connectionUsed['reader'] ?? false) || ($this->connectionUsed['writer'] ?? false)) {
try {
$message = $this->messageFactory->createGoodbyeMessage();
$message->send();
} catch (Throwable $e) {
$this->logger?->log(LogLevel::DEBUG, 'Failed to send GOODBYE message during connection close', [
'error' => $e->getMessage(),
'connection' => $this->getServerAddress()->__toString(),
]);
}
}

unset($this->boltProtocol);
}
} catch (Throwable) {
} catch (Throwable $e) {
$this->logger?->log(LogLevel::DEBUG, 'Error during connection close', [
'error' => $e->getMessage(),
]);
}
}

Expand Down Expand Up @@ -437,6 +454,7 @@ public function assertNoFailure(Response $response): void
public function discardUnconsumedResults(): void
{
$this->logger?->log(LogLevel::DEBUG, 'Discarding unconsumed results');

$this->subscribedResults = array_values(array_filter(
$this->subscribedResults,
static fn (WeakReference $ref): bool => $ref->get() !== null
Expand All @@ -451,6 +469,13 @@ public function discardUnconsumedResults(): void
$state = $this->getServerState();
$this->logger?->log(LogLevel::DEBUG, "Server state before discard: {$state}");

if (!($this->connectionUsed['reader'] ?? false) && !($this->connectionUsed['writer'] ?? false)) {
$this->logger?->log(LogLevel::DEBUG, 'Skipping discard - connection never used');
$this->subscribedResults = [];

return;
}

try {
if (in_array($state, ['STREAMING', 'TX_STREAMING'], true)) {
$this->discard(null);
Expand Down
33 changes: 33 additions & 0 deletions src/Bolt/BoltDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Contracts\SessionInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -97,6 +99,37 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return true;
}

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*
* @throws Exception if unable to acquire a connection
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
$config ??= SessionConfiguration::default()->withAccessMode(AccessMode::READ());

$connectionGenerator = $this->pool->acquire($config);
/**
* @var BoltConnection $connection
*
* @psalm-suppress UnnecessaryVarAnnotation
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

try {
return new ServerInfo(
$connection->getServerAddress(),
$connection->getProtocol(),
$connection->getServerAgent()
);
} finally {
$this->pool->release($connection);
}
}

public function closeConnections(): void
{
$this->pool->close();
Expand Down
2 changes: 0 additions & 2 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ private function acquireConnection(TransactionConfiguration $config, SessionConf
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

// We try and let the server do the timeout management.
// Since the client should not run indefinitely, we just add the client side by two, just in case
$timeout = $config->getTimeout();
if ($timeout !== null) {
$timeout = ($timeout < 30) ? 30 : $timeout;
Expand Down
9 changes: 9 additions & 0 deletions src/Contracts/DriverInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Laudis\Neo4j\Contracts;

use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Types\CypherList;
use Laudis\Neo4j\Types\CypherMap;
Expand All @@ -35,6 +36,14 @@ public function createSession(?SessionConfiguration $config = null): SessionInte
*/
public function verifyConnectivity(?SessionConfiguration $config = null): bool;

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo;

/**
* Closes all connections in the pool.
*/
Expand Down
18 changes: 18 additions & 0 deletions src/Databags/SessionConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,27 @@ public function __construct(
private readonly ?AccessMode $accessMode = null,
private readonly ?array $bookmarks = null,
private readonly ?Neo4jLogger $logger = null,
private readonly ?string $impersonatedUser = null,
) {
}

public function withImpersonatedUser(?string $user): self
{
return new self(
$this->database,
$this->fetchSize,
$this->accessMode,
$this->bookmarks,
$this->logger,
$user
);
}

public function getImpersonatedUser(): ?string
{
return $this->impersonatedUser;
}

/**
* @pure
*
Expand Down
14 changes: 14 additions & 0 deletions src/Neo4j/Neo4jConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ private function getNextServer(RoutingTable $table, AccessMode $mode): Uri
$servers = $table->getWithRole(RoutingRoles::FOLLOWER());
}

if (empty($servers)) {
throw new RuntimeException('No available servers found for the requested access mode');
}

return Uri::create($servers[random_int(0, count($servers) - 1)]);
}

Expand Down Expand Up @@ -259,6 +263,16 @@ public function close(): void
$this->cache->clear();
}

/**
* Forces a routing table refresh for the given configuration.
* This will cause the next acquire() call to fetch a fresh routing table.
*/
public function refreshRoutingTable(SessionConfiguration $config): void
{
$key = $this->createKey($this->data, $config);
$this->cache->delete($key);
}

/**
* @return Generator<string>
*/
Expand Down
38 changes: 38 additions & 0 deletions src/Neo4j/Neo4jDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use function is_string;

use Laudis\Neo4j\Authentication\Authenticate;
use Laudis\Neo4j\Bolt\BoltConnection;
use Laudis\Neo4j\Bolt\Session;
use Laudis\Neo4j\Common\DNSAddressResolver;
use Laudis\Neo4j\Common\GeneratorHelper;
Expand All @@ -28,7 +29,9 @@
use Laudis\Neo4j\Contracts\DriverInterface;
use Laudis\Neo4j\Contracts\SessionInterface;
use Laudis\Neo4j\Databags\DriverConfiguration;
use Laudis\Neo4j\Databags\ServerInfo;
use Laudis\Neo4j\Databags\SessionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Psr\Http\Message\UriInterface;
use Psr\Log\LogLevel;
Expand Down Expand Up @@ -75,6 +78,8 @@ public static function create(string|UriInterface $uri, ?DriverConfiguration $co
/**
* @psalm-mutation-free
*
* @psalm-suppress UnnecessaryVarAnnotation
*
* @throws Exception
*/
public function createSession(?SessionConfiguration $config = null): SessionInterface
Expand All @@ -99,6 +104,39 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool
return true;
}

/**
* Gets server information without running a query.
*
* Acquires a connection from the pool and extracts server metadata.
* The pool handles all connection management, routing, and retries.
*
* @throws Exception if unable to acquire a connection
*/
public function getServerInfo(?SessionConfiguration $config = null): ServerInfo
{
$config ??= SessionConfiguration::default()->withAccessMode(AccessMode::READ());

$this->pool->refreshRoutingTable($config);

$connectionGenerator = $this->pool->acquire($config);
/**
* @var BoltConnection $connection
*
* @psalm-suppress UnnecessaryVarAnnotation
*/
$connection = GeneratorHelper::getReturnFromGenerator($connectionGenerator);

try {
return new ServerInfo(
$connection->getServerAddress(),
$connection->getProtocol(),
$connection->getServerAgent()
);
} finally {
$this->pool->release($connection);
}
}

public function closeConnections(): void
{
$this->pool->close();
Expand Down
1 change: 1 addition & 0 deletions testkit
Submodule testkit added at 905752
2 changes: 2 additions & 0 deletions testkit-backend/src/Handlers/DriverClose.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public function __construct(MainRepository $repository)
*/
public function handle($request): DriverResponse
{
$driver = $this->repository->getDriver($request->getDriverId());
$driver->closeConnections();
$this->repository->removeDriver($request->getDriverId());

return new DriverResponse($request->getDriverId());
Expand Down
Loading
Loading