# -*- coding: utf-8 -*-
#
# AWL simulator - common utility functions
#
# Copyright 2012-2016 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import division, absolute_import, print_function, unicode_literals
from awlsim.common.compat import *

from awlsim.common.enumeration import *
from awlsim.common.exceptions import *

import sys
import os
import errno
import random
import base64
import binascii
import functools
import itertools


class Logging(object):
	"""Global logging facility.

	The log level and the line prefix are kept in class attributes
	and every method is a classmethod, so the class is used directly
	and never needs to be instantiated.
	"""

	# Log level constants.
	# The print*() methods compare levels with ">=".
	# NOTE(review): this assumes that EnumGen hands out ascending
	# values in declaration order -- confirm in awlsim.common.enumeration.
	EnumGen.start
	LOG_NONE	= EnumGen.item
	LOG_ERROR	= EnumGen.item
	LOG_WARNING	= EnumGen.item
	LOG_INFO	= EnumGen.item
	LOG_VERBOSE	= EnumGen.item
	LOG_DEBUG	= EnumGen.item
	EnumGen.end

	# The currently active log level.
	loglevel = LOG_INFO
	# String written in front of every message. Empty: no prefix.
	prefix = ""

	@classmethod
	def setLoglevel(cls, loglevel):
		"""Set the active log level.
		Raises AwlSimError, if loglevel is not one of the LOG_... constants.
		"""
		validLevels = (cls.LOG_NONE,
			       cls.LOG_ERROR,
			       cls.LOG_WARNING,
			       cls.LOG_INFO,
			       cls.LOG_VERBOSE,
			       cls.LOG_DEBUG)
		if loglevel not in validLevels:
			# Format with %s instead of %d, so that a non-integer
			# value is reported as AwlSimError, too, instead of
			# failing with a TypeError in the string formatting.
			raise AwlSimError("Invalid log level '%s'" % (loglevel,))
		cls.loglevel = loglevel

	@classmethod
	def setPrefix(cls, prefix):
		"""Set the string that is written in front of every message.
		"""
		cls.prefix = prefix

	@classmethod
	def __print(cls, stream, text):
		"""Write the prefix (if any), the text and a newline to stream
		and flush the stream.
		Does nothing, if stream is false (for example None).
		"""
		# A RuntimeError raised by the stream is deliberately ignored.
		# NOTE(review): The reason for this is not visible in this file.
		with contextlib.suppress(RuntimeError):
			if stream:
				if cls.prefix:
					stream.write(cls.prefix)
				stream.write(text)
				stream.write("\n")
				stream.flush()

	@classmethod
	def printDebug(cls, text):
		"""Write text to stdout, if the log level is at least LOG_DEBUG.
		"""
		if cls.loglevel >= cls.LOG_DEBUG:
			cls.__print(sys.stdout, text)

	@classmethod
	def printVerbose(cls, text):
		"""Write text to stdout, if the log level is at least LOG_VERBOSE.
		"""
		if cls.loglevel >= cls.LOG_VERBOSE:
			cls.__print(sys.stdout, text)

	@classmethod
	def printInfo(cls, text):
		"""Write text to stdout, if the log level is at least LOG_INFO.
		"""
		if cls.loglevel >= cls.LOG_INFO:
			cls.__print(sys.stdout, text)

	@classmethod
	def printWarning(cls, text):
		"""Write text to stderr, if the log level is at least LOG_WARNING.
		"""
		if cls.loglevel >= cls.LOG_WARNING:
			cls.__print(sys.stderr, text)

	@classmethod
	def printError(cls, text):
		"""Write text to stderr, if the log level is at least LOG_ERROR.
		"""
		if cls.loglevel >= cls.LOG_ERROR:
			cls.__print(sys.stderr, text)

def printDebug(text):
	"""Module level shortcut for Logging.printDebug(text).
	"""
	Logging.printDebug(text)

def printVerbose(text):
	"""Module level shortcut for Logging.printVerbose(text).
	"""
	Logging.printVerbose(text)

def printInfo(text):
	"""Module level shortcut for Logging.printInfo(text).
	"""
	Logging.printInfo(text)

def printWarning(text):
	"""Module level shortcut for Logging.printWarning(text).
	"""
	Logging.printWarning(text)

def printError(text):
	"""Module level shortcut for Logging.printError(text).
	"""
	Logging.printError(text)

def fileExists(filename):
	"""Check whether a file exists by calling os.stat() on it.
	Returns True, if os.stat() succeeded.
	Returns False, if os.stat() failed with ENOENT (no such file).
	Returns None, if os.stat() failed with any other OSError.
	"""
	try:
		os.stat(filename)
	except OSError as statError:
		# Only "no such file" is a definite negative answer.
		# Everything else leaves the question undecided.
		return False if statError.errno == errno.ENOENT else None
	else:
		return True

def awlFileRead(filename, encoding="latin_1"):
	"""Read the complete content of a file.
	The file is always opened in binary mode.
	If encoding is "binary", the raw bytes are returned.
	Otherwise the bytes are decoded with the given encoding
	and the resulting string is returned.
	Raises AwlParserError, if reading or decoding failed.
	"""
	try:
		# The with-statement closes the file handle on failure, too.
		with open(filename, "rb") as fd:
			data = fd.read()
		if encoding != "binary":
			data = data.decode(encoding)
	except (IOError, UnicodeError) as e:
		raise AwlParserError("Failed to read '%s': %s" %\
			(filename, str(e)))
	return data

def awlFileWrite(filename, data, encoding="latin_1"):
	"""Write data to a file by writing a temporary file next to it
	and then renaming the temporary file to filename.
	If encoding is "binary", data is written as is.
	Otherwise the line endings of data are converted to "\\r\\n",
	a trailing "\\r\\n" is ensured and the result is encoded
	with the given encoding before writing.
	Raises AwlParserError, if no unused temporary file name was found
	or if encoding, writing or renaming failed.
	"""
	if encoding != "binary":
		data = "\r\n".join(data.splitlines()) + "\r\n"
	# Search for a temporary file name that is not in use, yet.
	for count in range(1000):
		tmpFile = "%s-%d-%d.tmp" %\
			(filename, random.randint(0, 0xFFFF), count)
		if not os.path.exists(tmpFile):
			break
	else:
		raise AwlParserError("Could not create temporary file")
	try:
		if encoding != "binary":
			data = data.encode(encoding)
		# The with-statement closes the file handle on failure, too,
		# so that the cleanup below can remove the temporary file.
		with open(tmpFile, "wb") as fd:
			fd.write(data)
			fd.flush()
		if not osIsPosix:
			# Can't use safe rename on non-POSIX.
			# Must unlink first.
			with contextlib.suppress(OSError):
				os.unlink(filename)
		os.rename(tmpFile, filename)
	except (IOError, OSError, UnicodeError) as e:
		raise AwlParserError("Failed to write file:\n" + str(e))
	finally:
		# Remove the temporary file, if it is still there.
		# After a successful rename it does not exist anymore
		# and the resulting error is ignored.
		with contextlib.suppress(IOError, OSError):
			os.unlink(tmpFile)
169

def str2bool(string, default=False):
	"""Convert a human readable string to a boolean.
	The string is compared case insensitively and with surrounding
	whitespace removed. Known words for true/false are checked first.
	Otherwise the string is parsed as a decimal integer, where
	nonzero means True. If that fails, too, default is returned.
	"""
	trueWords = ("true", "yes", "on", "enable", "enabled")
	falseWords = ("false", "no", "off", "disable", "disabled")

	normalized = string.lower().strip()
	if normalized in trueWords:
		return True
	if normalized in falseWords:
		return False
	try:
		number = int(normalized, 10)
	except ValueError:
		return default
	return bool(number)

def strToBase64(string, ignoreErrors=False):
	"""Encode a string as utf-8 and return the base64 representation
	of these bytes as an ascii string.
	If ignoreErrors is True, unencodable characters are dropped
	and an empty string is returned on any other error.
	If ignoreErrors is False, ValueError is raised on errors."""

	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		rawBytes = string.encode("utf-8", errorMode)
		encoded = base64.b64encode(rawBytes)
		return encoded.decode("ascii")
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""

def base64ToStr(b64String, ignoreErrors=False):
	"""Decode a base64 encoded ascii string and return the decoded
	bytes interpreted as an utf-8 string.
	If ignoreErrors is True, invalid characters are dropped
	and an empty string is returned on any other error.
	If ignoreErrors is False, ValueError is raised on errors."""

	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		asciiBytes = b64String.encode("ascii", errorMode)
		decoded = base64.b64decode(asciiBytes)
		return decoded.decode("utf-8", errorMode)
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""

def bytesToHexStr(_bytes):
	"""Convert bytes to a hex-string.
	Each byte is converted to two lower case hex digits.
	Returns None, if _bytes is None.
	"""
	if _bytes is None:
		return None
	return binascii.b2a_hex(_bytes).decode("ascii")

def envClearLang(env, lang = "C"):
	"""Reset the language settings of an environment dict
	to some expected value and return the result.
	The passed env is not modified. A copy is returned,
	with LANG set to lang and all other locale variables removed.
	"""
	env = dict(env)
	env["LANG"] = lang
	# LC_ALL must be removed, too. If it is set, it takes precedence
	# over LANG and over all the individual LC_... categories.
	for i in ("LANGUAGE", "LC_ALL", "LC_CTYPE", "LC_NUMERIC",
		  "LC_TIME", "LC_COLLATE", "LC_MONETARY",
		  "LC_MESSAGES", "LC_PAPER", "LC_NAME",
		  "LC_ADDRESS", "LC_TELEPHONE", "LC_MEASUREMENT",
		  "LC_IDENTIFICATION",):
		env.pop(i, None)
	return env

def isiterable(obj):
	"""Return True, if iter(obj) succeeds.
	Return False, if iter(obj) raises TypeError.
	"""
	try:
		iter(obj)
	except TypeError:
		return False
	return True