一、Akka概念
Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时环境。Akka用Scala 语言编写,同时提供了 Scala 、JAVA 的开发接口。
二、Akka 中 Actor 模型
2.1 Actor模型介绍
Akka 处理并发的方法基于 Actor 模型。在基于 Actor的系统里,所有的事物都是 Actor。这就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是Actor模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与Actor之间只能通过消息通信。
- 对并发模型进行了更高的抽象
- 异步、非阻塞、高性能的事件驱动编程模型
- 轻量级事件处理(1GB内存可容纳百万级别个Actor)
为什么 Actor 模型是一种处理并发问题的解决方案呢? 处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢? 答:无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor模型的出现解决了这个问题,简化并发编程,提升程序性能。
从图中可以看到,Actor 与 Actor 之前只能用消息进行通信,当某一个 Actor 给另外一个 Actor发消息,消息是有顺序的,只需要将消息投寄到相应的邮箱,至于对方 Actor 怎么处理你的消息你并不知道,当然你也可等待它的回复。
2.2 Actor工作机制
Actor 是 ActorSystem 创建的,ActorSystem 的职责是负责创建并管理其创建的 Actor,ActorSystem 的单例的,一个 JVM 进程中有一个即可,而 Acotr 是多例的。
如果Actor A需要给Actor B发送消息,必须获得ActorRef B,然后调用ActorRef B的recive方法,在该方法内有相关的API(sender方法)可以获得是由Actor A作为发送方发送给B的消息。
2.3 HelloActor示例
导入pom 依赖:
代码语言:javascript复制<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcats.akka</groupId>
<artifactId>itcats-akka</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 定义一下常量 -->
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加akka的actor依赖 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- 多进程之间的Actor通信 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
<!-- 指定插件-->
<build>
<!-- 指定源码包和测试包的位置 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- maven打包的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- 指定main方法 -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.sheep.robot.ClientActor</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
创建一个最简单的Actor模型,实现自己给自己发消息的环回实验
代码语言:javascript复制//继承akka.actor.Actor
class HelloActor extends Actor{
//用于接收Receive
//type Receive = PartialFunction[Any, Unit]
override def receive: Receive = {
//接收消息并处理
case "itcats" => println("This is itcats")
case "itcats_cn" => println("This is itcats_cn")
case "stop" => {
context.stop(self) //self: ActorRef引用自己,将Actor关闭
context.system.terminate() //关闭ActorSystem
}
case _ => println("What is is?")
}
}
object HelloActor{
//1.获取ActorSystem工厂对象,并命名
private val helloActorFactory = ActorSystem("HelloActorFactory")
//2.从ActorSystem获取HelloActor的引用对象ActorRef,并命名
private val helloActorRef: ActorRef = helloActorFactory.actorOf(Props[HelloActor],"helloActor")
def main(args: Array[String]): Unit = {
//给自己(HelloActor)发送消息 方法名称就是!
helloActorRef ! "itcats" //给自己发送了消息itcats
helloActorRef ! "itcats_cn" //给自己发送了消息itcats_cn
helloActorRef ! "stop" //给自己发送消息stop
}
}
打印结果:
代码语言:javascript复制This is itcats
This is itcats_cn
打印完运行结果后,发现Actor工程一直在运行。实际上说明了Dispatcher Message内部是一个线程池,receive()方法实际上是从自己的Mail Box中取出消息,内部类似于调用Runnable的run方法。
三、"乒乓球"模型的Akka通信
代码语言:javascript复制import akka.actor.{Actor, ActorRef}
class AActor(val bActor: ActorRef) extends Actor{
override def receive: Receive = {
case "a is ready" => {
println("a is ready")
bActor ! "啪"
}
case "啪啪" => {
println("接到了BActor的球")
Thread.sleep(1000)
bActor ! "啪"
}
}
}
代码语言:javascript复制import akka.actor.Actor
class BActor extends Actor {
override def receive: Receive = {
case "b is ready" => println("b is ready")
case "啪" => {
println("接到了AActor的球")
Thread.sleep(1000)
sender() ! "啪啪"
}
}
}
执行类:
代码语言:javascript复制import akka.actor.{ActorRef, ActorSystem, Props}
// A --> B
// B --> A
object PingPongMain {
def main(args: Array[String]): Unit = {
//创建ActorSystem
val pingpongActorSystem = ActorSystem("pingpongActorSystem")
//创建bActor的引用对象
val bActor: ActorRef = pingpongActorSystem.actorOf(Props[BActor],"bActor")
//创建aActor的引用对象
val aActor: ActorRef = pingpongActorSystem.actorOf(Props(new AActor(bActor)),"aActor")
bActor ! "b is ready"
aActor ! "a is ready"
}
}
四、基于Akka实现智能聊天客服
假设智能聊天客服能回答如下问题
1、在吗
2、你叫什么名字
3、你是男的还是女的
4、你在哪
对于其他问题直接返回:我听不懂你的问题!
客户端程序:
代码语言:javascript复制import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class ChatServer extends Actor {
override def receive: Receive = {
case "start" => println("服务器已就绪")
case ClientToServerMessage(msg) => {
println(s"收到客户端消息: $msg")
msg match {
case "在吗" => sender() ! ServerToClientMessage("我在")
case "你叫什么名字" => sender() ! ServerToClientMessage("我是中国电信语音客服")
case "你是男的还是女的" => sender() ! ServerToClientMessage("我是小姐姐哟~")
case "你在哪" => sender() ! ServerToClientMessage("我在广东省广州市")
case _ => sender() ! ServerToClientMessage("我听不懂你亲的意思,客服还在不断学习中哦~")
}
}
}
}
//为了服务端能够独立运行,定义一个伴生对象
object ChatServer extends App {
val host: String = "127.0.0.1"
val port: Int = 8878
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
//指定服务器的IP和端口号
private val server = ActorSystem("Server", config)
private val serverActorRef: ActorRef = server.actorOf(Props[ChatServer], "chat")
serverActorRef ! "start"
}
客户端程序:
代码语言:javascript复制import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
class ClientActor extends Actor{
var serverActorRef: ActorSelection = _
//根据服务端的协议地址
override def preStart(): Unit = {
//获得服务端的引用对象
serverActorRef = context.actorSelection("akka.tcp://Server@127.0.0.1:8878/user/chat")
}
override def receive = {
case "start" => println("聊天客户端已就绪")
case msg: String => {
serverActorRef ! ClientToServerMessage(msg)
}
case ServerToClientMessage(msg) => {
println(s"收到服务端消息: $msg")
}
}
}
//创建伴生类对象,可执行
object ClientActor extends App {
val host: String = "127.0.0.1"
//再启动只需修改端口号即可
val port: Int = 8879
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
//指定服务器的IP和端口号
private val clientSystem = ActorSystem("client", config)
private val actorRef: ActorRef = clientSystem.actorOf(Props[ClientActor], "chatClient1")
//再启动一个客户端修改名称和端口号即可
//private val actorRef: ActorRef = clientSystem.actorOf(Props[ClientActor], "chatClient2")
actorRef ! "start"
//接收客户端的参数
while(true){
//获取用户输入的内容
val question = StdIn.readLine()
actorRef ! question
}
}
协议格式样例类:
代码语言:javascript复制//服务端发送给客户端的消息格式
case class ServerToClientMessage(msg: String)
//客户端发送给服务端的消息格式
case class ClientToServerMessage(msg: String)