设计了简单高效的弹幕系统!老板直接加薪

2024-09-18 17:41:24 浏览数 (1)

先赞后看,南哥助你Java进阶一大半

弹幕系统最早起源于日本,流行于视频网站niconico。我们认识的初音未来(Hatsune Miku)就是在niconico平台上爆红的!!

我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。

⭐⭐⭐本文收录在《Java学习/进阶/面试指南》:https://github..JavaSouth

1. 直播弹幕设计

1.1 底层数据结构支持

南友们看看右下角的弹幕列表,这个弹幕列表就是我们今天要的攻克的对象,至于中间视频直播的走马灯弹幕,它其实也是根据弹幕列表的数据来进行滚动。

南哥观察了下这个直播间,现在有41.5万人在观看!

计算机世界实际上是现实世界的抽象,那弹幕列表我们要用什么数据结构支持。

我希望用的是Redis,Redis官方写着这么霸气的宣传语。

Get the world’s fastest in-memory database from the ones who built it

从构建者那里获取世界上最快的内存数据库

而底层数据结构我们使用Redis五大基本数据类型之一:Zset。Zset是一种有序集合类型,它有一个score值,score值用来存储用户发送弹幕的时间戳,那整个列表就会根据时间戳来进行排序。

而Zset元素的值就作为用户弹幕,例如上图的:剧本演了又演

有了底层数据结构支持,我们来说说这个弹幕列表有什么功能限制。大家有没注意到我们进入直播间,直播间是不会把所有的弹幕内容都

显示出来的,往往只是显示前10条。

那我们也给弹幕列表加上这个特性,在Redis的Zset结构设置只保留前10条的属性。

代码语言:java复制
// 创建Zset数据结构并设置10条的限制
public class DanmakuService {

    private Jedis jedis = new Jedis("localhost");

    public void addDanmaku(String roomId, String danmaku, long timestamp) {
        String key = "room:"   roomId   ":danmaku";
        jedis.zadd(key, timestamp, danmaku);
        // 只保留最新的10条弹幕
        jedis.zremrangeByRank(key, 0, -(11));
    }
}

1.2 弹幕列表查询

那用户进入直播间,弹幕列表是怎么查询出来的?

我们按最简单高效来,用户进入直播间,客户端调用API接口去查询出Redis里的弹幕列表。

有南友会问:这只是最近的前10条聊天记录,后面的呢?

别急,有两种方案。

(1)轮询查询

客户端轮询查询API接口,不断抓取出用户发出的新弹幕。具体细节的话,客户端第一次查询出的弹幕列表的数据结构是:[(时间戳1: 弹幕1), (时间戳2: 弹幕2)]

后续查询客户端继续轮询调用API接口,同时携带当前弹幕列表的最大时间戳入参。而后端服务就会根据该时间戳返回比该时间戳大的数据,用户发送的新的弹幕也就会显示出来。

代码语言:java复制
// 轮询API接口
public class DanmakuService {

    private Jedis jedis = new Jedis("localhost");

    public Set<String> getRecentDanmaku(String roomId, long lastTimestamp) {
        String key = "room:"   roomId   ":danmaku";
        return jedis.zrangeByScore(key, lastTimestamp   1, Long.MAX_VALUE);
    }
}

轮询API我们要设置多少时间轮询一次呢?我们先设置 3 秒轮询更新一次弹幕列表,后续再根据用户反馈、服务器资源进行优化调整。

(2)WebSocket技术

视频直播间有个特点,主播和观众是无时不刻在进行互动聊天的,这就要求音视频要实时同步了。那直播弹幕就更应该实时,使用第一种轮询API的方法,可能会有 3 秒延迟的情况发生。

要实时推送新的弹幕,我们可以使用WebSocket技术,客户端和WebSocket服务器保存长连接,用户只要发送新的弹幕消息,WebSocket服务器便会实时推送到客户端上。

第一种方法虽然粗暴,如果轮询查询会空,那本次查询就是一次资源浪费,对服务器资源不友好。但他简单高效,出错情况也少。

如果老板要你半个月上线这个弹幕列表功能,那第一种方法也未尝不可。后续我们再来根据实际情况作出升级调整的策略,例如升级为WebSocket技术。

1.3 系统流程

南哥画下整个系统的流程。

用户通过客户端发送弹幕,通过后端服务把弹幕消息发送到Kafka。使用Kafka消息我们就可以进行流量削峰,弹幕消息有时是成千上万 / 秒,把存储弹幕消息的任务通过消息队列缓存起来,一个个执行,减少服务器瞬时的高压力。

发送到Kafka后,负责监听弹幕Kafka消息的后端服务会把弹幕消息写入Redis。

代码语言:java复制
// 监听写入Redis
public class DanmakuListener {

    private Jedis jedis = new Jedis("localhost");

    @KafkaListener(topics = "danmaku", groupId = "group_id")
    public void listen(ConsumerRecord<String, String> record) {
        String roomId = record.key();
        String message = record.value();
        long timestamp = System.currentTimeMillis();
        
        String key = "room:"   roomId   ":danmaku";
        // 将弹幕消息写入Redis Zset
        jedis.zadd(key, timestamp, message);
        // 保留最近10条
        jedis.zremrangeByRank(key, 0, -11);
    }
}

1.4 消息丢失问题

有这么一种情况,用户A发送了(1726406132, 弹幕A),用户B发送了(1726406150, 弹幕B),用户A先发送了弹幕,用户B再发送弹幕。

如果用户B的弹幕先写入到了Redis的Zset列表,其他用户进入直播间查询了第一个弹幕列表。那即使用户A后面成功写入了弹幕,其他用户也不会获取到用户A的弹幕。

因为客户端只会更新比弹幕B时间戳更大的弹幕消息。这怎么处理?

我们要解决的是弹幕B先于弹幕A成功写入的问题,不考虑其他特殊情况,可以给写入Redis的方法加上分布式锁的功能,保证先获取锁的弹幕消息写入的过程中,不会有其他弹幕消息写入的干扰。

代码语言:java复制
// 在写入弹幕时获取分布式锁
public class DanmakuService {

    private RedisLockUtil redisLock = new RedisLockUtil();
    
    private Jedis jedis = new Jedis("localhost");

    public void addDanmakuWithLock(String roomId, String danmaku, long timestamp) {
        String lockKey = "lock:"   roomId;
        try {
            if (redisLock.acquireLock(lockKey)) {
                String key = "room:"   roomId   ":danmaku";
                jedis.zadd(key, timestamp, danmaku);
                jedis.zremrangeByRank(key, 0, -11);
            }
        } finally {
            redisLock.releaseLock(lockKey);
        }
    }
}
代码语言:java复制
public class RedisLockUtil {

    private Jedis jedis = new Jedis("localhost");
    
    private static final int EXPIRE_TIME = 5000; // 5秒

    // 获取锁
    public boolean acquireLock(String lockKey) {
        long currentTime = System.currentTimeMillis();
        String result = jedis.set(lockKey, String.valueOf(currentTime), "NX", "PX", EXPIRE_TIME);
        return "OK".equals(result);
    }

    // 释放锁
    public void releaseLock(String lockKey) {
        jedis.del(lockKey);
    }
}

戳这,《JavaSouth》作为一份涵盖Java程序员所需掌握核心知识、面试重点的《Java学习进阶指南》。

我是南哥,南就南在Get到你的有趣评论➕点赞➕关注。

创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️

0 人点赞