Web Streams Basics
ReadableStream and WritableStream are part of the Web Streams API. They represent streams of data that can be read from or written to asynchronously.
@yume-chan/stream-extra package
The @yume-chan/stream-extra package re-exports the native ReadableStream, WritableStream, and TransformStream classes from globalThis, with bundled type definitions for them.
This is mainly for TypeScript users: the type definitions for the Web Streams API, being a Web standard, are only available in lib.dom.d.ts. When using Tango in Node.js, we don't want to include the entire lib.dom.d.ts, which pollutes the global scope with Web APIs.
However, the type definitions in @yume-chan/stream-extra are not compatible with the ones in lib.dom.d.ts, which is mainly due to Chrome failing to implement the async iterator protocol for ReadableStream. When interoperating with native Web APIs like Blob.stream() or Response.body, explicit type casts are required.
import { ReadableStream } from "@yume-chan/stream-extra";
declare function someApi(source: ReadableStream<Uint8Array>): void;
someApi(new Blob([]).stream()); // Error
someApi(new Blob([]).stream() as unknown as ReadableStream<Uint8Array>); // OK, using the imported `ReadableStream` type
someApi(new Blob([]).stream() as never); // OK
ReadableStream
To create a ReadableStream, you only need to provide a data source:
JavaScript:
const stream = new ReadableStream({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
TypeScript:
const stream: ReadableStream<number> = new ReadableStream<number>({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
In TypeScript, the generic parameter of ReadableStream is the type of the data it produces.
The close method closes the stream, which means no more data can be added to the stream.
enqueue
The enqueue method adds a value to the stream. It can be called at any time and from anywhere, as long as you have a reference to the controller object. For example:
JavaScript:
let controller;
const stream = new ReadableStream({
    start(_controller) {
        controller = _controller;
    },
});
setTimeout(() => {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.enqueue(3);
    controller.close();
}, 1000);
TypeScript:
let controller: ReadableStreamDefaultController<number>;
const stream: ReadableStream<number> = new ReadableStream<number>({
    start(_controller) {
        controller = _controller;
    },
});
setTimeout(() => {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.enqueue(3);
    controller.close();
}, 1000);
pull and backpressure
However, continuously enqueuing data from start is not good practice. If the ReadableStream produces data faster than the consumer can consume it (or there is no consumer attached), the buffered data will take up more and more memory.
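The buffering problem described above can be observed through the controller's desiredSize property, which the Web Streams standard defines as the high water mark minus the amount of data currently queued. The following is a minimal sketch, assuming the global ReadableStream (modern browsers, Node.js 18+) and the default high water mark of 1; recordDesiredSizes is a hypothetical helper name used only for illustration.

```typescript
// Sketch: record controller.desiredSize while enqueuing without any consumer.
// `recordDesiredSizes` is a hypothetical helper, not part of any library.
function recordDesiredSizes(): (number | null)[] {
    const sizes: (number | null)[] = [];
    new ReadableStream<number>({
        // `start` runs synchronously inside the constructor.
        start(controller) {
            // Queue is empty and the default high water mark is 1.
            sizes.push(controller.desiredSize);
            for (let i = 0; i < 3; i += 1) {
                // Nobody is reading, so every chunk stays in the queue.
                controller.enqueue(i);
                sizes.push(controller.desiredSize);
            }
        },
    });
    return sizes;
}

console.log(recordDesiredSizes()); // [1, 0, -1, -2]
```

A negative desiredSize means the queue already holds more than the high water mark. Nothing stops the producer from enqueuing further, which is why the queue can keep growing.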
The pull method in ReadableStreamSource is called whenever the internal buffer size is below the high water mark. You can use this method to control the rate at which data is produced:
JavaScript:
let value = 0;
const stream = new ReadableStream({
    pull(controller) {
        controller.enqueue(value);
        value += 1;
        if (value > 3) {
            controller.close();
        }
    },
});
TypeScript:
let value = 0;
const stream: ReadableStream<number> = new ReadableStream<number>({
    pull(controller) {
        controller.enqueue(value);
        value += 1;
        if (value > 3) {
            controller.close();
        }
    },
});
The high water mark is the suggested maximum buffer size; enqueuing data above this size will still work. The high water mark is specified when creating the ReadableStream:
JavaScript:
let value = 0;
const stream = new ReadableStream(
    {
        pull(controller) {
            controller.enqueue(value);
            value += 1;
            if (value > 30) {
                controller.close();
            }
        },
    },
    { highWaterMark: 10 },
);
TypeScript:
let value = 0;
const stream: ReadableStream<number> = new ReadableStream<number>(
    {
        pull(controller) {
            controller.enqueue(value);
            value += 1;
            if (value > 30) {
                controller.close();
            }
        },
    },
    { highWaterMark: 10 },
);
In this example, the high water mark is 10, so the pull method is immediately called 10 times, and then called again whenever the consumer consumes some data.
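To make the pull behaviour concrete, the following sketch counts how often pull runs before and after a single read. It assumes the global ReadableStream (modern browsers, Node.js 18+); countPulls and tick are hypothetical names introduced for this illustration, and getReader() is the standard way to read chunks manually.

```typescript
// Sketch: count `pull` invocations for a stream with highWaterMark 10.
// `countPulls` is a hypothetical helper, not part of any library.
async function countPulls(): Promise<{ beforeRead: number; afterRead: number }> {
    let pullCount = 0;
    const stream = new ReadableStream<number>(
        {
            pull(controller) {
                // Each call produces exactly one chunk.
                controller.enqueue(pullCount);
                pullCount += 1;
            },
        },
        { highWaterMark: 10 },
    );
    // Wait one macrotask so the stream can finish filling its queue.
    const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 0));

    await tick();
    const beforeRead = pullCount; // queue is full, no consumer attached yet

    const reader = stream.getReader();
    await reader.read(); // frees one slot in the queue
    await tick();
    const afterRead = pullCount; // one more pull to refill the freed slot
    reader.releaseLock();

    return { beforeRead, afterRead };
}

countPulls().then((counts) => console.log(counts)); // { beforeRead: 10, afterRead: 11 }
```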
In practice, you are usually converting some other data source into a stream. How you can limit the rate of data production depends on that data source.
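As one possible illustration, a pull-style source such as an async iterator maps naturally onto the pull method: each pull call asks the source for exactly one more item, so the source never runs ahead of the stream's queue. The sketch below assumes the global ReadableStream; fromAsyncIterable and countTo are hypothetical names for this example and not part of any package.

```typescript
// Sketch: wrap an async iterable so data is only requested when the stream asks for it.
// `fromAsyncIterable` and `countTo` are hypothetical helpers for illustration.
function fromAsyncIterable<T>(iterable: AsyncIterable<T>): ReadableStream<T> {
    const iterator = iterable[Symbol.asyncIterator]();
    return new ReadableStream<T>({
        async pull(controller) {
            // One item per pull; the stream will not call pull again until this resolves.
            const result = await iterator.next();
            if (result.done) {
                controller.close();
            } else {
                controller.enqueue(result.value);
            }
        },
        async cancel() {
            // Let the source clean up if the consumer stops early.
            await iterator.return?.();
        },
    });
}

// Example source: yields 1..limit.
async function* countTo(limit: number): AsyncGenerator<number> {
    for (let i = 1; i <= limit; i += 1) {
        yield i;
    }
}

// Usage: read the first chunk manually.
fromAsyncIterable(countTo(3))
    .getReader()
    .read()
    .then((first) => console.log(first.value)); // 1
```

Push-style sources (events, sockets) cannot be slowed down this way; they typically need an explicit pause or acknowledgement mechanism instead.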
WritableStream
WritableStream is the opposite of ReadableStream: it represents a stream of data that can be written to. To create a WritableStream, you only need to provide a data sink:
JavaScript:
const stream = new WritableStream({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
TypeScript:
const stream: WritableStream<number> = new WritableStream<number>({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
In TypeScript, the generic parameter of WritableStream is the type of the data it consumes.
The write callback can be asynchronous; it should return (or resolve its promise) once the data has been consumed.
The close callback is called when the stream is closed.
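The following sketch shows what an asynchronous write callback implies: the stream waits for each write to finish before starting the next one, and calls close only after all queued writes are done. It assumes the global WritableStream and uses the standard getWriter() method to write chunks manually; runSlowSink is a hypothetical name and the 10 ms delay merely simulates a slow sink.

```typescript
// Sketch: an asynchronous sink whose writes are processed strictly one at a time.
// `runSlowSink` is a hypothetical helper, not part of any library.
async function runSlowSink(): Promise<string[]> {
    const log: string[] = [];
    const stream = new WritableStream<number>({
        async write(chunk) {
            log.push(`start ${chunk}`);
            // Simulate a slow consumer.
            await new Promise<void>((resolve) => setTimeout(resolve, 10));
            log.push(`end ${chunk}`);
        },
        close() {
            log.push("closed");
        },
    });

    const writer = stream.getWriter();
    // Both writes are queued right away, without waiting for the first to finish.
    void writer.write(1);
    void writer.write(2);
    // Resolves after all queued writes and the close callback have completed.
    await writer.close();
    return log;
}

runSlowSink().then((log) => console.log(log));
// ["start 1", "end 1", "start 2", "end 2", "closed"]
```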
Pipe
ReadableStreams can be piped to WritableStreams:
JavaScript:
const readable = new ReadableStream({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
const writeable = new WritableStream({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
await readable.pipeTo(writeable);
TypeScript:
const readable = new ReadableStream<number>({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
const writeable = new WritableStream<number>({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
await readable.pipeTo(writeable);
The output is:
1
2
3
closed
This is usually how you consume a ReadableStream.
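Building on the pipe example above, a small helper can gather everything a stream produces into an array, which is handy in tests. This is only a sketch: collect and makeNumbers are hypothetical names, and the code assumes the global ReadableStream and WritableStream.

```typescript
// Sketch: consume a ReadableStream with pipeTo and collect its chunks.
// `collect` and `makeNumbers` are hypothetical helpers for illustration.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
    const chunks: T[] = [];
    await stream.pipeTo(
        new WritableStream<T>({
            write(chunk) {
                chunks.push(chunk);
            },
        }),
    );
    // pipeTo resolves once the source is closed and every chunk has been written.
    return chunks;
}

function makeNumbers(): ReadableStream<number> {
    return new ReadableStream<number>({
        start(controller) {
            controller.enqueue(1);
            controller.enqueue(2);
            controller.enqueue(3);
            controller.close();
        },
    });
}

collect(makeNumbers()).then((chunks) => console.log(chunks)); // [1, 2, 3]
```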
A browser's native ReadableStream can only pipeTo a native WritableStream; this restriction exists for performance reasons.
TransformStream
A TransformStream has a WritableStream named writable and a ReadableStream named readable. When data is written to writable, the transform callback is called, and the transformed data is written to readable.
JavaScript:
const transform = new TransformStream({
    transform(chunk, controller) {
        controller.enqueue(chunk.toString());
    },
});
const readable = new ReadableStream({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
const writeable = new WritableStream({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
await Promise.all([readable.pipeTo(transform.writable), transform.readable.pipeTo(writeable)]);
TypeScript:
const transform = new TransformStream<number, string>({
    transform(chunk, controller) {
        controller.enqueue(chunk.toString());
    },
});
const readable = new ReadableStream<number>({
    start(controller) {
        controller.enqueue(1);
        controller.enqueue(2);
        controller.enqueue(3);
        controller.close();
    },
});
const writeable = new WritableStream<string>({
    write(chunk) {
        console.log(chunk);
    },
    close() {
        console.log("closed");
    },
});
await Promise.all([readable.pipeTo(transform.writable), transform.readable.pipeTo(writeable)]);
The output is:
"1"
"2"
"3"
closed
The transform callback can be asynchronous, and can enqueue zero, one, or more chunks to the controller.
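As a sketch of a transform callback that enqueues a varying number of chunks, the hypothetical transform below emits each input number n exactly n times, so 0 produces nothing, 1 produces one chunk, and 2 produces two. The names repeatByValue and runRepeat are made up for this example, and the code assumes the global stream classes.

```typescript
// Sketch: a transform that enqueues zero, one, or several chunks per input.
// `repeatByValue` and `runRepeat` are hypothetical helpers for illustration.
function repeatByValue(): TransformStream<number, number> {
    return new TransformStream<number, number>({
        transform(chunk, controller) {
            // Emit the number `chunk` times; 0 emits nothing at all.
            for (let i = 0; i < chunk; i += 1) {
                controller.enqueue(chunk);
            }
        },
    });
}

async function runRepeat(input: number[]): Promise<number[]> {
    const source = new ReadableStream<number>({
        start(controller) {
            for (const item of input) {
                controller.enqueue(item);
            }
            controller.close();
        },
    });
    const output: number[] = [];
    const sink = new WritableStream<number>({
        write(chunk) {
            output.push(chunk);
        },
    });
    const transform = repeatByValue();
    // Same two-pipe pattern as in the example above.
    await Promise.all([source.pipeTo(transform.writable), transform.readable.pipeTo(sink)]);
    return output;
}

runRepeat([0, 1, 2]).then((output) => console.log(output)); // [1, 2, 2]
```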
To simplify the two pipeTo calls, ReadableStream also has a pipeThrough method:
await readable.pipeThrough(transform).pipeTo(writeable);
In fact, the pipeThrough method works with any object that has a WritableStream field named writable and a ReadableStream field named readable. It returns the readable field.
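To illustrate that a plain object is enough, the sketch below passes an object literal with writable and readable fields to pipeThrough and checks that the returned stream is the very same readable. The names makeStringifierPair and pipeThroughPlainObject are hypothetical; the pair here simply borrows both ends of an internal TransformStream.

```typescript
// Sketch: pipeThrough accepts any { writable, readable } pair, not only TransformStream.
// `makeStringifierPair` and `pipeThroughPlainObject` are hypothetical helpers.
function makeStringifierPair(): {
    writable: WritableStream<number>;
    readable: ReadableStream<string>;
} {
    const inner = new TransformStream<number, string>({
        transform(chunk, controller) {
            controller.enqueue(String(chunk));
        },
    });
    // A plain object literal, not a TransformStream instance.
    return { writable: inner.writable, readable: inner.readable };
}

async function pipeThroughPlainObject(): Promise<{
    returnsReadableField: boolean;
    chunks: string[];
}> {
    const source = new ReadableStream<number>({
        start(controller) {
            controller.enqueue(1);
            controller.enqueue(2);
            controller.enqueue(3);
            controller.close();
        },
    });
    const pair = makeStringifierPair();
    const output = source.pipeThrough(pair);

    const chunks: string[] = [];
    await output.pipeTo(
        new WritableStream<string>({
            write(chunk) {
                chunks.push(chunk);
            },
        }),
    );
    return { returnsReadableField: output === pair.readable, chunks };
}

pipeThroughPlainObject().then((outcome) => console.log(outcome));
// { returnsReadableField: true, chunks: ["1", "2", "3"] }
```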
Extra streams
PushReadableStream
The @yume-chan/stream-extra package provides a PushReadableStream class, which simplifies converting push-style data sources to streams. The enqueue method of PushReadableStreamController returns a Promise that resolves when the data is consumed.
JavaScript:
import { PushReadableStream } from "@yume-chan/stream-extra";
const stream = new PushReadableStream(async (controller) => {
    await controller.enqueue(1);
    await controller.enqueue(2);
    await controller.enqueue(3);
    controller.close();
});
TypeScript:
import { PushReadableStream } from "@yume-chan/stream-extra";
const stream = new PushReadableStream<number>(async (controller) => {
    await controller.enqueue(1);
    await controller.enqueue(2);
    await controller.enqueue(3);
    controller.close();
});