Skip to content
Search
DSL · Elastic search

from_es

Definition

/**
 * Extractor will automatically try to iterate over the whole index using one of two iteration methods:
 *
 * - from/size
 * - search_after
 *
 * search_after is selected when you define sort parameters in the query; otherwise the extractor falls back to from/size.
 *
 * @param array{
 *  hosts?: array<string>,
 *  connectionParams?: array<mixed>,
 *  retries?: int,
 *  sniffOnStart?: bool,
 *  sslCert?: array<string>,
 *  sslKey?: array<string>,
 *  sslVerification?: bool|string,
 *  elasticMetaHeader?: bool,
 *  includePortInHostHeader?: bool
 * } $config
 * @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
 * @param ?array<mixed> $pit_params - when provided, the extractor will create a point in time to stabilize search results. The point in time is automatically closed when the last element is extracted. https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html - @deprecated use withPointInTime method instead
 */
from_es(array $config, array $parameters, ?array $pit_params) : ElasticsearchExtractor

Usage examples

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_output};
use Symfony\Component\Dotenv\Dotenv;
use function Flow\Filesystem\DSL\fstab;
use function Flow\Filesystem\DSL\path;

require __DIR__ . '/vendor/autoload.php';

// The example needs a .env file next to this script; skip when it is absent.
$fs = fstab()->for('file');

if ($fs->status(path(__DIR__ . '/.env')) === null) {
    print 'Example skipped. Please create .env file with ELASTICSEARCH_URL.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

// Fall back to null so a missing variable reaches the guard below
// instead of raising an "Undefined array key" warning first.
$elasticsearchUrl = $_ENV['ELASTICSEARCH_URL'] ?? null;

if (!\is_string($elasticsearchUrl)) {
    print 'Example skipped. ELASTICSEARCH_URL must be a string.' . PHP_EOL;

    return;
}

// Index name shared by the write pipeline and the read pipeline below.
$index = 'test_index';

// Step 1: bulk-index six sample rows into $index.
data_frame()
    ->read(from_array([
        ['id' => 1, 'text' => 'lorem ipsum'],
        ['id' => 2, 'text' => 'lorem ipsum'],
        ['id' => 3, 'text' => 'lorem ipsum'],
        ['id' => 4, 'text' => 'lorem ipsum'],
        ['id' => 5, 'text' => 'lorem ipsum'],
        ['id' => 6, 'text' => 'lorem ipsum'],
    ]))
    ->write(
        to_es_bulk_index(
            [
                'hosts' => [$elasticsearchUrl],
            ],
            $index,
            entry_id_factory('id')
        )
    )
    ->run();

// Step 2: read the same index back with a match_all query. The output is
// written twice: once before and once after the es_hits_to_rows() step.
data_frame()
    ->read(from_es(
        [
            'hosts' => [$elasticsearchUrl],
        ],
        [
            'index' => $index,
            'body' => [
                'query' => [
                    'match_all' => ['boost' => 1.0],
                ],
            ],
        ]
    ))
    ->write(to_output(truncate: false))
    ->with(es_hits_to_rows())
    ->write(to_output(truncate: false))
    ->run();

Contributors

Built in the open.

Join us on GitHub
scroll back to top