How to add elements to Source dynamically?
With Akka Streams 2 you can use Source.queue, which materializes a SourceQueue. See: "How to create a Source that can receive elements later via a method call?"
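As a rough illustration of the queue-based approach, here is a minimal sketch. It assumes a later Akka release (2.5.x or 2.6.x), where `Source.queue` materializes a queue whose `offer` returns a `Future[QueueOfferResult]` and which has a `complete()` method; older releases, including Akka Streams 2.0, differ in these details. The names `QueueSourceSketch` and `pushAndDouble` are made up for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka
object QueueSourceSketch {

  // Pushes each input into a queue-backed Source, one at a time,
  // and returns everything that reached the sink after doubling.
  def pushAndDouble(inputs: Seq[Int]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("queue-sketch")
    // Assumption: ActorMaterializer is available (deprecated but present in 2.6)
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both exposes the queue (to push elements) and the sink's Future (to read results)
      val (queue, result) =
        Source.queue[Int](16, OverflowStrategy.backpressure)
          .map(_ * 2)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // With the backpressure strategy, wait for each offer before making the next one
      inputs.foreach { i =>
        Await.result(queue.offer(i), 3.seconds)
      }

      // Signal that no more elements will arrive so the sink's Future can complete
      queue.complete()
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(pushAndDouble(Seq(1, 2, 3)))
}
```

The blocking `Await` calls keep the sketch short; in real code you would normally chain the futures returned by `offer` instead of blocking a thread.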
One way to have a non-finite source is to use a special kind of actor as the source, one that mixes in the ActorPublisher trait. If you create one of those actors and wrap it with a call to ActorPublisher.apply, you end up with a Reactive Streams Publisher instance, which you can pass to Source.apply to generate a Source. After that, you just need to make sure your ActorPublisher class properly handles the Reactive Streams protocol for sending elements downstream (in particular, only calling onNext while there is outstanding demand), and you are good to go. A very trivial example is as follows:
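import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._

object DynamicSourceExample extends App {
  implicit val system = ActorSystem("test")
  // ActorFlowMaterializer is the name used by the Akka Streams version this example targets
  implicit val materializer = ActorFlowMaterializer()

  // The actor is the entry point for new elements; the Publisher wraps it for the stream
  val actorRef = system.actorOf(Props[ActorBasedSource])
  val pub = ActorPublisher[Int](actorRef)

  Source(pub)
    .map(_ * 2)
    .runWith(Sink.foreach(println))

  // Feed elements into the running stream from outside, one per second
  for (i <- 1 until 20) {
    actorRef ! i.toString
    Thread.sleep(1000)
  }
}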
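class ActorBasedSource extends Actor with ActorPublisher[Int] {
  import ActorPublisherMessage._

  // Elements received while downstream has no outstanding demand
  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      if (totalDemand == 0)
        items = items :+ s.toInt   // no demand yet: buffer the element
      else
        onNext(s.toInt)            // demand available: emit immediately

    case Request(demand) =>
      if (demand > items.size) {
        // Demand exceeds the buffer: flush everything
        items foreach (onNext)
        items = List.empty
      } else {
        // Emit only as many buffered elements as were requested
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }

    case other =>
      println(s"got other $other")
  }
}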