【说在前面的话】
在前一篇文章《实时性迷思(5)——实战RTOS多任务性能分析》中,我们介绍了如何在多任务环境下利用 perf_counter “排除多任务穿插的影响”——精确测量某一任务中指定代码片消耗CPU周期数的方法。还没有阅读过这篇文章的小伙伴可以单击这里,今天的内容将在这一基础上继续深入。
在实际应用中,很多数据处理的过程(或者是算法)基本都是由多个步骤构成,假设我们把每个步骤都“简单粗暴”的看做(或者放入)一个函数中的话,为了方便今天的讨论,它们可以被简化为如下的形式:
代码语言:javascript复制step_1();
step_2();
...
step_n();
如果这些步骤总是
“我们说好不分离~要一直一直在一起~”
那么测量起来就非常简单:
- 在裸机中,我们可以使用 __cycleof__():
__cycleof__() {
step_1();
step_2();
...
step_n();
}
也可以简单的使用系统提供的API函数:
代码语言:javascript复制start_cycle_counter();
do {
step_1();
step_2();
...
step_n();
} while(0);
int32_t iCycleUsed = stop_cycle_counter();
- 在RTOS环境下,我们可以使用上一篇文章介绍过的专用函数 start_task_cycle_counter() 和 stop_task_cycle_counter()来获取周期数信息:
void example_task (void *argument)
{
init_task_cycle_counter();
...
start_task_cycle_counter();
do {
step_1();
step_2();
...
step_n();
} while(0);
int64_t lCycleUsed = stop_task_cyclee_counter();
...
}
如果你还不清楚问题的全貌,不妨看下面几张图:
一开始,在裸机中所有的步骤都是在一起的:
(理论上中断一关)我们就可以轻松的测量多个步骤所使用的CPU周期数,此时测量是连续进行的。当我们进入多任务环境时,虽然多个步骤仍然集中在同一个任务里,但由于“任务调度的存在”,实际上的情况就变得复杂起来:
借助每个任务独立的 start_task_cycle_counter() 和 stop_task_cycle_counter() 我们得以“无视”任务调度带来的影响——只专注于本任务内指定范围的代码块所消耗的CPU时间。
可是,现实是残酷的,发誓要“一直一直在一起”的“好朋友”也难免要各奔东西,更何况是同一个数据处理中的不同步骤呢?——各种各样的原因都会促使多任务应用设计时将不同的步骤分散到不同的任务中,比如:
- 不同的步骤拥有不同的实时性要求
- 不同的步骤处于不同的模块中
- 不同的步骤处于不同的安全域中
- 考虑到未来扩展的需要,认为的需要将步骤拆散并放置到不同的任务中
- 不同的步骤处于数据流的不同位置
- ……
此时,我们又该如何简单的测量这些分散在不同任务中的步骤所消耗的总CPU周期数呢?
【“活干完了告诉我,我先睡会儿”】
在简单的多任务合作模式中,如下图所示的“主从合作模式”是最常见的异步工作模式:
这里:
- 从任务的目的是从主任务那里分担一部分的工作;
- 从任务会在任务启动后完成必要的准备工作后就开始等候来自于主任务的信号量,此时,从任务是处于挂起状态;
- 一般来说,从任务应该在主任务发送信号量之前就完成所有的准备工作——否则,主任务就有“触发了个寂寞”的风险;
- 当从任务接收到来自主任务的信号量后,将从挂起状态中唤醒,开始正常工作;
- 当从任务完成本职工作后,会向主任务发送一个信号量——告诉它“你交代的事情我已经做完啦,我先睡会儿,有事您说话”——然后等待下一次主任务的信号量,并由此进入挂起状态。
- 主任务一般会在对应的阶段向从任务发送信号量,以启动异步处理,这就好比是对从任务说:“来活儿了,快醒醒,我先睡会儿”,然后就进入挂起状态——等待来自从任务的完成信号。
如果觉得上述步骤比较抽象,不妨来看一个实际的例子。假设一个数据处理可以被拆分成三个步骤——为了简化讨论,分别由三个函数 step_1()、step_2()和 step_3() 表示:
代码语言:javascript复制void step_1(void)
{
delay_ms(1);
}
void step_2(void)
{
delay_ms(2);
}
void step_3(void)
{
delay_ms(3);
}
这里,每个步骤都是用由 perf_counter提供的 delay_ms() 函数来模拟一个任务负载(注意:该 delay_ms() 函数不会引发RTOS的任务调度)。
假设三个步骤需要在一个任务中以10ms为间隔周期性的进行执行(以CMSIS-RTOS2的API为范例):
代码语言:javascript复制osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;
void task_a (void *argument)
{
...
while (1) {
// 获取本次循环开始时的系统毫秒数
uint32_t wTick = osKernelGetTickCount();
step_1();
step_2();
step_3();
//!< 100Hz 的周期性任务
osDelayUntil(wTick 10);
}
}
根据上一篇文章的内容,我们可以很容易的测量出三个步骤的CPU占用率:
代码语言:javascript复制osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;
void task_a (void *argument)
{
init_task_cycle_counter();
...
__super_loop_monitor__(100) {
// 获取本次循环开始时的系统毫秒数
uint32_t wTick = osKernelGetTickCount();
step_1();
step_2();
step_3();
//!< 100Hz 的周期性任务
osDelayUntil(wTick 10);
}
}
由于三个步骤中负载所占用的时间分别为 1ms、2ms、3ms,因此在10ms的循环周期中,容易计算出这里的CPU占用率为60%,而perf_counter的测量结果也应征了这一结论(100次循环的平均结果):
假设处于某种原因,步骤2必须要放置到一个独立的从任务中执行,根据前面的描述,对应的代码为:
代码语言:javascript复制osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;
void task_a (void *argument)
{
init_task_cycle_counter();
...
__super_loop_monitor__(100) {
uint32_t wTick = osKernelGetTickCount();
step_1();
//! 向从任务发送信号量,催其起床
osThreadFlagsSet(s_tidTaskB, 0x0001);
//! 等待从任务完成,挂起当前任务
osThreadFlagsWait(0x0002, osFlagsWaitAll, osWaitForever);
step_3();
//!< 100Hz 的周期性任务
osDelayUntil(wTick 10);
}
}
void task_b (void *argument)
{
init_task_cycle_counter();
...
while(1) {
//! 等待来自主任务的信号量
osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
//! 干活
step_2();
//! 向主任务发送完成信号
osThreadFlagsSet(s_tidTaskA, 0x0002);
}
}
这里有两点需要注意:
1、为了保证主任务不会“触发了个寂寞”,task_b()需要先于task_a()启动,(或者最保险的方式是将从任务的优先级设置的“大于等于”主任务。)比如:
代码语言:javascript复制int main (void) {
// System Initialization
SystemCoreClockUpdate();
osKernelInitialize(); // Initialize CMSIS-RTOS
init_cycle_counter(true);
s_tidTaskB = osThreadNew(task_b, NULL, NULL);
s_tidTaskA = osThreadNew(task_a, NULL, NULL);
...
if (osKernelGetState() == osKernelReady) {
osKernelStart(); // Start thread execution
}
while(1);
}
2、别忘在两个任务的一开始使用 init_task_cycle_counter() 初始化对应任务的 cycle counter。
然而,经过上述修改后,我们发现实际测量到的 CPU 占用率为 40%:
显然,该值由主任务中 step_1() 的 1ms 和 step_3() 的 3ms 构成,而从任务中 step_2() 所消耗的时间则没有比计算在内——这就是跨任务周期计数的问题所在。
为了应对这一问题,perf_counter 专门引入了可以跨越多个任务进行计数的计数器类 task_cycle_info_t,配合对应的方法(API函数)使用:
- 构造函数(初始化函数):init_task_cycle_info()
/*! brief intialize a given task_cycle_info_t object and enable it
*/
extern
task_cycle_info_t *
init_task_cycle_info(task_cycle_info_t *ptInfo);
- 在“涉事”任务内使用的注册和反注册:register_task_cycle_agent() 和 unregister_task_cycle_agent():
/*! brief register a global virtual cycle counter agent to the current task
*!
*! note the ptAgent it is better to be allocated as a static variable, global
*! variable or comes from heap or pool
*/
extern
task_cycle_info_agent_t *
register_task_cycle_agent(
task_cycle_info_t *ptInfo,
task_cycle_info_agent_t *ptAgent);
/*! brief remove a global virtual cycle counter agent from the current task
*/
extern
task_cycle_info_agent_t *
unregister_task_cycle_agent(task_cycle_info_agent_t *ptAgent);
- 在 perf_counter 的头文件中还能找到其它更为精细控制的方法,比如“使能开关相关的函数”等等,这里就不再赘述。
task_cycle_info_t 类的使用也非常简单:
1、以静态(或者堆、池)分配的方式获得一个 task_cycle_info_t 类的实例。比如定义一个静态变量:
代码语言:javascript复制static task_cycel_info_t s_tMyCycleInfo;
2、在所有相关的任务启动前,对其进行初始化(完成构造):
代码语言:javascript复制int main (void)
{
// System Initialization
SystemCoreClockUpdate();
osKernelInitialize(); // Initialize CMSIS-RTOS
init_cycle_counter(true);
init_task_cycle_info(&s_tMyCycleInfo);
s_tidTaskB = osThreadNew(task_b, NULL, NULL);
s_tidTaskA = osThreadNew(task_a, NULL, NULL);
if (osKernelGetState() == osKernelReady) {
osKernelStart(); // Start thread execution
}
while(1);
}
3、在所有“涉事”任务中,调用函数 register_task_cycle_agent() 注册我们的计数器实例,比如:
代码语言:javascript复制void task_a (void *argument)
{
int64_t lTimeElapsed;
init_task_cycle_counter();
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
start_task_cycle_counter(&s_tMyCycleInfo);
...
}
void task_b (void *argument)
{
init_task_cycle_counter();
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
while(1) {
osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
step_2();
osThreadFlagsSet(s_tidTaskA, 0x0002);
}
}
这里需要注意:
- 前面的例子中,涉事的任务是 task_a() 和 task_b() 因此,这两个任务函数在完成了 init_task_cycle_counter() 后,都要调用 register_task_cycle_agent() 函数来注册 s_tMyCycleInfo;
- 注册时,需要借助一个 task_cycle_info_agent_t 的链表容器,帮助我们将 task_cycle_info_t 的实例加入到 每个任务自己的计数器链表中。这实际上也告诉我们,一个任务可以同时挂载多个不同的 task_cycle_info_t 实例——换句话说:每个任务都能同时服务多个不同目的的跨任务计数器,是不是很强大?
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
4、在开始计数时,通过 start_task_cycle_counter() 来启动我们的计数器:
代码语言:javascript复制start_task_cycle_counter(&s_tMyCycleInfo);
同理,在需要获得计数结果的时候,调用 stop_task_cycle_counter() 来获取计数结果:
代码语言:javascript复制int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);
这里,细心的小伙伴多半会注意到:这两个函数之前使用的时候不是不需要传递参数么?为什么又可以传递 task_cycle_info_t 类型的指针作为参数呢?其实这里使用了一个此前介绍过的技巧,还不太了解的小伙伴,可以参考这篇文章《【为宏正名】99%人都不知道的"##"里用法》,这里就不再赘述:
代码语言:javascript复制
extern
void __start_task_cycle_counter(task_cycle_info_t *ptInfo);
extern
int64_t __stop_task_cycle_counter(task_cycle_info_t *ptInfo);
#define start_task_cycle_counter(...)
__start_task_cycle_counter((NULL,##__VA_ARGS__))
#define stop_task_cycle_counter(...)
__stop_task_cycle_counter((NULL,##__VA_ARGS__))
对应前面的例子,一个完整的示例代码如下:
代码语言:javascript复制void step_1(void)
{
delay_ms(1);
}
void step_2(void)
{
delay_ms(2);
}
void step_3(void)
{
delay_ms(3);
}
osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;
task_cycle_info_t s_tMyCycleInfo;
void task_a (void *argument)
{
int64_t lTimeElapsed;
init_task_cycle_counter();
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
start_task_cycle_counter(&s_tMyCycleInfo);
__super_loop_monitor__(100,
{
lTimeElapsed = __cpu_usage__.lTimeElapsed;
int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);
printf("s_tMyCycleInfo CPU Usage %2.3f%%rn",
(float)((double)lCycleUsed* 100.0 /
(double)__cpu_usage__.lTimeElapsed));
start_task_cycle_counter(&s_tMyCycleInfo);
}) {
uint32_t wTick = osKernelGetTickCount();
step_1();
//! 向从任务发送信号量,催其起床
osThreadFlagsSet(s_tidTaskB, 0x0001);
//! 等待从任务完成,挂起当前任务
osThreadFlagsWait(0x0002, osFlagsWaitAll, osWaitForever);
step_3();
osDelayUntil(wTick 10); //!< 50Hz
}
}
void task_b (void *argument)
{
init_task_cycle_counter();
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
while(1) {
//! 等待来自主任务的信号量
osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
//! 干活
step_2();
//! 向主任务发送完成信号
osThreadFlagsSet(s_tidTaskA, 0x0002);
}
}
int main (void)
{
// System Initialization
SystemCoreClockUpdate();
osKernelInitialize(); // Initialize CMSIS-RTOS
init_cycle_counter(true);
init_task_cycle_info(&s_tMyCycleInfo);
s_tidTaskB = osThreadNew(task_b, NULL, NULL);
s_tidTaskA = osThreadNew(task_a, NULL, NULL);
if (osKernelGetState() == osKernelReady) {
osKernelStart(); // Start thread execution
}
while(1);
}
运行结果如下:
可以看到,三个步骤的任务负载(1 2 3=6ms)都被计算在内
友情提示:如果你对 __super_loop_monitor__() 结构的用法感到迷惑,可以单击这里。
【流水线模式下的性能测量】
除了前面介绍的简单“主从”模式外,多任务环境下往往还存在另外一种类似流水线的多任务“接力”模式:
这种任务模式在“数据流图”上往往呈现“百川汇海”模式,有时候,我们需要沿着其中一条线索完成从源头到末端的性能分析,而它的“事件触发图”大体如下:
这种情况下,我们仍然可以使用 task_cycle_info_t 来测量整个工序的耗时(周期数),只不过需要注意的是:
- 我们要在工序开始的地方调用 start_task_cycle_counter()来开始计数;在工序结束的地方调用 stop_task_cycle_counter() 来获取测量结果。
- 由于 start_task_cycle_counter() 会清零计数器,因此要在源头处作必要的保护——防止在一次完整的测量结束前,过早的调用 start_task_cycle_counter()。利用RTOS所提供的互斥量,我们可以轻松的实现这一功能,这里就不再赘述。
- 测量的结果可以通过 SystemCoreClock 中保存的CPU工作频率换算成物理时间(ms或者us):
int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);
printf("Pipeline used %d ms", lCycleUsed / (SystemCoreClock / 1000) );
- 有时候,进行性能分析需要暂时性的(或者有条件的)关闭某一计数器,此时灵活使用 使能开关函数 对 task_cycle_info_t 对象进行操作就成为了关键。
【说在后面的话】
跨任务性能测量是 perf_counter 所提供的“拳头功能”,可以说目前在市面上针对Cortex-M的开源工具中,还鲜有类似的功能。虽然关注【裸机思维】公众号后,在后台发送关键字 "perf_counter" 就可以获得对应 CMSIS-Pack 的网盘链接和相关的教程,但作为一个Github上的开源项目,我还是希望喜欢该工具的小伙伴能给我一个宝贵的Star。复制下面的链接到浏览器,或者单击“阅读原文” 就可以找到该项目。
https://github.com/GorgonMeducer/perf_counter
谢谢啦。