bitcoin multithread verify script

2022-08-20 10:58:00 浏览数 (1)

多线程脚本检查启动

代码语言:javascript复制
bool AppInitMain(Config &config, boost::thread_group &threadGroup, CScheduler &scheduler) {
    ...
    // 脚本检查线程; 启动多个脚本检查线程
    if (nScriptCheckThreads) {
        for (int i = 0; i < nScriptCheckThreads - 1; i  ) {
            threadGroup.create_thread(&ThreadScriptCheck);
        }
    }
    ...
}
static CCheckQueue<CScriptCheck> scriptcheckqueue(128);     

void ThreadScriptCheck() {
    RenameThread("bitcoin-scriptch");
    scriptcheckqueue.Thread();  
}

此处使用了boost的线程库,在绑定的线程函数ThreadScriptCheck中,调用一个全局状态的任务队列scriptcheckqueue;每个线程都去该队列中去任务,当队列中无任务可执行时,线程被条件变量阻塞。

任务队列

代码语言:javascript复制
//模板类,执行的验证任务由T标识,T都必须提供一个重载的operator()方法,并且反回一个bool。
//默认为主线程push 批量任务到队列中,其他的工作线程去处理这些任务,当主线程push完任务后,也去处理这些任务,直到任务队列全部处理完毕。
template <typename T> class CCheckQueue {
private:
    // 互斥锁保护内部的状态
    boost::mutex mutex;
    // 在没有工作时,工作线程阻塞条件变量。
    boost::condition_variable condWorker;
    // 在没有工作时,master线程阻塞条件变量。
    boost::condition_variable condMaster;
    // 要处理元素的队列。
    std::vector<T> queue;
    // 空闲的工作线程数量(包含主线程)
    int nIdle;
    // 总的工作线程的数量,包含主线程
    int nTotal;
    // 临时评估结果
    bool fAllOk;
    // 还有多少验证任务没有完成。包括不再排队,但仍在工作线程自己的批次中的任务数量。
    unsigned int nTodo;
    // 是否需要退出。
    bool fQuit;
    // 每个批次最大的元素处理数量
    unsigned int nBatchSize;
    
    //内部函数,队列中的任务在此处将被全部处理。
    bool Loop(bool fMaster = false);
public:
    //! Create a new check queue
    CCheckQueue(unsigned int nBatchSizeIn)
            : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false),
              nBatchSize(nBatchSizeIn) {}

    //! Worker thread;此处为工作线程进行调用,
    void Thread() { Loop(); }
    //在代码中主要是主线程进行调用,等待任务全部处理完毕,返回处理结果
    bool Wait() { return Loop(true); }
    //! 给类的内部队列批量添加任务;本次操作受锁保护;并更新所有的状态。
    void Add(std::vector<T> &vChecks) {
        boost::unique_lock<boost::mutex> lock(mutex);
        //将任务添加至 内部队列中
        for (T &check : vChecks) {
            queue.push_back(std::move(check));
        }
        // 更新未完成的任务数量
        nTodo  = vChecks.size();
        //如果刚添加的任务数量为1,只唤醒一个工作线程去处理;否则,唤醒全部工作线程。
        if (vChecks.size() == 1) {
            condWorker.notify_one();
        } else if (vChecks.size() > 1) {
            condWorker.notify_all();
        }
    }
    // 任务处理队列是否处于休眠状态
    bool IsIdle() {
        boost::unique_lock<boost::mutex> lock(mutex);
        return (nTotal == nIdle && nTodo == 0 && fAllOk == true);
    }
    ~CCheckQueue() {}
}

//fMaster : 标识是否为主线程在调用
bool CCheckQueue::Loop(bool fMaster = false){
    // 根据参数,选择条件变量
    boost::condition_variable &cond = fMaster ? condMaster : condWorker;

    std::vector<T> vChecks;     //临时任务队列
    vChecks.reserve(nBatchSize);
    unsigned int nNow = 0;      
    bool fOk = true;
    do {
        {
            boost::unique_lock<boost::mutex> lock(mutex);       //自动管理资源锁;
            // first do the clean-up of the previous loop run (allowing us
            // to do it in the same critsect)  对上一次的运行状态做清理。
            if (nNow) {
                fAllOk &= fOk;
                nTodo -= nNow;
                if (nTodo == 0 && !fMaster)
                    // We processed the last element; inform the master it
                    // can exit and return the result  处理最后一个元素,通知master它可以退出,并且返回结果
                    condMaster.notify_one();
            } else {
                // nNow == 0,标识该线程第一次运行,即线程的数量又增加了一个。
                nTotal  ;
            }
           
            // 该对象的任务队列为空时,进入下列条件;
            // 下面处理分为两种情况:1. 任务全部完成后,将主线程/或退出状态时,退出主线程或所有的线程。
            //  2. 任务全部完成后,将子线程
            while (queue.empty()) {
                // 当验证任务为0,且需要退出时,
                if ((fMaster || fQuit) && nTodo == 0) {
                    nTotal--;
                    bool fRet = fAllOk;     //fAllOk : 最新的临时评估结果。对该值做缓存,然后退出。
                    // reset the status for new work later
                    if (fMaster) fAllOk = true;
                    // return the current status        //返回当前的状态。
                    return fRet;
                }
                nIdle  ;
                cond.wait(lock); // wait   此处配合条件变量使用。进行线程间的同步;
                nIdle--;
            }
        
            // 获取当前每个线程每个任务循环时处理的任务数量;
            nNow = std::max(
                1U, std::min(nBatchSize, (unsigned int)queue.size() /
                                             (nTotal   nIdle   1)));
            //从类的任务队列中 向临时队列中添加任务。
            vChecks.resize(nNow);
            for (unsigned int i = 0; i < nNow; i  ) {
                // 想让锁的时间尽可能的短,所以采用这种方法(move)来从 类的队列中拿到任务,而不是采用拷贝的方式。
                vChecks[i].swap(queue.back());
                queue.pop_back();       //将放到局部队列中的任务清除
            }
            fOk = fAllOk;
        }
        // execute work; 执行本线程刚分到的工作。
        for (T &check : vChecks) {
            if (fOk) fOk = check();
        }
        // 执行完后,清空临时任务的集合。继续下次循环。
        vChecks.clear();
    } while (true);
    
}

上述是队列的实现:主要的任务处理是在Loop()函数中; 该队列会进行两种调用,来处理队列中的任务: >* 向添加任务后:自动唤醒阻塞的工作线程去处理添加的任务;细节请看:void Add(std::vector<T> &vChecks) >* 主线程添加完任务后,调用bool Wait(),也去处理队列中的任务,队列中的全部任务处理完后,主线程退出。

采用RAII机制去操作任务队列

代码语言:javascript复制
template <typename T> class CCheckQueueControl {
private:
    CCheckQueue<T> *pqueue;
    bool fDone;

public:
    CCheckQueueControl(CCheckQueue<T> *pqueueIn)
        : pqueue(pqueueIn), fDone(false) {
        // 用来构建该对象的任务队列只能是nil, 或者队列中无任务。
        // 因为创建的该对象在析构时会调用任务队列的wait()方法去处理完队列中所有的任务,然后退出:
        if (pqueue != nullptr) {
            bool isIdle = pqueue->IsIdle();     //获取该队列是否空闲
            assert(isIdle);
        }
    }
    
    //处理完队列中的所有任务后,该方法退出,并返回这些任务的处理结果
    bool Wait() {
        if (pqueue == nullptr) return true;
        bool fRet = pqueue->Wait();     //执行完所有的任务后,
        fDone = true;
        return fRet;
    }

    // 向 CCheckQueue 中添加任务;唤醒子线程去处理。
    void Add(std::vector<T> &vChecks) {
        if (pqueue != nullptr) pqueue->Add(vChecks);
    }
    // 对象析构时,调用wait()方法保证了该队列中的所有任务都被处理。
    ~CCheckQueueControl() {
        if (!fDone) Wait();
    }
};

该类主要是用来管理 CCheckQueue对象;采用RAII机制,保证每次析构该类的对象时,CCheckQueue中的任务队列被全部处理。

代码语言:javascript复制
//将该区块链接到当前激活链上,并更新UTXO集合。
// block(in):将要链接到激活链上的区块(带有完整数据); pindex(in):该链接块对应的索引;
static bool ConnectBlock(const Config &config, const CBlock &block, CValidationState &state, CBlockIndex *pindex,
    CCoinsViewCache &view, const CChainParams &chainparams, bool fJustCheck = false) {
    ...
    
    CCheckQueueControl<CScriptCheck> control(fScriptChecks ? &scriptcheckqueue : nullptr);
        ...
    for (size_t i = 0; i < block.vtx.size(); i  ) {
        ...
        if (!tx.IsCoinBase()) {
            Amount fee = view.GetValueIn(tx) - tx.GetValueOut();
            nFees  = fee.GetSatoshis();

            // Don't cache results if we're actually connecting blocks (still
            // consult the cache, though).
            bool fCacheResults = fJustCheck;

            std::vector<CScriptCheck> vChecks;
            // 检查交易交易,并将的交易每个输入构造成对应的可检查对象`CScriptCheck`,放入临时集合中,然后添加进任务队列中。
            if (!CheckInputs(tx, state, view, fScriptChecks, flags,
                             fCacheResults, fCacheResults,
                             PrecomputedTransactionData(tx), &vChecks)) {
                return error("ConnectBlock(): CheckInputs on %s failed with %s",
                             tx.GetId().ToString(), FormatStateMessage(state));
            }

            control.Add(vChecks);       //向验证线程中添加任务;添加完后,此时其他的任务线程就开始执行。
        }
        ...
    }
    
    ...
}

在该方法中:使用了全局对象scriptcheckqueue去构造了一个临时的管理对象,并通过该管理对象来操作全局任务队列:添加任务,执行任务;当该临时的管理对象析构时,会调用wait()方法,加入任务处理,处理完所有任务后,该对象析构完成。

CScriptCheck(根据每个交易输入构造的检查任务)

代码语言:javascript复制
class CScriptCheck {
private:
    CScript scriptPubKey;       //锁定脚本(即该验证交易的某个引用输出对应的锁定脚本)
    Amount amount;              //上述锁定脚本对应 的金额(即花费的UTXO的金额)
    const CTransaction *ptxTo;  //正在花费的交易,即要检查的交易
    unsigned int nIn;           //要检查该交易的第几个输入;
    uint32_t nFlags;            //检查标识
    bool cacheStore;
    ScriptError error;          //验证出错的原因
    PrecomputedTransactionData txdata;  

public:
    CScriptCheck()
        : amount(0), ptxTo(0), nIn(0), nFlags(0), cacheStore(false),
          error(SCRIPT_ERR_UNKNOWN_ERROR), txdata() {}

    CScriptCheck(const CScript &scriptPubKeyIn, const Amount amountIn,
                 const CTransaction &txToIn, unsigned int nInIn,
                 uint32_t nFlagsIn, bool cacheIn,
                 const PrecomputedTransactionData &txdataIn)
        : scriptPubKey(scriptPubKeyIn), amount(amountIn), ptxTo(&txToIn),
          nIn(nInIn), nFlags(nFlagsIn), cacheStore(cacheIn),
          error(SCRIPT_ERR_UNKNOWN_ERROR), txdata(txdataIn) {}

    bool operator()();    //此处重载了()运算符,执行脚本检查操作;详情见下集:脚本验证

    // 采用这种方式对新对象进行赋值,避免拷贝赋值,节省时间。
    void swap(CScriptCheck &check) {
        scriptPubKey.swap(check.scriptPubKey);
        std::swap(ptxTo, check.ptxTo);
        std::swap(amount, check.amount);
        std::swap(nIn, check.nIn);
        std::swap(nFlags, check.nFlags);
        std::swap(cacheStore, check.cacheStore);
        std::swap(error, check.error);
        std::swap(txdata, check.txdata);
    }

    ScriptError GetScriptError() const { return error; }
};

本文由 Copernicus团队 姚永芯写作,转载无需授权。

0 人点赞