`DummyExecutor` for Python's `futures`
Something like this should do it:
from concurrent.futures import Future, Executor
from threading import Lock
class DummyExecutor(Executor):
    """Executor that runs every submitted callable synchronously.

    ``submit`` calls the function in the caller's thread and returns a
    ``Future`` that is already completed, holding either the function's
    return value or the exception it raised. No threads or processes are
    created.
    """

    def __init__(self):
        self._shutdown = False
        # Guards reads and writes of ``_shutdown``.
        self._shutdownLock = Lock()

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` immediately and return a finished Future.

        Args:
            fn: The callable to run.
            *args: Positional arguments passed to ``fn``.
            **kwargs: Keyword arguments passed to ``fn``.

        Returns:
            A ``Future`` whose result is the return value of ``fn``, or whose
            exception is whatever ``fn`` raised.

        Raises:
            RuntimeError: If ``shutdown`` has already been called.
        """
        # Hold the lock only for the shutdown check. Running ``fn`` while
        # holding this non-reentrant lock would deadlock as soon as ``fn``
        # called ``submit`` or ``shutdown`` on this same executor.
        with self._shutdownLock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

        f = Future()
        try:
            result = fn(*args, **kwargs)
        except BaseException as e:
            # Store the exception on the future instead of propagating it,
            # so the caller observes it via ``result()`` / ``exception()``.
            f.set_exception(e)
        else:
            f.set_result(result)
        return f

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Mark the executor as shut down; later ``submit`` calls raise.

        Args:
            wait: Accepted for interface compatibility; unused because every
                task has already finished by the time ``submit`` returns.
            cancel_futures: Accepted for interface compatibility; unused
                because there are never pending futures to cancel.
        """
        with self._shutdownLock:
            self._shutdown = True
if __name__ == '__main__':
    def fnc(err):
        """Raise ``Exception("test")`` when *err* is truthy, else return "ok"."""
        if not err:
            return "ok"
        raise Exception("test")

    dummy = DummyExecutor()
    # First a failing task, then a succeeding one; print each returned future.
    for should_fail in (True, False):
        print(dummy.submit(fnc, should_fail))
    dummy.shutdown()
    # Submitting after shutdown raises RuntimeError (intentionally uncaught).
    dummy.submit(fnc, True)
Locking is probably not needed in this case, but it can't hurt to have it.
Use this to mock your `ThreadPoolExecutor`. Note that `submit` here returns the function's result directly rather than a `Future`:
class MockThreadPoolExecutor:
    """Stand-in for a thread pool that runs submitted calls inline.

    Intended for unit tests: nothing is scheduled on a thread. Note that
    ``submit`` returns the callable's return value itself, not a future
    object.
    """

    def __init__(self, **kwargs):
        """Accept and ignore any keyword arguments (e.g. ``max_workers``)."""

    def __enter__(self):
        """Return this instance for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Do nothing on exit; exceptions from the ``with`` body propagate."""
        return None

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` right away and return its result.

        Functions run in series in the calling thread, which keeps unit
        tests simple. Any exception raised by ``fn`` propagates to the
        caller.
        """
        return fn(*args, **kwargs)

    def shutdown(self, wait=True):
        """Do nothing; there are no worker threads to stop."""
        return None
if __name__ == "__main__":
    # Named ``add`` rather than ``sum`` so the builtin ``sum`` is not shadowed.
    def add(a, b):
        """Return ``a + b``."""
        return a + b

    with MockThreadPoolExecutor(max_workers=3) as executor:
        # Each submit call runs ``add`` immediately, so this list holds the
        # computed values for i = 0..4.
        future_result = [executor.submit(add, i + 1, i + 2) for i in range(5)]