Tuesday, October 27, 2020

Using Windows functions in TPC-H with NDB Cluster

 This is probably the first time I post a blog that handles variants of

how to use SQL :)

I got the inspiration from reading some new and old blogs by Öystein Grövlen

where he mentioned various ways to rewrite queries as Common Table

Expressions (CTEs) and using windows functions instead of using subqueries.

I tried this and found that the rewritten queries was faster using windows in some

cases. Obviously the CTE variant and the subquery variant of the query can

be executed in the same way. But SQL implementations are not perfect and

are able to handle some constructs better than others.


This got me a bit puzzled, so I wanted to understand what specifically is the

thing that makes the CTEs and windows functions variant run faster.


Let's take Q17 in TPC-H as an example.

Here is the original query:

select

        sum(l_extendedprice) / 7.0 as avg_yearly

from

        lineitem,

        part

where

        p_partkey = l_partkey

        and p_brand = 'Brand#23'

        and p_container = 'MED BOX'

        and l_quantity < (

                select

                        0.2 * avg(l_quantity)

                from

                        lineitem

                where

                        l_partkey = p_partkey

        );


Here is the output from EXPLAIN.

+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+

| id | select_type        | table    | partitions              | type | possible_keys         | key       | key_len | ref                 | rows   | filtered | Extra                                                                                                                                    |

+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+

|  1 | PRIMARY            | part     | p0,p1,p2,p3,p4,p5,p6,p7 | ALL  | PRIMARY               | NULL      | NULL    | NULL                | 200000 |     1.00 | Parent of 2 pushed join@1; Using pushed condition ((`dbt3`.`part`.`p_container` = 'MED BOX') and (`dbt3`.`part`.`p_brand` = 'Brand#23')) |

|  1 | PRIMARY            | lineitem | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | Child of 'part' in pushed join@1; Using where                                                                                            |

|  2 | DEPENDENT SUBQUERY | lineitem | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | NULL                                                                                                                                     |

+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+



Here is the output from EXPLAIN ANALYZE.

| -> Aggregate: sum(lineitem.l_extendedprice)  (actual time=484.773..484.773 rows=1 loops=1)

    -> Nested loop inner join  (cost=760100.20 rows=6017424) (actual time=14.792..484.552 rows=587 loops=1)

        -> Table scan on part, activating pushed join of 2 tables, with pushed condition: ((part.p_container = 'MED BOX') and (part.p_brand = 'Brand#23'))  (cost=98183.59 rows=200000) (actual time=10.267..10.664 rows=313 loops=1)

        -> Filter: (lineitem.l_quantity < (select #2))  (cost=30.09 rows=30) (actual time=0.625..1.513 rows=2 loops=313)

            -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey), child of part in pushed join  (cost=30.09 rows=30) (actual time=0.001..0.018 rows=19 loops=313)

            -> Select #2 (subquery in condition; dependent)

                -> Aggregate: avg(lineitem.l_quantity)  (actual time=0.073..0.073 rows=1 loops=6088)

                    -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey)  (cost=33.10 rows=30) (actual time=0.062..0.068 rows=31 loops=6088)



What seem to happen here is that we re-execute the subquery again and again.

When writing this query using a windows function the query looks like this:

WITH win AS (

 SELECT l_extendedprice, l_quantity, AVG(l_quantity)

 OVER (PARTITION BY p_partkey) avg_l_quantity

 FROM lineitem, part

 WHERE p_partkey = l_partkey AND

 p_brand = 'Brand#23' AND

 p_container = 'MED BOX')

SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly

from win

where l_quantity < 0.2 * avg_l_quantity;


What we have done here is that we have pushed the calculation of the average

into the join processing and thus made the subquery possible to run on a

materialised table.

Here is the output of the new query from EXPLAIN.

+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

| id | select_type | table      | partitions              | type | possible_keys         | key       | key_len | ref                 | rows   | filtered | Extra                                                                                                                                                                     |

+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

|  1 | PRIMARY     | <derived2> | NULL                    | ALL  | NULL                  | NULL      | NULL    | NULL                |  60174 |    33.33 | Using where                                                                                                                                                               |

|  2 | DERIVED     | part       | p0,p1,p2,p3,p4,p5,p6,p7 | ALL  | PRIMARY               | NULL      | NULL    | NULL                | 200000 |     1.00 | Parent of 2 pushed join@1; Using pushed condition ((`dbt3`.`part`.`p_container` = 'MED BOX') and (`dbt3`.`part`.`p_brand` = 'Brand#23')); Using temporary; Using filesort |

|  2 | DERIVED     | lineitem   | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | Child of 'part' in pushed join@1                                                                                                                                          |

+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Here is the output of the new query from EXPLAIN ANALYZE.

| -> Aggregate: sum(win.l_extendedprice)  (actual time=33.745..33.746 rows=1 loops=1)

    -> Filter: (win.l_quantity < (0.2 * win.avg_l_quantity))  (actual time=32.352..33.695 rows=587 loops=1)

        -> Table scan on win  (cost=6772.08 rows=60174) (actual time=0.000..0.218 rows=6088 loops=1)

            -> Materialize CTE win  (actual time=32.343..32.802 rows=6088 loops=1)

                -> Window aggregate with buffering: avg(lineitem.l_quantity) OVER (PARTITION BY part.p_partkey )   (actual time=16.773..30.318 rows=6088 loops=1)

                    -> Sort: part.p_partkey  (actual time=16.700..17.123 rows=6088 loops=1)

                        -> Stream results  (cost=760100.20 rows=6017424) (actual time=10.584..15.531 rows=6088 loops=1)

                            -> Nested loop inner join  (cost=760100.20 rows=6017424) (actual time=10.578..13.499 rows=6088 loops=1)

                                -> Table scan on part, activating pushed join of 2 tables, with pushed condition: ((part.p_container = 'MED BOX') and (part.p_brand = 'Brand#23'))  (cost=98183.59 rows=200000) (actual time=10.551..11.403 rows=313 loops=1)

                                -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey), child of part in pushed join  (cost=30.09 rows=30) (actual time=0.000..0.005 rows=19 loops=313)


What we see there is that the windows function is materialised and executing the

where clause on average quantity is very quick.


The result is impressive the speedup of running this query in
MySQL NDB Cluster is 16x! The long project of integrating NDB Cluster
with MySQL 8.0 bears fruit here.

No comments: