Getting started with Cap'n Proto RPC for Rust
Introduction
Cap'n Proto is a data interchange format and RPC system. Unlike Protocol Buffers (and its companion RPC system, gRPC), there is no encoding/decoding step. You can find more details on the Cap'n Proto website. In this article we will focus on the RPC part of Cap'n Proto with Rust.
In this quick and silly example, you will set up a new Rust project and create a very simple client/server program using Cap'n Proto remote procedure calls.
I assume that you have a recent version of Rust and Cargo installed on your machine.
Setup
Install Cap'n Proto
If you are using macOS, you can simply use brew:
brew install capnproto
On Linux, you should be able to install capnproto with your preferred package manager.
Note: on Arch Linux, you can find it on the AUR.
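For example, on common distributions something like the following should work (package names and commands are illustrative and may vary between distributions and versions):
# Debian/Ubuntu
sudo apt-get install capnproto
# Fedora
sudo dnf install capnproto
# Arch Linux (from the AUR, using an AUR helper such as yay)
yay -S capnproto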
Creating our project
In the directory of your choice, type:
cargo new --bin hello
This will create a new Rust binary project managed by Cargo. The project structure of a binary project should be as follows:
src
| main.rs
Cargo.toml
.gitignore
Setting up Cargo with Cap'n Proto
In your Cargo.toml, include:
[package]
name = "hello"
version = "0.1.0"
authors = ["Name Surname <name@surname.com>"]
[build-dependencies]
capnpc = "0.7.2"
[dependencies]
capnp = "0.7.4"
capnp-rpc = "0.7.4"
gj = "0.2"
gjio = "0.1"
We list the build dependencies we need to compile our .capnp files under [build-dependencies]. Under [dependencies] we include the capnp data interchange crate, capnp-rpc for the RPC capabilities, and gj and gjio for the event loop and promises.
Try to build the project:
cargo build
At a later point you can also update your dependencies with:
cargo update
The build should complete successfully. On to the next step.
A simple RPC call
Interface Definition
It is time to create the Hello interface. Cap'n Proto uses a schema language quite similar to gRPC's Protocol Buffers, while being more concise.
The first step is to generate a unique ID for our capnp file with:
capnp id
This should return an id of the form:
@0xad0c6329ab7dd53a
Now, in your project's src directory, create a file named hello.capnp. In this file we'll add the following interface definition:
@0xad0c6329ab7dd53a;
interface Hello {
hello @0 (name: Text) -> (response: Text);
}
The example is trivial: the hello method takes a name as a parameter and returns a response.
Compiling the schema file on build
Much like grpc, you can directly invoke the capnp compile
command to compile the schema file. We are going to adopt a different approach and directly invoke the compile step at build time with cargo build
.
To tell Cargo to compile the hello.capnp file at build time, create a new build.rs file at the root of the project and include:
extern crate capnpc;
fn main() {
::capnpc::CompilerCommand::new()
.src_prefix("src")
.file("src/hello.capnp")
.run()
.unwrap();
}
This calls the compiler command on the hello.capnp file.
Now in Cargo.toml, add the following line under authors:
build = "build.rs"
This tells Cargo to run the build steps in build.rs when calling cargo build.
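For reference, the [package] section of Cargo.toml should now look like this (author details are placeholders, as before):
[package]
name = "hello"
version = "0.1.0"
authors = ["Name Surname <name@surname.com>"]
build = "build.rs"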
cargo build should again pass successfully.
Creating our main function
Let's edit the main.rs file and include the first piece of code for our hello example:
extern crate capnp;
extern crate capnp_rpc;
#[macro_use]
extern crate gj;
extern crate gjio;
pub mod hello_capnp {
include!(concat!(env!("OUT_DIR"), "/hello_capnp.rs"));
}
pub mod client;
pub mod server;
fn main() {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() >= 2 {
        match &args[1][..] {
            "client" => return client::main(),
            "server" => return server::main(),
            _ => (),
        }
    }
    // Fall through: no valid subcommand was given, so print a short usage hint.
    println!("usage: {} [client | server] ADDRESS", args[0]);
}
This is our main file, the entry point of our program. We start by referencing the Cap'n Proto crates with extern crate <some-crate>. Then we reference the compiled capnp code located in OUT_DIR, the folder generated at build time by capnpc::CompilerCommand under the target/debug/build directory.
Note that we import a module called hello_capnp by including the content of the generated hello_capnp.rs file (located in target/debug/build/hello-XXX).
We can now use the module and start implementing the server with the hello::Server trait.
The Hello Server
Implement the Server Trait
Let's first take a look at the hello::Server trait in our generated code (you can find it under target/debug/build/hello-XXX with a quick search):
[...]
pub trait Server<> {
fn hello(&mut self, _: HelloParams<>, _: HelloResults<>)
-> ::capnp::capability::Promise<(), ::capnp::Error> {
::capnp::capability::Promise::err(
::capnp::Error::unimplemented(
"method not implemented".to_string()
)
)
}
}
[...]
The trait defines a function signature for hello. It looks like we need to handle the parameters of the request (HelloParams) as well as the results returned to the client (HelloResults). We also need to return what is called a Promise.
A Promise allows the RPC system to be asynchronous: the client does not have to wait synchronously for an answer, but can collect it at any moment in the future when deemed necessary. You still have the choice to wait synchronously, as our example will later show. This comes in handy in many situations. For example, the server could itself issue many RPC calls to other servers and collect all the results at some point in the future.
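As a quick preview of what this looks like with gj (a hypothetical sketch: fetch_greeting() is a made-up function returning Promise<String, ::capnp::Error>, and wait_scope/event_port come from the event loop):
// Hypothetical: fetch_greeting() returns gj::Promise<String, ::capnp::Error>.
let promise = fetch_greeting().then(|greeting| {
    // Runs only once the greeting is available; other work can proceed meanwhile.
    println!("got: {}", greeting);
    Promise::ok(())
});
// Collect the result whenever we actually need it, here by blocking:
promise.wait(wait_scope, &mut event_port)?;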
That said, let's jump to the next step. Under src, create a file called server.rs. We'll start by creating a HelloImpl struct that implements hello::Server:
use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp};
use hello_capnp::{hello};
use gj::{EventLoop, Promise, TaskReaper, TaskSet};
struct HelloImpl;
impl hello::Server for HelloImpl {
fn hello(
&mut self,
params: hello::HelloParams,
mut results: hello::HelloResults,
) -> Promise<(), ::capnp::Error>
{
println!("received a request for greetings!");
let greeting: &str = "Hello ";
let name: &str = pry!(pry!(params.get()).get_name());
let response = format!("{}{}!", greeting, name);
results.get().set_response(&response);
Promise::ok(())
}
}
In the code above, we implement the hello::Server trait for HelloImpl and define the hello function, which is going to be remotely called by the client. In this function we build the response and set it on results to return it to the client. If we didn't set the result, the client would get nothing back from remotely calling hello. Then we call Promise::ok(()) to fulfill our promise.
Notice the pry! macro used for getting the name parameter passed to the function? It is like try!(), but for functions that return a Promise<T, E> rather than a Result<T, E>. It unwraps the Result<T, E> returned by params.get(), but in the error case returns Promise::err(e) instead of Result::Err(e), because our function returns a Promise. The same applies to get_name().
The next step is to implement an accept_loop which listens on the socket and, on accepting a connection, creates a new "Task" and adds it to a TaskSet. The TaskSet basically holds a collection of Promise<T, E> and ensures that they are driven to completion.
pub fn accept_loop(
listener: ::gjio::SocketListener,
mut task_set: TaskSet<(), ::capnp::Error>,
helloc: hello::Client,
) -> Promise<(), ::std::io::Error>
{
listener.accept().then(move |stream| {
let mut network = twoparty::VatNetwork::new(
stream.clone(),
stream,
rpc_twoparty_capnp::Side::Server,
Default::default(),
);
let disconnect_promise = network.on_disconnect();
let rpc_system = RpcSystem::new(Box::new(network), Some(helloc.clone().client));
task_set.add(disconnect_promise.attach(rpc_system));
accept_loop(listener, task_set, helloc)
})
}
struct Reaper;
impl TaskReaper<(), ::capnp::Error> for Reaper {
fn task_failed(&mut self, error: ::capnp::Error) {
println!("Task failed: {}", error);
}
}
We also implement a TaskReaper, whose methods are callbacks invoked when a task finishes. In our case, we only handle failed tasks.
Note: you can also handle successful task completions by implementing:
fn task_succeeded(&mut self, _value: T)
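For example, a hypothetical VerboseReaper covering both callbacks could look like this (our TaskSet is parameterized over () and ::capnp::Error, so the success value is just the unit type):
struct VerboseReaper;

impl TaskReaper<(), ::capnp::Error> for VerboseReaper {
    // Called when a connection's task finishes cleanly.
    fn task_succeeded(&mut self, _value: ()) {
        println!("connection closed");
    }
    // Called when a task ends with an error.
    fn task_failed(&mut self, error: ::capnp::Error) {
        println!("task failed: {}", error);
    }
}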
Finally, let's implement the main function:
pub fn main()
{
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 3 {
println!("usage: {} server ADDRESS[:PORT]", args[0]);
return;
}
EventLoop::top_level(move |wait_scope| -> Result<(), ::capnp::Error> {
use std::net::ToSocketAddrs;
let mut event_port = ::gjio::EventPort::new()?;
let network = event_port.get_network();
let addr = args[2].to_socket_addrs()?.next().expect(
"could not parse address",
);
let mut address = network.get_tcp_address(addr);
let listener = address.listen()?;
let hello_server = hello::ToClient::new(HelloImpl).from_server::<::capnp_rpc::Server>();
let task_set = TaskSet::new(Box::new(Reaper));
accept_loop(listener, task_set, hello_server).wait(
wait_scope,
&mut event_port
)?;
Ok(())
}).expect("top level error");
}
We first grab our arguments from the CLI and jump onto the event loop. We then proceed to create three objects:
- A Listener, used to handle incoming connections.
- A TaskSet, used to hold a collection of Promise<T, E> and ensure their completion.
- A Server (HelloImpl), which contains our implementation of hello::Server.
Then we call accept_loop, passing those objects as parameters.
The Client
For the client, this is just a matter of creating an hello::Client
, connecting to the server, and issuing the remote call to get our result:
use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp};
use hello_capnp::{hello};
use gj::{EventLoop, Promise};
pub fn main()
{
let args: Vec<String> = ::std::env::args().collect();
if args.len() != 4 {
println!("usage: {} client HOST:PORT NAME", args[0]);
return;
}
EventLoop::top_level(move |wait_scope| -> Result<(), ::capnp::Error> {
use std::net::ToSocketAddrs;
let mut event_port = ::gjio::EventPort::new()?;
let network = event_port.get_network();
let addr = args[2].to_socket_addrs()?.next().expect(
"could not parse address",
);
let address = network.get_tcp_address(addr);
let stream = address.connect().wait(wait_scope, &mut event_port)?;
let network = Box::new(twoparty::VatNetwork::new(
stream.clone(),
stream,
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(network, None);
let hello: hello::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
let mut request = hello.hello_request();
request.get().set_name(&args[3]);
let _result = request
.send()
.promise
.then(|response| {
let response = pry!(pry!(response.get()).get_response());
println!("{}", response);
Promise::ok(())
})
.wait(wait_scope, &mut event_port);
Ok(())
}).expect("top level error");
}
The code above initiates the connection with the server and creates a hello::Client.
Now let's focus on this portion:
let _result = request
.send()
.promise
.then(|response| {
let response = pry!(pry!(response.get()).get_response());
println!("{}", response);
Promise::ok(())
})
.wait(wait_scope, &mut event_port);
A lot is happening there. We call the send() method on request to remotely call hello() on the server. send() returns a Promise<T, E>. Now, the problem is that we want to do something with the response received from the server.
Using then(), we can handle the result of the execution with a callback invoked once the promise is fulfilled. The callback takes the promised result as a parameter, either for immediate computation or to return a new promise.
Finally, wait() blocks until the promise is fulfilled. Without wait(), the function would exit and close the connection prematurely, without waiting for the result.
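As an aside, since we block with wait() anyway, an equivalent way to write this portion is to wait on the promise directly and read the response afterwards (a rough sketch, assuming the same variables as above):
// Block until the server replies, then read the response field directly.
let response = request.send().promise.wait(wait_scope, &mut event_port)?;
println!("{}", response.get()?.get_response()?);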
Hello Stranger!
It's time to test our program. First, check that your project builds successfully with cargo build.
Open two shells and cd into the target/debug directory. To start the server:
./hello server 127.0.0.1:6578
For the client:
./hello client 127.0.0.1:6578 Bob
You should see the following result:
On the Client:
Hello Bob!
On the Server:
received a request for greetings!
Sources
You can find the sources for this example in this GitHub repository.
For more advanced examples of how to use Cap'n Proto RPC, see these Cap'n Proto RPC examples.