Race condition when using pipelining in cluster mode #1111

@volodymyrpavlenko


Bug Report

Current Behavior

When using Lettuce in cluster mode with pipelining (connection.setAutoFlushCommands(false)), there seems to be a race condition between adding commands to the write buffer and flushing them to the network.

The race condition can be triggered when the Lettuce client is not yet connected to the node to which the command should be routed. In this case, the response future from a GET call can be returned before the command has actually been added to the command buffer.

If connection.flushCommands() is invoked before the command is added to the buffer, the command is not sent, so the client future never completes (unless connection.flushCommands() is invoked again).
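The ordering described above can be illustrated without a Redis cluster using a small JDK-only model. This is a hypothetical simplification, not Lettuce's actual code: the class RaceModel, its NodeConnection stand-in and the "OK" replies are invented for illustration. It assumes the behavior described in this report: the command is appended to the node's buffer in a callback on the node-connection future, while flushCommands() only reaches connections that already exist. In the real client the connection completes on a Netty thread; here it is completed by hand so the ordering is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the race; not Lettuce's actual implementation.
public class RaceModel {

    // Stand-in for a per-node connection with auto-flush disabled.
    static final class NodeConnection {
        private final List<CompletableFuture<String>> buffer = new ArrayList<>();

        void write(CompletableFuture<String> command) {
            buffer.add(command); // buffered only, nothing is sent yet
        }

        void flush() {
            // "Send" everything buffered and complete each future as if a reply arrived.
            for (CompletableFuture<String> command : buffer) {
                command.complete("OK");
            }
            buffer.clear();
        }
    }

    // Completed once the (simulated) connection to the target node is established.
    final CompletableFuture<NodeConnection> connectFuture = new CompletableFuture<>();

    // Problematic pattern: the command is buffered in a callback, but its future is returned immediately.
    CompletableFuture<String> dispatch() {
        CompletableFuture<String> command = new CompletableFuture<>();
        connectFuture.thenAccept(conn -> conn.write(command));
        return command;
    }

    // Flushes only a connection that already exists; a pending connection is skipped.
    void flushCommands() {
        NodeConnection conn = connectFuture.getNow(null);
        if (conn != null) {
            conn.flush();
        }
    }

    // Returns {done after first flush, done after second flush}.
    static boolean[] reproduce() {
        RaceModel model = new RaceModel();
        CompletableFuture<String> future = model.dispatch();

        model.flushCommands();                              // runs before the command is buffered
        model.connectFuture.complete(new NodeConnection()); // connection comes up; callback buffers the command
        boolean afterFirst = future.isDone();

        model.flushCommands();                              // a second flush finally sends it
        boolean afterSecond = future.isDone();
        return new boolean[] {afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        boolean[] result = reproduce();
        System.out.println("done after first flush: " + result[0]);  // false
        System.out.println("done after second flush: " + result[1]); // true
    }
}
```

The first flush finds no established connection and does nothing; when the connection completes, the callback buffers the command, but nobody flushes it until flushCommands() is called again, matching the symptom in the output below.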

The race condition seems to be due to the following lines:
https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L104
https://github.com/lettuce-io/lettuce-core/blob/master/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java#L129

Input Code

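import io.lettuce.core.LettuceFutures;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import java.util.concurrent.TimeUnit;

public class TestRaceCondition {

  public static void main(String[] args) {
    for (int i = 0; i < 1000; i++) {
      test();
      System.out.println("Finished iteration " + i);
    }
  }

  private static void test() {
    // A fresh client per iteration, so the routed commands may hit nodes that are not connected yet.
    RedisClusterClient redisClusterClient = RedisClusterClient.create("redis://localhost:7008/");
    final StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();

    try {
      // connection.setReadFrom(ReadFrom.SLAVE); // Uncommenting this line makes the test fail much more often

      // Pipelining: commands are buffered until flushCommands() is called.
      connection.setAutoFlushCommands(false);

      final RedisAdvancedClusterAsyncCommands<String, String> async = connection.async();

      /*
       127.0.0.1:7000> cluster keyslot bar
       (integer) 5061 (node1)
       127.0.0.1:7000> cluster keyslot baz2
       (integer) 10594 (node2)
       127.0.0.1:7000> cluster keyslot foo
       (integer) 12182 (node3)
      */

      // Each key is routed to a different node.
      final RedisFuture<String> future1 = async.get("bar");
      final RedisFuture<String> future2 = async.get("baz2");
      final RedisFuture<String> future3 = async.get("foo");

      connection.flushCommands();

      if (!LettuceFutures.awaitAll(10, TimeUnit.SECONDS, future1, future2, future3)) {
        // Flush again to check whether a command was left unsent in the buffer.
        connection.flushCommands();
        if (LettuceFutures.awaitAll(1, TimeUnit.SECONDS, future1, future2, future3)) {
          throw new IllegalStateException("Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().");
        } else {
          throw new IllegalStateException("Commands didn't finish in 10 seconds.");
        }
      }
    } finally {
      // Release the connection and the client's event loops even when the check fails.
      connection.close();
      redisClusterClient.shutdown();
    }
  }
}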
Output
Finished iteration 0
Finished iteration 1
Finished iteration 2
Finished iteration 3
Finished iteration 4
Finished iteration 5
Finished iteration 6
Finished iteration 7
Finished iteration 8
Finished iteration 9
Finished iteration 10
Finished iteration 11
Finished iteration 12
Finished iteration 13
Exception in thread "main" java.lang.IllegalStateException: Commands didn't finish in 10 seconds. Finished after second invocation of connection.flushCommands().
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.test(TestRaceCondition.java:47)
	at com.spotify.redisperformancebenchmark.tests.TestRaceCondition.main(TestRaceCondition.java:13)

Expected behavior/code

All futures should eventually complete after a single invocation of flushCommands().

Environment

  • Lettuce version(s): 5.1.8.RELEASE
  • Redis version: 5.0.5

Possible Solution

Would it be possible to separate the creation of the connection object (and its command buffer) from the actual network connection? This would allow the command to be stored in the buffer before the future is returned to the client.
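The suggestion above can be sketched as a small JDK-only model. This is a hypothetical illustration with invented names (BufferedNodeWriter, dispatch, onConnected), not a proposed patch to Lettuce: the per-node buffer exists before the network connection, dispatch() enqueues synchronously before returning the future, and a flush requested while disconnected is remembered and applied once the connection comes up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the suggested fix; names and structure are illustrative only.
public class BufferedNodeWriter {

    private final List<CompletableFuture<String>> buffer = new ArrayList<>();
    private boolean connected = false;
    private boolean flushPending = false;

    // Enqueue synchronously, so the command is buffered before the caller sees its future.
    synchronized CompletableFuture<String> dispatch() {
        CompletableFuture<String> command = new CompletableFuture<>();
        buffer.add(command);
        return command;
    }

    synchronized void flushCommands() {
        if (connected) {
            drain();
        } else {
            flushPending = true; // remember the request until the connection is up
        }
    }

    // Called when the network connection to the node becomes available.
    synchronized void onConnected() {
        connected = true;
        if (flushPending) {
            flushPending = false;
            drain();
        }
    }

    private void drain() {
        for (CompletableFuture<String> command : buffer) {
            command.complete("OK"); // pretend the command was written and answered
        }
        buffer.clear();
    }

    // Same ordering as in the report: flush before the node connection is established.
    static boolean reproduce() {
        BufferedNodeWriter writer = new BufferedNodeWriter();
        CompletableFuture<String> future = writer.dispatch();
        writer.flushCommands(); // not connected yet: the flush is deferred, not lost
        writer.onConnected();   // the deferred flush drains the buffer
        return future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("done after single flush: " + reproduce()); // true
    }
}
```

One simplification: the deferred flush also sends commands buffered after flushCommands() was requested, whereas strict pipelining semantics would send only what was buffered at the time of the flush.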

Additional context

From the code, it looks like MasterSlaveChannelWriter has the same issue, though I have not verified this.
