Metrics and analytics
Pipelines expose metrics that allow you to measure data ingested, processed, and delivered to sinks.
The metrics displayed in the Cloudflare dashboard are queried from Cloudflare's GraphQL Analytics API. You can also access the metrics programmatically via any GraphQL or HTTP client.
Pipelines expose the following metrics in the pipelinesOperatorAdaptiveGroups dataset. These metrics track data read and processed by pipeline operators.
| Metric | GraphQL Field Name | Description |
|---|---|---|
| Bytes In | bytesIn | Total number of bytes read by the pipeline (filter by streamId_neq: "" to get data read from streams) |
| Records In | recordsIn | Total number of records read by the pipeline (filter by streamId_neq: "" to get data read from streams) |
| Decode Errors | decodeErrors | Number of messages that could not be deserialized according to the stream's schema |
For a detailed breakdown of why events were dropped (including specific error types like missing_field, type_mismatch, parse_failure, and null_value), refer to User error metrics.
The pipelinesOperatorAdaptiveGroups dataset provides the following dimensions for filtering and grouping queries:
- pipelineId - ID of the pipeline
- streamId - ID of the source stream
- datetime - Timestamp of the operation
- date - Timestamp of the operation, truncated to the start of a day
- datetimeHour - Timestamp of the operation, truncated to the start of an hour
Pipelines expose the following metrics in the pipelinesSinkAdaptiveGroups dataset. These metrics track data delivered to sinks.
| Metric | GraphQL Field Name | Description |
|---|---|---|
| Bytes Written | bytesWritten | Total number of bytes written to the sink, after compression |
| Records Written | recordsWritten | Total number of records written to the sink |
| Files Written | filesWritten | Number of files written to the sink |
| Row Groups Written | rowGroupsWritten | Number of row groups written (for Parquet files) |
| Uncompressed Bytes Written | uncompressedBytesWritten | Total number of bytes written before compression |
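The ratio of uncompressed to compressed bytes tells you how well your data compresses on the way to the sink. A minimal sketch of deriving that ratio client-side, assuming a metrics object shaped like the sum fields above (the numeric values here are hypothetical, not real account data):

```python
# Sketch: derive a compression ratio from sink metrics.
# The dict mirrors the pipelinesSinkAdaptiveGroups sum fields above;
# the sample numbers are hypothetical.

def compression_ratio(sink_sum: dict) -> float:
    """Return uncompressed/compressed bytes, or 0.0 if nothing was written."""
    compressed = sink_sum.get("bytesWritten", 0)
    if compressed == 0:
        return 0.0
    return sink_sum.get("uncompressedBytesWritten", 0) / compressed

sample = {
    "bytesWritten": 25_000_000,
    "uncompressedBytesWritten": 100_000_000,
    "recordsWritten": 500_000,
    "filesWritten": 12,
    "rowGroupsWritten": 12,
}

print(f"compression ratio: {compression_ratio(sample):.1f}x")  # 4.0x
```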
The pipelinesSinkAdaptiveGroups dataset provides the following dimensions for filtering and grouping queries:
- pipelineId - ID of the pipeline
- sinkId - ID of the destination sink
- datetime - Timestamp of the operation
- date - Timestamp of the operation, truncated to the start of a day
- datetimeHour - Timestamp of the operation, truncated to the start of an hour
Pipelines track events that are dropped during processing due to deserialization errors. When a structured stream receives events that do not match its defined schema, those events are accepted during ingestion but dropped during processing. The pipelinesUserErrorsAdaptiveGroups dataset provides visibility into these dropped events, telling you which events were dropped and why. You can explore the full schema of this dataset using GraphQL introspection.
| Metric | GraphQL Field Name | Description |
|---|---|---|
| Count | count | Number of events that failed validation |
The pipelinesUserErrorsAdaptiveGroups dataset provides the following dimensions for filtering and grouping queries:
- pipelineId - ID of the pipeline
- errorFamily - Category of the error (for example, deserialization)
- errorType - Specific error type within the family
- date - Date of the error, truncated to the start of a day
- datetime - Timestamp of the error
- datetimeHour - Timestamp of the error, truncated to the start of an hour
- datetimeMinute - Timestamp of the error, truncated to the start of a minute
| Error family | Error type | Description |
|---|---|---|
| deserialization | missing_field | A required field defined in the stream schema was not present in the event |
| deserialization | type_mismatch | A field value did not match the expected type in the schema (for example, string sent where number expected) |
| deserialization | parse_failure | The event could not be parsed as valid JSON, or a field value could not be parsed into the expected type |
| deserialization | null_value | A required field was present but had a null value |
Per-pipeline analytics are available in the Cloudflare dashboard. To view current and historical metrics for a pipeline:
- Log in to the Cloudflare dashboard and select your account.
- Go to Pipelines > Pipelines.
- Select a pipeline.
- Go to the Metrics tab to view its metrics, or the Errors tab to view dropped events.
You can optionally select a time window to query. This defaults to the last 24 hours.
You can programmatically query analytics for your pipelines via the GraphQL Analytics API. This API queries the same datasets as the Cloudflare dashboard and supports GraphQL introspection.
Pipelines GraphQL datasets require an accountTag filter with your Cloudflare account ID.
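Any HTTP client can run these queries by POSTing them to the GraphQL Analytics API endpoint with an API token that has Analytics read permission. A minimal Python sketch; the helper name and the placeholder token, account ID, and pipeline ID values are illustrative:

```python
import json
import urllib.request

GRAPHQL_ENDPOINT = "https://api.cloudflare.com/client/v4/graphql"

def build_request(api_token: str, query: str, variables: dict) -> urllib.request.Request:
    """Build an authenticated POST request for the GraphQL Analytics API."""
    payload = json.dumps({"query": query, "variables": variables}).encode()
    return urllib.request.Request(
        GRAPHQL_ENDPOINT,
        data=payload,
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
    )

# Every pipelines dataset query must filter on accountTag (your account ID).
variables = {
    "accountTag": "<ACCOUNT_ID>",    # placeholder
    "pipelineId": "<PIPELINE_ID>",   # placeholder
    "datetimeStart": "2026-02-19T00:00:00Z",
    "datetimeEnd": "2026-02-20T00:00:00Z",
}
req = build_request("<API_TOKEN>", "query { viewer { __typename } }", variables)
# urllib.request.urlopen(req) would execute the query.
```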
This query returns the total bytes and records read by a pipeline from streams, along with any decode errors.
```graphql
query PipelineOperatorMetrics(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesOperatorAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          streamId_neq: ""
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesIn
          recordsIn
          decodeErrors
        }
      }
    }
  }
}
```

This query returns detailed metrics about data written to a specific sink, including file and compression statistics.
```graphql
query PipelineSinkMetrics(
  $accountTag: String!
  $pipelineId: String!
  $sinkId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesSinkAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          sinkId: $sinkId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          bytesWritten
          recordsWritten
          filesWritten
          rowGroupsWritten
          uncompressedBytesWritten
        }
      }
    }
  }
}
```

This query returns a summary of events that were dropped due to schema validation failures, grouped by error type and ordered by frequency.
```graphql
query GetPipelineUserErrors(
  $accountTag: String!
  $pipelineId: String!
  $datetimeStart: Time!
  $datetimeEnd: Time!
) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesUserErrorsAdaptiveGroups(
        limit: 100
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
        orderBy: [count_DESC]
      ) {
        count
        dimensions {
          date
          errorFamily
          errorType
        }
      }
    }
  }
}
```

Example response:
```json
{
  "data": {
    "viewer": {
      "accounts": [
        {
          "pipelinesUserErrorsAdaptiveGroups": [
            {
              "count": 679,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "missing_field"
              }
            },
            {
              "count": 392,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "type_mismatch"
              }
            },
            {
              "count": 363,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "parse_failure"
              }
            },
            {
              "count": 44,
              "dimensions": {
                "date": "2026-02-19",
                "errorFamily": "deserialization",
                "errorType": "null_value"
              }
            }
          ]
        }
      ]
    }
  },
  "errors": null
}
```

You can filter by a specific error type by adding errorType to the filter:
```graphql
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    pipelineId: $pipelineId
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
    errorType: "type_mismatch"
  }
  orderBy: [count_DESC]
)
```

To query errors across all pipelines on an account, omit the pipelineId filter and include pipelineId in the dimensions:
```graphql
pipelinesUserErrorsAdaptiveGroups(
  limit: 100
  filter: {
    datetime_geq: $datetimeStart
    datetime_leq: $datetimeEnd
  }
  orderBy: [count_DESC]
) {
  count
  dimensions {
    pipelineId
    errorFamily
    errorType
  }
}
```
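To act on these results programmatically, you can aggregate the returned groups client-side. A minimal sketch, assuming a response shaped like the example above (the counts below are copied from that hypothetical payload):

```python
from collections import Counter

# Groups as returned by pipelinesUserErrorsAdaptiveGroups; counts are
# taken from the hypothetical example response above.
groups = [
    {"count": 679, "dimensions": {"errorFamily": "deserialization", "errorType": "missing_field"}},
    {"count": 392, "dimensions": {"errorFamily": "deserialization", "errorType": "type_mismatch"}},
    {"count": 363, "dimensions": {"errorFamily": "deserialization", "errorType": "parse_failure"}},
    {"count": 44, "dimensions": {"errorFamily": "deserialization", "errorType": "null_value"}},
]

# Total dropped events per error type, most frequent first.
totals = Counter()
for group in groups:
    totals[group["dimensions"]["errorType"]] += group["count"]

total_dropped = sum(totals.values())
print(f"{total_dropped} events dropped")  # 1478 events dropped
for error_type, count in totals.most_common():
    print(f"  {error_type}: {count} ({count / total_dropped:.0%})")
```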