Skip to content
Snippets Groups Projects
Commit 3198e381 authored by Cy8aer's avatar Cy8aer
Browse files

Merge branch 'thigg/podqast-discover-fixes'

parents 3d0dceb0 ae443ee6
No related branches found
No related tags found
3 merge requests!73Discover fixes,!72migration for lost episodes,!71Async logs
Pipeline #315470818 passed
Showing
with 18 additions and 3152 deletions
......@@ -18,11 +18,14 @@ DISTFILES += qml/podqast.qml \
qml/components/EpisodeContextMenu.qml \
qml/components/FyydDe.py \
qml/components/FyydDePython.qml \
qml/components/LogHandler.qml \
qml/components/MigrationHandler.qml \
qml/components/timeutil.js \
qml/cover/CoverPage.qml \
qml/pages/DataMigration.qml \
qml/pages/DiscoverFyyd.qml \
qml/pages/Log.qml \
qml/pages/SubscribePodcast.qml \
rpm/podqast.changes.in \
rpm/podqast.changes.run.in \
rpm/podqast.spec \
......@@ -32,22 +35,6 @@ DISTFILES += qml/podqast.qml \
qml/params.yml \
qml/pages/GpodderNetPython.qml \
images/freak-show.jpg \
python/mygpoclient/__init__.py \
python/mygpoclient/api.py \
python/mygpoclient/api_test.py \
python/mygpoclient/feeds.py \
python/mygpoclient/http.py \
python/mygpoclient/http_test.py \
python/mygpoclient/json.py \
python/mygpoclient/json_test.py \
python/mygpoclient/locator.py \
python/mygpoclient/locator_test.py \
python/mygpoclient/public.py \
python/mygpoclient/public_test.py \
python/mygpoclient/simple.py \
python/mygpoclient/simple_test.py \
python/mygpoclient/testing.py \
python/mygpoclient/util.py \
python/feedparser/ \
python/peewee.py \
python/playhouse/ \
......@@ -57,7 +44,6 @@ DISTFILES += qml/podqast.qml \
qml/podqast.qml \
qml/pages/Podsearchorg.qml \
qml/pages/Podsearch.qml \
qml/pages/Podcast.qml \
qml/pages/FeedParserPython.qml \
qml/pages/Poddescription.qml \
qml/pages/PodcastListItem.qml \
......
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""gpodder.net API Client Library
This is mygpoclient, the gpodder.net API Client Library for Python.
"""
__author__ = 'Thomas Perl <thp@gpodder.org>'
__version__ = '1.8'
__website__ = 'http://gpodder.org/mygpoclient/'
__license__ = 'GNU General Public License v3 or later'
# Default settings for the API client (server root url and API version)
ROOT_URL = 'http://gpodder.net'
VERSION = 2
TOPLIST_DEFAULT = 50
# You can overwrite this value from your application if you want
user_agent = 'mygpoclient/%s (+%s)' % (__version__, __website__)
# Version checking
def require_version(minimum_required):
"""Require a minimum version of the library
Returns True if the minimum library version constraint is
satisfied, False otherwise. Use this to check for newer API
methods and changes in the public API as described in NEWS.
>>> require_version('1.0')
True
>>> require_version('1.2')
True
>>> require_version(__version__)
True
>>> require_version('99.99')
False
"""
this_version = [int(x) for x in __version__.split('.')]
minimum_required = [int(x) for x in minimum_required.split('.')]
return minimum_required <= this_version
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
try:
# Python 2
str = unicode
except:
# Python 3
pass
import mygpoclient
from mygpoclient import util
from mygpoclient import simple
from mygpoclient import public
# Additional error types for the advanced API client
class InvalidResponse(Exception): pass
class UpdateResult(object):
"""Container for subscription update results
Attributes:
update_urls - A list of (old_url, new_url) tuples
since - A timestamp value for use in future requests
"""
def __init__(self, update_urls, since):
self.update_urls = update_urls
self.since = since
class SubscriptionChanges(object):
"""Container for subscription changes
Attributes:
add - A list of URLs that have been added
remove - A list of URLs that have been removed
since - A timestamp value for use in future requests
"""
def __init__(self, add, remove, since):
self.add = add
self.remove = remove
self.since = since
class EpisodeActionChanges(object):
"""Container for added episode actions
Attributes:
actions - A list of EpisodeAction objects
since - A timestamp value for use in future requests
"""
def __init__(self, actions, since):
self.actions = actions
self.since = since
class PodcastDevice(object):
"""This class encapsulates a podcast device
Attributes:
device_id - The ID used to refer to this device
caption - A user-defined "name" for this device
type - A valid type of podcast device (see VALID_TYPES)
subscriptions - The number of podcasts this device is subscribed to
"""
VALID_TYPES = ('desktop', 'laptop', 'mobile', 'server', 'other')
def __init__(self, device_id, caption, type, subscriptions):
# Check if the device type is valid
if type not in self.VALID_TYPES:
raise ValueError('Invalid device type "%s" (see VALID_TYPES)' % type)
# Check if subsciptions is a numeric value
try:
int(subscriptions)
except:
raise ValueError('Subscription must be a numeric value but was %s' % subscriptions)
self.device_id = device_id
self.caption = caption
self.type = type
self.subscriptions = int(subscriptions)
def __str__(self):
"""String representation of this device
>>> device = PodcastDevice('mygpo', 'My Device', 'mobile', 10)
>>> print(device)
PodcastDevice('mygpo', 'My Device', 'mobile', 10)
"""
return '%s(%r, %r, %r, %r)' % (self.__class__.__name__,
self.device_id, self.caption, self.type, self.subscriptions)
@classmethod
def from_dictionary(cls, d):
return cls(d['id'], d['caption'], d['type'], d['subscriptions'])
class EpisodeAction(object):
"""This class encapsulates an episode action
The mandatory attributes are:
podcast - The feed URL of the podcast
episode - The enclosure URL or GUID of the episode
action - One of 'download', 'play', 'delete' or 'new'
The optional attributes are:
device - The device_id on which the action has taken place
timestamp - When the action took place (in XML time format)
started - The start time of a play event in seconds
position - The current position of a play event in seconds
total - The total time of the episode (for play events)
The attribute "position" is only valid for "play" action types.
"""
VALID_ACTIONS = ('download', 'play', 'delete', 'new', 'flattr')
def __init__(self, podcast, episode, action,
device=None, timestamp=None,
started=None, position=None, total=None):
# Check if the action is valid
if action not in self.VALID_ACTIONS:
raise ValueError('Invalid action type "%s" (see VALID_ACTIONS)' % action)
# Disallow play-only attributes for non-play actions
if action != 'play':
if started is not None:
raise ValueError('Started can only be set for the "play" action')
elif position is not None:
raise ValueError('Position can only be set for the "play" action')
elif total is not None:
raise ValueError('Total can only be set for the "play" action')
# Check the format of the timestamp value
if timestamp is not None:
if util.iso8601_to_datetime(timestamp) is None:
raise ValueError('Timestamp has to be in ISO 8601 format but was %s' % timestamp)
# Check if we have a "position" value if we have started or total
if position is None and (started is not None or total is not None):
raise ValueError('Started or total set, but no position given')
# Check that "started" is a number if it's set
if started is not None:
try:
started = int(started)
except ValueError:
raise ValueError('Started must be an integer value (seconds) but was %s' % started)
# Check that "position" is a number if it's set
if position is not None:
try:
position = int(position)
except ValueError:
raise ValueError('Position must be an integer value (seconds) but was %s' % position)
# Check that "total" is a number if it's set
if total is not None:
try:
total = int(total)
except ValueError:
raise ValueError('Total must be an integer value (seconds) but was %s' % total)
self.podcast = podcast
self.episode = episode
self.action = action
self.device = device
self.timestamp = timestamp
self.started = started
self.position = position
self.total = total
@classmethod
def from_dictionary(cls, d):
return cls(d['podcast'], d['episode'], d['action'],
d.get('device'), d.get('timestamp'),
d.get('started'), d.get('position'), d.get('total'))
def to_dictionary(self):
d = {}
for mandatory in ('podcast', 'episode', 'action'):
value = getattr(self, mandatory)
d[mandatory] = value
for optional in ('device', 'timestamp',
'started', 'position', 'total'):
value = getattr(self, optional)
if value is not None:
d[optional] = value
return d
class MygPodderClient(simple.SimpleClient):
"""gpodder.net API Client
This is the API client that implements both the Simple and
Advanced API of gpodder.net. See the SimpleClient class
for a smaller class that only implements the Simple API.
"""
@simple.needs_credentials
def get_subscriptions(self, device):
# Overloaded to accept PodcastDevice objects as arguments
device = getattr(device, 'device_id', device)
return simple.SimpleClient.get_subscriptions(self, device)
@simple.needs_credentials
def put_subscriptions(self, device, urls):
# Overloaded to accept PodcastDevice objects as arguments
device = getattr(device, 'device_id', device)
return simple.SimpleClient.put_subscriptions(self, device, urls)
@simple.needs_credentials
def update_subscriptions(self, device_id, add_urls=[], remove_urls=[]):
"""Update the subscription list for a given device.
Returns a UpdateResult object that contains a list of (sanitized)
URLs and a "since" value that can be used for future calls to
pull_subscriptions.
For every (old_url, new_url) tuple in the updated_urls list of
the resulting object, the client should rewrite the URL in its
subscription list so that new_url is used instead of old_url.
"""
uri = self._locator.add_remove_subscriptions_uri(device_id)
if not all(isinstance(x, str) for x in add_urls):
raise ValueError('add_urls must be a list of strings but was %s' % add_urls)
if not all(isinstance(x, str) for x in remove_urls):
raise ValueError('remove_urls must be a list of strings but was %s' % remove_urls)
data = {'add': add_urls, 'remove': remove_urls}
response = self._client.POST(uri, data)
if response is None:
raise InvalidResponse('Got empty response')
if 'timestamp' not in response:
raise InvalidResponse('Response does not contain timestamp')
try:
since = int(response['timestamp'])
except ValueError:
raise InvalidResponse('Invalid value %s for timestamp in response' % response['timestamp'])
if 'update_urls' not in response:
raise InvalidResponse('Response does not contain update_urls')
try:
update_urls = [(a, b) for a, b in response['update_urls']]
except:
raise InvalidResponse('Invalid format of update_urls in response: %s' % response['update_urls'])
if not all(isinstance(a, str) and isinstance(b, str) \
for a, b in update_urls):
raise InvalidResponse('Invalid format of update_urls in response: %s' % update_urls)
return UpdateResult(update_urls, since)
@simple.needs_credentials
def pull_subscriptions(self, device_id, since=None):
"""Downloads subscriptions since the time of the last update
The "since" parameter should be a timestamp that has been
retrieved previously by a call to update_subscriptions or
pull_subscriptions.
Returns a SubscriptionChanges object with two lists (one for
added and one for removed podcast URLs) and a "since" value
that can be used for future calls to this method.
"""
uri = self._locator.subscription_updates_uri(device_id, since)
data = self._client.GET(uri)
if data is None:
raise InvalidResponse('Got empty response')
if 'add' not in data:
raise InvalidResponse('List of added podcasts not in response')
if 'remove' not in data:
raise InvalidResponse('List of removed podcasts not in response')
if 'timestamp' not in data:
raise InvalidResponse('Timestamp missing from response')
if not all(isinstance(x, str) for x in data['add']):
raise InvalidResponse('Invalid value(s) in list of added podcasts: %s' % data['add'])
if not all(isinstance(x, str) for x in data['remove']):
raise InvalidResponse('Invalid value(s) in list of removed podcasts: %s' % data['remove'])
try:
since = int(data['timestamp'])
except ValueError:
raise InvalidResponse('Timestamp has invalid format in response: %s' % data['timestamp'])
return SubscriptionChanges(data['add'], data['remove'], since)
@simple.needs_credentials
def upload_episode_actions(self, actions=[]):
"""Uploads a list of EpisodeAction objects to the server
Returns the timestamp that can be used for retrieving changes.
"""
uri = self._locator.upload_episode_actions_uri()
actions = [action.to_dictionary() for action in actions]
response = self._client.POST(uri, actions)
if response is None:
raise InvalidResponse('Got empty response')
if 'timestamp' not in response:
raise InvalidResponse('Response does not contain timestamp')
try:
since = int(response['timestamp'])
except ValueError:
raise InvalidResponse('Invalid value %s for timestamp in response' % response['timestamp'])
return since
@simple.needs_credentials
def download_episode_actions(self, since=None,
podcast=None, device_id=None):
"""Downloads a list of EpisodeAction objects from the server
Returns a EpisodeActionChanges object with the list of
new actions and a "since" timestamp that can be used for
future calls to this method when retrieving episodes.
"""
uri = self._locator.download_episode_actions_uri(since,
podcast, device_id)
data = self._client.GET(uri)
if data is None:
raise InvalidResponse('Got empty response')
if 'actions' not in data:
raise InvalidResponse('Response does not contain actions')
if 'timestamp' not in data:
raise InvalidResponse('Response does not contain timestamp')
try:
since = int(data['timestamp'])
except ValueError:
raise InvalidResponse('Invalid value for timestamp: ' +
data['timestamp'])
dicts = data['actions']
try:
actions = [EpisodeAction.from_dictionary(d) for d in dicts]
except KeyError:
raise InvalidResponse('Missing keys in action list response')
return EpisodeActionChanges(actions, since)
@simple.needs_credentials
def update_device_settings(self, device_id, caption=None, type=None):
"""Update the description of a device on the server
This changes the caption and/or type of a given device
on the server. If the device does not exist, it is
created with the given settings.
The parameters caption and type are both optional and
when set to a value other than None will be used to
update the device settings.
Returns True if the request succeeded, False otherwise.
"""
uri = self._locator.device_settings_uri(device_id)
data = {}
if caption is not None:
data['caption'] = caption
if type is not None:
data['type'] = type
return (self._client.POST(uri, data) is None)
@simple.needs_credentials
def get_devices(self):
"""Returns a list of this user's PodcastDevice objects
The resulting list can be used to display a selection
list to the user or to determine device IDs to pull
the subscription list from.
"""
uri = self._locator.device_list_uri()
dicts = self._client.GET(uri)
if dicts is None:
raise InvalidResponse('No response received')
try:
return [PodcastDevice.from_dictionary(d) for d in dicts]
except KeyError:
raise InvalidResponse('Missing keys in device list response')
def get_favorite_episodes(self):
"""Returns a List of Episode Objects containing the Users
favorite Episodes"""
uri = self._locator.favorite_episodes_uri()
return [public.Episode.from_dict(d) for d in self._client.GET(uri)]
def get_settings(self, type, scope_param1=None, scope_param2=None):
"""Returns a Dictionary with the set settings for the type & specified scope"""
uri = self._locator.settings_uri(type, scope_param1, scope_param2)
return self._client.GET(uri)
def set_settings(self, type, scope_param1, scope_param2, set={}, remove=[]):
"""Returns a Dictionary with the set settings for the type & specified scope"""
uri = self._locator.settings_uri(type, scope_param1, scope_param2)
data = {}
data["set"] = set
data["remove"] = remove
return self._client.POST(uri, data)
This diff is collapsed.
# -*- coding: utf-8 -*-
# mygpo-feedservice Client
# Copyright (C) 2011 Stefan Kögl
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import time
try:
# Python 3
from urllib.parse import urljoin, urlencode
except ImportError:
# Python 2
from urlparse import urljoin
from urllib import urlencode
from datetime import datetime
from email import utils
import mygpoclient.json
try:
# Prefer the usage of the simplejson module, as it
# is most likely newer if it's installed as module
# than the built-in json module (and is mandatory
# in Python versions before 2.6, anyway).
import simplejson as json
except ImportError:
# Python 2.6 ships the "json" module by default
import json
BASE_URL='http://mygpo-feedservice.appspot.com'
class FeedServiceResponse(list):
"""
Encapsulates the relevant data of a mygpo-feedservice response
"""
def __init__(self, feeds, last_modified, feed_urls):
super(FeedServiceResponse, self).__init__(feeds)
self.last_modified = last_modified
self.feed_urls = feed_urls
self.indexed_feeds = {}
for feed in feeds:
for url in feed['urls']:
self.indexed_feeds[url] = feed
def get_feeds(self):
"""
Returns the parsed feeds in order of the initial request
"""
return (self.get_feed(url) for url in self.feed_urls)
def get_feed(self, url):
"""
Returns the parsed feed for the given URL
"""
return self.indexed_feeds.get(url, None)
class FeedserviceClient(mygpoclient.json.JsonClient):
"""A special-cased JsonClient for mygpo-feedservice"""
def __init__(self, username=None, password=None, base_url=BASE_URL):
self._base_url = base_url
super(FeedserviceClient, self).__init__(username, password)
def _prepare_request(self, method, uri, data):
"""Sets headers required by mygpo-feedservice
Expects a dict with keys feed_urls and (optionally) last_modified"""
# send URLs as POST data to avoid any length
# restrictions for the query parameters
post_data = [('url', feed_url) for feed_url in data['feed_urls']]
post_data = urlencode(post_data)
# call _prepare_request directly from HttpClient, because
# JsonClient would JSON-encode our POST-data
request = mygpoclient.http.HttpClient._prepare_request(method, uri, post_data)
request.add_header('Accept', 'application/json')
request.add_header('Accept-Encoding', 'gzip')
last_modified = data.get('last_modified', None)
if last_modified is not None:
request.add_header('If-Modified-Since', self.format_header_date(last_modified))
return request
def _process_response(self, response):
""" Extract Last-modified header and passes response body
to JsonClient for decoding"""
last_modified = self.parse_header_date(response.headers['last-modified'])
feeds = super(FeedserviceClient, self)._process_response(response)
return feeds, last_modified
def parse_feeds(self, feed_urls, last_modified=None, strip_html=False,
use_cache=True, inline_logo=False, scale_logo=None,
logo_format=None):
"""
Passes the given feed-urls to mygpo-feedservice to be parsed
and returns the response
"""
url = self.build_url(strip_html=strip_html, use_cache=use_cache,
inline_logo=inline_logo, scale_logo=scale_logo,
logo_format=logo_format)
request_data = dict(feed_urls=feed_urls, last_modified=last_modified)
feeds, last_modified = self.POST(url, request_data)
return FeedServiceResponse(feeds, last_modified, feed_urls)
def build_url(self, **kwargs):
"""
Parameter such as strip_html, scale_logo, etc are pased as kwargs
"""
query_url = urljoin(self._base_url, 'parse')
args = list(kwargs.items())
args = [k_v for k_v in args if k_v[1] is not None]
# boolean flags are represented as 1 and 0 in the query-string
args = [(k_v1[0], int(k_v1[1]) if isinstance(k_v1[1], bool) else k_v1[1]) for k_v1 in args]
args = urlencode(dict(args))
url = '%s?%s' % (query_url, args)
return url
@staticmethod
def parse_header_date(date_str):
"""
Parses dates in RFC2822 format to datetime objects
"""
if not date_str:
return None
ts = time.mktime(utils.parsedate(date_str))
return datetime.utcfromtimestamp(ts)
@staticmethod
def format_header_date(datetime_obj):
"""
Formats the given datetime object for use in HTTP headers
"""
return utils.formatdate(time.mktime(datetime_obj.timetuple()))
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
try:
from urllib import request
from urllib.error import HTTPError
from http.cookiejar import CookieJar
except ImportError:
import urllib2 as request
from urllib2 import HTTPError
from cookielib import CookieJar
import mygpoclient
class SimpleHttpPasswordManager(request.HTTPPasswordMgr):
"""Simplified password manager for urllib2
This class always provides the username/password combination that
is passed to it as constructor argument, independent of the realm
or authuri that is used.
"""
# The maximum number of authentication retries
MAX_RETRIES = 3
def __init__(self, username, password):
self._username = username
self._password = password
self._count = 0
def find_user_password(self, realm, authuri):
self._count += 1
if self._count > self.MAX_RETRIES:
return (None, None)
return (self._username, self._password)
class HttpRequest(request.Request):
"""Request object with customizable method
The default behaviour of Request is unchanged:
>>> request = HttpRequest('http://example.org/')
>>> request.get_method()
'GET'
>>> request = HttpRequest('http://example.org/', data='X')
>>> request.get_method()
'POST'
However, it's possible to customize the method name:
>>> request = HttpRequest('http://example.org/', data='X')
>>> request.set_method('PUT')
>>> request.get_method()
'PUT'
"""
def set_method(self, method):
setattr(self, '_method', method)
def get_method(self):
if hasattr(self, '_method'):
return getattr(self, '_method')
else:
return request.Request.get_method(self)
# Possible exceptions that will be raised by HttpClient
class Unauthorized(Exception): pass
class NotFound(Exception): pass
class BadRequest(Exception): pass
class UnknownResponse(Exception): pass
class HttpClient(object):
"""A comfortable HTTP client
This class hides the gory details of the underlying HTTP protocol
from the rest of the code by providing a simple interface for doing
requests and handling authentication.
"""
def __init__(self, username=None, password=None):
self._username = username
self._password = password
self._cookie_jar = CookieJar()
cookie_handler = request.HTTPCookieProcessor(self._cookie_jar)
if username is not None and password is not None:
password_manager = SimpleHttpPasswordManager(username, password)
auth_handler = request.HTTPBasicAuthHandler(password_manager)
self._opener = request.build_opener(auth_handler, cookie_handler)
else:
self._opener = request.build_opener(cookie_handler)
@staticmethod
def _prepare_request(method, uri, data):
"""Prepares the HttpRequest object"""
if data is None:
request = HttpRequest(uri)
else:
request = HttpRequest(uri, data)
request.set_method(method)
request.add_header('User-agent', mygpoclient.user_agent)
return request
@staticmethod
def _process_response(response):
return response.read()
def _request(self, method, uri, data, **kwargs):
"""Request and exception handling
Carries out a request with a given method (GET, POST, PUT) on
a given URI with optional data (data only makes sense for POST
and PUT requests and should be None for GET requests).
"""
request = self._prepare_request(method, uri, data)
try:
response = self._opener.open(request)
except HTTPError as http_error:
if http_error.code == 404:
raise NotFound()
elif http_error.code == 401:
raise Unauthorized()
elif http_error.code == 400:
raise BadRequest()
else:
raise UnknownResponse(http_error.code)
return self._process_response(response)
def GET(self, uri):
"""Convenience method for carrying out a GET request"""
return self._request('GET', uri, None)
def POST(self, uri, data):
"""Convenience method for carrying out a POST request"""
return self._request('POST', uri, data)
def PUT(self, uri, data):
"""Convenience method for carrying out a PUT request"""
return self._request('PUT', uri, data)
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import codecs
import base64
from mygpoclient.http import (HttpClient, Unauthorized, BadRequest,
UnknownResponse, NotFound)
import unittest
import multiprocessing
try:
# Python 3
from http.server import BaseHTTPRequestHandler, HTTPServer
except ImportError:
# Python 2
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
str = unicode
def http_server(port, username, password, response):
storage = {}
class Handler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def _checks(self):
if not self._check_auth():
return False
elif not self._check_errors():
return False
else:
return True
def _check_auth(self):
if self.path.startswith('/auth'):
authorization = self.headers.get('authorization', None)
if authorization is not None:
auth_type, credentials = authorization.split(None, 1)
if auth_type.lower() == 'basic':
credentials = base64.b64decode(credentials.encode('utf-8'))
auth_user, auth_pass = credentials.decode('utf-8').split(':', 1)
if username == auth_user and password == auth_pass:
return True
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm="Fake HTTP Server"')
self.end_headers()
return False
return True
def _check_errors(self):
if self.path.startswith('/badrequest'):
self.send_response(400)
self.end_headers()
return False
elif self.path.startswith('/notfound'):
self.send_response(404)
self.end_headers()
return False
elif self.path.startswith('/invaliderror'):
self.send_response(444)
self.end_headers()
return False
return True
def do_POST(self):
if not self._checks():
return
input_data = self.rfile.read(int(self.headers.get('content-length')))
self.send_response(200)
self.end_headers()
self.wfile.write(codecs.encode(input_data.decode('utf-8'), 'rot-13').encode('utf-8'))
def do_PUT(self):
if not self._checks():
return
input_data = self.rfile.read(int(self.headers.get('content-length')))
storage[self.path] = input_data
self.send_response(200)
self.end_headers()
self.wfile.write(b'PUT OK')
def do_GET(self):
if not self._checks():
return
self.send_response(200)
self.end_headers()
if self.path in storage:
self.wfile.write(storage[self.path])
else:
self.wfile.write(response)
def log_request(*args):
pass
HTTPServer(('127.0.0.1', port), Handler).serve_forever()
class Test_HttpClient(unittest.TestCase):
USERNAME = 'john'
PASSWORD = 'secret'
PORT = 9876
URI_BASE = 'http://localhost:%(PORT)d' % locals()
RESPONSE = b'Test_GET-HTTP-Response-Content'
DUMMYDATA = b'fq28cnyp3ya8ltcy;ny2t8ay;iweuycowtc'
def setUp(self):
self.server_process = multiprocessing.Process(target=http_server,
args=(self.PORT, self.USERNAME, self.PASSWORD, self.RESPONSE))
self.server_process.start()
import time
time.sleep(.1)
def tearDown(self):
self.server_process.terminate()
import time
time.sleep(.1)
def test_UnknownResponse(self):
client = HttpClient()
path = self.URI_BASE+'/invaliderror'
self.assertRaises(UnknownResponse, client.GET, path)
def test_NotFound(self):
client = HttpClient()
path = self.URI_BASE+'/notfound'
self.assertRaises(NotFound, client.GET, path)
def test_Unauthorized(self):
client = HttpClient('invalid-username', 'invalid-password')
path = self.URI_BASE+'/auth'
self.assertRaises(Unauthorized, client.GET, path)
def test_BadRequest(self):
client = HttpClient()
path = self.URI_BASE+'/badrequest'
self.assertRaises(BadRequest, client.GET, path)
def test_GET(self):
client = HttpClient()
path = self.URI_BASE+'/noauth'
self.assertEquals(client.GET(path), self.RESPONSE)
def test_authenticated_GET(self):
client = HttpClient(self.USERNAME, self.PASSWORD)
path = self.URI_BASE+'/auth'
self.assertEquals(client.GET(path), self.RESPONSE)
def test_unauthenticated_GET(self):
client = HttpClient()
path = self.URI_BASE+'/auth'
self.assertRaises(Unauthorized, client.GET, path)
def test_POST(self):
client = HttpClient()
path = self.URI_BASE+'/noauth'
self.assertEquals(client.POST(path, self.DUMMYDATA), codecs.encode(self.DUMMYDATA.decode('utf-8'), 'rot-13').encode('utf-8'))
def test_authenticated_POST(self):
client = HttpClient(self.USERNAME, self.PASSWORD)
path = self.URI_BASE+'/auth'
self.assertEquals(client.POST(path, self.DUMMYDATA), codecs.encode(self.DUMMYDATA.decode('utf-8'), 'rot-13').encode('utf-8'))
def test_unauthenticated_POST(self):
client = HttpClient()
path = self.URI_BASE+'/auth'
self.assertRaises(Unauthorized, client.POST, path, self.DUMMYDATA)
def test_PUT(self):
client = HttpClient()
path = self.URI_BASE+'/noauth'
self.assertEquals(client.PUT(path, self.DUMMYDATA), b'PUT OK')
def test_GET_after_PUT(self):
client = HttpClient()
for i in range(10):
path = self.URI_BASE + '/file.%(i)d.txt' % locals()
client.PUT(path, self.RESPONSE + str(i).encode('utf-8'))
self.assertEquals(client.GET(path), self.RESPONSE + str(i).encode('utf-8'))
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Fix gPodder bug 900 (so "import json" doesn't import this module)
from __future__ import absolute_import
import mygpoclient
try:
# Python 3
bytes = bytes
except:
# Python 2
bytes = str
import json
from mygpoclient import http
# Additional exceptions for JSON-related errors
class JsonException(Exception): pass
class JsonClient(http.HttpClient):
"""A HttpClient with built-in JSON support
This client will automatically marshal and unmarshal data for
JSON-related web services so that code using this class will
not need to care about (de-)serialization of data structures.
"""
def __init__(self, username=None, password=None):
http.HttpClient.__init__(self, username, password)
@staticmethod
def encode(data):
"""Encodes a object into its JSON string repesentation
>>> JsonClient.encode(None) is None
True
>>> JsonClient.encode([1,2,3]) == b'[1, 2, 3]'
True
>>> JsonClient.encode(42) == b'42'
True
"""
if data is None:
return None
else:
return json.dumps(data).encode('utf-8')
@staticmethod
def decode(data):
"""Decodes a response string to a Python object
>>> JsonClient.decode(b'')
>>> JsonClient.decode(b'[1,2,3]')
[1, 2, 3]
>>> JsonClient.decode(b'42')
42
"""
if data == b'':
return None
data = data.decode('utf-8')
try:
return json.loads(data)
except ValueError:
raise JsonException('Value error while parsing response: ' + data)
@staticmethod
def _prepare_request(method, uri, data):
data = JsonClient.encode(data)
return http.HttpClient._prepare_request(method, uri, data)
@staticmethod
def _process_response(response):
data = http.HttpClient._process_response(response)
return JsonClient.decode(data)
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
try:
# Python 3
from io import BytesIO
from urllib import request
except ImportError:
# Python 2
from StringIO import StringIO as BytesIO
import urllib2 as request
from mygpoclient import http
from mygpoclient import json
import unittest
import minimock
def fqname(o):
return o.__module__ + "." + o.__name__
class Test_JsonClient(unittest.TestCase):
PORT = 9876
URI_BASE = 'http://localhost:%(PORT)d' % locals()
USERNAME = 'john'
PASSWORD = 'secret'
@classmethod
def setUpClass(cls):
cls.odName = fqname(request.OpenerDirector)
cls.boName = fqname(request.build_opener)
def setUp(self):
self.mockopener = minimock.Mock(self.odName)
request.build_opener = minimock.Mock(self.boName)
request.build_opener.mock_returns = self.mockopener
def tearDown(self):
minimock.restore()
def mock_setHttpResponse(self, value):
self.mockopener.open.mock_returns = BytesIO(value)
def test_parseResponse_worksWithDictionary(self):
client = json.JsonClient(self.USERNAME, self.PASSWORD)
self.mock_setHttpResponse(b'{"a": "B", "c": "D"}')
items = list(sorted(client.GET(self.URI_BASE + '/').items()))
self.assertEquals(items, [('a', 'B'), ('c', 'D')])
def test_parseResponse_worksWithIntegerList(self):
client = json.JsonClient(self.USERNAME, self.PASSWORD)
self.mock_setHttpResponse(b'[1,2,3,6,7]')
self.assertEquals(client.GET(self.URI_BASE + '/'), [1,2,3,6,7])
def test_parseResponse_emptyString_returnsNone(self):
client = json.JsonClient(self.USERNAME, self.PASSWORD)
self.mock_setHttpResponse(b'')
self.assertEquals(client.GET(self.URI_BASE + '/'), None)
def test_invalidContent_raisesJsonException(self):
client = json.JsonClient(self.USERNAME, self.PASSWORD)
self.mock_setHttpResponse(b'this is not a valid json string')
self.assertRaises(json.JsonException, client.GET, self.URI_BASE + '/')
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mygpoclient
import os
try:
# Python 3
from urllib.parse import quote_plus, quote
except ImportError:
# Python 2
from urllib import quote_plus, quote
from mygpoclient import util
class Locator(object):
"""URI Locator for API endpoints
This helper class abstracts the URIs for the gpodder.net
webservice and provides a nice facility for generating API
URIs and checking parameters.
"""
SIMPLE_FORMATS = ('opml', 'json', 'txt')
SETTINGS_TYPES = ('account', 'device', 'podcast', 'episode')
def __init__(self, username, root_url=mygpoclient.ROOT_URL,
version=mygpoclient.VERSION):
self._username = username
if root_url.endswith('/'):
root_url = root_url[:-1]
if root_url.startswith('http'):
self._simple_base = root_url
self._base = '%(root_url)s/api/%(version)s' % locals()
else:
self._simple_base = 'http://%(root_url)s' % locals()
self._base = 'http://%(root_url)s/api/%(version)s' % locals()
def _convert_since(self, since):
"""Convert "since" into a numeric value
This is internally used for value-checking.
"""
try:
return int(since)
except ValueError:
raise ValueError('since must be a numeric value (or None)')
def subscriptions_uri(self, device_id=None, format='opml'):
"""Get the Simple API URI for a subscription list
>>> locator = Locator('john')
>>> locator.subscriptions_uri('n800')
'http://gpodder.net/subscriptions/john/n800.opml'
>>> locator.subscriptions_uri('ipod', 'txt')
'http://gpodder.net/subscriptions/john/ipod.txt'
"""
if format not in self.SIMPLE_FORMATS:
raise ValueError('Unsupported file format')
username = self._username
if device_id is None:
path = '%(username)s.%(format)s' % locals()
else:
path = '%(username)s/%(device_id)s.%(format)s' % locals()
return util.join(self._simple_base,
'subscriptions', path)
def toplist_uri(self, count=50, format='opml'):
"""Get the Simple API URI for the toplist
>>> locator = Locator(None)
>>> locator.toplist_uri()
'http://gpodder.net/toplist/50.opml'
>>> locator.toplist_uri(70)
'http://gpodder.net/toplist/70.opml'
>>> locator.toplist_uri(10, 'json')
'http://gpodder.net/toplist/10.json'
"""
if format not in self.SIMPLE_FORMATS:
raise ValueError('Unsupported file format')
filename = 'toplist/%(count)d.%(format)s' % locals()
return util.join(self._simple_base, filename)
def suggestions_uri(self, count=10, format='opml'):
"""Get the Simple API URI for user suggestions
>>> locator = Locator('john')
>>> locator.suggestions_uri()
'http://gpodder.net/suggestions/10.opml'
>>> locator.suggestions_uri(50)
'http://gpodder.net/suggestions/50.opml'
>>> locator.suggestions_uri(70, 'json')
'http://gpodder.net/suggestions/70.json'
"""
if format not in self.SIMPLE_FORMATS:
raise ValueError('Unsupported file format')
filename = 'suggestions/%(count)d.%(format)s' % locals()
return util.join(self._simple_base, filename)
def search_uri(self, query, format='opml'):
"""Get the Simple API URI for podcast search
>>> locator = Locator(None)
>>> locator.search_uri('outlaws')
'http://gpodder.net/search.opml?q=outlaws'
>>> locator.search_uri(':something?', 'txt')
'http://gpodder.net/search.txt?q=%3Asomething%3F'
>>> locator.search_uri('software engineering', 'json')
'http://gpodder.net/search.json?q=software+engineering'
"""
if format not in self.SIMPLE_FORMATS:
raise ValueError('Unsupported file format')
query = quote_plus(query)
filename = 'search.%(format)s?q=%(query)s' % locals()
return util.join(self._simple_base, filename)
def add_remove_subscriptions_uri(self, device_id):
"""Get the Advanced API URI for uploading list diffs
>>> locator = Locator('bill')
>>> locator.add_remove_subscriptions_uri('n810')
'http://gpodder.net/api/2/subscriptions/bill/n810.json'
"""
filename = '%(device_id)s.json' % locals()
return util.join(self._base,
'subscriptions', self._username, filename)
def subscription_updates_uri(self, device_id, since=None):
"""Get the Advanced API URI for downloading list diffs
The parameter "since" is optional and should be a numeric
value (otherwise a ValueError is raised).
>>> locator = Locator('jen')
>>> locator.subscription_updates_uri('n900')
'http://gpodder.net/api/2/subscriptions/jen/n900.json'
>>> locator.subscription_updates_uri('n900', 1234)
'http://gpodder.net/api/2/subscriptions/jen/n900.json?since=1234'
"""
filename = '%(device_id)s.json' % locals()
if since is not None:
since = self._convert_since(since)
filename += '?since=%(since)d' % locals()
return util.join(self._base,
'subscriptions', self._username, filename)
def upload_episode_actions_uri(self):
"""Get the Advanced API URI for uploading episode actions
>>> locator = Locator('thp')
>>> locator.upload_episode_actions_uri()
'http://gpodder.net/api/2/episodes/thp.json'
"""
filename = self._username + '.json'
return util.join(self._base, 'episodes', filename)
def download_episode_actions_uri(self, since=None,
podcast=None, device_id=None):
"""Get the Advanced API URI for downloading episode actions
The parameter "since" is optional and should be a numeric
value (otherwise a ValueError is raised).
Both "podcast" and "device_id" are optional and exclusive:
"podcast" should be a podcast URL
"device_id" should be a device ID
>>> locator = Locator('steve')
>>> locator.download_episode_actions_uri()
'http://gpodder.net/api/2/episodes/steve.json'
>>> locator.download_episode_actions_uri(since=1337)
'http://gpodder.net/api/2/episodes/steve.json?since=1337'
>>> locator.download_episode_actions_uri(podcast='http://example.org/episodes.rss')
'http://gpodder.net/api/2/episodes/steve.json?podcast=http%3A//example.org/episodes.rss'
>>> locator.download_episode_actions_uri(since=2000, podcast='http://example.com/')
'http://gpodder.net/api/2/episodes/steve.json?since=2000&podcast=http%3A//example.com/'
>>> locator.download_episode_actions_uri(device_id='ipod')
'http://gpodder.net/api/2/episodes/steve.json?device=ipod'
>>> locator.download_episode_actions_uri(since=54321, device_id='ipod')
'http://gpodder.net/api/2/episodes/steve.json?since=54321&device=ipod'
"""
if podcast is not None and device_id is not None:
raise ValueError('must not specify both "podcast" and "device_id"')
filename = self._username + '.json'
params = []
if since is not None:
since = str(self._convert_since(since))
params.append(('since', since))
if podcast is not None:
params.append(('podcast', podcast))
if device_id is not None:
params.append(('device', device_id))
if params:
filename += '?' + '&'.join('%s=%s' % (key, quote(value)) for key, value in params)
return util.join(self._base, 'episodes', filename)
def device_settings_uri(self, device_id):
"""Get the Advanced API URI for setting per-device settings uploads
>>> locator = Locator('mike')
>>> locator.device_settings_uri('ipod')
'http://gpodder.net/api/2/devices/mike/ipod.json'
"""
filename = '%(device_id)s.json' % locals()
return util.join(self._base, 'devices', self._username, filename)
def device_list_uri(self):
"""Get the Advanced API URI for retrieving the device list
>>> locator = Locator('jeff')
>>> locator.device_list_uri()
'http://gpodder.net/api/2/devices/jeff.json'
"""
filename = self._username + '.json'
return util.join(self._base, 'devices', filename)
def toptags_uri(self, count=50):
"""Get the Advanced API URI for retrieving the top Tags
>>> locator = Locator(None)
>>> locator.toptags_uri()
'http://gpodder.net/api/2/tags/50.json'
>>> locator.toptags_uri(70)
'http://gpodder.net/api/2/tags/70.json'
"""
filename = '%(count)d.json' % locals()
return util.join(self._base, 'tags', filename)
def podcasts_of_a_tag_uri(self, tag, count=50):
"""Get the Advanced API URI for retrieving the top Podcasts of a Tag
>>> locator = Locator(None)
>>> locator.podcasts_of_a_tag_uri('linux')
'http://gpodder.net/api/2/tag/linux/50.json'
>>> locator.podcasts_of_a_tag_uri('linux',70)
'http://gpodder.net/api/2/tag/linux/70.json'
"""
filename = '%(tag)s/%(count)d.json' % locals()
return util.join(self._base, 'tag', filename)
def podcast_data_uri(self, podcast_url):
"""Get the Advanced API URI for retrieving Podcast Data
>>> locator = Locator(None)
>>> locator.podcast_data_uri('http://podcast.com')
'http://gpodder.net/api/2/data/podcast.json?url=http%3A//podcast.com'
"""
filename = 'podcast.json?url=%s' % quote(podcast_url)
return util.join(self._base, 'data', filename)
def episode_data_uri(self, podcast_url, episode_url):
"""Get the Advanced API URI for retrieving Episode Data
>>> locator = Locator(None)
>>> locator.episode_data_uri('http://podcast.com','http://podcast.com/foo')
'http://gpodder.net/api/2/data/episode.json?podcast=http%3A//podcast.com&url=http%3A//podcast.com/foo'
"""
filename = 'episode.json?podcast=%s&url=%s' % (quote(podcast_url), quote(episode_url))
return util.join(self._base, 'data', filename)
def favorite_episodes_uri(self):
"""Get the Advanced API URI for listing favorite episodes
>>> locator = Locator('mike')
>>> locator.favorite_episodes_uri()
'http://gpodder.net/api/2/favorites/mike.json'
"""
filename = self._username + '.json'
return util.join(self._base, 'favorites', filename)
def settings_uri(self, type, scope_param1, scope_param2):
"""Get the Advanced API URI for retrieving or saving Settings
Depending on the Type of setting scope_param2 or scope_param1 and scope_param2 are
not necessary.
>>> locator = Locator('joe')
>>> locator.settings_uri('account',None,None)
'http://gpodder.net/api/2/settings/joe/account.json'
>>> locator.settings_uri('device','foodevice',None)
'http://gpodder.net/api/2/settings/joe/device.json?device=foodevice'
>>> locator.settings_uri('podcast','http://podcast.com',None)
'http://gpodder.net/api/2/settings/joe/podcast.json?podcast=http%3A//podcast.com'
>>> locator.settings_uri('episode','http://podcast.com','http://podcast.com/foo')
'http://gpodder.net/api/2/settings/joe/episode.json?podcast=http%3A//podcast.com&episode=http%3A//podcast.com/foo'
"""
if type not in self.SETTINGS_TYPES:
raise ValueError('Unsupported Setting Type')
filename = self._username + '/%(type)s.json' % locals()
if type is 'device':
if scope_param1 is None:
raise ValueError('Devicename not specified')
filename += '?device=%(scope_param1)s' % locals()
if type is 'podcast':
if scope_param1 is None:
raise ValueError('Podcast URL not specified')
filename += '?podcast=%s' % quote(scope_param1)
if type is 'episode':
if (scope_param1 is None) or (scope_param2 is None):
raise ValueError('Podcast or Episode URL not specified')
filename += '?podcast=%s&episode=%s' % (quote(scope_param1), quote(scope_param2))
return util.join(self._base, 'settings' , filename)
def root_uri(self):
""" Get the server's root URI.
>>> locator = Locator(None)
>>> locator.root_uri()
'http://gpodder.net'
"""
return self._simple_base
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mygpoclient import locator
import unittest
class Test_Exceptions(unittest.TestCase):
def setUp(self):
self.locator = locator.Locator('jane')
def test_subscriptions_uri_exceptions(self):
"""Test if unsupported formats raise a ValueError"""
self.assertRaises(ValueError,
self.locator.subscriptions_uri, 'gpodder', 'html')
def test_toplist_uri_exceptions(self):
"""Test if unsupported formats raise a ValueError"""
self.assertRaises(ValueError,
self.locator.toplist_uri, 10, 'html')
def test_suggestions_uri_exceptions(self):
"""Test if unsupported formats raise a ValueError"""
self.assertRaises(ValueError,
self.locator.suggestions_uri, 20, 'jpeg')
def test_search_uri_exception(self):
"""Test if unsupported formats raise a ValueError"""
self.assertRaises(ValueError,
self.locator.search_uri, 30, 'mp3')
def test_subscription_updates_uri_exceptions(self):
"""Test if wrong "since" values raise a ValueError"""
self.assertRaises(ValueError,
self.locator.subscription_updates_uri, 'ipod', 'anytime')
def test_download_episode_actions_uri_exceptions(self):
"""Test if using both "podcast" and "device_id" raises a ValueError"""
self.assertRaises(ValueError,
self.locator.download_episode_actions_uri,
podcast='http://example.org/episodes.rss',
device_id='gpodder')
def test_device_settings_uri_exception(self):
"""Test if using no parameter for a device Setting raises a ValueError"""
self.assertRaises(ValueError,
self.locator.settings_uri, type='device',
scope_param1=None, scope_param2=None)
def test_podcast_settings_uri_exception(self):
"""Test if using no parameter for a podcast Setting raises a ValueError"""
self.assertRaises(ValueError,
self.locator.settings_uri, type='podcast',
scope_param1=None, scope_param2=None)
def test_episode_settings_uri_exception(self):
"""Test if only using one parameter for a episode Setting raises a ValueError"""
self.assertRaises(ValueError,
self.locator.settings_uri, type='episode',
scope_param1='http://www.podcast.com', scope_param2=None)
def test_unsupported_settings_uri_exception2(self):
"""Test if unsupported setting type raises a ValueError"""
self.assertRaises(ValueError,
self.locator.settings_uri, type='foobar',
scope_param1=None, scope_param2=None)
def test_subscriptions_uri_no_device(self):
"""Test that no device returns user subscriptions"""
self.assertEquals(self.locator.subscriptions_uri(),
'http://gpodder.net/subscriptions/jane.opml')
def test_root_uri(self):
"""Test that root_uri trivially works"""
self.assertEquals(self.locator.root_uri(),
'http://gpodder.net')
def test_create_with_url(self):
"""Test locator creation with a root URL instead of host"""
loc = locator.Locator('hello', 'https://gpo.self.hosted/my')
self.assertEquals(loc.toplist_uri(),
'https://gpo.self.hosted/my/toplist/50.opml')
def test_create_with_url_slash(self):
"""Test locator creation with a root URL ending with a slash"""
loc = locator.Locator('hello', 'https://gpo.self.hosted/my/')
self.assertEquals(loc.toplist_uri(),
'https://gpo.self.hosted/my/toplist/50.opml')
def test_create_with_host(self):
"""Test locator creation with a host"""
loc = locator.Locator('hello', 'gpo.self.hosted')
self.assertEquals(loc.toplist_uri(),
'http://gpo.self.hosted/toplist/50.opml')
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mygpoclient
from mygpoclient import locator
from mygpoclient import json
from mygpoclient import simple
class Tag(object):
"""Container class for a tag in the top tag list
Attributes:
tag - The name of the tag
usage - Usage of the tag
"""
REQUIRED_KEYS = ('tag', 'usage')
def __init__(self, tag, usage):
self.tag = tag
self.usage = usage
@classmethod
def from_dict(cls, d):
for key in cls.REQUIRED_KEYS:
if key not in d:
raise ValueError('Missing keys for tag')
return cls(*(d.get(k) for k in cls.REQUIRED_KEYS))
def __eq__(self, other):
"""Test two tag objects for equality
>>> Tag('u', 123) == Tag('u', 123)
True
>>> Tag('u', 123) == Tag('a', 345)
False
>>> Tag('u', 123) == 'x'
False
"""
if not isinstance(other, self.__class__):
return False
return all(getattr(self, k) == getattr(other, k) \
for k in self.REQUIRED_KEYS)
class Episode(object):
"""Container Class for Episodes
Attributes:
title -
url -
podcast_title -
podcast_url -
description -
website -
released -
mygpo_link -
"""
REQUIRED_KEYS = ('title', 'url', 'podcast_title', 'podcast_url',
'description', 'website', 'released', 'mygpo_link')
def __init__(self, title, url, podcast_title, podcast_url, description, website, released, mygpo_link):
self.title = title
self.url = url
self.podcast_title = podcast_title
self.podcast_url = podcast_url
self.description = description
self.website = website
self.released = released
self.mygpo_link = mygpo_link
@classmethod
def from_dict(cls, d):
for key in cls.REQUIRED_KEYS:
if key not in d:
raise ValueError('Missing keys for episode')
return cls(*(d.get(k) for k in cls.REQUIRED_KEYS))
def __eq__(self, other):
"""Test two Episode objects for equality
>>> Episode('a','b','c','d','e','f','g','h') == Episode('a','b','c','d','e','f','g','h')
True
>>> Episode('a','b','c','d','e','f','g','h') == Episode('s','t','u','v','w','x','y','z')
False
>>> Episode('a','b','c','d','e','f','g','h') == 'x'
False
"""
if not isinstance(other, self.__class__):
return False
return all(getattr(self, k) == getattr(other, k) \
for k in self.REQUIRED_KEYS)
class PublicClient(object):
"""Client for the gpodder.net "anonymous" API
This is the API client implementation that provides a
pythonic interface to the parts of the gpodder.net
Simple API that don't need user authentication.
"""
FORMAT = 'json'
def __init__(self, root_url=mygpoclient.ROOT_URL, client_class=json.JsonClient):
"""Creates a new Public API client
The parameter root_url is optional and defaults to
the main webservice. It can be either a hostname or
a full URL (to force https, for instance).
The parameter client_class is optional and should
not need to be changed in normal use cases. If it
is changed, it should provide the same interface
as the json.JsonClient class in mygpoclient.
"""
self._locator = locator.Locator(None, root_url)
self._client = client_class(None, None)
def get_toplist(self, count=mygpoclient.TOPLIST_DEFAULT):
"""Get a list of most-subscribed podcasts
Returns a list of simple.Podcast objects.
The parameter "count" is optional and describes
the amount of podcasts that are returned. The
default value is 50, the minimum value is 1 and
the maximum value is 100.
"""
uri = self._locator.toplist_uri(count, self.FORMAT)
return [simple.Podcast.from_dict(x) for x in self._client.GET(uri)]
def search_podcasts(self, query):
"""Search for podcasts on the webservice
Returns a list of simple.Podcast objects.
The parameter "query" specifies the search
query as a string.
"""
uri = self._locator.search_uri(query, self.FORMAT)
return [simple.Podcast.from_dict(x) for x in self._client.GET(uri)]
def get_podcasts_of_a_tag(self, tag, count=mygpoclient.TOPLIST_DEFAULT):
"""Get a list of most-subscribed podcasts of a Tag
Returns a list of simple.Podcast objects.
The parameter "tag" specifies the tag as a String
The parameter "count" is optional and describes
the amount of podcasts that are returned. The
default value is 50, the minimum value is 1 and
the maximum value is 100.
"""
uri = self._locator.podcasts_of_a_tag_uri(tag, count)
return [simple.Podcast.from_dict(x) for x in self._client.GET(uri)]
def get_toptags(self, count=mygpoclient.TOPLIST_DEFAULT):
"""Get a list of most-used tags
Returns a list of Tag objects.
The parameter "count" is optional and describes
the amount of podcasts that are returned. The
default value is 50, the minimum value is 1 and
the maximum value is 100.
"""
uri = self._locator.toptags_uri(count)
return [Tag.from_dict(x) for x in self._client.GET(uri)]
def get_podcast_data(self, podcast_uri):
"""Get Metadata for the specified Podcast
Returns a simple.Podcast object.
The parameter "podcast_uri" specifies the URL of the Podcast.
"""
uri = self._locator.podcast_data_uri(podcast_uri)
return simple.Podcast.from_dict(self._client.GET(uri))
def get_episode_data(self, podcast_uri, episode_uri):
"""Get Metadata for the specified Episode
Returns a Episode object.
The parameter "podcast_uri" specifies the URL of the Podcast,
which this Episode belongs to
The parameter "episode_uri" specifies the URL of the Episode
"""
uri = self._locator.episode_data_uri(podcast_uri, episode_uri)
return Episode.from_dict(self._client.GET(uri))
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mygpoclient import public
from mygpoclient import simple
from mygpoclient import testing
import unittest
class Test_Tag(unittest.TestCase):
def test_tagFromDict_raisesValueError_missingKey(self):
self.assertRaises(ValueError,public.Tag.from_dict, {'tag':'abcde'} )
class Test_Episode(unittest.TestCase):
def test_episodeFromDict_raisesValueError_missingKey(self):
self.assertRaises(ValueError,public.Episode.from_dict, {'title':'foobar','podcast_url':'http://www.podcast.com'})
class Test_PublicClient(unittest.TestCase):
TOPLIST_JSON = b"""
[{
"website": "http://linuxoutlaws.com/podcast",
"description": "Open source talk with a serious attitude",
"title": "Linux Outlaws",
"url": "http://feeds.feedburner.com/linuxoutlaws",
"subscribers_last_week": 1736,
"subscribers": 1736,
"mygpo_link": "http://www.gpodder.net/podcast/11092",
"logo_url": "http://linuxoutlaws.com/files/albumart-itunes.jpg"
},
{
"website": "http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c",
"description": "We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.",
"title": "FLOSS Weekly Video (large)",
"url": "http://feeds.twit.tv/floss_video_large",
"subscribers_last_week": 50,
"subscribers": 50,
"mygpo_link": "http://www.gpodder.net/podcast/31991",
"logo_url": "http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png"
}]
"""
TOPLIST = [
simple.Podcast('http://feeds.feedburner.com/linuxoutlaws',
'Linux Outlaws',
'Open source talk with a serious attitude',
'http://linuxoutlaws.com/podcast',
1736, 1736,
'http://www.gpodder.net/podcast/11092',
'http://linuxoutlaws.com/files/albumart-itunes.jpg'),
simple.Podcast('http://feeds.twit.tv/floss_video_large',
'FLOSS Weekly Video (large)',
'We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.',
'http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c',
50, 50,
'http://www.gpodder.net/podcast/31991',
'http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png'),
]
SEARCHRESULT_JSON = b"""
[{
"website": "http://linuxoutlaws.com/podcast",
"description": "Open source talk with a serious attitude",
"title": "Linux Outlaws",
"url": "http://feeds.feedburner.com/linuxoutlaws",
"subscribers_last_week": 1736,
"subscribers": 1736,
"mygpo_link": "http://www.gpodder.net/podcast/11092",
"logo_url": "http://linuxoutlaws.com/files/albumart-itunes.jpg"
},
{
"website": "http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c",
"description": "We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.",
"title": "FLOSS Weekly Video (large)",
"url": "http://feeds.twit.tv/floss_video_large",
"subscribers_last_week": 50,
"subscribers": 50,
"mygpo_link": "http://www.gpodder.net/podcast/31991",
"logo_url": "http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png"
}]
"""
SEARCHRESULT = [
simple.Podcast('http://feeds.feedburner.com/linuxoutlaws',
'Linux Outlaws',
'Open source talk with a serious attitude',
'http://linuxoutlaws.com/podcast',
1736, 1736,
'http://www.gpodder.net/podcast/11092',
'http://linuxoutlaws.com/files/albumart-itunes.jpg'),
simple.Podcast('http://feeds.twit.tv/floss_video_large',
'FLOSS Weekly Video (large)',
'We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.',
'http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c',
50, 50,
'http://www.gpodder.net/podcast/31991',
'http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png'),
]
TOPTAGS_JSON = b"""
[
{"tag": "Technology",
"usage": 530 },
{"tag": "Arts",
"usage": 400}
]
"""
TOPTAGS = [
public.Tag('Technology',530),
public.Tag('Arts',400)
]
PODCAST_JSON = b"""
{
"website": "http://linuxoutlaws.com/podcast",
"description": "Open source talk with a serious attitude",
"title": "Linux Outlaws",
"url": "http://feeds.feedburner.com/linuxoutlaws",
"subscribers_last_week": 1736,
"subscribers": 1736,
"mygpo_link": "http://www.gpodder.net/podcast/11092",
"logo_url": "http://linuxoutlaws.com/files/albumart-itunes.jpg"
}
"""
PODCAST = simple.Podcast('http://feeds.feedburner.com/linuxoutlaws',
'Linux Outlaws',
'Open source talk with a serious attitude',
'http://linuxoutlaws.com/podcast',
1736, 1736,
'http://www.gpodder.net/podcast/11092',
'http://linuxoutlaws.com/files/albumart-itunes.jpg')
EPISODE_JSON = b"""
{"title": "TWiT 245: No Hitler For You",
"url": "http://www.podtrac.com/pts/redirect.mp3/aolradio.podcast.aol.com/twit/twit0245.mp3",
"podcast_title": "this WEEK in TECH - MP3 Edition",
"podcast_url": "http://leo.am/podcasts/twit",
"description": "[...]",
"website": "http://www.podtrac.com/pts/redirect.mp3/aolradio.podcast.aol.com/twit/twit0245.mp3",
"released": "2010-12-25T00:30:00",
"mygpo_link": "http://gpodder.net/episode/1046492"}
"""
EPISODE = public.Episode('TWiT 245: No Hitler For You',
'http://www.podtrac.com/pts/redirect.mp3/aolradio.podcast.aol.com/twit/twit0245.mp3',
'this WEEK in TECH - MP3 Edition',
'http://leo.am/podcasts/twit',
'[...]',
'http://www.podtrac.com/pts/redirect.mp3/aolradio.podcast.aol.com/twit/twit0245.mp3',
'2010-12-25T00:30:00',
'http://gpodder.net/episode/1046492'
)
def setUp(self):
self.fake_client = testing.FakeJsonClient()
self.client = public.PublicClient(client_class=self.fake_client)
def test_getToplist(self):
self.fake_client.response_value = self.TOPLIST_JSON
result = self.client.get_toplist()
self.assertEquals(result, self.TOPLIST)
self.assertEquals(len(self.fake_client.requests), 1)
def test_searchPodcasts(self):
self.fake_client.response_value = self.SEARCHRESULT_JSON
result = self.client.search_podcasts('wicked')
self.assertEquals(result, self.SEARCHRESULT)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getPodcastsOfATag(self):
self.fake_client.response_value = self.SEARCHRESULT_JSON
result = self.client.get_podcasts_of_a_tag('wicked')
self.assertEquals(result, self.SEARCHRESULT)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getTopTags(self):
self.fake_client.response_value = self.TOPTAGS_JSON
result = self.client.get_toptags()
self.assertEquals(result, self.TOPTAGS)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getPodcastData(self):
self.fake_client.response_value = self.PODCAST_JSON
result = self.client.get_podcast_data('http://feeds.feedburner.com/linuxoutlaws')
self.assertEquals(result, self.PODCAST)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getEpisodeData(self):
self.fake_client.response_value = self.EPISODE_JSON
result = self.client.get_episode_data('http://leo.am/podcasts/twit','http://www.podtrac.com/pts/redirect.mp3/aolradio.podcast.aol.com/twit/twit0245.mp3')
self.assertEquals(result, self.EPISODE)
self.assertEquals(len(self.fake_client.requests), 1)
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from functools import wraps
import mygpoclient
from mygpoclient import locator
from mygpoclient import json
class MissingCredentials(Exception):
""" Raised when instantiating a SimpleClient without credentials """
def needs_credentials(f):
""" apply to all methods that initiate requests that require credentials """
@wraps(f)
def _wrapper(self, *args, **kwargs):
if not self.username or not self.password:
raise MissingCredentials
return f(self, *args, **kwargs)
return _wrapper
class Podcast(object):
"""Container class for a podcast
Encapsulates the metadata for a podcast.
Attributes:
url - The URL of the podcast feed
title - The title of the podcast
description - The description of the podcast
"""
REQUIRED_FIELDS = ('url', 'title', 'description', 'website', 'subscribers',
'subscribers_last_week', 'mygpo_link', 'logo_url')
def __init__(self, url, title, description, website, subscribers, subscribers_last_week, mygpo_link, logo_url):
self.url = url
self.title = title
self.description = description
self.website = website
self.subscribers = subscribers
self.subscribers_last_week = subscribers_last_week
self.mygpo_link = mygpo_link
self.logo_url = logo_url
@classmethod
def from_dict(cls, d):
for key in cls.REQUIRED_FIELDS:
if key not in d:
raise ValueError('Missing keys for toplist podcast')
return cls(*(d.get(k) for k in cls.REQUIRED_FIELDS))
def __eq__(self, other):
"""Test two Podcast objects for equality
>>> Podcast('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h') == Podcast('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h')
True
>>> Podcast('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h') == Podcast('s', 't', 'u', 'v', 'w', 'x', 'y', 'z')
False
>>> Podcast('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h') == 'a'
False
"""
if not isinstance(other, self.__class__):
return False
return all(getattr(self, k) == getattr(other, k) \
for k in self.REQUIRED_FIELDS)
class SimpleClient(object):
"""Client for the gpodder.net Simple API
This is the API client implementation that provides a
pythonic interface to the gpodder.net Simple API.
"""
FORMAT = 'json'
def __init__(self, username, password, root_url=mygpoclient.ROOT_URL,
client_class=json.JsonClient):
"""Creates a new Simple API client
Username and password must be specified and are
the user's login data for the webservice.
The parameter root_url is optional and defaults to
the main webservice. It can be either a hostname or
a full URL (to force https, for instance).
The parameter client_class is optional and should
not need to be changed in normal use cases. If it
is changed, it should provide the same interface
as the json.JsonClient class in mygpoclient.
"""
self.username = username
self.password = password
self._locator = locator.Locator(username, root_url)
self._client = client_class(username, password)
@needs_credentials
def get_subscriptions(self, device_id):
"""Get a list of subscriptions for a device
Returns a list of URLs (one per subscription) for
the given device_id that reflects the current list
of subscriptions.
Raises http.NotFound if the device does not exist.
"""
uri = self._locator.subscriptions_uri(device_id, self.FORMAT)
return self._client.GET(uri)
@needs_credentials
def put_subscriptions(self, device_id, urls):
"""Update a device's subscription list
Sets the server-side subscription list for the device
"device_id" to be equivalent to the URLs in the list of
strings "urls".
The device will be created if it does not yet exist.
Returns True if the update was successful, False otherwise.
"""
uri = self._locator.subscriptions_uri(device_id, self.FORMAT)
return (self._client.PUT(uri, urls) == None)
@needs_credentials
def get_suggestions(self, count=10):
"""Get podcast suggestions for the user
Returns a list of Podcast objects that are
to be suggested to the user.
The parameter count is optional and if
specified has to be a value between 1
and 100 (with 10 being the default), and
determines how much search results are
returned (at maximum).
"""
uri = self._locator.suggestions_uri(count, self.FORMAT)
return [Podcast.from_dict(x) for x in self._client.GET(uri)]
@property
def locator(self):
""" read-only access to the locator """
return self._locator
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mygpoclient import simple
from mygpoclient import testing
import unittest
class Test_Podcast(unittest.TestCase):
def test_podcastFromDict_raisesValueError_missingKey(self):
self.assertRaises(ValueError,
simple.Podcast.from_dict, {'url': 'a', 'title': 'b'})
class Test_SimpleClient(unittest.TestCase):
USERNAME = 'a'
PASSWORD = 'b'
DEVICE_NAME = 'x'
SUBSCRIPTIONS = [
'http://lugradio.org/episodes.rss',
'http://feeds2.feedburner.com/LinuxOutlaws',
]
SUBSCRIPTIONS_JSON = b"""
["http://lugradio.org/episodes.rss",
"http://feeds2.feedburner.com/LinuxOutlaws"]
"""
SUGGESTIONS = [
simple.Podcast('http://feeds.feedburner.com/linuxoutlaws',
'Linux Outlaws',
'Open source talk with a serious attitude',
'http://linuxoutlaws.com/podcast',
1736, 1736,
'http://www.gpodder.net/podcast/11092',
'http://linuxoutlaws.com/files/albumart-itunes.jpg'),
simple.Podcast('http://feeds.twit.tv/floss_video_large',
'FLOSS Weekly Video (large)',
'We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.',
'http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c',
50, 50,
'http://www.gpodder.net/podcast/31991',
'http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png'),
]
SUGGESTIONS_JSON = b"""
[{
"website": "http://linuxoutlaws.com/podcast",
"description": "Open source talk with a serious attitude",
"title": "Linux Outlaws",
"url": "http://feeds.feedburner.com/linuxoutlaws",
"subscribers_last_week": 1736,
"subscribers": 1736,
"mygpo_link": "http://www.gpodder.net/podcast/11092",
"logo_url": "http://linuxoutlaws.com/files/albumart-itunes.jpg"
},
{
"website": "http://syndication.mediafly.com/redirect/show/d581e9b773784df7a56f37e1138c037c",
"description": "We are not talking dentistry here; FLOSS all about Free Libre Open Source Software. Join hosts Randal Schwartz and Leo Laporte every Saturday as they talk with the most interesting and important people in the Open Source and Free Software community.",
"title": "FLOSS Weekly Video (large)",
"url": "http://feeds.twit.tv/floss_video_large",
"subscribers_last_week": 50,
"subscribers": 50,
"mygpo_link": "http://www.gpodder.net/podcast/31991",
"logo_url": "http://static.mediafly.com/publisher/images/06cecab60c784f9d9866f5dcb73227c3/icon-150x150.png"
}]
"""
def setUp(self):
self.fake_client = testing.FakeJsonClient()
self.client = simple.SimpleClient(self.USERNAME, self.PASSWORD,
client_class=self.fake_client)
def test_putSubscriptions(self):
self.fake_client.response_value = b''
result = self.client.put_subscriptions(self.DEVICE_NAME, self.SUBSCRIPTIONS)
self.assertEquals(result, True)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getSubscriptions(self):
self.fake_client.response_value = self.SUBSCRIPTIONS_JSON
subscriptions = self.client.get_subscriptions(self.DEVICE_NAME)
self.assertEquals(subscriptions, self.SUBSCRIPTIONS)
self.assertEquals(len(self.fake_client.requests), 1)
def test_getSuggestions(self):
self.fake_client.response_value = self.SUGGESTIONS_JSON
suggestions = self.client.get_suggestions(50)
self.assertEquals(suggestions, self.SUGGESTIONS)
self.assertEquals(len(self.fake_client.requests), 1)
class Test_MissingCredentials(unittest.TestCase):
DEVICE_NAME = 'unit-test-device'
def test_getSubscriptions_UserAndPassAreNone(self):
client = simple.SimpleClient(None, None, client_class=testing.FakeJsonClient())
self.assertRaises(simple.MissingCredentials, client.get_subscriptions, (self.DEVICE_NAME,))
def test_getSubscriptions_EmptyUserAndPass(self):
client = simple.SimpleClient('', '', client_class=testing.FakeJsonClient())
self.assertRaises(simple.MissingCredentials, client.get_subscriptions, (self.DEVICE_NAME,))
def test_getSubscriptions_EmptyPassword(self):
client = simple.SimpleClient('user', '', client_class=testing.FakeJsonClient())
self.assertRaises(simple.MissingCredentials, client.get_subscriptions, (self.DEVICE_NAME,))
def test_getSubscriptions_EmptyUsername(self):
client = simple.SimpleClient('', 'pass', client_class=testing.FakeJsonClient())
self.assertRaises(simple.MissingCredentials, client.get_subscriptions, (self.DEVICE_NAME,))
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mygpoclient import json
class FakeJsonClient(object):
"""Fake implementation of a JsonClient used for testing
Set the response using response_value and check the list
of requests this object got using the requests list.
"""
def __init__(self):
self.requests = []
self.response_value = ''
def __call__(self, *args, **kwargs):
"""Fake a constructor for an existing object
>>> fake_class = FakeJsonClient()
>>> fake_object = fake_class('username', 'password')
>>> fake_object == fake_class
True
"""
return self
def _request(self, method, uri, data):
self.requests.append((method, uri, data))
data = json.JsonClient.encode(data)
return json.JsonClient.decode(self.response_value)
def GET(self, uri):
return self._request('GET', uri, None)
def POST(self, uri, data):
return self._request('POST', uri, data)
def PUT(self, uri, data):
return self._request('PUT', uri, data)
# -*- coding: utf-8 -*-
# gpodder.net API Client
# Copyright (C) 2009-2013 Thomas Perl and the gPodder Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mygpoclient
import datetime
def join(*args):
"""Join separate URL parts to a full URL"""
return '/'.join(args)
def iso8601_to_datetime(s):
"""Convert a ISO8601-formatted string to datetime
>>> iso8601_to_datetime('2009-12-29T19:25:33')
datetime.datetime(2009, 12, 29, 19, 25, 33)
>>> iso8601_to_datetime('2009-12-29T19:25:33.1')
datetime.datetime(2009, 12, 29, 19, 25, 33, 100000)
>>> iso8601_to_datetime('2009-12-29T19:25:33Z')
datetime.datetime(2009, 12, 29, 19, 25, 33)
>>> iso8601_to_datetime('xXxXxXxXxxxxXxxxXxx')
>>>
"""
for format in ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%SZ'):
try:
return datetime.datetime.strptime(s, format)
except ValueError:
continue
return None
def datetime_to_iso8601(dt):
"""Convert a datetime to a ISO8601-formatted string
>>> datetime_to_iso8601(datetime.datetime(2009, 12, 29, 19, 25, 33))
'2009-12-29T19:25:33'
"""
return dt.strftime('%Y-%m-%dT%H:%M:%S')
def position_to_seconds(s):
"""Convert a position string to its amount of seconds
>>> position_to_seconds('00:00:01')
1
>>> position_to_seconds('00:01:00')
60
>>> position_to_seconds('01:00:00')
3600
>>> position_to_seconds('02:59:59')
10799
>>> position_to_seconds('100:00:00')
360000
"""
hours, minutes, seconds = (int(x) for x in s.split(':', 2))
return (((hours*60)+minutes)*60)+seconds
def seconds_to_position(seconds):
"""Convert the amount of seconds to a position string
>>> seconds_to_position(1)
'00:00:01'
>>> seconds_to_position(60)
'00:01:00'
>>> seconds_to_position(60*60)
'01:00:00'
>>> seconds_to_position(59 + 60*59 + 60*60*2)
'02:59:59'
>>> seconds_to_position(60*60*100)
'100:00:00'
"""
minutes = int(seconds/60)
seconds = seconds % 60
hours = int(minutes/60)
minutes = minutes % 60
return '%02d:%02d:%02d' % (hours, minutes, seconds)
......@@ -6,27 +6,27 @@ from peewee import DateTimeField, ForeignKeyField
from podcast import POST_ID_TYPE
from podcast.constants import BaseModel, db
from podcast.podpost import Podpost, PodpostFactory
from podcast.util import chunks
logger = logging.getLogger(__name__)
class AbstractPodpostListEntry(BaseModel):
insert_date: DateTimeField = DateTimeField(default=time.time()) # unused yet
podpost: ForeignKeyField = ForeignKeyField(Podpost, primary_key=True,on_delete='CASCADE')
podpost: ForeignKeyField = ForeignKeyField(Podpost, primary_key=True, on_delete='CASCADE')
@classmethod
def insert_many_podposts(cls, post_ids: List[POST_ID_TYPE]):
with db.atomic():
insert_count = cls.insert_many(({'podpost': id} for id in post_ids), fields=[cls.podpost]).on_conflict_ignore().execute()
if insert_count != len(post_ids):
logger.warning("Tried to insert %d but could only %d", len(post_ids), insert_count)
for _, _, chunk in chunks(post_ids, 500):
cls.insert_many(({'podpost': id} for id in chunk),
fields=[cls.podpost]).on_conflict_ignore().execute()
@classmethod
def of_podpost(cls, postid: POST_ID_TYPE):
if type(postid) != POST_ID_TYPE:
postid = POST_ID_TYPE(postid)
if PodpostFactory().get_podpost(postid) == None:
if not PodpostFactory().exists(postid):
# db not started?
# make sure db is properly closed
logger.error("Trying to add post %s to %s, but post does not exist" % (str(cls), postid))
......
......@@ -6,4 +6,5 @@ import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("peewee").setLevel(logging.INFO)
logging.getLogger('asyncio').setLevel(logging.WARNING)
POST_ID_TYPE = int
......@@ -36,16 +36,17 @@ class Archive:
raise NotImplementedError()
def insert(self, podpost: POST_ID_TYPE):
def insert(self, podpost: POST_ID_TYPE, overwrite_insert_date=True):
"""
Insert an podost
"""
try:
ArchiveEntry.of_podpost(podpost).save(force_insert=True)
except IntegrityError:
entry: ArchiveEntry = ArchiveEntry.get_by_id(podpost)
entry.insert_date = time.time()
entry.save()
if overwrite_insert_date:
entry: ArchiveEntry = ArchiveEntry.get_by_id(podpost)
entry.insert_date = time.time()
entry.save()
def bulk_insert(self, podposts: List[POST_ID_TYPE]):
ArchiveEntry.insert_many_podposts(podposts)
......@@ -59,7 +60,8 @@ class Archive:
query = query.order_by(ArchiveEntry.insert_date.desc())
return list(e.podpost_id for e in query)
def get_podpost_objects(self, url_filter=None, filter_favorite=False, sort_by_insertdate = False, limit = 0) -> Iterator[Podpost]:
def get_podpost_objects(self, url_filter=None, filter_favorite=False, sort_by_insertdate=False, limit=0) -> \
Iterator[Podpost]:
query: ModelSelect = ArchiveEntry.select(Podpost).join(Podpost)
if url_filter:
query = query.where(Podpost.podurl == url_filter)
......@@ -86,7 +88,7 @@ class ArchiveFactory(metaclass=Singleton):
otherwise if returns the Singleton
"""
def get_archive(self):
def get_archive(self) -> Archive:
"""
Get the Archive
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment