init
This commit is contained in:
@@ -0,0 +1,120 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
use axum::extract::{ConnectInfo, Request, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use color_eyre::Result;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::protocol::{StreamHeader, write_stream_header};
|
||||
use crate::relay::QuicBiStream;
|
||||
use crate::server::state::ServerState;
|
||||
|
||||
pub async fn run(
|
||||
addr: SocketAddr,
|
||||
state: Arc<ServerState>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let app = Router::new().fallback(proxy_handler).with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
info!(%addr, "HTTP server listening");
|
||||
|
||||
axum::serve(
|
||||
listener,
|
||||
app.into_make_service_with_connect_info::<SocketAddr>(),
|
||||
)
|
||||
.with_graceful_shutdown(async move { cancel.cancelled().await })
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn proxy_handler(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
ConnectInfo(peer): ConnectInfo<SocketAddr>,
|
||||
req: Request<Body>,
|
||||
) -> Response {
|
||||
match do_proxy(state, peer, req).await {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
warn!("HTTP proxy error: {e:#}");
|
||||
(StatusCode::NOT_FOUND, format!("{e}")).into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_proxy(
|
||||
state: Arc<ServerState>,
|
||||
peer: SocketAddr,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response> {
|
||||
// Use X-Forwarded-For if present (from Traefik), otherwise direct peer
|
||||
let peer_str = req
|
||||
.headers()
|
||||
.get("x-forwarded-for")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|v| v.split(',').next().unwrap_or(v).trim().to_string())
|
||||
.unwrap_or_else(|| peer.to_string());
|
||||
|
||||
// Extract subdomain from Host header
|
||||
let host_header = req
|
||||
.headers()
|
||||
.get("host")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.ok_or_else(|| color_eyre::eyre::eyre!("missing Host header"))?;
|
||||
|
||||
// Strip port if present (e.g. "myapp.bore.localhost:8080" -> "myapp.bore.localhost")
|
||||
let host = host_header.split(':').next().unwrap_or(host_header);
|
||||
|
||||
let suffix = format!(".{}", state.base_domain);
|
||||
let subdomain = host
|
||||
.strip_suffix(&suffix)
|
||||
.filter(|s| !s.is_empty())
|
||||
.ok_or_else(|| color_eyre::eyre::eyre!("no tunnel found for host {host}"))?
|
||||
.to_string();
|
||||
|
||||
// Look up the tunnel
|
||||
let tunnel_id = *state
|
||||
.http_routes
|
||||
.get(&subdomain)
|
||||
.ok_or_else(|| color_eyre::eyre::eyre!("no tunnel for subdomain {subdomain}"))?;
|
||||
|
||||
let entry = state
|
||||
.tunnels
|
||||
.get(&tunnel_id)
|
||||
.ok_or_else(|| color_eyre::eyre::eyre!("tunnel {tunnel_id} not found in state"))?;
|
||||
|
||||
let connection = entry.connection.clone();
|
||||
drop(entry);
|
||||
|
||||
// Open QUIC stream to client
|
||||
let (mut quic_send, quic_recv) = connection.open_bi().await?;
|
||||
write_stream_header(
|
||||
&mut quic_send,
|
||||
&StreamHeader {
|
||||
tunnel_id,
|
||||
peer_addr: peer_str,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Use hyper to proxy the HTTP request over the QUIC stream
|
||||
let io = TokioIo::new(QuicBiStream {
|
||||
send: quic_send,
|
||||
recv: quic_recv,
|
||||
});
|
||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
warn!("HTTP proxy connection error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
let resp = sender.send_request(req).await?;
|
||||
Ok(resp.map(Body::new))
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
pub mod http;
|
||||
pub mod tcp;
|
||||
pub mod udp;
|
||||
@@ -0,0 +1,75 @@
|
||||
use color_eyre::Result;
|
||||
use quinn::Connection;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::protocol::{StreamHeader, write_stream_header};
|
||||
use crate::relay::{QuicBiStream, relay};
|
||||
use crate::tunnel::TunnelId;
|
||||
|
||||
/// Bind a TCP listener on the given port and relay each accepted connection
|
||||
/// through a new QUIC bidirectional stream to the client.
|
||||
/// Returns the actually assigned port.
|
||||
pub async fn bind_and_relay(
|
||||
tunnel_id: TunnelId,
|
||||
port: u16,
|
||||
connection: Connection,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u16> {
|
||||
let listener = TcpListener::bind(("0.0.0.0", port)).await?;
|
||||
let assigned_port = listener.local_addr()?.port();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
accepted = listener.accept() => {
|
||||
let (tcp_stream, peer) = match accepted {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
warn!(tunnel_id, "TCP accept error: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
info!(tunnel_id, %peer, "accepted TCP connection");
|
||||
|
||||
let connection = connection.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = relay_tcp(tunnel_id, peer, tcp_stream, connection).await {
|
||||
warn!(tunnel_id, %peer, "TCP relay error: {e:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(assigned_port)
|
||||
}
|
||||
|
||||
async fn relay_tcp(
|
||||
tunnel_id: TunnelId,
|
||||
peer: SocketAddr,
|
||||
tcp_stream: tokio::net::TcpStream,
|
||||
connection: Connection,
|
||||
) -> Result<()> {
|
||||
let (mut quic_send, quic_recv) = connection.open_bi().await?;
|
||||
write_stream_header(
|
||||
&mut quic_send,
|
||||
&StreamHeader {
|
||||
tunnel_id,
|
||||
peer_addr: peer.to_string(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let quic_stream = QuicBiStream {
|
||||
send: quic_send,
|
||||
recv: quic_recv,
|
||||
};
|
||||
relay(tcp_stream, quic_stream).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use color_eyre::Result;
|
||||
use quinn::Connection;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::protocol::{StreamHeader, read_udp_frame, write_stream_header, write_udp_frame};
|
||||
use crate::tunnel::TunnelId;
|
||||
|
||||
/// Bind a UDP socket and relay datagrams through QUIC streams (one stream per source address).
|
||||
/// Returns the actually assigned port.
|
||||
pub async fn bind_and_relay(
|
||||
tunnel_id: TunnelId,
|
||||
port: u16,
|
||||
connection: Connection,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u16> {
|
||||
let socket = Arc::new(UdpSocket::bind(("0.0.0.0", port)).await?);
|
||||
let assigned_port = socket.local_addr()?.port();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_udp_relay(tunnel_id, socket, connection, cancel).await {
|
||||
warn!(tunnel_id, "UDP relay error: {e:#}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(assigned_port)
|
||||
}
|
||||
|
||||
async fn run_udp_relay(
|
||||
tunnel_id: TunnelId,
|
||||
socket: Arc<UdpSocket>,
|
||||
connection: Connection,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
// Track active sessions: source_addr -> QUIC send stream
|
||||
let sessions: Arc<Mutex<HashMap<SocketAddr, quinn::SendStream>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
result = socket.recv_from(&mut buf) => {
|
||||
let (n, src_addr) = result?;
|
||||
let data = buf[..n].to_vec();
|
||||
|
||||
let mut sessions_guard = sessions.lock().await;
|
||||
if let Some(send) = sessions_guard.get_mut(&src_addr) {
|
||||
// Existing session: send datagram on existing stream
|
||||
if let Err(e) = write_udp_frame(send, &data).await {
|
||||
warn!(tunnel_id, %src_addr, "failed to write to QUIC stream: {e}");
|
||||
sessions_guard.remove(&src_addr);
|
||||
}
|
||||
} else {
|
||||
// New session: open a new QUIC stream
|
||||
match connection.open_bi().await {
|
||||
Ok((mut quic_send, quic_recv)) => {
|
||||
if let Err(e) = write_stream_header(&mut quic_send, &StreamHeader {
|
||||
tunnel_id,
|
||||
peer_addr: src_addr.to_string(),
|
||||
}).await {
|
||||
warn!(tunnel_id, %src_addr, "failed to write stream header: {e}");
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = write_udp_frame(&mut quic_send, &data).await {
|
||||
warn!(tunnel_id, %src_addr, "failed to write first datagram: {e}");
|
||||
continue;
|
||||
}
|
||||
sessions_guard.insert(src_addr, quic_send);
|
||||
info!(tunnel_id, %src_addr, "new UDP session");
|
||||
|
||||
// Spawn task to read replies from client
|
||||
let socket = socket.clone();
|
||||
let sessions = sessions.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_udp_replies(
|
||||
tunnel_id, src_addr, quic_recv, socket, sessions,
|
||||
).await {
|
||||
warn!(tunnel_id, %src_addr, "UDP reply handler error: {e:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(tunnel_id, "failed to open QUIC stream: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_udp_replies(
|
||||
tunnel_id: TunnelId,
|
||||
src_addr: SocketAddr,
|
||||
mut quic_recv: quinn::RecvStream,
|
||||
socket: Arc<UdpSocket>,
|
||||
sessions: Arc<Mutex<HashMap<SocketAddr, quinn::SendStream>>>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let data = match read_udp_frame(&mut quic_recv).await {
|
||||
Ok(d) => d,
|
||||
Err(_) => break,
|
||||
};
|
||||
socket.send_to(&data, src_addr).await?;
|
||||
}
|
||||
|
||||
// Clean up session
|
||||
sessions.lock().await.remove(&src_addr);
|
||||
info!(tunnel_id, %src_addr, "UDP session ended");
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
use std::process::Command;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use color_eyre::Result;
|
||||
use tracing::{info, warn};
|
||||
|
||||
static CHAIN_CREATED: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
const CHAIN: &str = "BORE";
|
||||
|
||||
fn iptables(args: &[&str]) -> Result<()> {
|
||||
let output = Command::new("iptables").args(args).output()?;
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
return Err(color_eyre::eyre::eyre!(
|
||||
"iptables {} failed: {}",
|
||||
args.join(" "),
|
||||
stderr.trim()
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn chain_exists() -> bool {
|
||||
Command::new("iptables")
|
||||
.args(["-n", "-L", CHAIN])
|
||||
.output()
|
||||
.is_ok_and(|o| o.status.success())
|
||||
}
|
||||
|
||||
/// Create the BORE chain and jump rule. Flushes any stale rules from a previous run.
|
||||
pub fn init() -> Result<()> {
|
||||
if chain_exists() {
|
||||
iptables(&["-F", CHAIN])?;
|
||||
info!("flushed stale iptables chain {CHAIN}");
|
||||
} else {
|
||||
iptables(&["-N", CHAIN])?;
|
||||
info!("created iptables chain {CHAIN}");
|
||||
}
|
||||
|
||||
// Add jump from INPUT to BORE if not already present
|
||||
let check = Command::new("iptables")
|
||||
.args(["-C", "INPUT", "-j", CHAIN])
|
||||
.output()?;
|
||||
if !check.status.success() {
|
||||
iptables(&["-I", "INPUT", "-j", CHAIN])?;
|
||||
info!("added INPUT -> {CHAIN} jump rule");
|
||||
}
|
||||
|
||||
CHAIN_CREATED.store(true, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Allow inbound traffic on a TCP or UDP port.
|
||||
pub fn allow_port(port: u16, proto: &str) {
|
||||
if !CHAIN_CREATED.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
let port_str = port.to_string();
|
||||
if let Err(e) = iptables(&["-A", CHAIN, "-p", proto, "--dport", &port_str, "-j", "ACCEPT"]) {
|
||||
warn!("failed to add firewall rule for {proto}/{port}: {e}");
|
||||
} else {
|
||||
info!("firewall: opened {proto}/{port}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove the allow rule for a TCP or UDP port.
|
||||
pub fn deny_port(port: u16, proto: &str) {
|
||||
if !CHAIN_CREATED.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
let port_str = port.to_string();
|
||||
if let Err(e) = iptables(&["-D", CHAIN, "-p", proto, "--dport", &port_str, "-j", "ACCEPT"]) {
|
||||
warn!("failed to remove firewall rule for {proto}/{port}: {e}");
|
||||
} else {
|
||||
info!("firewall: closed {proto}/{port}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush the BORE chain and remove the jump rule. Call on shutdown.
|
||||
pub fn cleanup() {
|
||||
if !CHAIN_CREATED.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
// Remove jump rule
|
||||
let _ = iptables(&["-D", "INPUT", "-j", CHAIN]);
|
||||
// Flush chain
|
||||
let _ = iptables(&["-F", CHAIN]);
|
||||
// Delete chain
|
||||
let _ = iptables(&["-X", CHAIN]);
|
||||
info!("firewall: cleaned up {CHAIN} chain");
|
||||
CHAIN_CREATED.store(false, Ordering::Relaxed);
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
pub mod endpoints;
|
||||
pub mod firewall;
|
||||
pub mod quic_listener;
|
||||
pub mod state;
|
||||
pub mod traefik;
|
||||
|
||||
use endpoints::http;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use color_eyre::Result;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::quic;
|
||||
use state::ServerState;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct ServerArgs {
|
||||
/// Address for the QUIC listener
|
||||
#[arg(long, default_value = "0.0.0.0:4843", env = "BORE_LISTEN_ADDR")]
|
||||
pub listen_addr: SocketAddr,
|
||||
|
||||
/// Address for the HTTP tunnel proxy
|
||||
#[arg(long, default_value = "0.0.0.0:8080", env = "BORE_HTTP_ADDR")]
|
||||
pub http_addr: SocketAddr,
|
||||
|
||||
/// Address for the Traefik provider API
|
||||
#[arg(long, default_value = "127.0.0.1:3100", env = "BORE_API_ADDR")]
|
||||
pub api_addr: SocketAddr,
|
||||
|
||||
/// Base domain for HTTP tunnel subdomains (e.g. bore.example.com)
|
||||
#[arg(long, env = "BORE_BASE_DOMAIN")]
|
||||
pub base_domain: String,
|
||||
|
||||
/// Shared secret for client authentication
|
||||
#[arg(long, env = "BORE_SECRET")]
|
||||
pub secret: String,
|
||||
|
||||
/// Traefik entrypoint name for HTTP tunnel routers
|
||||
#[arg(long, default_value = "websecure", env = "BORE_TRAEFIK_ENTRYPOINT")]
|
||||
pub traefik_entrypoint: String,
|
||||
|
||||
/// Traefik TLS cert resolver name (e.g. "letsencrypt"). Omit to disable TLS in generated config.
|
||||
#[arg(long, env = "BORE_TRAEFIK_CERT_RESOLVER")]
|
||||
pub traefik_cert_resolver: Option<String>,
|
||||
|
||||
/// Manage iptables rules for tunnel ports (requires root/CAP_NET_ADMIN)
|
||||
#[arg(long, default_value_t = false, env = "BORE_MANAGE_FIREWALL")]
|
||||
pub manage_firewall: bool,
|
||||
|
||||
/// Directory for persistent data (TLS cert/key)
|
||||
#[arg(long, default_value = "./bore-data", env = "BORE_DATA_DIR")]
|
||||
pub data_dir: PathBuf,
|
||||
}
|
||||
|
||||
pub async fn run(args: ServerArgs) -> Result<()> {
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
if args.manage_firewall {
|
||||
firewall::init()?;
|
||||
}
|
||||
|
||||
let state = Arc::new(ServerState::new(
|
||||
args.secret.clone(),
|
||||
args.base_domain.clone(),
|
||||
args.http_addr,
|
||||
args.traefik_entrypoint.clone(),
|
||||
args.traefik_cert_resolver.clone(),
|
||||
args.manage_firewall,
|
||||
));
|
||||
|
||||
let (certs, key) = quic::load_or_generate_cert(&args.data_dir)?;
|
||||
tracing::info!(
|
||||
"server cert fingerprint: {}",
|
||||
quic::cert_fingerprint(certs[0].as_ref())
|
||||
);
|
||||
let endpoint = quic::make_server_endpoint(args.listen_addr, certs, key)?;
|
||||
|
||||
let quic_handle = {
|
||||
let state = state.clone();
|
||||
let cancel = cancel.clone();
|
||||
tokio::spawn(async move { quic_listener::run(endpoint, state, cancel).await })
|
||||
};
|
||||
|
||||
let http_handle = {
|
||||
let state = state.clone();
|
||||
let cancel = cancel.clone();
|
||||
tokio::spawn(async move { http::run(args.http_addr, state, cancel).await })
|
||||
};
|
||||
|
||||
let api_handle = {
|
||||
let state = state.clone();
|
||||
let cancel = cancel.clone();
|
||||
tokio::spawn(async move { traefik::run(args.api_addr, state, cancel).await })
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
res = quic_handle => {
|
||||
tracing::error!("QUIC listener exited: {res:?}");
|
||||
}
|
||||
res = http_handle => {
|
||||
tracing::error!("HTTP server exited: {res:?}");
|
||||
}
|
||||
res = api_handle => {
|
||||
tracing::error!("Traefik API exited: {res:?}");
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
tracing::info!("shutting down");
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
if args.manage_firewall {
|
||||
firewall::cleanup();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,211 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use color_eyre::Result;
|
||||
use quinn::Endpoint;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::protocol::{
|
||||
ClientMessage, ServerMessage, TunnelProtocol, read_client_message, write_server_message,
|
||||
};
|
||||
use crate::server::endpoints::{tcp, udp};
|
||||
use crate::tunnel::TunnelInfo;
|
||||
|
||||
use super::state::ServerState;
|
||||
|
||||
pub async fn run(
|
||||
endpoint: Endpoint,
|
||||
state: Arc<ServerState>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
incoming = endpoint.accept() => {
|
||||
let Some(incoming) = incoming else { break };
|
||||
let state = state.clone();
|
||||
let cancel = cancel.child_token();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_connection(incoming, state, cancel).await {
|
||||
warn!("connection error: {e:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
incoming: quinn::Incoming,
|
||||
state: Arc<ServerState>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let connection = incoming.await?;
|
||||
let remote = connection.remote_address();
|
||||
info!(%remote, "new QUIC connection");
|
||||
|
||||
let connection_id = state.next_connection_id();
|
||||
|
||||
// Spawn watcher to clean up when connection closes
|
||||
{
|
||||
let state = state.clone();
|
||||
let conn = connection.clone();
|
||||
tokio::spawn(async move {
|
||||
conn.closed().await;
|
||||
info!(%remote, "connection closed, cleaning up");
|
||||
state.remove_connection(connection_id);
|
||||
});
|
||||
}
|
||||
|
||||
// Accept the control stream (first bidirectional stream)
|
||||
let (mut send, mut recv) = connection.accept_bi().await?;
|
||||
|
||||
// Authenticate
|
||||
let msg = read_client_message(&mut recv).await?;
|
||||
match msg {
|
||||
ClientMessage::Auth { secret } => {
|
||||
if secret != state.secret {
|
||||
write_server_message(
|
||||
&mut send,
|
||||
&ServerMessage::Error {
|
||||
message: "invalid secret".to_string(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
write_server_message(&mut send, &ServerMessage::AuthOk).await?;
|
||||
info!(%remote, "authenticated");
|
||||
}
|
||||
_ => {
|
||||
write_server_message(
|
||||
&mut send,
|
||||
&ServerMessage::Error {
|
||||
message: "expected Auth message".to_string(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Process tunnel requests
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => break,
|
||||
msg = read_client_message(&mut recv) => {
|
||||
let msg = match msg {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
info!(%remote, "control stream closed: {e:#}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
ClientMessage::RequestTunnel { protocol, local_port, remote_port, subdomain } => {
|
||||
let tunnel_id = state.next_tunnel_id();
|
||||
let tunnel_cancel = cancel.child_token();
|
||||
|
||||
match protocol {
|
||||
TunnelProtocol::Tcp => {
|
||||
let port = remote_port.unwrap_or(0);
|
||||
match tcp::bind_and_relay(
|
||||
tunnel_id,
|
||||
port,
|
||||
connection.clone(),
|
||||
tunnel_cancel,
|
||||
).await {
|
||||
Ok(assigned_port) => {
|
||||
let info = TunnelInfo {
|
||||
id: tunnel_id,
|
||||
protocol,
|
||||
target: format!("client:{local_port}"),
|
||||
remote_port: Some(assigned_port),
|
||||
subdomain: None,
|
||||
};
|
||||
state.register_tunnel(connection_id, info, connection.clone());
|
||||
write_server_message(&mut send, &ServerMessage::TunnelCreated {
|
||||
tunnel_id,
|
||||
protocol,
|
||||
assigned_port: Some(assigned_port),
|
||||
assigned_subdomain: None,
|
||||
}).await?;
|
||||
tracing::info!(tunnel_id, assigned_port, "TCP tunnel created");
|
||||
}
|
||||
Err(e) => {
|
||||
write_server_message(&mut send, &ServerMessage::Error {
|
||||
message: format!("failed to bind TCP: {e}"),
|
||||
}).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
TunnelProtocol::Udp => {
|
||||
let port = remote_port.unwrap_or(0);
|
||||
match udp::bind_and_relay(
|
||||
tunnel_id,
|
||||
port,
|
||||
connection.clone(),
|
||||
tunnel_cancel,
|
||||
).await {
|
||||
Ok(assigned_port) => {
|
||||
let info = TunnelInfo {
|
||||
id: tunnel_id,
|
||||
protocol,
|
||||
target: format!("client:{local_port}"),
|
||||
remote_port: Some(assigned_port),
|
||||
subdomain: None,
|
||||
};
|
||||
state.register_tunnel(connection_id, info, connection.clone());
|
||||
write_server_message(&mut send, &ServerMessage::TunnelCreated {
|
||||
tunnel_id,
|
||||
protocol,
|
||||
assigned_port: Some(assigned_port),
|
||||
assigned_subdomain: None,
|
||||
}).await?;
|
||||
tracing::info!(tunnel_id, assigned_port, "UDP tunnel created");
|
||||
}
|
||||
Err(e) => {
|
||||
write_server_message(&mut send, &ServerMessage::Error {
|
||||
message: format!("failed to bind UDP: {e}"),
|
||||
}).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
TunnelProtocol::Http => {
|
||||
let subdomain = subdomain.unwrap_or_else(|| {
|
||||
uuid::Uuid::new_v4().to_string()[..8].to_string()
|
||||
});
|
||||
let fqdn = format!("{}.{}", subdomain, state.base_domain);
|
||||
let info = TunnelInfo {
|
||||
id: tunnel_id,
|
||||
protocol,
|
||||
target: format!("client:{local_port}"),
|
||||
remote_port: None,
|
||||
subdomain: Some(subdomain.clone()),
|
||||
};
|
||||
state.register_tunnel(connection_id, info, connection.clone());
|
||||
let url = format!("http://{fqdn}");
|
||||
write_server_message(&mut send, &ServerMessage::TunnelCreated {
|
||||
tunnel_id,
|
||||
protocol,
|
||||
assigned_port: None,
|
||||
assigned_subdomain: Some(url),
|
||||
}).await?;
|
||||
tracing::info!(tunnel_id, %fqdn, "HTTP tunnel created");
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
write_server_message(&mut send, &ServerMessage::Error {
|
||||
message: "unexpected message on control stream".to_string(),
|
||||
}).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use quinn::Connection;
|
||||
|
||||
use crate::protocol::TunnelProtocol;
|
||||
use crate::tunnel::{TunnelId, TunnelInfo};
|
||||
use super::firewall;
|
||||
|
||||
/// A registered tunnel on the server.
|
||||
pub struct TunnelEntry {
|
||||
pub info: TunnelInfo,
|
||||
/// The QUIC connection to the client that owns this tunnel.
|
||||
pub connection: Connection,
|
||||
}
|
||||
|
||||
/// Shared server state.
|
||||
pub struct ServerState {
|
||||
pub secret: String,
|
||||
pub base_domain: String,
|
||||
pub http_addr: SocketAddr,
|
||||
pub traefik_entrypoint: String,
|
||||
pub traefik_cert_resolver: Option<String>,
|
||||
pub manage_firewall: bool,
|
||||
/// tunnel_id -> TunnelEntry
|
||||
pub tunnels: DashMap<TunnelId, TunnelEntry>,
|
||||
/// subdomain -> tunnel_id (for HTTP routing)
|
||||
pub http_routes: DashMap<String, TunnelId>,
|
||||
/// connection_id -> list of tunnel_ids (for cleanup)
|
||||
pub connection_tunnels: DashMap<usize, Vec<TunnelId>>,
|
||||
next_tunnel_id: AtomicU64,
|
||||
next_connection_id: AtomicU64,
|
||||
}
|
||||
|
||||
impl ServerState {
|
||||
pub fn new(
|
||||
secret: String,
|
||||
base_domain: String,
|
||||
http_addr: SocketAddr,
|
||||
traefik_entrypoint: String,
|
||||
traefik_cert_resolver: Option<String>,
|
||||
manage_firewall: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
secret,
|
||||
base_domain,
|
||||
http_addr,
|
||||
traefik_entrypoint,
|
||||
traefik_cert_resolver,
|
||||
manage_firewall,
|
||||
tunnels: DashMap::new(),
|
||||
http_routes: DashMap::new(),
|
||||
connection_tunnels: DashMap::new(),
|
||||
next_tunnel_id: AtomicU64::new(1),
|
||||
next_connection_id: AtomicU64::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next_tunnel_id(&self) -> TunnelId {
|
||||
self.next_tunnel_id.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn next_connection_id(&self) -> usize {
|
||||
self.next_connection_id.fetch_add(1, Ordering::Relaxed) as usize
|
||||
}
|
||||
|
||||
pub fn register_tunnel(&self, connection_id: usize, info: TunnelInfo, connection: Connection) {
|
||||
let tunnel_id = info.id;
|
||||
|
||||
if info.protocol == TunnelProtocol::Http
|
||||
&& let Some(ref subdomain) = info.subdomain
|
||||
{
|
||||
self.http_routes.insert(subdomain.clone(), tunnel_id);
|
||||
}
|
||||
|
||||
if self.manage_firewall {
|
||||
if let Some(port) = info.remote_port {
|
||||
let proto = match info.protocol {
|
||||
TunnelProtocol::Tcp => "tcp",
|
||||
TunnelProtocol::Udp => "udp",
|
||||
TunnelProtocol::Http => "",
|
||||
};
|
||||
if !proto.is_empty() {
|
||||
firewall::allow_port(port, proto);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.tunnels
|
||||
.insert(tunnel_id, TunnelEntry { info, connection });
|
||||
self.connection_tunnels
|
||||
.entry(connection_id)
|
||||
.or_default()
|
||||
.push(tunnel_id);
|
||||
}
|
||||
|
||||
pub fn remove_connection(&self, connection_id: usize) {
|
||||
if let Some((_, tunnel_ids)) = self.connection_tunnels.remove(&connection_id) {
|
||||
for tid in tunnel_ids {
|
||||
if let Some((_, entry)) = self.tunnels.remove(&tid) {
|
||||
if entry.info.protocol == TunnelProtocol::Http
|
||||
&& let Some(ref subdomain) = entry.info.subdomain
|
||||
{
|
||||
self.http_routes.remove(subdomain);
|
||||
}
|
||||
|
||||
if self.manage_firewall {
|
||||
if let Some(port) = entry.info.remote_port {
|
||||
let proto = match entry.info.protocol {
|
||||
TunnelProtocol::Tcp => "tcp",
|
||||
TunnelProtocol::Udp => "udp",
|
||||
TunnelProtocol::Http => "",
|
||||
};
|
||||
if !proto.is_empty() {
|
||||
firewall::deny_port(port, proto);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(tunnel_id = tid, "removed tunnel");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::{Json, Router};
|
||||
use color_eyre::Result;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
use super::state::ServerState;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TraefikConfig {
|
||||
http: TraefikHttp,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TraefikHttp {
|
||||
routers: HashMap<String, TraefikRouter>,
|
||||
services: HashMap<String, TraefikService>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TraefikRouter {
|
||||
rule: String,
|
||||
service: String,
|
||||
entry_points: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
tls: Option<TraefikTls>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TraefikTls {
|
||||
cert_resolver: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TraefikService {
|
||||
load_balancer: TraefikLoadBalancer,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TraefikLoadBalancer {
|
||||
servers: Vec<TraefikServer>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TraefikServer {
|
||||
url: String,
|
||||
}
|
||||
|
||||
pub async fn run(
|
||||
addr: SocketAddr,
|
||||
state: Arc<ServerState>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let app = Router::new()
|
||||
.route("/api/traefik", axum::routing::get(handler))
|
||||
.with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
info!(%addr, "Traefik API listening");
|
||||
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(async move { cancel.cancelled().await })
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handler(State(state): State<Arc<ServerState>>) -> Json<TraefikConfig> {
|
||||
let mut routers = HashMap::new();
|
||||
let mut services = HashMap::new();
|
||||
|
||||
let bore_url = format!("http://{}", state.http_addr);
|
||||
|
||||
for entry in state.http_routes.iter() {
|
||||
let subdomain = entry.key();
|
||||
let name = format!("bore-{subdomain}");
|
||||
let fqdn = format!("{subdomain}.{}", state.base_domain);
|
||||
|
||||
routers.insert(
|
||||
name.clone(),
|
||||
TraefikRouter {
|
||||
rule: format!("Host(`{fqdn}`)"),
|
||||
service: name.clone(),
|
||||
entry_points: vec![state.traefik_entrypoint.clone()],
|
||||
tls: state.traefik_cert_resolver.as_ref().map(|r| TraefikTls {
|
||||
cert_resolver: r.clone(),
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
services.insert(
|
||||
name,
|
||||
TraefikService {
|
||||
load_balancer: TraefikLoadBalancer {
|
||||
servers: vec![TraefikServer {
|
||||
url: bore_url.clone(),
|
||||
}],
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Json(TraefikConfig {
|
||||
http: TraefikHttp { routers, services },
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user