Docs
Stream Response
Stream Response
Utils to return regular data updates that can be consumed as a stream on client.
Installation
Create lib/stream-response.ts
import { z } from "zod";
export interface StreamStatus {
queue: string[];
started: boolean;
finished: boolean;
error: string | null;
}
/**
* Manages the status updates of the streamed response.
*/
export function createStreamStatus(): StreamStatus {
return {
queue: [],
started: false,
finished: false,
error: null,
};
}
export class Streamer {
private encoder: TextEncoder;
private status: StreamStatus;
private lastYield: Date | null = null;
constructor() {
this.encoder = new TextEncoder();
this.status = createStreamStatus();
}
start() {
this.status.started = true;
}
stop() {
this.status.finished = true;
}
setError(error: unknown) {
this.status.error = String(error);
}
reset() {
this.status = createStreamStatus();
}
canYield() {
if (!this.lastYield) {
this.lastYield = new Date();
}
return new Date().getTime() - this.lastYield.getTime() > 500;
}
yield(data: string, checkIfCanYield = true) {
if (checkIfCanYield && !this.canYield()) {
return;
}
this.lastYield = new Date();
this.status.queue.push(data);
}
getStatus() {
return this.status;
}
getEncoder() {
return this.encoder;
}
getError() {
return this.status.error;
}
emptyQueue() {
this.status.queue = [];
}
getQueue() {
return this.status.queue;
}
encode(data: string | undefined, type: "message" | "error" = "message") {
return this.encoder.encode(createEncoderMessage(data, type));
}
pop() {
return this.encode(this.status.queue.shift());
}
}
/**
* Create this object on top of your route handler to manage the stream.
*
* Usage:
* 1. Set status.started to true when you start streaming.
* 2. Push the updates to status.queue. Check if enough time has passed with "canYield".
* 3. Set status.finished to true when you are done streaming.
* 4. Return "streamResponse(iterator(<object>))" from your route handler.
*/
export const createStreamer = () => ({
encoder: new TextEncoder(),
status: createStreamStatus(),
});
/**
* Get the current status of the stream.
*/
export function isStreaming(streamStatus: StreamStatus) {
return (
streamStatus.queue.length > 0 ||
(!streamStatus.started && !streamStatus.finished) ||
(!streamStatus.finished && streamStatus.started) ||
streamStatus.error
);
}
/**
* Yield the stream updates.
*/
export function createEncoderMessage(
data: string | undefined,
type: "message" | "error" = "message",
) {
if (data) {
return JSON.stringify({
status: type === "message" ? "ok" : "error",
data: data,
});
}
return undefined;
}
/**
* Iterator for the stream. Use this as return with "streamResponse(iterator(<object>))".
*/
export async function* createIteratorFromStreamer(streamer: Streamer) {
while (isStreaming(streamer.getStatus())) {
const error = streamer.getError();
if (error) {
yield streamer.encode(error, "error");
streamer.emptyQueue();
streamer.stop();
}
if (streamer.getQueue().length > 0) {
yield streamer.pop();
}
await new Promise((resolve) => setTimeout(resolve, 200));
}
}
/**
* Convert the iterator to a stream.
*/
export function iteratorToStream(iterator: AsyncGenerator) {
return new ReadableStream({
async pull(controller) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const { value, done } = await iterator.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
});
}
/**
* Use to return the stream from the route handler.
*/
export function streamResponse(iterator: AsyncGenerator) {
return new Response(iteratorToStream(iterator));
}
/**
* Read the streamed response on the client.
*/
export async function readDataStream(
response: Response,
whileReading: (chunck: { data: string }) => void,
): Promise<
| {
ok: true;
}
| {
ok: false;
error: string;
type: "BAD_RESPONSE" | "BAD_BODY" | "CUSTOM";
}
> {
if (!response.ok) {
return {
ok: false,
error: "",
type: "BAD_RESPONSE",
};
}
if (!response.body) {
return {
ok: false,
error: "",
type: "BAD_BODY",
};
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let finishedStreaming = false;
let error = "";
while (!finishedStreaming) {
const { value, done } = await reader.read();
finishedStreaming = done;
const chunk = decoder.decode(value);
if (!chunk) {
continue;
}
try {
const chunkJson = JSON.parse(chunk) as unknown;
const parsedChunk = z
.object({
status: z.union([z.literal("ok"), z.literal("error")]),
data: z.string(),
})
.safeParse(chunkJson);
if (!parsedChunk.success) {
error = "Error while parsing streamed message: wrong JSON format";
finishedStreaming = true;
} else {
if (parsedChunk.data.status === "error") {
finishedStreaming = true;
error = parsedChunk.data.data;
} else {
whileReading({
data: parsedChunk.data.data,
});
}
}
} catch (err) {
error = "Error while parsing streamed message: invalid JSON";
finishedStreaming = true;
}
}
if (error) {
return {
ok: false,
error,
type: "CUSTOM",
};
}
return {
ok: true,
};
}