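Using Celery in a Web Application

Background

  • The web application handles requests synchronously, so response times are too long.
  • The existing web project is built on Flask. Celery's asynchronous task support is full-featured and convenient, which makes it the better fit.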

Processing flow
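The flow can be sketched with the standard library alone. This is only an illustration, not Celery itself: the `broker` queue and `results` dict stand in for the two Redis databases used in the demo, and the `delay` and `worker` helpers are hypothetical names chosen to mirror Celery's vocabulary. The point is that the request side enqueues the work and returns a task id right away, while a separate worker picks the task up later and stores the result.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for the Redis broker (assumption)
results = {}             # stands in for the Redis result backend (assumption)


def delay(func, *args):
    """Enqueue a call and return a task id immediately, like task.delay()."""
    task_id = str(uuid.uuid4())
    results[task_id] = {'status': 'PENDING', 'result': None}
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Consume tasks from the broker until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        task_id, func, args = item
        results[task_id] = {'status': 'SUCCESS', 'result': func(*args)}


def celery_test(msg):
    return 'balabala'


# The "web request": enqueue the work and respond without waiting for it.
task_id = delay(celery_test, {'test': 'hello'})
status_at_response = results[task_id]['status']

# The "worker process": started separately, it handles the task afterwards.
thread = threading.Thread(target=worker)
thread.start()
broker.put(None)   # tell the worker to stop once the queue is drained
thread.join()

final = results[task_id]
```

At response time the status is still `PENDING`; once the worker has run, the stored entry holds the return value.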

Demo

local.py

# -*- coding:utf-8 -*-

from flask import Flask, request
from flask_script import Manager
from make_celery import make_celery
from config import config

app = Flask(__name__)
app.config.from_object(config['local'])
config['local'].init_app(app)
app.config.update(RESTFUL_JSON=dict(ensure_ascii=False))

# Initialize Celery first: celery_task reads make_celery.celery at import
# time, so the instance must exist before that module is imported.
celery = make_celery(app)
manager = Manager(app=app)

from celery_task import celery_test  # noqa: E402


@app.route('/', methods=['GET', 'POST'])
def index():
    # Hand the work to a Celery worker and respond immediately.
    test = request.args.get('test', '')
    msg = {'test': test}
    result = celery_test.delay(msg)
    return ','.join(['status:{}'.format(result.status),
                     'task_id:{}'.format(result.task_id)])


if __name__ == '__main__':
    manager.run()

config.py

# -*- coding:utf-8 -*-


class Config:

    @staticmethod
    def init_app(app):
        pass


class LocalConfig(Config):
    CONFIG_NAME = 'local'

    # CELERY
    CELERY_BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_IMPORTS = (
        "celery_task",
    )


config = {
    'local': LocalConfig,
}

make_celery.py

# -*- encoding: utf-8 -*-
from celery import Celery

# Module-level handle; set by make_celery() and imported by celery_task.
celery = None


def make_celery(app):
    global celery
    celery = Celery(app.import_name,
                    backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context.
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

celery_task.py

from make_celery import celery


@celery.task()
def celery_test(msg):
    # Process the message
    print(msg)
    return 'balabala'

Running

Start the web server and the Celery worker in separate terminals:

python local.py runserver
celery -A local.celery worker -l info
Visit: http://127.0.0.1:5000/
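The index view answers with a comma-separated string of the form `status:<state>,task_id:<id>`. A small helper for turning that body into a dict might look like the sketch below; `parse_task_response` is a hypothetical name, and the sample task id is made up for illustration.

```python
def parse_task_response(body):
    """Split 'status:PENDING,task_id:abc' into a dict of its fields."""
    fields = {}
    for part in body.strip().split(','):
        key, _, value = part.partition(':')
        fields[key.strip()] = value.strip()
    return fields


# Hypothetical response body; the task id shown here is made up.
sample = 'status:PENDING,task_id:3f2c9a1e-0000-0000-0000-000000000000'
parsed = parse_task_response(sample)
print(parsed['status'], parsed['task_id'])
```

The parsed `task_id` is the value a client would keep in order to look up the task later.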

Reflection and further study

* Learn the basic usage.
* Learn the advanced usage.
* Learn the related dependency libraries.