Writing to streams
Send events to streams from Workers using bindings, or from client-side applications and external systems using HTTP endpoints.
Worker bindings provide a secure way to send data to streams from Workers without managing API tokens or credentials.
Add a pipeline binding to your Wrangler file that points to your stream:
```jsonc
{
  "pipelines": [
    {
      "pipeline": "<STREAM_ID>",
      "binding": "STREAM"
    }
  ]
}
```

```toml
[[pipelines]]
pipeline = "<STREAM_ID>"
binding = "STREAM"
```

The pipeline binding exposes a method for sending data to your stream:
`send(records)`: Sends an array of JSON-serializable records to the stream. Returns a `Promise` that resolves when records are confirmed as ingested.
```js
export default {
  async fetch(request, env, ctx) {
    const events = await request.json();

    await env.STREAM.send(events);

    return new Response("Events sent");
  },
};
```

```ts
export default {
  async fetch(request, env, ctx): Promise<Response> {
    const events = await request.json<Record<string, unknown>[]>();

    await env.STREAM.send(events);

    return new Response("Events sent");
  },
} satisfies ExportedHandler<Env>;
```

When a stream has a defined schema, running `wrangler types` generates schema-specific TypeScript types for your pipeline bindings. Instead of the generic `Pipeline<PipelineRecord>`, your bindings get a named record type with full autocomplete and compile-time type checking. Refer to the wrangler types documentation to learn more.
After running `wrangler types`, the generated `worker-configuration.d.ts` file contains a named record type inside the `Cloudflare` namespace. The type name is derived from the stream name (not the binding name), converted to PascalCase with a `Record` suffix.

Below is an example of what generated types look like in `worker-configuration.d.ts` for a stream named `ecommerce_stream`:
```ts
declare namespace Cloudflare {
  type EcommerceStreamRecord = {
    user_id: string;
    event_type: string;
    product_id?: string;
    amount?: number;
  };
  interface Env {
    STREAM: import("cloudflare:pipelines").Pipeline<Cloudflare.EcommerceStreamRecord>;
  }
}
```

`wrangler types` falls back to the generic `Pipeline<PipelineRecord>` type in the following scenarios:
- Not authenticated: Run `wrangler login` to enable typed pipeline bindings.
- Stream not found: The stream ID in your Wrangler configuration does not match an existing stream.
- Unstructured stream: The stream was created without a schema.
Each stream provides an optional HTTP endpoint for ingesting data from external applications, browsers, or any system that can make HTTP requests.
HTTP endpoints follow this format:
```
https://{stream-id}.ingest.cloudflare.com
```

Find your stream's endpoint URL in the Cloudflare dashboard under **Pipelines > Streams** or using the Wrangler CLI:
```sh
npx wrangler pipelines streams get <STREAM_ID>
```

Send events as JSON arrays via POST requests:
```sh
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -d '[
    {
      "user_id": "12345",
      "event_type": "purchase",
      "product_id": "widget-001",
      "amount": 29.99
    }
  ]'
```

When authentication is enabled for your stream, include the API token in the Authorization header:
```sh
curl -X POST https://{stream-id}.ingest.cloudflare.com \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_TOKEN" \
  -d '[{"event": "test"}]'
```

The API token must have Workers Pipeline Send permission. To learn more, refer to the Create API token documentation.
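The same POST request can be assembled from JavaScript or TypeScript. The sketch below builds the method, headers, and JSON body for an authenticated ingest request; the `buildIngestRequest` helper and the token value are illustrative, not part of the Pipelines API, and the endpoint URL is a placeholder you would replace with your stream's endpoint:

```typescript
// Shape of the options object passed to fetch(). Defined locally so the
// sketch is self-contained; a real client could use RequestInit instead.
type IngestRequest = {
  method: "POST";
  headers: Record<string, string>;
  body: string;
};

type StreamEvent = Record<string, unknown>;

// Hypothetical helper: serialize a batch of events as a JSON array and
// attach the Authorization header only when the stream requires a token.
function buildIngestRequest(events: StreamEvent[], apiToken?: string): IngestRequest {
  const headers: Record<string, string> = {
    "Content-Type": "application/json",
  };
  if (apiToken) {
    headers["Authorization"] = `Bearer ${apiToken}`;
  }
  return {
    method: "POST",
    headers,
    body: JSON.stringify(events), // the endpoint expects a JSON array
  };
}

// Usage (endpoint URL is a placeholder):
// await fetch("https://{stream-id}.ingest.cloudflare.com",
//   buildIngestRequest([{ event: "test" }], "YOUR_API_TOKEN"));
```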
Streams handle validation differently based on their configuration:
- Structured streams: Events must match the defined schema fields and types.
- Unstructured streams: Accept any valid JSON structure. Data is stored in a single `value` column.
For structured streams, ensure your events match the schema definition. The ingest request succeeds even when events violate the schema, but the invalid events are silently dropped, so validate your data before sending. When using Worker bindings, run `wrangler types` to generate typed pipeline bindings that catch schema violations at compile time. You can also query the user error metrics to monitor dropped events and diagnose schema validation issues.
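For HTTP clients that cannot rely on `wrangler types`, a lightweight runtime check before sending can catch schema violations. The sketch below mirrors the `ecommerce_stream` schema from the earlier example; the field list and both helper names are assumptions for illustration, not part of the Pipelines API:

```typescript
// Mirrors the example ecommerce_stream schema: user_id and event_type
// are required strings; product_id and amount are optional.
type EcommerceEvent = {
  user_id: string;
  event_type: string;
  product_id?: string;
  amount?: number;
};

// Hypothetical type guard: checks one event against the schema shape.
function isValidEvent(e: unknown): e is EcommerceEvent {
  if (typeof e !== "object" || e === null) return false;
  const r = e as Record<string, unknown>;
  return (
    typeof r.user_id === "string" &&
    typeof r.event_type === "string" &&
    (r.product_id === undefined || typeof r.product_id === "string") &&
    (r.amount === undefined || typeof r.amount === "number")
  );
}

// Split a batch into events safe to send and events to log or repair,
// so schema violations surface client-side instead of being dropped.
function partitionEvents(events: unknown[]): {
  valid: EcommerceEvent[];
  invalid: unknown[];
} {
  const valid: EcommerceEvent[] = [];
  const invalid: unknown[] = [];
  for (const e of events) {
    if (isValidEvent(e)) valid.push(e);
    else invalid.push(e);
  }
  return { valid, invalid };
}
```

Only the `valid` batch would be sent to the stream; the `invalid` batch can be logged alongside the user error metrics mentioned above to diagnose where bad events originate.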