ETL Adapter: PostgreSQL
Flow PHP's PostgreSQL adapter integrates PostgreSQL into your ETL (Extract, Transform, Load) workflows. Built on the native pgsql extension, it provides efficient data extraction and loading, giving developers robust primitives for precise database interaction while simplifying complex data transformations.
Installation
composer require flow-php/etl-adapter-postgresql:~0.29.0
Requirements
- PHP 8.3+
- ext-pgsql
- ext-pg_query (optional, for query builder support)
Description
This adapter provides:
Extractors
Two extraction strategies optimized for different use cases:
- LIMIT/OFFSET Pagination: Simple pagination suitable for smaller datasets
- Keyset (Cursor) Pagination: Efficient pagination for large datasets with consistent performance
Both extractors support:
- Raw SQL strings or Query Builder objects
- Configurable page sizes
- Maximum row limits
- Custom schema definitions
Loader
A flexible loader supporting:
- INSERT: Simple inserts with batch support
- UPDATE: Update existing rows by primary key
- DELETE: Delete rows by primary key
- UPSERT: ON CONFLICT handling for insert-or-update operations
Extractor - LIMIT/OFFSET Pagination
The from_pgsql_limit_offset extractor uses traditional LIMIT/OFFSET pagination. It is simple to use, but performance degrades on very large datasets as the offset grows.
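Conceptually, each page corresponds to a query of roughly this shape (illustrative; the exact SQL the adapter emits may differ):

```sql
-- page 1
SELECT id, name, email FROM users ORDER BY id LIMIT 1000 OFFSET 0;
-- page 2
SELECT id, name, email FROM users ORDER BY id LIMIT 1000 OFFSET 1000;
-- a deep page: PostgreSQL still has to scan and discard all 999,000 skipped rows
SELECT id, name, email FROM users ORDER BY id LIMIT 1000 OFFSET 999000;
```

This is why deep pages get progressively slower: the cost of each page is proportional to the offset, not the page size.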
Basic Usage
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
data_frame()
->read(from_pgsql_limit_offset(
$client,
"SELECT id, name, email FROM users ORDER BY id",
pageSize: 1000
))
->write(to_output())
->run();
With Query Builder
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\PostgreSql\DSL\{asc, col, select, table};
data_frame()
->read(from_pgsql_limit_offset(
$client,
select(col('id'), col('name'), col('email'))
->from(table('users'))
->orderBy(asc(col('id'))),
pageSize: 500
))
->write(to_output())
->run();
With Maximum Row Limit
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
data_frame()
->read(from_pgsql_limit_offset(
$client,
"SELECT * FROM large_table ORDER BY id",
pageSize: 1000,
maximum: 10000 // Only extract first 10,000 rows
))
->write(to_output())
->run();
Extractor - Keyset (Cursor) Pagination
The from_pgsql_key_set extractor uses keyset pagination (also known as cursor-based pagination). This provides
consistent performance regardless of how deep you paginate, making it ideal for large datasets.
Note: The ORDER BY clause is automatically generated from the keyset configuration. You only need to define the sort order once, using pgsql_pagination_key_asc() or pgsql_pagination_key_desc().
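Conceptually, keyset pagination carries the last seen key value forward and filters on it, so each page looks roughly like this (illustrative; not the adapter's exact SQL):

```sql
-- first page
SELECT id, name, email FROM users ORDER BY id ASC LIMIT 1000;
-- next page: resume after the last id seen on the previous page (here, 1000)
SELECT id, name, email FROM users WHERE id > 1000 ORDER BY id ASC LIMIT 1000;
```

With an index on the key column(s), every page is an index range scan starting at the last key, so the cost per page stays constant no matter how deep you paginate.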
Basic Usage
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT id, name, email FROM users",
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 1000
))
->write(to_output())
->run();
Descending Order
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_desc, pgsql_pagination_key_set};
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT id, name, created_at FROM orders",
pgsql_pagination_key_set(pgsql_pagination_key_desc('id')), // Newest first
pageSize: 500
))
->write(to_output())
->run();
Composite Keys
For tables with composite ordering, you can specify multiple keys:
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_desc, pgsql_pagination_key_set};
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT * FROM events",
pgsql_pagination_key_set(
pgsql_pagination_key_desc('created_at'), // First by date descending
pgsql_pagination_key_asc('id') // Then by ID ascending
),
pageSize: 1000
))
->write(to_output())
->run();
With Query Builder
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\PostgreSql\DSL\{col, select, star, table};
data_frame()
->read(from_pgsql_key_set(
$client,
select(star())->from(table('products')),
pgsql_pagination_key_set(pgsql_pagination_key_asc('product_id')),
pageSize: 500
))
->write(to_output())
->run();
DSL Functions Reference
Extractor Functions
| Function | Description |
|---|---|
| `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)` | Extract using LIMIT/OFFSET pagination |
| `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination |
Key Functions
| Function | Description |
|---|---|
| `pgsql_pagination_key_asc($column)` | Create an ascending key for keyset pagination |
| `pgsql_pagination_key_desc($column)` | Create a descending key for keyset pagination |
| `pgsql_pagination_key_set(...$keys)` | Create a keyset from one or more keys |
Loader
The to_pgsql_table loader writes data to PostgreSQL tables. It supports INSERT (including upserts via ON CONFLICT), UPDATE, and DELETE operations with configurable conflict handling.
Basic Insert
use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
use function Flow\ETL\DSL\{df, from_array};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
df()
->read(from_array([
['id' => 1, 'name' => 'Alice', 'email' => '[email protected]'],
['id' => 2, 'name' => 'Bob', 'email' => '[email protected]'],
]))
->write(to_pgsql_table($client, 'users'))
->run();
Insert with Skip Conflicts (ON CONFLICT DO NOTHING)
Skip rows that would cause a constraint violation:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(skipConflicts: true))
)
->run();
Upsert (ON CONFLICT DO UPDATE)
Update existing rows on conflict using specific columns:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(
conflictColumns: ['email'], // Detect conflicts on these columns
updateColumns: ['name', 'updated_at'] // Update these columns on conflict
))
)
->run();
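For reference, the options above correspond to an INSERT statement of roughly this form (illustrative of standard PostgreSQL ON CONFLICT semantics; the adapter's generated SQL may differ in detail):

```sql
INSERT INTO users (id, name, email, updated_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (email)
DO UPDATE SET name = EXCLUDED.name, updated_at = EXCLUDED.updated_at;
```

The EXCLUDED pseudo-table refers to the row that was proposed for insertion, which is how the conflicting row gets updated with the incoming values.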
Upsert on Constraint
Use a named constraint for conflict detection:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(
conflictConstraint: 'users_email_key',
updateColumns: ['name']
))
)
->run();
Update Existing Rows
Update rows matching primary key values:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_update_options, to_pgsql_table};
use Flow\ETL\Adapter\PostgreSql\Operation;
df()
->read(from_array([
['id' => 1, 'name' => 'Alice Updated', 'email' => '[email protected]'],
['id' => 2, 'name' => 'Bob Updated', 'email' => '[email protected]'],
]))
->write(
to_pgsql_table($client, 'users')
->withOperation(Operation::UPDATE)
->withUpdateOptions(pgsql_update_options(['id'])) // Match on 'id' column
)
->run();
Delete Rows
Delete rows matching primary key values:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_delete_options, to_pgsql_table};
use Flow\ETL\Adapter\PostgreSql\Operation;
df()
->read(from_array([
['id' => 1],
['id' => 3],
]))
->write(
to_pgsql_table($client, 'users')
->withOperation(Operation::DELETE)
->withDeleteOptions(pgsql_delete_options(['id']))
)
->run();
Loader DSL Functions Reference
| Function | Description |
|---|---|
| `to_pgsql_table($client, $table)` | Create a PostgreSQL loader for a table |
| `pgsql_insert_options(...)` | Configure insert behavior (conflicts, upsert) |
| `pgsql_update_options($primaryKeys)` | Configure update behavior (primary key columns) |
| `pgsql_delete_options($primaryKeys)` | Configure delete behavior (primary key columns) |