DataWorks in Practice: Notes on MaxCompute Run Logs

2024-08-13 11:34:30

1 Introduction

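Whenever we run a piece of SQL in DataWorks, a stream of log output scrolls by in the pane below the editor. Most people only look at it when a task fails and skip over it otherwise. But if you want to optimize SQL and understand how it runs under the hood, being able to read this log is essential.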

2 A walk through the log of one concrete SQL query

As before, we use a concrete SQL query from our production environment to introduce the topic:

select z1.*
,case when all_cnt > 0 then round( delivered_cnt / all_cnt,4) else null end as delivered_pct
from (
select 
    y1.y_bt_of_timerk
    ,y1.m_bt_of_timerk
    ,y1.country_en
    ,substr(y1.time_rk,1,10) as time_rk
    ,count(distinct case when status_id='DELIVERED' then waybill_no else null end ) as delivered_cnt
    ,count(distinct waybill_no) as all_cnt
from (
    select
        m2.week_year_id AS y_bt_of_curweek
        ,m2.week_of_year AS m_bt_of_curweek
        ,to_char(dateadd(to_date(m2.start_week,'yyyy-mm-dd'),-1,'dd'),'yyyy-mm-dd') as start_week
        ,to_char(dateadd(to_date(m2.end_week,'yyyy-mm-dd'),-1,'dd'),'yyyy-mm-dd') as end_week
        ,m1.* 
        ,m4.week_year_id AS y_bt_of_timerk
        ,m4.week_of_year AS m_bt_of_timerk
    from (
        select 
            to_char(dateadd(CURRENT_TIMESTAMP(),-1,'dd'),'yyyy-mm-dd') as date_current
            ,t1.waybill_no
            ,t1.start_time
            ,t5.delivered_time
            ,t2.order_type as Country_EN
            ,t2.status_id
            ,t3.name as status_id_CN
            ,case when t2.status_id ='DELIVERED' then t5.delivered_time else t1.start_time end as time_rk
            --#,case when (substr(t5.delivered_time,1,10) = '1970-01-01' or substr(t5.delivered_time,1,10) = '1969-12-31' ) then t1.start_time else t5.delivered_time end as time_rk
        from yht_dws.dws_ilp_ar_billing_detail t1
        left join yht_dwd.dwd_uexpress_u_express_order_df  t2 on t1.waybill_no = t2.number
        and t2.pt='${bizdate}'
        left join yht_dw.ods_uexpress_order_status_df t3 on t3.id = t2.status_id    --#status_id_CN = order status (Chinese label)
        and t3.pt='${bizdate}'
        left join yht_dwd.dwd_ilp_trail_detail  t5 on t5.trail_ref1 = t1.waybill_no
        and t5.pt ='${bizdate}'
        where t1.pt='${bizdate}' and  t1.bussiness_type='QLL' 
        and substr(t1.start_time,1,4) = 2023 
        and t3.name in ('到达待取','退回仓库','清关中','清关完成','已签收','派送中','销毁包裹','航班抵达','运输中','已出库','包裹丢失','二次派送') 
        ) m1
    left join yht_dwd.dim_date m2 on m2.date=m1.date_current  --#locate the current week
    LEFT JOIN yht_dwd.dim_date m4  ON m4.date=to_char(dateadd(m1.time_rk,1,'dd'),'yyyy-mm-dd')     --#locate the week of the inbound time
    ) y1 
where ( y_bt_of_curweek = y_bt_of_timerk and m_bt_of_curweek = m_bt_of_timerk ) 
group by y1.y_bt_of_timerk
,y1.m_bt_of_timerk
,y1.country_en
,substr(y1.time_rk,1,10)
) z1

The run log looks like this:

job0 is root job
In Job job0:
root Tasks: M6, M4, M3, M2, M1
J5_2_3_4 depends on: M2, M3, M4
J7_1_5_6 depends on: J5_2_3_4, M1, M6
R8_7 depends on: J7_1_5_6
In Task M2:
    Data source: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
    TS: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
        Statistics: Num rows: 2.2855571E7, Data size: 6.8566713E9
        FIL: AND(ISNOTNULL(number),ISNOTNULL(status_id))
            Statistics: Num rows: 1.851301251E7, Data size: 5.553903753000001E9
            RS: order:  
                nullDirection: *
                optimizeOrderBy: False
                valueDestLimit: 0
                dist: HASH
                keys:
                      number
                values:
                      number (string)
                      order_type (string)
                      status_id (string)
                partitions:
                      number
                Statistics: Num rows: 1.851301251E7, Data size: 5.553903753000001E9
In Task M3:
    Data source: yht_dws.dws_ilp_ar_billing_detail/pt=20230628
    TS: yht_dws.dws_ilp_ar_billing_detail/pt=20230628
        Statistics: Num rows: 1.5791014E7, Data size: 3.284530912E9
        SEL: start_time, waybill_no CONDITION: AND(EQ(bussiness_type,'QLL'),EQ(TODOUBLE(SUBSTR(TOSTRING(start_time),1L,4L)),2023.0),ISNOTNULL(waybill_no))
            Statistics: Num rows: 3552978.15, Data size: 3.837216402E8
            RS: order:  
                nullDirection: *
                optimizeOrderBy: False
                valueDestLimit: 0
                dist: HASH
                keys:
                      waybill_no
                values:
                      start_time (datetime)
                      waybill_no (string)
                partitions:
                      waybill_no
                Statistics: Num rows: 3552978.15, Data size: 3.837216402E8
In Task M4:
    Data source: yht_dw.ods_uexpress_order_status_df/pt=20230628
    TS: yht_dw.ods_uexpress_order_status_df/pt=20230628
        Statistics: Num rows: 27.0, Data size: 5400.0
        SEL: id CONDITION: AND(IN(name,'到达待取','退回仓库','清关中','清关完成','已签收','派送中','销毁包裹','航班抵达','运输中','已出库','包裹丢失','二次派送'),ISNOTNULL(id))
            Statistics: Num rows: 18.225, Data size: 1822.5000000000002
            RS: valueDestLimit: 0
                dist: BROADCAST
                keys:
                values:
                      id (string)
                partitions:
                Statistics: Num rows: 18.225, Data size: 1822.5000000000002
In Task J5_2_3_4:
    JOIN:
         StreamLineRead3 INNERJOIN StreamLineRead4
         keys:
             0:waybill_no
             1:number
         probeCandidate: StreamLineRead4
        Statistics: Num rows: 3.2888164469353332E13, Data size: 1.341837110349616E16
        HASHJOIN:
                 StreamLineRead2 INNERJOIN MergeJoin1
                 keys:
                     0:id
                     1:status_id
                 non-equals:
                     0:
                     1:
                 bigTable: MergeJoin1
            Statistics: Num rows: 2.9969339872698225E14, Data size: 1.22274906680608752E17
            RS: order:  
                nullDirection: *
                optimizeOrderBy: False
                valueDestLimit: 0
                dist: HASH
                keys:
                      waybill_no
                values:
                      start_time (datetime)
                      waybill_no (string)
                      order_type (string)
                      status_id (string)
                partitions:
                      waybill_no
                Statistics: Num rows: 2.9969339872698225E14, Data size: 9.2305566807910528E16
In Task M1:
    Data source: yht_dwd.dim_date
    TS: yht_dwd.dim_date
        Statistics: Num rows: 4748.0, Data size: 1424400.0
        SEL: week_of_year, week_year_id CONDITION: AND(EQ(date,'2023-07-03'),ISNOTNULL(week_of_year),ISNOTNULL(week_year_id))
            Statistics: Num rows: 1922.94, Data size: 384588.0
            RS: valueDestLimit: 0
                dist: BROADCAST
                keys:
                values:
                      week_of_year (string)
                      week_year_id (string)
                partitions:
                Statistics: Num rows: 1922.94, Data size: 384588.0
        SEL: date, week_of_year, week_year_id CONDITION: AND(ISNOTNULL(date),ISNOTNULL(week_of_year),ISNOTNULL(week_year_id))
            Statistics: Num rows: 3461.2920000000004, Data size: 1038387.6000000001
            RS: valueDestLimit: 0
                dist: BROADCAST
                keys:
                values:
                      date (string)
                      week_of_year (string)
                      week_year_id (string)
                partitions:
                Statistics: Num rows: 3461.2920000000004, Data size: 1038387.6000000001
In Task M6:
    Data source: yht_dwd.dwd_ilp_trail_detail/pt=20230628
    TS: yht_dwd.dwd_ilp_trail_detail/pt=20230628
        Statistics: Num rows: 1.3612384E7, Data size: 1.470137472E9
        FIL: ISNOTNULL(trail_ref1)
            Statistics: Num rows: 1.22511456E7, Data size: 1.3231237248E9
            RS: order:  
                nullDirection: *
                optimizeOrderBy: False
                valueDestLimit: 0
                dist: HASH
                keys:
                      trail_ref1
                values:
                      trail_ref1 (string)
                      delivered_time (datetime)
                partitions:
                      trail_ref1
                Statistics: Num rows: 1.22511456E7, Data size: 1.3231237248E9
In Task J7_1_5_6:
    JOIN:
         StreamLineRead1 RIGHTJOIN StreamLineRead5
         keys:
             0:trail_ref1
             1:waybill_no
         probeCandidate: StreamLineRead5
        Statistics: Num rows: 1.835793731581557E21, Data size: 7.636901923379278E23
        SEL: waybill_no, order_type country_en, status_id, WHEN(EQ(status_id,'DELIVERED'),delivered_time,start_time) time_rk
            Statistics: Num rows: 1.835793731581557E21, Data size: 5.654244693271196E23
            HASHJOIN:
                     Project1 INNERJOIN StreamLineRead6
                     keys:
                         0:
                         1:
                     non-equals:
                         0:
                         1:
                     bigTable: Project1
                Statistics: Num rows: 3.5301211982074395E24, Data size: 1.7933015686893794E27
                SEL: waybill_no, country_en, status_id, time_rk, week_of_year, week_year_id, $commonVariable8 _col8 [ $commonVariable8(lazy: true) = TO_CHAR(DATEADD(time_rk,1L,'dd'),'yyyy-mm-dd') ]  CONDITION: ISNOTNULL($commonVariable8)
                    Statistics: Num rows: 3.1771090783866956E24, Data size: 1.9316823196591108E27
                    HASHJOIN:
                             Calc4 INNERJOIN StreamLineRead7
                             keys:
                                 0:_col8,week_year_id,week_of_year
                                 1:date,week_year_id,week_of_year
                             non-equals:
                                 0:
                                 1:
                             bigTable: Calc4
                        Statistics: Num rows: 5.498451118073621E27, Data size: 4.992593615210848E30
                        SEL: week_year_id y_bt_of_timerk, week_of_year m_bt_of_timerk, country_en, SUBSTR(TOSTRING(time_rk),1L,10L) __gk_3, WHEN(EQ(status_id,'DELIVERED'),waybill_no,null) __agg_0_p0, waybill_no
                            Statistics: Num rows: 5.498451118073621E27, Data size: 3.299070670844173E30
                            AGGREGATE: group by:y_bt_of_timerk,m_bt_of_timerk,country_en,__gk_3
                             UDAF: COUNT(DISTINCT__agg_0_p0)[Deduplicate] __agg_0,COUNT(DISTINCTwaybill_no)[Deduplicate] __agg_1
                                Statistics: Num rows: 1.8830312048197332E27, Data size: 1.12981872289184E30
                                RS: order:     
                                    nullDirection: ****
                                    optimizeOrderBy: False
                                    valueDestLimit: 0
                                    dist: HASH
                                    keys:
                                          y_bt_of_timerk
                                          m_bt_of_timerk
                                          country_en
                                          __gk_3
                                    values:
                                          y_bt_of_timerk (string)
                                          m_bt_of_timerk (string)
                                          country_en (string)
                                          __gk_3 (string)
                                          __agg_0_p0 (string)
                                          waybill_no (string)
                                    partitions:
                                          y_bt_of_timerk
                                          m_bt_of_timerk
                                          country_en
                                          __gk_3
                                    Statistics: Num rows: 1.8830312048197332E27, Data size: 1.12981872289184E30
In Task R8_7:
    AGGREGATE: group by:y_bt_of_timerk,m_bt_of_timerk,country_en,__gk_3
     UDAF: COUNT(DISTINCT__agg_0_p0) (__agg_0_count)[Complete],COUNT(DISTINCTwaybill_no) (__agg_1_count)[Complete]
        Statistics: Num rows: 1.3746127795184053E27, Data size: 5.718389162796566E29
        SEL: y_bt_of_timerk, m_bt_of_timerk, country_en, __gk_3 time_rk, __agg_0 delivered_cnt, __agg_1 all_cnt, WHEN(GT(__agg_1,0L),ROUND(DIV(TODOUBLE(__agg_0),TODOUBLE(__agg_1)),4L),null) delivered_pct
            Statistics: Num rows: 1.3746127795184053E27, Data size: 5.828358185158038E29
            FS: output: Screen
                schema:
                  y_bt_of_timerk (string)
                  m_bt_of_timerk (string)
                  country_en (string)
                  time_rk (string)
                  delivered_cnt (bigint)
                  all_cnt (bigint)
                  delivered_pct (double)
                Statistics: Num rows: 1.3746127795184053E27, Data size: 5.828358185158038E29

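To read the log, you first need to be familiar with the terms it uses. Second, you should split the log into groups and work through them one at a time.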
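Every group in the log starts with an `In Job ...:` or `In Task ...:` header, so the grouping is easy to script. The following is a minimal Python sketch, not a DataWorks or MaxCompute tool. It assumes the log has been copied into a string and splits it at those headers. `split_log` and the sample text are illustrative.

```python
import re

# Section headers look like "In Job job0:" or "In Task M2:".
HEADER = re.compile(r"^\s*In (Job|Task) (\S+):\s*$")

def split_log(log_text):
    """Illustrative helper: group log lines under the latest header.

    Returns a dict mapping each header name (e.g. "job0", "M2") to the
    lines that follow it. Lines before the first header go under "".
    """
    sections = {"": []}
    current = ""
    for line in log_text.splitlines():
        match = HEADER.match(line)
        if match:
            current = match.group(2)
            sections[current] = []
        else:
            sections[current].append(line)
    return sections

sample = """job0 is root job
In Job job0:
root Tasks: M6, M4, M3, M2, M1
J5_2_3_4 depends on: M2, M3, M4
In Task M2:
    Data source: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
In Task M3:
    Data source: yht_dws.dws_ilp_ar_billing_detail/pt=20230628
"""

sections = split_log(sample)
for name, lines in sections.items():
    print(repr(name), len(lines))
```

Each value in the resulting dict holds the lines for one task, so the tasks can be read or checked one at a time.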

2.1 Common terms and definitions in the run log (or LogView)
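  1. A MaxCompute Task can consist of one or more Fuxi Jobs. For example, if the SQL you submit is complex, MaxCompute automatically submits multiple Fuxi Jobs to the distributed scheduling system (Fuxi).
  2. Each Fuxi Job can consist of one or more Fuxi Tasks. For example, a simple MapReduce job usually produces two Fuxi Tasks, a Map Task (M1) and a Reduce Task (R2), while a more complex SQL job may produce many Fuxi Tasks.
  3. Fuxi Task names consist of letters and numbers. The letter gives the task type (for example, M stands for Map Task), and the numbers give the task's ID and its dependencies. For example, R5_4 means this Reduce Task starts only after task J4 has finished, and J4_1_2_3 means this Join Task starts only after M1, M2 and M3 have all finished.
  4. In LogView, I/O Records is the number of input and output records of a task, and R/W is the number of rows the task reads and writes.
  5. Take the first part of the log above as an example: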
   job0 is root job
   In Job job0:
   root Tasks: M6, M4, M3, M2, M1
   J5_2_3_4 depends on: M2, M3, M4
   J7_1_5_6 depends on: J5_2_3_4, M1, M6
   R8_7 depends on: J7_1_5_6

What the terms and lines mean:

M stands for Map Task, R for Reduce Task and J for Join Task.

root Tasks are the first-level nodes under the virtual node Job job0. None of them depends on another task, and here they are all named M followed by a number.

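J5_2_3_4 is a Join Task. Its name is J followed by its task number, and the numbers after the underscores are the tasks it depends on (M2, M3 and M4). J5_2_3_4 therefore runs only after M2, M3 and M4 have all finished.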

R8_7 is a Reduce Task. Its name is R followed by its task number, and the number after the underscore is the task it depends on, J7_1_5_6.
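The naming rule can be checked mechanically. The sketch below uses hypothetical helpers that are not part of MaxCompute. It parses names like `J5_2_3_4`, checks them against a `depends on:` line from the log header, and groups the tasks of job0 into the stages in which they can run. It assumes task numbers are unique across M, J and R tasks, which is true in this log.

```python
import re

# Illustrative helpers, not a MaxCompute API.
# A task name is a type letter, the task's own number and, after
# underscores, the numbers of the tasks it depends on.
TASK_NAME = re.compile(r"^([A-Z])(\d+)((?:_\d+)*)$")

def parse_task_name(name):
    """Split e.g. "J5_2_3_4" into ("J", 5, [2, 3, 4])."""
    match = TASK_NAME.match(name)
    if match is None:
        raise ValueError(f"unexpected task name: {name!r}")
    kind, number, deps = match.groups()
    return kind, int(number), [int(d) for d in deps.split("_") if d]

def matches_header(line):
    """Check an "X depends on: A, B" header line against the naming rule."""
    task, deps = line.split(" depends on: ")
    declared = sorted(parse_task_name(d.strip())[1] for d in deps.split(","))
    return declared == sorted(parse_task_name(task.strip())[2])

def stages(task_names):
    """Group tasks into stages; a task starts after all its dependencies.

    Root tasks are stage 0; every other task sits one stage after its
    latest dependency. Assumes task numbers are unique (as in this log).
    """
    by_number = {parse_task_name(n)[1]: n for n in task_names}
    level = {}

    def level_of(name):
        if name not in level:
            deps = parse_task_name(name)[2]
            level[name] = 1 + max((level_of(by_number[d]) for d in deps), default=-1)
        return level[name]

    grouped = {}
    for name in task_names:
        grouped.setdefault(level_of(name), []).append(name)
    return [sorted(grouped[k]) for k in sorted(grouped)]

tasks = ["M6", "M4", "M3", "M2", "M1", "J5_2_3_4", "J7_1_5_6", "R8_7"]
print(parse_task_name("J7_1_5_6"))  # ('J', 7, [1, 5, 6])
print(matches_header("J7_1_5_6 depends on: J5_2_3_4, M1, M6"))  # True
print(stages(tasks))
# [['M1', 'M2', 'M3', 'M4', 'M6'], ['J5_2_3_4'], ['J7_1_5_6'], ['R8_7']]
```

The stages match the header. The five Map Tasks have no dependencies, J5_2_3_4 waits for M2, M3 and M4, J7_1_5_6 waits for J5_2_3_4 (plus M1 and M6), and R8_7 runs last.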

Map Task: take one section of the log, M2, as an example.
   In Task M2:
       Data source: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
       # data source: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
       TS: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
       # table scan of partition pt=20230628
           Statistics: Num rows: 2.2855571E7, Data size: 6.8566713E9
           # estimated number of rows and bytes scanned
           FIL: AND(ISNOTNULL(number),ISNOTNULL(status_id))
           # filter out rows where number or status_id is null
               Statistics: Num rows: 1.851301251E7, Data size: 5.553903753000001E9
               # estimated rows and bytes after filtering
               RS: order:  
               # ReduceSinkOperator distributes (shuffles) the data to the downstream task (J5_2_3_4)
               # order: '+' means ascending, '-' means descending
                   nullDirection: *
                   optimizeOrderBy: False
                   valueDestLimit: 0
                   dist: HASH
                   keys:
                         number
                   values:
                         number (string)
                         order_type (string)
                         status_id (string)
                   partitions:
                         number
                   Statistics: Num rows: 1.851301251E7, Data size: 5.553903753000001E9
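The Statistics lines are optimizer estimates rather than measured counts (note fractional row counts such as 3552978.15). Actual record counts are what LogView reports as I/O Records. Dividing the estimate after a filter by the estimate before it gives the selectivity the optimizer assumed. The Python sketch below does this for three Map Tasks, using numbers copied from the log. The helper names are illustrative.

```python
import re

# Matches e.g. "Num rows: 2.2855571E7, Data size: 6.8566713E9".
STATS = re.compile(r"Num rows: ([\d.E+-]+), Data size: ([\d.E+-]+)")

def parse_stats(line):
    """Illustrative helper: return (rows, bytes) from a Statistics line."""
    match = STATS.search(line)
    if match is None:
        raise ValueError(f"no statistics in: {line!r}")
    return float(match.group(1)), float(match.group(2))

def ratio(before_line, after_line):
    """Estimated fraction of rows that survive the filter between two lines."""
    rows_before, _ = parse_stats(before_line)
    rows_after, _ = parse_stats(after_line)
    return rows_after / rows_before

# (statistics under TS, statistics after FIL/SEL), copied from the log.
pairs = {
    "M2": ("Num rows: 2.2855571E7, Data size: 6.8566713E9",
           "Num rows: 1.851301251E7, Data size: 5.553903753000001E9"),
    "M3": ("Num rows: 1.5791014E7, Data size: 3.284530912E9",
           "Num rows: 3552978.15, Data size: 3.837216402E8"),
    "M6": ("Num rows: 1.3612384E7, Data size: 1.470137472E9",
           "Num rows: 1.22511456E7, Data size: 1.3231237248E9"),
}
for task, (before, after) in pairs.items():
    print(task, round(ratio(before, after), 4))
# M2 0.81
# M3 0.225
# M6 0.9
```

The ratios are suspiciously round: 0.81 for M2's two IS NOT NULL checks, 0.225 for M3's two equality checks plus one IS NOT NULL, and 0.9 for M6's single IS NOT NULL. This is consistent with (though not proof of) the optimizer applying fixed default selectivities when it lacks column statistics, roughly 0.9 per IS NOT NULL and 0.5 per equality. Treat the row estimates with caution.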

What the operators mean:

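  • TableScanOperator (TS): describes the logic of the FROM block of the query. The explain output shows the name (alias) of the input table.
  • SelectOperator (SEL): describes the logic of the SELECT block. The explain output shows the columns passed to the next operator, separated by commas.
    • A column reference is shown as <alias>.<column_name>.
    • The result of an expression is shown in function form, for example func1(arg1_1, arg1_2, func2(arg2_1, arg2_2)).
    • A constant is shown as its value.
  • FilterOperator (FIL): describes the logic of the WHERE block. The explain output shows the WHERE condition expression, displayed with the same rules as SelectOperator.
  • JoinOperator (JOIN): describes the logic of the JOIN block. The explain output shows which tables are joined and with which join type.
  • GroupByOperator (AGGREGATE): describes aggregation. It appears whenever the query uses aggregate functions, and the explain output shows those functions.
  • ReduceSinkOperator (RS): describes how data is distributed between tasks. Whenever the current task passes its result to another task, it must end with a ReduceSinkOperator that distributes the data. The explain output shows the sort order of the output, the distribution keys and values, and the columns used to compute the hash.
  • FileSinkOperator (FS): describes how the final data is stored. If the query contains an INSERT block, the explain output shows the target table name; for a plain SELECT like this one, it shows output: Screen.
  • LimitOperator (LIM): describes the logic of the LIMIT block. The explain output shows the limit value.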
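  • MapjoinOperator (HASHJOIN): similar to JoinOperator, but describes a map join. The small table is held in memory and the big table, named in the bigTable field, is streamed past it. In this log the small inputs are sent with dist: BROADCAST (Tasks M4 and M1).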
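With the abbreviations above, you can summarize each task's section by the operators it contains. Below is a minimal Python sketch. It assumes the section's lines have already been separated out, for example by splitting the log on its `In Task` headers. The mapping restates the list above, and `operator_counts` is an illustrative helper.

```python
import re
from collections import Counter

# Operator abbreviations from the list above, mapped to their full names.
OPERATORS = {
    "TS": "TableScanOperator",
    "SEL": "SelectOperator",
    "FIL": "FilterOperator",
    "JOIN": "JoinOperator",
    "HASHJOIN": "MapjoinOperator",
    "AGGREGATE": "GroupByOperator",
    "RS": "ReduceSinkOperator",
    "FS": "FileSinkOperator",
    "LIM": "LimitOperator",
}

# An operator line starts (after indentation) with its abbreviation and ":".
OP_LINE = re.compile(r"^\s*([A-Z]+):")

def operator_counts(section_lines):
    """Illustrative helper: count known operators in one task's lines."""
    counts = Counter()
    for line in section_lines:
        match = OP_LINE.match(line)
        if match and match.group(1) in OPERATORS:
            counts[OPERATORS[match.group(1)]] += 1
    return counts

# Lines of Task M2 copied from the log (Statistics lines omitted).
m2_lines = [
    "    Data source: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628",
    "    TS: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628",
    "        FIL: AND(ISNOTNULL(number),ISNOTNULL(status_id))",
    "            RS: order:",
]
print(operator_counts(m2_lines))
```

For M2 this reports one TableScanOperator, one FilterOperator and one ReduceSinkOperator, which is the TS, FIL, RS chain shown in the annotated log above.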
Join Task: take J7_1_5_6 as an example.
   In Task J7_1_5_6:
       JOIN:
            StreamLineRead1 RIGHTJOIN StreamLineRead5
            keys:
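            # join keys: input 0 (StreamLineRead1) uses trail_ref1, input 1 (StreamLineRead5) uses waybill_no, i.e. trail_ref1 = waybill_no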
                0:trail_ref1
                1:waybill_no
            probeCandidate: StreamLineRead5
           Statistics: Num rows: 1.835793731581557E21, Data size: 7.636901923379278E23
           SEL: waybill_no, order_type country_en, status_id, WHEN(EQ(status_id,'DELIVERED'),delivered_time,start_time) time_rk
           # columns selected after the join
               Statistics: Num rows: 1.835793731581557E21, Data size: 5.654244693271196E23
               HASHJOIN:
                        Project1 INNERJOIN StreamLineRead6
                        keys:
                            0:
                            1:
                        non-equals:
                            0:
                            1:
                        bigTable: Project1
                   Statistics: Num rows: 3.5301211982074395E24, Data size: 1.7933015686893794E27
                   SEL: waybill_no, country_en, status_id, time_rk, week_of_year, week_year_id, $commonVariable8 _col8 [ $commonVariable8(lazy: true) = TO_CHAR(DATEADD(time_rk,1L,'dd'),'yyyy-mm-dd') ]  CONDITION: ISNOTNULL($commonVariable8)
                   # CONDITION keeps only rows where $commonVariable8 (the join key for dim_date m4) is not null
                       Statistics: Num rows: 3.1771090783866956E24, Data size: 1.9316823196591108E27
                       HASHJOIN:
                                Calc4 INNERJOIN StreamLineRead7
                                keys:
                                    0:_col8,week_year_id,week_of_year
                                    1:date,week_year_id,week_of_year
                                non-equals:
                                    0:
                                    1:
                                bigTable: Calc4
                           Statistics: Num rows: 5.498451118073621E27, Data size: 4.992593615210848E30
                           SEL: week_year_id y_bt_of_timerk, week_of_year m_bt_of_timerk, country_en, SUBSTR(TOSTRING(time_rk),1L,10L) __gk_3, WHEN(EQ(status_id,'DELIVERED'),waybill_no,null) __agg_0_p0, waybill_no
                               Statistics: Num rows: 5.498451118073621E27, Data size: 3.299070670844173E30
                               AGGREGATE: group by:y_bt_of_timerk,m_bt_of_timerk,country_en,__gk_3
                                UDAF: COUNT(DISTINCT__agg_0_p0)[Deduplicate] __agg_0,COUNT(DISTINCTwaybill_no)[Deduplicate] __agg_1
                                   Statistics: Num rows: 1.8830312048197332E27, Data size: 1.12981872289184E30
                                   RS: order:     
                                       nullDirection: ****
                                       optimizeOrderBy: False
                                       valueDestLimit: 0
                                       dist: HASH
                                       keys:
                                             y_bt_of_timerk
                                             m_bt_of_timerk
                                             country_en
                                             __gk_3
                                       values:
                                             y_bt_of_timerk (string)
                                             m_bt_of_timerk (string)
                                             country_en (string)
                                             __gk_3 (string)
                                             __agg_0_p0 (string)
                                             waybill_no (string)
                                       partitions:
                                             y_bt_of_timerk
                                             m_bt_of_timerk
                                             country_en
                                             __gk_3
                                       Statistics: Num rows: 1.8830312048197332E27, Data size: 1.12981872289184E30
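The `keys:` entries read most easily as two position-aligned lists. The list after `0:` belongs to the first input and the list after `1:` to the second, so the join condition pairs them element by element. This reading is inferred from comparing the log with the SQL, not taken from documentation. The Python sketch below applies it to the three joins in J7_1_5_6, copied from the log. `join_conditions` is an illustrative helper.

```python
def join_conditions(join_block):
    """Illustrative helper: pair up the keys under "keys:" in a join block.

    The list after "0:" belongs to the first input and the list after "1:"
    to the second; keys are matched by position, so "0:a,b" and "1:x,y"
    mean a = x AND b = y. Empty lists give [], i.e. no equi-join key.
    """
    lines = [line.strip() for line in join_block.splitlines()]
    start = lines.index("keys:")
    left = lines[start + 1].split(":", 1)[1]
    right = lines[start + 2].split(":", 1)[1]
    left_keys = [k for k in left.split(",") if k]
    right_keys = [k for k in right.split(",") if k]
    return list(zip(left_keys, right_keys))

j7_join = """JOIN:
     StreamLineRead1 RIGHTJOIN StreamLineRead5
     keys:
         0:trail_ref1
         1:waybill_no
     probeCandidate: StreamLineRead5"""

cross_join = """HASHJOIN:
         Project1 INNERJOIN StreamLineRead6
         keys:
             0:
             1:
         bigTable: Project1"""

week_join = """HASHJOIN:
         Calc4 INNERJOIN StreamLineRead7
         keys:
             0:_col8,week_year_id,week_of_year
             1:date,week_year_id,week_of_year
         bigTable: Calc4"""

for block in (j7_join, cross_join, week_join):
    print(join_conditions(block))
# [('trail_ref1', 'waybill_no')]
# []
# [('_col8', 'date'), ('week_year_id', 'week_year_id'), ('week_of_year', 'week_of_year')]
```

The first join is t5.trail_ref1 = t1.waybill_no. The third is the join with dim_date m4, where `_col8` is `to_char(dateadd(time_rk,1,'dd'),'yyyy-mm-dd')` and the week columns come from the outer WHERE condition. The second join has no keys. It appears to be the join with dim_date m2: its condition m2.date = m1.date_current compares against a constant, which the optimizer seems to have turned into the filter `EQ(date,'2023-07-03')` in Task M1. Each row is then simply paired with the filtered dim_date rows.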
2.2 Using the log to optimize scripts

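I notice that many people like to wrap tables in parentheses when writing scripts. Perhaps they are remembering the order of operations from primary-school math, where whatever is inside parentheses is evaluated first. For example: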

[Figure: the script with parentheses]

After I rewrote the script, it looks like this:

[Figure: the rewritten script]

Let's compare the run logs to see how each version is actually executed:

[Figure: comparison of the two run logs]

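Apart from a few column names, the two logs are identical: the optimizer produces the same execution plan either way. The parentheses people go to such lengths to add do nothing to optimize the script here. They only make it messier and harder to read.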
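A comparison like this can be roughed out in code. The following Python sketch uses hypothetical helpers. It reduces a plan log to its shape (task headers, operator names, join types and indentation), ignores column names and expressions, and then compares two logs. Because it discards everything after an operator's name, two plans that scan different tables would also compare equal, so a match is a prompt to read the details, not proof. The sample logs are fragments of the M2 and J5_2_3_4 sections, with column names changed for illustration.

```python
import re

# Illustrative helpers, not a MaxCompute or DataWorks feature.
TASK_LINE = re.compile(r"^\s*In Task (\S+):")
OP_LINE = re.compile(r"^\s*([A-Z]+):")
JOIN_TYPE = re.compile(r"\b([A-Z]+JOIN)\b")

def plan_skeleton(log_text):
    """Reduce a plan log to (indent, token) pairs, dropping column names.

    Kept: task headers, operator names (TS, FIL, SEL, RS, JOIN, ...) and
    join types such as INNERJOIN. Everything after an operator's colon,
    including column names and expressions, is ignored.
    """
    skeleton = []
    for line in log_text.splitlines():
        indent = len(line) - len(line.lstrip())
        task = TASK_LINE.match(line)
        if task:
            skeleton.append((indent, "Task " + task.group(1)))
            continue
        op = OP_LINE.match(line)
        if op:
            skeleton.append((indent, op.group(1)))
            continue
        join = JOIN_TYPE.search(line)
        if join:
            skeleton.append((indent, join.group(1)))
    return skeleton

def same_plan(log_a, log_b):
    """True if both logs have the same operator tree shape."""
    return plan_skeleton(log_a) == plan_skeleton(log_b)

log_a = """In Task M2:
    TS: yht_dwd.dwd_uexpress_u_express_order_df/pt=20230628
        FIL: AND(ISNOTNULL(number),ISNOTNULL(status_id))
            RS: order:
In Task J5_2_3_4:
    JOIN:
         StreamLineRead3 INNERJOIN StreamLineRead4"""
log_b = log_a.replace("number", "order_no")    # only a column name differs
log_c = log_a.replace("INNERJOIN", "LEFTJOIN")  # the join type differs

print(same_plan(log_a, log_b))  # True
print(same_plan(log_a, log_c))  # False
```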

Learning to read the log and understanding how SQL runs internally is therefore essential for optimizing scripts.
