Building Custom Data Extractor - Flow PHP


Our goal is to extract a list of Account Summaries from the Google Analytics API.

The very first step is to get familiar with the structure of the dataset we are going to extract.
So let's take a look at the Account Summaries API documentation.

Account Summary

{
  "name": string,
  "account": string,
  "displayName": string,
  "propertySummaries": [
    {
      object (PropertySummary)
    }
  ]
}

Property Summary

{
  "property": string,
  "displayName": string,
  "propertyType": enum (PropertyType), // integer
  "parent": string
}

Ok, not too bad: we have one list of structures (Property Summary is a structure) and a few other non-nullable fields.
This should give us the following schema:

schema
|-- account: string
|-- name: string
|-- displayName: string
|-- propertySummaries: list<structure{property: string, displayName: string, propertyType: integer, parent: string}>

Now that we have our schema, we can start building our custom data extractor.
Our next step is to figure out whether there is an existing SDK that we can use to interact with the Google Analytics API.
In this case, we are going to use the Google Analytics Admin SDK.

Sometimes we might not be able to find an SDK that fits our needs. In such cases we would have to build our own HTTP client and handle the API requests manually.

Let's start by preparing our Extractor class. We will call it AccountSummariesExtractor.
First, we need to make sure that our extractor implements the Flow\ETL\Extractor interface.
Technically speaking, that is all we need, but to make our extractor compatible with the Flow DataFrame::limit() function, we should additionally implement the Flow\ETL\Extractor\LimitableExtractor interface.

Most of the Flow\ETL\Extractor\LimitableExtractor logic is reusable, so to avoid code duplication we are going to use the Flow\ETL\Extractor\Limitable trait in our extractor.

<?php

namespace Flow\ETL\Adapter\GoogleAnalytics;

use Flow\ETL\FlowContext;
use Google\Analytics\Admin\V1beta\AccountSummary;
use Google\Analytics\Admin\V1beta\AnalyticsAdminServiceClient;
use Google\Analytics\Admin\V1beta\PropertySummary;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;

final class AccountSummariesExtractor implements Extractor, LimitableExtractor
{
    use Limitable;

    public function __construct(
        private readonly AnalyticsAdminServiceClient $client,
        private readonly int $pageSize = 200
    ) {
        if ($this->pageSize < 1 || $this->pageSize > 200) {
            throw new \Flow\ETL\Exception\InvalidArgumentException('Page size must be between 1 and 200.');
        }
    }

    public function extract(FlowContext $context): \Generator
    {
        // TODO
    }
}
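
To get a feeling for what the trait contributes, here is a simplified, plain-PHP sketch of row counting against a limit. It is not Flow's implementation: the RowLimit trait and NumbersExtractor class are hypothetical stand-ins, countRow() and reachedLimit() only mirror the method names used later in this article, and changeLimit() is an assumed setter.

```php
<?php

declare(strict_types=1);

// Hypothetical, simplified stand-in for Flow\ETL\Extractor\Limitable.
trait RowLimit
{
    private ?int $limit = null;

    private int $yieldedRows = 0;

    // Assumed setter; in a real pipeline the framework would set the limit.
    public function changeLimit(int $limit): void
    {
        if ($limit < 1) {
            throw new \InvalidArgumentException('Limit must be greater than 0.');
        }

        $this->limit = $limit;
    }

    public function countRow(): void
    {
        $this->yieldedRows++;
    }

    public function reachedLimit(): bool
    {
        // Without a limit the extractor is never asked to stop early.
        return $this->limit !== null && $this->yieldedRows >= $this->limit;
    }
}

// Hypothetical extractor that yields the numbers 1..10.
final class NumbersExtractor
{
    use RowLimit;

    public function extract(): \Generator
    {
        foreach (\range(1, 10) as $number) {
            yield $number;
            $this->countRow();

            if ($this->reachedLimit()) {
                return;
            }
        }
    }
}

$extractor = new NumbersExtractor();
$extractor->changeLimit(3);
$numbers = \iterator_to_array($extractor->extract(), false);
```

The extractor stops producing values as soon as the counter hits the limit, so a limited pipeline does not pull more data from the source than it needs.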

Our extractor boilerplate is ready, so let's implement the extract() method logic.
FlowContext $context is a container for all parameters/services that are shared between all the stages of the ETL process.
We can use it to access things like EntryFactory or Config; however, we are not going to use it in this example.

Let's start by fetching the list of Account Summaries from the Google Analytics API, iterating over the list, and yielding each Account Summary.

<?php

namespace Flow\ETL\Adapter\GoogleAnalytics;

use Flow\ETL\Extractor\Signal;
use Flow\ETL\FlowContext;
use Google\Analytics\Admin\V1beta\AccountSummary;
use Google\Analytics\Admin\V1beta\AnalyticsAdminServiceClient;
use Google\Analytics\Admin\V1beta\PropertySummary;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use function Flow\ETL\DSL\rows;

final class AccountSummariesExtractor implements Extractor, LimitableExtractor
{
    // code from previous snippet

    public function extract(FlowContext $context): \Generator
    {
        $list = $this->client->listAccountSummaries(['pageSize' => $this->pageSize]);

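        // Iterate over the current page only. iterateAllElements() would also fetch
        // every following page, which is handled explicitly in the pagination step.
        /** @var AccountSummary $accountSummary */
        foreach ($list->getPage() as $accountSummary) {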
            $signal = yield rows(ga_account_summary_to_row($accountSummary));
            $this->countRow();

            if ($signal === Signal::STOP || $this->reachedLimit()) {
                return;
            }
        }

        // TODO: Implement pagination
    }
}
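
The `$signal = yield ...` line relies on a standard PHP generator feature: whatever the consumer passes to Generator::send() becomes the result of the yield expression inside the generator. The sketch below shows only that mechanism in plain PHP. The StopSignal enum and the produce() and consume() functions are hypothetical stand-ins, not Flow classes, and we are only assuming that the pipeline talks to an extractor in a similar way.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Flow\ETL\Extractor\Signal.
enum StopSignal
{
    case STOP;
}

/**
 * Yields items one by one until the consumer sends StopSignal::STOP.
 *
 * @param list<string> $items
 */
function produce(array $items): \Generator
{
    foreach ($items as $item) {
        // The value passed to Generator::send() is returned by `yield`;
        // a plain Generator::next() makes `yield` evaluate to null.
        $signal = yield $item;

        if ($signal === StopSignal::STOP) {
            return;
        }
    }
}

/**
 * Reads values from the generator and asks it to stop after $max of them.
 *
 * @return list<string>
 */
function consume(\Generator $generator, int $max): array
{
    $received = [];

    while ($generator->valid()) {
        $received[] = $generator->current();

        if (\count($received) >= $max) {
            $generator->send(StopSignal::STOP);

            break;
        }

        $generator->next();
    }

    return $received;
}

$received = consume(produce(['a', 'b', 'c', 'd']), 2);
```

Because the generator returns right after receiving the signal, the remaining items are never produced, which is what makes an early stop cheap for an extractor that talks to a remote API.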

The ga_account_summary_to_row function is responsible for transforming the Account Summary structure into a row that fits our schema.
It could be a private method of our Extractor; however, extracting it into a separate function makes our code cleaner and easier to test and reuse later.

<?php

namespace Flow\ETL\Adapter\GoogleAnalytics;

use Flow\ETL\Row;
use Google\Analytics\Admin\V1beta\AccountSummary;
use Google\Analytics\Admin\V1beta\AnalyticsAdminServiceClient;
use Google\Analytics\Admin\V1beta\PropertySummary;
use function Flow\ETL\DSL\{list_entry, row, str_entry, structure_element, type_integer, type_list, type_string, type_structure};

function ga_account_summary_to_row(AccountSummary $accountSummary) : Row
{
    return row(
        str_entry('account', $accountSummary->getAccount()),
        str_entry('name', $accountSummary->getName()),
        str_entry('displayName', $accountSummary->getDisplayName()),
        list_entry(
            'propertySummaries',
            array_map(
                static fn(PropertySummary $propertySummary) => [
                    'property' => $propertySummary->getProperty(),
                    'displayName' => $propertySummary->getDisplayName(),
                    'propertyType' => $propertySummary->getPropertyType(),
                    'parent' => $propertySummary->getParent(),
                ],
                \iterator_to_array($accountSummary->getPropertySummaries())
            ),
            type_list(
                type_structure(
                    [
                        structure_element('property', type_string()),
                        structure_element('displayName', type_string()),
                        structure_element('propertyType', type_integer()),
                        structure_element('parent', type_string()),
                    ]
                )
            ),
        )
    );
}

Our final step is to implement the pagination logic, since the Google Analytics API returns at most 200 Account Summaries per page.
Typically, we are not going to have more than 200 Account Summaries, but it's always good to be prepared for such cases.

<?php

namespace Flow\ETL\Adapter\GoogleAnalytics;

use Flow\ETL\Extractor\Signal;
use Flow\ETL\FlowContext;
use Google\Analytics\Admin\V1beta\AccountSummary;
use Google\Analytics\Admin\V1beta\AnalyticsAdminServiceClient;
use Google\Analytics\Admin\V1beta\PropertySummary;
use Flow\ETL\Extractor;
use Flow\ETL\Extractor\Limitable;
use Flow\ETL\Extractor\LimitableExtractor;
use function Flow\ETL\DSL\rows;

final class AccountSummariesExtractor implements Extractor, LimitableExtractor
{
    // code from previous snippet

    public function extract(FlowContext $context): \Generator
    {
        $list = $this->client->listAccountSummaries(['pageSize' => $this->pageSize]);

        // code from previous snippet

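        while ($list->getPage()->hasNextPage()) {
            $list = $this->client->listAccountSummaries([
                'pageSize' => $this->pageSize,
                'pageToken' => $list->getPage()->getNextPageToken(),
            ]);

            // Iterate over the freshly fetched page only; iterateAllElements() would
            // also pull in all following pages and yield them more than once.
            foreach ($list->getPage() as $accountSummary) {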
                $signal = yield rows(ga_account_summary_to_row($accountSummary));
                $this->countRow();

                if ($signal === Signal::STOP || $this->reachedLimit()) {
                    return;
                }
            }
        }
    }
}
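
The page-token loop can also be tried out without the SDK. The sketch below is plain PHP: fetch_page() and extract_all() are hypothetical helpers standing in for the API call and the extractor, and using the offset as the page token is an assumption made purely for illustration. It uses a do/while loop so the per-item logic is written only once instead of being repeated for the first and the following pages.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory API: returns one page of items and the token
 * of the next page, or null when the current page is the last one.
 *
 * @return array{items: array, nextPageToken: ?string}
 */
function fetch_page(array $all, int $pageSize, ?string $pageToken): array
{
    // In this sketch the token is simply the offset of the next page.
    $offset = $pageToken === null ? 0 : (int) $pageToken;
    $nextOffset = $offset + $pageSize;

    return [
        'items' => \array_slice($all, $offset, $pageSize),
        'nextPageToken' => $nextOffset < \count($all) ? (string) $nextOffset : null,
    ];
}

/**
 * Yields every item, requesting pages one at a time until no token is left.
 */
function extract_all(array $all, int $pageSize): \Generator
{
    $pageToken = null;

    do {
        $page = fetch_page($all, $pageSize, $pageToken);

        foreach ($page['items'] as $item) {
            yield $item;
        }

        $pageToken = $page['nextPageToken'];
    } while ($pageToken !== null);
}

$items = \iterator_to_array(extract_all(\range(1, 5), 2), false);
```

Each page is requested only when the previous one has been fully consumed, so a consumer that stops early never triggers the remaining requests.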

That's it! Our custom data extractor is ready to be used.
We can now use it in our ETL process to extract Account Summaries from Google Analytics API.

<?php

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\to_output;
use Flow\ETL\Adapter\GoogleAnalytics\AccountSummariesExtractor;

// $client = new AnalyticsAdminServiceClient([
//     'credentials' => $credentials
// ]);

df()
    ->read(new AccountSummariesExtractor($client))
    ->limit(2)
    ->collect()
    ->write(to_output())
    ->run();

// Output
// +--------------------+----------------------+--------------+----------------------+
// |            account |                 name |  displayName |    propertySummaries |
// +--------------------+----------------------+--------------+----------------------+
// | accounts/111111111 | accountSummaries/111 | norbert.tech | [{"property":"proper |
// | accounts/222222222 | accountSummaries/222 |     aeon-php | [{"property":"proper |
// +--------------------+----------------------+--------------+----------------------+

We are almost done. At this point we can fetch all Account Summaries from the Google Analytics API and turn them into a DataFrame.
Our final, optional step is to prepare a DSL function that returns our extractor, which improves the readability of our ETL data processing pipeline.
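
Such a function can be very small. A minimal sketch, assuming it only forwards its arguments to the AccountSummariesExtractor constructor built earlier (the $pageSize parameter and its default of 200 are our assumption, mirroring that constructor):

```php
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\GoogleAnalytics;

use Google\Analytics\Admin\V1beta\AnalyticsAdminServiceClient;

/**
 * Assumed shape of the DSL helper: it only forwards its arguments to the
 * AccountSummariesExtractor constructor defined earlier in this article.
 */
function from_ga_account_summaries(AnalyticsAdminServiceClient $client, int $pageSize = 200): AccountSummariesExtractor
{
    return new AccountSummariesExtractor($client, $pageSize);
}
```

Since PHP does not autoload functions the way it autoloads classes, the file with this function has to be loaded explicitly, for example through the autoload.files section of composer.json.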

<?php

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\to_output;
use function Flow\ETL\Adapter\GoogleAnalytics\from_ga_account_summaries;

df()
    ->read(from_ga_account_summaries($client))
    ->limit(2)
    ->collect()
    ->write(to_output())
    ->run();
