Monday, December 09, 2019

NDB Parallel Query, part 4

In this part we will discuss how NDB batch handling works. When executing a
complex SQL query, more rows can be produced than the receiver is capable of
receiving at once. This means that we need a controlled data flow between the
producer, where the data resides, and the query executor in the MySQL Server.

The MySQL Server provides a record buffer into which the storage engine has to
copy each result row. This means that the buffering of batches of rows is taken
care of by the storage engine.
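
As a rough illustration of this contract, here is a minimal sketch in C++ (this
is not the actual MySQL handler code; the names RowBatch and engine_rnd_next
are made up for the example):

#include <cstdint>
#include <cstring>
#include <vector>

struct RowBatch
{
  std::vector<std::vector<uint8_t>> rows; /* rows already received from the data nodes */
  size_t next = 0;                        /* next row to hand to the server */
};

/* The server repeatedly asks the engine for the next row; the engine copies
   it into the server-provided record buffer. Return codes are illustrative. */
int engine_rnd_next(RowBatch &batch, uint8_t *record_buf, size_t buf_len)
{
  if (batch.next == batch.rows.size())
    return 1; /* corresponds to HA_ERR_END_OF_FILE in the real handler API */
  const std::vector<uint8_t> &row = batch.rows[batch.next++];
  if (row.size() > buf_len)
    return -1; /* row does not fit, illustrative error */
  std::memcpy(record_buf, row.data(), row.size()); /* copy into the record */
  return 0;
}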

When NDB performs a range scan it decides on the possible parallelism before
the scan is started. The NDB API has to allocate enough memory to ensure that
we are prepared to receive the rows as they arrive in a stream of result rows
from the data nodes. It is possible to set a batch size of hundreds or even
thousands of rows for a query.
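
Using the public C++ NDB API the batch size is a hint given when the scan is
defined. A minimal example (error handling omitted, the table and column names
are placeholders):

#include <NdbApi.hpp>

void scan_with_batch_hint(Ndb *ndb, const NdbDictionary::Table *tab)
{
  NdbTransaction *trans = ndb->startTransaction();
  NdbScanOperation *op = trans->getNdbScanOperation(tab);
  /* parallel = 0 lets NDB choose the parallelism, batch = 256 asks for
     up to 256 rows per batch; the NDB API allocates receive buffers
     large enough for the batches it has asked for. */
  op->readTuples(NdbOperation::LM_CommittedRead, 0, 0, 256);
  NdbRecAttr *val = op->getValue("some_column"); /* placeholder column */
  trans->execute(NdbTransaction::NoCommit);
  while (op->nextResult(true) == 0)
  {
    /* Consume the row through val; when the local batch is drained,
       nextResult(true) requests the next batch from the data nodes. */
    (void)val;
  }
  ndb->closeTransaction(trans);
}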

The executor of the scan is the DBTC module in the TC threads. This module only
passes messages through and routes them to the proper place. There is no storage
of result rows in DBTC. Only one TC thread is involved in a scan (range scan or
full table scan). The TC thread decides which modules should handle each
individual fragment scan. The scan request contains a set of references to the
memory available in the NDB API. This set of references is in turn distributed
among the fragment scans. This means that the fragment scans can send result
rows directly to the NDB API.
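
The following is an illustrative sketch of this idea (not the DBTC source
code; the round-robin split and the names are assumptions for the example):

#include <cstdint>
#include <vector>

struct BufferRef
{
  uint32_t page_id; /* which NDB API receive buffer page */
  uint32_t offset;  /* offset within that page */
};

/* Split the NDB API memory references over the fragment scans, so each
   fragment scan can ship result rows directly to the NDB API. */
std::vector<std::vector<BufferRef>>
distribute_refs(const std::vector<BufferRef> &api_refs, size_t num_fragments)
{
  std::vector<std::vector<BufferRef>> per_fragment(num_fragments);
  for (size_t i = 0; i < api_refs.size(); i++)
    per_fragment[i % num_fragments].push_back(api_refs[i]);
  return per_fragment;
}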

When a fragment scan has sent rows for all its memory references it cannot
continue until the NDB API has processed those rows. The fragment scan,
handled by the DBLQH module in the LDM threads, informs the DBTC module that
it is waiting for a continue request. The DBTC module in turn ensures that the
NDB API knows that it should receive the set of rows specified in the response.

As soon as the NDB API has processed the set of rows it informs the DBTC
module that it is ready to receive more rows. Since there are multiple fragment
scans, rows may have been continuously received by the NDB API while it was
processing the rows received previously.
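
This handshake can be modelled as below. This is a simplified, thread-based
sketch; the real implementation exchanges asynchronous signals between DBLQH,
DBTC and the NDB API rather than using mutexes and condition variables:

#include <condition_variable>
#include <mutex>

struct FragmentScanFlow
{
  std::mutex m;
  std::condition_variable cv;
  bool continue_requested = false;

  /* DBLQH side: all memory references consumed, wait for the go-ahead. */
  void wait_for_continue_request()
  {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [this] { return continue_requested; });
    continue_requested = false;
  }

  /* API side: batch processed, tell the fragment scan to resume. */
  void send_continue_request()
  {
    { std::lock_guard<std::mutex> lk(m); continue_requested = true; }
    cv.notify_one();
  }
};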

As can be seen from the above description, the fragment scans are not actively
performing the scan all the time. It would be possible to keep scanning in the
DBLQH module and store the result rows locally there until the continue request
arrives. This is not currently done; it would obviously increase the
parallelism for a specific scan, but at the same time it would also increase
the overhead of the scan.

When we execute the special scans that perform joins inside NDB in the DBSPJ
module we also have batches to handle. The NDB API allocates memory for a set
of rows for each table, so the total batch size can become quite high. It is
however limited to a maximum of 4096 rows per table.
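
A back-of-the-envelope calculation shows how this adds up. The 128 bytes per
row below is purely an assumption for illustration; the 4096-row cap and the
6-way join come from the text:

#include <cstdio>

int main()
{
  const unsigned tables = 6;            /* e.g. a 6-way join such as Q7 */
  const unsigned rows_per_table = 4096; /* upper limit per table and batch */
  const unsigned avg_row_bytes = 128;   /* assumption for illustration */
  unsigned long long bytes = 1ULL * tables * rows_per_table * avg_row_bytes;
  std::printf("receive buffer per batch set: %llu KiB\n", bytes >> 10);
  /* prints 3072 KiB, i.e. 3 MiB per batch set under these assumptions */
  return 0;
}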

When DBSPJ concludes a batch towards the NDB API it waits for the NDB API to
process those rows. However, other DBSPJ modules working on other parts of the
query can continue the join processing. In fact, the NDB API sets up enough
memory to receive 2 batch sets, which means that DBSPJ can continue with the
next set of rows even before the NDB API has processed the previous set. This
is another reason why Q12 can execute faster than Q6 although it has more work
to perform.
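
The 2 batch sets work like a classic double buffer. A minimal sketch of the
idea (illustrative only, not the NDB API internals):

#include <array>
#include <cstdint>
#include <vector>

struct BatchSet
{
  std::vector<uint8_t> rows; /* result rows for one batch round */
};

class DoubleBuffer
{
  std::array<BatchSet, 2> sets; /* NDB API reserves room for 2 batch sets */
  int fill_index = 0;           /* the set DBSPJ is currently filling */
public:
  BatchSet &fill_target() { return sets[fill_index]; }
  BatchSet &consume_target() { return sets[1 - fill_index]; }
  /* Swap once the NDB API has drained the consume side, so DBSPJ never
     has to stop while one set is being processed. */
  void swap_sets() { fill_index = 1 - fill_index; }
};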

At the moment result rows are sent immediately from the DBLQH module as part of
the fragment scans (or key lookups). This means that we process rows in the
NDB API that do not really need to be handled there. It is not an inefficiency
as such, since if the work is not done by the NDB API it has to be done by
DBSPJ instead. But we could increase parallelism by handling this in DBSPJ.

This possible increase in parallelism comes from two things. First, by not
sending unnecessary rows to the NDB API we have to wait less for the NDB API
to process rows. Second, by storing rows in the DBSPJ module we can increase
the parallelism by using more memory in the data nodes for query processing.

The conclusion here is that we have a number of wait states in the DBLQH module
while processing the scan, waiting for the NDB API. We have similar wait states
in the join processing in the DBSPJ module, waiting for the NDB API to process
the result rows from the join processing.

We have already implemented batch handling that makes the query execution
efficient. By storing result rows temporarily in DBLQH and in DBSPJ it would
be possible to improve the parallelism of query execution even further.

Next part
.........
In the next part we will go through a somewhat more complex query, Q7 in TPC-H,
which is a 6-way join that uses a mix of scans and key lookups.

The query is:
SELECT
        supp_nation,
        cust_nation,
        l_year,
        SUM(volume) AS revenue
FROM
        (
                SELECT
                        n1.n_name AS supp_nation,
                        n2.n_name AS cust_nation,
                        EXTRACT(YEAR FROM l_shipdate) AS l_year,
                        l_extendedprice * (1 - l_discount) AS volume
                FROM
                        supplier,
                        lineitem,
                        orders,
                        customer,
                        nation n1,
                        nation n2
                WHERE
                        s_suppkey = l_suppkey
                        AND o_orderkey = l_orderkey
                        AND c_custkey = o_custkey
                        AND s_nationkey = n1.n_nationkey
                        AND c_nationkey = n2.n_nationkey
                        AND (
                                (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')
                                OR (n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY')
                        )
                        AND l_shipdate BETWEEN '1995-01-01' AND '1996-12-31'
        ) AS shipping
GROUP BY
        supp_nation,
        cust_nation,
        l_year
ORDER BY
        supp_nation,
        cust_nation,
        l_year;
