Node.js에서 Web Streams API를 사용하여 데이터 스트림 탐색하기
Ethan Miller
Product Engineer · Leapcell

소개
오늘날 데이터 중심 세계에서 많은 양의 정보를 효율적으로 처리하는 것은 고성능 및 확장이 가능한 애플리케이션 구축에 매우 중요합니다. 일반적으로 처리 전에 전체 데이터셋을 메모리에 로드하는 기존 접근 방식은 수 기가바이트 크기의 파일을 다루거나, 실시간 데이터 피드를 사용하거나, 순차적인 작업을 수행할 때 빠르게 병목 현상을 일으킵니다. 이때 데이터 스트리밍의 강력함이 등장합니다. 데이터를 한 번에 모두 처리하는 대신 청크(chunk) 단위로 처리함으로써 애플리케이션은 낮은 메모리 사용량을 유지하고, 지연 시간을 줄이며, 반응성을 향상시킬 수 있습니다.
Node.js는 자체 stream
모듈을 오랫동안 제공해 왔지만, 원래 브라우저를 위해 설계된 Web Streams API의 등장은 다양한 JavaScript 환경에서 스트리밍 데이터를 일관되게 처리할 수 있는 표준화되고 강력한 대안을 제시했습니다. 이 글에서는 Node.js 내에서 Web Streams API를 효과적으로 활용하여 데이터 처리를 고도로 효율화하는 방법을 자세히 살펴봅니다.
Web Streams 이해하기
구현 세부 사항을 깊이 들어가기 전에 Web Streams API의 몇 가지 핵심 개념을 명확히 해 봅시다. Web Streams API는 본질적으로 데이터 스트림을 생성, 조합 및 소비할 수 있는 인터페이스를 정의합니다. 세 가지 기본 스트림 유형은 다음과 같습니다.
ReadableStream
: 순차적으로 데이터를 읽을 수 있는 데이터 소스를 나타냅니다. 이를 수돗가라고 생각하면 지속적으로 흐르는 물을 받을 수 있습니다.WritableStream
: 데이터를 순차적으로 쓸 수 있는 데이터 대상을 나타냅니다. 이것은 물을 쏟아낼 수 있는 배수구와 같습니다.TransformStream
:WritableStream
과ReadableStream
역할을 모두 수행합니다. 데이터를 통과하면서 처리하여 한 형식에서 다른 형식으로 변환합니다. 물이 흐르면서 정화하는 필터를 상상해 보세요.
이러한 스트림 유형은 처리되는 정보의 기본 단위인 "청크(chunk)" 데이터를 기반으로 작동합니다. Web Streams API의 주요 장점은 약속(promise) 기반 인터페이스를 통해 현대적인 비동기 JavaScript 패턴, 특히 async/await
와 원활하게 통합된다는 것입니다.
Node.js에서 Web Streams를 사용하는 이유?
Node.js에는 네이티브 stream
모듈이 있지만, Web Streams API는 몇 가지 매력적인 이점을 제공합니다.
- 일관성: 브라우저와 Node.js 환경 모두에서 데이터 스트리밍을 위한 통합 API를 제공하여 풀스택 JavaScript 개발자의 개발을 단순화합니다.
- 조합성: 스트림의 쉬운 체인 연결 및 파이핑을 위해 설계되어 최소한의 노력으로 복잡한 데이터 변환 파이프라인을 활성화합니다.
- 백프레셔(Backpressure) 처리: 생산자가 소비자를 압도하지 않도록 하는 내장 메커니즘으로, 안정적이고 효율적인 성능에 중요합니다.
- Async Iterator:
ReadableStream
은 기본적으로async
이터러블이므로for await...of
루프를 사용하여 우아하게 소비할 수 있습니다.
Node.js에서 Web Streams 구현하기
실제 예제를 통해 Node.js 컨텍스트 내에서 각 스트림 유형을 사용하는 방법을 살펴보겠습니다.
1. ReadableStream 생성하기
ReadableStream
은 데이터 청크를 푸시할 수 있는 ReadableStreamDefaultController
와 함께 생성될 수 있습니다.
import { ReadableStream } from 'node:stream/web'; // 'node:stream/web'에서 가져오기 async function createNumberStream(limit) { let counter = 0; return new ReadableStream({ async start(controller) { console.log('ReadableStream started'); while (counter < limit) { // 비동기 작업 시뮬레이션 await new Promise(resolve => setTimeout(resolve, 50)); controller.enqueue(counter++); console.log(`Enqueued: ${counter - 1}`); } controller.close(); console.log('ReadableStream closed'); }, pull(controller) { // 스트림이 더 많은 데이터를 원할 때 이 메서드가 호출됩니다. // 이 예제에서는 `start`에서 선제적으로 데이터를 푸시합니다. //console.log('Pull requested'); }, cancel() { console.log('ReadableStream cancelled'); } }); } // ReadableStream 소비하기 async function consumeStream() { const numberStream = await createNumberStream(5); console.log('--- Consuming Number Stream ---'); for await (const chunk of numberStream) { console.log(`Received: ${chunk}`); } console.log('--- Consumption Complete ---'); } consumeStream();
이 예제에서 createNumberStream
은 숫자 시퀀스를 생성합니다. start
메서드는 데이터 생산 로직이 있는 곳이며, controller.enqueue()
를 사용하여 데이터를 푸시합니다. for await...of
루프는 스트림을 소비하는 깔끔하고 관용적인 방법을 제공합니다.
2. WritableStream 생성하기
WritableStream
을 사용하면 대상으로 데이터를 쓸 수 있습니다.
import { WritableStream } from 'node:stream/web'; async function createLoggingWritableStream() { return new WritableStream({ async start(controller) { console.log('WritableStream started'); }, async write(chunk, controller) { // 비동기 쓰기 작업 시뮬레이션 await new Promise(resolve => setTimeout(resolve, 20)); console.log(`Writing: ${chunk}`); }, async close() { console.log('WritableStream closed'); }, async abort(reason) { console.error('WritableStream aborted', reason); } }); } // WritableStream 사용하기 async function writeToStream() { const loggingStream = await createLoggingWritableStream(); const writer = loggingStream.getWriter(); console.log('--- Writing Data ---'); for (let i = 0; i < 5; i++) { await writer.write(`Data chunk ${i}`); } await writer.close(); console.log('--- Writing Complete ---'); } writeToStream();
여기서 createLoggingWritableStream
은 수신하는 각 청크를 단순히 로깅합니다. getWriter()
에서 얻은 writer
객체를 사용하여 WritableStream
에 데이터를 푸시합니다.
3. TransformStream 생성하기
TransformStream
은 파이프라인에서 데이터 수정의 중추입니다.
import { ReadableStream, WritableStream, TransformStream } from 'node:stream/web'; // 숫자를 제곱 값으로 변환하는 TransformStream 생성 function createDoublerTransformStream() { return new TransformStream({ start(controller) { console.log('TransformStream started'); }, transform(chunk, controller) { console.log(`Transforming: ${chunk} -> ${chunk * 2}`); controller.enqueue(chunk * 2); }, flush(controller) { console.log('TransformStream flushed (all input processed)'); } }); } // 스트림 연결: 원시 숫자 -> 두 배된 숫자 -> 로깅됨 async function chainStreams() { const numberStream = new ReadableStream({ start(controller) { for (let i = 1; i <= 3; i++) { controller.enqueue(i); } controller.close(); } }); const doubledStream = createDoublerTransformStream(); const loggingStream = new WritableStream({ write(chunk) { console.log(`Received final: ${chunk}`); }, }); console.log('--- Chaining Streams ---'); await numberStream .pipeThrough(doubledStream) // 변환 스트림을 통해 파이프 .pipeTo(loggingStream); // 쓰기 가능한 스트림으로 파이프 console.log('--- Chaining Complete ---'); } chainStreams();
이 예제는 스트림 파이프라인을 구성하는 핵심 메서드인 pipeThrough()
및 pipeTo()
를 보여줍니다. pipeThrough()
는 TransformStream
을 연결하고, pipeTo()
는 최종 ReadableStream
을 WritableStream
에 연결합니다.
애플리케이션 시나리오
Node.js의 Web Streams API는 다음과 같은 경우에 특히 강력합니다.
- 파일 처리: CSV, 로그 파일 또는 수 기가바이트 크기의 아카이브와 같은 대용량 파일을 메모리 부족 없이 청크 단위로 읽고 쓰는 작업.
- 네트워크 프록시/로드 밸런서: 전체 요청/응답을 버퍼링하지 않고 클라이언트와 서버 간의 데이터를 효율적으로 전달합니다.
- 실시간 데이터 처리: 웹 소켓이나 메시지 큐에서 들어오는 데이터를 처리하고, 변환을 수행한 다음, 다운스트림 서비스로 푸시합니다.
- 데이터 압축/암호화: 스트림 기반 압축(예: Gzip) 또는 암호화를 데이터 파이프라인 내에서 직접 구현합니다.
- API 파이프라이닝: 한 API 호출의 출력이 다음 API 호출의 입력이 되는 여러 API 호출을 연결하면서 데이터를 스트리밍합니다.
예를 들어, 대용량 CSV 파일을 다운로드하여 내용을 필터링한 다음 필터링된 데이터를 다른 서비스에 업로드하는 시나리오를 생각해 봅시다. 스트림 기반 접근 방식은 다음과 같을 것입니다.
import { ReadableStream, WritableStream, TransformStream } from 'node:stream/web'; import { createReadStream, createWriteStream } from 'node:fs'; import { pipeline } from 'node:stream/promises'; // 네이티브 Node.js 스트림용 // 키워드로부터 줄을 필터링하는 클래스 class LineFilterTransform extends TransformStream { constructor(keyword) { let buffer = ''; super({ transform(chunk, controller) { buffer += new TextDecoder().decode(chunk); const lines = buffer.split('\n'); buffer = lines.pop(); // 마지막 (불완전한) 줄 유지 for (const line of lines) { if (line.includes(keyword)) { controller.enqueue(new TextEncoder().encode(line + '\n')); } } }, flush(controller) { if (buffer.length > 0 && buffer.includes(keyword)) { controller.enqueue(new TextEncoder().encode(buffer + '\n')); } } }); } } // 데모용 더미 대용량 파일 생성 // fs.writeFileSync('large_data.txt', Array(1000).fill('Hello world\nAnother line with Node.js\n').join('')); async function processLargeFile() { const inputFile = createReadStream('large_data.txt'); // 네이티브 Node.js Readable Stream const outputFile = createWriteStream('filtered_data.txt'); // 네이티브 Node.js Writable Stream const webReadable = ReadableStream.from(inputFile); // 네이티브를 Web Stream으로 변환 const webWritable = new WritableStream({ async write(chunk) { outputFile.write(chunk); }, close() { outputFile.end(); }, abort(reason) { outputFile.destroy(reason); } }); const filterStream = new LineFilterTransform('Node.js'); console.log('--- Processing Large File ---'); await webReadable .pipeThrough(filterStream) .pipeTo(webWritable); console.log('--- File Processing Complete ---'); console.log('Filtered data written to filtered_data.txt'); } // 실행하기 전에 'large_data.txt'가 일부 내용과 함께 존재하는지 확인 // 테스트를 위해 다음을 사용하여 생성할 수 있습니다. // require('fs').writeFileSync('large_data.txt', Array(1000).fill('Hello world\nAnother line with Node.js\n').join('')); processLargeFile().catch(console.error);
이 예제는 ReadableStream.from()
및 사용자 지정 WritableStream
어댑터를 사용하여 네이티브 Node.js 스트림과 Web Streams를 연결하는 방법을 보여주며, 강력한 파일 처리 파이프라인을 가능하게 합니다.
결론
Web Streams API는 JavaScript에서 데이터 스트림을 처리하는 현대적이고 효율적이며 표준화된 접근 방식을 제공하여 Node.js 애플리케이션에 상당한 이점을 제공합니다. ReadableStream
, WritableStream
, TransformStream
을 활용함으로써 개발자는 정보 흐름을 우아하게 관리하는 강력하고 메모리 효율적이며 고도로 조합 가능한 데이터 파이프라인을 구축할 수 있습니다. Web Streams API를 활용하면 Node.js에서 비동기 데이터 처리의 성능과 아키텍처적 깔끔함이 한 단계 향상됩니다.