diff --git a/Cargo.toml b/Cargo.toml index beb8225c4d..2f3b4af31b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,10 @@ tokio = { version = "1", features = [ ] } tokio-test = "0.4" tokio-util = "0.7.10" +# Additional dependencies for HTTP/2 Early Hints example +rcgen = "0.12" +tokio-rustls = "0.25" +rustls-pemfile = "2.0" [features] # Nothing by default @@ -86,7 +90,7 @@ http2 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:h2" # Client/Server client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] -server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"] +server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec", "dep:futures-util"] # C-API support (currently unstable (no semver)) ffi = ["dep:http-body-util", "dep:futures-util"] @@ -305,6 +309,11 @@ name = "web_api" path = "examples/web_api.rs" required-features = ["full"] +[[example]] +name = "http2_early_hints" +path = "examples/http2_early_hints.rs" +required-features = ["full"] + [[bench]] name = "body" diff --git a/examples/README.md b/examples/README.md index de38911e9c..ad27667916 100644 --- a/examples/README.md +++ b/examples/README.md @@ -38,6 +38,8 @@ futures-util = { version = "0.3", default-features = false } * [`echo`](echo.rs) - An echo server that copies POST request's content to the response content. +* [`http2_early_hints`](http2_early_hints.rs) - An HTTP/2 server that sends 103 Early Hints. + ## Going Further * [`gateway`](gateway.rs) - A server gateway (reverse proxy) that proxies to the `hello` service above. diff --git a/examples/http2_early_hints.rs b/examples/http2_early_hints.rs new file mode 100644 index 0000000000..635b14a81f --- /dev/null +++ b/examples/http2_early_hints.rs @@ -0,0 +1,259 @@ +//! HTTP/2 server demonstrating 103 Early Hints +//! +//! This example shows the recommended approach: 103 Early Hints. +//! +//! Run with: +//! ``` +//! cargo run --example http2_early_hints --features full +//! ``` + +use std::convert::Infallible; +use std::fs; +use std::net::SocketAddr; +use std::time::Instant; + +use bytes::Bytes; +use http::{Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::body::Incoming as IncomingBody; +use hyper::ext::early_hints_pusher; +use hyper::server::conn::http2; +use hyper::service::service_fn; +use tokio::net::TcpListener; +use tokio_rustls::rustls::{ + pki_types::{CertificateDer, PrivateKeyDer}, + ServerConfig, +}; +use tokio_rustls::TlsAcceptor; + +#[path = "../benches/support/mod.rs"] +mod support; +use support::{TokioExecutor, TokioIo}; + +/// Load certificates from provided files +fn load_certificates() -> Result< + (Vec>, PrivateKeyDer<'static>), + Box, +> { + // Read certificate file + let cert_pem = fs::read_to_string("/tmp/cert.txt")?; + + // Parse certificate chain + let mut certs = Vec::new(); + for cert in rustls_pemfile::certs(&mut cert_pem.as_bytes()) { + certs.push(cert?); + } + + // Read private key file + let key_pem = fs::read_to_string("/tmp/key.txt")?; + + // Parse private key + let mut key_reader = key_pem.as_bytes(); + let key = + rustls_pemfile::private_key(&mut key_reader)?.ok_or("No private key found in key file")?; + + Ok((certs, key)) +} + +/// Generate a self-signed certificate for testing (fallback) +fn generate_self_signed_cert() -> (Vec>, PrivateKeyDer<'static>) { + use rcgen::{Certificate as RcgenCert, CertificateParams, DistinguishedName}; + + let mut params = CertificateParams::new(vec!["localhost".to_string()]); + params.distinguished_name = DistinguishedName::new(); + + let cert = RcgenCert::from_params(params).unwrap(); + let cert_der = cert.serialize_der().unwrap(); + let private_key_der = cert.serialize_private_key_der(); + + ( + vec![CertificateDer::from(cert_der)], + PrivateKeyDer::try_from(private_key_der).unwrap(), + ) +} + +/// HTTP service demonstrating 103 Early Hints +async fn handle_request( + mut req: Request, +) -> Result>, Infallible> { + let path = req.uri().path(); + println!("Received request: {} {}", req.method(), req.uri()); + + // Handle static resources that we hinted about + match path { + "/css/critical.css" | "/css/layout.css" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/css") + .body(Full::new(Bytes::from("body { font-family: sans-serif; }"))) + .unwrap()); + } + + "/js/app.js" | "/js/vendor.js" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/javascript") + .body(Full::new(Bytes::from("console.log('loaded');"))) + .unwrap()); + } + + "/fonts/main.woff2" | "/fonts/icons.woff2" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "font/woff2") + .body(Full::new(Bytes::from(&b"WOFF2"[..]))) + .unwrap()); + } + + "/images/hero.webp" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "image/webp") + .body(Full::new(Bytes::from(&b"RIFF"[..]))) + .unwrap()); + } + + // Root path - serve HTML page with all the hinted resources + "/" => { + // Send 103 Early Hints using the early_hints_pusher API + if let Ok(mut pusher) = early_hints_pusher(&mut req) { + println!("Sending 103 Early Hints (all critical resources)"); + + let start_time = Instant::now(); + + let hints = Response::builder() + .status(StatusCode::EARLY_HINTS) + // Critical CSS (highest priority - render blocking) + .header("link", "; rel=preload; as=style") + .header("link", "; rel=preload; as=style") + // Critical JavaScript (high priority - interaction) + .header("link", "; rel=preload; as=script") + .header("link", "; rel=preload; as=script") + // Fonts (medium priority - text rendering) + .header( + "link", + "; rel=preload; as=font; crossorigin", + ) + .header( + "link", + "; rel=preload; as=font; crossorigin", + ) + // Hero image (medium priority - above fold) + .header("link", "; rel=preload; as=image") + // Metadata for tracking + .header("x-resource-count", "7") + .header("x-priority-order", "css,js,fonts,images") + .body(()) + .unwrap(); + + if let Err(e) = pusher.send_hints(hints).await { + eprintln!("Failed to send hints: {}", e); + } else { + let send_duration = start_time.elapsed(); + println!("103 Early Hints sent in: {:?}", send_duration); + println!(" 7 resources hinted in single response"); + println!(" Browser processes once, starts all preloads immediately"); + } + + // Simulate realistic server processing time + println!("Processing request (simulating database queries, template rendering...)"); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + + let html_content = r#" + + + 103 Early Hints Demo + + + + +

HTTP/2 103 Early Hints

+

The resources above were hinted via 103 before this response arrived.

+ + +"#; + + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/html") + .body(Full::new(Bytes::from(html_content))) + .unwrap()); + } + + // Default 404 handler + _ => { + return Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found"))) + .unwrap()); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + pretty_env_logger::init(); + + let addr: SocketAddr = ([0, 0, 0, 0], 3000).into(); + + // Load provided certificates or fallback to self-signed + let (certs, key) = match load_certificates() { + Ok((certs, key)) => { + println!("Loaded certificates from /tmp/cert.txt and /tmp/key.txt"); + (certs, key) + } + Err(e) => { + println!( + "Failed to load provided certificates ({}), generating self-signed certificate...", + e + ); + generate_self_signed_cert() + } + }; + + // Configure TLS + let mut config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key)?; + + // Enable HTTP/2 + config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let tls_acceptor = TlsAcceptor::from(std::sync::Arc::new(config)); + + // Create TCP listener + let listener = TcpListener::bind(addr).await?; + println!("103 Early Hints Server listening on https://{}", addr); + println!("Test: curl -k --http2 -v https://localhost:3000/"); + println!("Expected: 1 103 response + 1 final 200 response"); + println!("Benefits: Minimal browser overhead, maximum performance"); + + loop { + let (tcp_stream, _) = listener.accept().await?; + let tls_acceptor = tls_acceptor.clone(); + + tokio::spawn(async move { + // Perform TLS handshake + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("TLS handshake failed: {}", e); + return; + } + }; + + // Serve HTTP/2 connection with Early Hints support enabled + let service = service_fn(handle_request); + + if let Err(e) = http2::Builder::new(TokioExecutor) + .enable_informational() // Enable 103 Early Hints support + .serve_connection(TokioIo::new(tls_stream), service) + .await + { + eprintln!("HTTP/2 connection error: {}", e); + } + }); + } +} diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index ec046e0a39..93d3346b75 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -14,6 +14,7 @@ use futures_core::ready; use http::{Request, Response}; use super::super::dispatch::{self, TrySendError}; +use super::informational::InformationalConfig; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; use crate::proto; @@ -68,6 +69,7 @@ pub struct Builder { pub(super) exec: Ex, pub(super) timer: Time, h2_builder: proto::h2::client::Config, + informational_config: InformationalConfig, } /// Returns a handshake future over some IO. @@ -299,6 +301,7 @@ where exec, timer: Time::Empty, h2_builder: proto::h2::client::Config::default(), + informational_config: InformationalConfig::new(), } } @@ -542,6 +545,50 @@ where self } + /// Configures handling of informational responses (1xx status codes). + /// + /// By default, informational responses are ignored. This method allows you to + /// provide a callback that will be invoked whenever an informational response + /// is received, such as 103 Early Hints. + /// + /// # Examples + /// + /// ```rust + /// use hyper::client::conn::http2::Builder; + /// use hyper::client::conn::informational::InformationalConfig; + /// use http::StatusCode; + /// + /// #[derive(Clone)] + /// struct TokioExecutor; + /// + /// impl hyper::rt::Executor for TokioExecutor + /// where + /// F: std::future::Future + Send + 'static, + /// F::Output: Send + 'static, + /// { + /// fn execute(&self, fut: F) { + /// tokio::task::spawn(fut); + /// } + /// } + /// + /// let mut builder = Builder::new(TokioExecutor); + /// builder.informational_responses( + /// InformationalConfig::new().with_callback(|response| { + /// if response.status() == StatusCode::EARLY_HINTS { + /// println!("Received 103 Early Hints"); + /// // Process Link headers for resource preloading + /// for link in response.headers().get_all("link") { + /// println!("Preload: {:?}", link); + /// } + /// } + /// }) + /// ); + /// ``` + pub fn informational_responses(&mut self, config: InformationalConfig) -> &mut Self { + self.informational_config = config; + self + } + /// Constructs a connection with the configured options and IO. /// See [`client::conn`](crate::client::conn) for more. /// @@ -564,8 +611,15 @@ where trace!("client handshake HTTP/2"); let (tx, rx) = dispatch::channel(); - let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer) - .await?; + let h2 = proto::h2::client::handshake( + io, + rx, + &opts.h2_builder, + opts.exec, + opts.timer, + Some(opts.informational_config.clone()), + ) + .await?; Ok(( SendRequest { dispatch: tx.unbound(), diff --git a/src/client/conn/informational.rs b/src/client/conn/informational.rs new file mode 100644 index 0000000000..e308393815 --- /dev/null +++ b/src/client/conn/informational.rs @@ -0,0 +1,171 @@ +//! Informational response handling for HTTP/2 client connections. +//! +//! This module provides callback-based handling of 1xx informational responses, +//! including 103 Early Hints, for HTTP/2 client connections. + +use http::Response; +use std::fmt; +use std::sync::Arc; + +/// A callback function for handling informational responses (1xx status codes). +/// +/// This callback is invoked whenever the client receives an informational response +/// from the server, such as 103 Early Hints. The callback receives the complete +/// informational response including headers. +/// +/// # Examples +/// +/// ```rust +/// use hyper::client::conn::informational::InformationalCallback; +/// use http::{Response, StatusCode}; +/// use std::sync::Arc; +/// +/// let callback: InformationalCallback = Arc::new(|response: Response<()>| { +/// if response.status() == StatusCode::EARLY_HINTS { +/// println!("Received 103 Early Hints with {} headers", +/// response.headers().len()); +/// // Process Link headers for resource preloading +/// for link in response.headers().get_all("link") { +/// println!("Preload: {:?}", link); +/// } +/// } +/// }); +/// ``` +pub type InformationalCallback = Arc) + Send + Sync>; + +/// Configuration for informational response handling. +/// +/// This struct allows configuring how informational responses should be handled +/// by the HTTP/2 client connection. +#[derive(Default)] +pub struct InformationalConfig { + /// Optional callback for handling informational responses. + /// If None, informational responses are ignored (current behavior). + pub callback: Option, +} + +impl InformationalConfig { + /// Creates a new informational configuration with no callback. + pub fn new() -> Self { + Self::default() + } + + /// Sets the callback for handling informational responses. + pub fn with_callback(mut self, callback: F) -> Self + where + F: Fn(Response<()>) + Send + Sync + 'static, + { + self.callback = Some(Arc::new(callback)); + self + } + + /// Returns true if a callback is configured. + pub fn has_callback(&self) -> bool { + self.callback.is_some() + } + + /// Invokes the callback if one is configured. + /// + /// This is a test helper method - in production code, the callback + /// is extracted and called directly for better performance. + #[cfg(test)] + pub(crate) fn invoke_callback(&self, response: Response<()>) { + if let Some(ref callback) = self.callback { + callback(response); + } + } +} + +impl fmt::Debug for InformationalConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InformationalConfig") + .field("has_callback", &self.has_callback()) + .finish() + } +} + +impl Clone for InformationalConfig { + fn clone(&self) -> Self { + // Arc allows us to clone the callback + Self { + callback: self.callback.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::StatusCode; + use std::sync::{Arc, Mutex}; + + #[test] + fn test_informational_config_creation() { + let config = InformationalConfig::new(); + assert!(!config.has_callback()); + } + + #[test] + fn test_informational_config_with_callback() { + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + let config = InformationalConfig::new().with_callback(move |_response| { + *called_clone.lock().unwrap() = true; + }); + + assert!(config.has_callback()); + + // Test callback invocation + let mut response = Response::new(()); + *response.status_mut() = StatusCode::EARLY_HINTS; + config.invoke_callback(response); + + assert!(*called.lock().unwrap()); + } + + #[test] + fn test_informational_config_clone() { + let config = InformationalConfig::new().with_callback(|_| {}); + assert!(config.has_callback()); + + let cloned = config.clone(); + assert!(cloned.has_callback()); // Callback is cloned with Arc + } + + #[test] + fn test_early_hints_callback() { + let received_links = Arc::new(Mutex::new(Vec::new())); + let received_links_clone = received_links.clone(); + + let config = InformationalConfig::new().with_callback(move |response| { + if response.status() == StatusCode::EARLY_HINTS { + for link in response.headers().get_all("link") { + received_links_clone + .lock() + .unwrap() + .push(link.to_str().unwrap().to_string()); + } + } + }); + + // Simulate 103 Early Hints response + let mut response = Response::new(()); + *response.status_mut() = StatusCode::EARLY_HINTS; + response.headers_mut().insert( + "link", + "; rel=preload; as=style".parse().unwrap(), + ); + response.headers_mut().append( + "link", + "; rel=preload; as=script".parse().unwrap(), + ); + + config.invoke_callback(response); + + let links = received_links.lock().unwrap(); + assert_eq!(links.len(), 2); + assert!(links.contains(&"; rel=preload; as=style".to_string())); + assert!(links.contains(&"; rel=preload; as=script".to_string())); + } +} diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index f982ae6ddb..24e0764c28 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -18,5 +18,7 @@ pub mod http1; #[cfg(feature = "http2")] pub mod http2; +#[cfg(feature = "http2")] +pub mod informational; pub use super::dispatch::TrySendError; diff --git a/src/error.rs b/src/error.rs index 5134b581a3..7917c474fd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -146,10 +146,6 @@ pub(super) enum User { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] UnexpectedHeader, - /// User tried to respond with a 1xx (not 101) response code. - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - UnsupportedStatusCode, /// User tried polling for an upgrade that doesn't exist. NoUpgrade, @@ -437,12 +433,6 @@ impl Error { Error::new(Kind::HeaderTimeout) } - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - pub(super) fn new_user_unsupported_status_code() -> Error { - Error::new_user(User::UnsupportedStatusCode) - } - pub(super) fn new_user_no_upgrade() -> Error { Error::new_user(User::NoUpgrade) } @@ -582,11 +572,6 @@ impl Error { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - Kind::User(User::UnsupportedStatusCode) => { - "response has 1xx status code, not supported by server" - } Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", diff --git a/src/ext/informational_sender.rs b/src/ext/informational_sender.rs new file mode 100644 index 0000000000..9c22c9e96f --- /dev/null +++ b/src/ext/informational_sender.rs @@ -0,0 +1,234 @@ +//! Support for sending HTTP 103 Early Hints responses. +//! +//! This module provides the `early_hints_pusher()` function which allows +//! server handlers to send informational responses (1xx status codes) +//! before the final response. + +use http::{Request, Response, StatusCode}; + +use super::InformationalSender; + +/// A handle for sending HTTP 103 Early Hints responses. +/// +/// Obtained by calling `early_hints_pusher()` on a request. +#[derive(Debug, Clone)] +pub struct EarlyHintsPusher { + sender: futures_channel::mpsc::Sender>, +} + +impl EarlyHintsPusher { + /// Send an HTTP 103 Early Hints response. + /// + /// The response must have status code 103 and an empty body. + /// + /// Returns an error if the response is invalid or if sending fails. + pub async fn send_hints(&mut self, response: Response<()>) -> Result<(), EarlyHintsError> { + // Validate that this is a 103 response + if response.status() != StatusCode::EARLY_HINTS { + return Err(EarlyHintsError::InvalidStatus); + } + + self.sender + .try_send(response) + .map_err(|_| EarlyHintsError::SendFailed) + } +} + +/// Error type for early hints operations. +#[derive(Debug)] +pub enum EarlyHintsError { + /// The response status was not 103 + InvalidStatus, + /// Failed to send the response (channel full or closed) + SendFailed, + /// Early hints are not supported for this request + NotSupported, +} + +impl std::fmt::Display for EarlyHintsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EarlyHintsError::InvalidStatus => write!(f, "response must have status 103"), + EarlyHintsError::SendFailed => write!(f, "failed to send early hints response"), + EarlyHintsError::NotSupported => { + write!(f, "early hints not supported for this request") + } + } + } +} + +impl std::error::Error for EarlyHintsError {} + +/// Obtain a pusher for sending HTTP 103 Early Hints responses. +/// +/// This function lazily creates a channel for sending informational responses. +/// If called multiple times on the same request, it returns pushers that share +/// the same underlying channel. +/// +/// Returns `Err` if early hints are not supported for this request +/// (for example, if the connection doesn't support HTTP/2 or HTTP/1.1). +/// +/// # Example +/// +/// ```rust,no_run +/// use hyper::{Request, Response, StatusCode}; +/// use hyper::body::Incoming; +/// use hyper::ext::early_hints_pusher; +/// +/// async fn handle(mut req: Request) -> Result, hyper::Error> { +/// let preload = r#"; rel="preload"; as="style""#; +/// +/// match early_hints_pusher(&mut req) { +/// Ok(mut pusher) => { +/// let hints = Response::builder() +/// .status(StatusCode::EARLY_HINTS) +/// .header("Link", preload) +/// .body(()) +/// .unwrap(); +/// +/// if let Err(e) = pusher.send_hints(hints).await { +/// eprintln!("Failed to send early hints: {}", e); +/// } +/// } +/// Err(e) => { +/// eprintln!("Early hints not available: {}", e); +/// } +/// } +/// +/// // Send final response with the same Link header +/// Ok(Response::builder() +/// .header("Link", preload) +/// .body("...".to_string()) +/// .unwrap()) +/// } +/// ``` +pub fn early_hints_pusher(req: &mut Request) -> Result { + // Check if sender exists (pre-created by the server) + if let Some(sender) = req.extensions().get::() { + // Return a pusher that uses the existing sender + return Ok(EarlyHintsPusher { + sender: sender.0.clone(), + }); + } + + // Sender not found - early hints not supported for this request + Err(EarlyHintsError::NotSupported) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_early_hints_pusher_returns_error_when_not_supported() { + let mut req = Request::new(()); + let result = early_hints_pusher(&mut req); + + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), EarlyHintsError::NotSupported)); + } + + #[test] + fn test_early_hints_pusher_succeeds_when_sender_present() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let result = early_hints_pusher(&mut req); + assert!(result.is_ok()); + } + + #[test] + fn test_early_hints_pusher_multiple_calls_reuse_sender() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let pusher1 = early_hints_pusher(&mut req).unwrap(); + let pusher2 = early_hints_pusher(&mut req).unwrap(); + + // Both pushers should be valid (cloning the sender) + assert!(pusher1.sender.is_closed() == pusher2.sender.is_closed()); + } + + #[tokio::test] + async fn test_send_hints_rejects_non_103_status() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Try to send a 200 response instead of 103 + let invalid_response = Response::builder().status(200).body(()).unwrap(); + + let result = pusher.send_hints(invalid_response).await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + EarlyHintsError::InvalidStatus + )); + } + + #[tokio::test] + async fn test_send_hints_accepts_103_status() { + let mut req = Request::new(()); + let (tx, mut rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Send a valid 103 response + let valid_response = Response::builder() + .status(103) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + + let result = pusher.send_hints(valid_response).await; + assert!(result.is_ok()); + + // Verify the response was sent through the channel + let received = rx.try_next().unwrap(); + assert!(received.is_some()); + let response = received.unwrap(); + assert_eq!(response.status(), 103); + } + + #[tokio::test] + async fn test_send_hints_fails_when_channel_closed() { + let mut req = Request::new(()); + let (tx, rx) = futures_channel::mpsc::channel::>(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Close the receiver + drop(rx); + + // Try to send hints - should fail + let response = Response::builder().status(103).body(()).unwrap(); + + let result = pusher.send_hints(response).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), EarlyHintsError::SendFailed)); + } + + #[test] + fn test_early_hints_error_display() { + let invalid_status = EarlyHintsError::InvalidStatus; + assert_eq!(invalid_status.to_string(), "response must have status 103"); + + let send_failed = EarlyHintsError::SendFailed; + assert_eq!( + send_failed.to_string(), + "failed to send early hints response" + ); + + let not_supported = EarlyHintsError::NotSupported; + assert_eq!( + not_supported.to_string(), + "early hints not supported for this request" + ); + } +} diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 0fccb5ffec..e11e9e78ef 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -296,3 +296,22 @@ impl OriginalHeaderOrder { self.entry_order.iter() } } + +/// Request extension type for sending one or more 1xx informational responses +/// prior to the final response. +/// +/// This extension is meant to be attached to inbound `Request`s, allowing a +/// server to send informational responses immediately (i.e. without delaying +/// them until it has constructed a final, non-informational response). +/// +/// **Note:** This type should not be constructed directly by users. +/// Use the `early_hints_pusher()` function to obtain a pusher +/// that will lazily create this extension when needed. +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +#[derive(Clone, Debug)] +pub struct InformationalSender(pub(crate) futures_channel::mpsc::Sender>); + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +mod informational_sender; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub use informational_sender::{early_hints_pusher, EarlyHintsError, EarlyHintsPusher}; diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index dfcadcee56..a661699f0d 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -642,7 +642,7 @@ where head.extensions.remove::(); } - Some(encoder) + encoder } Err(err) => { self.state.error = Some(err); diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 3a9b536ea4..3721cfd544 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -8,14 +8,22 @@ use std::{ use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; +#[cfg(feature = "server")] +use futures_channel::mpsc::{self, Receiver}; use futures_core::ready; +#[cfg(feature = "server")] +use futures_util::stream::StreamExt; use http::Request; +#[cfg(feature = "server")] +use http::Response; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; #[cfg(feature = "client")] use crate::client::dispatch::TrySendError; use crate::common::task; +#[cfg(feature = "server")] +use crate::ext::InformationalSender; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -35,7 +43,7 @@ pub(crate) trait Dispatch { fn poll_msg( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>; + ) -> Poll), Self::PollError>>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -46,6 +54,7 @@ cfg_server! { use crate::service::HttpService; pub(crate) struct Server, B> { + informational_rx: Option>>, in_flight: Pin>>, pub(crate) service: S, } @@ -355,17 +364,22 @@ where if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { let (head, body) = msg.map_err(crate::Error::new_user_service)?; - let body_type = if body.is_end_stream() { + let body_type = if let Some(body) = body { + if body.is_end_stream() { + self.body_rx.set(None); + None + } else { + let btype = body + .size_hint() + .exact() + .map(BodyLength::Known) + .or(Some(BodyLength::Unknown)); + self.body_rx.set(Some(body)); + btype + } + } else { self.body_rx.set(None); None - } else { - let btype = body - .size_hint() - .exact() - .map(BodyLength::Known) - .or(Some(BodyLength::Unknown)); - self.body_rx.set(Some(body)); - btype }; self.conn.write_head(head, body_type); } else { @@ -562,6 +576,7 @@ cfg_server! { { pub(crate) fn new(service: S) -> Server { Server { + informational_rx: None, in_flight: Box::pin(None), service, } @@ -589,8 +604,33 @@ cfg_server! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Self::PollError>>> { let mut this = self.as_mut(); + + if let Some(informational_rx) = this.informational_rx.as_mut() { + if let Poll::Ready(informational) = informational_rx.poll_next_unpin(cx) { + if let Some(informational) = informational { + let (parts, _) = informational.into_parts(); + if parts.status.is_informational() { + let head = MessageHead { + version: parts.version, + subject: parts.status, + headers: parts.headers, + extensions: parts.extensions, + }; + return Poll::Ready(Some(Ok((head, None)))); + } else { + // TODO: We should return an error here, but we have + // no way of creating a `Self::PollError`; might + // need to change the signature of + // `Dispatch::poll_msg`. + } + } else { + this.informational_rx = None; + } + } + } + let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { let resp = ready!(fut.as_mut().poll(cx)?); let (parts, body) = resp.into_parts(); @@ -600,13 +640,14 @@ cfg_server! { headers: parts.headers, extensions: parts.extensions, }; - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); }; // Since in_flight finished, remove it this.in_flight.set(None); + this.informational_rx = None; ret } @@ -618,7 +659,10 @@ cfg_server! { *req.headers_mut() = msg.headers; *req.version_mut() = msg.version; *req.extensions_mut() = msg.extensions; + let (informational_tx, informational_rx) = mpsc::channel(1); + assert!(req.extensions_mut().insert(InformationalSender(informational_tx)).is_none()); let fut = self.service.call(req); + self.informational_rx = Some(informational_rx); self.in_flight.set(Some(fut)); Ok(()) } @@ -664,7 +708,7 @@ cfg_client! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Infallible>>> { let mut this = self.as_mut(); debug_assert!(!this.rx_closed); match this.rx.poll_recv(cx) { @@ -684,7 +728,7 @@ cfg_client! { extensions: parts.extensions, }; this.callback = Some(cb); - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } } } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 0a442c4c7e..491f2f7684 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -33,7 +33,8 @@ pub(crate) trait Http1Transaction { #[cfg(feature = "tracing")] const LOG: &'static str; fn parse(bytes: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult; - fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result; + fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) + -> crate::Result>; fn on_error(err: &crate::Error) -> Option>; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 740c8e1d56..512ada600c 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -117,7 +117,7 @@ fn is_complete_fast(bytes: &[u8], prev_len: usize) -> bool { pub(super) fn encode_headers( enc: Encode<'_, T::Outgoing>, dst: &mut Vec, -) -> crate::Result +) -> crate::Result> where T: Http1Transaction, { @@ -367,7 +367,10 @@ impl Http1Transaction for Server { })) } - fn encode(mut msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Server::encode status={:?}, body={:?}, req_method={:?}", msg.head.subject, @@ -377,25 +380,19 @@ impl Http1Transaction for Server { let mut wrote_len = false; - // hyper currently doesn't support returning 1xx status codes as a Response - // This is because Service only allows returning a single Response, and - // so if you try to reply with a e.g. 100 Continue, you have no way of - // replying with the latter status code response. - let (ret, is_last) = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { - (Ok(()), true) + let informational = msg.head.subject.is_informational(); + + let is_last = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { + true } else if msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success() { // Sending content-length or transfer-encoding header on 2xx response // to CONNECT is forbidden in RFC 7231. wrote_len = true; - (Ok(()), true) - } else if msg.head.subject.is_informational() { - warn!("response with 1xx status code not supported"); - *msg.head = MessageHead::default(); - msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; - msg.body = None; - (Err(crate::Error::new_user_unsupported_status_code()), true) + true + } else if informational { + false } else { - (Ok(()), !msg.keep_alive) + !msg.keep_alive }; // In some error cases, we don't know about the invalid message until already @@ -453,6 +450,7 @@ impl Http1Transaction for Server { } orig_headers => orig_headers, }; + let encoder = if let Some(orig_headers) = orig_headers { Self::encode_headers_with_original_case( msg, @@ -466,7 +464,11 @@ impl Http1Transaction for Server { Self::encode_headers_with_lower_case(msg, dst, is_last, orig_len, wrote_len)? }; - ret.map(|()| encoder) + // If we're sending a 1xx informational response, it won't have a body, + // so we'll return `None` here. Additionally, that will tell + // `Conn::write_head` to stay in the `Writing::Init` state since we + // haven't yet sent the final response. + Ok(if informational { None } else { Some(encoder) }) } fn on_error(err: &crate::Error) -> Option> { @@ -1183,7 +1185,10 @@ impl Http1Transaction for Client { } } - fn encode(msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Client::encode method={:?}, body={:?}", msg.head.subject.0, @@ -1229,7 +1234,7 @@ impl Http1Transaction for Client { extend(dst, b"\r\n"); msg.head.headers.clear(); //TODO: remove when switching to drain() - Ok(body) + Ok(Some(body)) } fn on_error(_err: &crate::Error) -> Option> { @@ -2648,7 +2653,7 @@ mod tests { ) .unwrap(); - assert!(encoder.is_last()); + assert!(encoder.expect("encoder should exist").is_last()); } #[cfg(feature = "server")] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 4803dba0f1..db98ee256c 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -153,6 +153,7 @@ pub(crate) async fn handshake( config: &Config, mut exec: E, timer: Time, + informational_config: Option, ) -> crate::Result> where T: Read + Write + Unpin, @@ -201,6 +202,7 @@ where h2_tx, req_rx, fut_ctx: None, + informational_callback: informational_config.and_then(|config| config.callback), marker: PhantomData, }) } @@ -418,6 +420,7 @@ where body_tx: SendStream>, body: B, cb: Callback, Response>, + informational_callback: Option, } impl Unpin for FutCtx {} @@ -434,6 +437,7 @@ where h2_tx: SendRequest>, req_rx: ClientRx, fut_ctx: Option>, + informational_callback: Option, marker: PhantomData, } @@ -571,6 +575,7 @@ where send_stream: Some(send_stream), exec: self.executor.clone(), cancel_tx: Some(cancel_tx), + informational_callback: f.informational_callback, }, call_back: Some(f.cb), }, @@ -591,6 +596,7 @@ pin_project! { send_stream: Option::Data>>>>, exec: E, cancel_tx: Option>, + informational_callback: Option, } } @@ -613,6 +619,39 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); + // First, check for any informational responses and invoke the callback if present + if let Some(callback) = &this.informational_callback { + let mut processed_informational = false; + loop { + match this.fut.poll_informational(cx) { + Poll::Ready(Some(Ok(informational_response))) => { + // Invoke the callback with the informational response + callback(informational_response); + processed_informational = true; + // Continue polling for more informational responses + continue; + } + Poll::Ready(Some(Err(_err))) => { + // Error in informational response, log it but don't fail the main response + debug!("informational response error: {}", _err); + break; + } + Poll::Ready(None) => { + // No more informational responses expected + break; + } + Poll::Pending => { + // If we processed any informational responses, return Pending to allow + // the H2 layer to process them before polling the main response + if processed_informational { + return Poll::Pending; + } + break; + } + } + } + } + let result = ready!(this.fut.poll(cx)); let ping = this.ping.take().expect("Future polled twice"); @@ -705,6 +744,10 @@ where continue; } let (head, body) = req.into_parts(); + + // Use the connection-level informational callback + let informational_callback = self.informational_callback.clone(); + let mut req = ::http::Request::from_parts(head, ()); super::strip_connection_headers(req.headers_mut(), super::MessageKind::Request); if let Some(len) = body.size_hint().exact() { @@ -751,6 +794,7 @@ where body_tx, body, cb, + informational_callback, }; // Check poll_ready() again. diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 6645734f31..75c6d3ab83 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -11,6 +11,10 @@ use h2::{Reason, RecvStream}; use http::{Method, Request}; use pin_project_lite::pin_project; +use futures_channel::mpsc::Receiver; +use futures_util::stream::StreamExt; +use http::Response; + use super::{ping, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::date; @@ -25,7 +29,6 @@ use crate::rt::{Read, Write}; use crate::service::HttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; -use crate::Response; // Our defaults are chosen for the "majority" case, which usually are not // resource constrained, and so the spec default of 64kb can be too limiting @@ -56,6 +59,7 @@ pub(crate) struct Config { pub(crate) header_table_size: Option, pub(crate) max_header_list_size: u32, pub(crate) date_header: bool, + pub(crate) enable_informational: bool, } impl Default for Config { @@ -75,6 +79,7 @@ impl Default for Config { max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, date_header: true, + enable_informational: false, } } } @@ -90,6 +95,7 @@ pin_project! { service: S, state: State, date_header: bool, + enable_informational: bool, close_pending: bool } } @@ -178,6 +184,7 @@ where }, service, date_header: config.date_header, + enable_informational: config.enable_informational, close_pending: false, } } @@ -234,7 +241,12 @@ where if me.close_pending && srv.closing.is_none() { srv.conn.graceful_shutdown(); } - ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; + ready!(srv.poll_server( + cx, + &mut me.service, + &mut me.exec, + me.enable_informational + ))?; return Poll::Ready(Ok(Dispatched::Shutdown)); } }; @@ -253,6 +265,7 @@ where cx: &mut Context<'_>, service: &mut S, exec: &mut E, + enable_informational: bool, ) -> Poll> where S: HttpService, @@ -309,12 +322,28 @@ where req.extensions_mut().insert(Protocol::from_inner(protocol)); } + // Conditionally create channel infrastructure for early hints + // Only when explicitly enabled via builder + let informational_rx = if enable_informational { + const CHANNEL_CAPACITY: usize = 10; + let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_CAPACITY); + + // Insert sender for early_hints_pusher() to find + req.extensions_mut() + .insert(crate::ext::InformationalSender(tx)); + + Some(rx) + } else { + None + }; + let fut = H2Stream::new( service.call(req), connect_parts, respond, self.date_header, exec.clone(), + informational_rx, ); exec.execute_h2stream(fut); @@ -373,6 +402,7 @@ pin_project! { state: H2StreamState, date_header: bool, exec: E, + informational_rx: Option>>, } } @@ -410,12 +440,14 @@ where respond: SendResponse>, date_header: bool, exec: E, + informational_rx: Option>>, ) -> H2Stream { H2Stream { reply: respond, state: H2StreamState::Service { fut, connect_parts }, date_header, exec, + informational_rx, } } } @@ -442,6 +474,37 @@ where Ex: Http2UpgradedExec, E: Into>, { + /// Poll and send any pending informational responses. + /// Returns Poll::Ready(Err) if sending fails, Poll::Pending otherwise. + fn poll_informational( + informational_rx: &mut Option>>, + reply: &mut SendResponse>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(rx) = informational_rx.as_mut() { + while let Poll::Ready(informational) = rx.poll_next_unpin(cx) { + match informational { + Some(informational) => { + trace!( + "Sending informational response: {:?}", + informational.status() + ); + if let Err(e) = reply.send_informational(informational) { + debug!("Failed to send informational response: {}", e); + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + } + None => { + trace!("Informational channel closed"); + *informational_rx = None; + break; + } + } + } + } + Poll::Pending + } + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.as_mut().project(); loop { @@ -450,6 +513,14 @@ where fut: h, connect_parts, } => { + // Check for informational responses only in Service state + // (1xx responses must come before the final response per HTTP/2 spec) + if let Poll::Ready(Err(e)) = + Self::poll_informational(me.informational_rx, me.reply, cx) + { + return Poll::Ready(Err(e)); + } + let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, Poll::Pending => { @@ -471,6 +542,14 @@ where } }; + // Service returned Ready - poll receiver one final time to catch any + // informational responses sent during service execution + if let Poll::Ready(Err(e)) = + Self::poll_informational(me.informational_rx, me.reply, cx) + { + return Poll::Ready(Err(e)); + } + let (head, body) = res.into_parts(); let mut res = ::http::Response::from_parts(head, ()); super::strip_connection_headers( @@ -521,6 +600,9 @@ where headers::set_content_length_if_missing(res.headers_mut(), len); } + // Clear informational receiver since we're now sending the final response body + *me.informational_rx = None; + let body_tx = reply!(me, res, false); H2StreamState::Body { pipe: PipeToSendStream::new(body, body_tx), @@ -558,3 +640,80 @@ where }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_default_values() { + let config = Config::default(); + + // Verify all default values + assert!(!config.adaptive_window); + assert_eq!(config.initial_conn_window_size, DEFAULT_CONN_WINDOW); + assert_eq!(config.initial_stream_window_size, DEFAULT_STREAM_WINDOW); + assert_eq!(config.max_frame_size, DEFAULT_MAX_FRAME_SIZE); + assert!(!config.enable_connect_protocol); + assert_eq!(config.max_concurrent_streams, Some(200)); + assert_eq!(config.max_pending_accept_reset_streams, None); + assert_eq!( + config.max_local_error_reset_streams, + Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS) + ); + assert_eq!(config.keep_alive_interval, None); + assert_eq!(config.keep_alive_timeout, Duration::from_secs(20)); + assert_eq!(config.max_send_buffer_size, DEFAULT_MAX_SEND_BUF_SIZE); + assert_eq!( + config.max_header_list_size, + DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE + ); + assert!(config.date_header); + + // Most importantly, informational should be disabled by default + assert!(!config.enable_informational); + } + + #[test] + fn test_config_enable_informational_flag() { + let mut config = Config::default(); + assert!(!config.enable_informational); + + config.enable_informational = true; + assert!(config.enable_informational); + } + + #[test] + fn test_config_preserves_other_values_when_setting_informational() { + let mut config = Config::default(); + config.max_concurrent_streams = Some(100); + config.max_frame_size = 32768; + + config.enable_informational = true; + + assert!(config.enable_informational); + assert_eq!(config.max_concurrent_streams, Some(100)); + assert_eq!(config.max_frame_size, 32768); + } + + #[test] + fn test_config_can_be_cloned() { + let mut config = Config::default(); + config.enable_informational = true; + config.max_concurrent_streams = Some(50); + + let cloned = config.clone(); + + assert_eq!(cloned.enable_informational, config.enable_informational); + assert_eq!(cloned.max_concurrent_streams, config.max_concurrent_streams); + } + + #[test] + fn test_config_debug_format() { + let config = Config::default(); + let debug_output = format!("{:?}", config); + + // Verify debug output contains the field + assert!(debug_output.contains("enable_informational")); + } +} diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e4dac78bea..e735c5510f 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -299,6 +299,42 @@ impl Builder { self } + /// Enable support for HTTP/2 informational responses (1xx status codes). + /// + /// When enabled, the server can send informational responses like 103 Early Hints + /// before the final response using the `early_hints_pusher()` function. + /// + /// This creates a small amount of overhead for each request to support the feature. + /// When disabled (default), `early_hints_pusher()` will return an error. + /// + /// Default is false (disabled). + /// + /// # Example + /// + /// ```rust + /// use hyper::server::conn::http2; + /// + /// #[derive(Clone)] + /// struct TokioExecutor; + /// + /// impl hyper::rt::Executor for TokioExecutor + /// where + /// F: std::future::Future + Send + 'static, + /// F::Output: Send + 'static, + /// { + /// fn execute(&self, fut: F) { + /// tokio::spawn(fut); + /// } + /// } + /// + /// let builder = http2::Builder::new(TokioExecutor) + /// .enable_informational(); + /// ``` + pub fn enable_informational(&mut self) -> &mut Self { + self.h2_builder.enable_informational = true; + self + } + /// Bind a connection together with a [`Service`](crate::service::Service). /// /// This returns a Future that must be polled in order for HTTP to be @@ -322,3 +358,64 @@ impl Builder { Connection { conn: proto } } } + +#[cfg(test)] +mod tests { + use super::*; + + // Mock executor for testing + #[derive(Clone)] + struct MockExec; + + #[test] + fn test_builder_default_informational_disabled() { + let builder = Builder::new(MockExec); + assert!(!builder.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_sets_flag() { + let mut builder = Builder::new(MockExec); + builder.enable_informational(); + assert!(builder.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_returns_self() { + let mut builder = Builder::new(MockExec); + let returned = builder.enable_informational(); + // Verify method chaining works + assert!(returned.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_can_be_chained() { + let builder = Builder::new(MockExec).enable_informational().clone(); + assert!(builder.h2_builder.enable_informational); + } + + #[test] + fn test_builder_preserves_other_settings_when_enabling_informational() { + let mut builder = Builder::new(MockExec); + builder.max_concurrent_streams(Some(100)); + builder.enable_informational(); + + assert!(builder.h2_builder.enable_informational); + assert_eq!(builder.h2_builder.max_concurrent_streams, Some(100)); + } + + #[test] + fn test_config_values_propagate_to_builder() { + let mut builder = Builder::new(MockExec); + builder + .enable_informational() + .max_concurrent_streams(Some(50)) + .initial_stream_window_size(Some(65535)) + .max_frame_size(Some(32768)); + + assert!(builder.h2_builder.enable_informational); + assert_eq!(builder.h2_builder.max_concurrent_streams, Some(50)); + assert_eq!(builder.h2_builder.initial_stream_window_size, 65535); + assert_eq!(builder.h2_builder.max_frame_size, 32768); + } +} diff --git a/tests/integration-early-hints.rs b/tests/integration-early-hints.rs new file mode 100644 index 0000000000..5eab3b4dd0 --- /dev/null +++ b/tests/integration-early-hints.rs @@ -0,0 +1,208 @@ +#![deny(warnings)] +#![cfg(feature = "http2")] + +//! Integration tests for HTTP/2 103 Early Hints support. + +use bytes::Bytes; +use http_body_util::Full; +use hyper::client::conn::http2::Builder; +use hyper::client::conn::informational::InformationalConfig; +use hyper::server::conn::http2::Builder as ServerBuilder; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use std::sync::{Arc, Mutex}; +use tokio::net::{TcpListener, TcpStream}; + +#[path = "support/mod.rs"] +mod support; +use support::{TokioExecutor, TokioIo}; + +/// Basic end-to-end test: server sends 103 Early Hints, client receives it +/// via callback, then gets the final 200 response. +#[tokio::test] +async fn test_http2_103_early_hints_basic() { + let _ = pretty_env_logger::try_init(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Server: sends one 103 Early Hints then a 200 OK + let server_handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(stream); + + let service = service_fn(|mut req: Request| async move { + if let Ok(mut pusher) = hyper::ext::early_hints_pusher(&mut req) { + let hints = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .header("link", "; rel=preload; as=script") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints).await; + } + + Ok::<_, hyper::Error>( + Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/html") + .body(Full::new(Bytes::from("hello"))) + .unwrap(), + ) + }); + + ServerBuilder::new(TokioExecutor) + .enable_informational() + .serve_connection(io, service) + .await + .unwrap(); + }); + + // Client: connects, registers informational callback, sends request + let received = Arc::new(Mutex::new(Vec::<(u16, Vec<(String, String)>)>::new())); + let received_clone = received.clone(); + + let config = InformationalConfig::new().with_callback(move |response: Response<()>| { + let headers: Vec<(String, String)> = response + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) + .collect(); + received_clone + .lock() + .unwrap() + .push((response.status().as_u16(), headers)); + }); + + let stream = TcpStream::connect(addr).await.unwrap(); + let io = TokioIo::new(stream); + + let (mut sender, conn) = Builder::new(TokioExecutor) + .informational_responses(config) + .handshake(io) + .await + .unwrap(); + + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .uri("/") + .body(Full::new(Bytes::new())) + .unwrap(); + + let response = sender.send_request(req).await.unwrap(); + + // Verify final response + assert_eq!(response.status(), StatusCode::OK); + + // Verify informational response was received + let informational = received.lock().unwrap(); + assert_eq!(informational.len(), 1, "Expected one 103 response"); + assert_eq!(informational[0].0, 103); + + // Check that Link headers were received + let link_headers: Vec<&String> = informational[0] + .1 + .iter() + .filter(|(k, _)| k == "link") + .map(|(_, v)| v) + .collect(); + assert!(!link_headers.is_empty(), "Expected Link headers in 103"); + + server_handle.abort(); +} + +/// Test that multiple 103 Early Hints responses are received in sequence. +#[tokio::test] +async fn test_http2_103_early_hints_multiple() { + let _ = pretty_env_logger::try_init(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Server: sends two 103 Early Hints then a 200 OK + let server_handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(stream); + + let service = service_fn(|mut req: Request| async move { + if let Ok(mut pusher) = hyper::ext::early_hints_pusher(&mut req) { + // First 103: CSS hints + let hints1 = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints1).await; + + // Second 103: JS hints + let hints2 = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=script") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints2).await; + } + + Ok::<_, hyper::Error>( + Response::builder() + .status(StatusCode::OK) + .body(Full::new(Bytes::from("done"))) + .unwrap(), + ) + }); + + ServerBuilder::new(TokioExecutor) + .enable_informational() + .serve_connection(io, service) + .await + .unwrap(); + }); + + // Client + let received = Arc::new(Mutex::new(Vec::<(u16, Vec<(String, String)>)>::new())); + let received_clone = received.clone(); + + let config = InformationalConfig::new().with_callback(move |response: Response<()>| { + let headers: Vec<(String, String)> = response + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) + .collect(); + received_clone + .lock() + .unwrap() + .push((response.status().as_u16(), headers)); + }); + + let stream = TcpStream::connect(addr).await.unwrap(); + let io = TokioIo::new(stream); + + let (mut sender, conn) = Builder::new(TokioExecutor) + .informational_responses(config) + .handshake(io) + .await + .unwrap(); + + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .uri("/") + .body(Full::new(Bytes::new())) + .unwrap(); + + let response = sender.send_request(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Verify both 103 responses were received + let informational = received.lock().unwrap(); + assert_eq!(informational.len(), 2, "Expected two 103 responses"); + assert_eq!(informational[0].0, 103); + assert_eq!(informational[1].0, 103); + + server_handle.abort(); +}