一 UML
二 本文主要讲解过程
- main(daemain-main.cc)
- ImpaladMain/StatestoredMain/CatalogdMain/AdmissiondMain (impalad-main.cc)
- ExecEnv.init()
- impala_server = new ImpalaServer(exec_env)
- impala_server->Start((FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port, FLAGS_external_fe_port)
- impaa_server->Join
// Initialize the client servers.
shared_ptr<ImpalaServer> handler = shared_from_this();
if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
{
//ImpalaServiceProcessor
shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
//ImpalaServiceProcessor
shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
//设置实践句柄
beeswax_processor->setEventHandler(event_handler);
//BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
if (IsExternalTlsConfigured())
{
LOG(INFO) << "Enabling SSL for Beeswax";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
ThriftServer * server;
//build server
//这是属于啥语法
RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.Build(&server));
//beeswax_server_
beeswax_server_.reset(server);
beeswax_server_->SetConnectionHandler(this);
}
if (beeswax_server_.get())
{
RETURN_IF_ERROR(beeswax_server_->Start());
LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
}
三 基础关系篇
1 BeeswaxServiceIf 系列
代码语言:javascript复制class ImpalaServiceIf : virtual public ::beeswax::BeeswaxServiceIf
class ImpalaServer : public ImpalaServiceIf,
public ImpalaHiveServer2ServiceIf,
public ThriftServer::ConnectionHandlerIf,
public std::enable_shared_from_this<ImpalaServer>,
public CacheLineAligned
2 TProcessorEventHandler
代码语言:c 复制namespace apache {
namespace thrift {
/**
* Virtual interface class that can handle events from the processor. To
* use this you should subclass it and implement the methods that you care
* about. Your subclass can also store local data that you may care about,
* such as additional "arguments" to these methods (stored in the object
* instance's state).
*/
class TProcessorEventHandler {
public:
virtual ~TProcessorEventHandler() {}
/**
* Called before calling other callback methods.
* Expected to return some sort of context object.
* The return value is passed to all other callbacks
* for that function invocation.
*/
virtual void* getContext(const char* fn_name, void* serverContext) {
(void)fn_name;
(void)serverContext;
return NULL;
}
/**
* Expected to free resources associated with a context.
*/
virtual void freeContext(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called before reading arguments.
*/
virtual void preRead(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called between reading arguments and calling the handler.
*/
virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) {
(void)ctx;
(void)fn_name;
(void)bytes;
}
/**
* Called between calling the handler and writing the response.
*/
virtual void preWrite(void* ctx, const char* fn_name) {
(void)ctx;
(void)fn_name;
}
/**
* Called after writing the response.
*/
virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
(void)ctx;
(void)fn_name;
(void)bytes;
}
}
}
namespace impala
{
/// An RpcEventHandler is called every time an Rpc is started and completed. There is at
/// most one RpcEventHandler per ThriftServer. When an Rpc is started, getContext() creates
/// an InvocationContext recording the current time and other metadata for that invocation.
class RpcEventHandler : public apache::thrift::TProcessorEventHandler
{
public:
RpcEventHandler(const std::string & server_name, MetricGroup * metrics);
}
}
3 TProcessor 系列
代码语言:c 复制namespace apache {
namespace thrift {
/**
* A processor is a generic object that acts upon two streams of data, one
* an input and the other an output. The definition of this object is loose,
* though the typical case is for some sort of server that either generates
* responses to an input stream or forwards data from one pipe onto another.
*
*/
class TProcessor {
public:
virtual ~TProcessor() {}
virtual bool process(stdcxx::shared_ptr<protocol::TProtocol> in,
stdcxx::shared_ptr<protocol::TProtocol> out,
void* connectionContext) = 0;
bool process(stdcxx::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
return process(io, io, connectionContext);
}
stdcxx::shared_ptr<TProcessorEventHandler> getEventHandler() const { return eventHandler_; }
void setEventHandler(stdcxx::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;
}
protected:
TProcessor() {}
stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
};
}
namespace beeswax
{
stdcxx::shared_ptr<TProcessorEventHandler> eventHandler_;
//这里只是作了一层抽象
template <class Protocol_>
class TDispatchProcessorT : public TProcessor
{
}
template <class Protocol_>
class BeeswaxServiceProcessorT : public ::apache::thrift::TDispatchProcessorT<Protocol_>
{
protected:
::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface_;
BeeswaxServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<BeeswaxServiceIf> iface) :
iface_(iface) {}
}
}
namespace apache {
namespace thrift {
namespace protocol {
class TProtocol
{
virtual uint32_t writeBool_virt(const bool value) = 0;
virtual uint32_t writeByte_virt(const int8_t byte) = 0;
virtual uint32_t writeI16_virt(const int16_t i16) = 0;
virtual uint32_t writeI32_virt(const int32_t i32) = 0;
}
class TDummyProtocol : public TProtocol {};
}
}
}
namespace impala
{
template <class Protocol_>
class ImpalaServiceProcessorT : public ::beeswax::BeeswaxServiceProcessorT<Protocol_>
{
protected:
::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface_;
ImpalaServiceProcessorT(::apache::thrift::stdcxx::shared_ptr<ImpalaServiceIf> iface) :
::beeswax::BeeswaxServiceProcessorT<Protocol_>(iface),
iface_(iface)
}
typedef ImpalaServiceProcessorT< ::apache::thrift::protocol::TDummyProtocol > ImpalaServiceProcessor;
}
4 TServer
代码语言:c 复制namespace apache {
namespace thrift {
namespace concurrency {
class Runnable {
public:
virtual ~Runnable(){};
virtual void run() = 0;
virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
private:
stdcxx::weak_ptr<Thread> thread_;
};
}
namespace apache {
namespace thrift {
namespace server {
class TServer : public concurrency::Runnable {
//Contain TProcessor TServerTransport TTransportFactory TProtocolFactory
TServer(const stdcxx::shared_ptr<TProcessor>& processor,
const stdcxx::shared_ptr<TServerTransport>& serverTransport,
const stdcxx::shared_ptr<TTransportFactory>& transportFactory,
const stdcxx::shared_ptr<TProtocolFactory>& protocolFactory)
: processorFactory_(new TSingletonProcessorFactory(processor)),
serverTransport_(serverTransport),
inputTransportFactory_(transportFactory),
outputTransportFactory_(transportFactory),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory) {}
public:
virtual ~TServer() {}
virtual void serve() = 0;
virtual void stop() {}
// Allows running the server as a Runnable thread
virtual void run() { serve(); }
}
/**
* In TAcceptQueueServer, the main server thread calls accept() and then immediately
* places the returned TTransport on a queue to be processed by a separate thread,
* asynchronously.
*
* This helps solve IMPALA-4135, where connections were timing out while waiting in the
* OS accept queue, by ensuring that accept() is called as quickly as possible.
*/
class TAcceptQueueServer : public TServer
{
void serve() override;
}
class TAcceptQueueServer::Task : public Runnable
{
void run() override
}
}
5 ThreadPool
代码语言:c 复制namespace apache {
namespace thrift {
namespace transport {
/**
* Generic interface for a method of transporting data. A TTransport may be
* capable of either reading or writing, but not necessarily both.
*
*/
class TTransport {
protected:
/**
* Simple constructor.
*/
TTransport() {}
public:
virtual bool isOpen() { return false; }
virtual void close() {
throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
}
uint32_t read(uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
return read_virt(buf, len);
}
void write(const uint8_t* buf, uint32_t len) {
T_VIRTUAL_CALL();
write_virt(buf, len);
}
}
namespace apache
{
namespace thrift
{
namespace server
{
using apache::thrift::TProcessor;
using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::transport::TServerTransport;
using apache::thrift::transport::TTransportFactory;
struct TAcceptQueueEntry
{
std::shared_ptr<TTransport> client_; //Client 封装
int64_t expiration_time_ = 0LL;
};
}
}
namespace impala
{
/// Simple threadpool which processes items (of type T) in parallel which were placed on a
/// blocking queue by Offer(). Each item is processed by a single user-supplied method.
template <typename T>
class ThreadPool : public CacheLineAligned
{
public:
/// Signature of a work-processing function. Takes the integer id of the thread which is
/// calling it (ids run from 0 to num_threads - 1) and a reference to the item to
/// process.
/// Lambda 表达式 存储 所有 正在WorkFunction 正在处理的请求
typedef boost::function<void(int thread_id, const T & workitem)> WorkFunction;
/// Creates a new thread pool without starting any threads. Code must call
/// Init() on this thread pool before any calls to Offer().
/// -- num_threads: how many threads are part of this pool
/// -- queue_size: the maximum size of the queue on which work items are offered. If the
/// queue exceeds this size, subsequent calls to Offer will block until there is
/// capacity available.
/// -- work_function: the function to run every time an item is consumed from the queue
/// -- fault_injection_eligible - If set to true, allow fault injection at this
/// callsite (see thread_creation_fault_injection). If set to false, fault
/// injection is diabled at this callsite. Thread creation sites that crash
/// Impala or abort startup must have this set to false.
ThreadPool(
const std::string & group,
const std::string & thread_prefix,
uint32_t num_threads,
uint32_t queue_size,
const WorkFunction & work_function,
bool fault_injection_eligible = false)
: group_(group)
, thread_prefix_(thread_prefix)
, num_threads_(num_threads)
, work_function_(work_function)
, work_queue_(queue_size)
, fault_injection_eligible_(fault_injection_eligible)
{
}
/// Create the threads needed for this ThreadPool. Returns an error on any
/// error spawning the threads.
Status Init()
{
for (int i = 0; i < num_threads_; i)
{
std::stringstream threadname;
threadname << thread_prefix_ << "(" << i 1 << ":" << num_threads_ << ")";
std::unique_ptr<Thread> t;
Status status = Thread::Create(
group_,
threadname.str(),
boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i),
&t,
fault_injection_eligible_);
if (!status.ok())
{
// The thread pool initialization failed. Shutdown any threads that were
// spawned. Note: Shutdown() and Join() are safe to call multiple times.
Shutdown();
Join();
return status;
}
threads_.AddThread(std::move(t));
}
initialized_ = true;
return Status::OK();
}
/// Blocking operation that puts a work item on the queue. If the queue is full, blocks
/// until there is capacity available. The ThreadPool must be initialized before
/// calling this method.
//
/// 'work' is copied into the work queue, but may be referenced at any time in the
/// future. Therefore the caller needs to ensure that any data referenced by work (if T
/// is, e.g., a pointer type) remains valid until work has been processed, and it's up to
/// the caller to provide their own signalling mechanism to detect this (or to wait until
/// after DrainAndShutdown returns).
//
/// Returns true if the work item was successfully added to the queue, false otherwise
/// (which typically means that the thread pool has already been shut down).
template <typename V>
bool Offer(V && work)
{
DCHECK(initialized_);
return work_queue_.BlockingPut(std::forward<V>(work));
}
/// Blocks until the work item is placed on the queue or the timeout expires. The
/// ThreadPool must be initialized before calling this method. The same requirements
/// about the lifetime of 'work' applies as in Offer() above. If the operation times
/// out, the work item can be safely freed.
///
/// Returns true if the work item was successfully added to the queue, false otherwise
/// (which means the operation timed out or the thread pool has already been shut down).
template <typename V>
bool Offer(V && work, int64_t timeout_millis)
{
DCHECK(initialized_);
int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
return work_queue_.BlockingPutWithTimeout(std::forward<V>(work), timeout_micros);
}
private:
/// Driver method for each thread in the pool. Continues to read work from the queue
/// until the pool is shutdown.
//工作线程
void WorkerThread(int thread_id)
{
while (!IsShutdown())
{
T workitem;
if (work_queue_.BlockingGet(&workitem))
{
work_function_(thread_id, workitem);
}
if (work_queue_.Size() == 0)
{
/// Take lock to ensure that DrainAndShutdown() cannot be between checking
/// GetSize() and wait()'ing when the condition variable is notified.
/// (It will hang if we notify right before calling wait().)
std::unique_lock<std::mutex> l(lock_);
empty_cv_.NotifyAll();
}
}
}
uint32_t num_threads_;
/// User-supplied method to call to process each work item.
WorkFunction work_function_;
}
6 TAcceptQueueServer
代码语言:c 复制namespace apache {
namespace thrift {
namespace concurrency {
class Runnable {
public:
virtual ~Runnable(){};
virtual void run() = 0;
virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
private:
stdcxx::weak_ptr<Thread> thread_;
};
class ThreadFactory {
protected:
ThreadFactory(bool detached) : detached_(detached) { }
}
}
namespace apache {
namespace thrift {
namespace server {
class TServer : public concurrency::Runnable {
public:
virtual ~TServer() {}
virtual void serve() = 0;
virtual void stop() {}
// Allows running the server as a Runnable thread
virtual void run() { serve(); }
}
}
class TAcceptQueueServer : public TServer
{
std::shared_ptr<ThreadFactory> threadFactory_;
void TAcceptQueueServer::serve()
{
//SetupConnection 调用
//Thread Pool
//WorkFunction
ThreadPool<shared_ptr<TAcceptQueueEntry>> connection_setup_pool(
"setup-server",
"setup-worker",
FLAGS_accepted_cnxn_setup_thread_pool_size,
FLAGS_accepted_cnxn_queue_depth,
[this](int tid, const shared_ptr<TAcceptQueueEntry> & item) { this->SetupConnection(item); });
Status status = connection_setup_pool.Init();
while (!stop_)
{
try
{
// Fetch client from server
shared_ptr<TTransport> client = serverTransport_->accept();
TSocket * socket = reinterpret_cast<TSocket *>(client.get());
//new Connection
VLOG(1) << Substitute("New connection to server $0 from client $1", name_, socket->getSocketInfo());
shared_ptr<TAcceptQueueEntry> entry{new TAcceptQueueEntry};
entry->client_ = client;
if (queue_timeout_ms_ > 0)
{
entry->expiration_time_ = MonotonicMillis() queue_timeout_ms_;
}
// New - the work done to set up the connection has been moved to SetupConnection.
// Note that we move() entry so it's owned by SetupConnection thread.
connection_setup_pool.Offer(std::move(entry);
}
}
}
//处理一个 Entry 即一个Client
void TAcceptQueueServer::SetupConnection(shared_ptr<TAcceptQueueEntry> entry)
{
//net Task
//获取参数 成员变量
TAcceptQueueServer::Task * task = new TAcceptQueueServer::Task(*this, processor, inputProtocol, outputProtocol, client);
// Create a task
shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
// Create a thread for this task
//这里就相当关键了 会去调用 thrift-server newThread
//将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task TAcceptQueueServer::Task
shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
tasks_.insert(task);
}
}
//跟 class TAcceptQueueServer::Task : public concurrency::Runnable 一个意思
class TAcceptQueueServer::Task : public Runnable
{
public:
Task(
TAcceptQueueServer & server,
shared_ptr<TProcessor> processor,
shared_ptr<TProtocol> input,
shared_ptr<TProtocol> output,
shared_ptr<TTransport> transport)
: server_(server)
, processor_(std::move(processor))
, input_(std::move(input))
, output_(std::move(output))
, transport_(std::move(transport))
{
}
void run() override
{
//get EventHandler
shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
//eventHandler->CreateContext
connectionContext = eventHandler->createContext(input_, output_);
//event->processContext
eventHandler->processContext(connectionContext, transport_);
}
}
}
7 ThriftThreadFactory
代码语言:c 复制namespace apache {
namespace thrift {
namespace concurrency {
class Runnable {
public:
virtual ~Runnable(){};
virtual void run() = 0;
virtual stdcxx::shared_ptr<Thread> thread() { return thread_.lock(); }
virtual void thread(stdcxx::shared_ptr<Thread> value) { thread_ = value; }
private:
stdcxx::weak_ptr<Thread> thread_;
};
class ThreadFactory {
protected:
ThreadFactory(bool detached) : detached_(detached) { }
}
}
class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory
{
}
四 调用过程
1 ImpalaServer->TProcessor
代码语言:c 复制 // Initialize the client servers.
shared_ptr<ImpalaServer> handler = shared_from_this();
if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
{
//ImpalaServiceProcessor
shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
}
2 ImpalaServer->TProcessorEventHandler
代码语言:c 复制 // Initialize the client servers.
shared_ptr<ImpalaServer> handler = shared_from_this();
if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
{
//ImpalaServiceProcessor
shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
//ImpalaServiceProcessor
shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
//设置实践句柄
beeswax_processor->setEventHandler(event_handler);
3 ThriftServer->ThriftServer
代码语言:c 复制namespace impala
{
/// Helper class to build new ThriftServer instances.
class ThriftServerBuilder
{
public:
ThriftServerBuilder(const std::string & name, const std::shared_ptr<apache::thrift::TProcessor> & processor, int port)
: name_(name), processor_(processor), port_(port)
{
}
std::string name_;
//contain TProcess
std::shared_ptr<apache::thrift::TProcessor> processor_;
int port_ = 0;
/// Constructs a new ThriftServer and puts it in 'server', if construction was
/// successful, returns an error otherwise. In the error case, 'server' will not have
/// been set and will not need to be freed, otherwise the caller assumes ownership of
/// '*server'.
Status Build(ThriftServer ** server)
{
std::unique_ptr<ThriftServer> ptr(new ThriftServer(
name_,
processor_,
port_,
auth_provider_,
metrics_,
max_concurrent_connections_,
queue_timeout_ms_,
idle_poll_period_ms_,
server_transport_type_));
if (enable_ssl_)
{
RETURN_IF_ERROR(ptr->EnableSsl(version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
}
(*server) = ptr.release();
return Status::OK();
}
}
}
// Initialize the client servers.
shared_ptr<ImpalaServer> handler = shared_from_this();
if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0))
{
//ImpalaServiceProcessor
shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
//ImpalaServiceProcessor
shared_ptr<TProcessorEventHandler> event_handler(new RpcEventHandler("ImpalaServiceProcessor", exec_env_->metrics()));
//设置实践句柄
beeswax_processor->setEventHandler(event_handler);
//BEESWAX_SERVER_NAME beeswax_processor beeswax_port ThriftServerBuilder
ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
if (IsExternalTlsConfigured())
{
LOG(INFO) << "Enabling SSL for Beeswax";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
ThriftServer * server;
//build server
//这是属于啥语法
RETURN_IF_ERROR(builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.Build(&server));
//beeswax_server_
beeswax_server_.reset(server);
beeswax_server_->SetConnectionHandler(this);
4 ThriftServer Start
代码语言:c 复制namespace impala
{
class AuthProvider;
/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
/// default, no enforced concurrent connection limit, that exposes the interface
/// described by a user-supplied TProcessor object.
///
/// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
/// private.
/// TODO: shutdown is buggy (which only harms tests)
class ThriftServer
{
friend class ThriftServerEventProcessor;
/// Helper class that starts a server in a separate thread, and handles
/// the inter-thread communication to monitor whether it started
/// correctly.
class ThriftServerEventProcessor : public apache::thrift::server::TServerEventHandler
{
Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer()
{
//Thread Create
RETURN_IF_ERROR(Thread::Create(
"thrift-server", name.str(), &ThriftServer::ThriftServerEventProcessor::Supervise, this, &thrift_server_->server_thread_));
return Status::OK();
}
void ThriftServer::ThriftServerEventProcessor::Supervise()
{
//servce -> run
//server_ == TAcceptQueueServer
thrift_server_->server_->serve();
//go TAcceptQueueServer->run 服务拉起
}
}
/// Thrift housekeeping
//Contaion TServer
boost::scoped_ptr<apache::thrift::server::TServer> server_;
/// Contain TProcessor
std::shared_ptr<apache::thrift::TProcessor> processor_;
void Start()
{
DCHECK(!started_);
std::shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
std::shared_ptr<ThreadFactory> thread_factory(new ThriftThreadFactory("thrift-server", name_));
// Note - if you change the transport types here, you must check that the
// logic in createContext is still accurate.
std::shared_ptr<TServerSocket> server_socket;
std::shared_ptr<TTransportFactory> transport_factory;
RETURN_IF_ERROR(CreateSocket(&server_socket));
RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(transport_type_, metrics_name_, metrics_, &transport_factory));
//set TServer by TAcceptQueueServer
//且将 thread_factory .. 传递下去
server_.reset(new TAcceptQueueServer(
processor_,
server_socket,
transport_factory,
protocol_factory,
thread_factory,
name_,
max_concurrent_connections_,
queue_timeout_ms_,
idle_poll_period_ms_));
if (metrics_ != NULL)
{
(static_cast<TAcceptQueueServer *>(server_.get()))->InitMetrics(metrics_, metrics_name_);
}
std::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(new ThriftServer::ThriftServerEventProcessor(this));
server_->setServerEventHandler(event_processor);
RETURN_IF_ERROR(event_processor->StartAndWaitForServer());
// If port_ was 0, figure out which port the server is listening on after starting.
port_ = server_socket->getPort();
LOG(INFO) << "ThriftServer '" << name_ << "' started on port: " << port_ << (ssl_enabled() ? "s" : "");
DCHECK(started_);
return Status::OK();
}
}
void ImpalaServer::Start()
{
....
if (beeswax_server_.get())
{
RETURN_IF_ERROR(beeswax_server_->Start());
LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
}
}
}
5 ThreadThread ThreadFactory
代码语言:javascript复制//将 Thrift-Server 的 ThriftThread 设置为 TAcceptQueueServer::Task
std::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(std::shared_ptr<atc::Runnable> runnable) const
{
stringstream name;
name << prefix_ << "-" << count_.Add(1);
//new ThriftThread
std::shared_ptr<ThriftThread> result = std::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
runnable->thread(result);
return result;
}
//ThriftThread
ThriftThread::ThriftThread(const string & group, const string & name, std::shared_ptr<atc::Runnable> runnable) : group_(group), name_(name)
{
// Sets this::runnable (and no, I don't know why it's not protected in atc::Thread)
this->Thread::runnable(runnable);
}
thrift-server run
void ThriftThread::start()
{
Promise<atc::Thread::id_t> promise;
//thriftThread 启动时候会使用runable()
Status status = impala::Thread::Create(group_, name_, bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
void ThriftThread::RunRunnable(std::shared_ptr<atc::Runnable> runnable, Promise<atc::Thread::id_t> * promise)
{
promise->Set(get_current());
// Passing runnable in to this method (rather than reading from this->runnable())
// ensures that it will live as long as this method, otherwise the ThriftThread could be
// destroyed between the previous statement and this one (according to my reading of
// PosixThread)
//这里就回去调用 TAcceptQueueServer::Task::run()
runnable->run();
}
五 简要梳理
Main->ImapadMain->ImpalaServer->ThriftServer
ThrifServer 依赖 TServer 创建了较多的 对象(TProcess TThreadFactory)
ThriftServer/ThriftFactory->ThreadFactory->ThreadThread
最后希望能够学习Impala的同学带来一些帮助!