How can I test a future that is bound to a tokio TcpStream?
A simple way to test async code may be to use a dedicated runtime for each test: start it, wait for the future to complete, and shut down the runtime at the end of the test.
#[test]
fn my_case() {
    // setup future f
    // ...
    tokio::run(f);
}
I don't know if there are established patterns in the Rust ecosystem yet; see this discussion about the evolution of testing support for future-based code.
Why your code does not work as expected
When you invoke poll(), the future is queried to check if a value is available.
If a value is not available, an interest is registered so that poll() will be invoked again when something happens that can resolve the future.
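The polling contract described above can be sketched without any external crate. This is a minimal model, not the real API: `Async` and `SimpleFuture` are simplified stand-ins for the futures 0.1 types, and `Countdown` and `poll_until_ready` are hypothetical names used only for illustration. The toy driver simply polls in a loop, whereas a real executor polls again only after the registered interest fires.

```rust
// Simplified stand-in for the futures 0.1 `Async` enum (not the real crate type).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Simplified stand-in for the futures 0.1 `Future` trait (hypothetical name).
trait SimpleFuture {
    type Item;
    type Error;
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}

// Hypothetical future that reports `NotReady` `remaining` times, then resolves.
struct Countdown {
    remaining: u32,
}

impl SimpleFuture for Countdown {
    type Item = &'static str;
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        if self.remaining == 0 {
            Ok(Async::Ready("connected"))
        } else {
            // State is kept in `self`, so each poll makes progress.
            self.remaining -= 1;
            Ok(Async::NotReady)
        }
    }
}

// Toy driver: polls the same future repeatedly, up to `max_polls` times.
// Returns the value and the number of polls it took, or `None` if it never resolved.
fn poll_until_ready<F: SimpleFuture>(
    fut: &mut F,
    max_polls: u32,
) -> Result<Option<(F::Item, u32)>, F::Error> {
    for n in 1..=max_polls {
        if let Async::Ready(value) = fut.poll()? {
            return Ok(Some((value, n)));
        }
    }
    Ok(None)
}

fn main() {
    let mut fut = Countdown { remaining: 2 };
    println!("{:?}", poll_until_ready(&mut fut, 10));
}
```

The point of the sketch is that the same future value is polled every time, so whatever progress it made on an earlier poll is still there on the next one.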
When your MyFuture::poll() is invoked:
1. TcpStream::connect creates a new future, TcpStreamNew.
2. TcpStreamNew::poll is invoked immediately, and only once, on the future created at step 1.
3. The future goes out of scope, so the next time you invoke MyFuture::poll you never resolve the previously created futures.
You have registered an interest in a future that, if it does not resolve the first time you poll it, you never poll again for a resolved value or for an error.
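The difference between recreating the inner future on every poll and keeping it can be modeled with a small standalone sketch. Everything here is hypothetical and simplified: `Async` stands in for the futures 0.1 enum, `Connect` stands in for a pending connection that happens to resolve on its third poll, and `Recreating` and `Storing` mirror the two ways of writing the outer future.

```rust
// Simplified stand-in for the futures 0.1 `Async` enum (not the real crate type).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical inner future: resolves on its third poll.
struct Connect {
    polls_left: u32,
}

impl Connect {
    fn new() -> Self {
        Connect { polls_left: 2 }
    }

    fn poll(&mut self) -> Async<&'static str> {
        if self.polls_left == 0 {
            Async::Ready("stream")
        } else {
            self.polls_left -= 1;
            Async::NotReady
        }
    }
}

// Mirrors the problem: a fresh inner future is created on every poll
// and dropped when `poll` returns, so its progress is lost.
struct Recreating;

impl Recreating {
    fn poll(&mut self) -> Async<&'static str> {
        let mut inner = Connect::new();
        inner.poll()
    }
}

// Keeps the inner future in a field, so progress carries over between polls.
struct Storing {
    inner: Connect,
}

impl Storing {
    fn new() -> Self {
        Storing { inner: Connect::new() }
    }

    fn poll(&mut self) -> Async<&'static str> {
        self.inner.poll()
    }
}

// Toy driver: returns how many polls were needed, or `None` if the limit was hit.
fn polls_until_ready(mut poll: impl FnMut() -> Async<&'static str>, max: u32) -> Option<u32> {
    for n in 1..=max {
        if poll() == Async::Ready("stream") {
            return Some(n);
        }
    }
    None
}

fn main() {
    let mut broken = Recreating;
    let mut fixed = Storing::new();
    println!("recreating: {:?}", polls_until_ready(|| broken.poll(), 100));
    println!("storing: {:?}", polls_until_ready(|| fixed.poll(), 100));
}
```

In this model the recreating variant never gets past the first poll of a brand-new inner future, while the storing variant resolves once the inner future has been polled enough times.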
The reason for the "nondeterministic" behavior is that the first poll sometimes resolves immediately with a ConnectionRefused error, and sometimes waits forever for a connection event or a failure that is never retrieved.
Look at mio::sys::unix::tcp::TcpStream, which is used by Tokio:
impl TcpStream {
    pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
        set_nonblock(stream.as_raw_fd())?;
        match stream.connect(addr) {
            Ok(..) => {}
            Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
            Err(e) => return Err(e),
        }
        Ok(TcpStream {
            inner: stream,
        })
    }
    // ...
}
When you connect on a non-blocking socket, the system call may connect or fail immediately, or it may return EINPROGRESS. In this last case a poll must be triggered later to retrieve the outcome, including any error.
The issue is not with the test but with the implementation.
This working test case, based on yours, has no custom future implementation and only calls TcpStream::connect(). It works as you expect it to.
extern crate futures;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;
    use std::net::SocketAddr;
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = TcpStream::connect(&addr)
            .and_then(|f| {
                println!("connected");
                f.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}
You are connecting to the same endpoint over and over again in your poll() method. That's not how a future works. The poll() method will be called repeatedly, with the expectation that at some point it will return either Ok(Async::Ready(..)) or Err(..).
If you initiate a new TCP connection every time poll() is called, it is unlikely to complete in time.
Here is a modified example that does what you expect:
#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;

use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;

struct MyFuture {
    tcp: ConnectFuture,
}

impl MyFuture {
    fn new(addr: SocketAddr) -> MyFuture {
        MyFuture {
            tcp: TcpStream::connect(&addr),
        }
    }
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(self.tcp.poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture::new(addr)
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}
I'm not certain what you intend your future to do, though, so I can't comment on whether it's the right approach.