缩略muduo库(4):事件循环 EventLoop

2021-10-09 15:33:33 浏览数 (1)

文章目录

    • 获取线程ID
    • 事件循环 EventLoop

获取线程ID

每一个线程都有一个EventLoop,每个loop里面都会有很多的channel,每个channel的任务都要在自己的线程中完成。 为了管理这些线程,设置了一份获取线程ID的代码,辅助管理。

代码语言:javascript复制
#pragma once

#include<unistd.h>
#include<sys/syscall.h>

namespace CurrentThread{
    //__thread:这个变量虽然是一个全局变量,但是加上这个修饰的话,其在每一个线程中都会保有一份缓存,互不干扰
    extern __thread int t_CachedTid;   
    extern __thread char t_TidString[32]; 
    extern __thread int t_TidStringLength; 
    extern __thread const char* t_ThreadNum;

    void cacheTid();    //做一份缓存

    inline int tid(){
        if (__builtin_expect(t_CachedTid == 0, 0)){ //底层优化,不再赘述
        cacheTid();
        }
        return t_CachedTid;
    }


    // for logging
    inline const char* tidString() {
        return t_TidString;
    }

    // for logging
    inline int tidStringLength() {
        return t_TidStringLength;
    }

    inline const char* name(){
        return t_ThreadNum;
    }
}
代码语言:javascript复制
#include "currenthread.hpp"

#include<stdio.h>

namespace CurrentThread{
    __thread int t_cachedTid = 0;
    __thread char t_tidString[32];
    __thread int t_tidStringLength = 6;
    __thread const char* t_threadName = "unknown";

    void cacheTid(){
        if (t_CachedTid == 0){
            t_CachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
            t_TidStringLength = snprintf(t_tidString, sizeof t_tidString, "] ", t_cachedTid);
        }
    }
}

事件循环 EventLoop

太恐怖了,cc文件莫名丢失,赶紧来备份一下。。。

代码语言:javascript复制
#pragma once

#include "nocopyable.hpp"
#include "timestamp.hpp"
#include "currenthread.hpp"

#include <atomic>
#include <functional>
#include <memory>
#include <sys/eventfd.h> //去了解一下
#include <fcntl.h>
#include <mutex>
#include <vector>

class Channel;
class Poller;

class EventLoop:public nocpoyable{
public:
    //typedef std::function<void()> Functor;
    using Functor = std::function<void()>;

    EventLoop();
    ~EventLoop();  
 
    void loop();
    void quit();

    timestamp pollReturnTime() const { return pollReturnTime_; }

    void runInLoop(Functor cb); //在当前loop中运行
    void queueInLoop(Functor cb);//放到队列中,唤醒loop所在的线程去执行

    void wakeup();  //唤醒loop所在的线程

    //Poller的方法
    void updateChannel(Channel* channel);
    void removeChannel(Channel* channel);
    bool hasChannel(Channel* channel);

    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
    
private:

    void handleRead();  // waked up
    void doPendingFunctors();   //执行回调

    using ChannelList = std::vector<Channel*>;
    std::atomic_bool looping_;  //原子操作的bool值
    std::atomic_bool quit_; //退出loop循环
    std::atomic_bool callingPendingFunctors_; //是否有需要回调的操作
    const pid_t threadId_;
    timestamp pollReturnTime_;  //返回发生事件的时间点

    std::unique_ptr<Poller> poller_;
  
    int wakeupFd_; 
    //当mainloop获取一个新用户的channel时,通过轮询算法选择一个subloop,
    //通过该成员唤醒subloop

    std::unique_ptr<Channel> wakeupChannel_;

    ChannelList activeChannels_;
    Channel* currentActiveChannel_;

    std::mutex mutex_;
    std::vector<Functor> pendingFunctors_;   //存储loop需要的所有回调操作
};
代码语言:javascript复制
#include "eventloop.hpp"
#include "logger.hpp"
#include "poller.hpp"
#include "channel.hpp"

#include <mutex>

//防止一个线程创建多个EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;

//定义默认的poller超时时间
const int kPollTimers = 10000;

//通过轮询的方式唤醒channel
int createEventfd(){
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0){
        LOG_ERROR("Failed in eventfd%dn", errno);
    }
    return evtfd;
}

EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      callingPendingFunctors_(false),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)),
      wakeupFd_(createEventfd()),
      wakeupChannel_(new Channel(this, wakeupFd_)),
      currentActiveChannel_(nullptr)
{
    LOG_DEBUG("EventLoop created %p in threadn", threadId_);
    if (t_loopInThisThread){
        LOG_FATAL("Another EventLoop %p exists in this thread %dn", t_loopInThisThread, threadId_);
    }
    else{
        t_loopInThisThread = this;
    }

    //设置wakeup事件类型及发生事件后的回调操作
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));

    //每一个EventLoop都将监听wakeupchannel的EPOLL读事件
    wakeupChannel_->enableReading();
}

EventLoop::~EventLoop(){
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = nullptr;
}

void EventLoop::loop(){
    looping_ = true;
    quit_ = false; // FIXME: what if someone calls quit() before loop() ?
    LOG_INFO("EventLoop %p start looping n",this);

    while (!quit_){
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimers, &activeChannels_);

        for (Channel *channel : activeChannels_){
            currentActiveChannel_ = channel;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;

        //执行当前EventLoop事件循环需要的事件操作
        doPendingFunctors();
    }

    LOG_INFO("EventLoop %p stop loopingn",this);
    looping_ = false;
}

void EventLoop::quit(){
    quit_ = true;
   
    if (!isInLoopThread()){
        wakeup();
    }
}

void EventLoop::runInLoop(Functor cb){
    if (isInLoopThread()){
        cb();
    }
    else{
        queueInLoop(std::move(cb));
    }
}

void EventLoop::queueInLoop(Functor cb){
    
    {
        std::unique_lock<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }

    if (!isInLoopThread() || callingPendingFunctors_){
        wakeup();
    }
}

void EventLoop::updateChannel(Channel *channel){
    poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel){
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel){
    return poller_->hasChannel(channel);
}

void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one){
        LOG_ERROR("EventLoop::wakeup() writes %d bytes instead of 8 n",n);
    }
}

void EventLoop::handleRead(){
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one){
        LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8n", n);
    }
}

void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (const Functor &functor : functors)
    {
        functor();
    }
    callingPendingFunctors_ = false;
}

0 人点赞