Commit f3fdeb27 authored by Alan Trick's avatar Alan Trick

Add autocomplete for query fields in admin, and rename query fields

parent 180e4085
Pipeline #38461620 passed with stage
in 1 minute and 32 seconds
......@@ -52,7 +52,7 @@ of it looks like this:
currency = models.Foreignkey(to=Currency)
date = models.DateTimeField(auto_now_add=True)
django_adminstats.register_model(Transaction)
register_model(Transaction)
By default, we look for a field called ‘date’ on the model, and it should be
......@@ -76,19 +76,19 @@ Creating Charts
You can add charts in the admin. In order for the chart to show anything, you
need to add criteria. By default, it will just show a count of all the items
charted by the date field, if you to change this, you need to add things in
the filter spec, axis spec, and group spec fields.
the filter query, axis query, and group query fields.
First The content of these fields is formatted like a URL querystring,
for example a filter spec of ``message=Hello%20World&x=y`` is equivalent to
First, the content of these fields is formatted like a URL querystring,
for example a filter query of ``message=Hello%20World&x=y`` is equivalent to
``.filter(message='Hello World', x='y')``. Note that you only use the
``key0=value0&key1=value1`` form in the filter spec, the axis and group
specs are just ``key0&key2``.
``key0=value0&key1=value1`` form in the filter query, the axis and group
queries are just ``key0&key2``.
Second, you can use lookups and relations just like in a normal django
query (e.g. ``field__gt=2`` or ``field__related_field``).
Finally, you can also specify functions to use on the field by doing
``field:function``. For example ``id:count`` is the default axis spec when
``field:function``. For example ``id:count`` is the default axis query when
the field is left blank.
......
......@@ -3,7 +3,10 @@ import typing
import django.core.exceptions
import django.http
import django.urls
from django import forms
from django.contrib.admin.widgets import AutocompleteSelect
from django.views.generic.list import BaseListView
from django.conf.urls import url
from django.contrib import admin
from django.template.response import TemplateResponse
......@@ -35,15 +38,117 @@ def copy_chart(_admin, _request, queryset):
models.Criteria.objects.create(
chart=new,
stats_key=criteria.stats_key,
filter_spec=criteria.filter_spec,
axis_spec=criteria.axis_spec,
group_spec=criteria.group_spec,
filter_query=criteria.filter_query,
group_query=criteria.group_query,
axis_query=criteria.axis_query,
)
class QueryAutocompleteJsonView(BaseListView):
"""Handle Widget's AJAX requests for data."""
paginate_by = 20
stats_key = ''
def get(self, request, *args, **kwargs):
"""
Return a JsonResponse with search results of the form:
{
results: [{id: "123" text: "foo"}],
}
"""
if not self.has_perm(request):
return django.http.JsonResponse(
{'error': '403 Forbidden'}, status=403)
try:
reg = registry.REGISTRY[self.stats_key]
except KeyError:
return django.http.HttpResponseBadRequest("Invalid stats key")
term = self.request.GET.get('term', '')
results = []
for item in self.get_options(reg, term):
results.append({'id': item, 'text': item})
return django.http.JsonResponse({'results': results})
def get_options(self, reg: registry.Registry, term: str) -> list:
"""Return queryset based on ModelAdmin.get_search_results()."""
return reg.query_options(term)
def has_perm(self, request, obj=None):
"""Check if user has permission to access the related model."""
return request.user.has_perm('django_adminstats.view_chart')
class FilterAutocompleteJsonView(QueryAutocompleteJsonView):
def get_options(self, reg: registry.Registry, term: str) -> list:
return reg.query_options(term, is_filter=True)
class QueryAutocomplete(AutocompleteSelect):
allow_multiple_selected = True
is_required = False
def __init__(self):
super().__init__(None, None)
self.choices = ()
def get_url(self):
return django.urls.reverse(
'admin:django_adminstats_query_autocomplete', args=[''])
def optgroups(self, name, value, attrs=None):
"""Return a list of optgroups for this widget."""
subgroup = []
# just show the existing selected options, everything they can get
# from select2
for part in value:
subgroup.append(self.create_option(
name, part, part, True, 0,
subindex=None, attrs=attrs,
))
return [(None, subgroup, 0)]
def value_omitted_from_data(self, data, files, name):
# this is required otherwise django won't save blank values
return False
def format_value(self, value):
"""Return selected values as a list."""
if value is None:
return []
return [s for s in value.split('&') if s != '']
def value_from_datadict(self, data, _files, name):
return '&'.join(data.getlist(name))
def build_attrs(self, base_attrs, extra_attrs=None):
attrs = super().build_attrs(base_attrs, extra_attrs)
attrs['data-token-separator'] = '&'
attrs['data-select-on-close'] = 'true'
attrs['data-ajax--delay'] = '250'
return attrs
class FilterAutocomplete(QueryAutocomplete):
def get_url(self):
return django.urls.reverse(
'admin:django_adminstats_filter_autocomplete', args=[''])
class CriteriaForm(forms.ModelForm):
stats_key = forms.ChoiceField(choices=registry.REGISTRY.choices())
class Meta:
model = models.Criteria
fields = ['stats_key', 'filter_query', 'group_query', 'axis_query']
widgets = {
'filter_query': FilterAutocomplete,
'group_query': QueryAutocomplete,
'axis_query': QueryAutocomplete,
}
class CriteriaInline(admin.TabularInline):
form = CriteriaForm
......@@ -69,6 +174,14 @@ class ChartAdmin(admin.ModelAdmin):
r'^(?P<chart_id>\w+)/chart$',
self.admin_site.admin_view(self.view_chart),
name='django_adminstats_chart'),
url(
r'^filter_autocomplete/(?P<stats_key>.*)$',
self.filter_autocomplete_view,
name='django_adminstats_filter_autocomplete'),
url(
r'^query_autocomplete/(?P<stats_key>.*)$',
self.query_autocomplete_view,
name='django_adminstats_query_autocomplete'),
] + super().get_urls()
@add_description(_('Actions'))
......@@ -77,12 +190,20 @@ class ChartAdmin(admin.ModelAdmin):
'<a href="{url}">{text}</a>',
text=_('Show Chart'), url='{}/chart'.format(obj.pk))
@staticmethod
def query_autocomplete_view(request, stats_key):
return QueryAutocompleteJsonView.as_view(stats_key=stats_key)(request)
@staticmethod
def filter_autocomplete_view(request, stats_key):
return FilterAutocompleteJsonView.as_view(stats_key=stats_key)(request)
def view_chart(self, request, chart_id):
if not self.has_change_permission(request):
raise django.core.exceptions.PermissionDenied
return django.http.HttpResponseForbidden()
chart = self.get_object(request, chart_id) # type: models.Chart
if chart is None:
raise django.http.Http404()
return django.http.HttpResponseNotFound()
if request.method != 'GET':
return django.http.HttpResponseNotAllowed(('GET',))
......
# Generated by Django 2.1.3 on 2018-11-23 22:58
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('django_adminstats', '0002_filters'),
]
operations = [
migrations.RenameField(
model_name='criteria',
old_name='axis_spec',
new_name='axis_query',
),
migrations.RenameField(
model_name='criteria',
old_name='filter_spec',
new_name='filter_query',
),
migrations.RenameField(
model_name='criteria',
old_name='group_spec',
new_name='group_query',
),
]
......@@ -89,9 +89,9 @@ class Criteria(models.Model):
chart = models.ForeignKey(
Chart, on_delete=models.CASCADE, related_name='criteria')
stats_key = models.CharField(max_length=200)
filter_spec = models.TextField(blank=True, default='')
axis_spec = models.TextField(blank=True, default='')
group_spec = models.TextField(blank=True, default='')
filter_query = models.TextField(blank=True, default='')
group_query = models.TextField(blank=True, default='')
axis_query = models.TextField(blank=True, default='')
def __str__(self):
return self.stats_key
import inspect
import re
import urllib.parse
from typing import Mapping, List, Any
import collections
import django.db.models as db_models
import django.db.models.aggregates as db_aggregates
......@@ -25,8 +29,6 @@ class QuerySpec:
query.filter(group__name__contains='Foo Bar')
``group__users:count__lt=6``
``query.annotate(x=Count(group__users)).filter(x__lt=6)``
``group__users__isnull``
``query.annotate(x=Count(group__users)).filter(x__lt=6)``
Invalid text strings
......@@ -38,22 +40,6 @@ class QuerySpec:
"""
# nb. make range work right
FIELD_LOOKUPS = {'exact', 'iexact', 'contains', 'icontains', 'in', 'gt',
'gte', 'lt', 'lte', 'startswith', 'endswith', 'range',
'date', 'month', 'day', 'week', 'weekday', 'quarter',
'time', 'hour', 'minute', 'second', 'isnull', 'regex',
'iregex'}
FUNCS = {
'avg': db_aggregates.Avg,
'count': db_aggregates.Count,
'max': db_aggregates.Max,
'min': db_aggregates.Min,
'stddev': db_aggregates.StdDev,
'sum': db_aggregates.Sum,
'variance': db_aggregates.Variance,
}
def __init__(self, axis_text: str, group_text: str, filter_text: str):
"""
:param axis_text: Text for the y axis query
......@@ -62,13 +48,13 @@ class QuerySpec:
"""
if not axis_text:
axis_text = 'id:count'
self.axis_parts = [QuerySpecPart(part, False)
self.axis_parts = [QuerySpecPart(part, is_filter=False)
for part in axis_text.split('&')
if part != '']
self.group_parts = [QuerySpecPart(part, False)
self.group_parts = [QuerySpecPart(part, is_filter=False)
for part in group_text.split('&')
if part != '']
self.filter_parts = [QuerySpecPart(part, True)
self.filter_parts = [QuerySpecPart(part, is_filter=True)
for part in filter_text.split('&')
if part != '']
......@@ -113,51 +99,46 @@ class QuerySpec:
class QuerySpecPart:
FUNCS = {
'avg': db_aggregates.Avg,
'count': db_aggregates.Count,
'max': db_aggregates.Max,
'min': db_aggregates.Min,
'stddev': db_aggregates.StdDev,
'sum': db_aggregates.Sum,
'variance': db_aggregates.Variance,
}
def __init__(self, text: str, is_filter: bool):
FUNCS = collections.OrderedDict((
('count', db_aggregates.Count),
('sum', db_aggregates.Sum),
('avg', db_aggregates.Avg),
('min', db_aggregates.Min),
('max', db_aggregates.Max),
('stddev', db_aggregates.StdDev),
('variance', db_aggregates.Variance),
))
FIELD_LOOKUPS = [
'exact', 'iexact', 'contains', 'icontains', 'in', 'gt', 'gte', 'lt',
'lte', 'startswith', 'endswith', 'range', 'date', 'month', 'day',
'week', 'weekday', 'quarter', 'time', 'hour', 'minute', 'second',
'isnull', 'regex', 'iregex']
def __init__(self, text: str, *, is_filter: bool):
self.is_filter = is_filter
if not is_filter:
if '=' in text:
raise ValueError(
'Filters (=) not supported in {}'.format(text))
else:
col_func, self.value = urllib.parse.unquote(text), ''
parts = (text, '')
else:
if '=' not in text:
raise ValueError(
'Filters (=) are required in {}'.format(text))
else:
col_func, self.value = (urllib.parse.unquote(part) for part
in text.split('=', 1))
parts = text.split('=', 1) if '=' in text else (text, '')
col_func, self.value = (urllib.parse.unquote(p) for p in parts)
if ':' in col_func:
self.col, func = col_func.split(':', 1)
else:
self.col, func = col_func, ''
self.col, func = col_func, None
if '__' in func:
func_name, self.func_lookup = func.split('__', 1)
if func is None:
self.func_name, self.func_lookup = None, None
elif '__' in func:
self.func_name, self.func_lookup = func.split('__', 1)
else:
func_name, self.func_lookup = func, ''
self.func_name, self.func_lookup = func, None
if func_name == '':
self.func = None
elif func_name not in self.FUNCS:
raise ValueError('Unrecognized function {}', func_name)
else:
self.func = self.FUNCS[func_name]
if is_filter and self.func_lookup:
raise ValueError(
'Function lookups aren\'t allowed in filter queries')
@property
def func(self):
if self.func_name not in self.FUNCS:
return None
return self.FUNCS[self.func_name]
def expression_col(self):
"""Used with not filter lookups, gets the expression to query.
......@@ -167,3 +148,91 @@ class QuerySpecPart:
"""
exp = self.func if self.func else db_models.F
return exp(self.col)
@staticmethod
def _model_options(model: type) -> Mapping[str, db_models.Field]:
meta = getattr(model, '_meta')
return getattr(meta, '_forward_fields_map').copy()
# note: _forward_fields_map is a dict, which currently (in Python
# 3.6 & 3.7) maintains insertion order, If this is no longer the
# case in the future, we can get the fields with
# return collections.OrderedDict((f.name, f) for f in meta.fields)
# but that won't include the *_id fields :-/
@staticmethod
def _field_options(field: db_models.Field) -> Mapping[str, Any]:
return field.get_lookups()
@staticmethod
def _is_field_or_lookup(obj: Any) -> bool:
return (issubclass(obj, db_models.Lookup)
if inspect.isclass(obj) else
isinstance(obj, db_models.Field))
def options(self, model: type)-> List[str]:
"""Given a filter string, return the next options"""
options = self._model_options(model)
query_startswith = ''
top_key = ''
last_key = ''
last_obj = model
for match in re.finditer(r'(^|__)(\w*?)(?=__|$)', self.col):
# add last item
part = match.group(2)
if part == '':
continue
elif part in options:
last_obj = options[part]
last_key = self.col[:match.end()]
query_startswith = ''
key = self.col[:match.end()]
top_key = key
if isinstance(last_obj, db_models.Field):
if last_obj.related_model is not None:
last_obj = last_obj.related_model
options = self._model_options(last_obj)
elif self.is_filter:
options = self._field_options(last_obj)
else:
options = {}
else:
options = {}
continue
query_startswith = part
break
result = []
if top_key != '':
result = [top_key]
if self.value == '':
if query_startswith == '':
if self._is_field_or_lookup(last_obj):
if self.func_name is None:
result.append(last_key + ':count')
else:
result += [
last_key + ':' + fn for fn in self.FUNCS.keys()
if fn.startswith(self.func_name)]
duplicate_keys = set()
# clear out duplicate fkey_id field
for key, field in options.items():
if (getattr(field, 'related_model', None) is not None
and field.name != key
and options[field.name] is field):
duplicate_keys.add(key)
for key in duplicate_keys:
del options[key]
keys = options.keys()
if query_startswith != '':
keys = (key for key in keys
if key.startswith(query_startswith))
if last_key != '':
keys = (last_key + '__' + key for key in keys)
result = result + list(keys)
# see if we need to add query stuff
if self.is_filter:
result = [item + '=' + self.value for item in result]
return result
......@@ -61,8 +61,8 @@ class Registration(metaclass=abc.ABCMeta):
qs, x_annotations=x_annotations, x_value=x_value)
def get_data(self, start: date, end: date,
period_step: Step, axis_text: str, group_text: str,
filter_text: str) -> typing.Mapping[
period_step: Step, filter_text: str, group_text: str,
axis_text: str) -> typing.Mapping[
str, typing.Mapping[date, str]]:
query_spec = QuerySpec(axis_text=axis_text, group_text=group_text,
filter_text=filter_text)
......@@ -80,6 +80,10 @@ class Registration(metaclass=abc.ABCMeta):
result[group_key][item['_django_adminstats_x']] = value
return result
@abc.abstractmethod
def query_options(self, text: str, *, is_filter: bool = False) -> list:
"""Given a filter string, return the next options"""
class Registry(collections.OrderedDict):
......@@ -100,5 +104,5 @@ class Registry(collections.OrderedDict):
stats = self[criteria.stats_key]
start, end = criteria.chart.span()
return stats.get_data(start, end, Step(criteria.chart.period_step),
criteria.axis_spec, criteria.group_spec,
criteria.filter_spec)
criteria.filter_query, criteria.group_query,
criteria.axis_query)
from . import Registration
from django_adminstats import qs
class ModelRegistration(Registration):
......@@ -18,27 +19,7 @@ class ModelRegistration(Registration):
def label(self):
return self.meta.verbose_name_plural.title()
def _filter(self, cls):
"""Given an individual filter token and a model, find the next model"""
# first check if _meta (model)
meta = getattr(cls, '_meta', None)
if meta is not None:
ff = getattr(meta, '_forward_fields_map', None)
if ff is not None:
return ff
return dict((f.name, f) for f in meta.fields)
# check if it's a ForeignKey or something
related = getattr(cls, 'related_model', None)
if related is not None:
return self._filter(related)
raise NotImplementedError
def filter_options(self, text: str = '') -> set:
def query_options(self, text: str, *, is_filter: bool = False) -> list:
"""Given a filter string, return the next options"""
cls = self.model
options = self._filter(cls)
for part in text.split('__'):
if part in options:
cls = options[part]
options = self._filter(cls)
return set(options.keys())
part = qs.QuerySpecPart(text, is_filter=is_filter)
return part.options(self.model)
......@@ -8,7 +8,7 @@ import trackstats.models
from django.contrib.contenttypes.models import ContentType
from django_adminstats import Step
from django_adminstats.qs import QuerySpec
from django_adminstats import qs
from .base import Registration
......@@ -101,7 +101,7 @@ class MetricRegistration(Registration):
return self.get_queryset().filter(**kwargs)
def query(self, start: date, end: date,
period_step: Step, query_spec: QuerySpec):
period_step: Step, query_spec: qs.QuerySpec):
qs = self.get_span_queryset(start, end, period_step)
if self.model:
x_annotations, x_value = self.get_x_parameters(period_step)
......@@ -112,11 +112,18 @@ class MetricRegistration(Registration):
qs, x_annotations=x_annotations, x_value=x_value)
def get_data(self, start: date, end: date,
period_step: Step, axis_text: str, group_text: str,
filter_text: str) -> Mapping[str, Mapping[date, str]]:
period_step: Step, filter_text: str, group_text: str,
axis_text: str) -> Mapping[str, Mapping[date, str]]:
if axis_text != '':
raise ValueError('Axis not supported for tracstats')
if not group_text:
group_text = 'object_id' if self.model else ''
return super().get_data(start, end, period_step, 'value',
group_text, filter_text)
return super().get_data(start, end, period_step, filter_text,
group_text, 'value')
def query_options(self, text: str, *, is_filter: bool = False) -> list:
"""Given a filter string, return the next options"""
if self.model is None:
return []
part = qs.QuerySpecPart(text, is_filter=is_filter)
return part.options(self.model)
......@@ -17,9 +17,48 @@
findFieldRow('until_date').toggle(opt.value == 's');
}
function changeStats(origin) {
var row = origin.parentNode.parentNode;
var queryFieldNames = ['filter', 'group', 'axis'];
for (var idx=0; idx<queryFieldNames.length; idx++) {
var select = row.querySelector(
'.field-'+queryFieldNames[idx]+'_query select');
if (select.dataset['ajax-OrigUrl'] == undefined) {
select.dataset['ajax-OrigUrl'] = select.dataset['ajax-Url'];
}
// need to specify a baseURL, but we don't use it
var url = new URL(select.dataset['ajax-OrigUrl'],
'https://example/');
select.dataset['ajax-Url'] = (url.pathname + origin.value);
// we need to reinitialize the jquery plugin so it recognizes
// the new settings
$(select).djangoAdminSelect2().on("select2:unselecting",
// work-around for issue #3320 in select2
function (e) {
// make sure we are on the list and not within input box
if (e.params._type === 'unselecting') {
$(this).val([]).trigger('change');
e.preventDefault();
}
});
}
}
document.addEventListener('DOMContentLoaded', function() {
untilTypeField = findField('until_type', 'select');
// Hide Until date if Until type is not "Specific Date"
var untilTypeField = findField('until_type', 'select');
untilTypeField.on('change', function() {selectUntilType(this)});
selectUntilType(untilTypeField[0]);
// go through criteria entries and set up autocomplete URLs
var criteriaRows = document.querySelectorAll(
'#criteria-group .form-row');
for (var idx=0; idx<criteriaRows.length; idx++) {
var select = criteriaRows[0].querySelector(
'.field-stats_key select');
changeStats(select);
select.addEventListener(
'change', function(event) { changeStats(this); });
}
});
})(django.jQuery)
import django.http
from django.contrib.admin import AdminSite
from django.test import TestCase
import django_adminstats.admin
import django_adminstats.models
import django_adminstats.registry
class MockSuperUser:
class MockAnon:
is_authenticated = False
is_active = False
is_staff = False
@staticmethod
def has_perm(_perm):
return False
class MockSuperUser:
is_authenticated = True
is_active = True
is_staff = True
......@@ -54,3 +66,71 @@ class AdminTests(TestCase):
response = self.admin.view_chart(request, str(obj.id))
assert response.status_code == 200
assert '<td>3</td>' in response.rendered_content
def test_chart404(self):
request = MockRequest('GET', {})
response = self.admin.view_chart(request, '404')
assert response.status_code == 404
def test_chart403(self):
request = MockRequest('GET', {})
request.user = MockAnon()
response = self.admin.view_chart(request, '2')
assert response.status_code == 403
def test_chart405(self):
request = MockRequest('POST', {})
response = self.admin.view_chart(request, '2')
assert response.status_code == 405
def test_chart403(self):
request = MockRequest('GET', {})
request.user = MockAnon()
response = self.admin.view_chart(request, '2')
assert response.status_code == 403
def test_autocomplete_view_perm():
stats_key = list(django_adminstats.registry.REGISTRY.keys())[0]
su_request = MockRequest('GET', {})
anon_request = MockRequest('GET', {})
anon_request.user = MockAnon()
view_func = django_adminstats.admin.ChartAdmin.query_autocomplete_view
view = django_adminstats.admin.QueryAutocompleteJsonView()
# anon user should get 403
assert not view.has_perm(anon_request)
assert view_func(anon_request, stats_key).status_code == 403
# admin user should not
assert view.has_perm(su_request)
assert view_func(su_request, stats_key).status_code == 200
def test_autocomplete_format_value():
widget = django_adminstats.admin.QueryAutocomplete()
assert ['foo', 'bar=baz'] == widget.format_value('foo&bar=baz')