From 90a33824de6c68bcbeb84e9d186122227f366706 Mon Sep 17 00:00:00 2001 From: Brett McChesney Date: Fri, 4 Oct 2024 08:16:05 -0600 Subject: [PATCH] Upgrade gRPC example deps --- examples/grpc/Cargo.toml | 19 ++++---- examples/grpc/build.rs | 21 ++------- examples/grpc/src/main.rs | 97 ++++++++++++++++++--------------------- src/net/tcp/stream.rs | 4 +- 4 files changed, 58 insertions(+), 83 deletions(-) diff --git a/examples/grpc/Cargo.toml b/examples/grpc/Cargo.toml index 05e58e1..b39cc8d 100644 --- a/examples/grpc/Cargo.toml +++ b/examples/grpc/Cargo.toml @@ -4,21 +4,18 @@ version = "0.1.0" edition = "2021" publish = false - [dependencies] -tonic = "0.12" -prost = "0.13" -hyper = "1.4" async-stream = "0.3" -turmoil = { path = "../.." } +hyper = "1" +hyper-util = { version = "0.1", features = ["tokio"] } +prost = "0.13" +tokio = "1" +tonic = "0.12" +tower = "0.4" tracing = "0.1" tracing-subscriber = "0.3" -tokio = "1" -tower = "0.5" -hyper-util = "0.1" +turmoil = { path = "../.." } [build-dependencies] tonic-build = "0.12" -prost = "0.13" -prost-build = "0.13" -protox = "0.7.0" +protobuf-src = "2" \ No newline at end of file diff --git a/examples/grpc/build.rs b/examples/grpc/build.rs index 6085463..091037d 100644 --- a/examples/grpc/build.rs +++ b/examples/grpc/build.rs @@ -1,18 +1,5 @@ -use prost::Message; -use std::path::PathBuf; - -fn main() { - let file_descriptors = protox::compile(["helloworld.proto"], ["."]).unwrap(); - let file_descriptor_path = PathBuf::from(std::env::var_os("OUT_DIR").expect("OUT_DIR not set")) - .join("file_descriptor_set.bin"); - std::fs::write(&file_descriptor_path, file_descriptors.encode_to_vec()).unwrap(); - - let mut config = prost_build::Config::new(); - config - .file_descriptor_set_path(&file_descriptor_path) - .skip_protoc_run(); - - tonic_build::configure() - .compile_with_config(config, &["helloworld.proto"], &["."]) - .unwrap(); +fn main() -> std::io::Result<()> { + // avoids having a local install of protoc + std::env::set_var("PROTOC", protobuf_src::protoc()); + tonic_build::compile_protos("helloworld.proto") } diff --git a/examples/grpc/src/main.rs b/examples/grpc/src/main.rs index 06f5d8d..c0c0a12 100644 --- a/examples/grpc/src/main.rs +++ b/examples/grpc/src/main.rs @@ -1,8 +1,13 @@ +use connector::connector; +use proto::greeter_client::GreeterClient; +use proto::greeter_server::{Greeter, GreeterServer}; +use proto::{HelloReply, HelloRequest}; use std::net::{IpAddr, Ipv4Addr}; use tonic::transport::{Endpoint, Server}; use tonic::Status; use tonic::{Request, Response}; use tracing::{info_span, Instrument}; +use turmoil::net::TcpListener; use turmoil::Builder; #[allow(non_snake_case)] @@ -10,12 +15,6 @@ mod proto { tonic::include_proto!("helloworld"); } -use crate::connector::{TurmoilTcpConnector, TurmoilTcpStream}; -use crate::proto::greeter_client::GreeterClient; -use proto::greeter_server::{Greeter, GreeterServer}; -use proto::{HelloReply, HelloRequest}; -use turmoil::net::TcpListener; - fn main() { configure_tracing(); @@ -33,11 +32,10 @@ fn main() { .serve_with_incoming(async_stream::stream! { let listener = TcpListener::bind(addr).await?; loop { - yield listener.accept().await.map(|(s, _)| TurmoilTcpStream(s)); + yield listener.accept().await.map(|(s, _)| incoming::Accepted(s)); } }) - .await - .unwrap(); + .await?; Ok(()) } @@ -48,7 +46,7 @@ fn main() { "client", async move { let ch = Endpoint::new("http://server:9999")? - .connect_with_connector(TurmoilTcpConnector) + .connect_with_connector(connector()) .await?; let mut greeter_client = GreeterClient::new(ch); @@ -109,88 +107,81 @@ impl Greeter for MyGreeter { } } -mod connector { - use hyper::Uri; - use hyper_util::client::legacy::connect::Connected; - use hyper_util::rt::TokioIo; +mod incoming { + use std::pin::Pin; use std::task::{Context, Poll}; - use std::{future::Future, pin::Pin}; - use tokio::io; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tonic::transport::server::TcpConnectInfo; - use tower::Service; + use tonic::transport::server::{Connected, TcpConnectInfo}; use turmoil::net::TcpStream; - #[derive(Clone)] - pub struct TurmoilTcpConnector; - - impl Service for TurmoilTcpConnector { - type Response = TokioIo; - type Error = io::Error; - type Future = Pin> + Send>>; + pub struct Accepted(pub TcpStream); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, uri: Uri) -> Self::Future { - Box::pin(async move { - let stream = TcpStream::connect(uri.authority().unwrap().as_str()).await?; - Ok(TokioIo::new(TurmoilTcpStream(stream))) - }) - } - } - - pub struct TurmoilTcpStream(pub TcpStream); - - impl hyper_util::client::legacy::connect::Connection for TurmoilTcpStream { - fn connected(&self) -> Connected { - Connected::new() - } - } - - impl tonic::transport::server::Connected for TurmoilTcpStream { + impl Connected for Accepted { type ConnectInfo = TcpConnectInfo; fn connect_info(&self) -> Self::ConnectInfo { - TcpConnectInfo { + Self::ConnectInfo { local_addr: self.0.local_addr().ok(), remote_addr: self.0.peer_addr().ok(), } } } - impl AsyncRead for TurmoilTcpStream { + impl AsyncRead for Accepted { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } - impl AsyncWrite for TurmoilTcpStream { + impl AsyncWrite for Accepted { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.0).poll_write(cx, buf) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Pin::new(&mut self.0).poll_shutdown(cx) } } } + +mod connector { + use std::{future::Future, pin::Pin}; + + use hyper::Uri; + use hyper_util::rt::TokioIo; + + use tower::Service; + use turmoil::net::TcpStream; + + type Fut = Pin, std::io::Error>> + Send>>; + + pub fn connector( + ) -> impl Service, Error = std::io::Error, Future = Fut> + Clone + { + tower::service_fn(|uri: Uri| { + Box::pin(async move { + let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?; + Ok::<_, std::io::Error>(TokioIo::new(conn)) + }) as Fut + }) + } +} diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index b60748b..a4d38a0 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -160,7 +160,7 @@ impl TcpStream { ) } - /// Has no effect in turmoil. API parity with + /// Has no effect in turmoil. API parity with /// https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay pub fn set_nodelay(&self, _nodelay: bool) -> Result<()> { Ok(()) @@ -430,4 +430,4 @@ impl Drop for WriteHalf { world.current_host_mut().tcp.close_stream_half(*self.pair); }) } -} +} \ No newline at end of file