Correct use of Akka http client connection pools
I was a little confused by this myself initially, until I read through the docs a few times. If you are only ever going to send single requests into the pool, then no matter how many different places share that same pool, the key of type T that you pair with each request (an Int in your case) doesn't matter. So if you are using Source.single all the time, that key can always be 1 if you really want.
Where it does come into play, though, is when a piece of code submits multiple requests into the pool at once and wants the responses to all of them. The responses come back in the order in which the called service answered them, not in the order in which they were supplied to the pool. Each request can take a different amount of time, so the responses flow downstream to the Sink in the order they come back out of the pool.
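To see why the key matters, here is a minimal stdlib-only simulation with no Akka involved. FakePool and FakeRequest are hypothetical stand-ins for the pool flow, not part of the Akka HTTP API: each request carries a made-up latency, and results are emitted in completion order, each still paired with the caller's key.

```scala
import scala.util.{Success, Try}

// Hypothetical stand-in for a host connection pool; NOT the Akka HTTP API.
object FakePool {
  // A request plus a simulated latency that decides when it "completes".
  final case class FakeRequest(path: String, latencyMs: Int)

  // Like the pool flow, takes (request, key) pairs and emits (Try[response], key)
  // pairs. Results come out in completion order (lowest latency first),
  // not in submission order; only the key ties each result to its request.
  def run[T](submitted: Seq[(FakeRequest, T)]): Seq[(Try[String], T)] =
    submitted
      .sortBy { case (req, _) => req.latencyMs }
      .map { case (req, key) => (Success(s"body of ${req.path}"), key) }
}
```

Submitting keys 1, 2, 3 with latencies 300, 100 and 200 ms yields keys in the order 2, 3, 1; without the key there would be no way to tell that the first result belongs to the second request.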
Say we had a service out there that accepted GET requests with a URL of the form:
/product/123
where the 123 part is the id of the product that you want to look up. If I wanted to look up products 1-10 all at once, with a separate request for each, this is where the identifier becomes important: it lets me correlate each HttpResponse with the product id that it is for. A simplified code example for this scenario would be as follows:
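```scala
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse}
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// Assumes a host-level pool keyed by Int and an implicit Materializer in scope,
// e.g. val pool = Http().cachedHostConnectionPool[Int]("example.com") (hypothetical host)
val requests = for (id <- 1 to 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut: Future[Map[Int, HttpResponse]] =
  Source(requests)
    .via(pool)
    .runFold(Map.empty[Int, HttpResponse]) {
      case (m, (util.Success(resp), id)) =>
        m + (id -> resp) // the id tells us which product this response is for
      case (m, (util.Failure(ex), id)) =>
        // Log a failure here probably; id still identifies the failed product
        m
    }
```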
When I get my responses in the fold, I also conveniently have the id that each one is associated with, so I can add them to my Map keyed by id. Without this functionality, I would probably have to do something like parse the body (if it was JSON) to figure out which response was which. That is not ideal, and it doesn't cover the failure case at all. In this solution, I know which requests failed because I still get the identifier back.
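The same fold can be sketched without Akka to show the failure case. Here the pool's output is replaced by a plain Seq of (Try[response], id) pairs in arbitrary order (an assumption for illustration), and Correlate.collect is a hypothetical helper that keeps successful responses keyed by id and records which ids failed.

```scala
import scala.util.{Failure, Success, Try}

object Correlate {
  // Folds (result, id) pairs, in whatever order they arrive, into
  // (responses keyed by id, ids whose request failed).
  def collect[A](results: Seq[(Try[A], Int)]): (Map[Int, A], Set[Int]) =
    results.foldLeft((Map.empty[Int, A], Set.empty[Int])) {
      case ((ok, failed), (Success(resp), id)) => (ok + (id -> resp), failed)
      case ((ok, failed), (Failure(_), id))    => (ok, failed + id)
    }
}
```

With results for ids 3, 1 and 2 where id 1 failed, collect returns the responses for 2 and 3 in the map and Set(1) as the failed ids.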
I hope that clarifies things a bit for you.
Akka HTTP connection pools are powerful allies when consuming HTTP-based resources. If you are going to execute one request at a time, one solution is:
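```scala
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Assumes `pool` plus an implicit Materializer and ExecutionContext are in scope.
def exec(req: HttpRequest): Future[HttpResponse] =
  Source.single(req -> 1) // the key is irrelevant for a single request
    .via(pool)
    .runWith(Sink.head)
    .flatMap {
      case (Success(r), _) => Future.successful(r)
      case (Failure(f), _) => Future.failed(f)
    }
```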
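For a single request the key is never inspected, so the flatMap in exec only has to lift the Try into the Future. A stdlib-only sketch of that step, where a Future of a (Try[A], K) pair is an assumed stand-in for what runWith(Sink.head) returns and SingleRequest.unwrap is a hypothetical name; Future.fromTry (Scala 2.12+) does the same job as the two case clauses:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object SingleRequest {
  // The pool emits (Try[response], key). With one request in flight the key
  // carries no information, so it is dropped and the Try becomes the Future's
  // outcome: Success completes it, Failure fails it.
  def unwrap[A, K](pooled: Future[(Try[A], K)])(implicit ec: ExecutionContext): Future[A] =
    pooled.flatMap { case (result, _) => Future.fromTry(result) }
}
```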
Because you are executing a single request, there is no need to disambiguate the response. However, Akka Streams also lets you submit multiple requests to the pool at the same time. In this case we pass in an Iterable[HttpRequest], tag each request with its index, and collect the responses in a SortedMap keyed by that index, so the returned responses are in the same order as the original requests. You can then simply do requests zip responses to line things up:
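```scala
import scala.collection.immutable.SortedMap // other imports and implicits as for exec above

def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] =
  // zipWithIndex gives each request a unique key; .toList (not .toMap) keeps
  // duplicate requests, which a Map keyed by HttpRequest would collapse
  Source(requests.zipWithIndex.toList)
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
      case (m, (Failure(e), idx)) => m + (idx -> Future.failed(e))
    }
    .map(_.values) // values iterate in index order, i.e. submission order
```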
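The reordering step can be checked without a running pool. In this stdlib-only sketch, a Seq of (Try[response], index) pairs in completion order stands in for the pool's output (an assumption for illustration), and Reorder.inSubmissionOrder is a hypothetical helper mirroring the runFold in exec. Because the keys come from zipWithIndex they are unique even if two requests are identical, which is what lets the SortedMap restore submission order.

```scala
import scala.collection.immutable.SortedMap
import scala.util.Try

object Reorder {
  // Folds results into a SortedMap keyed by each request's original index;
  // iterating its values yields results in submission order, whatever order
  // they arrived in.
  def inSubmissionOrder[A](arrived: Seq[(Try[A], Int)]): List[Try[A]] =
    arrived
      .foldLeft(SortedMap.empty[Int, Try[A]]) { case (m, (result, idx)) => m + (idx -> result) }
      .values
      .toList
}
```

requests.zip(Reorder.inSubmissionOrder(arrived)) then pairs each request with its own outcome, which is the "requests zip responses" step the answer describes.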
A future of an iterable of futures is useful if you need to unpack the results your own way. A simpler result can be obtained by flattening it with Future.sequence:
```scala
def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] =
  Source(requests.zipWithIndex.toList) // unique index keys, duplicates preserved
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
      case (m, (Failure(e), idx)) => m + (idx -> Future.failed(e))
    }
    .flatMap(r => Future.sequence(r.values)) // fails as a whole if any request failed
```
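Flattening with Future.sequence is all-or-nothing: if any request failed, the combined future fails and the successful responses are not returned. A stdlib-only sketch of both behaviours follows, using already-built futures instead of a pool. Flatten, allOrNothing and keepFailures are hypothetical names, and keepFailures is an alternative not taken from the original answer: it wraps each outcome in a Try first, so one failure does not hide the rest.

```scala
import scala.collection.immutable.SortedMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object Flatten {
  // Mirrors execFlatten: succeeds only if every request succeeded.
  def allOrNothing[A](byIndex: SortedMap[Int, Future[A]])(implicit ec: ExecutionContext): Future[Iterable[A]] =
    Future.sequence(byIndex.values)

  // Alternative: lift each future's outcome into a Try first, so the combined
  // future always succeeds and keeps per-request failures in submission order.
  def keepFailures[A](byIndex: SortedMap[Int, Future[A]])(implicit ec: ExecutionContext): Future[Iterable[Try[A]]] =
    Future.sequence(byIndex.values.map(lift(_)))

  private def lift[A](f: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] =
    f.map[Try[A]](Success(_)).recover { case e => Failure(e) }
}
```

Which one fits depends on whether a single failed request should invalidate the whole batch.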
I have made this gist with all the imports and wrappers to make a client for consuming HTTP services.
A special thanks to @cmbaxter for his neat example.