
Performance issues when importing large JSON objects

Summary

Importing large JSON records takes a lot of time.

Below you'll find a client that fetches just one record from the eodhistoricaldata.com endpoint. The record size is >= 1 MB.
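For illustration, the size claim can be checked directly against the endpoint used below (a sketch, not part of the original report; AAPL stands in for any configured symbol, and the api_token placeholder must be replaced with a real token):

import requests

# Fetch one fundamentals record and print its size in bytes.
resp = requests.get(
    "https://eodhistoricaldata.com/api/fundamentals/AAPL",
    params={"api_token": "..."},  # placeholder token
)
print(len(resp.content))  # the report observes records of >= 1 MB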

Steps to reproduce

  1. Implement a very simple client and a stream
"""REST client handling, including eodhistoricaldataStream base class."""

import requests
from pathlib import Path
from typing import Any, Dict, Optional

from singer_sdk.streams import RESTStream

SCHEMAS_DIR = Path(__file__).parent / "schemas"


class eodhistoricaldataStream(RESTStream):
    """eodhistoricaldata stream class."""

    url_base = "https://eodhistoricaldata.com/api"

    def get_next_page_token(
            self, response: requests.Response, previous_token: Optional[Any]
    ) -> Optional[Any]:
        # The endpoint returns a single record per request, so no pagination.
        return None

    def get_url_params(
            self, context: Optional[dict], next_page_token: Optional[Any]
    ) -> Dict[str, Any]:
        """Return a dictionary of values to be used in URL parameterization."""
        params: dict = {"api_token": self.config['api_token']}
        return params

And a stream

"""Stream type classes for tap-eodhistoricaldata."""

from pathlib import Path
from typing import Any, Dict, List, Optional

from tap_eodhistoricaldata.client import eodhistoricaldataStream

SCHEMAS_DIR = Path(__file__).parent / "schemas"


class Fundamentals(eodhistoricaldataStream):
    """Define custom stream."""
    name = "fundamentals"
    path = "/fundamentals/{Code}"
    primary_keys = ["Code"]
    selected_by_default = True

    replication_key = None
    schema_filepath = SCHEMAS_DIR / "fundamentals.json"

    @property
    def partitions(self) -> List[Dict[str, Any]]:
        # One partition per configured symbol; the SDK expects a list of dicts.
        return [{"Code": symbol} for symbol in self.config["symbols"]]

    def post_process(self, row: dict, context: Optional[dict] = None) -> dict:
        # Copy the symbol from the partition context into the record itself.
        row['Code'] = context['Code']
        return row

and a tap

"""eodhistoricaldata tap class."""

from typing import List

from singer_sdk import Tap, Stream
from singer_sdk import typing as th  # JSON schema typing helpers

from tap_eodhistoricaldata.streams import (
    Fundamentals,
)

STREAM_TYPES = [
    Fundamentals
]


class Tapeodhistoricaldata(Tap):
    """eodhistoricaldata tap class."""
    name = "tap-eodhistoricaldata"

    config_jsonschema = th.PropertiesList(
        th.Property("api_token", th.StringType, required=True),
        th.Property("symbols", th.ArrayType(th.StringType), required=True),
        th.Property("start_date", th.DateTimeType),
        th.Property("api_url", th.StringType, default="https://eodhistoricaldata.com/api/"),
    ).to_dict()

    def discover_streams(self) -> List[Stream]:
        """Return a list of discovered streams."""
        return [stream_class(tap=self) for stream_class in STREAM_TYPES]

Then write a simple test

import pytest

from tap_eodhistoricaldata.tap import Tapeodhistoricaldata

SAMPLE_CONFIG = {"api_token": "...", "symbols": ["AAPL"]}  # placeholder config


@pytest.mark.vcr
def test_selected():
    tap1 = Tapeodhistoricaldata(config=SAMPLE_CONFIG, parse_env_config=True)
    tap1.sync_all()

and run it

What is the current bug behavior?

The test takes 50 seconds to run.

What is the expected correct behavior?

A decent JSON validator should be able to crank through such a record in an acceptable time (less than 2 seconds, I'd say).
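As a rough sanity check of that expectation (a sketch, not from the original report; it assumes the jsonschema package and a synthetic ~1 MB record):

import json
import time

import jsonschema

# Build a synthetic record of roughly 1 MB.
record = {f"field_{i}": "x" * 100 for i in range(10_000)}
assert len(json.dumps(record)) >= 1_000_000

schema = {"type": "object", "additionalProperties": {"type": "string"}}

start = time.perf_counter()
jsonschema.validate(instance=record, schema=schema)
print(f"validated in {time.perf_counter() - start:.3f}s")

On typical hardware this finishes well under two seconds, which is where the expectation above comes from.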

Relevant logs and/or screenshots

Here are the profiling results of the test above (attached as combined__1_.svg).
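For anyone who wants comparable numbers, a profile can be captured with the standard library's cProfile (a sketch; the attached SVG may have been produced with a different tool, and SAMPLE_CONFIG is the same placeholder as in the test):

import cProfile
import pstats

from tap_eodhistoricaldata.tap import Tapeodhistoricaldata

SAMPLE_CONFIG = {"api_token": "...", "symbols": ["AAPL"]}  # placeholder config

tap = Tapeodhistoricaldata(config=SAMPLE_CONFIG, parse_env_config=True)

profiler = cProfile.Profile()
profiler.enable()
tap.sync_all()
profiler.disable()

# Show the 20 most expensive calls by cumulative time.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)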

Possible fixes

I don't understand much of the logic, but it seems that pop_deselected_record_properties is extremely expensive. It traverses the entire JSON dict and converts it into a set of Schema and Catalog objects, which produces about 4 million objects in memory. One way to improve this would be to avoid the Schema and Catalog objects altogether and work directly with the existing JSON dict.
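A minimal sketch of that idea (a hypothetical helper, not the SDK's actual API): precompute the set of selected breadcrumbs once per stream, then prune each record with a plain dict traversal, so no Schema or Catalog objects are created per record.

from typing import Any, Dict, Set, Tuple

Breadcrumb = Tuple[str, ...]


def pop_deselected(
    record: Dict[str, Any],
    selected: Set[Breadcrumb],
    parent: Breadcrumb = (),
) -> None:
    """Drop deselected properties from the record in place."""
    for key in list(record.keys()):
        crumb = parent + ("properties", key)
        if crumb not in selected:
            record.pop(key)
        elif isinstance(record[key], dict):
            pop_deselected(record[key], selected, crumb)

The selected set would be derived from the catalog once, outside the per-record hot path.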