celery 와 Rabbitmq 를 이용해서 비동기 처리를 해보자 - 1
🐇 개괄
구현해야 하는 기능은 유저들의 리뷰 개수에 따라서 유저의 프로필에 메달을 띄우는 것이었다. 내가 생각한 로직은 각 유저의 리뷰 개수를 계산하고 그에 따라 금메달, 은메달, 동메달의 커트라인에 해당하는 리뷰 개수를 구한 뒤에 유저의 프로필을 반환할 때 해당 커트라인을 참고해서 계산해 반환하는 것이었다. 이는 매번 변하기 때문에 유저 프로필을 반환할 때 계산하는 것이 더 낫다고 판단했다. (현시점으로는 그렇다.) 이러한 사고 과정을 통해, 그렇다면 사용자가 적은 시간대에 유저마다의 리뷰 개수를 세는 프로세스가 필요하다고 생각했다. 따라서 비동기 처리를 고려하게 되었다.
🐇 해당 게시물의 목표
1. celery 를 붙이고 정상적으로 구동되는지 확인하기
2. 스케줄러가 정상적으로 동작하는지 확인하기
=> 모두 의미 없는 테스트 코드로 진행할 예정
🥬 celery 는 무엇일까? 🥬
celery 를 이용하여 비동기 처리 태스크로 비동기로 처리할 일을 묶어 정의하고, 메시지 브로커 (해당 게시글에서는 Rabbitmq를 이용했지만 redis 도 이용 가능함) 에서 작업을 가져와 처리할 수 있다. 즉, 태스크를 브로커를 통해 전달하고 이를 worker 가 처리하는 구조이다. 태스크 정의와 worker 의 역할을 celery 가 한다.
🐇 celery 붙이기
pip install celery
pip freeze > requirements.txt
우선, celery 를 붙이기 위해서 패키지를 설치한다. requirements 도 함께 업데이트해줬다.
webapp/config/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery('config')
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")
celery 를 동작시키고 테스트하는 것이 먼저이기 때문에 임시로 작성한 코드임을 유의하길 바란다.
celery 와 관련한 세팅을 해준다. namespace="CELERY" 는 해당 prefix 로 시작하는 설정은 모두 celery 와 관련된 것임을 명시하는 것이다. autodiscover_tasks 는 말 그대로 자동으로 tasks 를 전부 찾아준다.
config/settings.py
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
settings.py 에는 위와 같이 설정해줬다. 앞서 설명한대로 CELERY 라는 prefix 를 붙여 명시해주어야 celery 가 이를 찾을 수 있다.
review/tasks/test.py
from config.celery import app
@app.task
def testMethod():
print("test")
나의 경우에는 review 라는 app 에서 task 를 실행할 예정이었기 때문에 위와 같이 테스트할 태스크를 작성해주었다. 또한, celery 의 경우에는 각 app 아래 디렉토리에 tasks.py 를 정의한다길래 나의 경우에는 폴더로 묶었기 때문에 __init__ 에 따로 아래와 같이 추가로 작성해주었다.
review/tasks/__init__.py
from .test import testMethod
celery -A config worker -l INFO
이후 개발 디버깅 용으로 실행을 해보기 위해서는 터미널에 위와 같이 작성해준다. config 가 아니라고 하더라도 app 의 이름을 명시해주면 된다. (celery.py 가 있는 앱을 실행해주면 된다)
실행하고 나면, 아직 브로커가 없기에 위와 같은 오류가 뜰 것이다.
그러나, 위와 같이 tasks 목록에 내가 정의한 태스크가 뜬다면 성공한 것이다.
🐇 Rabbitmq 란? 🐇
celery 는 큐에 직접 접근하지 않고 메시지 브로커를 통해서만 접근하게 되는데, 메시지 브로커 역할을 하는 Rabbitmq 는 메시지를 많은 사용자에게 전달하거나 요청을 다른 API 에게 위임하고 빠른 응답을 하는 경우에 사용한다.
위의 사진에서 producer 와 consumer 의 역할을 celery 가 한다고 보면 된다.
여기서 exchange 는 어떤 queue 에 전송할지 결정하는 객체이고 binding 은 메시지를 라우팅 할 규칙을 정하는 객체이다.
🐇 Rabbitmq 붙이기
brew install rabbitmq
맥북의 경우, 터미널에 위와 같이 입력하여 설치를 하면 된다.
sudo rabbitmq-server -detached
이후 백그라운드로 rabbitmq-server 를 구동시킨다.
http://localhost:15672/ 로 접속하면 위와 같은 화면이 뜰 것이다. 이는 rabbitMq 에서 제공하는 gui 를 이용하는 것이다. 초기에는 guest/guest 로 로그인이 가능하다.
🐇 Rabbitmq 태스크 calling 하기
새로운 터미널을 열어 이전에 정의해뒀던 태스크를 콜링할 수 있다.
나의 경우에는 review 라는 앱에 tasks 에 testMethod 라는 태스크를 import 하고, delay 를 이용해 이를 콜링한 것이다. () 안에는 파라미터가 필요한 경우에 넣으면 된다. 이에 대한 결과는 해시 형태로 값이 출력된다.
이전에 백그라운드에서 rabbitmq 를 구동시킨 터미널로 돌아와보면, 같은 해시값의 태스크가 succeeded 된 것을 확인할 수 있다. 이후에 의미 없는 테스트 코드가 아닌 실제 코드를 구동할 때에도 위와 같이 calling 하면 될 것이다.
🐇 주기적으로 태스크를 부르기 (스케줄러 붙이기)
내가 원하는 최종적인 기능은, 매일 자정마다 태스크를 수행하는 것이었다. 따라서, 스케줄링에 대한 별도의 처리도 추가적으로 필요했다.
config/celery.py
import os
from celery import Celery
from celery.schedules import crontab
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery('config',
backend='rpc://',
inlcude=['review.tasks'])
app.config_from_object("django.conf:settings", namespace="CELERY")
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
app.conf.beat_schedule = {
'add-every-3-hours-contrab': {
'task': 'tasks.testMethod',
'schedule': crontab(minute='*/60'), #매초마다 실행하기
},
}
if __name__ == '__main__':
app.start()
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")
최종적으로 위와 같은 celery 설정 파일이 작성되었다.
app.conf.beat_schedule = {
'add-every-3-hours-contrab': {
'task': 'tasks.testMethod',
'schedule': crontab(minute='*/60'), #매초마다 실행하기
},
}
여기서 위의 부분은 스케줄링을 위한 설정이다. 3-hours 는 임시로 설정해둔 스케줄의 이름이기 때문에 무시해도 좋다.
task 는 어떤 태스크에 해당하는 스케줄인지 명시한 것이다. schedule 은 crontab 표현식을 이용해 얼마나 자주 실행할지 명시한 것이다. 나의 경우에는 테스트를 위한 것이기에 빠른 확인을 위해 매초 실행될 수 있도록 했다.
위와 같이 설정해두고 다시 worker 를 구동시켜 줬더니, 아래와 같이 gui 에 변화가 나타났다.
🐇 다음의 목표
- 실제 프로덕션 코드로 테스트 코드 수정하고 구동하기
- 배포 과정에서 어떻게 이루어져야 하는지 학습하기
🐇 참고자료
https://blog.dudaji.com/general/2020/05/25/rabbitmq.html
RabbitMQ란?
서비스를 개발하면서 API-Gateway에 토큰 캐시를 구축하기 위해 RabbitMQ를 공부했던 내용을 정리해 보았습니다.
blog.dudaji.com
https://lucky516.tistory.com/3
Django에서 Celery 이용하기 두번째
docs.celeryproject.org/en/stable/getting-started/next-steps.html#next-steps Next Steps — Celery 5.0.5 documentation This document describes the current stable version of Celery (5.0). For development docs, go here. Next Steps The First Steps with Celery
lucky516.tistory.com
https://beomi.github.io/2017/03/19/Introduction-to-Celery/
[번역]셀러리 입문하기 - Beomi's Tech blog
2017-03-19 [번역]셀러리 입문하기 글 작성 시점 최신 버전 v4.0.2의 문서입니다. 원문: http://docs.celeryproject.org/en/latest/getting-started/introduction.html 셀러리 입문하기 태스크 큐란 무엇인가? (What’s a Task Qu
beomi.github.io
(Django) Celery로 Django 스케줄러 구현하기 - [테스트]
Celery와 Celery Beat를 사용해 스케줄러를 구현하고, 이를 배포하는 과정까지 진행해보도록 하겠습니다.
jangwon.io