ETL Adapter: PostgreSQL
Flow PHP's PostgreSQL adapter integrates PostgreSQL into ETL (Extract, Transform, Load) pipelines. It is built on top of Flow's PostgreSQL library and provides three things:

- extractors that read query results in batches, using a server-side cursor, LIMIT/OFFSET or keyset pagination;
- a loader that writes rows with INSERT, UPDATE, DELETE or upsert statements;
- helpers that convert between Flow schemas and PostgreSQL table definitions.
Installation
For detailed installation instructions, see the installation page.
Requirements
- PHP 8.3+
- ext-pgsql
- ext-pg_query (optional, for query builder support)
Description
This adapter provides:
Extractors
Three extraction strategies optimized for different use cases:
- Server-Side Cursor: True streaming extraction using PostgreSQL DECLARE CURSOR for maximum memory efficiency
- LIMIT/OFFSET Pagination: Simple pagination suitable for smaller datasets
- Keyset (Cursor) Pagination: Efficient pagination for large datasets with consistent performance
All extractors support:
- Raw SQL strings or Query Builder objects
- Configurable batch/page sizes
- Maximum row limits
- Custom schema definitions
Loader
A flexible loader supporting:
- INSERT: Simple inserts with batch support
- UPDATE: Update existing rows by primary key
- DELETE: Delete rows by primary key
- UPSERT: ON CONFLICT handling for insert-or-update operations
Extractor - Server-Side Cursor
The from_pgsql_cursor extractor uses PostgreSQL's native server-side cursors via DECLARE CURSOR + FETCH. This is the only way to achieve true low-memory streaming with PHP's ext-pgsql, because the extension has no unbuffered query mode. Without a cursor, every result set is loaded into memory in full.
Note: This extractor manages transactions automatically. Cursors only exist inside a transaction, so the extractor starts one if none is already open.
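The sketch below shows the DECLARE CURSOR + FETCH loop that such an extractor runs. It is not the adapter's code and makes these assumptions:

- The function stream_with_cursor() and the cursor name flow_cursor are hypothetical.
- The $execute callable stands in for a real ext-pgsql call, so the loop can run without a database.
- Unlike the adapter, the sketch always opens its own transaction.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of a DECLARE CURSOR + FETCH loop (not the adapter's code).
 * $execute stands in for a database call: it receives one SQL statement and
 * returns the fetched rows, or an empty array for statements without results.
 *
 * @param callable(string): list<array<string, mixed>> $execute
 * @return Generator<int, list<array<string, mixed>>>
 */
function stream_with_cursor(callable $execute, string $query, int $fetchSize, string $cursor = 'flow_cursor'): Generator
{
    // Assumption: always start a transaction; cursors only live inside one.
    $execute('BEGIN');
    $execute("DECLARE {$cursor} NO SCROLL CURSOR FOR {$query}");

    try {
        while (true) {
            // Only $fetchSize rows cross the wire per round trip.
            $rows = $execute("FETCH {$fetchSize} FROM {$cursor}");
            if ($rows === []) {
                break;
            }
            yield $rows;
            if (count($rows) < $fetchSize) {
                break; // a short batch means the result set is exhausted
            }
        }
    } finally {
        // Runs on normal completion and when the generator is destroyed early.
        $execute("CLOSE {$cursor}");
        $execute('COMMIT');
    }
}
```

Each FETCH holds only one batch in PHP memory. The full result set stays on the server until it is requested.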
Basic Usage
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
data_frame()
->read(from_pgsql_cursor(
$client,
"SELECT id, name, email FROM users",
fetchSize: 1000
))
->write(to_output())
->run();
With Query Builder
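use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{select, star, table};
// $client is created with pgsql_client() as in the Basic Usage example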
data_frame()
->read(from_pgsql_cursor(
$client,
select(star())->from(table('large_table')),
fetchSize: 500
))
->write(to_output())
->run();
With Parameters
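use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
use function Flow\ETL\DSL\{data_frame, to_output};
// $client is created with pgsql_client() as in the Basic Usage example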
data_frame()
->read(from_pgsql_cursor(
$client,
"SELECT * FROM orders WHERE status = $1 AND created_at > $2",
parameters: ['pending', '2024-01-01'],
fetchSize: 1000
))
->write(to_output())
->run();
When to Use Each Extractor
| Extractor | Best For | Memory | ORDER BY Required |
|---|---|---|---|
| from_pgsql_cursor | Very large datasets, true streaming | Lowest (server-side) | No |
| from_pgsql_key_set | Large datasets with indexed keys | Medium (page buffered) | Auto-generated |
| from_pgsql_limit_offset | Small-medium datasets | Medium (page buffered) | Yes |
Extractor - LIMIT/OFFSET Pagination
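The from_pgsql_limit_offset extractor uses traditional LIMIT/OFFSET pagination. It is simple to use, but performance degrades on very large datasets with high offsets. PostgreSQL still has to read and discard every skipped row before it returns a page. The query must include an ORDER BY clause so that pages are stable.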
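Here is a sketch of the LIMIT/OFFSET loop. It assumes a hypothetical function paginate_limit_offset() and a $fetch callable in place of a real query. The sketch shows three things:

- each page is requested with a growing OFFSET;
- a maximum shortens the last page;
- a short page signals the end.

The adapter's actual SQL may differ, for example by wrapping the query in a subquery.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of LIMIT/OFFSET pagination (not the adapter's code).
 * $fetch stands in for a real query call and returns the rows for one SQL
 * statement. $query must carry its own ORDER BY so pages do not overlap.
 *
 * @param callable(string): list<array<string, mixed>> $fetch
 * @return Generator<int, list<array<string, mixed>>>
 */
function paginate_limit_offset(callable $fetch, string $query, int $pageSize, ?int $maximum = null): Generator
{
    $offset = 0;
    while ($maximum === null || $offset < $maximum) {
        // Never ask for more rows than the remaining maximum allows.
        $limit = $maximum === null ? $pageSize : min($pageSize, $maximum - $offset);
        // PostgreSQL must walk past $offset rows before returning any,
        // which is why deep pages get slower.
        $rows = $fetch("{$query} LIMIT {$limit} OFFSET {$offset}");
        if ($rows === []) {
            return;
        }
        yield $rows;
        if (count($rows) < $limit) {
            return; // short page: no more rows
        }
        $offset += count($rows);
    }
}
```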
Basic Usage
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
data_frame()
->read(from_pgsql_limit_offset(
$client,
"SELECT id, name, email FROM users ORDER BY id",
pageSize: 1000
))
->write(to_output())
->run();
With Query Builder
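use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{asc, col, select, table};
// $client is created with pgsql_client() as in the Basic Usage example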
data_frame()
->read(from_pgsql_limit_offset(
$client,
select(col('id'), col('name'), col('email'))
->from(table('users'))
->orderBy(asc(col('id'))),
pageSize: 500
))
->write(to_output())
->run();
With Maximum Row Limit
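use function Flow\ETL\Adapter\PostgreSql\from_pgsql_limit_offset;
use function Flow\ETL\DSL\{data_frame, to_output};
// $client is created with pgsql_client() as in the Basic Usage example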
data_frame()
->read(from_pgsql_limit_offset(
$client,
"SELECT * FROM large_table ORDER BY id",
pageSize: 1000,
maximum: 10000 // Only extract first 10,000 rows
))
->write(to_output())
->run();
Extractor - Keyset (Cursor) Pagination
The from_pgsql_key_set extractor uses keyset pagination (also known as cursor-based pagination). Each page is requested relative to the last key seen rather than by an offset. Performance therefore stays consistent no matter how deep you paginate, which makes it ideal for large datasets.
Note: The ORDER BY clause is generated automatically from the keyset configuration. You only need to define the sort order once, using pgsql_pagination_key_asc() or pgsql_pagination_key_desc().
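The sketch below shows the idea behind keyset pagination with a single key column. Every page after the first asks for rows past the last key already returned, instead of skipping rows with OFFSET. It makes these assumptions:

- The function paginate_key_set() and the subquery alias page are hypothetical.
- The $fetch callable replaces a real parameterised query.
- The adapter's generated SQL may look different.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of single-column keyset pagination (not the adapter's code).
 * With an index on $key, PostgreSQL can jump straight to the next page.
 *
 * @param callable(string, list<mixed>): list<array<string, mixed>> $fetch
 * @return Generator<int, list<array<string, mixed>>>
 */
function paginate_key_set(callable $fetch, string $query, string $key, int $pageSize, bool $ascending = true): Generator
{
    $direction = $ascending ? 'ASC' : 'DESC';
    $operator = $ascending ? '>' : '<';
    $lastKey = null;

    while (true) {
        // The first page has no lower bound; later pages start after $lastKey.
        $where = $lastKey === null ? '' : " WHERE {$key} {$operator} \$1";
        $sql = "SELECT * FROM ({$query}) AS page{$where} ORDER BY {$key} {$direction} LIMIT {$pageSize}";
        $rows = $fetch($sql, $lastKey === null ? [] : [$lastKey]);
        if ($rows === []) {
            return;
        }
        yield $rows;
        if (count($rows) < $pageSize) {
            return; // short page: no more rows
        }
        $lastKey = $rows[array_key_last($rows)][$key]; // remember where this page ended
    }
}
```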
Basic Usage
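use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};
// $client is created with pgsql_client() as in the Server-Side Cursor example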
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT id, name, email FROM users",
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 1000
))
->write(to_output())
->run();
Descending Order
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_desc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT id, name, created_at FROM orders",
pgsql_pagination_key_set(pgsql_pagination_key_desc('id')), // Newest first
pageSize: 500
))
->write(to_output())
->run();
Composite Keys
For tables with composite ordering, you can specify multiple keys:
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_desc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};
data_frame()
->read(from_pgsql_key_set(
$client,
"SELECT * FROM events",
pgsql_pagination_key_set(
pgsql_pagination_key_desc('created_at'), // First by date descending
pgsql_pagination_key_asc('id') // Then by ID ascending
),
pageSize: 1000
))
->write(to_output())
->run();
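With several keys, the next page cannot be expressed as a single comparison. Take the keys created_at DESC, id ASC. A row belongs to the next page if it comes strictly after the last row on created_at, or if it ties on created_at and comes after it on id.

The helper below is a sketch of that rule, not the adapter's implementation. key_set_clauses() is a hypothetical name, and it numbers parameters $1, $2, … in key order.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: builds the "rows after the last one seen" predicate and
 * the ORDER BY clause for a composite keyset. Each key is [column, 'ASC'|'DESC'].
 * Parameter $n holds the last row's value for the n-th key.
 *
 * @param list<array{0: string, 1: string}> $keys
 * @return array{where: string, orderBy: string}
 */
function key_set_clauses(array $keys): array
{
    $alternatives = [];
    foreach ($keys as $i => [$column, $direction]) {
        $parts = [];
        // All earlier keys must equal the last row's values...
        for ($j = 0; $j < $i; $j++) {
            $parts[] = sprintf('%s = $%d', $keys[$j][0], $j + 1);
        }
        // ...and this key must move past it in its own sort direction.
        $parts[] = sprintf('%s %s $%d', $column, $direction === 'DESC' ? '<' : '>', $i + 1);
        $alternatives[] = '(' . implode(' AND ', $parts) . ')';
    }

    return [
        'where' => implode(' OR ', $alternatives),
        'orderBy' => implode(', ', array_map(fn (array $k): string => "{$k[0]} {$k[1]}", $keys)),
    ];
}
```

When every key sorts in the same direction, a PostgreSQL row-value comparison expresses the same condition more compactly, for example (created_at, id) > ($1, $2). It can also use a matching composite index. Mixed directions, as in the example above, need the expanded OR form. Key columns should also be NOT NULL, since comparisons with NULL never match.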
With Query Builder
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, pgsql_pagination_key_asc, pgsql_pagination_key_set};
use function Flow\ETL\DSL\{data_frame, to_output};
use function Flow\PostgreSql\DSL\{select, star, table};
data_frame()
->read(from_pgsql_key_set(
$client,
select(star())->from(table('products')),
pgsql_pagination_key_set(pgsql_pagination_key_asc('product_id')),
pageSize: 500
))
->write(to_output())
->run();
DSL Functions Reference
Extractor Functions
| Function | Description |
|---|---|
| from_pgsql_cursor($client, $query, $parameters, $fetchSize, $max) | Extract using server-side cursor |
| from_pgsql_limit_offset($client, $query, $pageSize, $maximum) | Extract using LIMIT/OFFSET pagination |
| from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum) | Extract using keyset pagination |
Key Functions
| Function | Description |
|---|---|
| pgsql_pagination_key_asc($column) | Create an ascending key for keyset pagination |
| pgsql_pagination_key_desc($column) | Create a descending key for keyset pagination |
| pgsql_pagination_key_set(...$keys) | Create a keyset from one or more keys |
Loader
The to_pgsql_table loader writes data to PostgreSQL tables. It supports INSERT, UPDATE, and DELETE operations with configurable conflict handling.
Basic Insert
use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
use function Flow\ETL\DSL\{df, from_array};
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
df()
->read(from_array([
['id' => 1, 'name' => 'Alice', 'email' => 'alice@example.com'],
['id' => 2, 'name' => 'Bob', 'email' => 'bob@example.com'],
]))
->write(to_pgsql_table($client, 'users'))
->run();
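The sketch below shows how one batch of rows can become a single parameterised multi-row INSERT. The values travel separately as $1, $2, … rather than being interpolated into the SQL.

build_insert() is a hypothetical name, and the sketch illustrates the technique rather than the adapter's exact output.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: one batch of rows -> one multi-row INSERT.
 *
 * @param list<array<string, mixed>> $rows rows that all share the same columns
 * @return array{0: string, 1: list<mixed>} SQL and its bind parameters
 */
function build_insert(string $table, array $rows): array
{
    if ($rows === []) {
        throw new InvalidArgumentException('Cannot build an INSERT for an empty batch');
    }
    $columns = array_keys($rows[0]);
    $tuples = [];
    $params = [];
    foreach ($rows as $row) {
        $placeholders = [];
        foreach ($columns as $column) {
            $params[] = $row[$column];
            $placeholders[] = '$' . count($params); // next positional parameter
        }
        $tuples[] = '(' . implode(', ', $placeholders) . ')';
    }
    $sql = sprintf('INSERT INTO %s (%s) VALUES %s', $table, implode(', ', $columns), implode(', ', $tuples));

    return [$sql, $params];
}
```

PostgreSQL's protocol accepts at most 65,535 bind parameters per statement. The number of rows in a batch multiplied by the number of columns must stay below that limit.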
Insert with Skip Conflicts (ON CONFLICT DO NOTHING)
Skip rows that would cause a constraint violation:
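use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
// $client is created as in the Basic Insert example; $data is a list of rows like the one above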
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(skipConflicts: true))
)
->run();
Upsert (ON CONFLICT DO UPDATE)
Update existing rows on conflict using specific columns:
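use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
// $client is created as in the Basic Insert example; $data is a list of rows like the one above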
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(
conflictColumns: ['email'], // Detect conflicts on these columns
updateColumns: ['name', 'updated_at'] // Update these columns on conflict
))
)
->run();
Upsert on Constraint
Use a named constraint for conflict detection:
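use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
// $client is created as in the Basic Insert example; $data is a list of rows like the one above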
df()
->read(from_array($data))
->write(
to_pgsql_table($client, 'users')
->withInsertOptions(pgsql_insert_options(
conflictConstraint: 'users_email_key',
updateColumns: ['name']
))
)
->run();
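The three insert modes above correspond to PostgreSQL's ON CONFLICT clause:

- DO NOTHING for skipped conflicts;
- a column list as the conflict target;
- ON CONSTRAINT for a named constraint.

The hypothetical helper below renders only that clause, to show the SQL the options correspond to. EXCLUDED refers to the row proposed for insertion. The adapter's own rendering may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper mirroring the insert options shown above; it renders
 * only the ON CONFLICT clause appended to an INSERT statement.
 *
 * @param list<string> $conflictColumns
 * @param list<string> $updateColumns
 */
function on_conflict_clause(
    bool $skipConflicts = false,
    array $conflictColumns = [],
    ?string $conflictConstraint = null,
    array $updateColumns = [],
): string {
    if ($skipConflicts) {
        return 'ON CONFLICT DO NOTHING';
    }
    if ($updateColumns === []) {
        throw new InvalidArgumentException('DO UPDATE needs at least one column to update');
    }
    if ($conflictConstraint === null && $conflictColumns === []) {
        // PostgreSQL requires a conflict target for DO UPDATE.
        throw new InvalidArgumentException('DO UPDATE needs conflict columns or a constraint');
    }
    $target = $conflictConstraint !== null
        ? "ON CONSTRAINT {$conflictConstraint}"
        : '(' . implode(', ', $conflictColumns) . ')';
    // EXCLUDED.<column> is the value from the row that failed to insert.
    $assignments = array_map(fn (string $c): string => "{$c} = EXCLUDED.{$c}", $updateColumns);

    return "ON CONFLICT {$target} DO UPDATE SET " . implode(', ', $assignments);
}
```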
Update Existing Rows
Update rows matching primary key values:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_update_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
use Flow\ETL\Adapter\PostgreSql\Operation;
df()
->read(from_array([
['id' => 1, 'name' => 'Alice Updated', 'email' => 'alice@example.com'],
['id' => 2, 'name' => 'Bob Updated', 'email' => 'bob@example.com'],
]))
->write(
to_pgsql_table($client, 'users')
->withOperation(Operation::UPDATE)
->withUpdateOptions(pgsql_update_options(['id'])) // Match on 'id' column
)
->run();
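For one row, an update by primary key amounts to roughly the statement built below. Non-key columns go into SET, and key columns go into WHERE.

build_update() is hypothetical. The sketch emits one statement per row, and the adapter may batch updates differently.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: one row -> UPDATE ... SET <non-key columns> WHERE <key columns>.
 *
 * @param array<string, mixed> $row
 * @param list<string> $primaryKeys
 * @return array{0: string, 1: list<mixed>} SQL and its bind parameters
 */
function build_update(string $table, array $row, array $primaryKeys): array
{
    $params = [];
    $assignments = [];
    foreach ($row as $column => $value) {
        if (in_array($column, $primaryKeys, true)) {
            continue; // key columns identify the row; they are not updated
        }
        $params[] = $value;
        $assignments[] = sprintf('%s = $%d', $column, count($params));
    }
    if ($assignments === []) {
        throw new InvalidArgumentException('Row has no non-key columns to update');
    }
    $conditions = [];
    foreach ($primaryKeys as $key) {
        $params[] = $row[$key];
        $conditions[] = sprintf('%s = $%d', $key, count($params));
    }
    $sql = sprintf('UPDATE %s SET %s WHERE %s', $table, implode(', ', $assignments), implode(' AND ', $conditions));

    return [$sql, $params];
}
```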
Delete Rows
Delete rows matching primary key values:
use function Flow\ETL\Adapter\PostgreSql\{pgsql_delete_options, to_pgsql_table};
use function Flow\ETL\DSL\{df, from_array};
use Flow\ETL\Adapter\PostgreSql\Operation;
df()
->read(from_array([
['id' => 1],
['id' => 3],
]))
->write(
to_pgsql_table($client, 'users')
->withOperation(Operation::DELETE)
->withDeleteOptions(pgsql_delete_options(['id']))
)
->run();
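A batch of deletes by primary key can be expressed as a single IN list. For composite keys it becomes a row-value IN list such as (a, b) IN (($1, $2), ($3, $4)).

build_delete() is a hypothetical sketch of that SQL, not the adapter's implementation.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: a batch of key values -> one DELETE ... WHERE ... IN (...).
 *
 * @param list<array<string, mixed>> $rows
 * @param list<string> $primaryKeys
 * @return array{0: string, 1: list<mixed>} SQL and its bind parameters
 */
function build_delete(string $table, array $rows, array $primaryKeys): array
{
    $composite = count($primaryKeys) > 1;
    $params = [];
    $tuples = [];
    foreach ($rows as $row) {
        $placeholders = [];
        foreach ($primaryKeys as $key) {
            $params[] = $row[$key];
            $placeholders[] = '$' . count($params);
        }
        // Composite keys need a parenthesised tuple per row.
        $tuples[] = $composite ? '(' . implode(', ', $placeholders) . ')' : $placeholders[0];
    }
    $target = $composite ? '(' . implode(', ', $primaryKeys) . ')' : $primaryKeys[0];
    $sql = sprintf('DELETE FROM %s WHERE %s IN (%s)', $table, $target, implode(', ', $tuples));

    return [$sql, $params];
}
```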
Loader DSL Functions Reference
| Function | Description |
|---|---|
| to_pgsql_table($client, $table) | Create a PostgreSQL loader for a table |
| pgsql_insert_options(...) | Configure insert behavior (conflicts, upsert) |
| pgsql_update_options($primaryKeys) | Configure update behavior (primary key columns) |
| pgsql_delete_options($primaryKeys) | Configure delete behavior (primary key columns) |
Schema Conversion
Two helpers convert between a Flow Schema and a PostgreSQL table definition:
- to_pgsql_schema_table() turns a Flow Schema into a Flow\PostgreSql\Schema\Table, which can emit CREATE TABLE (and related index/constraint) SQL via toSql().
- pgsql_table_to_flow_schema() turns a Flow\PostgreSql\Schema\Table back into a Flow Schema.
Column types are resolved through the shared EntryTypesMap (Flow type → PostgreSQL column type). Other per-column details are driven by PostgreSqlMetadata entries attached to each schema definition:

- primary keys, unique constraints and indexes;
- length, precision and scale;
- defaults;
- identity and generated columns;
- explicit type overrides.
Creating a Table from a Flow Schema
to_pgsql_schema_table() returns a table definition. Call toSql() on it and execute each returned statement to create the table:
use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
use function Flow\ETL\DSL\{bool_schema, int_schema, json_schema, schema, str_schema};
$table = to_pgsql_schema_table(
schema(
int_schema('id', metadata: PostgreSqlMetadata::primaryKey('pk_users')),
str_schema('name', metadata: PostgreSqlMetadata::length(120)),
str_schema('email', metadata: PostgreSqlMetadata::indexUnique('uq_users_email')),
bool_schema('active', metadata: PostgreSqlMetadata::default(true)),
json_schema('payload'),
),
'users',
);
foreach ($table->toSql() as $sql) {
$client->execute($sql);
}
By default the table is created in the public schema; pass a third argument to target another one:
$table = to_pgsql_schema_table($schema, 'users', 'analytics');
Steering the Conversion with Metadata
PostgreSqlMetadata factories return Metadata objects that you attach to a definition via the metadata: argument of the schema DSL helpers. Combine multiple entries with merge():
use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use Flow\PostgreSql\Schema\IdentityGeneration;
use function Flow\ETL\DSL\{float_schema, int_schema, schema, str_schema};
$schema = schema(
int_schema('id', metadata: PostgreSqlMetadata::identity(IdentityGeneration::BY_DEFAULT)),
str_schema('sku', metadata: PostgreSqlMetadata::type('citext')), // explicit type override; citext requires the citext extension
float_schema('amount', metadata: PostgreSqlMetadata::precision(10)->merge(PostgreSqlMetadata::scale(2))),
int_schema('price'),
int_schema('quantity'),
int_schema('total', metadata: PostgreSqlMetadata::generated('price * quantity')), // computed from the two columns above
);
| Metadata | Effect on the generated column |
|---|---|
| PostgreSqlMetadata::type($name) | Force a specific PostgreSQL type, bypassing the type map |
| PostgreSqlMetadata::length($n) | Emit varchar($n) |
| PostgreSqlMetadata::precision($p) / ::scale($s) | Emit numeric($p, $s) |
| PostgreSqlMetadata::default($v) | Set a column DEFAULT |
| PostgreSqlMetadata::primaryKey($name) | Include the column in the table primary key |
| PostgreSqlMetadata::indexUnique($name) | Include the column in a named UNIQUE constraint |
| PostgreSqlMetadata::index($name) | Include the column in a named index |
| PostgreSqlMetadata::identity($generation) | Make the column an identity column |
| PostgreSqlMetadata::generated($expr) | Make the column a generated column |
Columns sharing the same primary key, unique constraint, or index name are grouped together, so composite keys are expressed by attaching the same name to several definitions.
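The grouping rule works like this: every column carrying the same constraint name ends up in one multi-column constraint, in schema order. In Flow terms, you would attach PostgreSqlMetadata::primaryKey('pk_orders') to both the tenant_id and the order_id definitions.

The plain-PHP sketch below shows the grouping on simple arrays. group_constraints() and its array inputs are hypothetical and stand in for the metadata the adapter actually reads.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of grouping columns by constraint name into
 * multi-column constraints. Inputs map column name => constraint name.
 *
 * @param array<string, string> $primaryKeyNames
 * @param array<string, string> $uniqueNames
 * @return list<string> DDL fragments for a CREATE TABLE statement
 */
function group_constraints(array $primaryKeyNames, array $uniqueNames): array
{
    $fragments = [];
    foreach (['PRIMARY KEY' => $primaryKeyNames, 'UNIQUE' => $uniqueNames] as $kind => $byColumn) {
        $groups = [];
        foreach ($byColumn as $column => $name) {
            $groups[$name][] = $column; // same name -> same constraint
        }
        foreach ($groups as $name => $columns) {
            $fragments[] = sprintf('CONSTRAINT %s %s (%s)', $name, $kind, implode(', ', $columns));
        }
    }

    return $fragments;
}
```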
Reading a Flow Schema back from a Table
pgsql_table_to_flow_schema() takes a Flow\PostgreSql\Schema\Table and returns a Flow Schema. Combine it with the PostgreSQL library's catalog provider to derive a Flow schema from a live table:
use function Flow\ETL\Adapter\PostgreSql\pgsql_table_to_flow_schema;
use function Flow\PostgreSql\DSL\client_catalog_provider;
$table = client_catalog_provider($client, ['public'])
->get()
->get('public')
->table('users');
$schema = pgsql_table_to_flow_schema($table);
Note: The reverse conversion is intentionally lossy. Several Flow types collapse onto the same PostgreSQL type (for example json, list, map, and structure all map to jsonb), so a column is mapped back to a single canonical Flow type rather than its original one.
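The sketch below shows why the round trip loses information. The forward map is many-to-one, so inverting it has to pick one Flow type per PostgreSQL type.

It rests on these assumptions:

- The plain-string type names stand in for the real type classes.
- reverse_type_map() is hypothetical.
- The choice of json as the canonical type for jsonb is made up for illustration; the adapter's canonical choice is not stated here.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: invert a many-to-one Flow type -> PostgreSQL type map.
 * The first Flow type listed for a PostgreSQL type becomes its canonical type.
 *
 * @param array<string, string> $forward
 * @return array<string, string>
 */
function reverse_type_map(array $forward): array
{
    $reverse = [];
    foreach ($forward as $flowType => $pgType) {
        $reverse[$pgType] ??= $flowType; // later duplicates are ignored
    }

    return $reverse;
}

// Illustrative subset; 'json' is listed first, so it wins for jsonb (assumption).
$forward = [
    'json' => 'jsonb',
    'list' => 'jsonb',
    'map' => 'jsonb',
    'structure' => 'jsonb',
    'string' => 'text',
];
$reverse = reverse_type_map($forward);
```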
Customizing the Type Mapping
Both helpers accept an optional EntryTypesMap. Its second constructor argument overrides the Flow type → PostgreSQL column type mapping. The first argument still overrides the value-binding types used by the loader:
use Flow\ETL\Adapter\PostgreSql\EntryTypesMap;
use Flow\PostgreSql\QueryBuilder\Schema\ColumnType;
use Flow\Types\Type\Native\StringType;
use function Flow\ETL\Adapter\PostgreSql\to_pgsql_schema_table;
$table = to_pgsql_schema_table(
$schema,
'users',
typesMap: new EntryTypesMap([], [
StringType::class => ColumnType::varchar(255), // default strings to varchar(255) instead of text
]),
);
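The sketch below shows one way to resolve a column type from a default map plus overrides. An explicit per-column type wins outright, the way PostgreSqlMetadata::type() bypasses the map, and overrides replace defaults only for the Flow types they mention.

resolve_column_type() is hypothetical. It also uses plain strings, whereas the real EntryTypesMap keys on type classes such as StringType::class and holds ColumnType objects.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of column type resolution with overrides.
 *
 * @param array<string, string> $defaults Flow type => PostgreSQL type
 * @param array<string, string> $overrides entries that replace matching defaults
 */
function resolve_column_type(string $flowType, array $defaults, array $overrides = [], ?string $explicitType = null): string
{
    if ($explicitType !== null) {
        return $explicitType; // explicit per-column type skips the map entirely
    }
    $map = array_replace($defaults, $overrides); // overrides win per Flow type
    if (!array_key_exists($flowType, $map)) {
        throw new InvalidArgumentException("No PostgreSQL type mapped for {$flowType}");
    }

    return $map[$flowType];
}

$defaults = ['string' => 'text', 'json' => 'jsonb'];
```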
Schema Conversion DSL Functions Reference
| Function | Description |
|---|---|
| to_pgsql_schema_table($schema, $tableName, $databaseSchema, $typesMap) | Convert a Flow Schema into a PostgreSQL Table |
| pgsql_table_to_flow_schema($table, $typesMap) | Convert a PostgreSQL Table into a Flow Schema |