From 02bb95fde86ff9a9ec38da7f91f28b02f6cb7eda Mon Sep 17 00:00:00 2001 From: Sonny Bakker Date: Sat, 26 Sep 2020 20:44:00 +0200 Subject: [PATCH] Add twitter client --- src/newsreader/news/collection/twitter.py | 54 ++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/src/newsreader/news/collection/twitter.py b/src/newsreader/news/collection/twitter.py index e2be78f..b503636 100644 --- a/src/newsreader/news/collection/twitter.py +++ b/src/newsreader/news/collection/twitter.py @@ -1,9 +1,11 @@ import logging +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from json import JSONDecodeError from django.conf import settings +from django.utils import timezone from django.utils.html import format_html, urlize import pytz @@ -22,7 +24,9 @@ from newsreader.news.collection.choices import RuleTypeChoices, TwitterPostTypeC from newsreader.news.collection.exceptions import ( StreamDeniedException, StreamException, + StreamNotFoundException, StreamParseException, + StreamTimeOutException, StreamTooManyException, ) from newsreader.news.collection.utils import fetch, truncate_text @@ -174,7 +178,55 @@ class TwitterStream(PostStream): class TwitterClient(PostClient): - pass + stream = TwitterStream + + def __enter__(self): + streams = [self.stream(timeline) for timeline in self.rules] + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(stream.read): stream for stream in streams} + + for future in as_completed(futures): + stream = futures[future] + + try: + payload = future.result() + + stream.rule.error = None + stream.rule.succeeded = True + + yield payload + except StreamTooManyException as e: + logger.exception("Ratelimit hit, aborting twitter calls") + + self.set_rule_error(stream.rule, e) + + break + except StreamDeniedException as e: + logger.warning(f"Access token expired for user {stream.user.pk}") + + stream.rule.user.twitter_oauth_token = None + stream.rule.user.twitter_oauth_token_secret = None + stream.rule.user.save() + + self.set_rule_error(stream.rule, e) + + break + except (StreamNotFoundException, StreamTimeOutException) as e: + logger.warning(f"Request failed for {stream.rule.screen_name}") + + self.set_rule_error(stream.rule, e) + + continue + except StreamException as e: + logger.exception(f"Request failed for {stream.rule.screen_name}") + + self.set_rule_error(stream.rule, e) + + continue + finally: + stream.rule.last_run = timezone.now() + stream.rule.save() class TwitterCollector(PostCollector):