Unified, typed API
Strongly typed. Memory efficient. One DataFrame API for CSV, JSON, XML, Parquet, Avro, REST, RDBMS, Elasticsearch and Meilisearch. Schemas inferred or declared — your call.
Read from any data source with a single typed API.
Map, join, aggregate, window — composable and lazy.
Stream into any sink without blowing up memory.
Pick a topic and explore runnable snippets in the playground.
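As a sketch of what composable, lazy transformations look like, the pipeline below maps a derived column and aggregates per group. The entry names and the 1.23 multiplier are illustrative, and method names follow Flow's DSL — check them against the version you have installed:

<?php

use function Flow\ETL\DSL\{data_frame, lit, ref, sum};
use function Flow\ETL\Adapter\CSV\{from_csv, to_csv};

// Transformations are composed lazily — nothing executes until ->run().
data_frame()
    ->read(from_csv(__DIR__ . '/orders.csv'))
    ->autoCast()
    ->withEntry('gross', ref('amount')->multiply(lit(1.23))) // map
    ->groupBy(ref('email'))                                  // aggregate
    ->aggregate(sum(ref('gross')))
    ->write(to_csv(__DIR__ . '/totals.csv'))
    ->run();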
In pure PHP, every source and sink has its own functions, error model, and memory tradeoffs. With Flow, the pipeline shape stays constant — swap the source or sink and the rest of the code doesn't budge.
<?php

// ⚠ not memory-safe: $rows accumulates every record
$rows = [];

$file = fopen(__DIR__ . '/orders.csv', 'r');

if ($file === false) {
    throw new \RuntimeException('Cannot open orders.csv');
}

$headers = fgetcsv($file);

if ($headers === false) {
    fclose($file);
    throw new \RuntimeException('Empty CSV');
}

while (($line = fgetcsv($file)) !== false) {
    if (count($line) !== count($headers)) {
        continue; // malformed row, skip
    }

    $row = array_combine($headers, $line);

    // drop rows missing any required field
    if ($row['id'] === '' || $row['amount'] === '' || $row['email'] === '' || $row['created_at'] === '') {
        continue;
    }

    // CSV is all strings — cast every column by hand
    $row['id'] = (int) $row['id'];
    $row['amount'] = (float) $row['amount'];
    $row['created_at'] = (new \DateTimeImmutable($row['created_at']))->format(DATE_ATOM);

    $rows[] = $row;
}

fclose($file);

// ⚠ not memory-safe: serialises the entire dataset at once
$json = json_encode($rows, JSON_THROW_ON_ERROR | JSON_PRETTY_PRINT);

// ⚠ not memory-safe: writes the whole payload in one call
file_put_contents(__DIR__ . '/orders.json', $json);
<?php

use function Flow\ETL\DSL\{data_frame, ref};
use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\JSON\to_json;

data_frame()
    ->read(from_csv(__DIR__ . '/orders.csv'))
    ->autoCast()
    ->filter(ref('id')->isNotNull())
    ->filter(ref('amount')->isNotNull())
    ->filter(ref('email')->isNotNull())
    ->write(to_json(__DIR__ . '/orders.json'))
    ->run();
Swap to_json for to_xlsx, to_parquet, or to_postgres — same shape, different destination.
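Swapping the sink really is a one-line change. A sketch writing Parquet instead of JSON, assuming the Flow Parquet adapter is installed:

<?php

use function Flow\ETL\DSL\{data_frame, ref};
use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\Parquet\to_parquet;

// Same pipeline shape as before — only the sink line changed.
data_frame()
    ->read(from_csv(__DIR__ . '/orders.csv'))
    ->autoCast()
    ->filter(ref('id')->isNotNull())
    ->write(to_parquet(__DIR__ . '/orders.parquet'))
    ->run();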
Most pipelines die from inconsistent APIs, mistyped fields, and runaway memory. Flow gives you one cohesive toolset to read, shape, and ship data — without surprises.
Built on generators and iterators. Process gigabytes on small machines without chunking gymnastics or OOMs.
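The underlying idea can be sketched in plain PHP: a generator yields one row at a time, so memory stays flat no matter how large the file gets. This is a minimal illustration of the technique, not Flow's actual internals:

```php
<?php

// Create a small sample CSV to stream over (stand-in for a multi-GB file).
$path = tempnam(sys_get_temp_dir(), 'orders');
file_put_contents($path, "id,amount\n1,10.50\n2,4.25\n3,1.00\n");

/**
 * Yield CSV rows one at a time instead of loading the whole file
 * into an array — memory stays constant regardless of row count.
 */
function rows(string $path): \Generator
{
    $file = fopen($path, 'r');
    $headers = fgetcsv($file);

    while (($line = fgetcsv($file)) !== false) {
        yield array_combine($headers, $line);
    }

    fclose($file);
}

// Rows are pulled lazily — nothing is read until iteration starts.
$total = 0.0;
foreach (rows($path) as $row) {
    $total += (float) $row['amount'];
}
// $total is now 15.75

unlink($path);
```

Flow applies the same principle end to end: extractors, transformations, and loaders all pass rows through lazily instead of materialising the dataset.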
Joins, windows, aggregations, partitioning, filesystem abstractions, telemetry — real tools for real pipelines.
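Partitioning, for instance, splits output by column values as it writes. A sketch, assuming the CSV adapter — the exact directory layout depends on the sink:

<?php

use function Flow\ETL\DSL\{data_frame, ref};
use function Flow\ETL\Adapter\CSV\{from_csv, to_csv};

// One output partition per distinct created_at value.
data_frame()
    ->read(from_csv(__DIR__ . '/orders.csv'))
    ->partitionBy(ref('created_at'))
    ->write(to_csv(__DIR__ . '/partitioned/orders.csv'))
    ->run();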
Drop-in replacement for the official PHP OpenTelemetry SDK: built-in tracing and metrics, fully compatible with the OTLP protocol.
Open the playground in your browser — no install required — and try Flow on your data.