Consumable pattern
The Consumable pattern is intended to be used with the Web Streams API. It allows the consumer to tell the producer when a value is no longer needed.
Problem
The Web Streams API doesn't provide a way to track where a chunk currently is. For example:
const readable = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.close();
},
});
const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk);
controller.enqueue(chunk);
},
});
const writable = new WritableStream({
write(chunk) {
console.log("write", chunk);
},
});
await readable.pipeThrough(transform).pipeTo(writable);
In readable, controller.enqueue is a synchronous operation, so the data source can't know where the data is going or when it's finally consumed.
Because readable can't know when a buffer has been consumed and is safe to modify, it can't enqueue the same Uint8Array multiple times with modified content. Reusing the buffer would reduce memory allocations and GC overhead, and so improve performance.
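The hazard can be shown without any stream at all. The following is a minimal sketch in which a plain array stands in for a stream's internal queue (the names `queue` and `enqueue` are hypothetical, not part of any API). Because enqueuing only stores a reference, overwriting the buffer before the consumer has read it also changes the chunk that is already queued.

```typescript
// A plain array stands in for a stream's internal queue (hypothetical names).
const queue: Uint8Array[] = [];

function enqueue(chunk: Uint8Array): void {
  // Like `controller.enqueue`, this stores a reference, not a copy.
  queue.push(chunk);
}

const buffer = new Uint8Array(3);

buffer.set([1, 2, 3]);
enqueue(buffer);

// The producer has no signal that the first chunk was consumed,
// so reusing the buffer here silently overwrites it.
buffer.set([4, 5, 6]);
enqueue(buffer);

// Both queued entries are the same object and hold the second payload.
const snapshot = queue.map((chunk) => Array.from(chunk));
console.log(snapshot);
```

This is why the producer needs an explicit "consumed" signal before it can safely reuse a buffer.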
Consumable<T>
The Consumable class is a wrapper around a value of type T. It allows the consumer to tell the producer when the value is no longer needed.
import { Consumable } from "@yume-chan/stream-extra";
function useData(data: Consumable<Uint8Array>) {
console.log(data.value); // use the value
data.consume();
}
async function produceData() {
const data = new Consumable(new Uint8Array([1, 2, 3]));
useData(data);
await data.consumed;
}
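To make the idea concrete, here is a minimal sketch of how such a wrapper could be built on top of a Promise. `SketchConsumable` is a hypothetical stand-in written for illustration; it is not the library's source code, and the real class may differ in detail.

```typescript
// Hypothetical stand-in for illustration; not the library's implementation.
class SketchConsumable<T> {
  readonly value: T;
  // Settles once the consumer calls `consume`.
  readonly consumed: Promise<void>;
  private resolve: () => void;

  constructor(value: T) {
    this.value = value;
    let resolve!: () => void;
    // The executor runs synchronously, so `resolve` is assigned here.
    this.consumed = new Promise<void>((r) => {
      resolve = r;
    });
    this.resolve = resolve;
  }

  consume(): void {
    this.resolve();
  }
}

const events: string[] = [];
const item = new SketchConsumable(new Uint8Array([1, 2, 3]));

// Producer side: continue only after the value has been consumed.
const finished = item.consumed.then(() => {
  events.push("producer may reuse the buffer");
});

// Consumer side: use the value, then signal that it is no longer needed.
events.push(`consumer read ${item.value.length} bytes`);
item.consume();
```

The wrapper itself carries no data; it only pairs the value with a one-shot signal that flows back from the consumer to the producer.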
Let's use the Consumable pattern to rewrite the previous example:
import {
Consumable,
ReadableStream,
TransformStream,
WritableStream,
} from "@yume-chan/stream-extra";
const buffer = new Uint8Array(3);
new ReadableStream({
async start(controller) {
buffer[0] = 1;
buffer[1] = 2;
buffer[2] = 3;
const consumable1 = new Consumable(buffer);
controller.enqueue(consumable1);
await consumable1.consumed;
buffer[0] = 4;
buffer[1] = 5;
buffer[2] = 6;
const consumable2 = new Consumable(buffer);
controller.enqueue(consumable2);
await consumable2.consumed;
controller.close();
},
})
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
console.log(chunk.value);
controller.enqueue(chunk);
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
console.log(chunk.value);
chunk.consume();
},
})
);
Here, we create a Consumable for each enqueue operation, and await its consumed promise before modifying the buffer.
It doesn't completely eliminate allocations, but if a Consumable object is smaller than the Uint8Array you're enqueuing (which is the usual case), it's still beneficial.
tryConsume
The Consumable#tryConsume method is a helper that consumes the value by calling a callback:
declare class Consumable<T> {
tryConsume<U>(callback: (value: T) => U): U;
}
tryConsume invokes the callback immediately with the inner value, and marks the Consumable as consumed when the callback completes (returns or throws). If the callback returns a Promise, tryConsume waits for the Promise to settle and then marks the Consumable as consumed.
JavaScript:
// `delay` is assumed to be a helper that resolves after the given number of milliseconds
function useData(data) {
  data.tryConsume((value) => {
    console.log(value);
  });
}
function useDataAsync(data) {
  data.tryConsume(async (value) => {
    // Can be async
    await delay(1000);
    console.log(value);
  });
}
TypeScript:
import { Consumable } from "@yume-chan/stream-extra";
// `delay` is assumed to be a helper that resolves after the given number of milliseconds
function useData(data: Consumable<Uint8Array>) {
  data.tryConsume((value) => {
    console.log(value);
  });
}
function useDataAsync(data: Consumable<Uint8Array>) {
  data.tryConsume(async (value) => {
    // Can be async
    await delay(1000);
    console.log(value);
  });
}
It returns the result of the callback:
import { Consumable } from "@yume-chan/stream-extra";
const consumable = new Consumable(new Uint8Array([1, 2, 3]));
const result = consumable.tryConsume((value) => {
return 42;
});
console.log(result); // 42
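The rules described for tryConsume (consume after a synchronous return or throw, or after a returned Promise settles) can be sketched as follows. `SketchConsumable`, `isConsumed` and `isPromiseLike` are hypothetical names used only for this illustration; the library's actual implementation may differ.

```typescript
// Hypothetical sketch for illustration; the library's real code may differ.
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { then?: unknown }).then === "function"
  );
}

class SketchConsumable<T> {
  readonly value: T;
  isConsumed = false;

  constructor(value: T) {
    this.value = value;
  }

  consume(): void {
    this.isConsumed = true;
  }

  tryConsume<U>(callback: (value: T) => U): U {
    let result: U;
    try {
      result = callback(this.value);
    } catch (error) {
      // A synchronous throw still means the callback is done with the value.
      this.consume();
      throw error;
    }

    if (isPromiseLike(result)) {
      // Asynchronous callback: consume only after the promise settles.
      const settled = Promise.resolve(result).then(
        (value) => {
          this.consume();
          return value;
        },
        (error: unknown) => {
          this.consume();
          throw error;
        },
      );
      return settled as unknown as U;
    }

    this.consume();
    return result;
  }
}

const syncItem = new SketchConsumable(new Uint8Array([1, 2, 3]));
const byteLength = syncItem.tryConsume((value) => value.length);
console.log(byteLength, syncItem.isConsumed);
```

Tying consumption to the callback's lifetime means the consumer cannot forget to call consume, even on an error path.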
Improve debugging experience
Normally, when piping data around streams, Chrome DevTools doesn't track the data flow, so the call stack is always empty:
import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";
new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
debugger; // Here the call stack is empty: it doesn't tell you where the `chunk` comes from
},
})
);
With tryConsume, Consumable will synthesize a call stack using console.createTask:
import {
Consumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";
new ReadableStream({
start(controller) {
controller.enqueue(new Consumable(new Uint8Array([1, 2, 3])));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
chunk.tryConsume((value) => {
debugger; // The call stack will point to the `new Consumable(...)` call in `start`
});
},
})
);
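The general technique behind this can be sketched as follows: create a task where the value is produced, then run the consumer's callback inside that task so DevTools can link the two call stacks. `console.createTask` is a Chromium-specific API, so this sketch feature-detects it and falls back to calling the callback directly. `TaskLike` and `captureTask` are hypothetical names, and this is only an assumption about how such a wrapper could be written, not the library's code.

```typescript
// Minimal shape of the object returned by `console.createTask`.
interface TaskLike {
  run<T>(callback: () => T): T;
}

// Hypothetical helper: record the current call stack now, replay it later.
function captureTask(name: string): TaskLike {
  const taskConsole = console as unknown as {
    createTask?: (name: string) => TaskLike;
  };
  const task = taskConsole.createTask?.(name);
  if (task) {
    return task;
  }
  // Fallback for environments without the API: just run the callback.
  return {
    run<T>(callback: () => T): T {
      return callback();
    },
  };
}

// Producer side: capture the task where the value is created.
const payload = new Uint8Array([1, 2, 3]);
const task = captureTask("SketchConsumable");

// Consumer side, possibly much later: run the callback inside the task.
const payloadLength = task.run(() => payload.length);
console.log(payloadLength);
```

Where the API is available, a breakpoint inside the callback passed to `run` shows the creation site as an async parent frame; elsewhere the behavior is unchanged.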
MaybeConsumable<T>
The MaybeConsumable<T> type is a shorthand for T | Consumable<T>. For convenience, most WritableStreams in Tango accept both T and Consumable<T>:
import {
Consumable,
WritableStream,
MaybeConsumable,
} from "@yume-chan/stream-extra";
declare const stream: WritableStream<MaybeConsumable<Uint8Array>>;
const writer = stream.getWriter();
await writer.write(new Consumable(new Uint8Array([4, 5, 6]))); // works
await writer.write(new Uint8Array([1, 2, 3])); // also works
This also allows a blob stream to be directly piped into them:
declare const blob: File | Blob;
blob.stream().pipeTo(writable);
You might choose to use Consumable<T> when either:
- You need to re-use the buffer, so you want to track when it's consumed.
- You want to improve debugging experience by synthesizing a call stack.
tryConsume
The MaybeConsumable namespace also provides a helper method tryConsume. The value argument can be either T or Consumable<T>.
declare namespace MaybeConsumable {
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R
): R;
}
If the value is a Consumable, it behaves the same as Consumable#tryConsume. If it's not, it just invokes the callback immediately with the value.
import { Consumable, MaybeConsumable } from "@yume-chan/stream-extra";
MaybeConsumable.tryConsume(new Uint8Array([1, 2, 3]), (value) => {
console.log(value); // Uint8Array [1, 2, 3]
});
MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([4, 5, 6])),
(value) => {
console.log(value); // Uint8Array [4, 5, 6]
}
);
await MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([7, 8, 9])),
async (value) => {
// Can be async
await delay(1000);
console.log(value); // Uint8Array [7, 8, 9]
}
);
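The dispatch described above, unwrapping when the argument is a Consumable and passing it through otherwise, can be sketched with a runtime type check. `SketchConsumable` and `sketchTryConsume` are hypothetical names, and the wrapper is simplified to synchronous callbacks; the library's actual helper may be implemented differently.

```typescript
// Hypothetical, simplified wrapper (synchronous callbacks only).
class SketchConsumable<T> {
  readonly value: T;
  isConsumed = false;

  constructor(value: T) {
    this.value = value;
  }

  tryConsume<U>(callback: (value: T) => U): U {
    try {
      return callback(this.value);
    } finally {
      // Mark as consumed whether the callback returned or threw.
      this.isConsumed = true;
    }
  }
}

// Hypothetical helper: accepts either a raw value or a wrapped one.
function sketchTryConsume<T, R>(
  value: T | SketchConsumable<T>,
  callback: (value: T) => R,
): R {
  if (value instanceof SketchConsumable) {
    // Wrapped: delegate so the wrapper is marked as consumed.
    return (value as SketchConsumable<T>).tryConsume(callback);
  }
  // Raw value: nothing to track, just invoke the callback.
  return callback(value as T);
}

const wrapped = new SketchConsumable(new Uint8Array([4, 5, 6]));
const rawLength = sketchTryConsume(new Uint8Array([1, 2, 3]), (v) => v.length);
const wrappedFirst = sketchTryConsume(wrapped, (v) => v[0]);
console.log(rawLength, wrappedFirst, wrapped.isConsumed);
```

With a helper of this shape, a consumer can be written once and work with both kinds of input.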
WritableStream
The MaybeConsumable.WritableStream<T> class combines WritableStream<T> and tryConsume to help you handle the data correctly.
import { MaybeConsumable } from "@yume-chan/stream-extra";
const writable = new MaybeConsumable.WritableStream<Uint8Array>({
// `write` can be `async`
async write(chunk) {
console.log(chunk); // You can only use `chunk` in `write` callback
},
});
You must not use the chunk outside the write callback unless you make a copy yourself. It might be overwritten with other data after your write callback returns.
For example, WebSocket's send method copies the data into an internal transmission buffer, so it's safe to call it in write:
JavaScript:
import { MaybeConsumable } from "@yume-chan/stream-extra";
const writable = new MaybeConsumable.WritableStream({
  write(chunk) {
    socket.send(chunk);
  },
});
TypeScript:
import { MaybeConsumable } from "@yume-chan/stream-extra";
declare const socket: WebSocket;
const writable = new MaybeConsumable.WritableStream<Uint8Array>({
  write(chunk) {
    socket.send(chunk);
  },
});
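When a chunk does need to outlive the write callback, the copy mentioned earlier can be made with `Uint8Array.prototype.slice`, which allocates a new array with its own backing buffer. The following is a minimal sketch; `retained` and `handleChunk` are hypothetical names, and `handleChunk` stands in for the body of a write callback.

```typescript
// Hypothetical store for chunks that must outlive the `write` callback.
const retained: Uint8Array[] = [];

// Stands in for the body of a `write` callback.
function handleChunk(chunk: Uint8Array): void {
  // `slice` copies the bytes into a new Uint8Array with its own buffer.
  retained.push(chunk.slice());
}

const shared = new Uint8Array([1, 2, 3]);
handleChunk(shared);

// The producer reuses its buffer after the callback has returned.
shared.set([4, 5, 6]);

// The retained copy still holds the original bytes.
const kept = retained.map((chunk) => Array.from(chunk));
console.log(kept);
```

The copy costs one allocation per retained chunk, so it is best reserved for chunks that really must be kept.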