...
 
Commits (5)
import gym
import gym_shopping_cart
import numpy as np
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
env = gym.make("ShoppingCart-v0")
episode_over = False
......
......@@ -3,6 +3,7 @@ import tarfile
import time
from pathlib import Path
from typing import List
from functools import lru_cache
import numpy as np
import pandas as pd
......@@ -11,23 +12,27 @@ LOGGER = logging.getLogger(__name__)
class InstacartData:
RAW_N_PRODUCTS = 49383
RAW_N_PRODUCTS = 50000
MAX_DAYS_SINCE_PRIOR = (
30
) # From https://gist.github.com/jeremystan/c3b39d947d9b88b3ccff3147dbcf6c6b
N_OBSERVATIONS = 49416
RAW_N_OBSERVATIONS = 32
def __init__(self, gz_file: Path, max_products=None):
def __init__(self, gz_file: Path, max_products=None, cache_data=True):
self.cache_data = cache_data
if self.cache_data:
LOGGER.info("Caching data in memory")
self.directory = gz_file.parent / "instacart_2017_05_01"
self.max_products = max_products
if not self.directory.exists():
LOGGER.info("Extracting data from {} to {}".format(gz_file, self.directory))
with tarfile.open(gz_file, "r:gz") as tar:
tar.extractall(path=gz_file.parent)
if self.directory.exists():
LOGGER.warning("Overwriting {}".format(self.directory))
LOGGER.info("Extracting data from {} to {}".format(gz_file, self.directory))
with tarfile.open(gz_file, "r:gz") as tar:
tar.extractall(path=gz_file.parent)
def orders_for_user(self, id: np.uint32 = None) -> np.ndarray:
raw = self._raw_orders_for_user(id)
return self._format(raw)
return self._format_orders(raw), self._format_purchases(raw)
@staticmethod
def _encode(
......@@ -47,26 +52,25 @@ class InstacartData:
key + "_"
)
@staticmethod
def _common_products(data: pd.DataFrame) -> pd.DataFrame:
@lru_cache(maxsize=None)
def _common_products(self) -> pd.DataFrame:
return (
data[["product_id", "order_id"]]
.groupby(by="product_id")
.count()
.sort_values(by="order_id", ascending=False)
.reset_index()["product_id"]
(
self._prior_products()[["product_id", "order_id"]]
.groupby(by="product_id")
.count()
.sort_values(by="order_id", ascending=False)
.reset_index()["product_id"]
)
.head(self.max_products)
.values
)
@staticmethod
def _filter_common_products(
data: pd.DataFrame, max_products: int = None
) -> pd.DataFrame:
s = InstacartData._common_products(data).head(max_products)
return data[data["product_id"].isin(s)]
def _format(self, data: pd.DataFrame) -> pd.DataFrame:
def _format_purchases(self, data: pd.DataFrame) -> pd.DataFrame:
LOGGER.info(
"Formatting data with {} orders".format(len(data["order_number"].unique()))
"Formatting purchaces with {} orders".format(
len(data["order_number"].unique())
)
)
grouped = data[
......@@ -81,15 +85,36 @@ class InstacartData:
# One-hot encode product numbers for each order
encoded_products = InstacartData._encode(
grouped, InstacartData.RAW_N_PRODUCTS, "product_id", zero_indexed=False
grouped, self.RAW_N_PRODUCTS, "product_id", zero_indexed=False
)
# Remove all but the most popular products
if self.max_products is not None:
common_products = InstacartData._common_products(data).head(
self.max_products
LOGGER.info(
"Returning the {} most common products".format(self.max_products)
)
encoded_products = encoded_products[
encoded_products.columns[self._common_products()]
]
return encoded_products.sort_index()
def _format_orders(self, data: pd.DataFrame) -> pd.DataFrame:
LOGGER.info(
"Formatting orders with {} orders".format(
len(data["order_number"].unique())
)
cols = encoded_products.columns[common_products.values.tolist()]
encoded_products = encoded_products[cols]
)
grouped = data[
[
"order_number",
"order_dow",
"order_hour_of_day",
"days_since_prior_order",
"product_id",
]
].groupby("order_number")
# One-hot encode days of the week
encoded_dow = InstacartData._encode(grouped, 7, "order_dow", zero_indexed=True)
......@@ -107,13 +132,12 @@ class InstacartData:
) / InstacartData.MAX_DAYS_SINCE_PRIOR
# Merge other features with product encoding
res = pd.concat(
[encoded_products, encoded_dow, encoded_hod, encoded_days_since], axis=1
)
res = pd.concat([encoded_dow, encoded_hod, encoded_days_since], axis=1)
res = res.fillna(0)
return res.sort_index()
def _raw_orders_for_user(self, id: np.uint32 = None) -> pd.DataFrame:
@lru_cache(maxsize=None)
def _orders(self) -> pd.DataFrame:
LOGGER.info("Loading order data")
start = time.process_time()
orders_df = pd.read_csv(
......@@ -128,25 +152,21 @@ class InstacartData:
"eval_set": np.str,
},
)
# Remove all data that is not in the "prior" dataset
orders_df = orders_df[orders_df["eval_set"] == "prior"]
if id is None:
g = orders_df.groupby(by=["user_id"])["order_id"].count()
g = g[g >= 50]
large_user_ids = g.index
id = np.random.choice(large_user_ids)
LOGGER.info("Loading data for user {}".format(id))
orders_df = orders_df[orders_df["user_id"] == id]
# Remove users with small numbers of orders (to reduce data size)
orders_df = orders_df[
orders_df["user_id"].map(orders_df["user_id"].value_counts()) >= 50
]
LOGGER.debug("Took {:0.2f} s".format(time.process_time() - start))
return orders_df
n_orders = len(orders_df.groupby(by="order_id"))
if n_orders < 50:
LOGGER.warning("This user only has {} orders".format(n_orders))
@lru_cache(maxsize=None)
def _prior_products(self) -> pd.DataFrame:
LOGGER.info("Loading prior orders")
start = time.process_time()
order_products_prior_df = pd.read_csv(
prior_products = pd.read_csv(
self.directory / "order_products__prior.csv",
dtype={
"order_id": np.uint32,
......@@ -156,10 +176,13 @@ class InstacartData:
},
)
LOGGER.debug("Took {:0.2f} s".format(time.process_time() - start))
return prior_products
@lru_cache(maxsize=None)
def _products(self) -> pd.DataFrame:
LOGGER.info("Loading products")
start = time.process_time()
products_df = pd.read_csv(
products = pd.read_csv(
self.directory / "products.csv",
dtype={
"aisle_id": np.uint8,
......@@ -169,16 +192,36 @@ class InstacartData:
},
).drop(["product_name"], axis=1)
LOGGER.debug("Took {:0.2f} s".format(time.process_time() - start))
return products
@lru_cache(maxsize=None)
def _merged_data(self) -> pd.DataFrame:
prior_products_df = self._prior_products()
orders_df = self._orders()
LOGGER.info("Joining data")
start = time.process_time()
df_prior = pd.merge(
orders_df, order_products_prior_df, how="left", on="order_id"
)
df_prior = pd.merge(df_prior, products_df, how="left", on="product_id")
df_prior = pd.merge(orders_df, prior_products_df, how="left", on="order_id")
df_prior = pd.merge(df_prior, self._products(), how="left", on="product_id")
LOGGER.debug("Took {:0.2f} s".format(time.process_time() - start))
return df_prior
def _raw_orders_for_user(self, id: np.uint32 = None) -> pd.DataFrame:
data = self._merged_data()
if not self.cache_data:
self._orders.cache_clear()
self._prior_products.cache_clear()
self._products.cache_clear()
self._merged_data.cache_clear()
if id is None:
g = data.groupby(by=["user_id"])["order_id"].count()
id = np.random.choice(g.index)
LOGGER.info("Loading data for user {}".format(id))
# df_prior = df_prior[df_prior["user_id"] == id]
return data[data["user_id"] == id]
def product_str(self, id: int) -> str:
df = pd.read_csv(
self.directory / "products.csv",
......@@ -196,10 +239,14 @@ class InstacartData:
raise ValueError("Unknown product id")
def columns(self) -> List[str]:
return self.orders_for_user().columns
_, purchases = self.orders_for_user()
return purchases.columns
def n_products(self) -> int:
if self.max_products is not None:
return self.max_products
if self.max_products is None:
return InstacartData.RAW_N_PRODUCTS + 1 # Because of non-zero indexing
else:
return InstacartData.RAW_N_PRODUCTS
return self.max_products
def n_observations(self) -> int:
return InstacartData.RAW_N_OBSERVATIONS
from gym_shopping_cart.envs.shopping_cart_v0 import ShoppingCart
from gym_shopping_cart.envs.shopping_cart_v0 import SimpleShoppingCart
from gym_shopping_cart.envs.shopping_cart_v0 import ShoppingCart
......@@ -7,18 +7,6 @@ import pandas as pd
from gym_shopping_cart.data.parser import InstacartData
def F1_score(labels: np.array, predicted: np.array) -> float:
tp = (labels.astype(bool) & predicted.astype(bool)).sum()
fp = predicted.sum() - tp
fn = labels.sum() - tp
if tp > 0:
precision = tp / (tp + fp)
recall = float(tp) / (tp + fn)
return 2 * ((precision * recall) / (precision + recall))
else:
return 0
class ShoppingCart(gym.Env):
"""
Simulates real customer product purchases using the Instacart dataset.
......@@ -34,7 +22,7 @@ class ShoppingCart(gym.Env):
State:
The state comprises of:
[products bought in previous shop, day of the week, hour of the day, days since last order]
[day of the week, hour of the day, days since last order]
All values have a range of 0.0-1.0.
......@@ -44,22 +32,32 @@ class ShoppingCart(gym.Env):
A vector of length N, where N are the total number of products in the catalogue.
Reward:
F1-score over all products
+1 for a correctly ordered product (true positive). -1 for an incorrectly ordered product (false positive).
"""
metadata = {"render.modes": [""]}
def __init__(self, data: InstacartData = None, user_id: int = None):
"""
data: an instance of the class representing the Instacart Data. Default: some test data from a single customer
user_id: only use data from a specific customer. Default: a random customer
"""
if data is None:
data = self.test_data()
self.data = data
self.user_id = user_id
self.action_space = gym.spaces.MultiBinary(self.data.n_products())
self.observation_space = gym.spaces.Box(
0.0, 1.0, shape=(InstacartData.N_OBSERVATIONS,), dtype=np.float32
)
self.reset()
@property
def action_space(self):
return gym.spaces.MultiBinary(self.data.n_products())
@property
def observation_space(self):
return gym.spaces.Box(
0.0, 1.0, shape=(self.data.n_observations(),), dtype=np.float32
)
def test_data(self) -> InstacartData:
return get_test_data()
......@@ -68,7 +66,7 @@ class ShoppingCart(gym.Env):
next_observation = self._get_observation()
# Get reward
reward = self._reward(next_observation, action)
reward = self._reward(action)
# Check if this is the end of the batch
done = bool(self._order_number > self._n_orders)
......@@ -81,15 +79,23 @@ class ShoppingCart(gym.Env):
self._order_number += 1
return obs
def _reward(self, obs: np.ndarray, action: np.ndarray) -> float:
def _reward(self, action: np.ndarray) -> float:
# Pull out the products ordered
ordered_products = obs[: self.data.n_products()]
assert len(ordered_products) == len(action)
# The reward is the F1-score
return F1_score(ordered_products, action)
previous_purchased_products = self._purchase_data.loc[
[self._order_number - 1]
].to_numpy()[0, :]
if len(previous_purchased_products) != len(action):
raise ValueError(
"Provided action vector ({}) is not the same size as the customer's purchased products ({}).".format(
len(action), len(previous_purchased_products)
)
)
tp = (previous_purchased_products.astype(bool) & action.astype(bool)).sum()
fp = action.astype(bool).sum() - tp
return tp - fp
def reset(self) -> np.ndarray:
self._user_data = self.data.orders_for_user(self.user_id)
self._user_data, self._purchase_data = self.data.orders_for_user(self.user_id)
self._n_orders = self._user_data.index.max()
self._order_number = self._user_data.index.min()
return self._get_observation()
......@@ -103,11 +109,13 @@ class ShoppingCart(gym.Env):
class SimpleShoppingCart(ShoppingCart):
"""
Exactly the same as ShoppingCart except I limit the number of products to the 20 most popular
Exactly the same as ShoppingCart except I limit the number of products to the DEFAULT_MAX_PRODUCTS most popular
"""
DEFAULT_MAX_PRODUCTS = 25
def test_data(self) -> InstacartData:
return get_test_data(max_products=20)
return get_test_data(max_products=SimpleShoppingCart.DEFAULT_MAX_PRODUCTS)
def get_test_data(max_products: int = None) -> InstacartData:
......
import setuptools
import os
if os.environ.get("CI_COMMIT_TAG"):
version = os.environ["CI_COMMIT_TAG"]
else:
version = os.environ["CI_JOB_ID"]
version = os.getenv("CI_COMMIT_TAG", os.getenv("CI_JOB_ID", "SNAPSHOT"))
# read the contents of your README file
from os import path
......
from pathlib import Path
import time
import numpy as np
import pandas as pd
......@@ -24,16 +25,15 @@ def test_parse_instacart_data():
data = InstacartData(
gz_file=this_dir / ".." / "gym_shopping_cart" / "data" / "test_data.tar.gz"
)
res = data.orders_for_user()
res, _ = data.orders_for_user()
assert res is not None
assert isinstance(res, pd.DataFrame)
assert res.loc[33].shape[0] == InstacartData.N_OBSERVATIONS
assert res.loc[33].shape[0] == data.n_observations()
assert res.loc[33]["order_dow_3"] == 1
assert res.loc[33]["order_hour_of_day_12"] == 1
assert res.loc[1]["days_since_prior_order"] == 0 # NOT nan.
assert res.loc[2].shape[0] == InstacartData.N_OBSERVATIONS
assert res.loc[2].shape[0] == data.n_observations()
assert res.loc[2]["order_dow_1"] == 1
assert res.loc[2]["product_id_9637"] == 1
assert res.loc[2]["order_hour_of_day_13"] == 1
np.testing.assert_almost_equal(
res.loc[2]["days_since_prior_order"],
......@@ -60,15 +60,21 @@ def test_chicken_in_right_place():
assert data.product_str(chicken_id) == "Boneless Skinless Chicken Breast"
def test_filter_common_products():
df = pd.DataFrame(
data={"product_id": [3, 3, 3, 2, 2, 5], "order_id": [1, 2, 3, 4, 5, 6]}
def test_caching():
data = InstacartData(
gz_file=this_dir / ".." / "gym_shopping_cart" / "data" / "test_data.tar.gz",
cache_data=False,
)
start = time.process_time()
[data._raw_orders_for_user() for i in range(10)]
total_no_cache = time.process_time() - start
data = InstacartData(
gz_file=this_dir / ".." / "gym_shopping_cart" / "data" / "test_data.tar.gz",
cache_data=True,
)
res = InstacartData._common_products(data=df)
assert res.loc[0] == 3
assert res.loc[1] == 2
assert res.loc[2] == 5
start = time.process_time()
[data._raw_orders_for_user() for i in range(10)]
total_cache = time.process_time() - start
res = InstacartData._filter_common_products(data=df, max_products=1)
assert len(res) == 3
assert res.loc[0]["product_id"] == 3
assert total_cache < total_no_cache / 2
......@@ -4,26 +4,9 @@ import gym
import numpy as np
import gym_shopping_cart
from gym_shopping_cart.envs.shopping_cart_v0 import F1_score
from gym_shopping_cart.data.parser import InstacartData
def test_f1_score():
labels = np.array([0, 1, 0, 1])
predicted = np.array([0, 1, 0, 1])
res = F1_score(labels, predicted)
assert res == 1.0
predicted = np.array([1, 0, 1, 0])
res = F1_score(labels, predicted)
assert res == 0.0
predicted = np.array([0, 1, 1, 0])
res = F1_score(labels, predicted)
assert res == 0.5
predicted = np.array([0, 0, 0, 1])
res = F1_score(labels, predicted)
assert res == 2 / 3
def test_registration():
env = gym.make("ShoppingCart-v0")
assert env != None
......@@ -59,7 +42,27 @@ def test_correct_reward():
action = np.zeros((env.data.n_products(),))
action[[8518, 9637, 14651, 37188, 45807, 46782]] = 1
_, reward, _, _ = env.step(action)
assert reward == 1
assert reward == 6
env.reset()
_, reward, _, _ = env.step(action - 1)
_, reward, _, _ = env.step(np.zeros((env.data.n_products(),)))
assert reward == 0
action = np.ones((env.data.n_products(),))
_, reward, _, _ = env.step(action)
assert reward == -49979.0
def test_swap_data_class():
env = gym.make("ShoppingCart-v0")
current_directory = pathlib.Path(__file__).parent
instacart_data = InstacartData(
gz_file=current_directory
/ ".."
/ "gym_shopping_cart"
/ "data"
/ "test_data.tar.gz",
max_products=2,
)
env.data = instacart_data
env.reset() # Have to reset the environment when you change the data source
env.step(np.ones(instacart_data.n_products()))
env.step(np.ones(instacart_data.n_products()))
from gym_shopping_cart.envs.shopping_cart_v0 import SimpleShoppingCart
from gym_shopping_cart.data.parser import InstacartData
import gym
import numpy as np
......@@ -8,4 +10,4 @@ def test_simplified_shopping_cart():
env = gym.make("SimpleShoppingCart-v0")
state, _, _, _ = env.step(env.action_space.sample())
assert isinstance(state, np.ndarray)
assert state.shape[0] == 52
assert state.shape[0] == env.observation_space.shape[0]