Resolve "Celery task deduplication"
This commit is contained in:
parent
7c888c1461
commit
a3a2033e37
4 changed files with 35 additions and 10 deletions
|
|
@ -82,9 +82,9 @@ python linting:
|
|||
- source env/bin/activate
|
||||
- pip install -r requirements/gitlab.txt
|
||||
script:
|
||||
- isort -rc src/ --check-only
|
||||
- black -l 88 --check src/
|
||||
- autoflake --check --remove-all-unused-imports --ignore-init-module-imports --recursive src/
|
||||
- isort src/ --check-only --recursive
|
||||
- black src/ --line-length 88 --check
|
||||
- autoflake src/ --check --recursive --remove-all-unused-imports --ignore-init-module-imports
|
||||
|
||||
deploy:
|
||||
stage: deploy
|
||||
|
|
|
|||
|
|
@ -6,5 +6,5 @@ django-debug-toolbar==2.0
|
|||
django-extensions==2.1.9
|
||||
|
||||
black==19.3b0
|
||||
isort==4.3.20
|
||||
autoflake==1.3
|
||||
isort==4.3.21
|
||||
autoflake==1.3.1
|
||||
|
|
|
|||
|
|
@ -3,15 +3,18 @@ from django.core.exceptions import ObjectDoesNotExist
|
|||
from newsreader.accounts.models import User
|
||||
from newsreader.celery import app
|
||||
from newsreader.news.collection.feed import FeedCollector
|
||||
from newsreader.utils.celery import MemCacheLock
|
||||
|
||||
|
||||
@app.task
|
||||
def collect(user_pk):
|
||||
@app.task(bind=True)
|
||||
def collect(self, user_pk):
|
||||
try:
|
||||
user = User.objects.get(pk=user_pk)
|
||||
except ObjectDoesNotExist:
|
||||
return
|
||||
|
||||
with MemCacheLock(f"{user.email}-task", self.app.oid) as acquired:
|
||||
if acquired:
|
||||
rules = user.rules.all()
|
||||
|
||||
collector = FeedCollector()
|
||||
|
|
|
|||
22
src/newsreader/utils/celery.py
Normal file
22
src/newsreader/utils/celery.py
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
from django.core.cache import cache
|
||||
|
||||
from celery.five import monotonic
|
||||
|
||||
|
||||
LOCK_EXPIRE = 60 * 10 # 10 minutes
|
||||
|
||||
|
||||
class MemCacheLock:
|
||||
def __init__(self, lock_id, oid):
|
||||
self.lock_id = lock_id
|
||||
self.oid = oid
|
||||
|
||||
self.timeout_at = monotonic() + LOCK_EXPIRE - 3
|
||||
|
||||
def __enter__(self):
|
||||
self.status = cache.add(self.lock_id, self.oid, LOCK_EXPIRE)
|
||||
return self.status
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
if monotonic() < self.timeout_at and self.status:
|
||||
cache.delete(self.lock_id)
|
||||
Loading…
Add table
Add a link
Reference in a new issue