Daniel Ciocîrlan
In this article, we’re going to take a look at Akka Discovery, a feature that allows us to locate actors that we normally don’t have access to, and that no other actor can tell us about.
This article assumes some basic familiarity with typed actors.
We’ll use the regular Akka Typed Actors library, so in your Scala project, add the following to your build.sbt:
val AkkaVersion = "2.6.19"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion
)
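Akka Typed logs through SLF4J, so the context.log calls later in this article only produce output if an SLF4J backend is on the classpath. A minimal sketch of the extra build.sbt line, assuming Logback as the backend (the choice of Logback and the version number are assumptions, not something this article prescribes):

```scala
// build.sbt: an SLF4J backend so that context.log.info(...) lines are printed
// (Logback and this version are assumptions; any SLF4J 1.7-compatible backend should do)
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.11"
```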
We assume some basics with Akka actors, but in a nutshell:
Having worked with actors, you’ve probably needed to pass a reference belonging to one part of the hierarchy to another part of the hierarchy, so that these actors can communicate and exchange data. The trouble is that managing such exchanges needlessly complicates the logic of both hierarchies/departments and mixes business logic with management logic.
Akka Discovery is a feature that allows us to find actors that we cannot otherwise have access to. Its use cases include
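For this article, we’ll imagine a fictitious IoT application, where we have a set of sensors, a sensor controller that sends them heartbeats, a data aggregator that collects their readings, and a guardian actor that spawns the controller and the aggregator.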
Whenever the sensor controller sends the heartbeat message, the sensors must all send their readings to the data aggregator. The sensors must be running at all times, but the data aggregator might change or be swapped in the meantime. Now, for some reason (logic too complicated, data privacy, impracticability etc.), the guardian actor cannot communicate directly with the sensors to inform them of the data aggregator change. We need to notify them in some other way.
Considering the following imports for the entire article
import akka.NotUsed
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
import scala.util.Random
we’re going to sketch the main entities in our application:
// data aggregator domain
case class SensorReading(id: String, value: Double)
object DataAggregator {
  def apply(): Behavior[SensorReading] = ???
}
// sensors domain
trait SensorCommand
case object SensorHeartbeat extends SensorCommand
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
object Sensor {
  // the sensor actor
  def apply(id: String): Behavior[SensorCommand] = ???
  // the sensor controller
  def controller(): Behavior[NotUsed] = ???
}
while the main guardian would look something like this:
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
  // controller for the sensors
  context.spawn(Sensor.controller(), "controller")
  val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
  // TODO make it "known" to the sensors that dataAgg1 is the new data aggregator
  val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
  // TODO after 10 seconds, make it "known" to the sensors that dataAgg2 is the new data aggregator
  Behaviors.empty
}
and the main method
def main(args: Array[String]): Unit = {
  val system = ActorSystem(guardian, "ActorDiscovery")
  import system.executionContext
  system.scheduler.scheduleOnce(20.seconds, () => system.terminate())
}
The application should work as follows:
On receiving a SensorReading, the data aggregator would display the latest data; in real life, this would be in a graph, or feeding into a data streaming application like Flink or Spark Streaming, but here we’ll simply log the data.
The “magical” finding of the aggregator, as well as the notification mechanism, are the two most important components in this application.
In order to find an actor under a name or identifier, it must be registered under a unique identifier known as a ServiceKey. This is a simple data structure that is registered at the level of the actor system. After the registration is done, the finder is able to query the actor system for all actors registered under that key, thereby retrieving their ActorRefs and sending them messages.
This registration is done through a special actor known as the receptionist. All actor systems have a receptionist, and the goal of this actor is to maintain the mapping from each ServiceKey to its ActorRefs.
For the main guardian actor to publish the fact that dataAgg1 is the data aggregator to use, we need to define a ServiceKey by which we can identify the data aggregator. It’s usually best practice to place the ServiceKey inside the object that spawns the actors to be registered. In this case, under the DataAggregator object:
// inside DataAggregator
val serviceKey = ServiceKey[SensorReading]("dataAggregator")
The ServiceKey must be typed with the same message type as the actor that we want to register. With the ServiceKey in place, we can then fill in the TODOs in our guardian actor:
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
  // controller for the sensors
  context.spawn(Sensor.controller(), "controller")
  val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
  // "publish" dataAgg1 is available by associating it to a key (service key)
  context.system.receptionist ! Receptionist.register(DataAggregator.serviceKey, dataAgg1)
  // change data aggregator after 10s
  // (Thread.sleep blocks the thread running this setup block; acceptable only in a demo)
  Thread.sleep(10000)
  context.log.info("[guardian] Changing data aggregator")
  context.system.receptionist ! Receptionist.deregister(DataAggregator.serviceKey, dataAgg1)
  val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
  context.system.receptionist ! Receptionist.register(DataAggregator.serviceKey, dataAgg2)
  Behaviors.empty
}
We send the context.system.receptionist some special messages to either register or deregister an actor. You can register multiple actors under the same ServiceKey if you want, but in this case we’ll keep it to just one actor.
The protocol handled by the receptionist is pretty rich, and you can be notified when the registration is complete by listening to the Registered message given back by the receptionist.
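As a rough illustration of listening for that acknowledgement, here is a minimal sketch. It assumes the Scala-facing Receptionist.Register form with a replyTo argument; the RegistrationAck object, its Command protocol and the AggregatorRegistered message are hypothetical names introduced only for this example, and the aggregator is replaced by a stand-in that ignores everything.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

object RegistrationAck {
  // stand-ins mirroring the article's types, so the sketch compiles on its own
  final case class SensorReading(id: String, value: Double)
  val serviceKey: ServiceKey[SensorReading] = ServiceKey[SensorReading]("dataAggregator")

  // hypothetical protocol: the parent wants to know when the registration went through
  sealed trait Command
  final case class AggregatorRegistered(ref: ActorRef[SensorReading]) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // a do-nothing aggregator, just to have something to register
    val aggregator = context.spawn(Behaviors.ignore[SensorReading], "data_agg")

    // turn the receptionist's Registered ack into one of our own commands
    val ackAdapter: ActorRef[Receptionist.Registered] =
      context.messageAdapter[Receptionist.Registered] { registered =>
        AggregatorRegistered(registered.serviceInstance(serviceKey))
      }

    // the three-argument Register asks the receptionist to reply with Registered
    context.system.receptionist ! Receptionist.Register(serviceKey, aggregator, ackAdapter)

    Behaviors.receiveMessage {
      case AggregatorRegistered(ref) =>
        context.log.info(s"Registered ${ref.path.name} under key ${serviceKey.id}")
        Behaviors.same
    }
  }
}
```

The same message adapter idea is what the sensors use on the subscribing side, only with Listing messages instead of Registered.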
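The other side is a bit more involved, because we can fetch information from the receptionist in multiple ways: we can query it once for the actors currently registered under a ServiceKey, or we can subscribe and be notified every time the association to a ServiceKey was changed.
We’ll need to work on the apply() method of the Sensor object: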
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
  // subscribe to the receptionist using the service key
  context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, ???)
}
We would like to be automatically notified if there is any change in the association to the ServiceKey. The API allows us to pass the ServiceKey instance in question and an actor which can handle a Listing message that the receptionist will send with every update.
However, because our Sensor actor is typed with SensorCommand and we also need to handle the listing message, we will need a message adapter. We discussed the message adapter technique in another article, so we will use it here. We will need to wrap the listing message into some other SensorCommand that we can handle later:
// new message
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
  // use a message adapter to turn a receptionist listing into a SensorCommand
  val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
    case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
  }
  // subscribe to the receptionist using the service key
  context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
}
So in the setup method we have subscribed to the receptionist and are able to receive its listings, transformed into ChangeDataAggregator messages. We now need to handle them and keep track of the data aggregator that we have on hand at the moment:
def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
  Behaviors.receiveMessage {
    case SensorHeartbeat =>
      // send the data to the aggregator
      aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
      Behaviors.same
    case ChangeDataAggregator(newAgg) =>
      // swap the aggregator for the new one
      activeSensor(id, newAgg)
  }
And with this message handler in place, the return value of the apply() method is going to be
activeSensor(id, None)
Currently, the code of the sensor actor looks like this:
object Sensor {
  def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
    // use a message adapter to turn a receptionist listing into a SensorCommand
    val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
      case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
    }
    // subscribe to the receptionist using the service key
    context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
    activeSensor(id, None)
  }
  def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
    Behaviors.receiveMessage {
      case SensorHeartbeat =>
        aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
        Behaviors.same
      case ChangeDataAggregator(newAgg) =>
        activeSensor(id, newAgg)
    }
}
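For comparison, the receptionist can also be asked just once, without subscribing. The following is a minimal sketch of such a one-off lookup using Receptionist.Find. The OneOffLookup object, its Command protocol and the AggregatorsFound message are hypothetical names made up for this illustration, and the fixed reading value is a placeholder.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

object OneOffLookup {
  // stand-ins mirroring the article's types, so the sketch compiles on its own
  final case class SensorReading(id: String, value: Double)
  val serviceKey: ServiceKey[SensorReading] = ServiceKey[SensorReading]("dataAggregator")

  // hypothetical protocol for this example only
  sealed trait Command
  final case class AggregatorsFound(aggregators: Set[ActorRef[SensorReading]]) extends Command

  def apply(id: String): Behavior[Command] = Behaviors.setup { context =>
    // the reply to Find is a Listing, so we adapt it exactly like a subscription update
    val adapter: ActorRef[Receptionist.Listing] =
      context.messageAdapter[Receptionist.Listing] {
        case serviceKey.Listing(set) => AggregatorsFound(set)
      }

    // ask once: the receptionist answers with the actors registered right now
    context.system.receptionist ! Receptionist.Find(serviceKey, adapter)

    Behaviors.receiveMessage {
      case AggregatorsFound(aggregators) =>
        // send a single placeholder reading to whoever is registered, then stop
        aggregators.foreach(_ ! SensorReading(id, 21.5))
        Behaviors.stopped
    }
  }
}
```

Unlike Subscribe, a Find result is a snapshot: if the aggregator is swapped afterwards, this actor would not hear about it, which is why the sensors in this article subscribe instead.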
We need to finish off the DataAggregator and the sensor controller. The data aggregator will keep track of all the readings received so far from all the sensors, using a “stateless” approach, which we also describe in another article:
def apply(): Behavior[SensorReading] = active(Map())
def active(latestReadings: Map[String, Double]): Behavior[SensorReading] = Behaviors.receive { (context, reading) =>
  val id = reading.id
  val value = reading.value
  // val SensorReading(id, value) = reading
  val newReadings = latestReadings + (id -> value)
  // "display" part - in real life this would feed a graph, a data ingestion engine or processor
  context.log.info(s"[${context.self.path.name}] Latest readings: $newReadings")
  active(newReadings)
}
and the sensor controller is a dumb actor which doesn’t receive any messages:
def controller(): Behavior[NotUsed] = Behaviors.setup { context =>
  val sensors = (1 to 10).map(i => context.spawn(Sensor(s"sensor_$i"), s"sensor_$i"))
  val logger = context.log // used so that we don't directly use context inside the lambda below
  // send heartbeats every second
  import context.executionContext
  context.system.scheduler.scheduleAtFixedRate(1.second, 1.second) { () =>
    logger.info("Heartbeat")
    sensors.foreach(_ ! SensorHeartbeat)
  }
  Behaviors.empty
}
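The controller above runs its heartbeat on the system scheduler, outside the actor, which is why it has to avoid touching context inside the lambda. An alternative, sketched below under the assumption that giving the controller its own small protocol is acceptable, is to use Behaviors.withTimers so that each tick arrives as a regular message. The TimerController object, the ControllerCommand trait, the Tick message and the "heartbeat" timer key are hypothetical names, and the sensor is a stand-in that ignores its messages.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object TimerController {
  // stand-ins mirroring the article's sensor protocol, so the sketch compiles on its own
  sealed trait SensorCommand
  case object SensorHeartbeat extends SensorCommand
  def sensor(id: String): Behavior[SensorCommand] = Behaviors.ignore

  // hypothetical protocol: the only message is the timer tick the controller sends itself
  sealed trait ControllerCommand
  case object Tick extends ControllerCommand

  def apply(): Behavior[ControllerCommand] = Behaviors.setup { context =>
    val sensors = (1 to 10).map(i => context.spawn(sensor(s"sensor_$i"), s"sensor_$i"))

    Behaviors.withTimers { timers =>
      // deliver Tick to this actor every second; the timer stops with the actor
      timers.startTimerAtFixedRate("heartbeat", Tick, 1.second)

      Behaviors.receiveMessage {
        case Tick =>
          // we are inside the actor's message handler, so using context here is safe
          context.log.info("Heartbeat")
          sensors.foreach(_ ! SensorHeartbeat)
          Behaviors.same
      }
    }
  }
}
```

The guardian could spawn this with context.spawn(TimerController(), "controller") just as before, since it never sends the controller any messages.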
If we run this application, we notice that every second, the sensor controller sends the heartbeat, and by some magic (we now know how it works) the sensors automatically know where to send their data, because the aggregator picks up the readings and displays all of them every second.
After 10 seconds, the heartbeats keep running, but the logs now say data_agg_2, so the sensors were automatically notified that the data aggregator changed and simply pushed their readings elsewhere. Exactly as intended.
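The guardian in this article waits with Thread.sleep, which blocks the thread running its setup block. A non-blocking variant is sketched below, assuming it is fine for the guardian to send itself a NotUsed message as the "time to swap" signal via context.scheduleOnce. The NonBlockingGuardian object and the do-nothing aggregator are stand-ins for this illustration, and the Scala-facing Receptionist.Register and Receptionist.Deregister forms are used in place of the lowercase ones.

```scala
import akka.NotUsed
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object NonBlockingGuardian {
  // stand-ins mirroring the article's types, so the sketch compiles on its own
  final case class SensorReading(id: String, value: Double)
  val serviceKey: ServiceKey[SensorReading] = ServiceKey[SensorReading]("dataAggregator")
  def aggregator(): Behavior[SensorReading] = Behaviors.ignore

  val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
    val dataAgg1 = context.spawn(aggregator(), "data_agg_1")
    context.system.receptionist ! Receptionist.Register(serviceKey, dataAgg1)

    // instead of sleeping, ask for a NotUsed message to be delivered to ourselves in 10 seconds
    context.scheduleOnce(10.seconds, context.self, NotUsed)

    // the only message this guardian ever receives is that scheduled signal
    Behaviors.receiveMessage { _ =>
      context.log.info("[guardian] Changing data aggregator")
      context.system.receptionist ! Receptionist.Deregister(serviceKey, dataAgg1)
      val dataAgg2 = context.spawn(aggregator(), "data_agg_2")
      context.system.receptionist ! Receptionist.Register(serviceKey, dataAgg2)
      Behaviors.empty
    }
  }
}
```

The observable behaviour should be the same as with the sleeping guardian, but the setup block returns immediately and the swap happens as ordinary message handling.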
The entire code looks like this:
case class SensorReading(id: String, value: Double)
object DataAggregator {
  val serviceKey = ServiceKey[SensorReading]("dataAggregator")
  def apply(): Behavior[SensorReading] = active(Map())
  def active(latestReadings: Map[String, Double]): Behavior[SensorReading] = Behaviors.receive { (context, reading) =>
    val id = reading.id
    val value = reading.value
    // val SensorReading(id, value) = reading
    val newReadings = latestReadings + (id -> value)
    // "display" part
    context.log.info(s"[${context.self.path.name}] Latest readings: $newReadings")
    active(newReadings)
  }
}
// sensor section
trait SensorCommand
case object SensorHeartbeat extends SensorCommand
case class ChangeDataAggregator(agg: Option[ActorRef[SensorReading]]) extends SensorCommand
object Sensor {
  def apply(id: String): Behavior[SensorCommand] = Behaviors.setup { context =>
    // use a message adapter to turn a receptionist listing into a SensorCommand
    val receptionistSubscriber: ActorRef[Receptionist.Listing] = context.messageAdapter {
      case DataAggregator.serviceKey.Listing(set) => ChangeDataAggregator(set.headOption)
    }
    // subscribe to the receptionist using the service key
    context.system.receptionist ! Receptionist.Subscribe(DataAggregator.serviceKey, receptionistSubscriber)
    activeSensor(id, None)
  }
  def activeSensor(id: String, aggregator: Option[ActorRef[SensorReading]]): Behavior[SensorCommand] =
    Behaviors.receiveMessage {
      case SensorHeartbeat =>
        aggregator.foreach(_ ! SensorReading(id, Random.nextDouble() * 40))
        Behaviors.same
      case ChangeDataAggregator(newAgg) =>
        activeSensor(id, newAgg)
    }
  def controller(): Behavior[NotUsed] = Behaviors.setup { context =>
    val sensors = (1 to 10).map(i => context.spawn(Sensor(s"sensor_$i"), s"sensor_$i"))
    val logger = context.log
    // send heartbeats every second
    import context.executionContext
    context.system.scheduler.scheduleAtFixedRate(1.second, 1.second) { () =>
      logger.info("Heartbeat")
      sensors.foreach(_ ! SensorHeartbeat)
    }
    Behaviors.empty
  }
}
val guardian: Behavior[NotUsed] = Behaviors.setup { context =>
  // controller for the sensors
  context.spawn(Sensor.controller(), "controller")
  val dataAgg1 = context.spawn(DataAggregator(), "data_agg_1")
  // "publish" dataAgg1 is available by associating it to a key (service key)
  context.system.receptionist ! Receptionist.register(DataAggregator.serviceKey, dataAgg1)
  // change data aggregator after 10s
  // (Thread.sleep blocks the thread running this setup block; acceptable only in a demo)
  Thread.sleep(10000)
  context.log.info("[guardian] Changing data aggregator")
  context.system.receptionist ! Receptionist.deregister(DataAggregator.serviceKey, dataAgg1)
  val dataAgg2 = context.spawn(DataAggregator(), "data_agg_2")
  context.system.receptionist ! Receptionist.register(DataAggregator.serviceKey, dataAgg2)
  Behaviors.empty
}
def main(args: Array[String]): Unit = {
  val system = ActorSystem(guardian, "ActorDiscovery")
  import system.executionContext
  system.scheduler.scheduleOnce(20.seconds, () => system.terminate())
}
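The sensor in this article keeps only set.headOption from each listing, which fits the single-aggregator setup. If several aggregators were registered under the same ServiceKey, one possible variant is to keep the whole set and send every reading to all of them. The sketch below assumes that broadcasting is the desired policy; the BroadcastingSensor object and the Command, Heartbeat and AggregatorsChanged names are hypothetical and exist only for this example.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import scala.util.Random

object BroadcastingSensor {
  // stand-ins mirroring the article's types, so the sketch compiles on its own
  final case class SensorReading(id: String, value: Double)
  val serviceKey: ServiceKey[SensorReading] = ServiceKey[SensorReading]("dataAggregator")

  // hypothetical protocol for this example only
  sealed trait Command
  case object Heartbeat extends Command
  final case class AggregatorsChanged(aggregators: Set[ActorRef[SensorReading]]) extends Command

  def apply(id: String): Behavior[Command] = Behaviors.setup { context =>
    // keep the full set from the listing instead of just its first element
    val adapter: ActorRef[Receptionist.Listing] =
      context.messageAdapter[Receptionist.Listing] {
        case serviceKey.Listing(set) => AggregatorsChanged(set)
      }
    context.system.receptionist ! Receptionist.Subscribe(serviceKey, adapter)
    active(id, Set.empty)
  }

  private def active(id: String, aggregators: Set[ActorRef[SensorReading]]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Heartbeat =>
        // one reading per heartbeat, sent to every registered aggregator
        val reading = SensorReading(id, Random.nextDouble() * 40)
        aggregators.foreach(_ ! reading)
        Behaviors.same
      case AggregatorsChanged(newAggregators) =>
        active(id, newAggregators)
    }
}
```

With this shape, an empty listing simply means the heartbeat sends nothing, just as the Option-based sensor does when its aggregator is None.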
In this article we discovered (pun) Akka Discovery, a powerful tool to find actors and use them in the situation where it’s hard (or impossible) to locate the right reference for your needs. We saw where Discovery is useful, we learned how to use the actor system’s receptionist to register, deregister and subscribe for updates, so that our actors can seamlessly send the right messages to the right actors.