你所需要的,不仅仅是一个好用的代理。
Celery是一个专注于实时处理和任务调度分布式任务队列。通过RabbitMQ、Redis、MongoDB等消息代理,把任务发给执行任务的Worker以达到异步执行。
我写的那本《Python Web开发实战》的样章就是 《使用Celery》 ,建议看下面内容之前先读一下这篇文章。
接下来的内容假设你已经对Celery有了一定的了解。对 wechat-admin 项目来说,使用Celery要做如下事情:
首先我们创建一个目录(wechat),专门用来存放celery任务相关的内容,目录下文件列表如下:
❯ tree wechat
├── __init__.py
├── celery.py # 名为celery.py是主程序,启动的时候可以直接`celery -A wechat worker -l info -B`
├── celeryconfig.py # 配置文件
└── tasks.py # 存放任务逻辑
0 directories, 4 files
我们挨个看看
看文件名字就知道了,这个是放配置的文件:
❯ cat celeryconfig.py
from config import REDIS_URL
BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
指定消息代理和执行结果都使用Redis,任务(消息)使用msgpack序列化,结果使用json序列化,任务结果保存时间24小时等
主程序有点Flask的app.py的感觉:
❯ cat celery.py
from celery import Celery
from celery.signals import worker_ready
from models.redis import db, LISTENER_TASK_KEY
app = Celery('wechat', include=['wechat.tasks'])
app.config_from_object('wechat.celeryconfig')
@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection() as conn: # noqa
task_id = sender.app.send_task('wechat.tasks.listener')
db.set(LISTENER_TASK_KEY, task_id)
if __name__ == '__main__':
app.start()
这段代码有2点需要解释一下:
tasks.py这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:
❯ cat tasks.py
from datetime import timedelta
from celery.task import periodic_task
from celery.task.control import revoke
from wechat.celery import app
from wxpy.exceptions import ResponseError
from itchat.signals import logged_out
def restart_listener(sender, **kw):
# 重启tasks.listener这个任务
task_id = r.get(LISTENER_TASK_KEY)
if task_id:
revoke(str(task_id, 'utf-8'))
task_id = app.send_task('wechat.tasks.listener')
r.set(LISTENER_TASK_KEY, task_id)
logged_out.connect(restart_listener)
from wxpy.signals import stopped
from libs.wx import get_bot
from views.api import json_api
from models.redis import db as r, LISTENER_TASK_KEY
from app import app as sse_api
stopped.connect(restart_listener)
bot = get_bot()
def _retrieve_data(update=False):
_update_contact(bot, update)
_update_group(bot, update)
_update_mp(bot, update)
@app.task
def listener():
# 不用全局的bot,因为在import listener的过程中会
# 注册各种函数(处理自动加群、接受消息、踢人以及各种插件功能)
from libs.listener import bot
with json_api.app_context():
bot.join()
@app.task
def retrieve_data():
# 使用Flask应用中的方法都需要放在对应的上下文内
with json_api.app_context():
_retrieve_data(True)
@app.task
def update_contact(update=False):
# 都是业务逻辑,就省略了,这样分开写是可以单独的更新一种类型的数据
...
@app.task
def update_group(update=False):
...
@app.task
def update_mp(update=False):
...
# periodic_task就是定时任务,表示周期性的执行某任务
@periodic_task(run_every=timedelta(seconds=60), time_limit=5)
def send_notify():
# 发送新消息数量提醒
...
上一篇我说SSE的时候忘说了一点,就是更新消息提醒。在Web页面标记已读的时候,会POST到/readall接口,后端清空新通知数量。这是由于SSE的单向特点造成的,如果使用socketio(WebSocket)的话可以直接emit到后端,就不用HTTP这种方案了