This commit is contained in:
sonny 2020-04-22 23:15:36 +02:00
parent 85d0d6a721
commit d34cc5d507
4 changed files with 56 additions and 14 deletions

View file

@ -0,0 +1,21 @@
# Generated by Django 3.0.5 on 2020-04-22 20:43
from django.db import migrations
def update_task_name(apps, schema_editor):
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
old_task = "newsreader.news.collection.tasks.collect"
new_task = "newsreader.news.collection.tasks.FeedTask"
for task in PeriodicTask.objects.filter(task=old_task):
task.task = new_task
task.save()
class Migration(migrations.Migration):
dependencies = [("accounts", "0007_auto_20191116_1255")]
operations = [migrations.RunPython(update_task_name)]

View file

@ -69,7 +69,7 @@ class User(AbstractUser):
enabled=True, enabled=True,
interval=task_interval, interval=task_interval,
name=f"{self.email}-collection-task", name=f"{self.email}-collection-task",
task="newsreader.news.collection.tasks.collect", task="newsreader.news.collection.tasks.FeedTask",
args=json.dumps([self.pk]), args=json.dumps([self.pk]),
) )

View file

@ -4,7 +4,7 @@
"pk": 10, "pk": 10,
"fields": { "fields": {
"name": "sonny@bakker.nl-collection-task", "name": "sonny@bakker.nl-collection-task",
"task": "newsreader.news.collection.tasks.collect", "task": "newsreader.news.collection.tasks.FeedTask",
"interval": 4, "interval": 4,
"crontab": null, "crontab": null,
"solar": null, "solar": null,
@ -31,7 +31,7 @@
"pk": 26, "pk": 26,
"fields": { "fields": {
"name": "sonnyba871@gmail.com-collection-task", "name": "sonnyba871@gmail.com-collection-task",
"task": "newsreader.news.collection.tasks.collect", "task": "newsreader.news.collection.tasks.FeedTask",
"interval": 4, "interval": 4,
"crontab": null, "crontab": null,
"solar": null, "solar": null,

View file

@ -1,21 +1,42 @@
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery.exceptions import Reject
from celery.utils.log import get_task_logger
from newsreader.accounts.models import User from newsreader.accounts.models import User
from newsreader.celery import app from newsreader.celery import app
from newsreader.news.collection.feed import FeedCollector from newsreader.news.collection.feed import FeedCollector
from newsreader.utils.celery import MemCacheLock from newsreader.utils.celery import MemCacheLock
@app.task(bind=True) logger = get_task_logger(__name__)
def collect(self, user_pk):
try:
user = User.objects.get(pk=user_pk)
except ObjectDoesNotExist:
return
with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired:
if acquired:
rules = user.rules.all()
collector = FeedCollector() class FeedTask(app.Task):
collector.collect(rules=rules) name = "newsreader.news.collection.tasks.FeedTask"
ignore_result = True
def run(self, user_pk):
try:
user = User.objects.get(pk=user_pk)
except ObjectDoesNotExist:
message = f"User {user_pk} does not exist"
logger.exception(message)
raise Reject(reason=message, requeue=False)
with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired:
if acquired:
logger.info(f"Running task for user {user_pk}")
rules = user.rules.all()
collector = FeedCollector()
collector.collect(rules=rules)
else:
logger.info(f"Cancelling task due to existing lock for user {user_pk}")
raise Reject(reason="Task already running", requeue=False)
FeedTask = app.register_task(FeedTask())