Scylla Documentation Logo Documentation
  • Server
    • Scylla Open Source
    • Scylla Enterprise
    • Scylla Alternator
  • Cloud
    • Scylla Cloud
    • Scylla Cloud Docs
  • Tools
    • Scylla Manager
    • Scylla Monitoring Stack
    • Scylla Operator
  • Drivers
    • CQL Drivers
    • DynamoDB Drivers
Download
Menu

Caution

You're viewing documentation for a previous version of Scylla Java Driver. Switch to the latest stable version.

Scylla Java Driver Manual Core driver Asynchronous programming

Asynchronous programming¶

Quick overview¶

Async driver methods return Java 8’s CompletionStage.

  • don’t call synchronous methods from asynchronous callbacks (the driver detects that and throws).

  • callbacks execute on I/O threads: consider providing your own executor for expensive computations.

  • be careful not to accidentally ignore errors thrown from callbacks.


The driver exposes an asynchronous API that allows you to write fully non-blocking programs. Asynchronous methods return instances of the JDK’s CompletionStage, that can be conveniently chained and composed.

Here is a short example that opens a session and runs a query asynchronously:

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();

// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));

// Apply a synchronous computation:
CompletionStage<String> resultStage =
    responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));

// Perform an action once a stage is complete:
resultStage.whenComplete(
    (version, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        System.out.printf("Server version: %s%n", version);
      }
      sessionStage.thenAccept(CqlSession::closeAsync);
    });

Threading model¶

The driver uses two internal thread pools: one for request I/O and one for administrative tasks (such as metadata refreshes, schema agreement or processing server events). Note that you can control the size of these pools with the advanced.netty options in the configuration.

When you register a callback on a completion stage, it will execute on a thread in the corresponding pool:

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();
sessionStage.thenAccept(session -> System.out.println(Thread.currentThread().getName()));
// prints s0-admin-n (admin pool thread)

CompletionStage<AsyncResultSet> resultStage =
    session.executeAsync("SELECT release_version FROM system.local");
resultStage.thenAccept(resultSet -> System.out.println(Thread.currentThread().getName()));
// prints s0-io-n (I/O pool thread)

As long as you use the asynchronous API, the driver will behave in a non-blocking manner: its internal threads will almost never block. There are a few exceptions to the rule though: see the manual page on non-blocking programming for details.

Because the asynchronous API is non-blocking, you can safely call a driver method from inside a callback, even when the callback’s execution is triggered by a future returned by the driver:

// Get the department id for a given user:
CompletionStage<AsyncResultSet> idStage =
    session.executeAsync("SELECT department_id FROM user WHERE id = 1");

// Once we have the id, query the details of that department:
CompletionStage<AsyncResultSet> dataStage =
    idStage.thenCompose(
        resultSet -> {
          UUID departmentId = resultSet.one().getUuid(0);
          return session.executeAsync(
              SimpleStatement.newInstance(
                  "SELECT * FROM department WHERE id = ?", departmentId));
        });

However, you can’t call a synchronous method from a callback. This would be very unsafe, because the driver blocks until the response is received; if the request happened to be assigned to the same I/O thread that is currently running the callback, it would deadlock. In fact, the driver detects this situation, and fails fast with a runtime exception to eliminate any chance of a hard-to-debug deadlock:

CompletionStage<ResultSet> dataStage =
    idStage.thenApply(
        resultSet -> {
          UUID departmentId = resultSet.one().getUuid(0);
          // WRONG: calling a synchronous method from an asynchronous callback. DON'T DO THIS!
          return session.execute(
              SimpleStatement.newInstance(
                  "SELECT * FROM department WHERE id = ?", departmentId));
        });

// This is just to show the exception:
dataStage.whenComplete(
    (resultSet, error) -> {
      if (error != null) {
        error.printStackTrace();
      }
    });
// java.util.concurrent.CompletionException:
//   java.lang.IllegalStateException: Detected a synchronous API call on a driver thread,
//                                    failing because this can cause deadlocks.

You should also be careful about expensive computations: if your callbacks hold I/O threads for too long, they will negatively impact the driver’s throughput. Consider providing your own executor:

// Create this as a global resource in your application
Executor computeExecutor = ...

CompletionStage<Integer> resultStage =
    responseStage.thenApplyAsync( // note: thenApplyAsync instead of thenApply
        resultSet -> someVeryExpensiveComputation(resultSet),
        computeExecutor);

Note that an alternate executor can also be used to allow synchronous driver API calls in callbacks, but the recommended approach is to fully commit to the asynchronous model described above.

Error propagation¶

One thing to pay attention to when programming asynchronously is error handling (this is not specific to the driver). When all your callback does is a side effect, it’s easy to accidentally swallow an exception:

CompletionStage<AsyncResultSet> responseStage =
    sessionStage.thenCompose(
        session -> session.executeAsync("SELECT release_version FROM system.local"));
responseStage.thenAccept(
    resultSet -> {
      String version = resultSet.one().getString(0);
      System.out.printf("Server version: %s%n", version);
    });

If the request fails, responseStage is failed, and thenAccept doesn’t run the callback at all (it just returns another failed stage). The error won’t be surfaced anywhere, just silently ignored. One way to address this is with whenComplete, which explicitly handles the error:

responseStage.whenComplete(
    (resultSet, error) -> {
      if (error != null) {
        System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
      } else {
        String version = resultSet.one().getString(0);
        System.out.printf("Server version: %s%n", version);
      }
    });

Or you can chain more operations on the result of printStage, and handle the error further down the chain:

CompletionStage<Void> printStage =
    responseStage.thenAccept(
        resultSet -> {
          String version = resultSet.one().getString(0);
          System.out.printf("Server version: %s%n", version);
        });
// Here trivially handled right away for the sake of example, but could be after more operations:
printStage.exceptionally(error -> {
  System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
  return null;
});

One more subtle source for errors is if the callback itself throws:

responseStage.whenComplete(
    (resultSet, error) -> {
      if (error != null) {
        System.out.printf("Request failed: %s%n", error.getMessage());
      } else {
        int v = resultSet.one().getInt(0);
        System.out.printf("The result is %f%n", 1.0 / v);
      }
    });

There is a potential division by zero on the last line; the resulting ArithmeticException wouldn’t be handled anywhere. Either add a try/catch block in the callback, or don’t ignore the result of whenComplete.

Asynchronous paging¶

Unlike previous versions of the driver, the asynchronous API never triggers synchronous behavior, even when iterating through the results of a request. session.executeAsync returns a dedicated AsyncResultSet that only iterates the current page, the next pages must be fetched explicitly. This greatly simplifies asynchronous paging; see the paging documentation for more details and an example.

PREVIOUS
Address resolution
NEXT
Authentication
  • 4.11.1.x
    • 4.13.0.x
    • 4.12.0.x
    • 4.11.1.x
    • 4.10.0.x
    • 4.7.2.x
    • 3.11.2.x
    • 3.11.0.x
    • 3.10.2.x
    • 3.7.2.x
  • Java Driver for Scylla and Apache Cassandra®
  • API Documentation
  • Manual
    • API conventions
    • Case sensitivity
    • Core driver
      • Address resolution
      • Asynchronous programming
      • Authentication
      • Bill of Materials (BOM)
      • Compression
      • Configuration
        • Reference configuration
      • Control connection
      • Custom codecs
      • Detachable types
      • Query idempotence
      • Integration
      • Load balancing
      • Logging
      • Metadata
        • Node metadata
        • Schema metadata
        • Token metadata
      • Metrics
      • Native protocol
      • Non-blocking programming
      • Paging
      • Performance
      • Connection pooling
      • Query timestamps
      • Reactive Style Programming
      • Reconnection
      • Request tracker
      • Retries
      • Using the shaded JAR
      • Speculative query execution
      • SSL
      • Statements
        • Batch statements
        • Per-query keyspace
        • Prepared statements
        • Simple statements
      • Temporal types
      • Request throttling
      • Query tracing
      • Tuples
      • User-defined types
    • Developer docs
      • Administrative tasks
      • Common infrastructure
        • Concurrency
        • Driver context
        • Event bus
      • Native protocol layer
      • Netty pipeline
      • Request execution
    • Mapper
      • Integration
        • Kotlin
        • Lombok
        • Java 14 Records
        • Scala
      • DAOs
        • Custom result types
        • Delete methods
        • GetEntity methods
        • Increment methods
        • Insert methods
        • Null saving strategy
        • Query methods
        • Query provider methods
        • Select methods
        • SetEntity methods
        • Statement attributes
        • Update methods
      • Entities
      • Mapper interface
    • OSGi
    • Query builder
      • Conditions
      • DELETE
      • Idempotence in the query builder
      • INSERT
      • Relations
      • Schema builder
        • Aggregate
        • Function
        • Index
        • Keyspace
        • Materialized View
        • Table
        • Type
      • SELECT
      • Terms
      • TRUNCATE
      • UPDATE
  • Upgrade guide
  • Frequently asked questions
  • Changelog
  • Create an issue
  • Edit this page

On this page

  • Asynchronous programming
    • Quick overview
    • Threading model
    • Error propagation
    • Asynchronous paging
Logo
Docs Contact Us About Us
Mail List Icon Slack Icon
© 2022, ScyllaDB. All rights reserved.
Last updated on 25 May 2022.
Powered by Sphinx 4.3.2 & ScyllaDB Theme 1.2.2