Commit a8589ce1 authored by Markus Shepherd

add MultiFeedExporter extension; linting

parent b93a13d3
[ludoj]
disable=broad-except,protected-access,too-many-arguments,too-many-ancestors,too-many-instance-attributes,too-many-locals,too-many-statements
max-branches=20
max-returns=10
min-public-methods=1
min-similarity-lines=20
README.md
@@ -19,12 +19,12 @@ pip install -Ur requirements.txt
Run a spider like so:
```bash
-scrapy crawl <spider> -o 'feeds/%(name)s/%(time)s.csv'
+scrapy crawl <spider> -o 'feeds/%(name)s/%(time)s/%(class)s.csv'
```
where `<spider>` is one of the IDs above.
You can run `scrapy check` to perform contract tests for all spiders, or
`scrapy check <spider>` to test one particular spider. If tests fail,
there most likely has been some change on the website and the spider needs
updating.
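
For example, to crawl BoardGameGeek into per-class CSV feeds and then run its
contract tests (using the `bgg` spider ID from the list above):

```bash
scrapy crawl bgg -o 'feeds/%(name)s/%(time)s/%(class)s.csv'
scrapy check bgg
```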
ludoj/extensions.py
# -*- coding: utf-8 -*-

''' Scrapy extensions '''

import logging

from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.extensions.feedexport import FeedExporter
from scrapy.utils.misc import load_object
from twisted.internet.defer import DeferredList, maybeDeferred

LOGGER = logging.getLogger(__name__)


def _safe_load_object(obj):
    return load_object(obj) if isinstance(obj, str) else obj


class MultiFeedExporter(object):
    ''' allows exporting several types of items in the same spider '''

    @classmethod
    def from_crawler(cls, crawler):
        ''' init from crawler '''
        obj = cls(crawler.settings)
        crawler.signals.connect(obj._open_spider, signals.spider_opened)
        crawler.signals.connect(obj._close_spider, signals.spider_closed)
        crawler.signals.connect(obj._item_scraped, signals.item_scraped)
        return obj

    def __init__(self, settings, exporter=FeedExporter):
        self.settings = settings
        self.urifmt = self.settings.get('MULTI_FEED_URI') or self.settings.get('FEED_URI')
        if not self.settings.getbool('MULTI_FEED_ENABLED') or not self.urifmt:
            raise NotConfigured
        self.exporter_cls = _safe_load_object(exporter)
        self.item_classes = ()
        self._exporters = {}
        LOGGER.info('MultiFeedExporter URI: <%s>', self.urifmt)
        LOGGER.info('MultiFeedExporter exporter class: %r', self.exporter_cls)

    def _open_spider(self, spider):
        self.item_classes = (
            getattr(spider, 'item_classes', None)
            or self.settings.getlist('MULTI_FEED_ITEM_CLASSES') or ())
        if isinstance(self.item_classes, str):
            self.item_classes = self.item_classes.split(',')
        self.item_classes = tuple(map(_safe_load_object, self.item_classes))
        LOGGER.info('MultiFeedExporter item classes: %s', self.item_classes)
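
        # one FeedExporter instance per item class: FeedExporter consults
        # self._uripar(params, spider) while building the feed URI, so the
        # per-instance override below fills the %(class)s placeholder with
        # the item class name (relies on Scrapy 1.x feed-export internals)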
        for item_cls in self.item_classes:
            # pylint: disable=cell-var-from-loop
            def _uripar(params, spider, *, cls_name=item_cls.__name__):
                params['class'] = cls_name
                LOGGER.info('_uripar(%r, %r, %r)', params, spider, cls_name)
                return params

            exporter = self.exporter_cls(self.settings)
            exporter._uripar = _uripar
            exporter.open_spider(spider)
            self._exporters[item_cls] = exporter

        LOGGER.info(self._exporters)

    def _close_spider(self, spider):
        return DeferredList(
            maybeDeferred(exporter.close_spider, spider) for exporter in self._exporters.values())

    def _item_scraped(self, item, spider):
        item_cls = type(item)
        exporter = self._exporters.get(item_cls)
        if exporter is None:
            LOGGER.warning('no exporter found for class %r', item_cls)
        else:
            item = exporter.item_scraped(item, spider)
        return item
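
A minimal sketch of how a spider opts in to MultiFeedExporter (the
`ExampleSpider` here is hypothetical; the real spiders below declare
`item_classes` the same way):

```python
from scrapy import Spider

from ludoj.items import GameItem, RatingItem


class ExampleSpider(Spider):
    ''' hypothetical spider illustrating MultiFeedExporter '''

    name = 'example'
    start_urls = ['https://example.com/']
    item_classes = (GameItem, RatingItem)  # one feed per item class

    def parse(self, response):
        # with FEED_URI = 'feeds/%(name)s/%(time)s/%(class)s.csv', these items
        # land in .../GameItem.csv and .../RatingItem.csv respectively
        yield GameItem(name='Catan')
        yield RatingItem(bgg_id=13)
```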
ludoj/items.py
# -*- coding: utf-8 -*-

''' Scrapy items '''

from scrapy import Field, Item
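

# custom Field metadata: `required` is enforced by ValidatePipeline, while
# `dtype` / `default` are applied by DataTypePipeline (ludoj/pipelines.py)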
class GameItem(Item):
    ''' item representing a game '''

    name = Field(required=True)
    alt_name = Field()
    year = Field(dtype=int, default=None)
@@ -44,3 +49,9 @@ class GameItem(Item):
    wikipedia_id = Field()
    dbpedia_id = Field()
    luding_id = Field(dtype=int, default=None)
+
+
+class RatingItem(Item):
+    ''' item representing a rating '''
+
+    bgg_id = Field(dtype=int, default=None)
ludoj/loaders.py
# -*- coding: utf-8 -*-

''' Scrapy item loaders '''

from __future__ import unicode_literals

from collections import OrderedDict

@@ -10,6 +12,8 @@ from scrapy.loader.processors import TakeFirst, MapCompose
from w3lib.html import remove_tags, replace_entities
def normalize_space(item, preserve_newline=False):
    ''' normalize space in a string '''

    if preserve_newline:
        try:
            return '\n'.join(normalize_space(line) for line in item.split('\n')).strip()
@@ -23,22 +27,25 @@ def normalize_space(item, preserve_newline=False):
        return ''


class GameLoader(ItemLoader):
    ''' loader for GameItem '''

    default_input_processor = MapCompose(remove_tags, replace_entities,
                                         replace_entities, normalize_space)
    default_output_processor = TakeFirst()

-    def clear_list(self, items):
+    # pylint: disable=no-self-use
+    def _clear_list(self, items):
        return list(OrderedDict.fromkeys(item for item in items if item))

-    alt_name_out = clear_list
+    alt_name_out = _clear_list

    description_in = MapCompose(remove_tags, replace_entities, replace_entities,
                                partial(normalize_space, preserve_newline=True))

-    designer_out = clear_list
-    artist_out = clear_list
-    publisher_out = clear_list
+    designer_out = _clear_list
+    artist_out = _clear_list
+    publisher_out = _clear_list

-    image_url_out = clear_list
-    video_url_out = clear_list
-    external_link_out = clear_list
+    image_url_out = _clear_list
+    video_url_out = _clear_list
+    external_link_out = _clear_list
ludoj/pipelines.py
# -*- coding: utf-8 -*-

''' Scrapy item pipelines '''

from __future__ import unicode_literals

from scrapy.exceptions import DropItem


class ValidatePipeline(object):
    ''' validate items '''

    # pylint: disable=no-self-use,unused-argument
    def process_item(self, item, spider):
        ''' verify if all required fields are present '''
        if all(item.get(field) for field in item.fields if item.fields[field].get('required')):
            return item
-        else:
-            raise DropItem('Missing required field in {:s}'.format(item))
+        raise DropItem('Missing required field in {}'.format(item))  # note: '{:s}' would raise TypeError on an Item


class DataTypePipeline(object):
    ''' convert fields to their required data type '''

    # pylint: disable=no-self-use,unused-argument
    def process_item(self, item, spider):
        ''' convert to data type '''
        for field in item.fields:
            dtype = item.fields[field].get('dtype')
......
ludoj/settings.py
# -*- coding: utf-8 -*-

''' Scrapy settings '''

BOT_NAME = 'ludoj'

SPIDER_MODULES = ['ludoj.spiders']
NEWSPIDER_MODULE = 'ludoj.spiders'
-FEED_EXPORT_FIELDS = ('name', 'alt_name', 'year',
-                      'game_type', 'description',
-                      'designer', 'artist', 'publisher',
-                      'url', 'image_url',
-                      'video_url', 'external_link', 'list_price',
-                      'min_players', 'max_players',
-                      'min_age', 'max_age',
-                      'min_time', 'max_time',
-                      'rank', 'num_votes', 'avg_rating',
-                      'stddev_rating', 'bayes_rating',
-                      'worst_rating', 'best_rating',
-                      'complexity', 'easiest_complexity', 'hardest_complexity',
-                      'bgg_id', 'freebase_id', 'wikidata_id',
-                      'wikipedia_id', 'dbpedia_id', 'luding_id')
+FEED_EXPORT_FIELDS = (
+    'name', 'alt_name', 'year',
+    'game_type', 'description',
+    'designer', 'artist', 'publisher',
+    'url', 'image_url',
+    'video_url', 'external_link', 'list_price',
+    'min_players', 'max_players',
+    'min_age', 'max_age',
+    'min_time', 'max_time',
+    'rank', 'num_votes', 'avg_rating',
+    'stddev_rating', 'bayes_rating',
+    'worst_rating', 'best_rating',
+    'complexity', 'easiest_complexity', 'hardest_complexity',
+    'bgg_id', 'freebase_id', 'wikidata_id',
+    'wikipedia_id', 'dbpedia_id', 'luding_id')
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = 'ludoj (+http://www.yourdomain.com)'
@@ -65,9 +68,12 @@ DEFAULT_REQUEST_HEADERS = {
# Enable or disable extensions
# See http://scrapy.readthedocs.org/en/latest/topics/extensions.html
-#EXTENSIONS = {
-#    'scrapy.extensions.telnet.TelnetConsole': None,
-#}
+EXTENSIONS = {
+    'scrapy.extensions.feedexport.FeedExporter': None,
+    'ludoj.extensions.MultiFeedExporter': 500,
+}
+
+MULTI_FEED_ENABLED = True
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
......
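
MultiFeedExporter reads two further optional settings (see ludoj/extensions.py
above); the values below are hypothetical examples: MULTI_FEED_URI falls back
to FEED_URI, and MULTI_FEED_ITEM_CLASSES is only consulted when a spider does
not define its own `item_classes`.

```python
MULTI_FEED_URI = 'feeds/%(name)s/%(time)s/%(class)s.csv'
MULTI_FEED_ITEM_CLASSES = ['ludoj.items.GameItem', 'ludoj.items.RatingItem']
```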
ludoj/spiders/bgg.py
# -*- coding: utf-8 -*-

''' BoardGameGeek spider '''

from __future__ import unicode_literals

from scrapy import Request, Spider

@@ -7,13 +9,16 @@ from scrapy import Request, Spider
from ludoj.items import GameItem
from ludoj.loaders import GameLoader


-def extract_bgg_id(url):
+def _extract_bgg_id(url):
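    # game links look like '/boardgame/<id>/<slug>', so the BGG ID is the
    # second path segment (assumption based on current BGG profile URLs)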
    return int(url.split('/')[2]) if url else None


class BggSpider(Spider):
    ''' BoardGameGeek spider '''

    name = 'bgg'
    allowed_domains = ['boardgamegeek.com']
    start_urls = ['https://boardgamegeek.com/browse/boardgame/']
+    item_classes = (GameItem,)

    # https://www.boardgamegeek.com/wiki/page/BGG_XML_API2
    xml_api_url = 'https://www.boardgamegeek.com/xmlapi2/thing?id={id}&stats=1&versions=1&videos=1'
@@ -31,13 +36,14 @@ class BggSpider(Spider):
        for game in response.css('tr#row_'):
            url = game.css('td.collection_objectname a::attr(href)').extract_first()
-            bgg_id = extract_bgg_id(url)
+            bgg_id = _extract_bgg_id(url)

            if bgg_id is not None:
                request = Request(self.xml_api_url.format(id=bgg_id), callback=self.parse_game)
                request.meta['profile_url'] = response.urljoin(url) if url else None
                yield request

+    # pylint: disable=no-self-use
    def parse_game(self, response):
        """
        @url https://www.boardgamegeek.com/xmlapi2/thing?id=13&stats=1&versions=1&videos=1
......
ludoj/spiders/luding.py
# -*- coding: utf-8 -*-

''' Luding spider '''

from __future__ import unicode_literals

import re

@@ -12,19 +14,19 @@ from scrapy import Spider, Request
from ludoj.items import GameItem
from ludoj.loaders import GameLoader


-def extract_redirects(urls):
+def _extract_redirects(urls):
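    # luding.org wraps external links as /cgi-bin/Redirect.py?URL=<target>;
    # unwrap them by yielding each URL query parameter (cf. the XPath used
    # in parse_game below)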
    for url in urls:
        url = urlparse(url)
        query = parse_qs(url.query)

        for link in query.get('URL') or ():
            yield link


-def extract_luding_id(url):
+def _extract_luding_id(url):
    url = urlparse(url)
    query = parse_qs(url.query)
    return query.get('gameid') or None


-def extract_bgg_ids(urls):
+def _extract_bgg_ids(urls):
    for url in urls:
        url = urlparse(url)
        if 'boardgamegeek.com' in (url.hostname or ''):  # hostname may be None for relative links
@@ -34,10 +36,13 @@ def extract_bgg_ids(urls):
                pass


class LudingSpider(Spider):
    ''' Luding spider '''

    name = 'luding'
    allowed_domains = ['luding.org']
    start_urls = ['http://luding.org/cgi-bin/GameFirstLetter.py?letter={}'.format(letter)
                  for letter in string.ascii_uppercase + '0']
+    item_classes = (GameItem,)

    def parse(self, response):
        """
@@ -51,6 +56,7 @@ class LudingSpider(Spider):
            if url:
                yield Request(response.urljoin(url), callback=self.parse_game)

+    # pylint: disable=no-self-use
    def parse_game(self, response):
        """
        @url http://luding.org/cgi-bin/GameData.py?f=00w^E4W&gameid=1508
@@ -61,12 +67,12 @@ class LudingSpider(Spider):
        min_players max_players min_age
        """
-        h1 = response.css('h1')
-        game = h1.xpath('following-sibling::table')
+        headline = response.css('h1')
+        game = headline.xpath('following-sibling::table')

        ldr = GameLoader(item=GameItem(), selector=game, response=response)

-        ldr.add_value('name', h1.extract_first())
+        ldr.add_value('name', headline.extract_first())
        ldr.add_xpath('year', 'tr[td = "Year:"]/td[2]')
        ldr.add_xpath('game_type', 'tr[td = "Type:"]/td[2]')
        ldr.add_xpath('description', 'tr[td = "Box text:"]/td[2]')
@@ -79,7 +85,7 @@ class LudingSpider(Spider):
        images = game.css('img::attr(src)').extract()
        ldr.add_value('image_url', {response.urljoin(i) for i in images})

        links = game.xpath('.//a/@href[starts-with(., "/cgi-bin/Redirect.py")]').extract()
-        links = frozenset(extract_redirects(response.urljoin(link) for link in links))
+        links = frozenset(_extract_redirects(response.urljoin(link) for link in links))
        ldr.add_value('external_link', links)

        players = game.xpath('tr[td = "No. of players:"]/td[2]/text()').extract_first()
@@ -92,7 +98,7 @@ class LudingSpider(Spider):
        # ldr.add_xpath('min_time', 'minplaytime/@value')
        # ldr.add_xpath('max_time', 'maxplaytime/@value')

-        ldr.add_value('bgg_id', extract_bgg_ids(links))
-        ldr.add_value('luding_id', extract_luding_id(response.url))
+        ldr.add_value('bgg_id', _extract_bgg_ids(links))
+        ldr.add_value('luding_id', _extract_luding_id(response.url))

        return ldr.load_item()
ludoj/spiders/spielen.py
# -*- coding: utf-8 -*-

''' Spielen.de spider '''

from __future__ import unicode_literals

import re

@@ -9,24 +11,25 @@ from scrapy import Request, Spider
from ludoj.items import GameItem
from ludoj.loaders import GameLoader


-def parse_interval(text):
+def _parse_interval(text):
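    # pull lower/upper bounds out of range strings such as '2 - 4' or
    # '45 - 60'; a single number yields (n, None)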
    match = re.match(r'^.*?(\d+)(\s*-\s*(\d+))?.*$', text)
    if match:
        return match.group(1), match.group(3)
-    else:
-        return None, None
+    return None, None


-def parse_int(text):
+def _parse_int(text):
    match = re.match(r'^.*?(\d+).*$', text)
    if match:
        return match.group(1)
-    else:
-        return None
+    return None


class SpielenSpider(Spider):
    ''' Spielen.de spider '''

    name = "spielen"
    allowed_domains = ["spielen.de"]
    start_urls = ['http://gesellschaftsspiele.spielen.de/alle-brettspiele/']
+    item_classes = (GameItem,)

    def parse(self, response):
        """
@@ -45,6 +48,7 @@ class SpielenSpider(Spider):
            if url:
                yield Request(response.urljoin(url), callback=self.parse_game)

+    # pylint: disable=no-self-use
    def parse_game(self, response):
        """
        @url http://gesellschaftsspiele.spielen.de/alle-brettspiele/catan-das-spiel/
@@ -83,13 +87,13 @@ class SpielenSpider(Spider):
        ldr.add_value('video_url', (response.urljoin(v) for v in videos if v))

        players = game.xpath('.//b[. = "Spieler:"]/following-sibling::text()').extract_first()
-        min_players, max_players = parse_interval(players) if players else (None, None)
+        min_players, max_players = _parse_interval(players) if players else (None, None)
        ldr.add_value('min_players', min_players)
        ldr.add_value('max_players', max_players)

        age = game.xpath('.//b[. = "Alter:"]/following-sibling::text()').extract_first()
-        ldr.add_value('min_age', parse_int(age) if age else None)
+        ldr.add_value('min_age', _parse_int(age) if age else None)

        time = game.xpath('.//b[. = "Dauer:"]/following-sibling::text()').extract_first()
-        min_time, max_time = parse_interval(time) if time else (None, None)
+        min_time, max_time = _parse_interval(time) if time else (None, None)
        ldr.add_value('min_time', min_time)
        ldr.add_value('max_time', max_time)
......