From b53c40a3ddda1e6cfde3caad8af7d59e43db6047 Mon Sep 17 00:00:00 2001 From: Sonny Bakker Date: Sat, 26 Sep 2020 22:45:04 +0200 Subject: [PATCH] Add twitter timeline task --- src/newsreader/news/collection/tasks.py | 34 +++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/newsreader/news/collection/tasks.py b/src/newsreader/news/collection/tasks.py index a04c5f9..926b05b 100644 --- a/src/newsreader/news/collection/tasks.py +++ b/src/newsreader/news/collection/tasks.py @@ -114,6 +114,40 @@ class RedditTokenTask(app.Task): user.save() +class TwitterTimelineTask(app.Task): + name = "TwitterTimelineTask" + ignore_result = True + + def run(self, user_pk): + from newsreader.news.collection.twitter import ( + TwitterCollector, + TwitterTimeLineScheduler, + ) + + try: + user = User.objects.get(pk=user_pk) + except ObjectDoesNotExist: + message = f"User {user_pk} does not exist" + logger.exception(message) + + raise Reject(reason=message, requeue=False) + + with MemCacheLock("f{user.email}-timeline-task", self.app.oid) as acquired: + if acquired: + logger.info(f"Running twitter timeline task for user {user_pk}") + + scheduler = TwitterTimeLineScheduler(user) + timelines = scheduler.get_scheduled_rules() + + collector = TwitterCollector() + collector.collect(rules=timelines) + else: + logger.warning(f"Cancelling task due to existing lock") + + raise Reject(reason="Task already running", requeue=False) + + FeedTask = app.register_task(FeedTask()) RedditTask = app.register_task(RedditTask()) RedditTokenTask = app.register_task(RedditTokenTask()) +TwitterTimelineTask = app.register_task(TwitterTimelineTask())