Websocket Proxy using Play 2.6 and akka streams
The following seems to work. Note: I've implemented both the server socket and the proxy socket in the same controller, but you can split them or deploy the same controller on separate instances. The ws url to the 'upper' service will need to be updated in both cases.
package controllers
import javax.inject._
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import play.api.libs.streams.ActorFlow
import play.api.mvc._
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
@Singleton
class SomeController @Inject()(implicit exec: ExecutionContext,
actorSystem: ActorSystem,
materializer: Materializer) extends Controller {
/*--- proxy ---*/
def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket"))
def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
Flow[String].map(s => TextMessage(s))
.via(websocketFlow)
.map(_.asTextMessage.getStrictText)
}
/*--- server ---*/
class UpperService(socket: ActorRef) extends Actor {
override def receive: Receive = {
case s: String => socket ! s.toUpperCase()
case _ =>
}
}
object UpperService {
def props(socket: ActorRef): Props = Props(new UpperService(socket))
}
def upperSocket: WebSocket = WebSocket.accept[String, String] { _ =>
ActorFlow.actorRef(out => UpperService.props(out))
}
}
You will need the routes to be set up like this:
GET /upper-socket controllers.SomeController.upperSocket
GET /proxy-socket controllers.SomeController.proxySocket
You can test by sending a string to ws://localhost:9000/proxy-socket. The answer will be the uppercased string.
There will be a timeout after 1 minute of inactivity though:
akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute
But see: http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html on how to configure this.
First of all you need some akka
imports:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.WebSocketRequest
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.server.Directives.{ extractUpgradeToWebSocket, complete }
This is an example App
that creates a WebSocket
proxy, binding on 0.0.0.0
on port 80
, proxing to ws://echo.websocket.org
:
object WebSocketProxy extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
private[this] def manipulateFlow: Flow[Message, Message, akka.NotUsed] = ???
private[this] def webSocketFlow =
Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))
private[this] val route: Flow[HttpRequest, HttpResponse, Any] =
extractUpgradeToWebSocket { upgrade =>
val webSocketFlowProxy = manipulateFlow via webSocketFlow
val handleWebSocketProxy = upgrade.handleMessages(webSocketFlowProxy)
complete(handleWebSocketProxy)
}
private[this] val proxyBindingFuture =
Http().bindAndHandle(route, "0.0.0.0", 80)
println(s"Server online\nPress RETURN to stop...")
Console.readLine()
}
You have to adapt it for play
and for your application structure.
Notes:
- remember to unbind
proxyBindingFuture
and terminate thesystem
in production; - you need
manipulateFlow
only if you want to manipulate messages.
A proxy need to provide two flows (Proxy Flow A / B):
(Client) request -> Proxy Flow A -> request (Server)
(Client) response <- Proxy Flow B <- response (Server)
One option to implement such proxy flow is using ActorSubscriber and SourceQueue:
class Subscriber[T](proxy: ActorRef) extends ActorSubscriber {
private var queue = Option.empty[SourceQueueWithComplete[T]]
def receive = {
case Attach(sourceQueue) => queue = Some(sourceQueue)
case msg: T => // wait until queue attached and pass forward all msgs to queue and the proxy actor
}
}
def proxyFlow[T](proxy: ActorRef): Flow[T, ActorRef] = {
val sink = Sink.actorSubscriber(Props(new Subscriber[T](proxy)))
val source = Source.queue[T](...)
Flow.fromSinkAndSourceMat(sink, source){ (ref, queue) =>
ref ! Attach(queue)
ref
}
}
You can then assemble the client flow like:
val proxy = actorOf(...)
val requestFlow = proxyFlow[Request](proxy)
val responseFlow = proxyFlow[Response](proxy)
val finalFlow: Flow[Request, Response] =
requestFlow.via(webSocketFlow).via(responseFlow)