Impala metrics之statestore-subscriber

2022-05-20 08:50:24 浏览数 (2)

文章目录

    • 固定的metrics
    • Topic相关的metrics
      • Statestore topic介绍
      • Topic update的metrics
        • Metrics更新
        • Topic update总计metrics
    • 小结

本文主要梳理一下Impala的“statestore-subscriber”相关的metrics,这类metrics主要是在catalog和impalad上存在。目前主要分为两种类型,下面来简单看一下。

固定的metrics

第一类就是几个固定的metrics,如下所示:

代码语言:javascript复制
statestore-subscriber.connected
statestore-subscriber.heartbeat-interval-time
statestore-subscriber.last-recovery-duration
statestore-subscriber.last-recovery-time
statestore-subscriber.num-connection-failures

这些metrics都是在statestore的subscriber启动之后直接注册的。我们可以直接从web页面看到这些metrics的描述,也比较简单,不是本文的重点,这里不再展开:

我们这里主要来看一下第二类metrics。

Topic相关的metrics

第二类metrics主要是topic相关的,在介绍这类metrics之前,我们先简单看下目前Impala的几种metrics。

Statestore topic介绍

目前Impala一共有三类topic,subscriber可以向statestore注册,如下所示:

分别是:

  • impala-membership,成员信息同步,包括各个backends的相关信息;
  • impala-request-queue,资源队列相关的信息;
  • catalog-update,元数据相关的信息;

关于每个topic的详细信息,这里不再展开,后续有机会再详细介绍一下。当各个节点启动之后,就会注册相应的topic:

代码语言:javascript复制
//catalog-server.cc
Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
    /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
    filter_prefix, cb);
//impala-server.cc
ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
   CatalogServer::IMPALA_CATALOG_TOPIC, /* is_transient=*/ true,
   /* populate_min_subscriber_topic_version=*/ true,
   filter_prefix, catalog_cb));
    
//admission-controller.cc
Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC,
   /* is_transient=*/true, /* populate_min_subscriber_topic_version=*/false,
   /* filter_prefix=*/"", cb);

//cluster-membership-mgr.cc
Status status = statestore_subscriber_->AddTopic(
    Statestore::IMPALA_MEMBERSHIP_TOPIC, /* is_transient=*/ true,
    /* populate_min_subscriber_topic_version=*/ false,
    /* filter_prefix= */"", cb);

可以看到,这里impalad(coordinator角色才会注册,executro则不会)和catalogd都注册了catalog-update,另外两个topic,只有impalad会注册。

Topic update的metrics

接下来我们就看下topic update相关的metrics。我们可以在metrics.json文件中找到这两类模板,如下所示:

代码语言:javascript复制
statestore-subscriber.topic-$0.processing-time-s
statestore-subscriber.topic-$0.update-interval

从描述来看,就是topic update的处理时间和时间间隔。下面我们就结合代码来实际来看一下:

代码语言:javascript复制
//statestore-subscriber.cc
// Template for metrics that measure the processing time for individual topics.
const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";

// Template for metrics that measure the interval between updates for individual topics.
const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";

topic的注册流程代码如下所示:

代码语言:javascript复制
//statestore-subscriber.cc
Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
    bool is_transient, bool populate_min_subscriber_topic_version,
    string filter_prefix, const UpdateCallback& callback) {
  //省略部分代码
  if (registration.processing_time_metric == nullptr) {
    registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
        CALLBACK_METRIC_PATTERN, topic_id);
    registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
        UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
    registration.update_interval_timer.Start();
  }
  //省略部分代码
}

目前一共有三个topic,我们在上面已经介绍过了。对于coordinator节点来说,启动之后,针对这两类topic metrics模板,就会有6个具体的metrics,如下所示:

Metrics更新

主要的更新处理逻辑位于函数StatestoreSubscriber::UpdateState()中。该函数的主要功能就是当进程接受到statestore发来的topic update信息时,就调用该函数,进行实际的topic更新操作。由于该函数比较长,我们分成三个部分来看一下,如下所示:

代码语言:javascript复制
//typedef std::map<Statestore::TopicId, TTopicDelta> TopicDeltaMap
//TopicDeltaMap& incoming_topic_deltas
  vector<const TTopicDelta*> deltas_to_process;
  for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
  sort(deltas_to_process.begin(), deltas_to_process.end(),
    [](const TTopicDelta* left, const TTopicDelta* right) {
      return left->topic_name < right->topic_name;
    });
  vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
  //省略部分代码
  for (int i = 0; i < deltas_to_process.size();   i) {
    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    unique_lock<mutex> ul(registration.update_lock, std::try_to_lock);
    //省略部分代码
    double interval =
        registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
    registration.update_interval_metric->Update(interval);
    //省略部分代码
    topic_update_locks[i].swap(ul);
  }

一个TTopicDelta对象表示的就是一个topic的相关update信息。第一部分就是将接收到的topic update数据放到一个vector当中,然后循环更新topic的update_interval_metric,这个成员也就是上面提到的update-interval。所以,这个metric的含义就是表示,从上次处理完成topic update,到本次接收到topic update,这中间的时间间隔。update_interval_timer变量就是用来追踪topic update的,该变量在上面注册topic的时候就会启动,每次处理完之后会重置,这个我们会在下面提到。更新完成后,将当前这个topic对应锁加入到topic_update_locks中,后续处理topic update的时候,还会需要对该lock进行检查是否已经持有,也就是我们第二部分要讲的内容:

代码语言:javascript复制
  MonotonicStopWatch sw;
  sw.Start();
  for (int i = 0; i < deltas_to_process.size();   i) {
    if (!topic_update_locks[i].owns_lock()) continue;
    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    //省略部分代码
    MonotonicStopWatch update_callback_sw;
    update_callback_sw.Start();
    for (const UpdateCallback& callback : registration.callbacks) {
      callback(incoming_topic_deltas, subscriber_topic_updates);
    }
    update_callback_sw.Stop();
    registration.current_topic_version = delta.to_version;
    registration.processing_time_metric->Update(
        sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
  }

第二部分就是实际处理对应的topic update,主要就是调用topic对应的callback函数。例如对于coordinator的catalog-update函数,就要调用ImpalaServer::CatalogUpdateCallback()函数。这个callback也是在注册topic的时候指定的。处理完该topic对应的update时,就会更新processing_time_metric变量,也就是上面对应的update-interval。所以说,该metric记录的就是每个topic在进行update时,实际消耗的时间。接下来看下第三部分:

代码语言:javascript复制
  for (int i = 0; i < deltas_to_process.size();   i) {
    if (!topic_update_locks[i].owns_lock()) continue;

    const TTopicDelta& delta = *deltas_to_process[i];
    auto it = topic_registrations_.find(delta.topic_name);
    TopicRegistration& registration = it->second;
    registration.update_interval_timer.Reset();
  }
  sw.Stop();
  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));

第三部分的逻辑比较简单,就是通过循环处理将每个topic对应的update_interval_timer变量进行重置,主要就是为了下次统计topic update的时间间隔。 也就是说,经过一次UpdateState()函数调用之后,本次从statestore发过来的topic update都会进行处理,并且每个topic对应的metrics都会进行更新。

Topic update总计metrics

最后还有两个总计的metrics,如下所示:

代码语言:javascript复制
statestore-subscriber.topic-update-duration
statestore-subscriber.topic-update-interval-time

这两个metrics是针对所有topic的处理时间和时间间隔的更新。注册代码如下所示:

代码语言:javascript复制
//statestore-subscriber.cc
  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
      "statestore-subscriber.topic-update-interval-time");
  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
      "statestore-subscriber.topic-update-duration");

注册完成之后,也会在UpdateState()函数中进行更新。对于“statestore-subscriber.topic-update-interval-time”,这个metric的更新时机如下所示:

代码语言:javascript复制
for (int i = 0; i < deltas_to_process.size();   i) {
  //省略部分代码
  double interval =
      registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
  registration.update_interval_metric->Update(interval);
  topic_update_interval_metric_->Update(interval);
  //省略部分代码
}

可以看到,在对每一个topic update进行时间间隔统计的时候,也会对这个metric进行更新。也就是说,这个metric包含了三个topic的更新时间间隔信息。 关于“statestore-subscriber.topic-update-duration”可以在上一节的第三部分代码最末端看到。也就是说,每次处理完本轮所有topic update的时候,才会将该metric进行更新。

小结

到这里,关于“statestore-subscriber”相关的metrics就介绍的差不多了。这一系列的metrics相对比较简单,都是在StatestoreSubscriber::UpdateState()这个函数中进行更新的,反映的是当前subscriber在处理topic update时的一些负载情况,我们可以根据这些metrics,来排查一下topic同步慢的问题,比如元数据过期等。

0 人点赞