
ETL Adapter: PostgreSQL

Flow PHP's PostgreSQL adapter integrates PostgreSQL into your ETL (Extract, Transform, Load) pipelines. It is built on top of the Flow PHP PostgreSQL library (the Flow\PostgreSql namespace used in the examples below). It provides paginated extractors that accept raw SQL or query builder objects, and a loader that writes rows back using INSERT, UPDATE, DELETE, or upsert statements.

Installation

composer require flow-php/etl-adapter-postgresql:~0.29.0

Requirements

  • PHP 8.3+
  • ext-pgsql
  • ext-pg_query (optional, for query builder support)

Description

This adapter provides:

Extractors

Two extraction strategies optimized for different use cases:

  • LIMIT/OFFSET Pagination: Simple pagination suitable for smaller datasets
  • Keyset (Cursor) Pagination: Efficient pagination for large datasets with consistent performance

Both extractors support:

  • Raw SQL strings or Query Builder objects
  • Configurable page sizes
  • Maximum row limits
  • Custom schema definitions

Loader

A flexible loader supporting:

  • INSERT: Simple inserts with batch support
  • UPDATE: Update existing rows by primary key
  • DELETE: Delete rows by primary key
  • UPSERT: ON CONFLICT handling for insert-or-update operations

Extractor - LIMIT/OFFSET Pagination

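The from_pgsql_limit_offset extractor uses traditional LIMIT/OFFSET pagination. It is simple to use, but it slows down on very large datasets: to return a page at a high OFFSET, PostgreSQL still has to read and discard every row before it, so each page costs more than the previous one. The query should include an ORDER BY on a unique column (as in the examples below); without it, row order is unspecified and pages may overlap or skip rows.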
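To make the growing cost concrete, the sketch below lists the page queries a LIMIT/OFFSET reader would issue. limit_offset_page_queries() is a hypothetical helper written in plain PHP for illustration; it is not part of the adapter, and it assumes every page comes back full, so it simply stops once $maximum rows have been requested.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): the page queries a LIMIT/OFFSET
 * reader would send for a base query, a page size and a maximum row count.
 * Assumes every page is full.
 *
 * @return list<string>
 */
function limit_offset_page_queries(string $baseQuery, int $pageSize, int $maximum): array
{
    $queries = [];

    for ($offset = 0; $offset < $maximum; $offset += $pageSize) {
        // Shrink the last page so the total never exceeds $maximum.
        $limit = min($pageSize, $maximum - $offset);
        $queries[] = sprintf('%s LIMIT %d OFFSET %d', $baseQuery, $limit, $offset);
    }

    return $queries;
}

$pages = limit_offset_page_queries('SELECT id, name FROM users ORDER BY id', 1000, 2500);

foreach ($pages as $sql) {
    // The OFFSET grows with every page: that is the work PostgreSQL repeats and discards.
    echo $sql, PHP_EOL;
}
```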

Basic Usage

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        "SELECT id, name, email FROM users ORDER BY id",
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();

With Query Builder

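use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{asc, col, select, table};

// $client: created with pgsql_client() as in Basic Usage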

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        select(col('id'), col('name'), col('email'))
            ->from(table('users'))
            ->orderBy(asc(col('id'))),
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

With Maximum Row Limit

use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
    ->read(from_pgsql_limit_offset(
        $client,
        "SELECT * FROM large_table ORDER BY id",
        pageSize: 1000,
        maximum: 10000  // Only extract first 10,000 rows
    ))
    ->write(to_output())
    ->run();

Extractor - Keyset (Cursor) Pagination

The from_pgsql_key_set extractor uses keyset pagination (also known as cursor-based pagination). This provides consistent performance regardless of how deep you paginate, making it ideal for large datasets.

Note: The ORDER BY clause is automatically generated from the keyset configuration. You only need to define the sort order once using pgsql_pagination_key_asc() or pgsql_pagination_key_desc().
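The idea behind keyset pagination can be sketched in plain PHP: instead of an OFFSET, every page after the first filters on the last key value already read, so an index on that column can jump straight to the next row. keyset_page_query() is hypothetical and does not describe how from_pgsql_key_set() actually rewrites queries; wrapping the user query in a subquery is one assumed way to attach the generated WHERE and ORDER BY.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): SQL and bound parameters for one
 * page of ascending keyset pagination on a single column.
 *
 * @return array{0: string, 1: list<int|string>}
 */
function keyset_page_query(string $baseQuery, string $column, int $pageSize, int|string|null $lastSeen): array
{
    // Assumption: wrap the user query so generated clauses can be appended safely.
    $sql = sprintf('SELECT * FROM (%s) AS page_source', $baseQuery);
    $params = [];

    if ($lastSeen !== null) {
        // Seek past the previous page instead of skipping rows with OFFSET.
        $sql .= sprintf(' WHERE %s > $1', $column);
        $params[] = $lastSeen;
    }

    // Order by the key so that "greater than the last value" means "next page".
    $sql .= sprintf(' ORDER BY %s ASC LIMIT %d', $column, $pageSize);

    return [$sql, $params];
}

// First page: no predicate yet.
[$sql, $params] = keyset_page_query('SELECT id, name, email FROM users', 'id', 1000, null);
echo $sql, PHP_EOL;

// Next page: continue after the last id returned by the previous page (here 1000).
[$sql, $params] = keyset_page_query('SELECT id, name, email FROM users', 'id', 1000, 1000);
echo $sql, PHP_EOL;
```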

Basic Usage

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT id, name, email FROM users",
        pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();

Descending Order

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_desc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT id, name, created_at FROM orders",
        pgsql_pagination_key_set(pgsql_pagination_key_desc('id')),  // Newest first
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

Composite Keys

For tables with composite ordering, you can specify multiple keys:

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_desc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        "SELECT * FROM events",
        pgsql_pagination_key_set(
            pgsql_pagination_key_desc('created_at'),  // First by date descending
            pgsql_pagination_key_asc('id')             // Then by ID ascending
        ),
        pageSize: 1000
    ))
    ->write(to_output())
    ->run();
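With several keys, "after the last row" has to be spelled out key by key. PostgreSQL's row-value comparison, such as (created_at, id) < ($1, $2), only works when every key sorts in the same direction, so a mixed ordering like the one above needs an expanded OR form. keyset_predicate() is a hypothetical sketch of that expansion, not the adapter's code, and it assumes the key columns are NOT NULL.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): the "rows after the last seen
 * row" predicate for keyset pagination over keys with mixed directions.
 * Placeholder $n holds the last seen value of the n-th key.
 *
 * @param list<array{0: string, 1: string}> $keys [column, 'ASC'|'DESC'] in sort order
 */
function keyset_predicate(array $keys): string
{
    $alternatives = [];

    foreach ($keys as $i => [$column, $direction]) {
        $parts = [];

        // Every earlier key equals its last seen value...
        for ($j = 0; $j < $i; $j++) {
            $parts[] = sprintf('%s = $%d', $keys[$j][0], $j + 1);
        }

        // ...and this key is strictly past its last seen value in its own direction.
        $operator = $direction === 'ASC' ? '>' : '<';
        $parts[] = sprintf('%s %s $%d', $column, $operator, $i + 1);

        $alternatives[] = '(' . implode(' AND ', $parts) . ')';
    }

    return implode(' OR ', $alternatives);
}

echo keyset_predicate([['created_at', 'DESC'], ['id', 'ASC']]), PHP_EOL;
// (created_at < $1) OR (created_at = $1 AND id > $2)
```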

With Query Builder

use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{select, star, table};

data_frame()
    ->read(from_pgsql_key_set(
        $client,
        select(star())->from(table('products')),
        pgsql_pagination_key_set(pgsql_pagination_key_asc('product_id')),
        pageSize: 500
    ))
    ->write(to_output())
    ->run();

DSL Functions Reference

Extractor Functions

| Function | Description |
|----------|-------------|
| from_pgsql_limit_offset($client, $query, $pageSize, $maximum) | Extract using LIMIT/OFFSET pagination |
| from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum) | Extract using keyset pagination |

Key Functions

| Function | Description |
|----------|-------------|
| pgsql_pagination_key_asc($column) | Create an ascending key for keyset pagination |
| pgsql_pagination_key_desc($column) | Create a descending key for keyset pagination |
| pgsql_pagination_key_set(...$keys) | Create a keyset from one or more keys |

Loader

The to_pgsql_table loader writes data to PostgreSQL tables. It supports INSERT (optionally with ON CONFLICT handling to skip or upsert conflicting rows), UPDATE, and DELETE operations.
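Batch inserts typically send many rows in a single statement. As an illustration only, batch_insert() below (a hypothetical helper, not the loader's implementation) builds one multi-row INSERT with PostgreSQL's numbered placeholders and leaves identifier quoting out for brevity. Because every value becomes a bound parameter, a batch must also stay within PostgreSQL's limit of 65,535 parameters per statement.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): one multi-row INSERT for a batch
 * of associative rows that all share the same keys.
 *
 * @param list<array<string, mixed>> $rows non-empty
 * @return array{0: string, 1: list<mixed>}
 */
function batch_insert(string $table, array $rows): array
{
    $columns = array_keys($rows[0]);
    $params = [];
    $tuples = [];

    foreach ($rows as $row) {
        $placeholders = [];
        foreach ($columns as $column) {
            $params[] = $row[$column];
            // PostgreSQL placeholders are numbered: $1, $2, ...
            $placeholders[] = '$' . count($params);
        }
        $tuples[] = '(' . implode(', ', $placeholders) . ')';
    }

    $sql = sprintf('INSERT INTO %s (%s) VALUES %s', $table, implode(', ', $columns), implode(', ', $tuples));

    return [$sql, $params];
}

[$sql, $params] = batch_insert('users', [
    ['id' => 1, 'name' => 'Alice'],
    ['id' => 2, 'name' => 'Bob'],
]);

echo $sql, PHP_EOL; // INSERT INTO users (id, name) VALUES ($1, $2), ($3, $4)
```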

Basic Insert

use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
use function Flow\ETL\DSL\{df, from_array};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};

$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));

df()
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com'],
        ['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com'],
    ]))
    ->write(to_pgsql_table($client, 'users'))
    ->run();

Insert with Skip Conflicts (ON CONFLICT DO NOTHING)

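Skip rows that would violate a unique or exclusion constraint instead of aborting the insert:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};

// $data: an array of rows, e.g. the users array from Basic Insert
df()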
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(skipConflicts: true))
    )
    ->run();

Upsert (ON CONFLICT DO UPDATE)

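When an inserted row conflicts with an existing row on the given columns, update the listed columns of the existing row instead. The conflict columns must be covered by a unique index or constraint (here, on email):

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};

// $data: rows including the email, name and updated_at columns
df()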
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(
                conflictColumns: ['email'],        // Detect conflicts on these columns
                updateColumns: ['name', 'updated_at']  // Update these columns on conflict
            ))
    )
    ->run();

Upsert on Constraint

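Use a named constraint (ON CONFLICT ON CONSTRAINT) for conflict detection:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};

// $data: rows including the email and name columns
df()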
    ->read(from_array($data))
    ->write(
        to_pgsql_table($client, 'users')
            ->withInsertOptions(pgsql_insert_options(
                conflictConstraint: 'users_email_key',
                updateColumns: ['name']
            ))
    )
    ->run();
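The three insert configurations above correspond to PostgreSQL's INSERT ... ON CONFLICT forms. The sketch below renders those clauses in plain PHP to show the mapping; on_conflict_clause() and its parameter names are hypothetical (they only mirror pgsql_insert_options()), and the SQL the adapter actually generates may differ.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): the ON CONFLICT clause for
 * "skip", "upsert on columns" and "upsert on a named constraint".
 *
 * @param list<string> $conflictColumns
 * @param list<string> $updateColumns
 */
function on_conflict_clause(array $conflictColumns = [], ?string $constraint = null, array $updateColumns = []): string
{
    // Conflict target: a named constraint, a column list, or none.
    $target = match (true) {
        $constraint !== null => ' ON CONSTRAINT ' . $constraint,
        $conflictColumns !== [] => ' (' . implode(', ', $conflictColumns) . ')',
        default => '',
    };

    if ($updateColumns === []) {
        return 'ON CONFLICT' . $target . ' DO NOTHING';
    }

    if ($target === '') {
        // PostgreSQL requires a conflict target for DO UPDATE.
        throw new InvalidArgumentException('DO UPDATE needs conflict columns or a constraint');
    }

    // EXCLUDED refers to the row that was proposed for insertion.
    $assignments = array_map(
        static fn (string $column): string => sprintf('%s = EXCLUDED.%s', $column, $column),
        $updateColumns,
    );

    return 'ON CONFLICT' . $target . ' DO UPDATE SET ' . implode(', ', $assignments);
}

echo on_conflict_clause(), PHP_EOL;
echo on_conflict_clause(conflictColumns: ['email'], updateColumns: ['name', 'updated_at']), PHP_EOL;
echo on_conflict_clause(constraint: 'users_email_key', updateColumns: ['name']), PHP_EOL;
```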

Update Existing Rows

Update rows matching primary key values:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_update_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
use Flow\ETL\Adapter\PostgreSql\Operation;

df()
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice Updated', 'email' => 'alice@example.com'],
        ['id' => 2, 'name' => 'Bob Updated', 'email' => 'bob@example.com'],
    ]))
    ->write(
        to_pgsql_table($client, 'users')
            ->withOperation(Operation::UPDATE)
            ->withUpdateOptions(pgsql_update_options(['id']))  // Match on 'id' column
    )
    ->run();
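For reference, an update keyed on id corresponds to SQL of roughly the following shape. update_row() is a hypothetical per-row sketch that assumes every non-key column present in the row should be overwritten; the loader may batch rows or generate different SQL.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): a parameterised UPDATE for one
 * row, matching on the key columns and setting every other column.
 *
 * @param array<string, mixed> $row
 * @param list<string> $keyColumns
 * @return array{0: string, 1: list<mixed>}
 */
function update_row(string $table, array $row, array $keyColumns): array
{
    $params = [];
    $set = [];
    $where = [];

    foreach ($row as $column => $value) {
        if (in_array($column, $keyColumns, true)) {
            continue; // key columns identify the row; they are not updated
        }
        $params[] = $value;
        $set[] = sprintf('%s = $%d', $column, count($params));
    }

    foreach ($keyColumns as $column) {
        $params[] = $row[$column];
        $where[] = sprintf('%s = $%d', $column, count($params));
    }

    $sql = sprintf('UPDATE %s SET %s WHERE %s', $table, implode(', ', $set), implode(' AND ', $where));

    return [$sql, $params];
}

[$sql, $params] = update_row('users', ['id' => 1, 'name' => 'Alice Updated', 'email' => 'alice@example.com'], ['id']);
echo $sql, PHP_EOL; // UPDATE users SET name = $1, email = $2 WHERE id = $3
```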

Delete Rows

Delete rows matching primary key values:

use function Flow\ETL\Adapter\PostgreSql\{pgsql_delete_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
use Flow\ETL\Adapter\PostgreSql\Operation;

df()
    ->read(from_array([
        ['id' => 1],
        ['id' => 3],
    ]))
    ->write(
        to_pgsql_table($client, 'users')
            ->withOperation(Operation::DELETE)
            ->withDeleteOptions(pgsql_delete_options(['id']))
    )
    ->run();
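A batch of deletes can be expressed as a single statement by comparing the key columns against a list of row values, which also works for composite keys. batch_delete() is a hypothetical sketch of that approach, not the loader's implementation.

```php
<?php

/**
 * Hypothetical helper (not part of Flow PHP): one DELETE for a batch of rows,
 * matching all key columns with a row-value IN list.
 *
 * @param list<array<string, mixed>> $rows non-empty
 * @param list<string> $keyColumns non-empty
 * @return array{0: string, 1: list<mixed>}
 */
function batch_delete(string $table, array $rows, array $keyColumns): array
{
    $params = [];
    $tuples = [];

    foreach ($rows as $row) {
        $placeholders = [];
        foreach ($keyColumns as $column) {
            $params[] = $row[$column];
            $placeholders[] = '$' . count($params);
        }
        // One parenthesised tuple per row, e.g. ($1) or ($1, $2) for composite keys.
        $tuples[] = '(' . implode(', ', $placeholders) . ')';
    }

    $sql = sprintf(
        'DELETE FROM %s WHERE (%s) IN (%s)',
        $table,
        implode(', ', $keyColumns),
        implode(', ', $tuples),
    );

    return [$sql, $params];
}

[$sql, $params] = batch_delete('users', [['id' => 1], ['id' => 3]], ['id']);
echo $sql, PHP_EOL; // DELETE FROM users WHERE (id) IN (($1), ($2))
```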

Loader DSL Functions Reference

| Function | Description |
|----------|-------------|
| to_pgsql_table($client, $table) | Create a PostgreSQL loader for a table |
| pgsql_insert_options(...) | Configure insert behavior (conflicts, upsert) |
| pgsql_update_options($primaryKeys) | Configure update behavior (primary key columns) |
| pgsql_delete_options($primaryKeys) | Configure delete behavior (primary key columns) |

Contributors

Join us on GitHub