From d97927820496a45faae27b21282e03817b6272a5 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 11 Dec 2017 22:12:53 -0600 Subject: [PATCH] Close connection on flush failure --- lib/PgSqlHandle.php | 13 +++++++++++-- lib/PqHandle.php | 22 +++++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/lib/PgSqlHandle.php b/lib/PgSqlHandle.php index e2da4bf..c9de234 100644 --- a/lib/PgSqlHandle.php +++ b/lib/PgSqlHandle.php @@ -115,7 +115,7 @@ public function __construct($handle, $socket) { } }); - $this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, $handle) { + $this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) { $flush = \pg_flush($handle); if ($flush === 0) { return; // Not finished sending data, listen again. @@ -124,7 +124,16 @@ public function __construct($handle, $socket) { Loop::disable($watcher); if ($flush === false) { - $deferred->fail(new FailureException(\pg_last_error($handle))); + $exception = new ConnectionException(\pg_last_error($handle)); + $handle = null; // Marks connection as dead. + + foreach ($listeners as $listener) { + $listener->fail($exception); + } + + if ($deferred !== null) { + $deferred->fail($exception); + } } }); diff --git a/lib/PqHandle.php b/lib/PqHandle.php index fffc436..53cdf44 100644 --- a/lib/PqHandle.php +++ b/lib/PqHandle.php @@ -66,11 +66,10 @@ public function __construct(pq\Connection $handle) { $this->poll = Loop::onReadable($this->handle->socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) { if ($handle->poll() === pq\Connection::POLLING_FAILED) { + $exception = new ConnectionException($handle->errorMessage); $handle = null; // Marks connection as dead. Loop::disable($watcher); - $exception = new ConnectionException($handle->errorMessage); - foreach ($listeners as $listener) { $listener->fail($exception); } @@ -97,9 +96,22 @@ public function __construct(pq\Connection $handle) { } }); - $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) { - if (!$handle->flush()) { - return; // Not finished sending data, continue polling for writability. + $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) { + try { + if (!$handle->flush()) { + return; // Not finished sending data, continue polling for writability. + } + } catch (pq\Exception $exception) { + $exception = new ConnectionException("Flushing the connection failed", 0, $exception); + $handle = null; // Marks connection as dead. + + foreach ($listeners as $listener) { + $listener->fail($exception); + } + + if ($deferred !== null) { + $deferred->fail($exception); + } } Loop::disable($watcher);