•5 min read•조회 0
Node.js에서 AI API 스트리밍 응답 제대로 구현하기
OpenAI, Claude 등 AI API의 스트리밍 응답을 Node.js에서 효율적으로 처리하는 실무 패턴을 알아봅니다. SSE, 백프레셔 처리, 에러 핸들링까지.
정다운
ChatGPT나 Claude API를 사용할 때 스트리밍 응답은 필수입니다. 사용자가 긴 응답을 기다리지 않고 실시간으로 텍스트가 나타나는 경험, 이제 직접 구현해봅시다.
왜 스트리밍인가?
// ❌ 일반 요청: 전체 응답 완료까지 대기 (10-30초)
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: '긴 에세이를 써줘' }],
});
// 사용자는 빈 화면만 보며 기다림...
// ✅ 스트리밍: 첫 토큰이 즉시 표시 (~200ms)
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: '긴 에세이를 써줘' }],
stream: true,
});
// 실시간으로 글자가 나타남!기본 구현: OpenAI SDK 스트리밍
import OpenAI from 'openai';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
async function streamChat(prompt: string) {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [{ role: 'user', content: prompt }],
stream: true,
});
let fullContent = '';
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
fullContent += content;
process.stdout.write(content); // 실시간 출력
}
return fullContent;
}Express + SSE로 프론트엔드에 전달하기
Server-Sent Events(SSE)는 스트리밍에 가장 적합한 방식입니다.
// server.ts
import express from 'express';
import OpenAI from 'openai';
const app = express();
const openai = new OpenAI();
app.get('/api/chat/stream', async (req, res) => {
const prompt = req.query.prompt as string;
// SSE 헤더 설정
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [{ role: 'user', content: prompt }],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
// SSE 형식: "data: {내용}\n\n"
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: '스트림 오류' })}\n\n`);
res.end();
}
});프론트엔드에서 받기
// React에서 SSE 수신
function useChatStream() {
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const startStream = useCallback(async (prompt: string) => {
setContent('');
setIsStreaming(true);
const eventSource = new EventSource(
`/api/chat/stream?prompt=${encodeURIComponent(prompt)}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.done) {
eventSource.close();
setIsStreaming(false);
return;
}
if (data.content) {
setContent((prev) => prev + data.content);
}
};
eventSource.onerror = () => {
eventSource.close();
setIsStreaming(false);
};
}, []);
return { content, isStreaming, startStream };
}실무 패턴 1: 타임아웃과 재시도
AI API는 느릴 수 있습니다. 적절한 타임아웃과 재시도 로직이 필수입니다.
import { setTimeout } from 'timers/promises';
async function streamWithRetry(
prompt: string,
maxRetries = 3,
timeoutMs = 60000
) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
const controller = new AbortController();
const timeoutId = setTimeout(timeoutMs).then(() => controller.abort());
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [{ role: 'user', content: prompt }],
stream: true,
}, {
signal: controller.signal,
});
const chunks: string[] = [];
for await (const chunk of stream) {
chunks.push(chunk.choices[0]?.delta?.content || '');
}
return chunks.join('');
} catch (error) {
if (attempt === maxRetries) throw error;
// 지수 백오프
await setTimeout(Math.pow(2, attempt) * 1000);
console.log(`재시도 ${attempt}/${maxRetries}`);
}
}
}실무 패턴 2: 백프레셔 처리
클라이언트가 느리면 서버 메모리가 폭발할 수 있습니다.
import { Transform } from 'stream';
function createThrottledStream(res: Response, maxBufferSize = 100) {
let buffer: string[] = [];
let draining = false;
const flush = () => {
if (buffer.length === 0 || draining) return;
draining = true;
const chunk = buffer.shift()!;
const canContinue = res.write(`data: ${chunk}\n\n`);
if (canContinue) {
draining = false;
if (buffer.length > 0) setImmediate(flush);
} else {
res.once('drain', () => {
draining = false;
flush();
});
}
};
return {
push(content: string) {
if (buffer.length >= maxBufferSize) {
// 버퍼 초과 시 오래된 데이터 버림 (또는 에러 처리)
console.warn('Buffer overflow, dropping oldest chunk');
buffer.shift();
}
buffer.push(JSON.stringify({ content }));
flush();
},
end() {
buffer.push(JSON.stringify({ done: true }));
flush();
}
};
}실무 패턴 3: 토큰 사용량 추적
스트리밍에서도 토큰 사용량을 추적해야 비용 관리가 됩니다.
interface StreamResult {
content: string;
usage: {
promptTokens: number;
completionTokens: number;
totalTokens: number;
};
}
async function streamWithUsageTracking(prompt: string): Promise<StreamResult> {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [{ role: 'user', content: prompt }],
stream: true,
stream_options: { include_usage: true }, // 중요!
});
let content = '';
let usage = { promptTokens: 0, completionTokens: 0, totalTokens: 0 };
for await (const chunk of stream) {
content += chunk.choices[0]?.delta?.content || '';
// 마지막 청크에 usage 정보가 포함됨
if (chunk.usage) {
usage = {
promptTokens: chunk.usage.prompt_tokens,
completionTokens: chunk.usage.completion_tokens,
totalTokens: chunk.usage.total_tokens,
};
}
}
return { content, usage };
}실무 패턴 4: 중단 처리
사용자가 중간에 취소할 수 있어야 합니다.
app.get('/api/chat/stream', async (req, res) => {
const controller = new AbortController();
// 클라이언트 연결 종료 감지
req.on('close', () => {
console.log('Client disconnected, aborting stream');
controller.abort();
});
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4-turbo',
messages: [{ role: 'user', content: req.query.prompt as string }],
stream: true,
}, {
signal: controller.signal,
});
for await (const chunk of stream) {
if (controller.signal.aborted) break;
// ... 처리
}
} catch (error) {
if (error.name === 'AbortError') {
console.log('Stream aborted gracefully');
return;
}
throw error;
}
});결론
AI API 스트리밍은 단순히 stream: true를 붙이는 것 이상입니다. 실서비스에서는:
- SSE로 프론트엔드 연동 - WebSocket보다 간단하고 충분함
- 타임아웃과 재시도 - AI API는 불안정할 수 있음
- 백프레셔 처리 - 메모리 폭발 방지
- 토큰 사용량 추적 - 비용 관리 필수
- 중단 처리 - 사용자 경험과 리소스 관리
이 패턴들을 적용하면 ChatGPT 같은 부드러운 스트리밍 경험을 직접 구현할 수 있습니다.