假设我有一个代表一项任务的模型:
class Task(models.Model):
on_status = models.BooleanField()
def run(self):
if self.on_status:
# do stuff
我使用 Celery Beat 运行此任务,并且在 Gunicorn 上运行仪表板,两者都使用相同的数据库。
app = Celery("app")
app.conf.beat_schedule = {
"run_task": {
"task": "run_tasks",
"schedule": 5.0,
"options": {
"expires": 6.0,
},
},
}
tasks = Task.objects.all()
@app.task
def run_tasks():
for task in tasks:
task.run()
我可以on_status
从仪表板进行更改,但随后我需要更新self.on_status
Celery Beat 进程内的实例。是否有命令可以从数据库更新属性值,或者是否有其他方法?
无论您如何使用 celery-beat,核心问题似乎都是如何刷新 django 模型对象。这是通过以下方式完成的
refresh_from_db
:https://docs.djangoproject.com/en/5.1/ref/models/instances/#refreshing-objects-from-database