James Baker tech blog

SIMD accelerated sorting in Java - how it works and why it was 3x faster

In this post I explain a little about how to use Java’s Vector APIs, attempt to explain how they turn out fast, and then use them to implement a sorting algorithm 3x faster than Arrays.sort. I then explain some problems I found, and how I resolved them. Supporting code is published here.

I’m an occasional reader of Daniel Lemire’s blog. Many of his posts follow a standard pattern, where he takes some standard operation (json parsing, bitsets, existence filters, sorting, integer encoding), reforms it using vector operations and makes it vastly faster due to increasing the data level parallelism. It’s a fun read, and the numbers are scary high (most of us can only dream of writing code that processes 1GB/sec with a single core, let alone doing something useful in that time). Especially not in Java, where scalar code has usually been the only practical option in most cases.

There’s an ongoing OpenJDK project called Project Panama which has been working on improving interop between the JVM and more ‘native’ APIs. One of their subprojects has focused on providing vectorized intrinsics within the JVM, and I decided to test out these APIs by trying to use them to implement sorting on integer arrays. Given there was a recent Google blogpost on a similar topic, it seems like a good comparison point.

Mile high view of Java’s Vector API

Java’s Vector API is currently distributed as a preview with recent versions of Java. To use it, you must append --enable-preview --add-modules jdk.incubator.vector to your java and javac commands.

With the exception of char, Java supports each numerical primitive type as a vector (byte, short, int, long, float, double), with the corresponding vector type being *Vector, like IntVector. In order to use vectors, one must choose a ‘species’, which represents the width. Java presently supports up to 512 bit vectors, so one might choose a specific width like IntVector.SPECIES_256, the maximum with IntVector.SPECIES_MAX or the maximum hardware accelerated size with IntVector.SPECIES_PREFERRED.

One can then use code like the below to process data.

static final VectorSpecies<Integer> SPECIES = IntVector.SPECIES_256;

void multiply(int[] array, int by) {
    int i = 0;
    int bound = SPECIES.loopBound(array.length);
    IntVector byVector = IntVector.broadcast(SPECIES, by);
    for (; i < bound; i += SPECIES.length()) {
        IntVector vec = IntVector.fromArray(SPECIES, array, i);
        IntVector multiplied = vec.mul(byVector);
        multiplied.intoArray(array, i);
    }
    for (; i < array.length; i++) {
        array[i] *= by;
    }
}

There are two other useful primitives. The first is VectorMask which you can think of as an array of booleans. Many of the methods can take masks to switch on or off behaviour on specific vector lanes, and predicates return masks.

VectorSpecies<Integer> species = IntVector.SPECIES_64;
IntVector vector = IntVector.broadcast(species, 1);
VectorMask<Integer> mask = VectorMask.ofValues(species, true, false);
IntVector result = vector.mul(0, mask);
assertThat(result).isEqualTo(IntVector.fromValues(species, 0, 1));

The second other useful type is VectorShuffle. This can be used to rearrange vector lanes. For example:

VectorSpecies<Integer> species = IntVector.SPECIES_128;
IntVector vector = IntVector.fromArray(species, new int[] {12, 34, 56, 78});
assertThat(vector.rearrange(VectorShuffle.fromValues(species, 4, 3, 2, 1))).isEqualTo(IntVector.fromArray(species, new int[] {78, 56, 34, 12}));

How it works (well)

The benefit of SIMD code is that we can process more data within a single instruction; for example with 256 bit instructions we can perhaps process 8 ints per cycle instead of 1. In the multiply example above, we have a vectorized loop and a scalar loop. If the two loops are identical except in the scalar case we see an x86 MUL instruction being run and in the vectorized case we see a PMULQ instruction being run, we might expect to see the full benefit of the vectorization. However, the loop is represented as a number of new Java objects, and if we have to run even only 8 additional instructions to initialize the new objects, we’ve given away our advantage. Java’s vector APIs put a lot of effort into working well in this regard, and rely heavily on the following optimizations.

Inlining

Inlining is a simple, easy to understand compiler technique that can dramatically improve performance.

What is inlining?

When we inline a method, we are replacing a function call to that method with the definition of the function we are calling.

boolean notInlined(String string) {
    return string.isEmpty() || string.length() == 1;
}

boolean inlined(String string) {
    return string.value.length == 0 || string.value.length == 1;
}

When does Java inline functions?

There are a number of heuristics within Java that decide whether or not a function should be inlined. Some are as follows:

  • If a method has thrown many times, it is inlined. Presumably this is because the try-catch loop can be optimized.
  • If a method A calls a method B many times more frequently (default 20x) than the outer method is called, it is inlined unless its definition is very large (> 335 JVM opcodes).
  • If a method is very small (<=35 opcodes) it is inlined.

Virtual methods make this all harder, but various other analyses help with this.

There is also a magic Java-internal annotation that helps with inlining; @ForceInline. If this annotation is applied, and the class it is applied to comes from the ‘boot classloader’ (your application does not), then the data will be inlined.

Why is it so much better when a function can be inlined?

There are two key improvements that inlining can lead to. The first is that it avoids the function call overhead. Calling a function is cheap but not free because various book-keeping must be done (flushing registers to the stack, moving the program counter to the other function, restoring when it completes, etc). Once we’ve inlined a function, these costs disappear.

The second improvement is arguably far more important (and is why inlining is sometimes referred to as ‘the mother of all optimizations’) - because we’ve inlined the function, the compiler possibly has more information, enabling further optimizations. For example, in the above case in psuedo-Java, the method ‘inlined’ is equivalent to:

boolean inlined(String string) {
    if (string == null) {
        throw new NullPointerException();
    }
    if (arrayLength(string.value) == 0) { // let's assume the compiler knows that String.value is never null, because the constructors never let it be so.
        return true;
    }
    if (arrayLength(string.value) == 1) {
        return true;
    }
    return false;
}

In this instance, the compiler will likely be able to intuit that arrayLength will not change between the two calls. It will likely also know that an array length cannot be less than 0. The compiler therefore may execute this code as:

boolean inlined(String string) {
    if (string == null) {
        throw new NullPointerException();
    }
    if (arrayLength(string.value) <= 1) {
        return true;
    }
    return false;
}

which ensures that not only is the function call overhead avoided, but also that the method is simpler. This optimization is used all over the place; null checks can be avoided when it’s known that the value is not null, shared parts of methods can avoid being re-executed, etc.

Escape analysis and scalar replacement

Escape analysis is a compiler technique that can be used to gain information about the lifetimes of objects. When we create an object inside a function, its lifetime is either shorter than the scope of the function, or longer. If it is possibly longer, we say that it escapes. If we know that an object does not escape, there are other optimizations we can do. One that Java performs is called ‘scalar replacement’ - roughly, it can entirely avoid creating the object and instead allocate any containing fields as stack variables.

For example, these two methods are semantically identical, and might be compiled as such.

Optional<String> withoutScalarReplacement(String someString) {
    return Optional.ofNullable(someString).map(String::trim).filter(String::isEmpty);
}

Optional<String> withScalarReplacement(String someString) {
    if (someString == null) {
        return Optional.empty();
    }
    String trimmed = someString.trim();
    if (trimmed == null) {
        return Optional.empty();
    }
    if (!trimmed.isEmpty()) {
        return Optional.empty();
    }
    return Optional.of(trimmed);
}

In this example, we are able to fully avoid allocating two different intermediate Optional objects. Scalar replacement can mostly only be useful when inlining occurs. For example, in the above, Optional.ofNullable possibly allocates an Optional, which escapes. The implementation cannot avoid this allocation. If it is inlined, the object may no longer escape and therefore the scalar replacement may be viable.

Intrinsics

In the Java standard library, many methods have a pure-Java implementation, and an ‘intrinsic’ implementation. Sometimes the intrinsic might be pure C++, sometimes assembly. The rough idea is that certain methods are called so frequently in Java or are so important to be fast, that we shouldn’t rely on the compiler producing the right code; the JVM should contain a native implementation that is known to have the right properties that is to be used instead. When these methods are compiled, the intrinsic is substituted in. The availability of an intrinsic can be denoted via the @IntrinsicCandidate annotation.

How Java’s Vector API uses these tools

Each IntVector object in Java contains an int[] which represents the vector. There are pure Java implementations of each of the vector operations - check IntVector.binaryOperations(int) for the definitions of the binary lanewise operations. However, these are designed to be only a fallback - they are primarily called by methods like VectorSupport.binaryOp which has the magic annotation @IntrinsicCandidate. In other words, if these vectorized methods are run on a system with relevant SIMD functionality (e.g. when AVX2 or NEON is supported), then they will be replaced with native implementations.

Furthermore, all of the relevant vector methods (fromArray, intoArray, mul, rearrange, …) have @ForceInline applied to them, so the definitions will be dropped directly into the code that calls them, enabling the scalar replacement. At this point, the fake int[] buffer can also be dropped at the time of register allocation, and made a stack variable, in the same way as other primitives.

In conclusion, when we use the vector API, everything gets inlined. This gives the compiler the most it can have to work with so as to remove enough function call overhead to actually compile a vector operation to only a couple of instructions. Practically speaking, you end up with code that looks like it creates many objects, but actually compiles to code that allocates no memory. It is extremely disconcerting.

Quicksort using IntVectors

For purposes of this exercise, I’m going to expend my effort primarily on the operation of partitioning. I select the pivots by sampling 8 elements and using their median. I also decided to not go down the path of empirically trying to find patterns, because my goal is to exercise the SIMD operations and not to build a production sorting algorithm.

A simple in-place quicksort algorithm can be summarized as follows:

void quicksort(int[] array, int from, int to) {
    int size = to - from;
    if (size <= 1) {
        return;
    }
    int pivot = partition(array, from, to);
    quicksort(array, from, pivot);
    quicksort(array, pivot + 1, to);
}

int partition(int[] array, int from, int to) {
    int pivot = array[to - 1];
    int boundary = partition(array, pivot, from, to - 1);
    int temp = array[boundary];
    array[to - 1] = temp;
    array[boundary] = pivot;
    return boundary;
}

int partition(int[] array, int pivot, int lowerPointer, int upperPointer) {
    while (lowerPointer < upperPointer) {
        if (array[lowerPointer] < pivot) {
            lowerPointer++;
        } else if (array[upperPointer - 1] >= pivot) {
            upperPointer--;
        } else {
            int tmp = array[lowerPointer];
            int index = --upperPointer;
            array[lowerPointer++] = array[index];
            array[index] = tmp;
        }
    }
    return lowerPointer;
}

For purposes of vectorizing, I’m primarily looking at reimplementing the ‘partition’ method. I also implement a special sorting function for regions <= 8 elements using a sorting network.

Vectorizing the partition method

We’re trying to replace the partition method. The purpose is to reorder a list of items such that it contains a contiguous region of items less than the pivot argument, followed by a contiguous region of items greater than the pivot argument. If we can partition a single vector, we can expand this to the whole array.

Partitioning a single 8-lane vector

There is an AVX512 instruction which is absolutely perfect for this specific task. It’s called VPCOMPRESSQ and development versions of Java expose it as .compress(), like:

IntVector vec;
VectorMask<Integer> lessThan = vec.lt(pivot);
IntVector lt = vec.compress(lessThan);
IntVector gte = vec.compress(lessThan.not());

My computer does not have AVX512 and without hardware acceleration it executes incredibly slowly (>20ns rather than the 0.25ns I’d expect). So, we must work around! To work around, we can leverage the fact that for any mask, there is a permutation of the lanes which leads to all the ‘trues’ in the mask appearing before all the ‘falses’. For example, if the mask reads FFFFFFFT, such a permutation might be [7, 1, 2, 3, 4, 5, 6, 0]. We can create such a mask using the following code:

VectorShuffle<Integer> partitioner(VectorMask<Integer> vectorMask) {
    VectorSpecies<Integer> species = vectorMask.species();
    int numTrues = vectorMask.trueCount();

    int mask = (int) vectorMask.toLong(); // bit 2^n is set to true iff lane n is set in the mask.
    int[] ret = new int[species.length()];
    int unmatchedIndex = numTrues;
    int matchedIndex = 0;
    for (int i = 0; i < species.length(); i++) {
        if ((mask & 1) == 0) {
            ret[unmatchedIndex++] = i;
        } else {
            ret[matchedIndex++] = i;
        }
        mask >>= 1;
    }
    return VectorShuffle.fromValues(species, ret);
}

We have an 8-lane vector, so the total possible number of such shuffles is 2^8 = 256. Each shuffle is conceptually 8 bytes, and so cumulatively this would need around 1kB, probably around 4kB with overheads, easily small enough to fit in L1 cache as a lookup table.

We can therefore implement a similar partitioning process using:

IntVector vec;
VectorMask<Integer> lessThan = vec.lt(pivot);
IntVector rearranged = vec.rearrange(partitions[(int) lessThan.toLong()]);

Partitioning two partitioned 8-lane vectors

n.b. this approach can also be used to partition a single 16 lane vector without a giant lookup table.

If we have two partitioned vectors and know how many elements less than the pivot each contains, we can easily merge them into a single partitioned vector.

IntVector vec1;
int numTrues1;
IntVector vec2;
int numTrues2;

IntVector rotatedVec2 = vec2.rearrange(rotate(numTrues1));
VectorMask<Integer> mask = VectorMask.fromLong(species, (1L << numTrues1) - 1).not();
IntVector left = vec1.blend(rotatedVec2, mask); // blend takes the elements that the mask allows from the argument and the others from the base vector
IntVector right = rotatedVec2.blend(vec1, mask);

Extending partitioning 16 elements to a whole array

The best approach I was able to come up here is one where I tried to avoid doing any unnecessary vector operations. From the left of the array, I read two vectors and partition them. At this point (example has 8 lanes), inside them there are either >8 (1) or <=8 (2) elements that are less than the pivot.

In case 1, every element in the left vector is less than the pivot. We can write it back into the array and increment the from pointer like in the simplified algorithm above. Some elements in the right array are less than and some are greater than the pivot. In case 2, we have the opposite - all elements on the right side are greater than the pivot, and so we swap that vector to the back and continue. Special casing the case where we can process two vectors at a time did not improve performance. This can be thought of as being very similar to the scalar algorithm.

IntVector compareTo = IntVector.broadcast(SPECIES, pivot);

IntVector cachedVector;
int cachedNumTrues;
boolean initialize = true;
while (index + SPECIES.length() < upperBound) {
    if (initialize) {
        initialize = false;
        // load our first vector and partition it
        cachedVector = IntVector.fromArray(SPECIES, array, index);
        VectorMask<Integer> cachedMask = cachedVector.lt(compareTo);
        cachedVector = cachedVector.rearrange(IntVectorizedQuickSort.compress(cachedMask));
        cachedNumTrues = cachedMask.trueCount();
    }

    // partition our second vector
    int index2 = index + SPECIES.length();
    IntVector vector2 = IntVector.fromArray(SPECIES, array, index2);
    VectorMask<Integer> mask2 = vector2.lt(compareTo);
    int numTrues2 = mask2.trueCount();
    IntVector rearranged2 = vector2.rearrange(IntVectorizedQuickSort.compress(mask2));

    // merge our two partitioned vectors
    VectorMask<Integer> mask = IntVectorizedQuickSort.lowerOverflowMask(cachedNumTrues);
    IntVector rotated = rearranged2.rearrange(IntVectorizedQuickSort.rotateRight(cachedNumTrues));
    IntVector merged1 = cachedVector.blend(rotated, mask);
    IntVector merged2 = rotated.blend(cachedVector, mask);

    int totalTrues = cachedNumTrues + numTrues2;
    if (totalTrues < SPECIES.length()) { // merged2 contains only elements greater than the pivot. Swap it to the end of the array and continue
        cachedVector = merged1;
        cachedNumTrues = totalTrues;
        upperBound -= SPECIES.length();
        IntVector newData = IntVector.fromArray(SPECIES, array, upperBound);
        newData.intoArray(array, index2);
        merged2.intoArray(array, upperBound);
    } else { // merged1 contains only elements less than the pivot. Move forwards
        cachedVector = merged2;
        cachedNumTrues = totalTrues - SPECIES.length();
        merged1.intoArray(array, index);
        index += SPECIES.length();
    }

The rest is just edge cases.

What about a non-in-place version?

The sorting algorithm I describe above is an in-place algorithm, so it requires no additional memory. If we allow ourselves O(n) extra memory, we have a simpler algorithm. Here, the critical loop looks like:

int leftOffset = from;
int rightOffset = 0;
for (int i = from; i < upperBound; i += SPECIES.length()) {
    IntVector vec = IntVector.fromArray(SPECIES, array, i);
    VectorMask<Integer> mask = compareTo.compare(VectorOperators.GT, vec);
    IntVector matching = compress(mask, vec);
    IntVector notMatching = reverse(vec);
    matching.intoArray(array, leftOffset);
    notMatching.intoArray(buffer, rightOffset);
    int matchCount = mask.trueCount();
    leftOffset += matchCount;
    rightOffset += SPECIES.length() - matchCount;
}

In words, we’re moving the lower elements into their final position and the upper elements into a buffer. Once we’re done processing, we can copy the upper elements into their appropriate place. This algorithm is somewhat faster, to the tune of 10 cycles per iteration (~1 cycle per int per partitioning operation), although it appears to be much faster only on larger arrays.

What’s performance like?

Great! YMMV, but on my Zen3 desktop (using AVX2 with no AVX512 functionality), I saw the following results:

Benchmark               (length)   Mode  Cnt           Score          Error  Units
jdk              8  thrpt    5   170714899.546 ±   1763875.997  ops/s
jdk             10  thrpt    5   154264407.313 ±    500819.241  ops/s
jdk            100  thrpt    5    69536931.152 ±   5248268.260  ops/s
jdk           1000  thrpt    5    46402055.294 ±    174077.215  ops/s
jdk          10000  thrpt    5    34134138.728 ±    188221.976  ops/s
jdk         100000  thrpt    5    28013585.553 ±     49562.094  ops/s
jdk        1000000  thrpt    5    23439276.552 ±     21704.859  ops/s
buffered         8  thrpt    5  2426909167.588 ±  14904754.115  ops/s
buffered        10  thrpt    5   200646786.158 ±  46948027.100  ops/s
buffered       100  thrpt    5   107955736.666 ±    968960.902  ops/s
buffered      1000  thrpt    5    89037645.967 ±   1329267.539  ops/s
buffered     10000  thrpt    5    79148716.685 ±    105261.243  ops/s
buffered    100000  thrpt    5    70237129.982 ±    149517.060  ops/s
buffered   1000000  thrpt    5    65537322.850 ±   3437058.994  ops/s
inplace          8  thrpt    5  2507547203.755 ± 383141136.258  ops/s
inplace         10  thrpt    5   215032676.207 ±    320366.428  ops/s
inplace        100  thrpt    5   114147458.514 ±    843670.046  ops/s
inplace       1000  thrpt    5    77580105.869 ±  10698901.290  ops/s
inplace      10000  thrpt    5    66624691.684 ±    197844.229  ops/s
inplace     100000  thrpt    5    54046743.796 ±   4568646.180  ops/s
inplace    1000000  thrpt    5    41139103.464 ±    464187.831  ops/s
hybrid           8  thrpt    5  2495687921.669 ± 186468040.821  ops/s
hybrid          10  thrpt    5   227652656.154 ±    382682.821  ops/s
hybrid         100  thrpt    5   108415081.740 ±    274307.588  ops/s
hybrid        1000  thrpt    5    92006274.816 ±    389174.105  ops/s
hybrid       10000  thrpt    5    82193216.008 ±     84009.870  ops/s
hybrid      100000  thrpt    5    74267051.943 ±   9385926.923  ops/s
hybrid     1000000  thrpt    5    69021089.102 ±   7740152.825  ops/s

The units are ‘ints sorted per second’, so 10M with a length of 1M means that we can sort 10 arrays per second. Here, jdk is the standard Arrays.sort whereas inplace uses solely in-place partitioning, buffered prioritizes sequential access at the cost of more memory moves, and hybrid uses buffered for larger partition operations, inplace for smaller ones. There’s a nice SIMD speedup at every level, with large arrays in the region of 3x faster than the JDK method and a 2x improvement on all but the smallest arrays.

Problems

Programming against the vector APIs without shooting myself in the foot was a huge challenge. Getting to the scores achieved required me to be comfortable with a variety of tools including:

  1. Java Microbenchmarking Harness (JMH) (used to run benchmarks).
  2. Java Flight Recorder/Java Mission Control (used to check for any stray memory allocations from IntVector objects that weren’t optimized away).
  3. Linux Perf (to check if my improvements actually led to the number of cycles dropping).
  4. Java’s ability to print its compiler output as assembly (to see what’s actually going on under the hood).

This took a while - it took me many iterations to get the algorithm to good, I learned a bunch about what works and what I found challenging and feel that I know how Java works considerably better.

Inlining can optimize code, but not memory layouts!

Our single vector partition operation ends up with this chunk:

v1 = v1.rearrange(compressions[(int) mask1.toLong()]);

If we unbox all the object references, it looks more like:

v1 = v1.rearrange(compressions[(int) mask1.toLong()].array)

so we have a non-ideal pointer follow. Worse, the mask is stored in memory as a byte array, but in order for the rearrange instruction (vpermd) to properly operate, the mask must be encoded as an 8-element vector, too, which means it must be reformatted.

So the sequence of instructions we end up with looks like

movabsq ; load the reference to the mask stored in compressions
vmovq ; load the (64-bit) shuffle into a 128 bit register
vpmovzxbd ; expand the 8-bit numbers into the 256 bit register, ready for the permutation
vpermd ; actually run the permutation operation

The part that is sub-optimal is the vpmovzxbd instruction. This instruction takes 4 cycles on my processor. If we could store the bytes pre-unpacked in RAM (since the overall amount of memory is small) we could have the instructions look more like:

vmovdqu ; just straight up load the relevant offsets
vpermd ;

My suspicion is that the vector APIs are probably more optimized towards numerical operations like matrix multiplication, and less towards integer bit twiddling type approaches, which is why we see lookup tables underperform. My suspicion was that a VectorShuffleList or VectorMaskList could be helpful for this purpose. To test this approach I modified the JDK to add a new VectorOperators.PERM which does not require the boxing. Hacky, yes, but required only 48 lines of diff to prove the concept.

The performance improvement varied. In the tightest loops, there was very little change. In these cases, the instruction count went down considerably, but the cycle count did not change, indicating the processor simply picking up slack. When ILP is already high and we’re bound by execution ports, we see a much bigger difference. When applied to in-place sorting, the performance difference was around 10% overall, which is a significant number of cycles.

I had the same problem with masks, but it turned out to be simple to recompute the mask on each iteration.

Length/validity checks slow everything down (especially vector shuffles)

There’s a parallel between this issue and the previous one. As is usual in Java, various validity checks occur at runtime. In usual cases, solid CPU engineering (superscalar execution, branch prediction, etc) means that these checks do not massively affect runtime. However, shuffles have extra validity checks which are always performed. Specifically, running

vector.rearrange(someShuffle);

is equivalent to

checkState(!someShuffle.toVector().lt(0).anyTrue()); // make sure that none of the shuffle indices are 'exceptional' aka negative.
vector.rearrange(someShuffle);

and this is expensive in a tight loop because it adds a non-trivial number of instructions. In fact, the cost is serious enough that there is a system property (jdk.incubator.vector.VECTOR_ACCESS_OOB_CHECK) which when set to 0 disables these checks. With the checks, it takes an average of 12.14 cycles and 50 instructions to partition a single vector. Without, it takes 7.7 cycles and 32 instructions. Unfortunately, disabling the checks is unsafe; out of bounds accesses can trigger segfaults.

This is another example where the type hierarchy used hurts the implementation. With a shufflelist or similar, the validation could occur at construction time.

@ForceInline for thee but not for me?

For the small cases (<=8 elements) I implemented a sorting network using a single vector. As a part of this implementation, I implemented a ‘compare and exchange’ method as follows.

IntVector compareAndExchange(IntVector vector, VectorShuffle<Integer> shuffle, VectorMask<Integer> mask) {
    IntVector vector1 = vector.rearrange(shuffle);
    return vector.min(vector1).blend(vector.max(vector1), mask);
}

Here, the shuffle and mask have been precomputed - shuffle connects the lanes as necessary, and mask selects the lower or higher outputs as necessary, leading to a method that might look like:

static void sort16(int[] array, int from) {
    IntVector v0 = IntVector.fromArray(SPECIES_16, array, from);
    IntVector v1 = compareAndExchange(v0, COMPARISON_16_1_S, COMPARISON_16_1_M);
    IntVector v2 = compareAndExchange(v1, COMPARISON_16_2_S, COMPARISON_16_2_M);
    IntVector v3 = compareAndExchange(v2, COMPARISON_16_3_S, COMPARISON_16_3_M);
    IntVector v4 = compareAndExchange(v3, COMPARISON_16_4_S, COMPARISON_16_4_M);
    IntVector v5 = compareAndExchange(v4, COMPARISON_16_5_S, COMPARISON_16_5_M);
    IntVector v6 = compareAndExchange(v5, COMPARISON_16_6_S, COMPARISON_16_6_M);
    IntVector v7 = compareAndExchange(v6, COMPARISON_16_7_S, COMPARISON_16_7_M);
    IntVector v8 = compareAndExchange(v7, COMPARISON_16_8_S, COMPARISON_16_8_M);
    IntVector v9 = compareAndExchange(v8, COMPARISON_16_9_S, COMPARISON_16_9_M);
    IntVector v10 = compareAndExchange(v9, COMPARISON_16_10_S, COMPARISON_16_10_M);
    v10.intoArray(array, from);
}

I found performance of this strangely poor until I discovered the reason - the compareAndExchange method that I’d written wasn’t being inlined, and so the IntVector objects could not be optimized away. Each run of this method creates many objects, many int arrays, and in general it just causes a lot of garbage collector pressure due to how hot the method was. Inlining compareAndExchange manually worked and restored good performance, but is ugly.

In general what I’d be concerned about is that without some mechanism of doing so, creating performant reusable code is very difficult because there’s no clear way to combine code and ensure it’s properly inlined at the user level without very carefully testing it. Even with the manual inlining, very occasionally Java will allocate objects on the very last step of the sorting network.

My suspicion is that it would be easier to use these APIs performantly if the inlining threshold were higher when vector methods are used.

Masking loads and stores added memory allocations

In the above ‘sort16’ implementation there is no ‘to’ - ideally this would allow us to sort up to 16 elements. What I discovered that this was a risky proposition because again it became an easy deoptimization. If you are trying to load a smaller number of elements into a vector, you can do something like:

VectorMask<Integer> mask = VectorMask.fromValues(SPECIES, true, false, false, false);
int[] array = new int[] {1, 2, 3, 4};
IntVector vector = IntVector.fromArray(SPECIES, array, 3, mask);
assertThat(vector).isEqualTo(IntVector.fromArray(SPECIES, new int[] {4, 0, 0, 0}));

This code works, but behind the scenes this will allocate an int array (reading the code, it looks like the mask’s validity is checked by creating a mask, and this is the culprit of the allocations), increasing GC overhead. In the end I observed that in quicksort, if you’re trying to sort elements 3 to 5, it’s equally safe to sort 3 to 11 because of the recursive nature of partitioning, and so I just oversort (and use Arrays.sort on the last vector in the array instead of the sorting network).

It was entirely non-obvious that this might occur, and again, this was disconcerting because the code change required to fix the method seemed so minor. There’s so much scalar replacement happening when you use these APIs that small permutations that break the optimization make things a lot more challenging.

SPECIES_PREFERRED considered harmful?

I primarily tested on my modern desktop, which supports AVX2. At a late stage, I wanted to see how well the code would work with an AVX512 server. I used GCP and requested an Ice Lake architecture processor. The performance crashed; using 512 bit vectors in any part of the code appeared to cause the throughput to drop by a factor of 3. The purpose of SPECIES_PREFERRED seems to be to let the user pick the widest species with hardware acceleration and not have to specialize their code for each specific architecture, but (and I didn’t dig in enough here) if derating causes performance to dive bomb that much, it becomes harder to author the code reliably.

Conclusion

Google recently published a library for running vectorized sorting in C++. This was measured at around 800MB/sec on an older Xeon processor. My processor is probably faster, and could handle around 300MB/sec on the same size arrays as compared to 100MB/sec from the standard library. Clearly there is a gap, but I think this is a reasonable outcome. Java’s vector support is impressive, the implementation is interesting, and generally I had fun. I think there’s some room for improvement with regards to some runtime checks, errant memory allocations, and lookup table support, but this is what preview apis are for! Code can be found here.

Appendix: Why might we want faster sorting?

Here’s one simple example I encountered recently. It’s common in columnar database code to ‘dictionary encode’ lists of values. Here, for some subset of the stored data, we store a dictionary containing the unique values that exist, assigning unique indices to each value, and we store a list of pointers into the list to represent each element.

record DictionaryEncoded<T>(List<T> dictionary, int[] ordinals) {}

DictionaryEncoded<T> encode(List<T> buffer) {
    Map<T, Integer> itemToIndex = new HashMap<>();
    List<T> indexToItem = new ArrayList<>();
    // this need not be an int[] type - if the dictionary ends up having fewer than 256 elements, it could be a byte[]! There are lots of encoding schemes that help here, e.g. https://github.com/lemire/JavaFastPFOR
    int[] ordinals = new int[buffer.size()];
    for (int i = 0; i < ordinals.length; i++) {
        ordinals[i] = itemToIndex.computeIfAbsent(element, key -> {
            int index = indexToItem.size();
            indexToItem.add(key);
            return index;
        });
    }
    return new DictionaryEncoded<>(indexToItem, ordinals);
}

<T> T get(int index, DictionaryEncoded<T> encoded) {
    return encoded.dictionary().get(encoded.ordinals()[index]);
}

This kind of format is used by many data systems - open source projects like Apache Arrow, Parquet, Lucene, closed source projects like Vertica all contain a model somewhat like this, since it has advantages for both storage and querying. On the storage side, it’s a cheap way to reduce the amount of data stored; in my example above the worst case additional memory usage amortizes to 4 bytes per element; in the case where we have a single large element being repeated over and over, it could replace kilobytes with those same 4 bytes. This usually improves both storage cost and query performance (since databases are usually I/O bottlenecked). Further, some queries may no longer require scanning all of the data (for example a query to select the max element need only scan the dictionary). Conversely, if one is able to compute a global dictionary, if the column is accessed for a join, we could join the values column and ignore the dictionary entirely. If the dictionary represents large strings, this might dramatically improve performance because the comparison operation may go from hundreds of cycles to 1.

Unfortunately, there can be some downsides. The key to high performing data processing code is usually to do as many sequential reads as possible, minimizing the amount of random memory access to regions that don’t fit in the L1 or L2 caches. This is because uncached memory accesses are far slower than a processor can handle data; an L1 cache read might be 0.5ns, whereas a main memory read might be 100ns, whereas a disk read might be into the milliseconds. In some cases, practitioners may limit the sizes of their dictionaries to compensate for this, so as to very rarely result in a main memory or disk read, whereas in others the cost may simply be eaten as an unlikely worst case outcome.

In this latter case, suppose we need to read a subset of the values, but in a way where we don’t care about the order. An example of the pattern might be:

class NaiveMaxAccumulator {
    final List<T> dictionary;
    final Ordering<T> ordering;

    T max;


    void accumulate(int ordinal) {
        if (max = null) {
            max = dictionary.get(ordinal);
        } else {
            max = ordering.max(max, dictionary.get(ordinal));
        }
    }

    T getValue() {
        return checkNotNull(max);
    }
}

where some other code is responsible for passing us those ordinals which are a part of the query.

We could perhaps improve this code by writing something more like

class DeduplicatingMaxAccumulator {
    final List<T> dictionary;
    final Ordering<T> ordering;

    final int[] buffer = new int[1 << 16];
    int bufferSize = 0;

    T max;

    void accumulate(int ordinal) {
        if (bufferSize == buffer.length) {
            flush();
        }
        buffer[bufferSize++] = ordinal;
    }

    T getValue() {
        flush();
        return checkNotNull(max);
    }

    void flush() {
        Arrays.sort(buffer, 0, bufferSize);
        int lastOrdinal = -1;
        for (int i = 0; i < bufferSize; i++) {
            int ordinal = buffer[i];
            if (ordinal != lastOrdinal) {
                if (max == null) {
                    max = dictionary.get(ordinal);
                } else {
                    max = ordering.max(max, dictionary.get(ordinal));
                }
                lastOrdinal = ordinal;
            }
        }
        bufferSize = 0;
    }
}

This approach gives us two potential advantages. The first is that in the naive approach, the comparison is run linearly with the number of elements, whereas the improved approach runs it linearly with the number of unique elements. If the comparison function is particularly expensive (as it might be for strings or other more complex datatypes) this might be a considerable percentage of the overall time. The second advantage is that by sorting the ordinals, we can benefit from spatial locality of reference. When we issue a small read to a storage volume, it’s common for the operating system to load a larger chunk of data and cache it (e.g. if we try to read 16 bytes from our app, it might read 4096 and keep it around). If we are accessing ordinals 0, 2, 5, 9, 11 and we have laid out our data linearly, these will likely be located on the same operating system pages, reducing cost. If we are accessing ordinals 0, 1000000, 2000000, 300000, 45012, etc etc, randomly jumping around, we have no hope of achieving this.

Practically speaking, I’ve seen this tactic lead to an order of magnitude jump. However, it’s not a pure improvement - for each element we process, we are definitely adding the amortized cost of sorting that element, and possibly removing the costs of loading and comparing the element, or otherwise possibly reducing the cost of loading the element. If these are only sometimes true (for example, if loading is cheap but comparing is expensive, if the dictionary is very large we will lose out, if the dictionary is very small but the comparison is very cheap we might also lose out).

On my computer, Java’s Arrays.sort method costs around 50ns/element in the real world. This means that we unfortunately have a lot of scope to cause ourselves performance problems by using this approach, because in some cases the cost of sorting will dominate the cost of processing. If we could sort at more like 10ns/element, the change would almost always be worthwhile.

Using Java's Project Loom to build more reliable distributed systems

Building distributed databases is very difficult. There are lots of clever ideas you can put into a distributed database, but making the ideas work well together in a real system can be very challenging because different decompositions of the same problem can massively affect how difficult it is to solve, and bugs are typically both subtle and hard to detect. This is especially problematic as the system evolves, where it can be difficult to understand whether an improvement helps or hurts.

With Jepsen and FoundationDB I’ve seen two particularly interesting mechanisms of testing that distributed systems maintain the properties they claim to hold, and Java’s Project Loom should enable a hybrid approach that gains the most of the benefits of both without the costs.

Jepsen

Jepsen is a software framework and blog post series which attempts to find bugs in distributed databases, especially although not exclusively around partition tolerance. The database is installed in a controlled environment (say, virtual machines or Docker containers) and operations are issued and their results recorded. Control over the environment is used to inject nemeses (say, clock skew, network latency, network partitions) randomly, while the operations are being issued and recorded reliably. Invariants can be written according to the database’s marketing material, and should the results violate the invariants, one has cast iron evidence of a bug. As a comparative tool, the results speak for themselves. In the early days, many fanciful claims made by database companies bit the dust, and more recently contracting Kyle Kingsbury to stress your database has become something of a rite of passage.

jepsen

As a white box tool for bug detection, Jepsen is fantastic. If you’ve written the database in question, Jepsen leaves something to be desired. By falling down to the lowest common denominator of ‘the database must run on Linux’, testing is both slow and non-deterministic because most production-level actions one can take are comparatively slow. For a quick example, suppose I’m looking for bugs in Apache Cassandra which occur due to adding and removing nodes. It’s usual for adding and removing nodes to Cassandra to take hours or even days, although for small databases it might be possible in minutes, probably not much less than. A Jepsen environment could only run one iteration of the test every few minutes; if the failure case only happens one time in every few thousand attempts, without massive parallelism I might expect to discover issues only every few days, if that. I had an improvement that I was testing out against a Cassandra cluster which I discovered deviated from Cassandra’s pre-existing behaviour with (against a production workload) probability one in a billion. Jepsen likely won’t help with that.

Suppose that we either have a large server farm or a large amount of time and have detected the bug somewhere in our stack of at least tens of thousands of lines of code. We now have a bug, but no reproduction of said bug. If there is some kind of smoking gun in the bug report or a sufficiently small set of potential causes, this might just be the start of an odyssey.

Jepsen is probably the best known example of this type of testing, and it certainly moved the state of the art; most database authors have similar suites of tests. ScyllaDB documents their testing strategy here and while the styles of testing might vary between different vendors, the strategis have mostly coalesced around this approach.

Jepsen is limited here by its broadness. As the author of the database, we have much more access to the database if we so desire, as shown by FoundationDB.

FoundationDB

One of the most personally influential tech talks I’ve watched is Will Wilson’s FoundationDB talk at StrangeLoop 2014. I highly recommend watching the talk, but here follows a brief summary.

When the FoundationDB team set out to build a distributed database, they didn’t start by building a distributed database. Instead, they built a deterministic simulation of a distributed database. They built mocks of networks, filesystems, hosts, which all worked similarly to those you’d see in a real system but with simulated time and resources allowing injection of failures.

fdb

They could now write deterministic tests of their system, similarly to the Jepsen tests, except for two key differences:

  1. The tests were deterministic and so any test failure naturally had a baked in reproduction (one can simply re-run the test to observe the same result).
  2. The tests could be made extremely fast because the test doubles enabled skipping work. For example, suppose that a task needs to wait for a second. Instead of implementing by actually waiting for a second, the simulation could possibly increment a number (time) by a second (and do any work that needed to be done in the meantime). Alternatively, consider an RPC connection. Instead of actually using the TCP stack, a simulation could be used which does not require any operating system collaboration.

Once the team had built their simulation of a database, they could swap out their mocks for the real thing, writing the adapters from their interfaces to the various underlying operating system calls. At this point, they could run the same tests in a way similar to Jepsen (my understanding was that a small fleet of servers, programmable switches and power supplies was used). These real-hardware re-runs could be used to ensure that the simulation matched the real world, since any failure not seen in the simulation naturally corresponds to a deficiency in the simulation.

Once satisfied with their simulation, they had a database they could sell!

I’ve heard that this was beneficial to the company in a number of ways, but the one that caught my eye can be paraphrased as ‘because we had very high confidence in the correctness of our code and the changes we were making to it, we could be much more aggressive with regards to the technical architecture of the system1’.

fdb-pyramid

One problem with this approach is that it makes a huge technical tradeoff - programming languages and operating system concepts typically do not make it easy to solve problems using this mechanic because large parts of the system are not naturally controllable in a deterministic way. Let’s use a simple Java example, where we have a thread that kicks off some concurrent work, does some work for itself, and then waits for the initial work to finish.

ExecutorService executor = createExecutorService();
Future<String> task = executor.submit(() -> {
   Future<String> concurrentComputation = executor.submit(this::doExpensiveComputation);
   return doExpensiveComputation2() + concurrentComputation.get();
});

If the ExecutorService involved is backed by multiple operating system threads, then the task will not be executed in a deterministic fashion because the operating system task scheduler is not pluggable. If instead it is backed by a single operating system thread, it will deadlock.

An alternative approach might be to use an asynchronous implementation, using Listenable/CompletableFutures, Promises, etc. Here, we don’t block on another task, but use callbacks to move state. Palantir’s Dialogue uses this model to implement an RPC library. By utilizing an asynchronous implementation and using a deterministic scheduler (as well as swapping out the underlying Apache HTTP client for a stub implementation), they can easily simulate the load balancing logic and generate predictions of how well different algorithms handle different cases against differently unperformant servers, producing pretty charts of the results which can be posted to pull requests each time they change. This had a side effect - by measuring the runtime of the simulation, one can get a good understanding of the CPU overheads of the library and optimize the runtime against this. In some ways this is similar to SQLite’s approach to CPU optimization. More broad usage of the model can easily become unwieldy2.

FoundationDB’s usage of this model required them to build their own programming language, Flow, which is transpiled to C++. The simulation model therefore infects the entire codebase and places large constraints on dependencies, which makes it a difficult choice.

Conclusion

It’s typical to test the consistency protocols of distributed systems via randomized failure testing. Two approaches which sit at different ends of the spectrum are Jepsen and the simulation mechanism pioneered by FoundationDB. The former allows the system under test to be implemented in any way, but is only viable as a last line of defense. The latter can be used to guide a much more aggressive implementation strategy, but requires the system to be implemented in a very specific style.

Feature Jepsen-style FoundationDB-style
Integration difficulty So long as it uses syscalls, it can be done Required writing own programming language
Speed of testing Slow - good for acceptance tests Fast - much faster than the production DB
Representative? Yes - all subsystems same as in production No - detailed simulations, but test doubles relied upon
Useful for debugging No - distributed systems failures never fun to debug. Yes - since deterministic, all failures replayable

Java’s Project Loom considerably shakes up this tradeoff.

spectrum

Project Loom

Project Loom provides ‘virtual’ threads as a first class concept within Java. There is plenty of good information in the 2020 blog post ‘State of Loom’ although details have changed in the last two years.

I will give a simplified description of what I find exciting about this. In traditional Java we can launch a Thread. If it needs to pause for some reason, the thread will be paused, and will resume when it is able to. Java does not make it easy to control the threads (pause at a critical section, pick who acquired the lock, etc), and so influencing the interleaving of execution is very difficult except for in very isolated cases.

With Loom’s virtual threads, when a thread starts, a Runnable is submitted to an Executor. When that task is run by the executor, if the thread needs to block, the submitted runnable will exit, instead of pausing. When the thread can be unblocked, a new runnable is submitted to the same executor to pick up where the previous Runnable left off. Here, interleaving is much, much easier, since we are passed each piece of runnable work as it becomes runnable. Combined with the Thread.yield() primitive, we can also influence the points at which code becomes deschedulable.

Queue<Runnable> executor = new ArrayDeque<>();
Lock lock = new ReentrantLock();
lock.lock();
newVirtualThread(executor::add, lock::lock);
assertThat(executor).hasSize(1)
    .as("runnable for vthread has been submitted");
executor.poll().run();
assertThat(executor).hasSize(0)
    .as("vthread has blocked, no longer runnable");
lock.unlock();
assertThat(executor).hasSize(1)
    .as("due to unlock, the vthread is now schedulable again");
executor.poll().run();
assertThat(lock.tryLock()).isFalse()
    .as("the virtual thread now holds the lock");

By utilizing this API, we can exert fine grained deterministic control over execution within Java. Let’s give a micro level example. Suppose we’re trying to test the correctness of a buggy version of Guava’s Suppliers.memoize function.

class BrokenMemoizingSupplier<T> implements Supplier<T> {
  private final Lock lock;
  private final Supplier<T> delegate;
  private volatile boolean initialized = false;
  private volatile T value;

  BrokenMemoizingSupplier(Supplier<T> delegate) {
    this(new ReentrantLock(), delegate);
  }

  @VisibleForTesting
  BrokenMemoizingSupplier(Lock lock, Supplier<T> delegate) {
    this.lock = lock;
    this.delegate = delegate;
  }

  @Override
  public T get() {
    if (!initialized) {
      lock.lock();
      try {
        // This code is broken because initialized may have changed between its check before the call to 'lock' and now.
        T result = delegate.get();
        value = result;
        initialized = true;
        return result;
      } finally {
        lock.unlock();
      }
    }
    return value;
  }
}

This test is enough to deterministically exhibit the bug:

// This will fail every time
@ParameterizedTest
@MethodSource("range")
void testAllCases(int seed) {
  AtomicInteger countCalls = new AtomicInteger();
  BrokenMemoizingSupplier<Integer> supplier =
    new BrokenMemoizingSupplier<>(
      new YieldingLock(),
      countCalls::incrementAndGet);
  Random random = new Random(seed);
  RandomizingExecutor executor = new RandomizingExecutor(random);
  newVirtualThread(executor, supplier::get);
  newVirtualThread(executor, supplier::get);
  executor.drain();
  assertThat(supplier.get()).isOne();
}

static Stream<Integer> range() {
  return IntStream.range(0, 10).boxed();
}

/**
  * This executor executes randomly for brevity. By tracking which
  * tasks launch which other tasks, one could generate all valid
  * interleavings for small cases such as this.
  */
private static final class RandomizingExecutor implements Executor {
  private final Random random;
  private final List<Runnable> queue = new ArrayList<>();

  private RandomizingExecutor(Random random) {
    this.random = random;
  }

  @Override
  public void execute(Runnable command) {
    queue.add(command);
  }

  public void drain() {
    while (!queue.isEmpty()) {
      Collections.shuffle(queue, random);
      Runnable task = queue.remove(queue.size() - 1);
      task.run();
    }
  }
}

// Thread.yield will pause the thread and cause it to become schedulable again.
private static final class YieldingLock extends ReentrantLock {
  @Override
  public void lock() {
    Thread.yield();
    super.lock();
  }
}

This test is highly limited as compared to a tool like jcstress, since any issues related to compiler reordering of reads or writes will be untestable. There are however two key advantages of testing using this style.

  1. Many races will only be exhibited in specific circumstances. For example, on a single core machine, in the absence of a sleep or control primitive like a CountDownLatch, it’s unlikely that the above bug could be found. By generating a lot of simulated context switching, a broader set of possible interleavings can be cheaply explored. This makes it more likely that the author will find bugs that they were not specifically looking for.
  2. The approach can be scaled up to much larger systems.

Hypothesis

Loom offers the same simulation advantages of FoundationDB’s Flow language (Flow has other features too, it should be noted) but with the advantage that it works well with almost the entire Java runtime. This means that idiomatic Java (or other JVM languages like Kotlin, Scala or presumably eventually anything that works on GraalVM) fits in well, and the APIs that Loom exposes3 makes it straightforward to experiment with this kind of approach. This significantly broadens the scope for FoundationDB like implementation patterns, making it much easier for a large class of software to utilize this mechanism of building and verifying distributed systems.

What one could do is the following:

  1. Start by building a simulation of core Java primitives (concurrency/threads/locks/caches, filesystem access, RPC). Implement the ability to insert delays, errors in the results as necessary. The simulations could be high or low level. One could implement a simulation of core I/O primitives like Socket, or a much higher level primitive like a gRPC unary RPC4.
  2. As you build your distributed system, write your tests using the simulation framework.
    • For shared datastructures that see accesses from multiple threads, one could write unit tests which check that properties are maintained using the framework.
    • Subsystems could be tested in isolation against the simulation (for example, the storage backend could be tested against a simulation of a filesystem).
    • And then the integration of the system could also be tested against the simulation.
  3. Sufficiently high level tests would be run against the real system as well, with any unobserved behaviours or failures that cannot be replicated in the simulation the start of a refining feedback loop.
  4. Run tests continuously, using a server farm to accelerate the rate of test execution, different runs starting on different seeds; flag failures for diagnosis.
  5. Over time, refine the infrastructure. For unit test type simulations, perhaps try to find all topological sorts of triggered futures or otherwise try to understand different dependencies (control flow, data, etc) between different tasks. For acceptance test style simulation, probably try to accelerate the feedback loop of what to do when a test fails; perhaps re-run the test up to a few minutes before so that a human can attach a debugger at a relevant point, snapshot state, etc etc. Start looking for ‘smoke’, large memory allocations or slowdowns that might not influence correctness.

Because Java’s implementation of virtual threads is so general, one could also retrofit the system onto their pre-existing system. A loosely coupled system which uses a ‘dependency injection’ style for construction where different subsystems can be replaced with test stubs as necessary would likely find it easy to get started (similarly to writing a new system). A tightly coupled system which uses lots of static singletons would likely need some refactoring before the model could be attempted. It’s also worth saying that even though Loom is a preview feature and is not in a production release of Java, one could run their tests using Loom APIs with preview mode enabled, and their production code in a more traditional way.

My main claim is that the team that follows this path would find themselves to have commercial advantages over a more traditionally tested database. Historically, what I’ve seen is that confidence must be paid for - testing infra for distributed systems can be extremely expensive to maintain; infra becomes outdated, the software evolves, the infra becomes unrepresentative, flakes must be understood and dealt with promptly. These costs are particularly high for distributed systems. Loom-driven simulations make much of this far simpler.

Usage in a real database (Raft)

To demonstrate the value of an approach like this when scaled up, I challenged myself to write a toy implementation of Raft, according to the simplified protocol in the paper’s figure 2 (no membership changes, no snapshotting). I chose Raft because it’s new to me (although I have some experience with Paxos), and is supposed to be hard to get right and so a good target for experimenting with bug-finding code.

All code is made available here.

Implementation sketch

The core of the simulation is built around a PriorityQueue of runnable tasks. Tasks are added to this queue with a certain launch Instant, and the priority queue moves time forward as it processes tasks, somewhat like:

class Simulation {
  PriorityQueue<QueuedTask> taskQueue = new PriorityQueue<>();
  Instant now = Instant.EPOCH;
  long taskId = 0;

  boolean runNextTask() {
    QueuedTask maybeTask = taskQueue.poll();
    if (maybeTask == null) {
      return false;
    }
    now = maybeTask.launchTime;
    try {
      maybeTask.task.run();
    } catch (RuntimeException e) {
      log.info("caught exception", e);
    }
    return !taskQueue.isEmpty();
  }

  Clock clock() {
    return new SupplierClock(() -> now);
  }

  void enqueue(Duration delay, Runnable task) {
    taskQueue.add(new QueuedTask(
      now.plus(delay), task, taskId++));
  }

  class QueuedTask implements Comparable<QueuedTask> {
    Instant launchTime;
    Runnable task;
    long tieBreak;

    QueuedTask(Instant launchTime, Runnable task, long tieBreak) {
      this.launchTime = launchTime;
      this.task = task;
      this.tieBreak = tieBreak;
    }

    @Override
    public int compareTo(QueuedTask other) {
      int ltc = launchTime.compareTo(other.launchTime);
      if (ltc != 0) {
        return ltc;
      }
      return Long.compare(tieBreak, other.tieBreak);
    }
  }
}

Other primitives (such as RPC, thread sleeps) can be implemented in terms of this. Certain parts of the system need some closer attention. For example, there are many potential failure modes for RPCs that must be considered; network failures, retries, timeouts, slowdowns etc; we can encode logic that accounts for a realistic model of this.

For the actual Raft implementation, I follow a thread-per-RPC model, similar to many web applications. My application has HTTP endpoints (via Palantir’s Conjure RPC framework) for implementing the Raft protocol, and requests are processed in a thread-per-RPC model similar to most web applications. Local state is held in a store (which multiple threads may access), which for purposes of demonstration is implemented solely in memory. In a production environment, there would then be two groups of threads in the system.

server

  1. Request threads (one per request, probably pooled). These access the store and mutate state in response to the requirements of the paper, they can be thought of as similar to any other HTTP API backed by a key-value store.
  2. Leadership thread (singleton). This processes the leadership state machine in response to timeouts. For simplicity, you can think of it as implementing ‘If the leader didn’t heartbeat us in the last X time, try to become the leader. If we’re the leader, make sure the other nodes are up to date with us’.

Any standard methods that needed to be simulated (like Clocks) were replacable with test doubles.

The bulk of the Raft implementation can be found in RaftResource, and the bulk of the simulation in DefaultSimulation. The store sadly goes against my usual good code principles by heavily using setter methods, but this kept the implementation brief.

Evaluation

Simulation performance

The simulation was surprisingly performant. I have no clear comparison point, but on my computer with reasonable-looking latency configurations I was able to simulate about 40k Raft rounds per second on a single core, and 500k when running multiple simulations in parallel. This represents simulating hundreds of thousands of individual RPCs per second, and represents 2.5M Loom context switches per second on a single core.

When I cranked up the rate of timeouts and failures (leading to lots of exceptions being thrown), I saw closer to 15k requests per second processed (with about 100 leader elections during that time) and when I made performance uniformly excellent, I saw single core throughput as high as 85k Raft rounds per second.

Using the simulation to improve protocol performance

The determinism made it straightforward to understand the throughput of the system. For example, with one version of the code I was able to compute that after simulating 10k requests, the simulated system time had moved by 8m37s. After looking through the code, I determined that I was not parallelizing calls to the two followers on one codepath. After making the improvement, after the same number of requests only 6m14s of simulated time (and 240ms of wall clock time!) had passed. This makes it very easy to understand performance characteristics with regards to changes made.

When building a database, a challenging component is building a benchmarking harness. It’s challenging because the cadence at which one can surface benchmark results to developers is governed by how noisy the tests are. Many improvements and regressions represent 1-2% changes in whole-system results; if due to the benchmarking environment or the actual benchmarks 5% variance can be seen, it’s difficult to understand improvements in the short term. Due to this, many teams will either overly rely on microbenchmark results, which can be hard to understand due to Amdahl’s Law, or choose to not benchmark continuously, meaning that regressions will only be caught and sorted infrequently.

Deterministic scheduling entirely removes noise, ensuring that improvements over a wide spectrum can be more easily measured. Even when the improvements are algorithmic and so not represented in the time simulation, the fact that the whole cluster runs in a single core will naturally lead to reduced noise over something that uses a networking stack. Not a panacea, but an improvement for sure.

Using the simulation to test for correctness

Here, I found a number of bugs in my implementation. There may still be bugs, since I have no need to perfect this. However, the rough technique I ended up using was to use a Raft state machine that made it easier to detect bugs; Raft’s guarantees are

  • Election Safety. At most one leader can be elected in a given term
  • Leader Append-Only. A leader never overwrites or deletes entries in its log; it only append new entries.
  • Log Matching. If two logs contain an entry with the same index and term, the logs are identical in all entries up through the given index.
  • Leader completeness. If a log entry is committed in a given term, then that entry will be present in the logs of the leaders for all higher-numbered terms.
  • State Machine Safety. If a server has applied a log entry at a given index to its state machine, no other server will ever apply a different log entry for the same index.

By adding some simple hooks (such as a callback for when a new leader is elected) to the underlying servers, and to provide a state machine implementation which validates the state machine property, as well as periodically validating other state properties, I was able to continuously validate as rounds ran that the correctness properties are maintained, with liveness being testable by ensuring that the state machine continues to progress.

By tweaking latency properties I could easily ensure that the software continued to work in the presence of e.g. RPC failures or slow servers, and I could validate the testing quality by introducing obvious bugs (e.g. if the required quorum size is set too low, it’s not possible to make progress).

private static final class CorrectnessValidator implements Hooks {
  private final Map<TermId, ServerId> leaders = new HashMap<>();
  private boolean incorrectnessDetected = false;
  private final List<LogEntry> canonicalStateMachineEntries = new ArrayList<>();

  public void ensureNoBadnessFound() {
    checkState(!incorrectnessDetected, "at some point, some form of incorrectness was found");
  }

  @Override
  public void onNewLeader(TermId termId, ServerId leader) {
    ServerId currentLeader = leaders.putIfAbsent(termId, leader);
    checkState(
        currentLeader == null,
        "two leaders were elected for the same",
        SafeArg.of("term", termId),
        SafeArg.of("currentLeader", currentLeader),
        SafeArg.of("newLeader", leader));
  }

  public StateMachine newStateMachine() {
    return new StateMachine() {
      private LogIndex currentIndex = LogIndexes.ZERO;

      @Override
      public void apply(LogIndex logIndex, LogEntry entry) {
        checkState(
            logIndex.equals(LogIndexes.inc(currentIndex)),
            "out of order log entries passed to state machine");
        if (canonicalStateMachineEntries.size() >= logIndex.get()) {
          checkState(
              canonicalStateMachineEntries
                  .get(LogIndexes.listIndex(logIndex))
                  .equals(entry),
              "two state machines saw different entries for a given log index");
        } else {
          canonicalStateMachineEntries.add(entry);
        }
        currentIndex = logIndex;
      }
    };
  }

  private void checkState(boolean condition, String message, SafeArg<?>... args) {
    if (!condition) {
      incorrectnessDetected = true;
    }
    Preconditions.checkState(condition, message, args);
  }
}

Conclusion

I’ve found Jepsen and FoundationDB to apply two similar in idea but different in implementation testing methodologies in an extremely interesting way. Java’s Project Loom makes fine grained control over execution easier than ever before, enabling a hybridized approach to be cheaply invested in. I believe that there’s a competitive advantage to be had for a development team that uses simulation to guide their development, and usage of Loom should allow a team to dip in and out where the approach is and isn’t beneficial. Historically this approach was viable, but a gamble, since it led to large compromises elsewhere in the stack. Loom’s primitives mean that for a Java shop, the prior compromises are almost entirely absent due to the depth of integration put into making Loom work well with the core of Java, meaning that most libraries and frameworks will work with virtual threads unmodified. I think that there’s room for a library to be built that provides standard Java primitives in a way that can admits straightforward simulation (for example, something similar to CharybdeFS using standard Java IO primitives).


  1. This rang true for me, since I’ve seen even small bugs in complex systems cause outsized debugging pain and issues in production. One example here might be HTTP/2 implementations; every web-app level implementation of HTTP/2 I’ve seen (in OkHTTP, Undertow, Jetty, Go) has eventually caused issues in production due to subtle bugs in connection management state machines. 

  2. More sophisticated implementations can easily become unwieldy. I briefly experimented with an implementation of Palantir’s transactional key-value store AtlasDB in such a style early on in 2020 in some free time. While I eventually had something that worked, the code I had written was unfortunately inefficient, inscrutable, and disastrous to debug, since a stack trace 10 elements deep might instead become 10 anonymous runnables in an executor. 

  3. Loom does not strictly have pluggable schedulers. The API was dropped for the preview release here, but based on documentation is a clear design goal of the project and reflection re-establishes the desired behaviour

  4. My bias would be that when the failure modes of a given piece of code can be well understood and separately tested, the simulation should be kept high level so as to ensure some modularity and good performance, or otherwise to implement at both levels and to choose the most appropriate one at time of usage (e.g. use the simulation of a socket for RPC testing, use the simulation of the RPC for server testing). 

Proving a language irregular in a nontrivial way

Everyone knows that regular expressions (at least in the theoretical sense) have almost immediate limits on what they can recognise, though this is not always realised by users. The problem is that the presented proofs of languages that are not regular are typically those that are easily provable. This is because

  • It is easy to present a simple use of the pumping lemma in a 1 hour lecture than anything in depth, and
  • most of the reasons why regular languages are taught can be summed up in the well bracketing and $a^nb^n$ proofs - they prove that you can’t parse programming languages with regular expressions.

The challenge

I found this problem on Reddit, around 9 months ago. I had a quick search for the source, but with no luck. It was set to a redditor as a homework question, by a professor who mixed up his nouns. As I recall, the professor realised his mistake and changed the question to an easier one.

Is the language consisting of numbers encoded in base 3 for which the base 2 representation has even parity regular?

Here, even parity means that each number has an even number of 1 bits in its binary representation.

Standard approaches

The Pumping Lemma

The classic two approaches are the Pumping Lemma, and the Myhill-Nerode theorem. As far as I can work out, the pumping lemma is known by all, whilst Myhill-Nerode is known by fewer.

The problem here is that (at least in my case) it’s difficult to reason about whether pumping actually produces a string in the language (or not). Generally one would hope to find a word known to be in the language, but which when pumped must produce something not in the language. In my case, the nearest I could get was something of the form $3^n$, which empirically proved to be in the language about 50% of the time.

This at least implies that if I picked a random word on the language and pumped on it, the probability that the word was in the language and its pumped $3^{n+m}$ form was not would be around 0.25 (and no obvious pattern could be seen in the results). Of course, this proved nothing, but was a useful heuristic going forward - I could assume that the language would not be regular. I admit that I find this argument mostly convincing, but clearly it is not a rigorous proof at all.

Myhill-Nerode

Broadly speaking, you just get to the same issues as the pumping lemma.

Generating functions

Another approach is to use the generating function of a regular language. If we let $s _ L(n)$ be the number of words of length $n$ in $L$, then the ordinary generating function is defined by \(S _ L(z) = \sum _ {n \geq 0}s _ L(n)z^n.\) It is known that the generating function of a language $L$ is a rational function if $L$ is regular. By rational function, it is meant that there exist polynomials $P$ and $Q$ such that \(S _ L(z) = \frac{P(z)}{Q(z)}.\)

Let us consider the number of words of each length in this language. In binary, for a word of length $n$, exactly half of all words of a given length will be in the language. Assuming that the numbers in the language are evenly distributed, this will be the case for the ternary representations (and again, empirical evidence implied this to be true). So, this is clearly rational, and again is not a witness to the irregularity of the language.

Cobham’s Theorem

Clearly at this point I wanted to try and reason specifically about the structure of the sequence. The OEIS page gives great context to this, and in particular it references the Thue-Morse sequence, the characteristic function for the natural numbers not in this set. This is very interesting, as the Thue-Morse sequence is overlap-free.

This means that the set $S$ that we want to recognise in base 3 cannot be eventually periodic - that is to say, there cannot exist $C$ and $k$ such that for all $x \geq C$, if $x \in S$ then $x+k \in S$ also.

Thankfully, it turns out that Cobham (of Cobham-Edmonds thesis fame) at one point proved the theorem

Let $r, s \geq 2$ be multiplicatively independent bases. A set $X \subseteq \mathbb{N}$ is $r$- and $s-$recognisable (recognisable by a DFA in bases $r$ and $s$) iff $X$ is a finite union of constants and arithmetic progressions.

The great news here is that bases 2 and 3 are multiplicatively independent (for no $k, l$ is $r^k=s^l$ where $k, l \in \mathbb{N} - \{0\}$).

This means that if the language is to be recognisable in base 3, it must be a finite union of constants and arithmetic progressions.

Here follows a simple proof that such a language must be eventually periodic (there exist $P$ and $C$ such that for all $x\geq C$, if $x$ is in the set then so is $x + P$).

Suppose that our language is defined as the numbers in some set \(\\{c _ 0, c _ 1, \dots, c _ n\\}\cup \bigcup _ {i \leq N}\\{x | \exists n\in \mathbb{N}.x = a _ i + nb _ i\\}.\) Now consider the number $K$, defined to be the least common multiple of $b _ 0, b _ 1, \dots, b _ N$. The language must at least be periodic with period $K$, because $K$ is some integer multiple of each $b _ i$, and so is periodic with period $K$ for all $x \geq \max\{c _ 0, \dots, c _ n\}$.

This means that if the sequence is to be recognisable in base 3, it must be eventually periodic. As noted above, for each number $n$ in the language, Thue-Morse$(n) = 0$. It is well known that the Thue-Morse sequence is overlap-free, and so the language cannot possibly be regular.

Conclusion

We’re done. I thought lightly about this exercise for a fair while, but only recently had the motivation to actually follow it through to the end. I (broadly speaking) wrote this up as a description of how proving languages irregular can be more involved and indeed interesting than simply invoking the pumping lemma and bashing through a few lines of answer, as was required of me in my course. I wrote this mostly as a quick after-action report, but I hope it reads well.

Acknowledgement

After getting stuck, I pointed my search towards CompSci StackExchange. I was stuck, and I didn’t have a good enough grasp of the literature to know where to move next. The result was essentially a great advertisement for StackExchange - Jeffrey Shallit who answered my question has written papers about the Thue-Morse sequence and written a book about the sequences recognisable by finite automata - as far as I can work out he was one of the best placed to help me anywhere. The proof presented is his, not mine - I got to exploring the properties of the Thue-Morse sequence, and have merely found the proof interesting enough to write about here.

My new blog

If all goes well, this site will eventually contain a whole wealth of informative, useful and (by and large) correct information. If all goes well, you’ll want to read this. At the moment, there’s nothing here. Sorry.