如何在EasyCVR中实现NSQ延时推流技术?

2021-11-18 17:31:32 浏览数 (2)

EasyCVR 是TSINGSEE青犀视频开发的高稳定、高接入性的视频平台,可接入的协议丰富,且可通过国标协议级联。EasyCVR 各模块之间进行消息通信时,需要一款消息中间件进行消息的传输和发送。在调研各种 MQ 中间件后,确定采用 NSQ 来进行编译。

EasyCVR 使用 NSQ 时,希望延时 60s,消费端才能够收到对应的消息,因此我们本文主要是调研是否有该功能的过程,我们主要使用 DeferredPublish 方法实现,方法代码如下:

代码语言:javascript复制
package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

func main() {
   config := nsq.NewConfig()
   // 1. 向 nsqd 的 tcp 端口发送消息,因此进行对应的配置
   producer, err := nsq.NewProducer("127.0.0.1:4154", config)
   if err != nil {
      log.Fatal(err)
   }

   messageBody := []byte("hello world delay")
   topicName := "topic2"

   // 2. 同步推流到 nspd, 同步推流代表等待 nspd 的响应,如果发送失败返回错误。
   // PublishAsync 代表是异步推送消息给 nspd,发送完消息后立刻返回
   err = producer.DeferredPublish(topicName, 60*time.Second, messageBody)
   fmt.Println("发送消息时间为", time.Now().Format(consts.TimeFormat))
   if err != nil {
      log.Fatal(err)
   }

   /*sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan*/

   // 3. 停止生产者,一般在停止服务,停止进程的时候需要调用
   producer.Stop()
}

在 14:06:45 开始发送了一个消息。

消费者在 60s 后收到消息,14:07:46 收到对应的消息。

经过代码确认,延时消息的发送是在 nsqd 中进行实现的,延时推流功能已经实现。

0 人点赞