Add twitter scheduler tests

This commit is contained in:
Sonny Bakker 2020-09-26 19:34:10 +02:00
parent 8c69a22b32
commit e21c875928
3 changed files with 90 additions and 19 deletions

View file

@ -0,0 +1,63 @@
from json import JSONDecodeError
from unittest.mock import patch
from django.test import TestCase
from newsreader.accounts.tests.factories import UserFactory
from newsreader.news.collection.exceptions import StreamException
from newsreader.news.collection.twitter import TwitterTimeLineScheduler
class TwitterTimeLineSchedulerTestCase(TestCase):
def setUp(self):
patched_fetch = patch("newsreader.news.collection.twitter.fetch")
self.mocked_fetch = patched_fetch.start()
def test_simple(self):
user = UserFactory(twitter_oauth_token="foo", twitter_oauth_token_secret="bar")
self.mocked_fetch.return_value.json.return_value = {
"rate_limit_context": {"application": "dummykey"},
"resources": {
"statuses": {
"/statuses/user_timeline": {
"limit": 1500,
"remaining": 1500,
"reset": 1601141386,
}
}
},
}
scheduler = TwitterTimeLineScheduler(user)
self.assertEquals(scheduler.get_current_ratelimit(), 1500)
def test_stream_exception(self):
user = UserFactory(twitter_oauth_token=None, twitter_oauth_token_secret=None)
self.mocked_fetch.side_effect = StreamException
scheduler = TwitterTimeLineScheduler(user)
self.assertEquals(scheduler.get_current_ratelimit(), None)
def test_json_decode_error(self):
user = UserFactory(twitter_oauth_token="foo", twitter_oauth_token_secret="bar")
self.mocked_fetch.return_value.json.side_effect = JSONDecodeError(
"foo", "bar", 10
)
scheduler = TwitterTimeLineScheduler(user)
self.assertEquals(scheduler.get_current_ratelimit(), None)
def test_unexpected_contents(self):
user = UserFactory(twitter_oauth_token="foo", twitter_oauth_token_secret="bar")
self.mocked_fetch.return_value.json.return_value = {"foo": "bar"}
scheduler = TwitterTimeLineScheduler(user)
self.assertEquals(scheduler.get_current_ratelimit(), None)

View file

@ -3,11 +3,13 @@ import logging
from datetime import datetime
from json import JSONDecodeError
from django.conf import settings
from django.utils.html import format_html, urlize
import pytz
from ftfy import fix_text
from requests_oauthlib import OAuth1 as OAuth
from newsreader.news.collection.base import (
PostBuilder,
@ -166,28 +168,36 @@ class TwitterTimeLineScheduler(Scheduler):
self.user = user
if not timelines:
self.timelines = user.rules.enabled(
type=RuleTypeChoices.twitter_timeline
).order_by("last_run")[:200]
self.timelines = (
user.rules.enabled()
.filter(type=RuleTypeChoices.twitter_timeline)
.order_by("last_run")[:200]
)
else:
self.timelines = timelines
def get_scheduled_rules(self):
if (
not self.user.twitter_oauth_token
or not self.user.twitter_oauth_token_secret
):
return []
max_amount = self.get_current_ratelimit()
return self.timelines[:max_amount] if max_amount else []
def get_current_ratelimit(self):
endpoint = "application/rate_limit_status.json?resources=statuses"
# TODO add appropriate authentication (OAuth 1.0a) headers
if (
not self.user.twitter_oauth_token
or not self.user.twitter_oauth_token_secret
):
return
oauth = OAuth(
settings.TWITTER_CONSUMER_ID,
client_secret=settings.TWITTER_CONSUMER_SECRET,
resource_owner_key=self.user.twitter_oauth_token,
resource_owner_secret=self.user.twitter_oauth_token_secret,
)
try:
response = fetch(f"{TWITTER_API_URL}/{endpoint}")
response = fetch(f"{TWITTER_API_URL}/{endpoint}", auth=oauth)
except StreamException:
logger.exception(f"Unable to retrieve current ratelimit for {self.user.pk}")
return
@ -198,9 +208,7 @@ class TwitterTimeLineScheduler(Scheduler):
logger.exception(f"Unable to parse ratelimit request for {self.user.pk}")
return
if not "resources" in payload or not "statuses" in payload["resources"]:
return []
statuses = payload["resources"]["statuses"]
return statuses.get("/statuses/user_timeline", 0)
try:
return payload["resources"]["statuses"]["/statuses/user_timeline"]["limit"]
except KeyError:
return

View file

@ -25,12 +25,12 @@ def build_publication_date(dt, tz):
return published_parsed.astimezone(pytz.utc)
def fetch(url, headers={}):
def fetch(url, auth=None, headers={}):
headers = {**DEFAULT_HEADERS, **headers}
with ResponseHandler() as response_handler:
try:
response = requests.get(url, headers=headers)
response = requests.get(url, auth=auth, headers=headers)
response_handler.handle_response(response)
except RequestException as exception:
response_handler.map_exception(exception)