
ETL Adapter: PostgreSQL

Flow PHP's PostgreSQL adapter integrates PostgreSQL into your ETL (Extract, Transform, Load) workflows. It is built on top of Flow's PostgreSQL library (the Flow\PostgreSql namespace) and provides extractors for reading query results, a loader for writing to tables, and helpers for converting between Flow schemas and PostgreSQL table definitions.

Installation

For detailed installation instructions, see the installation page.

Requirements

  • PHP 8.3+
  • ext-pgsql
  • ext-pg_query (optional, for query builder support)

Description

This adapter provides:

Extractors

Three extraction strategies optimized for different use cases:

  • Server-Side Cursor: True streaming extraction using PostgreSQL DECLARE CURSOR for maximum memory efficiency
  • LIMIT/OFFSET Pagination: Simple pagination suitable for smaller datasets
  • Keyset (Cursor) Pagination: Efficient pagination for large datasets with consistent performance

All extractors support:

  • Raw SQL strings or Query Builder objects
  • Configurable batch/page sizes
  • Maximum row limits
  • Custom schema definitions

Loader

A flexible loader supporting:

  • INSERT: Simple inserts with batch support
  • UPDATE: Update existing rows by primary key
  • DELETE: Delete rows by primary key
  • UPSERT: ON CONFLICT handling for insert-or-update operations

Extractor - Server-Side Cursor

The from_pgsql_cursor extractor uses PostgreSQL's native server-side cursors via DECLARE CURSOR + FETCH. This is the only way to achieve true low-memory streaming with PHP's ext-pgsql, as the extension has no unbuffered query mode.

Note: This extractor automatically manages transactions. Cursors require a transaction context, which is auto-started if not already in one.
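
As a rough illustration of what DECLARE CURSOR + FETCH involves, the sketch below lists the statement sequence a server-side cursor read goes through. The helper cursor_statements and the cursor name flow_cursor are hypothetical names chosen for this example; the statements the extractor actually issues may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): lists the SQL statements
 * involved in reading a query through a server-side cursor.
 *
 * @return list<string>
 */
function cursor_statements(string $query, int $fetchSize, string $cursorName = 'flow_cursor'): array
{
    if ($fetchSize < 1) {
        throw new InvalidArgumentException('fetchSize must be at least 1');
    }

    return [
        'BEGIN',                                         // cursors only exist inside a transaction
        "DECLARE {$cursorName} CURSOR FOR {$query}",     // opens the cursor, no rows transferred yet
        "FETCH FORWARD {$fetchSize} FROM {$cursorName}", // repeated until it returns no rows
        "CLOSE {$cursorName}",
        'COMMIT',
    ];
}

foreach (cursor_statements('SELECT id, name, email FROM users', 1000) as $sql) {
    echo $sql, PHP_EOL;
}
```

Only the FETCH step repeats, so the client holds at most fetchSize rows of the result at a time.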

Basic Usage

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

data_frame()
    ->read(from_pgsql_cursor(
        $client,
        "SELECT id, name, email FROM users",
        fetchSize: 1000
    ))
    ->write(to_output())
    ->run();

With Query Builder

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\PostgreSql\DSL\{select, star, table};

data_frame()
    ->read(from_pgsql_cursor(
        $client,
        select(star())->from(table('large_table')),
        fetchSize: 500
    ))
    ->write(to_output())
    ->run();

With Parameters

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;

data_frame()
    ->read(from_pgsql_cursor(
        $client,
        "SELECT * FROM orders WHERE status = $1 AND created_at > $2",
        parameters: ['pending', '2024-01-01'],
        fetchSize: 1000
    ))
    ->write(to_output())
    ->run();

When to Use Each Extractor

Extractor               | Best For                            | Memory                 | ORDER BY Required
from_pgsql_cursor       | Very large datasets, true streaming | Lowest (server-side)   | No
from_pgsql_key_set      | Large datasets with indexed keys    | Medium (page buffered) | Auto-generated
from_pgsql_limit_offset | Small-medium datasets               | Medium (page buffered) | Yes

Extractor - LIMIT/OFFSET Pagination

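The from_pgsql_limit_offset extractor uses traditional LIMIT/OFFSET pagination. It is simple to use, but performance degrades at high offsets on very large datasets, because PostgreSQL still has to produce and discard every row before the requested offset. Always include a deterministic ORDER BY so that pages neither overlap nor skip rows.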
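
The sketch below prints the statements a LIMIT/OFFSET reader would issue for the first three pages. The helper limit_offset_pages is hypothetical and only approximates what the extractor does internally; it is here to show how the OFFSET grows with each page.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): yields one paged statement
 * per page for a base query that already carries its ORDER BY.
 *
 * @return Generator<int, string>
 */
function limit_offset_pages(string $query, int $pageSize, int $pages): Generator
{
    for ($page = 0; $page < $pages; $page++) {
        $offset = $page * $pageSize; // rows the server must skip before returning this page
        yield "{$query} LIMIT {$pageSize} OFFSET {$offset}";
    }
}

foreach (limit_offset_pages('SELECT id, name FROM users ORDER BY id', 1000, 3) as $sql) {
    echo $sql, PHP_EOL;
}
```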

Basic Usage

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        "SELECT id, name, email FROM users ORDER BY id",
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();

With Query Builder

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\PostgreSql\DSL\{asc, col, select, table};

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        select(col('id'), col('name'), col('email'))
            ->from(table('users'))
            ->orderBy(asc(col('id'))),
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

With Maximum Row Limit

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        "SELECT * FROM large_table ORDER BY id",
        pageSize: 1000,
        maximum: 10000  // Only extract first 10,000 rows
    ))
    ->write(to_output())
    ->run();

Extractor - Keyset (Cursor) Pagination

The from_pgsql_key_set extractor uses keyset pagination (also known as cursor-based pagination). This provides consistent performance regardless of how deep you paginate, making it ideal for large datasets.

Note: The ORDER BY clause is automatically generated from the keyset configuration. You only need to define the sort order once using pgsql_pagination_key_asc() or pgsql_pagination_key_desc().
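
The idea behind keyset pagination can be sketched in plain PHP: every page after the first filters on the key values of the last row already read, so an index on the key columns lets PostgreSQL seek directly to the next page. The helpers keyset_order_by and keyset_predicate below are hypothetical, and the SQL the extractor generates may be shaped differently.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): builds the ORDER BY list.
 *
 * @param list<array{0: string, 1: string}> $keys pairs of [column, 'ASC'|'DESC']
 */
function keyset_order_by(array $keys): string
{
    return implode(', ', array_map(
        static fn (array $key): string => $key[0] . ' ' . $key[1],
        $keys
    ));
}

/**
 * Hypothetical helper (not part of Flow): builds the "rows after the
 * last seen row" predicate. Placeholder $n holds the n-th key value
 * of the last row from the previous page.
 *
 * @param list<array{0: string, 1: string}> $keys
 */
function keyset_predicate(array $keys): string
{
    $branches = [];
    $equalities = [];

    foreach (array_values($keys) as $index => [$column, $direction]) {
        $placeholder = '$' . ($index + 1);
        $operator = $direction === 'DESC' ? '<' : '>';
        // Ties on all earlier keys are broken by the current key.
        $branches[] = '(' . implode(' AND ', [...$equalities, "{$column} {$operator} {$placeholder}"]) . ')';
        $equalities[] = "{$column} = {$placeholder}";
    }

    return implode(' OR ', $branches);
}

$keys = [['created_at', 'DESC'], ['id', 'ASC']];

echo 'SELECT * FROM events WHERE ' . keyset_predicate($keys)
    . ' ORDER BY ' . keyset_order_by($keys) . ' LIMIT 1000', PHP_EOL;
```

Because the filter depends only on the last seen key values and not on a row count, the cost of fetching a page does not grow with the page number.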

Basic Usage

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT id, name, email FROM users",
        pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();

Descending Order

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_desc, pgsql_pagination_key_set};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT id, name, created_at FROM orders",
        pgsql_pagination_key_set(pgsql_pagination_key_desc('id')),  // Newest first
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

Composite Keys

For tables with composite ordering, you can specify multiple keys:

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_desc, pgsql_pagination_key_set};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT * FROM events",
        pgsql_pagination_key_set(
            pgsql_pagination_key_desc('created_at'),  // First by date descending
            pgsql_pagination_key_asc('id')             // Then by ID ascending
        ),
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();

With Query Builder

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\PostgreSql\DSL\{select, star, table};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        select(star())->from(table('products')),
        pgsql_pagination_key_set(pgsql_pagination_key_asc('product_id')),
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

DSL Functions Reference

Extractor Functions

Function                                                          | Description
from_pgsql_cursor($client, $query, $parameters, $fetchSize, $max) | Extract using server-side cursor
from_pgsql_limit_offset($client, $query, $pageSize, $maximum)     | Extract using LIMIT/OFFSET pagination
from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum) | Extract using keyset pagination

Key Functions

Function                           | Description
pgsql_pagination_key_asc($column)  | Create an ascending key for keyset pagination
pgsql_pagination_key_desc($column) | Create a descending key for keyset pagination
pgsql_pagination_key_set(...$keys) | Create a keyset from one or more keys

Loader

The to_pgsql_table loader writes data to PostgreSQL tables. It supports INSERT, UPDATE, and DELETE operations with configurable conflict handling.

Basic Insert

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
use function Flow\ETL\DSL\{df, from_array};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

df()
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com'],
        ['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com'],
    ]))
    ->write(to_pgsql_table($client, 'users'))
    ->run();

Insert with Skip Conflicts (ON CONFLICT DO NOTHING)

Skip rows that would cause a constraint violation:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};

df()
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(skipConflicts: true))
    )
    ->run();

Upsert (ON CONFLICT DO UPDATE)

Update existing rows on conflict using specific columns:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};

df()
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(
                conflictColumns: ['email'],        // Detect conflicts on these columns
                updateColumns: ['name', 'updated_at']  // Update these columns on conflict
            ))
    )
    ->run();
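
For orientation, an upsert with conflictColumns and updateColumns corresponds to a PostgreSQL INSERT ... ON CONFLICT (...) DO UPDATE statement. The sketch below builds that statement shape for a single row; the helper upsert_sql is hypothetical, and the exact SQL the loader emits (batching, quoting, placeholders) is an assumption and may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): builds a single-row upsert.
 *
 * @param list<string> $columns         columns being inserted
 * @param list<string> $conflictColumns columns covered by a unique index or constraint
 * @param list<string> $updateColumns   columns overwritten when a conflict occurs
 */
function upsert_sql(string $table, array $columns, array $conflictColumns, array $updateColumns): string
{
    if ($columns === [] || $conflictColumns === [] || $updateColumns === []) {
        throw new InvalidArgumentException('columns, conflictColumns and updateColumns must not be empty');
    }

    // Positional placeholders $1..$n, one per inserted column.
    $placeholders = array_map(static fn (int $i): string => '$' . $i, range(1, count($columns)));

    // EXCLUDED refers to the row that was proposed for insertion.
    $assignments = array_map(static fn (string $c): string => "{$c} = EXCLUDED.{$c}", $updateColumns);

    return sprintf(
        'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s',
        $table,
        implode(', ', $columns),
        implode(', ', $placeholders),
        implode(', ', $conflictColumns),
        implode(', ', $assignments)
    );
}

echo upsert_sql('users', ['email', 'name', 'updated_at'], ['email'], ['name', 'updated_at']), PHP_EOL;
```

PostgreSQL requires the conflict columns to match a unique index or constraint on the table, so the email column in this example would need one.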

Upsert on Constraint

Use a named constraint for conflict detection:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};

df()
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(
                conflictConstraint: 'users_email_key',
                updateColumns: ['name']
            ))
    )
    ->run();

Update Existing Rows

Update rows matching primary key values:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_update_options, to_pgsql_table};
use Flow\ETL\Adapter\PostgreSql\Operation;

df()
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice Updated', 'email' => 'alice@example.com'],
        ['id' => 2, 'name' => 'Bob Updated', 'email' => 'bob@example.com'],
    ]))
    ->write(
        to_pgsql_table($client, 'users')
            ->withOperation(Operation::UPDATE)
            ->withUpdateOptions(pgsql_update_options(['id']))  // Match on 'id' column
    )
    ->run();

Delete Rows

Delete rows matching primary key values:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_delete_options, to_pgsql_table};
use Flow\ETL\Adapter\PostgreSql\Operation;

df()
    ->read(from_array([
        ['id' => 1],
        ['id' => 3],
    ]))
    ->write(
        to_pgsql_table($client, 'users')
            ->withOperation(Operation::DELETE)
            ->withDeleteOptions(pgsql_delete_options(['id']))
    )
    ->run();

Loader DSL Functions Reference

Function                           | Description
to_pgsql_table($client, $table)    | Create a PostgreSQL loader for a table
pgsql_insert_options(...)          | Configure insert behavior (conflicts, upsert)
pgsql_update_options($primaryKeys) | Configure update behavior (primary key columns)
pgsql_delete_options($primaryKeys) | Configure delete behavior (primary key columns)

Schema Conversion

Two helpers convert between a Flow Schema and a PostgreSQL table definition:

  • to_pgsql_schema_table() turns a Flow Schema into a Flow\PostgreSql\Schema\Table, which can emit CREATE TABLE (and related index/constraint) SQL via toSql().
  • pgsql_table_to_flow_schema() turns a Flow\PostgreSql\Schema\Table back into a Flow Schema.

Column types are resolved through the shared EntryTypesMap (Flow type → PostgreSQL column type), and per-column details — primary keys, unique constraints, indexes, length, precision/scale, defaults, identity, generated columns, and explicit type overrides — are driven by PostgreSqlMetadata entries attached to each schema definition.

Creating a Table from a Flow Schema

to_pgsql_schema_table() returns a table definition; call toSql() on it and execute each statement to create the table:

use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
use function Flow\ETL\DSL\{bool_schema, int_schema, json_schema, schema, str_schema};

$table = to_pgsql_schema_table(
    schema(
        int_schema('id', metadata: PostgreSqlMetadata::primaryKey('pk_users')),
        str_schema('name', metadata: PostgreSqlMetadata::length(120)),
        str_schema('email', metadata: PostgreSqlMetadata::indexUnique('uq_users_email')),
        bool_schema('active', metadata: PostgreSqlMetadata::default(true)),
        json_schema('payload'),
    ),
    'users',
);

foreach ($table->toSql() as $sql) {
    $client->execute($sql);
}

By default the table is created in the public schema; pass a third argument to target another one:

$table = to_pgsql_schema_table($schema, 'users', 'analytics');

Steering the Conversion with Metadata

PostgreSqlMetadata factories return Metadata objects you attach to a definition via the metadata: argument of the schema DSL helpers. Combine multiple entries with merge():

use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use Flow\PostgreSql\Schema\IdentityGeneration;

use function Flow\ETL\DSL\{float_schema, int_schema, schema, str_schema};

$schema = schema(
    int_schema('id', metadata: PostgreSqlMetadata::identity(IdentityGeneration::BY_DEFAULT)),
    str_schema('sku', metadata: PostgreSqlMetadata::type('citext')),                    // explicit type override
    float_schema('amount', metadata: PostgreSqlMetadata::precision(10)->merge(PostgreSqlMetadata::scale(2))),
    int_schema('total', metadata: PostgreSqlMetadata::generated('price * quantity')),  // expression must reference columns that exist in the table
);

Metadata                                        | Effect on the generated column
PostgreSqlMetadata::type($name)                 | Force a specific PostgreSQL type, bypassing the type map
PostgreSqlMetadata::length($n)                  | Emit varchar($n)
PostgreSqlMetadata::precision($p) / ::scale($s) | Emit numeric($p, $s)
PostgreSqlMetadata::default($v)                 | Set a column DEFAULT
PostgreSqlMetadata::primaryKey($name)           | Include the column in the table primary key
PostgreSqlMetadata::indexUnique($name)          | Include the column in a named UNIQUE constraint
PostgreSqlMetadata::index($name)                | Include the column in a named index
PostgreSqlMetadata::identity($generation)       | Make the column an identity column
PostgreSqlMetadata::generated($expr)            | Make the column a generated column

Columns sharing the same primary key, unique constraint, or index name are grouped together, so composite keys are expressed by attaching the same name to several definitions.
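
The grouping rule can be pictured with a small standalone sketch. It assumes a simplified input (column name mapped to an optional primary key name) and a hypothetical helper primary_key_clauses; it does not reproduce the adapter's internals. In adapter terms, the input below corresponds to attaching PostgreSqlMetadata::primaryKey('pk_order_items') to both order_id and line_no.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): groups columns by the
 * primary key name attached to them.
 *
 * @param array<string, ?string> $columns column name => key name (null when none)
 * @return list<string> one PRIMARY KEY clause per distinct key name
 */
function primary_key_clauses(array $columns): array
{
    $groups = [];
    foreach ($columns as $column => $constraint) {
        if ($constraint !== null) {
            $groups[$constraint][] = $column; // same name => same group
        }
    }

    $clauses = [];
    foreach ($groups as $constraint => $members) {
        $clauses[] = sprintf('CONSTRAINT %s PRIMARY KEY (%s)', $constraint, implode(', ', $members));
    }

    return $clauses;
}

print_r(primary_key_clauses([
    'order_id' => 'pk_order_items',
    'line_no' => 'pk_order_items',
    'sku' => null,
]));
```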

Reading a Flow Schema back from a Table

pgsql_table_to_flow_schema() takes a Flow\PostgreSql\Schema\Table and returns a Flow Schema. Combine it with the PostgreSQL library's catalog provider to derive a Flow schema from a live table:

use function Flow\ETL\Adapter\PostgreSql\pgsql_table_to_flow_schema;
use function Flow\PostgreSql\DSL\client_catalog_provider;

$table = client_catalog_provider($client, ['public'])
    ->get()
    ->get('public')
    ->table('users');

$schema = pgsql_table_to_flow_schema($table);

Note: The reverse conversion is intentionally lossy. Several Flow types collapse onto the same PostgreSQL type (for example json, list, map, and structure all map to jsonb), so a column is mapped back to a single canonical Flow type rather than its original one.
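
A toy round trip makes the loss concrete. The map below is a simplified stand-in for EntryTypesMap that uses plain strings; the jsonb and text rows follow this page, while the choice of json as the canonical type for jsonb is an assumption made only for this sketch.

```php
<?php

declare(strict_types=1);

// Flow type => PostgreSQL column type (simplified, string-based stand-in).
const FORWARD = [
    'json' => 'jsonb',
    'list' => 'jsonb',
    'map' => 'jsonb',
    'structure' => 'jsonb',
    'string' => 'text',
];

/**
 * Builds the reverse map, keeping one Flow type per column type.
 * ASSUMPTION: the first Flow type listed for a column type is canonical.
 *
 * @param array<string, string> $forward
 * @return array<string, string>
 */
function reverse_map(array $forward): array
{
    $reverse = [];
    foreach ($forward as $flowType => $columnType) {
        $reverse[$columnType] ??= $flowType; // later Flow types cannot be recovered
    }

    return $reverse;
}

function round_trip(string $flowType): string
{
    return reverse_map(FORWARD)[FORWARD[$flowType]];
}

foreach (array_keys(FORWARD) as $flowType) {
    echo $flowType, ' -> ', FORWARD[$flowType], ' -> ', round_trip($flowType), PHP_EOL;
}
```

If the original Flow types matter, keep the Flow schema itself as the source of truth rather than deriving it back from the table.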

Customizing the Type Mapping

Both helpers accept an optional EntryTypesMap. Its second constructor argument overrides the Flow type → PostgreSQL column type mapping (the first argument keeps overriding the value-binding types used by the loader):

use Flow\ETL\Adapter\PostgreSql\EntryTypesMap;
use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
use Flow\Types\Type\Native\StringType;

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;

$table = to_pgsql_schema_table(
    $schema,
    'users',
    typesMap: new EntryTypesMap([], [
        StringType::class => ColumnType::varchar(255),  // default strings to varchar(255) instead of text
    ]),
);

Schema Conversion DSL Functions Reference

Function                                                               | Description
to_pgsql_schema_table($schema, $tableName, $databaseSchema, $typesMap) | Convert a Flow Schema into a PostgreSQL Table
pgsql_table_to_flow_schema($table, $typesMap)                          | Convert a PostgreSQL Table into a Flow Schema
