Commit 6ebbed2a authored by Sam Ruby

Spider threads

parents ba25b691 45f0f921
......@@ -4,7 +4,7 @@ Elias Torres - FOAF OnlineAccounts
Jacques Distler - Template patches
Michael Koziarski - HTTP Auth fix
Brian Ewins - Win32 / Portalocker
Joe Gregorio - Invoke same version of Python for filters
Joe Gregorio - python versioning for filters, verbose tests, spider_threads
Harry Fuecks - Pipe characters in file names, filter bug
Eric van der Vlist - Filters to add language, category information
Chris Dolan - mkdir cache; default template_dirs; fix xsltproc
......
......@@ -98,6 +98,9 @@ use for logging output. Note: this configuration value is processed
<dd>Number of seconds to wait for any given feed</dd>
<dt><del>new_feed_items</del></dt>
<dd>Number of items to take from new feeds</dd>
<dt><ins>spider_threads</ins></dt>
<dd>The number of threads to use when spidering. The default is 0, meaning that
no threads are used and spidering follows the traditional sequential algorithm
(see the sample configuration below).</dd>
</dl>
</blockquote>
......
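For illustration, a minimal planet configuration that turns the new option on could look like this (all values here are made up; spider_threads is the only setting introduced by this commit):

[Planet]
name = Example Planet
cache_directory = cache
output_dir = output
feed_timeout = 20
spider_threads = 4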
......@@ -54,7 +54,10 @@ if __name__ == "__main__":
if not offline:
from planet import spider
spider.spiderPlanet(only_if_new=only_if_new)
try:
spider.spiderPlanet(only_if_new=only_if_new)
except Exception, e:
print e
from planet import splice
doc = splice.splice()
......
......@@ -31,25 +31,4 @@ def getLogger(level, format):
return logger
def setTimeout(timeout):
""" time out rather than hang forever on ultra-slow servers."""
if timeout:
try:
timeout = float(timeout)
except:
logger.warning("Timeout set to invalid value '%s', skipping", timeout)
timeout = None
if timeout:
try:
from planet import timeoutsocket
timeoutsocket.setDefaultSocketTimeout(timeout)
logger.info("Socket timeout set to %d seconds", timeout)
except ImportError:
import socket
if hasattr(socket, 'setdefaulttimeout'):
logger.debug("timeoutsocket not found, using python function")
socket.setdefaulttimeout(timeout)
logger.info("Socket timeout set to %d seconds", timeout)
else:
logger.error("Unable to set timeout to %d seconds", timeout)
......@@ -100,6 +100,7 @@ def __init__():
define_planet('owner_email', '')
define_planet('output_theme', '')
define_planet('output_dir', 'output')
define_planet('spider_threads', 0)
define_planet_list('template_files')
define_planet_list('bill_of_materials')
......@@ -282,6 +283,11 @@ def downloadReadingList(list, orig_config, callback, use_cache=True, re_read=Tru
except:
logger.exception("Unable to read %s readinglist", list)
def http_cache_directory():
if parser.has_option('Planet', 'http_cache_directory'):
return parser.get('Planet', 'http_cache_directory')
else:
return os.path.join(cache_directory(), 'sources/http')
def cache_sources_directory():
if parser.has_option('Planet', 'cache_sources_directory'):
......
......@@ -11,7 +11,7 @@ Recommended: Python 2.3 or later
Recommended: CJKCodecs and iconv_codec <http://cjkpython.i18n.org/>
"""
__version__ = "4.2-pre-" + "$Revision: 1.144 $"[11:16] + "-cvs"
__version__ = "4.2-pre-" + "$Revision: 1.146 $"[11:16] + "-cvs"
__license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
......
......@@ -4,10 +4,11 @@ and write each as a set of entries in a cache directory.
"""
# Standard library modules
import time, calendar, re, os
import time, calendar, re, os, urlparse
from xml.dom import minidom
# Planet modules
import planet, config, feedparser, reconstitute, shell
import planet, config, feedparser, reconstitute, shell, socket
from StringIO import StringIO
# Regular expressions to sanitise cache filenames
re_url_scheme = re.compile(r'^\w+:/*(\w+:|www\.)?')
......@@ -116,8 +117,11 @@ def scrub(feed, data):
source.author_detail.has_key('name'):
source.author_detail['name'] = \
str(stripHtml(source.author_detail.name))
def _is_http_uri(uri):
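""" Return True for http/https URIs; feeds with other schemes are fetched outside the thread pool """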
parsed = urlparse.urlparse(uri)
return parsed[0] in ['http', 'https']
def spiderFeed(feed, only_if_new=0):
def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
""" Spider (fetch) a single feed """
log = planet.logger
......@@ -125,6 +129,7 @@ def spiderFeed(feed, only_if_new=0):
sources = config.cache_sources_directory()
if not os.path.exists(sources):
os.makedirs(sources, 0700)
feed_source = filename(sources, feed)
feed_info = feedparser.parse(feed_source)
if feed_info.feed and only_if_new:
......@@ -135,14 +140,25 @@ def spiderFeed(feed, only_if_new=0):
return
# read feed itself
modified = None
try:
modified=time.strptime(
feed_info.feed.get('planet_http_last_modified', None))
except:
pass
data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
if content:
# httplib2 was used to get the content, so prepare a
# proper object to pass to feedparser.
f = StringIO(content)
setattr(f, 'url', resp_headers.get('content-location', feed))
if resp_headers:
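# httplib2 has already decompressed the body, so drop this header to keep
# feedparser from trying to decode the content a second time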
if resp_headers.has_key('content-encoding'):
del resp_headers['content-encoding']
setattr(f, 'headers', resp_headers)
data = feedparser.parse(f)
else:
modified = None
try:
modified=time.strptime(
feed_info.feed.get('planet_http_last_modified', None))
except:
pass
data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
# capture http status
if not data.has_key("status"):
......@@ -326,12 +342,99 @@ def spiderFeed(feed, only_if_new=0):
def spiderPlanet(only_if_new = False):
""" Spider (fetch) an entire planet """
log = planet.getLogger(config.log_level(),config.log_format())
planet.setTimeout(config.feed_timeout())
global index
index = True
for feed in config.subscriptions():
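# Configure the global socket timeout directly; fall back to the bundled
# timeoutsocket module, and warn if the configured value is unusable.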
timeout = config.feed_timeout()
try:
socket.setdefaulttimeout(float(timeout))
except:
try:
from planet import timeoutsocket
timeoutsocket.setDefaultSocketTimeout(float(timeout))
log.info("Socket timeout set to %d seconds", timeout)
except:
log.warning("Timeout set to invalid value '%s', skipping", timeout)
if int(config.spider_threads()):
from Queue import Queue, Empty
from threading import Thread
import httplib2
from socket import gaierror, error
work_queue = Queue()
awaiting_parsing = Queue()
http_cache = config.http_cache_directory()
if not os.path.exists(http_cache):
os.makedirs(http_cache, 0700)
def _spider_proc(thread_index):
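""" Worker: pull URIs off work_queue until it is empty, fetching each with httplib2 and queueing (response, content, uri) tuples on awaiting_parsing """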
h = httplib2.Http(http_cache)
try:
while True:
# The non-blocking get will throw an exception when the queue
# is empty which will terminate the thread.
uri = work_queue.get(block=False)
log.info("Fetching %s via %d", uri, thread_index)
try:
(resp, content) = h.request(uri)
awaiting_parsing.put(block=True, item=(resp, content, uri))
except gaierror:
log.error("Failed to resolve server name %s via %d", uri, thread_index)
except error, e:
log.error("HTTP Error: %s in thread-%d", str(e), thread_index)
except Exception, e:
import sys, traceback
type, value, tb = sys.exc_info()
log.error('Error processing %s', uri)
for line in (traceback.format_exception_only(type, value) +
traceback.format_tb(tb)):
log.error(line.rstrip())
except Empty, e:
log.info("Thread %d finished", thread_index)
pass
# Load the work_queue with all the HTTP(S) uris.
map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])
# Start all the worker threads
threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))])
for t in threads.itervalues():
t.start()
# Process the results as they arrive
while work_queue.qsize() or awaiting_parsing.qsize() or threads:
if awaiting_parsing.qsize() == 0 and threads:
time.sleep(1)
while awaiting_parsing.qsize():
item = awaiting_parsing.get(False)
try:
(resp_headers, content, uri) = item
if not resp_headers.fromcache:
if resp_headers.status < 300:
log.info("Parsing pre-fetched %s", uri)
spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
else:
log.error("Status code %d from %s", resp_headers.status, uri)
except Exception, e:
import sys, traceback
type, value, tb = sys.exc_info()
log.error('Error processing %s', uri)
for line in (traceback.format_exception_only(type, value) +
traceback.format_tb(tb)):
log.error(line.rstrip())
for index in threads.keys():
if not threads[index].isAlive():
del threads[index]
log.info("Finished threaded part of processing.")
# Process non-HTTP uris if we are threading, otherwise process *all* uris here.
unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)]
for feed in unthreaded_work_queue:
try:
spiderFeed(feed, only_if_new=only_if_new)
except Exception,e:
......@@ -341,3 +444,6 @@ def spiderPlanet(only_if_new = False):
for line in (traceback.format_exception_only(type, value) +
traceback.format_tb(tb)):
log.error(line.rstrip())
......
[Planet]
name = test planet
cache_directory = tests/work/spider/cache
spider_threads = 2
# for testing purposes, must equal port number below
test_port = 8098
[http://127.0.0.1:8098/tests/data/spider/testfeed0.atom]
name = not found
[http://127.0.0.1:8098/tests/data/spider/testfeed1b.atom]
name = one
[http://127.0.0.1:8098/tests/data/spider/testfeed2.atom]
name = two
[http://127.0.0.1:8098/tests/data/spider/testfeed3.rss]
name = three
......
#!/usr/bin/env python
import unittest, os, glob, calendar, shutil
import unittest, os, glob, calendar, shutil, time
from planet.spider import filename, spiderFeed, spiderPlanet
from planet import feedparser, config
import planet
......@@ -43,9 +43,7 @@ class SpiderTest(unittest.TestCase):
self.assertEqual(os.path.join('.', 'xn--8ws00zhy3a.com'),
filename('.', u'http://www.\u8a79\u59c6\u65af.com/'))
def test_spiderFeed(self):
config.load(configfile)
spiderFeed(testfeed % '1b')
def verify_spiderFeed(self):
files = glob.glob(workdir+"/*")
files.sort()
......@@ -64,13 +62,18 @@ class SpiderTest(unittest.TestCase):
self.assertEqual(os.stat(files[2]).st_mtime,
calendar.timegm(data.entries[0].updated_parsed))
def test_spiderFeed(self):
config.load(configfile)
spiderFeed(testfeed % '1b')
self.verify_spiderFeed()
def test_spiderUpdate(self):
config.load(configfile)
spiderFeed(testfeed % '1a')
self.test_spiderFeed()
spiderFeed(testfeed % '1b')
self.verify_spiderFeed()
def test_spiderPlanet(self):
config.load(configfile)
spiderPlanet()
def verify_spiderPlanet(self):
files = glob.glob(workdir+"/*")
# verify that exactly eight files + 1 source dir were produced
......@@ -88,3 +91,48 @@ class SpiderTest(unittest.TestCase):
for link in data.entries[0].source.links if link.rel=='self'])
self.assertEqual('three', data.entries[0].source.author_detail.name)
def test_spiderPlanet(self):
config.load(configfile)
spiderPlanet()
self.verify_spiderPlanet()
def test_spiderThreads(self):
config.load(configfile.replace('config','threaded'))
_PORT = config.parser.getint('Planet','test_port')
log = []
from SimpleHTTPServer import SimpleHTTPRequestHandler
class TestRequestHandler(SimpleHTTPRequestHandler):
def log_message(self, format, *args):
log.append(args)
from threading import Thread
class TestServerThread(Thread):
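""" Serve the test feed files over HTTP on test_port until done is set """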
def __init__(self):
self.ready = 0
self.done = 0
Thread.__init__(self)
def run(self):
from BaseHTTPServer import HTTPServer
httpd = HTTPServer(('',_PORT), TestRequestHandler)
self.ready = 1
while not self.done:
httpd.handle_request()
httpd = TestServerThread()
httpd.start()
while not httpd.ready:
time.sleep(0.1)
try:
spiderPlanet()
finally:
httpd.done = 1
import urllib
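# issue one final request so the blocking handle_request() call returns
# and the server thread notices that done has been set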
urllib.urlopen('http://127.0.0.1:%d/' % _PORT).read()
status = [int(rec[1]) for rec in log if str(rec[0]).startswith('GET ')]
status.sort()
self.assertEqual([200,200,200,200,404], status)
self.verify_spiderPlanet()