Refactor RedditBuilder

This commit is contained in:
Sonny Bakker 2020-10-11 21:50:23 +02:00
parent 90553168df
commit c68a58136c
2 changed files with 101 additions and 138 deletions

View file

@ -122,99 +122,119 @@ class RedditBuilder(PostBuilder):
if not "data" in self.payload or not "children" in self.payload["data"]:
return
posts = self.payload["data"]["children"]
rule = self.stream.rule
entries = self.payload["data"]["children"]
for post in posts:
if not "data" in post or post["kind"] != REDDIT_POST:
for entry in entries:
if not "data" in entry:
continue
elif entry.get("kind") != REDDIT_POST:
continue
elif not "id" in entry["data"]:
continue
data = post["data"]
remote_identifier = entry["data"]["id"]
remote_identifier = data["id"]
title = truncate_text(Post, "title", data["title"])
author = truncate_text(Post, "author", data["author"])
post_url_fragment = data["permalink"]
direct_url = data["url"]
is_text_post = data["is_self"]
if remote_identifier in results:
if remote_identifier in results or remote_identifier in self.existing_posts:
continue
if is_text_post:
uncleaned_body = data["selftext_html"]
unescaped_body = unescape(uncleaned_body) if uncleaned_body else ""
body = self.sanitize_fragment(unescaped_body) if unescaped_body else ""
elif direct_url.endswith(REDDIT_IMAGE_EXTENSIONS):
body = format_html(
"<div><img alt='{title}' src='{url}' loading='lazy' /></div>",
url=direct_url,
title=title,
)
elif data["is_video"]:
video_info = data["secure_media"]["reddit_video"]
body = format_html(
"<div><video controls muted><source src='{url}' type='video/mp4' /></video></div>",
url=video_info["fallback_url"],
)
elif direct_url.endswith(REDDIT_VIDEO_EXTENSIONS):
extension = next(
extension.replace(".", "")
for extension in REDDIT_VIDEO_EXTENSIONS
if direct_url.endswith(extension)
)
if extension == "gifv":
body = format_html(
"<div><video controls muted><source src='{url}' type='video/mp4' /></video></div>",
url=direct_url.replace(extension, "mp4"),
)
else:
body = format_html(
"<div><video controls muted><source src='{url}' type='video/{extension}' /></video></div>",
url=direct_url,
extension=extension,
)
else:
body = format_html(
"<div><a target='_blank' rel='noopener noreferrer' alt='{title}' href='{url}' class='link'>Direct url</a></div>",
url=direct_url,
title=title,
)
try:
parsed_date = datetime.fromtimestamp(post["data"]["created_utc"])
created_date = pytz.utc.localize(parsed_date)
except (OverflowError, OSError):
logging.warning(
f"Failed parsing timestamp from {REDDIT_URL}{post_url_fragment}"
)
created_date = timezone.now()
post_data = {
"remote_identifier": remote_identifier,
"title": title,
"body": body,
"author": author,
"url": f"{REDDIT_URL}{post_url_fragment}",
"publication_date": created_date,
"rule": rule,
}
if remote_identifier in self.existing_posts:
existing_post = self.existing_posts[remote_identifier]
for key, value in post_data.items():
setattr(existing_post, key, value)
results[existing_post.remote_identifier] = existing_post
post = self.build_post(entry["data"])
except KeyError:
logger.exception(f"Failed building post {remote_identifier}")
continue
results[remote_identifier] = Post(**post_data)
results[remote_identifier] = post
self.instances = results.values()
def build_post(self, entry):
    """
    Build an unsaved ``Post`` from the ``data`` dict of one listing entry.

    The body markup is chosen by the first matching case, in this order:
    self (text) post, url ending in an image extension, ``is_video`` post,
    url ending in a video extension, and finally a plain link to the url.

    ``entry`` is indexed directly (``id``, ``title``, ``author``,
    ``permalink``, ``url``, ``is_self``, ``created_utc`` and, depending on
    the branch, ``is_video``), so a missing key raises ``KeyError``.

    When ``created_utc`` cannot be converted, a warning is logged and the
    current time is used as publication date instead.
    """
    rule = self.stream.rule
    remote_identifier = entry["id"]
    title = truncate_text(Post, "title", entry["title"])
    author = truncate_text(Post, "author", entry["author"])
    post_url_fragment = entry["permalink"]
    direct_url = entry["url"]

    if entry["is_self"]:
        body = self.get_text_post(entry)
    elif direct_url.endswith(REDDIT_IMAGE_EXTENSIONS):
        body = self.get_image_post(entry)
    elif entry["is_video"]:
        body = self.get_native_video_post(entry)
    elif direct_url.endswith(REDDIT_VIDEO_EXTENSIONS):
        body = self.get_video_post(entry)
    else:
        body = self.get_url_post(entry)

    try:
        # Convert the POSIX timestamp straight into an aware UTC datetime.
        # Without ``tz`` the result is local wall-clock time, and labelling
        # that as UTC afterwards shifts the date by the server's UTC offset.
        created_date = datetime.fromtimestamp(entry["created_utc"], tz=pytz.utc)
    except (OverflowError, OSError, ValueError, TypeError):
        # ValueError: year out of range; TypeError: non-numeric value such
        # as None. Both get the same best-effort fallback as before.
        logging.warning(
            f"Failed parsing timestamp from {REDDIT_URL}{post_url_fragment}"
        )
        created_date = timezone.now()

    post_entry = {
        "remote_identifier": remote_identifier,
        "title": title,
        "body": body,
        "author": author,
        "url": f"{REDDIT_URL}{post_url_fragment}",
        "publication_date": created_date,
        "rule": rule,
    }

    return Post(**post_entry)
def get_text_post(self, entry):
    """
    Return the sanitized body of a self (text) post.

    ``entry["selftext_html"]`` is HTML-unescaped and then passed through
    ``self.sanitize_fragment``. An empty string is returned when the field
    is falsy or unescapes to an empty string.
    """
    raw_html = entry["selftext_html"]

    if not raw_html:
        return ""

    decoded_html = unescape(raw_html)

    if not decoded_html:
        return ""

    return self.sanitize_fragment(decoded_html)
def get_image_post(self, entry):
    """
    Return the body markup for a post whose url points at an image.

    The image is rendered as a lazily loaded ``<img>`` with the entry's
    title as alt text; both values are passed to ``format_html`` as
    template arguments.
    """
    image_url = entry["url"]
    alt_text = entry["title"]
    template = "<div><img alt='{title}' src='{url}' loading='lazy' /></div>"

    return format_html(template, url=image_url, title=alt_text)
def get_native_video_post(self, entry):
    """
    Return the body markup for a post flagged with ``is_video``.

    The video source is read from
    ``entry["secure_media"]["reddit_video"]["fallback_url"]`` and rendered
    as an mp4 ``<video>`` element.

    NOTE(review): a ``secure_media`` value of ``None`` would raise
    ``TypeError`` rather than ``KeyError`` here -- confirm the caller copes.
    """
    fallback_url = entry["secure_media"]["reddit_video"]["fallback_url"]
    template = "<div><video controls muted><source src='{url}' type='video/mp4' /></video></div>"

    return format_html(template, url=fallback_url)
def get_video_post(self, entry):
    """
    Return the body markup for a post whose url points at a video file.

    The extension is the first item of ``REDDIT_VIDEO_EXTENSIONS`` that the
    url ends with, with dots removed. ``gifv`` urls are rewritten to their
    ``mp4`` counterpart; every other extension is used as the ``video/*``
    subtype of the ``<source>`` element.

    Raises ``StopIteration`` when the url ends with none of the known
    extensions, so callers should check ``url.endswith(...)`` first.
    """
    url = entry["url"]
    extension = next(
        extension.replace(".", "")
        for extension in REDDIT_VIDEO_EXTENSIONS
        if url.endswith(extension)
    )

    if extension == "gifv":
        # Swap only the trailing extension. ``str.replace`` would also
        # rewrite any earlier "gifv" in the host or path of the url.
        mp4_url = f"{url[: -len(extension)]}mp4"

        return format_html(
            "<div><video controls muted><source src='{url}' type='video/mp4' /></video></div>",
            url=mp4_url,
        )

    return format_html(
        "<div><video controls muted><source src='{url}' type='video/{extension}' /></video></div>",
        url=url,
        extension=extension,
    )
def get_url_post(self, entry):
    """
    Return the fallback body markup: a link to the entry's url.

    The anchor opens in a new tab; the entry's url and title are passed to
    ``format_html`` as template arguments.
    """
    target_url = entry["url"]
    link_title = entry["title"]
    template = "<div><a target='_blank' rel='noopener noreferrer' alt='{title}' href='{url}' class='link'>Direct url</a></div>"

    return format_html(template, url=target_url, title=link_title)
class RedditStream(PostStream):
rule_type = RuleTypeChoices.subreddit

View file

@ -86,52 +86,6 @@ class RedditBuilderTestCase(TestCase):
self.assertEquals(Post.objects.count(), 0)
def test_update_posts(self):
    """
    A post that already exists is updated in place by the builder.

    An existing post with remote identifier ``hm0qct`` is created with
    placeholder values; after building and saving ``simple_mock`` its
    author, title, body, publication date and url are expected to match
    the payload.
    """
    subreddit = SubredditFactory()
    existing_post = RedditPostFactory(
        remote_identifier="hm0qct",
        author="Old author",
        title="Old title",
        body="Old body",
        url="https://bbc.com/",
        rule=subreddit,
    )

    mock_stream = Mock(rule=subreddit)

    with RedditBuilder(simple_mock, mock_stream) as builder:
        builder.build()
        builder.save()

    posts = {post.remote_identifier: post for post in Post.objects.all()}

    self.assertCountEqual(
        ("hm0qct", "hna75r", "hngs71", "hngsj8", "hnd7cy"), posts.keys()
    )

    # Reload so the assertions see what was written to the database.
    existing_post.refresh_from_db()

    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(existing_post.remote_identifier, "hm0qct")
    self.assertEqual(existing_post.author, "AutoModerator")
    self.assertEqual(
        existing_post.title,
        "Linux Experiences/Rants or Education/Certifications thread - July 06, 2020",
    )
    self.assertIn(
        "This megathread is also to hear opinions from anyone just starting out "
        "with Linux or those that have used Linux (GNU or otherwise) for a long time.",
        existing_post.body,
    )
    self.assertEqual(
        existing_post.publication_date,
        pytz.utc.localize(datetime(2020, 7, 6, 6, 11, 22)),
    )
    self.assertEqual(
        existing_post.url,
        "https://www.reddit.com/r/linux/comments/hm0qct/linux_experiencesrants_or_educationcertifications/",
    )
def test_html_sanitizing(self):
builder = RedditBuilder
@ -225,17 +179,6 @@ class RedditBuilderTestCase(TestCase):
("hm0qct", "hna75r", "hngs71", "hngsj8", "hnd7cy"), posts.keys()
)
duplicate_post.refresh_from_db()
self.assertEquals(
duplicate_post.publication_date,
pytz.utc.localize(datetime(2020, 7, 6, 6, 11, 22)),
)
self.assertEquals(
duplicate_post.title,
"Linux Experiences/Rants or Education/Certifications thread - July 06, 2020",
)
def test_image_post(self):
builder = RedditBuilder