JDK8 CompletableFuture.supplyAsync how to deal with interruptedException
I change the code like this.
CompletableFuture<Rep> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
transporter.write(req);
try {
Rep rep = responseQueue.take();
result.complete(rep);
} catch (InterruptedException e) {
result.completeExceptionally(e);
Thread.currentThread().interrupt();
} catch (Exception e) {
result.completeExceptionally(e);
}
}, executorService);
return result;
@antak mentioned it buried in a comment, but I think the correct answer here is:
For
CompletableFuture.supplyAsync()
wrap it injava.util.concurrent.CompletionException
and rethrow it.
So the sample code would look something like:
CompletableFuture.supplyAsync(
() -> {
transporter.write(req);
try {
//here take the value from a blocking queue,will throw a interruptedException
return responseQueue.take();
}
catch (InterruptedException e) {
throw new CompletionException(e);
}
}, executorService);
I ran into the same question, but after reading more from comments here and reference book I think you can do either one of these two:
1 (what I end up doing):
CompletableFuture.runAsync(() -> {
transporter.write(req);
try {
Rep rep = responseQueue.take();
result.complete(rep);
} catch (Exception e) {
throw new CompletionException(e);
}
}, executorService);
return result;
or 2:
CompletableFuture<Rep> result = new CompletableFuture<>();
new Thread(()-> {
transporter.write(req);
try {
Rep rep = responseQueue.take();
result.complete(rep);
} catch (Exception e) {
result.completeExceptionally(e);
}
}).start();
I know the 2nd one does not use the executorService
, but I feel the whole point of using CompletableFuture is utilizing the CompletionStage APIs in functional-style.
As lambda functions don't support throwing exceptions, I think Java developers will need a new paradigm. One thing that comes to mind is as follows:
public class ResultWrapper<R, E extends Exception> {
E exception;
R result;
}
Lambda functions can return instances of this wrapper. (Edit: your case)
CompletableFuture<ResultWrapper<String, InterruptedException>> aFuture = ...;
...
aFuture.supplyAsync(
() -> {
try {
transporter.write(req);
} catch(InterruptedException e) {
ResultWrapper<String, InterruptedException> r = new ResultWrapper<>();
r.exception = e;
r.result = null;
return r;
}
...
}, executorService);