Consumable pattern

The Consumable pattern is intended for use with the Web Streams API. It lets the consumer tell the producer when a value is no longer needed.

Problem

The Web Streams API doesn't provide a way to track where a chunk currently is in a pipeline. For example:

const readable = new ReadableStream({
    start(controller) {
        controller.enqueue(new Uint8Array([1, 2, 3]));
        controller.enqueue(new Uint8Array([4, 5, 6]));
        controller.close();
    },
});

const transform = new TransformStream({
    transform(chunk, controller) {
        console.log("transform", chunk);
        controller.enqueue(chunk);
    },
});

const writable = new WritableStream({
    write(chunk) {
        console.log("write", chunk);
    },
});

await readable.pipeThrough(transform).pipeTo(writable);

In readable, controller.enqueue is a synchronous operation, so the data source can't know where the data goes or when it's finally consumed.

As a result, readable can't enqueue the same Uint8Array multiple times with modified content, because it can't know when the buffer has been consumed and is safe to modify. Re-using the buffer would otherwise reduce memory allocations and GC overhead, and so improve performance.
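To make the hazard concrete, here is a small sketch that uses only the standard Web Streams globals. `collectSnapshots` is a hypothetical helper name chosen for this illustration. It re-uses one buffer for two chunks without waiting, so both queued chunks refer to the same memory:

```typescript
// Hypothetical demo: re-use one buffer without knowing when it was consumed.
async function collectSnapshots(): Promise<number[][]> {
    const buffer = new Uint8Array(3);

    const readable = new ReadableStream<Uint8Array>({
        start(controller) {
            buffer.set([1, 2, 3]);
            controller.enqueue(buffer); // queues a reference, not a copy

            // Overwrites the first chunk while it is still waiting in the queue.
            buffer.set([4, 5, 6]);
            controller.enqueue(buffer);

            controller.close();
        },
    });

    const seen: number[][] = [];
    await readable.pipeTo(
        new WritableStream<Uint8Array>({
            write(chunk) {
                // Copy what the consumer actually observes.
                seen.push(Array.from(chunk));
            },
        })
    );
    return seen;
}
```

The consumer observes `[4, 5, 6]` twice: the first payload is lost because nothing told the producer to wait before overwriting it.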

Consumable<T>

The Consumable class is a wrapper around a value of type T. It allows the consumer to tell the producer when the value is no longer needed.

import { Consumable } from "@yume-chan/stream-extra";

function useData(data: Consumable<Uint8Array>) {
    console.log(data.value); // use the value
    data.consume();
}

async function produceData() {
    const data = new Consumable(new Uint8Array([1, 2, 3]));
    useData(data);
    await data.consumed;
}
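To show the mechanics, here is a minimal sketch of how such a wrapper could work: a value paired with a promise that the consumer resolves. `SimpleConsumable` and `demo` are hypothetical names used only for illustration; the real Consumable class in @yume-chan/stream-extra may be implemented differently.

```typescript
// Hypothetical stand-in for illustration; not the library's implementation.
class SimpleConsumable<T> {
    readonly value: T;
    /** Resolves once the consumer has called `consume()`. */
    readonly consumed: Promise<void>;
    private resolve!: () => void;

    constructor(value: T) {
        this.value = value;
        // The executor runs synchronously, so `resolve` is set before use.
        this.consumed = new Promise<void>((resolve) => {
            this.resolve = resolve;
        });
    }

    /** Called by the consumer when it no longer needs `value`. */
    consume(): void {
        this.resolve();
    }
}

async function demo(): Promise<string[]> {
    const log: string[] = [];
    const data = new SimpleConsumable(new Uint8Array([1, 2, 3]));

    // Producer side: react only after the consumer signals completion.
    const producer = data.consumed.then(() => {
        log.push("producer: buffer can be re-used");
    });

    // Consumer side: use the value, then release it.
    log.push("consumer: using value");
    data.consume();

    await producer;
    return log;
}
```

In this sketch the producer's continuation always runs after the consumer's `consume()` call, which is the ordering guarantee the pattern relies on.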

Let's use the Consumable pattern to rewrite the previous example:

import {
    Consumable,
    ReadableStream,
    TransformStream,
    WritableStream,
} from "@yume-chan/stream-extra";

const buffer = new Uint8Array(3);

new ReadableStream({
    // `start` must be async so it can await each `consumed` promise
    async start(controller) {
        buffer[0] = 1;
        buffer[1] = 2;
        buffer[2] = 3;

        const consumable1 = new Consumable(buffer);
        controller.enqueue(consumable1);
        await consumable1.consumed;

        // The first chunk has been consumed, so the buffer can be overwritten
        buffer[0] = 4;
        buffer[1] = 5;
        buffer[2] = 6;

        const consumable2 = new Consumable(buffer);
        controller.enqueue(consumable2);
        await consumable2.consumed;

        controller.close();
    },
})
    .pipeThrough(
        new TransformStream({
            transform(chunk, controller) {
                console.log(chunk.value);
                controller.enqueue(chunk);
            },
        })
    )
    .pipeTo(
        new WritableStream({
            write(chunk) {
                console.log(chunk.value);
                chunk.consume();
            },
        })
    );

Here, we create a Consumable for each enqueue operation and await its consumed promise before modifying the buffer.

This doesn't completely eliminate allocations, but as long as a Consumable object is smaller than the Uint8Array being enqueued (the usual case), it's still beneficial.

tryConsume

The Consumable#tryConsume method is a helper that consumes the value by calling a callback:

declare class Consumable<T> {
    tryConsume<U>(callback: (value: T) => U): U;
}

tryConsume invokes the callback immediately with the inner value and marks the Consumable as consumed when the callback completes (returns or throws). If the callback returns a Promise, tryConsume waits for the Promise to settle before marking the Consumable as consumed.

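import { Consumable } from "@yume-chan/stream-extra";

// `delay` is not imported above; a minimal local helper is assumed here
const delay = (ms: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

function useData(data: Consumable<Uint8Array>) {
    data.tryConsume((value) => {
        console.log(value);
    });
}

function useDataAsync(data: Consumable<Uint8Array>) {
    data.tryConsume(async (value) => {
        // Can be async
        await delay(1000);
        console.log(value);
    });
}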

It returns the result of the callback:

import { Consumable } from "@yume-chan/stream-extra";

const consumable = new Consumable(new Uint8Array([1, 2, 3]));
const result = consumable.tryConsume((value) => {
    return 42;
});

console.log(result); // 42
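The behavior described in this section (run the callback, mark as consumed on return or throw, wait for a returned Promise to settle) can be approximated as follows. This is a sketch written from the description above, not the library's source: `SketchConsumable` is a hypothetical class, and how the real Consumable reports a failed callback to the producer is not covered here.

```typescript
// Hypothetical sketch of the `tryConsume` semantics described above.
class SketchConsumable<T> {
    readonly value: T;
    readonly consumed: Promise<void>;
    private resolve!: () => void;

    constructor(value: T) {
        this.value = value;
        this.consumed = new Promise<void>((resolve) => {
            this.resolve = resolve;
        });
    }

    consume(): void {
        this.resolve();
    }

    tryConsume<U>(callback: (value: T) => U): U {
        let result: U;
        try {
            result = callback(this.value);
        } catch (e) {
            // A synchronous throw also counts as "completed".
            this.consume();
            throw e;
        }

        if (result instanceof Promise) {
            // Async callback: consume once the promise settles, either way.
            // The original promise is returned so the caller sees its outcome.
            result.then(
                () => this.consume(),
                () => this.consume()
            );
        } else {
            this.consume();
        }
        return result;
    }
}
```

Returning the callback's own result keeps `tryConsume` transparent to the caller: a synchronous callback yields a plain value, and an async callback yields the same Promise the callback produced.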

Improve debugging experience

Normally, when piping data through streams, Chrome DevTools doesn't track the data flow, so the call stack is always empty:

import { ReadableStream, WritableStream } from "@yume-chan/stream-extra";

new ReadableStream({
    start(controller) {
        controller.enqueue(new Uint8Array([1, 2, 3]));
        controller.close();
    },
}).pipeTo(
    new WritableStream({
        write(chunk) {
            debugger; // Here the call stack is empty: it doesn't tell you where the `chunk` comes from
        },
    })
);

With tryConsume, Consumable synthesizes a call stack using console.createTask:

import {
    Consumable,
    ReadableStream,
    WritableStream,
} from "@yume-chan/stream-extra";

new ReadableStream({
    start(controller) {
        controller.enqueue(new Consumable(new Uint8Array([1, 2, 3])));
        controller.close();
    },
}).pipeTo(
    new WritableStream({
        write(chunk) {
            chunk.tryConsume((value) => {
                debugger; // The call stack will point to line 9 (`new Consumable(...)`)
            });
        },
    })
);
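For readers unfamiliar with console.createTask, the general technique looks roughly like this: capture a task where the value is created, then run the consumer's callback inside that task so DevTools can link the two stacks. `TaskLike`, `captureTask`, and `TracedValue` are hypothetical names for this sketch. Because console.createTask is a Chrome DevTools API that is not available everywhere, the sketch falls back to calling the callback directly.

```typescript
// Minimal shape of the object returned by `console.createTask`.
interface TaskLike {
    run<T>(callback: () => T): T;
}

// Capture the current call stack if the API exists; otherwise pass through.
function captureTask(name: string): TaskLike {
    const c = console as unknown as {
        createTask?: (name: string) => TaskLike;
    };
    try {
        if (typeof c.createTask === "function") {
            return c.createTask(name);
        }
    } catch {
        // Fall through to the no-op task if the environment rejects the call.
    }
    return { run: (callback) => callback() };
}

// Hypothetical wrapper: remembers where the value was created.
class TracedValue<T> {
    readonly value: T;
    private readonly task: TaskLike;

    constructor(value: T) {
        this.value = value;
        // The stack is captured here, at creation time.
        this.task = captureTask("TracedValue");
    }

    use<U>(callback: (value: T) => U): U {
        // Running inside the task lets DevTools show the creation stack.
        return this.task.run(() => callback(this.value));
    }
}
```

A breakpoint inside the callback passed to `use` would then show the stack captured in the constructor as an async parent, in environments that support the API.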

MaybeConsumable<T>

Most streams actually accept MaybeConsumable<T>, which means they accept both T and Consumable<T>:

import {
    Consumable,
    WritableStream,
    MaybeConsumable,
} from "@yume-chan/stream-extra";

declare const stream: WritableStream<MaybeConsumable<Uint8Array>>;

const writer = stream.getWriter();
await writer.write(new Consumable(new Uint8Array([4, 5, 6]))); // works
await writer.write(new Uint8Array([1, 2, 3])); // also works

This allows a blob stream to be directly piped into a WritableStream:

declare const blob: File | Blob;

blob.stream().pipeTo(writable);

You can choose to use Consumable<T> when either:

  • You need to re-use the buffer, so you want to track when it's consumed.
  • You want to improve debugging experience by synthesizing a call stack.

tryConsume

The MaybeConsumable namespace also provides a tryConsume helper. Its value argument can be either T or Consumable<T>.

declare namespace MaybeConsumable {
    export function tryConsume<T, R>(
        value: T,
        callback: (value: T extends Consumable<infer U> ? U : T) => R
    ): R;
}

If the value is a Consumable, it behaves the same as Consumable#tryConsume. If it's not, it just invokes the callback immediately with the value.

import { Consumable, MaybeConsumable } from "@yume-chan/stream-extra";

// `delay` is not imported above; a minimal local helper is assumed here
const delay = (ms: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

MaybeConsumable.tryConsume(new Uint8Array([1, 2, 3]), (value) => {
    console.log(value); // Uint8Array [1, 2, 3]
});

MaybeConsumable.tryConsume(
    new Consumable(new Uint8Array([4, 5, 6])),
    (value) => {
        console.log(value); // Uint8Array [4, 5, 6]
    }
);

await MaybeConsumable.tryConsume(
    new Consumable(new Uint8Array([7, 8, 9])),
    async (value) => {
        // Can be async
        await delay(1000);
        console.log(value); // Uint8Array [7, 8, 9]
    }
);
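The dispatch rule described above (delegate to the wrapper if there is one, otherwise call the callback directly) can be sketched in a few lines. `MiniConsumable` and `tryConsumeMaybe` are hypothetical names for this illustration, and the wrapper only handles synchronous callbacks to keep the sketch short; the library's own implementation may differ.

```typescript
// Hypothetical minimal wrapper, just enough for the dispatch below.
class MiniConsumable<T> {
    readonly value: T;
    readonly consumed: Promise<void>;
    private resolve!: () => void;

    constructor(value: T) {
        this.value = value;
        this.consumed = new Promise<void>((resolve) => {
            this.resolve = resolve;
        });
    }

    tryConsume<R>(callback: (value: T) => R): R {
        try {
            return callback(this.value);
        } finally {
            // Synchronous callbacks only: mark as consumed on return or throw.
            this.resolve();
        }
    }
}

// Accepts either a plain value or a wrapped one.
function tryConsumeMaybe<T, R>(
    value: T | MiniConsumable<T>,
    callback: (value: T) => R
): R {
    if (value instanceof MiniConsumable) {
        // Wrapped: unwrap, run the callback, and signal the producer.
        return (value as MiniConsumable<T>).tryConsume(callback);
    }
    // Plain value: nobody is waiting, so just run the callback.
    return callback(value as T);
}
```

With this shape, a consumer can be written once against the unwrapped type and still cooperate with producers that want to track consumption.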