Django와 Flask에서 Celery를 사용한 비동기 작업 처리
Olivia Novak
Dev Intern · Leapcell

소개
웹 개발의 세계에서 사용자 경험은 무엇보다 중요합니다. 느리게 로딩되는 페이지나 응답 없는 애플리케이션은 사용자에게 좌절감을 안겨주고 이탈하게 만들 수 있습니다. 종종 애플리케이션은 본질적으로 시간이 많이 소요되는 작업을 수행해야 합니다. 이미지 처리, 대량 이메일 발송, 복잡한 보고서 생성 또는 외부 API와의 상호 작용 등을 생각해 보세요. 이러한 작업이 웹 요청의 일부로 동기식으로 실행되면 애플리케이션의 메인 스레드를 차단하여 상당한 지연과 좋지 않은 사용자 경험을 유발할 수 있습니다. 이것이 바로 비동기 작업 큐가 등장하는 곳으로, 이러한 무거운 작업을 메인 요청-응답 주기에서 오프로드할 수 있도록 합니다. 이 글에서는 강력한 분산 작업 큐인 Celery를 인기 있는 Python 웹 프레임워크인 Django 및 Flask에 통합하여 이러한 어려운 작업을 우아하게 처리하고 애플리케이션의 응답성을 유지하며 전반적인 성능을 향상시키는 방법을 살펴봅니다.
비동기 작업을 위한 핵심 개념
통합 세부 정보에 들어가기 전에 Celery를 사용하여 비동기 작업 시스템을 구축할 때 관련된 주요 구성 요소와 개념에 대한 공통된 이해를 확립해 봅시다.
Celery: Celery의 핵심은 분산 메시지 전달을 기반으로 하는 비동기 작업 큐/작업 큐입니다. 대량의 메시지를 처리하고 실시간(거의 실시간) 구성 요소를 가진 작업을 제공하도록 설계되었습니다.
브로커(Broker): Celery는 메시지를 보내고 받기 위한 메시지 브로커를 필요로 합니다. 브로커는 실행해야 하는 작업을 저장하고 작업자에게 전달하는 중개자 역할을 합니다. 인기 있는 선택지로는 RabbitMQ, Redis, Amazon SQS가 있습니다.
작업자(Worker): Celery 작업자는 메시지 브로커에서 새 작업을 지속적으로 모니터링하는 별도의 프로세스입니다. 작업을 검색하면 작업자는 이를 실행하고 선택적으로 결과를 저장합니다. 작업 처리 기능을 확장하기 위해 여러 작업자를 실행할 수 있습니다.
작업(Task): Celery에서 작업은 작업자가 비동기적으로 실행하는 호출 가능한 Python 함수입니다. 이러한 함수는 일반적으로 애플리케이션에서 정의되고 Celery 작업으로 인식되도록 장식됩니다.
결과 백엔드(Result Backend) (선택 사항): 작업이 완료된 후 해당 결과 또는 상태를 검색하고 싶을 수 있습니다. 결과 백엔드는 이 정보를 저장합니다. 일반적인 선택지로는 Redis, SQLAlchemy, Django ORM, Memcached가 있습니다.
Django에서 Celery 구현
Celery를 Django 프로젝트에 통합하는 것은 몇 가지 간단한 단계, 즉 구성 설정, 작업 정의 및 필요한 구성 요소 실행으로 이루어집니다.
1. 설치
먼저 Celery와 선택한 브로커를 설치합니다. 이 예에서는 Redis를 브로커 및 결과 백엔드로 사용합니다.
pip install celery redis
2. Django 프로젝트 설정
Django 프로젝트에 Celery
인스턴스를 생성합니다. 일반적으로 이는 proj_name/celery.py
파일에서 수행됩니다.
# proj_name/celery.py import os from celery import Celery # 'celery' 프로그램에 대한 기본 Django 설정 모듈을 설정합니다. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj_name.settings') app = Celery('proj_name') # 여기서 문자열을 사용하면 작업자가 구성 개체를 하위 프로세스로 직렬화할 필요가 없습니다. # - namespace='CELERY'는 모든 Celery 관련 구성 키에 `CELERY_` 접두사가 있어야 함을 의미합니다. app.config_from_object('django.conf:settings', namespace='CELERY') # 등록된 모든 Django 앱 구성에서 작업 모듈을 로드합니다. app.autodiscover_tasks() @app.task(bind=True, ignore_result=True) def debug_task(self): print(f'Request: {self.request!r}')
그런 다음 Django가 시작될 때 로드되도록 프로젝트의 __init__.py
에 이 Celery 앱을 가져옵니다.
# proj_name/__init__.py from .celery import app as celery_app
3. settings.py
의 Celery 구성
Django 프로젝트의 settings.py
에 Celery 구성을 추가합니다.
# proj_name/settings.py # ... 기존 설정 ... CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' # 또는 원하는 시간대
4. 작업 정의
비동기 작업을 정의하기 위해 Django 앱 중 하나(예: my_app/tasks.py
) 내에 tasks.py
파일을 만듭니다.
# my_app/tasks.py import time from celery import shared_task @shared_task def send_marketing_emails(user_ids): print(f"Starting to send emails to {len(user_ids)} users...") for user_id in user_ids: # 시간이 많이 소요되는 이메일 전송 프로세스 시뮬레이션 time.sleep(2) print(f"Email sent to user {user_id}") return f"Finished sending emails to {len(user_ids)} users." @shared_task def generate_report(report_id): print(f"Generating report {report_id}...") time.sleep(10) # 긴 보고서 생성 시뮬레이션 print(f"Report {report_id} generated successfully!") return {"status": "completed", "report_id": report_id}
5. 작업 호출
Django 뷰 또는 애플리케이션의 다른 부분에서 이러한 작업을 호출할 수 있습니다.
# my_app/views.py from django.http import HttpResponse from .tasks import send_marketing_emails, generate_report def email_sending_view(request): user_ids_to_email = [1, 2, 3, 4, 5] # .delay()는 .apply_async()의 바로 가기입니다. send_marketing_emails.delay(user_ids_to_email) return HttpResponse("Email sending initiated. Check logs for progress.") def report_generation_view(request): report_task = generate_report.delay("monthly_sales") # 나중에 상태를 확인하기 위해 작업 ID를 가져올 수 있습니다. return HttpResponse(f"Report generation initiated. Task ID: {report_task.id}") def check_report_status(request, task_id): from celery.result import AsyncResult result = AsyncResult(task_id) return HttpResponse(f"Task ID: {task_id}, Status: {result.status}, Result: {result.result}")
6. Celery 작업자와 브로커 실행
실행 중인 Redis 서버(또는 선택한 브로커)와 Celery 작업자가 필요합니다.
Redis 시작:
redis-server
Django 프로젝트 루트에서 Celery 작업자 시작:
celery -A proj_name worker -l info
-A proj_name
은 Django 프로젝트를 지정합니다. -l info
는 로깅 수준을 설정합니다.
Celery와 Flask 통합
Flask의 경우에도 애플리케이션 컨텍스트와 구성에 중점을 두는 프로세스가 상당히 유사합니다.
1. 설치
pip install celery redis Flask
2. Flask 애플리케이션 설정
celery_app.py
파일을 만들거나 app.py
에 직접 통합합니다. 일반적인 패턴은 Celery 애플리케이션 팩토리를 만드는 것입니다.
# app.py from flask import Flask, jsonify from celery import Celery import time def make_celery(app): celery = Celery( app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery flask_app = Flask(__name__) flask_app.config.update( CELERY_BROKER_URL='redis://localhost:6379/0', CELERY_RESULT_BACKEND='redis://localhost:6379/0', CELERY_ACCEPT_CONTENT = ['json'], CELERY_TASK_SERIALIZER = 'json', CELERY_RESULT_SERIALIZER = 'json', CELERY_TIMEZONE = 'Asia/Shanghai' ) celery_app = make_celery(flask_app) # 작업 정의 @celery_app.task def long_running_task(x, y): print(f"Starting long running task with {x} and {y}...") time.sleep(5) result = x + y print(f"Task completed. Result: {result}") return result @celery_app.task(bind=True) def background_download(self, url): print(f"Downloading from {url}...") time.sleep(7) # 다운로드 진행률 시뮬레이션 for i in range(1, 11): self.update_state(state='PROGRESS', meta={'current': i * 10, 'total': 100}) time.sleep(0.5) print(f"Finished downloading from {url}") return {"status": "success", "url": url} # Flask 라우트 @flask_app.route('/') def index(): return "Hello, Flask with Celery!" @flask_app.route('/start_task/<int:num1>/<int:num2>') def start_task(num1, num2): task = long_running_task.delay(num1, num2) return jsonify({"message": "Task started", "task_id": task.id}) @flask_app.route('/download/<path:url>') def start_download(url): task = background_download.delay(url) return jsonify({"message": "Download initiated", "task_id": task.id}) @flask_app.route('/check_task/<task_id>') def check_task(task_id): from celery.result import AsyncResult result = AsyncResult(task_id, app=celery_app) if result.state == 'PENDING': response = {'state': result.state, 'status': 'Pending...'} elif result.state == 'PROGRESS': response = {'state': result.state, 'status': 'Running...', 'data': result.info.get('current', 0)} elif result.state == 'SUCCESS': response = {'state': result.state, 'status': 'Completed', 'result': result.result} else: response = {'state': result.state, 'status': str(result.info)} # 뭔가 잘못되었습니다. return jsonify(response) if __name__ == '__main__': flask_app.run(debug=True)
3. Celery 작업자와 브로커 실행
Redis 시작:
redis-server
일반적으로 app.py
파일을 포함하는 디렉터리에서 Celery 작업자 시작:
celery -A app.celery_app worker -l info
여기서 -A app.celery_app
은 app.py
내의 celery_app
이라는 Celery 앱 인스턴스를 가리킵니다.
애플리케이션 시나리오
Celery는 비동기 처리가 유익한 수많은 시나리오에 적용할 수 있습니다.
- 이메일 발송: 환영 이메일, 뉴스레터, 비밀번호 재설정 링크 또는 알림 발송.
- 이미지/비디오 처리: 썸네일 생성, 크기 조정, 워터마킹, 비디오 인코딩.
- 보고서 생성: 컴파일하는 데 시간이 오래 걸리는 복잡한 PDF, CSV 또는 Excel 보고서 생성.
- 데이터 가져오기/내보내기: 처리를 위한 대형 파일 업로드 처리 또는 대형 데이터 덤프 생성.
- API 통합: 느리거나 속도 제한이 있을 수 있는 타사 API에 대한 요청.
- 예약된 작업: Celery Beat를 사용하여 정기적으로 작업을 실행합니다(예: 일일 데이터 백업, 주간 통계 컴파일).
이러한 작업을 오프로드함으로써 웹 서버는 사용자 요청에 신속하게 응답하는 데 집중할 수 있으며, 이는 훨씬 더 부드럽고 즐거운 사용자 경험으로 이어집니다.
결론
Celery를 Django 또는 Flask 애플리케이션에 통합하는 것은 시간이 많이 소요되는 작업을 비동기적으로 처리하는 강력한 패턴입니다. 이러한 작업을 메인 요청-응답 주기에서 분리함으로써 애플리케이션의 응답성, 확장성 및 전반적인 사용자 경험을 크게 향상시킬 수 있습니다. 설정은 간단하며 애플리케이션 성능 및 안정성 측면에서 상당한 이점을 제공하므로 강력한 웹 개발을 위한 필수 도구입니다. 어려운 작업을 오프로드하는 것은 유연하고 반응성이 뛰어난 사용자 인터페이스를 유지하는 데 중요합니다.