5 min read조회 0

Node.js에서 AI API 스트리밍 응답 제대로 구현하기

OpenAI, Claude 등 AI API의 스트리밍 응답을 Node.js에서 효율적으로 처리하는 실무 패턴을 알아봅니다. SSE, 백프레셔 처리, 에러 핸들링까지.

정다운

ChatGPT나 Claude API를 사용할 때 스트리밍 응답은 필수입니다. 사용자가 긴 응답을 기다리지 않고 실시간으로 텍스트가 나타나는 경험, 이제 직접 구현해봅시다.

왜 스트리밍인가?

// ❌ 일반 요청: 전체 응답 완료까지 대기 (10-30초)
const response = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: [{ role: 'user', content: '긴 에세이를 써줘' }],
});
// 사용자는 빈 화면만 보며 기다림...
 
// ✅ 스트리밍: 첫 토큰이 즉시 표시 (~200ms)
const stream = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: [{ role: 'user', content: '긴 에세이를 써줘' }],
  stream: true,
});
// 실시간으로 글자가 나타남!

기본 구현: OpenAI SDK 스트리밍

import OpenAI from 'openai';
 
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
 
async function streamChat(prompt: string) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });
 
  let fullContent = '';
  
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    fullContent += content;
    process.stdout.write(content); // 실시간 출력
  }
 
  return fullContent;
}

Express + SSE로 프론트엔드에 전달하기

Server-Sent Events(SSE)는 스트리밍에 가장 적합한 방식입니다.

// server.ts
import express from 'express';
import OpenAI from 'openai';
 
const app = express();
const openai = new OpenAI();
 
app.get('/api/chat/stream', async (req, res) => {
  const prompt = req.query.prompt as string;
  
  // SSE 헤더 설정
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();
 
  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4-turbo',
      messages: [{ role: 'user', content: prompt }],
      stream: true,
    });
 
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        // SSE 형식: "data: {내용}\n\n"
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }
 
    res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: '스트림 오류' })}\n\n`);
    res.end();
  }
});

프론트엔드에서 받기

// React에서 SSE 수신
function useChatStream() {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
 
  const startStream = useCallback(async (prompt: string) => {
    setContent('');
    setIsStreaming(true);
 
    const eventSource = new EventSource(
      `/api/chat/stream?prompt=${encodeURIComponent(prompt)}`
    );
 
    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      
      if (data.done) {
        eventSource.close();
        setIsStreaming(false);
        return;
      }
      
      if (data.content) {
        setContent((prev) => prev + data.content);
      }
    };
 
    eventSource.onerror = () => {
      eventSource.close();
      setIsStreaming(false);
    };
  }, []);
 
  return { content, isStreaming, startStream };
}

실무 패턴 1: 타임아웃과 재시도

AI API는 느릴 수 있습니다. 적절한 타임아웃과 재시도 로직이 필수입니다.

import { setTimeout } from 'timers/promises';
 
async function streamWithRetry(
  prompt: string,
  maxRetries = 3,
  timeoutMs = 60000
) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    const controller = new AbortController();
    const timeoutId = setTimeout(timeoutMs).then(() => controller.abort());
 
    try {
      const stream = await openai.chat.completions.create({
        model: 'gpt-4-turbo',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
      }, {
        signal: controller.signal,
      });
 
      const chunks: string[] = [];
      for await (const chunk of stream) {
        chunks.push(chunk.choices[0]?.delta?.content || '');
      }
 
      return chunks.join('');
    } catch (error) {
      if (attempt === maxRetries) throw error;
      
      // 지수 백오프
      await setTimeout(Math.pow(2, attempt) * 1000);
      console.log(`재시도 ${attempt}/${maxRetries}`);
    }
  }
}

실무 패턴 2: 백프레셔 처리

클라이언트가 느리면 서버 메모리가 폭발할 수 있습니다.

import { Transform } from 'stream';
 
function createThrottledStream(res: Response, maxBufferSize = 100) {
  let buffer: string[] = [];
  let draining = false;
 
  const flush = () => {
    if (buffer.length === 0 || draining) return;
    
    draining = true;
    const chunk = buffer.shift()!;
    
    const canContinue = res.write(`data: ${chunk}\n\n`);
    
    if (canContinue) {
      draining = false;
      if (buffer.length > 0) setImmediate(flush);
    } else {
      res.once('drain', () => {
        draining = false;
        flush();
      });
    }
  };
 
  return {
    push(content: string) {
      if (buffer.length >= maxBufferSize) {
        // 버퍼 초과 시 오래된 데이터 버림 (또는 에러 처리)
        console.warn('Buffer overflow, dropping oldest chunk');
        buffer.shift();
      }
      buffer.push(JSON.stringify({ content }));
      flush();
    },
    end() {
      buffer.push(JSON.stringify({ done: true }));
      flush();
    }
  };
}

실무 패턴 3: 토큰 사용량 추적

스트리밍에서도 토큰 사용량을 추적해야 비용 관리가 됩니다.

interface StreamResult {
  content: string;
  usage: {
    promptTokens: number;
    completionTokens: number;
    totalTokens: number;
  };
}
 
async function streamWithUsageTracking(prompt: string): Promise<StreamResult> {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4-turbo',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
    stream_options: { include_usage: true }, // 중요!
  });
 
  let content = '';
  let usage = { promptTokens: 0, completionTokens: 0, totalTokens: 0 };
 
  for await (const chunk of stream) {
    content += chunk.choices[0]?.delta?.content || '';
    
    // 마지막 청크에 usage 정보가 포함됨
    if (chunk.usage) {
      usage = {
        promptTokens: chunk.usage.prompt_tokens,
        completionTokens: chunk.usage.completion_tokens,
        totalTokens: chunk.usage.total_tokens,
      };
    }
  }
 
  return { content, usage };
}

실무 패턴 4: 중단 처리

사용자가 중간에 취소할 수 있어야 합니다.

app.get('/api/chat/stream', async (req, res) => {
  const controller = new AbortController();
  
  // 클라이언트 연결 종료 감지
  req.on('close', () => {
    console.log('Client disconnected, aborting stream');
    controller.abort();
  });
 
  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4-turbo',
      messages: [{ role: 'user', content: req.query.prompt as string }],
      stream: true,
    }, {
      signal: controller.signal,
    });
 
    for await (const chunk of stream) {
      if (controller.signal.aborted) break;
      // ... 처리
    }
  } catch (error) {
    if (error.name === 'AbortError') {
      console.log('Stream aborted gracefully');
      return;
    }
    throw error;
  }
});

결론

AI API 스트리밍은 단순히 stream: true를 붙이는 것 이상입니다. 실서비스에서는:

  1. SSE로 프론트엔드 연동 - WebSocket보다 간단하고 충분함
  2. 타임아웃과 재시도 - AI API는 불안정할 수 있음
  3. 백프레셔 처리 - 메모리 폭발 방지
  4. 토큰 사용량 추적 - 비용 관리 필수
  5. 중단 처리 - 사용자 경험과 리소스 관리

이 패턴들을 적용하면 ChatGPT 같은 부드러운 스트리밍 경험을 직접 구현할 수 있습니다.

💬 댓글