实习培训考核内容--Akka+Netty编写聊天室系统

2024-08-19 18:35:29 浏览数 (2)

前言:akka是一种基于Actor 模型,提供了一个在 JVM 上构建高并发、分布式和高容错应用程序的平台。框架资料较少,主要参考资料:akka官网文档:https://doc.akka.io/docs/akka/current/actors.html netty作为 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序,是目前最流行的 NIO 框架。

1、聊天室整体框架

聊天室demo较为简单,主要作为学习akka框架练手比较合适,可以帮助理清akka框架的逻辑与一些使用规则。本人在实习中主要使用单节点actor与集群actor进行了聊天室demo的编写,单节点较为简单,这里不做展示。同时由于公司主要使用kotlin语言进行开发,所以主要使用kotlin进行编写。

聊天室demo整体框架聊天室demo整体框架

2、主要内容

2.1客户端与服务端模拟

客户端与服务端都是使用netty框架,客户端模拟用户的登录,服务端作为消息的转发,发送到akka集群中的分片区域的节点。 注意:这里netty没有添加心跳机制,同时注意需要考虑TCP粘包问题,进行tcp消息头与消息体的划分,否则在用户输入发送消息之后会产生粘包。同时在不同节点之间传输需要对传输的数据进行序列化,这里直接使用string编解码器,也可以使用protobuf进行自定义编解码器(推荐)。

Client

代码语言:java复制
object Client {
    @JvmStatic
    fun main(args: Array<String>) {
        val bootstrap = Bootstrap()
        bootstrap
            .group(NioEventLoopGroup())
            .channel(NioSocketChannel::class.java) //注意客户端与服务端在这里的区别
            .handler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
                    channel.pipeline()
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                            override fun channelActive(ctx: ChannelHandlerContext) {
                                println("与服务端链接已建立")
                                println("-------------------------------------------")
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")

                            }
                            override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                                val currentTime = LocalDateTime.now()
                                val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
                                val formattedTime = currentTime.format(formatter)
                                print("r")  // 回车符,回到行首
                                print("                                       r")  // 清空当前行
                                println("---------------$formattedTime---------------")
                                println((msg as String).trim())
                                print("发送聊天消息(如果需要私聊用户则使用@用户名):")  // 重新显示输入提示
                            }
                        })
                }
            })
            .option(ChannelOption.SO_KEEPALIVE, true) //设置长连接
        val channel = bootstrap.connect("localhost", 8080).sync().channel()
        /**
         * 如果私聊用户则需要在发送消息之前加上:@用户名
         * */
        val messageThread = Thread{
            val scanner = Scanner(System.`in`)  //模拟用户输入数据进行发送
            try {
                while (scanner.hasNextLine()) {
                    print("发送聊天消息(如果需要私聊用户则使用@用户名):")
                    val message = scanner.nextLine()
                    channel.writeAndFlush(message).sync()
                }
            }catch (e:InterruptedException){
                e.printStackTrace()
            }finally {
                scanner.close()
            }
        }
        messageThread.start()
        channel.closeFuture().sync()
    }
}


Server

代码语言:javascript复制
object Server {
    @JvmStatic
    fun main(args: Array<String>) {
        val actorSystem = ActorSystem.create("ChatServerKt")
        val roomActor = actorSystem.actorOf(Props.create(RoomActor::class.java), "RoomActor")
        var UserList:MutableList<Channel> = mutableListOf() //保存创建的用户
        val bossGroup: EventLoopGroup = NioEventLoopGroup()
        val workerGroup: EventLoopGroup = NioEventLoopGroup()
        val bootstrap = ServerBootstrap()
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel::class.java)
            .childHandler(object : ChannelInitializer<SocketChannel>() {
                override fun initChannel(channel: SocketChannel) {
       //             channel.pipeline().addLast("IdleStateHandler", IdleStateHandler(0,10,0, TimeUnit.SECONDS));
                    channel.pipeline()
    //                    .addLast(LineBasedFrameDecoder(8192)) // 添加行解码器
                        .addLast(LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
                        // 添加LengthFieldPrepender来处理包长度信息
                        .addLast(LengthFieldPrepender(4))
                        .addLast(StringDecoder())
                        .addLast(StringEncoder())
                        .addLast(object : ChannelInboundHandlerAdapter(){
                        override fun channelActive(ctx: ChannelHandlerContext) {
                            UserList.add(ctx.channel())
                            val num:Int = UserList.indexOf(ctx.channel())
                            val user = User("user${UserList.indexOf(ctx.channel())}",ctx.channel())
                            roomActor.tell(user, ActorRef.noSender())  //创建用户
                            println("user${num}连接")
                            println("当前用户数量为:${UserList.size}")
                        }
                        override fun channelInactive(ctx: ChannelHandlerContext) {
                            println("user${UserList.indexOf(ctx.channel())}退出")
                            UserList.remove(ctx.channel())
                        }
                        override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
                            val msgstr = (msg as String).trim()
                            if(msgstr.startsWith("@")){ //@user1你好
                                val tonum = msgstr[5]-'0'
                                val fromnum:Int = UserList.indexOf(ctx.channel())
                                if(tonum==fromnum){
                                    println("不能向自己发送消息!")
                                }else{
                                    if(tonum > UserList.size){
                                        println("目标用户不存在!")
                                    }else{
                                        val tomsg:String = msgstr.substring(6)
                                        val toMsg = ToUserMsg("user$fromnum","user$tonum",tomsg,UserList[tonum])
                                        roomActor.tell(toMsg, ActorRef.noSender())
                                    }
                                }
                            }else{
                                println("服务端收到消息:$msgstr")
                                val num:Int = UserList.indexOf(ctx.channel())
                                val message = MsgEntity("user${num}",msg as String,ctx.channel())
                                roomActor.tell(message, ActorRef.noSender()) //向Router
                            }
                        }
                    })
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true)
        bootstrap.bind(8080)

    }
}

2.2 AKKA编写用户节点以及分区规则 首先编写一个UserActor来接收不同类型的消息,每次有新用户注册登录相当于集群会启动创建一个UserActor

代码语言:javascript复制
class UserActor : AbstractPersistentActor() {
    private val log: LoggingAdapter = Logging.getLogger(context.system, this)
//    private val channelGroup: ChannelGroup = DefaultChannelGroup(GlobalEventExecutor.INSTANCE)

    private val mediator = DistributedPubSub.get(context.system).mediator()

    override fun persistenceId(): String {
        return "User-"   self.path().name()
    }
    override fun preStart() {
        mediator.tell(DistributedPubSubMediator.Subscribe("PublicMsg", self), self)
        mediator.tell(DistributedPubSubMediator.Subscribe(self.path().name(), self), self)
        //mediator.tell(DistributedPubSubMediator.Put(getSelf()), getSelf());
        println("节点启动")
        println("Actor路径: ${self.path()}")
        println("Actor名称: ${self.path().name()}")
    }
    override fun createReceiveRecover(): Receive {
        return receiveBuilder()
            .match(PublicMsg::class.java){
                println("UserActor重启成功")
            }.build()
    }
    fun getTime():String{
        val currentTime = LocalDateTime.now()
        val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
        val formattedTime = currentTime.format(formatter)
        return formattedTime
    }
    override fun createReceive(): Receive {
        return receiveBuilder()
            .match(PreWarmMessage::class.java){
                _->
                println("已经可以正常发送消息!")
            }
            .match(PublicMsg::class.java) { msg ->
                println("---------------聊天室消息-${getTime()}---------------")
                println("用户 ${msg.userId} : ${msg.msg}")
            }
            .match(PrivateMsg::class.java) { msg ->
                println("---------------私聊消息-${getTime()}---------------")
                println("来自用户 ${msg.fromUser}的私聊消息 : ${msg.msg}")
            }
            .match(DistributedPubSubMediator.SubscribeAck::class.java) {
                _->
                println("收到订阅消息")
            }
            .build()
    }
    companion object {
        fun props(): Props {
            return Props.create(UserActor::class.java)
        }
    }
}

使用AKKA框架的经典集群分片,需要编写一个类来集成ShardRegion类来设定分片规则,一般会把消息实体中封装的用户ID作为分片的实体ID,用户ID进行哈希作为分区ID。(消息实体类需自己定义)

代码语言:javascript复制
class ShardExtractor:ShardRegion.MessageExtractor {
    override fun entityId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> message.shardId ""
            is PrivateMsg -> message.toUser!! ""
       //     is PublicMsg -> message.userId.toString()   ""
            is ShardRegion.StartEntity -> message.entityId()
            is DistributedPubSubMediator.SubscribeAck -> "subscribe-ack-entity" // 添加处理 SubscribeAck 的逻辑
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun shardId(message: Any?): String {
        return when (message) {
            is PreWarmMessage -> (message.shardId.toString().hashCode() % 10).toString()   ""
            is PrivateMsg -> (message.toUser!!.toString().hashCode() % 10).toString()   ""
    //        is PublicMsg -> (message.userId.toString().hashCode() % 10).toString()   ""
            is ShardRegion.StartEntity -> (message.entityId().hashCode() % 10).toString()
            is DistributedPubSubMediator.SubscribeAck -> "0"
            else -> throw RuntimeException("无法识别消息类型 $message")
        }
    }
    override fun entityMessage(message: Any?): Any {
        return message!!
    }
}

2.3节点的配置 这边我图方便就起了两个节点模拟两个用户的登录

代码语言:javascript复制
akka {
  actor {
    provider = "cluster"
    allow-java-serialization = on
  }
  remote {
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551"
      "akka://ClusterSystem@127.0.0.1:2552"
    ]
     downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  cluster.sharding {
    remember-entities = on
    state-store-mode = persistence
  }
    persistence {
      journal.plugin = "akka.persistence.journal.inmem"  # 使用内存中的持久化插件,只适用于测试
      snapshot-store.plugin = "akka.persistence.snapshot-store.local"  # 使用本地文件系统快照存储
      snapshot-store.local.dir = "target/snapshots/node1"  # 快照存储路径
    }
}

3、补充

netty在进行消息传输时,服务端收到消息才会创建useractor节点,所以在两个用户消息发送之前,需要在用户登陆成功之后自动向Server发送一个预热消息进行节点的启动与创建

代码语言:javascript复制
data class PreWarmMessage(val shardId: String) : Serializable

其余细节性的内容没有过多展示,新手代码编写可能较为冗余,文章只是作为自己学习的记录,可能没有太大的参考意义,所以希望大佬们嘴下留情

0 人点赞