flow php

Example: Elasticsearch

Topic: Data reading

Code

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_stream};
use Symfony\Component\Dotenv\Dotenv;

require __DIR__ . '/../../../autoload.php';

if (!\file_exists(__DIR__ . '/.env')) {
    print 'Example skipped. Please create .env file with Azure Storage Account credentials.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

data_frame()
    ->read(from_array([
        ['id' => 1, 'text' => 'lorem ipsum'],
        ['id' => 2, 'text' => 'lorem ipsum'],
        ['id' => 3, 'text' => 'lorem ipsum'],
        ['id' => 4, 'text' => 'lorem ipsum'],
        ['id' => 5, 'text' => 'lorem ipsum'],
        ['id' => 6, 'text' => 'lorem ipsum'],
    ]))
    ->write(
        to_es_bulk_index(
            [
                'hosts' => [$_ENV['ELASTICSEARCH_URL']],
            ],
            $index = 'test_index',
            entry_id_factory('id')
        )
    )
    ->run();

data_frame()
    ->read(from_es(
        [
            'hosts' => [$_ENV['ELASTICSEARCH_URL']],
        ],
        [
            'index' => $index,
            'body' => [
                'query' => [
                    'match_all' => ['boost' => 1.0],
                ],
            ],
        ]
    ))
    ->write(to_stream(__DIR__ . '/output.raw.txt', truncate: false))
    ->transform(es_hits_to_rows())
    ->write(to_stream(__DIR__ . '/output.txt', truncate: false))
    ->run();

Output

+----+-------------+
| id |        text |
+----+-------------+
|  1 | lorem ipsum |
|  2 | lorem ipsum |
|  3 | lorem ipsum |
|  4 | lorem ipsum |
|  5 | lorem ipsum |
|  6 | lorem ipsum |
+----+-------------+
6 rows

+------+-----------+---------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| took | timed_out |                                           _shards |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 hits |
+------+-----------+---------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|    0 |     false | {"total":1,"successful":1,"skipped":0,"failed":0} | {"total":{"value":6,"relation":"eq"},"max_score":1,"hits":[{"_index":"test_index","_type":"_doc","_id":"1","_score":1,"_source":{"id":1,"text":"lorem ipsum"}},{"_index":"test_index","_type":"_doc","_id":"2","_score":1,"_source":{"id":2,"text":"lorem ipsum"}},{"_index":"test_index","_type":"_doc","_id":"3","_score":1,"_source":{"id":3,"text":"lorem ipsum"}},{"_index":"test_index","_type":"_doc","_id":"4","_score":1,"_source":{"id":4,"text":"lorem ipsum"}},{"_index":"test_index","_type":"_doc","_id":"5","_score":1,"_source":{"id":5,"text":"lorem ipsum"}},{"_index":"test_index","_type":"_doc","_id":"6","_score":1,"_source":{"id":6,"text":"lorem ipsum"}}]} |
+------+-----------+---------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 rows

Contributors

Join us on GitHub external resource
scroll back to top