Commit 38c5ec5c authored by Lucas Moura

Imported Upstream version 0.6.2

parent 97b700a5
This diff is collapsed.
......@@ -7,7 +7,6 @@ import datetime as dt
import logging
import os
import re
import shutil
import tarfile
import time
import xapian
......@@ -413,7 +412,7 @@ def main():
collect_user_preferences()
make_tarfile(LOG_PATH + '.tar.gz', LOG_PATH)
shutil.rmtree(LOG_PATH)
commands.getoutput('rm -rf {}'.format(LOG_PATH))
print "\n\nFinished: All files and recommendations were collected"
print "Collect data folder: {0}.tar.gz\n".format(LOG_PATH)
......
#!/usr/bin/env python
import apt
import commands
import re
import xapian
from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS
class PkgInitDecider(object):
    """
    Decide whether a package may be recommended to the user.

    Calling an instance with a package name returns True only when the
    package is in the apt cache, has a candidate version, every dependency
    that counts as a program (see is_valid_dependency) is among the user's
    manually installed programs, and the name does not carry one of the
    rejected prefixes or suffixes.
    """

    # Name components that disqualify a package; they are matched against
    # the '-' separated parts of the name in is_pkg_a_prefix_or_suffix.
    INVALID_PREFIXES = ['ruby', 'python', 'python3', 'golang', 'gir',
                        'texlive']
    INVALID_SUFFIXES = ['examples', 'dbg', 'data', 'dev', 'utils', 'common',
                        'fonts']

    def __init__(self):
        self.cache = apt.Cache()
        self.user_role_programs = self.get_user_role_programs()

    def is_in_apt_cache(self, pkg):
        """
        Return True when the package name 'pkg' is present in the apt cache.
        """
        return pkg in self.cache

    def get_package_dependencies(self, pkg):
        """
        Return the name of the first entry of every item listed in
        'pkg.dependencies'.
        """
        return [dep[0].name for dep in pkg.dependencies]

    def get_user_installed_packages(self):
        """
        Return the lines printed by 'apt-mark showmanual' as a list.
        """
        manual_installed = commands.getoutput('apt-mark showmanual')
        return manual_installed.splitlines()

    def get_user_role_programs(self):
        """
        Return the set of manually installed packages whose candidate
        record carries the 'role::program' tag.

        Packages missing from the cache, without a candidate version or
        without a 'Tag' field are ignored.
        """
        user_programs = set()
        for name in self.get_user_installed_packages():
            if name not in self.cache:
                continue
            candidate = self.cache[name].candidate
            # A cached package may have no candidate; skip it, the same way
            # is_program_dependencies_installed does.
            if candidate is None:
                continue
            tags = candidate.record.get('Tag', None)
            if tags and 'role::program' in tags:
                user_programs.add(name)
        return user_programs

    def is_valid_dependency(self, pkg_tags, pkg_section):
        """
        Return True when a dependency counts as a program: it is tagged
        'role::program' or 'devel::editor', or lives in the 'interpreters'
        section.
        """
        tags_dep = 'role::program' in pkg_tags or 'devel::editor' in pkg_tags
        section_dep = pkg_section == 'interpreters'
        return tags_dep or section_dep

    def is_program_dependencies_installed(self, pkg):
        """
        Return True when every program-like dependency of the candidate
        version 'pkg' is among the user's installed programs.

        Dependencies that are not in the cache, have no candidate or carry
        no 'Tag' field are not taken into account.
        """
        dep_programs = set()
        for dep in self.get_package_dependencies(pkg):
            if dep not in self.cache:
                continue
            # Use a dedicated name so the 'pkg' argument is not shadowed.
            candidate = self.cache[dep].candidate
            if candidate is None:
                continue
            tags = candidate.record.get('Tag', None)
            if tags is None:
                continue
            if self.is_valid_dependency(tags, candidate.section):
                dep_programs.add(dep)
        return not (dep_programs - self.user_role_programs)

    def is_pkg_a_prefix_or_suffix(self, pkg):
        """
        Return True when the package name has more than one '-' separated
        part and either its first part starts with an invalid prefix or one
        of its last two parts ends with an invalid suffix.
        """
        parts = pkg.split('-')
        if len(parts) == 1:
            return False
        # str.startswith/endswith accept a tuple of alternatives.
        if parts[0].startswith(tuple(PkgInitDecider.INVALID_PREFIXES)):
            return True
        suffixes = tuple(PkgInitDecider.INVALID_SUFFIXES)
        return parts[-1].endswith(suffixes) or parts[-2].endswith(suffixes)

    def __call__(self, pkg):
        """
        Return True when the package named 'pkg' can be recommended,
        False otherwise (always a bool).
        """
        if not self.is_in_apt_cache(pkg):
            return False
        candidate = self.cache[pkg].candidate
        if not candidate:
            return False
        return (self.is_program_dependencies_installed(candidate) and
                not self.is_pkg_a_prefix_or_suffix(pkg))
class PkgMatchDecider(xapian.MatchDecider):
"""
......
#!/usr/bin/env python
"""
dissimilarity - python module for classes and methods related to similarity
measuring between two sets of data.
"""
__author__ = "Tassia Camoes Araujo <tassia@gmail.com>"
__copyright__ = "Copyright (C) 2011 Tassia Camoes Araujo"
__license__ = """
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import math
def norm(x):
    """
    Compute the Euclidean (L2) norm of the numeric vector 'x'.
    """
    squared_total = 0
    for component in x:
        squared_total = squared_total + component ** 2
    return math.sqrt(squared_total)
def dot_product(x, y):
    """
    Compute the dot product of the numeric vectors 'x' and 'y'.

    Every position of 'x' is paired with the same position of 'y', so 'y'
    must be at least as long as 'x'.
    """
    total = 0
    for position, x_value in enumerate(x):
        total = total + x_value * y[position]
    return total
class Dissimilarity:
    """
    Common base for measures of dissimilarity between two sets or two
    vectors. It defines no behaviour of its own.
    """
    pass
class EuclidianDistance(Dissimilarity):
    """
    Euclidean distance between two vectors.
    """

    def __call__(self, x, y):
        """
        Return the Euclidean distance between vectors 'x' and 'y'.

        Every position of 'x' is compared with the same position of 'y'.
        """
        squared_diffs = 0
        for position, x_value in enumerate(x):
            squared_diffs = squared_diffs + (x_value - y[position]) ** 2
        return math.sqrt(squared_diffs)
class CosineDissimilarity(Dissimilarity):
    """
    Dissimilarity measure complementary to the cosine similarity, which is
    the cosine of the angle between two vectors.
    """

    def __call__(self, x, y):
        """
        Return one minus the cosine of the angle between vectors 'x' and 'y'.

        The product of the two norms is used as divisor, so a vector of
        norm zero makes the division fail.
        """
        numerator = dot_product(x, y)
        denominator = norm(x) * norm(y)
        cosine = float(numerator / denominator)
        return 1 - cosine
class JaccardDistance(Dissimilarity):
    """
    Dissimilarity measure complementary to the Jaccard index, which is the
    number of common values divided by the size of the union of the two
    sets.
    """

    def __call__(self, x, y):
        """
        Return the Jaccard distance (1 - Jaccard index) between sets 'x'
        and 'y'.

        Two empty sets are treated as identical and yield 0.0 instead of
        failing with a division by zero.
        """
        common = [v for v in x if v in y]
        union_size = len(x) + len(y) - len(common)
        # The union is empty only when both inputs are empty.
        if union_size == 0:
            return 0.0
        return 1 - (float(len(common)) / union_size)
class DiffCoefficient(Dissimilarity):
    """
    Measure the difference between two sets in terms of how many items
    should be added to and removed from one set to transform it into the
    other. Similar to edit distance, but item positions are not relevant
    for sets.
    """

    def __call__(self, x, y):
        """
        Return the diff coefficient between sets 'x' and 'y': the number of
        items present in only one of them divided by the size of their
        union.

        Two empty sets yield 0.0 instead of failing with a division by
        zero.
        """
        add = [v for v in x if v not in y]
        delete = [v for v in y if v not in x]
        common = [v for v in x if v in y]
        union_size = len(x) + len(y) - len(common)
        # The union is empty only when both inputs are empty.
        if union_size == 0:
            return 0.0
        # Convert before dividing: dividing two ints first would truncate
        # the ratio under Python 2 integer division.
        return float(len(add) + len(delete)) / union_size
......@@ -9,6 +9,7 @@ import shutil
import xapian
from apprecommender.config import Config
from apprecommender.decider import PkgInitDecider
class Initialize:
......@@ -24,6 +25,7 @@ class Initialize:
def __init__(self):
self.config = Config()
self.cache = apt.Cache()
self.pkg_init_decider = PkgInitDecider()
def get_tags(self):
command = "cat /var/lib/debtags/vocabulary" \
......@@ -39,9 +41,6 @@ class Initialize:
axi = xapian.Database(axi_path)
all_terms = set()
user_pkgs = self.get_user_installed_packages()
user_role_programs = self.get_user_role_programs(user_pkgs)
for n in range(1, axi.get_lastdocid()):
doc = 0
try:
......@@ -61,81 +60,11 @@ class Initialize:
if pkg_name.startswith('M'):
pkg_name = pkg_name.lstrip('M')
if pkg_name not in self.cache:
continue
pkg = self.cache[pkg_name].candidate
if not pkg or not self.is_section_valid(pkg.section):
continue
pkg_dependencies = self.get_package_dependencies(pkg)
is_dep_installed = self.is_program_dependencies_installed(
pkg_dependencies, user_role_programs)
if is_dep_installed:
if self.pkg_init_decider(pkg_name):
all_terms.add(pkg_name)
return all_terms
def is_section_valid(self, pkg_section):
    """
    Return False for the 'doc' section and True for any other section.
    """
    return not (pkg_section == 'doc')
def is_valid_dependency(self, pkg_tags, pkg_section):
    """
    Return True when the tags contain 'role::program' or 'devel::editor',
    or when the section is 'interpreters'.
    """
    wanted_tags = ('role::program', 'devel::editor')
    has_program_tag = any(tag in pkg_tags for tag in wanted_tags)
    is_interpreter = pkg_section == 'interpreters'
    return has_program_tag or is_interpreter
def is_program_dependencies_installed(self, pkg_dependencies,
                                      user_role_programs):
    """
    Return True when every dependency name in 'pkg_dependencies' that
    qualifies through is_valid_dependency is contained in
    'user_role_programs'.

    Names missing from the cache, without a candidate version or without
    a 'Tag' field are not taken into account.
    """
    required_programs = set()
    for dep_name in pkg_dependencies:
        if dep_name not in self.cache:
            continue
        candidate = self.cache[dep_name].candidate
        if candidate is None:
            continue
        tags = candidate.record.get('Tag', None)
        section = candidate.section
        if tags is None:
            continue
        if self.is_valid_dependency(tags, section):
            required_programs.add(dep_name)
    missing = required_programs - user_role_programs
    return len(missing) == 0
def get_package_dependencies(self, pkg):
    """
    Collect the name of the first entry of every item listed in
    'pkg.dependencies'.
    """
    names = []
    for dependency in pkg.dependencies:
        names.append(dependency[0].name)
    return names
def get_user_installed_packages(self):
    """
    Run 'apt-mark showmanual' and return its output split into lines.
    """
    return commands.getoutput('apt-mark showmanual').splitlines()
def get_user_role_programs(self, user_pkgs):
    """
    Return the subset of 'user_pkgs' whose candidate record carries the
    'role::program' tag, as a set.

    Names missing from the cache, without a candidate version or without
    a 'Tag' field are ignored.
    """
    user_programs = set()
    for name in user_pkgs:
        if name not in self.cache:
            continue
        candidate = self.cache[name].candidate
        # A cached package may have no candidate; skip it, the same way
        # is_program_dependencies_installed does.
        if candidate is None:
            continue
        tags = candidate.record.get('Tag', None)
        if tags and 'role::program' in tags:
            user_programs.add(name)
    return user_programs
def indexer_axi(self, axi_sample, filters_path, terms=[]):
axi_path = Initialize.DEFAULT_AXI_PATH
axi = xapian.Database(axi_path)
......
......@@ -145,35 +145,15 @@ class Recommender:
self.weight = xapian.TradWeight()
self.set_strategy(self.cfg.strategy)
def set_strategy(self, strategy_str, k=0, n=0):
def set_strategy(self, strategy_str, n=0):
"""
Set the recommendation strategy.
"""
if k:
k_neighbors = k
else:
k_neighbors = self.cfg.k_neighbors
if n:
profile_size = n
else:
profile_size = self.cfg.profile_size
logging.info("Setting recommender strategy to \'%s\'" % strategy_str)
# Check if collaborative strategies can be instanciated
if "knn" in strategy_str:
if not self.cfg.popcon:
logging.info("Cannot perform collaborative strategy")
return 1
# if self.cfg.pkgs_filter.split("/")[-1] == "desktopapps":
profile_size = n if n else self.cfg.profile_size
self.items_repository = self.axi_desktopapps
self.valid_pkgs = self.valid_desktopapps
if "knn" in strategy_str:
self.users_repository = self.popcon_desktopapps
# else:
# self.items_repository = self.axi_programs
# self.valid_pkgs = self.valid_programs
# if "knn" in strategy_str:
# self.users_repository = self.popcon_programs
# Set strategy based on strategy_str
logging.info("Setting recommender strategy to \'%s\'" % strategy_str)
if strategy_str == "cb":
self.strategy = strategy.ContentBased("mix", profile_size)
......@@ -191,12 +171,6 @@ class Recommender:
elif strategy_str == "mlbow":
self.strategy = strategy.MachineLearningBOW("mlbow_mix",
profile_size)
elif strategy_str == "mlbva_eset":
self.strategy = strategy.MachineLearningBVA("mlbva_mix_eset",
profile_size)
elif strategy_str == "mlbow_eset":
self.strategy = strategy.MachineLearningBOW("mlbow_mix_eset",
profile_size)
elif strategy_str == "cb_eset":
self.strategy = strategy.ContentBased("mix_eset", profile_size)
elif strategy_str == "cbt_eset":
......@@ -205,19 +179,12 @@ class Recommender:
self.strategy = strategy.ContentBased("desc_eset", profile_size)
elif strategy_str == "cbh_eset":
self.strategy = strategy.ContentBased("half_eset", profile_size)
elif strategy_str == "knn":
self.strategy = strategy.Knn(k_neighbors)
elif strategy_str == "knn_plus":
self.strategy = strategy.KnnPlus(k_neighbors)
elif strategy_str == "knn_eset":
self.strategy = strategy.KnnEset(k_neighbors)
elif strategy_str == "knnco":
self.strategy = strategy.KnnContent(k_neighbors)
elif strategy_str == "knnco_eset":
self.strategy = strategy.KnnContentEset(k_neighbors)
# [FIXME: fix repository instanciation]
# elif strategy_str.startswith("demo"):
# self.strategy = strategy.Demographic(strategy_str)
elif strategy_str == "mlbva_eset":
self.strategy = strategy.MachineLearningBVA("mlbva_mix_eset",
profile_size)
elif strategy_str == "mlbow_eset":
self.strategy = strategy.MachineLearningBOW("mlbow_mix_eset",
profile_size)
else:
logging.info("Strategy not defined.")
self.strategy = None
......
......@@ -21,10 +21,8 @@ __license__ = """
"""
import apt
import data
import logging
import operator
import os
import pickle
import re
import recommender
......@@ -35,13 +33,11 @@ import numpy as np
from abc import ABCMeta, abstractmethod
from os import path
from apprecommender.error import Error
from apprecommender.config import Config
from apprecommender.decider import PkgMatchDecider
from apprecommender.ml.bag_of_words import BagOfWords
from apprecommender.ml.bayes_matrix import BayesMatrix
from apprecommender.ml.data import MachineLearningData
from apprecommender.decider import (PkgMatchDecider, PkgExpandDecider,
TagExpandDecider)
XAPIAN_DATABASE_PATH = path.expanduser('~/.app-recommender/axi_desktopapps/')
USER_DATA_DIR = Config().user_data_dir
......@@ -110,253 +106,6 @@ class ContentBased(RecommendationStrategy):
return result
class Collaborative(RecommendationStrategy):
    """
    Collaborative recommendation strategy.

    Provides the helpers shared by the neighbourhood based strategies:
    building the user profile, querying the users repository and turning
    an expansion set into a recommendation result.
    """

    def get_user_profile(self, user, rec):
        """
        Return the user's packages filtered by 'rec.valid_pkgs', each one
        prefixed with "XP".
        """
        logging.debug("Composing user profile...")
        profile = ["XP" + package for package in
                   user.filter_pkg_profile(rec.valid_pkgs)]
        logging.debug(profile)
        return profile

    def get_enquire(self, rec):
        """
        Return a xapian.Enquire over 'rec.users_repository' that uses
        'rec.weight' as weighting scheme.
        """
        enquire = xapian.Enquire(rec.users_repository)
        enquire.set_weighting_scheme(rec.weight)
        return enquire

    def get_neighborhood(self, user, rec):
        """
        Return the match set of at most 'self.neighbours' users matching an
        OP_ELITE_SET query built from the user's profile.

        'self.neighbours' is not set by this class; it must be provided by
        the concrete strategy. Raises Error when xapian reports a
        DatabaseError.
        """
        profile = self.get_user_profile(user, rec)
        query = xapian.Query(xapian.Query.OP_ELITE_SET, profile)
        enquire = self.get_enquire(rec)
        enquire.set_query(query)
        # Retrieve matching users
        try:
            mset = enquire.get_mset(0, self.neighbours)
        except xapian.DatabaseError as error:
            error_msg = "Could not compose user neighborhood.\n "
            logging.critical(error_msg + error.get_msg())
            raise Error
        return mset

    def get_neighborhood_rset(self, user, rec):
        """
        Return a xapian.RSet holding the document id of every neighbour
        found by get_neighborhood.
        """
        mset = self.get_neighborhood(user, rec)
        rset = xapian.RSet()
        for m in mset:
            rset.add_document(m.document.get_docid())
        return rset

    def get_result_from_eset(self, eset):
        """
        Build a RecommendationResult from an expansion set, mapping each
        package name to its weight and keeping the eset order as ranking.
        """
        prefix = "XP"
        item_score = {}
        ranking = []
        for e in eset:
            package = e.term
            # Drop the prefix itself; str.lstrip("XP") would strip any run
            # of leading 'X'/'P' characters instead.
            if package.startswith(prefix):
                package = package[len(prefix):]
            item_score[package] = e.weight
            ranking.append(package)
        return recommender.RecommendationResult(item_score, ranking)
class Knn(Collaborative):
    """
    KNN strategy that ranks packages with data.tfidf_weighting over the
    user's neighbourhood.
    """

    def __init__(self, k):
        """
        Store 'k' as the number of neighbours to retrieve.
        """
        self.description = "Knn"
        self.neighbours = k

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        Returns a RecommendationResult with at most 'recommendation_size'
        packages taken from the head of the weighted list.
        """
        prefix = "XP"
        neighborhood = self.get_neighborhood(user, rec)
        weights = data.tfidf_weighting(rec.users_repository, neighborhood,
                                       PkgExpandDecider(user.items()))
        item_score = {}
        ranking = []
        for pkg in weights[:recommendation_size]:
            package = pkg[0]
            # Drop the prefix itself; str.lstrip("XP") would strip any run
            # of leading 'X'/'P' characters instead.
            if package.startswith(prefix):
                package = package[len(prefix):]
            item_score[package] = pkg[1]
            ranking.append(package)
        return recommender.RecommendationResult(item_score, ranking)
class KnnPlus(Collaborative):
    """
    KNN strategy that ranks packages with data.tfidf_plus over the user's
    neighbourhood.
    """

    def __init__(self, k):
        """
        Store 'k' as the number of neighbours to retrieve.
        """
        self.description = "Knn plus"
        self.neighbours = k

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        Returns a RecommendationResult with at most 'recommendation_size'
        packages taken from the head of the weighted list.
        """
        prefix = "XP"
        neighborhood = self.get_neighborhood(user, rec)
        weights = data.tfidf_plus(rec.users_repository, neighborhood,
                                  PkgExpandDecider(user.items()))
        item_score = {}
        ranking = []
        for pkg in weights[:recommendation_size]:
            package = pkg[0]
            # Drop the prefix itself; str.lstrip("XP") would strip any run
            # of leading 'X'/'P' characters instead.
            if package.startswith(prefix):
                package = package[len(prefix):]
            item_score[package] = pkg[1]
            ranking.append(package)
        return recommender.RecommendationResult(item_score, ranking)
class KnnEset(Collaborative):
    """
    KNN strategy that obtains its suggestions through query expansion.
    """

    def __init__(self, k):
        """
        Store 'k' as the number of neighbours to retrieve.
        """
        self.neighbours = k
        self.description = "KnnEset"

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        The neighbours are used as relevance set and the resulting
        expansion set is converted into the recommendation result.
        """
        relevant_users = self.get_neighborhood_rset(user, rec)
        users_enquire = self.get_enquire(rec)
        decider = PkgExpandDecider(user.items())
        # Expand the neighbourhood profile to retrieve new packages
        expansion = users_enquire.get_eset(recommendation_size,
                                           relevant_users, decider)
        return self.get_result_from_eset(expansion)
class CollaborativeEset(Collaborative):
    """
    Collaborative strategy based on query expansion.
    """

    def __init__(self):
        self.description = "Collaborative-Eset"

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        The user profile is stored as a single document in a temporary
        index, that document is used as relevance set, and the expansion
        set obtained from it becomes the recommendation result.
        """
        # NOTE(review): the index lives at a fixed path under /tmp and is
        # overwritten on every call - confirm this is acceptable.
        index = xapian.WritableDatabase("/tmp/Database",
                                        xapian.DB_CREATE_OR_OVERWRITE)
        profile_doc = xapian.Document()
        for term in self.get_user_profile(user, rec):
            profile_doc.add_term(term)
        profile_doc.add_term("TO_BE_DELETED")
        profile_docid = index.add_document(profile_doc)
        index.add_database(rec.users_repository)
        relevant = xapian.RSet()
        relevant.add_document(profile_docid)
        enquire = xapian.Enquire(index)
        enquire.set_weighting_scheme(rec.weight)
        decider = PkgExpandDecider(user.items())
        expansion = enquire.get_eset(recommendation_size, relevant, decider)
        return self.get_result_from_eset(expansion)
class KnnContent(Collaborative):
    """
    Hybrid "collaborative through content" recommendation strategy.
    """

    def __init__(self, k):
        """
        Store 'k' as the number of neighbours to retrieve.
        """
        self.neighbours = k
        self.description = "Knn-Content"

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        The terms weighted over the neighbourhood form a profile, limited
        to 'rec.cfg.profile_size' entries, that is handed to a "tag"
        ContentBased strategy to produce the suggestions.
        """
        neighbours = self.get_neighborhood(user, rec)
        weighted_terms = data.tfidf_weighting(rec.users_repository,
                                              neighbours,
                                              PkgExpandDecider(user.items()))
        terms = []
        for weighted in weighted_terms:
            terms.append(weighted[0])
        profile = terms[:rec.cfg.profile_size]
        content_strategy = ContentBased("tag", rec.cfg.profile_size)
        return content_strategy.get_sugestion_from_profile(
            rec, user, profile, recommendation_size)
class KnnContentEset(Collaborative):
    """
    Hybrid "collaborative through content" recommendation strategy that
    builds its profile through query expansion.
    """

    def __init__(self, k):
        """
        Store 'k' as the number of neighbours to retrieve.
        """
        self.neighbours = k
        self.description = "Knn-Content-Eset"

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        The terms of an expansion set of 'rec.cfg.profile_size' entries,
        computed from the neighbourhood, form the profile handed to a "tag"
        ContentBased strategy to produce the suggestions.
        """
        relevant_users = self.get_neighborhood_rset(user, rec)
        users_enquire = self.get_enquire(rec)
        # Expand the neighbourhood profile to retrieve relevant tags
        expansion = users_enquire.get_eset(rec.cfg.profile_size,
                                           relevant_users,
                                           TagExpandDecider())
        profile = []
        for expanded in expansion:
            profile.append(expanded.term)
        content_strategy = ContentBased("tag", rec.cfg.profile_size)
        return content_strategy.get_sugestion_from_profile(
            rec, user, profile, recommendation_size)
class Demographic(RecommendationStrategy):
    """
    Hybrid rotation strategy based on demographic data.
    """

    def __init__(self, strategy_str):
        """
        Store the name of the wrapped strategy, i.e. 'strategy_str' without
        its leading "demo_" (or bare "demo") marker.
        """
        self.description = "Demographic"
        # Remove the marker as a prefix. str.lstrip("demo_") strips any run
        # of the characters d, e, m, o and _, which turned "demo_mlbva"
        # into "lbva".
        for prefix in ("demo_", "demo"):
            if strategy_str.startswith(prefix):
                strategy_str = strategy_str[len(prefix):]
                break
        self.strategy_str = strategy_str

    def run(self, rec, user, recommendation_size):
        """
        Perform recommendation strategy.

        When the user's desktop profile has more than 10 entries, or more
        entries than half of the program profile, the recommender is
        switched to the wrapped strategy and pointed at the desktopapps
        repositories before the recommendation is requested.
        """
        filters_dir = rec.cfg.filters_dir
        program_dir = os.path.join(filters_dir, "programs")
        desktop_dir = os.path.join(filters_dir, "desktopapps")
        program_profile = user.filter_pkg_profile(program_dir)
        desktop_profile = user.filter_pkg_profile(desktop_dir)
        # NOTE(review): the nesting below was reconstructed from a copy
        # without indentation - confirm against the original file.
        if len(desktop_profile) > 10 or (len(desktop_profile) >
                                         len(program_profile) / 2):
            rec.set_strategy(self.strategy_str)
            # Redefine repositories after configuring strategy
            rec.items_repository = rec.axi_desktopapps
            rec.valid_pkgs = rec.valid_desktopapps
            if "col" in self.strategy_str:
                rec.users_repository = rec.popcon_desktopapps
        return rec.get_recommendation(user, recommendation_size)
class MachineLearning(ContentBased):
__metaclass__ = ABCMeta
......
import unittest
from apprecommender.decider import PkgInitDecider
class PkgInitDeciderTests(unittest.TestCase):
    """
    Tests for the package name filtering done by PkgInitDecider.
    """

    def setUp(self):
        self.pkg_init_decider = PkgInitDecider()

    def test_python_pkg_regex(self):
        # A name whose first part is 'python' must be flagged.
        decider = self.pkg_init_decider
        is_flagged = decider.is_pkg_a_prefix_or_suffix('python-test')
        self.assertTrue(is_flagged)