일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
- DjangoRestFramework
- Continuous Delivery
- CI
- 코루틴
- nestedfunction
- 도커
- dry-yasg
- DjangoCache
- django
- F객체
- Git
- database
- EC2
- DRF
- Coroutine
- aws
- QuerySet
- docker
- Continuous Deployment
- to_attr
- CD
- 백준
- Python
- annotate
- Prefetch_related
- racecondition
- apitestcase
- aggregate
- testcase
- Transaction
- Today
- Total
BackEnd King KY
TIL29 - Celery - Distributed Task Queue 본문

Celery - Distributed Task Queue
저는 현재 회사에서 celery를 이용해서 비동기 작업큐들을 돌리고 있는데요.
그냥 Celery설정 후 돌리는 것보다 이게 어떤건지 그 내부를 알고 싶어서 작성하게 되었습니다.
개인적으로 실행을 해보며 작업 돌아가는 건 본 상태라 개념적으로 접근하려고 합니다.
모든 출처는 Celery 공식문서입니다.
왜 Celery를 사용하나요?
Celery는 방대한 양의 메시지를 처리하는 간단하고 유연하며 안정적인 분산 시스템이며 이러한 시스템을 유지 관리하는 데 필요한 도구를 운영에 제공합니다. 라고 공식문서에 나와있습니다.
말 그대로 방대한 양의 작업을 분산해서 처리할 수 있다고 이해하면 됩니다.
지금 저의 상황을 예로 설명하자면, 저는 전국 초/중/고 3주치 시간표를 수집 및 저장하는 작업하고 있습니다.
초등학교 약 1천만개, 중학교 약 500만개, 고등학교 약 500만개로 매 주 2000만개의 데이터를 수집 및 저장하는 작업을 하고 있습니다.
그 외 다른 정보에 대해서도 수집하지만 양이 가장 많은 시간표로 예를 들었습니다.
일반적으로 각 학교별 수집/저장하는 함수를 만드려면
def get_time_tables():
get_elementary_school_time_tables()
get_middle_school_time_tables()
get_high_school_time_tables()
return True
이런식으로 함수가 순차적으로 실행되는데, 대용량데이터를 이렇게 Queue를 두어서 돌리면 시간이 매우 많이 필요합니다.
중간에 한 번 에러나거나 그러면 그 시간만큼 다시 돌려야 하는 불상사가 생기게 됩니다.
또한 위에 읽어보시면 제가 "매 주" 2000만개의 데이터를 수집 및 저장한다고 했는데, 제가 매주 회사에 출근해서 함수를 실행시켜주는 것일까요?
아닙니다. 제가 매 주 해당 작업이 돌아가도록 설정했기 때문인데요. 스케줄링이 가능하다는 것은 Celery의 가장 큰 장점 중 하나입니다.
즉, 이러한 문제를 해결하기 위해 사용하는 것이 Celery입니다.
Celery with Django
Celery에서는 Django에서 어떻게 사용하면 되는지 공식문서에 친절하게 나와있습니다.
우선, Django 프로젝트를 만들게되면 아래와 같은 구조로 디렉토리가 생성됩니다.
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
그리고 나서, Celery 설정을 해줘야 할 모듈을 만들어줘야 하는데, 이에 대해서도 Celery가 권장해주는 부분이 있습니다.
then the recommended way is to create a new proj/proj/celery.py module that defines the Celery instance:
즉, settings.py의 레벨과 동일하게 모듈을 생성하라는 의미입니다. 그에 맞게 파일을 만들고 안에서 설정해보겠습니다.
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
Celery를 import한 후, app 설정을 해주면 됩니다.
그리고 config_from_object
는 Celery 관련 설정에 대해 읽어오기 위해 사용하는 메소드입니다.
CELERY로 시작하는 설정에 대해 읽어온 후 적용하겠다는 의미입니다.
autodiscover_tasks
는 모든 앱 내에서 설정된 tasks.py 모듈을 찾아내는 메소드입니다.
Searches a list of packages for a "tasks.py" module
파라미터 없이 설정하면 Django에 위임합니다. 특정앱을 지정할꺼면 ['app_name1', 'app_name2'] 이렇게 리스트형태로 설정해주면 됩니다.
그리고 설정해줬다면 Celery를 설정할 함수에 데코레이터를 붙여주면 됩니다
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
이렇게 task라는 메소드를 가져온 뒤, kwargs형태로 bind=True를 설정해주면 됩니다.
bind=True는 3.1버전에 새로 생긴 옵션이며, 작업 인스턴스를 쉽게 참조한다고 합니다.
Celery Options
3.1버전에 추가된 bind=True에 대해서, bound tasks라는 내용이 공식문서에 작성되어있습니다.
bind=True가 설정된 메소드의 경우, self를 항상 설정해줘야 합니다.
즉, 해당 메소드는 Celery 작업 인스턴스라는 걸 선언한다는 의미로 생각하면 될 것 같습니다.
그리고 작업 인스턴스가 여러 이유로 중간에 끊길 수도 있습니다.
현재 제가 작업하는 것에 대해 예를 들면, 시간표 데이터는 나이스 오픈 API를 통해 수집하는데 나이스에 요청을 보내는 과정에서 작업이 끊길 수 있습니다.
그럴 경우, 작업이 다시 실행되도록 설정해야 하며 그 때 retry 옵션을 설정해야 합니다.
Outro
이정도가 제가 현재 작업하고 있는 사항에 대한 내용입니다.
Celery 내부 기능에 대해 더 사용할 일이 있으면 그 때 새롭게 포스팅 하겠습니다.
'Python' 카테고리의 다른 글
TIL35 - 코루틴 사용하기(2/6) : 코루틴바깥으로 값 전달하기 (0) | 2022.10.09 |
---|---|
TIL35 - 코루틴 사용하기(1/6) : 코루틴에 값 보내기 (0) | 2022.10.09 |
TIL30 - CSV파일 만들기(Using Python) (0) | 2022.07.10 |
TIL23 - heapq (0) | 2022.04.07 |
TIL7 - Nested Function & decorator (0) | 2022.02.20 |