Skip to content

Commit

Permalink
Async querying of types
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 16, 2024
1 parent c9ce9fd commit edac76e
Showing 1 changed file with 64 additions and 29 deletions.
93 changes: 64 additions & 29 deletions src/Internal/PgSqlHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@
use Revolt\EventLoop;
use function Amp\async;

/** @internal */
/**
* @internal
*
* @psalm-type PgSqlTypeMap = array<int, PgSqlType> Map of OID to corresponding PgSqlType.
*/
final class PgSqlHandle extends AbstractHandle
{
private const TYPE_QUERY = <<<SQL
SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid
SQL;

private const DIAGNOSTIC_CODES = [
\PGSQL_DIAG_SEVERITY => "severity",
\PGSQL_DIAG_SQLSTATE => "sqlstate",
Expand All @@ -35,16 +45,16 @@ final class PgSqlHandle extends AbstractHandle
\PGSQL_DIAG_SOURCE_FUNCTION => "source_function",
];

/** @var array<string, array<int, PgSqlType>> */
/** @var array<string, Future<PgSqlTypeMap>> */
private static array $typeCache;

private static ?\Closure $errorHandler = null;

/** @var \PgSql\Connection PostgreSQL connection handle. */
private ?\PgSql\Connection $handle;

/** @var array<int, PgSqlType> */
private readonly array $types;
/** @var PgSqlTypeMap|null */
private ?array $types = null;

/** @var array<non-empty-string, StatementStorage<string>> */
private array $statements = [];
Expand All @@ -57,13 +67,11 @@ final class PgSqlHandle extends AbstractHandle
public function __construct(
\PgSql\Connection $handle,
$socket,
string $id,
private readonly string $id,
PostgresConfig $config,
) {
$this->handle = $handle;

$this->types = (self::$typeCache[$id] ??= self::fetchTypes($handle));

$handle = &$this->handle;
$lastUsedAt = &$this->lastUsedAt;
$deferred = &$this->pendingOperation;
Expand Down Expand Up @@ -171,35 +179,60 @@ public function __construct(
}

/**
* @return array<int, PgSqlType>
* @return Future<PgSqlTypeMap>
*/
private static function fetchTypes(\PgSql\Connection $handle): array
private function fetchTypes(): Future
{
$result = \pg_query($handle, "SELECT t.oid, t.typcategory, t.typname, t.typdelim, t.typelem
FROM pg_catalog.pg_type t JOIN pg_catalog.pg_namespace n ON t.typnamespace=n.oid
WHERE t.typisdefined AND n.nspname IN ('pg_catalog', 'public') ORDER BY t.oid");
if ($this->handle === null) {
throw new \Error("The connection to the database has been closed");
}

$result = \pg_send_query($this->handle, self::TYPE_QUERY);
if ($result === false) {
throw new SqlException(\pg_last_error($handle));
$this->close();
throw new SqlException(\pg_last_error($this->handle));
}

$types = [];
while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) {
[$oid, $typeCategory, $typeName, $delimiter, $element] = $row;

\assert(
\is_numeric($oid) && \is_numeric($element),
"OID and element type expected to be integers",
);
\assert(
\is_string($typeCategory) && \is_string($typeName) && \is_string($delimiter),
"Unexpected types in type catalog query results",
);
$this->pendingOperation = $queryDeferred = new DeferredFuture();
$typesDeferred = new DeferredFuture();

$types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element);
EventLoop::reference($this->poll);
if ($result === 0) {
EventLoop::enable($this->await);
}

return $types;
EventLoop::queue(function () use ($queryDeferred, $typesDeferred): void {
try {
$result = $queryDeferred->getFuture()->await();
if (\pg_result_status($result) !== \PGSQL_TUPLES_OK) {
throw new SqlException(\pg_result_error($result));
}

$types = [];
while ($row = \pg_fetch_array($result, mode: \PGSQL_NUM)) {
[$oid, $typeCategory, $typeName, $delimiter, $element] = $row;

\assert(
\is_numeric($oid) && \is_numeric($element),
"OID and element type expected to be integers",
);
\assert(
\is_string($typeCategory) && \is_string($typeName) && \is_string($delimiter),
"Unexpected types in type catalog query results",
);

$types[(int) $oid] = new PgSqlType($typeCategory, $typeName, $delimiter, (int) $element);
}

$typesDeferred->complete($types);
} catch (\Throwable $exception) {
$this->close();
$typesDeferred->error($exception);
unset(self::$typeCache[$this->id]);
}
});

return $typesDeferred->getFuture();
}

private static function getErrorHandler(): \Closure
Expand All @@ -224,12 +257,12 @@ public function isClosed(): bool
* @param \Closure $function Function to execute.
* @param mixed ...$args Arguments to pass to function.
*
* @return \PgSql\Result
*
* @throws SqlException
*/
private function send(\Closure $function, mixed ...$args): mixed
{
$this->types ??= (self::$typeCache[$this->id] ??= $this->fetchTypes())->await();

while ($this->pendingOperation) {
try {
$this->pendingOperation->getFuture()->await();
Expand Down Expand Up @@ -275,6 +308,8 @@ private function createResult(\PgSql\Result $result, string $sql): PostgresResul
throw new \Error("The connection to the database has been closed");
}

\assert($this->types !== null, 'Expected type array to be populated before creating a result');

switch (\pg_result_status($result)) {
case \PGSQL_EMPTY_QUERY:
throw new SqlQueryError("Empty query string");
Expand Down

0 comments on commit edac76e

Please sign in to comment.