文章目录
- 固定的metrics
- Topic相关的metrics
- Statestore topic介绍
- Topic update的metrics
- Metrics更新
- Topic update总计metrics
- 小结
本文主要梳理一下Impala的“statestore-subscriber”相关的metrics,这类metrics主要是在catalog和impalad上存在。目前主要分为两种类型,下面来简单看一下。
固定的metrics
第一类就是几个固定的metrics,如下所示:
代码语言:javascript复制statestore-subscriber.connected
statestore-subscriber.heartbeat-interval-time
statestore-subscriber.last-recovery-duration
statestore-subscriber.last-recovery-time
statestore-subscriber.num-connection-failures
这些metrics都是在statestore的subscriber启动之后直接注册的。我们可以直接从web页面看到这些metrics的描述,也比较简单,不是本文的重点,这里不再展开:
我们这里主要来看一下第二类metrics。
Topic相关的metrics
第二类metrics主要是topic相关的,在介绍这类metrics之前,我们先简单看下目前Impala的几种metrics。
Statestore topic介绍
目前Impala一共有三类topic,subscriber可以向statestore注册,如下所示:
分别是:
- impala-membership,成员信息同步,包括各个backends的相关信息;
- impala-request-queue,资源队列相关的信息;
- catalog-update,元数据相关的信息;
关于每个topic的详细信息,这里不再展开,后续有机会再详细介绍一下。当各个节点启动之后,就会注册相应的topic:
代码语言:javascript复制//catalog-server.cc
Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
/* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
filter_prefix, cb);
//impala-server.cc
ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
/* populate_min_subscriber_topic_version=*/ true,
filter_prefix, catalog_cb));
//admission-controller.cc
Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
/* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false,
/* filter_prefix=*/"", cb);
//cluster-membership-mgr.cc
Status status = statestore_subscriber_->AddTopic(
Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
/* populate_min_subscriber_topic_version=*/ false,
/* filter_prefix= */"", cb);
可以看到,这里impalad(coordinator角色才会注册,executro则不会)和catalogd都注册了catalog-update,另外两个topic,只有impalad会注册。
Topic update的metrics
接下来我们就看下topic update相关的metrics。我们可以在metrics.json文件中找到这两类模板,如下所示:
代码语言:javascript复制statestore-subscriber.topic-$0.processing-time-s
statestore-subscriber.topic-$0.update-interval
从描述来看,就是topic update的处理时间和时间间隔。下面我们就结合代码来实际来看一下:
代码语言:javascript复制//statestore-subscriber.cc
// Template for metrics that measure the processing time for individual topics.
const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";
// Template for metrics that measure the interval between updates for individual topics.
const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";
topic的注册流程代码如下所示:
代码语言:javascript复制//statestore-subscriber.cc
Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
bool is_transient, bool populate_min_subscriber_topic_version,
string filter_prefix, const UpdateCallback& callback) {
//省略部分代码
if (registration.processing_time_metric == nullptr) {
registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
CALLBACK_METRIC_PATTERN, topic_id);
registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
registration.update_interval_timer.Start();
}
//省略部分代码
}
目前一共有三个topic,我们在上面已经介绍过了。对于coordinator节点来说,启动之后,针对这两类topic metrics模板,就会有6个具体的metrics,如下所示:
Metrics更新
主要的更新处理逻辑位于函数StatestoreSubscriber::UpdateState()中。该函数的主要功能就是当进程接受到statestore发来的topic update信息时,就调用该函数,进行实际的topic更新操作。由于该函数比较长,我们分成三个部分来看一下,如下所示:
代码语言:javascript复制//typedef std::map<Statestore::TopicId, TTopicDelta> TopicDeltaMap
//TopicDeltaMap& incoming_topic_deltas
vector<const TTopicDelta*> deltas_to_process;
for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
sort(deltas_to_process.begin(), deltas_to_process.end(),
[](const TTopicDelta* left, const TTopicDelta* right) {
return left->topic_name < right->topic_name;
});
vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
//省略部分代码
for (int i = 0; i < deltas_to_process.size(); i) {
const TTopicDelta& delta = *deltas_to_process[i];
auto it = topic_registrations_.find(delta.topic_name);
TopicRegistration& registration = it->second;
unique_lock<mutex> ul(registration.update_lock, std::try_to_lock);
//省略部分代码
double interval =
registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
registration.update_interval_metric->Update(interval);
//省略部分代码
topic_update_locks[i].swap(ul);
}
一个TTopicDelta对象表示的就是一个topic的相关update信息。第一部分就是将接收到的topic update数据放到一个vector当中,然后循环更新topic的update_interval_metric,这个成员也就是上面提到的update-interval。所以,这个metric的含义就是表示,从上次处理完成topic update,到本次接收到topic update,这中间的时间间隔。update_interval_timer变量就是用来追踪topic update的,该变量在上面注册topic的时候就会启动,每次处理完之后会重置,这个我们会在下面提到。更新完成后,将当前这个topic对应锁加入到topic_update_locks中,后续处理topic update的时候,还会需要对该lock进行检查是否已经持有,也就是我们第二部分要讲的内容:
代码语言:javascript复制 MonotonicStopWatch sw;
sw.Start();
for (int i = 0; i < deltas_to_process.size(); i) {
if (!topic_update_locks[i].owns_lock()) continue;
const TTopicDelta& delta = *deltas_to_process[i];
auto it = topic_registrations_.find(delta.topic_name);
TopicRegistration& registration = it->second;
//省略部分代码
MonotonicStopWatch update_callback_sw;
update_callback_sw.Start();
for (const UpdateCallback& callback : registration.callbacks) {
callback(incoming_topic_deltas, subscriber_topic_updates);
}
update_callback_sw.Stop();
registration.current_topic_version = delta.to_version;
registration.processing_time_metric->Update(
sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
}
第二部分就是实际处理对应的topic update,主要就是调用topic对应的callback函数。例如对于coordinator的catalog-update函数,就要调用ImpalaServer::CatalogUpdateCallback()函数。这个callback也是在注册topic的时候指定的。处理完该topic对应的update时,就会更新processing_time_metric变量,也就是上面对应的update-interval。所以说,该metric记录的就是每个topic在进行update时,实际消耗的时间。接下来看下第三部分:
代码语言:javascript复制 for (int i = 0; i < deltas_to_process.size(); i) {
if (!topic_update_locks[i].owns_lock()) continue;
const TTopicDelta& delta = *deltas_to_process[i];
auto it = topic_registrations_.find(delta.topic_name);
TopicRegistration& registration = it->second;
registration.update_interval_timer.Reset();
}
sw.Stop();
topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
第三部分的逻辑比较简单,就是通过循环处理将每个topic对应的update_interval_timer变量进行重置,主要就是为了下次统计topic update的时间间隔。 也就是说,经过一次UpdateState()函数调用之后,本次从statestore发过来的topic update都会进行处理,并且每个topic对应的metrics都会进行更新。
Topic update总计metrics
最后还有两个总计的metrics,如下所示:
代码语言:javascript复制statestore-subscriber.topic-update-duration
statestore-subscriber.topic-update-interval-time
这两个metrics是针对所有topic的处理时间和时间间隔的更新。注册代码如下所示:
代码语言:javascript复制//statestore-subscriber.cc
topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
"statestore-subscriber.topic-update-interval-time");
topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
"statestore-subscriber.topic-update-duration");
注册完成之后,也会在UpdateState()函数中进行更新。对于“statestore-subscriber.topic-update-interval-time”,这个metric的更新时机如下所示:
代码语言:javascript复制for (int i = 0; i < deltas_to_process.size(); i) {
//省略部分代码
double interval =
registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
registration.update_interval_metric->Update(interval);
topic_update_interval_metric_->Update(interval);
//省略部分代码
}
可以看到,在对每一个topic update进行时间间隔统计的时候,也会对这个metric进行更新。也就是说,这个metric包含了三个topic的更新时间间隔信息。 关于“statestore-subscriber.topic-update-duration”可以在上一节的第三部分代码最末端看到。也就是说,每次处理完本轮所有topic update的时候,才会将该metric进行更新。
小结
到这里,关于“statestore-subscriber”相关的metrics就介绍的差不多了。这一系列的metrics相对比较简单,都是在StatestoreSubscriber::UpdateState()这个函数中进行更新的,反映的是当前subscriber在处理topic update时的一些负载情况,我们可以根据这些metrics,来排查一下topic同步慢的问题,比如元数据过期等。