This post walks through an example of streaming LLM responses in real time and displaying them on multiple clients simultaneously, using Redis Streams and Pub/Sub to store and publish chunks, and React Query to fetch the stream.
System Design
The system is composed of three main components:
1. Client
- Generates a unique session ID for each new stream, composes the request URL, and persists the session ID client-side.
- Accepts the user prompt and passes the prompt and session ID to the stream generator.
- Retrieves the session ID on refresh or reconnect and refetches the stream.
- Reads and renders the text stream response.
2. Stream Generator
- Generates a key derived from the session ID for the Redis Stream and Pub/Sub channel.
- Communicates with an LLM API to receive streamed chunks, appends new chunks to the specific Redis Stream, and publishes updates to the Pub/Sub channel.
- Notifies the system when the stream starts or ends.
3. Stream Consumer
- Accepts connections from clients and parses the session ID.
- Creates a Redis Consumer Group for each SSE connection, so that every client reads the stream independently.
- Subscribes to the Pub/Sub channel associated with the session ID and pipes new chunks to the client.
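To make the interplay between the three components concrete, here is a minimal in-memory sketch of the pattern, with no Redis involved. InMemoryStream and Cursor are hypothetical names used only for illustration: the array stands in for the Redis Stream, the listener set for the Pub/Sub channel, and each cursor for a per-connection consumer group.

```typescript
// In-memory stand-in for a Redis Stream plus a Pub/Sub channel.
// All names here are hypothetical and only illustrate the pattern.
type Listener = () => void;

class InMemoryStream {
  private entries: string[] = []; // plays the role of the Redis Stream
  private listeners = new Set<Listener>(); // plays the role of the Pub/Sub channel

  // Generator side: append a chunk, then notify subscribers.
  append(chunk: string): void {
    this.entries.push(chunk);
    this.listeners.forEach((notify) => notify());
  }

  // Consumer side: read every entry stored at or after the given position.
  readFrom(position: number): string[] {
    return this.entries.slice(position);
  }

  // Register a listener and return a function that removes it again.
  subscribe(listener: Listener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }
}

// One cursor per client connection, like one consumer group per SSE connection.
class Cursor {
  private stream: InMemoryStream;
  private position = 0;
  readonly received: string[] = [];

  constructor(stream: InMemoryStream) {
    this.stream = stream;
  }

  // Deliver any entries this cursor has not seen yet.
  drain(): void {
    const fresh = this.stream.readFrom(this.position);
    this.position += fresh.length;
    this.received.push(...fresh);
  }

  // Catch up on history first, then follow notifications.
  connect(): () => void {
    this.drain();
    return this.stream.subscribe(() => this.drain());
  }
}

const stream = new InMemoryStream();
stream.append("Hello");

const early = new Cursor(stream);
early.connect();
stream.append(", ");

const late = new Cursor(stream); // e.g. a second tab or a page refresh
late.connect();
stream.append("world");

console.log(early.received.join("")); // Hello, world
console.log(late.received.join("")); // Hello, world
```

Because each cursor tracks its own position, a client that connects late still receives the full history before following live updates, which is the property the per-connection consumer group provides in the real system.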
Sample Implementation
The full code is long and can be implemented in various ways, so this post walks through a sample that focuses on the key parts.
1. Client
To maintain and update the current session's lifecycle, we use a custom hook useSession for session ID generation, synchronization, and regeneration.
// hooks/useSession.ts
import { useRouter } from "next/navigation";
import { useEffect, useState } from "react";
// ...

export const useSession = () => {
  const [sessionId, setSessionId] = useState<string>("");
  const router = useRouter();

  useEffect(() => {
    // effect to check if session id exists and sync it to local state & url
    // if not, generate a new one and sync it to local state & url
    // ...
  }, []);

  const clearSessionId = () => {
    // remove session id from url and localStorage
    // ...
  };

  const generateSessionId = () => {
    // generate new session id, store it in state and localStorage, and update the url
    // ...
  };

  return {
    sessionId,
    generateSessionId,
    clearSessionId,
  };
};
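The lookup order inside the effect can be sketched as a pure function, which keeps it testable outside React. This is only one possible approach: resolveSessionId, KeyValueStore, and the "sessionId" key are hypothetical names, and the precedence (URL first, then storage, then a fresh ID) is an assumption rather than something the original hook confirms.

```typescript
// Minimal storage shape, satisfied by window.localStorage.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// Hypothetical key, used for both the URL parameter and the storage entry.
const SESSION_KEY = "sessionId";

// Returns the session id to use and the URL that reflects it.
// generateId is injected, e.g. () => crypto.randomUUID() in the browser.
function resolveSessionId(
  currentUrl: string,
  store: KeyValueStore,
  generateId: () => string,
): { sessionId: string; url: string } {
  const url = new URL(currentUrl);
  // Assumed precedence: URL parameter, then stored value, then a fresh id.
  const sessionId =
    url.searchParams.get(SESSION_KEY) ||
    store.getItem(SESSION_KEY) ||
    generateId();
  // Sync the chosen id back to both places.
  url.searchParams.set(SESSION_KEY, sessionId);
  store.setItem(SESSION_KEY, sessionId);
  return { sessionId, url: url.toString() };
}
```

Inside the hook, the effect could call this with window.location.href and window.localStorage, pass the returned sessionId to setSessionId, and hand the returned url to router.replace.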
In the main app, the custom useSession hook supplies the current session ID along with the functions that regenerate or clear it.
The useMutation hook from React Query returns a mutate function, which is called on form submission to trigger the server API. useMutation also accepts an onSuccess callback, which we use to connect to the running stream and update the UI state to display streaming or loading indicators.
The useQuery hook, also from React Query, returns the refetch function used to connect to a running stream, and its retry option handles the retry logic.
// app/page.tsx
"use client"; // hooks such as useState require a Client Component

import { useSession } from "@/hooks/useSession";
import { useMutation, useQuery } from "@tanstack/react-query";
import { FormEvent, useRef, useState, useEffect } from "react";
// ...

export default function Home() {
  // retrieve info of current session and functions for updating its lifecycle
  const { sessionId, generateSessionId, clearSessionId } = useSession();
  const [prompt, setPrompt] = useState(""); // state for prompt
  const [response, setResponse] = useState(""); // state for response
  // ...

  // expose a mutate function to be triggered upon user prompt
  // and pass user prompt and session id to the server
  const { mutate } = useMutation({
    mutationFn: async (newSessionId: string) => {
      const res = await fetch("/api/generate-stream", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({ prompt, sessionId: newSessionId }),
      });
      // fetch resolves on HTTP errors too, so fail the mutation explicitly
      if (!res.ok) throw new Error(`generate-stream failed: ${res.status}`);
    },
    onSuccess: () => {
      // connect to running stream
      refetch();
      // update ui for loading/streaming
      // ...
    },
  });

  // expose a function 'refetch' to connect to a running stream
  const { refetch } = useQuery({
    queryKey: ["stream", sessionId],
    queryFn: async () => {
      // fetch logic for stream
      // Accept: "text/event-stream"
      // ...
      const res = await fetch(
        `/api/consume-stream?sessionId=${sessionId}`,
        {
          // a GET request has no body, so request SSE via Accept, not Content-Type
          headers: { Accept: "text/event-stream" },
        }
      );
      // ...
      if (!res.body) throw new Error("Response has no body");
      const reader = res.body
        .pipeThrough(new TextDecoderStream())
        .getReader();
      // use reader to read in chunks of data and update state
      // ...
    },
    refetchOnWindowFocus: false,
    refetchOnMount: false,
    retry(failureCount, error) {
      // retry logic upon failure
      // ...
    },
  });

  // ... helper functions (eg. handle form submission and reset)
  // ... UI
  return <ChatbotUI />
}
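The elided read loop inside queryFn could look roughly like the sketch below. readTextStream and its onUpdate parameter are hypothetical names, and the sketch assumes the body can be treated as plain UTF-8 text that is accumulated as-is; if the server sends structured messages, each chunk would need to be parsed before it is appended.

```typescript
// Reads a byte stream to completion, decoding it as UTF-8 and
// reporting the accumulated text after every chunk.
async function readTextStream(
  body: ReadableStream<Uint8Array>,
  onUpdate: (textSoFar: string) => void,
): Promise<string> {
  const reader = body.pipeThrough(new TextDecoderStream()).getReader();
  let text = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // the server closed the stream
      text += value;
      onUpdate(text); // e.g. setResponse in the component above
    }
  } finally {
    // release the reader whether the loop finished or threw
    reader.releaseLock();
  }
  return text;
}
```

In the component this might be called as readTextStream(res.body, setResponse) once the response and its body have been checked. Returning the full text gives React Query a value to cache for the ["stream", sessionId] key.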
2. Stream Generator
The generate-stream API receives the session ID and prompt from the client and passes the prompt to the LLM API. The response is streamed back in chunks using the streamText function from the AI SDK.
Each chunk is appended to the session's Redis Stream using redis.xadd, and a notification is published via redis.publish. Events may also be published when the stream starts and when it ends, carrying metadata such as the total number of chunks and the stream duration.
// api/generate-stream/route.ts
import { NextRequest } from "next/server";
import { openai } from "@ai-sdk/openai";
import { streamText } from "ai";
import { redis } from "@/utils";
// ...

export async function POST(req: NextRequest) {
  // parse prompt and session id from request
  // ...

  // build streamKey from session id
  const streamKey = `llm:stream:${sessionId}`;

  // publish start of the stream
  await redis.xadd(streamKey, "*", metadataMessage);
  await redis.publish(streamKey, { type: MessageType.METADATA });
  // ...

  // generate llm response
  // ...
  await new Promise((resolve, reject) => {
    const { textStream } = streamText({
      model: openai("gpt-4o"),
      prompt,
      // the AI SDK passes an event object that wraps the error
      onError: ({ error }) => reject(error),
      onFinish: async () => {
        resolve({
          // ...
        });
      },
    });

    // consume the stream in an async function whose failures reject the
    // outer promise (errors thrown inside an async executor would be lost)
    (async () => {
      for await (const chunk of textStream) {
        if (chunk) {
          const chunkMessage: ChunkMessage = {
            type: MessageType.CHUNK,
            content: chunk,
          };
          // write chunk to redis stream
          await redis.xadd(streamKey, "*", chunkMessage);
          // publish to channel
          await redis.publish(streamKey, { type: MessageType.CHUNK });
        }
      }
    })().catch(reject);
  });

  // publish end of the stream
  await redis.xadd(streamKey, "*", metadataMessage);
  await redis.publish(streamKey, { type: MessageType.METADATA });
  // ...
}
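The route above relies on MessageType and ChunkMessage, which the excerpt does not define. One possible shape is sketched below, together with a guess at what a validateMessage helper for the consumer side could do. Only the names MessageType.CHUNK, MessageType.METADATA, and the type and content fields come from the code above; the string values, the MetadataMessage fields, and the validation rules are assumptions.

```typescript
// Discriminator values. The names CHUNK and METADATA come from the route
// above; the string values are assumptions.
const MessageType = {
  CHUNK: "chunk",
  METADATA: "metadata",
} as const;

type ChunkMessage = {
  type: typeof MessageType.CHUNK;
  content: string;
};

// Hypothetical metadata shape: marks where the stream starts and ends.
type MetadataMessage = {
  type: typeof MessageType.METADATA;
  status: "started" | "completed";
  totalChunks?: number;
};

type StreamMessage = ChunkMessage | MetadataMessage;

// Narrow an untyped record (e.g. fields read back from Redis) to a known
// message, or return null if it matches neither shape.
function validateMessage(raw: Record<string, unknown>): StreamMessage | null {
  const { type, content, status } = raw;
  if (type === MessageType.CHUNK && typeof content === "string") {
    return { type: MessageType.CHUNK, content };
  }
  if (
    type === MessageType.METADATA &&
    (status === "started" || status === "completed")
  ) {
    // stream fields may come back as strings, so coerce the count if present
    const totalChunks =
      raw.totalChunks === undefined ? NaN : Number(raw.totalChunks);
    return Number.isFinite(totalChunks)
      ? { type: MessageType.METADATA, status, totalChunks }
      : { type: MessageType.METADATA, status };
  }
  return null;
}
```

Keeping the discriminated union in a shared module lets the generator and the consumer agree on the wire format, and the null return lets the consumer silently skip entries it does not recognise.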
3. Stream Consumer
The consume-stream API parses the session ID from the client and creates a Redis Consumer Group using redis.xgroup.
This API acts as a bridge between Redis and the client by establishing a Server-Sent Events (SSE) connection with the headers "Content-Type": "text/event-stream" and "Connection": "keep-alive".
Chunked messages are read using redis.xreadgroup as part of the consumer group and piped to the client via the ReadableStream API.
// api/consume-stream/route.ts
import { NextRequest } from "next/server";
import { nanoid } from "nanoid";
import { redis } from "@/utils";
// ...

export async function GET(req: NextRequest) {
  // parse session id and build stream key
  const { searchParams } = new URL(req.url);
  const sessionId = searchParams.get("sessionId");
  const streamKey = `llm:stream:${sessionId}`;

  // check if streamKey exists
  const keyExists = await redis.exists(streamKey);
  // ...

  // generate a unique consumer group name for this connection
  const groupName = `sse-group-${nanoid()}`;

  // create redis consumer group, starting from the first entry ("0")
  try {
    await redis.xgroup(streamKey, {
      type: "CREATE",
      group: groupName,
      id: "0",
    });
  } catch (_err) {
    // group creation errors are deliberately ignored in this sample
  }

  // pipe response to client
  const response = new Response(
    new ReadableStream({
      async start(controller) {
        const readStreamMessages = async () => {
          // read messages not yet delivered to this consumer group (">")
          const chunks = (await redis.xreadgroup(
            groupName,
            `consumer-1`,
            streamKey,
            ">"
          )) as StreamData[];

          if (chunks?.length > 0) {
            const [_streamKey, messages] = chunks[0];
            for (const [_messageId, fields] of messages) {
              const rawObj = arrToObj(fields);
              const validatedMessage = validateMessage(rawObj);
              if (validatedMessage) {
                controller.enqueue(json(validatedMessage));
              }
            }
          }
        };

        // catch up on chunks written before this client connected
        await readStreamMessages();

        // subscribe to pub/sub channel
        const subscription = redis.subscribe(streamKey);

        // read new entries whenever the generator publishes an update
        subscription.on("message", async () => {
          await readStreamMessages();
        });

        // handle subscription cancel/abort
        // ...
      },
    }),
    {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache, no-transform",
        Connection: "keep-alive",
      },
    }
  );

  return response;
}
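The route calls two helpers, arrToObj and json, that the excerpt does not show. The sketch below is one guess at what they might do, not the original implementation. It assumes that stream fields arrive as a flat array of alternating keys and values, and that json frames each message as a Server-Sent Events data line.

```typescript
// Convert a flat [key1, value1, key2, value2, ...] array into an object.
// Assumes fields arrive in that alternating layout; a trailing key
// without a value is ignored.
function arrToObj(fields: string[]): Record<string, string> {
  const obj: Record<string, string> = {};
  for (let i = 0; i + 1 < fields.length; i += 2) {
    obj[fields[i]] = fields[i + 1];
  }
  return obj;
}

const encoder = new TextEncoder();

// Serialize a message as one SSE event: a "data:" line followed by a
// blank line. JSON.stringify escapes newlines, so the payload stays on
// a single line as the SSE format requires.
function json(message: unknown): Uint8Array {
  return encoder.encode(`data: ${JSON.stringify(message)}\n\n`);
}
```

With this framing, a client that reads the raw body through fetch rather than EventSource would need to strip the data: prefix and parse the JSON payload before rendering each chunk.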