commit 99fd94580f95dcbfb77b73e2de846f76a5709ef9
Author: Sonny <sonnyba871@gmail.com>
Date: Sat Feb 15 21:45:16 2020 +0100
Use postgres password
As of https://gitlab.com/gitlab-com/support-forum/issues/5199
130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
from unittest.mock import MagicMock, patch
|
|
|
|
from django.test import TestCase
|
|
from django.utils.lorem_ipsum import words
|
|
|
|
from newsreader.news.collection.exceptions import (
|
|
StreamDeniedException,
|
|
StreamException,
|
|
StreamNotFoundException,
|
|
StreamParseException,
|
|
StreamTimeOutException,
|
|
)
|
|
from newsreader.news.collection.feed import FeedClient
|
|
from newsreader.news.collection.tests.factories import FeedFactory
|
|
|
|
from .mocks import simple_mock
|
|
|
|
|
|
class FeedClientTestCase(TestCase):
    """Exercise ``FeedClient`` iteration with ``FeedStream.read`` patched.

    Every test builds a single rule with ``FeedFactory``, configures the
    patched ``read`` to either return data or raise one of the stream
    exceptions, and then iterates the client inside its context manager.
    """

    def setUp(self):
        # Never truncate the diff unittest prints on a failed comparison.
        self.maxDiff = None

        # Replace FeedStream.read so each test decides what it returns/raises.
        self.patched_read = patch("newsreader.news.collection.feed.FeedStream.read")
        self.mocked_read = self.patched_read.start()

    def tearDown(self):
        # Stops every patch that was started with ``start()``.
        patch.stopall()

    def test_client_retrieves_single_rules(self):
        """The client yields exactly what the patched ``read`` returned."""
        rule = FeedFactory.create()
        mock_stream = MagicMock(rule=rule)

        self.mocked_read.return_value = (simple_mock, mock_stream)

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, simple_mock)
                self.assertEqual(stream, mock_stream)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_stream_exception(self):
        """A ``StreamException`` yields empty entries and marks the rule failed."""
        rule = FeedFactory.create()

        self.mocked_read.side_effect = StreamException(message="Stream exception")

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(stream.rule.error, "Stream exception")
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_stream_not_found_exception(self):
        """A ``StreamNotFoundException`` is recorded on the rule."""
        rule = FeedFactory.create()

        self.mocked_read.side_effect = StreamNotFoundException(
            message="Stream not found"
        )

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(stream.rule.error, "Stream not found")
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_stream_denied_exception(self):
        """A ``StreamDeniedException`` is recorded on the rule."""
        rule = FeedFactory.create()

        self.mocked_read.side_effect = StreamDeniedException(message="Stream denied")

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(stream.rule.error, "Stream denied")
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_stream_timed_out(self):
        """A ``StreamTimeOutException`` is recorded on the rule."""
        rule = FeedFactory.create()

        self.mocked_read.side_effect = StreamTimeOutException(
            message="Stream timed out"
        )

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(stream.rule.error, "Stream timed out")
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_stream_parse_exception(self):
        """A ``StreamParseException`` is recorded on the rule."""
        rule = FeedFactory.create()

        self.mocked_read.side_effect = StreamParseException(
            message="Stream has wrong contents"
        )

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(stream.rule.error, "Stream has wrong contents")
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()

    def test_client_catches_long_exception_text(self):
        """A very long exception message ends up stored as 1024 characters."""
        rule = FeedFactory.create()

        # 1000 lorem-ipsum words; the stored error is expected to be exactly
        # 1024 characters (presumably truncated to a field limit -- confirm
        # against the rule model).
        self.mocked_read.side_effect = StreamParseException(message=words(1000))

        with FeedClient([rule]) as client:
            for data, stream in client:
                self.assertEqual(data, {"entries": []})
                self.assertEqual(len(stream.rule.error), 1024)
                self.assertEqual(stream.rule.succeeded, False)

        self.mocked_read.assert_called_once_with()