From 2dbf0553a13305ea54caede771bc82d4c668ec64 Mon Sep 17 00:00:00 2001 From: Nikkuss Date: Thu, 26 Mar 2026 23:00:10 +0400 Subject: [PATCH] replace how parsing works --- src/server/endpoints/http.rs | 128 +++++++++++++++++------------------ 1 file changed, 61 insertions(+), 67 deletions(-) diff --git a/src/server/endpoints/http.rs b/src/server/endpoints/http.rs index b2e27a6..3c51d2d 100644 --- a/src/server/endpoints/http.rs +++ b/src/server/endpoints/http.rs @@ -1,18 +1,13 @@ use std::net::SocketAddr; use std::sync::Arc; -use axum::Router; -use axum::body::Body; -use axum::extract::{ConnectInfo, Request, State}; -use axum::http::StatusCode; -use axum::response::{IntoResponse, Response}; use color_eyre::Result; -use hyper_util::rt::TokioIo; +use tokio::net::{TcpListener, TcpStream}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use crate::protocol::{StreamHeader, write_stream_header}; -use crate::relay::QuicBiStream; +use crate::relay::{QuicBiStream, relay}; use crate::server::state::ServerState; pub async fn run( @@ -20,63 +15,67 @@ pub async fn run( state: Arc, cancel: CancellationToken, ) -> Result<()> { - let app = Router::new().fallback(proxy_handler).with_state(state); - - let listener = tokio::net::TcpListener::bind(addr).await?; + let listener = TcpListener::bind(addr).await?; info!(%addr, "HTTP server listening"); - axum::serve( - listener, - app.into_make_service_with_connect_info::(), - ) - .with_graceful_shutdown(async move { cancel.cancelled().await }) - .await?; + loop { + tokio::select! { + _ = cancel.cancelled() => break, + accepted = listener.accept() => { + let (stream, peer) = match accepted { + Ok(v) => v, + Err(e) => { + warn!("HTTP accept error: {e}"); + continue; + } + }; + let state = state.clone(); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, peer, state).await { + warn!("HTTP connection error: {e:#}"); + } + }); + } + } + } Ok(()) } -async fn proxy_handler( - State(state): State>, - ConnectInfo(peer): ConnectInfo, - req: Request, -) -> Response { - match do_proxy(state, peer, req).await { - Ok(resp) => resp, - Err(e) => { - warn!("HTTP proxy error: {e:#}"); - (StatusCode::NOT_FOUND, format!("{e}")).into_response() +/// Extract the Host header from raw HTTP bytes without consuming them. +/// Returns (subdomain, peer_display_string). +fn extract_host_from_headers(buf: &[u8], base_domain: &str) -> Option { + let header_str = std::str::from_utf8(buf).ok()?; + + // Find Host header (case-insensitive) + for line in header_str.split("\r\n").skip(1) { + if line.is_empty() { + break; + } + if let Some(value) = line.strip_prefix("Host:").or_else(|| line.strip_prefix("host:")) { + let host = value.trim(); + // Strip port if present + let host = host.split(':').next().unwrap_or(host); + let suffix = format!(".{base_domain}"); + let subdomain = host.strip_suffix(&suffix).filter(|s| !s.is_empty())?; + return Some(subdomain.to_string()); } } + None } -async fn do_proxy( - state: Arc, +async fn handle_connection( + stream: TcpStream, peer: SocketAddr, - req: Request, -) -> Result { - // Use X-Forwarded-For if present (from Traefik), otherwise direct peer - let peer_str = req - .headers() - .get("x-forwarded-for") - .and_then(|v| v.to_str().ok()) - .map(|v| v.split(',').next().unwrap_or(v).trim().to_string()) - .unwrap_or_else(|| peer.to_string()); + state: Arc, +) -> Result<()> { + // Peek at the beginning of the HTTP request to extract the Host header. + // We read into a buffer but then send ALL of it through the tunnel. + let mut buf = vec![0u8; 8192]; + let n = stream.peek(&mut buf).await?; + let buf = &buf[..n]; - // Extract subdomain from Host header - let host_header = req - .headers() - .get("host") - .and_then(|v| v.to_str().ok()) - .ok_or_else(|| color_eyre::eyre::eyre!("missing Host header"))?; - - // Strip port if present (e.g. "myapp.bore.localhost:8080" -> "myapp.bore.localhost") - let host = host_header.split(':').next().unwrap_or(host_header); - - let suffix = format!(".{}", state.base_domain); - let subdomain = host - .strip_suffix(&suffix) - .filter(|s| !s.is_empty()) - .ok_or_else(|| color_eyre::eyre::eyre!("no tunnel found for host {host}"))? - .to_string(); + let subdomain = extract_host_from_headers(buf, &state.base_domain) + .ok_or_else(|| color_eyre::eyre::eyre!("no matching Host header in HTTP request"))?; // Look up the tunnel let tunnel_id = *state @@ -87,34 +86,29 @@ async fn do_proxy( let entry = state .tunnels .get(&tunnel_id) - .ok_or_else(|| color_eyre::eyre::eyre!("tunnel {tunnel_id} not found in state"))?; + .ok_or_else(|| color_eyre::eyre::eyre!("tunnel {tunnel_id} not found"))?; let connection = entry.connection.clone(); drop(entry); - // Open QUIC stream to client + // Open QUIC stream and write header let (mut quic_send, quic_recv) = connection.open_bi().await?; write_stream_header( &mut quic_send, &StreamHeader { tunnel_id, - peer_addr: peer_str, + peer_addr: peer.to_string(), }, ) .await?; - // Use hyper to proxy the HTTP request over the QUIC stream - let io = TokioIo::new(QuicBiStream { + // Raw bidirectional relay — TCP stream carries the full HTTP conversation + // including upgrades (WebSocket), SSE, chunked responses, etc. + let quic_stream = QuicBiStream { send: quic_send, recv: quic_recv, - }); - let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; - tokio::spawn(async move { - if let Err(e) = conn.await { - warn!("HTTP proxy connection error: {e}"); - } - }); + }; + relay(stream, quic_stream).await?; - let resp = sender.send_request(req).await?; - Ok(resp.map(Body::new)) + Ok(()) }