0.2.3 #99
4 changed files with 56 additions and 14 deletions
|
|
@ -0,0 +1,21 @@
|
||||||
|
# Generated by Django 3.0.5 on 2020-04-22 20:43
|
||||||
|
|
||||||
|
from django.db import migrations
|
||||||
|
|
||||||
|
from django_celery_beat.models import PeriodicTask
|
||||||
|
|
||||||
|
|
||||||
|
def update_task_name(apps, schema_editor):
|
||||||
|
old_task = "newsreader.news.collection.tasks.collect"
|
||||||
|
new_task = "newsreader.news.collection.tasks.FeedTask"
|
||||||
|
|
||||||
|
for task in PeriodicTask.objects.filter(task=old_task):
|
||||||
|
task.task = new_task
|
||||||
|
task.save()
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [("accounts", "0007_auto_20191116_1255")]
|
||||||
|
|
||||||
|
operations = [migrations.RunPython(update_task_name)]
|
||||||
|
|
@ -69,7 +69,7 @@ class User(AbstractUser):
|
||||||
enabled=True,
|
enabled=True,
|
||||||
interval=task_interval,
|
interval=task_interval,
|
||||||
name=f"{self.email}-collection-task",
|
name=f"{self.email}-collection-task",
|
||||||
task="newsreader.news.collection.tasks.collect",
|
task="newsreader.news.collection.tasks.FeedTask",
|
||||||
args=json.dumps([self.pk]),
|
args=json.dumps([self.pk]),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
"pk": 10,
|
"pk": 10,
|
||||||
"fields": {
|
"fields": {
|
||||||
"name": "sonny@bakker.nl-collection-task",
|
"name": "sonny@bakker.nl-collection-task",
|
||||||
"task": "newsreader.news.collection.tasks.collect",
|
"task": "newsreader.news.collection.tasks.FeedTask",
|
||||||
"interval": 4,
|
"interval": 4,
|
||||||
"crontab": null,
|
"crontab": null,
|
||||||
"solar": null,
|
"solar": null,
|
||||||
|
|
@ -31,7 +31,7 @@
|
||||||
"pk": 26,
|
"pk": 26,
|
||||||
"fields": {
|
"fields": {
|
||||||
"name": "sonnyba871@gmail.com-collection-task",
|
"name": "sonnyba871@gmail.com-collection-task",
|
||||||
"task": "newsreader.news.collection.tasks.collect",
|
"task": "newsreader.news.collection.tasks.FeedTask",
|
||||||
"interval": 4,
|
"interval": 4,
|
||||||
"crontab": null,
|
"crontab": null,
|
||||||
"solar": null,
|
"solar": null,
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,42 @@
|
||||||
from django.core.exceptions import ObjectDoesNotExist
|
from django.core.exceptions import ObjectDoesNotExist
|
||||||
|
|
||||||
|
from celery.exceptions import Reject
|
||||||
|
from celery.utils.log import get_task_logger
|
||||||
|
|
||||||
from newsreader.accounts.models import User
|
from newsreader.accounts.models import User
|
||||||
from newsreader.celery import app
|
from newsreader.celery import app
|
||||||
from newsreader.news.collection.feed import FeedCollector
|
from newsreader.news.collection.feed import FeedCollector
|
||||||
from newsreader.utils.celery import MemCacheLock
|
from newsreader.utils.celery import MemCacheLock
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True)
|
logger = get_task_logger(__name__)
|
||||||
def collect(self, user_pk):
|
|
||||||
|
|
||||||
|
class FeedTask(app.Task):
|
||||||
|
name = "newsreader.news.collection.tasks.FeedTask"
|
||||||
|
ignore_result = True
|
||||||
|
|
||||||
|
def run(self, user_pk):
|
||||||
try:
|
try:
|
||||||
user = User.objects.get(pk=user_pk)
|
user = User.objects.get(pk=user_pk)
|
||||||
except ObjectDoesNotExist:
|
except ObjectDoesNotExist:
|
||||||
return
|
message = f"User {user_pk} does not exist"
|
||||||
|
logger.exception(message)
|
||||||
|
|
||||||
|
raise Reject(reason=message, requeue=False)
|
||||||
|
|
||||||
with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired:
|
with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired:
|
||||||
if acquired:
|
if acquired:
|
||||||
|
logger.info(f"Running task for user {user_pk}")
|
||||||
|
|
||||||
rules = user.rules.all()
|
rules = user.rules.all()
|
||||||
|
|
||||||
collector = FeedCollector()
|
collector = FeedCollector()
|
||||||
collector.collect(rules=rules)
|
collector.collect(rules=rules)
|
||||||
|
else:
|
||||||
|
logger.info(f"Cancelling task due to existing lock for user {user_pk}")
|
||||||
|
|
||||||
|
raise Reject(reason="Task already running", requeue=False)
|
||||||
|
|
||||||
|
|
||||||
|
FeedTask = app.register_task(FeedTask())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue