Celery を使ってみる

昔ちょっと Celery を試した事があったが、ジョブキューなのが必要になったので再入門してみた。
WAF には Flask を使っている。


Celery には Flask の Extension があるが、Celery3.0 になってから非推奨になっており、Celery 本体の API を使う事が推奨されている。
Flask-Celery には Flask-Script と連動できたが、自分で管理する必要がある。


extensions.py

# -*- coding: utf-8 -*- 
import project.configs.celeryconfig as config
from celery import Celery
 

celery = Celery('SampleTask')
celery.config_from_object(config)

 
__all__ = ['celery']

tasks/sample.py

# -*- coding: utf-8 -*-
from celery.contrib.methods import task_method
from project.extensions import celery
 
 
class SampleTask(object):
    def __init__(self):
        pass 
    
    @celery.task(filter=task_method, name='SampleTask.run')
    def run(self, task_data):
        """
        Interface for Celery task.
    
        :param task_data:
        """
        return 'task {0}'.format(task_data)

views.py

# -*- coding: utf-8 -*-
 
from flask import Blueprint, current_app, request, jsonify
from celery.task.control import revoke
from project.tasks.sample import SampleTask
 
 
app = Blueprint('task', __name__, url_prefix='/task')
 
@app.route('/', strict_slashes=False, methods=['POST'])
def compile():
    logger = current_app.logger
 
    form = request.form['data']
 
    t = SampleTask()
 
    result = t.run.delay(form)
    logger.info('Add task({0})'.format(result.id))
 
    return jsonify(response={'taskid': result.id})
 
 
@app.route('/<task_id>', strict_slashes=False, methods=['GET'])
def compile_result(task_id):
    logger = current_app.logger
 
    t = SampleTask()
 
    result = t.run.AsyncResult(task_id)
    if result.status == 'PENDING':
        logger.info('task id({0}) not exists'.format(task_id))
        return jsonify(response={'result': None})
 
    response = result.get()
    revoke(task_id, terminate=True)
 
    return jsonify(response={'result': response})

ワーカーを起動する。

$ celery -A project.tasks.sample worker -l info -E

シェルで直接起動してるが、プロダクション用には Supervisor で管理しているので、Supervisor の conf ファイルで起動コマンドを記述する。


ワーカーはサンプルとかチュートリアルは大抵関数ベースで書かれてるけど、クラスベースでのやり方を知りたかったので書いてみた。
クラスベースではただの @celrey.task だけではだめで、@celery.task(filter=task_method) としなければ動作しない。


関数ベースのだと簡単に動いたのに、クラスベースではエラーが出て動かなくてハマった。