util.py 11.5 KB
Newer Older
1 2 3 4
# -*- coding: utf-8 -*-
#
# AWL simulator - common utility functions
#
5
# Copyright 2012-2018 Michael Buesch <m@bues.ch>
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#

from __future__ import division, absolute_import, print_function, unicode_literals
23
#from awlsim.common.cython_support cimport * #@cy
24 25
from awlsim.common.compat import *

26
from awlsim.common.datatypehelpers import * #+cimport
27 28 29 30 31
from awlsim.common.enumeration import *
from awlsim.common.exceptions import *

import sys
import os
Michael Büsch's avatar
Michael Büsch committed
32
import errno
33
import random
34 35
import base64
import binascii
Michael Büsch's avatar
Michael Büsch committed
36
import functools
Michael Büsch's avatar
Michael Büsch committed
37
import itertools
38
from collections import deque
39
import time
40
from copy import copy, deepcopy
41 42


Michael Büsch's avatar
Michael Büsch committed
43 44 45
# Names exported by a star-import of this module.
__all__ = [
	# Re-exported standard library names (imported at the top of this file).
	"functools",
	"itertools",
	"deque",
	# Logging
	"Logging",
	"printDebug",
	"printVerbose",
	"printInfo",
	"printWarning",
	"printError",
	# File helpers
	"fileExists",
	"safeFileRead",
	"safeFileWrite",
	# String helpers
	"strPartitionFull",
	"str2bool",
	"strToBase64",
	"base64ToStr",
	"bytesToHexStr",
	"toUnixEol",
	"toDosEol",
	"strEqual",
	# Container helpers
	"isiterable",
	"getfirst",
	"getany",
	"toList",
	"toSet",
	"pivotDict",
	"listIndex",
	"listToHumanStr",
	"listExpand",
	# Math helpers
	"clamp",
	"math_gcd",
	"math_lcm",
	# Miscellaneous
	"RelPath",
	"shortUUID",
	# Re-exported standard library names (imported at the top of this file).
	"copy",
	"deepcopy",
]


83 84 85 86
class Logging(object):
	"""Global logging facility.

	The log level, the message prefix and the startup time stamp are
	class attributes and all methods are classmethods.
	The class is used directly; no instance is needed.
	"""

	# Log levels.
	# NOTE(review): The ">=" comparisons in the print methods assume that
	# EnumGen assigns increasing numbers in declaration order - confirm.
	EnumGen.start
	LOG_NONE	= EnumGen.item
	LOG_ERROR	= EnumGen.item
	LOG_WARNING	= EnumGen.item
	LOG_INFO	= EnumGen.item
	LOG_VERBOSE	= EnumGen.item
	LOG_DEBUG	= EnumGen.item
	EnumGen.end

	# The currently active log level.
	loglevel = LOG_INFO
	# String that is written in front of every log line, if not empty.
	prefix = ""

	# Reference point of the relative time stamps in the log lines.
	startupTime = time.time()

	@classmethod
	def setLoglevel(cls, loglevel):
		"""Set the active log level.
		loglevel must be one of the LOG_... constants.
		Raises AwlSimError, if loglevel is none of them.
		"""
		if loglevel not in (cls.LOG_NONE,
				    cls.LOG_ERROR,
				    cls.LOG_WARNING,
				    cls.LOG_INFO,
				    cls.LOG_VERBOSE,
				    cls.LOG_DEBUG):
			# Format with %s instead of %d, so that a non-numeric
			# loglevel is reported as AwlSimError and does not
			# fail with TypeError during message formatting.
			raise AwlSimError("Invalid log level '%s'" % (loglevel,)) #@nocov
		cls.loglevel = loglevel

	@classmethod
	def setPrefix(cls, prefix):
		"""Set the string that is written in front of every log line.
		"""
		cls.prefix = prefix

	@classmethod
	def __print(cls, stream, text):
		"""Write one log line to stream and flush the stream.
		The line consists of the prefix (if set), the time in seconds
		since startupTime in square brackets, the text and a newline.
		Nothing is written, if stream is false (for example None).
		A RuntimeError raised while writing is suppressed.
		"""
		with contextlib.suppress(RuntimeError):
			if stream:
				if cls.prefix:
					stream.write(cls.prefix)
				now = time.time() - cls.startupTime
				stream.write("[%.3f] " % now)
				stream.write(text)
				stream.write("\n")
				stream.flush()

	@classmethod
	def printDebug(cls, text): #@nocov
		"""Write text to sys.stdout, if loglevel >= LOG_DEBUG.
		"""
		if cls.loglevel >= cls.LOG_DEBUG:
			cls.__print(sys.stdout, text)

	@classmethod
	def printVerbose(cls, text): #@nocov
		"""Write text to sys.stdout, if loglevel >= LOG_VERBOSE.
		"""
		if cls.loglevel >= cls.LOG_VERBOSE:
			cls.__print(sys.stdout, text)

	@classmethod
	def printInfo(cls, text): #@nocov
		"""Write text to sys.stdout, if loglevel >= LOG_INFO.
		"""
		if cls.loglevel >= cls.LOG_INFO:
			cls.__print(sys.stdout, text)

	@classmethod
	def printWarning(cls, text): #@nocov
		"""Write text to sys.stderr, if loglevel >= LOG_WARNING.
		"""
		if cls.loglevel >= cls.LOG_WARNING:
			cls.__print(sys.stderr, text)

	@classmethod
	def printError(cls, text): #@nocov
		"""Write text to sys.stderr, if loglevel >= LOG_ERROR.
		"""
		if cls.loglevel >= cls.LOG_ERROR:
			cls.__print(sys.stderr, text)
149

150
def printDebug(text): #@nocov
	"""Print a debug message.
	Shortcut for Logging.printDebug().
	"""
	Logging.printDebug(text)

153
def printVerbose(text): #@nocov
	"""Print a verbose message.
	Shortcut for Logging.printVerbose().
	"""
	Logging.printVerbose(text)

156
def printInfo(text): #@nocov
	"""Print an informational message.
	Shortcut for Logging.printInfo().
	"""
	Logging.printInfo(text)

159
def printWarning(text): #@nocov
	"""Print a warning message.
	Shortcut for Logging.printWarning().
	"""
	Logging.printWarning(text)

162
def printError(text): #@nocov
	"""Print an error message.
	Shortcut for Logging.printError().
	"""
	Logging.printError(text)

Michael Büsch's avatar
Michael Büsch committed
165 166 167 168 169 170 171 172 173 174 175 176 177
def fileExists(filename):
	"""Check whether a file exists.
	Returns True, if os.stat() on filename succeeds.
	Returns False, if os.stat() fails with ENOENT (no such file).
	Returns None, if os.stat() fails with any other OSError.
	"""
	try:
		os.stat(filename)
	except OSError as statError:
		# Only ENOENT is a definite "does not exist".
		return False if statError.errno == errno.ENOENT else None
	else:
		return True

178
def safeFileRead(filename):
	"""Read the complete content of a file in binary mode.
	Returns the content as bytes.
	Raises AwlSimError, if opening or reading fails with IOError.
	"""
	try:
		# The with-statement closes the file; no explicit close needed.
		with open(filename, "rb") as source:
			content = source.read()
	except IOError as e: #@nocov
		raise AwlSimError("Failed to read '%s': %s" % (filename, str(e)))
	return content

188
def safeFileWrite(filename, data):
	"""Write data to a file by means of a temporary file.
	The data is first written (in binary mode) to a temporary file
	named "<filename>-<random>-<count>.tmp", which is then renamed
	to filename.
	Raises AwlSimError, if no unused temporary file name is found
	or if writing or renaming fails.
	"""
	# Look for a temporary file name that is not in use, yet.
	tmpFile = None
	for attempt in range(1000):
		candidate = "%s-%d-%d.tmp" % (
			filename, random.randint(0, 0xFFFF), attempt)
		if not os.path.exists(candidate):
			tmpFile = candidate
			break
	if tmpFile is None:
		raise AwlSimError("Could not create temporary file")
	try:
		with open(tmpFile, "wb") as out:
			out.write(data)
			out.flush()
		if not osIsPosix: #@nocov
			# Can't use safe rename on non-POSIX.
			# Must unlink first.
			with contextlib.suppress(IOError, OSError):
				os.unlink(filename)
		os.rename(tmpFile, filename)
	except (IOError, OSError) as e: #@nocov
		raise AwlSimError("Failed to write file:\n" + str(e))
	finally:
		# Remove a left over temporary file.
		# After a successful rename the unlink fails; that is ignored.
		with contextlib.suppress(IOError, OSError):
			os.unlink(tmpFile)
212

213 214 215 216 217 218 219 220 221 222 223 224 225 226
def strPartitionFull(string, sep, keepEmpty=True):
	"""Fully partition a string by separator 'sep'.
	Returns a list of strings:
	[ "first-element", sep, "second-element", sep, ... ]
	If 'keepEmpty' is True, empty elements are kept.
	If 'keepEmpty' is False, empty elements are dropped,
	but the separators around them are still part of the result.
	"""
	parts = []
	for index, piece in enumerate(string.split(sep)):
		if index > 0:
			# Every element except the first is preceded by 'sep'.
			parts.append(sep)
		if keepEmpty or piece:
			parts.append(piece)
	return parts

Michael Büsch's avatar
Michael Büsch committed
227 228 229 230 231 232 233 234 235 236 237 238 239
def str2bool(string, default=False):
	"""Convert a human readable string to a boolean.
	Case and surrounding white space are ignored.
	"true", "yes", "on", "enable" and "enabled" convert to True.
	"false", "no", "off", "disable" and "disabled" convert to False.
	A decimal integer converts to False, if it is zero, else to True.
	For any other string 'default' is returned.
	"""
	word = string.lower().strip()
	if word in ("true", "yes", "on", "enable", "enabled"):
		return True
	if word in ("false", "no", "off", "disable", "disabled"):
		return False
	try:
		number = int(word, 10)
	except ValueError:
		return default
	return bool(number)

240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
def strToBase64(string, ignoreErrors=False):
	"""Convert a string to a base64 encoded ascii string.
	The string is encoded as utf-8 before the base64 conversion.
	If ignoreErrors is True, characters that cannot be encoded are
	dropped and an empty string is returned on any other error.
	Throws ValueError on errors, if ignoreErrors is False."""

	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		encoded = base64.b64encode(string.encode("utf-8", errorMode))
		return encoded.decode("ascii")
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""

def base64ToStr(b64String, ignoreErrors=False):
	"""Convert a base64 encoded ascii string to utf-8 string.
	If ignoreErrors is True, characters that cannot be converted are
	dropped and an empty string is returned on any other error.
	Throws ValueError on errors, if ignoreErrors is False."""

	errorMode = "ignore" if ignoreErrors else "strict"
	try:
		rawBytes = base64.b64decode(b64String.encode("ascii", errorMode))
		return rawBytes.decode("utf-8", errorMode)
	except (UnicodeError, binascii.Error, TypeError):
		if not ignoreErrors:
			raise ValueError
		return ""
Michael Büsch's avatar
Michael Büsch committed
265 266 267 268

def bytesToHexStr(_bytes):
	"""Convert bytes to a hex-string.
	Every byte is converted to two lower case hex digits.
	Returns None, if _bytes is None.
	"""
	if _bytes is not None:
		return binascii.hexlify(_bytes).decode("ascii")
	return None
272

273 274 275 276 277 278 279 280 281 282 283 284 285
def toUnixEol(string):
	"""Convert a string to UNIX line endings,
	no matter what line endings (mix) the input string is.
	"""
	# Convert the CR/LF pairs first, so that the conversion of the
	# remaining single CRs does not turn one DOS line end into two.
	withoutCrLf = string.replace("\r\n", "\n")
	return withoutCrLf.replace("\r", "\n")

def toDosEol(string):
	"""Convert a string to DOS line endings,
	no matter what line endings (mix) the input string is.
	"""
	# Normalize to UNIX line endings first, so that already
	# existing CR/LF pairs do not get a second CR.
	unixString = toUnixEol(string)
	return unixString.replace("\n", "\r\n")

286 287 288 289 290 291 292 293 294 295 296 297
def strEqual(string0, string1, caseSensitive=True):
	"""Compare string0 to string1.
	If caseSensitive is False, case is ignored.
	Returns True, if both strings are equal.
	"""
	if caseSensitive:
		return string0 == string1
	# Use casefold(), if the string type has it.
	# Otherwise fall back to lower().
	if hasattr(string0, "casefold"):
		return string0.casefold() == string1.casefold()
	return string0.lower() == string1.lower()

298 299 300 301 302 303 304 305 306
def isiterable(obj):
	"""Check if an object is iterable.
	Returns True, if iter(obj) succeeds.
	Returns False, if iter(obj) raises TypeError.
	"""
	try:
		iter(obj)
	except TypeError:
		return False
	return True
307

308 309 310
def getfirst(iterable, exception=KeyError):
	"""Get the first item from an iterable.
	This also works for generators.
	If the iterable is empty, exception is raised.
	If exception is None, None is returned instead.
	Warning: If iterable is not indexable (for example a set),
		 an arbitrary item is returned instead.
	"""
	try:
		firstItem = next(iter(iterable))
	except StopIteration:
		# The iterable is empty.
		if not exception:
			return None
		raise exception
	return firstItem

323 324 325 326 327
# Get an arbitrary item from an iterable.
# This is an alias for getfirst() and takes the same arguments.
# If the iterable is empty, exception is raised.
# If exception is None, None is returned instead.
getany = getfirst

328 329 330
def toList(value):
	"""Convert a value to a list.
	Returns value itself (not a copy), if value is a list.
	Returns a sorted list with the elements of value, if value is a set.
	Returns a sorted list with the elements of value, if value is a frozenset.
	Returns a list with the elements of value, if value is an iterable, but not a string.
	Otherwise returns a list with value as element.
	"""
	if isinstance(value, list):
		return value
	if isinstance(value, (set, frozenset)):
		ordered = list(value)
		ordered.sort()
		return ordered
	if isString(value) or not isiterable(value):
		# Strings and non-iterables are not split up.
		return [ value, ]
	return list(value)
342

343 344 345 346 347 348 349 350
def toSet(value):
	"""Convert a value to a set.
	Returns value itself (not a copy), if value is a set.
	Returns a set with the elements of value, if value is a frozenset.
	Returns a set with the elements of value, if value is a tuple.
	Returns a set with the elements of value, if value is a list.
	Otherwise returns a set with value as single element.
	"""
	if isinstance(value, set):
		return value
	convertible = isinstance(value, (frozenset, list, tuple))
	return set(value) if convertible else { value, }

def pivotDict(inDict):
	"""Swap the keys and the values of a dict.
	Returns a new dict that maps each value of inDict to its key.
	Raises KeyError, if a value occurs more than once in inDict.
	"""
	# NOTE(review): dictItems() is not defined in this file. It is
	# presumably a compat helper that yields (key, value) pairs - confirm.
	pivoted = {}
	for origKey, origValue in dictItems(inDict):
		if origValue not in pivoted:
			pivoted[origValue] = origKey
			continue
		raise KeyError("Ambiguous key in pivot dict") #@nocov
	return pivoted

def listIndex(_list, value, start=0, stop=-1, translate=None):
	"""Returns the index of a list element, or -1 if not found.
	Only the elements from index 'start' up to (but not including)
	index 'stop' are searched. A negative 'stop' means: end of list.
	If translate is not None, it should be a callable that translates
	a list entry. Arguments are index, entry.
	The result of translate is compared to value.
	"""
	end = len(_list) if stop < 0 else stop
	if not translate:
		try:
			return _list.index(value, start, end)
		except ValueError:
			return -1
	# Start counting at 'start', so that the index is list-absolute.
	for pos, entry in enumerate(_list[start:end], start):
		if translate(pos, entry) == value:
			return pos
	return -1

def listToHumanStr(lst, lastSep="or"):
	"""Convert a list to a human readable string.
	Example: [1, 2, 3]  ->  "1, 2 or 3"
	The elements are converted with str() and separated by ", ".
	The last two elements are separated by 'lastSep' instead.
	lst is converted with toList() first.
	Returns an empty string, if lst is empty.
	"""
	if not lst:
		return ""
	items = [ str(item) for item in toList(lst) ]
	if len(items) == 1:
		return items[0]
	# Join the elements explicitly instead of replacing the last comma
	# in the joined string. Replacing the comma would corrupt the
	# result, if the text of an element contains a comma itself.
	return "%s %s %s" % (", ".join(items[:-1]), lastSep, items[-1])

def listExpand(lst, expander):
	"""Expand the elements of a list.
	'expander' is the expansion callback. 'expander' takes
	one list element as argument. It returns a list.
	Returns a new list with the concatenated expansion results.
	"""
	return [ expanded
		 for item in lst
		 for expanded in expander(item) ]

399 400 401 402 403
def clamp(value, minValue, maxValue):
	"""Clamp value to the range minValue-maxValue.
	Returns minValue, if value is smaller than minValue.
	Returns maxValue, if value is bigger than maxValue.
	ValueError is raised, if minValue is bigger than maxValue.
	"""
	if minValue > maxValue:
		raise ValueError #@nocov
	upperBounded = min(value, maxValue)
	return max(upperBounded, minValue)

407 408 409 410 411 412 413 414 415
def math_gcd(*args):
	"""Get the "Greatest Common Divisor" of all arguments.
	"""
	# NOTE(review): reduce() and compat_gcd() are not defined in this
	# file. They presumably come from awlsim.common.compat - confirm.
	divisor = reduce(compat_gcd, args)
	return divisor

def math_lcm(*args):
	"""Get the "Least Common Multiple" of all arguments.
	The result is 0, if two arguments that are folded together are
	both zero or one of them is zero.
	"""
	def lcm2(x, y):
		# The least common multiple with zero is zero.
		# Return it directly: math_gcd(0, 0) is presumably 0, so
		# the division below would raise ZeroDivisionError.
		if not x or not y:
			return 0
		return x * y // math_gcd(x, y)
	return reduce(lcm2, args)

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
class RelPath(object):
	"""Convert between native paths and OS-independent relative
	path strings. The relative strings use "/" as separator and are
	relative to the directory passed to the constructor.
	"""

	def __init__(self, relativeToDir):
		"""relativeToDir is the base directory of the relative paths.
		"""
		self.__relativeToDir = relativeToDir

	def toRelative(self, path):
		"""Generate an OS-independent relative string from a path.
		Raises AwlSimError, if the relative path still
		contains a drive specification.
		"""
		baseDir = self.__relativeToDir
		# NOTE(review): The Python documentation says that on Windows
		# os.path.relpath() raises ValueError, if path and start are
		# on different drives. In that case the check below is not
		# reached - confirm whether ValueError should be handled.
		relPath = os.path.relpath(path, baseDir)
		drive = os.path.splitdrive(relPath)[0]
		if drive:
			raise AwlSimError("Failed to strip the drive letter from a path, "
				"because the base and the path don't reside on the "
				"same drive. Please make sure the base and the path "
				"reside on the same drive.\n"
				"Base: %s\n"
				"Path: %s" % (
				baseDir, relPath))
		return relPath.replace(os.path.sep, "/")

	def fromRelative(self, path):
		"""Generate a path from an OS-independent relative string."""
		nativePath = path.replace("/", os.path.sep)
		return os.path.join(self.__relativeToDir, nativePath)
439 440 441 442 443 444 445 446 447 448

def shortUUID(uuidStr):
	"""Shorten an uuid string.
	uuidStr is converted with str() and surrounding white space is
	stripped. If the result is 36 characters long and has dashes at
	the positions 8, 13, 18 and 23, the first 8 characters, ".." and
	the 5 characters before the last character are returned.
	Otherwise the stripped string is returned unchanged.
	"""
	text = str(uuidStr).strip()
	looksLikeUUID = (len(text) == 36 and
			 all(text[pos] == '-' for pos in (8, 13, 18, 23)))
	if not looksLikeUUID:
		return text
	# Note: The slice [-6:-1] does not include the last character.
	return text[0:8] + ".." + text[-6:-1]