
Telemetry

Introduction

Telemetry integration in Flow PHP provides observability into your ETL pipelines through distributed tracing, metrics collection, and structured logging. Understanding what happens inside your data processing workflows is essential for:

  • Debugging - Identify bottlenecks and failures in complex pipelines
  • Monitoring - Track performance metrics in production environments
  • Optimization - Measure throughput and resource usage to improve efficiency
  • Compliance - Maintain audit trails of data processing operations

Flow PHP's telemetry is built on OpenTelemetry-compatible concepts, allowing integration with popular observability platforms like Jaeger, Grafana, Datadog, and others.

Quick Start

The simplest way to get started with telemetry is to use console exporters for local development:

<?php

use Flow\Clock\SystemClock;
use function Flow\ETL\DSL\{config_builder, df, from_array, telemetry_options, to_output};
use function Flow\Telemetry\DSL\{
    batching_log_processor, batching_metric_processor, batching_span_processor,
    console_log_exporter, console_metric_exporter, console_span_exporter,
    logger_provider, memory_context_storage, meter_provider, resource, telemetry, tracer_provider
};

$clock = new SystemClock(new DateTimeZone('UTC'));
$contextStorage = memory_context_storage();

$telemetry = telemetry(
    resource(['service.name' => 'my-etl-pipeline']),
    tracer_provider(batching_span_processor(console_span_exporter()), $clock, $contextStorage),
    meter_provider(batching_metric_processor(console_metric_exporter()), $clock),
    logger_provider(batching_log_processor(console_log_exporter()), $clock, $contextStorage),
);
$telemetry->registerShutdownFunction();

df(config_builder()->withTelemetry($telemetry, telemetry_options(
    trace_loading: true,
    trace_transformations: true,
    collect_metrics: true,
)))
    ->read(from_array([
        ['id' => 1, 'name' => 'Alice'],
        ['id' => 2, 'name' => 'Bob'],
    ]))
    ->write(to_output())
    ->run();

Telemetry Options

Flow PHP provides granular control over what telemetry data is collected through telemetry_options():

Option                  Description                                     Performance Impact
trace_loading           Creates spans for each loader execution         Low
trace_transformations   Creates spans for each transformer execution    Medium (many transformers = many spans)
collect_metrics         Records counters and throughput metrics         Low

Selective Configuration

use function Flow\ETL\DSL\telemetry_options;

// Only trace loaders (useful for debugging slow writes)
$loadersOnly = telemetry_options(trace_loading: true);

// Only collect metrics (minimal overhead)
$metricsOnly = telemetry_options(collect_metrics: true);

// Full observability
$everything = telemetry_options(
    trace_loading: true,
    trace_transformations: true,
    collect_metrics: true,
);

You can also use the fluent builder pattern:

use function Flow\ETL\DSL\telemetry_options;

$options = telemetry_options()
    ->traceLoading(true)
    ->traceTransformations(true)
    ->collectMetrics(true);

Configuration

Basic Configuration

Enable telemetry by passing a Telemetry instance to the config builder:

use function Flow\ETL\DSL\{config_builder, telemetry_options};

$config = config_builder()
    ->withTelemetry($telemetry, telemetry_options(
        trace_loading: true,
        collect_metrics: true,
    ))
    ->build();

Console Exporters (Development)

Console exporters output telemetry data directly to stdout with ASCII table formatting. They're ideal for local development and debugging:

use function Flow\Telemetry\DSL\{
    console_span_exporter, console_metric_exporter, console_log_exporter
};

// Spans are displayed as tables with trace IDs, durations, and attributes
$spanExporter = console_span_exporter(colors: true);

// Metrics show counters and throughput values
$metricExporter = console_metric_exporter(colors: true);

// Logs are formatted with severity-based coloring
$logExporter = console_log_exporter(colors: true, maxBodyLength: 100);

OTLP Export (Production)

For production environments, send telemetry to an OTLP-compatible collector using the OTLP bridge:

use Flow\Clock\SystemClock;
use function Flow\ETL\DSL\{config_builder, telemetry_options};
use function Flow\Telemetry\DSL\{
    batching_log_processor, batching_metric_processor, batching_span_processor,
    logger_provider, memory_context_storage, meter_provider, resource, telemetry, tracer_provider
};
use function Flow\Bridge\Telemetry\OTLP\DSL\{
    otlp_curl_transport, otlp_json_serializer,
    otlp_log_exporter, otlp_metric_exporter, otlp_span_exporter
};

$clock = new SystemClock(new DateTimeZone('UTC'));
$contextStorage = memory_context_storage();

// Configure OTLP transport
$transport = otlp_curl_transport('http://localhost:4318', otlp_json_serializer());

$telemetry = telemetry(
    resource([
        'service.name' => 'my-etl-pipeline',
        'service.version' => '1.0.0',
        'service.namespace' => 'my-company',
    ]),
    tracer_provider(batching_span_processor(otlp_span_exporter($transport)), $clock, $contextStorage),
    meter_provider(batching_metric_processor(otlp_metric_exporter($transport)), $clock),
    logger_provider(batching_log_processor(otlp_log_exporter($transport)), $clock, $contextStorage),
);
$telemetry->registerShutdownFunction();

Transport Options

HTTP Transport (PSR-18)

Use with any PSR-18 compatible HTTP client:

use function Flow\Bridge\Telemetry\OTLP\DSL\{otlp_http_transport, otlp_json_serializer};

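// $psr18Client: any PSR-18 Psr\Http\Client\ClientInterface implementation
// $psr17Factory: a PSR-17 factory implementing RequestFactoryInterface and StreamFactoryInterface
$transport = otlp_http_transport(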
    client: $psr18Client,
    requestFactory: $psr17Factory,
    streamFactory: $psr17Factory,
    endpoint: 'http://localhost:4318',
    serializer: otlp_json_serializer(),
    headers: ['Authorization' => 'Bearer token'],
);

Curl Transport (Async)

Recommended for better performance, as it uses curl_multi for non-blocking I/O:

use function Flow\Bridge\Telemetry\OTLP\DSL\{otlp_curl_transport, otlp_curl_options, otlp_json_serializer};

$transport = otlp_curl_transport(
    endpoint: 'http://localhost:4318',
    serializer: otlp_json_serializer(),
    options: otlp_curl_options()
        ->withTimeout(60)
        ->withConnectTimeout(15)
        ->withHeader('Authorization', 'Bearer token')
        ->withCompression(),
);

gRPC Transport

For gRPC endpoints (requires ext-grpc):

use function Flow\Bridge\Telemetry\OTLP\DSL\{otlp_grpc_transport, otlp_protobuf_serializer};

$transport = otlp_grpc_transport(
    endpoint: 'localhost:4317',
    serializer: otlp_protobuf_serializer(),
);

Collected Data

Spans

Every DataFrame execution creates a root span with the following attributes:

Attribute                     Description
dataframe.id                  Unique identifier for the DataFrame execution
rows.total                    Total number of rows processed
rows.throughput.per_second    Processing throughput (rows per second)
memory.min.mb                 Minimum memory consumption during execution (MB)
memory.max.mb                 Maximum memory consumption during execution (MB)
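A minimal sketch of how the row and memory attributes above could be accumulated, assuming memory is sampled once per processed batch and converted to megabytes as bytes / (1024 * 1024). ExecutionStats and recordBatch() are hypothetical names used only for illustration; Flow PHP's own measurement points may differ.

```php
<?php
// Hypothetical sketch: deriving rows.total, memory.min.mb and memory.max.mb
// from per-batch observations. Not Flow PHP's actual implementation.

final class ExecutionStats
{
    private int $rowsTotal = 0;
    private ?float $memoryMinMb = null;
    private ?float $memoryMaxMb = null;

    // Record one processed batch and the memory usage (in bytes) seen after it.
    public function recordBatch(int $rows, int $memoryBytes): void
    {
        $this->rowsTotal += $rows;
        $mb = $memoryBytes / (1024 * 1024);
        $this->memoryMinMb = min($this->memoryMinMb ?? $mb, $mb);
        $this->memoryMaxMb = max($this->memoryMaxMb ?? $mb, $mb);
    }

    /** @return array<string, int|float> keyed like the attribute table */
    public function attributes(): array
    {
        return [
            'rows.total' => $this->rowsTotal,
            'memory.min.mb' => $this->memoryMinMb ?? 0.0,
            'memory.max.mb' => $this->memoryMaxMb ?? 0.0,
        ];
    }
}

$stats = new ExecutionStats();
// In a real pipeline the byte count would come from memory_get_usage().
$stats->recordBatch(1000, 8 * 1024 * 1024);
$stats->recordBatch(1000, 12 * 1024 * 1024);
$stats->recordBatch(500, 10 * 1024 * 1024);

print_r($stats->attributes());
```

With these three batches the sketch reports 2500 rows, a minimum of 8 MB and a maximum of 12 MB.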

When trace_loading is enabled, child spans are created for each loader with:

  • Span name: Loader class name (e.g., Flow\ETL\Loader\StreamLoader)
  • Parent: DataFrame span
  • Status: OK on success, ERROR on failure

When trace_transformations is enabled, child spans are created for each transformer with:

  • Span name: Transformer class name (e.g., Flow\ETL\Transformer\EntryNameTransformer)
  • Parent: DataFrame span
  • Status: OK on success, ERROR on failure
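The parent/child relationship and the OK/ERROR status can be sketched in plain PHP. SpanRecord and traced() are hypothetical stand-ins, not Flow PHP classes: each unit of work runs inside a child span of the DataFrame span, the status is set from whether the callable returned or threw, and the exception is rethrown so that tracing never swallows a failure.

```php
<?php
// Hypothetical sketch of child spans with OK/ERROR status.
// SpanRecord and traced() are illustrative names, not Flow PHP's API.

final class SpanRecord
{
    public string $status = 'UNSET';
    public ?string $error = null;
    public float $durationMs = 0.0;

    public function __construct(
        public readonly string $name,
        public readonly ?SpanRecord $parent,
    ) {}
}

/**
 * Run $work inside a child span; ended spans are appended to $finished,
 * roughly the way a processor would hand them to an exporter.
 * @param list<SpanRecord> $finished
 */
function traced(string $name, ?SpanRecord $parent, callable $work, array &$finished): mixed
{
    $span = new SpanRecord($name, $parent);
    $start = hrtime(true);
    try {
        $result = $work();
        $span->status = 'OK';
        return $result;
    } catch (Throwable $e) {
        $span->status = 'ERROR';
        $span->error = $e->getMessage();
        throw $e; // the failure still propagates to the pipeline
    } finally {
        $span->durationMs = (hrtime(true) - $start) / 1e6;
        $finished[] = $span;
    }
}

$finished = [];
$root = new SpanRecord('dataframe', null);

traced('Flow\ETL\Loader\StreamLoader', $root, fn () => 'written', $finished);

try {
    traced('Flow\ETL\Transformer\EntryNameTransformer', $root, function () {
        throw new RuntimeException('entry not found');
    }, $finished);
} catch (RuntimeException) {
    // caught here only so the example keeps running
}

foreach ($finished as $span) {
    echo $span->name, ' -> ', $span->status, ' (parent: ', $span->parent?->name, ")\n";
}
```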

Metrics

When collect_metrics is enabled:

Metric                       Type         Description
rows.processed.total         Counter      Cumulative count of processed rows
rows.processed.throughput    Throughput   Rows processed per time unit
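The difference between the two metric types can be shown with a small sketch: a counter only accumulates, while throughput divides that total by elapsed time. RowMetrics is a hypothetical class, and expressing throughput per second is an assumption (the table only says "per time unit").

```php
<?php
// Hypothetical sketch contrasting a counter with a throughput value.
// RowMetrics is an illustrative name, not Flow PHP's API.

final class RowMetrics
{
    private int $processedTotal = 0;

    // Counter semantics: cumulative and monotonically increasing.
    public function add(int $rows): void
    {
        if ($rows < 0) {
            throw new InvalidArgumentException('A counter cannot decrease.');
        }
        $this->processedTotal += $rows;
    }

    public function total(): int
    {
        return $this->processedTotal;
    }

    // Throughput semantics: rows per second over the elapsed window.
    public function throughput(float $elapsedSeconds): float
    {
        return $elapsedSeconds > 0 ? $this->processedTotal / $elapsedSeconds : 0.0;
    }
}

$metrics = new RowMetrics();
foreach ([100, 250, 150] as $batchRows) {
    $metrics->add($batchRows);
}

printf("rows.processed.total = %d\n", $metrics->total());
printf("rows.processed.throughput = %.1f rows/s\n", $metrics->throughput(4.0));
```

Here 500 rows over 4 seconds gives 125.0 rows per second.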

Logs

Structured logs are emitted for the following pipeline events, at DEBUG level unless noted otherwise:

  • Pipeline start - Logged with configuration details (cache type, serializer, optimizers, filesystem mounts)
  • Pipeline completion - Logged with summary statistics (total rows, memory usage)
  • Errors - Logged at ERROR level with exception details
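Because start and completion events are DEBUG while failures are ERROR, a severity filter (the idea behind severity_filtering_log_processor()) can reduce production logs to failures only. The sketch below is hypothetical: the Severity enum borrows the severity numbers from the OpenTelemetry log data model, and filter_by_severity() is an illustrative helper, not the signature of the Flow PHP processor.

```php
<?php
// Hypothetical sketch of severity-based log filtering.
// Values follow the OpenTelemetry SeverityNumber ranges
// (DEBUG = 5, INFO = 9, WARN = 13, ERROR = 17); Flow PHP's types may differ.

enum Severity: int
{
    case DEBUG = 5;
    case INFO = 9;
    case WARN = 13;
    case ERROR = 17;
}

/**
 * Keep only records at or above the minimum severity.
 * @param list<array{severity: Severity, body: string}> $records
 * @return list<array{severity: Severity, body: string}>
 */
function filter_by_severity(array $records, Severity $minimum): array
{
    return array_values(array_filter(
        $records,
        fn (array $r): bool => $r['severity']->value >= $minimum->value,
    ));
}

$records = [
    ['severity' => Severity::DEBUG, 'body' => 'Pipeline started'],
    ['severity' => Severity::DEBUG, 'body' => 'Pipeline completed'],
    ['severity' => Severity::ERROR, 'body' => 'Loader failed'],
];

foreach (filter_by_severity($records, Severity::ERROR) as $record) {
    echo $record['severity']->name, ': ', $record['body'], "\n";
}
```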

Processing and Batching

Telemetry data is processed through configurable processors:

Batching Processors (Recommended)

Batching processors collect telemetry in memory and export it in batches, which reduces the number of export calls:

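use function Flow\Telemetry\DSL\{
    batching_span_processor, batching_metric_processor, batching_log_processor
};

// $spanExporter, $metricExporter and $logExporter are any matching exporters,
// e.g. console_span_exporter() or otlp_span_exporter($transport)

// Export spans in batches of 512 (default)
$spanProcessor = batching_span_processor($spanExporter, batchSize: 512);

// Export metrics in batches of 512 (default)
$metricProcessor = batching_metric_processor($metricExporter, batchSize: 512);

// Export logs in batches of 512 (default)
$logProcessor = batching_log_processor($logExporter, batchSize: 512);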
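Conceptually, a batching processor is a buffer in front of the exporter. The sketch below assumes a simple rule: export as soon as batchSize items have accumulated, plus one final flush. BatchBuffer is a hypothetical name, and Flow PHP's processors may use additional triggers.

```php
<?php
// Hypothetical sketch of batching: items are buffered in memory and handed
// to the exporter once batchSize items have accumulated, or on flush().
// BatchBuffer is an illustrative name, not Flow PHP's batching processor.

final class BatchBuffer
{
    /** @var list<mixed> */
    private array $buffer = [];

    /** @param Closure(list<mixed>): void $export called once per batch */
    public function __construct(
        private readonly Closure $export,
        private readonly int $batchSize = 512,
    ) {
        if ($batchSize < 1) {
            throw new InvalidArgumentException('batchSize must be at least 1');
        }
    }

    public function add(mixed $item): void
    {
        $this->buffer[] = $item;
        if (count($this->buffer) >= $this->batchSize) {
            $this->flush();
        }
    }

    // Export whatever is buffered, e.g. at the end of a run.
    public function flush(): void
    {
        if ($this->buffer === []) {
            return;
        }
        ($this->export)($this->buffer);
        $this->buffer = [];
    }
}

$exportCalls = 0;
$buffer = new BatchBuffer(function (array $batch) use (&$exportCalls): void {
    $exportCalls++;
    echo 'exporting ', count($batch), " items\n";
}, batchSize: 512);

for ($i = 0; $i < 1200; $i++) {
    $buffer->add(['span' => $i]);
}
$buffer->flush(); // exports the remaining 176 items
```

With 1,200 items and batchSize: 512 the exporter is called three times (512, 512, then 176 items on the final flush). Setting batchSize to 1 turns the sketch into pass-through behaviour, with one export call per item; larger batches mean fewer calls but more items held in memory at once.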

Pass-Through Processors (Debugging)

Pass-through processors export each item immediately, giving real-time visibility while debugging:

use function Flow\Telemetry\DSL\{
    pass_through_span_processor, pass_through_metric_processor, pass_through_log_processor
};

$spanProcessor = pass_through_span_processor($spanExporter);
$metricProcessor = pass_through_metric_processor($metricExporter);
$logProcessor = pass_through_log_processor($logExporter);

DSL Functions Reference

Core Telemetry

Function                            Description
telemetry()                         Create a Telemetry instance
resource()                          Create a Resource with attributes
telemetry_options()                 Configure telemetry options

Providers

Function                            Description
tracer_provider()                   Create a TracerProvider
meter_provider()                    Create a MeterProvider
logger_provider()                   Create a LoggerProvider

Processors

Function                            Description
batching_span_processor()           Batch span processing
batching_metric_processor()         Batch metric processing
batching_log_processor()            Batch log processing
pass_through_span_processor()       Immediate span export
pass_through_metric_processor()     Immediate metric export
pass_through_log_processor()        Immediate log export
severity_filtering_log_processor()  Filter logs by severity

Exporters (Console)

Function                            Description
console_span_exporter()             Export spans to console
console_metric_exporter()           Export metrics to console
console_log_exporter()              Export logs to console

Exporters (OTLP)

Function                            Description
otlp_span_exporter()                Export spans via OTLP
otlp_metric_exporter()              Export metrics via OTLP
otlp_log_exporter()                 Export logs via OTLP

Transport (OTLP)

Function                            Description
otlp_curl_transport()               Async curl transport
otlp_http_transport()               PSR-18 HTTP transport
otlp_grpc_transport()               gRPC transport
otlp_json_serializer()              JSON serialization
otlp_protobuf_serializer()          Protobuf serialization

Testing

Function                            Description
memory_span_exporter()              Store spans in memory
memory_metric_exporter()            Store metrics in memory
memory_log_exporter()               Store logs in memory
void_span_processor()               No-op span processor
void_metric_processor()             No-op metric processor
void_log_processor()                No-op log processor

Best Practices

When to Enable Each Option

  • trace_loading: true - Enable when debugging write operations or when you need to identify slow loaders
  • trace_transformations: true - Enable temporarily when debugging transformation logic; disable in production for high-throughput pipelines
  • collect_metrics: true - Safe to enable in production; provides valuable throughput data with minimal overhead

Performance Considerations

  1. Use batching processors - Batching reduces I/O overhead significantly compared to immediate export
  2. Disable transformation tracing in production - Pipelines with many transformations create many spans
  3. Configure appropriate batch sizes - Larger batches are more efficient but use more memory
  4. Use curl transport - The async curl transport typically outperforms PSR-18 HTTP clients because it uses curl_multi for non-blocking I/O
  5. Call registerShutdownFunction() - Ensures all pending telemetry is flushed on script termination
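Point 5 matters because batching keeps telemetry in memory until it is exported. The hypothetical sketch below uses PHP's register_shutdown_function(), which runs callbacks once the script finishes or exit() is called, to flush anything still pending. PendingTelemetry is an illustrative class; this is not how Flow PHP implements registerShutdownFunction().

```php
<?php
// Hypothetical sketch: flushing buffered telemetry when the script ends.
// PendingTelemetry is an illustrative name, not Flow PHP's API.

final class PendingTelemetry
{
    /** @var list<string> */
    private array $pending = [];
    /** @var list<string> */
    public array $exported = [];

    public function record(string $item): void
    {
        $this->pending[] = $item;
    }

    public function flush(): void
    {
        // A real exporter would send these over the network here.
        array_push($this->exported, ...$this->pending);
        $this->pending = [];
    }

    public function registerShutdownFlush(): void
    {
        register_shutdown_function([$this, 'flush']);
    }

    public function pendingCount(): int
    {
        return count($this->pending);
    }
}

$telemetry = new PendingTelemetry();
$telemetry->registerShutdownFlush();
$telemetry->record('span: dataframe');
$telemetry->record('metric: rows.processed.total');
// No explicit flush(): the shutdown callback exports both items when the script ends.
```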

Sampling Strategies

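For high-volume pipelines, consider sampling to reduce telemetry volume. The sampler is passed to tracer_provider() as its fourth argument:

use Flow\Telemetry\Tracer\Sampler\AlwaysOnSampler;
use function Flow\Telemetry\DSL\tracer_provider;

// Always sample (default)
$tracerProvider = tracer_provider($processor, $clock, $contextStorage, new AlwaysOnSampler());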
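Only the always-on sampler is shown above. For reference, the common alternative is ratio-based sampling. The hypothetical sketch below keeps a trace when a value derived from its trace ID falls below ratio * 2^32, so every span of one trace gets the same decision. should_sample() is not a Flow PHP API, the approach is similar in spirit to OpenTelemetry's TraceIdRatioBased sampler, and using the last 32 bits of the trace ID is an assumption of this sketch.

```php
<?php
// Hypothetical sketch of a deterministic, trace-ID-based sampling decision.
// should_sample() is an illustrative helper, not a Flow PHP sampler class.

function should_sample(string $traceIdHex, float $ratio): bool
{
    if ($ratio <= 0.0) {
        return false;
    }
    if ($ratio >= 1.0) {
        return true;
    }
    // Use the last 8 hex chars (32 bits) of the 128-bit trace ID.
    $value = hexdec(substr($traceIdHex, -8));
    return $value < $ratio * 0x100000000;
}

$kept = 0;
for ($i = 0; $i < 10000; $i++) {
    $traceId = bin2hex(random_bytes(16)); // 32 hex chars, like a W3C trace ID
    if (should_sample($traceId, 0.1)) {
        $kept++;
    }
}
echo "kept {$kept} of 10000 traces\n"; // around 1000; varies per run
```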

Resource Attributes

Include meaningful resource attributes to identify your service:

use function Flow\Telemetry\DSL\resource;

$resource = resource([
    'service.name' => 'order-processor',
    'service.version' => '2.1.0',
    'service.namespace' => 'ecommerce',
    'deployment.environment' => 'production',
]);

Disabling Telemetry

Telemetry is disabled by default. To keep it off in a particular environment, simply don't call withTelemetry() on the config builder:

use function Flow\ETL\DSL\config_builder;

// No telemetry - uses void providers internally
$config = config_builder()->build();

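Alternatively, configure the providers explicitly with void (no-op) processors:

use function Flow\Telemetry\DSL\{
    logger_provider, meter_provider, resource, telemetry, tracer_provider,
    void_log_processor, void_metric_processor, void_span_processor
};

// $clock and $contextStorage are created as in the Quick Start example
$telemetry = telemetry(
    resource(['service.name' => 'my-service']),
    tracer_provider(void_span_processor(), $clock, $contextStorage),
    meter_provider(void_metric_processor(), $clock),
    logger_provider(void_log_processor(), $clock, $contextStorage),
);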
