Consumable pattern

The Consumable pattern is intended for use with the Web Streams API. It lets the consumer tell the producer when a value is no longer needed.

Problem

The Web Streams API doesn't provide a way to track where the data is in a pipeline. For example:

const readable = new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.enqueue(new Uint8Array([4, 5, 6]));
controller.close();
},
});

const transform = new TransformStream({
transform(chunk, controller) {
console.log("transform", chunk);
controller.enqueue(chunk);
},
});

const writable = new WritableStream({
write(chunk) {
console.log("write", chunk);
},
});

await readable.pipeThrough(transform).pipeTo(writable);

In readable, controller.enqueue is a synchronous operation, so the data source can't know where the data goes or when it is finally consumed.

As a result, readable can't enqueue the same Uint8Array multiple times with modified content, even though doing so would reduce memory allocations and GC overhead (and so improve performance): it can't know when the buffer has been consumed and is safe to modify.

Consumable<T>

The Consumable class is a wrapper around a value of type T. It allows the consumer to tell the producer when the value is no longer needed.

import { Consumable } from "@yume-chan/stream-extra";

function useData(data: Consumable<Uint8Array>) {
console.log(data.value); // use the value
data.consume();
}

async function produceData() {
const data = new Consumable(new Uint8Array([1, 2, 3]));
useData(data);
await data.consumed;
}
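
To make the mechanism concrete, here is a minimal sketch of how such a wrapper could work. SketchConsumable, produce, and seen are hypothetical names for illustration only; this is not the actual implementation in @yume-chan/stream-extra, which may differ in its details.

```typescript
// Hypothetical sketch: a value paired with a promise that the consumer resolves.
class SketchConsumable<T> {
    readonly value: T;
    readonly consumed: Promise<void>;
    private readonly resolveConsumed: () => void;

    constructor(value: T) {
        this.value = value;
        // The Promise executor runs synchronously, so `resolve` is assigned
        // before the constructor continues.
        let resolve: () => void = () => {};
        this.consumed = new Promise<void>((r) => {
            resolve = r;
        });
        this.resolveConsumed = resolve;
    }

    // Called by the consumer once it no longer needs `value`.
    consume(): void {
        this.resolveConsumed();
    }
}

// Producer: reuses one buffer and waits for each chunk to be consumed
// before overwriting it.
async function produce(
    send: (chunk: SketchConsumable<Uint8Array>) => void,
): Promise<void> {
    const buffer = new Uint8Array(3);
    for (const first of [1, 4]) {
        buffer.set([first, first + 1, first + 2]);
        const chunk = new SketchConsumable(buffer);
        send(chunk);
        await chunk.consumed; // safe to modify `buffer` after this point
    }
}

// Consumer: records a snapshot of what it saw, then releases the chunk later.
const seen: number[][] = [];
const done = produce((chunk) => {
    seen.push(Array.from(chunk.value));
    setTimeout(() => chunk.consume(), 0);
});
```

Once done settles, seen holds both chunks with their original contents, even though only one Uint8Array was ever allocated.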

Let's use the Consumable pattern to re-write the previous example:

import {
Consumable,
ReadableStream,
TransformStream,
WritableStream,
} from "@yume-chan/stream-extra";

const buffer = new Uint8Array(3);

new ReadableStream({
async start(controller) {
buffer[0] = 1;
buffer[1] = 2;
buffer[2] = 3;

const consumable1 = new Consumable(buffer);
controller.enqueue(consumable1);
await consumable1.consumed;

buffer[0] = 4;
buffer[1] = 5;
buffer[2] = 6;

const consumable2 = new Consumable(buffer);
controller.enqueue(consumable2);
await consumable2.consumed;

controller.close();
},
})
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
console.log(chunk.value);
controller.enqueue(chunk);
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
console.log(chunk.value);
chunk.consume();
},
})
);

Here, we create a Consumable for each enqueue operation and await its consumed promise before modifying the buffer.

This doesn't completely eliminate allocations, but it's still beneficial as long as a Consumable object is smaller than the Uint8Array being enqueued (which is usually the case).

tryConsume

The Consumable#tryConsume method is a helper that consumes the value by calling a callback:

declare class Consumable<T> {
tryConsume<U>(callback: (value: T) => U): U;
}

tryConsume invokes the callback immediately with the inner value and marks the Consumable as consumed when the callback completes (returns or throws). If the callback returns a Promise, tryConsume waits for the Promise to settle and then marks the Consumable as consumed.

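import { Consumable } from "@yume-chan/stream-extra";

// `delay` is not provided by this package; assume any helper that resolves after the given milliseconds
declare function delay(ms: number): Promise<void>;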

function useData(data: Consumable<Uint8Array>) {
data.tryConsume((value) => {
console.log(value);
});
}

function useDataAsync(data: Consumable<Uint8Array>) {
data.tryConsume(async (value) => {
// Can be async
await delay(1000);
console.log(value);
});
}

It returns the result of the callback:

import { Consumable } from "@yume-chan/stream-extra";

const consumable = new Consumable(new Uint8Array([1, 2, 3]));
const result = consumable.tryConsume((value) => {
return 42;
});

console.log(result); // 42
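
The completion rules described above (returns, throws, or settles) can be sketched as a standalone function. tryConsumeSketch, isPromiseLike, and markConsumed are hypothetical names; this only illustrates the described behavior under those assumptions and is not the library's code.

```typescript
// Hypothetical helper: detects Promise-like return values.
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
    return (
        typeof value === "object" &&
        value !== null &&
        typeof (value as { then?: unknown }).then === "function"
    );
}

// Hypothetical sketch of the described semantics. `consume` stands in for
// whatever marks the wrapper as consumed.
function tryConsumeSketch<T, U>(
    value: T,
    consume: () => void,
    callback: (value: T) => U,
): U {
    let result: U;
    try {
        result = callback(value);
    } catch (e) {
        consume(); // the callback threw synchronously
        throw e;
    }
    if (isPromiseLike(result)) {
        // Consume once the Promise settles, whether fulfilled or rejected
        result.then(consume, consume);
    } else {
        consume(); // the callback returned synchronously
    }
    return result; // the callback's result is passed through unchanged
}

const log: string[] = [];
const markConsumed = () => {
    log.push("consumed");
};

const length = tryConsumeSketch(
    new Uint8Array([1, 2, 3]),
    markConsumed,
    (value) => value.length,
);
```

In this sketch a synchronous callback marks the value as consumed before tryConsumeSketch returns, while an async callback delays it until the returned Promise settles.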

Improve debugging experience

Normally, when piping data around streams, Chrome DevTools doesn't track the data flow, so the call stack is always empty:

import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";

new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
debugger; // Here the call stack is empty: it doesn't tell you where the `chunk` comes from
},
})
);

With tryConsume, Consumable synthesizes a call stack using console.createTask:

import {
Consumable,
ReadableStream,
WritableStream,
} from "@yume-chan/stream-extra";

new ReadableStream({
start(controller) {
controller.enqueue(new Consumable(new Uint8Array([1, 2, 3])));
controller.close();
},
}).pipeTo(
new WritableStream({
write(chunk) {
chunk.tryConsume((value) => {
debugger; // The call stack will point to the `new Consumable(...)` call above
});
},
})
);
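
For readers unfamiliar with console.createTask, the sketch below shows the general technique: create a task where the value is produced, then run the consumer's callback inside that task so DevTools can link the two stacks. TracedValue and the createTask wrapper are hypothetical names, and the fallback branch is an assumption for runtimes without console.createTask; how Consumable wires this up internally may differ.

```typescript
// Minimal typing for the part of `console.createTask` used here.
interface Task {
    run<T>(callback: () => T): T;
}

// Use `console.createTask` when available, otherwise call the callback directly.
function createTask(name: string): Task {
    const c = console as unknown as { createTask?: (name: string) => Task };
    if (typeof c.createTask === "function") {
        return c.createTask(name);
    }
    return {
        run<T>(callback: () => T): T {
            return callback();
        },
    };
}

// Hypothetical wrapper: remembers where it was created.
class TracedValue<T> {
    readonly value: T;
    private readonly task: Task;

    constructor(value: T) {
        this.value = value;
        // In supporting DevTools, this records the current call stack
        this.task = createTask("TracedValue");
    }

    use<U>(callback: (value: T) => U): U {
        // Code inside `run` is shown with the creation-time stack attached
        return this.task.run(() => callback(this.value));
    }
}

const traced = new TracedValue(new Uint8Array([1, 2, 3]));
const total = traced.use((value) => value.reduce((a, b) => a + b, 0));
```

Outside a debugger the wrapper behaves like a plain function call, so the pattern costs little when nobody is inspecting the stack.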

MaybeConsumable<T>

The MaybeConsumable<T> type is shorthand for T | Consumable<T>. For convenience, most WritableStreams in Tango accept both T and Consumable<T>:

import {
Consumable,
WritableStream,
MaybeConsumable,
} from "@yume-chan/stream-extra";

declare const stream: WritableStream<MaybeConsumable<Uint8Array>>;

const writer = stream.getWriter();
await writer.write(new Consumable(new Uint8Array([4, 5, 6]))); // works
await writer.write(new Uint8Array([1, 2, 3])); // also works

This also allows a blob stream to be directly piped into them:

declare const blob: File | Blob;

blob.stream().pipeTo(writable);

You might choose to use Consumable<T> when either:

  • You need to re-use the buffer, so you want to track when it's consumed.
  • You want to improve debugging experience by synthesizing a call stack.

tryConsume

The MaybeConsumable namespace also provides a tryConsume helper function. Its value argument can be either T or Consumable<T>.

declare namespace MaybeConsumable {
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R
): R;
}

If the value is a Consumable, it behaves the same as Consumable#tryConsume. If it's not, it just invokes the callback immediately with the value.

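import { Consumable, MaybeConsumable } from "@yume-chan/stream-extra";

// `delay` is not provided by this package; assume any helper that resolves after the given milliseconds
declare function delay(ms: number): Promise<void>;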

MaybeConsumable.tryConsume(new Uint8Array([1, 2, 3]), (value) => {
console.log(value); // Uint8Array [1, 2, 3]
});

MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([4, 5, 6])),
(value) => {
console.log(value); // Uint8Array [4, 5, 6]
}
);

await MaybeConsumable.tryConsume(
new Consumable(new Uint8Array([7, 8, 9])),
async (value) => {
// Can be async
await delay(1000);
console.log(value); // Uint8Array [7, 8, 9]
}
);

WritableStream

The MaybeConsumable.WritableStream<T> class combines WritableStream<T> and tryConsume to help you correctly handle the data.

import { MaybeConsumable } from "@yume-chan/stream-extra";

const writable = new MaybeConsumable.WritableStream<Uint8Array>({
// `write` can be `async`
async write(chunk) {
console.log(chunk); // You can only use `chunk` in `write` callback
},
});

You must not use the chunk outside the write callback, unless you make a copy yourself. It might be overwritten with other data after your write callback returns.

For example, WebSocket's send method copies the data into an internal transmission buffer, so it's safe to call it in write:

import { MaybeConsumable } from "@yume-chan/stream-extra";

declare const socket: WebSocket;

const writable = new MaybeConsumable.WritableStream<Uint8Array>({
write(chunk) {
socket.send(chunk);
},
});
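
If a chunk does need to outlive the write callback, copy it first. The sketch below uses a plain function instead of the stream classes to show why; write, kept, copied, and shared are hypothetical names, and shared plays the role of a producer's reused buffer.

```typescript
// A producer-owned buffer that is reused between writes.
const shared = new Uint8Array([1, 2, 3]);

const kept: Uint8Array[] = []; // stores references (unsafe)
const copied: Uint8Array[] = []; // stores copies (safe)

// Stand-in for a `write` callback that retains its chunks.
function write(chunk: Uint8Array): void {
    kept.push(chunk); // same memory as the producer's buffer
    copied.push(chunk.slice()); // `slice` allocates a new Uint8Array
}

write(shared);

// The producer considers the chunk consumed and overwrites the buffer.
shared.set([4, 5, 6]);
write(shared);

console.log(Array.from(kept[0])); // [4, 5, 6]: the first chunk was overwritten
console.log(Array.from(copied[0])); // [1, 2, 3]: the copy is unaffected
```

Copying reintroduces the allocation that the pattern avoids, so it is best reserved for chunks that really must be retained.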