Commits (2)
......@@ -2,9 +2,12 @@
branch = True
omit =
exclude_lines =
exclude_lines =
pragma: no cover
def __repr__
......@@ -103,7 +103,7 @@ def _create_default_blocks(cfg):
name='Next tasks',
fmt='<m class=comment>{id: >4}</m> <m class=warning>{priority:<2}</m>{description} <m class=info>{tags:>}</m>', # pylint: disable=line-too-long
fmt=' {description}<sr beg=" "><m name=warning>{priority}</m></sr><sr beg=" "><m name=info>{tags:>}</m></sr>', # pylint: disable=line-too-long
......@@ -15,35 +15,104 @@
# You should have received a copy of the GNU General Public License
# along with TWC. If not, see <http://www.gnu.org/licenses/>.
'''Custom markup processor.
Each tag should be implemented as a separate class which has the following
1. implements process(data : <list, str, tuple>) -> <list, str, tuple>
2. defines static __tagname__ : str, which defines handled tag name
3. is added to _tag_types tuple
Markup element is either a string or a tuple of (potentially nested) of other
markup elements.
process() should accept either a list of markup elements or a single markup
element (string or tuple).
from collections import deque
from html.parser import HTMLParser
import attr
from twc.utils import eprint
from twc.locale import _
class _M:
'''Elements markup: <m name=text>'''
name = attr.ib('text')
__tagname__ = 'm'
def process(self, data):
# wrap only string and lists, because wrapping another tuple (another
# <m>) won't have any effect because URWID will only process the
# inner-most markup element. Plus my tests showed that URWID quickly
# starts bugging with markup nested multiple times.
if isinstance(data, (list, str)):
return (self.name, data)
return data
class _Sr:
'''Elements surround: <sr beg=[ end=]>'''
beg = attr.ib('')
end = attr.ib('')
__tagname__ = 'sr'
def process(self, data):
if isinstance(data, str):
return '{0}{1}{2}'.format(self.beg, data, self.end)
if isinstance(data, list):
return [self.beg] + data + [self.end]
return [self.beg, data, self.end]
_tag_types = (_M, _Sr)
_tag_handlers = {cls.__tagname__: cls for cls in _tag_types}
# pylint: disable=abstract-method
class Parser(HTMLParser):
'''Parser of task format markup'''
markup = attr.ib(factory=list)
_curr = attr.ib(None)
tags = attr.ib(factory=deque)
_data = attr.ib(None)
def __attrs_post_init__(self):
def handle_starttag(self, tag, attrs):
if tag == 'm':
self._curr = self._find_attr('class', attrs)
ctor = _tag_handlers.get(tag)
if ctor:
kwds = {}
eprint(_('Tag not supported: <{}>'.format(tag)))
def handle_endtag(self, tag):
if tag == 'm':
self._curr = None
ctor = _tag_handlers.get(tag)
if not ctor:
if not self.tags:
eprint(_('Unexpected end tag: </{}>'.format(tag)))
if not isinstance(self.tags[-1], ctor):
eprint(_('End tag in incorrect order: </{}>'.format(tag)))
def handle_data(self, data):
if self._curr is not None:
self.markup.append((self._curr, data))
for tag in reversed(self.tags):
data = tag.process(data)
if isinstance(data, list):
def _find_attr(self, searched, tag_attrs):
for ta in tag_attrs:
if searched == ta[0]:
return ta[1]
return None
# Copyright (C) 2019 Michał Góral.
# This file is part of TWC
# TWC is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# TWC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with TWC. If not, see <http://www.gnu.org/licenses/>.
'''Definition of Task wrapper for dictionaries returned by taskw'''
import weakref
class Task(dict):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._parent = None
self._children = []
def children(self):
return self._children
def parent(self):
if self._parent:
return self._parent()
return None
def parent(self, val):
self._parent = weakref.ref(val)
def depth(self):
i = 0
w = self
while w.parent:
i += 1
w = w.parent
return i
def add_child(self, child):
child.parent = self
......@@ -19,6 +19,9 @@
import shlex
import functools
from collections import OrderedDict, deque
from twc.task import Task
class TaskComparer:
......@@ -71,10 +74,15 @@ def _process_sort_string(sort_string):
return cmp
def export_tasks(tw, *query):
tasks = tw._get_task_objects(*query, 'export') # pylint: disable=protected-access
return [Task(**t) for t in tasks]
def filter_tasks(filter_string, tw):
'''Returns a list of tasks filtered by a given TW-compatible string'''
query = _process_filter(filter_string)
return tw._get_task_objects(*query, 'export') # pylint: disable=protected-access
return export_tasks(tw, *query)
def sort_tasks(tasks, sort_string):
......@@ -86,3 +94,31 @@ def sort_tasks(tasks, sort_string):
cmp = _process_sort_string(sort_string)
comparer = functools.partial(TaskComparer, cmp=cmp)
def group_tasks(tasks):
'''Groups tasks by creating parent-child relationship. This allows creating subtasks
even if TaskWarrior doesn't natively supports that.
Tasks are grouped by their `depends` field: parent tasks depend on
grouped = OrderedDict([(t['uuid'], t) for t in tasks])
for t in tasks:
deps = t.get('depends')
if deps:
for dep_uuid in deps.split(','):
dep = grouped.get(dep_uuid)
if dep:
del grouped[dep_uuid]
return grouped
def dfs(tasks):
'''Depth-first-search walk through tasks grouped by group_tasks()'''
stack = deque(tasks)
while stack:
task = stack.pop()
yield task
......@@ -57,19 +57,26 @@ class BlockView(urwid.BoxAdapter):
'''View of a single agenda block, i.e. a list of tasks'''
def __init__(self, screen, cfg, tw, block):
self.block = block
self.tasks = twutils.filter_tasks(block.filter, tw)
twutils.sort_tasks(self.tasks, block.sort)
tasks = twutils.filter_tasks(block.filter, tw)
twutils.sort_tasks(tasks, block.sort)
if block.limit is not None:
self.tasks = self.tasks[:block.limit]
tasks = tasks[:block.limit]
self.tasks = twutils.group_tasks(tasks)
focus_map = {c: '{}-focus'.format(c)
for c in cfg.colors if not c.endswith('-focus')}
focus_map[None] = 'text-focus'
textboxes = [urwid.AttrMap(TaskView(t, self.block.fmt),
None, focus_map)
for t in self.tasks]
textboxes = []
for task in twutils.dfs(list(self.tasks.values())):
indent = ' ' * task.depth * 2
fmt = indent + self.block.fmt
urwid.AttrMap(TaskView(task, fmt), None, focus_map))
lw = urwid.ListBox(urwid.SimpleFocusListWalker(textboxes))
lb = urwid.LineBox(lw, title=self.block.name)
import copy
import itertools
from collections import OrderedDict
import pytest
import twc.twutils as twutils
from twc.task import Task
def perms(*a, **kw):
return list(itertools.permutations(*a, **kw))
def _gen_tasks():
return perms([
Task(uuid='1', depends='2,3,4'), # p, task 3 doesn't exist
Task(uuid='2', depends='5'), # p, ch
Task(uuid='4'), # ch
Task(uuid='5', depends='6'), # ch, 2nd level
Task(uuid='6'), # ch, 3rd level
def _find_children_order(tasks, uuids):
i = 0
found = []
for t in tasks:
if t['uuid'] == str(uuids[i]):
i += 1
return found
def tasks(request):
return request.param
def test_grouping(tasks):
tasks = copy.deepcopy(tasks)
td = {t['uuid']: t for t in tasks}
grouped = twutils.group_tasks(tasks)
assert tuple(grouped.keys()) in perms(['0', '1'])
assert td['0'].depth == 0
assert td['0'].parent is None
assert not td['0'].children
assert td['1'].depth == 0
assert td['1'].parent is None
assert tuple(td['1'].children) in perms([td['2'], td['4']])
assert td['2'].depth == 1
assert td['2'].parent is td['1']
assert td['2'].children == [td['5']]
assert td['4'].depth == 1
assert td['4'].parent is td['1']
assert not td['4'].children
assert td['5'].depth == 2
assert td['5'].parent is td['2']
assert td['5'].children == [td['6']]
assert td['6'].depth == 3
assert td['6'].parent is td['5']
assert not td['6'].children
......@@ -6,13 +6,60 @@ import twc.markup as markup
@pytest.mark.parametrize('fmt,expected', [
('', []),
('foo bar', ['foo bar']),
('foo <m class=highlight>bar baz</m> blah',
('foo <m name=highlight>bar baz</m> blah',
['foo ', ('highlight', 'bar baz'), ' blah']),
('foo <m class="highlight">bar baz</m> blah',
('foo <m name="highlight">bar baz</m> blah',
['foo ', ('highlight', 'bar baz'), ' blah']),
('foo <sr beg=, end=?>bar </sr> baz', ['foo ', ',bar ?', ' baz'])
def test_markup(fmt, expected):
p = markup.Parser()
assert p.markup == expected
@pytest.mark.parametrize('fmt,expected', [
('<m>a</m>', [('text', 'a')]),
('<sr>a</sr>', ['a']),
def test_default_attrs(fmt, expected):
p = markup.Parser()
assert p.markup == expected
@pytest.mark.parametrize('fmt', [
def test_empty(fmt):
p = markup.Parser()
assert not p.markup
@pytest.mark.parametrize('fmt,expected', [
('<sr beg=[ end=]><sr beg=[ end=]><m>foo</m></sr></sr>',
['[', '[', ('text', 'foo'), ']', ']']),
('<m><sr beg=[ end=]>foo</sr></m> bar', [('text', '[foo]'), ' bar']),
('<m><m><sr beg=[ end=]>foo</sr></m> bar</m>',
[('text', '[foo]'), ('text', ' bar')]),
('<sr beg=[ end=]><m>foo</m></sr>', ['[', ('text', 'foo'), ']']),
('<sr><m name=out><sr beg=; end=;><m name=in>foo</m></sr></m></sr>',
['', ('out', [';', ('in', 'foo'), ';']), ''])
def test_nested(fmt, expected):
p = markup.Parser()
assert p.markup == expected