Add twitter client

This commit is contained in:
Sonny Bakker 2020-09-26 20:44:00 +02:00
parent 3db3a05ca0
commit 02bb95fde8

View file

@ -1,9 +1,11 @@
import logging import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
from json import JSONDecodeError from json import JSONDecodeError
from django.conf import settings from django.conf import settings
from django.utils import timezone
from django.utils.html import format_html, urlize from django.utils.html import format_html, urlize
import pytz import pytz
@ -22,7 +24,9 @@ from newsreader.news.collection.choices import RuleTypeChoices, TwitterPostTypeC
from newsreader.news.collection.exceptions import ( from newsreader.news.collection.exceptions import (
StreamDeniedException, StreamDeniedException,
StreamException, StreamException,
StreamNotFoundException,
StreamParseException, StreamParseException,
StreamTimeOutException,
StreamTooManyException, StreamTooManyException,
) )
from newsreader.news.collection.utils import fetch, truncate_text from newsreader.news.collection.utils import fetch, truncate_text
@ -174,7 +178,55 @@ class TwitterStream(PostStream):
class TwitterClient(PostClient): class TwitterClient(PostClient):
pass stream = TwitterStream
def __enter__(self):
streams = [self.stream(timeline) for timeline in self.rules]
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(stream.read): stream for stream in streams}
for future in as_completed(futures):
stream = futures[future]
try:
payload = future.result()
stream.rule.error = None
stream.rule.succeeded = True
yield payload
except StreamTooManyException as e:
logger.exception("Ratelimit hit, aborting twitter calls")
self.set_rule_error(stream.rule, e)
break
except StreamDeniedException as e:
logger.warning(f"Access token expired for user {stream.user.pk}")
stream.rule.user.twitter_oauth_token = None
stream.rule.user.twitter_oauth_token_secret = None
stream.rule.user.save()
self.set_rule_error(stream.rule, e)
break
except (StreamNotFoundException, StreamTimeOutException) as e:
logger.warning(f"Request failed for {stream.rule.screen_name}")
self.set_rule_error(stream.rule, e)
continue
except StreamException as e:
logger.exception(f"Request failed for {stream.rule.screen_name}")
self.set_rule_error(stream.rule, e)
continue
finally:
stream.rule.last_run = timezone.now()
stream.rule.save()
class TwitterCollector(PostCollector): class TwitterCollector(PostCollector):