如何在Native层设计一个消息队列

2022-05-25 14:45:24 浏览数 (1)

做过Android开发的同学对Handler、Looper、MessageQueue、Message应该是非常熟悉了,Android是一个基于消息驱动的系统,我们在日常开发中用到消息队列的地方非常多。Android也给我们封装好了一个强大易用的消息处理API,音视频开发核心逻辑都会放在Native层,我们也希望在C 层实现这样的消息队列。

例如VideoEditor会创建一个GL线程,这个线程会构建EGL环境,我们可以在这个线程中构造EGLContext,然后使用OpenGL工具绘制各种效果。在此过程中,需要保持GL线程的统一,不然不同线程要通过共享EGLContext才可以实现效果了。言归正传,不需要多复杂的IPC机制,我们只需要实现一个简易的消息队列机制就行了。Android的消息队列机制实现太过复杂了,其实在音视频中很多东西不需要这么复杂,我们只需要将我们需要的那部分抠出来就可以了。

Android消息队列

我们首先分析一下Android中消息队列是如何设计的,下面是消息队列相关的类:

代码语言:javascript复制
HandlerThread
Looper
Handler
MessageQueue
Message

正常构建一个消息分发机制的代码如下:

代码语言:javascript复制
HandlerThread thread = new HandlerThread("Message Thread");
thread.start();
Handler handler = new Handler(thread.getLooper());
//......
handler.sendMessage(....)

大体的流程如下:

代码语言:javascript复制
1.通过创建HandlerThread实例,HandlerThread实例中构建一个Looper实例
2.通过调用HandlerThread实例的start()方法开始执行消息队列轮转,进入Looper中的轮转
3.Handler实例中持有刚刚创建的Looper实例
4.Looper实例中构建一个消息队列MessageQueue
5.Handler每次发送消息都会通过Handler持有的Looper实例添加到消息队列中
6.Looper轮转中会消化处理消息

简单的流程示意如下图:

可以看到Looper.java中的轮转函数中有无限循环在执行,这个无限循环中会不断地处理消息队列中的消息(如果消息队列中存在消息的话),如果消息队列中不存在消息,那就一直等着。

从上面我们简单地分析中可以比较清晰地了解了Android原生的消息队列机制,不过有些地方实现的过于复杂了,在音视频SDK处理中可以不必要这么复杂,至于复杂的地方我在下面会提到的。下面我们根据对Android原生消息队列的分析来提供C 层的消息队列机制。

C 消息队列

我们照葫芦画瓢在C 中定义了几个文件:

代码语言:javascript复制
handler_thread.cc
handler.cc
looper.cc
message_queue.cc
message.cc

每个文件提供的功能和Android基本上一致,不过我们还是先简单分析一下代码,方便后续的阐述。

handler_thread.cc

代码语言:javascript复制
#include "handler_thread.h"
#include "log.h"

namespace thread {

HandlerThread *HandlerThread::Create(std::string name) {
  return new HandlerThread(name);
}

static void *RunTask(void *context) {
  auto handler_thread = reinterpret_cast<HandlerThread *>(context);
  handler_thread->RunInternal();
  pthread_exit(nullptr);
}

HandlerThread::HandlerThread(std::string name)
  : name_(name)
  , looper_(nullptr)
  , exiting_(false)
  , exited_(false) {
  pthread_mutex_init(&mutex_, nullptr);
  pthread_cond_init(&cond_, nullptr);
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_create(&thread_, &attr, RunTask, (void *) this);
}

void HandlerThread::RunInternal() {
  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  exited_ = false;
  pthread_mutex_unlock(&mutex_);

  Looper::Prepare();
  pthread_mutex_lock(&mutex_);
  looper_ = Looper::MyLooper();
  pthread_cond_broadcast(&cond_);
  pthread_mutex_unlock(&mutex_);

  Looper::Loop();
  Looper::Exit();

  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  looper_ = nullptr;
  exited_ = true;
  pthread_mutex_unlock(&mutex_);
}

HandlerThread::~HandlerThread() {
  if (looper_) {
    looper_->Quit(true);
  }
  pthread_join(thread_, nullptr);
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&cond_);
  if (looper_) {
    delete looper_;
    looper_ = nullptr;
  }
}

void HandlerThread::Quit() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(false);
  }
}

bool HandlerThread::QuitSafely() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return false;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(true);
    return true;
  }
  return false;
}

Looper *HandlerThread::GetLooper() {
  pthread_mutex_lock(&mutex_);
  if (exited_) {
    LOGE("Thread has been exited");
    pthread_mutex_unlock(&mutex_);
    return nullptr;
  }
  if (looper_ == nullptr) {
    LOGE("Thread should wait");
    pthread_cond_wait(&cond_, &mutex_);
  }
  pthread_mutex_unlock(&mutex_);
  return looper_;
}

handler.cc

代码语言:javascript复制
#include "handler.h"
#include "log.h"
/**
 *
 * HandlerThread 持有 Looper
 * Handler 持有 Looper
 * Handler 发送消息通过Looper轮转消息
 * Looper 中持有MessageQueue来管理消息
 */

namespace thread {

Handler::Handler(Looper *looper, HandlerCallback *callback)
  : looper_(looper)
  , callback_(callback) {
}

Handler::~Handler() {
}

void Handler::SendMessage(Message *msg) {
  if (looper_) {
    msg->target = this;
    looper_->SendMessage(msg);
  }
}

void Handler::DispatchMessage(Message *msg) {
  if (callback_) {
    callback_->HandleMessage(msg);
  }
}

void Handler::RemoveMessage(int what) {
  if (looper_) {
    looper_->RemoveMessage(what);
  }
}

int Handler::Size() {
  if (looper_) {
    return looper_->Size();
  }
  return 0;
}

}

looper.cc

代码语言:javascript复制
#include "looper.h"
#include "thread.h"
#include "log.h"
#include <cassert>
#include "time_utils.h"

namespace thread {

Looper::Looper()
  : exiting_(false)
  , exited_(false)
  , exit_safely_(false)
  , looping_(false) {
  message_queue_ = new MessageQueue();
  pthread_mutex_init(&variable_mutex_, nullptr);
}

Looper::~Looper() {
  pthread_mutex_destroy(&variable_mutex_);
}

void Looper::Prepare() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Create(tid);
  if (looper == nullptr) {
    LOGE("Current thread looper has been called");
  }
}

void Looper::Loop() {
  MyLooper()->LoopInternal();
}

Looper * Looper::MyLooper() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Get(tid);
  if (looper == nullptr) {
    LOGE("Please invoke Looper::Prepare first");
  }
  assert(looper);
  return looper;
}

int64_t Looper::MyLooperId() {
  return reinterpret_cast<int64_t>(MyLooper());
}

void Looper::Exit() {
  int64_t tid = Thread::CurrentThreadId();
  LooperManager::GetInstance()->Remove(tid);
}

void Looper::Quit(bool safely) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  exit_safely_ = safely;
  exiting_ = true;
  pthread_mutex_unlock(&variable_mutex_);
  message_queue_->Notify();
}

void Looper::Dump() {
  message_queue_->Dump();
}

int Looper::Size() {
  return message_queue_->Size();
}

void Looper::SendMessage(Message *msg) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  pthread_mutex_unlock(&variable_mutex_);
  EnqueueMessage(msg);
}

void Looper::RemoveMessage(int what) {
  message_queue_->RemoveMessage(what);
}

void Looper::LoopInternal() {
  pthread_mutex_lock(&variable_mutex_);
  if (looping_ || exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  looping_ = true;
  pthread_mutex_unlock(&variable_mutex_);

  for (;;) {
    Message *msg = Take();
    if (msg) {
      if (msg->target) {
        msg->target->DispatchMessage(msg);
      }
      delete msg;
    }

    pthread_mutex_lock(&variable_mutex_);
    if (exit_safely_) {
      if (exiting_ && message_queue_->Size() == 0) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    } else {
      if (exiting_) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    }
    pthread_mutex_unlock(&variable_mutex_);
  }

  int64_t time = TimeUtils::GetCurrentTimeUs();
  while (message_queue_->Size() > 0) {
    Message *msg = message_queue_->Take();
    if (msg) {
      delete msg;
    }
  }
  message_queue_->Clear();
  LOGI("Clear message_queue cost time=%lld us", (TimeUtils::GetCurrentTimeUs() - time));

  pthread_mutex_lock(&variable_mutex_);
  exiting_ = false;
  exited_ = true;
  looping_ = false;
  pthread_mutex_unlock(&variable_mutex_);
}

void Looper::EnqueueMessage(Message *msg) {
  /// TODO msg 模式, 可以放在队头, 也可以放在队尾
  message_queue_->Offer(msg);
}

Message * Looper::Take() {
  return message_queue_->Take();
}

/// ------------------------------------------------------------------

LooperManager *LooperManager::instance_ = new LooperManager();

LooperManager::LooperManager() {

}

LooperManager::~LooperManager() {

}

LooperManager * LooperManager::GetInstance() {
  return instance_;
}

Looper * LooperManager::Create(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    Looper *looper = new Looper();
    looper_map_[tid] = looper;
    return looper;
  }
  return nullptr;
}

Looper * LooperManager::Get(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    return nullptr;
  }
  return it->second;

}

void LooperManager::Remove(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it != looper_map_.end()) {
    looper_map_.erase(it);
  }
}

int LooperManager::Size() {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  return looper_map_.size();
}

}

message_queue.cc

代码语言:javascript复制
#include "message_queue.h"
#include "log.h"
#include <sstream>

namespace thread {

MessageQueue::MessageQueue()
  : is_destroyed_(false) {
  pthread_mutex_init(&queue_mutex_, nullptr);
  pthread_cond_init(&queue_cond_, nullptr);
}

MessageQueue::~MessageQueue() {
  LOGI("Enter");
  pthread_mutex_lock(&queue_mutex_);
  is_destroyed_ = true;
  pthread_mutex_unlock(&queue_mutex_);

  Clear();

  pthread_mutex_destroy(&queue_mutex_);
  pthread_cond_destroy(&queue_cond_);
  LOGI("Leave");
}

void MessageQueue::Offer(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_back(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::OfferAtFront(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_front(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

Message *MessageQueue::Take() {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  if (Size() <= 0) {
    pthread_cond_wait(&queue_cond_, &queue_mutex_);
  }
  if (queue_.empty()) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  Message *msg = queue_.front();
  queue_.pop_front();
  pthread_mutex_unlock(&queue_mutex_);
  return msg;
}

void MessageQueue::Notify() {
  pthread_mutex_lock(&queue_mutex_);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

int MessageQueue::Size() {
  return queue_.size();
}

bool MessageQueue::IsEmpty() {
  return queue_.empty();
}

void MessageQueue::Clear() {
  Notify();
  if (queue_.empty()) {
    return;
  }
  pthread_mutex_lock(&queue_mutex_);
  while (!queue_.empty()) {
    Message *msg = queue_.front();
    queue_.pop_front();
    if (msg) {
      delete msg;
    }
  }
  queue_.clear();
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::RemoveMessage(int what) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    if (what == msg->what) {
      delete msg;
      it = queue_.erase(it);
      continue;
    }
      it;
  }
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::Dump() {
  std::ostringstream os;
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    os << msg->what<<"n";
      it;
  }
  LOGI("Result=%s", os.str().c_str());
}

}

message.cc

代码语言:javascript复制
#include "message.h"

#include "log.h"

namespace thread {

Message::Message()
  : what(-1)
  , arg1(-1)
  , arg2(-1)
  , arg3(-1)
  , arg4(-1)
  , arg5(-1)
  , arg6(-1)
  , arg7(-1)
  , obj1(nullptr)
  , obj2(nullptr)
  , target(nullptr) {

}

Message::~Message() {
  /**
   * obj1
   * obj2
   * target
   * 不应该在Message析构函数中销毁, 应该由开发者决定是否销毁
   */
}

}

C 消息队列怎么使用

初始化:

代码语言:javascript复制
std::string name("AV Message Queue");
thread::HandlerThread *handler_thread = thread::HandlerThread::Create(name);
thread::Handler *handler = new thread::Handler(handler_thread->GetLooper(), this);

同时保证当前的类实现thread::HandlerCallback,实现函数HandleMessage(thread::Message *msg)

不要忘记在析构函数中将handler_thread和handler指针销毁。

发送消息:

代码语言:javascript复制
thread::Message *msg = new thread::Message();
msg->what = MSG_WHAT;
msg->obj1 = XXXX;
handler->SendMessage(msg);

遗留问题

代码语言:javascript复制
同步等待的消息处理
延时消息处理

延时消息处理需要使用链表的结果,目前我们使用的双端队列,不过目前音视频SDK已经够用了,但是如果需要延时处理的话,你愿意来尝试一下吗?

0 人点赞