Skip to content
Search
DSL · Elastic search

to_es_bulk_index

Definition

/**
 * https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html.
 *
 * In order to control the size of the single request, use DataFrame::chunkSize() method just before calling DataFrame::load().
 *
 * @param array{
 *  hosts?: array<string>,
 *  connectionParams?: array<mixed>,
 *  retries?: int,
 *  sniffOnStart?: bool,
 *  sslCert?: array<string>,
 *  sslKey?: array<string>,
 *  sslVerification?: bool|string,
 *  elasticMetaHeader?: bool,
 *  includePortInHostHeader?: bool
 * } $config
 * @param string $index
 * @param IdFactory $id_factory
 * @param array<mixed> $parameters - additional Bulk API request parameters, see https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-bulk.html (deprecated: use the withParameters method instead)
 */
to_es_bulk_index(array $config, string $index, IdFactory $id_factory, array $parameters = []) : ElasticsearchLoader

Usage examples

<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\Elasticsearch\{entry_id_factory, es_hits_to_rows, from_es, to_es_bulk_index};
use function Flow\ETL\DSL\{data_frame, from_array, to_output};
use Symfony\Component\Dotenv\Dotenv;
use function Flow\Filesystem\DSL\fstab;
use function Flow\Filesystem\DSL\path;

// Load the project's autoloader so the imported functions and classes resolve.
require __DIR__ . '/vendor/autoload.php';

$fs = fstab()->for('file');

// The example reads its connection settings from a local .env file. A null status
// is treated as "file not present", in which case the example is skipped.
if ($fs->status(path(__DIR__ . '/.env')) === null) {
    print 'Example skipped. Please create .env file with ELASTICSEARCH_URL.' . PHP_EOL;

    return;
}

$dotenv = new Dotenv();
$dotenv->load(__DIR__ . '/.env');

// Fall back to null when the variable is absent from .env, so the guard below skips
// the example cleanly instead of PHP raising an "Undefined array key" warning first.
$elasticsearchUrl = $_ENV['ELASTICSEARCH_URL'] ?? null;

if (!\is_string($elasticsearchUrl)) {
    print 'Example skipped. ELASTICSEARCH_URL must be a string.' . PHP_EOL;

    return;
}

// Name of the target index; kept in a variable because the read pipeline below reuses it.
$index = 'test_index';

// Client settings for the loader: a single host, taken from ELASTICSEARCH_URL.
$clientConfig = ['hosts' => [$elasticsearchUrl]];

// Six sample rows with ids 1..6 that all carry the same text.
$sampleRows = \array_map(
    static fn (int $id) : array => ['id' => $id, 'text' => 'lorem ipsum'],
    \range(1, 6),
);

// Write the sample rows to the index, using each row's "id" entry for the id factory.
data_frame()
    ->read(from_array($sampleRows))
    ->write(to_es_bulk_index($clientConfig, $index, entry_id_factory('id')))
    ->run();

// Client settings for the extractor: a single host, taken from ELASTICSEARCH_URL.
$clientConfig = ['hosts' => [$elasticsearchUrl]];

// Search request against the index written above: a match_all query with boost 1.0.
$searchParams = [
    'index' => $index,
    'body' => ['query' => ['match_all' => ['boost' => 1.0]]],
];

data_frame()
    ->read(from_es($clientConfig, $searchParams))
    // First dump: the data exactly as the extractor yields it.
    ->write(to_output(truncate: false))
    // NOTE(review): presumably converts the raw search hits into one row per document - confirm.
    ->with(es_hits_to_rows())
    // Second dump: the data after the transformation above.
    ->write(to_output(truncate: false))
    ->run();

Contributors

Built in the open.

Join us on GitHub
scroll back to top