Monday, December 09, 2019

NDB Parallel Query, part 4

In this part we will discuss how NDB batch handling works. Query execution of
complex SQL queries means that more rows can be delivered than the receiver is
capable of receiving. This means that we need to create a data flow between the
producer where the data resides and the query executor in the MySQL Server.

The MySQL Server uses a record into which the storage engine has to copy the
result row. This means that the storage of batches of rows is taken care of by
the storage engine.

When NDB performs a range scan it will decide on the possible parallelism before
the scan is started. The NDB API has to allocate enough memory to ensure that
we have memory prepared to receive the rows as they arrive in a flow of result
rows from the data nodes. It is possible to set a batch size of hundreds and even
thousands of rows for a query.

The executor of the scan is the DBTC module in the TC threads. This module only
passes messages through and sends them to the proper place. There is no storage
of result rows in DBTC. There is only one TC thread involved in one scan (range
scan or full table scan). The TC thread will decide which modules should
handle each individual fragment scan. The message to scan contains a set of
references to the memory available in the NDB API. This set of references is in
turn distributed to the fragment scans. This means that the fragment scans can
send result rows directly to the NDB API.

When a fragment scan has completed sending rows for all memory references it
cannot continue until the NDB API has processed these rows. The fragment
scan, handled by the DBLQH module in the LDM threads, will send information
to the DBTC module that it is waiting for a continue request. The DBTC module
will ensure that the NDB API knows that it should receive a set of rows as
specified in the response to the NDB API.

As soon as the NDB API has processed the set of rows it will inform the DBTC
module that it is now ready to receive more rows. Since there are multiple fragment
scans it is possible that rows have been continuously received in the NDB API while
it was processing the rows received previously.

As can be seen in the above description the fragment scans will not be actively
performing the scans all the time. It would be possible to scan in the DBLQH
module and store the result row locally there until the continue request arrives.
This is not done currently. It would obviously increase the parallelism for a
specific scan, but at the same time it would also increase the overhead for the
scan.
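The stop-and-continue flow described above can be illustrated with a small
simulation. This is only a sketch of the idea, not the NDB protocol: the
function name, the per-fragment row counts and the batch size are assumptions
made up for the example.

```python
from collections import deque


def run_scan(fragment_rows, batch_size):
    """Simulate fragment scans that pause until the API asks for more rows.

    fragment_rows: rows each fragment scan has to deliver (hypothetical input).
    batch_size: rows of API memory handed to a fragment scan per round.
    Returns (total rows delivered, number of continue requests needed).
    """
    remaining = list(fragment_rows)
    ready = deque(range(len(remaining)))  # fragment scans allowed to send
    delivered = 0
    continue_requests = 0
    while ready:
        frag = ready.popleft()
        sent = min(batch_size, remaining[frag])
        remaining[frag] -= sent
        delivered += sent  # rows go directly to the API memory
        if remaining[frag] > 0:
            # The fragment scan is idle here. It resumes only after the API
            # has processed the batch and a continue request has been sent.
            continue_requests += 1
            ready.append(frag)
    return delivered, continue_requests


print(run_scan([1000, 250, 0], batch_size=256))
```

Every continue request in this model is a point where a fragment scan sits
idle, which is the wait state the text refers to.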

When we execute the special scans that execute joins in NDB in the DBSPJ module
we also have batches to handle. The NDB API will allocate memory for a set of
rows on each table, thus the total batch size can become quite high. It is
however limited to a maximum of 4096 rows per table.

When DBSPJ concludes a batch towards the NDB API it will wait for the NDB API to
process those rows. However, other DBSPJ modules working on other parts of the
query can continue the join processing. Actually the NDB API has set up enough
memory to receive 2 batch sets. This means that DBSPJ can continue on the next set
of rows even before the NDB API has processed the rows. This is another reason why
Q12 can execute faster than Q6 although it has more work to perform.
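To see why room for 2 batch sets helps, here is a minimal timing model of a
producer and a receiver sharing a fixed number of batch slots. It is a sketch
under simplifying assumptions (fixed time per batch, a single producer); the
function and parameter names are invented for the illustration.

```python
def finish_time(batches, produce, consume, slots):
    """Time until the receiver has processed all batches.

    batches: number of batches to transfer (at least 1).
    produce: time the producer needs to fill one batch.
    consume: time the receiver needs to process one batch.
    slots:   number of batches the receiver has memory for.
    """
    produced = []  # time at which batch i has been fully sent
    consumed = []  # time at which batch i has been fully processed
    for i in range(batches):
        start = produced[i - 1] if i > 0 else 0
        if i >= slots:
            # All slots are occupied until batch i - slots has been processed.
            start = max(start, consumed[i - slots])
        produced.append(start + produce)
        receive = max(produced[i], consumed[i - 1] if i > 0 else 0)
        consumed.append(receive + consume)
    return consumed[-1]


one_slot = finish_time(batches=4, produce=1, consume=1, slots=1)
two_slots = finish_time(batches=4, produce=1, consume=1, slots=2)
print(one_slot, two_slots)
```

With one slot the producer and the receiver take turns. With two slots the
producer fills the second slot while the receiver works on the first, so the
two activities overlap.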

At the moment result rows are sent immediately from the DBLQH module as part of
the fragment scans (or key lookups). This means that we will process rows in the
NDB API that do not really need to be handled there. It is not an inefficiency,
since if not done by the NDB API the work has to be done by DBSPJ instead. But
we can increase parallelism by handling this in DBSPJ.

This possible increased parallelism comes from two things. First not sending
unnecessary rows to the NDB API means that we have to wait less time for the
NDB API to process rows. Additionally by storing rows in the DBSPJ module we
can increase the parallelism by using more memory in the data nodes for
query processing.

The conclusion here is that we have a number of wait states in the DBLQH module
while processing the scan waiting for the NDB API. We have similar wait states
in the join processing in the DBSPJ module waiting for the NDB API to process
the result rows from the join processing.

We already have implemented batch handling that makes the query execution efficient.
It is possible by storing result rows temporarily in DBLQH and in DBSPJ to improve
parallelism in the query execution.

Next part
.........
In the next part we will go through a somewhat more complex query, Q7 in TPC-H,
which is a 6-way join that uses a mix of scans and key lookups.

The query is:
SELECT
        supp_nation,
        cust_nation,
        l_year,
        SUM(volume) AS revenue
FROM
        (
                SELECT
                        n1.n_name AS supp_nation,
                        n2.n_name AS cust_nation,
                        extract(year FROM l_shipdate) as l_year,
                        l_extendedprice * (1 - l_discount) AS volume
                FROM
                        supplier,
                        lineitem,
                        orders,
                        customer,
                        nation n1,
                        nation n2
                WHERE
                        s_suppkey = l_suppkey
                        AND o_orderkey = l_orderkey
                        AND c_custkey = o_custkey
                        AND s_nationkey = n1.n_nationkey
                        AND c_nationkey = n2.n_nationkey
                        AND (
                                (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')
                                OR (n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY')
                        )
                        AND l_shipdate BETWEEN '1995-01-01' AND '1996-12-31'
        ) AS shipping
GROUP BY
        supp_nation,
        cust_nation,
        l_year
ORDER BY
        supp_nation,
        cust_nation,
        l_year;

NDB Parallel Query, part 3

In the previous part we showed how NDB will parallelise a simple
2-way join query from TPC-H. In this part we will describe how
the pushdown of joins to a storage engine works in the MySQL Server.

First a quick introduction to how a SQL engine handles a query.
The query normally goes through 5 different phases:
1) Receive query on the client connection
2) Query parsing
3) Query optimisation
4) Query execution
5) Send result of query on client connection

The result of 1) is a text string that contains the SQL query to
execute. In this simplistic view of the SQL engine we will ignore
any such things as prepared statements and other things making the
model more complex.

The text string is parsed by 2) into a data structure that represents
the query in objects that match concepts in the SQL engine.

Query optimisation takes this data structure as input and creates a
new data structure that contains an executable query plan.

Query execution uses the data structure from the query optimisation
phase to execute the query.

Query execution produces a set of result rows that is sent back to the
client.

In the MySQL Server those phases aren't completely sequential, there are
many different optimisations that occur in all phases. However for this
description it is accurate enough.

When we started working on the plan to develop a way to allow the storage
engine to take over parts of the query, we concluded that the storage
engine should use an Abstract Query Plan of some sort.

We decided early on to only push joins down after the query optimisation
phase. There could be some additional benefits of this, but the project
was sufficiently complex to handle anyway.

We see how the pushdown of a join into the storage engine happens:


As can be seen the storage engine receives the Query Plan as input and
produces a modified query plan as output. In addition the storage engine
creates an internal plan for how to execute the query internally.

NDB is involved in the Query Optimisation phase in the normal manner handled
by the MySQL Server. This means that NDB has to keep index statistics up to
date. A new feature can also be used to improve the cost model in MySQL 8.0:
histograms on individual columns. Histograms are kept per MySQL Server and
are generated by an SQL command. We will show a few examples later on how
this can be used to improve performance of queries.

MySQL uses a cost model, and this cost model works fairly well for NDB as well
even though NDB is a distributed storage engine. There are some improvements
possible in the exactness of the NDB index statistics, but the model as such
works well enough.

One example of a change to the query plan is when we push a condition
to the storage engine. In this case the query condition can be removed
from the query plan used by the MySQL Server. The internal query plan
contains information about join order, pushed conditions and linked reads
from a table earlier in the join order to a later table in the join
order. Some parts of the internal query execution can be modified as
the query is executed. One example of this is the parallelism used
in the query. This can be optimised to make optimal use of the server
resources (CPU, memory, disks and networks).

The storage engine can select to handle the join partially or fully.
Taking care of a pushdown partially can be done both on a condition level
and on a table level.

So as an example if we have a condition that cannot be pushed to NDB, this
condition will not be pushed, but the table can still be pushed to NDB.
If we have a 6-way join and the third table in the join for some reason
cannot be pushed to NDB, then we can still push the join of the first two
tables, the result of those two tables joined is then joined inside the
MySQL server and finally the results are fed into the last 3-way join that
is also pushed to NDB.
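The 6-way example can be written down as a small sketch that splits a join
order into pushed parts and parts handled by the MySQL Server. The table names
t1 to t6 and the function are hypothetical, and the real rules for what can be
pushed are more involved than a simple per-table flag.

```python
from itertools import groupby


def split_join(join_order, pushable):
    """Group consecutive pushable tables into pushed joins.

    join_order: table names in the order chosen by the optimiser.
    pushable:   set of table names the storage engine accepts (assumed given).
    Returns a list of (where, tables) pairs in execution order.
    """
    segments = []
    for can_push, group in groupby(join_order, key=lambda t: t in pushable):
        tables = list(group)
        if can_push and len(tables) > 1:
            segments.append(("pushed join", tables))
        else:
            # A table that cannot be pushed, or a lone table with nothing to
            # join with inside NDB, is joined by the MySQL Server.
            segments.extend(("server", [t]) for t in tables)
    return segments


order = ["t1", "t2", "t3", "t4", "t5", "t6"]
for where, tables in split_join(order, pushable={"t1", "t2", "t4", "t5", "t6"}):
    print(where, tables)
```

For the case in the text this gives a pushed 2-way join, the third table
handled in the MySQL Server and a pushed 3-way join.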

One common case where a query can be pushed in multiple steps into NDB is
when the query contains a subquery. We will look into such an example
in a later blog.

So in conclusion the join pushdown works by first allowing the MySQL Server
to create an optimal execution plan. Next we attempt to push as many parts
down to the NDB storage engine as possible.

The idea with the pushdown is to be able to get more batching happening.
For example if we have a key lookup in one of the tables in the join it is not
possible to handle more than one row at a time using the MySQL Server whereas
with pushdown we can handle hundreds of key lookups in parallel.
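A back-of-the-envelope sketch of the batching effect: count how many rounds of
lookups are needed when they are issued one at a time compared to in batches.
The lookup count and the batch size below are made-up numbers for illustration.

```python
import math


def lookup_rounds(lookups, batch_size):
    """Number of rounds needed when batch_size lookups are in flight at once."""
    return math.ceil(lookups / batch_size)


lookups = 10_000  # hypothetical number of key lookups in the join
print("one row at a time:", lookup_rounds(lookups, batch_size=1))
print("batches of 200:   ", lookup_rounds(lookups, batch_size=200))
```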

Another reason is that by moving the join operator into the data node we come
closer to the data. This avoids a number of messages back and forth between the
MySQL Server and the data nodes.

Finally by moving the join operator down into the data nodes we can even have
multiple join operators. This can be used also for other things than the join
operator in the future such as aggregation, sorting and so forth.

An alternative approach would be to push the entire query down to NDB when it
works. The NDB model with join pushdown of full or partial parts of the
queries however works very well for the NDB team. We are thus able to develop
improvements of the join pushdown in a stepwise approach and even without being
able to push the full query we can still improve the query substantially.

As an example Q12 was not completely pushed before MySQL Cluster 8.0.18. Still
pushing only parts of it made a speedup of 20x possible, when the final step
of comparing two columns was added in 8.0.18 the full improvement of 40x
was made possible.

Next part
.........
In the next part we will describe how NDB handles batches, which has an important
impact on the possible parallelism in query execution.

Friday, December 06, 2019

NDB Parallel Query, part 2

In part 1 we showed how NDB can parallelise a simple query with only a single
table involved. In this blog we will build on this and show how NDB can
parallelise some parts of a two-way join query. As example we will use Q12 in
DBT3:

SELECT
        l_shipmode,
        SUM(CASE
                WHEN o_orderpriority = '1-URGENT'
                        OR o_orderpriority = '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS high_line_count,
        SUM(CASE
                WHEN o_orderpriority <> '1-URGENT'
                        AND o_orderpriority <> '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS low_line_count
FROM
        orders,
        lineitem
WHERE
        o_orderkey = l_orderkey
        AND l_shipmode IN ('MAIL', 'SHIP')
        AND l_commitdate < l_receiptdate
        AND l_shipdate < l_commitdate
        AND l_receiptdate >= '1994-01-01'
        AND l_receiptdate < DATE_ADD( '1994-01-01', INTERVAL '1' year)
GROUP BY
        l_shipmode
ORDER BY
        l_shipmode;

This query when seen through the relational operators will first pass through
a SELECT operator and a PROJECT operator in the data nodes. The JOIN operator
will be executed on the lineitem and orders tables and the result of the JOIN operator
will be sent to the MySQL Server. The MySQL Server will thereafter handle the
GROUP BY operator with its aggregation function and also the final SORT operator.
Thus we can parallelise the filtering, projection and join, but the GROUP BY
aggregation and sorting will be implemented in the normal MySQL execution of
GROUP BY, SUM and sorting.

This query will be executed by first performing a range scan on the lineitem
table and evaluating the condition that limits the amount of rows to send to
the join with the orders table. The join is performed on the primary key of
the orders table. So the access in the orders table is a primary key lookup
for each row that comes from the range scan on the lineitem table.

In the MySQL implementation of this join one will fetch one row from the
lineitem table and for each such row it will perform a primary key lookup
in the orders table. This means that we can only handle one
primary key lookup at a time unless we do something in the NDB storage
engine. The execution of this query without pushdown join would make
it possible to run the scans towards the lineitem table in parallel. The
primary key lookups on the orders table would however execute serially,
fetching only one row at a time. This will increase the query time in
this case by a factor of around 5. So by pushing the join down into
the NDB data nodes we can make sure that the primary key lookups on the
orders table are parallelised as well.

To handle this the MySQL Server has the ability to push an entire join
execution down to the storage engine. We will describe this interface in more
detail in a later blog part.

To handle this query in NDB we have implemented a special scan protocol that
enables performing complex join operations. The scan will be presented with
a parameter part for each table in the join operation that will describe the
dependencies between the table and the conditions to be pushed to each table.

This is implemented in the TC threads in the NDB data node. The TC threads in
this case act as parallel JOIN operators. The join is parallelised on the
first table in the join, in this case the lineitem table. For each node in
the cluster a JOIN operator will be created that takes care of scanning all
partitions that have their primary partition in the node. This means that the
scan of the first table and the join operator are always located on the same node.

The primary key lookup is sent to the node where the data resides, in a cluster
with 2 replicas and 2 nodes and the table uses READ BACKUP, we will always find
the row locally. With larger clusters the likelihood that this lookup is sent
over the network increases.

Compared to a single-threaded storage engine this query scales almost 30x
using 2 nodes with 8 LDM threads each. NDB's implementation is, as mentioned in
the previous blog, very efficient, so the speedup gets a benefit
from this.

This query is more efficiently implemented in MySQL Cluster 8.0.18 since we
implemented support for comparing two columns, both from the same table and
from different tables provided they have the same data type. This improved
performance of this query by 2x. Previous to this the NDB interpreter could
handle comparisons of the type col_name COMPARATOR constant, e.g.
l_receiptdate >= '1994-01-01'.
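The effect on Q12 can be sketched by classifying its comparison predicates by
operand type. This is a simplified model, not the NDB interpreter: the classes
and the column_vs_column flag are assumptions for the illustration, the IN
predicate is left out, and the same-data-type requirement is not checked.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    name: str


@dataclass(frozen=True)
class Comparison:
    left: Column
    op: str
    right: object  # a Column or a constant


def is_pushable(cond, column_vs_column):
    """Column-to-constant is always accepted, column-to-column only if enabled."""
    if isinstance(cond.right, Column):
        return column_vs_column
    return True


def split(conds, column_vs_column):
    """Return (pushed to the data nodes, kept in the MySQL Server)."""
    pushed = [c for c in conds if is_pushable(c, column_vs_column)]
    kept = [c for c in conds if not is_pushable(c, column_vs_column)]
    return pushed, kept


Q12_COMPARISONS = [
    Comparison(Column("l_commitdate"), "<", Column("l_receiptdate")),
    Comparison(Column("l_shipdate"), "<", Column("l_commitdate")),
    Comparison(Column("l_receiptdate"), ">=", "1994-01-01"),
]

for enabled in (False, True):
    pushed, kept = split(Q12_COMPARISONS, column_vs_column=enabled)
    print(f"column_vs_column={enabled}: {len(pushed)} pushed, {len(kept)} kept")
```

In this model the two date-to-date comparisons stay in the MySQL Server until
column-to-column comparison is available, after which all three are evaluated
in the data nodes.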

Query Execution


In the figure below we show the execution flow for this query in NDB. As described
above we have a module called DBSPJ in the TC threads that handles the JOIN
processing. We have shown in the figure below the flow for the scan of the lineitem
table in blue arrows. The primary key lookups have been shown with red arrows.
In the figure below we have assumed that we're not using READ BACKUP. We will
describe in more detail the impact of READ BACKUP in a later part of this blog series.


Query Analysis

The query will read the lineitem table in parallel using a range scan. This scan will
evaluate 909,844 rows when using scale factor 1 in TPC-H. Of those rows there will
be 30,988 rows that will evaluate to true. Each of those 30,988 rows will be sent to the
NDB API but will also be reported to the DBSPJ module to issue parallel key lookups
towards the orders table.

As a matter of fact this query will actually execute faster than Q6 although it does
more work compared to the previous query we analysed (Q6 in TPC-H). Most of the
work is done in the lineitem table, both Q6 and Q12 do almost the same amount of
work in the range scan on the lineitem table. However since there are fewer records
to report back to the MySQL Server this means that parallelism is improved due to
the batch handling in NDB.
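The difference in rows reported back can be put into numbers using the row
counts from the Query Analysis sections of this part (Q12) and of part 1 (Q6).
The small helper below is only there for the arithmetic.

```python
# Row counts at scale factor 1, taken from the Query Analysis sections.
q6_scanned, q6_returned = 909_455, 114_160
q12_scanned, q12_returned = 909_844, 30_988


def selectivity(returned, scanned):
    """Fraction of the scanned rows that pass the pushed condition."""
    return returned / scanned


print(f"Q6  returns {selectivity(q6_returned, q6_scanned):.1%} of the scanned rows")
print(f"Q12 returns {selectivity(q12_returned, q12_scanned):.1%} of the scanned rows")
print(f"Q6 sends {q6_returned / q12_returned:.1f}x as many rows to the NDB API")
```

Both queries scan roughly the same number of rows, but Q12 hands only a
fraction as many to the NDB API, so its fragment scans are less likely to sit
waiting for a continue request.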

Scalability impact

This query will scale very well with more partitions of the lineitem table
and the orders table. As the cluster grows some scalability impact will
come from a higher cost of the primary key lookups that have to be sent on
the network to other nodes.

Next Part

In part 3 we will discuss how the MySQL Server and the NDB storage engine work
together to define the query parts pushed down to NDB.

NDB Parallel Query, part 1

I will describe how NDB handles complex SQL queries in a number of
blogs. NDB has the ability to parallelise parts of join processing.
Ensuring that your queries make the best possible use of these
parallelisation features enables applications to boost their
performance significantly. It will also be a good base to explain
any improvements we add to the query processing in NDB Cluster.

NDB was designed from the beginning for extremely efficient key lookups
and for extreme availability (less than 30 seconds of downtime per year
including time for software change, meta data changes and crashes).

Originally the model was single-threaded and optimised for 1-2 CPUs.
The execution model uses an architecture where messages are sent
between modules. This made it very straightforward to extend the
architecture to support multi-threaded execution when CPUs with
many cores became prevalent. The first multi-threaded version of NDB
was version 7.0. This supported up to 7 threads working in parallel
plus a large number of threads handling interaction with the file
system.

With the introduction of 7.0 the scans of a table, either using a
range scan on an index or scanning the entire table, were automatically
parallelised. So NDB has supported a limited form of parallel query
already since the release of 7.0 (around 2011 I think).

Now let's use an example query, Q6 from DBT3 that mimics TPC-H.

SELECT
    SUM(l_extendedprice * l_discount) AS revenue
FROM
    lineitem
WHERE
    l_shipdate >= '1994-01-01'
    AND l_shipdate < DATE_ADD( '1994-01-01' , INTERVAL '1' year)
    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
    AND l_quantity < 24;

The execution of this will use a range scan on the index on l_shipdate.
This range is a perfectly normal range scan in NDB. Since range scans
are parallelised, this query will execute using 1 CPU for each partition
of the table. Assuming that we set up a cluster with default setup
and with 8 LDM threads the table will be partitioned into 16 partitions.
Each of those partitions will have a different CPU for the primary
partition. This means that the range scans will execute on 16 CPUs in
parallel.
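The arithmetic behind the 16 partitions can be sketched as follows. It assumes
2 data nodes, which this paragraph does not state, and a simple round-robin
placement of primary partitions. Both are assumptions for the illustration,
not the actual NDB distribution rules.

```python
def primary_placement(data_nodes, ldm_threads):
    """Map each partition to the (node, LDM thread) holding its primary.

    Assumes one partition per LDM thread per data node and a round-robin
    placement (hypothetical, for illustration only).
    """
    partitions = data_nodes * ldm_threads
    return {p: (p % data_nodes, p // data_nodes) for p in range(partitions)}


placement = primary_placement(data_nodes=2, ldm_threads=8)
print(len(placement), "partitions")
print(len(set(placement.values())), "distinct LDM threads holding a primary")
```

Under these assumptions every primary partition lands on its own LDM thread,
so a range scan over all partitions keeps 16 CPUs busy.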

LDM (Local Data Manager) is the name of the threads in the NDB data
nodes that manage the actual data in NDB. An LDM thread contains a distributed
hash index for the primary keys and unique keys, an ordered index
implemented as a T-tree, a query handler that controls execution of
lookups and scans and checkpointing and also handles the REDO log.
Finally the LDM thread contains the row storage that has 4 parts.
Fixed size parts of the row in memory, variable sized parts of the
row in memory, dynamic parts of the row (absence of a column here
means that it is NULL, so this provides the ability to ADD a column
as an online operation) and finally a fixed size part that is stored
on disk using a page cache. The row storage also contains an
interpreter that can evaluate conditions and perform simple operations
such as add, to support efficient auto increment.

Now the first implementation of the NDB storage engine was implemented
such that all condition evaluation was done in the MySQL Server. This
meant that although we could scan the table in parallel, we still had
a single thread to evaluate the conditions. This meant that to handle
this query efficiently a condition pushdown is required. Condition
pushdown was added to the MySQL storage engine API a fairly long time
ago as part of the NDB development and can also benefit any other
storage engine that can handle condition evaluation.

So the above contains 3 parts that can be parallelised individually.
Scanning the data, evaluating the condition and finally performing
the sum on the rows that match the condition.

NDB currently parallelises the scan part and the condition evaluation
part. The sum is handled by the MySQL Server. In this case the
filtering factor is high, so this means that the sum part is not a
bottleneck in this query. The bottleneck in this query is scanning
the data and evaluating the condition.
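The division of work for Q6 can be sketched in a few lines: each partition is
scanned and filtered on its own, and the sum is computed in one place over the
rows that survive. The rows below are invented sample data, the date range is
assumed to be handled by the range scan itself, and the loop runs serially
where NDB would run the partition scans in parallel.

```python
from decimal import Decimal as D


def scan_partition(rows):
    """Data node side: scan one partition and apply the pushed condition."""
    return [
        r for r in rows
        if D("0.05") <= r["l_discount"] <= D("0.07") and r["l_quantity"] < 24
    ]


def q6(partitions):
    """MySQL Server side: sum over the rows returned by all partition scans."""
    revenue = D(0)
    for part in partitions:  # in NDB these scans execute in parallel
        for r in scan_partition(part):
            revenue += r["l_extendedprice"] * r["l_discount"]
    return revenue


PARTITIONS = [  # hypothetical rows, two partitions
    [
        {"l_extendedprice": D("100"), "l_discount": D("0.06"), "l_quantity": 10},
        {"l_extendedprice": D("200"), "l_discount": D("0.10"), "l_quantity": 5},
    ],
    [
        {"l_extendedprice": D("50"), "l_discount": D("0.05"), "l_quantity": 23},
        {"l_extendedprice": D("80"), "l_discount": D("0.06"), "l_quantity": 24},
    ],
]

print(q6(PARTITIONS))
```

Only the rows that pass the condition cross from scan_partition to the sum,
which is why the sum is not the bottleneck when most rows are filtered away.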

In the terminology of relational algebra this means that NDB supports
a parallelised SELECT operator for some filters. NDB also supports a
parallel PROJECT operator. NDB doesn't yet support a parallel
AGGREGATE function.

The bottleneck in this query is how fast one can scan the data and
evaluate the condition. In version 7.6 we made a substantial
optimisation of this part where we managed to improve a simple
query by 240% through low-level optimisations of the code.
With this optimisation NDB can handle more than 2 million rows
per second per CPU with a very simple condition to evaluate. This
query greatly benefits from this greater efficiency. Executing this
query with scale factor 10 (60M rows in the lineitem table) takes
about 1.5 seconds with the configuration above where 16 CPUs
concurrently perform the scan and condition evaluation.

A single-threaded storage engine is around 20x slower. With more
CPUs available in the LDM threads the parallelisation will be even
higher.

Obviously there are other DBMSs that are focused on analytical
queries that can handle this query even faster. NDB is focused
on online applications with high write scalability and the highest
availability. But we are working to also make query execution of
complex SQL much faster so that online applications can analyze
data in real-time.

Query Execution

In the figure below we describe the execution flow for this query. As usual
the query starts with parsing (unless it is a prepared statement) and after
that the query is optimised.

This query is executed as a single range scan against the lineitem table. Scans
are controlled by a TC thread that ensures that all the fragments of the table are
scanned. It is possible to control the parallelism of the query through the
NDB API. In most of the cases the parallelism will be full parallelism. Each thread
has a real-time scheduler and the scan in the LDM threads will be split up into
multiple executions that will be interleaved with execution by other queries
executing in parallel.

This means that in an idle system this query will be able to execute at full speed.
However even if there are lots of other queries going on in parallel the query will
execute almost as fast as long as the CPUs are not overloaded.

In the figure below we also show that control of the scan goes through the TC
thread, but the result row is sent directly from the LDM thread to the NDB API.

In the MySQL Server the NDB storage engine gets the row from the NDB API
and returns it to the MySQL Server for the sum function and result processing.

Query Analysis

The query reads the lineitem table that has about 6M rows in scale
factor 1. It reads them using an index on l_shipdate. The range
consists of 909,455 rows to analyse and of those 114,160 rows are
produced to calculate the result of the sum. In the above configuration
it takes about 0.15 seconds for NDB to execute the query. There are
some limitations related to batch handling that prevent full use of
all CPUs involved even in this query. I will describe this in a
later blog.
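As a rough check, the figures above can be turned into a scan rate. The
elapsed time is stated as "about 0.15 seconds", so the results are
approximate, and the variable names are just for the calculation.

```python
rows_scanned = 909_455  # rows in the l_shipdate range at scale factor 1
elapsed_s = 0.15        # approximate query time from the text
cpus = 16               # CPUs scanning in parallel in this configuration

total_rate = rows_scanned / elapsed_s
per_cpu_rate = total_rate / cpus

print(f"about {total_rate / 1e6:.1f} million rows per second in total")
print(f"about {per_cpu_rate / 1e3:.0f} thousand rows per second per CPU")
```

The per-CPU rate comes out well below the figure of more than 2 million rows
per second quoted for a very simple condition. This arithmetic does not show
where the difference comes from. The text attributes part of it to batch
handling.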

Scalability impact

This query is only positively impacted by any type of scaling. The
more fragments the lineitem table is partitioned into, the more
parallelism the query will use. So the only limitation to scaling
is when the sum part starts to become the bottleneck.

Next part

In the next part we will discuss how NDB can parallelise a very
simple 2-way join from the DBT3 benchmark. This is Q12 from
TPC-H that looks like this.

SELECT
        l_shipmode,
        SUM(CASE
                WHEN o_orderpriority = '1-URGENT'
                        OR o_orderpriority = '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS high_line_count,
        SUM(CASE
                WHEN o_orderpriority <> '1-URGENT'
                        AND o_orderpriority <> '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS low_line_count
FROM
        orders,
        lineitem
WHERE
        o_orderkey = l_orderkey
        AND l_shipmode IN ('MAIL', 'SHIP')
        AND l_commitdate < l_receiptdate
        AND l_shipdate < l_commitdate
        AND l_receiptdate >= '1994-01-01'
        AND l_receiptdate < DATE_ADD( '1994-01-01', INTERVAL '1' year)
GROUP BY
        l_shipmode
ORDER BY
        l_shipmode;

This query introduces 3 additional relational algebra operators,
a JOIN operator, a GROUP BY operator and a SORT operator.