Как передать ответ LLM из FastAPI в React?

Я хочу транслировать ответ LLM (ollama), используя fastapi и react. Я могу успешно получить ответ от LLM без потоковой передачи, но при попытке потоковой передачи получаю ошибку react. Ответ LLM успешно передается, когда я печатаю каждый фрагмент в fastapi. Я использую @microsoft/fetch-event-source для трансляции ответа.

Я создал репозиторий на GitHub, чтобы помочь вам помочь мне.

Вот код fastapi и react, о котором идет речь:

реагировать

Что-то происходит, что вызывает срабатывание console.info("another error onopen").

import { useState } from "react";
import {
  fetchEventSource,
  EventStreamContentType,
} from "@microsoft/fetch-event-source";

class FatalError extends Error {}

const StreamResponse = ({ input }: { input: string }) => {
  const [answer, setAnswer] = useState("");

  const handleClick = () => {
    fetchEventSource("http://localhost:8000/stream", {
      body: JSON.stringify({ question: input }),
      method: "POST",
      headers: {
        "Content-type": "application/json",
        Accept: "text/event-stream",
      },

      async onopen(response) {
        console.info("onopen");
        console.info(response);

        if (
          response.ok &&
          response.headers.get("content-type") === EventStreamContentType
        ) {
          // setAnswer(response.answer)
          return; // everything's good
        } else if (
          response.status >= 400 &&
          response.status < 500 &&
          response.status !== 429
        ) {
          console.info("fatal error onopen");
          // client-side errors are usually non-retriable:
          // throw new FatalError();
        } else {
          console.info("another error onopen");
          // throw new RetriableError();
        }
      },
      onmessage(msg) {
        // if the server emits an error message, throw an exception
        // so it gets handled by the onerror callback below:
        console.info("onmessage");
        console.info(msg);
        if (msg.event === "FatalError") {
          throw new FatalError(msg.data);
        }
      },
      onclose() {
        console.info("onclose");
        // if the server closes the connection unexpectedly, retry:
        // throw new RetriableError();
      },
      onerror(err) {
        console.info("onerror");
        console.info(err);
        if (err instanceof FatalError) {
          throw err; // rethrow to stop the operation
        } else {
          // do nothing to automatically retry. You can also
          // return a specific retry interval here.
        }
      },
    });
  };

  return (
    <div style = {{ width: "100%" }}>
      <div style = {{ display: "flex", gap: 10, alignItems: "center" }}>
        <button onClick = {handleClick} style = {{ height: 24 }}>
          Submit question with streaming
        </button>
      </div>
      <p>Response</p>
      <div style = {{ border: 1, borderStyle: "solid", height: 100 }}>
        {answer}
      </div>
    </div>
  );
};

export default StreamResponse;

фастапи

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


app = FastAPI()

origins = ['*']

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=['*'],
    allow_headers=['*']
)

ollama = Ollama(
    base_url = "http://localhost:11434",
    model = "llama3"
)

system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If you don't know the answer, say that you "
    "don't know. Use three sentences maximum and keep the "
    "answer concise."
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)

chain = (prompt | ollama | StrOutputParser())


class Question(BaseModel):
    question: str


@app.get('/test')
def read_test():
    return {'hello': 'world'}


@app.post('/nostream')
def no_stream_llm(question: Question):
    answer = chain.invoke({'input': question.question})
    print(answer)
    return {'answer': answer}


def stream_answer(question):
    for chunk in chain.stream(question):
        print(chunk, end='', flush=True)
        yield chunk


@app.post('/stream')
def stream_response_from_llm(question: Question):
    return StreamingResponse(stream_answer(question=question.question), media_type = "text/event-stream")

Возможно, этот ответ окажется для вас полезным.

Chris 03.08.2024 09:30
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Навигация по приложениям React: Исчерпывающее руководство по React Router
Навигация по приложениям React: Исчерпывающее руководство по React Router
React Router стала незаменимой библиотекой для создания одностраничных приложений с навигацией в React. В этой статье блога мы подробно рассмотрим...
Массив зависимостей в React
Массив зависимостей в React
Все о массиве Dependency и его связи с useEffect.
0
1
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я смог разобраться. Рабочий код представлен ниже, его также можно найти в этом репозитории. Код ни в коем случае не идеален, но он должен работать. Вот некоторые из моих наблюдений, пока я пытался разобраться в проблеме:

  1. Независимо от того, используете ли вы EventSource или fetchEventSource (@microsoft/fetch-event-source), оба должны появиться внутри useEffect. Я пытался создать функцию, которая настраивала бы это при нажатии кнопки (аналогично NoStreamResponse.tsx, но это не сработало (не знаю почему).
  2. Поскольку EventSource поддерживает только GET, я попробовал POST с fetchEventSource и заметил, что он всегда повторяет попытку (согласно console.info("retriableerror"). Когда я реализовал GET с fetchEventSource, этот журнал никогда не запускался.
  3. В серверной части, по какой-то причине, которую я еще не определил, оператор yield должен был быть в следующем формате: yield f'data: {chunk}\n\n. Оба \n\n были необходимы для того, чтобы это работало (опять же, не знаю почему).
  4. Мне пришлось включить asyncio.sleep(), чтобы замедлить ответ и его можно было транслировать. Без него поток не захватывается.

Рабочий код

main.py

import asyncio

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


app = FastAPI()

origins = ['*']

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=['*'],
    allow_headers=['*']
)

ollama = Ollama(
    base_url = "http://localhost:11434",
    model = "llama3"
)

system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If you don't know the answer, say that you "
    "don't know. Use three sentences maximum and keep the "
    "answer concise."
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)

chain = (prompt | ollama | StrOutputParser())


class Question(BaseModel):
    question: str


@app.post('/nostream')
def no_stream_llm(question: Question):
    answer = chain.invoke({'input': question.question})
    print(answer)
    return {'answer': answer}


async def stream_answer(question):
    for chunk in chain.stream(question):
        print(chunk, end='', flush=True)
        yield f'data: {chunk}\n\n'
        await asyncio.sleep(0.25)


@app.get('/stream-with-get')
async def stream_response_from_llm_get(question: str):
    return StreamingResponse(stream_answer(question=question), media_type='text/event-stream')


@app.post('/stream-with-post')
async def stream_response_from_llm_post(question: Question):
    return StreamingResponse(stream_answer(question=question.question), media_type='text/event-stream')

App.tsx

import { useState } from "react";
import NoStreamResponse from "./components/NoStreamResponse";
import StreamResponseEventSource from "./components/StreamResponseEventSource";
import StreamResponseFetchEventSourcePost from "./components/StreamResponseFetchEventSourcePost";

function App() {
  const [input] = useState("What color is the sky?");

  return (
    <div style = {{ display: "flex", flexDirection: "column", gap: 50 }}>
      <p>Question: {input}</p>
      <NoStreamResponse input = {input} />
      <StreamResponseEventSource input = {input} />
      <StreamResponseFetchEventSourcePost input = {input} />
    </div>
  );
}

export default App;

NoStreamResponse.tsx

import { useState } from "react";

interface Response {
  answer: string;
}

const NoStreamResponse = ({ input }: { input: string }) => {
  const [answer, setAnswer] = useState("");

  const handleClick = () => {
    const handleResponse = (response: Response) => {
      console.info(response);
      setAnswer(response.answer);
    };

    fetch("http://localhost:8000/nostream", {
      body: JSON.stringify({ question: input }),
      method: "POST",
      headers: { "Content-type": "application/json" },
    })
      .then((response) => response.json())
      .then((response) => handleResponse(response))
      .catch((error) => console.error(error));
  };
  return (
    <div style = {{ width: "100%" }}>
      <div style = {{ display: "flex", gap: 10, alignItems: "center" }}>
        <button onClick = {handleClick} style = {{ height: 24 }}>
          Submit question with no stream
        </button>
      </div>
      <p>Response</p>
      <div style = {{ border: 1, borderStyle: "solid", height: 100 }}>
        {answer}
      </div>
    </div>
  );
};

export default NoStreamResponse;

StreamResponseEventSource.tsx

import { useState, useEffect } from "react";

const StreamResponseEventSource = ({ input }: { input: string }) => {
  const [answer, setAnswer] = useState("");
  const [startStream, setStartStream] = useState(false);

  useEffect(() => {
    if (startStream) {
      setAnswer("");
      const eventSource = new EventSource(
        `http://localhost:8000/stream-with-get?question=${input}`
      );

      eventSource.onmessage = function (event) {
        console.info(event);
        setAnswer((prevAnswer) => prevAnswer + event.data);
      };

      eventSource.onerror = function (err) {
        console.error("EventSource failed.");
        console.error(err);
        eventSource.close();
      };

      return () => {
        setStartStream(false);
        eventSource.close();
      };
    }
  }, [startStream, input]);

  return (
    <div style = {{ width: "100%" }}>
      <div style = {{ display: "flex", gap: 10, alignItems: "center" }}>
        <button onClick = {() => setStartStream(true)} style = {{ height: 24 }}>
          Stream with EventSource
        </button>
      </div>
      <p>Response</p>
      <div style = {{ border: 1, borderStyle: "solid", height: 100 }}>
        {answer}
      </div>
    </div>
  );
};

export default StreamResponseEventSource;

StreamResponseFetchEventSourceGet.tsx

import { useState, useEffect } from "react";
import {
  fetchEventSource,
  EventStreamContentType,
} from "@microsoft/fetch-event-source";

class RetriableError extends Error {}
class FatalError extends Error {}

const StreamResponseFetchEventSourceGet = ({ input }: { input: string }) => {
  const [answer, setAnswer] = useState("");
  const [startStream, setStartStream] = useState(false);

  useEffect(() => {
    if (startStream) {
      setAnswer("");

      fetchEventSource(
        `http://localhost:8000/stream-with-get?question=${input}`,
        {
          async onopen(response) {
            if (
              response.ok &&
              response.headers.get("content-type") === EventStreamContentType
            ) {
              console.info("everytings good");
              return; // everything's good
            } else if (
              response.status >= 400 &&
              response.status < 500 &&
              response.status !== 429
            ) {
              // client-side errors are usually non-retriable:
              throw new FatalError();
            } else {
              console.info("retriableerror");
              // throw new RetriableError();
            }
          },
          onmessage(event) {
            // if the server emits an error message, throw an exception
            // so it gets handled by the onerror callback below:
            if (event.event === "FatalError") {
              throw new FatalError(event.data);
            }
            console.info(event);
            setAnswer((prevMessages) => prevMessages + event.data);
          },
          onclose() {
            // if the server closes the connection unexpectedly, retry:
            console.info("onclose");
            // throw new RetriableError();
          },
          onerror(err) {
            if (err instanceof FatalError) {
              throw err; // rethrow to stop the operation
            } else {
              console.info("onerror");
              // do nothing to automatically retry. You can also
              // return a specific retry interval here.
            }
          },
        }
      );

      return () => {
        setStartStream(false);
      };
    }
  }, [startStream, input]);

  return (
    <div style = {{ width: "100%" }}>
      <div style = {{ display: "flex", gap: 10, alignItems: "center" }}>
        <button onClick = {() => setStartStream(true)} style = {{ height: 24 }}>
          Stream with fetchEventSource (GET)
        </button>
      </div>
      <p>Response</p>
      <div style = {{ border: 1, borderStyle: "solid", height: 100 }}>
        {answer}
      </div>
    </div>
  );
};

export default StreamResponseFetchEventSourceGet;

StreamResponseFetchEventSourcePost.tsx

import { useState, useEffect } from "react";
import {
  fetchEventSource,
  EventStreamContentType,
} from "@microsoft/fetch-event-source";

// class RetriableError extends Error {}
class FatalError extends Error {}

const StreamResponseFetchEventSourcePost = ({ input }: { input: string }) => {
  const [answer, setAnswer] = useState("");
  const [startStream, setStartStream] = useState(false);

  useEffect(() => {
    if (startStream) {
      setAnswer("");

      fetchEventSource("http://localhost:8000/stream-with-post", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({ question: input }),
        async onopen(response) {
          if (
            response.ok &&
            response.headers.get("content-type") === EventStreamContentType
          ) {
            console.info("everything is good");
            return; // everything's good
          } else if (
            response.status >= 400 &&
            response.status < 500 &&
            response.status !== 429
          ) {
            // client-side errors are usually non-retriable:
            throw new FatalError();
          } else {
            // NOTE: This triggers for POST, but not GET. Not sure why
            console.info("retriableerror");
            // throw new RetriableError();
          }
        },
        onmessage(event) {
          // if the server emits an error message, throw an exception
          // so it gets handled by the onerror callback below:
          if (event.event === "FatalError") {
            throw new FatalError(event.data);
          }
          console.info(event);
          setAnswer((prevMessages) => prevMessages + event.data);
        },
        onclose() {
          // if the server closes the connection unexpectedly, retry:
          console.info("onclose");
          // throw new RetriableError();
        },
        onerror(err) {
          if (err instanceof FatalError) {
            throw err; // rethrow to stop the operation
          } else {
            console.info("onerror");
            // do nothing to automatically retry. You can also
            // return a specific retry interval here.
          }
        },
      });

      return () => {
        setStartStream(false);
      };
    }
  }, [startStream, input]);

  return (
    <div style = {{ width: "100%" }}>
      <div style = {{ display: "flex", gap: 10, alignItems: "center" }}>
        <button onClick = {() => setStartStream(true)} style = {{ height: 24 }}>
          Stream with fetchEventSource (POST)
        </button>
      </div>
      <p>Response</p>
      <div style = {{ border: 1, borderStyle: "solid", height: 100 }}>
        {answer}
      </div>
    </div>
  );
};

export default StreamResponseFetchEventSourcePost;

Другие вопросы по теме