Commit 4142e6a4 authored by Alan Trick's avatar Alan Trick

Add the ability to filter/group related models in trackstats by-object

stats registrations
parent 6fa650f7
Pipeline #37460901 passed with stage
in 1 minute and 43 seconds
......@@ -12,4 +12,6 @@ build
AUTHORS
ChangeLog
coverage.xml
.eggs
\ No newline at end of file
.eggs
htmlcov
.mypy_cache
\ No newline at end of file
import collections
import django.http
import django.core.exceptions
import django.http
from django import forms
from django.conf.urls import url
from django.contrib import admin
......@@ -84,7 +84,7 @@ class ChartAdmin(admin.ModelAdmin):
'has_delete_permission': False,
'has_change_permission': True,
'has_absolute_url': False,
'opts': self.model._meta,
'opts': getattr(self.model, '_meta'),
'chart': chart,
'header': chart_header,
'rows': chart_rows,
......
......@@ -60,8 +60,6 @@ class QuerySpec:
:param group_text: Text for the group by query
:param filter_text: Text for the filter query
"""
if '&' in axis_text:
raise ValueError('Multiple filters (&) not supported in axis text')
if not axis_text:
axis_text = 'id:count'
self.axis_parts = [QuerySpecPart(part, False)
......@@ -141,10 +139,16 @@ class QuerySpecPart:
col_func, self.value = (urllib.parse.unquote(part) for part
in text.split('=', 1))
self.col, func = (col_func.split(':', 1)
if ':' in col_func else (col_func, ''))
func_name, self.func_lookup = (func.split('__', 1) if '__' in func
else (func, ''))
if ':' in col_func:
self.col, func = col_func.split(':', 1)
else:
self.col, func = col_func, ''
if '__' in func:
func_name, self.func_lookup = func.split('__', 1)
else:
func_name, self.func_lookup = func, ''
if func_name == '':
self.func = None
elif func_name not in self.FUNCS:
......
from .base import Registry, Registration
from .model import ModelRegistration
REGISTRY = Registry()
def register(registration: Registration):
REGISTRY.register(registration)
def register_model(model):
register(ModelRegistration(model))
def register_metric(metric, cls=None):
from .trackstats import MetricRegistration
register(MetricRegistration(metric, cls))
def get_registry_ids():
return REGISTRY.keys()
import abc
import collections
import typing
from datetime import date
from typing import Mapping
import django.db.models.functions as db_functions
from django.contrib.contenttypes.models import ContentType
import django.db.models as db_models
import django.db.models.functions as db_functions
from django_adminstats import Step
from django_adminstats.qs import QuerySpec
from .qs import QuerySpec
from . import Step
class Registration(metaclass=abc.ABCMeta):
date_field = 'date'
class Registration:
@property
@abc.abstractmethod
def key(self) -> str:
"""A unique string for this statistics registration"""
...
date_field = 'date'
@property
@abc.abstractmethod
def label(self) -> str:
"""A human-readable name for these statistics"""
...
@abc.abstractmethod
def get_queryset(self):
raise NotImplementedError
...
def get_span_queryset(self, start, end, _period_step):
qs = self.get_queryset().annotate(
......@@ -51,11 +62,12 @@ class Registration:
def get_data(self, start: date, end: date,
period_step: Step, axis_text: str, group_text: str,
filter_text: str) -> Mapping[str, Mapping[date, str]]:
filter_text: str) -> typing.Mapping[
str, typing.Mapping[date, str]]:
query_spec = QuerySpec(axis_text=axis_text, group_text=group_text,
filter_text=filter_text)
qs = self.query(start, end, period_step, query_spec)
result = {}
result = {} # type: typing.Dict[str, typing.Dict[date, str]]
group_fields = ['_django_adminstats_group_{}'.format(index)
for index in range(len(query_spec.group_parts))]
axis_fields = ['_django_adminstats_axis_{}'.format(index)
......@@ -90,141 +102,3 @@ class Registry(collections.OrderedDict):
return stats.get_data(start, end, Step(criteria.chart.period_step),
criteria.axis_spec, criteria.group_spec,
criteria.filter_spec)
class ModelRegistration(Registration):
def __init__(self, model):
self.model = model
self.meta = getattr(self.model, '_meta')
if not self.meta:
raise RuntimeError(
'Model {} is missing Meta class'.format(self.model))
def get_queryset(self):
return self.model.objects
@property
def key(self):
return '{}.{}'.format(self.meta.app_label, self.meta.model_name)
@property
def label(self):
return self.meta.verbose_name_plural.title()
def _filter(self, cls):
"""Given an individual filter token and a model, find the next model"""
# first check if _meta (model)
meta = getattr(cls, '_meta', None)
if meta is not None:
ff = getattr(meta, '_forward_fields_map', None)
if ff is not None:
return ff
return dict((f.name, f) for f in meta.fields)
# check if it's a ForeignKey or something
related = getattr(cls, 'related_model', None)
if related is not None:
return self._filter(related)
raise NotImplementedError
def filter_options(self, text: str = '') -> set:
"""Given a filter string, return the next options"""
cls = self.model
options = self._filter(cls)
for part in text.split('__'):
if part in options:
cls = options[part]
options = self._filter(cls)
return set(options.keys())
def validate_filter(self, text: str):
cls = self.model
sub_parts = text.split('__')
cls = self._filter(sub_parts[0], cls)
for sub_part in sub_parts[1:]:
pass
pass
class MetricRegistration(Registration):
def __init__(self, metric, model=None):
# metric is probably a lazy object
self.metric = metric
self.model = model
lazy_func = getattr(metric, '_setupfunc')
if lazy_func:
contents = lazy_func.__closure__[1].cell_contents
self.ref = contents['ref']
domain = contents['domain']
lazy_func = getattr(domain, '_setupfunc')
if lazy_func:
contents = lazy_func.__closure__[1].cell_contents
self.domain_ref = contents['ref']
else:
self.domain_ref = metric.domain.ref
else:
self.ref = metric.ref
@property
def key(self):
if self.model is None:
return 'trackstats:{}.{}'.format(self.domain_ref, self.ref)
meta = getattr(self.model, '_meta')
return 'trackstats:{}.{}:{}'.format(self.domain_ref, self.ref,
meta.label_lower)
@property
def label(self):
return '{} {} Stats'.format(self.domain_ref, self.ref).title()
def get_queryset(self):
from trackstats.models import StatisticByDate, StatisticByDateAndObject
if self.model is None:
return StatisticByDate.objects.filter(metric=self.metric)
object_type = ContentType.objects.get_for_model(self.model)
return StatisticByDateAndObject.objects.filter(
metric=self.metric, object_type=object_type)
def get_span_queryset(self, start: date, end: date,
period_step: Step):
from trackstats.models import Period
kwargs = {self.date_field + '__gte': start,
self.date_field + '__lt': end}
if period_step == Step.DAY:
kwargs['period'] = Period.DAY
elif period_step == Step.MONTH:
kwargs['period'] == Period.MONTH
else:
# trackstats doesn't actually support year
kwargs['period'] = Period.LIFETIME
return self.get_queryset().filter(**kwargs)
def get_data(self, start: date, end: date,
period_step: Step, axis_text: str, group_text: str,
filter_text: str) -> Mapping[str, Mapping[date, str]]:
if axis_text != '':
raise ValueError('Axis not supported for tracstats')
if group_text != '':
raise ValueError('Group not supported for tracstats')
return super().get_data(start, end, period_step, 'value',
'object_id' if self.model else '', filter_text)
REGISTRY = Registry()
def register(registration: Registration):
REGISTRY.register(registration)
def register_model(model):
register(ModelRegistration(model))
def register_metric(metric, cls=None):
register(MetricRegistration(metric, cls))
def get_registry_ids():
return REGISTRY.keys()
from . import Registration
class ModelRegistration(Registration):
def __init__(self, model):
self.model = model
self.meta = getattr(self.model, '_meta')
def get_queryset(self):
return self.model.objects
@property
def key(self):
return '{}.{}'.format(self.meta.app_label, self.meta.model_name)
@property
def label(self):
return self.meta.verbose_name_plural.title()
def _filter(self, cls):
"""Given an individual filter token and a model, find the next model"""
# first check if _meta (model)
meta = getattr(cls, '_meta', None)
if meta is not None:
ff = getattr(meta, '_forward_fields_map', None)
if ff is not None:
return ff
return dict((f.name, f) for f in meta.fields)
# check if it's a ForeignKey or something
related = getattr(cls, 'related_model', None)
if related is not None:
return self._filter(related)
raise NotImplementedError
def filter_options(self, text: str = '') -> set:
"""Given a filter string, return the next options"""
cls = self.model
options = self._filter(cls)
for part in text.split('__'):
if part in options:
cls = options[part]
options = self._filter(cls)
return set(options.keys())
from datetime import date
from typing import Mapping
import django.apps
import django.db.models as models
import trackstats.models
from django.contrib.contenttypes.models import ContentType
from django_adminstats import Step
from django_adminstats.qs import QuerySpec
from .base import Registration
class MetricRegistration(Registration):
def __init__(self, metric, model=None):
# metric is probably a lazy object
self.metric = metric
self.model = model
lazy_func = getattr(metric, '_setupfunc')
if lazy_func:
contents = lazy_func.__closure__[1].cell_contents
self.ref = contents['ref']
domain = contents['domain']
lazy_func = getattr(domain, '_setupfunc')
if lazy_func:
contents = lazy_func.__closure__[1].cell_contents
self.domain_ref = contents['ref']
else:
self.domain_ref = metric.domain.ref
else:
self.ref = metric.ref
@property
def key(self):
if self.model is None:
return 'trackstats:{}.{}'.format(self.domain_ref, self.ref)
meta = getattr(self.model, '_meta')
return 'trackstats:{}.{}:{}'.format(self.domain_ref, self.ref,
meta.label_lower)
@property
def label(self):
if self.model is None:
return '{} {} Stats'.format(
self.domain_ref, self.ref).title()
meta = getattr(self.model, '_meta')
return '{} {} ({}) Stats'.format(
self.domain_ref, self.ref, meta.verbose_name).title()
def get_queryset(self):
if self.model is None:
return trackstats.models.StatisticByDate.objects.filter(
metric=self.metric)
object_type = ContentType.objects.get_for_model(self.model)
meta = getattr(trackstats.models.StatisticByDateAndObject, '_meta')
cls_name = '_fake_StatisticByDateAndObject_{}'.format(
self.model.__name__).lower()
all_models = django.apps.apps.all_models['django_adminstats']
if cls_name in all_models:
new_class = all_models[cls_name]
else:
class FakeMeta:
app_label = 'django_adminstats'
# app_label = meta.app_label
db_table = meta.db_table
verbose_name = meta.verbose_name
verbose_name_plural = meta.verbose_name_plural
new_class = type(cls_name, (
trackstats.models.ByDateMixin,
trackstats.models.AbstractStatistic),
{
'__module__': '__ignore__',
'Meta': FakeMeta,
'object_type': models.ForeignKey(
ContentType, on_delete=models.PROTECT),
'object_id': models.PositiveIntegerField(),
'object': models.ForeignKey(
to=self.model, on_delete=models.PROTECT)
})
qs = new_class.objects.filter(
metric=self.metric, object_type=object_type)
return qs
def get_span_queryset(self, start: date, end: date,
period_step: Step):
kwargs = {self.date_field + '__gte': start,
self.date_field + '__lt': end}
if period_step == Step.DAY:
kwargs['period'] = trackstats.models.Period.DAY
elif period_step == Step.MONTH:
kwargs['period'] == trackstats.models.Period.MONTH
else:
# trackstats doesn't actually support year
kwargs['period'] = trackstats.models.Period.LIFETIME
return self.get_queryset().filter(**kwargs)
def query(self, start: date, end: date,
period_step: Step, query_spec: QuerySpec):
qs = self.get_span_queryset(start, end, period_step)
if self.model:
x_annotations, x_value = self.get_x_parameters(period_step)
return query_spec.update_queryset(
qs, x_annotations=x_annotations, x_value=x_value)
x_annotations, x_value = self.get_x_parameters(period_step)
return query_spec.update_queryset(
qs, x_annotations=x_annotations, x_value=x_value)
def get_data(self, start: date, end: date,
period_step: Step, axis_text: str, group_text: str,
filter_text: str) -> Mapping[str, Mapping[date, str]]:
if axis_text != '':
raise ValueError('Axis not supported for tracstats')
if not group_text:
group_text = 'object_id' if self.model else ''
return super().get_data(start, end, period_step, 'value',
group_text, filter_text)
from django_adminstats import registry
from . import models
......
import unittest.mock
from datetime import date, datetime
from django_adminstats import Step, models
@unittest.mock.patch('django.utils.timezone.now')
def test_chart_get_end_date(mock_now):
mock_now.return_value = datetime(2005, 2, 10)
expected_results = (
# note that the end date is the date after
# the last one
('t', Step.DAY.value, date(2005, 2, 11)),
('y', Step.DAY.value, date(2005, 2, 10)),
('t', Step.MONTH.value, date(2005, 3, 1)),
('y', Step.MONTH.value, date(2005, 2, 1)),
('t', Step.YEAR.value, date(2006, 1, 1)),
('y', Step.YEAR.value, date(2005, 1, 1)),
)
for until_type, step_type, end_date in expected_results:
chart = models.Chart(until_type=until_type, period_step=step_type)
assert end_date == chart.get_end_date()
# special check for month overflow
mock_now.return_value = datetime(2005, 12, 10)
chart = models.Chart(until_type='t', period_step=Step.MONTH.value)
assert date(2006, 1, 1) == chart.get_end_date()
def test_chart_move_delta():
now = date(2005, 2, 10)
expected_results = (
# note that the end date is the date after
# the last one
(0, Step.DAY.value, date(2005, 2, 10)),
(30, Step.DAY.value, date(2005, 1, 11)),
(0, Step.MONTH.value, date(2005, 2, 10)),
(30, Step.MONTH.value, date(2002, 8, 10)),
(0, Step.YEAR.value, date(2005, 2, 10)),
(1, Step.YEAR.value, date(2004, 2, 10)),
)
for num, step_type, start_date in expected_results:
chart = models.Chart(period_step=step_type)
assert start_date == now - chart.move_delta(num)
def test_criteria_stats_key():
criteria = models.Criteria(stats_key='foo')
assert 'foo' == str(criteria)
......@@ -2,7 +2,8 @@ from datetime import date
from django.test import TestCase
from django_adminstats import registry, Step
from django_adminstats import Step, registry
from . import models
......@@ -76,16 +77,31 @@ class QueryTrackstatsDemoTests(TestCase):
@staticmethod
def test_group_by_path():
reg = registry.REGISTRY['trackstats:page.view:tests.page']
expected = {'1': {
expected = {'/': {
date(2018, 11, 6): '1',
date(2018, 11, 7): '2',
date(2018, 11, 8): '3',
}, '2': {
}, '/george': {
date(2018, 11, 6): '1',
}}
# group_text='group_path' not supported
actual = reg.get_data(
start=date(2018, 11, 1), end=date(2018, 11, 12),
period_step=Step.DAY, axis_text='', group_text='',
filter_text='')
period_step=Step.DAY, axis_text='',
group_text='object__path', filter_text='')
assert expected == actual
@staticmethod
def test_multi_group_by():
reg = registry.REGISTRY['trackstats:page.view:tests.page']
expected = {'/ / 1': {
date(2018, 11, 6): '1',
date(2018, 11, 7): '2',
date(2018, 11, 8): '3',
}, '/george / 2': {
date(2018, 11, 6): '1',
}}
actual = reg.get_data(
start=date(2018, 11, 1), end=date(2018, 11, 12),
period_step=Step.DAY, axis_text='',
group_text='object__path&object__id', filter_text='')
assert expected == actual
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment