本篇作为scala快速入门系列的第四十篇博客,为大家带来的是关于Akka的内容。
Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换信息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确的并发程序和并行系统,Actor具有如下特性:
- 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
- 提供了异步非阻塞的、高性能的事件驱动编程模型
- 超级轻量级事件处理(每GB堆内存几百万Actor)
下面,将通过实战,来加深对Akka的熟练度。
实例
1.需求:
通过Akka的actor编程模型,实现2个进程间的通信。
2.架构图:
3.重要类介绍
ActorSystem:在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
注意: <1>. ActorSystem是一个进程中的老大,它负责创建和监督actor <2>. ActorSystem是一个单例对象 <3>. actor负责通信
4.Actor
在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
(1)preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
(2)receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
5.具体代码
<1> Master 类
代码语言:javascript复制package cn.itcast.rpc
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
//todo:利用akka的actor模型实现2个进程间的通信-----Master端
class Master extends Actor{
//构造代码块先被执行
println("master constructor invoked")
//prestart方法会在构造代码块执行后被调用,并且只被调用一次
override def preStart(): Unit = {
println("preStart method invoked")
}
//receive方法会在prestart方法执行后被调用,表示不断的接受消息
override def receive: Receive = {
case "connect" =>{
println("a client connected")
//master发送注册成功信息给worker
sender ! "success"
}
}
}
object Master{
def main(args: Array[String]): Unit = {
//master的ip地址
val host=args(0)
//master的port端口
val port=args(1)
//准备配置文件信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//配置config对象 利用ConfigFactory解析配置文件,获取配置信息
val config=ConfigFactory.parseString(configStr)
// 1、创建ActorSystem,它是整个进程中老大,它负责创建和监督actor,它是单例对象
val masterActorSystem = ActorSystem("masterActorSystem",config)
// 2、通过ActorSystem来创建master actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
// 3、向master actor发送消息
//masterActor ! "connect"
}
}
<2> Worker类
代码语言:javascript复制package cn.itcast.rpc
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
//todo:利用akka中的actor实现2个进程间的通信-----Worker端
class Worker extends Actor{
println("Worker constructor invoked")
//prestart方法会在构造代码块之后被调用,并且只会被调用一次
override def preStart(): Unit = {
println("preStart method invoked")
//获取master actor的引用
//ActorContext全局变量,可以通过在已经存在的actor中,寻找目标actor
//调用对应actorSelection方法,
// 方法需要一个path路径:1、通信协议、2、master的IP地址、3、master的端口 4、创建master actor老大 5、actor层级
val master: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@172.16.43.63:8888/user/masterActor")
//向master发送消息
master ! "connect"
}
//receive方法会在prestart方法执行后被调用,不断的接受消息
override def receive: Receive = {
case "connect" =>{
println("a client connected")
}
case "success" =>{
println("注册成功")
}
}
}
object Worker{
def main(args: Array[String]): Unit = {
//定义worker的IP地址
val host=args(0)
//定义worker的端口
val port=args(1)
//准备配置文件
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过configFactory来解析配置信息
val config=ConfigFactory.parseString(configStr)
// 1、创建ActorSystem,它是整个进程中的老大,它负责创建和监督actor
val workerActorSystem = ActorSystem("workerActorSystem",config)
// 2、通过actorSystem来创建 worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker),"workerActor")
//向worker actor发送消息
workerActor ! "connect"
}
}
好了,本期的分享就到这里了,另外scala快速入门系列暂时先告一个段落,感谢各位小伙伴的支持和鼓励!