文章目录
- 获取线程ID
- 事件循环 EventLoop
获取线程ID
每一个线程都有一个EventLoop,每个loop里面都会有很多的channel,每个channel的任务都要在自己的线程中完成。 为了管理这些线程,设置了一份获取线程ID的代码,辅助管理。
代码语言:javascript复制#pragma once
#include<unistd.h>
#include<sys/syscall.h>
namespace CurrentThread{
//__thread:这个变量虽然是一个全局变量,但是加上这个修饰的话,其在每一个线程中都会保有一份缓存,互不干扰
extern __thread int t_CachedTid;
extern __thread char t_TidString[32];
extern __thread int t_TidStringLength;
extern __thread const char* t_ThreadNum;
void cacheTid(); //做一份缓存
inline int tid(){
if (__builtin_expect(t_CachedTid == 0, 0)){ //底层优化,不再赘述
cacheTid();
}
return t_CachedTid;
}
// for logging
inline const char* tidString() {
return t_TidString;
}
// for logging
inline int tidStringLength() {
return t_TidStringLength;
}
inline const char* name(){
return t_ThreadNum;
}
}
代码语言:javascript复制#include "currenthread.hpp"
#include<stdio.h>
namespace CurrentThread{
__thread int t_cachedTid = 0;
__thread char t_tidString[32];
__thread int t_tidStringLength = 6;
__thread const char* t_threadName = "unknown";
void cacheTid(){
if (t_CachedTid == 0){
t_CachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
t_TidStringLength = snprintf(t_tidString, sizeof t_tidString, "] ", t_cachedTid);
}
}
}
事件循环 EventLoop
太恐怖了,cc文件莫名丢失,赶紧来备份一下。。。
代码语言:javascript复制#pragma once
#include "nocopyable.hpp"
#include "timestamp.hpp"
#include "currenthread.hpp"
#include <atomic>
#include <functional>
#include <memory>
#include <sys/eventfd.h> //去了解一下
#include <fcntl.h>
#include <mutex>
#include <vector>
class Channel;
class Poller;
class EventLoop:public nocpoyable{
public:
//typedef std::function<void()> Functor;
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
void loop();
void quit();
timestamp pollReturnTime() const { return pollReturnTime_; }
void runInLoop(Functor cb); //在当前loop中运行
void queueInLoop(Functor cb);//放到队列中,唤醒loop所在的线程去执行
void wakeup(); //唤醒loop所在的线程
//Poller的方法
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
void handleRead(); // waked up
void doPendingFunctors(); //执行回调
using ChannelList = std::vector<Channel*>;
std::atomic_bool looping_; //原子操作的bool值
std::atomic_bool quit_; //退出loop循环
std::atomic_bool callingPendingFunctors_; //是否有需要回调的操作
const pid_t threadId_;
timestamp pollReturnTime_; //返回发生事件的时间点
std::unique_ptr<Poller> poller_;
int wakeupFd_;
//当mainloop获取一个新用户的channel时,通过轮询算法选择一个subloop,
//通过该成员唤醒subloop
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
Channel* currentActiveChannel_;
std::mutex mutex_;
std::vector<Functor> pendingFunctors_; //存储loop需要的所有回调操作
};
代码语言:javascript复制#include "eventloop.hpp"
#include "logger.hpp"
#include "poller.hpp"
#include "channel.hpp"
#include <mutex>
//防止一个线程创建多个EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;
//定义默认的poller超时时间
const int kPollTimers = 10000;
//通过轮询的方式唤醒channel
int createEventfd(){
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0){
LOG_ERROR("Failed in eventfd%dn", errno);
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(nullptr)
{
LOG_DEBUG("EventLoop created %p in threadn", threadId_);
if (t_loopInThisThread){
LOG_FATAL("Another EventLoop %p exists in this thread %dn", t_loopInThisThread, threadId_);
}
else{
t_loopInThisThread = this;
}
//设置wakeup事件类型及发生事件后的回调操作
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
//每一个EventLoop都将监听wakeupchannel的EPOLL读事件
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop(){
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::loop(){
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_INFO("EventLoop %p start looping n",this);
while (!quit_){
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimers, &activeChannels_);
for (Channel *channel : activeChannels_){
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
//执行当前EventLoop事件循环需要的事件操作
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop loopingn",this);
looping_ = false;
}
void EventLoop::quit(){
quit_ = true;
if (!isInLoopThread()){
wakeup();
}
}
void EventLoop::runInLoop(Functor cb){
if (isInLoopThread()){
cb();
}
else{
queueInLoop(std::move(cb));
}
}
void EventLoop::queueInLoop(Functor cb){
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_){
wakeup();
}
}
void EventLoop::updateChannel(Channel *channel){
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel *channel){
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel *channel){
return poller_->hasChannel(channel);
}
void EventLoop::wakeup(){
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one){
LOG_ERROR("EventLoop::wakeup() writes %d bytes instead of 8 n",n);
}
}
void EventLoop::handleRead(){
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one){
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8n", n);
}
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor &functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}