diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml deleted file mode 100644 index 6fc7999a..00000000 --- a/.github/workflows/changelog.yml +++ /dev/null @@ -1,51 +0,0 @@ -name: Update changelog -on: - workflow_dispatch: - release: - types: - - published -jobs: - run: - name: Update changelog - runs-on: ubuntu-latest - env: - CHANGELOG_GITHUB_TOKEN: ${{ secrets.CHANGELOG_GITHUB_TOKEN }} - steps: - - name: Checkout - uses: actions/checkout@v4 - with: - ref: master - - uses: actions/cache/restore@v3 - id: cache-restore - with: - key: changelog-generator - path: build/github-changelog-http-cache - - name: Generate changelog - uses: addnab/docker-run-action@v3 - with: - image: githubchangeloggenerator/github-changelog-generator - options: "-v ${{ github.workspace }}:/github/workspace --env SRC_PATH=/github/workspace --env CHANGELOG_GITHUB_TOKEN --workdir /github/workspace" - run: "github_changelog_generator --user php-amqplib --project php-amqplib" - - name: Commit changes - run: | - git config --local user.email "action@github.com" - git config --local user.name "GitHub Action" - git add CHANGELOG.md - git commit -m "update changelog" || true - - name: Push changes - uses: ad-m/github-push-action@master - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - branch: master - - name: Clear cache - if: always() - uses: snnaplab/delete-branch-cache-action@v1 - with: - key: ${{ steps.cache-restore.outputs.cache-primary-key }} - - name: Save changelog cache - if: always() - uses: actions/cache/save@v3 - with: - key: ${{ steps.cache-restore.outputs.cache-primary-key }} - path: build/github-changelog-http-cache - diff --git a/.github/workflows/code-style.yml b/.github/workflows/code-style.yml deleted file mode 100644 index f2d6dc61..00000000 --- a/.github/workflows/code-style.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: Code style - -on: [push, pull_request] - -permissions: - contents: read - -jobs: - phpcs: - name: PHPCS - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Setup PHP - uses: shivammathur/setup-php@v2 - with: - php-version: '7.4' - - - name: Install dependencies - run: composer update --prefer-dist --no-progress --no-suggest - - - name: Run script - run: vendor/bin/phpcs diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml deleted file mode 100644 index 47774cac..00000000 --- a/.github/workflows/documentation.yml +++ /dev/null @@ -1,44 +0,0 @@ -name: Update documentation page - -on: - workflow_dispatch: - release: - types: [published] - -jobs: - run: - runs-on: ubuntu-latest - name: Regenerate phpDocumentator docs - steps: - - name: Checkout - uses: actions/checkout@v4 - with: - submodules: true - - name: Setup Graphviz - uses: ts-graphviz/setup-graphviz@v1 - - name: Install PHP - uses: shivammathur/setup-php@v2 - with: - php-version: 7.4 - extensions: ctype, dom, hash, iconv, json, libxml, mbstring, simplexml, zlib, xml - coverage: none - - name: Download phpDocumentator - run: wget -qN https://github.com/phpDocumentor/phpDocumentor/releases/download/v3.1.2/phpDocumentor.phar - - name: Generate docs - run: php -d error_reporting=1 ./phpDocumentor.phar run -v --force --defaultpackagename=PhpAmqpLib --title='php-amqplib' -d ./PhpAmqpLib -t ./docs - - name: Commit changes - run: | - cd ./docs - rm -rf ./phpdoc-cache-* - git config --local user.email "action@github.com" - git config --local user.name "GitHub Action" - git add ./* - git commit -m "update documentation" || true - - name: Push changes - uses: ad-m/github-push-action@master - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - directory: ./docs - branch: gh-pages - force: true - diff --git a/.gitignore b/.gitignore index 24f08cf3..0c52850d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ phpDocumentor.phar .phpactor.json .phpunit.result.cache .phpdoc +.idea diff --git a/PhpAmqpLib/Wire/IO/SwooleIO.php b/PhpAmqpLib/Wire/IO/SwooleIO.php index 6892396b..c169e3a1 100644 --- a/PhpAmqpLib/Wire/IO/SwooleIO.php +++ b/PhpAmqpLib/Wire/IO/SwooleIO.php @@ -21,18 +21,18 @@ class SwooleIO extends AbstractIO * @param int $port * @param float $connection_timeout * @param float $read_write_timeout - * @param null $context + * @param mixed $context * @param bool $keepalive * @param int $heartbeat */ public function __construct( - $host, - $port, - $connection_timeout, - $read_write_timeout, - $context = null, - $keepalive = false, - $heartbeat = 0 + string $host, + int $port, + float $connection_timeout, + float $read_write_timeout, + mixed $context = null, + bool $keepalive = false, + int $heartbeat = 0 ) { /* TODO FUTURE enable this check @@ -58,8 +58,11 @@ public function __construct( * @throws AMQPRuntimeException * @throws AMQPIOException */ - public function connect() + public function connect(): void { + // Close any existing connection to prevent resource leaks + $this->close(); + $this->sock = new Client(SWOOLE_SOCK_TCP); // Set socket options before connecting @@ -71,9 +74,12 @@ public function connect() 'open_tcp_nodelay' => true, 'tcp_keepalive' => $this->keepalive, 'package_max_length' => 2 * 1024 * 1024, // 2MB max package + 'socket_buffer_size' => 2 * 1024 * 1024, + 'buffer_output_size' => 2 * 1024 * 1024, ]); if (!$this->sock->connect($this->host, $this->port)) { + $this->close(); throw new AMQPIOException( sprintf( 'Error Connecting to server(%s): %s ', @@ -93,7 +99,7 @@ public function connect() * @throws AMQPTimeoutException * @throws AMQPConnectionClosedException */ - public function read($len) + public function read($len): string { if ($this->sock === null) { throw new AMQPConnectionClosedException('Socket connection is closed'); @@ -115,27 +121,28 @@ public function read($len) // Read remaining bytes from socket while ($remaining > 0) { if (!$this->sock->connected) { + $this->close(); throw new AMQPConnectionClosedException('Broken pipe or closed connection'); } // Swoole recv() returns false on error, empty string on EOF $chunk = $this->sock->recv($remaining, $this->read_timeout); - if ($chunk === false) { + if ($chunk === '' || $chunk === false) { + $this->close(); if ($this->sock->errCode == SOCKET_ETIMEDOUT) { throw new AMQPTimeoutException('Read timeout'); } - throw new AMQPIOException( - sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)), - $this->sock->errCode - ); - } - - if ($chunk === '') { + if ($this->sock->errCode !== 0) { + throw new AMQPIOException( + sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)), + $this->sock->errCode + ); + } throw new AMQPConnectionClosedException('Connection closed by peer'); } - $data .= $chunk; + $data .= $chunk; $remaining -= strlen($chunk); } @@ -148,30 +155,44 @@ public function read($len) * @throws AMQPRuntimeException * @throws AMQPTimeoutException * @throws AMQPConnectionClosedException + * @throws AMQPIOException */ - public function write($data) + public function write($data): void { if ($this->sock === null || !$this->sock->connected) { + $this->close(); throw new AMQPConnectionClosedException('Socket connection is closed'); } $this->checkBrokerHeartbeat(); - // Swoole send() handles partial writes internally - $result = $this->sock->send($data); + $totalLength = strlen($data); + $offset = 0; - if ($result === false) { - if ($this->sock->errCode == SOCKET_ETIMEDOUT) { - throw new AMQPTimeoutException('Write timeout'); + while ($offset < $totalLength) { + // Send remaining bytes; avoid extra substr() call for the first chunk + $chunk = $offset === 0 ? $data : substr($data, $offset); + + $sent = $this->sock->send($chunk); + + if ($sent === false) { + $this->close(); + if ($this->sock->errCode == SOCKET_ETIMEDOUT) { + throw new AMQPTimeoutException('Write timeout'); + } + throw new AMQPIOException( + sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)), + $this->sock->errCode + ); } - throw new AMQPIOException( - sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)), - $this->sock->errCode - ); - } - if ($result !== strlen($data)) { - throw new AMQPIOException('Could not write entire buffer'); + if ($sent === 0) { + // Peer closed the connection + $this->close(); + throw new AMQPConnectionClosedException('Connection closed by peer while writing'); + } + + $offset += $sent; } $this->last_write = microtime(true); @@ -180,7 +201,7 @@ public function write($data) /** * @return void */ - public function close() + public function close(): void { if ($this->sock !== null && $this->sock->connected) { $this->sock->close(); @@ -191,13 +212,21 @@ public function close() $this->buffer = ''; } + /** + * Ensure the socket is closed when the object is destroyed. + */ + public function __destruct() + { + $this->close(); + } + /** * @param int|null $sec * @param int $usec * @return int|bool * @throws AMQPConnectionClosedException */ - protected function do_select(?int $sec, int $usec) + protected function do_select(?int $sec, int $usec): bool|int { if ($this->sock === null || !$this->sock->connected) { throw new AMQPConnectionClosedException('Socket connection is closed'); @@ -222,12 +251,14 @@ protected function do_select(?int $sec, int $usec) } // Connection error if ($this->sock->errCode == SOCKET_ECONNRESET || !$this->sock->connected) { + $this->close(); throw new AMQPConnectionClosedException('Connection reset by peer'); } return false; // Other error } if ($data === '') { + $this->close(); throw new AMQPConnectionClosedException('Connection closed by peer'); } @@ -239,7 +270,7 @@ protected function do_select(?int $sec, int $usec) /** * @return Client|null */ - public function getSocket() + public function getSocket(): ?Client { return $this->sock; }