flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ^0.10.0

Extracts

Read from various data sources.

Transforms

Shape and optimize for your needs.

Loads

Store and secure in one of many available data sinks.


Definition


/**
 * The extractor automatically tries to iterate over the whole index using one of two iteration methods:
 *
 * - from/size
 * - search_after
 *
 * Search after is selected when you define sort parameters in the query; otherwise the extractor falls back to from/size.
 *
 * @param array{
 *  hosts?: array<string>,
 *  connectionParams?: array<mixed>,
 *  retries?: int,
 *  sniffOnStart?: boolean,
 *  sslCert?: array<string>,
 *  sslKey?: array<string>,
 *  sslVerification?: boolean|string,
 *  elasticMetaHeader?: boolean,
 *  includePortInHostHeader?: boolean
 * } $config
 * @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
 * @param ?array<mixed> $pit_params - when provided, the extractor creates a point in time to stabilize search results. The point in time is closed automatically when the last element is extracted. https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html - @deprecated use withPointInTime method instead
 */
from_es(array $config, array $parameters, ?array $pit_params = null) : ElasticsearchExtractor
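
The selection rule described in the docblock can be sketched in plain PHP. This is only an illustration, not the extractor's actual code: `iteration_method()` is a hypothetical helper that does not exist in flow-php, and it assumes the sort definition sits under `body.sort`, as in an Elasticsearch search request body.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, NOT part of flow-php: mirrors the documented rule
 * "sort defined => search_after, otherwise from/size".
 * Assumption: sort parameters are passed under body.sort.
 *
 * @param array<mixed> $parameters search parameters as passed to from_es()
 */
function iteration_method(array $parameters) : string
{
    $sort = $parameters['body']['sort'] ?? [];

    return $sort === [] ? 'from/size' : 'search_after';
}

// No sort defined: per the docblock, the extractor would fall back to from/size.
$withoutSort = [
    'index' => 'test_index',
    'body' => [
        'query' => ['match_all' => ['boost' => 1.0]],
    ],
];

// Sort defined: per the docblock, search_after would be selected.
$withSort = [
    'index' => 'test_index',
    'body' => [
        'sort' => [['id' => ['order' => 'asc']]],
        'query' => ['match_all' => ['boost' => 1.0]],
    ],
];

print iteration_method($withoutSort) . PHP_EOL;
print iteration_method($withSort) . PHP_EOL;
```

Either array has the same shape as the second argument of `from_es()`; only the presence of a sort definition differs.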

Usage Examples


Example: Data reading - Elasticsearch

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_stream};
use Symfony\Component\Dotenv\Dotenv;

require __DIR__ . '/vendor/autoload.php';

if (!\file_exists(__DIR__ . '/.env')) {
    print 'Example skipped. Please create .env file with ELASTICSEARCH_URL.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

data_frame()
    ->read(from_array([
        ['id' => 1, 'text' => 'lorem ipsum'],
        ['id' => 2, 'text' => 'lorem ipsum'],
        ['id' => 3, 'text' => 'lorem ipsum'],
        ['id' => 4, 'text' => 'lorem ipsum'],
        ['id' => 5, 'text' => 'lorem ipsum'],
        ['id' => 6, 'text' => 'lorem ipsum'],
    ]))
    ->write(
        to_es_bulk_index(
            [
                'hosts' => [$_ENV['ELASTICSEARCH_URL']],
            ],
            $index = 'test_index',
            entry_id_factory('id')
        )
    )
    ->run();

data_frame()
    ->read(from_es(
        [
            'hosts' => [$_ENV['ELASTICSEARCH_URL']],
        ],
        [
            'index' => $index,
            'body' => [
                'query' => [
                    'match_all' => ['boost' => 1.0],
                ],
            ],
        ]
    ))
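    // Write the raw Elasticsearch search results first.
    ->write(to_stream(__DIR__ . '/output.raw.txt', truncate: false))
    // Convert the hits from each result into rows, then write those rows.
    ->with(es_hits_to_rows())
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))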
    ->run();
