Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 0 additions & 51 deletions .github/workflows/changelog.yml

This file was deleted.

25 changes: 0 additions & 25 deletions .github/workflows/code-style.yml

This file was deleted.

44 changes: 0 additions & 44 deletions .github/workflows/documentation.yml

This file was deleted.

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ phpDocumentor.phar
.phpactor.json
.phpunit.result.cache
.phpdoc
.idea
101 changes: 66 additions & 35 deletions PhpAmqpLib/Wire/IO/SwooleIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ class SwooleIO extends AbstractIO
* @param int $port
* @param float $connection_timeout
* @param float $read_write_timeout
* @param null $context
* @param mixed $context
* @param bool $keepalive
* @param int $heartbeat
*/
public function __construct(
$host,
$port,
$connection_timeout,
$read_write_timeout,
$context = null,
$keepalive = false,
$heartbeat = 0
string $host,
int $port,
float $connection_timeout,
float $read_write_timeout,
mixed $context = null,
bool $keepalive = false,
int $heartbeat = 0
) {
/*
TODO FUTURE enable this check
Expand All @@ -58,8 +58,11 @@ public function __construct(
* @throws AMQPRuntimeException
* @throws AMQPIOException
*/
public function connect()
public function connect(): void
{
// Close any existing connection to prevent resource leaks
$this->close();

$this->sock = new Client(SWOOLE_SOCK_TCP);

// Set socket options before connecting
Expand All @@ -71,9 +74,12 @@ public function connect()
'open_tcp_nodelay' => true,
'tcp_keepalive' => $this->keepalive,
'package_max_length' => 2 * 1024 * 1024, // 2MB max package
'socket_buffer_size' => 2 * 1024 * 1024,
'buffer_output_size' => 2 * 1024 * 1024,
]);

if (!$this->sock->connect($this->host, $this->port)) {
$this->close();
throw new AMQPIOException(
sprintf(
'Error Connecting to server(%s): %s ',
Expand All @@ -93,7 +99,7 @@ public function connect()
* @throws AMQPTimeoutException
* @throws AMQPConnectionClosedException
*/
public function read($len)
public function read($len): string
{
if ($this->sock === null) {
throw new AMQPConnectionClosedException('Socket connection is closed');
Expand All @@ -115,27 +121,28 @@ public function read($len)
// Read remaining bytes from socket
while ($remaining > 0) {
if (!$this->sock->connected) {
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}

// Swoole recv() returns false on error, empty string on EOF
$chunk = $this->sock->recv($remaining, $this->read_timeout);

if ($chunk === false) {
if ($chunk === '' || $chunk === false) {
$this->close();
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
throw new AMQPTimeoutException('Read timeout');
}
throw new AMQPIOException(
sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)),
$this->sock->errCode
);
}

if ($chunk === '') {
if ($this->sock->errCode !== 0) {
throw new AMQPIOException(
sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)),
$this->sock->errCode
);
}
throw new AMQPConnectionClosedException('Connection closed by peer');
}

$data .= $chunk;
$data .= $chunk;
$remaining -= strlen($chunk);
}

Expand All @@ -148,30 +155,44 @@ public function read($len)
* @throws AMQPRuntimeException
* @throws AMQPTimeoutException
* @throws AMQPConnectionClosedException
* @throws AMQPIOException
*/
public function write($data)
public function write($data): void
{
if ($this->sock === null || !$this->sock->connected) {
$this->close();
throw new AMQPConnectionClosedException('Socket connection is closed');
}

$this->checkBrokerHeartbeat();

// Swoole send() handles partial writes internally
$result = $this->sock->send($data);
$totalLength = strlen($data);
$offset = 0;

if ($result === false) {
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
throw new AMQPTimeoutException('Write timeout');
while ($offset < $totalLength) {
// Send remaining bytes; avoid extra substr() call for the first chunk
$chunk = $offset === 0 ? $data : substr($data, $offset);

$sent = $this->sock->send($chunk);

if ($sent === false) {
$this->close();
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
throw new AMQPTimeoutException('Write timeout');
}
throw new AMQPIOException(
sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)),
$this->sock->errCode
);
}
throw new AMQPIOException(
sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)),
$this->sock->errCode
);
}

if ($result !== strlen($data)) {
throw new AMQPIOException('Could not write entire buffer');
if ($sent === 0) {
// Peer closed the connection
$this->close();
throw new AMQPConnectionClosedException('Connection closed by peer while writing');
}

$offset += $sent;
}

$this->last_write = microtime(true);
Expand All @@ -180,7 +201,7 @@ public function write($data)
/**
* @return void
*/
public function close()
public function close(): void
{
if ($this->sock !== null && $this->sock->connected) {
$this->sock->close();
Expand All @@ -191,13 +212,21 @@ public function close()
$this->buffer = '';
}

/**
* Ensure the socket is closed when the object is destroyed.
*/
public function __destruct()
{
$this->close();
}

/**
* @param int|null $sec
* @param int $usec
* @return int|bool
* @throws AMQPConnectionClosedException
*/
protected function do_select(?int $sec, int $usec)
protected function do_select(?int $sec, int $usec): bool|int
{
if ($this->sock === null || !$this->sock->connected) {
throw new AMQPConnectionClosedException('Socket connection is closed');
Expand All @@ -222,12 +251,14 @@ protected function do_select(?int $sec, int $usec)
}
// Connection error
if ($this->sock->errCode == SOCKET_ECONNRESET || !$this->sock->connected) {
$this->close();
throw new AMQPConnectionClosedException('Connection reset by peer');
}
return false; // Other error
}

if ($data === '') {
$this->close();
throw new AMQPConnectionClosedException('Connection closed by peer');
}

Expand All @@ -239,7 +270,7 @@ protected function do_select(?int $sec, int $usec)
/**
* @return Client|null
*/
public function getSocket()
public function getSocket(): ?Client
{
return $this->sock;
}
Expand Down