13. 深入研究
13.1 MDS启动阶段分析
代码语言:javascript
复制//src/ceph_mds.cc
int main(int argc, const char **argv)
{
ceph_pthread_setname(pthread_self(), "ceph-mds");
vector<const char*> args;
argv_to_vec(argc, argv, args);
env_to_vec(args);
//初始化全局信息
auto cct = global_init(NULL, args,
CEPH_ENTITY_TYPE_MDS, CODE_ENVIRONMENT_DAEMON,
0, "mds_data");
//初始化堆栈分析器
ceph_heap_profiler_init();
std::string val, action;
for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
}
else if (ceph_argparse_flag(args, i, "--help", "-h", (char*)NULL)) {
// exit(1) will be called in the usage()
usage();
}
else if (ceph_argparse_witharg(args, i, &val, "--hot-standby", (char*)NULL)) {
int r = parse_rank("hot-standby", val);
dout(0) << "requesting standby_replay for mds." << r << dendl;
char rb[32];
snprintf(rb, sizeof(rb), "%d", r);
g_conf->set_val("mds_standby_for_rank", rb);
g_conf->set_val("mds_standby_replay", "true");
g_conf->apply_changes(NULL);
}
else {
derr << "Error: can't understand argument: " << *i << "n" << dendl;
usage();
}
}
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
// Normal startup
if (g_conf->name.has_default_id()) {
derr << "must specify '-i name' with the ceph-mds instance name" << dendl;
usage();
}
if (g_conf->name.get_id().empty() ||
(g_conf->name.get_id()[0] >= '0' && g_conf->name.get_id()[0] <= '9')) {
derr << "deprecation warning: MDS id '" << g_conf->name
<< "' is invalid and will be forbidden in a future version. "
"MDS names may not start with a numeric digit." << dendl;
}
uint64_t nonce = 0;
get_random_bytes((char*)&nonce, sizeof(nonce));
std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
//创建通信的messenger
Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MDS(-1), "mds",
nonce, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
cout << "starting " << g_conf->name << " at " << msgr->get_myaddr()
<< std::endl;
uint64_t required =
CEPH_FEATURE_OSDREPLYMUX;
msgr->set_default_policy(Messenger::Policy::lossy_client(required));
msgr->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64));
msgr->set_policy(entity_name_t::TYPE_MDS,
Messenger::Policy::lossless_peer(CEPH_FEATURE_UID));
msgr->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateful_server(0));
int r = msgr->bind(g_conf->public_addr);
if (r < 0)
exit(1);
global_init_daemonize(g_ceph_context);
common_init_finish(g_ceph_context);
// get monmap
MonClient mc(g_ceph_context);
if (mc.build_initial_monmap() < 0)
return -1;
global_init_chdir(g_ceph_context);
//开始接收消息
msgr->start();
//创建MDSDaemon,启动MDS
mds = new MDSDaemon(g_conf->name.get_id().c_str(), msgr, &mc);
// in case we have to respawn...
mds->orig_argc = argc;
mds->orig_argv = argv;
r = mds->init();
if (r < 0) {
msgr->wait();
goto shutdown;
}
...
msgr->wait();
...
return 0;
}
13.2 MDS核心组件
13.3 MDSDaemon类图
13.4 MDSDaemon源码分析
代码语言:javascript
复制//MDSDaemon.cc
/***************************admin socket相关,统计mds埋点信息及状态信息***************************/
bool MDSDaemon::asok_command(string command, cmdmap_t& cmdmap, string format,
ostream& ss);
void MDSDaemon::dump_status(Formatter *f);
void MDSDaemon::set_up_admin_socket();
void MDSDaemon::clean_up_admin_socket()
/*********************************************************************************************/
/***************************初始化***************************/
int MDSDaemon::init()
{
...
//初始化MonClient
int r = 0;
r = monc->init();
if (r < 0) {
derr << "ERROR: failed to get monmap: " << cpp_strerror(-r) << dendl;
mds_lock.Lock();
suicide();
mds_lock.Unlock();
return r;
}
...
//初始化mgrclient
mgrc.init();
messenger->add_dispatcher_head(&mgrc);
mds_lock.Lock();
if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
dout(4) << __func__ << ": terminated already, dropping out" << dendl;
mds_lock.Unlock();
return 0;
}
monc->sub_want("mdsmap", 0, 0);
monc->sub_want("mgrmap", 0, 0);
monc->renew_subs();
mds_lock.Unlock();
//初始化SaftTimer
timer.init();
//初始化Beacon
beacon.init(mdsmap);
messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));
// 重置tick
reset_tick();
mds_lock.Unlock();
return 0;
}
/*********************************************************/
/***************************重置tick相关***************************/
void MDSDaemon::reset_tick();
void MDSDaemon::tick();
/****************************************************************/
/***************************处理命令,返回信息***************************/
void MDSDaemon::handle_command(MCommand *m);
void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
int r, bufferlist outbl,
boost::string_view outs);
//mds map信息
void MDSDaemon::handle_mds_map(MMDSMap *m);
/*********************************************************************/
/***************************处理信号,自杀,重生*************************/
void MDSDaemon::handle_signal(int signum);
void MDSDaemon::suicide();
void MDSDaemon::respawn();
/********************************************************************/
/***************************消息调度处理*************************/
bool MDSDaemon::ms_dispatch(Message *m)
{
Mutex::Locker l(mds_lock);
if (stopping) {
return false;
}
//mds处于shutdown状态,不处理消息
if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
dout(10) << " stopping, discarding " << *m << dendl;
m->put();
return true;
}
// 优先处理daemon message
const bool handled_core = handle_core_message(m);
if (handled_core) {
return true;
}
// 不是核心的,尝试给rank发送消息
if (mds_rank) {
return mds_rank->ms_dispatch(m);
} else {
return false;
}
}
//高优先级处理的消息MON,MDS,OSD
bool MDSDaemon::handle_core_message(Message *m)
{
switch (m->get_type()) {
// MON
case CEPH_MSG_MON_MAP:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
m->put();
break;
// MDS
case CEPH_MSG_MDS_MAP:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
handle_mds_map(static_cast<MMDSMap*>(m));
break;
// OSD
case MSG_COMMAND:
handle_command(static_cast<MCommand*>(m));
break;
case CEPH_MSG_OSD_MAP:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
if (mds_rank) {
mds_rank->handle_osd_map();
}
m->put();
break;
case MSG_MON_COMMAND:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
clog->warn() << "dropping `mds tell` command from legacy monitor";
m->put();
break;
default:
return false;
}
return true;
}
//重置消息,不进行处理
bool MDSDaemon::ms_handle_reset(Connection *con);
void MDSDaemon::ms_handle_remote_reset(Connection *con);
bool MDSDaemon::ms_handle_refused(Connection *con)
/***************************************************************/
/***************************auth模块*************************/
//mon生成auth
bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
//验证授权
bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& is_valid, CryptoKey& session_key,
std::unique_ptr<AuthAuthorizerChallenge> *challenge);
/**************************************************************/
/***************************session 连接accept*************************/
void MDSDaemon::ms_handle_accept(Connection *con)
{
Mutex::Locker l(mds_lock);
if (stopping) {
return;
}
Session *s = static_cast<Session *>(con->get_priv());
dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
if (s) {
if (s->connection != con) {
dout(10) << " session connection " << s->connection << " -> " << con << dendl;
s->connection = con;
// send out any queued messages
while (!s->preopen_out_queue.empty()) {
con->send_message(s->preopen_out_queue.front());
s->preopen_out_queue.pop_front();
}
}
s->put();
}
}
/*************************************************************/
/***************************clean shutdown*************************/
bool MDSDaemon::is_clean_shutdown()
{
if (mds_rank) {
return mds_rank->is_stopped();
} else {
return true;
}
}
/************************************************************/
13.5 MDSRank类图
13.6 MDSRank源码分析
代码语言:javascript
复制//MDSRank.cc
/***************************init初始化***************************/
void MDSRankDispatcher::init()
{
//Objecter初始化,并且添加到消息头部,然后启动
objecter->init();
messenger->add_dispatcher_head(objecter);
objecter->start();
//更新配置文件中log配置信息
update_log_config();
create_logger();
handle_osd_map();
progress_thread.create("mds_rank_progr");
purge_queue.init();
finisher->start();
}
/***************************************************************/
/***************************tick***************************/
void MDSRankDispatcher::tick()
{
//重置heartbeat超时时间,避免被monitor kill
heartbeat_reset();
if (beacon.is_laggy()) {
dout(5) << "tick bailing out since we seem laggy" << dendl;
return;
}
//从op_tracker中读取到所有in_flight的操作名称
check_ops_in_flight();
//唤醒progress_thread线程
progress_thread.signal();
// make sure mds log flushes, trims periodically
mdlog->flush();
//如果是active,stopping去除cache,client_leases,log
if (is_active() || is_stopping()) {
mdcache->trim();
mdcache->trim_client_leases();
mdcache->check_memory_usage();
mdlog->trim(); // NOT during recovery!
}
// 更新log
if (logger) {
logger->set(l_mds_subtrees, mdcache->num_subtrees());
mdcache->log_stat();
}
// ...
if (is_clientreplay() || is_active() || is_stopping()) {
server->find_idle_sessions();
locker->tick();
}
//如果处于reconnect 标记
if (is_reconnect())
server->reconnect_tick();
if (is_active()) {
balancer->tick();
mdcache->find_stale_fragment_freeze();
mdcache->migrator->find_stale_export_freeze();
if (snapserver)
snapserver->check_osd_map(false);
}
if (is_active() || is_stopping()) {
update_targets(ceph_clock_now());
}
// shut down?
if (is_stopping()) {
mdlog->trim();
if (mdcache->shutdown_pass()) {
uint64_t pq_progress = 0 ;
uint64_t pq_total = 0;
size_t pq_in_flight = 0;
if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
dout(7) << "shutdown_pass=true, but still waiting for purge queue"
<< dendl;
// This takes unbounded time, so we must indicate progress
// to the administrator: we do it in a slightly imperfect way
// by sending periodic (tick frequency) clog messages while
// in this state.
clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
<< std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
<< " files purging" << ")";
} else {
dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
"down:stopped" << dendl;
stopping_done();
}
}
else {
dout(7) << "shutdown_pass=false" << dendl;
}
}
// Expose ourselves to Beacon to update health indicators
beacon.notify_health(this);
}
/***********************************************************/
/***************************shutdown***************************/
void MDSRankDispatcher::shutdown()
{
// It should never be possible for shutdown to get called twice, because
// anyone picking up mds_lock checks if stopping is true and drops
// out if it is.
assert(stopping == false);
stopping = true;
dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
//关闭定时器
timer.shutdown();
//关闭mdlog
mdlog->shutdown();
//关闭mdcache
mdcache->shutdown();
purge_queue.shutdown();
mds_lock.Unlock();
finisher->stop(); // no flushing
mds_lock.Lock();
//关闭objecter
if (objecter->initialized)
objecter->shutdown();
//关闭monclient
monc->shutdown();
//关闭op_tracker
op_tracker.on_shutdown();
//关闭progress_thread
progress_thread.shutdown();
// release mds_lock for finisher/messenger threads (e.g.
// MDSDaemon::ms_handle_reset called from Messenger).
mds_lock.Unlock();
//关闭messenger
messenger->shutdown();
mds_lock.Lock();
//删除handle
if (hb) {
g_ceph_context->get_heartbeat_map()->remove_worker(hb);
hb = NULL;
}
}
/***********************************************************/
/*****************************admin socket asok******************************/
bool MDSRankDispatcher::handle_asok_command();
//剔除用户
void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m);
bool MDSRank::evict_client(int64_t session_id, bool wait, bool blacklist, std::stringstream& err_ss,Context *on_killed);
//dump用户session
void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f);
void MDSRankDispatcher::update_log_config();
Session *MDSRank::get_session(Message *m);
void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec);
void MDSRank::command_tag_path(Formatter *f, boost::string_view path, boost::string_view tag);
void MDSRank::command_flush_path(Formatter *f, boost::string_view path);
void MDSRank::command_flush_journal(Formatter *f);
void MDSRank::command_get_subtrees(Formatter *f);
void MDSRank::command_export_dir(Formatter *f, boost::string_view path, mds_rank_t target);
bool MDSRank::command_dirfrag_split(cmdmap_t cmdmap, std::ostream &ss);
bool MDSRank::command_dirfrag_merge(cmdmap_t cmdmap, std::ostream &ss);
bool MDSRank::command_dirfrag_ls(cmdmap_t cmdmap, std::ostream &ss, Formatter *f);
void MDSRank::dump_status(Formatter *f);
void MDSRank::dump_clientreplay_status(Formatter *f);
void MDSRank::create_logger();
/***************************************************************************/
/*****************************消息分发调度******************************/
bool MDSRankDispatcher::ms_dispatch(Message *m);
bool MDSRank::_dispatch(Message *m, bool new_msg)
{
//如果message不是mds发送过来,则直接返回
if (is_stale_message(m)) {
m->put();
return true;
}
//如果mds处于laggy状态,将消息放入waiting_for_nolaggy数组
if (beacon.is_laggy()) {
dout(10) << " laggy, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
}
//如果消息是新消息并且waiting_for_nolaggy数组不为空, 则放入waiting_for_nolaggy中
else if (new_msg && !waiting_for_nolaggy.empty()) {
dout(10) << " there are deferred messages, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
} else {
if (!handle_deferrable_message(m)) {
dout(0) << "unrecognized message " << *m << dendl;
return false;
}
heartbeat_reset();
}
...
//如果mds处于laggy状态,则直接返回
if (beacon.is_laggy()) {
// We've gone laggy during dispatch, don't do any
// more housekeeping
return true;
}
// done with all client replayed requests?
if (is_clientreplay() &&
mdcache->is_open() &&
replay_queue.empty() &&
beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
int num_requests = mdcache->get_num_client_requests();
dout(10) << " still have " << num_requests << " active replay requests" << dendl;
if (num_requests == 0)
clientreplay_done();
}
...
update_mlogger();
return true;
}
//延期待处理的消息
bool MDSRank::handle_deferrable_message(Message *m)
{
int port = m->get_type() & 0xff00;
switch (port) {
//cache类型消息,由mdcache处理
case MDS_PORT_CACHE:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->dispatch(m);
break;
//migrator类型消息,由migrator处理
case MDS_PORT_MIGRATOR:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->migrator->dispatch(m);
break;
default:
//client session,slave消息,由server处理
switch (m->get_type()) {
// SERVER
case CEPH_MSG_CLIENT_SESSION:
case CEPH_MSG_CLIENT_RECONNECT:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
// fall-thru
case CEPH_MSG_CLIENT_REQUEST:
server->dispatch(m);
break;
case MSG_MDS_SLAVE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
server->dispatch(m);
break;
//heartbeat消息,有balancer处理
case MSG_MDS_HEARTBEAT:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
balancer->proc_message(m);
break;
case MSG_MDS_TABLE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
{
MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
if (req->op < 0) {
MDSTableClient *client = get_table_client(req->table);
client->handle_request(req);
} else {
MDSTableServer *server = get_table_server(req->table);
server->handle_request(req);
}
}
break;
//lock消息,由locker处理
case MSG_MDS_LOCK:
case MSG_MDS_INODEFILECAPS:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
locker->dispatch(m);
break;
//client caps消息,由locker处理
case CEPH_MSG_CLIENT_CAPS:
case CEPH_MSG_CLIENT_CAPRELEASE:
case CEPH_MSG_CLIENT_LEASE:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
locker->dispatch(m);
break;
default:
return false;
}
}
return true;
}
void MDSRank::_advance_queues();
void MDSRank::heartbeat_reset();
/******************************************************************/
/*****************************消息发送******************************/
void MDSRank::send_message(Message *m, Connection *c);
void MDSRank::send_message_mds(Message *m, mds_rank_t mds);
void MDSRank::forward_message_mds(Message *m, mds_rank_t mds);
void MDSRank::send_message_client_counted(Message *m, client_t client);
void MDSRank::send_message_client_counted(Message *m, Connection *connection);
void MDSRank::send_message_client_counted(Message *m, Session *session);
void MDSRank::send_message_client(Message *m, Session *session);
/******************************************************************/
/*****************************类成员相关******************************/
int64_t MDSRank::get_metadata_pool();
MDSTableClient *MDSRank::get_table_client(int t);
MDSTableServer *MDSRank::get_table_server(int t);
utime_t MDSRank::get_laggy_until();
void MDSRank::request_state(MDSMap::DaemonState s);
/*******************************************************************/
/*****************************MDSRank状态相关******************************/
//自杀
void MDSRank::suicide();
//重生
void MDSRank::respawn();
//损坏
void MDSRank::damaged();
void MDSRank::damaged_unlocked();
void MDSRank::handle_write_error(int err)
{
//如果错误为-EBLACKLISTED,则重启MDS
if (err == -EBLACKLISTED) {
derr << "we have been blacklisted (fenced), respawning..." << dendl;
respawn();
return;
}
//如果mds_action_on_write_error大于等于2,则重启MDS
if (g_conf->mds_action_on_write_error >= 2) {
derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
respawn();
} else if (g_conf->mds_action_on_write_error == 1) {
derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
mdcache->force_readonly();
} else {
// ignore;
derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
}
}
//消息是否来着mds
bool MDSRank::is_stale_message(Message *m);
/********************************************************************/
/*****************************ProgressThread相关******************************/
void *MDSRank::ProgressThread::entry();
void MDSRank::ProgressThread::shutdown();
/***************************************************************************/
/*****************************boot相关******************************/
void MDSRank::boot_start(BootStep step, int r);
void MDSRank::validate_sessions();
void MDSRank::starting_done();
void MDSRank::boot_create();
oid MDSRank::creating_done();
/*****************************boot相关******************************/
/*****************************replay相关******************************/
void MDSRank::calc_recovery_set();
void MDSRank::replay_start();
void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos);
void MDSRank::standby_replay_restart();
void MDSRank::replay_done();
/*******************************************************************/
/*****************************resolve相关******************************/
void MDSRank::reopen_log();
void MDSRank::resolve_start();
void MDSRank::resolve_done();
/*********************************************************************/
/*****************************reconnect相关******************************/
void MDSRank::reconnect_start();
void MDSRank::reconnect_done();
/***********************************************************************/
/*****************************rejoin相关******************************/
void MDSRank::rejoin_joint_start();
void MDSRank::rejoin_start();
void MDSRank::rejoin_done();
/********************************************************************/
/*****************************clientreplay相关******************************/
void MDSRank::clientreplay_start();
bool MDSRank::queue_one_replay();
void MDSRank::clientreplay_done();
/*************************************************************************/
/*****************************active相关******************************/
void MDSRank::active_start();
/********************************************************************/
/*****************************recovery相关******************************/
oid MDSRank::recovery_done(int oldstate);
/**********************************************************************/
/*****************************creating_相关******************************/
void MDSRank::creating_done();
/***********************************************************************/
/*****************************stopping相关******************************/
void MDSRank::stopping_start();
void MDSRank::stopping_done();
/**********************************************************************/
/*****************************handle_mds_map相关******************************/
void MDSRankDispatcher::handle_mds_map(MMDSMap *m, MDSMap *oldmap);
void MDSRank::handle_mds_recovery(mds_rank_t who);
void MDSRank::handle_mds_failure(mds_rank_t who);
/***************************************************************************/