사이드 프로젝트

celery 와 Rabbitmq 를 이용해서 비동기 처리를 해보자 - 1

바랄 희 2022. 12. 4. 14:47

🐇 개괄

구현해야 하는 기능은 유저들의 리뷰 개수에 따라서 유저의 프로필에 메달을 띄우는 것이었다. 내가 생각한 로직은 각 유저의 리뷰 개수를 계산하고 그에 따라 금메달, 은메달, 동메달의 커트라인에 해당하는 리뷰 개수를 구한 뒤에 유저의 프로필을 반환할 때 해당 커트라인을 참고해서 계산해 반환하는 것이었다. 이는 매번 변하기 때문에 유저 프로필을 반환할 때 계산하는 것이 더 낫다고 판단했다. (현시점으로는 그렇다.) 이러한 사고 과정을 통해, 그렇다면 사용자가 적은 시간대에 유저마다의 리뷰 개수를 세는 프로세스가 필요하다고 생각했다. 따라서 비동기 처리를 고려하게 되었다.

 

🐇 해당 게시물의 목표

1. celery 를 붙이고 정상적으로 구동되는지 확인하기

2. 스케줄러가 정상적으로 동작하는지 확인하기

=> 모두 의미 없는 테스트 코드로 진행할 예정

 

🥬 celery 는 무엇일까? 🥬

celery 를 이용하여 비동기 처리 태스크로 비동기로 처리할 일을 묶어 정의하고, 메시지 브로커 (해당 게시글에서는 Rabbitmq를 이용했지만 redis 도 이용 가능함) 에서 작업을 가져와 처리할 수 있다. 즉, 태스크를 브로커를 통해 전달하고 이를 worker 가 처리하는 구조이다. 태스크 정의와 worker 의 역할을 celery 가 한다.

 

🐇 celery 붙이기

pip install celery
pip freeze > requirements.txt

우선, celery 를 붙이기 위해서 패키지를 설치한다. requirements 도 함께 업데이트해줬다.

 

 

webapp/config/celery.py

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery('config')

app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")

celery 를 동작시키고 테스트하는 것이 먼저이기 때문에 임시로 작성한 코드임을 유의하길 바란다.

celery 와 관련한 세팅을 해준다. namespace="CELERY" 는 해당 prefix 로 시작하는 설정은 모두 celery 와 관련된 것임을 명시하는 것이다. autodiscover_tasks 는 말 그대로 자동으로 tasks 를 전부 찾아준다.

 

config/settings.py

CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

settings.py 에는 위와 같이 설정해줬다. 앞서 설명한대로 CELERY 라는 prefix 를 붙여 명시해주어야 celery 가 이를 찾을 수 있다.

 

review/tasks/test.py

from config.celery import app

@app.task
def testMethod():
    print("test")

나의 경우에는 review 라는 app 에서 task 를 실행할 예정이었기 때문에 위와 같이 테스트할 태스크를 작성해주었다. 또한, celery 의 경우에는 각 app 아래 디렉토리에 tasks.py 를 정의한다길래 나의 경우에는 폴더로 묶었기 때문에 __init__ 에 따로 아래와 같이 추가로 작성해주었다.

 

review/tasks/__init__.py

from .test import testMethod

 

celery -A config worker -l INFO

 

 이후 개발 디버깅 용으로 실행을 해보기 위해서는 터미널에 위와 같이 작성해준다. config 가 아니라고 하더라도 app 의 이름을 명시해주면 된다. (celery.py 가 있는 앱을 실행해주면 된다)

 

실행하고 나면, 아직 브로커가 없기에 위와 같은 오류가 뜰 것이다.

그러나, 위와 같이 tasks 목록에 내가 정의한 태스크가 뜬다면 성공한 것이다.

 

🐇 Rabbitmq 란?  🐇

celery 는 큐에 직접 접근하지 않고 메시지 브로커를 통해서만 접근하게 되는데, 메시지 브로커 역할을 하는 Rabbitmq 는 메시지를 많은 사용자에게 전달하거나 요청을 다른 API 에게 위임하고 빠른 응답을 하는 경우에 사용한다.

위의 사진에서 producer 와 consumer 의 역할을 celery 가 한다고 보면 된다.

여기서 exchange 는 어떤 queue 에 전송할지 결정하는 객체이고 binding 은 메시지를 라우팅 할 규칙을 정하는 객체이다.

 

🐇 Rabbitmq 붙이기

 

brew install rabbitmq

맥북의 경우, 터미널에 위와 같이 입력하여 설치를 하면 된다.

 

sudo rabbitmq-server -detached

이후 백그라운드로 rabbitmq-server 를 구동시킨다. 

 

 

http://localhost:15672/ 로 접속하면 위와 같은 화면이 뜰 것이다. 이는 rabbitMq 에서 제공하는 gui 를 이용하는 것이다. 초기에는 guest/guest 로 로그인이 가능하다.

 

🐇 Rabbitmq 태스크 calling 하기

새로운 터미널을 열어 이전에 정의해뒀던 태스크를 콜링할 수 있다.

나의 경우에는 review 라는 앱에 tasks 에 testMethod 라는 태스크를 import 하고, delay 를 이용해 이를 콜링한 것이다. () 안에는 파라미터가 필요한 경우에 넣으면 된다. 이에 대한 결과는 해시 형태로 값이 출력된다.

이전에 백그라운드에서 rabbitmq 를 구동시킨 터미널로 돌아와보면, 같은 해시값의 태스크가 succeeded 된 것을 확인할 수 있다. 이후에 의미 없는 테스트 코드가 아닌 실제 코드를 구동할 때에도 위와 같이 calling 하면 될 것이다.

🐇 주기적으로 태스크를 부르기 (스케줄러 붙이기)

내가 원하는 최종적인 기능은, 매일 자정마다 태스크를 수행하는 것이었다. 따라서, 스케줄링에 대한 별도의 처리도 추가적으로 필요했다.

 

config/celery.py

import os

from celery import Celery
from celery.schedules import crontab

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery('config',
             backend='rpc://',
             inlcude=['review.tasks'])

app.config_from_object("django.conf:settings", namespace="CELERY")

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

app.conf.beat_schedule = {
	'add-every-3-hours-contrab': {
		'task': 'tasks.testMethod',
		'schedule': crontab(minute='*/60'), #매초마다 실행하기
	},
}

if __name__ == '__main__':
    app.start()

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f"Request: {self.request!r}")

최종적으로 위와 같은 celery 설정 파일이 작성되었다. 

 

app.conf.beat_schedule = {
	'add-every-3-hours-contrab': {
		'task': 'tasks.testMethod',
		'schedule': crontab(minute='*/60'), #매초마다 실행하기
	},
}

여기서 위의 부분은 스케줄링을 위한 설정이다. 3-hours 는 임시로 설정해둔 스케줄의 이름이기 때문에 무시해도 좋다.

task 는 어떤 태스크에 해당하는 스케줄인지 명시한 것이다. schedule 은 crontab 표현식을 이용해 얼마나 자주 실행할지 명시한 것이다. 나의 경우에는 테스트를 위한 것이기에 빠른 확인을 위해 매초 실행될 수 있도록 했다.

위와 같이 설정해두고 다시 worker 를 구동시켜 줬더니, 아래와 같이 gui 에 변화가 나타났다.

 

 

🐇 다음의 목표

- 실제 프로덕션 코드로 테스트 코드 수정하고 구동하기

- 배포 과정에서 어떻게 이루어져야 하는지 학습하기

 

 

🐇 참고자료

https://blog.dudaji.com/general/2020/05/25/rabbitmq.html

 

RabbitMQ란?

서비스를 개발하면서 API-Gateway에 토큰 캐시를 구축하기 위해 RabbitMQ를 공부했던 내용을 정리해 보았습니다.

blog.dudaji.com

 

https://lucky516.tistory.com/3

 

Django에서 Celery 이용하기 두번째

docs.celeryproject.org/en/stable/getting-started/next-steps.html#next-steps Next Steps — Celery 5.0.5 documentation This document describes the current stable version of Celery (5.0). For development docs, go here. Next Steps The First Steps with Celery

lucky516.tistory.com

https://beomi.github.io/2017/03/19/Introduction-to-Celery/

 

[번역]셀러리 입문하기 - Beomi's Tech blog

2017-03-19 [번역]셀러리 입문하기 글 작성 시점 최신 버전 v4.0.2의 문서입니다. 원문: http://docs.celeryproject.org/en/latest/getting-started/introduction.html 셀러리 입문하기 태스크 큐란 무엇인가? (What’s a Task Qu

beomi.github.io

https://jangwon.io/django/2018/10/01/(Django)-Celery%EB%A1%9C-Django-%EC%8A%A4%EC%BC%80%EC%A4%84%EB%9F%AC-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0-%ED%85%8C%EC%8A%A4%ED%8A%B8/ 

 

(Django) Celery로 Django 스케줄러 구현하기 - [테스트]

Celery와 Celery Beat를 사용해 스케줄러를 구현하고, 이를 배포하는 과정까지 진행해보도록 하겠습니다.

jangwon.io