# -*- coding: utf-8 -*-
#
# AWL simulator - common utility functions
#
# Copyright 2012-2017 Michael Buesch <m@bues.ch>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import division, absolute_import, print_function, unicode_literals
from awlsim.common.compat import *

from awlsim.common.enumeration import *
from awlsim.common.exceptions import *

import sys
import os
import errno
import random
import base64
import binascii
import functools
import itertools


class Logging(object):
	"""Global log output control.

	The log level and the line prefix are stored on the class itself.
	All methods are classmethods; no instance is required.
	Debug, verbose and info messages are written to sys.stdout.
	Warning and error messages are written to sys.stderr.
	"""

	# Log levels.
	# NOTE(review): The print methods compare levels with '>=', so the
	# values generated by EnumGen are presumably ascending numbers in
	# the order listed here -- confirm against EnumGen.
	EnumGen.start
	LOG_NONE	= EnumGen.item
	LOG_ERROR	= EnumGen.item
	LOG_WARNING	= EnumGen.item
	LOG_INFO	= EnumGen.item
	LOG_VERBOSE	= EnumGen.item
	LOG_DEBUG	= EnumGen.item
	EnumGen.end

	# The currently active log level.
	loglevel = LOG_INFO
	# Text that is written in front of every message (if not empty).
	prefix = ""

	@classmethod
	def setLoglevel(cls, loglevel):
		"""Set the active log level.
		loglevel must be one of the LOG_... constants.
		Raises AwlSimError, if loglevel is not a valid log level.
		"""
		if loglevel not in (cls.LOG_NONE,
				    cls.LOG_ERROR,
				    cls.LOG_WARNING,
				    cls.LOG_INFO,
				    cls.LOG_VERBOSE,
				    cls.LOG_DEBUG):
			# Format via str() and %s instead of %d, so that a
			# non-numeric value is reported as AwlSimError
			# instead of failing with TypeError in the formatting.
			raise AwlSimError("Invalid log level '%s'" % str(loglevel))
		cls.loglevel = loglevel

	@classmethod
	def setPrefix(cls, prefix):
		"""Set the text that is written in front of every message.
		"""
		cls.prefix = prefix

	@classmethod
	def __print(cls, stream, text):
		"""Write the prefix (if any), text and a newline to stream
		and flush the stream.
		Nothing is written, if stream evaluates to False.
		A RuntimeError raised while writing is silently ignored.
		"""
		with contextlib.suppress(RuntimeError):
			if stream:
				if cls.prefix:
					stream.write(cls.prefix)
				stream.write(text)
				stream.write("\n")
				stream.flush()

	@classmethod
	def printDebug(cls, text):
		"""Write text to stdout, if the log level is at least LOG_DEBUG.
		"""
		if cls.loglevel >= cls.LOG_DEBUG:
			cls.__print(sys.stdout, text)

	@classmethod
	def printVerbose(cls, text):
		"""Write text to stdout, if the log level is at least LOG_VERBOSE.
		"""
		if cls.loglevel >= cls.LOG_VERBOSE:
			cls.__print(sys.stdout, text)

	@classmethod
	def printInfo(cls, text):
		"""Write text to stdout, if the log level is at least LOG_INFO.
		"""
		if cls.loglevel >= cls.LOG_INFO:
			cls.__print(sys.stdout, text)

	@classmethod
	def printWarning(cls, text):
		"""Write text to stderr, if the log level is at least LOG_WARNING.
		"""
		if cls.loglevel >= cls.LOG_WARNING:
			cls.__print(sys.stderr, text)

	@classmethod
	def printError(cls, text):
		"""Write text to stderr, if the log level is at least LOG_ERROR.
		"""
		if cls.loglevel >= cls.LOG_ERROR:
			cls.__print(sys.stderr, text)

def printDebug(text):
	"""Print a debug message.
	This is a module level shortcut for Logging.printDebug().
	"""
	Logging.printDebug(text)

104 105 106
def printVerbose(text):
	"""Print a verbose message.
	This is a module level shortcut for Logging.printVerbose().
	"""
	Logging.printVerbose(text)

107 108 109
def printInfo(text):
	"""Print an informational message.
	This is a module level shortcut for Logging.printInfo().
	"""
	Logging.printInfo(text)

110 111 112
def printWarning(text):
	"""Print a warning message.
	This is a module level shortcut for Logging.printWarning().
	"""
	Logging.printWarning(text)

113 114 115
def printError(text):
	"""Print an error message.
	This is a module level shortcut for Logging.printError().
	"""
	Logging.printError(text)

Michael Büsch's avatar
Michael Büsch committed
116 117 118 119 120 121 122 123 124 125 126 127 128
def fileExists(filename):
	"""Check whether a file exists.
	The check is done by calling os.stat() on filename.

	Returns True, if os.stat() succeeds.
	Returns False, if os.stat() fails with ENOENT.
	Returns None, if os.stat() fails with any other OSError.
	"""
	try:
		os.stat(filename)
	except OSError as statError:
		# Only "no such file or directory" is a definite "no".
		# Any other failure means that we cannot tell.
		return False if statError.errno == errno.ENOENT else None
	else:
		return True

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
def awlFileRead(filename, encoding="latin_1"):
	"""Read the complete content of the file 'filename'.

	If encoding is "binary", the raw bytes are returned.
	Otherwise the bytes are decoded with 'encoding' and
	the resulting text string is returned.

	Raises AwlParserError, if reading or decoding failed.
	"""
	try:
		# The 'with' statement closes the file in any case,
		# even if reading the data fails.
		with open(filename, "rb") as fd:
			data = fd.read()
		if encoding != "binary":
			data = data.decode(encoding)
	except (IOError, UnicodeError) as e:
		raise AwlParserError("Failed to read '%s': %s" %\
			(filename, str(e)))
	return data

def awlFileWrite(filename, data, encoding="latin_1"):
	"""Write data to the file 'filename'.

	If encoding is "binary", data is written unmodified.
	Otherwise data is expected to be a text string. Its line endings
	are converted to CR/LF, a trailing CR/LF is appended and the
	result is encoded with 'encoding' before writing.

	The data is first written to a temporary file, whose name is
	derived from 'filename'. The temporary file is then renamed
	to 'filename'.

	Raises AwlParserError, if no unused temporary file name was found
	or if writing the file failed.
	"""
	if encoding != "binary":
		data = "\r\n".join(data.splitlines()) + "\r\n"
	# Search for a temporary file name that is not in use, yet.
	for count in range(1000):
		tmpFile = "%s-%d-%d.tmp" %\
			(filename, random.randint(0, 0xFFFF), count)
		if not os.path.exists(tmpFile):
			break
	else:
		raise AwlParserError("Could not create temporary file")
	try:
		# The 'with' statement closes the temporary file in any case,
		# even if encoding or writing the data fails.
		with open(tmpFile, "wb") as fd:
			if encoding != "binary":
				data = data.encode(encoding)
			fd.write(data)
			fd.flush()
		if not osIsPosix:
			# Can't use safe rename on non-POSIX.
			# Must unlink first.
			with contextlib.suppress(OSError):
				os.unlink(filename)
		os.rename(tmpFile, filename)
	except (IOError, OSError, UnicodeError) as e:
		raise AwlParserError("Failed to write file:\n" + str(e))
	finally:
		# Remove the temporary file, if it still exists.
		# After a successful rename this unlink fails and
		# the error is ignored.
		with contextlib.suppress(IOError, OSError):
			os.unlink(tmpFile)

Michael Büsch's avatar
Michael Büsch committed
170 171 172 173 174 175 176 177 178 179 180 181 182
def str2bool(string, default=False):
	"""Convert a human readable string to a boolean.

	The string is converted to lower case and surrounding
	whitespace is removed before it is interpreted.
	Besides the known words for True and False, decimal integers
	are accepted. Zero means False, any other number means True.
	Returns default, if the string cannot be interpreted.
	"""
	normalized = string.lower().strip()
	if normalized in ("true", "yes", "on", "enable", "enabled"):
		return True
	if normalized in ("false", "no", "off", "disable", "disabled"):
		return False
	try:
		number = int(normalized, 10)
	except ValueError:
		return default
	return bool(number)

183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
def strToBase64(string, ignoreErrors=False):
	"""Encode a string as utf-8 and return the base64
	representation of the result as ascii string.

	If ignoreErrors is False, ValueError is raised on errors.
	If ignoreErrors is True, characters that cannot be encoded
	are dropped and an empty string is returned on other errors.
	"""
	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		rawBytes = string.encode("utf-8", errorMode)
		encoded = base64.b64encode(rawBytes)
		return encoded.decode("ascii")
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""

def base64ToStr(b64String, ignoreErrors=False):
	"""Decode a base64 encoded ascii string and return
	the decoded data as string. The data is interpreted as utf-8.

	If ignoreErrors is False, ValueError is raised on errors.
	If ignoreErrors is True, characters that cannot be converted
	are dropped and an empty string is returned on other errors.
	"""
	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		asciiBytes = b64String.encode("ascii", errorMode)
		rawBytes = base64.b64decode(asciiBytes)
		return rawBytes.decode("utf-8", errorMode)
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""
Michael Büsch's avatar
Michael Büsch committed
208 209 210 211

def bytesToHexStr(_bytes):
	"""Convert bytes to a hex-string.
	Each byte is converted to two lower case hexadecimal digits.
	Returns None, if _bytes is None.
	"""
	if _bytes is None:
		return None
	return binascii.b2a_hex(_bytes).decode("ascii")

def envClearLang(env, lang = "C"):
	"""Reset the language settings of an environment dict
	to some expected value and return the result.

	env: The environment dict. It is not modified.
	lang: The new value of the LANG variable.

	Returns a copy of env with LANG set to lang and with
	LANGUAGE and all LC_... variables removed.
	"""
	env = dict(env)
	env["LANG"] = lang
	# LC_ALL must be removed, too. It takes precedence over LANG and
	# over all other LC_... variables. If it was left in place, the
	# new LANG value would have no effect.
	for i in ("LANGUAGE", "LC_ALL", "LC_CTYPE", "LC_NUMERIC",
		  "LC_TIME", "LC_COLLATE", "LC_MONETARY",
		  "LC_MESSAGES", "LC_PAPER", "LC_NAME",
		  "LC_ADDRESS", "LC_TELEPHONE", "LC_MEASUREMENT",
		  "LC_IDENTIFICATION",):
		env.pop(i, None)
	return env
229

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
def __isInteger_python2(value):
	"""Python 2 variant of isInteger().
	Returns True, if value is an instance of int or long.
	The name 'long' only exists on Python 2.
	"""
	return isinstance(value, int) or\
	       isinstance(value, long)

def __isInteger_python3(value):
	"""Python 3 variant of isInteger().
	Returns True, if value is an instance of int.
	"""
	return isinstance(value, int)

# isInteger(value): Check whether value is an integer.
# NOTE(review): py23 is not defined in this file. It presumably selects
# the first argument on Python 2 and the second argument on Python 3.
# Confirm against its definition.
isInteger = py23(__isInteger_python2,
		 __isInteger_python3)

def __isString_python2(value):
	"""Python 2 variant of isString().
	Returns True, if value is an instance of unicode or str.
	The name 'unicode' only exists on Python 2.
	"""
	return isinstance(value, unicode) or\
	       isinstance(value, str)

def __isString_python3(value):
	"""Python 3 variant of isString().
	Returns True, if value is an instance of str.
	"""
	return isinstance(value, str)

# isString(value): Check whether value is a string.
# NOTE(review): py23 is not defined in this file. It presumably selects
# the first argument on Python 2 and the second argument on Python 3.
# Confirm against its definition.
isString = py23(__isString_python2,
		__isString_python3)

250 251 252 253 254 255 256 257 258
def isiterable(obj):
	"""Check if an object is iterable.
	Returns True, if iter(obj) succeeds.
	Returns False, if iter(obj) raises TypeError.
	"""
	try:
		iter(obj)
	except TypeError:
		return False
	return True
259

260 261 262
def getfirst(iterable, exception=KeyError):
	"""Return the first item of an iterable.
	Generators are supported, too.

	If the iterable has no items, exception is raised.
	If exception is None, None is returned in that case.

	Warning: If iterable has no defined order (for example a set),
		 the returned item is an arbitrary one.
	"""
	try:
		firstItem = next(iter(iterable))
	except StopIteration:
		# The iterable is empty.
		if not exception:
			return None
		raise exception
	return firstItem

275 276 277 278 279
# Get an arbitrary item from an iterable.
# If the iterable is empty, exception is raised.
# If exception is None, None is returned instead.
# getany is an alias for getfirst and takes the same arguments.
getany = getfirst

280 281 282 283 284 285 286 287 288 289 290 291 292 293
def toList(value):
	"""Convert value to a list.

	If value is a list, value itself is returned (no copy is made).
	If value is a set, a sorted list of its elements is returned.
	If value is any other iterable, except for a string,
	a list of its elements is returned.
	In all other cases a list with value as its only element is returned.
	"""
	if isinstance(value, list):
		return value
	if isinstance(value, set):
		return sorted(value)
	# Strings are iterable, but they are handled as a single element.
	if isString(value) or not isiterable(value):
		return [ value, ]
	return list(value)
294 295 296 297 298 299 300 301 302 303 304

class nopContextManager(object):
	"""Context manager that does nothing.
	It can be used where a 'with' statement needs a context manager,
	but no action is required on enter or exit.
	"""

	def __enter__(self):
		"""Enter the context. The 'as' target is bound to None."""

	def __exit__(self, exctype, excinst, exctb):
		"""Leave the context.
		False is returned, so exceptions are not suppressed."""
		return False

# A ready-to-use instance of the no-operation context manager.
nopContext = nopContextManager()
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328

class RelPath(object):
	"""Converter between OS paths and OS-independent relative strings.

	The relative strings use "/" as path separator.
	They are relative to the base directory that is passed
	to the constructor.
	"""

	def __init__(self, relativeToDir):
		"""relativeToDir: The base directory that the
		relative strings refer to.
		"""
		self.__relativeToDir = relativeToDir

	def toRelative(self, path):
		"""Generate an OS-independent relative string from a path.
		Raises AwlSimError, if path cannot be expressed
		relative to the base directory.
		"""
		try:
			relPath = os.path.relpath(path, self.__relativeToDir)
		except ValueError as e:
			# On Windows os.path.relpath() raises ValueError, if
			# the path and the base are on different drives.
			# Report this as AwlSimError, just as the drive letter
			# check below does, instead of leaking ValueError.
			raise AwlSimError("Failed to generate a relative path. "
				"Please make sure the base and the path "
				"reside on the same drive.\n"
				"Base: %s\n"
				"Path: %s\n"
				"Error: %s" % (
				self.__relativeToDir, path, str(e)))
		if os.path.splitdrive(relPath)[0]:
			raise AwlSimError("Failed to strip the drive letter from a path, "
				"because the base and the path don't reside on the "
				"same drive. Please make sure the base and the path "
				"reside on the same drive.\n"
				"Base: %s\n"
				"Path: %s" % (
				self.__relativeToDir, relPath))
		# Use "/" as separator, independent of the OS.
		return relPath.replace(os.path.sep, "/")

	def fromRelative(self, path):
		"""Generate a path from an OS-independent relative string.
		The "/" separators are replaced by the separator of this OS
		and the result is joined to the base directory.
		"""
		path = path.replace("/", os.path.sep)
		path = os.path.join(self.__relativeToDir, path)
		return path