replace how parsing works
This commit is contained in:
@@ -1,18 +1,13 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use axum::Router;
|
|
||||||
use axum::body::Body;
|
|
||||||
use axum::extract::{ConnectInfo, Request, State};
|
|
||||||
use axum::http::StatusCode;
|
|
||||||
use axum::response::{IntoResponse, Response};
|
|
||||||
use color_eyre::Result;
|
use color_eyre::Result;
|
||||||
use hyper_util::rt::TokioIo;
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::protocol::{StreamHeader, write_stream_header};
|
use crate::protocol::{StreamHeader, write_stream_header};
|
||||||
use crate::relay::QuicBiStream;
|
use crate::relay::{QuicBiStream, relay};
|
||||||
use crate::server::state::ServerState;
|
use crate::server::state::ServerState;
|
||||||
|
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
@@ -20,63 +15,67 @@ pub async fn run(
|
|||||||
state: Arc<ServerState>,
|
state: Arc<ServerState>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let app = Router::new().fallback(proxy_handler).with_state(state);
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
|
||||||
info!(%addr, "HTTP server listening");
|
info!(%addr, "HTTP server listening");
|
||||||
|
|
||||||
axum::serve(
|
loop {
|
||||||
listener,
|
tokio::select! {
|
||||||
app.into_make_service_with_connect_info::<SocketAddr>(),
|
_ = cancel.cancelled() => break,
|
||||||
)
|
accepted = listener.accept() => {
|
||||||
.with_graceful_shutdown(async move { cancel.cancelled().await })
|
let (stream, peer) = match accepted {
|
||||||
.await?;
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("HTTP accept error: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let state = state.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = handle_connection(stream, peer, state).await {
|
||||||
|
warn!("HTTP connection error: {e:#}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn proxy_handler(
|
/// Extract the Host header from raw HTTP bytes without consuming them.
|
||||||
State(state): State<Arc<ServerState>>,
|
/// Returns (subdomain, peer_display_string).
|
||||||
ConnectInfo(peer): ConnectInfo<SocketAddr>,
|
fn extract_host_from_headers(buf: &[u8], base_domain: &str) -> Option<String> {
|
||||||
req: Request<Body>,
|
let header_str = std::str::from_utf8(buf).ok()?;
|
||||||
) -> Response {
|
|
||||||
match do_proxy(state, peer, req).await {
|
// Find Host header (case-insensitive)
|
||||||
Ok(resp) => resp,
|
for line in header_str.split("\r\n").skip(1) {
|
||||||
Err(e) => {
|
if line.is_empty() {
|
||||||
warn!("HTTP proxy error: {e:#}");
|
break;
|
||||||
(StatusCode::NOT_FOUND, format!("{e}")).into_response()
|
}
|
||||||
|
if let Some(value) = line.strip_prefix("Host:").or_else(|| line.strip_prefix("host:")) {
|
||||||
|
let host = value.trim();
|
||||||
|
// Strip port if present
|
||||||
|
let host = host.split(':').next().unwrap_or(host);
|
||||||
|
let suffix = format!(".{base_domain}");
|
||||||
|
let subdomain = host.strip_suffix(&suffix).filter(|s| !s.is_empty())?;
|
||||||
|
return Some(subdomain.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_proxy(
|
async fn handle_connection(
|
||||||
state: Arc<ServerState>,
|
stream: TcpStream,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
req: Request<Body>,
|
state: Arc<ServerState>,
|
||||||
) -> Result<Response> {
|
) -> Result<()> {
|
||||||
// Use X-Forwarded-For if present (from Traefik), otherwise direct peer
|
// Peek at the beginning of the HTTP request to extract the Host header.
|
||||||
let peer_str = req
|
// We read into a buffer but then send ALL of it through the tunnel.
|
||||||
.headers()
|
let mut buf = vec![0u8; 8192];
|
||||||
.get("x-forwarded-for")
|
let n = stream.peek(&mut buf).await?;
|
||||||
.and_then(|v| v.to_str().ok())
|
let buf = &buf[..n];
|
||||||
.map(|v| v.split(',').next().unwrap_or(v).trim().to_string())
|
|
||||||
.unwrap_or_else(|| peer.to_string());
|
|
||||||
|
|
||||||
// Extract subdomain from Host header
|
let subdomain = extract_host_from_headers(buf, &state.base_domain)
|
||||||
let host_header = req
|
.ok_or_else(|| color_eyre::eyre::eyre!("no matching Host header in HTTP request"))?;
|
||||||
.headers()
|
|
||||||
.get("host")
|
|
||||||
.and_then(|v| v.to_str().ok())
|
|
||||||
.ok_or_else(|| color_eyre::eyre::eyre!("missing Host header"))?;
|
|
||||||
|
|
||||||
// Strip port if present (e.g. "myapp.bore.localhost:8080" -> "myapp.bore.localhost")
|
|
||||||
let host = host_header.split(':').next().unwrap_or(host_header);
|
|
||||||
|
|
||||||
let suffix = format!(".{}", state.base_domain);
|
|
||||||
let subdomain = host
|
|
||||||
.strip_suffix(&suffix)
|
|
||||||
.filter(|s| !s.is_empty())
|
|
||||||
.ok_or_else(|| color_eyre::eyre::eyre!("no tunnel found for host {host}"))?
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// Look up the tunnel
|
// Look up the tunnel
|
||||||
let tunnel_id = *state
|
let tunnel_id = *state
|
||||||
@@ -87,34 +86,29 @@ async fn do_proxy(
|
|||||||
let entry = state
|
let entry = state
|
||||||
.tunnels
|
.tunnels
|
||||||
.get(&tunnel_id)
|
.get(&tunnel_id)
|
||||||
.ok_or_else(|| color_eyre::eyre::eyre!("tunnel {tunnel_id} not found in state"))?;
|
.ok_or_else(|| color_eyre::eyre::eyre!("tunnel {tunnel_id} not found"))?;
|
||||||
|
|
||||||
let connection = entry.connection.clone();
|
let connection = entry.connection.clone();
|
||||||
drop(entry);
|
drop(entry);
|
||||||
|
|
||||||
// Open QUIC stream to client
|
// Open QUIC stream and write header
|
||||||
let (mut quic_send, quic_recv) = connection.open_bi().await?;
|
let (mut quic_send, quic_recv) = connection.open_bi().await?;
|
||||||
write_stream_header(
|
write_stream_header(
|
||||||
&mut quic_send,
|
&mut quic_send,
|
||||||
&StreamHeader {
|
&StreamHeader {
|
||||||
tunnel_id,
|
tunnel_id,
|
||||||
peer_addr: peer_str,
|
peer_addr: peer.to_string(),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Use hyper to proxy the HTTP request over the QUIC stream
|
// Raw bidirectional relay — TCP stream carries the full HTTP conversation
|
||||||
let io = TokioIo::new(QuicBiStream {
|
// including upgrades (WebSocket), SSE, chunked responses, etc.
|
||||||
|
let quic_stream = QuicBiStream {
|
||||||
send: quic_send,
|
send: quic_send,
|
||||||
recv: quic_recv,
|
recv: quic_recv,
|
||||||
});
|
};
|
||||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
relay(stream, quic_stream).await?;
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = conn.await {
|
|
||||||
warn!("HTTP proxy connection error: {e}");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let resp = sender.send_request(req).await?;
|
Ok(())
|
||||||
Ok(resp.map(Body::new))
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user