0.20 — streaming with backpressure

17 Sep 2024

Version 0.20 of capnproto-rust is now available on crates.io. In this release, the library has new built-in support for streaming.

the stream keyword

Suppose we have the following interface defined in a capnp schema file:

interface ByteSink {
  # A destination for a sequence of bytes.

  write @0 (chunk :Data);
  # Writes a chunk of bytes.

  end @1 ();
  # Indicates that no more chunks will be written.
}

In Rust code, we can invoke methods on a (possibly-remote) ByteSink object through handles of type byte_sink::Client. For example:

/// Writes all of `bytes` to `sink` in a single call to `write()`.
async fn write_bytes_all_at_once(
    bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Construct an RPC request for ByteSink.write().
    let mut request = sink.write_request();

    // Copy all of the bytes into the request.
    request.get().set_chunk(bytes);

    // Send the request and wait for the response.
    request.send().promise.await?;

    // Send a ByteSink::end() request and wait for the response.
    sink.end_request().send().promise.await?;

    Ok(())
}

However, if bytes is too large, we might observe congestion, spikes in memory usage, or an error like this:

remote exception: Failed: Message has 8750020 words, which is too large.

Fortunately, the ByteSink interface lets us avoid such problems. We can split the bytes into chunks over multiple ByteSink.write() calls:

const CHUNK_SIZE: usize = 8192;

/// Writes all of `bytes` to `sink` in chunks of size `CHUNK_SIZE`.
async fn write_bytes_chunked(
    mut bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Loop until all bytes have been written.
    while bytes.len() > 0 {
        // Compute the end index.
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the request and wait for a response.
        request.send().promise.await?;

        // Update `bytes` to point to the remaining bytes.
        bytes = &bytes[end..];
    }

    // Send `ByteSink.end()`.
    sink.end_request().send().promise.await?;

    Ok(())
}

Now the bytes should transfer much more smoothly, but we’ve potentially introduced a new problem: latency. After sending each ByteSink.write() request, we’re waiting for the server to send back its response before we start the next request. Depending on the amount of time it takes for messages to travel between the client and server, waiting like that could slow things down considerably!

Here’s a naive, bad way to avoid such latency:

/// Like write_bytes_chunked(), but sends all write() requests
/// immediately.
async fn write_bytes_chunked_in_parallel(
    mut bytes: &[u8],
    sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Construct an object that will manage multiple futures
    // executing at once.
    let mut responses = FuturesOrdered::new();

    // Loop until all writes have been enqueued.
    while bytes.len() > 0 {
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the write() request and collect its response
        // future (but don't wait on it yet).
        responses.push_back(request.send().promise);

        bytes = &bytes[end..];
    }
    // Wait for all of the responses.
    while let Some(x) = responses.next().await {
        x?;
    }

    // Send `ByteSink.end()`.
    sink.end_request().send().promise.await?;

    Ok(())
}

The problem is that, because we immediately send all the requests, now we are back to causing memory spikes and congestion, even if each individual message is small.

We need to add some way to limit the number of in-flight requests. That’s going to make the code significantly more complicated! Perhaps you’d like to try it as an exercise?

The good news is that capnproto-rust can now automatically perform such bookkeeping, and our code can remain simple. To enable the new functionality, we define our method with the stream keyword:

interface StreamingByteSink {
   write @0 (chunk :Data) -> stream;
   # Writes a chunk of bytes.

   end @1 ();
   # Indicates that no more chunks will be written.
}

Now we can write our code in the following direct style:


/// Like write_bytes_chunked(), but the library
/// automatically handles backpressure.
async fn write_bytes_streaming(
    mut bytes: &[u8],
    sink: streaming_byte_sink::Client,
) -> Result<(), capnp::Error> {
    // Loop until all bytes have been enqueued.
    while bytes.len() > 0 {
        let end = usize::min(CHUNK_SIZE, bytes.len());

        // Construct the write() request.
        let mut request = sink.write_request();
        request.get().set_chunk(&bytes[..end]);

        // Send the request and wait until the RPC system determines
        // that we are clear to send another chunk (which is typically
        // *before* the response has returned).
        request.send().await?;

        bytes = &bytes[end..];
    }

    // The end() method is not streaming. It does not return until all of
    // the streaming calls have completed. If any streaming call triggered
    // an error, that error will be returned here.
    sink.end_request().send().promise.await?;

    Ok(())
}

Behind the scenes, capnproto-rust maintains a limit on how many bytes are in flight for this object at any given time. Currently it uses a fixed value that can be configured via twoparty::VatNetwork::set_window_size(). In the future, the library could potentially add sophisticated dynamic flow control to optimize throughput, without requiring any code change from users.

implementing streaming methods

To create an instance of a StreamingByteSink object in Rust, we need to implement the byte_sink_streaming::Server trait:

impl byte_sink_streaming::Server for StreamingByteSinkImpl {
    fn write(
        &mut self,
        params: byte_sink_streaming::WriteParams
    ) -> Promise<(), Error> {
       todo!()
    }

    fn end(
        &mut self,
        params: byte_sink_streaming::EndParams,
        results: byte_sink_streaming::EndResults,
    ) -> Promise<(), Error> {
        todo!()
    }

}

Because it was declared with -> stream, the write() method does not have a Results parameter, unlike end() and other non-streaming methods. Streaming methods never have any values to return.

difference from capnproto-c++

In the C++ implementation, streaming method calls are delivered one-by-one to the implementing object, with no overlap. This is intended “as a convenience” and to make it easier to interface with kj::AsyncOutputStream objects. In Rust, async I/O objects are rather different and enjoy good type system support for preventing misuse, so I opted not to implement the one-by-one delivery guarantee in capnproto-rust.

example

For a working example, see the capnp-rpc/examples/streaming directory.

-- posted by dwrensha

capnproto-rust on github
more posts