생성형 AI 미니 프로젝트(3): Open AI 실시간 응답 구현하기 (feat. SSE)
Skala 과정에서 생성형 AI 대해 새롭게 알게 되었습니다.
생성형 AI 수업 중 진행하게 된 미니 프로젝트 구현 과정을 기술합니다.
Open AI의 응답을 실시간으로 클라이언트에 전달하고 싶어서
Fast API의 StreaminResponse를 사용한 SSE를 통해 해당 기능을 구현했습니다!
실시간 번역 기능 구현하기
openai_router.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from typing import Union
from fastapi import APIRouter, Header, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.database import get_db
from app.services.openai_service import get_streaming_message_from_openai
from app.services.log_script_service import logger
from fastapi.responses import StreamingResponse
from app.dto.message_request import MessageRequest
router = APIRouter()
@router.post(
"/translate",
summary="Generate messages using OpenAI (streaming)",
)
async def get_generated_messages_with_header(
data: MessageRequest,
x_script_id: Union[str, None] = Header(default=None),
db: AsyncSession = Depends(get_db)
):
"""
## Open AI에 번역 요청
- Header: "x-script-id" 포함되어야 함!
- data: MessageRequest
- lang: 언어 코드 (e.g., "en-US", "ko-KR")
- message: 번역할 메시지
"""
await logger(db, data, x_script_id)
return StreamingResponse(
get_streaming_message_from_openai(data),
media_type="text/event-stream"
)
.
.
.
StreamingResponse를 사용해서 SSE를 구현합니다.
openai_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import logging
import os
from fastapi import HTTPException
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_teddynote.messages import stream_response
from app.dto.message_request import MessageRequest
OPENAI_API_KEY=os.getenv("OPENAI_API_KEY")
model = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.5,
openai_api_key=OPENAI_API_KEY,
streaming=True
)
templete='{text}을 {lang}로 번역해주세요. 번역 된 문장만 출력해주세요.'
prompt = PromptTemplate.from_template(templete)
output_parser = StrOutputParser()
chain = prompt | model | output_parser
async def get_streaming_message_from_openai(data: MessageRequest):
try:
# 스트리밍 응답 생성
response = chain.astream({
"text": data.message,
"lang": data.lang
})
async for token in response:
content = f"data: {token}\n\n"
yield content
yield "data: [DONE]\n\n"
except Exception as e:
# 에러 발생 시 스트리밍 방식으로 에러 메시지 반환
logging.error(f"Error occurred: {str(e)}")
raise HTTPException(status_code=500, detail=f"OpenAI Error: {str(e)}")
수업 시간에 배웠던 chain 과 StrOutputParser 을 사용해서 프롬프트와 모델을 잇고, 응답을 예쁘게(원하는 정보만) 받을 수 있게 설정해주었습니다.
streaming 옵션을 True로 설정해서 응답을 스트리밍 방식으로 하도록 해주고 astream 으로 응답을 받아왔습니다.
stream은 동기 방식으로 스트리밍 응답을 주는 함수이고,astream이 비동기 방식으로 스트리밍 응답을 주는 함수이기 때문에astream을 사용했습니다!
aysnc for 을 사용해서 응답이 올 때마다 SSE로 결과를 전송할 수 있도록 해주었습니다.
마치며
이전 STT에 비해 굉장히 싱겁게(?) 끝났지만,,
옵션과 함수를 사용하기 위해 ChatOpenAI 객체를 뜯어봤습니다,,
쪼금 오래걸렸어용,,
참고하시고 행복한 코딩 생활 하세욥 😀
참고 자료
This post is licensed under CC BY 4.0 by the author.