rewrite generator

This commit is contained in:
2025-04-03 21:28:46 +04:00
parent 556ae626ae
commit 8805072573
10 changed files with 569 additions and 359 deletions

150
src/generator/discover.rs Normal file
View File

@@ -0,0 +1,150 @@
use core::time;
use color_eyre::eyre::{eyre, Context, ContextCompat, Report, Result};
use sea_schema::sea_query::TableCreateStatement;
use url::Url;
use crate::config::Config;
pub async fn get_tables(
database_url: String,
config: &Config,
) -> Result<(Option<String>, Vec<TableCreateStatement>)> {
let url = Url::parse(&database_url)?;
tracing::trace!(?url);
let is_sqlite = url.scheme() == "sqlite";
let filter_tables = config.sea_orm.entity.tables.get_filter();
let database_name: &str = (if !is_sqlite {
let database_name = url
.path_segments()
.context("No database name as part of path")?
.next()
.context("No database name as part of path")?;
if database_name.is_empty() {
return Err(eyre!("Database path name is empty"));
}
Ok::<&str, Report>(database_name)
} else {
Ok(Default::default())
})?;
let (schema_name, table_stmts) = match url.scheme() {
"mysql" => {
use sea_schema::mysql::discovery::SchemaDiscovery;
use sqlx::MySql;
tracing::info!("Connecting to MySQL");
let connection = sqlx_connect::<MySql>(
config.db.max_connections,
config.db.acquire_timeout,
url.as_str(),
None,
)
.await?;
tracing::info!("Discovering schema");
let schema_discovery = SchemaDiscovery::new(connection, database_name);
let schema = schema_discovery.discover().await?;
let table_stmts = schema
.tables
.into_iter()
.filter(|schema| filter_tables(&schema.info.name))
// .filter(|schema| filter_hidden_tables(&schema.info.name))
// .filter(|schema| filter_skip_tables(&schema.info.name))
.map(|schema| schema.write())
.collect();
(None, table_stmts)
}
"sqlite" => {
use sea_schema::sqlite::discovery::SchemaDiscovery;
use sqlx::Sqlite;
tracing::info!("Connecting to SQLite");
let connection = sqlx_connect::<Sqlite>(
config.db.max_connections,
config.db.acquire_timeout,
url.as_str(),
None,
)
.await?;
tracing::info!("Discovering schema");
let schema_discovery = SchemaDiscovery::new(connection);
let schema = schema_discovery
.discover()
.await?
.merge_indexes_into_table();
let table_stmts = schema
.tables
.into_iter()
.filter(|schema| filter_tables(&schema.name))
// .filter(|schema| filter_hidden_tables(&schema.name))
// .filter(|schema| filter_skip_tables(&schema.name))
.map(|schema| schema.write())
.collect();
(None, table_stmts)
}
"postgres" | "potgresql" => {
use sea_schema::postgres::discovery::SchemaDiscovery;
use sqlx::Postgres;
tracing::info!("Connecting to Postgres");
let schema = &config.db.database_schema.as_deref().unwrap_or("public");
let connection = sqlx_connect::<Postgres>(
config.db.max_connections,
config.db.acquire_timeout,
url.as_str(),
Some(schema),
)
.await?;
tracing::info!("Discovering schema");
let schema_discovery = SchemaDiscovery::new(connection, schema);
let schema = schema_discovery.discover().await?;
tracing::info!(?schema);
let table_stmts = schema
.tables
.into_iter()
.filter(|schema| filter_tables(&schema.info.name))
// .filter(|schema| filter_hidden_tables(&schema.info.name))
// .filter(|schema| filter_skip_tables(&schema.info.name))
.map(|schema| schema.write())
.collect();
(config.db.database_schema.clone(), table_stmts)
}
_ => unimplemented!("{} is not supported", url.scheme()),
};
tracing::info!("Schema discovered");
Ok((schema_name, table_stmts))
}
async fn sqlx_connect<DB>(
max_connections: u32,
acquire_timeout: u64,
url: &str,
schema: Option<&str>,
) -> Result<sqlx::Pool<DB>>
where
DB: sqlx::Database,
for<'a> &'a mut <DB as sqlx::Database>::Connection: sqlx::Executor<'a>,
{
let mut pool_options = sqlx::pool::PoolOptions::<DB>::new()
.max_connections(max_connections)
.acquire_timeout(time::Duration::from_secs(acquire_timeout));
// Set search_path for Postgres, E.g. Some("public") by default
// MySQL & SQLite connection initialize with schema `None`
if let Some(schema) = schema {
let sql = format!("SET search_path = '{schema}'");
pool_options = pool_options.after_connect(move |conn, _| {
let sql = sql.clone();
Box::pin(async move {
sqlx::Executor::execute(conn, sql.as_str())
.await
.map(|_| ())
})
});
}
pool_options.connect(url).await.map_err(Into::into)
}