diff --git a/src/newsreader/accounts/migrations/0008_auto_20200422_2243.py b/src/newsreader/accounts/migrations/0008_auto_20200422_2243.py new file mode 100644 index 0000000..657245a --- /dev/null +++ b/src/newsreader/accounts/migrations/0008_auto_20200422_2243.py @@ -0,0 +1,21 @@ +# Generated by Django 3.0.5 on 2020-04-22 20:43 + +from django.db import migrations + + +def update_task_name(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + + old_task = "newsreader.news.collection.tasks.collect" + new_task = "newsreader.news.collection.tasks.FeedTask" + + for task in PeriodicTask.objects.filter(task=old_task): + task.task = new_task + task.save() + + +class Migration(migrations.Migration): + + dependencies = [("accounts", "0007_auto_20191116_1255")] + + operations = [migrations.RunPython(update_task_name)] diff --git a/src/newsreader/accounts/models.py b/src/newsreader/accounts/models.py index 423b97b..0b2799f 100644 --- a/src/newsreader/accounts/models.py +++ b/src/newsreader/accounts/models.py @@ -69,7 +69,7 @@ class User(AbstractUser): enabled=True, interval=task_interval, name=f"{self.email}-collection-task", - task="newsreader.news.collection.tasks.collect", + task="newsreader.news.collection.tasks.FeedTask", args=json.dumps([self.pk]), ) diff --git a/src/newsreader/fixtures/default-fixture.json b/src/newsreader/fixtures/default-fixture.json index e0de28f..7b7ecdf 100644 --- a/src/newsreader/fixtures/default-fixture.json +++ b/src/newsreader/fixtures/default-fixture.json @@ -4,7 +4,7 @@ "pk": 10, "fields": { "name": "sonny@bakker.nl-collection-task", - "task": "newsreader.news.collection.tasks.collect", + "task": "newsreader.news.collection.tasks.FeedTask", "interval": 4, "crontab": null, "solar": null, @@ -31,7 +31,7 @@ "pk": 26, "fields": { "name": "sonnyba871@gmail.com-collection-task", - "task": "newsreader.news.collection.tasks.collect", + "task": "newsreader.news.collection.tasks.FeedTask", "interval": 4, "crontab": null, "solar": null, diff --git a/src/newsreader/news/collection/tasks.py b/src/newsreader/news/collection/tasks.py index b2dbf58..6888cba 100644 --- a/src/newsreader/news/collection/tasks.py +++ b/src/newsreader/news/collection/tasks.py @@ -1,21 +1,42 @@ from django.core.exceptions import ObjectDoesNotExist +from celery.exceptions import Reject +from celery.utils.log import get_task_logger + from newsreader.accounts.models import User from newsreader.celery import app from newsreader.news.collection.feed import FeedCollector from newsreader.utils.celery import MemCacheLock -@app.task(bind=True) -def collect(self, user_pk): - try: - user = User.objects.get(pk=user_pk) - except ObjectDoesNotExist: - return +logger = get_task_logger(__name__) - with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired: - if acquired: - rules = user.rules.all() - collector = FeedCollector() - collector.collect(rules=rules) +class FeedTask(app.Task): + name = "newsreader.news.collection.tasks.FeedTask" + ignore_result = True + + def run(self, user_pk): + try: + user = User.objects.get(pk=user_pk) + except ObjectDoesNotExist: + message = f"User {user_pk} does not exist" + logger.exception(message) + + raise Reject(reason=message, requeue=False) + + with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired: + if acquired: + logger.info(f"Running task for user {user_pk}") + + rules = user.rules.all() + + collector = FeedCollector() + collector.collect(rules=rules) + else: + logger.info(f"Cancelling task due to existing lock for user {user_pk}") + + raise Reject(reason="Task already running", requeue=False) + + +FeedTask = app.register_task(FeedTask())