Impala:Impalad impala-server beeswax 调用过程及关系图

2022-03-07 20:38:13 浏览数 (1)

一 UML

UMLUML

二 本文主要讲解过程

  1. main(daemon-main.cc)
  2. ImpaladMain/StatestoredMain/CatalogdMain/AdmissiondMain (impalad-main.cc)
    1. ExecEnv.init()
    2. impala_server = new ImpalaServer(exec_env)
    3. impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port, FLAGS_external_fe_port)
    4. impala_server->Join()
代码语言:c 复制
        // Initialize the client servers.
        // `handler` is a shared_ptr to this ImpalaServer obtained via shared_from_this().
        shared_ptr<ImpalaServer> handler = shared_from_this();
        // Set up Beeswax when a positive port is given, or when running under test with port 0.
        if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
        {
            // Wrap the handler in an ImpalaServiceProcessor, held through the TProcessor base.
            shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
            // Create the RpcEventHandler, named "ImpalaServiceProcessor", using the exec env's metrics.
            shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
            
            // Attach the event handler to the processor.
            beeswax_processor->setEventHandler(event_handler);
            
            // Construct a ThriftServerBuilder from BEESWAX_SERVER_NAME, beeswax_processor and beeswax_port.
            ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);

            if (IsExternalTlsConfigured())
            {
                LOG(INFO) << "Enabling SSL for Beeswax";
                builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
                    .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
                    .ssl_version(ssl_version)
                    .cipher_list(FLAGS_ssl_cipher_list);
            }

            ThriftServer * server;
            // Build the server. The syntax below is a chain of member calls: each
            // call is applied to the result of the previous one, and the chain ends
            // with Build(&server), whose result is checked by RETURN_IF_ERROR.
            RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
                                .metrics(exec_env_->metrics())
                                .max_concurrent_connections(FLAGS_fe_service_threads)
                                .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
                                .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
                                .Build(&server));
            
            // Store the built server in beeswax_server_ and register this object
            // as its connection handler.
            beeswax_server_.reset(server);
            beeswax_server_->SetConnectionHandler(this);
        }


    // Start the Beeswax server only if one was built above; a failed Start()
    // is propagated to the caller by RETURN_IF_ERROR.
    if (beeswax_server_.get())
    {
        RETURN_IF_ERROR(beeswax_server_->Start());
        LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
    }

三 基础关系篇

1 BeeswaxServiceIf 系列

代码语言:javascript复制
// ImpalaServiceIf virtually inherits the Beeswax service interface.
// (Declaration header only; the class body is omitted in this excerpt.)
class ImpalaServiceIf : virtual public  ::beeswax::BeeswaxServiceIf

// ImpalaServer implements both the Impala (Beeswax) and HiveServer2 service
// interfaces, acts as the ThriftServer connection handler, and supports
// shared_from_this(). (Declaration header only; the body is omitted here.)
class ImpalaServer : public ImpalaServiceIf,
                     public ImpalaHiveServer2ServiceIf,
                     public ThriftServer::ConnectionHandlerIf,
                     public std::enable_shared_from_this<ImpalaServer>,
                     public CacheLineAligned

2 TProcessorEventHandler

代码语言:c 复制
namespace apache {
namespace thrift {

/**
 * Virtual interface class that can handle events from the processor. To
 * use this you should subclass it and implement the methods that you care
 * about. Your subclass can also store local data that you may care about,
 * such as additional "arguments" to these methods (stored in the object
 * instance's state).
 *
 * Every callback has a no-op default, so subclasses override only what they need.
 */
class TProcessorEventHandler {
public:
  virtual ~TProcessorEventHandler() {}

  /**
   * Called before calling other callback methods.
   * Expected to return some sort of context object.
   * The return value is passed to all other callbacks
   * for that function invocation.
   *
   * The default implementation returns a null context.
   */
  virtual void* getContext(const char* fn_name, void* serverContext) {
    (void)fn_name;
    (void)serverContext;
    return nullptr;
  }

  /**
   * Expected to free resources associated with a context.
   */
  virtual void freeContext(void* ctx, const char* fn_name) {
    (void)ctx;
    (void)fn_name;
  }

  /**
   * Called before reading arguments.
   */
  virtual void preRead(void* ctx, const char* fn_name) {
    (void)ctx;
    (void)fn_name;
  }

  /**
   * Called between reading arguments and calling the handler.
   */
  virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
    (void)ctx;
    (void)fn_name;
    (void)bytes;
  }

  /**
   * Called between calling the handler and writing the response.
   */
  virtual void preWrite(void* ctx, const char* fn_name) {
    (void)ctx;
    (void)fn_name;
  }

  /**
   * Called after writing the response.
   */
  virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
    (void)ctx;
    (void)fn_name;
    (void)bytes;
  }
};
} // namespace thrift
} // namespace apache
 
  
  
namespace impala
{
  
/// An RpcEventHandler is called every time an Rpc is started and completed. There is at
/// most one RpcEventHandler per ThriftServer. When an Rpc is started, getContext() creates
/// an InvocationContext recording the current time and other metadata for that invocation.
class RpcEventHandler : public apache::thrift::TProcessorEventHandler
{
public:
    /// Constructed from the owning server's name and a metric group.
    /// (Other members are omitted in this excerpt.)
    RpcEventHandler(const std::string & server_name, MetricGroup * metrics);

};
} // namespace impala

3 TProcessor 系列

代码语言:c 复制
namespace apache {
namespace thrift {
/**
 * A processor is a generic object that acts upon two streams of data, one
 * an input and the other an output. The definition of this object is loose,
 * though the typical case is for some sort of server that either generates
 * responses to an input stream or forwards data from one pipe onto another.
 *
 */
class TProcessor {
public:
  virtual ~TProcessor() {}

  /// Processes one request read from `in`, writing any response to `out`.
  /// Must be implemented by subclasses.
  virtual bool process(stdcxx::shared_ptr<protocol::TProtocol> in,
                       stdcxx::shared_ptr<protocol::TProtocol> out,
                       void* connectionContext) = 0;

  /// Convenience overload: uses the same protocol object for input and output.
  bool process(stdcxx::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
    return process(io, io, connectionContext);
  }

  /// Returns the event handler currently attached to this processor.
  stdcxx::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }

  /// Attaches (or replaces) the event handler.
  void setEventHandler(stdcxx::shared_ptr<TProcessorEventHandler> eventHandler) {
    eventHandler_ = eventHandler;
  }

protected:
  TProcessor() {}

  stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
};
} // namespace thrift
} // namespace apache


namespace beeswax 
{
    // NOTE(review): in this sketch the handler pointer is shown at namespace
    // scope; it appears to illustrate the eventHandler_ member that TProcessor
    // holds - confirm against the generated code.
    stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;

  
  
    // This is only an abstraction layer on top of TProcessor.
    // NOTE(review): BeeswaxServiceProcessorT below refers to it as
    // ::apache::thrift::TDispatchProcessorT, so it presumably lives in that
    // namespace in the real headers - confirm.
    template <class Protocol_>
    class TDispatchProcessorT : public TProcessor 
    {
        
    };
      
    /// Processor for the Beeswax service; keeps the service implementation in iface_.
    template <class Protocol_>
    class BeeswaxServiceProcessorT : public ::apache::thrift::TDispatchProcessorT<Protocol_>
    {
    protected:
        ::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface_;
        BeeswaxServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface) :
        iface_(iface) {}
    };
}

namespace apache {
namespace thrift {
namespace protocol {

  /// Abstract protocol interface (abridged): only a few of the pure-virtual
  /// write hooks are shown here.
  class TProtocol
  {
  public:
    // Polymorphic base: the destructor must be virtual so that deleting a
    // derived protocol through a TProtocol pointer is well-defined.
    virtual ~TProtocol() {}

    virtual uint32_t writeBool_virt(const bool value) = 0;

    virtual uint32_t writeByte_virt(const int8_t byte) = 0;

    virtual uint32_t writeI16_virt(const int16_t i16) = 0;

    virtual uint32_t writeI32_virt(const int32_t i32) = 0;

  };
  
  /// Placeholder protocol type; it is used in this file as the template
  /// argument of the ImpalaServiceProcessor typedef.
  class TDummyProtocol : public TProtocol {};
  }
  }
}


namespace impala
{
  /// Processor for the Impala service. It extends the Beeswax processor and
  /// keeps its own pointer to the ImpalaServiceIf implementation.
  template <class Protocol_>
  class ImpalaServiceProcessorT : public  ::beeswax::BeeswaxServiceProcessorT<Protocol_>
  {
   protected:
    ::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface_;

   public:
    /// Public so that callers can write `new ImpalaServiceProcessor(handler)`.
    /// The same interface pointer is handed to the Beeswax base class.
    ImpalaServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface) :
       ::beeswax::BeeswaxServiceProcessorT<Protocol_>(iface),
      iface_(iface) {}
  };
  
  typedef ImpalaServiceProcessorT< ::apache::thrift::protocol::TDummyProtocol > ImpalaServiceProcessor;

}

4 TServer

代码语言:c 复制
namespace apache {
namespace thrift {
namespace concurrency {
  /// Minimal runnable interface: subclasses implement run(). A Runnable also
  /// keeps a weak reference to the Thread associated with it.
  class Runnable {
    public:
      virtual ~Runnable() {}
      virtual void run() = 0;
      /// Returns the associated thread, or an empty pointer if it no longer exists.
      virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
      /// Records the thread associated with this runnable (stored weakly).
      virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }

    private:
      stdcxx::weak_ptr<Thread> thread_;
  };
} // namespace concurrency
} // namespace thrift
} // namespace apache
  
  
namespace apache {
namespace thrift {
namespace server {

class TServer : public concurrency::Runnable {
  //Contain TProcessor  TServerTransport  TTransportFactory  TProtocolFactory
  TServer(const stdcxx::shared_ptr<TProcessor>& processor,
          const stdcxx::shared_ptr<TServerTransport>& serverTransport,
          const stdcxx::shared_ptr<TTransportFactory>& transportFactory,
          const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory)
    : processorFactory_(new TSingletonProcessorFactory(processor)),
      serverTransport_(serverTransport),
      inputTransportFactory_(transportFactory),
      outputTransportFactory_(transportFactory),
      inputProtocolFactory_(protocolFactory),
      outputProtocolFactory_(protocolFactory) {}
  
  
public:
  virtual ~TServer() {}
  virtual void serve() = 0;
  virtual void stop() {}
  // Allows running the server as a Runnable thread
  virtual void run() { serve(); }
}
 /**
 * In TAcceptQueueServer, the main server thread calls accept() and then immediately
 * places the returned TTransport on a queue to be processed by a separate thread,
 * asynchronously.
 *
 * This helps solve IMPALA-4135, where connections were timing out while waiting in the
 * OS accept queue, by ensuring that accept() is called as quickly as possible.
 */
class TAcceptQueueServer : public TServer
{
  void serve() override;
}
  
class TAcceptQueueServer::Task : public Runnable  
{
  void run() override
}
}

5 ThreadPool

代码语言:c 复制
namespace apache {
namespace thrift {
namespace transport {
  /**
 * Generic interface for a method of transporting data. A TTransport may be
 * capable of either reading or writing, but not necessarily both.
 *
 */
class TTransport {
protected:
  /**
   * Simple constructor.
   */
  TTransport() {}
public:
  // Polymorphic base: keep the destructor virtual so that deleting a concrete
  // transport through a TTransport pointer is well-defined.
  virtual ~TTransport() {}

  /// The base transport is never open.
  virtual bool isOpen() { return false; }

  /// The base transport cannot be closed; this always throws NOT_OPEN.
  virtual void close() {
    throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
  }

  /// Forwards to read_virt() (declaration omitted in this excerpt).
  uint32_t read(uint8_t* buf, uint32_t len) {
    T_VIRTUAL_CALL();
    return read_virt(buf, len);
  }
  
  /// Forwards to write_virt() (declaration omitted in this excerpt).
  void write(const uint8_t* buf, uint32_t len) {
    T_VIRTUAL_CALL();
    write_virt(buf, len);
  }
};
} // namespace transport
} // namespace thrift
} // namespace apache


namespace apache
{
namespace thrift
{
    namespace server
    {
        using apache::thrift::TProcessor;
        using apache::thrift::concurrency::Monitor;
        using apache::thrift::concurrency::ThreadFactory;
        using apache::thrift::transport::TServerTransport;
        using apache::thrift::transport::TTransport;
        using apache::thrift::transport::TTransportFactory;

        /// One accepted client connection waiting in the accept queue.
        struct TAcceptQueueEntry
        {
            /// Transport wrapping the accepted client.
            std::shared_ptr<TTransport> client_;
            /// Expiration time for the entry; stays 0 unless a queue timeout is set.
            int64_t expiration_time_ = 0LL;
        };
    }
}
}


namespace impala
{
/// Simple threadpool which processes items (of type T) in parallel which were placed on a
/// blocking queue by Offer(). Each item is processed by a single user-supplied method.
///
/// NOTE: this is an abridged excerpt; several data members and methods used
/// below (e.g. work_queue_, threads_, Shutdown(), Join()) are not shown.
template <typename T>
class ThreadPool : public CacheLineAligned
{
public:
    /// Signature of a work-processing function. Takes the integer id of the thread which is
    /// calling it (ids run from 0 to num_threads - 1) and a reference to the item to
    /// process. Callers typically supply a lambda here.
    typedef boost::function<void(int thread_id, const T & workitem)> WorkFunction;

    /// Creates a new thread pool without starting any threads. Code must call
    /// Init() on this thread pool before any calls to Offer().
    ///  -- num_threads: how many threads are part of this pool
    ///  -- queue_size: the maximum size of the queue on which work items are offered. If the
    ///     queue exceeds this size, subsequent calls to Offer will block until there is
    ///     capacity available.
    ///  -- work_function: the function to run every time an item is consumed from the queue
    ///  -- fault_injection_eligible - If set to true, allow fault injection at this
    ///     callsite (see thread_creation_fault_injection). If set to false, fault
    ///     injection is disabled at this callsite. Thread creation sites that crash
    ///     Impala or abort startup must have this set to false.
    ThreadPool(
        const std::string & group,
        const std::string & thread_prefix,
        uint32_t num_threads,
        uint32_t queue_size,
        const WorkFunction & work_function,
        bool fault_injection_eligible = false)
        : group_(group)
        , thread_prefix_(thread_prefix)
        , num_threads_(num_threads)
        , work_function_(work_function)
        , work_queue_(queue_size)
        , fault_injection_eligible_(fault_injection_eligible)
    {
   
    }
    /// Create the threads needed for this ThreadPool. Returns an error on any
    /// error spawning the threads.
    Status Init()
    {
        for (int i = 0; i < num_threads_; ++i)
        {
            // Thread names look like "<prefix>(<1-based index>:<num_threads>)".
            std::stringstream threadname;
            threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")";
            std::unique_ptr<Thread> t;
            Status status = Thread::Create(
                group_,
                threadname.str(),
                boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i),
                &t,
                fault_injection_eligible_);
            if (!status.ok())
            {
                // The thread pool initialization failed. Shutdown any threads that were
                // spawned. Note: Shutdown() and Join() are safe to call multiple times.
                Shutdown();
                Join();
                return status;
            }
            threads_.AddThread(std::move(t));
        }
        initialized_ = true;
        return Status::OK();
    }
    /// Blocking operation that puts a work item on the queue. If the queue is full, blocks
    /// until there is capacity available. The ThreadPool must be initialized before
    /// calling this method.
    //
    /// 'work' is copied into the work queue, but may be referenced at any time in the
    /// future. Therefore the caller needs to ensure that any data referenced by work (if T
    /// is, e.g., a pointer type) remains valid until work has been processed, and it's up to
    /// the caller to provide their own signalling mechanism to detect this (or to wait until
    /// after DrainAndShutdown returns).
    //
    /// Returns true if the work item was successfully added to the queue, false otherwise
    /// (which typically means that the thread pool has already been shut down).
    template <typename V>
    bool Offer(V && work)
    {
        DCHECK(initialized_);
        return work_queue_.BlockingPut(std::forward<V>(work));
    }

    /// Blocks until the work item is placed on the queue or the timeout expires. The
    /// ThreadPool must be initialized before calling this method. The same requirements
    /// about the lifetime of 'work' applies as in Offer() above. If the operation times
    /// out, the work item can be safely freed.
    ///
    /// Returns true if the work item was successfully added to the queue, false otherwise
    /// (which means the operation timed out or the thread pool has already been shut down).
    template <typename V>
    bool Offer(V && work, int64_t timeout_millis)
    {
        DCHECK(initialized_);
        int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
        return work_queue_.BlockingPutWithTimeout(std::forward<V>(work), timeout_micros);
    }
private:
    /// Driver method for each thread in the pool. Continues to read work from the queue
    /// until the pool is shutdown.
    ///
    /// This is the body of every worker thread: take one item from the queue,
    /// hand it to work_function_, and notify waiters when the queue is empty.
    void WorkerThread(int thread_id)
    {
        while (!IsShutdown())
        {
            T workitem;
            if (work_queue_.BlockingGet(&workitem))
            {
                work_function_(thread_id, workitem);
            }
            if (work_queue_.Size() == 0)
            {
                /// Take lock to ensure that DrainAndShutdown() cannot be between checking
                /// GetSize() and wait()'ing when the condition variable is notified.
                /// (It will hang if we notify right before calling wait().)
                std::unique_lock<std::mutex> l(lock_);
                empty_cv_.NotifyAll();
            }
        }
    }
    uint32_t num_threads_;

    /// User-supplied method to call to process each work item.
    WorkFunction work_function_;
};
} // namespace impala

6 TAcceptQueueServer

代码语言:c 复制
namespace apache {
namespace thrift {
namespace concurrency {
  /// Minimal runnable interface: subclasses implement run(). A Runnable also
  /// keeps a weak reference to the Thread associated with it.
  class Runnable {
    public:
      virtual ~Runnable() {}
      virtual void run() = 0;
      /// Returns the associated thread, or an empty pointer if it no longer exists.
      virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
      /// Records the thread associated with this runnable (stored weakly).
      virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }

    private:
      stdcxx::weak_ptr<Thread> thread_;
  };

  /// Base class for thread factories (abridged). Only constructible by subclasses.
  class ThreadFactory {
  protected:
    ThreadFactory(bool detached) : detached_(detached) { }

  private:
    /// Value passed to the constructor.
    bool detached_;
  };
} // namespace concurrency
} // namespace thrift
} // namespace apache



namespace apache {
namespace thrift {
namespace server {

/// Base class of Thrift servers (abridged). A TServer is a Runnable whose
/// run() simply calls serve(), which subclasses must implement.
class TServer : public concurrency::Runnable {
public:
  virtual ~TServer() {}
  virtual void serve() = 0;
  virtual void stop() {}
  // Allows running the server as a Runnable thread
  virtual void run() { serve(); }
};

  

}
  
class TAcceptQueueServer : public TServer
{ 
        std::shared_ptr<ThreadFactory> threadFactory_;

        void TAcceptQueueServer::serve()
        {
            //SetupConnection 调用
            //Thread Pool 
            //WorkFunction 
            ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool(
                "setup-server",
                "setup-worker",
                FLAGS_accepted_cnxn_setup_thread_pool_size,
                FLAGS_accepted_cnxn_queue_depth,
                [this](int tid, const shared_ptr<TAcceptQueueEntry> & item) { this->SetupConnection(item); });
          
            Status status = connection_setup_pool.Init();

            while (!stop_)
            {
                try
                {
                    // Fetch client from server
                    shared_ptr<TTransport> client = serverTransport_->accept();

                    TSocket * socket = reinterpret_cast<TSocket *>(client.get());
                    //new Connection 
                    VLOG(1) << Substitute("New connection to server $0 from client $1", name_, socket->getSocketInfo());

                    shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
                    entry->client_ = client;
                    if (queue_timeout_ms_ > 0)
                    {
                        entry->expiration_time_ = MonotonicMillis()   queue_timeout_ms_;
                    }

                    // New - the work done to set up the connection has been moved to SetupConnection.
                    // Note that we move() entry so it's owned by SetupConnection thread.
                    connection_setup_pool.Offer(std::move(entry);
                }
            }
          
        }
                                             
        //处理一个 Entry 即一个Client 
        void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry)
        {
                //net Task 
                //获取参数 成员变量
                TAcceptQueueServer::Task * task = new TAcceptQueueServer::Task(*this, processor, inputProtocol, outputProtocol, client);
                // Create a task
                shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
                // Create a thread for this task
                //这里就相当关键了 会去调用 thrift-server newThread 
                //将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task TAcceptQueueServer::Task 
                shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
          
                tasks_.insert(task);
        }
}
                                                
 
  // Equivalent to `class TAcceptQueueServer::Task : public concurrency::Runnable`.
  /// Per-connection task (abridged): holds the server, processor, protocols
  /// and transport of one client. The data members are omitted in this excerpt.
  class TAcceptQueueServer::Task : public Runnable
  {
    public:
    Task(
      TAcceptQueueServer & server,
      shared_ptr<TProcessor> processor,
      shared_ptr<TProtocol> input,
      shared_ptr<TProtocol> output,
      shared_ptr<TTransport> transport)
      : server_(server)
        , processor_(std::move(processor))
        , input_(std::move(input))
        , output_(std::move(output))
        , transport_(std::move(transport))
      {
      }


    /// Creates the connection context through the server's event handler and
    /// then lets the handler process it together with the transport.
    void run() override
    {
      // Fetch the server-level event handler.
      shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
      // Nothing to notify when no handler is installed.
      if (!eventHandler)
      {
        return;
      }
      // Create the per-connection context ...
      void * connectionContext = eventHandler->createContext(input_, output_);
      // ... and pass it on for processing.
      eventHandler->processContext(connectionContext, transport_);

    }
  };
}


7 ThriftThreadFactory

代码语言:c 复制
namespace apache {
namespace thrift {
namespace concurrency {
  /// Minimal runnable interface: subclasses implement run(). A Runnable also
  /// keeps a weak reference to the Thread associated with it.
  class Runnable {
    public:
      virtual ~Runnable() {}
      virtual void run() = 0;
      /// Returns the associated thread, or an empty pointer if it no longer exists.
      virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
      /// Records the thread associated with this runnable (stored weakly).
      virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }

    private:
      stdcxx::weak_ptr<Thread> thread_;
  };

  /// Base class for thread factories (abridged). Only constructible by subclasses.
  class ThreadFactory {
  protected:
    ThreadFactory(bool detached) : detached_(detached) { }

  private:
    /// Value passed to the constructor.
    bool detached_;
  };
} // namespace concurrency
} // namespace thrift
} // namespace apache


/// Impala-side thread factory deriving from Thrift's ThreadFactory.
/// (The body is omitted in this excerpt.)
class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory
{
  
};

四 调用过程

1 ImpalaServer->TProcessor

代码语言:c 复制
        // Initialize the client servers.
        // `handler` is a shared_ptr to this ImpalaServer obtained via shared_from_this().
        shared_ptr<ImpalaServer> handler = shared_from_this();
        if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
        {
            // Wrap the ImpalaServer handler in an ImpalaServiceProcessor, held
            // through the TProcessor base type.
            shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
        }

2 ImpalaServer->TProcessorEventHandler

代码语言:c 复制
        // Initialize the client servers.
        // (This excerpt is truncated: the `if` block below is not closed here.)
        shared_ptr<ImpalaServer> handler = shared_from_this();
        if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
        {
            // Wrap the handler in an ImpalaServiceProcessor, held through the TProcessor base.
            shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
            // Create the RpcEventHandler, named "ImpalaServiceProcessor", using the exec env's metrics.
            shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
            
            // Attach the event handler to the processor.
            beeswax_processor->setEventHandler(event_handler);

3 ThriftServer->ThriftServer

代码语言:c 复制
namespace impala
{
/// Helper class to build new ThriftServer instances.
/// NOTE: abridged excerpt - the option setters and most option members
/// (auth_provider_, metrics_, enable_ssl_, ...) are not shown.
class ThriftServerBuilder
{
public:
    /// Records the server name, the processor to expose and the port to listen on.
    ThriftServerBuilder(const std::string & name, const std::shared_ptr<apache::thrift::TProcessor> & processor, int port)
        : name_(name), processor_(processor), port_(port)
    {
    }
    std::string name_;
  
    /// The TProcessor that the built server will expose.
    std::shared_ptr<apache::thrift::TProcessor> processor_;
    int port_ = 0;
    /// Constructs a new ThriftServer and puts it in 'server', if construction was
    /// successful, returns an error otherwise. In the error case, 'server' will not have
    /// been set and will not need to be freed, otherwise the caller assumes ownership of
    /// '*server'.
    Status Build(ThriftServer ** server)
    {
        // Hold the new server in a unique_ptr so that it is freed if EnableSsl() fails.
        std::unique_ptr<ThriftServer> ptr(new ThriftServer(
            name_,
            processor_,
            port_,
            auth_provider_,
            metrics_,
            max_concurrent_connections_,
            queue_timeout_ms_,
            idle_poll_period_ms_,
            server_transport_type_));
        if (enable_ssl_)
        {
            RETURN_IF_ERROR(ptr->EnableSsl(version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
        }
        // Success: release ownership to the caller.
        (*server) = ptr.release();
        return Status::OK();
    }
};
}


        // Initialize the client servers.
        // (This excerpt is truncated: the `if` block below is not closed here.)
        shared_ptr<ImpalaServer> handler = shared_from_this();
        if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
        {
            // Wrap the handler in an ImpalaServiceProcessor, held through the TProcessor base.
            shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
            // Create the RpcEventHandler, named "ImpalaServiceProcessor", using the exec env's metrics.
            shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
            
            // Attach the event handler to the processor.
            beeswax_processor->setEventHandler(event_handler);
            
            // Construct a ThriftServerBuilder from BEESWAX_SERVER_NAME, beeswax_processor and beeswax_port.
            ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);

            if (IsExternalTlsConfigured())
            {
                LOG(INFO) << "Enabling SSL for Beeswax";
                builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
                    .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
                    .ssl_version(ssl_version)
                    .cipher_list(FLAGS_ssl_cipher_list);
            }

            ThriftServer * server;
            // Build the server. The syntax below is a chain of member calls: each
            // call is applied to the result of the previous one, and the chain ends
            // with Build(&server), whose result is checked by RETURN_IF_ERROR.
            RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
                                .metrics(exec_env_->metrics())
                                .max_concurrent_connections(FLAGS_fe_service_threads)
                                .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
                                .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
                                .Build(&server));
            
            // Store the built server in beeswax_server_ and register this object
            // as its connection handler.
            beeswax_server_.reset(server);
            beeswax_server_->SetConnectionHandler(this);

4 ThriftServer Start

代码语言:c 复制
namespace impala
{
class AuthProvider;

/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
/// default, no enforced concurrent connection limit, that exposes the interface
/// described by a user-supplied TProcessor object.
///
/// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
/// private.
/// TODO: shutdown is buggy (which only harms tests)
class ThriftServer
{
    friend class ThriftServerEventProcessor;

    /// Helper class that starts a server in a separate thread, and handles
    /// the inter-thread communication to monitor whether it started
    /// correctly.
    class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler
    {
      
        Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer()
        {
            //Thread Create
            RETURN_IF_ERROR(Thread::Create(
                "thrift-server", name.str(), &ThriftServer::ThriftServerEventProcessor::Supervise, this, &thrift_server_->server_thread_));
            return Status::OK();
        }

        void ThriftServer::ThriftServerEventProcessor::Supervise()
        {
            //servce -> run 
            //server_ == TAcceptQueueServer
            thrift_server_->server_->serve();
          
          //go  TAcceptQueueServer->run 服务拉起
        }
    }
  
  
    /// Thrift housekeeping
    //Contaion TServer
    boost::scoped_ptr<apache::thrift::server::TServer> server_;
    /// Contain TProcessor
    std::shared_ptr<apache::thrift::TProcessor> processor_;
  
  
    
    void Start()
    {
        DCHECK(!started_);
        std::shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
        std::shared_ptr<ThreadFactory> thread_factory(new ThriftThreadFactory("thrift-server", name_));

        // Note - if you change the transport types here, you must check that the
        // logic in createContext is still accurate.
        std::shared_ptr<TServerSocket> server_socket;
        std::shared_ptr<TTransportFactory> transport_factory;
        RETURN_IF_ERROR(CreateSocket(&server_socket));
        RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(transport_type_, metrics_name_, metrics_, &transport_factory));

        //set TServer by TAcceptQueueServer 
        //且将 thread_factory .. 传递下去
        server_.reset(new TAcceptQueueServer(
            processor_,
            server_socket,
            transport_factory,
            protocol_factory,
            thread_factory,
            name_,
            max_concurrent_connections_,
            queue_timeout_ms_,
            idle_poll_period_ms_));
        if (metrics_ != NULL)
        {
            (static_cast<TAcceptQueueServer *>(server_.get()))->InitMetrics(metrics_, metrics_name_);
        }
        std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(new ThriftServer::ThriftServerEventProcessor(this));
        server_->setServerEventHandler(event_processor);

        RETURN_IF_ERROR(event_processor->StartAndWaitForServer());

        // If port_ was 0, figure out which port the server is listening on after starting.
        port_ = server_socket->getPort();
        LOG(INFO) << "ThriftServer '" << name_ << "' started on port: " << port_ << (ssl_enabled() ? "s" : "");
        DCHECK(started_);
        return Status::OK();
    }
}
  

void ImpalaServer::Start()
{
    ....
    if (beeswax_server_.get())
    {
        RETURN_IF_ERROR(beeswax_server_->Start());
        LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
    }
}

}

5 ThriftThread ThriftThreadFactory

代码语言:javascript复制
// Creates the ThriftThread that will execute `runnable`. When called from
// TAcceptQueueServer, the runnable is a TAcceptQueueServer::Task.
std::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(std::shared_ptr<atc::Runnable> runnable) const
{
    // Thread name: "<prefix_>-<n>", where n is the value returned by count_.Add(1).
    stringstream thread_name;
    thread_name << prefix_ << "-" << count_.Add(1);

    // Create the ThriftThread for this runnable ...
    std::shared_ptr<ThriftThread> thrift_thread(new ThriftThread(group_, thread_name.str(), runnable));
    // ... and tell the runnable which thread it belongs to.
    runnable->thread(thrift_thread);
    return thrift_thread;
}

// ThriftThread constructor: remembers the thread group and name, and stores
// the runnable in the atc::Thread base.
ThriftThread::ThriftThread(const string & group, const string & name, std::shared_ptr<atc::Runnable> runnable)
    : group_(group)
    , name_(name)
{
    // Sets this::runnable (and no, I don't know why it's not protected in atc::Thread)
    this->Thread::runnable(runnable);
}



thrift-server run
void ThriftThread::start()
{
    Promise<atc::Thread::id_t> promise;
     //thriftThread 启动时候会使用runable() 

    Status status = impala::Thread::Create(group_, name_, bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);

// Body of the thread created by ThriftThread::start(): publishes the thread id
// through `promise`, then runs the runnable.
void ThriftThread::RunRunnable(std::shared_ptr<atc::Runnable> runnable, Promise<atc::Thread::id_t> * promise)
{
    promise->Set(get_current());
    // Passing runnable in to this method (rather than reading from this->runnable())
    // ensures that it will live as long as this method, otherwise the ThriftThread could be
    // destroyed between the previous statement and this one (according to my reading of
    // PosixThread)
  
  
    // This ends up calling TAcceptQueueServer::Task::run() when the runnable
    // is a TAcceptQueueServer::Task.
    runnable->run();
}
  

五 简要梳理

Main->ImpaladMain->ImpalaServer->ThriftServer

ThriftServer 依赖 TServer,创建了较多的对象(TProcessor、ThreadFactory)

ThriftServer/ThriftThreadFactory->ThreadFactory->ThriftThread

最后希望能够给学习 Impala 的同学带来一些帮助!

0 人点赞