diff --git a/pagetop/Cargo.toml b/pagetop/Cargo.toml index 8723ad23..a5710ee2 100644 --- a/pagetop/Cargo.toml +++ b/pagetop/Cargo.toml @@ -56,12 +56,12 @@ maud = { version = "0.24.0", features = ["actix-web"] } serde = { version = "1.0", features = ["derive"] } [dependencies.sea-orm] -version = "0.10.7" +version = "0.11.1" features = ["debug-print", "macros", "runtime-async-std-native-tls"] default-features = false optional = true [dependencies.sea-schema] -version = "0.10.3" +version = "0.11.0" optional = true [features] diff --git a/pagetop/src/core/module/all.rs b/pagetop/src/core/module/all.rs index 0329df44..84b1a085 100644 --- a/pagetop/src/core/module/all.rs +++ b/pagetop/src/core/module/all.rs @@ -140,7 +140,7 @@ pub fn run_migrations() { migrations } } - Migrator::up(&DBCONN, None) + Migrator::up(SchemaManagerConnection::Connection(&DBCONN), None) }) .unwrap(); @@ -155,7 +155,7 @@ pub fn run_migrations() { migrations } } - Migrator::down(&DBCONN, None) + Migrator::down(SchemaManagerConnection::Connection(&DBCONN), None) }) .unwrap(); } diff --git a/pagetop/src/db/migration.rs b/pagetop/src/db/migration.rs index eb9ee6dc..7670de69 100644 --- a/pagetop/src/db/migration.rs +++ b/pagetop/src/db/migration.rs @@ -1,10 +1,12 @@ //pub mod cli; +pub mod connection; pub mod manager; pub mod migrator; pub mod prelude; pub mod seaql_migrations; //pub mod util; +pub use connection::*; pub use manager::*; //pub use migrator::*; diff --git a/pagetop/src/db/migration/connection.rs b/pagetop/src/db/migration/connection.rs new file mode 100644 index 00000000..116185e4 --- /dev/null +++ b/pagetop/src/db/migration/connection.rs @@ -0,0 +1,148 @@ +use futures::Future; +use sea_orm::{ + AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, + ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionTrait, +}; +use std::pin::Pin; + +pub enum SchemaManagerConnection<'c> { + Connection(&'c DatabaseConnection), + Transaction(&'c DatabaseTransaction), +} + +#[async_trait::async_trait] +impl<'c> ConnectionTrait for SchemaManagerConnection<'c> { + fn get_database_backend(&self) -> DbBackend { + match self { + SchemaManagerConnection::Connection(conn) => conn.get_database_backend(), + SchemaManagerConnection::Transaction(trans) => trans.get_database_backend(), + } + } + + async fn execute(&self, stmt: Statement) -> Result { + match self { + SchemaManagerConnection::Connection(conn) => conn.execute(stmt).await, + SchemaManagerConnection::Transaction(trans) => trans.execute(stmt).await, + } + } + + async fn execute_unprepared(&self, sql: &str) -> Result { + match self { + SchemaManagerConnection::Connection(conn) => conn.execute_unprepared(sql).await, + SchemaManagerConnection::Transaction(trans) => trans.execute_unprepared(sql).await, + } + } + + async fn query_one(&self, stmt: Statement) -> Result, DbErr> { + match self { + SchemaManagerConnection::Connection(conn) => conn.query_one(stmt).await, + SchemaManagerConnection::Transaction(trans) => trans.query_one(stmt).await, + } + } + + async fn query_all(&self, stmt: Statement) -> Result, DbErr> { + match self { + SchemaManagerConnection::Connection(conn) => conn.query_all(stmt).await, + SchemaManagerConnection::Transaction(trans) => trans.query_all(stmt).await, + } + } + + fn is_mock_connection(&self) -> bool { + match self { + SchemaManagerConnection::Connection(conn) => conn.is_mock_connection(), + SchemaManagerConnection::Transaction(trans) => trans.is_mock_connection(), + } + } +} + +#[async_trait::async_trait] +impl<'c> TransactionTrait for SchemaManagerConnection<'c> { + async fn begin(&self) -> Result { + match self { + SchemaManagerConnection::Connection(conn) => conn.begin().await, + SchemaManagerConnection::Transaction(trans) => trans.begin().await, + } + } + + async fn begin_with_config( + &self, + isolation_level: Option, + access_mode: Option, + ) -> Result { + match self { + SchemaManagerConnection::Connection(conn) => { + conn.begin_with_config(isolation_level, access_mode).await + } + SchemaManagerConnection::Transaction(trans) => { + trans.begin_with_config(isolation_level, access_mode).await + } + } + } + + async fn transaction(&self, callback: F) -> Result> + where + F: for<'a> FnOnce( + &'a DatabaseTransaction, + ) -> Pin> + Send + 'a>> + + Send, + T: Send, + E: std::error::Error + Send, + { + match self { + SchemaManagerConnection::Connection(conn) => conn.transaction(callback).await, + SchemaManagerConnection::Transaction(trans) => trans.transaction(callback).await, + } + } + + async fn transaction_with_config( + &self, + callback: F, + isolation_level: Option, + access_mode: Option, + ) -> Result> + where + F: for<'a> FnOnce( + &'a DatabaseTransaction, + ) -> Pin> + Send + 'a>> + + Send, + T: Send, + E: std::error::Error + Send, + { + match self { + SchemaManagerConnection::Connection(conn) => { + conn.transaction_with_config(callback, isolation_level, access_mode) + .await + } + SchemaManagerConnection::Transaction(trans) => { + trans + .transaction_with_config(callback, isolation_level, access_mode) + .await + } + } + } +} + +pub trait IntoSchemaManagerConnection<'c>: Send +where + Self: 'c, +{ + fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c>; +} + +impl<'c> IntoSchemaManagerConnection<'c> for SchemaManagerConnection<'c> { + fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> { + self + } +} + +impl<'c> IntoSchemaManagerConnection<'c> for &'c DatabaseConnection { + fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> { + SchemaManagerConnection::Connection(self) + } +} + +impl<'c> IntoSchemaManagerConnection<'c> for &'c DatabaseTransaction { + fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> { + SchemaManagerConnection::Transaction(self) + } +} diff --git a/pagetop/src/db/migration/manager.rs b/pagetop/src/db/migration/manager.rs index 71e91b37..f70c9212 100644 --- a/pagetop/src/db/migration/manager.rs +++ b/pagetop/src/db/migration/manager.rs @@ -1,20 +1,26 @@ +use super::{IntoSchemaManagerConnection, SchemaManagerConnection}; use sea_orm::sea_query::{ extension::postgres::{TypeAlterStatement, TypeCreateStatement, TypeDropStatement}, ForeignKeyCreateStatement, ForeignKeyDropStatement, IndexCreateStatement, IndexDropStatement, TableAlterStatement, TableCreateStatement, TableDropStatement, TableRenameStatement, TableTruncateStatement, }; -use sea_orm::{ConnectionTrait, DbBackend, DbConn, DbErr, StatementBuilder}; +use sea_orm::{ConnectionTrait, DbBackend, DbErr, StatementBuilder}; use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite}; /// Helper struct for writing migration scripts in migration file pub struct SchemaManager<'c> { - conn: &'c DbConn, + conn: SchemaManagerConnection<'c>, } impl<'c> SchemaManager<'c> { - pub fn new(conn: &'c DbConn) -> Self { - Self { conn } + pub fn new(conn: T) -> Self + where + T: IntoSchemaManagerConnection<'c>, + { + Self { + conn: conn.into_schema_manager_connection(), + } } pub async fn exec_stmt(&self, stmt: S) -> Result<(), DbErr> @@ -29,8 +35,8 @@ impl<'c> SchemaManager<'c> { self.conn.get_database_backend() } - pub fn get_connection(&self) -> &'c DbConn { - self.conn + pub fn get_connection(&self) -> &SchemaManagerConnection<'c> { + &self.conn } } diff --git a/pagetop/src/db/migration/migrator.rs b/pagetop/src/db/migration/migrator.rs index 5efdcd09..5ed18f0b 100644 --- a/pagetop/src/db/migration/migrator.rs +++ b/pagetop/src/db/migration/migrator.rs @@ -1,18 +1,23 @@ +use futures::Future; use std::collections::HashSet; use std::fmt::Display; +use std::pin::Pin; use std::time::SystemTime; use tracing::info; -use sea_orm::sea_query::{Alias, Expr, ForeignKey, Query, SelectStatement, SimpleExpr, Table}; +use sea_orm::sea_query::{ + self, extension::postgres::Type, Alias, Expr, ForeignKey, Iden, JoinType, Query, + SelectStatement, SimpleExpr, Table, +}; use sea_orm::{ - ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbConn, - DbErr, EntityTrait, QueryFilter, QueryOrder, Schema, Statement, + ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbErr, + EntityTrait, QueryFilter, QueryOrder, Schema, Statement, TransactionTrait, }; use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite}; -use super::{seaql_migrations, MigrationTrait, SchemaManager}; +use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager}; -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, PartialEq, Eq)] /// Status of migration pub enum MigrationStatus { /// Not yet applied @@ -32,7 +37,7 @@ impl Display for MigrationStatus { MigrationStatus::Pending => "Pending", MigrationStatus::Applied => "Applied", }; - write!(f, "{}", status) + write!(f, "{status}") } } @@ -54,7 +59,10 @@ pub trait MigratorTrait: Send { } /// Get list of applied migrations from database - async fn get_migration_models(db: &DbConn) -> Result, DbErr> { + async fn get_migration_models(db: &C) -> Result, DbErr> + where + C: ConnectionTrait, + { Self::install(db).await?; seaql_migrations::Entity::find() .order_by_asc(seaql_migrations::Column::Version) @@ -63,7 +71,10 @@ pub trait MigratorTrait: Send { } /// Get list of migrations with status - async fn get_migration_with_status(db: &DbConn) -> Result, DbErr> { + async fn get_migration_with_status(db: &C) -> Result, DbErr> + where + C: ConnectionTrait, + { Self::install(db).await?; let mut migration_files = Self::get_migration_files(); let migration_models = Self::get_migration_models(db).await?; @@ -88,7 +99,7 @@ pub trait MigratorTrait: Send { let errors: Vec = missing_migrations_in_fs .iter() .map(|missing_migration| { - format!("Migration file of version '{}' is missing, this migration has been applied but its file is missing", missing_migration) + format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing") }).collect(); if !errors.is_empty() { @@ -99,7 +110,10 @@ pub trait MigratorTrait: Send { } /// Get list of pending migrations - async fn get_pending_migrations(db: &DbConn) -> Result, DbErr> { + async fn get_pending_migrations(db: &C) -> Result, DbErr> + where + C: ConnectionTrait, + { Self::install(db).await?; Ok(Self::get_migration_with_status(db) .await? @@ -109,7 +123,10 @@ pub trait MigratorTrait: Send { } /// Get list of applied migrations - async fn get_applied_migrations(db: &DbConn) -> Result, DbErr> { + async fn get_applied_migrations(db: &C) -> Result, DbErr> + where + C: ConnectionTrait, + { Self::install(db).await?; Ok(Self::get_migration_with_status(db) .await? @@ -119,7 +136,10 @@ pub trait MigratorTrait: Send { } /// Create migration table `seaql_migrations` in the database - async fn install(db: &DbConn) -> Result<(), DbErr> { + async fn install(db: &C) -> Result<(), DbErr> + where + C: ConnectionTrait, + { let builder = db.get_database_backend(); let schema = Schema::new(builder); let mut stmt = schema.create_table_from_entity(seaql_migrations::Entity); @@ -127,103 +147,11 @@ pub trait MigratorTrait: Send { db.execute(builder.build(&stmt)).await.map(|_| ()) } - /// Drop all tables from the database, then reapply all migrations - async fn fresh(db: &DbConn) -> Result<(), DbErr> { - Self::install(db).await?; - let db_backend = db.get_database_backend(); - - // Temporarily disable the foreign key check - if db_backend == DbBackend::Sqlite { - info!("Disabling foreign key check"); - db.execute(Statement::from_string( - db_backend, - "PRAGMA foreign_keys = OFF".to_owned(), - )) - .await?; - info!("Foreign key check disabled"); - } - - // Drop all foreign keys - if db_backend == DbBackend::MySql { - info!("Dropping all foreign keys"); - let mut stmt = Query::select(); - stmt.columns([Alias::new("TABLE_NAME"), Alias::new("CONSTRAINT_NAME")]) - .from(( - Alias::new("information_schema"), - Alias::new("table_constraints"), - )) - .cond_where( - Condition::all() - .add( - Expr::expr(get_current_schema(db)).equals( - Alias::new("table_constraints"), - Alias::new("table_schema"), - ), - ) - .add(Expr::expr(Expr::value("FOREIGN KEY")).equals( - Alias::new("table_constraints"), - Alias::new("constraint_type"), - )), - ); - let rows = db.query_all(db_backend.build(&stmt)).await?; - for row in rows.into_iter() { - let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?; - let table_name: String = row.try_get("", "TABLE_NAME")?; - info!( - "Dropping foreign key '{}' from table '{}'", - constraint_name, table_name - ); - let mut stmt = ForeignKey::drop(); - stmt.table(Alias::new(table_name.as_str())) - .name(constraint_name.as_str()); - db.execute(db_backend.build(&stmt)).await?; - info!("Foreign key '{}' has been dropped", constraint_name); - } - info!("All foreign keys dropped"); - } - - // Drop all tables - let stmt = query_tables(db); - let rows = db.query_all(db_backend.build(&stmt)).await?; - for row in rows.into_iter() { - let table_name: String = row.try_get("", "table_name")?; - info!("Dropping table '{}'", table_name); - let mut stmt = Table::drop(); - stmt.table(Alias::new(table_name.as_str())) - .if_exists() - .cascade(); - db.execute(db_backend.build(&stmt)).await?; - info!("Table '{}' has been dropped", table_name); - } - - // Restore the foreign key check - if db_backend == DbBackend::Sqlite { - info!("Restoring foreign key check"); - db.execute(Statement::from_string( - db_backend, - "PRAGMA foreign_keys = ON".to_owned(), - )) - .await?; - info!("Foreign key check restored"); - } - - // Reapply all migrations - Self::up(db, None).await - } - - /// Rollback all applied migrations, then reapply all migrations - async fn refresh(db: &DbConn) -> Result<(), DbErr> { - Self::down(db, None).await?; - Self::up(db, None).await - } - - /// Rollback all applied migrations - async fn reset(db: &DbConn) -> Result<(), DbErr> { - Self::down(db, None).await - } - /// Check the status of all migrations - async fn status(db: &DbConn) -> Result<(), DbErr> { + async fn status(db: &C) -> Result<(), DbErr> + where + C: ConnectionTrait, + { Self::install(db).await?; info!("Checking migration status"); @@ -235,81 +163,257 @@ pub trait MigratorTrait: Send { Ok(()) } + /// Drop all tables from the database, then reapply all migrations + async fn fresh<'c, C>(db: C) -> Result<(), DbErr> + where + C: IntoSchemaManagerConnection<'c>, + { + exec_with_connection::<'_, _, _, Self>(db, move |manager| { + Box::pin(async move { exec_fresh::(manager).await }) + }) + .await + } + + /// Rollback all applied migrations, then reapply all migrations + async fn refresh<'c, C>(db: C) -> Result<(), DbErr> + where + C: IntoSchemaManagerConnection<'c>, + { + exec_with_connection::<'_, _, _, Self>(db, move |manager| { + Box::pin(async move { + exec_down::(manager, None).await?; + exec_up::(manager, None).await + }) + }) + .await + } + + /// Rollback all applied migrations + async fn reset<'c, C>(db: C) -> Result<(), DbErr> + where + C: IntoSchemaManagerConnection<'c>, + { + exec_with_connection::<'_, _, _, Self>(db, move |manager| { + Box::pin(async move { exec_down::(manager, None).await }) + }) + .await + } + /// Apply pending migrations - async fn up(db: &DbConn, mut steps: Option) -> Result<(), DbErr> { - Self::install(db).await?; - let manager = SchemaManager::new(db); - - if let Some(steps) = steps { - info!("Applying {} pending migrations", steps); - } else { - info!("Applying all pending migrations"); - } - - let migrations = Self::get_pending_migrations(db).await?.into_iter(); - if migrations.len() == 0 { - info!("No pending migrations"); - } - for Migration { migration, .. } in migrations { - if let Some(steps) = steps.as_mut() { - if steps == &0 { - break; - } - *steps -= 1; - } - info!("Applying migration '{}'", migration.name()); - migration.up(&manager).await?; - info!("Migration '{}' has been applied", migration.name()); - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("SystemTime before UNIX EPOCH!"); - seaql_migrations::ActiveModel { - version: ActiveValue::Set(migration.name().to_owned()), - applied_at: ActiveValue::Set(now.as_secs() as i64), - } - .insert(db) - .await?; - } - - Ok(()) + async fn up<'c, C>(db: C, steps: Option) -> Result<(), DbErr> + where + C: IntoSchemaManagerConnection<'c>, + { + exec_with_connection::<'_, _, _, Self>(db, move |manager| { + Box::pin(async move { exec_up::(manager, steps).await }) + }) + .await } /// Rollback applied migrations - async fn down(db: &DbConn, mut steps: Option) -> Result<(), DbErr> { - Self::install(db).await?; - let manager = SchemaManager::new(db); - - if let Some(steps) = steps { - info!("Rolling back {} applied migrations", steps); - } else { - info!("Rolling back all applied migrations"); - } - - let migrations = Self::get_applied_migrations(db).await?.into_iter().rev(); - if migrations.len() == 0 { - info!("No applied migrations"); - } - for Migration { migration, .. } in migrations { - if let Some(steps) = steps.as_mut() { - if steps == &0 { - break; - } - *steps -= 1; - } - info!("Rolling back migration '{}'", migration.name()); - migration.down(&manager).await?; - info!("Migration '{}' has been rollbacked", migration.name()); - seaql_migrations::Entity::delete_many() - .filter(seaql_migrations::Column::Version.eq(migration.name())) - .exec(db) - .await?; - } - - Ok(()) + async fn down<'c, C>(db: C, steps: Option) -> Result<(), DbErr> + where + C: IntoSchemaManagerConnection<'c>, + { + exec_with_connection::<'_, _, _, Self>(db, move |manager| { + Box::pin(async move { exec_down::(manager, steps).await }) + }) + .await } } -pub(crate) fn query_tables(db: &DbConn) -> SelectStatement { +async fn exec_with_connection<'c, C, F, M>(db: C, f: F) -> Result<(), DbErr> +where + C: IntoSchemaManagerConnection<'c>, + F: for<'b> Fn( + &'b SchemaManager<'_>, + ) -> Pin> + Send + 'b>>, + M: MigratorTrait + ?Sized, +{ + let db = db.into_schema_manager_connection(); + + match db.get_database_backend() { + DbBackend::Postgres => { + let transaction = db.begin().await?; + let manager = SchemaManager::new(&transaction); + f(&manager).await?; + transaction.commit().await + } + DbBackend::MySql | DbBackend::Sqlite => { + let manager = SchemaManager::new(db); + f(&manager).await + } + } +} + +async fn exec_fresh(manager: &SchemaManager<'_>) -> Result<(), DbErr> +where + M: MigratorTrait + ?Sized, +{ + let db = manager.get_connection(); + + M::install(db).await?; + let db_backend = db.get_database_backend(); + + // Temporarily disable the foreign key check + if db_backend == DbBackend::Sqlite { + info!("Disabling foreign key check"); + db.execute(Statement::from_string( + db_backend, + "PRAGMA foreign_keys = OFF".to_owned(), + )) + .await?; + info!("Foreign key check disabled"); + } + + // Drop all foreign keys + if db_backend == DbBackend::MySql { + info!("Dropping all foreign keys"); + let stmt = query_mysql_foreign_keys(db); + let rows = db.query_all(db_backend.build(&stmt)).await?; + for row in rows.into_iter() { + let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?; + let table_name: String = row.try_get("", "TABLE_NAME")?; + info!( + "Dropping foreign key '{}' from table '{}'", + constraint_name, table_name + ); + let mut stmt = ForeignKey::drop(); + stmt.table(Alias::new(table_name.as_str())) + .name(constraint_name.as_str()); + db.execute(db_backend.build(&stmt)).await?; + info!("Foreign key '{}' has been dropped", constraint_name); + } + info!("All foreign keys dropped"); + } + + // Drop all tables + let stmt = query_tables(db); + let rows = db.query_all(db_backend.build(&stmt)).await?; + for row in rows.into_iter() { + let table_name: String = row.try_get("", "table_name")?; + info!("Dropping table '{}'", table_name); + let mut stmt = Table::drop(); + stmt.table(Alias::new(table_name.as_str())) + .if_exists() + .cascade(); + db.execute(db_backend.build(&stmt)).await?; + info!("Table '{}' has been dropped", table_name); + } + + // Drop all types + if db_backend == DbBackend::Postgres { + info!("Dropping all types"); + let stmt = query_pg_types(db); + let rows = db.query_all(db_backend.build(&stmt)).await?; + for row in rows { + let type_name: String = row.try_get("", "typname")?; + info!("Dropping type '{}'", type_name); + let mut stmt = Type::drop(); + stmt.name(Alias::new(&type_name as &str)); + db.execute(db_backend.build(&stmt)).await?; + info!("Type '{}' has been dropped", type_name); + } + } + + // Restore the foreign key check + if db_backend == DbBackend::Sqlite { + info!("Restoring foreign key check"); + db.execute(Statement::from_string( + db_backend, + "PRAGMA foreign_keys = ON".to_owned(), + )) + .await?; + info!("Foreign key check restored"); + } + + // Reapply all migrations + exec_up::(manager, None).await +} + +async fn exec_up(manager: &SchemaManager<'_>, mut steps: Option) -> Result<(), DbErr> +where + M: MigratorTrait + ?Sized, +{ + let db = manager.get_connection(); + + M::install(db).await?; + + if let Some(steps) = steps { + info!("Applying {} pending migrations", steps); + } else { + info!("Applying all pending migrations"); + } + + let migrations = M::get_pending_migrations(db).await?.into_iter(); + if migrations.len() == 0 { + info!("No pending migrations"); + } + for Migration { migration, .. } in migrations { + if let Some(steps) = steps.as_mut() { + if steps == &0 { + break; + } + *steps -= 1; + } + info!("Applying migration '{}'", migration.name()); + migration.up(manager).await?; + info!("Migration '{}' has been applied", migration.name()); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!"); + seaql_migrations::ActiveModel { + version: ActiveValue::Set(migration.name().to_owned()), + applied_at: ActiveValue::Set(now.as_secs() as i64), + } + .insert(db) + .await?; + } + + Ok(()) +} + +async fn exec_down(manager: &SchemaManager<'_>, mut steps: Option) -> Result<(), DbErr> +where + M: MigratorTrait + ?Sized, +{ + let db = manager.get_connection(); + + M::install(db).await?; + + if let Some(steps) = steps { + info!("Rolling back {} applied migrations", steps); + } else { + info!("Rolling back all applied migrations"); + } + + let migrations = M::get_applied_migrations(db).await?.into_iter().rev(); + if migrations.len() == 0 { + info!("No applied migrations"); + } + for Migration { migration, .. } in migrations { + if let Some(steps) = steps.as_mut() { + if steps == &0 { + break; + } + *steps -= 1; + } + info!("Rolling back migration '{}'", migration.name()); + migration.down(manager).await?; + info!("Migration '{}' has been rollbacked", migration.name()); + seaql_migrations::Entity::delete_many() + .filter(seaql_migrations::Column::Version.eq(migration.name())) + .exec(db) + .await?; + } + + Ok(()) +} + +fn query_tables(db: &C) -> SelectStatement +where + C: ConnectionTrait, +{ match db.get_database_backend() { DbBackend::MySql => MySql::query_tables(), DbBackend::Postgres => Postgres::query_tables(), @@ -317,10 +421,95 @@ pub(crate) fn query_tables(db: &DbConn) -> SelectStatement { } } -pub(crate) fn get_current_schema(db: &DbConn) -> SimpleExpr { +fn get_current_schema(db: &C) -> SimpleExpr +where + C: ConnectionTrait, +{ match db.get_database_backend() { DbBackend::MySql => MySql::get_current_schema(), DbBackend::Postgres => Postgres::get_current_schema(), DbBackend::Sqlite => unimplemented!(), } } + +#[derive(Iden)] +enum InformationSchema { + #[iden = "information_schema"] + Schema, + #[iden = "TABLE_NAME"] + TableName, + #[iden = "CONSTRAINT_NAME"] + ConstraintName, + TableConstraints, + TableSchema, + ConstraintType, +} + +fn query_mysql_foreign_keys(db: &C) -> SelectStatement +where + C: ConnectionTrait, +{ + let mut stmt = Query::select(); + stmt.columns([ + InformationSchema::TableName, + InformationSchema::ConstraintName, + ]) + .from(( + InformationSchema::Schema, + InformationSchema::TableConstraints, + )) + .cond_where( + Condition::all() + .add(Expr::expr(get_current_schema(db)).equals(( + InformationSchema::TableConstraints, + InformationSchema::TableSchema, + ))) + .add( + Expr::col(( + InformationSchema::TableConstraints, + InformationSchema::ConstraintType, + )) + .eq("FOREIGN KEY"), + ), + ); + stmt +} + +#[derive(Iden)] +enum PgType { + Table, + Typname, + Typnamespace, + Typelem, +} + +#[derive(Iden)] +enum PgNamespace { + Table, + Oid, + Nspname, +} + +fn query_pg_types(db: &C) -> SelectStatement +where + C: ConnectionTrait, +{ + let mut stmt = Query::select(); + stmt.column(PgType::Typname) + .from(PgType::Table) + .join( + JoinType::LeftJoin, + PgNamespace::Table, + Expr::col((PgNamespace::Table, PgNamespace::Oid)) + .equals((PgType::Table, PgType::Typnamespace)), + ) + .cond_where( + Condition::all() + .add( + Expr::expr(get_current_schema(db)) + .equals((PgNamespace::Table, PgNamespace::Nspname)), + ) + .add(Expr::col((PgType::Table, PgType::Typelem)).eq(0)), + ); + stmt +} diff --git a/pagetop/src/db/migration/prelude.rs b/pagetop/src/db/migration/prelude.rs index 5d1e6f49..fac0e132 100644 --- a/pagetop/src/db/migration/prelude.rs +++ b/pagetop/src/db/migration/prelude.rs @@ -1,4 +1,7 @@ //pub use super::cli; + +pub use super::connection::IntoSchemaManagerConnection; +pub use super::connection::SchemaManagerConnection; pub use super::manager::SchemaManager; pub use super::migrator::MigratorTrait; pub use super::{MigrationName, MigrationTrait}; diff --git a/pagetop/src/db/migration/seaql_migrations.rs b/pagetop/src/db/migration/seaql_migrations.rs index cc06c44d..9926ea9c 100644 --- a/pagetop/src/db/migration/seaql_migrations.rs +++ b/pagetop/src/db/migration/seaql_migrations.rs @@ -1,6 +1,6 @@ use sea_orm::entity::prelude::*; -#[derive(Clone, Debug, Eq, PartialEq, DeriveEntityModel)] +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "seaql_migrations")] pub struct Model { #[sea_orm(primary_key, auto_increment = false)]