Streaming AI pipeline events on Vercel

2023-07-25

Vercel is a free and easy-to-use hosting service for Next.js projects. However, it is no secret that the free plan comes with certain limitations.

When building an AI project, the underlying APIs are often slow, and their latency can easily push a request past the default 10-second timeout.

(Screenshot: the resulting timeout error)

The good news is that Vercel offers a solution: Edge functions, which have a 30-second initial response timeout. By switching to Edge functions and using a streaming API, it is possible to overcome this limitation and improve the user experience at the same time.

Sounds great, right? But what if you have a multi-step processing pipeline where each step takes more than 10 seconds? In that case, the standard token-streaming approach won't be enough, but you can still use generic streams!
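For context, the "standard" approach here means streaming the tokens of a single completion back to the client as they are generated. A rough sketch of what that can look like with LangChain's callback-based streaming (the route path, prompt, and handler below are purely illustrative and not part of the project in this post):

// app/api/v0/stream-tokens/route.ts (illustrative only)

import { NextResponse } from "next/server";
import { OpenAI } from "langchain/llms/openai";

export const runtime = "edge";

export async function GET() {
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      const model = new OpenAI({
        modelName: "gpt-4",
        openAIApiKey: process.env.OPENAI_API_KEY,
        streaming: true,
        callbacks: [
          {
            // Called by LangChain for every new token produced by the model.
            handleLLMNewToken(token: string) {
              controller.enqueue(encoder.encode(token));
            },
          },
        ],
      });

      await model.call("Where do fruits go on vacation?");
      controller.close();
    },
  });

  return new NextResponse(stream, {
    headers: { "Content-Type": "text/plain; charset=utf-8" },
  });
}

This works nicely when there is a single long-running call, but it doesn't map cleanly onto a pipeline of dependent calls, which is what the rest of this post works around.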

Let's dive in! For simplicity, we'll use a single model, single prompt template, and the same chain type for all OpenAI API calls. Let's imagine the calls depend on each other and cannot be run in parallel (which is often the case in multi-step AI pipelines).

The Edge function API endpoint

// app/api/v1/streams/route.ts
 
import { NextResponse } from "next/server";
 
import { OpenAI } from "langchain/llms/openai";
import { LLMChain } from "langchain/chains";
import { PromptTemplate } from "langchain/prompts";
 
export const runtime = "edge";
 
export async function GET() {
  const model = new OpenAI({
    modelName: "gpt-4",
    openAIApiKey: process.env.OPENAI_API_KEY,
  });
 
  const promptTemplate = new PromptTemplate({
    template: "Andswer as a fun Dad. Respons with an answer only. {joke}",
    inputVariables: ["joke"]
  });
 
  const chain = new LLMChain({ llm: model, prompt: promptTemplate });
  const completions: { joke: string, completion: string }[] = [];
 
  const jokes = [
    "Where do fruits go on vacation?",
    "Where do boats go when they're sick?",
    "Why don't I trust stairs?",
    "What concert costs just 45 cents?",
    "A cheeseburger walks into a bar. The bartender says ...",
    "Why did I decide to sell my vacuum cleaner?",
  ];
 
  for (const joke of jokes) {
    const { text: completion } = await chain.call({ joke });
    completions.push({ joke, completion });
  }
 
  return NextResponse.json(completions);
}

And a simple frontend

// app/v1/streams/page.tsx
 
'use client';
 
import { useState, useEffect } from "react";
import Loader from "@/components/Loader";
 
type JokeCompletion = {
  joke: string,
  completion: string,
};
 
export default function StreamsPage() {
  const [loading, setLoading] = useState(true);
  const [completions, setCompletions] = useState<JokeCompletion[]>([]);
 
  useEffect(() => {
    if (!loading) return;
 
    const fetchData = async () => {
      let response;
      try {
        response = await fetch('/api/v1/streams');
        if (!response.ok) {
          const { error } = await response.json();
          throw new Error(error);
        }
 
        if (!response.body) {
          const error = "Empty response returned.";
          throw new Error(error);
        }
      } catch (error) {
        console.error(error);
        return;
      } finally {
        setLoading(false);
      }
 
      const completions: JokeCompletion[] = await response.json();
      setCompletions(completions);
    };
 
    fetchData();
  }, [loading]);
 
  return (
    <div className="flex flex-col justify-center min-h-screen bg-gray-100 dark:bg-gray-800 h-screen">
      <div className="flex flex-col items-center container mx-auto w-1/2">
        {loading &&
          <div className="flex mt-2 w-full justify-center">
            <Loader text="Loading..." />
          </div>
        }
        {completions.map(({ joke, completion }, idx) => (
          <div key={idx} className="mt-2 w-full">
            Q: {joke}
            <br/>
            A: {completion}
          </div>
        ))}
      </div>
    </div>
  );
};

Now, let’s navigate to http://localhost:3000/v1/streams and see how long it takes to process the requests.

Ugh, that timing won’t work well on the Vercel free plan 😥

Let’s introduce the pipeline events stream!

To get started, update the API so it returns a streaming response. The underlying ReadableStream is created with a start(controller) callback, which runs the pipeline and uses the controller to enqueue events and close the stream once the pipeline is done.
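Stripped of the pipeline details, this is just the standard Web Streams API. A minimal, self-contained sketch of the pattern:

// A minimal sketch of the ReadableStream pattern (standard Web Streams API).
const encoder = new TextEncoder();

const stream = new ReadableStream({
  async start(controller) {
    // Push chunks as they become available ...
    controller.enqueue(encoder.encode("first chunk\n"));
    controller.enqueue(encoder.encode("second chunk\n"));
    // ... and close the stream once the work is done.
    controller.close();
  },
});

In our case the chunks are serialized pipeline events, and the work is the sequence of OpenAI calls.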

// app/api/v2/streams/route.ts
 
import { NextResponse } from "next/server";
 
import { OpenAI } from "langchain/llms/openai";
import { LLMChain } from "langchain/chains";
import { PromptTemplate } from "langchain/prompts";
 
export const runtime = "edge";
 
enum PipelineEventType {
  PipelineStarted = "PipelineStarted",
  CompletionAdded = "CompletionAdded",
  PipelineCompleted = "PipelineCompleted",
};
 
type PipelineEventData = {
  joke: string,
  completion: string,
};
 
type PipelineEvent = {
  type: PipelineEventType,
  data?: PipelineEventData,
};
 
const encoder = new TextEncoder();
const encodeEvent = (event: PipelineEvent) => {
  return encoder.encode(JSON.stringify(event) + "\n");
};
 
export async function GET() {
  const model = new OpenAI({
    modelName: "gpt-4",
    openAIApiKey: process.env.OPENAI_API_KEY,
  });
 
  const promptTemplate = new PromptTemplate({
    template: "Andswer as a fun Dad. Respons with an answer only. {joke}",
    inputVariables: ["joke"]
  });
 
  const chain = new LLMChain({ llm: model, prompt: promptTemplate });
 
  const jokes = [
    "Where do fruits go on vacation?",
    "Where do boats go when they're sick?",
    "Why don't I trust stairs?",
    "What concert costs just 45 cents?",
    "A cheeseburger walks into a bar. The bartender says ...",
    "Why did I decide to sell my vacuum cleaner?",
  ];
 
  const pipeline = async (controller: ReadableStreamDefaultController) => {
    controller.enqueue(encodeEvent({ type: PipelineEventType.PipelineStarted }));
 
    for (const joke of jokes) {
      const { text: completion } = await chain.call({ joke });
 
      controller.enqueue(encodeEvent({
        type: PipelineEventType.CompletionAdded,
        data: { joke, completion },
      }));
    }
 
    controller.enqueue(encodeEvent({ type: PipelineEventType.PipelineCompleted }));
  }
 
  const progressStream = new ReadableStream({
    async start(controller) {
      await pipeline(controller);
      controller.close();
    },
  });
 
  return new NextResponse(progressStream, {
    status: 200,
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
      "Transfer-Encoding": "chunked",
    },
  });
}
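Each event is emitted as a single line of JSON (newline-delimited JSON), so the raw stream is easy to inspect outside of the browser. Here is a quick throwaway script for that (the file name, the tsx runner, and Node 18+ with a global fetch are assumptions, not part of the project):

// watch-stream.ts - a throwaway helper, not part of the app.
// Run with something like `npx tsx watch-stream.ts` while the dev server is up.
const response = await fetch("http://localhost:3000/api/v2/streams");
const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Each chunk contains one or more newline-delimited JSON events.
  process.stdout.write(decoder.decode(value, { stream: true }));
}

You should see a {"type":"PipelineStarted"} line arrive almost immediately, one {"type":"CompletionAdded", ...} line per joke as the pipeline progresses, and a final {"type":"PipelineCompleted"} line.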

Now, update the frontend code to read the response in chunks and render each event as soon as it becomes available.

// app/v2/streams/page.tsx
 
'use client';
 
import { useState, useEffect } from "react";
import Loader from "@/components/Loader";
 
enum PipelineEventType {
  PipelineStarted = "PipelineStarted",
  CompletionAdded = "CompletionAdded",
  PipelineCompleted = "PipelineCompleted",
};
 
type PipelineEventData = {
  joke: string,
  completion: string,
};
 
type PipelineEvent = {
  type: PipelineEventType,
  data?: PipelineEventData,
};
 
export default function StreamsPage() {
  const [loading, setLoading] = useState(true);
  const [completed, setCompleted] = useState(false);
  const [completions, setCompletions] = useState<PipelineEventData[]>([]);
 
  useEffect(() => {
    if (!loading) return;
 
    const fetchData = async () => {
      let response;
 
      try {
        response = await fetch('/api/v2/streams');
        if (!response.ok) {
          const { error } = await response.json();
          throw new Error(error);
        }
 
        if (!response.body) {
          const error = "Empty response returned.";
          throw new Error(error);
        }
      } catch (error) {
        console.error(error);
        setLoading(false);
        return;
      }
 
      try {
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let done, value;
        while (!done) {
          ({ done, value } = await reader.read());

          if (done) return;

          // Each chunk is expected to contain whole newline-delimited JSON events.
          const chunkText = decoder.decode(value);
          const events = chunkText
            .split("\n")
            .filter((line) => line !== "")
            .map((line) => JSON.parse(line) as PipelineEvent);
          for (const event of events) {
            processEvent(event);
          }
        }
      } catch (error) {
        console.error(error);
        return;
      }
    };
 
    fetchData();
  }, [loading]);
 
  const processEvent = (event: PipelineEvent) => {
    if (event.type === PipelineEventType.CompletionAdded) {
      setCompletions((prev) => [...prev, event.data!]);
    }

    if (event.type === PipelineEventType.PipelineCompleted) {
      setLoading(false);
      setCompleted(true);
    }
  };
 
  return (
    <div className="flex flex-col justify-center min-h-screen bg-gray-100 dark:bg-gray-800 h-screen">
      <div className="flex flex-col items-center container mx-auto w-1/2">
        {loading &&
          <div className="flex mt-2 w-full justify-center">
            <Loader text="Loading..." />
          </div>
        }
        {completions.map(({ joke, completion }, idx) => (
          <div key={idx} className="mt-2 w-full">
            Q: {joke}
            <br/>
            A: {completion}
          </div>
        ))}
        {completed &&
          <div className="mt-2 w-full text-center">
            🎉 Have fun!
          </div>
        }
      </div>
    </div>
  );
};
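One caveat: the reading loop above assumes every chunk ends exactly on an event boundary. In practice a JSON event can be split across two reads, so a more robust parser keeps a buffer of the unfinished line. A sketch of such a helper, under the same assumptions as the code above (readEvents is a name introduced here purely for illustration):

// A more defensive reading loop: buffer partial lines so a JSON event that is
// split across two chunks is still parsed correctly.
const readEvents = async (
  body: ReadableStream<Uint8Array>,
  onEvent: (event: PipelineEvent) => void,
) => {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    const lines = buffer.split("\n");
    // The last element is either an empty string or an incomplete line - keep it around.
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (line !== "") onEvent(JSON.parse(line) as PipelineEvent);
    }
  }
};

Inside fetchData, the while loop could then be replaced with await readEvents(response.body, processEvent).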

Cool, let’s see how it works 🪄

Great! Not only have we made it work with Vercel Edge functions, but we have also improved the user experience by reducing the time it takes to render the initial response.
