Naturally, development is never as straightforward as the high-level view makes it look. In this blog I'll mention a few of the most important roadblocks that we met and resolved on the path to improved performance of MySQL Cluster 7.2.
Initially, when we increased the number of LQH threads from 4 to 16, we saw scaling only up to 8 LQH threads and no further gain in going to 16. This was very puzzling, since we don't really have any mutexes that should be an issue. Nevertheless, we looked into the mutexes that we had and managed to decrease the number of conflicts on the send mutexes by a factor of 15. This did not, however, improve performance at all.
Next we noted, using oprofile, that there were a few functions in which, for some reason, 50% of the CPU time was spent. This was quite surprising, and given that such measurements are not always 100% exact, I was very suspicious about those numbers. Eventually, however, the reason dawned on me.
The reason was that I had some cache lines that were updated too often. This led to some instructions taking several microseconds to execute, since all threads were serialised on updating the same cache line.
The first such instance was a piece of code used to check whether send buffers were overloaded. When a send buffer is more than 75% used, we start rejecting client requests to ensure that already ongoing requests are able to complete. This is accomplished using a bitmap with one bit per node we're communicating with. This bitmap is global, and it was updated every time we made a remote send to another node. Updating it every time was quite unnecessary, since it's enough to update it when the state changes, so a simple if-statement resolved that problem.
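To make the fix concrete, here is a minimal sketch of the "check before you write" idea. The class and method names are made up for illustration and are not the actual NDB code, and Python of course doesn't show the cache line effect itself. The point is only the logic: reading a shared value can proceed in parallel on many CPUs, while every store forces the cache line to bounce between them, so we count the stores.

```python
class OverloadBitmap:
    """Toy stand-in for a global bitmap with one bit per remote node."""

    def __init__(self):
        self.bits = 0    # shared state: bit n set means node n is overloaded
        self.writes = 0  # number of stores to the shared state (illustration only)

    def is_overloaded(self, node_id):
        return (self.bits >> node_id) & 1 == 1

    def set_always(self, node_id, overloaded):
        # Behaviour before the fix: store on every remote send.
        if overloaded:
            self.bits |= 1 << node_id
        else:
            self.bits &= ~(1 << node_id)
        self.writes += 1

    def set_if_changed(self, node_id, overloaded):
        # Behaviour after the fix: read first, store only when the state flips.
        if self.is_overloaded(node_id) != overloaded:
            self.set_always(node_id, overloaded)


def simulate(update, fill_levels, node_id=3, limit=0.75):
    # One call per remote send; fill is the fraction of the send buffer in use.
    for fill in fill_levels:
        update(node_id, fill > limit)


fills = [0.10, 0.20, 0.80, 0.90, 0.85, 0.30, 0.20, 0.10]

naive = OverloadBitmap()
simulate(naive.set_always, fills)

checked = OverloadBitmap()
simulate(checked.set_if_changed, fills)

print(naive.writes, checked.writes)
```

Both variants end up with the same bitmap, but the checked one only stores when the node crosses the limit in either direction.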
With the next problem it was even harder to understand how it could be an issue. It turned out that the problem resided in our crash information subsystem. We have a macro called jam() (Jump Address Memory, an acronym we inherited from the AXE system once upon a time). This macro records the line number we're currently executing, sometimes together with the number of the block we're executing in. When there was only one thread in the MySQL Cluster data nodes, this data structure was global and shared by all blocks.
With the move to a multithreaded architecture we changed this to a data structure per thread, so that we get detailed information on what each thread did before any crash.
Most blocks execute in only one thread, so changing this was fairly straightforward. However, in one case we have a code path that is used by all TC threads to ask the distribution handler which nodes and threads contain the data for a certain partition of the MySQL Cluster. The distribution handler is thus one block called from many threads, so we needed to use different jam buffers depending on which thread called the function. When this wasn't done, this code showed up as another bottleneck, since many threads were once again trying to update the same cache lines.
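As a rough illustration of the per-thread jam idea, the sketch below keeps one small ring buffer per thread and lets a shared function trace into the buffer of whichever thread calls it. Everything here is hypothetical: the ring size, the block number, the function names and the placeholder lookup are mine, and the real jam() is a C++ macro, not a Python function.

```python
import threading

JAM_SIZE = 8       # hypothetical ring size
SHARED_BLOCK = 1   # hypothetical block number for the shared block


class JamBuffer:
    """Ring buffer holding the most recent (block, line) trace entries."""

    def __init__(self):
        self.entries = [None] * JAM_SIZE
        self.index = 0

    def jam(self, block_no, line_no):
        self.entries[self.index] = (block_no, line_no)
        self.index = (self.index + 1) % JAM_SIZE

    def recent(self):
        # Oldest entry first, skipping slots that were never written.
        ordered = self.entries[self.index:] + self.entries[:self.index]
        return [e for e in ordered if e is not None]


_per_thread = threading.local()
_registry_lock = threading.Lock()
all_buffers = {}  # thread name -> buffer, so a crash dump could list every thread


def current_jam_buffer():
    buf = getattr(_per_thread, "buf", None)
    if buf is None:
        buf = JamBuffer()
        _per_thread.buf = buf
        with _registry_lock:  # taken once per thread, not once per jam
            all_buffers[threading.current_thread().name] = buf
    return buf


def lookup_partition(partition_id):
    # Shared code path called from many threads: trace into the caller's
    # own buffer instead of one buffer owned by the block.
    current_jam_buffer().jam(SHARED_BLOCK, 100)
    return partition_id % 4  # placeholder answer, not a real lookup


def tc_thread(n_calls):
    for p in range(n_calls):
        lookup_partition(p)


threads = [threading.Thread(target=tc_thread, args=(3,), name=f"tc{i}")
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the threads finish, each of the four buffers holds only the entries written by its own thread, so no two threads ever wrote to the same trace memory.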
With those fixes we were able to reach very good numbers on a single node with up to 16 LQH threads. However, we saw that the risk of running out of send buffer memory had increased severely due to the large increase in the number of threads in the data node. The threads communicate using a lock-free scheme, but this means that dedicated memory must be available for each pair of threads that communicate. Also, to speed up communication, the code doesn't always use the send buffer memory in the most efficient manner. This meant that we needed to do something about send buffer memory handling in order to make the data nodes as stable as before. We found three points in the code where we needed to pack send buffer memory. None of these were part of the normal code path, but they were vital to ensure that we packed things when we got close to running out of send buffer memory. We also went through all send buffer memory configuration parameters and their defaults and made them more appropriate for larger data node configurations as well.
As a final step towards getting single node performance working really well, we also made sure that all global data structures were properly aligned to cache line boundaries.
Putting the code to the test in a distributed environment also revealed a few new points to handle. First we discovered that the free list of connections to the data node in an API had an interesting impact on the balance of the use of TC threads. For some reason, and it is still unclear exactly how, a LIFO queue here meant that we used some TC threads up to 10x more than others, which obviously made for very bad scalability with many TC threads. The solution was simple, however: a quick change to a FIFO queue and the problem was gone.
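Since the exact mechanism was never pinned down, the following is only a toy model of how a LIFO free list could skew the load, not an explanation of what actually happened. It assumes (my assumption, not something established above) that each connection object stays bound to one TC thread and that only a few transactions are in flight at a time.

```python
from collections import Counter, deque


def run(policy, n_connections=8, n_tc_threads=4,
        n_transactions=400, concurrency=2):
    # Hypothetical binding: connection i always talks to TC thread i % n_tc_threads.
    free = deque(range(n_connections))
    in_use = deque()
    load = Counter()
    for _ in range(n_transactions):
        if len(in_use) == concurrency:
            # The oldest running transaction completes and releases its
            # connection at the tail of the free list.
            free.append(in_use.popleft())
        # LIFO takes from the tail (most recently released),
        # FIFO takes from the head (least recently released).
        conn = free.pop() if policy == "lifo" else free.popleft()
        load[conn % n_tc_threads] += 1
        in_use.append(conn)
    return [load[t] for t in range(n_tc_threads)]


print("lifo:", run("lifo"))
print("fifo:", run("fifo"))
```

In this model the LIFO list keeps handing out the same two connections, so two TC threads do all the work, while the FIFO list cycles through every connection and spreads the transactions evenly.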
The next problem was yet another imbalance, this time on the LQH threads, and it only showed up on very large clusters. It came from the manner in which we distribute rows into partitions. In order to make on-line reorganisation of tables very efficient, we divide the table into a number of virtual partitions using a hashmap, and the virtual partitions are then mapped to real partitions. Previously the hashmap always created 240 virtual partitions, which was quite sufficient with 4 LQH threads, but not when moving to 16 LQH threads. So we changed to using 3840 virtual partitions instead.
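The two-level mapping can be sketched as follows. This is a simplified model under my own assumptions: the real hash function and the real assignment of virtual to real partitions are not described here, so I use CRC32 and a plain round-robin assignment as stand-ins, and I take the number of real partitions to be nodes times LQH threads.

```python
import zlib


def build_hashmap(n_virtual, n_real):
    # Entry v holds the real partition that virtual partition v maps to
    # (round-robin assignment, an assumption for this sketch).
    return [v % n_real for v in range(n_virtual)]


def partition_for_key(key, hashmap):
    # Hash the key to a virtual partition, then look up the real one.
    virtual = zlib.crc32(key) % len(hashmap)  # CRC32 is a stand-in hash
    return hashmap[virtual]


def virtual_per_real(hashmap, n_real):
    # Smallest and largest number of virtual partitions held by a real one.
    counts = [0] * n_real
    for real in hashmap:
        counts[real] += 1
    return min(counts), max(counts)


n_real = 8 * 16  # e.g. 8 nodes with 16 LQH threads each
old_map = build_hashmap(240, n_real)
new_map = build_hashmap(3840, n_real)

print(virtual_per_real(old_map, n_real))  # some partitions get twice the rows
print(virtual_per_real(new_map, n_real))  # every partition gets the same share
```

With 240 virtual partitions spread over 128 real ones, some real partitions hold two virtual partitions and others only one, so some LQH threads see about twice the load. With 3840 every real partition holds exactly 30.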
Actually the choice of 240 and 3840 is intricate. 3840 is equal to 2 * 2 * 2 * 2 * 2 * 2 * 2 * 2 * 3 * 5. This means that when using 12 LQH threads (2 * 2 * 3), the number of nodes in the cluster should be of the form 2**n or 5 * 2**n. If not, things will still work fine, but load will be slightly unbalanced since the real partitions will contain different numbers of virtual partitions. Using 16 LQH threads, the number of nodes should be of one of the forms 2**n, 3 * 2**n, 5 * 2**n or 3 * 5 * 2**n, where n is less than or equal to 4. From this calculation we get that 30 nodes is better than 32 nodes when using 16 LQH threads, since it gives a more even distribution of rows in the cluster.
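The arithmetic is easy to check with a few lines of code. The sketch assumes that the number of real partitions is the number of nodes times the number of LQH threads, and uses 48 as the upper limit on data nodes; the function names are just for illustration.

```python
def balance(n_nodes, lqh_threads, n_virtual=3840):
    # Fewest and most virtual partitions any real partition ends up with.
    real = n_nodes * lqh_threads
    low, rem = divmod(n_virtual, real)
    return low, low + (1 if rem else 0)


def balanced_node_counts(lqh_threads, max_nodes=48, n_virtual=3840):
    # Node counts for which every real partition gets the same number
    # of virtual partitions.
    return [n for n in range(1, max_nodes + 1)
            if n_virtual % (n * lqh_threads) == 0]


print(balance(30, 16))  # (8, 8): perfectly even
print(balance(32, 16))  # (7, 8): some partitions carry one more than others
print(balanced_node_counts(12))
print(balanced_node_counts(16))
```

So with 16 LQH threads, 30 nodes gives 480 real partitions with 8 virtual partitions each, while 32 nodes gives 512 real partitions of which half hold 7 and half hold 8.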
With these changes, read performance in a distributed environment was quite good and scaled very well. However, updates still caused issues.
The problem for updates was that the receiver thread handling signals between the nodes in the same node group was overloaded. There were simply too many signals to handle. Much of this was due to some work that was left as a TODO after the 7.0 release since it wasn't an issue in 7.0, but with the higher throughput it has now become one.
So the solution was to properly implement packing of signals, such that several commit messages are packed together, and similarly for the commit acknowledgements from the API. These straightforward changes decreased the load on the receiver thread to a third and made it possible to push through 1.5M updates per second per node group. It is still possible for this to become a bottleneck, but it should be extremely unusual for a real-world application to reach this state.
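The effect of packing can be sketched with a toy receiver that pays a fixed cost per message it handles, no matter how many commits the message carries. The names and the packing factor of 4 are made up for the example and say nothing about the actual signal format or the real packing limits.

```python
def pack_signals(commits, max_per_message):
    # Group individual commit signals into packed messages.
    return [commits[i:i + max_per_message]
            for i in range(0, len(commits), max_per_message)]


class Receiver:
    """Toy receiver thread: the per-message overhead is paid once per message."""

    def __init__(self):
        self.messages_handled = 0
        self.commits_applied = []

    def receive(self, message):
        self.messages_handled += 1
        self.commits_applied.extend(message)


commits = [f"commit-{i}" for i in range(10)]

unpacked = Receiver()
for c in commits:
    unpacked.receive([c])  # one message per commit

packed = Receiver()
for message in pack_signals(commits, max_per_message=4):
    packed.receive(message)  # several commits per message

print(unpacked.messages_handled, packed.messages_handled)
```

Both receivers apply the same ten commits in the same order, but the packed one handles three messages instead of ten.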
With all these changes implemented we managed to scale update and read performance linearly up to 30 nodes.