41 lines
1.0 KiB
JavaScript
41 lines
1.0 KiB
JavaScript
/* global ReadableStream TransformStream */
|
|
|
|
export function transformStream(readable, transformer, oncancel) {
|
|
if (typeof TransformStream === 'function') {
|
|
return readable.pipeThrough(new TransformStream(transformer));
|
|
}
|
|
const reader = readable.getReader();
|
|
return new ReadableStream({
|
|
start(controller) {
|
|
if (transformer.start) {
|
|
return transformer.start(controller);
|
|
}
|
|
},
|
|
async pull(controller) {
|
|
let enqueued = false;
|
|
const wrappedController = {
|
|
enqueue(d) {
|
|
enqueued = true;
|
|
controller.enqueue(d);
|
|
}
|
|
};
|
|
while (!enqueued) {
|
|
const data = await reader.read();
|
|
if (data.done) {
|
|
if (transformer.flush) {
|
|
await transformer.flush(controller);
|
|
}
|
|
return controller.close();
|
|
}
|
|
await transformer.transform(data.value, wrappedController);
|
|
}
|
|
},
|
|
cancel(reason) {
|
|
readable.cancel(reason);
|
|
if (oncancel) {
|
|
oncancel(reason);
|
|
}
|
|
}
|
|
});
|
|
}
|