Tuesday, October 27, 2020

Using CTEs in TPC-H in NDB Cluster

In the previous post I showed some improvements based on using window functions in rewritten TPC-H queries. In this blog post I will discuss the improvements made in Q15 using a CTE (Common Table Expression). This is again based on inspiration from new and old blog posts written by Øystein Grøvlen.


Here is Q15 in TPC-H:

create view revenue (supplier_no, total_revenue) as
        select
                l_suppkey,
                sum(l_extendedprice * (1 - l_discount))
        from
                lineitem
        where
                l_shipdate >= '1996-01-01'
                and l_shipdate < date_add('1996-01-01', interval '90' day)
        group by
                l_suppkey;

select
        s_suppkey,
        s_name,
        s_address,
        s_phone,
        total_revenue
from
        supplier,
        revenue
where
        s_suppkey = supplier_no
        and total_revenue = (
                select
                        max(total_revenue)
                from
                        revenue
        )
order by
        s_suppkey;

drop view revenue;


The query creates a view and references it in two places. Here is the same query using a CTE:

WITH revenue0(supplier_no, total_revenue) AS (
    SELECT l_suppkey, SUM(l_extendedprice * (1 - l_discount))
    FROM lineitem
    WHERE l_shipdate >= '1996-01-01'
      AND l_shipdate < DATE_ADD('1996-01-01', INTERVAL '90' DAY)
    GROUP BY l_suppkey )
SELECT s_suppkey, s_name, s_address, s_phone, total_revenue
FROM supplier, revenue0
WHERE s_suppkey = supplier_no
  AND total_revenue = (SELECT MAX(total_revenue) FROM revenue0)
ORDER BY s_suppkey;

Here is the output from EXPLAIN for the original query:

+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+--------------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | select_type | table      | partitions              | type   | possible_keys                  | key        | key_len | ref                      | rows   | filtered | Extra                                                                                                                                                                                        |
+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+--------------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  1 | PRIMARY     | <derived3> | NULL                    | ALL    | NULL                           | NULL       | NULL    | NULL                     | 223448 |    10.00 | Using where; Using temporary; Using filesort                                                                                                                                                 |
|  1 | PRIMARY     | supplier   | p0,p1,p2,p3,p4,p5,p6,p7 | eq_ref | PRIMARY                        | PRIMARY    | 4       | dbt3.revenue.supplier_no |      1 |   100.00 | NULL                                                                                                                                                                                         |
|  3 | DERIVED     | lineitem   | p0,p1,p2,p3,p4,p5,p6,p7 | range  | l_shipDATE,l_partkey,l_suppkey | l_shipDATE | 4       | NULL                     | 223448 |   100.00 | Using pushed condition ((`dbt3`.`lineitem`.`l_shipDATE` >= DATE'1996-01-01') and (`dbt3`.`lineitem`.`l_shipDATE` < <cache>(('1996-01-01' + interval '90' day)))); Using MRR; Using temporary |
|  2 | SUBQUERY    | <derived4> | NULL                    | ALL    | NULL                           | NULL       | NULL    | NULL                     | 223448 |   100.00 | NULL                                                                                                                                                                                         |
|  4 | DERIVED     | lineitem   | p0,p1,p2,p3,p4,p5,p6,p7 | range  | l_shipDATE,l_partkey,l_suppkey | l_shipDATE | 4       | NULL                     | 223448 |   100.00 | Using pushed condition ((`dbt3`.`lineitem`.`l_shipDATE` >= DATE'1996-01-01') and (`dbt3`.`lineitem`.`l_shipDATE` < <cache>(('1996-01-01' + interval '90' day)))); Using MRR; Using temporary |
+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+--------------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
5 rows in set, 1 warning (0.00 sec)


What is clear from this plan is that the view is executed separately in both places where it is used.

Here is the output from EXPLAIN for the CTE query:

+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+----------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | select_type | table      | partitions              | type   | possible_keys                  | key        | key_len | ref                  | rows   | filtered | Extra                                                                                                                                                                                        |
+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+----------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  1 | PRIMARY     | <derived2> | NULL                    | ALL    | NULL                           | NULL       | NULL    | NULL                 | 223448 |    10.00 | Using where; Using temporary; Using filesort                                                                                                                                                 |
|  1 | PRIMARY     | supplier   | p0,p1,p2,p3,p4,p5,p6,p7 | eq_ref | PRIMARY                        | PRIMARY    | 4       | revenue0.supplier_no |      1 |   100.00 | NULL                                                                                                                                                                                         |
|  3 | SUBQUERY    | <derived2> | NULL                    | ALL    | NULL                           | NULL       | NULL    | NULL                 | 223448 |   100.00 | NULL                                                                                                                                                                                         |
|  2 | DERIVED     | lineitem   | p0,p1,p2,p3,p4,p5,p6,p7 | range  | l_shipDATE,l_partkey,l_suppkey | l_shipDATE | 4       | NULL                 | 223448 |   100.00 | Using pushed condition ((`dbt3`.`lineitem`.`l_shipDATE` >= DATE'1996-01-01') and (`dbt3`.`lineitem`.`l_shipDATE` < <cache>(('1996-01-01' + interval '90' day)))); Using MRR; Using temporary |
+----+-------------+------------+-------------------------+--------+--------------------------------+------------+---------+----------------------+--------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


Here we only execute the scan on the lineitem table once and reuse the result in both places where it is used. Thus CTEs, like window functions, make it easier for MySQL to discover repeated patterns. Since we only need to scan once instead of twice, the query execution time is cut in half.

Using Window functions in TPC-H with NDB Cluster

This is probably the first time I post a blog that deals with variants of how to use SQL :)

I got the inspiration from reading some new and old blogs by Øystein Grøvlen where he mentioned various ways to rewrite queries as Common Table Expressions (CTEs) and to use window functions instead of subqueries. I tried this and found that the rewritten queries were faster in some cases. Obviously the CTE variant and the subquery variant of a query could be executed in the same way, but SQL implementations are not perfect and handle some constructs better than others.


This got me a bit puzzled, so I wanted to understand what specifically makes the CTE and window function variants run faster.


Let's take Q17 in TPC-H as an example.

Here is the original query:

select
        sum(l_extendedprice) / 7.0 as avg_yearly
from
        lineitem,
        part
where
        p_partkey = l_partkey
        and p_brand = 'Brand#23'
        and p_container = 'MED BOX'
        and l_quantity < (
                select
                        0.2 * avg(l_quantity)
                from
                        lineitem
                where
                        l_partkey = p_partkey
        );


Here is the output from EXPLAIN.

+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+
| id | select_type        | table    | partitions              | type | possible_keys         | key       | key_len | ref                 | rows   | filtered | Extra                                                                                                                                    |
+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+
|  1 | PRIMARY            | part     | p0,p1,p2,p3,p4,p5,p6,p7 | ALL  | PRIMARY               | NULL      | NULL    | NULL                | 200000 |     1.00 | Parent of 2 pushed join@1; Using pushed condition ((`dbt3`.`part`.`p_container` = 'MED BOX') and (`dbt3`.`part`.`p_brand` = 'Brand#23')) |
|  1 | PRIMARY            | lineitem | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | Child of 'part' in pushed join@1; Using where                                                                                            |
|  2 | DEPENDENT SUBQUERY | lineitem | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | NULL                                                                                                                                     |
+----+--------------------+----------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+------------------------------------------------------------------------------------------------------------------------------------------+



Here is the output from EXPLAIN ANALYZE.

| -> Aggregate: sum(lineitem.l_extendedprice)  (actual time=484.773..484.773 rows=1 loops=1)
    -> Nested loop inner join  (cost=760100.20 rows=6017424) (actual time=14.792..484.552 rows=587 loops=1)
        -> Table scan on part, activating pushed join of 2 tables, with pushed condition: ((part.p_container = 'MED BOX') and (part.p_brand = 'Brand#23'))  (cost=98183.59 rows=200000) (actual time=10.267..10.664 rows=313 loops=1)
        -> Filter: (lineitem.l_quantity < (select #2))  (cost=30.09 rows=30) (actual time=0.625..1.513 rows=2 loops=313)
            -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey), child of part in pushed join  (cost=30.09 rows=30) (actual time=0.001..0.018 rows=19 loops=313)
            -> Select #2 (subquery in condition; dependent)
                -> Aggregate: avg(lineitem.l_quantity)  (actual time=0.073..0.073 rows=1 loops=6088)
                    -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey)  (cost=33.10 rows=30) (actual time=0.062..0.068 rows=31 loops=6088)



What seems to happen here is that the subquery is re-executed over and over again (6,088 times according to EXPLAIN ANALYZE). Rewritten using a window function, the query looks like this:

WITH win AS (
 SELECT l_extendedprice, l_quantity, AVG(l_quantity)
 OVER (PARTITION BY p_partkey) avg_l_quantity
 FROM lineitem, part
 WHERE p_partkey = l_partkey AND
 p_brand = 'Brand#23' AND
 p_container = 'MED BOX')
SELECT SUM(l_extendedprice) / 7.0 AS avg_yearly
FROM win
WHERE l_quantity < 0.2 * avg_l_quantity;


What we have done here is push the calculation of the average into the join processing, so that the filtering which previously required a dependent subquery can instead run against a materialised table.

Here is the output of the new query from EXPLAIN.

+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | select_type | table      | partitions              | type | possible_keys         | key       | key_len | ref                 | rows   | filtered | Extra                                                                                                                                                                     |
+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  1 | PRIMARY     | <derived2> | NULL                    | ALL  | NULL                  | NULL      | NULL    | NULL                |  60174 |    33.33 | Using where                                                                                                                                                               |
|  2 | DERIVED     | part       | p0,p1,p2,p3,p4,p5,p6,p7 | ALL  | PRIMARY               | NULL      | NULL    | NULL                | 200000 |     1.00 | Parent of 2 pushed join@1; Using pushed condition ((`dbt3`.`part`.`p_container` = 'MED BOX') and (`dbt3`.`part`.`p_brand` = 'Brand#23')); Using temporary; Using filesort |
|  2 | DERIVED     | lineitem   | p0,p1,p2,p3,p4,p5,p6,p7 | ref  | l_partkey,l_partkey_2 | l_partkey | 5       | dbt3.part.p_partkey |     30 |   100.00 | Child of 'part' in pushed join@1                                                                                                                                          |
+----+-------------+------------+-------------------------+------+-----------------------+-----------+---------+---------------------+--------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Here is the output of the new query from EXPLAIN ANALYZE.

| -> Aggregate: sum(win.l_extendedprice)  (actual time=33.745..33.746 rows=1 loops=1)
    -> Filter: (win.l_quantity < (0.2 * win.avg_l_quantity))  (actual time=32.352..33.695 rows=587 loops=1)
        -> Table scan on win  (cost=6772.08 rows=60174) (actual time=0.000..0.218 rows=6088 loops=1)
            -> Materialize CTE win  (actual time=32.343..32.802 rows=6088 loops=1)
                -> Window aggregate with buffering: avg(lineitem.l_quantity) OVER (PARTITION BY part.p_partkey )   (actual time=16.773..30.318 rows=6088 loops=1)
                    -> Sort: part.p_partkey  (actual time=16.700..17.123 rows=6088 loops=1)
                        -> Stream results  (cost=760100.20 rows=6017424) (actual time=10.584..15.531 rows=6088 loops=1)
                            -> Nested loop inner join  (cost=760100.20 rows=6017424) (actual time=10.578..13.499 rows=6088 loops=1)
                                -> Table scan on part, activating pushed join of 2 tables, with pushed condition: ((part.p_container = 'MED BOX') and (part.p_brand = 'Brand#23'))  (cost=98183.59 rows=200000) (actual time=10.551..11.403 rows=313 loops=1)
                                -> Index lookup on lineitem using l_partkey (l_partkey=part.p_partkey), child of part in pushed join  (cost=30.09 rows=30) (actual time=0.000..0.005 rows=19 loops=313)


What we see here is that the window function result is materialised once, and executing the WHERE clause on the average quantity against it is very quick.


The result is impressive: the speedup of running this query in MySQL NDB Cluster is 16x! The long project of integrating NDB Cluster with MySQL 8.0 bears fruit here.

Monday, October 19, 2020

New things coming in MySQL Cluster 8.0.22

In version 8.0.22 we have added a number of important new features. The first is the capability to create encrypted backups. This includes creating encrypted backups, restoring encrypted backups, and also encrypting backups that were originally created without encryption.
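As a rough sketch of how this can be used (the password, backup id, and paths below are placeholders, and the exact option syntax should be checked against the 8.0.22 documentation), an encrypted backup is started from the NDB management client and later restored with ndb_restore:

# Start an encrypted backup from the NDB management client
ndb_mgm -e "START BACKUP ENCRYPT PASSWORD='MySecretPassword' WAIT COMPLETED"

# Restore it, supplying the same password so the backup files can be decrypted
ndb_restore --nodeid=1 --backupid=1 --backup-path=/backups/BACKUP/BACKUP-1 \
            --decrypt --backup-password='MySecretPassword' --restore-data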


The second is that we have added support for IPv6 addresses. If it is necessary to mix IPv4 and IPv6 addresses in a cluster, it is important to be careful and follow a number of rules as documented.


Performance of replication of BLOB columns has been significantly improved, and similarly for SELECT statements using BLOB columns. This has been achieved by decreasing the number of round trips used to perform operations involving BLOB tables, which significantly decreases the latency of operations involving BLOBs.


Also, in 8.0.21 an important new feature was added that made some complex queries significantly faster: the ability to push antijoin and semijoin execution down to the NDB data nodes. One example is Q22 in TPC-H, shown below. This query executes 3.5x faster due to this change, and there are likely queries that will benefit even more.

select
        cntrycode,
        count(*) as numcust,
        sum(c_acctbal) as totacctbal
from
        (
                select
                        substr(c_phone from 1 for 2) as cntrycode,
                        c_acctbal
                from
                        customer
                where
                        substr(c_phone from 1 for 2) in
                                ('13', '31', '23', '29', '30', '18', '17')
                        and c_acctbal > (
                                select
                                        avg(c_acctbal)
                                from
                                        customer
                                where
                                        c_acctbal > 0.00
                                        and substr(c_phone from 1 for 2) in
                                                ('13', '31', '23', '29', '30', '18', '17')
                        )
                        and not exists (
                                select
                                        *
                                from
                                        orders
                                where
                                        o_custkey = c_custkey
                        )
        ) as custsale
group by
        cntrycode
order by
        cntrycode;


Wednesday, October 07, 2020

DBT2 benchmarks with NDB Cluster

This blog post refers to the uploaded slides.


DBT2 is based on the standard benchmark TPC-C. This benchmark is a mix of read and write activity and is much more write intensive than many other benchmarks. As a result, performance in previous NDB versions only scaled to nodes with around 6 LDM threads. With the introduction of multiple sockets for communication between nodes, we have been able to remove this limitation.


Thus DBT2 serves as a good tool to verify that MySQL Cluster 8.0.20 has improved its capability to scale writes.


For these tests we had access to a set of bare metal servers in the Oracle Cloud (OCI). The data nodes used DenseIO2 bare metal servers with 52 CPU cores, 768 GByte of memory and 8 NVMe drives. We had 6 such servers, spread across 3 different availability domains.


The queries were executed by MySQL Servers; we used 15 bare metal servers with 36 CPU cores each to run them, spread such that 5 servers were located in each availability domain.


The actual benchmark driver and client were executed on their own bare metal server.


DBT2 has five different transaction types. Each transaction is a complex transaction consisting of a set of SQL queries, on average around 30-35 per transaction. Only NewOrder transactions are counted in the results, and these constitute about 45% of all transactions. This means that if the report says 45,000 TPM (transactions per minute), around 100,000 transactions per minute were actually executed (45,000 / 0.45 ≈ 100,000).


DBT2 doesn't follow the standard TPC-C in that each terminal starts a new transaction immediately, without any wait time. This makes it possible to also use DBT2 to test in-memory DBMSs. Interestingly, with persistent memory a server can have 6 TB of memory, which means that more than 50,000 warehouses can be used and thus results beyond 500,000 TPM can be handled even with an in-memory setup.


Each SQL query is created in the DBT2 driver, sent to the DBT2 client, and from there to a MySQL Server, which in turn accesses a set of NDB data nodes to perform the actual query.


Since the servers are located in different availability domains, the main source of latency is the communication between availability domains. When executing DBT2 in a local network one can achieve latencies down to 5-10 ms for a transaction that contains around 30 SQL statements. In this environment we get latencies of around 30 ms. Most of this time is spent communicating from the DBT2 client to MySQL Servers located in a different availability domain.


NDB has a configuration option that ensures that reads always stay local to the availability domain of the MySQL Server they were sent to. Updates need to update all replicas, which might require communication across availability domains.
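This presumably refers to the LocationDomainId configuration parameter; a minimal config.ini sketch of how it could be set for nodes in one availability domain (node ids, host names and the domain numbering are purely illustrative):

[ndbd]
NodeId=1
HostName=ndbd-ad1.example.com
# This data node lives in availability domain 1
LocationDomainId=1

[mysqld]
NodeId=51
HostName=mysqld-ad1.example.com
# Reads from this MySQL Server prefer replicas in the same domain
LocationDomainId=1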


We have selected 3 different configurations to execute DBT2 in.


The first configuration is shown in slide 5. It uses 2 data nodes, both located in the same availability domain. They are however located in different failure domains and thus only share the electricity network. We use 10 MySQL Servers from 2 different availability domains.


The second configuration is a setup where we still have 2 replicas, but now with one "shard" per availability domain. Replicas are thus still in the same availability domain. This means that we use all 6 data node servers, and we also use all 15 MySQL Servers in 3 different availability domains.


In the third configuration we use 3 replicas, with the replicas placed in different availability domains. In this case we get 2 node groups using the 6 data nodes. There are still 15 MySQL Servers in 3 different availability domains.


The first configuration reaches almost 1.4M TPM at 2560 connections. The second configuration, with 3 node groups, scales almost linearly and reaches a bit beyond 4M TPM with 12000 connections.


Finally the last configuration reaches around 2.5M TPM at 8400 connections.


Thus we can see that performance is fairly constant per node group. The latency differs, and the number of connections required to reach optimal throughput differs a bit.


It is very clear that the multi-socket solution has improved the scalability of writes, ensuring that NDB can reach very high numbers in DBT2.


In this benchmark we showed NDB scaling to more than 1000 CPUs across the MySQL Servers and the NDB data nodes. MySQL NDB Cluster 8.0 can scale to 144 data nodes and thus can scale far beyond these numbers.

TPC-H Improvements in NDB Cluster

This blog post is a presentation based on the uploaded slides.


We have focused a lot of energy on improving our capability to execute complex queries efficiently. We took the first step of these improvements in MySQL Cluster 7.6, but even more improvements have been made in MySQL Cluster 8.0.


There are four types of improvements that we have made. One is that we have added the option to use a shared memory transporter between the MySQL Server and the NDB data nodes when they are located on the same machine.


The second improvement is the adaptive CPU spinning described in blog post 1.


The third improvement is that we made Read Backup the default, which means that in complex queries any replica can be used for reading.


Finally, we have added a number of algorithmic improvements; these mainly focus on enabling more parts of the queries to be pushed down into the NDB data nodes. We described the improvements in 8.0.20 in blog post 2.
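As a sketch of how the first two of these improvements are enabled on the data nodes (assuming the UseShm and SpinMethod parameters as documented for NDB 8.0; the values and their placement in config.ini are illustrative only), the configuration could contain:

[ndbd default]
# Use the shared memory transporter towards a colocated MySQL Server
UseShm=1
# Adaptive CPU spinning, see the discussion of slide 7 below
SpinMethod=LatencyOptimisedSpinning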


Slide 4 shows a 2-node setup optimised for query latency.


In slide 5 we show the improvements in TPC-H comparing MySQL Cluster 8.0.20 to MySQL Cluster 7.6.12. 15 out of 22 queries have improved by more than 100%, and most of those by a lot more than 100%. 3 queries have improved between 50% and 100%. Only 4 queries are unaffected by these improvements.


In slide 6 we see the latency improvements that come from colocating the MySQL Server and the NDB data nodes using a shared memory transporter. 14 out of 22 queries improve in this scenario, some of them by almost 100% and even above 100%.


Slide 7 shows the improvements from using LatencyOptimisedSpinning as SpinMethod, as described in blog post 3. 13 out of 22 queries see an improvement in this scenario, most of them around 15-20%, with a few improving by up to 60%.


Slide 8 shows the sum of the improvements of colocation and CPU spinning.


Slide 9 shows that some queries can benefit from more LDMs, but some are also negatively impacted. Thus we conclude that there is a limit to how many fragments we should split tables into for optimal latency. This is an important thing to consider in any distributed DBMS.

Tuesday, October 06, 2020

YCSB Disk Data Benchmark with NDB Cluster

As mentioned in blog post 1 we have improved the capability to handle very large write bandwidth, and as described in blog post 2 we improved the checkpointing of disk data columns in MySQL Cluster 8.0.20.

We wanted to verify that these changes were successful. To do this we chose to use the YCSB benchmark. This is a very simple benchmark; it contains one table with, in our variant, 2 columns. The first column is the primary key, and the second column contains the payload data, stored in a VARCHAR(29500) column that resides on disk.
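As an illustration, the table could be defined roughly as below. The table and column names are hypothetical, and ts1 refers to a disk data tablespace like the one sketched later in this post; indexed columns are always in memory in NDB, so the primary key stays in memory while the non-indexed payload column is placed on disk:

CREATE TABLE usertable (
  ycsb_key VARCHAR(255) NOT NULL PRIMARY KEY,
  payload  VARCHAR(29500)
) CHARACTER SET latin1
  TABLESPACE ts1 STORAGE DISK
  ENGINE NDBCLUSTER;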


The first part of the benchmark fills the database. The database is mostly stored in a tablespace that is set up in accordance with blog post 3. This means that we had a tablespace size of 20 TBytes. We loaded 600M rows, thus creating a database size of 18 TByte.

The load phase inserted 44,500 rows per second, which means that we loaded 1.25 GByte per second into the database. The bottleneck in both the load phase and the benchmark run phase was mainly the NVMe drives, but in some cases the 25G Ethernet also became the bottleneck. The CPUs were never loaded more than 20% and thus never became a bottleneck.


From this we can conclude that the setup and use of the NVMe drives is the most important factor in achieving extreme write rates for use cases where NDB Cluster is used for file write loads.


The cluster setup used 2 data nodes; each data node was a bare metal server in the Oracle Cloud (OCI) with 8 NVMe drives (2 used for logs and checkpoints and 6 used for the tablespace). The servers had 52 CPU cores each. Instead of setting up a RAID over the 6 NVMe drives, we opted for one file system per NVMe drive and added one data file per NVMe drive to NDB. This meant that NDB handled the spread of writes over the different data files, so no complex RAID solution was required. However, to get the best possible performance it was necessary to use SSD overprovisioning.
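A minimal sketch of such a disk data setup, with one data file per NVMe drive mounted on its own file system (paths, object names, and sizes are purely illustrative; several data files can be added per drive to reach the 20 TByte total):

-- Undo log placed on one of the drives used for logs and checkpoints
CREATE LOGFILE GROUP lg1
  ADD UNDOFILE '/ndb_log0/undo1.log'
  INITIAL_SIZE 64G
  ENGINE NDBCLUSTER;

-- Tablespace with one data file per NVMe drive
CREATE TABLESPACE ts1
  ADD DATAFILE '/ndb_data0/ts1_data0.dat'
  USE LOGFILE GROUP lg1
  INITIAL_SIZE 32G
  ENGINE NDBCLUSTER;

ALTER TABLESPACE ts1 ADD DATAFILE '/ndb_data1/ts1_data1.dat' INITIAL_SIZE 32G ENGINE NDBCLUSTER;
ALTER TABLESPACE ts1 ADD DATAFILE '/ndb_data2/ts1_data2.dat' INITIAL_SIZE 32G ENGINE NDBCLUSTER;
-- ... and so on for the remaining drives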


The disk, CPU and network usage during this benchmark can be found in blog post 4. The mean latency of the transactions was a bit more than 2 milliseconds, where reads took a bit more than 1 ms and writes around 4-5 milliseconds.


The actual benchmark consisted of 50% reads and 50% writes. Here we achieved almost 70,000 transactions per second, which means that we read 1 GByte per second in parallel with writing 1 GByte per second (roughly 35,000 reads and 35,000 writes per second of ~29.5 KByte rows).