Commit 710eadd3 authored by Bastien Abadie, committed by Erwan Rouchet

Resilient Pagination

parent a9af9a90
@@ -73,6 +73,34 @@ that is, until the next page must be loaded.
of pages at once and cause a big load on the server. You can use ``len`` to
get the total item count before spamming a server.
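
For instance, here is a minimal sketch (assuming ``cli`` is an ``ArkindexClient``
and the ``ListElements`` endpoint, as in the examples below) that reads the total
count before iterating:

.. code:: python

    elements = cli.paginate('ListElements')
    # len() triggers at most one request and reads the total count
    print(len(elements))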
A call to ``paginate`` may produce hundreds of sub-requests depending on the size
of the dataset you're requesting. To accommodate large datasets and cope with
transient network or performance issues, ``paginate`` supports a ``retries``
parameter that sets the number of sub-requests it may run for each page in the
dataset. By default, the method will retry 5 times per page.
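
For instance, to be more tolerant of a flaky connection, you could raise the
retry count (a sketch, reusing the ``cli`` client from above):

.. code:: python

    # Allow up to 10 attempts per page before giving up
    elements = cli.paginate('ListElements', retries=10)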
For really big datasets, you may want to allow ``paginate`` to fail on some pages
(errors happen). In this case, use the optional boolean parameter
``allow_missing_data`` (set to ``False`` by default). When a page runs out of
retries, its index is added to the paginator's ``missing`` set instead of aborting
the iteration; with ``allow_missing_data`` left to ``False``, pagination stops
with an exception instead, as the data would be incomplete.
Here is an example of pagination over a large dataset, allowing data loss,
lowering the retry count and listing the missed pages:

.. code:: python

    elements = cli.paginate(
        'ListProcessElements',
        id='XXX',
        retries=3,
        allow_missing_data=True,
    )
    for element in elements:
        print(element['id'])
    print(f"Missing pages: {elements.missing}")
Using another server
^^^^^^^^^^^^^^^^^^^^
# -*- coding: utf-8 -*-
import logging
import math
import random
import time
from collections.abc import Iterator, Sized
import apistar
import requests
logger = logging.getLogger(__name__)
class ResponsePaginator(Sized, Iterator):
"""
@@ -38,12 +47,86 @@ class ResponsePaginator(Sized, Iterator):
        self._started = False
        """Has any request been sent"""

        self.retries = request_kwargs.pop("retries", 5)
        assert (
            isinstance(self.retries, int) and self.retries > 0
        ), "retries must be a positive integer"
        """Max number of retries per API request"""

        # Add the initial page to the pages left to load
        self.initial_page = self.current_page or 1
        self.pages = {self.initial_page: self.retries}

        # Store missing page indexes
        self.missing = set()

        self.allow_missing_data = request_kwargs.pop("allow_missing_data", False)
        assert isinstance(
            self.allow_missing_data, bool
        ), "allow_missing_data must be a boolean"

    def _fetch_page(self):
        # Filter out pages with no retries left
        # Transform into a list of tuples for simpler output
        remaining = sorted([(m, v) for m, v in self.pages.items() if v > 0])

        # No remaining pages, end of iteration
        if not remaining:
            raise StopIteration

        # Get the next page to load
        page, retry = remaining[0]
        self.request_kwargs["page"] = page
        try:
            logger.info(
                f"Loading page {page} on try {self.retries - retry + 1}/{self.retries} - {len(self.pages)} pages left to load."
            )
            self.data = self.client.request(*self.request_args, **self.request_kwargs)
            self.results = self.data.get("results", [])
            self.current_page = page
            self._started = True

            if page == self.initial_page and self.results:
                # On the first successful page load, populate the pages left to load
                nb_pages = math.ceil(self.data["count"] / len(self.results))
                self.pages = {
                    i: self.retries for i in range(self.initial_page + 1, nb_pages + 1)
                }
                if self.pages:
                    logger.info(f"Pagination will load {nb_pages} pages.")
            else:
                # Mark any other page as loaded
                del self.pages[page]

            # Stop the happy path here, we don't need to process errors
            return self.data

        except apistar.exceptions.ErrorResponse as e:
            logger.warning(f"API Error {e.status_code} on pagination: {e.content}")

            # Decrement the page's retry counter
            self.pages[page] -= 1

            # Sleep a bit (less than a second)
            time.sleep(random.random())

        except requests.exceptions.ConnectionError as e:
            logger.error(f"Server connection error, will retry in a few seconds: {e}")

            # Decrement the page's retry counter
            self.pages[page] -= 1

            # Sleep a few seconds
            time.sleep(random.randint(1, 10))

        # Detect and store references to missing pages
        # when a page has no retries left
        if self.pages[page] <= 0:
            logger.warning(f"No more retries left for page {page}")
            if self.allow_missing_data:
                self.missing.add(page)
            else:
                raise Exception("Stopping pagination as data will be incomplete")
    def __iter__(self):
        return self

@@ -52,7 +135,11 @@ class ResponsePaginator(Sized, Iterator):
        if len(self.results) < 1:
            if self.data and self.data.get("next") is None:
                raise StopIteration

            # Keep trying to fetch pages while some retries remain
            # This will still yield as soon as some data is fetched
            while self._fetch_page() is None:
                pass

        # Even after fetching a new page, if the new page is empty, just fail
        if len(self.results) < 1:
@@ -63,9 +150,9 @@ class ResponsePaginator(Sized, Iterator):
    def __len__(self):
        # Handle calls to len when no requests have been made yet
        if not self.data and self.current_page < 1:
            self._fetch_page()
        elif not self._started:
            self._fetch_page()
        return self.data["count"]

    def __repr__(self):
# -*- coding: utf-8 -*-
from pathlib import Path
import pytest
import responses
DUMMY_SCHEMA = Path(__file__).absolute().parent / "schema.json"
@pytest.fixture
def mock_schema():
    with DUMMY_SCHEMA.open() as f:
        responses.add(
            responses.GET,
            "https://dummy.test/api/v1/openapi/?format=openapi-json",
            body=f.read(),
        )
@@ -18,7 +18,6 @@ def test_invalid_url():
        ArkindexClient(base_url="http://aaa")


@responses.activate
def test_http_error():
    responses.add(
        responses.GET,
@@ -29,7 +28,6 @@ def test_http_error():
        ArkindexClient(base_url="https://dummy.test/")


@responses.activate
def test_invalid_json():
    responses.add(
        responses.GET,
@@ -40,7 +38,6 @@ def test_invalid_json():
        ArkindexClient(base_url="https://dummy.test/")


@responses.activate
def test_no_endpoints():
    responses.add(
        responses.GET,
@@ -58,7 +55,6 @@ def test_no_endpoints():
        ArkindexClient(base_url="https://dummy.test/")


@responses.activate
def test_schema_url():
    with DUMMY_SCHEMA.open() as f:
        responses.add(
# -*- coding: utf-8 -*-
import time
from pathlib import Path
import pytest
@@ -9,15 +10,7 @@ from arkindex import ArkindexClient
DUMMY_SCHEMA = Path(__file__).absolute().parent / "schema.json"
@responses.activate
def test_pagination_empty(mock_schema):
    responses.add(
        responses.GET,
        "https://dummy.test/api/v1/elements/",
@@ -29,15 +22,7 @@ def test_pagination_empty():
        next(cli.paginate("ListElements"))
@responses.activate
def test_pagination_with_given_starting_page(mock_schema):
    responses.add(
        responses.GET,
        "https://dummy.test/api/v1/elements/",
@@ -52,3 +37,92 @@ def test_pagination_with_given_starting_page():
    assert len(paginator) == 0
    # verify that calling len() does not set current_page to 1 as before
    assert paginator.current_page == starting_page

def test_pagination_with_missing_data(mock_schema, monkeypatch):
    base_url = "https://dummy.test/api/v1/elements/"

    # Disable sleeps
    monkeypatch.setattr(time, "sleep", lambda x: None)

    # Page 1
    responses.add(
        responses.GET,
        f"{base_url}?page=1",
        match_querystring=True,
        json={
            "count": 9,
            "previous": None,
            "next": f"{base_url}?page=2",
            "results": [1, 2, 3],
        },
    )

    # Page 2 is erroneous
    # We need 3 responses, one for each retry
    for i in range(3):
        responses.add(
            responses.GET,
            f"{base_url}?page=2",
            match_querystring=True,
            status=400,
            json={"error": "some error happened"},
        )

    # Page 3
    responses.add(
        responses.GET,
        f"{base_url}?page=3",
        match_querystring=True,
        json={
            "count": 9,
            "previous": f"{base_url}?page=2",
            "next": None,
            "results": [7, 8, 9],
        },
    )

    cli = ArkindexClient("t0k3n", base_url="https://dummy.test")
    paginator = cli.paginate("ListElements", allow_missing_data=True, retries=3)
    assert paginator.allow_missing_data is True
    assert len(paginator) == 9
    assert list(paginator) == [1, 2, 3, 7, 8, 9]
    assert paginator.missing == {2}

def test_pagination_incomplete(mock_schema, monkeypatch):
    base_url = "https://dummy.test/api/v1/elements/"

    # Disable sleeps
    monkeypatch.setattr(time, "sleep", lambda x: None)

    # Page 1
    responses.add(
        responses.GET,
        f"{base_url}?page=1",
        match_querystring=True,
        json={
            "count": 9,
            "previous": None,
            "next": f"{base_url}?page=2",
            "results": ["A", "B"],
        },
    )

    # Page 2 is erroneous
    # Page 3 is not needed as it won't try to load it
    responses.add(
        responses.GET,
        f"{base_url}?page=2",
        status=500,
    )

    cli = ArkindexClient("t0k3n", base_url="https://dummy.test")
    paginator = cli.paginate("ListElements", retries=1)
    assert paginator.allow_missing_data is False
    with pytest.raises(
        Exception, match="Stopping pagination as data will be incomplete"
    ):
        list(paginator)
@@ -8,4 +8,4 @@ commands =
deps =
    pytest
    responses==0.12.0
    pytest-responses