Wednesday, December 11, 2019

NDB Parallel Query, part 5

In this part we are going to analyze a bit more complex query than before.
This query is a 6-way join.

The query is:
SELECT
        supp_nation,
        cust_nation,
        l_year,
        SUM(volume) AS revenue
FROM
        (
                SELECT
                        n1.n_name AS supp_nation,
                        n2.n_name AS cust_nation,
                        extract(year FROM l_shipdate) as l_year,
                        l_extendedprice * (1 - l_discount) AS volume
                FROM
                        supplier,
                        lineitem,
                        orders,
                        customer,
                        nation n1,
                        nation n2
                WHERE
                        s_suppkey = l_suppkey
                        AND o_orderkey = l_orderkey
                        AND c_custkey = o_custkey
                        AND s_nationkey = n1.n_nationkey
                        AND c_nationkey = n2.n_nationkey
                        AND (
                                (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')
                                OR (n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY')
                        )
                        AND l_shipdate BETWEEN '1995-01-01' AND '1996-12-31'
        ) AS shipping
GROUP BY
        supp_nation,
        cust_nation,
        l_year
ORDER BY
        supp_nation,
        cust_nation,
        l_year;

It is the inner SELECT that is the 6-way join. The outer part only deals with the
GROUP BY aggregation and ORDER BY of the result set from the inner
SELECT. As mentioned before the GROUP BY aggregation and ORDER BY
parts are handled by the MySQL Server. So the NDB join pushdown only deals
with the inner select.

In the previous queries we analysed, the join order was pretty obvious. In this
case it isn't as obvious, but the selection of join order is still fairly
straightforward. The selected join order is
n1 -> supplier -> lineitem -> orders -> customer -> n2.
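
To check that the join is actually pushed down, and in which order, one can run
EXPLAIN on the inner join. This is a minimal sketch assuming the DBT3/TPC-H schema
is loaded into NDB tables; the exact wording in the Extra column varies between
versions, but it indicates which table is the parent of the pushed join and which
tables are children of it.

EXPLAIN
SELECT n1.n_name, n2.n_name, l_shipdate,
       l_extendedprice * (1 - l_discount)
FROM supplier, lineitem, orders, customer, nation n1, nation n2
WHERE s_suppkey = l_suppkey
  AND o_orderkey = l_orderkey
  AND c_custkey = o_custkey
  AND s_nationkey = n1.n_nationkey
  AND c_nationkey = n2.n_nationkey
  AND ((n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')
    OR (n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY'))
  AND l_shipdate BETWEEN '1995-01-01' AND '1996-12-31';
-- Look for "Parent of ... pushed join" and "Child of ... in pushed join"
-- in the Extra column for the tables executed as one pushed join in NDB.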

Query analysis

The query starts by reading 2 rows from the nation table (n1). These 2 rows either
come from the same TC thread or from separate TC threads, and they drive a new scan
on the supplier table. The supplier scan will return 798 rows, which are used in the
scan against the lineitem table. This assumes scale factor 1.
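
These row counts are easy to verify directly with SQL. A small sketch, assuming
scale factor 1 and the standard DBT3/TPC-H schema: nation has 25 rows of which 2
match, and roughly 800 suppliers belong to those two nations.

SELECT COUNT(*)
FROM nation
WHERE n_name IN ('FRANCE', 'GERMANY');

SELECT COUNT(*)
FROM supplier
JOIN nation n1 ON s_nationkey = n1.n_nationkey
WHERE n1.n_name IN ('FRANCE', 'GERMANY');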

This brings up a new thing to discuss. If this query had been executed in the
MySQL Server we would only be able to handle one row from the supplier table at a
time. There have been some improvements in the storage engine API to handle this
through the read multi range API, but it still means a lot of communication back
and forth and starting up new scans. With the NDB join processing we instead send
a multi-range scan to the lineitem table, that is, one scan message that contains
many different ranges. There is still a new walk through the index tree for each
range, but there is no need to send the scan messages again and again.

Creation of these multi-ranges is handled as part of the join processing in the
DBSPJ module.

The join between the supplier table and the lineitem table contains one more
interesting aspect. Here we join towards the lineitem table, and the column
l_orderkey in the lineitem table is used in the next join step. In many
queries in TPC-H the join against the lineitem table uses the order key as the join
column. The order key is the first part of the primary key and is thus a candidate to
use as partition key. The TPC-H queries definitely improve by using the order key as
partition key instead of the full primary key. This means that an order and all lineitems
for the order are stored in the same LDM thread.
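
A minimal sketch of how this could be set up, assuming the standard TPC-H primary
key (l_orderkey, l_linenumber) on lineitem. Partitioning both tables on the order
key means an order and all of its line items hash to the same partition and thus
the same LDM thread.

ALTER TABLE orders   PARTITION BY KEY (o_orderkey);
ALTER TABLE lineitem PARTITION BY KEY (l_orderkey);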

The scan on lineitem will produce 145.703 rows to join with the orders table. The rest
of the joins are performed through primary key lookups. Thus we will perform 145.703
key lookups in the orders table, 145.703 key lookups in the customer table and finally
145.703 lookups against the nation table (n2). The only filtering here happens on the
last table, which decreases the number of result rows sent to the MySQL Server; the
end result is 5.924 rows.

This brings up another point: it would be possible to increase parallelism in this
query by storing the result rows in DBSPJ. However, this would increase the
overhead, so it would improve parallelism at the cost of efficiency.

Scalability impact

If we make sure that the lineitem table is partitioned on the order key this query will
scale nicely. There will be fairly small impact with more partitions since only the scan
against the supplier table will be more costly in a larger cluster.

One thing that makes the query more costly is when the primary key lookups are
distributed instead of local. One table that it is definitely a good idea to make
FULLY REPLICATED is the nation table. This means that all those 145.703 key
lookups will be handled inside a data node instead of going over the network.

The supplier table has only 10.000 rows compared to the lineitem table that has
6M rows. Thus it should definitely be possible to use FULLY REPLICATED also
for this table. The customer table has 150.000 rows and is another candidate for
FULLY REPLICATED.
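
A sketch of how these tables could be made fully replicated, assuming NDB tables
created from the standard DBT3/TPC-H schema. The FULLY_REPLICATED property is set
through the table comment; it can also be given at CREATE TABLE time, and changing
it on an existing table may be a copying operation.

ALTER TABLE nation   COMMENT='NDB_TABLE=FULLY_REPLICATED=1';
ALTER TABLE supplier COMMENT='NDB_TABLE=FULLY_REPLICATED=1';
ALTER TABLE customer COMMENT='NDB_TABLE=FULLY_REPLICATED=1';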

Since the MySQL Server will have to handle more than 300.000 rows in this query,
this will be the main bottleneck for parallelism. This means that the query will have a
parallelism of about 5. This is also the speedup we see compared to a single-threaded
storage engine for this query. This bottleneck will be about the same even with
larger clusters.

Next Part

I will take a break in this sequence of blogs for now and come back later with a
description of somewhat more involved queries and how NDB handles pushing down
subqueries and parts of join queries.

Monday, December 09, 2019

NDB Parallel Query, part 4

In this part we will discuss how NDB batch handling works. Query execution of
complex SQL queries means that more rows can be delivered than the receiver is
capable of receiving. This means that we need to create a data flow from the
producer, where the data resides, to the query executor in the MySQL Server.

The MySQL Server uses a record where the storage engine has to copy the result
row into the record. This means that the storage of batches of rows is taken
care of by the storage engine.

When NDB performs a range scan it will decide on the possible parallelism before
the scan is started. The NDB API has to allocate enough memory to ensure that
we have memory prepared to receive the rows as they arrive in a flow of result
rows from the data nodes. It is possible to set batch size of hundreds and even
thousands of rows for a query.
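
From the MySQL Server side the batch sizes used by the NDB API can be influenced
with system variables. A small sketch; the value below is only an illustration,
not a recommendation, and ndb_batch_size is given in bytes.

-- Inspect the current value (in bytes).
SHOW VARIABLES LIKE 'ndb_batch_size';
-- Adjust it; depending on version the variable may be settable globally only.
SET GLOBAL ndb_batch_size = 65536;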

The executor of the scan is the DBTC module in the TC threads. This module only
passes messages through and sends them to the proper place. There is no storage
of result rows in DBTC. There is only one TC thread involved in one scan (range
scan or full table scan). The TC thread decides which modules should
handle each individual fragment scan. The message to scan contains a set of
references to the memory available in the NDB API. This set of references is in
turn distributed to the fragment scans. This means that these can send result
rows directly to the NDB API.

When a fragment scan has completed sending rows for all memory references it
cannot continue until the NDB API has processed these rows. The fragment
scan, handled by the DBLQH module in the LDM threads, sends information
to the DBTC module that it is waiting for a continue request. The DBTC module
ensures that the NDB API knows that it should receive a set of rows as specified in
the response sent to the NDB API.

As soon as the NDB API has processed the set of rows it will inform the DBTC
module that it is now ready to receive more rows. Since there are multiple fragment
scans it is possible that rows have been continuously received in the NDB API while
it was processing the rows received previously.

As can be seen in the above description the fragment scans will not be actively
performing the scans all the time. It would be possible to scan in the DBLQH
module and store the result rows locally there until the continue request arrives.
This is not done currently, it would obviously increase the parallelism for a
specific scan, but at the same time it would also increase the overhead for the
scan.

When we execute the special scans that execute joins in NDB in the DBSPJ module
we also have batches to handle. The NDB API will allocate memory for a set of
rows on each table, thus the total batch size can become quite high. It is
however limited to a maximum of 4096 rows per table.

When DBSPJ concludes a batch towards the NDB API it will wait for the NDB API to
process those rows. However other DBSPJ modules working on other parts of the
query can continue the join processing. Actually the NDB API has set up enough
memory to receive 2 batch sets, this means that DBSPJ can continue on the next set
of rows even before the NDB API has processed the rows. This is another reason why
Q12 can execute faster than Q6 although it has more work to perform.

At the moment result rows are sent immediately from the DBLQH module as part of
the fragment scans (or key lookups). This means that we will process rows in the
NDB API that do not really need to be handled there. This is not an inefficiency
as such, since if the work is not done by the NDB API it has to be done by DBSPJ
instead. But we could increase parallelism by handling this in DBSPJ.

This possible increased parallelism comes from two things. First not sending
unnecessary rows to the NDB API means that we have to wait less time for the
NDB API to process rows. Additionally by storing rows in the DBSPJ module we
can increase the parallelism by using more memory in the data nodes for
query processing.

The conclusion here is that we have a number of wait states in the DBLQH module
while processing the scan waiting for the NDB API. We have similar wait states
in the join processing in the DBSPJ module waiting for the NDB API to process
the result rows from the join processing.

We have already implemented batch handling that makes the query execution efficient.
By storing result rows temporarily in DBLQH and in DBSPJ it would be possible to
improve parallelism in the query execution even further.

Next part
.........
In the next part we will go through a bit more complex query, Q7 in TPC-H which is
a 6-way join that uses a mix of scans and key lookups.

The query is:
SELECT
        supp_nation,
        cust_nation,
        l_year,
        SUM(volume) AS revenue
FROM
        (
                SELECT
                        n1.n_name AS supp_nation,
                        n2.n_name AS cust_nation,
                        extract(year FROM l_shipdate) as l_year,
                        l_extendedprice * (1 - l_discount) AS volume
                FROM
                        supplier,
                        lineitem,
                        orders,
                        customer,
                        nation n1,
                        nation n2
                WHERE
                        s_suppkey = l_suppkey
                        AND o_orderkey = l_orderkey
                        AND c_custkey = o_custkey
                        AND s_nationkey = n1.n_nationkey
                        AND c_nationkey = n2.n_nationkey
                        AND (
                                (n1.n_name = 'GERMANY' AND n2.n_name = 'FRANCE')
                                OR (n1.n_name = 'FRANCE' AND n2.n_name = 'GERMANY')
                        )
                        AND l_shipdate BETWEEN '1995-01-01' AND '1996-12-31'
        ) AS shipping
GROUP BY
        supp_nation,
        cust_nation,
        l_year
ORDER BY
        supp_nation,
        cust_nation,
        l_year;

NDB Parallel Query, part 3

In the previous part we showed how NDB will parallelise a simple
2-way join query from TPC-H. In this part we will describe how
the pushdown of joins to a storage engine works in the MySQL Server.

First a quick introduction to how a SQL engine handles a query.
The query normally goes through 5 different phases:
1) Receive query on the client connection
2) Query parsing
3) Query optimisation
4) Query execution
5) Send result of query on client connection

The result of 1) is a text string that contains the SQL query to
execute. In this simplistic view of the SQL engine we will ignore
things such as prepared statements that make the model more complex.

The text string is parsed by 2) into a data structure that represents
the query in objects that match concepts in the SQL engine.

Query optimisation takes this data structure as input and creates a
new data structure that contains an executable query plan.

Query execution uses the data structure from the query optimisation
phase to execute the query.

Query execution produces a set of result rows that is sent back to the
client.

In the MySQL Server those phases aren't completely sequential; there are
many different optimisations that occur in all phases. However, for this
description it is accurate enough.

When we started working on the plan to develop a way to allow the storage
engine to take over parts of the query, we concluded that the storage
engine should use an Abstract Query Plan of some sort.

We decided early on to only push joins down after the query optimisation
phase. There could be some additional benefits of this, but the project
was sufficiently complex to handle anyways.

The figure below shows how the pushdown of a join into the storage engine happens:


As can be seen the storage engine receives the Query Plan as input and
produces a modified query plan as output. In addition the storage engine
creates an internal plan for how to execute the query internally.

NDB is involved in the Query Optimisation phase in the normal manner handled
by the MySQL Server. This means that NDB has to keep index statistics up to
date. A new feature in MySQL 8.0 can also be used to improve the cost model:
histograms on individual columns. Histograms are maintained per MySQL Server
and are generated by an SQL command. We will show a few examples
later on of how this can be used to improve the performance of queries.
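
As a teaser, this is roughly what the SQL command looks like in MySQL 8.0, sketched
here on a DBT3/TPC-H column; the column choice and bucket count are just
illustrations.

ANALYZE TABLE orders UPDATE HISTOGRAM ON o_orderpriority WITH 16 BUCKETS;

-- Histograms are stored per MySQL Server and can be inspected like this:
SELECT TABLE_NAME, COLUMN_NAME
FROM information_schema.COLUMN_STATISTICS
WHERE TABLE_NAME = 'orders';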

MySQL uses a cost model, and this cost model works fairly well for NDB as well
even though NDB is a distributed storage engine. There are some improvements
possible in the exactness of the NDB index statistics, but the model as such
works well enough.

One example of a change to the query plan is when we push a condition
to the storage engine; in this case the condition can be removed
from the query plan used by the MySQL Server. The internal query plan
contains information about the join order, pushed conditions and linked reads
from a table earlier in the join order to a later table in the join
order. Some parts of the internal query execution can be modified as
the query is executed. One example of this is the parallelism used
in the query, which can be adjusted to make optimal use of the server
resources (CPU, memory, disks and networks).

The storage engine can choose to handle the join partially or fully.
Taking care of a pushdown partially can be done both on the condition level
and on the table level.

As an example, if we have a condition that cannot be pushed to NDB, this
condition will not be pushed, but the table can still be pushed to NDB.
If we have a 6-way join and the third table in the join for some reason
cannot be pushed to NDB, then we can still push the join of the first two
tables. The result of those two tables joined is then joined inside the
MySQL Server, and finally the results are fed into the last 3-way join that
is also pushed to NDB.

One common case where a query can be pushed in multiple steps into NDB is
when the query contains a subquery. We will look into such an example
in a later blog.

So in conclusion the join pushdown works by first allowing the MySQL Server
to create an optimal execution plan. Next we attempt to push as many parts
down to the NDB storage engine as possible.
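
One way to see how much of a workload is actually pushed down is through the status
counters exposed by the NDB storage engine. A small sketch; the counters shown (for
example Ndb_pushed_queries_defined, Ndb_pushed_queries_executed and Ndb_pushed_reads)
are the ones found in current MySQL Cluster versions.

SHOW GLOBAL STATUS LIKE 'Ndb_pushed%';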

The idea with the pushdown is to be able to get more batching happening.
For example if we have a key lookup in one of the tables in the join it is not
possible to handle more than one row at a time using the MySQL Server whereas
with pushdown we can handle hundreds of key lookups in parallel.

Another reason is that by moving the join operator into the data node we come
closer to the data. This avoids a number of messages back and forth between the
MySQL Server and the data nodes.

Finally, by moving the join operator down into the data nodes we can even have
multiple join operators. In the future this can also be used for other things than
the join operator, such as aggregation, sorting and so forth.

An alternative approach would be to push the entire query down to NDB when it
works. The NDB model with join pushdown of full or partial parts of the
queries however works very well for the NDB team. We are thus able to develop
improvements of the join pushdown in a stepwise approach and even without being
able to push the full query we can still improve the query substantially.

As an example, Q12 was not completely pushed before MySQL Cluster 8.0.18. Still,
pushing only parts of it made a speedup of 20x possible; when the final step
of comparing two columns was added in 8.0.18, the full improvement of 40x
was made possible.

Next part
.........
In the next part we will describe how NDB handles batches, which has an important
impact on the possible parallelism in query execution.

Friday, December 06, 2019

NDB Parallel Query, part 2

In part 1 we showed how NDB can parallelise a simple query with only a single
table involved. In this blog we will build on this and show how NDB can
parallelise some parts of a two-way join query, while other parts remain in the
MySQL Server. As an example we will use Q12 in DBT3:

SELECT
        l_shipmode,
        SUM(CASE
                WHEN o_orderpriority = '1-URGENT'
                        OR o_orderpriority = '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS high_line_count,
        SUM(CASE
                WHEN o_orderpriority <> '1-URGENT'
                        AND o_orderpriority <> '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS low_line_count
FROM
        orders,
        lineitem
WHERE
        o_orderkey = l_orderkey
        AND l_shipmode IN ('MAIL', 'SHIP')
        AND l_commitdate < l_receiptdate
        AND l_shipdate < l_commitdate
        AND l_receiptdate >= '1994-01-01'
        AND l_receiptdate < DATE_ADD( '1994-01-01', INTERVAL '1' year)
GROUP BY
        l_shipmode
ORDER BY
        l_shipmode;

This query when seen through the relational operators will first pass through
a SELECT operator and a PROJECT operator in the data nodes. The JOIN operator
will be executed on the lineitem and orders tables and the result of the JOIN operator
will be sent to the MySQL Server. The MySQL Server will thereafter handle the
GROUP BY operator with its aggregation function and also the final SORT operator.
Thus we can parallelise the filtering, projection and join, but the GROUP BY
aggregation and sorting will be implemented in the normal MySQL execution of
GROUP BY, SUM and sorting.

This query will be executed by first performing a range scan on the lineitem
table and evaluating the condition that limits the amount of rows to send to
the join with the orders table. The join is performed on the primary key of
the orders table. So the access in the orders table is a primary key lookup
for each row that comes from the range scan on the lineitem table.

In the MySQL implementation of this join we fetch one row from the
lineitem table and for each such row perform a primary key lookup
in the orders table. This means that we can only handle one
primary key lookup at a time unless we do something in the NDB storage
engine. Executing this query without pushdown join would make
it possible to run the scans towards the lineitem table in parallel, but the
primary key lookups on the orders table would execute serially,
fetching only one row at a time. This increases the query time in
this case by a factor of around 5x. So by pushing the join down into
the NDB data nodes we can make sure that the primary key lookups on the
orders table are parallelised as well.

To handle this the MySQL Server has the ability to push an entire join
execution down to the storage engine. We will describe this interface in more
detail in a later blog part.

To handle this query in NDB we have implemented a special scan protocol that
enables performing complex join operations. The scan will be presented with
a parameter part for each table in the join operation that will describe the
dependencies between the table and the conditions to be pushed to each table.

This is implemented in the TC threads in the NDB data node. The TC threads in
this case act as parallel JOIN operators. The join is parallelised on the
first table in the join, in this case the lineitem table. For each node in
the cluster a JOIN operator will be created that takes care of scanning all
partitions that have their primary replica in that node. This means that the
scan of the first table and the join operator is always located on the same node.

The primary key lookup is sent to the node where the data resides, in a cluster
with 2 replicas and 2 nodes and the table uses READ BACKUP, we will always find
the row locally. With larger clusters the likelihood that this lookup is sent
over the network increases.
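
READ BACKUP is a table property in NDB. A minimal sketch of enabling it on an
existing table; it can also be given at CREATE TABLE time, or for all new tables
via the ndb_read_backup variable.

ALTER TABLE orders COMMENT='NDB_TABLE=READ_BACKUP=1';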

Compared to a single-threaded storage engine this query scales almost 30x
using 2 nodes with 8 LDM threads each. NDB's implementation is, as mentioned in
the previous blog, very efficient, so the speedup benefits from this.

This query is more efficiently implemented in MySQL Cluster 8.0.18 since we
implemented support for comparing two columns, both from the same table and
from different tables provided they have the same data type. This improved
performance of this query by 2x. Previously the NDB interpreter could only
handle comparisons of the type col_name COMPARATOR constant, e.g.
l_receiptdate >= '1994-01-01'.

Query Execution


In the figure below we show the execution flow for this query in NDB. As described
above we have a module called DBSPJ in the TC threads that handle the JOIN
processing. We have shown in the figure below the flow for the scan of the lineitem
table in blue arrows. The primary key lookups have been shown with red arrows.
In the figure below we have assumed that we're not using READ BACKUP. We will
describe in more detail the impact of READ BACKUP in a later part of this blog series.


Query Analysis

The query will read the lineitem table in parallel using a range scan. This scan will
evaluate 909.844 rows when using scale factor 1 in TPC-H. Of those rows there will
be 30.988 rows that will evaluate to true. Each of those 30.988 rows will be sent to the
NDB API but will also be reported to the DBSPJ module to issue parallel key lookups
towards the orders table.

As a matter of fact this query will actually execute faster than the previous query
we analysed (Q6 in TPC-H), although it does more work. Most of the
work is done in the lineitem table; both Q6 and Q12 do almost the same amount of
work in the range scan on lineitem. However, since there are fewer records to report
back to the MySQL Server, parallelism is improved due to the batch
handling in NDB.

Scalability impact

This query will scale very well with more partitions of the lineitem table
and the orders table. As the cluster grows some scalability impact will
come from a higher cost of the primary key lookups that have to be sent on
the network to other nodes.

Next Part

In part 3 we will discuss how the MySQL Server and the NDB storage engine works
together to define the query parts pushed down to NDB.

NDB Parallel Query, part 1

I will describe how NDB handles complex SQL queries in a number of
blogs. NDB has the ability to parallelise parts of join processing.
Ensuring that your queries make the best possible use of these
parallelisation features enables applications to boost their
performance significantly. It will also be a good base to explain
any improvements we add to the query processing in NDB Cluster.

NDB was designed from the beginning for extremely efficient key lookups
and for extreme availability (less than 30 seconds of downtime per year
including time for software change, meta data changes and crashes).

Originally the model was single-threaded and optimised for 1-2 CPUs.
The execution model uses an architecture where messages are sent
between modules. This made it very straightforward to extend the
architecture to support multi-threaded execution when CPUs with
many cores became prevalent. The first multi-threaded version of NDB
was version 7.0. This supported up to 7 threads working in parallel
plus a large number of threads handling interaction with the file
system.

With the introduction of 7.0 the scans of a table, either using a
range scan on an index or scanning the entire table, were automatically
parallelised. So NDB has supported a limited form of parallel query
since the release of 7.0 (around 2011 I think).

Now let's use an example query, Q6 from DBT3 that mimics TPC-H.

SELECT
    SUM(l_extendedprice * l_discount) AS revenue
FROM
    lineitem
WHERE
    l_shipdate >= '1994-01-01'
    AND l_shipdate < DATE_ADD( '1994-01-01' , INTERVAL '1' year)
    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
    AND l_quantity < 24;

The execution of this will use a range scan on the index on l_shipdate.
This range is a perfectly normal range scan in NDB. Since range scans
are parallelised, this query will execute using 1 CPU for each partition
of the table. Assuming that we set up a cluster with default setup
and with 8 LDM threads the table will be partitioned into 16 partitions.
Each of those partitions will have a different CPU for the primary
partition. This means that the range scans will execute on 16 CPUs in
parallel.
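
A quick way to check how many partitions a table actually got, sketched here
assuming the tables live in a schema called dbt3 (the schema name is just an
example); the partitions of an NDB table show up as p0, p1 and so on.

SELECT TABLE_NAME, COUNT(*) AS partitions
FROM information_schema.PARTITIONS
WHERE TABLE_SCHEMA = 'dbt3' AND TABLE_NAME = 'lineitem'
GROUP BY TABLE_NAME;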

LDM (Local Data Manager) is the name of the threads in the NDB data
nodes that manages the actual data in NDB. It contains a distributed
hash index for the primary keys and unique keys, an ordered index
implemented as a T-tree, and a query handler that controls execution of
lookups and scans, handles checkpointing and also handles the REDO log.
Finally the LDM thread contains the row storage that has 4 parts.
Fixed size parts of the row in memory, variable sized parts of the
row in memory, dynamic parts of the row (absence of a column here
means that it is NULL, so this provides the ability to ADD a column
as an online operation) and finally a fixed size part that is stored
on disk using a page cache. The row storage also contains an
interpreter that can evaluate conditions and perform simple operations
like add, to support efficient auto increment.

Now the first implementation of the NDB storage engine was implemented
such that all condition evaluation was done in the MySQL Server. This
meant that although we could scan the table in parallel, we still had
a single thread to evaluate the conditions. This meant that to handle
this query efficiently a condition pushdown is required. Condition
pushdown was added to the MySQL storage engine API a fairly long time
ago as part of the NDB development and can also benefit any other
storage engine that can handle condition evaluation.
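
Condition pushdown is controlled through an optimizer switch (on by default) and is
visible in EXPLAIN as "Using pushed condition" in the Extra column for the NDB table.
A small sketch using the query above:

SET optimizer_switch = 'engine_condition_pushdown=on';
EXPLAIN
SELECT SUM(l_extendedprice * l_discount) AS revenue
FROM lineitem
WHERE l_shipdate >= '1994-01-01'
  AND l_shipdate < DATE_ADD('1994-01-01', INTERVAL '1' year)
  AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
  AND l_quantity < 24;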

So the above contains 3 parts that can be parallelised individually.
Scanning the data, evaluating the condition and finally performing
the sum on the rows that match the condition.

NDB currently parallelises the scan part and the condition evaluation
part. The sum is handled by the MySQL Server. In this case the
filtering factor is high, so this means that the sum part is not a
bottleneck in this query. The bottleneck in this query is scanning
the data and evaluating the condition.

In the terminology of relational algebra this means that NDB supports
a parallelised SELECT operator for some filters. NDB also supports a
parallel PROJECT operator. NDB doesn't yet support a parallel
AGGREGATE function.

The bottleneck in this query is how fast one can scan the data and
evaluate the condition. In version 7.6 we made a substantial
optimisation of this part where we managed to improve a simple
query by 240% through low-level optimisations of the code.
With this optimisation NDB can handle more than 2 million rows
per second per CPU with a very simple condition to evaluate. This
query greatly benefits from this greater efficiency. Executing this
query with scale factor 10 (60M rows in the lineitem table) takes
about 1.5 seconds with the configuration above where 16 CPUs
concurrently perform the scan and condition evaluation.

A single-threaded storage engine is around 20x slower. With more
CPUs available in the LDM threads the parallelisation will be even
higher.

Obviously there are other DBMSs that are focused on analytical
queries that can handle this query even faster, NDB is focused
on online applications with high write scalability and the highest
availability. But we are working to also make query execution of
complex SQL much faster so that online applications can analyze
data in real-time.

Query Execution

In the figure below we describe the execution flow for this query. As usual
the query starts with parsing (unless it is a prepared statement) and after
that the query is optimised.

This query is executed as a single range scan against the lineitem table. Scans
are controlled by a TC thread that ensures that all the fragments of the table are
scanned. It is possible to control the parallelism of the query through the
NDB API. In most of the cases the parallelism will be full parallelism. Each thread
has a real-time scheduler and the scan in the LDM threads will be split up into
multiple executions that will be interleaved with execution by other queries
executing in parallel.

This means that in an idle system this query will be able to execute at full speed.
However, even if there are lots of other queries going on in parallel, the query will
execute almost as fast as long as the CPUs are not overloaded.

In the figure below we also show that control of the scan goes through the TC
thread, but the result row is sent directly from the LDM thread to the NDB API.

In the MySQL Server the NDB storage engine gets the row from the NDB API
and returns it to the server layer for the sum function and result processing.

Query Analysis

The query reads the lineitem table that has about 6M rows in scale
factor 1. It reads them using an index on l_shipdate. The range
consists of 909.455 rows to analyse and of those 114.160 rows are
produced to calculate results of the sum.  In the above configuration
it takes about 0.15 seconds for NDB to execute the query. There are
some limitations to getting full use of all CPUs involved even in this
query, related to batch handling. I will describe this in a
later blog.

Scalability impact

This query is only positively impacted by any type of scaling. The
more fragments the lineitem table is partitioned into, the more
parallelism the query will use. So the only limitation to scaling
is when the sum part starts to become the bottleneck.

Next part

In the next part we will discuss how NDB can parallelise a very
simple 2-way join from the DBT3 benchmark. This is Q12 from
TPC-H that looks like this.

SELECT
        l_shipmode,
        SUM(CASE
                WHEN o_orderpriority = '1-URGENT'
                        OR o_orderpriority = '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS high_line_count,
        SUM(CASE
                WHEN o_orderpriority <> '1-URGENT'
                        AND o_orderpriority <> '2-HIGH'
                        THEN 1
                ELSE 0
        END) AS low_line_count
FROM
        orders,
        lineitem
WHERE
        o_orderkey = l_orderkey
        AND l_shipmode IN ('MAIL', 'SHIP')
        AND l_commitdate < l_receiptdate
        AND l_shipdate < l_commitdate
        AND l_receiptdate >= '1994-01-01'
        AND l_receiptdate < DATE_ADD( '1994-01-01', INTERVAL '1' year)
GROUP BY
        l_shipmode
ORDER BY
        l_shipmode;

This query introduces 3 additional relational algebra operators,
a JOIN operator, a GROUP BY operator and a SORT operator.

Friday, November 15, 2019

What's new in MySQL Cluster 8.0.18

MySQL Cluster 8.0.18 RC2 was released a few weeks back packed with a set of
new interesting things.

One major change we have done is to increase the number of data nodes from 48 to
144. There is also ongoing work to fully support 3 and 4 replicas in MySQL
Cluster 8.0. NDB has actually always been designed to handle up to 4 replicas.
But the test focus has previously been completely on 2 replicas. Now we have
expanded our test focus to also verify that 3 and 4 replicas work well. This means
that with NDB 8.0 we will be able to confidently support 3 and 4 replicas,
and it will be possible to have 48 node groups with 3 replicas in each
node group in one cluster.

The higher number of nodes in a cluster gives the possibility to store even more
data in the cluster. So with 48 node groups it is possible to store 48 TByte of
in-memory data in one NDB Cluster and on top of that one can also have
about 10x more data in disk data columns. Actually we have successfully
managed to load 5 TByte of data into a single node using the DBT2 benchmark,
so with 8.0 we will be able to store a few hundred TBytes of replicated
in-memory and petabytes of data in disk data columns for key-value stores
with high demands on storage space.

Given that we now support so much bigger data sets it is natural that we focus
on the ability to load data at high rates, both into in-memory data and into
disk data columns. For in-memory data this was solved already in 7.6 and
there is even more work in this area ongoing in 8.0.

We have also lifted one limitation in NDB: 7.6 has a limit on row sizes of
14000 bytes, while NDB 8.0 can handle 30000 byte row sizes.

Another obvious fact is that with so much data in the cluster it is important to
be able to analyze the data as well. Already in MySQL Cluster 7.2 we
implemented a parallel join operator inside of NDB Cluster available
from the MySQL Server for NDB tables. We made several important
improvements of this in 7.6 and even more has happened in NDB 8.0.

This graph shows the improvement made to TPC-H queries in 7.6 and in
8.0 up until 8.0.18. So chances are good that you will find that some of
your queries will be executed substantially faster in NDB 8.0 compared to
earlier versions. NDB is by design a parallel database machine, so what
we are doing here is ensuring that this parallel database machine can now
also be applied for more real-time analytics. We currently have parallel
filtering, parallel projection and parallel join in the data nodes. With
NDB 8.0 we also get all the new features of MySQL 8.0, which provide a
set of improvements in the query processing area.
The main feature added in 8.0.18 for query processing is the ability to push down
join execution of queries where we have conditions of the type t1.a = t2.b.
Previously this was only possible for the columns handled by the chosen index
in the join. Now it can be handled for any condition in the query.

8.0.18 also introduces a first step of improved memory management where the goal
is to make more efficient use of the memory available to NDB data nodes and also
to make configuration a lot simpler.

In NDB 8.0 we have also introduced a parallel backup feature. This means that taking
a backup will be much faster than previously and the load will be shared across all LDM threads.

Wednesday, October 23, 2019

Setting up an NDB Cluster in the Oracle Cloud using Auto Installer

In MySQL Cluster 8.0.18 we have developed MySQL Cluster Auto Installer to
also support installing NDB :)

We have made it very easy to setup an NDB Cluster in the Oracle Cloud.
The Auto Installer will take care of installing the proper software, setting up
firewalls and installing some supporting software. Most of the testing of this
software has been done against the Oracle Cloud using instances with Oracle
Linux 7.

I prepared two Youtube videos to show how it works. The first one gives some
insights into setting up the Compute Instances required in the Oracle Cloud.

Setup compute instances in Oracle Cloud for MySQL Cluster AutoInstaller

The second video uses these compute instances to set up an NDB Cluster.

Setting up an NDB Cluster in the Oracle Cloud using Auto Installer

Tuesday, October 22, 2019

Setting up MySQL Cluster on local machine using AutoInstaller

We have a new version of the MySQL Cluster Auto Installer. I have prepared 3
Youtube videos that show how to make use of the Auto Installer to install and
set up a cluster on your local machine.

This is my first attempt at making Youtube videos to explain things around
MySQL Cluster.

The Auto Installer is intended as a tool to make it easy to get a cluster up and
running. It is NOT intended for managing a cluster.

The first Youtube video Install MySQL Cluster 8.0.18 on Mac OS X shows how
to install MySQL Cluster 8.0.18 on Mac OS X. This is obviously trivial, so should
be straightforward to do even without the video.

The second Youtube video Starting a local MySQL Cluster using Auto Installer sets
up a small cluster with 1 management server, 2 data nodes and 1 MySQL Server and
explains some details around this.

Using MySQL Cluster AutoInstaller to start development with NDB goes a bit deeper
and also shows how to make use of the cluster and do some trivial operations
through a MySQL client and the NDB management client. It also shows how one can
extend the possible configurations supported by directly manipulating configuration
files deployed by the Auto Installer.

A little note for Mac OS X users with high resolution is that full 4K resolution is
available through Google Chrome, not through Safari.

Friday, April 05, 2019

Manual for dbt2-0.37.50.15, fully automated Sysbench and DBT2 benchmarking with NDB

The link dbt2.0.37.50 manual provides the details of how to use the dbt2-0.37.50 scripts
to execute benchmarks using MySQL Cluster.

These scripts can be used to execute automated test runs of Sysbench, DBT2 and
FlexAsynch. I also use it to start up NDB Clusters to run DBT3 benchmarks and
YCSB benchmarks.

This set of scripts originates from 2006 when I wanted to automate all my benchmark
efforts. The most challenging benchmarks involve starting more than 100 programs
working together on more than 100 machines. This requires automation to
be successful.

Now running any benchmark is a 1-liner e.g.
./bench_run.sh --default-directory /path/to/dir --init

The preparation to run this benchmark is to place a file called autobench.conf in
/path/to/dir. This contains the configuration of the NDB data nodes, NDB MGM
servers, MySQL Servers and the benchmark programs. Multiple benchmark
programs are supported for Sysbench, DBT2 and flexAsynch.