再来一个诊断SparkSql慢任务的案例吧

2023-10-30 17:03:08 浏览数 (2)

干货是枯燥的,这篇这周末在源码群里给大家细讲一下吧~

前天晚上,被拉群,给了一批慢任务,严重影响体验,任务运行时长如下图,有的任务跑了一天,还没跑完,该怎么着手优化呢?

近半年来,我优化诊断了上百个任务,但确实很少去总结记录了。

每次找问题的过程很简单,但是其实其中是有很多基本功在里面的,比如这次优化用到的基本功:

  • sparksql的执行计划得熟悉,至少得能看懂啥是啥
  • sparksql的Join选择策略,5大Join实现类的基本原理,优劣势,特性,每种join应该生成什么样的执行计划
  • Distribution与Partitioning体系
  • EnsureRequirements规则
  • Exchange(ShuffleExchangeExec)节点是啥,啥时候会生成Exchange节点

这里大概说一下我的诊断步骤呀:

1、看sparkwebui,看哪个job,哪个stage慢

2、研究sql代码,大部分情况下,出现这种问题的sql代码都很长,要有足够的耐心去读代码,然后再依据sql的dag图,确定慢的stage对应的是哪段sql

3、研究执行计划,看看当前有问题的节点是哪种类型的,是hashAgg,还是objHashAgg,还是sortAgg,又或者是sortMergeJoin? 等等,一方面知道当前节点是哪种类型后,因为我们知道它的运行原理,也就知道数据上可能有的问题,尤其是看到broadcastNestedLoopJoin或者cartesianproduct 这两个时,要格外注意

4、看stage的Summary Metrics页面,从已完成的task来看,task平均的运行情况,判断有没有数据倾斜、是不是所有task都处理了太多的数据量、有没有慢节点的机器等

5、研究这段sql上下文的数据,确认数据层面有没有数据倾斜、大字段啊这些

上面这些,是从数据开发的角度来看,有没有改进的地方,如果从sql的角度确实看不出问题,我们需要一些推断,比如:

  • 每个task处理的数据量不大,并且没有复杂的计算逻辑,但是运行的很慢 --是不是服器性能有卡点?
  • 我们把任务换个队列就ok,但在当前队列就不行 --是不是当前队列所在集群有问题?
  • ......

拿着这些推断,请求套件开发 or 运维的同学来一起排查,因为这个层面的问题,是我们的专业能力范围内没办法解决的(ps:如果你是全能型人才除外啊,即做数据开发 又做服务器搭建运维工作 又负责源码套件二次开发优化,那遇到问题基本没有解决不了的)。

下面说一下过程:

1、看spark web ui界面的Jobs

发现job Id是20的任务跑的很慢,30多个小时了,继续点链接跟进去

发现这个job只有一个task在运行,并且shuffle read量还挺大的,第一怀疑是数据倾斜了嘛?

2、找sql的dag图,再确定一下出卡点的任务对应的是哪一块的执行计划,输入和输出的上下文是什么

如上,最终找到和卡点task对应的dag图,是BroadcastHashJoin,左表是一个经过一系列计算后输出的中间结果,右表也是经过一系列的计算最终只有一条数据,所以走了广播,比较全的图如下:

从dag图上看左表的数据量确实很大,只有1个task肯定跑的慢,但是以对join的理解,这里右表已经走广播了,左表理论上不再需要exchange(shuffle)节点,但这儿确实多了一个shuffle

3、看sql具体逻辑(是一个很大的考验)

把sql简化和脱敏后,粘这儿,真的是一个非常复杂的sql,这也是最考验人的一步,真正优化时,得耐心读代码,哈哈,有时候自己写的代码还不想看第二遍呢,读别人的代码确实不容易

代码语言:javascript复制
WITH s1 AS (
    SELECT  id,
            xx,
            ....
    FROM    table1
    WHERE   date = '${date}'
    AND     id IN (1111)
),
s2 AS (
    SELECT  id,
            did,
            MAX(
                XX
            ) AS xx 
    FROM    table2
    WHERE   date = '${date}'
    AND      id IN (1111)
    GROUP BY
            id,
            did
),
s3 AS (
    
    SELECT  id,
            s_id,
            SUM(xx) AS xx
    FROM    table3
    WHERE   date = '${date}'
    GROUP BY id,
            s_id
),
s4 AS (
    SELECT id,
            s_id,
            xx
    FROM    (
                SELECT  xx
                FROM    table
                WHERE   date = '${date}'
                GROUP BY
                       xx
            ) t1
    LEFT JOIN
            (
                SELECT  xx
                FROM    table
                WHERE   date = '${date}'
                GROUP BY
                       xx
            ) t2
    ON      t1.xx = t2.xx
    AND     t1.xx = t2.xx
),
s5 AS (
    SELECT  id,
            xx
    FROM    (
                SELECT  id,
                        xx,
                        row_number() OVER(
                            PARTITION BY
                                    id
                            ORDER BY
                                    xx ASC
                        ) rn
                FROM    (
                            SELECT  xx
                            FROM    table
                            WHERE   date BETWEEN '${date-14}' AND '${date-1}'
                            AND     id = 1111
                            GROUP BY
                                   xx
                        ) m1
            ) m2
    WHERE   rn BETWEEN 3 AND 7
    GROUP BY
            id
),
s6 AS (
    SELECT  id,
            xx
    FROM    (
                SELECT  id,
                        xx,
                        row_number() OVER(
                            PARTITION BY
                                    id
                            ORDER BY
                                    xx ASC
                        ) rn
                FROM    (
                            SELECT  xx
                            FROM    (
                                        SELECT  xx
                                        FROM    (
                                                    SELECT  xx
                                                    FROM    table
                                                    WHERE   date BETWEEN '${date-14}' AND '${date-1}'                                                    
                                                    AND     id IN (1111)
                                                    GROUP BY
                                                           xx
                                                ) t1
                                        LEFT JOIN
                                                (
                                                    SELECT  xx
                                                    FROM    table
                                                    WHERE   date BETWEEN '${date-14}' AND '${date-1}'                                                   
                                                    AND     id IN (1111)                                                    
                                                    GROUP BY
                                                           xx
                                                ) t2
                                        ON      t1.xx = t2.xx
                                        AND    ..
                                    )
                            GROUP BY
                                    xx
                        ) m1
            ) m2
    WHERE   rn BETWEEN 3 AND 7
    GROUP BY
            id
)
INSERT OVERWRITE TABLE tablexx PARTITION (date = '${date}')
SELECT  xx
FROM    (
            SELECT xx
            FROM    s1
            LEFT JOIN
                    s2
            ON      s1.id = s2.id
            AND     s1.did = s2.did
            LEFT JOIN
                    s3
            ON      s1.s_id = s3.s_id
            AND     s1.id = s3.id
            LEFT JOIN
                    s4
            ON      s1.s_id = s4.s_id
            AND     s1.id = s4.id
            LEFT JOIN
                    s5
            ON      s1.id = s5.id
            LEFT JOIN
                    s6
            ON      s1.id = s6.id
        ) mid
GROUP BY
       xx

6个with... as ... 结合dag图,定位到出现卡点的地方是最后两步:

这儿或许我们知道原因了,key值只有1111,是热点key,我们处理这种join时,有一个方法就是,如果右表足够小,让右表走广播,左表就不用shuffle了,也就避免了热点key在shuffle时出现数据倾斜。

但是,我们看执行计划,右表走广播了呀,这时,就基本确定了是sparksql生成的执行计划有问题。

正常的执行计划应该是这样:

把这两种执行计划放一起对比一下:

结论:

sql没问题,数据也没有问题,所以怀疑是sparksql生成执行计划那里出现了badcase,我们内部用的spark版本,是经过二次开发的,把问题反馈给套件开发的同学,设置spark版本回退参数,新的一天的数据运行正常了。

0 人点赞