Python: stop multiple processes when one returns a result?
You can use multiprocessing.Queue(). Have a Queue per CPU/process. When a process finds a nonce, it puts it on the queues of the other processes. The other processes check their own queue (non-blocking) on each iteration of the while loop, and if there is anything on it, they decide to continue or terminate based on the value in the queue:
from hashlib import sha256
from queue import Empty  # a non-blocking get() on multiprocessing.Queue raises queue.Empty

def proof_of_work(b, nBytes, num_of_cpus_running, this_cpu_id, qSelf, qOthers):
    # uint2bytes() is the fixed-width integer encoding from the question
    nonce = this_cpu_id
    while sha256(b + uint2bytes(nonce)).digest()[:nBytes] != bytes(nBytes):
        nonce = nonce + num_of_cpus_running
        try:
            otherNonce = qSelf.get(block=False)
            if otherNonce < nonce:
                return
            # Put the message back so it is re-checked once our nonce
            # passes it; otherwise this process could search forever.
            qSelf.put(otherNonce)
        except Empty:
            pass
    for q in qOthers:
        q.put(nonce)
    return nonce
qOthers is a list of queues (each created with multiprocessing.Queue()) belonging to the other processes.
If you decide to use queues as suggested, you should be able to write a better/nicer implementation of the above approach; a rough sketch of the wiring is below.
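For illustration only, here is a minimal sketch of how the per-process queues and workers could be wired together. It assumes proof_of_work() above lives in the same module; the worker wrapper, the results queue, and the uint2bytes stub are assumptions for this sketch, not part of the original suggestion:

import multiprocessing as mp

def uint2bytes(n):
    # Hypothetical stand-in for the question's encoding helper.
    return n.to_bytes(8, 'big')

def worker(b, nBytes, num_cpus, cpu_id, queues, results):
    qSelf = queues[cpu_id]
    qOthers = [q for i, q in enumerate(queues) if i != cpu_id]
    nonce = proof_of_work(b, nBytes, num_cpus, cpu_id, qSelf, qOthers)
    if nonce is not None:          # only the winning process returns a nonce
        results.put((cpu_id, nonce))

if __name__ == '__main__':
    num_cpus = mp.cpu_count()
    queues = [mp.Queue() for _ in range(num_cpus)]  # one inbox per process
    results = mp.Queue()
    procs = [mp.Process(target=worker,
                        args=(b'abc', 2, num_cpus, i, queues, results))
             for i in range(num_cpus)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(results.get())  # (winning cpu_id, nonce)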
One easy option is to use micro-batches and check between batches whether an answer was found. Batches that are too small incur overhead from starting parallel jobs; batches that are too large cause the other processes to do extra work while one process has already found an answer. Each batch should take 1-10 seconds to be efficient.
Sample code:
from multiprocessing import Pool
from hashlib import sha256
from time import time

def find_solution(args):
    salt, nBytes, nonce_range = args
    target = '0' * nBytes
    for nonce in range(nonce_range[0], nonce_range[1]):
        result = sha256((salt + str(nonce)).encode()).hexdigest()
        #print('%s %s vs %s' % (result, result[:nBytes], target)); sleep(0.1)
        if result[:nBytes] == target:
            return (nonce, result)
    return None

def proof_of_work(salt, nBytes):
    n_processes = 8
    batch_size = int(2.5e5)
    with Pool(n_processes) as pool:
        nonce = 0
        while True:
            nonce_ranges = [
                (nonce + i * batch_size, nonce + (i + 1) * batch_size)
                for i in range(n_processes)
            ]
            params = [
                (salt, nBytes, nonce_range) for nonce_range in nonce_ranges
            ]
            # Single-process search:
            #solutions = map(find_solution, params)
            # Multi-process search:
            solutions = pool.map(find_solution, params)
            print('Searched %d to %d' % (nonce_ranges[0][0], nonce_ranges[-1][1] - 1))
            # Keep only the non-None results
            solutions = [s for s in solutions if s is not None]
            if solutions:
                return solutions
            nonce += n_processes * batch_size

if __name__ == '__main__':
    start = time()
    solutions = proof_of_work('abc', 6)
    print('\n'.join('%d => %s' % s for s in solutions))
    print('Solution found in %.3f seconds' % (time() - start))
Output (on a laptop with a Core i7):
Searched 0 to 1999999
Searched 2000000 to 3999999
Searched 4000000 to 5999999
Searched 6000000 to 7999999
Searched 8000000 to 9999999
Searched 10000000 to 11999999
Searched 12000000 to 13999999
Searched 14000000 to 15999999
Searched 16000000 to 17999999
Searched 18000000 to 19999999
Searched 20000000 to 21999999
Searched 22000000 to 23999999
Searched 24000000 to 25999999
Searched 26000000 to 27999999
Searched 28000000 to 29999999
Searched 30000000 to 31999999
Searched 32000000 to 33999999
Searched 34000000 to 35999999
Searched 36000000 to 37999999
37196346 => 000000f4c9aee9d427dc94316fd49192a07f1aeca52f6b7c3bb76be10c5adf4d
Solution found in 20.536 seconds
With a single core it took 76.468 seconds. This is by no means the most efficient way to find a solution, but it works. For example, if the salt is long, then the SHA-256 state could be pre-computed after the salt has been absorbed, and the brute-force search continued from there. Working with the raw digest bytes could also be more efficient than hexdigest().
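As a rough sketch of both ideas (the helper name and the even-nBytes assumption are mine, not part of the answer above): hashlib objects support copy(), so the salt can be absorbed once and the per-nonce comparison done on raw digest bytes. Note the copy() trick only saves meaningful work when the salt spans several 64-byte SHA-256 blocks:

from hashlib import sha256

def find_solution_fast(salt, nBytes, nonce_range):
    # Absorb the salt once and reuse the hash state via copy().
    base = sha256(salt.encode())
    # Assumes nBytes is even: nBytes hex zeros == nBytes // 2 zero bytes.
    target = bytes(nBytes // 2)
    for nonce in range(nonce_range[0], nonce_range[1]):
        h = base.copy()
        h.update(str(nonce).encode())
        digest = h.digest()          # raw bytes, no hex-encoding step
        if digest[:nBytes // 2] == target:
            return (nonce, digest.hex())
    return None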
A general method to do this is to:
- think in work packets, e.g. performing the calculation for a particular range of nonces; one packet should not take long, say 0.1 seconds to a second
- have some manager distribute the work packets to the workers
- after a work packet has been completed, report the result to the manager and request a new work packet
- if the work is done and a result has been found, accept the results from the workers and signal that no more work needs to be performed - the workers can now safely terminate
This way you don't have to check in with the manager on every iteration (which would slow everything down), or do nasty things like stopping a thread mid-session. Needless to say, the manager needs to be thread-safe.
This fits perfectly with your model, as you still need the results of the other workers, even if one of them has found a result.
Note that in your model a thread may go out of sync with the other threads, lagging behind. You don't want to do another million calculations once a result has been found. I'm reiterating this from the question because I think the model is wrong: you should fix the model instead of fixing the implementation. A minimal sketch of the manager/worker pattern follows.
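Here is a minimal sketch of that pattern, reusing the same toy proof-of-work as above. The batch size, the salt 'abc', and the 6-zero difficulty are illustrative assumptions; a task queue carries the work packets and None is used as the stop signal:

import multiprocessing as mp
from hashlib import sha256

BATCH = 250_000  # one work packet: a fraction of a second of hashing

def worker(tasks, results):
    # Pull work packets until the manager sends the stop sentinel (None).
    for start in iter(tasks.get, None):
        found = None
        for nonce in range(start, start + BATCH):
            digest = sha256(('abc' + str(nonce)).encode()).hexdigest()
            if digest.startswith('0' * 6):
                found = (nonce, digest)
                break
        results.put((start, found))

if __name__ == '__main__':
    n_workers = mp.cpu_count()
    tasks, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    # Keep two packets in flight per worker; hand out a new one per result.
    next_start = 0
    for _ in range(2 * n_workers):
        tasks.put(next_start)
        next_start += BATCH
    solution = None
    outstanding = 2 * n_workers
    while outstanding:
        _, found = results.get()   # accept every worker's result
        outstanding -= 1
        if found and solution is None:
            solution = found       # first result received; keep draining
        if solution is None:
            tasks.put(next_start)  # no answer yet: issue another packet
            next_start += BATCH
            outstanding += 1
    for _ in procs:
        tasks.put(None)            # stop signal: no more work
    for p in procs:
        p.join()
    print(solution)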