Commit 9a9b11b0 authored by hydrargyrum's avatar hydrargyrum
Browse files

[reddit] new CapMessages/CapImage module

TODO CapMessagesPost
parent 85d42074
# -*- coding: utf-8 -*-
# Copyright(C) 2017 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from .module import RedditModule
__all__ = ['RedditModule']
# -*- coding: utf-8 -*-
# Copyright(C) 2017 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from weboob.browser import PagesBrowser, URL
from .pages import ListPage, SearchPage, EntryPage, CatchHTTP
class RedditBrowser(PagesBrowser):
    """Browser for a single subreddit on www.reddit.com."""

    BASEURL = 'https://www.reddit.com/r/pics/'

    # Listing pages, paginated ("?count=...&after=...") or not.
    listing = URL(r'(?P<cat>\w*)/?\?count=\d+&after=(?P<after>\w+)',
                  r'(?P<cat>\w*)/?$',
                  ListPage)
    # A single thread page (root submission + comments).
    entry = URL(r'/comments/(?P<id>\w+)/.*', EntryPage)
    # Search results restricted to the current subreddit.
    search = URL(r'search\?sort=(?P<sort>\w+)&restrict_sr=on', SearchPage)
    # catch-all to avoid BrowserHTTPSDowngrade
    catch_http = URL(r'http://.*', CatchHTTP)

    def __init__(self, sub, *args, **kwargs):
        # ``sub`` is the subreddit name, e.g. "pics".
        super(RedditBrowser, self).__init__(*args, **kwargs)
        self.BASEURL = 'https://www.reddit.com/r/%s/' % sub

    def iter_images(self, cat=''):
        """Iterate over image submissions of listing ``cat`` ('' = default)."""
        self.listing.go(cat=cat)
        return self.page.iter_images()

    def search_images(self, pattern, sort='top', nsfw=False):
        """Search image submissions matching ``pattern``.

        ``sort`` is a reddit sort keyword ('top', 'new', 'relevance', ...).
        ``nsfw`` must be a bool; it is turned into a "nsfw:yes/no" search term.
        """
        marker = {True: 'yes', False: 'no'}[nsfw]
        query = '%s nsfw:%s' % (pattern, marker)
        self.search.go(sort=sort, params={'q': query})
        return self.page.iter_images()

    def iter_threads(self, cat=''):
        """Iterate over discussion threads of listing ``cat`` ('' = default)."""
        self.listing.go(cat=cat)
        return self.page.iter_threads()

    def fill_thread(self, thread):
        """Load the comment tree of ``thread`` in place (oldest first)."""
        self.location(thread.url, params={'sort': 'old'})
        assert self.entry.is_here()
        self.page.fill_thread(thread)

    def get_thread(self, id):
        """Fetch and return the thread with the given reddit id."""
        self.entry.go(id=id, params={'sort': 'old'})
        return self.page.get_thread(id)

    def get_image(self, id):
        """Fetch the image of the thread with the given reddit id."""
        self.entry.go(id=id)
        image = self.page.get_image()
        image.id = id
        return image
# -*- coding: utf-8 -*-
# Copyright(C) 2017 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from weboob.tools.backend import Module, BackendConfig
from weboob.tools.value import Value
from weboob.capabilities.image import CapImage, BaseImage, Thumbnail
from weboob.capabilities.messages import CapMessages, Thread
from weboob.capabilities.collection import CapCollection, Collection
from .browser import RedditBrowser
__all__ = ['RedditModule']
def register_resources_handler(d, *path):
    """Decorator factory registering a function as handler for *path* in *d*.

    The path components are stored as a tuple key; ``None`` components act as
    wildcards when the dict is searched later (see ``iter_resources``).
    """
    def store(func):
        d[path] = func
        return func
    return store
class RedditModule(Module, CapImage, CapCollection, CapMessages):
    """Backend exposing a subreddit's images and discussion threads."""

    NAME = 'reddit'
    DESCRIPTION = u'reddit website'
    MAINTAINER = u'Vincent A'
    EMAIL = 'dev@indigo.re'
    LICENSE = 'AGPLv3+'
    VERSION = '1.4'
    CONFIG = BackendConfig(
        Value('subreddit', label='Name of the sub-reddit', regexp='[^/]+', default='pics'),
    )
    BROWSER = RedditBrowser

    # CapImage sort constants -> reddit "sort" query parameter values.
    SORTS = {
        CapImage.SEARCH_RELEVANCE: 'relevance',
        CapImage.SEARCH_RATING: 'top',
        CapImage.SEARCH_VIEWS: 'top',  # reddit has no view-count sort
        CapImage.SEARCH_DATE: 'new',
    }

    def create_default_browser(self):
        return self.create_browser(self.config['subreddit'].get())

    def get_file(self, _id):
        raise NotImplementedError()

    def get_image(self, id):
        return self.browser.get_image(id)

    def search_file(self, pattern, sortby=CapImage.SEARCH_RELEVANCE):
        # Bug fix: the CapImage sort constant must be translated to a reddit
        # sort string before reaching the browser (the previous code passed
        # the raw constant). Delegate to search_image, which performs the
        # translation; nsfw=True keeps the previous include-NSFW behavior.
        return self.search_image(pattern, sortby, nsfw=True)

    def search_image(self, pattern, sortby=CapImage.SEARCH_RELEVANCE, nsfw=False):
        """Search images matching ``pattern``, optionally including NSFW ones."""
        return self.browser.search_images(pattern, self.SORTS[sortby], nsfw)

    def iter_threads(self, cat=''):
        # Bug fix: iter_resources_dir calls iter_threads(cat=key), but this
        # method took no such parameter, raising a TypeError. The default
        # keeps the plain CapMessages iter_threads() signature working.
        return self.browser.iter_threads(cat=cat)

    def get_thread(self, id):
        return self.browser.get_thread(id)

    def iter_resources(self, objs, split_path):
        # Find the first registered handler whose path pattern matches
        # split_path; None components in a pattern act as wildcards.
        for k in self.RESOURCES:
            if len(k) == len(split_path) and all(a is None or a == b for a, b in zip(k, split_path)):
                f = self.RESOURCES[k]
                return f(self, objs, *split_path)
        # Robustness: return an empty iterable instead of an implicit None
        # when no handler matches, so callers can always iterate the result.
        return []

    RESOURCES = {}

    @register_resources_handler(RESOURCES)
    def iter_resources_root(self, objs):
        """List the top-level collections: the reddit listing categories."""
        return [
            Collection(['hot'], 'Hot threads'),
            Collection(['new'], 'New threads'),
            Collection(['rising'], 'Rising threads'),
            Collection(['controversial'], 'Controversial threads'),
            Collection(['top'], 'Top threads'),
        ]

    @register_resources_handler(RESOURCES, None)
    def iter_resources_dir(self, objs, key):
        """List the threads or images of one category, depending on *objs*."""
        if key == 'hot':
            # 'hot' is the subreddit's default listing (empty category path).
            key = ''
        if Thread in objs:
            return self.iter_threads(cat=key)
        if BaseImage in objs:
            return self.browser.iter_images(cat=key)
        return []

    def fill_data(self, obj, fields):
        """Download thumbnail and/or full image data on demand (fillobj)."""
        if 'thumbnail' in fields and not obj.thumbnail.data:
            obj.thumbnail.data = self.browser.open(obj.thumbnail.url).content
        if 'data' in fields:
            obj.data = self.browser.open(obj.url).content

    def fill_thread(self, obj, fields):
        """Fetch the full message tree of a thread on demand (fillobj)."""
        if 'root' in fields:
            self.browser.fill_thread(obj)

    OBJECTS = {
        BaseImage: fill_data,
        Thumbnail: fill_data,
        Thread: fill_thread,
    }
# -*- coding: utf-8 -*-
# Copyright(C) 2017 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from collections import OrderedDict
from weboob.browser.elements import method, ListElement, ItemElement, SkipItem
from weboob.browser.filters.standard import CleanText, Regexp, Field, DateTime
from weboob.browser.filters.html import AbsoluteLink, Link, Attr, CleanHTML
from weboob.browser.pages import HTMLPage, RawPage, pagination
from weboob.capabilities.image import BaseImage, Thumbnail
from weboob.capabilities.messages import Thread, Message
from weboob.tools.compat import urljoin
class list_entry(ItemElement):
    """Fields common to every entry of a subreddit listing (images and threads)."""

    # Submission title text.
    obj_title = CleanText('.//a[has-class("title")]')
    # Submission datetime, from the <time> element's machine-readable attribute.
    obj_date = DateTime(Attr('.//time[@class="live-timestamp"]', 'datetime'))
    # Absolute URL of the entry's comments page.
    obj__page = AbsoluteLink('.//a[has-class("comments")]')
    # Reddit thread id, extracted from the comments-page URL.
    obj_id = Regexp(Field('_page'), '/comments/([^/]+)/')
class ListPage(HTMLPage):
    """A subreddit listing page (hot/new/rising/controversial/top)."""

    @pagination
    @method
    class iter_images(ListElement):
        # One <div class="entry"> per submission row.
        item_xpath = '//div[has-class("entry")]'

        class item(list_entry):
            klass = BaseImage

            obj_author = CleanText('.//a[has-class("author")]')

            def obj_thumbnail(self):
                # Entries without a thumbnail image are not image threads
                # (e.g. self posts); skip them.
                path = Attr('..//a[has-class("thumbnail")]/img', 'src', default=None)(self)
                if path is None:
                    raise SkipItem('not an image thread')
                return Thumbnail(urljoin(self.page.url, path))

            def obj_url(self):
                # Evaluate the thumbnail first so non-image threads are
                # skipped before any extra request is made.
                self.obj_thumbnail()

                url = urljoin(self.page.url, Link('..//a[has-class("thumbnail")]')(self))
                if url != Field('_page')(self):
                    # The entry links directly to an external image.
                    return url

                # The entry links to its own comments page: open it and find
                # the actual image URL there.
                # TODO lazy load with fillobj?
                return self.page.browser.open(url).page.get_image_url()

        next_page = Link('//a[contains(@rel,"next")]', default=None)

    @pagination
    @method
    class iter_threads(ListElement):
        item_xpath = '//div[has-class("entry")]'

        class item(list_entry):
            klass = Thread

            # A thread's canonical URL is its comments page.
            obj_url = Field('_page')

        next_page = Link('//a[contains(@rel,"next")]', default=None)
class SearchPage(HTMLPage):
    """Result page of a subreddit-restricted search."""

    @pagination
    @method
    class iter_images(ListElement):
        # One result box per matching submission.
        item_xpath = '//div[has-class("search-result")]'

        class item(ItemElement):
            klass = BaseImage

            # Absolute URL of the result's comments page.
            obj__page = AbsoluteLink('.//a[has-class("search-comments")]')
            # Reddit thread id, extracted from the comments-page URL.
            obj_id = Regexp(Field('_page'), '/comments/([^/]+)/')
            obj_date = DateTime(Attr('.//time', 'datetime'))
            obj_title = CleanText('.//a[has-class("search-title")]')
            obj_author = CleanText('.//a[has-class("author")]')

            def obj_thumbnail(self):
                # Results without a thumbnail are not image threads; skip.
                path = Attr('./a[has-class("thumbnail")]/img', 'src', default=None)(self)
                if path is None:
                    raise SkipItem('not an image thread')
                return Thumbnail(urljoin(self.page.url, path))

            def obj_url(self):
                # Evaluate the thumbnail first so non-image threads are
                # skipped before any extra request is made.
                self.obj_thumbnail()

                url = urljoin(self.page.url, Link('./a[has-class("thumbnail")]')(self))
                if url != Field('_page')(self):
                    # Direct link to an external image.
                    return url
                # The link targets the comments page itself: open it to find
                # the actual image URL.
                # TODO lazy load with fillobj?
                return self.page.browser.open(url).page.get_image_url()
class EntryPage(HTMLPage):
    """Page of a single thread: root submission plus its comment tree."""

    @method
    class get_image(ItemElement):
        klass = BaseImage

        # All root-submission data lives inside the #siteTable container.
        obj_title = CleanText('//div[@id="siteTable"]//a[has-class("title")]')
        obj_date = DateTime(Attr('//div[@id="siteTable"]//time', 'datetime'))
        obj_author = CleanText('//div[@id="siteTable"]//a[has-class("author")]')

        def obj_thumbnail(self):
            # Threads without a thumbnail are not image threads; skip.
            path = Attr('//div[@id="siteTable"]//a[has-class("thumbnail")]/img', 'src', default=None)(self)
            if path is None:
                raise SkipItem('not an image thread')
            return Thumbnail(urljoin(self.page.url, path))

        def obj_url(self):
            return self.page.get_image_url()

        def obj__page(self):
            return self.page.url

    def get_image_url(self):
        """Return the full-size image URL, skipping video submissions."""
        if self.doc.xpath('//video[@class="preview"]'):
            raise SkipItem('Videos are not implemented')
        return urljoin(self.url, Link('//a[img[@class="preview"]]')(self.doc))

    def get_thread(self, id):
        """Build a Thread with the given id from this page's messages."""
        thr = Thread(id=id)
        self.fill_thread(thr)
        # The thread inherits its metadata from its root message.
        thr.date = thr.root.date
        thr.title = thr.root.title
        thr.url = thr.root.url
        return thr

    def fill_thread(self, thread):
        """Parse the page's messages and link them into *thread* as a tree.

        The root submission is recognized by having no permalink (empty url);
        every comment id is namespaced under the thread id, and each comment
        is attached to its parent message, or to the root when it has no
        "parent" link.
        """
        thread.root = None
        msgs = OrderedDict()

        title = CleanText('//a[has-class("title")]')(self.doc)
        # First pass: collect messages and identify the root.
        for m in self.iter_messages():
            m.thread = thread
            if not m.url:
                # Only the root submission has no permalink on the page.
                assert not thread.root, 'there cannot be 2 roots'
                thread.root = m
                m.id = thread.id
                m.parent = None
                m.url = self.url
            else:
                assert m.id not in msgs
                # Key by the short reddit id (what _parent_part refers to),
                # then namespace the public id under the thread id.
                msgs[m.id] = m
                m.id = '%s.%s' % (thread.id, m.id)

        # Second pass: now that all messages are known, build the tree.
        for m in msgs.values():
            if m is thread.root:
                continue
            if m._parent_part:
                m.parent = msgs[m._parent_part]
            else:
                # Top-level comments reply directly to the submission.
                m.parent = thread.root
            m.parent.children.append(m)
            m.title = 'Re: %s' % title

        thread.root.title = title

    @method
    class iter_messages(ListElement):
        item_xpath = '//div[has-class("entry")]'

        class item(ItemElement):
            klass = Message

            # TODO deleted messages, collapsed messages, pagination
            def condition(self):
                # Skip "load more comments" stubs...
                if len(self.el.xpath('./span[@class="morecomments"]')):
                    return False
                # ...grayed (collapsed/removed) comment bodies...
                if len(self.el.xpath('.//div[has-class("usertext")][has-class("grayed")]')):
                    return False
                # ...and entries listed in the deleted-items table.
                if len(self.el.xpath('./ancestor::div[@id="siteTable_deleted"]')):
                    return False
                return True

            obj_content = CleanHTML('.//div[has-class("usertext-body")]')
            obj_sender = CleanText('.//a[has-class("author")]')
            obj_date = DateTime(Attr('.//time[@class="live-timestamp"]', 'datetime'))
            # Permalink is empty for the root submission (see fill_thread).
            obj_url = AbsoluteLink('.//a[@data-event-action="permalink"]', default='')
            # Short comment id: last path component of the permalink.
            obj_id = Regexp(Field('url'), '/(\w+)/$', default=None)
            # Short id of the parent comment, from the "parent" anchor's fragment.
            obj__parent_part = Regexp(Link('.//a[@data-event-action="parent"]', default=''), r'#(\w+)', default=None)

            def obj_children(self):
                return []
class CatchHTTP(RawPage):
    """Sink page matching any plain-http URL, so BrowserHTTPSDowngrade is avoided."""
# -*- coding: utf-8 -*-
# Copyright(C) 2017 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
from contextlib import contextmanager
from weboob.capabilities.image import BaseImage
from weboob.tools.test import BackendTest
@contextmanager
def using_url(backend, url):
    """Temporarily point *backend*'s browser at *url*.

    The previous BASEURL is restored on exit, even if the body raises.
    """
    saved = backend.browser.BASEURL
    try:
        backend.browser.BASEURL = url
        yield
    finally:
        backend.browser.BASEURL = saved
class RedditTest(BackendTest):
    # Live tests: each one queries www.reddit.com through the backend.
    MODULE = 'reddit'

    def test_colls(self):
        """Root collections are exactly the five reddit listing categories."""
        colls = list(self.backend.iter_resources((BaseImage,), []))
        self.assertTrue(all(len(c.split_path) == 1 for c in colls))
        self.assertSetEqual({'hot', 'top', 'new', 'controversial', 'rising'},
                            set(c.split_path[0] for c in colls))

    def test_images(self):
        """The 'hot' listing yields >= 10 fully-populated images, and
        get_image refetches the last one with identical fields."""
        with using_url(self.backend, 'https://www.reddit.com/r/BotanicalPorn/'):
            n = -1
            for n, img in zip(range(10), self.backend.iter_resources((BaseImage,), ['hot'])):
                self.assertTrue(img.id)
                self.assertTrue(img.title)
                self.assertTrue(img.url)
                self.assertTrue(img.thumbnail.url)
                self.assertTrue(img.date)
                self.assertTrue(img.author)
            # zip(range(10), ...) stops after 10 items; n == 9 means at
            # least 10 images were returned.
            self.assertEqual(n, 9)

            # Refetch the last image by id and compare every field.
            new = self.backend.get_image(img.id)
            self.assertEqual(new.id, img.id)
            self.assertEqual(new.date, img.date)
            self.assertEqual(new.title, img.title)
            self.assertEqual(new.url, img.url)
            self.assertEqual(new.thumbnail.url, img.thumbnail.url)
            self.assertEqual(new.author, img.author)

    def test_search(self):
        """Searching returns at least 10 fully-populated images."""
        with using_url(self.backend, 'https://www.reddit.com/r/BotanicalPorn/'):
            n = -1
            for n, img in zip(range(10), self.backend.search_image('lily')):
                self.assertTrue(img.id)
                self.assertTrue(img.title)
                self.assertTrue(img.url)
                self.assertTrue(img.thumbnail.url)
                self.assertTrue(img.date)
                self.assertTrue(img.author)
            self.assertEqual(n, 9)

    def test_thread(self):
        """Threads can be listed, and at least one can be expanded into a
        consistent message tree."""
        expanded = False
        for i, thr in zip(range(10), self.backend.iter_threads()):
            self.assertTrue(thr.title)
            self.assertTrue(thr.date)

            if not expanded:
                new = self.backend.get_thread(thr.id)
                self.assertEqual(thr.id, new.id)
                self.assertEqual(thr.title, new.title)

                j = -1
                for j, msg in enumerate(new.iter_all_messages()):
                    self.assertIs(msg.thread, new)
                    self.assertTrue(msg.title)
                    self.assertTrue(msg.sender)
                    self.assertTrue(msg.id)
                    if msg is new.root:
                        # The root has no parent by construction.
                        self.assertIsNone(msg.parent)
                    else:
                        self.assertTrue(msg.content)
                        self.assertTrue(msg.parent)
                        self.assertIn(msg, msg.parent.children)

                # Consider the tree exercised only once a thread with a
                # decent number of messages has been checked.
                if j > 10:
                    expanded = True

        self.assertEqual(i, 9)
......@@ -92,6 +92,7 @@ popolemploi
pornhub
ratp
razibus
reddit
regionsjob
relaiscolis
s2e
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment