"""
Contains global state of the recommendation engine and the key search functions.
"""

import arxiv
from datetime import datetime
import time
import pandas as pd  # type: ignore
import numpy as np  # type: ignore
import scipy.sparse  # type: ignore
from functools import lru_cache
from joblib import load as load_pickle
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import NMF
from sklearn.preprocessing import normalize
from gensim.models import LdaModel as LDA
from gensim.matutils import Sparse2Corpus
from typing import Union, List, Dict, Any


#######################################
# Setup work at Import-time
######################################

META_DF_FILENAME = "../arXiv_dataset.parquet"
TOPIC_MAT_FILENAME = "../semiprod_doc_topic_matrix2020-08-21T02-12-21.npy"
TERM_MAT_FILENAME = "../term_mats/semiprod_term_mat_2020-08-21T02-12-21.npz"
MODEL_PICKLE_FILENAME = "../pickles/semiprod_lda_2020-08-21T02-12-21"
VECTORIZER_FILE_NAME = "../pickles/semiprod_tfidf_vec_2020-08-21T02-12-21"

# This is the topic model used to infer topic vectors for new papers
with open(MODEL_PICKLE_FILENAME, "rb") as fh:
    topic_model = load_pickle(fh)  # type: Union[NMF, LDA]

# This is the object capable of parsing text into a vector of weighted text
# tokens based on the pre-determined corpus vocabulary
with open(VECTORIZER_FILE_NAME, "rb") as fh:
    vectorizer = load_pickle(fh)  # type: TfidfVectorizer

# This is the main in-memory datastore for all paper info
meta_df = pd.read_parquet(META_DF_FILENAME)  # type: pd.DataFrame

# A matrix where all rows correspond to papers, and columns correspond to vocab
doc_term_mat = scipy.sparse.load_npz(TERM_MAT_FILENAME)  # type: scipy.sparse.csr_matrix

# Bit-matrix tracking the categories of each paper. Useful for fast subsetting of the corpus
category_mat = np.array(meta_df["category_bits"].tolist())  # type: np.ndarray

# The Topic Matrix that is a reduced projection of the Document-Term matrix.
doc_topic_mat = np.load(TOPIC_MAT_FILENAME)  # type: np.ndarray

# Lookup table used to decode the category bit-matrix.
arXiv_categories = sorted(
    list(meta_df["categories"].explode().unique())
)  # type: List[str]

# Normalize the topic matrix so that we can do fast comparisons
# Cosine of pre-normalized vectors == simple dot product
doc_topic_mat /= np.linalg.norm(doc_topic_mat, axis=1, keepdims=True)  # row-wise (per-document) norms

# Do the same for the document-term matrix
doc_term_mat = normalize(doc_term_mat, norm="l2", axis=1)  # normalize returns a copy, so reassign
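
# Illustrative sketch (toy values, kept as a comment only): once rows are l2-normalized,
# the dot product of two rows equals their cosine similarity, which is what the search
# functions below rely on. For example:
#     a = np.array([1.0, 2.0]); b = np.array([2.0, 1.0])
#     a /= np.linalg.norm(a); b /= np.linalg.norm(b)
#     a.dot(b)  # ~0.8, the cosine of the angle between the original vectors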


#######################################
# Private Helper Functions
######################################


def _fetch_metadata_from_api(query_id: str) -> Dict[str, Any]:
    """
    Call the arXiv API and clean the response for our uses.

    TODO: replace the arxiv package with a custom query written using
    grequests.
    """
    arxiv_api_response = arxiv.query(id_list=[query_id])[0]
    return {
        "date": datetime.fromtimestamp(
            time.mktime(arxiv_api_response["published_parsed"])
        ),
        "id": arxiv_api_response["id"],
        "title": arxiv_api_response["title"],
        "authors": arxiv_api_response["authors"],
        "categories": [tag_obj["term"] for tag_obj in arxiv_api_response["tags"]],
        "abstract": arxiv_api_response["summary"],
    }


def _add_new_metadata_to_global_state(query_id: str):
    """
    Generate data for new paper and add it to the global structures.

    Note that these structures are maintained on a per-worker basis, so any additions made
    here are visible only to the current worker process.
    """
    global doc_topic_mat, doc_term_mat, category_mat, meta_df, arXiv_categories
    # get paper from arXiv API and parse response
    cleaned_metadata_dict = _fetch_metadata_from_api(query_id)

    # Generate category bitset and append to the existing matrix.
    # Crude workaround: cs.LG is missing from the category lookup table, so remap it to stat.ML.
    if "cs.LG" in cleaned_metadata_dict["categories"]:
        cleaned_metadata_dict["categories"][
            cleaned_metadata_dict["categories"].index("cs.LG")
        ] = "stat.ML"
    new_paper_cat_idxs = [
        arXiv_categories.index(category)
        for category in cleaned_metadata_dict["categories"]
    ]
    cleaned_metadata_dict["category_bits"] = np.packbits(
        [int(idx in new_paper_cat_idxs) for idx in range(len(arXiv_categories))]
    )
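    # Hypothetical illustration: if arXiv_categories were ["cs.CL", "cs.CV", "stat.ML"] and
    # the paper were tagged cs.CL and stat.ML, the unpacked bit list would be [1, 0, 1]
    # before np.packbits compresses it into bytes.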
    category_mat = np.vstack((category_mat, cleaned_metadata_dict["category_bits"]))

    # Vectorize abstract and append to doc_term_mat
    term_vector = vectorizer.transform([cleaned_metadata_dict["abstract"]])
    doc_term_mat = scipy.sparse.vstack([doc_term_mat, term_vector])

    # Get topic vector from term_vector and append to doc_topic_mat
    if isinstance(topic_model, NMF):
        # Project the tf-idf vector directly into topic space
        topic_vector = topic_model.transform(term_vector)[0]
    elif isinstance(topic_model, LDA):
        gensim_doc_wrapper = Sparse2Corpus(term_vector, documents_columns=False)
        bow = next(iter(gensim_doc_wrapper))
        # Request every topic so the vector length matches the width of doc_topic_mat
        topic_vector = np.array(
            [prob for _, prob in topic_model.get_document_topics(bow, minimum_probability=0.0)]
        )
        # topic_model.update(gensim_doc_wrapper)  # yay, online learning!

    doc_topic_mat = np.vstack((doc_topic_mat, topic_vector))

    meta_df = meta_df.append(cleaned_metadata_dict, ignore_index=True)


def _convert_arXiv_id_to_idx(query_id: str) -> int:
    """
    Takes an arXiv preprint ID and finds the corresponding index in the global structures.

    Note that the row indices of all global data structures are aligned, so the index
    found by querying meta_df can be used in all the other structures.
    """
    global meta_df
    conversion_result = meta_df[meta_df["id"] == query_id].index  # type: pd.RangeIndex
    if len(conversion_result) == 0:
        _add_new_metadata_to_global_state(query_id)
        return len(meta_df) - 1
    else:
        return conversion_result[0]


def _get_metadata_batch(row_idxs: List[int]) -> pd.DataFrame:
    """
    Return a subset of meta_df containing the requested indices.
    """
    return meta_df.iloc[row_idxs]


#####################################
# Global Functions
####################################


@lru_cache(maxsize=len(meta_df) // 5)
def topic_mat_search(query_id: str, top_n: int = 10) -> pd.DataFrame:
    """
    Given an arXiv ID, return the top_n most similar papers using cosine similarity in topic space.
    """
    query_idx = _convert_arXiv_id_to_idx(query_id)
    query_cat_vector = meta_df.iloc[query_idx].category_bits
    search_space = [
        idx
        for idx in np.unique((query_cat_vector & category_mat).nonzero()[0])
        if idx != query_idx
    ]
    # Alternatively: ALL categories must match
    # search_space = [idx for idx in np.where(
    #     (category_mat == query_cat_vector).all(axis=1)
    # )[0] if idx != query_idx]
    # argpartition selects the smallest values, so negate the similarities to rank descending
    best_from_search_space = np.argpartition(
        -doc_topic_mat[search_space, :].dot(doc_topic_mat[query_idx]), top_n,
    )[:top_n]
    return _get_metadata_batch(np.take(search_space, best_from_search_space))


@lru_cache(maxsize=len(meta_df) // 5)
def tfidf_search(query_id: str, top_n: int = 10) -> pd.DataFrame:
    """
    Perform the search directly on the tf-idf matrix instead of the topic matrix.

    TODO: the two search functions are now almost identical; they remain separate only so
    each can keep its own LRU cache. Refactor later.
    """
    query_idx = _convert_arXiv_id_to_idx(query_id)
    query_cat_vector = meta_df.iloc[query_idx]["category_bits"]
    search_space = [
        idx
        for idx in np.unique((query_cat_vector & category_mat).nonzero()[0])
        if idx != query_idx
    ]
    # Negate the similarities so argpartition selects the most similar papers
    best_from_search_space = np.argpartition(
        -doc_term_mat[search_space, :]
        .dot(doc_term_mat[query_idx].T)
        .toarray()
        .flatten(),
        top_n,
    )[:top_n]
    return _get_metadata_batch(np.take(search_space, best_from_search_space))
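

if __name__ == "__main__":
    # Minimal usage sketch. The arXiv ID below is a hypothetical placeholder; any ID already
    # present in meta_df (or resolvable through the arXiv API) should work here.
    example_id = "2003.00001"
    print(topic_mat_search(example_id, top_n=5))
    print(tfidf_search(example_id, top_n=5))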