flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ~0.33.0

ChangelogRelease Cycle

play Try Playground

elephant
extract

Extracts

Read from various data sources.

arrow
transform

Transforms

Shape and optimize for your needs.

arrow
load

Loads

Store and secure in one of many available data sinks.


Definition


/**
 * Extractor will automatically try to iterate over whole index using one of the two iteration methods:.
 *
 * - from/size
 * - search_after
 *
 * Search after is selected when you provide define sort parameters in query, otherwise it will fallback to from/size.
 *
 * @param array{
 *  hosts?: array<string>,
 *  connectionParams?: array<mixed>,
 *  retries?: int,
 *  sniffOnStart?: bool,
 *  sslCert?: array<string>,
 *  sslKey?: array<string>,
 *  sslVerification?: bool|string,
 *  elasticMetaHeader?: bool,
 *  includePortInHostHeader?: bool
 * } $config
 * @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
 * @param ?array<mixed> $pit_params - when used extractor will create point in time to stabilize search results. Point in time is automatically closed when last element is extracted. https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html - @deprecated use withPointInTime method instead
 */
from_es(array $config, array $parameters, ?array $pit_params) : ElasticsearchExtractor

Usage Examples


Example: Data frame - Data reading

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_output};
use Symfony\Component\Dotenv\Dotenv;
use function Flow\Filesystem\DSL\fstab;
use function Flow\Filesystem\DSL\path;
use function Flow\Filesystem\DSL\protocol;

require __DIR__ . '/vendor/autoload.php';

$fs = fstab()->for(protocol('file'));

if ($fs->status(path(__DIR__ . '/.env')) === null) {
    print 'Example skipped. Please create .env file with Azure Storage Account credentials.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

$elasticsearchUrl = $_ENV['ELASTICSEARCH_URL'];

if (!\is_string($elasticsearchUrl)) {
    print 'Example skipped. ELASTICSEARCH_URL must be a string.' . PHP_EOL;

    return;
}

data_frame()
    ->read(from_array([
        ['id' => 1, 'text' => 'lorem ipsum'],
        ['id' => 2, 'text' => 'lorem ipsum'],
        ['id' => 3, 'text' => 'lorem ipsum'],
        ['id' => 4, 'text' => 'lorem ipsum'],
        ['id' => 5, 'text' => 'lorem ipsum'],
        ['id' => 6, 'text' => 'lorem ipsum'],
    ]))
    ->write(
        to_es_bulk_index(
            [
                'hosts' => [$elasticsearchUrl],
            ],
            $index = 'test_index',
            entry_id_factory('id')
        )
    )
    ->run();

data_frame()
    ->read(from_es(
        [
            'hosts' => [$elasticsearchUrl],
        ],
        [
            'index' => $index,
            'body' => [
                'query' => [
                    'match_all' => ['boost' => 1.0],
                ],
            ],
        ]
    ))
    ->write(to_output(truncate: false))
    ->with(es_hits_to_rows())
    ->write(to_output(truncate: false))
    ->run();

Contributors

Join us on GitHub external resource
scroll back to top