前言:akka是一种基于Actor 模型,提供了一个在 JVM 上构建高并发、分布式和高容错应用程序的平台。框架资料较少,主要参考资料:akka官网文档:https://doc.akka.io/docs/akka/current/actors.html netty作为 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架。
1、聊天室整体框架
聊天室demo较为简单,主要作为学习akka框架练手比较合适,可以帮助理清akka框架的逻辑与一些使用规则。本人在实习中主要使用单节点actor与集群actor进行了聊天室demo的编写,单节点较为简单,这里不做展示。同时由于公司主要使用kotlin语言进行开发,所以主要使用kotlin进行编写。
2、主要内容
2.1客户端与服务端模拟
客户端与服务端都是使用netty框架,客户端模拟用户的登录,服务端作为消息的转发,发送到akka集群中的分片区域的节点。 注意:这里netty没有添加心跳机制,同时注意需要考虑TCP粘包问题,进行tcp消息头与消息体的划分,否则在用户输入发送消息之后会产生粘包。同时在不同节点之间传输需要对传输的数据进行序列化,这里直接使用string编解码器,也可以使用protobuf进行自定义编解码器(推荐)。
Client
代码语言:java复制object Client {
@JvmStatic
fun main(args: Array<String>) {
val bootstrap = Bootstrap()
bootstrap
.group(NioEventLoopGroup())
.channel(NioSocketChannel::class.java) //注意客户端与服务端在这里的区别
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(channel: SocketChannel) {
channel.pipeline()
.addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
// 添加LengthFieldPrepender来处理包长度信息
.addLast(LengthFieldPrepender(4))
.addLast(StringDecoder())
.addLast(StringEncoder())
.addLast(object : ChannelInboundHandlerAdapter(){
override fun channelActive(ctx: ChannelHandlerContext) {
println("与服务端链接已建立")
println("-------------------------------------------")
print("发送聊天消息(如果需要私聊用户则使用@用户名):")
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val currentTime = LocalDateTime.now()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
val formattedTime = currentTime.format(formatter)
print("r") // 回车符,回到行首
print(" r") // 清空当前行
println("---------------$formattedTime---------------")
println((msg as String).trim())
print("发送聊天消息(如果需要私聊用户则使用@用户名):") // 重新显示输入提示
}
})
}
})
.option(ChannelOption.SO_KEEPALIVE, true) //设置长连接
val channel = bootstrap.connect("localhost", 8080).sync().channel()
/**
* 如果私聊用户则需要在发送消息之前加上:@用户名
* */
val messageThread = Thread{
val scanner = Scanner(System.`in`) //模拟用户输入数据进行发送
try {
while (scanner.hasNextLine()) {
print("发送聊天消息(如果需要私聊用户则使用@用户名):")
val message = scanner.nextLine()
channel.writeAndFlush(message).sync()
}
}catch (e:InterruptedException){
e.printStackTrace()
}finally {
scanner.close()
}
}
messageThread.start()
channel.closeFuture().sync()
}
}
Server
代码语言:javascript复制object Server {
@JvmStatic
fun main(args: Array<String>) {
val actorSystem = ActorSystem.create("ChatServerKt")
val roomActor = actorSystem.actorOf(Props.create(RoomActor::class.java), "RoomActor")
var UserList:MutableList<Channel> = mutableListOf() //保存创建的用户
val bossGroup: EventLoopGroup = NioEventLoopGroup()
val workerGroup: EventLoopGroup = NioEventLoopGroup()
val bootstrap = ServerBootstrap()
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.childHandler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(channel: SocketChannel) {
// channel.pipeline().addLast("IdleStateHandler", IdleStateHandler(0,10,0, TimeUnit.SECONDS));
channel.pipeline()
// .addLast(LineBasedFrameDecoder(8192)) // 添加行解码器
.addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
// 添加LengthFieldPrepender来处理包长度信息
.addLast(LengthFieldPrepender(4))
.addLast(StringDecoder())
.addLast(StringEncoder())
.addLast(object : ChannelInboundHandlerAdapter(){
override fun channelActive(ctx: ChannelHandlerContext) {
UserList.add(ctx.channel())
val num:Int = UserList.indexOf(ctx.channel())
val user = User("user${UserList.indexOf(ctx.channel())}",ctx.channel())
roomActor.tell(user, ActorRef.noSender()) //创建用户
println("user${num}连接")
println("当前用户数量为:${UserList.size}")
}
override fun channelInactive(ctx: ChannelHandlerContext) {
println("user${UserList.indexOf(ctx.channel())}退出")
UserList.remove(ctx.channel())
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
val msgstr = (msg as String).trim()
if(msgstr.startsWith("@")){ //@user1你好
val tonum = msgstr[5]-'0'
val fromnum:Int = UserList.indexOf(ctx.channel())
if(tonum==fromnum){
println("不能向自己发送消息!")
}else{
if(tonum > UserList.size){
println("目标用户不存在!")
}else{
val tomsg:String = msgstr.substring(6)
val toMsg = ToUserMsg("user$fromnum","user$tonum",tomsg,UserList[tonum])
roomActor.tell(toMsg, ActorRef.noSender())
}
}
}else{
println("服务端收到消息:$msgstr")
val num:Int = UserList.indexOf(ctx.channel())
val message = MsgEntity("user${num}",msg as String,ctx.channel())
roomActor.tell(message, ActorRef.noSender()) //向Router
}
}
})
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
bootstrap.bind(8080)
}
}
2.2 AKKA编写用户节点以及分区规则 首先编写一个UserActor来接收不同类型的消息,每次有新用户注册登录相当于集群会启动创建一个UserActor
代码语言:javascript复制class UserActor : AbstractPersistentActor() {
private val log: LoggingAdapter = Logging.getLogger(context.system, this)
// private val channelGroup: ChannelGroup = DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
private val mediator = DistributedPubSub.get(context.system).mediator()
override fun persistenceId(): String {
return "User-" self.path().name()
}
override fun preStart() {
mediator.tell(DistributedPubSubMediator.Subscribe("PublicMsg", self), self)
mediator.tell(DistributedPubSubMediator.Subscribe(self.path().name(), self), self)
//mediator.tell(DistributedPubSubMediator.Put(getSelf()), getSelf());
println("节点启动")
println("Actor路径: ${self.path()}")
println("Actor名称: ${self.path().name()}")
}
override fun createReceiveRecover(): Receive {
return receiveBuilder()
.match(PublicMsg::class.java){
println("UserActor重启成功")
}.build()
}
fun getTime():String{
val currentTime = LocalDateTime.now()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
val formattedTime = currentTime.format(formatter)
return formattedTime
}
override fun createReceive(): Receive {
return receiveBuilder()
.match(PreWarmMessage::class.java){
_->
println("已经可以正常发送消息!")
}
.match(PublicMsg::class.java) { msg ->
println("---------------聊天室消息-${getTime()}---------------")
println("用户 ${msg.userId} : ${msg.msg}")
}
.match(PrivateMsg::class.java) { msg ->
println("---------------私聊消息-${getTime()}---------------")
println("来自用户 ${msg.fromUser}的私聊消息 : ${msg.msg}")
}
.match(DistributedPubSubMediator.SubscribeAck::class.java) {
_->
println("收到订阅消息")
}
.build()
}
companion object {
fun props(): Props {
return Props.create(UserActor::class.java)
}
}
}
使用AKKA框架的经典集群分片,需要编写一个类来集成ShardRegion类来设定分片规则,一般会把消息实体中封装的用户ID作为分片的实体ID,用户ID进行哈希作为分区ID。(消息实体类需自己定义)
代码语言:javascript复制class ShardExtractor:ShardRegion.MessageExtractor {
override fun entityId(message: Any?): String {
return when (message) {
is PreWarmMessage -> message.shardId ""
is PrivateMsg -> message.toUser!! ""
// is PublicMsg -> message.userId.toString() ""
is ShardRegion.StartEntity -> message.entityId()
is DistributedPubSubMediator.SubscribeAck -> "subscribe-ack-entity" // 添加处理 SubscribeAck 的逻辑
else -> throw RuntimeException("无法识别消息类型 $message")
}
}
override fun shardId(message: Any?): String {
return when (message) {
is PreWarmMessage -> (message.shardId.toString().hashCode() % 10).toString() ""
is PrivateMsg -> (message.toUser!!.toString().hashCode() % 10).toString() ""
// is PublicMsg -> (message.userId.toString().hashCode() % 10).toString() ""
is ShardRegion.StartEntity -> (message.entityId().hashCode() % 10).toString()
is DistributedPubSubMediator.SubscribeAck -> "0"
else -> throw RuntimeException("无法识别消息类型 $message")
}
}
override fun entityMessage(message: Any?): Any {
return message!!
}
}
2.3节点的配置 这边我图方便就起了两个节点模拟两个用户的登录
代码语言:javascript复制akka {
actor {
provider = "cluster"
allow-java-serialization = on
}
remote {
artery {
enabled = on
transport = tcp
canonical.hostname = "127.0.0.1"
canonical.port = 2551
}
}
cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551"
"akka://ClusterSystem@127.0.0.1:2552"
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
cluster.sharding {
remember-entities = on
state-store-mode = persistence
}
persistence {
journal.plugin = "akka.persistence.journal.inmem" # 使用内存中的持久化插件,只适用于测试
snapshot-store.plugin = "akka.persistence.snapshot-store.local" # 使用本地文件系统快照存储
snapshot-store.local.dir = "target/snapshots/node1" # 快照存储路径
}
}
3、补充
netty在进行消息传输时,服务端收到消息才会创建useractor节点,所以在两个用户消息发送之前,需要在用户登陆成功之后自动向Server发送一个预热消息进行节点的启动与创建
代码语言:javascript复制data class PreWarmMessage(val shardId: String) : Serializable