flow php

UNIFIED DATA PROCESSING FRAMEWORK

composer require flow-php/etl ~0.33.0

Changelog | Release Cycle

play Try Playground

elephant
extract

Extracts

Read from various data sources.

arrow
transform

Transforms

Shape and optimize for your needs.

arrow
load

Loads

Store and secure in one of many available data sinks.


Definition


/**
 * Returns an ElasticsearchLoader for the given index. Elasticsearch Bulk API reference:
 * https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html
 *
 * In order to control the size of a single request, use the DataFrame::chunkSize() method just before calling DataFrame::load().
 *
 * NOTE(review): the usage example on this page calls this function with only three arguments,
 * so $parameters presumably has a default value in the real declaration - confirm against the source.
 *
 * @param array{
 *  hosts?: array<string>,
 *  connectionParams?: array<mixed>,
 *  retries?: int,
 *  sniffOnStart?: bool,
 *  sslCert?: array<string>,
 *  sslKey?: array<string>,
 *  sslVerification?: bool|string,
 *  elasticMetaHeader?: bool,
 *  includePortInHostHeader?: bool
 * } $config
 * @param string $index
 * @param IdFactory $id_factory
 * @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html - @deprecated use withParameters method instead
 */
to_es_bulk_index(array $config, string $index, IdFactory $id_factory, array $parameters) : ElasticsearchLoader

Usage Examples


Example: Data frame - Data writing

<?php

declare(strict_types=1);

use Symfony\Component\Dotenv\Dotenv;
use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_output};
use function Flow\Filesystem\DSL\{fstab, path, protocol};

require __DIR__ . '/vendor/autoload.php';

// The example reads its configuration from a .env file next to this script;
// skip the whole example when that file is absent.
$fs = fstab()->for(protocol('file'));

if ($fs->status(path(__DIR__ . '/.env')) === null) {
    print 'Example skipped. Please create .env file with ELASTICSEARCH_URL set to your Elasticsearch URL.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

// Fall back to null so a .env file without ELASTICSEARCH_URL reaches the guard below
// instead of raising an "Undefined array key" warning.
$elasticsearchUrl = $_ENV['ELASTICSEARCH_URL'] ?? null;

if (!\is_string($elasticsearchUrl)) {
    print 'Example skipped. ELASTICSEARCH_URL must be a string.' . PHP_EOL;

    return;
}

// Shared by the write pipeline and the read pipeline below.
$index = 'test_index';

// 1) Write six sample rows to the index, using the "id" entry to build document ids.
data_frame()
    ->read(from_array([
        ['id' => 1, 'text' => 'lorem ipsum'],
        ['id' => 2, 'text' => 'lorem ipsum'],
        ['id' => 3, 'text' => 'lorem ipsum'],
        ['id' => 4, 'text' => 'lorem ipsum'],
        ['id' => 5, 'text' => 'lorem ipsum'],
        ['id' => 6, 'text' => 'lorem ipsum'],
    ]))
    ->write(
        to_es_bulk_index(
            [
                'hosts' => [$elasticsearchUrl],
            ],
            $index,
            entry_id_factory('id'),
        )
    )
    ->run();

// 2) Read the index back with a match_all query. The output is written twice on purpose:
//    once before es_hits_to_rows() is applied and once after it.
data_frame()
    ->read(from_es(
        [
            'hosts' => [$elasticsearchUrl],
        ],
        [
            'index' => $index,
            'body' => [
                'query' => [
                    'match_all' => ['boost' => 1.0],
                ],
            ],
        ],
    ))
    ->write(to_output(truncate: false))
    ->with(es_hits_to_rows())
    ->write(to_output(truncate: false))
    ->run();

Contributors

Join us on GitHub external resource
scroll back to top