flow php

Quick Start

At this point, you should have a working installation of Flow ETL. If you don't, please go back to the Installation section.

Let's take a look at a simple example of how to use Flow ETL to read a CSV file, transform it and write it to another CSV file.

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\{from_csv, to_csv};
use function Flow\ETL\DSL\{data_frame, lit, ref, sum, to_output};
use Flow\ETL\Filesystem\SaveMode;

require __DIR__ . '/vendor/autoload.php';

data_frame()
    ->read(from_csv(__DIR__ . '/orders_flow.csv'))
    ->select('created_at', 'total_price', 'discount')
    ->withEntry('created_at', ref('created_at')->cast('date')->dateFormat('Y/m'))
    ->withEntry('revenue', ref('total_price')->minus(ref('discount')))
    ->select('created_at', 'revenue')
    ->groupBy('created_at')
    ->aggregate(sum(ref('revenue')))
    ->sortBy(ref('created_at')->desc())
    ->withEntry('daily_revenue', ref('revenue_sum')->round(lit(2))->numberFormat(lit(2)))
    ->drop('revenue_sum')
    ->write(to_output(truncate: false))
    ->withEntry('created_at', ref('created_at')->toDate('Y/m'))
    ->mode(SaveMode::Overwrite)
    ->write(to_csv(__DIR__ . '/daily_revenue.csv'))
    ->run();

Data Frame

The data_frame() function is the entry point to the Flow ETL DSL.

[!TIP] To maximize developer experience, Flow exposes a DSL (Domain Specific Language). Flow DSL is a set of functions that can be used to build a data processing pipeline. Entire project is written in Object-Oriented style, but DSL is a more convenient way to build a pipeline. Whenever possible, use DSL functions instead of creating objects directly.

It creates a new instance of the Flow\ETL\Flow class, which is the main class of the ETL.

Extraction

The first step in creating a data processing pipeline is to read the data from a data source. Extractors are responsible for reading data from a data source and converting it into a format that can be processed by Flow ETL. All extractors return \Generator and by design will throw rows one by one, this is to ensure that memory consumption is constant and low.

data_frame()
    ->read(from_csv(__DIR__ . '/orders_flow.csv'))

In this example we’re using the from_csv() function to create a new instance of the Flow\ETL\Adapter\CSV\CSVExtractor class.

All file-based extractors accept glob path patterns, allowing you to read multiple files at once.

data_frame()
    ->read(from_csv(__DIR__ . '/reports/*.csv'))

Transformation

Extractors by default are going to read all columns from the data source, you can use the select() function to select only the columns you need. Alternatively you can use the drop() function to drop columns you don't need.

    ->select('created_at', 'total_price', 'discount')

One of the most powerful features of Flow ETL is the ability to transform data using the withEntry() function.

    ->withEntry('created_at', ref('created_at')->cast('date')->dateFormat('Y/m'))
    ->withEntry('revenue', ref('total_price')->minus(ref('discount')))

withEntry() function accepts two arguments, the first one is the name of the new entry (column), the second one is the value of the new column. The value of the new column can be a literal value, a reference to an existing column or a function call.

  • ref('created_at') - creates a reference to the created_at column.
  • ...->cast('date') - casts column to a date type.
  • ...->dateFormat('Y/m') - format date using the Y/m format, as a result created_at becomes a string 2023/01.

You can find all available functions in the DSL.

[!TIP] DSL is nothing more than a set of functions that return instances of Flow PHP objects. You can always create objects directly, but DSL is a more convenient way to build a pipeline. All available ETL functions can be found in the Function namespace.

Loading

Loading, also writing to a data source, is the last step in the data processing pipeline. There can be more than one writer in the pipeline

    ->write(to_output(truncate: false))
    ->mode(SaveMode::Overwrite)
    ->write(to_csv(__DIR__ . '/daily_revenue.csv'))

In this example we’re first using the to_output() which just prints the data to the console as a simple ASCII table without truncating the output.

    ->mode(SaveMode::Overwrite)
    ->write(to_csv(__DIR__ . '/daily_revenue.csv'))

Second write is writing the data to a CSV file, we're using the mode() function to set the save mode to overwrite. There are three save modes available:

  • SaveMode::Append - If data sink already exists, data will be appended. This solution might cause data duplication since it's not check if given rows already existed.
  • SaveMode::ExceptionIfExists - If data sink already exists error will be thrown.
  • SaveMode::Ignore - If data sink already exists, writing will be skipped.
  • SaveMode::Overwrite - If data sink already exists, it will be removed and written again.

[!NOTE] Append mode is not really appending anything to existing files, instead it creates a folder in which it stores outputs under randomized file names. It can be later read using glob-pattern, for example from_csv('/path/to/folder/*.csv').

Lazy Execution

Flow ETL is using lazy execution, which means that the pipeline will not be executed until you call the run() function.

    ->run();

There are few more triggering functions, like fetch(), you can find which functions are @lazy or @trigger looking at the DataFrame source code.


Contributors

Join us on GitHub external resource
scroll back to top