
Getting started with Capn'proto RPC for Rust

Alexandre Beslic

Introduction

Capn'proto is a data interchange format and RPC system. Unlike Protocol Buffers (and its companion RPC system, gRPC), it has no separate encoding/decoding step: the in-memory representation of a message is also its wire format. You can find more details on the Capn'proto website. In this article we will focus on the RPC part of Capn'proto with Rust.

In this quick and silly example, you will set up a new Rust project and create a very simple client/server program that uses Capn'proto for Remote Procedure Calls.

I assume that you have a recent version of Rust and Cargo installed on your machine.


Setup

Install capn'proto

If you are using macOS, you can simply use brew:

brew install capnproto

On Linux, you should be able to install capnproto with your distribution's package manager.

Note: On Arch Linux, you can find it on the AUR.

Creating our project

In the directory of your choice, type:

cargo new --bin hello

This will create a new Rust binary project managed by Cargo. The project structure of a binary project should be as follows:

src
 | main.rs
Cargo.toml
.gitignore

Setting up Cargo with Capn'proto

In your Cargo.toml, include:

[package]
name = "hello"
version = "0.1.0"
authors = ["Name Surname <name@surname.com>"]

[build-dependencies]
capnpc = "0.7.2"

[dependencies]
capnp = "0.7.4"
capnp-rpc = "0.7.4"
gj = "0.2"
gjio = "0.1"

Under [build-dependencies] we list capnpc, the crate that compiles our .capnp files at build time. Under [dependencies] we list capnp (the data interchange crate), capnp-rpc (the RPC layer), and gj and gjio (the event loop, promises and asynchronous I/O).

Try to build the project:

cargo build

At a later point you can also update your dependencies with:

cargo update

The build should complete successfully. On to the next step.


A simple RPC call

Interface Definition

It is time to create the Hello interface. Capn'proto uses a schema language quite similar to the Protocol Buffers definitions used by gRPC, while being more concise.

The first step is to generate a unique ID for our capnp file with:

capnp id

This should return an id of the form:

@0xad0c6329ab7dd53a

Now, in your project src, create a file named hello.capnp. In this file we'll add the following interface definition:

@0xad0c6329ab7dd53a;

interface Hello {
  hello @0 (name: Text) -> (response: Text);
}

The example is trivial: the hello method takes a name as a parameter and returns a response.

Compiling the schema file on build

Much like with gRPC, you could invoke the compiler by hand (capnp compile) to generate code from the schema file. We are going to take a different approach and run the compile step at build time, as part of cargo build.

To have Cargo compile hello.capnp at build time, create a new build.rs file at the root of the project and include:

extern crate capnpc;

fn main() {
    ::capnpc::CompilerCommand::new()
        .src_prefix("src")
        .file("src/hello.capnp")
        .run()
        .unwrap();
}

This calls the compiler command on the hello.capnp file.

Now in Cargo.toml, add the following line under authors:

build = "build.rs"

This tells Cargo to run build.rs before compiling the crate when you call cargo build.

cargo build should, again, complete successfully.


Creating our main function

Let's edit the main.rs file and include our first piece of code for our hello example:

extern crate capnp;
extern crate capnp_rpc;

#[macro_use]
extern crate gj;
extern crate gjio;

pub mod hello_capnp {
    include!(concat!(env!("OUT_DIR"), "/hello_capnp.rs"));
}

pub mod client;
pub mod server;

fn main() {
    let args : Vec<String> = ::std::env::args().collect();
    if args.len() >= 2 {
        match &args[1][..] {
            "client" => return client::main(),
            "server" => return server::main(),
            _ => ()
        }
    }
}

This is our main file, the entry point for our program. We start by referencing the crates we depend on with extern crate <some-crate>. Then we pull in the Rust code generated from our schema, which lives in OUT_DIR (a directory under target/debug/build that Cargo creates at build time and that capnpc::CompilerCommand writes into).

Note that we import a module called hello_capnp by including the content of the hello_capnp.rs file (located in target/debug/build/hello-XXX).
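The argument dispatch in main can also be looked at in isolation. Here is a minimal sketch using only the standard library; Mode and parse_mode are hypothetical names introduced for illustration and are not part of the project. Unlike the main above, which silently does nothing on an unknown subcommand, this version makes the "no match" case explicit so the caller can print a usage message.

```rust
/// Which half of the program to run (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Mode {
    Client,
    Server,
}

/// Maps the first command-line argument to a mode.
/// Returns None when the argument is missing or not recognized.
fn parse_mode(args: &[String]) -> Option<Mode> {
    match args.get(1).map(String::as_str) {
        Some("client") => Some(Mode::Client),
        Some("server") => Some(Mode::Server),
        _ => None,
    }
}
```

In main, a None result would be the place to print something like "usage: hello client|server ..." before returning.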

We can now use the module and start implementing the server with the hello::Server Trait.


The Hello Server

Implement the Server Trait

Let's first take a look at the hello::Server trait in our generated code (you can find it in hello_capnp.rs under target/debug/build/hello-XXX; search for "trait Server"):

[...]

pub trait Server<>  {
    fn hello(&mut self, _: HelloParams<>, _: HelloResults<>) 
    -> ::capnp::capability::Promise<(), ::capnp::Error> { 
        ::capnp::capability::Promise::err(
            ::capnp::Error::unimplemented(
                "method not implemented".to_string()
            )
        )
    }
}

[...]

The Trait defines a function signature for hello. It looks like we need to handle the parameters of the requests (HelloParams) as well as the results returned to the client (HelloResults). We also need to return what is called a Promise.

A Promise allows the RPC system to be asynchronous: the client does not have to block while waiting for an answer, but can collect it at any later point, when it actually needs it. You can still choose to wait synchronously (as our example will show later). This comes in handy in many situations. For example, the server could itself issue many RPC calls to other servers and collect all the results at once at some point in the future.
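To make the "issue many calls, collect the results later" idea concrete, here is a rough analogy using only the standard library. It uses threads and channels rather than gj's single-threaded event loop, so it shows the pattern and not how gj works internally. slow_greeting and greet_all are hypothetical names made up for this sketch.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts a "remote call" in the background and immediately returns a
/// handle (the receiving end of a channel) to collect the answer later.
fn slow_greeting(name: &str) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    let name = name.to_string();
    thread::spawn(move || {
        // If the receiver was dropped, nobody wants the answer: ignore it.
        let _ = tx.send(format!("Hello {}!", name));
    });
    rx
}

/// Issues every call first, then collects all the answers in order.
fn greet_all(names: &[&str]) -> Vec<String> {
    let pending: Vec<_> = names.iter().map(|n| slow_greeting(n)).collect();
    pending
        .into_iter()
        .map(|rx| rx.recv().expect("worker thread dropped its sender"))
        .collect()
}
```

The important part is the shape of greet_all: nothing blocks until every call has been started.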

That said, let's jump to the next step. Under src, create a file called server.rs. We'll start by creating a HelloImpl that implements hello::Server:

use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp};
use hello_capnp::{hello};

use gj::{EventLoop, Promise, TaskReaper, TaskSet};

struct HelloImpl;

impl hello::Server for HelloImpl {
    fn hello(
        &mut self,
        params: hello::HelloParams,
        mut results: hello::HelloResults,
    ) -> Promise<(), ::capnp::Error>
    {

        println!("received a request for greetings!");

        let greeting: &str = "Hello ";
        let name: &str = pry!(pry!(params.get()).get_name());
        let response = format!("{}{}!", greeting, name);

        results.get().set_response(&response);

        Promise::ok(())
    }
}

In the code above, we implement the hello::Server trait for HelloImpl and define the hello function, which is going to be called remotely by the client. In this function we build the response and set the corresponding field on results to return it to the client. If we didn't set the result, the client would get an empty response back from calling hello. Then we return Promise::ok(()) to fulfill our promise.

Notice the pry! used to get the name parameter passed to the function? It is like try!(), but for functions that return a Promise<T, E> rather than a Result<T, E>. It unwraps the Result<T, E> returned by params.get() and, in the error case, returns Promise::err(e) from the enclosing function instead of Err(e), because our function returns a Promise. The same applies to get_name().
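To see what such a macro boils down to, here is a minimal sketch built on a toy type. ToyPromise and toy_pry! are hypothetical stand-ins written for this article; they are not gj's actual definitions, and the toy promise only models the "already resolved" states.

```rust
/// Toy stand-in for a promise that is already resolved.
#[derive(Debug, PartialEq)]
enum ToyPromise<T, E> {
    Ok(T),
    Err(E),
}

/// Like try!(), but the early return is a ToyPromise::Err, not a
/// Result::Err, so it can be used in functions returning ToyPromise.
macro_rules! toy_pry {
    ($expr:expr) => {
        match $expr {
            Ok(value) => value,
            Err(e) => return ToyPromise::Err(From::from(e)),
        }
    };
}

/// Parses a number and doubles it; a parse failure becomes a failed promise.
fn parse_and_double(input: &str) -> ToyPromise<i32, std::num::ParseIntError> {
    let n: i32 = toy_pry!(input.parse::<i32>());
    ToyPromise::Ok(n * 2)
}
```

Calling parse_and_double("21") yields ToyPromise::Ok(42), while a non-numeric input takes the early-return branch inside the macro.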


The next step is to implement an accept_loop, which listens on the socket and, each time it accepts a connection, creates a new "task" and adds it to a TaskSet. The TaskSet holds a collection of Promise<T, E> and makes sure they run to completion.

pub fn accept_loop(
    listener: ::gjio::SocketListener,
    mut task_set: TaskSet<(), ::capnp::Error>,
    helloc: hello::Client,
) -> Promise<(), ::std::io::Error>
{
    listener.accept().then(move |stream| {
        let mut network = twoparty::VatNetwork::new(
            stream.clone(),
            stream,
            rpc_twoparty_capnp::Side::Server,
            Default::default(),
        );

        let disconnect_promise = network.on_disconnect();

        let rpc_system = RpcSystem::new(Box::new(network), Some(helloc.clone().client));

        task_set.add(disconnect_promise.attach(rpc_system));
        accept_loop(listener, task_set, helloc)
    })
}

struct Reaper;

impl TaskReaper<(), ::capnp::Error> for Reaper {
    fn task_failed(&mut self, error: ::capnp::Error) {
        println!("Task failed: {}", error);
    }
}

We also implement a TaskReaper, a set of callbacks invoked when a task finishes. In our case, we only handle failed tasks.

Note: You can also handle successful task completions by implementing:

fn task_succeeded(&mut self, _value: T)
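The relationship between a task set and its reaper can be sketched without gj. In the toy version below, ToyReaper, ToyTaskSet and CollectingReaper are hypothetical names, and tasks are plain closures that run immediately, whereas gj's TaskSet holds promises driven by the event loop. The point is only to show who calls which callback.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy stand-in for a reaper: success has a default no-op handler,
/// failure must be handled.
trait ToyReaper<T, E> {
    fn task_succeeded(&mut self, _value: T) {}
    fn task_failed(&mut self, error: E);
}

/// Toy stand-in for a task set: runs a task and reports its outcome.
struct ToyTaskSet<T, E> {
    reaper: Box<dyn ToyReaper<T, E>>,
}

impl<T, E> ToyTaskSet<T, E> {
    fn new(reaper: Box<dyn ToyReaper<T, E>>) -> Self {
        ToyTaskSet { reaper }
    }

    fn run<F: FnOnce() -> Result<T, E>>(&mut self, task: F) {
        match task() {
            Ok(value) => self.reaper.task_succeeded(value),
            Err(error) => self.reaper.task_failed(error),
        }
    }
}

/// A reaper that records failures in a shared list instead of printing them.
struct CollectingReaper {
    failures: Rc<RefCell<Vec<String>>>,
}

impl ToyReaper<(), String> for CollectingReaper {
    fn task_failed(&mut self, error: String) {
        self.failures.borrow_mut().push(error);
    }
}
```

Because task_succeeded has a default body, a reaper like ours only needs to spell out the failure case.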


Finally, let's implement the main function:

pub fn main()
{
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() != 3 {
        println!("usage: {} server ADDRESS:PORT", args[0]);
        return;
    }

    EventLoop::top_level(move |wait_scope| -> Result<(), ::capnp::Error> {
        use std::net::ToSocketAddrs;
        let mut event_port = ::gjio::EventPort::new()?;
        let network = event_port.get_network();

        let addr = args[2].to_socket_addrs()?.next().expect(
            "could not parse address",
        );

        let mut address = network.get_tcp_address(addr);
        let listener = address.listen()?;

        let hello_server = hello::ToClient::new(HelloImpl).from_server::<::capnp_rpc::Server>();

        let task_set = TaskSet::new(Box::new(Reaper));

        accept_loop(listener, task_set, hello_server).wait(
            wait_scope,
            &mut event_port
        )?;

        Ok(())
    }).expect("top level error");
}

We first grab our arguments from the command line and enter the event loop. There, we create three objects:

  • A listener, used to accept incoming connections.
  • A TaskSet, used to hold a collection of Promise<T, E> and ensure their completion.
  • A hello::Client wrapping HelloImpl, our implementation of hello::Server.

Then we call accept_loop passing those objects as parameters.
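The address handling in main relies on the standard library's ToSocketAddrs trait, which can be tried on its own. The sketch below uses a hypothetical helper, first_addr, to show that a string argument needs both a host and a port to resolve.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolves "host:port" and returns the first address, if any.
/// Returns None when the input cannot be parsed or resolved.
fn first_addr(input: &str) -> Option<SocketAddr> {
    input.to_socket_addrs().ok()?.next()
}
```

With this helper, "127.0.0.1:6578" resolves to a socket address, while "127.0.0.1" alone does not, because the port is missing.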


The Client

For the client, it is just a matter of connecting to the server, creating a hello::Client, and issuing the remote call to get our result:

use capnp_rpc::{RpcSystem, twoparty, rpc_twoparty_capnp};
use hello_capnp::{hello};

use gj::{EventLoop, Promise};

pub fn main()
{
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() != 4 {
        println!("usage: {} client HOST:PORT NAME", args[0]);
        return;
    }

    EventLoop::top_level(move |wait_scope| -> Result<(), ::capnp::Error> {
        use std::net::ToSocketAddrs;
        let mut event_port = ::gjio::EventPort::new()?;
        let network = event_port.get_network();

        let addr = args[2].to_socket_addrs()?.next().expect(
            "could not parse address",
        );

        let address = network.get_tcp_address(addr);
        let stream = address.connect().wait(wait_scope, &mut event_port)?;

        let network = Box::new(twoparty::VatNetwork::new(
            stream.clone(),
            stream,
            rpc_twoparty_capnp::Side::Client,
            Default::default(),
        ));

        let mut rpc_system = RpcSystem::new(network, None);
        let hello: hello::Client = rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

        let mut request = hello.hello_request();
        request.get().set_name(&args[3]);

        let _result = request
            .send()
            .promise
            .then(|response| {
                let response = pry!(pry!(response.get()).get_response());
                println!("{}", response);
                Promise::ok(())
            })
            .wait(wait_scope, &mut event_port);

        Ok(())
    }).expect("top level error");
}

The code above initiates the connection with the server and creates a hello::Client.

Now let's focus on this portion:

let _result = request
        .send()
        .promise
        .then(|response| {
            let response = pry!(pry!(response.get()).get_response());
            println!("{}", response);
            Promise::ok(())
        })
        .wait(wait_scope, &mut event_port);

A lot is happening there. We call the send() method on request to remotely call hello() on the server. send() returns immediately, and the promise field of its return value is a Promise<T, E> for the response. The problem now is that we want to do something with the response once it arrives from the server.

Using then(), we can handle the result with a callback that runs when the promise is fulfilled. The callback takes the promised value as a parameter and returns a new promise, either after computing something immediately (as with Promise::ok(()) here) or by starting further asynchronous work.

Finally, wait() blocks until the promise is fulfilled. Without wait(), the function would return and close the connection prematurely, before the result arrives.
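The then()/wait() pairing can be imitated with a small toy type. Deferred is a hypothetical name made up for this sketch. It is lazy and synchronous, and its then() callback returns a plain value instead of a new promise, so it mirrors only the shape of the chain and not gj's event-loop-driven behaviour.

```rust
/// Toy deferred computation: nothing runs until wait() is called.
struct Deferred<T> {
    run: Box<dyn FnOnce() -> T>,
}

impl<T: 'static> Deferred<T> {
    fn new<F>(f: F) -> Self
    where
        F: FnOnce() -> T + 'static,
    {
        Deferred { run: Box::new(f) }
    }

    /// Chains a callback that receives the value once it is available.
    fn then<U, F>(self, f: F) -> Deferred<U>
    where
        U: 'static,
        F: FnOnce(T) -> U + 'static,
    {
        let run = self.run;
        Deferred::new(move || f(run()))
    }

    /// Blocks (here: simply runs the chain) and returns the final value.
    fn wait(self) -> T {
        (self.run)()
    }
}
```

Used as Deferred::new(|| "Bob".to_string()).then(|name| format!("Hello {}!", name)).wait(), the chain only executes when wait() is called, which is the toy equivalent of why dropping wait() in the client would lose the response.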


Hello Stranger!

It's time to test our program. First check that your project builds successfully with cargo build.

Open two shells and cd into the target/debug directory in each. In the first one, start the server:

./hello server 127.0.0.1:6578

For the client:

./hello client 127.0.0.1:6578 Bob

You should see the following result:

On the Client:

Hello Bob!

On the Server:

received a request for greetings!

Sources

You can find the sources for this example on this GitHub repository.

For more advanced examples on how to use Capn'proto and RPC, see these Capn'proto rpc examples.
