昔ちょっと Celery を試した事があったが、ジョブキューなのが必要になったので再入門してみた。
WAF には Flask を使っている。
Celery には Flask の Extension があるが、Celery3.0 になってから非推奨になっており、Celery 本体の API を使う事が推奨されている。
Flask-Celery には Flask-Script と連動できたが、自分で管理する必要がある。
extensions.py
# -*- coding: utf-8 -*- import project.configs.celeryconfig as config from celery import Celery celery = Celery('SampleTask') celery.config_from_object(config) __all__ = ['celery']
tasks/sample.py
# -*- coding: utf-8 -*- from celery.contrib.methods import task_method from project.extensions import celery class SampleTask(object): def __init__(self): pass @celery.task(filter=task_method, name='SampleTask.run') def run(self, task_data): """ Interface for Celery task. :param task_data: """ return 'task {0}'.format(task_data)
views.py
# -*- coding: utf-8 -*- from flask import Blueprint, current_app, request, jsonify from celery.task.control import revoke from project.tasks.sample import SampleTask app = Blueprint('task', __name__, url_prefix='/task') @app.route('/', strict_slashes=False, methods=['POST']) def compile(): logger = current_app.logger form = request.form['data'] t = SampleTask() result = t.run.delay(form) logger.info('Add task({0})'.format(result.id)) return jsonify(response={'taskid': result.id}) @app.route('/<task_id>', strict_slashes=False, methods=['GET']) def compile_result(task_id): logger = current_app.logger t = SampleTask() result = t.run.AsyncResult(task_id) if result.status == 'PENDING': logger.info('task id({0}) not exists'.format(task_id)) return jsonify(response={'result': None}) response = result.get() revoke(task_id, terminate=True) return jsonify(response={'result': response})
ワーカーを起動する。
$ celery -A project.tasks.sample worker -l info -E
シェルで直接起動してるが、プロダクション用には Supervisor で管理しているので、Supervisor の conf ファイルで起動コマンドを記述する。
ワーカーはサンプルとかチュートリアルは大抵関数ベースで書かれてるけど、クラスベースでのやり方を知りたかったので書いてみた。
クラスベースではただの @celrey.task だけではだめで、@celery.task(filter=task_method) としなければ動作しない。
関数ベースのだと簡単に動いたのに、クラスベースではエラーが出て動かなくてハマった。