起源
处理流程
demo
local.py
# -*- coding:utf-8 -*-
from flask import Flask
from flask import request
from flask_script import Manager

from config import config
from make_celery import make_celery
# Build the Flask application and load the 'local' configuration profile.
app = Flask(__name__)
app.config.from_object(config['local'])
config['local'].init_app(app)
# Ask the JSON encoder not to escape non-ASCII characters.
# NOTE(review): RESTFUL_JSON is presumably read by Flask-RESTful — confirm.
app.config.update(RESTFUL_JSON=dict(ensure_ascii=False))

# Initialise Celery first: make_celery() rebinds the module-level
# ``make_celery.celery`` (None until this call), which celery_task picks up
# via ``from make_celery import celery`` at import time.
celery = make_celery(app)

# Deliberately imported after make_celery(app); importing it earlier would
# bind celery_task's ``celery`` name to None.
from celery_task import celery_test  # noqa: E402

manager = Manager(app=app)
@app.route('/')
def index():
    """Queue a ``celery_test`` task and report its initial state.

    Reads the optional ``test`` query parameter (default ``''``), sends it to
    the task as ``{'test': <value>}`` and returns a plain-text string of the
    form ``status:<status>,task_id:<id>``.
    """
    test = request.args.get('test', '')
    msg = {'test': test}
    # delay() only enqueues the task; it does not wait for the result.
    result = celery_test.delay(msg)
    return ','.join([
        'status:{}'.format(result.status),
        'task_id:{}'.format(result.task_id),
    ])
if __name__ == '__main__':
    # Hand control to Flask-Script's command dispatcher (e.g. ``runserver``).
    manager.run()
config.py
# -*- coding:utf-8 -*-
class Config:
    """Base configuration shared by every environment."""

    @staticmethod
    def init_app(app):
        """Hook for environment-specific application setup; a no-op here.

        Declared static because it takes only the app and is invoked on the
        configuration class itself (``config['local'].init_app(app)``).

        :param app: the application being configured (unused here).
        """
        pass
class LocalConfig(Config):
    """Settings for the 'local' environment, using Redis on localhost."""

    CONFIG_NAME = 'local'

    # CELERY
    # Redis database 1 is the broker; database 2 is the result backend.
    CELERY_BROKER_URL = 'redis://localhost:6379/1'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # Modules the Celery worker imports at start-up so their tasks register.
    CELERY_IMPORTS = (
        "celery_task",
    )
# Registry of configuration classes, looked up by environment name.
config = {'local': LocalConfig}
make_celery.py
# -*- encoding: utf-8 -*-
from celery import Celery
# Module-level handle to the Celery application. It stays None until
# make_celery() runs, so modules that do ``from make_celery import celery``
# must be imported after that call.
celery = None


def make_celery(app):
    """Create the Celery application for a Flask app and publish it globally.

    The broker and result backend are read from ``app.config``
    (``CELERY_BROKER_URL`` / ``CELERY_RESULT_BACKEND``); the whole Flask
    config is then copied into the Celery config.

    :param app: the Flask application whose context tasks should run in.
    :return: the configured Celery instance, which is also stored in the
        module-level ``celery`` variable.
    """
    global celery
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Task base class that runs the task body in a Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    # Make tasks created from this Celery app use ContextTask as their base.
    celery.Task = ContextTask
    return celery
celery_task.py
from make_celery import celery


# NOTE: ``celery`` is None until make_celery(app) has been called, so this
# module must only be imported after the Celery app has been created.
@celery.task()
def celery_test(msg):
    """Demo task: print the received message and return a fixed string.

    Registered as a Celery task so callers can enqueue it with
    ``celery_test.delay(msg)``.
    """
    # deal msg
    print(msg)
    return 'balabala'
启动
python local.py runserver
celery worker -A local.celery -l info
访问: http://127.0.0.1:5000/
思考和深入
* 学习基础用法.
* 学习高级用法.
* 学习相关依赖库.