[debops.ansible_plugins] Test copy_id_from and restore *args handling

parent d6498a9f
......@@ -128,7 +128,7 @@ def _parse_kv_value(current_data, new_data, data_index, *args, **kwargs):
int(dict_element.get('id')) +
int(dict_element.get('weight')))
# Include any unknown keys
# Include any unknown keys.
for key in element.keys():
if key not in ['name', 'state', 'id', 'weight',
'real_weight', 'param',
......@@ -280,7 +280,7 @@ def parse_kv_config(*args, **kwargs):
return sorted(output, key=itemgetter('real_weight'))
def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
def parse_kv_items(*args, **kwargs):
"""Return a parsed list of with_items elements
Optional arguments:
......@@ -300,30 +300,18 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
list of keys in the dictionary that will be merged together. If not
specified, 'options' key instances will be merged by default.
"""
args = {}
kwargs = {}
if empty is None:
empty = {}
if defaults is None:
defaults = {}
if merge_keys is None:
merge_keys = set()
else:
merge_keys = set(merge_keys)
empty = kwargs.get('empty', {})
defaults = kwargs.get('defaults', {})
merge_keys = set(kwargs.get('merge_keys', set()))
merge_keys.add('options')
input_args = []
# input_args = []
parsed_config = {}
# Flatten the input list
# Flatten the input list.
for sublist in args:
for item in sublist:
input_args.append(item)
input_args.extend(sublist)
parsed_config = {}
for element_index, element in enumerate(input_args):
......@@ -337,7 +325,7 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
if element_state == 'append':
# In append mode, don't create new config entries
# In append mode, don't create new config entries.
if (param_name not in parsed_config or
parsed_config[param_name].get('state',
'present') == 'init'):
......@@ -367,10 +355,10 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
})
if 'copy_id_from' in element:
if element.get('copy_id_from') in parsed_config:
id_src = element.get('copy_id_from')
if element['copy_id_from'] in parsed_config:
id_src = element['copy_id_from']
current_param['id'] = (
int(parsed_config[id_src].get('id')) +
int(parsed_config[id_src]['id']) +
int(parsed_config[id_src].get('weight', 0)))
if 'weight' in element:
......@@ -380,13 +368,13 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
int(current_param.get('weight', 0)))
current_param['real_weight'] = (
int(current_param.get('id')) +
int(current_param.get('weight')))
int(current_param['id']) +
int(current_param['weight']))
if 'comment' in element:
current_param['comment'] = element.get('comment')
# Set any default keys defined for the filter
# Set any default keys defined for the filter.
for k, v in defaults.items():
current_param[k] = current_param.get(k, v)
......@@ -395,16 +383,14 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
current_options = current_param.get(key_name, [])
current_param[key_name] = parse_kv_config(
current_options + element.get(key_name),
merge_keys=kwargs.get('merge_keys'))
merge_keys=merge_keys)
known_keys = ['name', 'state', 'id', 'weight',
'real_weight', 'separator',
'comment', 'options']
if isinstance(kwargs.get('merge_keys'), list):
known_keys.extend(kwargs.get('merge_keys'))
# Include any unknown keys
# Include any unknown keys.
for unknown_key in element.keys():
if unknown_key not in known_keys:
current_param[unknown_key] = element.get(unknown_key)
......@@ -419,10 +405,10 @@ def parse_kv_items(input_args, empty=None, defaults=None, merge_keys=None):
current_param[key_to_set] = current_param[key_to_check]
break
parsed_config.update({param_name: current_param})
parsed_config[param_name] = current_param
# Expand the dictionary of configuration options into a list,
# and return sorted by weight
# and return sorted by weight.
output = []
for key, params in parsed_config.items():
if isinstance(params.get('value'), dict):
......@@ -603,8 +589,45 @@ if __name__ == '__main__':
assert_equal(items, expected_items)
def test_parse_kv_items(self):
def test_parse_kv_items_copy_id_from(self):
input_items = yaml.safe_load(textwrap.dedent('''
- name: 'second'
weight: 10
value: 'value1'
- name: 'first'
weight: -10
copy_id_from: 'second'
value: 'value2'
'''))
expected_items = yaml.safe_load(textwrap.dedent('''
- copy_id_from: second
id: 10
name: first
real_weight: 0
separator: false
state: present
value: value2
weight: -10
- id: 0
name: second
real_weight: 10
separator: false
state: present
value: value1
weight: 10
'''))
items = parse_kv_items(input_items)
# print(yaml.dump(items, default_flow_style=False))
# print(yaml.dump(expected_items, default_flow_style=False))
assert_equal(items, expected_items)
def test_parse_kv_items(self):
input_items1 = yaml.safe_load(textwrap.dedent('''
- name: 'should-stay-init'
options:
......@@ -666,8 +689,9 @@ if __name__ == '__main__':
state: 'init'
state: 'init'
'''))
input_items2 = yaml.safe_load(textwrap.dedent('''
- name: 'should-become-present3'
options:
......@@ -682,7 +706,6 @@ if __name__ == '__main__':
- name: 'external2'
value: 'test'
'''))
expected_items = yaml.safe_load(textwrap.dedent('''
......@@ -773,9 +796,11 @@ if __name__ == '__main__':
weight: 0
'''))
# print("----------------------------")
# print(yaml.dump(parse_kv_items(input_items), default_flow_style=False))
items = parse_kv_items(input_items1, input_items2)
assert_equal(parse_kv_items(input_items), expected_items)
# print(yaml.dump(items, default_flow_style=False))
# print(yaml.dump(expected_items, default_flow_style=False))
assert_equal(items, expected_items)
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment