17 Sep 2024
Version 0.20 of capnproto-rust is now available on crates.io. In this release, the library has new built-in support for streaming.
stream
keywordSuppose we have the following interface defined in a capnp schema file:
interface ByteSink {
# A destination for a sequence of bytes.
write @0 (chunk :Data);
# Writes a chunk of bytes.
end @1 ();
# Indicates that no more chunks will be written.
}
In Rust code, we can invoke methods on a (possibly-remote) ByteSink
object
through handles of type byte_sink::Client
. For example:
/// Writes all of `bytes` to `sink` in a single call to `write()`.
async fn write_bytes_all_at_once(
bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Construct an RPC request for ByteSink.write().
let mut request = sink.write_request();
// Copy all of the bytes into the request.
request.get().set_chunk(bytes);
// Send the request and wait for the response.
request.send().promise.await?;
// Send a ByteSink::end() request and wait for the response.
sink.end_request().send().promise.await?;
Ok(())
}
However, if bytes
is too large, we might
observe congestion, spikes in memory usage, or
an error like this:
remote exception: Failed: Message has 8750020 words, which is too large.
Fortunately, the ByteSink
interface lets us avoid such problems. We can split
the bytes into chunks over multiple ByteSink.write()
calls:
const CHUNK_SIZE: usize = 8192;
/// Writes all of `bytes` to `sink` in chunks of size `CHUNK_SIZE`.
async fn write_bytes_chunked(
mut bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Loop until all bytes have been written.
while bytes.len() > 0 {
// Compute the end index.
let end = usize::min(CHUNK_SIZE, bytes.len());
// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);
// Send the request and wait for a response.
request.send().promise.await?;
// Update `bytes` to point to the remaining bytes.
bytes = &bytes[end..];
}
// Send `ByteSink.end()`.
sink.end_request().send().promise.await?;
Ok(())
}
Now the bytes should transfer much more smoothly,
but we’ve potentially introduced a new problem: latency.
After sending each ByteSink.write()
request,
we’re waiting for the server to send back its response
before we start the next request.
Depending on the amount of time it takes for messages
to travel between the client and server,
waiting like that could slow things down considerably!
Here’s a naive, bad way to avoid such latency:
/// Like write_bytes_chunked(), but sends all write() requests
/// immediately.
async fn write_bytes_chunked_in_parallel(
mut bytes: &[u8],
sink: byte_sink::Client,
) -> Result<(), capnp::Error> {
// Construct an object that will manage multiple futures
// executing at once.
let mut responses = FuturesOrdered::new();
// Loop until all writes have been enqueued.
while bytes.len() > 0 {
let end = usize::min(CHUNK_SIZE, bytes.len());
// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);
// Send the write() request and collect its response
// future (but don't wait on it yet).
responses.push_back(request.send().promise);
bytes = &bytes[end..];
}
// Wait for all of the responses.
while let Some(x) = responses.next().await {
x?;
}
// Send `ByteSink.end()`.
sink.end_request().send().promise.await?;
Ok(())
}
The problem is that, because we immediately send all the requests, now we are back to causing memory spikes and congestion, even if each individual message is small.
We need to add some way to limit the number of in-flight requests. That’s going to make the code significantly more complicated! Perhaps you’d like to try it as an exercise?
The good news is that capnproto-rust can now automatically
perform such bookkeeping, and our code can remain simple.
To enable the new functionality, we define our
method with the stream
keyword:
interface StreamingByteSink {
write @0 (chunk :Data) -> stream;
# Writes a chunk of bytes.
end @1 ();
# Indicates that no more chunks will be written.
}
Now we can write our code in the following direct style:
/// Like write_bytes_chunked(), but the library
/// automatically handles backpressure.
async fn write_bytes_streaming(
mut bytes: &[u8],
sink: streaming_byte_sink::Client,
) -> Result<(), capnp::Error> {
// Loop until all bytes have been enqueued.
while bytes.len() > 0 {
let end = usize::min(CHUNK_SIZE, bytes.len());
// Construct the write() request.
let mut request = sink.write_request();
request.get().set_chunk(&bytes[..end]);
// Send the request and wait until the RPC system determines
// that we are clear to send another chunk (which is typically
// *before* the response has returned).
request.send().await?;
bytes = &bytes[end..];
}
// The end() method is not streaming. It does not return until all of
// the streaming calls have completed. If any streaming call triggered
// an error, that error will be returned here.
sink.end_request().send().promise.await?;
Ok(())
}
Behind the scenes, capnproto-rust maintains a limit on how many bytes
are in flight for this object at any given time.
Currently it uses a fixed value
that can be configured via
twoparty::VatNetwork::set_window_size()
.
In the future, the library could potentially add sophisticated dynamic
flow control to optimize throughput, without requiring
any code change from users.
To create an instance of a StreamingByteSink
object in Rust,
we need to implement the byte_sink_streaming::Server
trait:
impl byte_sink_streaming::Server for StreamingByteSinkImpl {
fn write(
&mut self,
params: byte_sink_streaming::WriteParams
) -> Promise<(), Error> {
todo!()
}
fn end(
&mut self,
params: byte_sink_streaming::EndParams,
results: byte_sink_streaming::EndResults,
) -> Promise<(), Error> {
todo!()
}
}
Because it was declared with -> stream
, the write()
method
does not have a Results
parameter,
unlike end()
and other non-streaming methods.
Streaming methods never have any values to return.
In the C++ implementation,
streaming method calls are delivered one-by-one
to the implementing object, with no overlap.
This is intended “as a convenience” and to
make it easier to interface with kj::AsyncOutputStream
objects.
In Rust, async I/O objects are rather different
and enjoy good type system support for preventing misuse,
so I opted not to implement the one-by-one delivery guarantee
in capnproto-rust.
For a working example, see the capnp-rpc/examples/streaming directory.