At Fly.io, we run a Rust-based load balancer which handles almost all of our traffic. It stands on the shoulders of Tokio and Hyper. When the Tokio team announced 0.3 and then 1.0, we figured we'd have to upgrade sooner rather than later to get access to related crate upgrades. The Rust ecosystem moves pretty fast and we wanted to keep our software up to date.
After a few weeks of patience, most of the ecosystem had already been upgraded. I believe this was a well-coordinated move from the Tokio team, making sure a good chunk of the ecosystem was going to be available when (or soon after) they released 1.0. We use hyper, reqwest, async-compression, tokio-io-timeout, tokio-tungstenite and sentry in addition to tokio, so we waited until they were ready.
Differences with tokio 0.2
Here are some notes about the Tokio upgrade. They’re by no means exhaustive.
No implementations of Stream
With Tokio 1.0, the team has decided to forgo all Stream implementations until its stabilization in the stdlib. If you still want Streams, you should probably try tokio-stream. It implements Stream for TcpListener, Interval and many more.
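A minimal sketch of what that can look like with tokio-stream's TcpListenerStream wrapper (assuming tokio-stream 0.1):

use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::StreamExt;

let listener = tokio::net::TcpListener::bind("[::]:0").await?;
// Wrap the listener to get a Stream of incoming connections again.
let mut incoming = TcpListenerStream::new(listener);

while let Some(conn) = incoming.next().await {
    let conn = conn?;
    // handle `conn`...
}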
If not, most of the time, you can get around this change by looping:
let listener = tokio::net::TcpListener::bind("[::]:0").await?;

loop {
    let (conn, _addr) = listener.accept().await?;
    // handle `conn`...
}
Mutable access requirements relaxed
In the previous code snippet, you might've noticed I don't need a mut-able TcpListener. You can now use more of the tokio API without mutable access! That's a welcome change: it reduces much of the locking required in our proxy.
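To illustrate (a sketch, not lifted from our proxy): since accept() only takes &self now, a listener can be shared across tasks behind a plain Arc, with no Mutex around it.

use std::sync::Arc;

let listener = Arc::new(tokio::net::TcpListener::bind("[::]:0").await?);

for _ in 0..4 {
    let listener = listener.clone();
    tokio::spawn(async move {
        loop {
            // accept() only needs &self, so no lock around the listener.
            if let Ok((conn, _addr)) = listener.accept().await {
                // hand `conn` off to a handler here
                drop(conn);
            }
        }
    });
}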
New AsyncRead and AsyncWrite traits
The traits have been reworked. Notably, AsyncRead::poll_read doesn't return the number of bytes read anymore: it fills a ReadBuf and returns Poll<io::Result<()>>. A quick workaround when you still need the count is this:
fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
    // Compare the filled region before and after delegating to recover the count.
    let before = buf.filled().len();
    // inner poll_read...
    let nread = buf.filled().len() - before;
    // ...
}
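Fleshed out a bit, a hypothetical byte-counting wrapper (not our proxy's code, just a sketch against tokio 1.x) might look like this:

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

// Hypothetical wrapper that tracks how many bytes its inner reader has produced.
struct CountingReader<R> {
    inner: R,
    total: u64,
}

impl<R: AsyncRead + Unpin> AsyncRead for CountingReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let before = buf.filled().len();
        let result = Pin::new(&mut self.inner).poll_read(cx, buf);
        // The number of bytes read is however much the filled region grew.
        let nread = buf.filled().len() - before;
        self.total += nread as u64;
        result
    }
}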
More pinning required
Futures like Sleep and Shutdown are now explicitly !Unpin. If you need to use them multiple times in a tokio::select! (like we do, all the time), then you'll need to either pin them on the heap with Box::pin or pin them on the stack with tokio::pin!. The documentation explains it in more detail.
let sleep = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(sleep);

loop {
    tokio::select! {
        _ = &mut sleep => {
            println!("timed out after 1 second!");
        },
        // `fut` is some other future, pinned the same way.
        _ = &mut fut => {
            println!("future finished");
        }
    }
}
runtime::Handle doesn't have block_on (yet)
There are discussions around adding back Handle::block_on, but for now, it's left unimplemented.
We needed it for the following pattern:
handle.spawn_blocking(move || handle.block_on(future));
We ended up getting around that restriction by using Arc<Runtime>, since block_on doesn't require mutable access to Runtime anymore.
use std::sync::Arc;
use std::thread;

let (tx, rx) = tokio::sync::oneshot::channel();

thread::spawn(move || {
    let rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());
    // Share the Runtime somehow; here we send a single-threaded runtime via the oneshot.
    tx.send(rt.clone()).unwrap();
    // Runtime::block_on does not require `&mut self`.
    rt.block_on(async move {
        // do your thing, asynchronously
        // probably want to trigger the end of this runtime somehow (signal, or other)
    });
});

let rt = rx.await.unwrap();

// now you can use spawn_blocking and block_on (`fut` being whatever future you need to run)
rt.spawn_blocking({
    let rt = rt.clone();
    move || rt.block_on(fut)
});
TcpListener::from_std needs a nonblocking listener
… or else you'll be surprised that everything hangs when you start accepting connections. For example, if you use socket2 to fashion a TcpListener with a few more options, you'll need tokio::net::TcpListener::from_std(std_listener). Before doing that, you'll want to call set_nonblocking(true) on your std listener.
// `sock_addr` is a hypothetical bind address, converted from a std SocketAddr.
let sock_addr: socket2::SockAddr = "0.0.0.0:8080".parse::<std::net::SocketAddr>().unwrap().into();

let sock = socket2::Socket::new(
    socket2::Domain::ipv4(),
    socket2::Type::stream(),
    Some(socket2::Protocol::tcp()),
).unwrap();
sock.set_reuse_port(true).unwrap();
sock.set_nonblocking(true).unwrap(); // <---
sock.bind(&sock_addr).unwrap();
sock.listen(10240).unwrap();

let listener = tokio::net::TcpListener::from_std(sock.into_tcp_listener()).unwrap();
Miscellaneous API changes
tokio::time::delay_for -> tokio::time::sleep
tokio::sync::watch::Sender::broadcast -> tokio::sync::watch::Sender::send
Notify::notify -> Notify::notify_waiters
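In context, the renames are mechanical (watch_tx and new_config below are hypothetical, just to show the call sites):

use tokio::time::Duration;

// tokio 0.2: tokio::time::delay_for(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(1)).await;

// tokio 0.2: watch_tx.broadcast(new_config).unwrap();
let _ = watch_tx.send(new_config);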
Hyper WebSockets
WebSocket upgrades are now on the whole request/response instance, not just the Body (body.on_upgrade() vs hyper::upgrade::on(req)). Here's an example.
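A rough sketch under hyper 0.14 (the handler, the spawned task and the bare 101 response are illustrative, not our proxy's code):

use hyper::{Body, Request, Response, StatusCode};

async fn handle(mut req: Request<Body>) -> Response<Body> {
    // hyper 0.13: req.into_body().on_upgrade()
    // hyper 0.14: the upgrade future comes from the request itself.
    let on_upgrade = hyper::upgrade::on(&mut req);

    tokio::spawn(async move {
        match on_upgrade.await {
            Ok(upgraded) => {
                // speak WebSocket (or anything else) over `upgraded` here
                let _ = upgraded;
            }
            Err(err) => eprintln!("upgrade error: {}", err),
        }
    });

    // ... reply with 101 Switching Protocols plus the usual WebSocket accept headers
    Response::builder()
        .status(StatusCode::SWITCHING_PROTOCOLS)
        .body(Body::empty())
        .unwrap()
}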
Hyper now relies on OnUpgrade being present on the Response's or Request's Extensions. We were previously replacing extensions with our own. We had to make sure we copied OnUpgrade into our new Extensions, if present, before overwriting.
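Something along these lines, assuming hyper 0.14 and the http crate's Extensions (replace_extensions is a hypothetical helper, not our actual code):

use http::Extensions;
use hyper::upgrade::OnUpgrade;
use hyper::{Body, Request};

fn replace_extensions(mut req: Request<Body>, mut new_ext: Extensions) -> Request<Body> {
    // Pull the upgrade handle out of the original extensions, if present...
    if let Some(on_upgrade) = req.extensions_mut().remove::<OnUpgrade>() {
        // ...and carry it over before overwriting the extensions wholesale.
        new_ext.insert(on_upgrade);
    }
    *req.extensions_mut() = new_ext;
    req
}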
Kudos to the Tokio team
This was a major version upgrade, but it wasn't hard at all, especially with the sporadic question-and-answer sessions on the very helpful Tokio Discord (hat tip to Alice Ryhl in particular).