Class Pool

The RxJS wrapper for pg.Pool.

Example

// Create a connection pool
const pool = new Pool();

Example

// Open a connection and query the database

pool.connect()
.pipe(
concatMap((val) => {
const { poolClient } = val;

return poolClient
.query(sql`SELECT * FROM animals`)
.pipe(concatMap((result) => poolClient.release(result)));
}
)
)
.subscribe({
next: (result) => {
console.log(
`Query succeeded. Returned ${result.rows.length} rows.`
);
console.table(result.rows);
},
error: (err) => {
console.error('Query failed:');
console.error(err);
}
});

Example

// Open a connection and stream from the database
pool
.connect()
.pipe(
concatMap((poolClient) => {
const { poolClient } = val;

poolClient
.stream(sql`SELECT * FROM get_animals()`)
// Use finalize to release the pool connection after all rows are returned
.pipe(finalize(() => poolClient.release().subscribe()))
})
)
.subscribe({
next: (val) => {
// Emits individual rows
},
complete: () => {
// Complete is called when all rows have been read
}
});

Example

// Close a connection pool
pool.end().subscribe()

Public Api

Constructors

Properties

_poolNative: Pool
_connect?: Subject<PoolClient>
_aquire?: Subject<PoolClient>
_remove?: Subject<PoolClient>
_error?: Subject<Error>
_release?: Subject<Error>

Accessors

  • get onError(): Subject<Error>
  • Returns Subject<Error>

  • get onRelease(): Subject<Error>
  • Returns Subject<Error>

Methods

  • Opens a database connection using the options defined in the constructor.

    Returns Observable<{
        pool: Pool;
        client: PoolClient;
    }>

  • Closes the connection pool.

    Returns Observable<{
        totalCount: number;
        waitingCount: number;
    }>

  • Parameters

    • Optional eventName: "error" | "connect" | "drain" | "aquire" | "remove"

    Returns this