Speculative query execution

Quick overview

Pre-emptively query another node if the current one takes too long to respond.

  • advanced.speculative-execution-policy in the configuration.

  • disabled by default. Also available: constant delay, or write your own policy.

  • can have per-profile policies.

  • only kicks in if the query is idempotent.

  • creates more traffic: tune your pool and provision your cluster accordingly.


Sometimes a Cassandra node experiences difficulties (for example, a long GC pause) and takes longer than usual to reply. Queries sent to that node then suffer from high latency.

One way to improve on that is to pre-emptively start a second execution of the query against another node, before the first node has replied or errored out. If the second node replies faster, we can send its response back to the client. We also cancel the first execution (note that “cancelling” in this context simply means discarding the response when it arrives later; Cassandra does not support cancellation of in-flight requests):

client           driver          exec1  exec2
--+----------------+--------------+------+---
  | execute(query) |
  |--------------->|
  |                | query node1
  |                |------------->|
  |                |              |
  |                |              |
  |                |     query node2
  |                |-------------------->|
  |                |              |      |
  |                |              |      |
  |                |     node2 replies   |
  |                |<--------------------|
  |   complete     |              |
  |<---------------|              |
  |                | cancel       |
  |                |------------->|

Or the first node could reply just after the second execution was started. In this case, we cancel the second execution. In other words, whichever node replies faster “wins” and completes the client query:

client           driver          exec1  exec2
--+----------------+--------------+------+---
  | execute(query) |
  |--------------->|
  |                | query node1
  |                |------------->|
  |                |              |
  |                |              |
  |                |     query node2
  |                |-------------------->|
  |                |              |      |
  |                |              |      |
  |                | node1 replies|      |
  |                |<-------------|      |
  |   complete     |                     |
  |<---------------|                     |
  |                | cancel              |
  |                |-------------------->|
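
The two diagrams above can be mimicked with plain JDK futures. The following is only a sketch of the "first reply wins" idea, not the driver's implementation: the class `SpeculativeRaceSketch`, the method `firstReply` and the simulated latencies are all hypothetical, and the "nodes" are just timers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the race shown in the diagrams; no driver classes involved.
class SpeculativeRaceSketch {

  // Returns the name of the simulated node whose reply completes the client request.
  static String firstReply(long node1LatencyMs, long node2LatencyMs, long speculativeDelayMs) {
    ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
    CompletableFuture<String> result = new CompletableFuture<>();
    try {
      // Initial execution: node1 "replies" after its simulated latency.
      timer.schedule(() -> { result.complete("node1"); }, node1LatencyMs, TimeUnit.MILLISECONDS);

      // Speculative execution: only started if nothing has replied once the delay elapses.
      timer.schedule(() -> {
        if (!result.isDone()) {
          timer.schedule(() -> { result.complete("node2"); }, node2LatencyMs, TimeUnit.MILLISECONDS);
        }
      }, speculativeDelayMs, TimeUnit.MILLISECONDS);

      // complete() is a no-op on an already completed future, so the slower reply is
      // simply discarded: this is what "cancel" means in the diagrams.
      return result.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("simulated request did not complete", e);
    } finally {
      timer.shutdownNow();
    }
  }
}
```

With a slow first node, for instance `firstReply(500, 10, 50)`, the speculative execution wins; with `firstReply(60, 500, 50)` the first node replies shortly after the second execution was started and wins instead.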

Speculative executions are disabled by default. The following sections cover the practical details and how to enable them.

Query idempotence

If a query is not idempotent, the driver will never schedule speculative executions for it, because there is no way to guarantee that only one node will apply the mutation.

Configuration

Speculative executions are controlled by a policy defined in the configuration. The default implementation never schedules an execution:

datastax-java-driver.advanced.speculative-execution-policy {
  class = NoSpeculativeExecutionPolicy
}

The “constant” policy schedules executions at a fixed delay:

datastax-java-driver.advanced.speculative-execution-policy {
  class = ConstantSpeculativeExecutionPolicy
  
  # The maximum number of executions (including the initial, non-speculative execution).
  # This must be at least one.
  max-executions = 3

  # The delay between each execution. 0 is allowed, and will result in all executions being sent
  # simultaneously when the request starts.
  # Note that sub-millisecond precision is not supported, any excess precision information will
  # be dropped; in particular, delays of less than 1 millisecond are equivalent to 0.
  # This must be positive or 0.
  delay = 100 milliseconds
}

Given the above configuration, an idempotent query would be handled this way:

  • start the initial execution at t0;

  • if no response has been received at t0 + 100 milliseconds, start a speculative execution on another node;

  • if no response has been received at t0 + 200 milliseconds, start another speculative execution on a third node;

  • past that point, don’t query other nodes, just wait for the first response to arrive.
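
The timeline above can be reproduced with a small model of a constant-delay decision. This is a sketch under assumptions, not the driver's `ConstantSpeculativeExecutionPolicy`: the class `ConstantDelaySketch` and its methods are hypothetical, a negative return value is used here to mean "schedule nothing more", and the model assumes no response arrives in the meantime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a constant-delay policy; not the driver's class.
class ConstantDelaySketch {
  private final int maxExecutions;
  private final long delayMillis;

  ConstantDelaySketch(int maxExecutions, long delayMillis) {
    if (maxExecutions < 1) throw new IllegalArgumentException("max-executions must be at least one");
    if (delayMillis < 0) throw new IllegalArgumentException("delay must be positive or 0");
    this.maxExecutions = maxExecutions;
    this.delayMillis = delayMillis;
  }

  // Delay before the next execution, or -1 once the maximum has been reached.
  long nextExecution(int runningExecutions) {
    return runningExecutions < maxExecutions ? delayMillis : -1;
  }

  // Offsets from t0 (in milliseconds) at which executions start if no node ever replies.
  List<Long> startOffsetsMillis() {
    List<Long> offsets = new ArrayList<>();
    long now = 0;
    offsets.add(now); // the initial, non-speculative execution starts at t0
    int running = 1;
    long next;
    while ((next = nextExecution(running)) >= 0) {
      now += next;
      offsets.add(now);
      running++;
    }
    return offsets;
  }
}
```

For `max-executions = 3` and `delay = 100 milliseconds`, `new ConstantDelaySketch(3, 100).startOffsetsMillis()` yields the offsets 0, 100 and 200, matching the list above. A delay of 0 yields three executions all starting at t0.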

Finally, you can create your own policy by implementing SpeculativeExecutionPolicy, and referencing your implementation class from the configuration.

How speculative executions affect retries

Turning on speculative executions doesn’t change the driver’s retry behavior. Each parallel execution will trigger retries independently:

client           driver          exec1  exec2
--+----------------+--------------+------+---
  | execute(query) |
  |--------------->|
  |                | query node1
  |                |------------->|
  |                |              |
  |                | unavailable  |
  |                |<-------------|
  |                |
  |                |retry at lower CL
  |                |------------->|
  |                |              |
  |                |     query node2
  |                |-------------------->|
  |                |              |      |
  |                |     server error    |
  |                |<--------------------|
  |                |              |
  |                |   retry on node3
  |                |-------------------->|
  |                |              |      |
  |                | node1 replies|      |
  |                |<-------------|      |
  |   complete     |                     |
  |<---------------|                     |
  |                | cancel              |
  |                |-------------------->|

The only impact is that all executions of the same query always share the same query plan, so each node will be used by at most one execution.

Tuning and practical details

The goal of speculative executions is to improve overall latency (the time between execute(query) and complete in the diagrams above) at high percentiles. On the flip side, too many speculative executions increase the pressure on the cluster.

If you use speculative executions to avoid unhealthy nodes, a well-behaved node should rarely hit the threshold. We recommend running a benchmark on a healthy platform (all nodes up and healthy) and monitoring the request percentiles with the cql-requests metric. Then use the latency at a high percentile (for example p99.9) as the threshold, that is, as the delay of the policy.

Alternatively, maybe low latency is your absolute priority, and you are willing to accept the additional request traffic as a tradeoff. In that case, set the delay to 0 and provision your cluster accordingly.

You can monitor the number of speculative executions triggered by each node with the speculative-executions metric.

Stream id exhaustion

One side-effect of speculative executions is that many requests get cancelled, which can lead to a phenomenon called stream id exhaustion: each TCP connection can handle multiple simultaneous requests, identified by a unique number called stream id (see also the pooling section). When a request gets cancelled, we can’t reuse its stream id immediately because we might still receive a response from the server later. If this happens often, the number of available stream ids diminishes over time, and when it goes below a given threshold we close the connection and create a new one. If requests are often cancelled, you will see connections being recycled at a high rate.

The best way to monitor this is to compare the pool.orphaned-streams metric to the total number of available stream ids (which can be computed from the configuration: pool.local.size * max-requests-per-connection). The pool.available-streams and pool.in-flight metrics will also give you an idea of how many stream ids are left for active queries.
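
The comparison described above is simple arithmetic. The sketch below assumes you have already read the orphaned-streams value for a node from your metrics system; the class `StreamIdBudgetSketch`, its method names and the sample numbers in the usage note are hypothetical.

```java
// Hypothetical helper; the inputs come from your configuration and your metrics system.
class StreamIdBudgetSketch {

  // Total stream ids for one node: pool.local.size * max-requests-per-connection.
  static long totalStreamIds(int poolLocalSize, int maxRequestsPerConnection) {
    return (long) poolLocalSize * maxRequestsPerConnection;
  }

  // Share of the stream ids currently held by cancelled requests (0.0 to 1.0).
  static double orphanedFraction(long orphanedStreams, int poolLocalSize, int maxRequestsPerConnection) {
    return (double) orphanedStreams / totalStreamIds(poolLocalSize, maxRequestsPerConnection);
  }
}
```

For example, with 2 connections of 1024 requests each and 512 orphaned streams, `orphanedFraction(512, 2, 1024)` is 0.25: a quarter of the stream ids are unavailable to active queries. What fraction is acceptable depends on your workload.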

Request ordering

Note: ordering issues are only a problem with server-side timestamps, which are not the default anymore in driver 4+. So unless you’ve explicitly enabled ServerSideTimestampGenerator, you can skip this section.

Suppose you run the following query with speculative executions and server-side timestamps enabled:

insert into my_table (k, v) values (1, 1);

The first execution is a bit too slow, so a second execution gets triggered. Finally, the first execution completes, so the client code gets back an acknowledgement, and the second execution is cancelled. However, cancelling only means that the driver stops waiting for the server’s response; the request could still be “on the wire”. Let’s assume that this is the case.

Now you run the following query, which completes successfully:

delete from my_table where k = 1;

But now the second execution of the first query finally reaches its target node, which applies the mutation. The row that you’ve just deleted is back!

The workaround is to either specify a timestamp in your CQL queries:

insert into my_table (k, v) values (1, 1) USING TIMESTAMP 1432764000;

Or use a client-side timestamp generator.
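
To see why a client-provided timestamp helps, here is a toy last-write-wins model of the scenario above. It is a deliberate simplification and not how the server is implemented: the class `LastWriteWinsSketch`, the integer "clock" values and the `replay` helper are hypothetical, and a `null` value stands in for a deletion.

```java
import java.util.Optional;

// Toy model of a single row resolved by "highest write timestamp wins".
class LastWriteWinsSketch {
  private long timestamp = Long.MIN_VALUE;
  private Integer value = null; // null models a deleted (or never written) row

  // A write only takes effect if its timestamp is newer than the stored one.
  void apply(long writeTimestamp, Integer newValue) {
    if (writeTimestamp > timestamp) {
      timestamp = writeTimestamp;
      value = newValue;
    }
  }

  Optional<Integer> read() {
    return Optional.ofNullable(value);
  }

  // Replays: insert (first execution), delete, then the late speculative insert.
  static Optional<Integer> replay(boolean clientSideTimestamps) {
    LastWriteWinsSketch row = new LastWriteWinsSketch();
    long insertTimestamp = 1; // assigned once by the client, shared by both executions
    // Server-side: each write is stamped with its arrival time (1, 2, 3).
    row.apply(clientSideTimestamps ? insertTimestamp : 1, 1);
    row.apply(2, null); // the delete
    row.apply(clientSideTimestamps ? insertTimestamp : 3, 1); // late second execution
    return row.read();
  }
}
```

In this model, `replay(false)` returns the value 1 (the deleted row is back), while `replay(true)` returns an empty result because the late insert carries the original, older timestamp and loses against the delete.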

Using multiple policies

The speculative execution policy can be overridden in execution profiles:

datastax-java-driver {
  advanced.speculative-execution-policy {
    class = ConstantSpeculativeExecutionPolicy
    max-executions = 3
    delay = 100 milliseconds
  }
  profiles {
    oltp {
      basic.request.timeout = 100 milliseconds
    }
    olap {
      basic.request.timeout = 30 seconds
      advanced.speculative-execution-policy.class = NoSpeculativeExecutionPolicy
    }
  }
}

The olap profile uses its own policy. The oltp profile inherits the default profile’s. Note that this goes beyond configuration inheritance: the driver only creates a single ConstantSpeculativeExecutionPolicy instance and reuses it (this also occurs if two sibling profiles have the same configuration).

Each request uses its declared profile’s policy. If it doesn’t declare any profile, or if the profile doesn’t have a dedicated policy, then the default profile’s policy is used.
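
The lookup rule in the previous paragraph can be summarized as a map lookup with a fallback. This is a sketch of the rule only, not driver code: the class `ProfilePolicySketch`, the use of policy class names as plain strings, and the key `"default"` for the default profile are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: maps a profile name to the policy it declares, if any.
class ProfilePolicySketch {
  static final String DEFAULT_PROFILE = "default";

  // Mirrors the configuration above: only the default and olap profiles declare a policy.
  static Map<String, String> exampleConfig() {
    Map<String, String> dedicatedPolicies = new HashMap<>();
    dedicatedPolicies.put(DEFAULT_PROFILE, "ConstantSpeculativeExecutionPolicy");
    dedicatedPolicies.put("olap", "NoSpeculativeExecutionPolicy");
    return dedicatedPolicies;
  }

  // Uses the profile's own policy when it has one, otherwise the default profile's.
  static String resolvePolicy(Map<String, String> dedicatedPolicies, String profileName) {
    String fallback = dedicatedPolicies.get(DEFAULT_PROFILE);
    if (profileName == null) {
      return fallback; // the request did not declare a profile
    }
    return dedicatedPolicies.getOrDefault(profileName, fallback);
  }
}
```

With the example configuration, a request on `olap` resolves to `NoSpeculativeExecutionPolicy`, while requests on `oltp` or with no profile resolve to `ConstantSpeculativeExecutionPolicy`.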

© 2022, ScyllaDB. All rights reserved.
Last updated on 25 May 2022.