155 lines
5.0 KiB
Rust
155 lines
5.0 KiB
Rust
use core::time;
|
|
|
|
use color_eyre::eyre::{eyre, ContextCompat, Report, Result};
|
|
use sea_schema::sea_query::TableCreateStatement;
|
|
use url::Url;
|
|
|
|
use crate::config::db::DbConfig;
|
|
#[derive(Debug, Clone)]
|
|
pub enum DbType {
|
|
MySql,
|
|
Postgres,
|
|
Sqlite,
|
|
}
|
|
|
|
pub async fn get_tables(
|
|
database_url: String,
|
|
filter: Box<dyn Fn(&String) -> bool>,
|
|
database_config: &DbConfig,
|
|
) -> Result<(Vec<TableCreateStatement>, DbType)> {
|
|
let url = Url::parse(&database_url)?;
|
|
|
|
tracing::trace!(?url);
|
|
|
|
let is_sqlite = url.scheme() == "sqlite";
|
|
|
|
let database_name: &str = (if !is_sqlite {
|
|
let database_name = url
|
|
.path_segments()
|
|
.context("No database name as part of path")?
|
|
.next()
|
|
.context("No database name as part of path")?;
|
|
|
|
if database_name.is_empty() {
|
|
return Err(eyre!("Database path name is empty"));
|
|
}
|
|
Ok::<&str, Report>(database_name)
|
|
} else {
|
|
Ok(Default::default())
|
|
})?;
|
|
|
|
let (table_stmts, db_type) = match url.scheme() {
|
|
"mysql" => {
|
|
use sea_schema::mysql::discovery::SchemaDiscovery;
|
|
use sqlx::MySql;
|
|
|
|
tracing::info!("Connecting to MySQL");
|
|
let connection = sqlx_connect::<MySql>(
|
|
database_config.max_connections,
|
|
database_config.acquire_timeout,
|
|
url.as_str(),
|
|
None,
|
|
)
|
|
.await?;
|
|
|
|
tracing::info!("Discovering schema");
|
|
let schema_discovery = SchemaDiscovery::new(connection, database_name);
|
|
let schema = schema_discovery.discover().await?;
|
|
let table_stmts = schema
|
|
.tables
|
|
.into_iter()
|
|
.filter(|schema| filter(&schema.info.name))
|
|
.map(|schema| schema.write())
|
|
.collect();
|
|
(table_stmts, DbType::MySql)
|
|
}
|
|
"sqlite" => {
|
|
use sea_schema::sqlite::discovery::SchemaDiscovery;
|
|
use sqlx::Sqlite;
|
|
|
|
tracing::info!("Connecting to SQLite");
|
|
let connection = sqlx_connect::<Sqlite>(
|
|
database_config.max_connections,
|
|
database_config.acquire_timeout,
|
|
url.as_str(),
|
|
None,
|
|
)
|
|
.await?;
|
|
|
|
tracing::info!("Discovering schema");
|
|
let schema_discovery = SchemaDiscovery::new(connection);
|
|
let schema = schema_discovery
|
|
.discover()
|
|
.await?
|
|
.merge_indexes_into_table();
|
|
let table_stmts = schema
|
|
.tables
|
|
.into_iter()
|
|
.filter(|schema| filter(&schema.name))
|
|
.map(|schema| schema.write())
|
|
.collect();
|
|
(table_stmts, DbType::Sqlite)
|
|
}
|
|
"postgres" | "potgresql" => {
|
|
use sea_schema::postgres::discovery::SchemaDiscovery;
|
|
use sqlx::Postgres;
|
|
|
|
tracing::info!("Connecting to Postgres");
|
|
let schema = &database_config
|
|
.database_schema
|
|
.as_deref()
|
|
.unwrap_or("public");
|
|
let connection = sqlx_connect::<Postgres>(
|
|
database_config.max_connections,
|
|
database_config.acquire_timeout,
|
|
url.as_str(),
|
|
Some(schema),
|
|
)
|
|
.await?;
|
|
tracing::info!("Discovering schema");
|
|
let schema_discovery = SchemaDiscovery::new(connection, schema);
|
|
let schema = schema_discovery.discover().await?;
|
|
tracing::info!(?schema);
|
|
let table_stmts = schema
|
|
.tables
|
|
.into_iter()
|
|
.filter(|schema| filter(&schema.info.name))
|
|
.map(|schema| schema.write())
|
|
.collect();
|
|
(table_stmts, DbType::Postgres)
|
|
}
|
|
_ => unimplemented!("{} is not supported", url.scheme()),
|
|
};
|
|
tracing::info!("Schema discovered");
|
|
|
|
Ok((table_stmts, db_type))
|
|
}
|
|
async fn sqlx_connect<DB>(
|
|
max_connections: u32,
|
|
acquire_timeout: u64,
|
|
url: &str,
|
|
schema: Option<&str>,
|
|
) -> Result<sqlx::Pool<DB>>
|
|
where
|
|
DB: sqlx::Database,
|
|
for<'a> &'a mut <DB as sqlx::Database>::Connection: sqlx::Executor<'a>,
|
|
{
|
|
let mut pool_options = sqlx::pool::PoolOptions::<DB>::new()
|
|
.max_connections(max_connections)
|
|
.acquire_timeout(time::Duration::from_secs(acquire_timeout));
|
|
// Set search_path for Postgres, E.g. Some("public") by default
|
|
// MySQL & SQLite connection initialize with schema `None`
|
|
if let Some(schema) = schema {
|
|
let sql = format!("SET search_path = '{schema}'");
|
|
pool_options = pool_options.after_connect(move |conn, _| {
|
|
let sql = sql.clone();
|
|
Box::pin(async move {
|
|
sqlx::Executor::execute(conn, sql.as_str())
|
|
.await
|
|
.map(|_| ())
|
|
})
|
|
});
|
|
}
|
|
pool_options.connect(url).await.map_err(Into::into)
|
|
}
|