eloipool.py 28.4 KB
Newer Older
1
#!/usr/bin/python3
2
# Eloipool - Python Bitcoin pool server
3
# Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4
# Portions written by Peter Leurs <kinlo@triplemining.com>
5 6 7 8 9 10 11 12 13 14 15 16 17 18
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

19 20 21 22 23 24 25 26
import argparse
import importlib
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if not args.config is None:
	configmod = 'config_%s' % (args.config,)
27
__import__(configmod)
28
config = importlib.import_module(configmod)
29

Luke Dashjr's avatar
Luke Dashjr committed
30 31 32
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

33 34 35
if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

36

37
import logging
38
import logging.handlers
39

40 41 42 43
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
44
	logging.basicConfig(
45
		format=logformat,
46 47
		level=logging.DEBUG,
	)
48 49 50 51 52 53 54 55 56 57 58
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
59
		logging.getLogger(infoOnly).setLevel(logging.INFO)
60 61 62 63 64 65 66 67 68 69
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
70

71
def RaiseRedFlags(reason):
72
	logging.getLogger('redflag').critical(reason)
73 74 75
	return reason


76 77 78
from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
79
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
80

81
import jsonrpc
82

83 84 85 86 87 88 89
try:
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
	pass


Luke Dashjr's avatar
Luke Dashjr committed
90
from bitcoin.script import BitcoinScript
91
from bitcoin.txn import Txn
92
from base58 import b58decode
93
from binascii import b2a_hex
94
from struct import pack
Luke Dashjr's avatar
Luke Dashjr committed
95
import subprocess
96
from time import time
97

98
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True, prevBlockHex = None):
99
	txn = Txn.new()
Luke Dashjr's avatar
Luke Dashjr committed
100
	
101
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
Luke Dashjr's avatar
Luke Dashjr committed
102 103 104 105
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
106
			cmd = cmd.replace('%p', prevBlockHex or '""')
Luke Dashjr's avatar
Luke Dashjr committed
107 108 109 110 111 112
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
113
				txn.addOutput(amount, pkScript)
Luke Dashjr's avatar
Luke Dashjr committed
114 115 116 117 118
				coinbased += amount
		except:
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
119
			txn.outputs = []
Luke Dashjr's avatar
Luke Dashjr committed
120 121 122
		else:
			coinbaseValue -= coinbased
	
Luke Dashjr's avatar
Luke Dashjr committed
123
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
124
	txn.addOutput(coinbaseValue, pkScript)
Luke Dashjr's avatar
Luke Dashjr committed
125
	
126
	# TODO
127
	# TODO: red flag on dupe coinbase
128
	return txn
129

130

131
import jsonrpc_getwork
132
from util import Bits2Target
133

Luke Dashjr's avatar
Luke Dashjr committed
134
workLog = {}
135
userStatus = {}
136
networkTarget = None
137
DupeShareHACK = {}
Luke Dashjr's avatar
Luke Dashjr committed
138

Luke Dashjr's avatar
Luke Dashjr committed
139
server = None
140
stratumsrv = None
141
def updateBlocks():
142
	server.wakeLongpoll()
143
	stratumsrv.updateJob()
144

145
def blockChanged():
Luke Dashjr's avatar
Luke Dashjr committed
146
	global MM, networkTarget, server
147 148 149 150 151
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
152 153 154 155 156
	if MM.lastBlock != (None, None, None):
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
157
	server.wakeLongpoll(wantClear=True)
158
	stratumsrv.updateJob(wantClear=True)
Luke Dashjr's avatar
Luke Dashjr committed
159 160


161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
from time import sleep, time
import traceback

def _WorkLogPruner_I(wl):
	now = time()
	pruned = 0
	for username in wl:
		userwork = wl[username]
		for wli in tuple(userwork.keys()):
			if now > userwork[wli][1] + 120:
				del userwork[wli]
				pruned += 1
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except:
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')


185 186
from merklemaker import merkleMaker
MM = merkleMaker()
187
MM.__dict__.update(config.__dict__)
188
MM.makeCoinbaseTxn = makeCoinbaseTxn
189
MM.onBlockChange = blockChanged
190
MM.onBlockUpdate = updateBlocks
191

192

193
from binascii import b2a_hex
194
from copy import deepcopy
195
from math import ceil, log
196
from merklemaker import MakeBlockHeader
197
from struct import pack, unpack
198
import threading
Luke Dashjr's avatar
Luke Dashjr committed
199
from time import time
Luke Dashjr's avatar
Luke Dashjr committed
200
from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
201
import jsonrpc
202
import traceback
203 204 205 206

gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
Luke Dashjr's avatar
Luke Dashjr committed
207

208 209 210
if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

211 212 213
if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
214 215 216
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
217

Luke Dashjr's avatar
Luke Dashjr committed
218 219 220 221
def submitGotwork(info):
	try:
		gotwork.gotwork(info)
	except:
222
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
Luke Dashjr's avatar
Luke Dashjr committed
223

224 225 226
if not hasattr(config, 'GotWorkTarget'):
	config.GotWorkTarget = 0

227 228 229 230 231
def clampTarget(target, DTMode):
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
232 233
	# Never target above upstream(s), as we'd lose blocks
	target = max(target, networkTarget, config.GotWorkTarget)
234 235 236
	
	if DTMode == 2:
		# Ceil target to a power of two :)
Luke Dashjr's avatar
Luke Dashjr committed
237
		truebits = log(target, 2)
Luke Dashjr's avatar
Luke Dashjr committed
238 239 240
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
Luke Dashjr's avatar
Luke Dashjr committed
241
		target = 2**ceil(truebits) - 1
242 243 244 245 246 247 248 249 250
	elif DTMode == 3:
		# Round target to multiple of bdiff 1
		target = bdiff1target / int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target

251
def getTarget(username, now, DTMode = None, RequestedTarget = None):
252 253
	if DTMode is None:
		DTMode = config.DynamicTargetting
254
	if not DTMode:
255 256 257 258
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
259
		# No record, use default target
260 261 262
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
263 264
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
265
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
266 267
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
268
		if not work:
269
			# No shares received, reset to minimum
270
			if targetIn:
271
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
272
				userStatus[username] = [None, now, 0]
273
			return clampTarget(None, DTMode)
274 275
	
	deltaSec = now - lastUpdate
276
	target = targetIn or config.ShareTarget
277
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
278
	target = clampTarget(target, DTMode)
279 280
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
281 282 283 284
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
285 286 287 288
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')

289 290 291 292 293 294 295 296 297 298
def TopTargets(n = 0x10):
	tmp = list(k for k, v in userStatus.items() if not v[0] is None)
	tmp.sort(key=lambda k: -userStatus[k][0])
	tmp2 = {}
	def t2d(t):
		if t not in tmp2:
			tmp2[t] = target2pdiff(t)
		return tmp2[t]
	for k in tmp[-n:]:
		tgt = userStatus[k][0]
299
		print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
300

301
def RegisterWork(username, wli, wld, RequestedTarget = None):
302
	now = time()
303
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
304 305
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
306 307
	return target or config.ShareTarget

Luke Dashjr's avatar
Luke Dashjr committed
308
def getBlockHeader(username):
309
	MRD = MM.getMRD()
310 311
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
312
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
313
	target = RegisterWork(username, merkleRoot, MRD)
314
	return (hdr, workLog[username][merkleRoot], target)
315

316
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
317 318 319 320 321 322 323 324
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
325
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
326 327 328
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
329
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
330
	return (MC, workLog[username][wli], target)
331

332 333 334 335 336 337 338
def getStratumJob(jobid, wantClear = False):
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])

339 340 341 342
def getExistingStratumJob(jobid):
	wld = workLog[None][jobid]
	return (wld[0], wld)

343
loggersShare = []
344
authenticators = []
345

346
RBDs = []
347 348
RBPs = []

349 350
from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
351
from merklemaker import assembleBlock
352

353 354 355
if not hasattr(config, 'BlockSubmissions'):
	config.BlockSubmissions = None

356 357
RBFs = []
def blockSubmissionThread(payload, blkhash, share):
358 359 360 361 362
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
363 364 365 366 367
	if hasattr(share['merkletree'], 'source_uri'):
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
368 369
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
370
	
371 372 373
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
374 375 376 377 378 379
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
380
		try:
381
			# BIP 22 standard submitblock
382
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
383
		except BaseException as gbterr:
384
			gbterr_fmt = traceback.format_exc()
385
			try:
386 387
				try:
					# bitcoind 0.5/0.6 getmemorypool
388
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
389 390
				except:
					# Old BIP 22 draft getmemorypool
391
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
392 393 394 395
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410
			except BaseException as gmperr:
				now = time()
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
411
				servers.append(TS)
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
431
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
432

433
def checkData(share):
434
	data = share['data']
435
	data = data[:80]
436
	(prevBlock, height, bits) = MM.currentBlock
437 438 439 440 441
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
Luke Dashjr's avatar
Luke Dashjr committed
442
	
443 444
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
445 446 447 448
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
449
		raise RejectedShare('bad-version')
450 451 452 453 454 455 456 457 458 459

def buildStratumData(share, merkleroot):
	(prevBlock, height, bits) = MM.currentBlock
	
	data = b'\x02\0\0\0'
	data += prevBlock
	data += merkleroot
	data += share['ntime'][::-1]
	data += bits
	data += share['nonce'][::-1]
460
	
461 462 463
	share['data'] = data
	return data

464 465 466 467 468 469 470 471 472 473
def IsJobValid(wli, wluser = None):
	if wluser not in workLog:
		return False
	if wli not in workLog[wluser]:
		return False
	(wld, issueT) = workLog[wluser][wli]
	if time() < issueT - 120:
		return False
	return True

474 475 476 477
def checkShare(share):
	shareTime = share['time'] = time()
	
	username = share['username']
478
	isstratum = False
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
507
		isstratum = True
508 509
		wli = share['jobid']
		buildStratumData(share, b'\0' * 32)
510 511
		mode = 'MC'
		moden = 1
512
		othertxndata = b''
513 514 515 516
		if None not in workLog:
			# We haven't yet sent any stratum work for this block
			raise RejectedShare('unknown-work')
		MWL = workLog[None]
517 518
	
	if wli not in MWL:
Luke Dashjr's avatar
Luke Dashjr committed
519
		raise RejectedShare('unknown-work')
520
	(wld, issueT) = MWL[wli]
521
	share[mode] = wld
Luke Dashjr's avatar
Luke Dashjr committed
522
	
523 524
	share['issuetime'] = issueT
	
525
	(workMerkleTree, workCoinbase) = wld[1:3]
526
	share['merkletree'] = workMerkleTree
527 528 529 530 531 532 533 534
	if 'jobid' in share:
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
535 536 537 538
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
539 540 541
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
542
	blkhashn = LEhash2int(blkhash)
543 544
	
	global networkTarget
545 546 547
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
548
	
549
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
550
	txlist = workMerkleTree.data
551
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
552
	cbtxn = txlist[0]
553
	cbtxn.setCoinbase(coinbase or workCoinbase)
554
	cbtxn.assemble()
555
	
556
	if blkhashn <= networkTarget:
557
		logfunc("Submitting upstream")
558
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
559 560 561
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
562 563 564 565 566
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
Luke Dashjr's avatar
Luke Dashjr committed
567
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
568
		RBPs.append(payload)
569
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
570
		bcnode.submitBlock(payload)
571 572 573 574 575
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
576
		MM.updateBlock(blkhash)
577
	
578
	# Gotwork hack...
Luke Dashjr's avatar
Luke Dashjr committed
579
	if gotwork and blkhashn <= config.GotWorkTarget:
580
		try:
581
			coinbaseMrkl = cbtxn.data
582
			coinbaseMrkl += blkhash
583
			steps = workMerkleTree._steps
584
			coinbaseMrkl += pack('B', len(steps))
585 586 587 588 589 590 591
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
Luke Dashjr's avatar
Luke Dashjr committed
592 593 594
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
595
		except:
Luke Dashjr's avatar
Luke Dashjr committed
596
			checkShare.logger.warning('Failed to build gotwork request')
597
	
598 599 600 601 602 603 604
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
605 606 607
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
608
		raise RejectedShare('high-hash')
609 610
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
611
	
612 613 614 615 616 617 618 619
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
620
	if moden:
621
		cbpre = workCoinbase
622 623
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
624 625 626 627
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
628
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
629 630 631 632 633 634
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
635
			raise RejectedShare('bad-txnmrklroot')
636
		
637 638 639 640
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
Luke Dashjr's avatar
Luke Dashjr committed
641 642 643 644 645 646 647 648 649
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			userStatus[username][2] += float(target) / workTarget
650 651
		if isstratum and userStatus[username][2] > config.DynamicTargetGoal * 2:
			stratumsrv.quickDifficultyUpdate(username)
652
checkShare.logger = logging.getLogger('checkShare')
653

654 655 656 657 658 659 660 661
def logShare(share):
	if '_origdata' in share:
		share['solution'] = share['_origdata']
	else:
		share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
	for i in loggersShare:
		i.logShare(share)

662
def checkAuthentication(username, password):
663 664 665 666
	# HTTPServer uses bytes, and StratumServer uses str
	if hasattr(username, 'decode'): username = username.decode('utf8')
	if hasattr(password, 'decode'): password = password.decode('utf8')
	
667 668 669 670 671
	for i in authenticators:
		if i.checkAuthentication(username, password):
			return True
	return False

672 673
def receiveShare(share):
	# TODO: username => userid
Luke Dashjr's avatar
Luke Dashjr committed
674 675 676 677 678
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
679 680 681
	except BaseException as e:
		share['rejectReason'] = 'ERROR'
		raise
682
	finally:
683 684
		if not share.get('upstreamRejectReason', None) is PendingUpstream:
			logShare(share)
685

686
def newBlockNotification():
687
	logging.getLogger('newBlockNotification').info('Received new block notification')
688
	MM.updateMerkleTree()
689
	# TODO: Force RESPOND TO LONGPOLLS?
690 691
	pass

692 693 694 695 696 697
def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

698
from signal import signal, SIGUSR1
699
signal(SIGUSR1, newBlockNotificationSIGNAL)
700 701


702 703 704 705 706 707 708 709
import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

710 711
if getattr(config, 'SaveStateFilename', None) is None:
	config.SaveStateFilename = 'eloipool.worklog'
712

713 714
def stopServers():
	logger = logging.getLogger('stopServers')
715
	
716 717 718 719 720
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
721
	logger.info('Stopping servers...')
Luke Dashjr's avatar
Luke Dashjr committed
722
	global bcnode, server
723
	servers = (bcnode, server, stratumsrv)
Luke Dashjr's avatar
Luke Dashjr committed
724 725 726
	for s in servers:
		s.keepgoing = False
	for s in servers:
727 728 729 730
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
731
	i = 0
Luke Dashjr's avatar
Luke Dashjr committed
732 733 734 735 736 737 738
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
739 740
		i += 1
		if i >= 0x100:
Luke Dashjr's avatar
Luke Dashjr committed
741
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
742 743 744
			break
		sleep(0.01)
	
745 746 747
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
748

749 750 751 752 753
def stopLoggers():
	for i in loggersShare:
		if hasattr(i, 'stop'):
			i.stop()

754
def saveState(SAVE_STATE_FILENAME, t = None):
755 756
	logger = logging.getLogger('saveState')
	
757
	# Then, save data needed to resume work
758
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
759 760 761 762
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
763
				pickle.dump(t, f)
764 765
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
766 767 768 769 770 771 772 773 774
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
775 776

def exit():
777
	t = time()
778
	stopServers()
779
	stopLoggers()
780
	saveState(config.SaveStateFilename, t=t)
781
	logging.getLogger('exit').info('Goodbye...')
782 783 784
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)

785
def restart():
786
	t = time()
787
	stopServers()
788
	stopLoggers()
789
	saveState(config.SaveStateFilename, t=t)
790 791 792 793 794 795
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

796
def restoreState(SAVE_STATE_FILENAME):
797 798 799 800 801 802
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
803 804
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
805 806
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
807 808
			t = pickle.load(f)
			if type(t) == tuple:
809 810 811 812 813 814
				if len(t) > 2:
					# Future formats, not supported here
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
815 816 817
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
818
			else:
819
				if isinstance(t, dict):
820
					# Old format, from 2012-02-03 to 2012-02-03
821 822 823
					DupeShareHACK = t
					t = None
				else:
824
					# Current format, from 2012-02-03 onward
825 826
					DupeShareHACK = pickle.load(f)
				
827
				if t + 120 >= time():
828 829 830
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
831 832 833 834
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
835 836
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
837 838


839
from jsonrpcserver import JSONRPCListener, JSONRPCServer
840
import interactivemode
841 842
from networkserver import NetworkListener
import threading
843
import sharelogging
844
import authentication
845
from stratumserver import StratumServer
846
import imp
847 848

if __name__ == "__main__":
849 850
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
851
	if hasattr(config, 'DbOptions'):
852
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
853
		config.ShareLogging = list(config.ShareLogging)
854 855 856 857
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
858
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
859
		} )
860
	for i in config.ShareLogging:
861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
883 884 885
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
886
			lo = getattr(m, name)(**parameters)
887
			loggersShare.append(lo)
888
		except:
889
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
	
	if not hasattr(config, 'Authentication'):
		config.Authentication = ({'module': 'allowall'},)
	
	for i in config.Authentication:
		name = i['module']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, authentication.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			authenticators.append(lo)
		except:
			logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
	
905
	LSbc = []
906 907
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
908 909 910
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
911 912
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
913
	
914
	import jsonrpc_getblocktemplate
915 916 917
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
918
	server = JSONRPCServer()
919 920
	server.tls = threading.local()
	server.tls.wantClear = False
921
	if hasattr(config, 'JSONRPCAddress'):
922
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
923 924 925
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
926
	LS = []
927
	for a in config.JSONRPCAddresses:
928
		LS.append(JSONRPCListener(server, a))
929 930 931
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
932
	server.getBlockHeader = getBlockHeader
933
	server.getBlockTemplate = getBlockTemplate
934
	server.receiveShare = receiveShare
935
	server.RaiseRedFlags = RaiseRedFlags
936
	server.ShareTarget = config.ShareTarget
937
	server.checkAuthentication = checkAuthentication
938
	
939 940
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
941
	server.ServerName = config.ServerName
942
	
943 944
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
945
	stratumsrv.getExistingStratumJob = getExistingStratumJob
946
	stratumsrv.receiveShare = receiveShare
947
	stratumsrv.RaiseRedFlags = RaiseRedFlags
948 949
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
950
	stratumsrv.IsJobValid = IsJobValid
951
	stratumsrv.checkAuthentication = checkAuthentication
952 953 954 955 956
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
957
	MM.start()
958
	
959
	restoreState(config.SaveStateFilename)
960
	
961 962 963 964
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
965 966 967 968
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
969 970 971 972
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
973
	server.serve_forever()