Hello Scala!
Akka Typed Actor从2.4开始直到2.5可以商用,进而Akka 2.6已经把Akka Typed Actor做为推荐的Actor使用模式。Typed Actor与原先的Untyped Actor最大的区别Actor有类型了,其签名也改成了akka.actor.typed.ActorRef[T]
。通过一个简单的示例来看看在Akka Typed环境下怎样使用Actor。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| sealed trait Command final case class Hello(message: String, replyTo: ActorRef[Reply]) extends Command final case class Tell(message: String) extends Command
sealed trait Reply final case class HelloReply(message: String) extends Reply
def apply(): Behavior[Command] = Behaviors.setup { context => Behaviors.receiveMessage { case Hello(message, replyTo) => replyTo ! HelloReply(s"$message, scala!") Behaviors.same case Tell(message) => context.log.debug("收到消息:{}", message) Behaviors.same } }
|
Akka Typed不再需要通过类的形式来实现Actor
接口定义,而是函数的形式来定义actor。可以看到,定义的actor类型为Behavior[T]
(形为),通过Behaviors.receiveMessage[T](T => Behavior[T]): Receive[T]
函数来处理接收到的消息,而Receive
继承了Behavior
trait。通过函数签名可以看到,每次接收到消息并对其处理完成后,都必需要返回一个新的形为。
apply(): Behavior[Command]
函数签名里的范性参数类型Command
限制了这个actor将只接收Command
或Command
子类型的消息,编译器将在编译期对传给actor的消息做类型检查,相对于从前的untyped actor可以向actor传入任何类型的消息,这可以限制的减少程序中的bug。特别是在程序规模很大,当你定义了成百上千个消息时。
也因为有类型的actor,在Akka Typed中没有了隐式发送的sender: ActorRef
,必需在发送的消息里面包含回复字段,就如Hello
消息定义里的replyTo: ActorRef[Reply]
字段一样。actor在处理完Hello
消息后可以通过它向发送者回复处理结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class HelloScalaSpec extends ScalaTestWithActorTestKit with WordSpecLike { "HelloScala" should { "tell" in { val actorRef = spawn(HelloScala(), "tell") actorRef ! Tell("Hello") }
"replyTo" in { val actorRef = spawn(HelloScala(), "replyTo") val probe = createTestProbe[Reply]() actorRef ! Hello("hello", probe.ref)
probe.expectMessageType[HelloReply] should be(HelloReply("hello, scala!")) }
"ask" in { import akka.actor.typed.scaladsl.AskPattern._ val actorRef = spawn(HelloScala(), "ask") val reply = actorRef .ask[Reply](replyTo => Hello("Hello", replyTo)) .mapTo[HelloReply] .futureValue reply.message should be("Hello, scala!") } } }
|
更复杂的一个示例
上一个示例简单的演示了Akka Typed Actor的功能和基本使用方式,接下来看一个更复杂的示例,将展示Akka Typed更多的特性及功能。
首先是消息定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| sealed trait Command trait ControlCommand extends Command { val clientId: String } trait ReplyCommand extends Command { val replyTo: ActorRef[Reply] }
final case class Connect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand final case class Disconnect(clientId: String, replyTo: ActorRef[Reply]) extends ControlCommand with ReplyCommand final case class QueryResource(clientId: String, replyTo: ActorRef[Reply]) extends ReplyCommand final private[typed] case object SessionTimeout extends Command final private case class ServiceKeyRegistered(registered: Receptionist.Registered) extends Command
sealed trait Reply final case class Connected(status: Int, clientId: String) extends Reply final case class Disconnected(status: Int, clientId: String) extends Reply final case class ResourceQueried(status: Int, clientId: String, resources: Seq[String]) extends Reply final case class ReplyError(status: Int) extends Reply
|
上面分别定义了actor可接收的请求消息:Command
和返回结果消息:Reply
。建议对于需要返回值的消息使用:replyTo
来命名收受返回值的actor字段,这里也可以不定义Reply
trait来做为统一的返回值类型,可以直接返回结果类型,如:ActorRef[String
。
这里将定义两个actor,一个做为父actor,一个做为子actor。父actor为:ComplexActor
,管理连接客户端和转发消息到子actor,每次有新的客户端连接上来时做以客户端clientId
做为名字创建一个子actor;子actor:ComplexClient
,保持客户端连接会话,处理消息……
ComplexActor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| final class ComplexActor private(context: ActorContext[ComplexActor.Command]) { import ComplexActor._ private var connects = Map.empty[String, ActorRef[Command]]
def init(): Behavior[Command] = Behaviors.receiveMessage { case ServiceKeyRegistered(registered) if registered.isForKey(serviceKey) => context.log.info("Actor be registered, serviceKey: {}", serviceKey) receive() .... }
def receive(): Behavior[Command] = Behaviors .receiveMessage[Command] { case cmd @ Connect(clientId, replyTo) => if (connects.contains(clientId)) { replyTo ! Connected(IntStatus.CONFLICT, clientId) } else { val child = context.spawn( Behaviors .supervise(ComplexClient(clientId)) .onFailure(SupervisorStrategy.restart), clientId) context.watch(child) connects = connects.updated(clientId, child) child ! cmd } Behaviors.same .... } .receiveSignal { case (_, Terminated(child)) => val clientId = child.path.name connects -= clientId context.unwatch(child) Behaviors.same } }
|
ComplexActor
在收到Connect
消息后将首先判断请求客户端ID(clientId
)是否已经连接,若重复连接将直接返回409错误(Connected(IntStatus.CONFLICT, _)
)。若是一个新连接将调用context.spawn
函数在创建一个字actor:ComplexClient
。spawn
函数签名如下:
1
| def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]
|
behavior
是要创建的actor,name
为子actor的名字,需要保证在同一级内唯一(兄弟之间),props
可对actor作一些自定义,如:线程执行器(Dispatcher
)、邮箱等。
receiveSignal
用于接收系统控制信号消息,经典actor的preRestart
和postStop
回调函数(将分别做为PreRestart
和PostStop
信号),以及Terminated
消息都将做为信号发送到这里。
ComplexClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| final class ComplexClient private ( clientId: String, context: ActorContext[ComplexActor.Command]) { import ComplexActor._
def active(): Behavior[Command] = Behaviors.receiveMessagePartial { .... case SessionTimeout => context.log.warn("Inactive timeout, stop!") Behaviors.stopped }
def init(): Behavior[Command] = Behaviors.receiveMessage { case Connect(`clientId`, replyTo) => replyTo ! Connected(IntStatus.OK, clientId) context.setReceiveTimeout(120.seconds, SessionTimeout) active() case other => context.log.warn("Receive invalid command: {}", other) Behaviors.same }
|
ComplexClient
定义了两个形为函数,init()
和active
。当客户端连接成功以后会返回active()
函数作为actor新的形为来接收之后的消息。这种返回一个新的Behavior
函数的形式替代了经典actor里的become
、unbecome
函数,它更直观,甚至还可以使用这种方式来实现状态机。
context.setReceiveTimeout(120.seconds, SessionTimeout)
用来设置两次消息接收之间的超时时间,这里设备为120秒。可以通过方式来实现服务端会话(session)超时判断,当session超时时返回Behaviors.stopped
消息来停止actor(自己)。这里需要注意的是context.stop
只能用来停止直接子actor,停止actor自身返回stopped
形为即可,这与经典actor有着明显的区别。
发现actor
Akka Typed取消了actorSelection
函数,不再允许通过actor path路径来查找ActorRef。取而代之的是使用Receptionist
机制来注册服务(actor实例)。也就是说,在Akka Typed中,actor默认情况下是不能查找的,只能通过引用(ActorRef[T]
)来使用,要么actor之间具有父子关系,要么通过消息传递ActorRef[T]
……
1 2 3 4 5 6 7 8 9
| object ComplexActor { val serviceKey = ServiceKey[Command]("complex")
def apply(): Behavior[Command] = Behaviors.setup { context => val registerAdapter = context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value)) context.system.receptionist ! Receptionist.Register(serviceKey, context.self, registerAdapter) new ComplexActor(context).init() } }
|
上面代码通过Receptionist.Register
将actor(context.self
引用)以serviceKey
注册到Actor系统的receptionist表,之后就可以通过serviceKey
来发现并获取此actor的引用。
1 2 3 4 5 6 7 8
| val actorRef: ActorRef[ComplexActor.Command] = system.receptionist .ask[Receptionist.Listing](Receptionist.Find(ComplexActor.serviceKey)) .map { listing => if (listing.isForKey(serviceKey)) listing.serviceInstances(serviceKey).head else throw new IllegalAccessException(s"Actor reference not found: $serviceKey") }
|
消息适配器
有时候,需要将不匹配的消息发送给actor,比如:把receptionist服务注册结果 Receptionist.Registered
发送给一个actor,我们可以通过将消息包装到一个实现了Command
trait的case class来实现。如下面的代码示例:
1 2
| val registerAdapter: ActorRef[Receptionist.Registered] = context.messageAdapter[Receptionist.Registered](value => ServiceKeyRegistered(value))
|
在使用Receptionist.Register
时将registerAdapter
作为第3个参数传入,这样服务注册结果就将被包装成ServiceKeyRegistered
消息传给actor。
在actor内部处理异步任务
actor内部消息都是串行执行的,在actor内执行异步操作时需要小心。不能在Future
的回调函数里直接操作actor内部变量,因为它们很可能在两个不同的线程中。
可以通过context.pipeToSelf
将异步结果转换成一个消息传递给actor,这样异步结果将进入actor的邮箱列队,通过正确的消息处理机制来处理。
1 2 3 4 5 6 7 8 9
| case QueryResource(_, replyTo) => context.pipeToSelf(findExternalResource())(value => InternalQueryResource(value, replyTo)) Behaviors.same
case InternalQueryResource(tryValue, replyTo) => replyTo ! tryValue .map(ResourceQueried(IntStatus.OK, clientId, _)) .getOrElse(ResourceQueried(IntStatus.INTERNAL_ERROR, clientId, Nil)) Behaviors.same
|
在ActorSystem[_]外部创建actor
Akka Typed开始,ActorSystem[T]
也拥有一个泛型参数,在构造ActorSystem时需要传入一个默认Behavior[T]
,并将其作为经典actor下的user守卫(也就类似拥有akka://system-name/user
这个路径的actor),同时ActorSystem[T]
的actorOf
函数也被取消。Akka Typed推荐应用都从传给ActorSystem的默认Behavior[T]
开始构建actor树。但有时,也许通过ActorSystem[T]
的实例来创建actor是有意义的,可以通过将typed的ActorSystem[T]
转换成经典的untyped ActorSystem
来实现。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| implicit val timeout = Timeout(2.seconds) implicit val system: ActorSystem[_] = _
val spawnActor: ActorRef[SpawnProtocol.Command] = system.toClassic .actorOf( PropsAdapter(Behaviors.supervise(SpawnProtocol()) .onFailure(SupervisorStrategy.resume)), "spawn") .toTyped[SpawnProtocol.Command]
val helloScalaF: Future[ActorRef[HelloScala.Command]] = spawnActor.ask[ActorRef[HelloScala.Command]](replyTo => SpawnProtocol.Spawn(HelloScala(), "sample", Props.empty, replyTo))
val helloScala: ActorRef[HelloScala.Command] = Await.result(helloScalaF, 2.seconds)
|
也可以将SpawnProtocol()
作为ActorSystem[_]
的初始Behavior[T]
来构造ActorSystem,这样就可以通过system.ask[ActorRef[T]](SpawnProtocol.Spawn(....))
来创建在user守卫下的actor了。
小结
本文通过两个例子展示了Akka Typed的特性,它与经典actor的区别还是挺大的。从untyped和typed,actor拥有了类型,这对于大规模actor系统开发可以在编译期发现很多重复,它将强制你在设计actor时首先考虑消息的定义。定义的消息即是actor之间的数据交互协议,消息定义的过程也是业务模式和模块划分的过程。
完整示例代码