from_es
Definition
/**
* Extractor will automatically try to iterate over whole index using one of the two iteration methods:.
*
* - from/size
* - search_after
*
* Search after is selected when you provide define sort parameters in query, otherwise it will fallback to from/size.
*
* @param array{
* hosts?: array<string>,
* connectionParams?: array<mixed>,
* retries?: int,
* sniffOnStart?: bool,
* sslCert?: array<string>,
* sslKey?: array<string>,
* sslVerification?: bool|string,
* elasticMetaHeader?: bool,
* includePortInHostHeader?: bool
* } $config
* @param array<mixed> $parameters - https://www.elastic.co/guide/en/elasticsearch/reference/master/search-search.html
* @param ?array<mixed> $pit_params - when used extractor will create point in time to stabilize search results. Point in time is automatically closed when last element is extracted. https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html - @deprecated use withPointInTime method instead
*/
from_es(array $config, array $parameters, ?array $pit_params) : ElasticsearchExtractor Usage examples
<?php
declare(strict_types=1);
use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_output};
use Symfony\Component\Dotenv\Dotenv;
use function Flow\Filesystem\DSL\fstab;
use function Flow\Filesystem\DSL\path;
require __DIR__ . '/vendor/autoload.php';
$fs = fstab()->for('file');
if ($fs->status(path(__DIR__ . '/.env')) === null) {
print 'Example skipped. Please create .env file with ELASTICSEARCH_URL.' . PHP_EOL;
return;
}
$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');
$elasticsearchUrl = $_ENV['ELASTICSEARCH_URL'];
if (!\is_string($elasticsearchUrl)) {
print 'Example skipped. ELASTICSEARCH_URL must be a string.' . PHP_EOL;
return;
}
data_frame()
->read(from_array([
['id' => 1, 'text' => 'lorem ipsum'],
['id' => 2, 'text' => 'lorem ipsum'],
['id' => 3, 'text' => 'lorem ipsum'],
['id' => 4, 'text' => 'lorem ipsum'],
['id' => 5, 'text' => 'lorem ipsum'],
['id' => 6, 'text' => 'lorem ipsum'],
]))
->write(
to_es_bulk_index(
[
'hosts' => [$elasticsearchUrl],
],
$index = 'test_index',
entry_id_factory('id')
)
)
->run();
data_frame()
->read(from_es(
[
'hosts' => [$elasticsearchUrl],
],
[
'index' => $index,
'body' => [
'query' => [
'match_all' => ['boost' => 1.0],
],
],
]
))
->write(to_output(truncate: false))
->with(es_hits_to_rows())
->write(to_output(truncate: false))
->run();