service.py 3.07 KB
Newer Older
1
#!/usr/bin/env python3
Brad Cable's avatar
Brad Cable committed
2

3 4 5
# Sopeq: service.py
# License: GPLv2
# Author: Brad Cable
Brad Cable's avatar
Brad Cable committed
6

7
from iptables_helpers import *
8
from rule_generator import rule_generator
Brad Cable's avatar
Brad Cable committed
9

10
class service(rule_generator):
	"""Rule generator for a single named service in one traffic direction.

	After generate() has run, chains() holds the per-service chain
	("<name>_IN" or "<name>_OUT") and rules() holds the "-A ..." rules
	that jump into it, each stored as a ("filter", text) tuple.
	"""

	_controller = None
	direction = None
	service_name = None
	name = None
	_rule = None
	_log_prefix = None
	_rule_comment = None
	# Read by generate(); not assigned anywhere in this class -- presumably
	# set by rule_generator.generate() (TODO confirm).
	_forwarding = None
	_chains = None
	_rules = None
	# Lazily built comment suffix shared by every rule of this service.
	_cache_construct_comment = None

	def __init__(self,
		controller, direction, name, rule, log_prefix, rule_comment
	):
		"""Store the service definition; nothing is generated yet.

		controller   -- object whose log_level attribute is read when a
		                log prefix is configured
		direction    -- "in" or "out"
		name         -- bare service name, used to build the chain name
		rule         -- iptables match text inserted into every rule
		log_prefix   -- prefix handed to ipt_construct_log(), or None to
		                emit no logging rules
		rule_comment -- text handed to ipt_construct_comment(), or None to
		                emit no comment
		"""
		self._controller = controller
		self.direction = direction
		self.service_name = name
		# Public name carries the direction, e.g. "ssh:in".
		self.name = "{}:{}".format(name, direction)
		self._rule = rule
		self._log_prefix = log_prefix
		self._rule_comment = rule_comment

	def chains(self):
		"""Return the ("filter", chain) tuples from the last generate()."""
		return self._chains

	def rules(self):
		"""Return the ("filter", rule) tuples from the last generate()."""
		return self._rules

	def append_rule(self, rule):
		"""Append rule to the filter table, suffixed with the rule comment.

		The comment suffix is built once, on first use, and reused for
		every later rule. With no rule comment configured the suffix is
		empty and the rule is stored unchanged.
		"""
		# Build the suffix only while the cache is still unset. The check
		# used to be inverted, so the suffix was never built or appended.
		if self._cache_construct_comment is None:
			if self._rule_comment is not None:
				self._cache_construct_comment = \
					ipt_construct_comment(self._rule_comment)
			else:
				self._cache_construct_comment = ""

		self._rules.append(("filter", rule + self._cache_construct_comment))

	def generate(self, *args, **kwargs):
		"""Rebuild the chain and rule lists for this service.

		All arguments are forwarded unchanged to rule_generator.generate().
		An "out" service produces nothing when self._egress is falsy.

		Raises ValueError when direction is neither "in" nor "out".
		"""
		super().generate(*args, **kwargs)

		self._rules = []
		self._chains = []

		if self.direction == "in":
			new_chain = "{}_IN".format(self.service_name)
			direction_table = "INPUT"

		elif self.direction == "out":
			# _egress is not assigned in this class; presumably set by
			# rule_generator.generate() (TODO confirm).
			if not self._egress:
				return

			new_chain = "{}_OUT".format(self.service_name)
			direction_table = "OUTPUT"

		else:
			raise ValueError(
				"unknown direction {!r}; expected 'in' or 'out'".format(
					self.direction
				)
			)

		self._chains.append(("filter", new_chain))

		# Stays None when logging is disabled, so the forwarding branch
		# below can test it directly.
		log_append = None
		if self._log_prefix is not None:
			log_append = ipt_construct_log(
				self._controller.log_level, self._log_prefix
			)

			# Goes through append_rule() so it gets the same comment suffix
			# as the jump rules; this branch used to reference undefined
			# names and raised NameError.
			self.append_rule("-A {} {}{}".format(
				direction_table, self._rule, log_append
			))

		self.append_rule("-A {} {} -j {}".format(
			direction_table, self._rule, new_chain
		))

		if self._forwarding:
			if log_append is not None:
				# Same shape as the log rule above, so only traffic that
				# matches this service's rule is logged.
				self.append_rule("-A FORWARD {}{}".format(
					self._rule, log_append
				))

			self.append_rule(
				"-A FORWARD {} -j {}".format(self._rule, new_chain)
			)
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

if __name__ == "__main__":
	# Self-test: build the ssh service in each direction, with and without
	# forwarding, and compare the generated chains and rules.
	#
	# service() takes (controller, direction, name, rule, log_prefix,
	# rule_comment). No controller is needed here because it is only read
	# when a log prefix is set, and no prefix or comment is configured.
	#
	# NOTE(review): the "out" cases expect a chain to be produced, which
	# generate() only does when self._egress is truthy. How that flag gets
	# set is not visible from this file -- confirm against rule_generator.
	ssh = service(None, "in", "ssh", "-p tcp --dport 22", None, None)
	ssh.generate()
	assert ssh.chains() == [("filter", "ssh_IN")]
	assert ssh.rules() == [
		("filter", "-A INPUT -p tcp --dport 22 -j ssh_IN")
	]

	ssh = service(None, "out", "ssh", "-p tcp --sport 22", None, None)
	ssh.generate()
	assert ssh.chains() == [("filter", "ssh_OUT")]
	assert ssh.rules() == [
		("filter", "-A OUTPUT -p tcp --sport 22 -j ssh_OUT")
	]

	ssh = service(None, "in", "ssh", "-p tcp --dport 22", None, None)
	ssh.generate(forwarding=True)
	assert ssh.chains() == [("filter", "ssh_IN")]
	assert ssh.rules() == [
		("filter", "-A INPUT -p tcp --dport 22 -j ssh_IN"),
		("filter", "-A FORWARD -p tcp --dport 22 -j ssh_IN")
	]

	ssh = service(None, "out", "ssh", "-p tcp --sport 22", None, None)
	ssh.generate(forwarding=True)
	assert ssh.chains() == [("filter", "ssh_OUT")]
	assert ssh.rules() == [
		("filter", "-A OUTPUT -p tcp --sport 22 -j ssh_OUT"),
		("filter", "-A FORWARD -p tcp --sport 22 -j ssh_OUT")
	]

	print("All asserts passed.")