Skip to main content

Web Streams Basics

ReadableStream and WriteableStream are from Web Streams API. They represent a stream of data that can be read from or written to asynchronously.

@yume-chan/stream-extra package

@yume-chan/stream-extra package re-exports the native ReadableStream, WriteableStream, and TransformStream classes from globalThis, with bundled type definitions for them.

This is mainly for TypeScript users, as Web Streams API, being a Web standard, is only available in lib.dom.d.ts. When using Tango in Node.js, we don't want to include the entire lib.dom.d.ts which pollutes the global scope with Web APIs.

However, the type definitions in @yume-chan/stream-extra is not compatible with the ones in lib.dom.d.ts, which is mainly due to Chrome failing to implement the async iterator protocol for ReadableStream. When interoperating with native Web APIs like Blob.stream() or Response.body, explicit type casts are required.

import { ReadableStream } from "@yume-chan/stream-extra";

declare function someApi(source: ReadableStream<Uint8Array>): void;

someApi(new Blob([]).stream()); // Error
someApi(new Blob([]).stream() as unknown as ReadableStream<Uint8Array>); // OK, using the imported `ReadableStream` type
someApi(new Blob([]).stream() as never); // OK

ReadableStream

To create a ReadableStream, you only need to provide a data source:

const stream: ReadableStream<number> = new ReadableStream<number>({
start(controller) {
controller.enqueue(1);
controller.enqueue(2);
controller.enqueue(3);
controller.close();
},
});

In TypeScript, the generic parameter of ReadableStream is the type of the data it produces.

The close method closes the stream, which means no more data can be added to the stream.

enqueue

The enqueue method adds a value to the stream, it can be called anytime, anywhere, as long as you have a reference to the controller object. For example you can do this:

let controller: ReadableStreamDefaultController<number>;
const stream: ReadableStream<number> = new ReadableStream<number>({
start(_controller) {
controller = _controller;
},
});

setTimeout(() => {
controller.enqueue(1);
controller.enqueue(2);
controller.enqueue(3);
controller.close();
}, 1000);

pull and backpressure

However, continuing enqueuing data in start is not a good practice. If the ReadableStream produces data faster than the consumer can consume (or there is no consumer attached), the buffered data will take more and more memory.

The pull method in ReadableStreamSource is called when the internal buffer size is below the high water mark. You can use this method to control the data produce speed:

let value = 0;
const stream: ReadableStream<number> = new ReadableStream<number>({
pull(controller) {
controller.enqueue(value);
value += 1;
if (value > 3) {
controller.close();
}
},
});

The high water mark is the suggested maximum buffer size, enqueuing data above this size will still work. The high water mark is specified when creating the ReadableStream:

const stream: ReadableStream<number> = new ReadableStream<number>(
{
pull(controller) {
controller.enqueue(value);
value += 1;
if (value > 30) {
controller.close();
}
},
},
{ highWaterMark: 10 },
);

In this example, the high water mark is 10, so the pull method will immediately be called 10 times, and then be called again when the consumer consumes some data.

In practice, usually you are converting some other data source to streams. How can you limit the data produce speed depends on the data source.

WriteableStream

WriteableStream is the opposite of ReadableStream, it represents a stream of data that can be written to. To create a WriteableStream, you only need to provide a data sink:

const stream: WriteableStream<number> = new WriteableStream<number>({
write(chunk) {
console.log(chunk);
},
close() {
console.log("closed");
},
});

In TypeScript, the generic parameter of WriteableStream is the type of the data it consumes.

The write callback can be asynchronous, and return when the data is consumed.

The close callback is called when the stream is closed.

Pipe

ReadableStreams can be piped to WriteableStreams:

const readable = new ReadableStream<number>({
start(controller) {
controller.enqueue(1);
controller.enqueue(2);
controller.enqueue(3);
controller.close();
},
});

const writeable = new WriteableStream<number>({
write(chunk) {
console.log(chunk);
},
close() {
console.log("closed");
},
});

await readable.pipeTo(writeable);

The output is:

1
2
3
closed

This is usually how you consume a ReadableStream.

note

Browser's native ReadableStream can only pipeTo the native WriteableStream, this is for performance reasons.

TransformStream

A TransformStream has a WritableStream writable and a ReadableStream readable. When data is written to writable, the transform callback is called, and the transformed data is written to readable.

const transform = new TransformStream<number, string>({
transform(chunk, controller) {
controller.enqueue(chunk.toString());
},
});

const readable = new ReadableStream<number>({
start(controller) {
controller.enqueue(1);
controller.enqueue(2);
controller.enqueue(3);
controller.close();
},
});

const writeable = new WriteableStream<string>({
write(chunk) {
console.log(chunk);
},
close() {
console.log("closed");
},
});

await Promise.all([readable.pipeTo(transform.writable), transform.readable.pipeTo(writeable)]);

The output is:

"1"
"2"
"3"
closed

The transform callback can be asynchronous, and enqueue zero, one or more data to the controller.

To simplify the two pipeTo calls, ReadableStream also has a pipeThrough method:

await readable.pipeThrough(transform).pipeTo(writeable);

In fact, pipeThrough method works for any object that has a WritableStream writable and a ReadableStream readable field, it returns the readable field.

Extra streams

PushReadableStream

@yume-chan/stream-extra package provides a PushReadableStream class, which can simplify the process of converting push style data sources to streams. The enqueue method of PushReadableStreamController returns a Promise that resolves when the data is consumed.

import { PushReadableStream } from "@yume-chan/stream-extra";

const stream = new PushReadableStream<number>(async (controller) => {
await controller.enqueue(1);
await controller.enqueue(2);
await controller.enqueue(3);
controller.close();
});