# Streams whose stored partition list is collapsed to the context of its last
# entry every time their state is read (see ``get_state_if_exists``).
_SKIP_INCREMENTAL_PARTITION_STREAMS = frozenset(
    {
        "product_production_bom",
        "sale_with_details",
        "sale_fulfilment_list",
        "advance_purchase",
        "production_order_detail",
    }
)


def _find_in_partitions_list(
    partitions: List[dict], state_partition_context: dict
) -> Optional[dict]:
    """Return the partition state whose ``"context"`` equals the given context.

    Args:
        partitions: Partition state dicts, each carrying a ``"context"`` key.
        state_partition_context: The context to look for.

    Returns:
        The single matching partition state, or ``None`` when nothing matches.

    Raises:
        ValueError: If more than one partition state has the given context.
    """
    matches = [
        partition_state
        for partition_state in partitions
        if partition_state["context"] == state_partition_context
    ]
    if len(matches) > 1:
        # Every interpolated piece must be an f-string; a plain literal here
        # would print the placeholder text instead of the context.
        raise ValueError(
            "State file contains duplicate entries for partition: "
            f"{state_partition_context}.\n"
            f"Matching state values were: {matches}"
        )
    return matches[0] if matches else None


def get_state_if_exists(
    tap_state: dict,
    tap_stream_id: str,
    state_partition_context: Optional[dict] = None,
    key: Optional[str] = None,
) -> Optional[Any]:
    """Return stream or partition state (or one key of it) if it exists.

    NOTE: for streams listed in ``_SKIP_INCREMENTAL_PARTITION_STREAMS`` this
    function rewrites ``tap_state`` in place: the stream's ``"partitions"``
    list is replaced by a single entry holding only the ``"context"`` of the
    last stored partition, so every other key stored on the partitions is
    dropped.

    Args:
        tap_state: The full tap state; stream states live under ``"bookmarks"``.
        tap_stream_id: Identifier of the stream to look up.
        state_partition_context: When truthy, look up the partition with this
            context instead of the stream-level state.
        key: When truthy, return only this key of the selected state dict.

    Returns:
        The stream state, the matching partition state, or the value stored
        under ``key``. ``None`` when the stream, the partition or the key is
        not present.

    Raises:
        ValueError: If the state holds duplicate entries for the partition.
    """
    stream_state = tap_state.get("bookmarks", {}).get(tap_stream_id)
    if stream_state is None:
        return None

    if (
        tap_stream_id in _SKIP_INCREMENTAL_PARTITION_STREAMS
        and "partitions" in stream_state
    ):
        stored_partitions = stream_state["partitions"]
        # An empty list has no "last" partition; leave it untouched instead
        # of failing with IndexError.
        if stored_partitions:
            last_context = stored_partitions[-1]["context"]
            stream_state["partitions"] = [{"context": last_context}]

    if not state_partition_context:
        selected_state = stream_state
    elif "partitions" not in stream_state:
        return None  # No partitions defined
    else:
        selected_state = _find_in_partitions_list(
            stream_state["partitions"], state_partition_context
        )
        if selected_state is None:
            return None  # Partition definition not present

    if key:
        return selected_state.get(key, None)
    return selected_state


def get_state_partitions_list(
    tap_state: dict, tap_stream_id: str
) -> Optional[List[dict]]:
    """Return a list of partitions defined in the state, or None if not defined."""
    stream_state = get_state_if_exists(tap_state, tap_stream_id) or {}
    return stream_state.get("partitions", None)
self.http_headers authenticator = self.authenticator