⬆️ [pagetop] Actualiza SeaORM a la versión 0.11
This commit is contained in:
parent
2a8946d948
commit
27e79ee3cc
8 changed files with 536 additions and 188 deletions
|
|
@ -56,12 +56,12 @@ maud = { version = "0.24.0", features = ["actix-web"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|
||||||
[dependencies.sea-orm]
|
[dependencies.sea-orm]
|
||||||
version = "0.10.7"
|
version = "0.11.1"
|
||||||
features = ["debug-print", "macros", "runtime-async-std-native-tls"]
|
features = ["debug-print", "macros", "runtime-async-std-native-tls"]
|
||||||
default-features = false
|
default-features = false
|
||||||
optional = true
|
optional = true
|
||||||
[dependencies.sea-schema]
|
[dependencies.sea-schema]
|
||||||
version = "0.10.3"
|
version = "0.11.0"
|
||||||
optional = true
|
optional = true
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|
|
||||||
|
|
@ -140,7 +140,7 @@ pub fn run_migrations() {
|
||||||
migrations
|
migrations
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Migrator::up(&DBCONN, None)
|
Migrator::up(SchemaManagerConnection::Connection(&DBCONN), None)
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
@ -155,7 +155,7 @@ pub fn run_migrations() {
|
||||||
migrations
|
migrations
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Migrator::down(&DBCONN, None)
|
Migrator::down(SchemaManagerConnection::Connection(&DBCONN), None)
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,12 @@
|
||||||
//pub mod cli;
|
//pub mod cli;
|
||||||
|
pub mod connection;
|
||||||
pub mod manager;
|
pub mod manager;
|
||||||
pub mod migrator;
|
pub mod migrator;
|
||||||
pub mod prelude;
|
pub mod prelude;
|
||||||
pub mod seaql_migrations;
|
pub mod seaql_migrations;
|
||||||
//pub mod util;
|
//pub mod util;
|
||||||
|
|
||||||
|
pub use connection::*;
|
||||||
pub use manager::*;
|
pub use manager::*;
|
||||||
//pub use migrator::*;
|
//pub use migrator::*;
|
||||||
|
|
||||||
|
|
|
||||||
148
pagetop/src/db/migration/connection.rs
Normal file
148
pagetop/src/db/migration/connection.rs
Normal file
|
|
@ -0,0 +1,148 @@
|
||||||
|
use futures::Future;
|
||||||
|
use sea_orm::{
|
||||||
|
AccessMode, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
|
||||||
|
ExecResult, IsolationLevel, QueryResult, Statement, TransactionError, TransactionTrait,
|
||||||
|
};
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
pub enum SchemaManagerConnection<'c> {
|
||||||
|
Connection(&'c DatabaseConnection),
|
||||||
|
Transaction(&'c DatabaseTransaction),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<'c> ConnectionTrait for SchemaManagerConnection<'c> {
|
||||||
|
fn get_database_backend(&self) -> DbBackend {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.get_database_backend(),
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.get_database_backend(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.execute(stmt).await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.execute(stmt).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.execute_unprepared(sql).await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.execute_unprepared(sql).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.query_one(stmt).await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.query_one(stmt).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.query_all(stmt).await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.query_all(stmt).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_mock_connection(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.is_mock_connection(),
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.is_mock_connection(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<'c> TransactionTrait for SchemaManagerConnection<'c> {
|
||||||
|
async fn begin(&self) -> Result<DatabaseTransaction, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.begin().await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.begin().await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn begin_with_config(
|
||||||
|
&self,
|
||||||
|
isolation_level: Option<IsolationLevel>,
|
||||||
|
access_mode: Option<AccessMode>,
|
||||||
|
) -> Result<DatabaseTransaction, DbErr> {
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => {
|
||||||
|
conn.begin_with_config(isolation_level, access_mode).await
|
||||||
|
}
|
||||||
|
SchemaManagerConnection::Transaction(trans) => {
|
||||||
|
trans.begin_with_config(isolation_level, access_mode).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn transaction<F, T, E>(&self, callback: F) -> Result<T, TransactionError<E>>
|
||||||
|
where
|
||||||
|
F: for<'a> FnOnce(
|
||||||
|
&'a DatabaseTransaction,
|
||||||
|
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
|
||||||
|
+ Send,
|
||||||
|
T: Send,
|
||||||
|
E: std::error::Error + Send,
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => conn.transaction(callback).await,
|
||||||
|
SchemaManagerConnection::Transaction(trans) => trans.transaction(callback).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn transaction_with_config<F, T, E>(
|
||||||
|
&self,
|
||||||
|
callback: F,
|
||||||
|
isolation_level: Option<IsolationLevel>,
|
||||||
|
access_mode: Option<AccessMode>,
|
||||||
|
) -> Result<T, TransactionError<E>>
|
||||||
|
where
|
||||||
|
F: for<'a> FnOnce(
|
||||||
|
&'a DatabaseTransaction,
|
||||||
|
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>
|
||||||
|
+ Send,
|
||||||
|
T: Send,
|
||||||
|
E: std::error::Error + Send,
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
SchemaManagerConnection::Connection(conn) => {
|
||||||
|
conn.transaction_with_config(callback, isolation_level, access_mode)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
SchemaManagerConnection::Transaction(trans) => {
|
||||||
|
trans
|
||||||
|
.transaction_with_config(callback, isolation_level, access_mode)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait IntoSchemaManagerConnection<'c>: Send
|
||||||
|
where
|
||||||
|
Self: 'c,
|
||||||
|
{
|
||||||
|
fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'c> IntoSchemaManagerConnection<'c> for SchemaManagerConnection<'c> {
|
||||||
|
fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'c> IntoSchemaManagerConnection<'c> for &'c DatabaseConnection {
|
||||||
|
fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> {
|
||||||
|
SchemaManagerConnection::Connection(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'c> IntoSchemaManagerConnection<'c> for &'c DatabaseTransaction {
|
||||||
|
fn into_schema_manager_connection(self) -> SchemaManagerConnection<'c> {
|
||||||
|
SchemaManagerConnection::Transaction(self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,20 +1,26 @@
|
||||||
|
use super::{IntoSchemaManagerConnection, SchemaManagerConnection};
|
||||||
use sea_orm::sea_query::{
|
use sea_orm::sea_query::{
|
||||||
extension::postgres::{TypeAlterStatement, TypeCreateStatement, TypeDropStatement},
|
extension::postgres::{TypeAlterStatement, TypeCreateStatement, TypeDropStatement},
|
||||||
ForeignKeyCreateStatement, ForeignKeyDropStatement, IndexCreateStatement, IndexDropStatement,
|
ForeignKeyCreateStatement, ForeignKeyDropStatement, IndexCreateStatement, IndexDropStatement,
|
||||||
TableAlterStatement, TableCreateStatement, TableDropStatement, TableRenameStatement,
|
TableAlterStatement, TableCreateStatement, TableDropStatement, TableRenameStatement,
|
||||||
TableTruncateStatement,
|
TableTruncateStatement,
|
||||||
};
|
};
|
||||||
use sea_orm::{ConnectionTrait, DbBackend, DbConn, DbErr, StatementBuilder};
|
use sea_orm::{ConnectionTrait, DbBackend, DbErr, StatementBuilder};
|
||||||
use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite};
|
use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite};
|
||||||
|
|
||||||
/// Helper struct for writing migration scripts in migration file
|
/// Helper struct for writing migration scripts in migration file
|
||||||
pub struct SchemaManager<'c> {
|
pub struct SchemaManager<'c> {
|
||||||
conn: &'c DbConn,
|
conn: SchemaManagerConnection<'c>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'c> SchemaManager<'c> {
|
impl<'c> SchemaManager<'c> {
|
||||||
pub fn new(conn: &'c DbConn) -> Self {
|
pub fn new<T>(conn: T) -> Self
|
||||||
Self { conn }
|
where
|
||||||
|
T: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
|
Self {
|
||||||
|
conn: conn.into_schema_manager_connection(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn exec_stmt<S>(&self, stmt: S) -> Result<(), DbErr>
|
pub async fn exec_stmt<S>(&self, stmt: S) -> Result<(), DbErr>
|
||||||
|
|
@ -29,8 +35,8 @@ impl<'c> SchemaManager<'c> {
|
||||||
self.conn.get_database_backend()
|
self.conn.get_database_backend()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_connection(&self) -> &'c DbConn {
|
pub fn get_connection(&self) -> &SchemaManagerConnection<'c> {
|
||||||
self.conn
|
&self.conn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,23 @@
|
||||||
|
use futures::Future;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use sea_orm::sea_query::{Alias, Expr, ForeignKey, Query, SelectStatement, SimpleExpr, Table};
|
use sea_orm::sea_query::{
|
||||||
|
self, extension::postgres::Type, Alias, Expr, ForeignKey, Iden, JoinType, Query,
|
||||||
|
SelectStatement, SimpleExpr, Table,
|
||||||
|
};
|
||||||
use sea_orm::{
|
use sea_orm::{
|
||||||
ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbConn,
|
ActiveModelTrait, ActiveValue, ColumnTrait, Condition, ConnectionTrait, DbBackend, DbErr,
|
||||||
DbErr, EntityTrait, QueryFilter, QueryOrder, Schema, Statement,
|
EntityTrait, QueryFilter, QueryOrder, Schema, Statement, TransactionTrait,
|
||||||
};
|
};
|
||||||
use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite};
|
use sea_schema::{mysql::MySql, postgres::Postgres, probe::SchemaProbe, sqlite::Sqlite};
|
||||||
|
|
||||||
use super::{seaql_migrations, MigrationTrait, SchemaManager};
|
use super::{seaql_migrations, IntoSchemaManagerConnection, MigrationTrait, SchemaManager};
|
||||||
|
|
||||||
#[derive(Debug, Eq, PartialEq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
/// Status of migration
|
/// Status of migration
|
||||||
pub enum MigrationStatus {
|
pub enum MigrationStatus {
|
||||||
/// Not yet applied
|
/// Not yet applied
|
||||||
|
|
@ -32,7 +37,7 @@ impl Display for MigrationStatus {
|
||||||
MigrationStatus::Pending => "Pending",
|
MigrationStatus::Pending => "Pending",
|
||||||
MigrationStatus::Applied => "Applied",
|
MigrationStatus::Applied => "Applied",
|
||||||
};
|
};
|
||||||
write!(f, "{}", status)
|
write!(f, "{status}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,7 +59,10 @@ pub trait MigratorTrait: Send {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get list of applied migrations from database
|
/// Get list of applied migrations from database
|
||||||
async fn get_migration_models(db: &DbConn) -> Result<Vec<seaql_migrations::Model>, DbErr> {
|
async fn get_migration_models<C>(db: &C) -> Result<Vec<seaql_migrations::Model>, DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
Self::install(db).await?;
|
Self::install(db).await?;
|
||||||
seaql_migrations::Entity::find()
|
seaql_migrations::Entity::find()
|
||||||
.order_by_asc(seaql_migrations::Column::Version)
|
.order_by_asc(seaql_migrations::Column::Version)
|
||||||
|
|
@ -63,7 +71,10 @@ pub trait MigratorTrait: Send {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get list of migrations with status
|
/// Get list of migrations with status
|
||||||
async fn get_migration_with_status(db: &DbConn) -> Result<Vec<Migration>, DbErr> {
|
async fn get_migration_with_status<C>(db: &C) -> Result<Vec<Migration>, DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
Self::install(db).await?;
|
Self::install(db).await?;
|
||||||
let mut migration_files = Self::get_migration_files();
|
let mut migration_files = Self::get_migration_files();
|
||||||
let migration_models = Self::get_migration_models(db).await?;
|
let migration_models = Self::get_migration_models(db).await?;
|
||||||
|
|
@ -88,7 +99,7 @@ pub trait MigratorTrait: Send {
|
||||||
let errors: Vec<String> = missing_migrations_in_fs
|
let errors: Vec<String> = missing_migrations_in_fs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|missing_migration| {
|
.map(|missing_migration| {
|
||||||
format!("Migration file of version '{}' is missing, this migration has been applied but its file is missing", missing_migration)
|
format!("Migration file of version '{missing_migration}' is missing, this migration has been applied but its file is missing")
|
||||||
}).collect();
|
}).collect();
|
||||||
|
|
||||||
if !errors.is_empty() {
|
if !errors.is_empty() {
|
||||||
|
|
@ -99,7 +110,10 @@ pub trait MigratorTrait: Send {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get list of pending migrations
|
/// Get list of pending migrations
|
||||||
async fn get_pending_migrations(db: &DbConn) -> Result<Vec<Migration>, DbErr> {
|
async fn get_pending_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
Self::install(db).await?;
|
Self::install(db).await?;
|
||||||
Ok(Self::get_migration_with_status(db)
|
Ok(Self::get_migration_with_status(db)
|
||||||
.await?
|
.await?
|
||||||
|
|
@ -109,7 +123,10 @@ pub trait MigratorTrait: Send {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get list of applied migrations
|
/// Get list of applied migrations
|
||||||
async fn get_applied_migrations(db: &DbConn) -> Result<Vec<Migration>, DbErr> {
|
async fn get_applied_migrations<C>(db: &C) -> Result<Vec<Migration>, DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
Self::install(db).await?;
|
Self::install(db).await?;
|
||||||
Ok(Self::get_migration_with_status(db)
|
Ok(Self::get_migration_with_status(db)
|
||||||
.await?
|
.await?
|
||||||
|
|
@ -119,7 +136,10 @@ pub trait MigratorTrait: Send {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create migration table `seaql_migrations` in the database
|
/// Create migration table `seaql_migrations` in the database
|
||||||
async fn install(db: &DbConn) -> Result<(), DbErr> {
|
async fn install<C>(db: &C) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
let builder = db.get_database_backend();
|
let builder = db.get_database_backend();
|
||||||
let schema = Schema::new(builder);
|
let schema = Schema::new(builder);
|
||||||
let mut stmt = schema.create_table_from_entity(seaql_migrations::Entity);
|
let mut stmt = schema.create_table_from_entity(seaql_migrations::Entity);
|
||||||
|
|
@ -127,103 +147,11 @@ pub trait MigratorTrait: Send {
|
||||||
db.execute(builder.build(&stmt)).await.map(|_| ())
|
db.execute(builder.build(&stmt)).await.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop all tables from the database, then reapply all migrations
|
|
||||||
async fn fresh(db: &DbConn) -> Result<(), DbErr> {
|
|
||||||
Self::install(db).await?;
|
|
||||||
let db_backend = db.get_database_backend();
|
|
||||||
|
|
||||||
// Temporarily disable the foreign key check
|
|
||||||
if db_backend == DbBackend::Sqlite {
|
|
||||||
info!("Disabling foreign key check");
|
|
||||||
db.execute(Statement::from_string(
|
|
||||||
db_backend,
|
|
||||||
"PRAGMA foreign_keys = OFF".to_owned(),
|
|
||||||
))
|
|
||||||
.await?;
|
|
||||||
info!("Foreign key check disabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drop all foreign keys
|
|
||||||
if db_backend == DbBackend::MySql {
|
|
||||||
info!("Dropping all foreign keys");
|
|
||||||
let mut stmt = Query::select();
|
|
||||||
stmt.columns([Alias::new("TABLE_NAME"), Alias::new("CONSTRAINT_NAME")])
|
|
||||||
.from((
|
|
||||||
Alias::new("information_schema"),
|
|
||||||
Alias::new("table_constraints"),
|
|
||||||
))
|
|
||||||
.cond_where(
|
|
||||||
Condition::all()
|
|
||||||
.add(
|
|
||||||
Expr::expr(get_current_schema(db)).equals(
|
|
||||||
Alias::new("table_constraints"),
|
|
||||||
Alias::new("table_schema"),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.add(Expr::expr(Expr::value("FOREIGN KEY")).equals(
|
|
||||||
Alias::new("table_constraints"),
|
|
||||||
Alias::new("constraint_type"),
|
|
||||||
)),
|
|
||||||
);
|
|
||||||
let rows = db.query_all(db_backend.build(&stmt)).await?;
|
|
||||||
for row in rows.into_iter() {
|
|
||||||
let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
|
|
||||||
let table_name: String = row.try_get("", "TABLE_NAME")?;
|
|
||||||
info!(
|
|
||||||
"Dropping foreign key '{}' from table '{}'",
|
|
||||||
constraint_name, table_name
|
|
||||||
);
|
|
||||||
let mut stmt = ForeignKey::drop();
|
|
||||||
stmt.table(Alias::new(table_name.as_str()))
|
|
||||||
.name(constraint_name.as_str());
|
|
||||||
db.execute(db_backend.build(&stmt)).await?;
|
|
||||||
info!("Foreign key '{}' has been dropped", constraint_name);
|
|
||||||
}
|
|
||||||
info!("All foreign keys dropped");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Drop all tables
|
|
||||||
let stmt = query_tables(db);
|
|
||||||
let rows = db.query_all(db_backend.build(&stmt)).await?;
|
|
||||||
for row in rows.into_iter() {
|
|
||||||
let table_name: String = row.try_get("", "table_name")?;
|
|
||||||
info!("Dropping table '{}'", table_name);
|
|
||||||
let mut stmt = Table::drop();
|
|
||||||
stmt.table(Alias::new(table_name.as_str()))
|
|
||||||
.if_exists()
|
|
||||||
.cascade();
|
|
||||||
db.execute(db_backend.build(&stmt)).await?;
|
|
||||||
info!("Table '{}' has been dropped", table_name);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restore the foreign key check
|
|
||||||
if db_backend == DbBackend::Sqlite {
|
|
||||||
info!("Restoring foreign key check");
|
|
||||||
db.execute(Statement::from_string(
|
|
||||||
db_backend,
|
|
||||||
"PRAGMA foreign_keys = ON".to_owned(),
|
|
||||||
))
|
|
||||||
.await?;
|
|
||||||
info!("Foreign key check restored");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reapply all migrations
|
|
||||||
Self::up(db, None).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Rollback all applied migrations, then reapply all migrations
|
|
||||||
async fn refresh(db: &DbConn) -> Result<(), DbErr> {
|
|
||||||
Self::down(db, None).await?;
|
|
||||||
Self::up(db, None).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Rollback all applied migrations
|
|
||||||
async fn reset(db: &DbConn) -> Result<(), DbErr> {
|
|
||||||
Self::down(db, None).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check the status of all migrations
|
/// Check the status of all migrations
|
||||||
async fn status(db: &DbConn) -> Result<(), DbErr> {
|
async fn status<C>(db: &C) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
Self::install(db).await?;
|
Self::install(db).await?;
|
||||||
|
|
||||||
info!("Checking migration status");
|
info!("Checking migration status");
|
||||||
|
|
@ -235,81 +163,257 @@ pub trait MigratorTrait: Send {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Drop all tables from the database, then reapply all migrations
|
||||||
|
async fn fresh<'c, C>(db: C) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
|
exec_with_connection::<'_, _, _, Self>(db, move |manager| {
|
||||||
|
Box::pin(async move { exec_fresh::<Self>(manager).await })
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rollback all applied migrations, then reapply all migrations
|
||||||
|
async fn refresh<'c, C>(db: C) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
|
exec_with_connection::<'_, _, _, Self>(db, move |manager| {
|
||||||
|
Box::pin(async move {
|
||||||
|
exec_down::<Self>(manager, None).await?;
|
||||||
|
exec_up::<Self>(manager, None).await
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rollback all applied migrations
|
||||||
|
async fn reset<'c, C>(db: C) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
|
exec_with_connection::<'_, _, _, Self>(db, move |manager| {
|
||||||
|
Box::pin(async move { exec_down::<Self>(manager, None).await })
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
/// Apply pending migrations
|
/// Apply pending migrations
|
||||||
async fn up(db: &DbConn, mut steps: Option<u32>) -> Result<(), DbErr> {
|
async fn up<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
|
||||||
Self::install(db).await?;
|
where
|
||||||
let manager = SchemaManager::new(db);
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
if let Some(steps) = steps {
|
exec_with_connection::<'_, _, _, Self>(db, move |manager| {
|
||||||
info!("Applying {} pending migrations", steps);
|
Box::pin(async move { exec_up::<Self>(manager, steps).await })
|
||||||
} else {
|
})
|
||||||
info!("Applying all pending migrations");
|
.await
|
||||||
}
|
|
||||||
|
|
||||||
let migrations = Self::get_pending_migrations(db).await?.into_iter();
|
|
||||||
if migrations.len() == 0 {
|
|
||||||
info!("No pending migrations");
|
|
||||||
}
|
|
||||||
for Migration { migration, .. } in migrations {
|
|
||||||
if let Some(steps) = steps.as_mut() {
|
|
||||||
if steps == &0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
*steps -= 1;
|
|
||||||
}
|
|
||||||
info!("Applying migration '{}'", migration.name());
|
|
||||||
migration.up(&manager).await?;
|
|
||||||
info!("Migration '{}' has been applied", migration.name());
|
|
||||||
let now = SystemTime::now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.expect("SystemTime before UNIX EPOCH!");
|
|
||||||
seaql_migrations::ActiveModel {
|
|
||||||
version: ActiveValue::Set(migration.name().to_owned()),
|
|
||||||
applied_at: ActiveValue::Set(now.as_secs() as i64),
|
|
||||||
}
|
|
||||||
.insert(db)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rollback applied migrations
|
/// Rollback applied migrations
|
||||||
async fn down(db: &DbConn, mut steps: Option<u32>) -> Result<(), DbErr> {
|
async fn down<'c, C>(db: C, steps: Option<u32>) -> Result<(), DbErr>
|
||||||
Self::install(db).await?;
|
where
|
||||||
let manager = SchemaManager::new(db);
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
{
|
||||||
if let Some(steps) = steps {
|
exec_with_connection::<'_, _, _, Self>(db, move |manager| {
|
||||||
info!("Rolling back {} applied migrations", steps);
|
Box::pin(async move { exec_down::<Self>(manager, steps).await })
|
||||||
} else {
|
})
|
||||||
info!("Rolling back all applied migrations");
|
.await
|
||||||
}
|
|
||||||
|
|
||||||
let migrations = Self::get_applied_migrations(db).await?.into_iter().rev();
|
|
||||||
if migrations.len() == 0 {
|
|
||||||
info!("No applied migrations");
|
|
||||||
}
|
|
||||||
for Migration { migration, .. } in migrations {
|
|
||||||
if let Some(steps) = steps.as_mut() {
|
|
||||||
if steps == &0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
*steps -= 1;
|
|
||||||
}
|
|
||||||
info!("Rolling back migration '{}'", migration.name());
|
|
||||||
migration.down(&manager).await?;
|
|
||||||
info!("Migration '{}' has been rollbacked", migration.name());
|
|
||||||
seaql_migrations::Entity::delete_many()
|
|
||||||
.filter(seaql_migrations::Column::Version.eq(migration.name()))
|
|
||||||
.exec(db)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn query_tables(db: &DbConn) -> SelectStatement {
|
async fn exec_with_connection<'c, C, F, M>(db: C, f: F) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
C: IntoSchemaManagerConnection<'c>,
|
||||||
|
F: for<'b> Fn(
|
||||||
|
&'b SchemaManager<'_>,
|
||||||
|
) -> Pin<Box<dyn Future<Output = Result<(), DbErr>> + Send + 'b>>,
|
||||||
|
M: MigratorTrait + ?Sized,
|
||||||
|
{
|
||||||
|
let db = db.into_schema_manager_connection();
|
||||||
|
|
||||||
|
match db.get_database_backend() {
|
||||||
|
DbBackend::Postgres => {
|
||||||
|
let transaction = db.begin().await?;
|
||||||
|
let manager = SchemaManager::new(&transaction);
|
||||||
|
f(&manager).await?;
|
||||||
|
transaction.commit().await
|
||||||
|
}
|
||||||
|
DbBackend::MySql | DbBackend::Sqlite => {
|
||||||
|
let manager = SchemaManager::new(db);
|
||||||
|
f(&manager).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn exec_fresh<M>(manager: &SchemaManager<'_>) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
M: MigratorTrait + ?Sized,
|
||||||
|
{
|
||||||
|
let db = manager.get_connection();
|
||||||
|
|
||||||
|
M::install(db).await?;
|
||||||
|
let db_backend = db.get_database_backend();
|
||||||
|
|
||||||
|
// Temporarily disable the foreign key check
|
||||||
|
if db_backend == DbBackend::Sqlite {
|
||||||
|
info!("Disabling foreign key check");
|
||||||
|
db.execute(Statement::from_string(
|
||||||
|
db_backend,
|
||||||
|
"PRAGMA foreign_keys = OFF".to_owned(),
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
info!("Foreign key check disabled");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop all foreign keys
|
||||||
|
if db_backend == DbBackend::MySql {
|
||||||
|
info!("Dropping all foreign keys");
|
||||||
|
let stmt = query_mysql_foreign_keys(db);
|
||||||
|
let rows = db.query_all(db_backend.build(&stmt)).await?;
|
||||||
|
for row in rows.into_iter() {
|
||||||
|
let constraint_name: String = row.try_get("", "CONSTRAINT_NAME")?;
|
||||||
|
let table_name: String = row.try_get("", "TABLE_NAME")?;
|
||||||
|
info!(
|
||||||
|
"Dropping foreign key '{}' from table '{}'",
|
||||||
|
constraint_name, table_name
|
||||||
|
);
|
||||||
|
let mut stmt = ForeignKey::drop();
|
||||||
|
stmt.table(Alias::new(table_name.as_str()))
|
||||||
|
.name(constraint_name.as_str());
|
||||||
|
db.execute(db_backend.build(&stmt)).await?;
|
||||||
|
info!("Foreign key '{}' has been dropped", constraint_name);
|
||||||
|
}
|
||||||
|
info!("All foreign keys dropped");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop all tables
|
||||||
|
let stmt = query_tables(db);
|
||||||
|
let rows = db.query_all(db_backend.build(&stmt)).await?;
|
||||||
|
for row in rows.into_iter() {
|
||||||
|
let table_name: String = row.try_get("", "table_name")?;
|
||||||
|
info!("Dropping table '{}'", table_name);
|
||||||
|
let mut stmt = Table::drop();
|
||||||
|
stmt.table(Alias::new(table_name.as_str()))
|
||||||
|
.if_exists()
|
||||||
|
.cascade();
|
||||||
|
db.execute(db_backend.build(&stmt)).await?;
|
||||||
|
info!("Table '{}' has been dropped", table_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop all types
|
||||||
|
if db_backend == DbBackend::Postgres {
|
||||||
|
info!("Dropping all types");
|
||||||
|
let stmt = query_pg_types(db);
|
||||||
|
let rows = db.query_all(db_backend.build(&stmt)).await?;
|
||||||
|
for row in rows {
|
||||||
|
let type_name: String = row.try_get("", "typname")?;
|
||||||
|
info!("Dropping type '{}'", type_name);
|
||||||
|
let mut stmt = Type::drop();
|
||||||
|
stmt.name(Alias::new(&type_name as &str));
|
||||||
|
db.execute(db_backend.build(&stmt)).await?;
|
||||||
|
info!("Type '{}' has been dropped", type_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the foreign key check
|
||||||
|
if db_backend == DbBackend::Sqlite {
|
||||||
|
info!("Restoring foreign key check");
|
||||||
|
db.execute(Statement::from_string(
|
||||||
|
db_backend,
|
||||||
|
"PRAGMA foreign_keys = ON".to_owned(),
|
||||||
|
))
|
||||||
|
.await?;
|
||||||
|
info!("Foreign key check restored");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reapply all migrations
|
||||||
|
exec_up::<M>(manager, None).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn exec_up<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
M: MigratorTrait + ?Sized,
|
||||||
|
{
|
||||||
|
let db = manager.get_connection();
|
||||||
|
|
||||||
|
M::install(db).await?;
|
||||||
|
|
||||||
|
if let Some(steps) = steps {
|
||||||
|
info!("Applying {} pending migrations", steps);
|
||||||
|
} else {
|
||||||
|
info!("Applying all pending migrations");
|
||||||
|
}
|
||||||
|
|
||||||
|
let migrations = M::get_pending_migrations(db).await?.into_iter();
|
||||||
|
if migrations.len() == 0 {
|
||||||
|
info!("No pending migrations");
|
||||||
|
}
|
||||||
|
for Migration { migration, .. } in migrations {
|
||||||
|
if let Some(steps) = steps.as_mut() {
|
||||||
|
if steps == &0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
*steps -= 1;
|
||||||
|
}
|
||||||
|
info!("Applying migration '{}'", migration.name());
|
||||||
|
migration.up(manager).await?;
|
||||||
|
info!("Migration '{}' has been applied", migration.name());
|
||||||
|
let now = SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.expect("SystemTime before UNIX EPOCH!");
|
||||||
|
seaql_migrations::ActiveModel {
|
||||||
|
version: ActiveValue::Set(migration.name().to_owned()),
|
||||||
|
applied_at: ActiveValue::Set(now.as_secs() as i64),
|
||||||
|
}
|
||||||
|
.insert(db)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn exec_down<M>(manager: &SchemaManager<'_>, mut steps: Option<u32>) -> Result<(), DbErr>
|
||||||
|
where
|
||||||
|
M: MigratorTrait + ?Sized,
|
||||||
|
{
|
||||||
|
let db = manager.get_connection();
|
||||||
|
|
||||||
|
M::install(db).await?;
|
||||||
|
|
||||||
|
if let Some(steps) = steps {
|
||||||
|
info!("Rolling back {} applied migrations", steps);
|
||||||
|
} else {
|
||||||
|
info!("Rolling back all applied migrations");
|
||||||
|
}
|
||||||
|
|
||||||
|
let migrations = M::get_applied_migrations(db).await?.into_iter().rev();
|
||||||
|
if migrations.len() == 0 {
|
||||||
|
info!("No applied migrations");
|
||||||
|
}
|
||||||
|
for Migration { migration, .. } in migrations {
|
||||||
|
if let Some(steps) = steps.as_mut() {
|
||||||
|
if steps == &0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
*steps -= 1;
|
||||||
|
}
|
||||||
|
info!("Rolling back migration '{}'", migration.name());
|
||||||
|
migration.down(manager).await?;
|
||||||
|
info!("Migration '{}' has been rollbacked", migration.name());
|
||||||
|
seaql_migrations::Entity::delete_many()
|
||||||
|
.filter(seaql_migrations::Column::Version.eq(migration.name()))
|
||||||
|
.exec(db)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_tables<C>(db: &C) -> SelectStatement
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
match db.get_database_backend() {
|
match db.get_database_backend() {
|
||||||
DbBackend::MySql => MySql::query_tables(),
|
DbBackend::MySql => MySql::query_tables(),
|
||||||
DbBackend::Postgres => Postgres::query_tables(),
|
DbBackend::Postgres => Postgres::query_tables(),
|
||||||
|
|
@ -317,10 +421,95 @@ pub(crate) fn query_tables(db: &DbConn) -> SelectStatement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_current_schema(db: &DbConn) -> SimpleExpr {
|
fn get_current_schema<C>(db: &C) -> SimpleExpr
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
match db.get_database_backend() {
|
match db.get_database_backend() {
|
||||||
DbBackend::MySql => MySql::get_current_schema(),
|
DbBackend::MySql => MySql::get_current_schema(),
|
||||||
DbBackend::Postgres => Postgres::get_current_schema(),
|
DbBackend::Postgres => Postgres::get_current_schema(),
|
||||||
DbBackend::Sqlite => unimplemented!(),
|
DbBackend::Sqlite => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Iden)]
|
||||||
|
enum InformationSchema {
|
||||||
|
#[iden = "information_schema"]
|
||||||
|
Schema,
|
||||||
|
#[iden = "TABLE_NAME"]
|
||||||
|
TableName,
|
||||||
|
#[iden = "CONSTRAINT_NAME"]
|
||||||
|
ConstraintName,
|
||||||
|
TableConstraints,
|
||||||
|
TableSchema,
|
||||||
|
ConstraintType,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_mysql_foreign_keys<C>(db: &C) -> SelectStatement
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
|
let mut stmt = Query::select();
|
||||||
|
stmt.columns([
|
||||||
|
InformationSchema::TableName,
|
||||||
|
InformationSchema::ConstraintName,
|
||||||
|
])
|
||||||
|
.from((
|
||||||
|
InformationSchema::Schema,
|
||||||
|
InformationSchema::TableConstraints,
|
||||||
|
))
|
||||||
|
.cond_where(
|
||||||
|
Condition::all()
|
||||||
|
.add(Expr::expr(get_current_schema(db)).equals((
|
||||||
|
InformationSchema::TableConstraints,
|
||||||
|
InformationSchema::TableSchema,
|
||||||
|
)))
|
||||||
|
.add(
|
||||||
|
Expr::col((
|
||||||
|
InformationSchema::TableConstraints,
|
||||||
|
InformationSchema::ConstraintType,
|
||||||
|
))
|
||||||
|
.eq("FOREIGN KEY"),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Iden)]
|
||||||
|
enum PgType {
|
||||||
|
Table,
|
||||||
|
Typname,
|
||||||
|
Typnamespace,
|
||||||
|
Typelem,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Iden)]
|
||||||
|
enum PgNamespace {
|
||||||
|
Table,
|
||||||
|
Oid,
|
||||||
|
Nspname,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query_pg_types<C>(db: &C) -> SelectStatement
|
||||||
|
where
|
||||||
|
C: ConnectionTrait,
|
||||||
|
{
|
||||||
|
let mut stmt = Query::select();
|
||||||
|
stmt.column(PgType::Typname)
|
||||||
|
.from(PgType::Table)
|
||||||
|
.join(
|
||||||
|
JoinType::LeftJoin,
|
||||||
|
PgNamespace::Table,
|
||||||
|
Expr::col((PgNamespace::Table, PgNamespace::Oid))
|
||||||
|
.equals((PgType::Table, PgType::Typnamespace)),
|
||||||
|
)
|
||||||
|
.cond_where(
|
||||||
|
Condition::all()
|
||||||
|
.add(
|
||||||
|
Expr::expr(get_current_schema(db))
|
||||||
|
.equals((PgNamespace::Table, PgNamespace::Nspname)),
|
||||||
|
)
|
||||||
|
.add(Expr::col((PgType::Table, PgType::Typelem)).eq(0)),
|
||||||
|
);
|
||||||
|
stmt
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
//pub use super::cli;
|
//pub use super::cli;
|
||||||
|
|
||||||
|
pub use super::connection::IntoSchemaManagerConnection;
|
||||||
|
pub use super::connection::SchemaManagerConnection;
|
||||||
pub use super::manager::SchemaManager;
|
pub use super::manager::SchemaManager;
|
||||||
pub use super::migrator::MigratorTrait;
|
pub use super::migrator::MigratorTrait;
|
||||||
pub use super::{MigrationName, MigrationTrait};
|
pub use super::{MigrationName, MigrationTrait};
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, DeriveEntityModel)]
|
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
|
||||||
#[sea_orm(table_name = "seaql_migrations")]
|
#[sea_orm(table_name = "seaql_migrations")]
|
||||||
pub struct Model {
|
pub struct Model {
|
||||||
#[sea_orm(primary_key, auto_increment = false)]
|
#[sea_orm(primary_key, auto_increment = false)]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue