From 46eef502f20c7f371e030948756436abc99d53fe Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Tue, 17 Dec 2024 18:30:09 +0000 Subject: [PATCH 1/2] prototype support for 1xx informational responses This is a prototype intended to spur discussion about what support for 1xx informational responses should look like in a Hyper server. The good news is that it works great (for HTTP/1 only, so far). The bad news is it's kind of ugly. Here's what I did: - Add `ext::InformationalSender`, a type which wraps a `futures_channel::mspc::Sender>`. This may be added as an extension to an inbound `Request` by the Hyper server, and the application and/or middleware may use it to send one or more informational responses before sending the real one. - Add code to `proto::h1::dispatch` and friends to add such an extension to each inbound request and then poll the `Receiver` end along with the future representing the final response. If the app never sends any informational responses, then everything proceeds as normal. Otherwise, we send those responses as they become available until the final response is ready. TODO items: - [ ] Also support informational responses in the HTTP/2 server. - [ ] Determine best way to handle when the app sends an informational response with a non-1xx status code. Currently we just silently ignore it. - [ ] Come up with a less hacky API? - [ ] Add test coverage. Signed-off-by: Joel Dice --- src/error.rs | 15 -------- src/ext/mod.rs | 10 ++++++ src/proto/h1/conn.rs | 2 +- src/proto/h1/dispatch.rs | 74 ++++++++++++++++++++++++++++++++-------- src/proto/h1/mod.rs | 3 +- src/proto/h1/role.rs | 43 ++++++++++++----------- 6 files changed, 96 insertions(+), 51 deletions(-) diff --git a/src/error.rs b/src/error.rs index 5134b581a3..7917c474fd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -146,10 +146,6 @@ pub(super) enum User { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] UnexpectedHeader, - /// User tried to respond with a 1xx (not 101) response code. - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - UnsupportedStatusCode, /// User tried polling for an upgrade that doesn't exist. NoUpgrade, @@ -437,12 +433,6 @@ impl Error { Error::new(Kind::HeaderTimeout) } - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - pub(super) fn new_user_unsupported_status_code() -> Error { - Error::new_user(User::UnsupportedStatusCode) - } - pub(super) fn new_user_no_upgrade() -> Error { Error::new_user(User::NoUpgrade) } @@ -582,11 +572,6 @@ impl Error { #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", - #[cfg(feature = "http1")] - #[cfg(feature = "server")] - Kind::User(User::UnsupportedStatusCode) => { - "response has 1xx status code, not supported by server" - } Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 0fccb5ffec..2b25df71b5 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -296,3 +296,13 @@ impl OriginalHeaderOrder { self.entry_order.iter() } } + +/// Request extension type for sending one or more 1xx informational responses +/// prior to the final response. +/// +/// This extension is meant to be attached to inbound `Request`s, allowing a +/// server to send informational responses immediately (i.e. without delaying +/// them until it has constructed a final, non-informational response). +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +#[derive(Clone, Debug)] +pub struct InformationalSender(pub futures_channel::mpsc::Sender>); diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index dfcadcee56..a661699f0d 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -642,7 +642,7 @@ where head.extensions.remove::(); } - Some(encoder) + encoder } Err(err) => { self.state.error = Some(err); diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 3a9b536ea4..eb32860dd7 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -8,14 +8,22 @@ use std::{ use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; -use futures_core::ready; +#[cfg(feature = "server")] +use futures_channel::mpsc::{self, Receiver}; +use futures_util::ready; +#[cfg(feature = "server")] +use futures_util::StreamExt; use http::Request; +#[cfg(feature = "server")] +use http::Response; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; #[cfg(feature = "client")] use crate::client::dispatch::TrySendError; use crate::common::task; +#[cfg(feature = "server")] +use crate::ext::InformationalSender; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -35,7 +43,7 @@ pub(crate) trait Dispatch { fn poll_msg( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>; + ) -> Poll), Self::PollError>>>; fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -46,6 +54,7 @@ cfg_server! { use crate::service::HttpService; pub(crate) struct Server, B> { + informational_rx: Option>>, in_flight: Pin>>, pub(crate) service: S, } @@ -355,17 +364,22 @@ where if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { let (head, body) = msg.map_err(crate::Error::new_user_service)?; - let body_type = if body.is_end_stream() { + let body_type = if let Some(body) = body { + if body.is_end_stream() { + self.body_rx.set(None); + None + } else { + let btype = body + .size_hint() + .exact() + .map(BodyLength::Known) + .or(Some(BodyLength::Unknown)); + self.body_rx.set(Some(body)); + btype + } + } else { self.body_rx.set(None); None - } else { - let btype = body - .size_hint() - .exact() - .map(BodyLength::Known) - .or(Some(BodyLength::Unknown)); - self.body_rx.set(Some(body)); - btype }; self.conn.write_head(head, body_type); } else { @@ -562,6 +576,7 @@ cfg_server! { { pub(crate) fn new(service: S) -> Server { Server { + informational_rx: None, in_flight: Box::pin(None), service, } @@ -589,8 +604,33 @@ cfg_server! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Self::PollError>>> { let mut this = self.as_mut(); + + if let Some(informational_rx) = this.informational_rx.as_mut() { + if let Poll::Ready(informational) = informational_rx.poll_next_unpin(cx) { + if let Some(informational) = informational { + let (parts, _) = informational.into_parts(); + if parts.status.is_informational() { + let head = MessageHead { + version: parts.version, + subject: parts.status, + headers: parts.headers, + extensions: parts.extensions, + }; + return Poll::Ready(Some(Ok((head, None)))); + } else { + // TODO: We should return an error here, but we have + // no way of creating a `Self::PollError`; might + // need to change the signature of + // `Dispatch::poll_msg`. + } + } else { + this.informational_rx = None; + } + } + } + let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { let resp = ready!(fut.as_mut().poll(cx)?); let (parts, body) = resp.into_parts(); @@ -600,13 +640,14 @@ cfg_server! { headers: parts.headers, extensions: parts.extensions, }; - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); }; // Since in_flight finished, remove it this.in_flight.set(None); + this.informational_rx = None; ret } @@ -618,7 +659,10 @@ cfg_server! { *req.headers_mut() = msg.headers; *req.version_mut() = msg.version; *req.extensions_mut() = msg.extensions; + let (informational_tx, informational_rx) = mpsc::channel(1); + assert!(req.extensions_mut().insert(InformationalSender(informational_tx)).is_none()); let fut = self.service.call(req); + self.informational_rx = Some(informational_rx); self.in_flight.set(Some(fut)); Ok(()) } @@ -664,7 +708,7 @@ cfg_client! { fn poll_msg( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll), Infallible>>> { let mut this = self.as_mut(); debug_assert!(!this.rx_closed); match this.rx.poll_recv(cx) { @@ -684,7 +728,7 @@ cfg_client! { extensions: parts.extensions, }; this.callback = Some(cb); - Poll::Ready(Some(Ok((head, body)))) + Poll::Ready(Some(Ok((head, Some(body))))) } } } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 0a442c4c7e..491f2f7684 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -33,7 +33,8 @@ pub(crate) trait Http1Transaction { #[cfg(feature = "tracing")] const LOG: &'static str; fn parse(bytes: &mut BytesMut, ctx: ParseContext<'_>) -> ParseResult; - fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result; + fn encode(enc: Encode<'_, Self::Outgoing>, dst: &mut Vec) + -> crate::Result>; fn on_error(err: &crate::Error) -> Option>; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 740c8e1d56..a312fa2add 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -117,7 +117,7 @@ fn is_complete_fast(bytes: &[u8], prev_len: usize) -> bool { pub(super) fn encode_headers( enc: Encode<'_, T::Outgoing>, dst: &mut Vec, -) -> crate::Result +) -> crate::Result> where T: Http1Transaction, { @@ -367,7 +367,10 @@ impl Http1Transaction for Server { })) } - fn encode(mut msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Server::encode status={:?}, body={:?}, req_method={:?}", msg.head.subject, @@ -377,25 +380,19 @@ impl Http1Transaction for Server { let mut wrote_len = false; - // hyper currently doesn't support returning 1xx status codes as a Response - // This is because Service only allows returning a single Response, and - // so if you try to reply with a e.g. 100 Continue, you have no way of - // replying with the latter status code response. - let (ret, is_last) = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { - (Ok(()), true) + let informational = msg.head.subject.is_informational(); + + let is_last = if msg.head.subject == StatusCode::SWITCHING_PROTOCOLS { + true } else if msg.req_method == &Some(Method::CONNECT) && msg.head.subject.is_success() { // Sending content-length or transfer-encoding header on 2xx response // to CONNECT is forbidden in RFC 7231. wrote_len = true; - (Ok(()), true) - } else if msg.head.subject.is_informational() { - warn!("response with 1xx status code not supported"); - *msg.head = MessageHead::default(); - msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; - msg.body = None; - (Err(crate::Error::new_user_unsupported_status_code()), true) + true + } else if informational { + false } else { - (Ok(()), !msg.keep_alive) + !msg.keep_alive }; // In some error cases, we don't know about the invalid message until already @@ -453,6 +450,7 @@ impl Http1Transaction for Server { } orig_headers => orig_headers, }; + let encoder = if let Some(orig_headers) = orig_headers { Self::encode_headers_with_original_case( msg, @@ -466,7 +464,11 @@ impl Http1Transaction for Server { Self::encode_headers_with_lower_case(msg, dst, is_last, orig_len, wrote_len)? }; - ret.map(|()| encoder) + // If we're sending a 1xx informational response, it won't have a body, + // so we'll return `None` here. Additionally, that will tell + // `Conn::write_head` to stay in the `Writing::Init` state since we + // haven't yet sent the final response. + Ok(if informational { None } else { Some(encoder) }) } fn on_error(err: &crate::Error) -> Option> { @@ -1183,7 +1185,10 @@ impl Http1Transaction for Client { } } - fn encode(msg: Encode<'_, Self::Outgoing>, dst: &mut Vec) -> crate::Result { + fn encode( + msg: Encode<'_, Self::Outgoing>, + dst: &mut Vec, + ) -> crate::Result> { trace!( "Client::encode method={:?}, body={:?}", msg.head.subject.0, @@ -1229,7 +1234,7 @@ impl Http1Transaction for Client { extend(dst, b"\r\n"); msg.head.headers.clear(); //TODO: remove when switching to drain() - Ok(body) + Ok(Some(body)) } fn on_error(_err: &crate::Error) -> Option> { From 1be0e88815f030690bf5933838827994f5b998da Mon Sep 17 00:00:00 2001 From: Apu Islam Date: Tue, 9 Dec 2025 11:39:06 +0000 Subject: [PATCH 2/2] feat(http2): implement 103 Early Hints support Add complete HTTP/2 103 Early Hints implementation with client and server support: - Add InformationalSender extension for server-side hint transmission via mpsc channel - Create InformationalCallback system for client-side informational response handling - Extend HTTP/2 client builder with informational_responses() configuration method - Implement informational response polling in h2 client task with callback invocation - Add server-side informational response forwarding using h2's send_informational API - Include extensive integration tests covering multiple scenarios and edge cases - Add `enable_informational` config field (defaults to false) and conditional channel creation per request - Add complete working example with TLS, resource preloading, and performance monitoring - Update Cargo.toml with local h2 dependency and example build configuration The implementation enables servers to send resource preload hints before final responses, allowing browsers to start downloading critical resources early and improve page load performance. Clients can register callbacks to process 103 Early Hints and other informational responses. Closes #3980, #2426 --- Cargo.toml | 11 +- examples/README.md | 2 + examples/http2_early_hints.rs | 259 +++++++++++++++++++++++++++++++ src/client/conn/http2.rs | 58 ++++++- src/client/conn/informational.rs | 171 ++++++++++++++++++++ src/client/conn/mod.rs | 2 + src/ext/informational_sender.rs | 234 ++++++++++++++++++++++++++++ src/ext/mod.rs | 11 +- src/proto/h1/dispatch.rs | 4 +- src/proto/h1/role.rs | 2 +- src/proto/h2/client.rs | 44 ++++++ src/proto/h2/server.rs | 163 ++++++++++++++++++- src/server/conn/http2.rs | 97 ++++++++++++ tests/integration-early-hints.rs | 208 +++++++++++++++++++++++++ 14 files changed, 1257 insertions(+), 9 deletions(-) create mode 100644 examples/http2_early_hints.rs create mode 100644 src/client/conn/informational.rs create mode 100644 src/ext/informational_sender.rs create mode 100644 tests/integration-early-hints.rs diff --git a/Cargo.toml b/Cargo.toml index beb8225c4d..2f3b4af31b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,10 @@ tokio = { version = "1", features = [ ] } tokio-test = "0.4" tokio-util = "0.7.10" +# Additional dependencies for HTTP/2 Early Hints example +rcgen = "0.12" +tokio-rustls = "0.25" +rustls-pemfile = "2.0" [features] # Nothing by default @@ -86,7 +90,7 @@ http2 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:h2" # Client/Server client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] -server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec"] +server = ["dep:httpdate", "dep:pin-project-lite", "dep:smallvec", "dep:futures-util"] # C-API support (currently unstable (no semver)) ffi = ["dep:http-body-util", "dep:futures-util"] @@ -305,6 +309,11 @@ name = "web_api" path = "examples/web_api.rs" required-features = ["full"] +[[example]] +name = "http2_early_hints" +path = "examples/http2_early_hints.rs" +required-features = ["full"] + [[bench]] name = "body" diff --git a/examples/README.md b/examples/README.md index de38911e9c..ad27667916 100644 --- a/examples/README.md +++ b/examples/README.md @@ -38,6 +38,8 @@ futures-util = { version = "0.3", default-features = false } * [`echo`](echo.rs) - An echo server that copies POST request's content to the response content. +* [`http2_early_hints`](http2_early_hints.rs) - An HTTP/2 server that sends 103 Early Hints. + ## Going Further * [`gateway`](gateway.rs) - A server gateway (reverse proxy) that proxies to the `hello` service above. diff --git a/examples/http2_early_hints.rs b/examples/http2_early_hints.rs new file mode 100644 index 0000000000..635b14a81f --- /dev/null +++ b/examples/http2_early_hints.rs @@ -0,0 +1,259 @@ +//! HTTP/2 server demonstrating 103 Early Hints +//! +//! This example shows the recommended approach: 103 Early Hints. +//! +//! Run with: +//! ``` +//! cargo run --example http2_early_hints --features full +//! ``` + +use std::convert::Infallible; +use std::fs; +use std::net::SocketAddr; +use std::time::Instant; + +use bytes::Bytes; +use http::{Request, Response, StatusCode}; +use http_body_util::Full; +use hyper::body::Incoming as IncomingBody; +use hyper::ext::early_hints_pusher; +use hyper::server::conn::http2; +use hyper::service::service_fn; +use tokio::net::TcpListener; +use tokio_rustls::rustls::{ + pki_types::{CertificateDer, PrivateKeyDer}, + ServerConfig, +}; +use tokio_rustls::TlsAcceptor; + +#[path = "../benches/support/mod.rs"] +mod support; +use support::{TokioExecutor, TokioIo}; + +/// Load certificates from provided files +fn load_certificates() -> Result< + (Vec>, PrivateKeyDer<'static>), + Box, +> { + // Read certificate file + let cert_pem = fs::read_to_string("/tmp/cert.txt")?; + + // Parse certificate chain + let mut certs = Vec::new(); + for cert in rustls_pemfile::certs(&mut cert_pem.as_bytes()) { + certs.push(cert?); + } + + // Read private key file + let key_pem = fs::read_to_string("/tmp/key.txt")?; + + // Parse private key + let mut key_reader = key_pem.as_bytes(); + let key = + rustls_pemfile::private_key(&mut key_reader)?.ok_or("No private key found in key file")?; + + Ok((certs, key)) +} + +/// Generate a self-signed certificate for testing (fallback) +fn generate_self_signed_cert() -> (Vec>, PrivateKeyDer<'static>) { + use rcgen::{Certificate as RcgenCert, CertificateParams, DistinguishedName}; + + let mut params = CertificateParams::new(vec!["localhost".to_string()]); + params.distinguished_name = DistinguishedName::new(); + + let cert = RcgenCert::from_params(params).unwrap(); + let cert_der = cert.serialize_der().unwrap(); + let private_key_der = cert.serialize_private_key_der(); + + ( + vec![CertificateDer::from(cert_der)], + PrivateKeyDer::try_from(private_key_der).unwrap(), + ) +} + +/// HTTP service demonstrating 103 Early Hints +async fn handle_request( + mut req: Request, +) -> Result>, Infallible> { + let path = req.uri().path(); + println!("Received request: {} {}", req.method(), req.uri()); + + // Handle static resources that we hinted about + match path { + "/css/critical.css" | "/css/layout.css" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/css") + .body(Full::new(Bytes::from("body { font-family: sans-serif; }"))) + .unwrap()); + } + + "/js/app.js" | "/js/vendor.js" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "application/javascript") + .body(Full::new(Bytes::from("console.log('loaded');"))) + .unwrap()); + } + + "/fonts/main.woff2" | "/fonts/icons.woff2" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "font/woff2") + .body(Full::new(Bytes::from(&b"WOFF2"[..]))) + .unwrap()); + } + + "/images/hero.webp" => { + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "image/webp") + .body(Full::new(Bytes::from(&b"RIFF"[..]))) + .unwrap()); + } + + // Root path - serve HTML page with all the hinted resources + "/" => { + // Send 103 Early Hints using the early_hints_pusher API + if let Ok(mut pusher) = early_hints_pusher(&mut req) { + println!("Sending 103 Early Hints (all critical resources)"); + + let start_time = Instant::now(); + + let hints = Response::builder() + .status(StatusCode::EARLY_HINTS) + // Critical CSS (highest priority - render blocking) + .header("link", "; rel=preload; as=style") + .header("link", "; rel=preload; as=style") + // Critical JavaScript (high priority - interaction) + .header("link", "; rel=preload; as=script") + .header("link", "; rel=preload; as=script") + // Fonts (medium priority - text rendering) + .header( + "link", + "; rel=preload; as=font; crossorigin", + ) + .header( + "link", + "; rel=preload; as=font; crossorigin", + ) + // Hero image (medium priority - above fold) + .header("link", "; rel=preload; as=image") + // Metadata for tracking + .header("x-resource-count", "7") + .header("x-priority-order", "css,js,fonts,images") + .body(()) + .unwrap(); + + if let Err(e) = pusher.send_hints(hints).await { + eprintln!("Failed to send hints: {}", e); + } else { + let send_duration = start_time.elapsed(); + println!("103 Early Hints sent in: {:?}", send_duration); + println!(" 7 resources hinted in single response"); + println!(" Browser processes once, starts all preloads immediately"); + } + + // Simulate realistic server processing time + println!("Processing request (simulating database queries, template rendering...)"); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + } + + let html_content = r#" + + + 103 Early Hints Demo + + + + +

HTTP/2 103 Early Hints

+

The resources above were hinted via 103 before this response arrived.

+ + +"#; + + return Ok(Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/html") + .body(Full::new(Bytes::from(html_content))) + .unwrap()); + } + + // Default 404 handler + _ => { + return Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::from("Not Found"))) + .unwrap()); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + pretty_env_logger::init(); + + let addr: SocketAddr = ([0, 0, 0, 0], 3000).into(); + + // Load provided certificates or fallback to self-signed + let (certs, key) = match load_certificates() { + Ok((certs, key)) => { + println!("Loaded certificates from /tmp/cert.txt and /tmp/key.txt"); + (certs, key) + } + Err(e) => { + println!( + "Failed to load provided certificates ({}), generating self-signed certificate...", + e + ); + generate_self_signed_cert() + } + }; + + // Configure TLS + let mut config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key)?; + + // Enable HTTP/2 + config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let tls_acceptor = TlsAcceptor::from(std::sync::Arc::new(config)); + + // Create TCP listener + let listener = TcpListener::bind(addr).await?; + println!("103 Early Hints Server listening on https://{}", addr); + println!("Test: curl -k --http2 -v https://localhost:3000/"); + println!("Expected: 1 103 response + 1 final 200 response"); + println!("Benefits: Minimal browser overhead, maximum performance"); + + loop { + let (tcp_stream, _) = listener.accept().await?; + let tls_acceptor = tls_acceptor.clone(); + + tokio::spawn(async move { + // Perform TLS handshake + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(stream) => stream, + Err(e) => { + eprintln!("TLS handshake failed: {}", e); + return; + } + }; + + // Serve HTTP/2 connection with Early Hints support enabled + let service = service_fn(handle_request); + + if let Err(e) = http2::Builder::new(TokioExecutor) + .enable_informational() // Enable 103 Early Hints support + .serve_connection(TokioIo::new(tls_stream), service) + .await + { + eprintln!("HTTP/2 connection error: {}", e); + } + }); + } +} diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index ec046e0a39..93d3346b75 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -14,6 +14,7 @@ use futures_core::ready; use http::{Request, Response}; use super::super::dispatch::{self, TrySendError}; +use super::informational::InformationalConfig; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; use crate::proto; @@ -68,6 +69,7 @@ pub struct Builder { pub(super) exec: Ex, pub(super) timer: Time, h2_builder: proto::h2::client::Config, + informational_config: InformationalConfig, } /// Returns a handshake future over some IO. @@ -299,6 +301,7 @@ where exec, timer: Time::Empty, h2_builder: proto::h2::client::Config::default(), + informational_config: InformationalConfig::new(), } } @@ -542,6 +545,50 @@ where self } + /// Configures handling of informational responses (1xx status codes). + /// + /// By default, informational responses are ignored. This method allows you to + /// provide a callback that will be invoked whenever an informational response + /// is received, such as 103 Early Hints. + /// + /// # Examples + /// + /// ```rust + /// use hyper::client::conn::http2::Builder; + /// use hyper::client::conn::informational::InformationalConfig; + /// use http::StatusCode; + /// + /// #[derive(Clone)] + /// struct TokioExecutor; + /// + /// impl hyper::rt::Executor for TokioExecutor + /// where + /// F: std::future::Future + Send + 'static, + /// F::Output: Send + 'static, + /// { + /// fn execute(&self, fut: F) { + /// tokio::task::spawn(fut); + /// } + /// } + /// + /// let mut builder = Builder::new(TokioExecutor); + /// builder.informational_responses( + /// InformationalConfig::new().with_callback(|response| { + /// if response.status() == StatusCode::EARLY_HINTS { + /// println!("Received 103 Early Hints"); + /// // Process Link headers for resource preloading + /// for link in response.headers().get_all("link") { + /// println!("Preload: {:?}", link); + /// } + /// } + /// }) + /// ); + /// ``` + pub fn informational_responses(&mut self, config: InformationalConfig) -> &mut Self { + self.informational_config = config; + self + } + /// Constructs a connection with the configured options and IO. /// See [`client::conn`](crate::client::conn) for more. /// @@ -564,8 +611,15 @@ where trace!("client handshake HTTP/2"); let (tx, rx) = dispatch::channel(); - let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer) - .await?; + let h2 = proto::h2::client::handshake( + io, + rx, + &opts.h2_builder, + opts.exec, + opts.timer, + Some(opts.informational_config.clone()), + ) + .await?; Ok(( SendRequest { dispatch: tx.unbound(), diff --git a/src/client/conn/informational.rs b/src/client/conn/informational.rs new file mode 100644 index 0000000000..e308393815 --- /dev/null +++ b/src/client/conn/informational.rs @@ -0,0 +1,171 @@ +//! Informational response handling for HTTP/2 client connections. +//! +//! This module provides callback-based handling of 1xx informational responses, +//! including 103 Early Hints, for HTTP/2 client connections. + +use http::Response; +use std::fmt; +use std::sync::Arc; + +/// A callback function for handling informational responses (1xx status codes). +/// +/// This callback is invoked whenever the client receives an informational response +/// from the server, such as 103 Early Hints. The callback receives the complete +/// informational response including headers. +/// +/// # Examples +/// +/// ```rust +/// use hyper::client::conn::informational::InformationalCallback; +/// use http::{Response, StatusCode}; +/// use std::sync::Arc; +/// +/// let callback: InformationalCallback = Arc::new(|response: Response<()>| { +/// if response.status() == StatusCode::EARLY_HINTS { +/// println!("Received 103 Early Hints with {} headers", +/// response.headers().len()); +/// // Process Link headers for resource preloading +/// for link in response.headers().get_all("link") { +/// println!("Preload: {:?}", link); +/// } +/// } +/// }); +/// ``` +pub type InformationalCallback = Arc) + Send + Sync>; + +/// Configuration for informational response handling. +/// +/// This struct allows configuring how informational responses should be handled +/// by the HTTP/2 client connection. +#[derive(Default)] +pub struct InformationalConfig { + /// Optional callback for handling informational responses. + /// If None, informational responses are ignored (current behavior). + pub callback: Option, +} + +impl InformationalConfig { + /// Creates a new informational configuration with no callback. + pub fn new() -> Self { + Self::default() + } + + /// Sets the callback for handling informational responses. + pub fn with_callback(mut self, callback: F) -> Self + where + F: Fn(Response<()>) + Send + Sync + 'static, + { + self.callback = Some(Arc::new(callback)); + self + } + + /// Returns true if a callback is configured. + pub fn has_callback(&self) -> bool { + self.callback.is_some() + } + + /// Invokes the callback if one is configured. + /// + /// This is a test helper method - in production code, the callback + /// is extracted and called directly for better performance. + #[cfg(test)] + pub(crate) fn invoke_callback(&self, response: Response<()>) { + if let Some(ref callback) = self.callback { + callback(response); + } + } +} + +impl fmt::Debug for InformationalConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InformationalConfig") + .field("has_callback", &self.has_callback()) + .finish() + } +} + +impl Clone for InformationalConfig { + fn clone(&self) -> Self { + // Arc allows us to clone the callback + Self { + callback: self.callback.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::StatusCode; + use std::sync::{Arc, Mutex}; + + #[test] + fn test_informational_config_creation() { + let config = InformationalConfig::new(); + assert!(!config.has_callback()); + } + + #[test] + fn test_informational_config_with_callback() { + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + let config = InformationalConfig::new().with_callback(move |_response| { + *called_clone.lock().unwrap() = true; + }); + + assert!(config.has_callback()); + + // Test callback invocation + let mut response = Response::new(()); + *response.status_mut() = StatusCode::EARLY_HINTS; + config.invoke_callback(response); + + assert!(*called.lock().unwrap()); + } + + #[test] + fn test_informational_config_clone() { + let config = InformationalConfig::new().with_callback(|_| {}); + assert!(config.has_callback()); + + let cloned = config.clone(); + assert!(cloned.has_callback()); // Callback is cloned with Arc + } + + #[test] + fn test_early_hints_callback() { + let received_links = Arc::new(Mutex::new(Vec::new())); + let received_links_clone = received_links.clone(); + + let config = InformationalConfig::new().with_callback(move |response| { + if response.status() == StatusCode::EARLY_HINTS { + for link in response.headers().get_all("link") { + received_links_clone + .lock() + .unwrap() + .push(link.to_str().unwrap().to_string()); + } + } + }); + + // Simulate 103 Early Hints response + let mut response = Response::new(()); + *response.status_mut() = StatusCode::EARLY_HINTS; + response.headers_mut().insert( + "link", + "; rel=preload; as=style".parse().unwrap(), + ); + response.headers_mut().append( + "link", + "; rel=preload; as=script".parse().unwrap(), + ); + + config.invoke_callback(response); + + let links = received_links.lock().unwrap(); + assert_eq!(links.len(), 2); + assert!(links.contains(&"; rel=preload; as=style".to_string())); + assert!(links.contains(&"; rel=preload; as=script".to_string())); + } +} diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index f982ae6ddb..24e0764c28 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -18,5 +18,7 @@ pub mod http1; #[cfg(feature = "http2")] pub mod http2; +#[cfg(feature = "http2")] +pub mod informational; pub use super::dispatch::TrySendError; diff --git a/src/ext/informational_sender.rs b/src/ext/informational_sender.rs new file mode 100644 index 0000000000..9c22c9e96f --- /dev/null +++ b/src/ext/informational_sender.rs @@ -0,0 +1,234 @@ +//! Support for sending HTTP 103 Early Hints responses. +//! +//! This module provides the `early_hints_pusher()` function which allows +//! server handlers to send informational responses (1xx status codes) +//! before the final response. + +use http::{Request, Response, StatusCode}; + +use super::InformationalSender; + +/// A handle for sending HTTP 103 Early Hints responses. +/// +/// Obtained by calling `early_hints_pusher()` on a request. +#[derive(Debug, Clone)] +pub struct EarlyHintsPusher { + sender: futures_channel::mpsc::Sender>, +} + +impl EarlyHintsPusher { + /// Send an HTTP 103 Early Hints response. + /// + /// The response must have status code 103 and an empty body. + /// + /// Returns an error if the response is invalid or if sending fails. + pub async fn send_hints(&mut self, response: Response<()>) -> Result<(), EarlyHintsError> { + // Validate that this is a 103 response + if response.status() != StatusCode::EARLY_HINTS { + return Err(EarlyHintsError::InvalidStatus); + } + + self.sender + .try_send(response) + .map_err(|_| EarlyHintsError::SendFailed) + } +} + +/// Error type for early hints operations. +#[derive(Debug)] +pub enum EarlyHintsError { + /// The response status was not 103 + InvalidStatus, + /// Failed to send the response (channel full or closed) + SendFailed, + /// Early hints are not supported for this request + NotSupported, +} + +impl std::fmt::Display for EarlyHintsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EarlyHintsError::InvalidStatus => write!(f, "response must have status 103"), + EarlyHintsError::SendFailed => write!(f, "failed to send early hints response"), + EarlyHintsError::NotSupported => { + write!(f, "early hints not supported for this request") + } + } + } +} + +impl std::error::Error for EarlyHintsError {} + +/// Obtain a pusher for sending HTTP 103 Early Hints responses. +/// +/// This function lazily creates a channel for sending informational responses. +/// If called multiple times on the same request, it returns pushers that share +/// the same underlying channel. +/// +/// Returns `Err` if early hints are not supported for this request +/// (for example, if the connection doesn't support HTTP/2 or HTTP/1.1). +/// +/// # Example +/// +/// ```rust,no_run +/// use hyper::{Request, Response, StatusCode}; +/// use hyper::body::Incoming; +/// use hyper::ext::early_hints_pusher; +/// +/// async fn handle(mut req: Request) -> Result, hyper::Error> { +/// let preload = r#"; rel="preload"; as="style""#; +/// +/// match early_hints_pusher(&mut req) { +/// Ok(mut pusher) => { +/// let hints = Response::builder() +/// .status(StatusCode::EARLY_HINTS) +/// .header("Link", preload) +/// .body(()) +/// .unwrap(); +/// +/// if let Err(e) = pusher.send_hints(hints).await { +/// eprintln!("Failed to send early hints: {}", e); +/// } +/// } +/// Err(e) => { +/// eprintln!("Early hints not available: {}", e); +/// } +/// } +/// +/// // Send final response with the same Link header +/// Ok(Response::builder() +/// .header("Link", preload) +/// .body("...".to_string()) +/// .unwrap()) +/// } +/// ``` +pub fn early_hints_pusher(req: &mut Request) -> Result { + // Check if sender exists (pre-created by the server) + if let Some(sender) = req.extensions().get::() { + // Return a pusher that uses the existing sender + return Ok(EarlyHintsPusher { + sender: sender.0.clone(), + }); + } + + // Sender not found - early hints not supported for this request + Err(EarlyHintsError::NotSupported) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_early_hints_pusher_returns_error_when_not_supported() { + let mut req = Request::new(()); + let result = early_hints_pusher(&mut req); + + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), EarlyHintsError::NotSupported)); + } + + #[test] + fn test_early_hints_pusher_succeeds_when_sender_present() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let result = early_hints_pusher(&mut req); + assert!(result.is_ok()); + } + + #[test] + fn test_early_hints_pusher_multiple_calls_reuse_sender() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let pusher1 = early_hints_pusher(&mut req).unwrap(); + let pusher2 = early_hints_pusher(&mut req).unwrap(); + + // Both pushers should be valid (cloning the sender) + assert!(pusher1.sender.is_closed() == pusher2.sender.is_closed()); + } + + #[tokio::test] + async fn test_send_hints_rejects_non_103_status() { + let mut req = Request::new(()); + let (tx, _rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Try to send a 200 response instead of 103 + let invalid_response = Response::builder().status(200).body(()).unwrap(); + + let result = pusher.send_hints(invalid_response).await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + EarlyHintsError::InvalidStatus + )); + } + + #[tokio::test] + async fn test_send_hints_accepts_103_status() { + let mut req = Request::new(()); + let (tx, mut rx) = futures_channel::mpsc::channel(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Send a valid 103 response + let valid_response = Response::builder() + .status(103) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + + let result = pusher.send_hints(valid_response).await; + assert!(result.is_ok()); + + // Verify the response was sent through the channel + let received = rx.try_next().unwrap(); + assert!(received.is_some()); + let response = received.unwrap(); + assert_eq!(response.status(), 103); + } + + #[tokio::test] + async fn test_send_hints_fails_when_channel_closed() { + let mut req = Request::new(()); + let (tx, rx) = futures_channel::mpsc::channel::>(10); + req.extensions_mut().insert(InformationalSender(tx)); + + let mut pusher = early_hints_pusher(&mut req).unwrap(); + + // Close the receiver + drop(rx); + + // Try to send hints - should fail + let response = Response::builder().status(103).body(()).unwrap(); + + let result = pusher.send_hints(response).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), EarlyHintsError::SendFailed)); + } + + #[test] + fn test_early_hints_error_display() { + let invalid_status = EarlyHintsError::InvalidStatus; + assert_eq!(invalid_status.to_string(), "response must have status 103"); + + let send_failed = EarlyHintsError::SendFailed; + assert_eq!( + send_failed.to_string(), + "failed to send early hints response" + ); + + let not_supported = EarlyHintsError::NotSupported; + assert_eq!( + not_supported.to_string(), + "early hints not supported for this request" + ); + } +} diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 2b25df71b5..e11e9e78ef 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -303,6 +303,15 @@ impl OriginalHeaderOrder { /// This extension is meant to be attached to inbound `Request`s, allowing a /// server to send informational responses immediately (i.e. without delaying /// them until it has constructed a final, non-informational response). +/// +/// **Note:** This type should not be constructed directly by users. +/// Use the `early_hints_pusher()` function to obtain a pusher +/// that will lazily create this extension when needed. #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] #[derive(Clone, Debug)] -pub struct InformationalSender(pub futures_channel::mpsc::Sender>); +pub struct InformationalSender(pub(crate) futures_channel::mpsc::Sender>); + +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +mod informational_sender; +#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] +pub use informational_sender::{early_hints_pusher, EarlyHintsError, EarlyHintsPusher}; diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index eb32860dd7..3721cfd544 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -10,9 +10,9 @@ use crate::rt::{Read, Write}; use bytes::{Buf, Bytes}; #[cfg(feature = "server")] use futures_channel::mpsc::{self, Receiver}; -use futures_util::ready; +use futures_core::ready; #[cfg(feature = "server")] -use futures_util::StreamExt; +use futures_util::stream::StreamExt; use http::Request; #[cfg(feature = "server")] use http::Response; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index a312fa2add..512ada600c 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -2653,7 +2653,7 @@ mod tests { ) .unwrap(); - assert!(encoder.is_last()); + assert!(encoder.expect("encoder should exist").is_last()); } #[cfg(feature = "server")] diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 4803dba0f1..db98ee256c 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -153,6 +153,7 @@ pub(crate) async fn handshake( config: &Config, mut exec: E, timer: Time, + informational_config: Option, ) -> crate::Result> where T: Read + Write + Unpin, @@ -201,6 +202,7 @@ where h2_tx, req_rx, fut_ctx: None, + informational_callback: informational_config.and_then(|config| config.callback), marker: PhantomData, }) } @@ -418,6 +420,7 @@ where body_tx: SendStream>, body: B, cb: Callback, Response>, + informational_callback: Option, } impl Unpin for FutCtx {} @@ -434,6 +437,7 @@ where h2_tx: SendRequest>, req_rx: ClientRx, fut_ctx: Option>, + informational_callback: Option, marker: PhantomData, } @@ -571,6 +575,7 @@ where send_stream: Some(send_stream), exec: self.executor.clone(), cancel_tx: Some(cancel_tx), + informational_callback: f.informational_callback, }, call_back: Some(f.cb), }, @@ -591,6 +596,7 @@ pin_project! { send_stream: Option::Data>>>>, exec: E, cancel_tx: Option>, + informational_callback: Option, } } @@ -613,6 +619,39 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); + // First, check for any informational responses and invoke the callback if present + if let Some(callback) = &this.informational_callback { + let mut processed_informational = false; + loop { + match this.fut.poll_informational(cx) { + Poll::Ready(Some(Ok(informational_response))) => { + // Invoke the callback with the informational response + callback(informational_response); + processed_informational = true; + // Continue polling for more informational responses + continue; + } + Poll::Ready(Some(Err(_err))) => { + // Error in informational response, log it but don't fail the main response + debug!("informational response error: {}", _err); + break; + } + Poll::Ready(None) => { + // No more informational responses expected + break; + } + Poll::Pending => { + // If we processed any informational responses, return Pending to allow + // the H2 layer to process them before polling the main response + if processed_informational { + return Poll::Pending; + } + break; + } + } + } + } + let result = ready!(this.fut.poll(cx)); let ping = this.ping.take().expect("Future polled twice"); @@ -705,6 +744,10 @@ where continue; } let (head, body) = req.into_parts(); + + // Use the connection-level informational callback + let informational_callback = self.informational_callback.clone(); + let mut req = ::http::Request::from_parts(head, ()); super::strip_connection_headers(req.headers_mut(), super::MessageKind::Request); if let Some(len) = body.size_hint().exact() { @@ -751,6 +794,7 @@ where body_tx, body, cb, + informational_callback, }; // Check poll_ready() again. diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 6645734f31..75c6d3ab83 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -11,6 +11,10 @@ use h2::{Reason, RecvStream}; use http::{Method, Request}; use pin_project_lite::pin_project; +use futures_channel::mpsc::Receiver; +use futures_util::stream::StreamExt; +use http::Response; + use super::{ping, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::date; @@ -25,7 +29,6 @@ use crate::rt::{Read, Write}; use crate::service::HttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; -use crate::Response; // Our defaults are chosen for the "majority" case, which usually are not // resource constrained, and so the spec default of 64kb can be too limiting @@ -56,6 +59,7 @@ pub(crate) struct Config { pub(crate) header_table_size: Option, pub(crate) max_header_list_size: u32, pub(crate) date_header: bool, + pub(crate) enable_informational: bool, } impl Default for Config { @@ -75,6 +79,7 @@ impl Default for Config { max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE, max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE, date_header: true, + enable_informational: false, } } } @@ -90,6 +95,7 @@ pin_project! { service: S, state: State, date_header: bool, + enable_informational: bool, close_pending: bool } } @@ -178,6 +184,7 @@ where }, service, date_header: config.date_header, + enable_informational: config.enable_informational, close_pending: false, } } @@ -234,7 +241,12 @@ where if me.close_pending && srv.closing.is_none() { srv.conn.graceful_shutdown(); } - ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; + ready!(srv.poll_server( + cx, + &mut me.service, + &mut me.exec, + me.enable_informational + ))?; return Poll::Ready(Ok(Dispatched::Shutdown)); } }; @@ -253,6 +265,7 @@ where cx: &mut Context<'_>, service: &mut S, exec: &mut E, + enable_informational: bool, ) -> Poll> where S: HttpService, @@ -309,12 +322,28 @@ where req.extensions_mut().insert(Protocol::from_inner(protocol)); } + // Conditionally create channel infrastructure for early hints + // Only when explicitly enabled via builder + let informational_rx = if enable_informational { + const CHANNEL_CAPACITY: usize = 10; + let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_CAPACITY); + + // Insert sender for early_hints_pusher() to find + req.extensions_mut() + .insert(crate::ext::InformationalSender(tx)); + + Some(rx) + } else { + None + }; + let fut = H2Stream::new( service.call(req), connect_parts, respond, self.date_header, exec.clone(), + informational_rx, ); exec.execute_h2stream(fut); @@ -373,6 +402,7 @@ pin_project! { state: H2StreamState, date_header: bool, exec: E, + informational_rx: Option>>, } } @@ -410,12 +440,14 @@ where respond: SendResponse>, date_header: bool, exec: E, + informational_rx: Option>>, ) -> H2Stream { H2Stream { reply: respond, state: H2StreamState::Service { fut, connect_parts }, date_header, exec, + informational_rx, } } } @@ -442,6 +474,37 @@ where Ex: Http2UpgradedExec, E: Into>, { + /// Poll and send any pending informational responses. + /// Returns Poll::Ready(Err) if sending fails, Poll::Pending otherwise. + fn poll_informational( + informational_rx: &mut Option>>, + reply: &mut SendResponse>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(rx) = informational_rx.as_mut() { + while let Poll::Ready(informational) = rx.poll_next_unpin(cx) { + match informational { + Some(informational) => { + trace!( + "Sending informational response: {:?}", + informational.status() + ); + if let Err(e) = reply.send_informational(informational) { + debug!("Failed to send informational response: {}", e); + return Poll::Ready(Err(crate::Error::new_h2(e))); + } + } + None => { + trace!("Informational channel closed"); + *informational_rx = None; + break; + } + } + } + } + Poll::Pending + } + fn poll2(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.as_mut().project(); loop { @@ -450,6 +513,14 @@ where fut: h, connect_parts, } => { + // Check for informational responses only in Service state + // (1xx responses must come before the final response per HTTP/2 spec) + if let Poll::Ready(Err(e)) = + Self::poll_informational(me.informational_rx, me.reply, cx) + { + return Poll::Ready(Err(e)); + } + let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, Poll::Pending => { @@ -471,6 +542,14 @@ where } }; + // Service returned Ready - poll receiver one final time to catch any + // informational responses sent during service execution + if let Poll::Ready(Err(e)) = + Self::poll_informational(me.informational_rx, me.reply, cx) + { + return Poll::Ready(Err(e)); + } + let (head, body) = res.into_parts(); let mut res = ::http::Response::from_parts(head, ()); super::strip_connection_headers( @@ -521,6 +600,9 @@ where headers::set_content_length_if_missing(res.headers_mut(), len); } + // Clear informational receiver since we're now sending the final response body + *me.informational_rx = None; + let body_tx = reply!(me, res, false); H2StreamState::Body { pipe: PipeToSendStream::new(body, body_tx), @@ -558,3 +640,80 @@ where }) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_default_values() { + let config = Config::default(); + + // Verify all default values + assert!(!config.adaptive_window); + assert_eq!(config.initial_conn_window_size, DEFAULT_CONN_WINDOW); + assert_eq!(config.initial_stream_window_size, DEFAULT_STREAM_WINDOW); + assert_eq!(config.max_frame_size, DEFAULT_MAX_FRAME_SIZE); + assert!(!config.enable_connect_protocol); + assert_eq!(config.max_concurrent_streams, Some(200)); + assert_eq!(config.max_pending_accept_reset_streams, None); + assert_eq!( + config.max_local_error_reset_streams, + Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS) + ); + assert_eq!(config.keep_alive_interval, None); + assert_eq!(config.keep_alive_timeout, Duration::from_secs(20)); + assert_eq!(config.max_send_buffer_size, DEFAULT_MAX_SEND_BUF_SIZE); + assert_eq!( + config.max_header_list_size, + DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE + ); + assert!(config.date_header); + + // Most importantly, informational should be disabled by default + assert!(!config.enable_informational); + } + + #[test] + fn test_config_enable_informational_flag() { + let mut config = Config::default(); + assert!(!config.enable_informational); + + config.enable_informational = true; + assert!(config.enable_informational); + } + + #[test] + fn test_config_preserves_other_values_when_setting_informational() { + let mut config = Config::default(); + config.max_concurrent_streams = Some(100); + config.max_frame_size = 32768; + + config.enable_informational = true; + + assert!(config.enable_informational); + assert_eq!(config.max_concurrent_streams, Some(100)); + assert_eq!(config.max_frame_size, 32768); + } + + #[test] + fn test_config_can_be_cloned() { + let mut config = Config::default(); + config.enable_informational = true; + config.max_concurrent_streams = Some(50); + + let cloned = config.clone(); + + assert_eq!(cloned.enable_informational, config.enable_informational); + assert_eq!(cloned.max_concurrent_streams, config.max_concurrent_streams); + } + + #[test] + fn test_config_debug_format() { + let config = Config::default(); + let debug_output = format!("{:?}", config); + + // Verify debug output contains the field + assert!(debug_output.contains("enable_informational")); + } +} diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e4dac78bea..e735c5510f 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -299,6 +299,42 @@ impl Builder { self } + /// Enable support for HTTP/2 informational responses (1xx status codes). + /// + /// When enabled, the server can send informational responses like 103 Early Hints + /// before the final response using the `early_hints_pusher()` function. + /// + /// This creates a small amount of overhead for each request to support the feature. + /// When disabled (default), `early_hints_pusher()` will return an error. + /// + /// Default is false (disabled). + /// + /// # Example + /// + /// ```rust + /// use hyper::server::conn::http2; + /// + /// #[derive(Clone)] + /// struct TokioExecutor; + /// + /// impl hyper::rt::Executor for TokioExecutor + /// where + /// F: std::future::Future + Send + 'static, + /// F::Output: Send + 'static, + /// { + /// fn execute(&self, fut: F) { + /// tokio::spawn(fut); + /// } + /// } + /// + /// let builder = http2::Builder::new(TokioExecutor) + /// .enable_informational(); + /// ``` + pub fn enable_informational(&mut self) -> &mut Self { + self.h2_builder.enable_informational = true; + self + } + /// Bind a connection together with a [`Service`](crate::service::Service). /// /// This returns a Future that must be polled in order for HTTP to be @@ -322,3 +358,64 @@ impl Builder { Connection { conn: proto } } } + +#[cfg(test)] +mod tests { + use super::*; + + // Mock executor for testing + #[derive(Clone)] + struct MockExec; + + #[test] + fn test_builder_default_informational_disabled() { + let builder = Builder::new(MockExec); + assert!(!builder.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_sets_flag() { + let mut builder = Builder::new(MockExec); + builder.enable_informational(); + assert!(builder.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_returns_self() { + let mut builder = Builder::new(MockExec); + let returned = builder.enable_informational(); + // Verify method chaining works + assert!(returned.h2_builder.enable_informational); + } + + #[test] + fn test_enable_informational_can_be_chained() { + let builder = Builder::new(MockExec).enable_informational().clone(); + assert!(builder.h2_builder.enable_informational); + } + + #[test] + fn test_builder_preserves_other_settings_when_enabling_informational() { + let mut builder = Builder::new(MockExec); + builder.max_concurrent_streams(Some(100)); + builder.enable_informational(); + + assert!(builder.h2_builder.enable_informational); + assert_eq!(builder.h2_builder.max_concurrent_streams, Some(100)); + } + + #[test] + fn test_config_values_propagate_to_builder() { + let mut builder = Builder::new(MockExec); + builder + .enable_informational() + .max_concurrent_streams(Some(50)) + .initial_stream_window_size(Some(65535)) + .max_frame_size(Some(32768)); + + assert!(builder.h2_builder.enable_informational); + assert_eq!(builder.h2_builder.max_concurrent_streams, Some(50)); + assert_eq!(builder.h2_builder.initial_stream_window_size, 65535); + assert_eq!(builder.h2_builder.max_frame_size, 32768); + } +} diff --git a/tests/integration-early-hints.rs b/tests/integration-early-hints.rs new file mode 100644 index 0000000000..5eab3b4dd0 --- /dev/null +++ b/tests/integration-early-hints.rs @@ -0,0 +1,208 @@ +#![deny(warnings)] +#![cfg(feature = "http2")] + +//! Integration tests for HTTP/2 103 Early Hints support. + +use bytes::Bytes; +use http_body_util::Full; +use hyper::client::conn::http2::Builder; +use hyper::client::conn::informational::InformationalConfig; +use hyper::server::conn::http2::Builder as ServerBuilder; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use std::sync::{Arc, Mutex}; +use tokio::net::{TcpListener, TcpStream}; + +#[path = "support/mod.rs"] +mod support; +use support::{TokioExecutor, TokioIo}; + +/// Basic end-to-end test: server sends 103 Early Hints, client receives it +/// via callback, then gets the final 200 response. +#[tokio::test] +async fn test_http2_103_early_hints_basic() { + let _ = pretty_env_logger::try_init(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Server: sends one 103 Early Hints then a 200 OK + let server_handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(stream); + + let service = service_fn(|mut req: Request| async move { + if let Ok(mut pusher) = hyper::ext::early_hints_pusher(&mut req) { + let hints = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .header("link", "; rel=preload; as=script") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints).await; + } + + Ok::<_, hyper::Error>( + Response::builder() + .status(StatusCode::OK) + .header("content-type", "text/html") + .body(Full::new(Bytes::from("hello"))) + .unwrap(), + ) + }); + + ServerBuilder::new(TokioExecutor) + .enable_informational() + .serve_connection(io, service) + .await + .unwrap(); + }); + + // Client: connects, registers informational callback, sends request + let received = Arc::new(Mutex::new(Vec::<(u16, Vec<(String, String)>)>::new())); + let received_clone = received.clone(); + + let config = InformationalConfig::new().with_callback(move |response: Response<()>| { + let headers: Vec<(String, String)> = response + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) + .collect(); + received_clone + .lock() + .unwrap() + .push((response.status().as_u16(), headers)); + }); + + let stream = TcpStream::connect(addr).await.unwrap(); + let io = TokioIo::new(stream); + + let (mut sender, conn) = Builder::new(TokioExecutor) + .informational_responses(config) + .handshake(io) + .await + .unwrap(); + + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .uri("/") + .body(Full::new(Bytes::new())) + .unwrap(); + + let response = sender.send_request(req).await.unwrap(); + + // Verify final response + assert_eq!(response.status(), StatusCode::OK); + + // Verify informational response was received + let informational = received.lock().unwrap(); + assert_eq!(informational.len(), 1, "Expected one 103 response"); + assert_eq!(informational[0].0, 103); + + // Check that Link headers were received + let link_headers: Vec<&String> = informational[0] + .1 + .iter() + .filter(|(k, _)| k == "link") + .map(|(_, v)| v) + .collect(); + assert!(!link_headers.is_empty(), "Expected Link headers in 103"); + + server_handle.abort(); +} + +/// Test that multiple 103 Early Hints responses are received in sequence. +#[tokio::test] +async fn test_http2_103_early_hints_multiple() { + let _ = pretty_env_logger::try_init(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Server: sends two 103 Early Hints then a 200 OK + let server_handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(stream); + + let service = service_fn(|mut req: Request| async move { + if let Ok(mut pusher) = hyper::ext::early_hints_pusher(&mut req) { + // First 103: CSS hints + let hints1 = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=style") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints1).await; + + // Second 103: JS hints + let hints2 = Response::builder() + .status(StatusCode::EARLY_HINTS) + .header("link", "; rel=preload; as=script") + .body(()) + .unwrap(); + let _ = pusher.send_hints(hints2).await; + } + + Ok::<_, hyper::Error>( + Response::builder() + .status(StatusCode::OK) + .body(Full::new(Bytes::from("done"))) + .unwrap(), + ) + }); + + ServerBuilder::new(TokioExecutor) + .enable_informational() + .serve_connection(io, service) + .await + .unwrap(); + }); + + // Client + let received = Arc::new(Mutex::new(Vec::<(u16, Vec<(String, String)>)>::new())); + let received_clone = received.clone(); + + let config = InformationalConfig::new().with_callback(move |response: Response<()>| { + let headers: Vec<(String, String)> = response + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string())) + .collect(); + received_clone + .lock() + .unwrap() + .push((response.status().as_u16(), headers)); + }); + + let stream = TcpStream::connect(addr).await.unwrap(); + let io = TokioIo::new(stream); + + let (mut sender, conn) = Builder::new(TokioExecutor) + .informational_responses(config) + .handshake(io) + .await + .unwrap(); + + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .uri("/") + .body(Full::new(Bytes::new())) + .unwrap(); + + let response = sender.send_request(req).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Verify both 103 responses were received + let informational = received.lock().unwrap(); + assert_eq!(informational.len(), 2, "Expected two 103 responses"); + assert_eq!(informational[0].0, 103); + assert_eq!(informational[1].0, 103); + + server_handle.abort(); +}