Skip to content

Commit

Permalink
Merge pull request #4509 from EthanYuan/develop
Browse files Browse the repository at this point in the history
[Need Migration] Improve query performance of `get_cells` in rich-indexer
  • Loading branch information
zhangsoledad authored Sep 25, 2024
2 parents c3b933b + 4b7d39a commit 21f49f2
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 21 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions util/rich-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ num-bigint = "0.4"
once_cell = "1.8.0"
sql-builder = "3.1"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres"] }
include_dir = "0.7"
tempfile = "3"

[dev-dependencies]
hex = "0.4"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- 20240630_add_is_spent_to_output.sql

ALTER TABLE output
ADD COLUMN is_spent INTEGER DEFAULT 0;

UPDATE output
SET is_spent = 1
FROM input
WHERE input.output_id = output.id;
26 changes: 26 additions & 0 deletions util/rich-indexer/src/indexer/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,32 @@ pub(crate) async fn bulk_insert_tx_association_cell_dep_table(
.await
}

pub(crate) async fn spend_cell(
out_point: &OutPoint,
tx: &mut Transaction<'_, Any>,
) -> Result<bool, Error> {
let output_tx_hash = out_point.tx_hash().raw_data().to_vec();
let output_index: u32 = out_point.index().unpack();

let updated_rows = sqlx::query(
r#"
UPDATE output
SET is_spent = 1
WHERE
tx_id = (SELECT ckb_transaction.id FROM ckb_transaction WHERE tx_hash = $1)
AND output_index = $2
"#,
)
.bind(output_tx_hash)
.bind(output_index as i32)
.execute(&mut *tx)
.await
.map_err(|err| Error::DB(err.to_string()))?
.rows_affected();

Ok(updated_rows > 0)
}

pub(crate) async fn query_output_cell(
out_point: &OutPoint,
tx: &mut Transaction<'_, Any>,
Expand Down
3 changes: 3 additions & 0 deletions util/rich-indexer/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ impl AsyncRichIndexer {
if tx_index != 0 {
for (input_index, input) in tx_view.inputs().into_iter().enumerate() {
let out_point = input.previous_output();
if !spend_cell(&out_point, tx).await? {
break;
}
if self.custom_filters.is_cell_filter_enabled() {
if let Some((output_id, output, output_data)) =
query_output_cell(&out_point, tx).await?
Expand Down
25 changes: 25 additions & 0 deletions util/rich-indexer/src/indexer/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub(crate) async fn rollback_block(tx: &mut Transaction<'_, Any>) -> Result<(),
let tx_id_list = query_tx_id_list_by_block_id(block_id, tx).await?;
let output_lock_type_list = query_outputs_by_tx_id_list(&tx_id_list, tx).await?;

// update spent cells
reset_spent_cells(&tx_id_list, tx).await?;

// remove transactions, associations, inputs, output
remove_batch_by_blobs("ckb_transaction", "id", &tx_id_list, tx).await?;
remove_batch_by_blobs("tx_association_cell_dep", "tx_id", &tx_id_list, tx).await?;
Expand Down Expand Up @@ -80,6 +83,28 @@ async fn remove_batch_by_blobs(
Ok(())
}

async fn reset_spent_cells(tx_id_list: &[i64], tx: &mut Transaction<'_, Any>) -> Result<(), Error> {
let query = SqlBuilder::update_table("output")
.set("is_spent", 0)
.and_where_in_query(
"id",
SqlBuilder::select_from("input")
.field("output_id")
.and_where_in("consumed_tx_id", tx_id_list)
.query()
.map_err(|err| Error::DB(err.to_string()))?,
)
.sql()
.map_err(|err| Error::DB(err.to_string()))?;

sqlx::query(&query)
.execute(&mut *tx)
.await
.map_err(|err| Error::DB(err.to_string()))?;

Ok(())
}

async fn query_uncle_id_list_by_block_id(
block_id: i64,
tx: &mut Transaction<'_, Any>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ impl AsyncRichIndexerHandle {
.join(name!("script";"lock_script"))
.on("output.lock_script_id = lock_script.id"),
}
.join("input")
.on("output.id = input.output_id")
.and_where("input.output_id IS NULL"); // live cells
.and_where("output.is_spent = 0"); // live cells

// filter cells in pool
let mut dead_cells = Vec::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,25 @@ impl AsyncRichIndexerHandle {
.join("ckb_transaction")
.on("output.tx_id = ckb_transaction.id");
}
query_builder
.left()
.join("input")
.on("output.id = input.output_id");
if let Some(ref filter) = search_key.filter {
if filter.script.is_some() || filter.script_len_range.is_some() {
match search_key.script_type {
IndexerScriptType::Lock => {
query_builder
.left()
.join(name!("script";"type_script"))
.on("output.type_script_id = type_script.id");
}
IndexerScriptType::Type => {
query_builder
.left()
.join(name!("script";"lock_script"))
.on("output.lock_script_id = lock_script.id");
}
}
}
}
query_builder.and_where("input.output_id IS NULL"); // live cells
query_builder.and_where("output.is_spent = 0"); // live cells

// filter cells in pool
let mut dead_cells = Vec::new();
Expand Down
40 changes: 27 additions & 13 deletions util/rich-indexer/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use anyhow::{anyhow, Result};
use ckb_app_config::{DBDriver, RichIndexerConfig};
use futures::TryStreamExt;
use include_dir::{include_dir, Dir};
use log::LevelFilter;
use once_cell::sync::OnceCell;
use sqlx::{
any::{Any, AnyArguments, AnyConnectOptions, AnyPool, AnyPoolOptions, AnyRow},
migrate::Migrator,
query::{Query, QueryAs},
ConnectOptions, IntoArguments, Row, Transaction,
};
use tempfile::tempdir;

use std::fs::OpenOptions;
use std::fs::{self, OpenOptions};
use std::marker::{Send, Unpin};
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -20,6 +23,7 @@ const SQL_SQLITE_CREATE_TABLE: &str = include_str!("../resources/create_sqlite_t
const SQL_SQLITE_CREATE_INDEX: &str = include_str!("../resources/create_sqlite_index.sql");
const SQL_POSTGRES_CREATE_TABLE: &str = include_str!("../resources/create_postgres_table.sql");
const SQL_POSTGRES_CREATE_INDEX: &str = include_str!("../resources/create_postgres_index.sql");
static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/resources/migrations");

#[derive(Clone, Default)]
pub struct SQLXPool {
Expand All @@ -36,21 +40,14 @@ impl Debug for SQLXPool {
}

impl SQLXPool {
pub fn new() -> Self {
SQLXPool {
pool: Arc::new(OnceCell::new()),
db_driver: DBDriver::default(),
}
}

pub async fn connect(&mut self, db_config: &RichIndexerConfig) -> Result<()> {
let pool_options = AnyPoolOptions::new()
.max_connections(10)
.min_connections(0)
.acquire_timeout(Duration::from_secs(60))
.max_lifetime(Duration::from_secs(1800))
.idle_timeout(Duration::from_secs(30));
match db_config.db_type {
let pool = match db_config.db_type {
DBDriver::Sqlite => {
let require_init = is_sqlite_require_init(db_config);
let uri = build_url_for_sqlite(db_config);
Expand All @@ -59,13 +56,13 @@ impl SQLXPool {
let pool = pool_options.connect_with(connection_options).await?;
log::info!("SQLite is connected.");
self.pool
.set(pool)
.set(pool.clone())
.map_err(|_| anyhow!("set pool failed!"))?;
if require_init {
self.create_tables_for_sqlite().await?;
}
self.db_driver = DBDriver::Sqlite;
Ok(())
pool
}
DBDriver::Postgres => {
let require_init = self.is_postgres_require_init(db_config).await?;
Expand All @@ -75,15 +72,32 @@ impl SQLXPool {
let pool = pool_options.connect_with(connection_options).await?;
log::info!("PostgreSQL is connected.");
self.pool
.set(pool)
.set(pool.clone())
.map_err(|_| anyhow!("set pool failed"))?;
if require_init {
self.create_tables_for_postgres().await?;
}
self.db_driver = DBDriver::Postgres;
Ok(())
pool
}
};

// Run migrations
log::info!("Running migrations...");
let temp_dir = tempdir()?;
for file in MIGRATIONS_DIR.files() {
log::info!("Found migration file: {:?}", file.path());
let file_path = temp_dir.path().join(file.path());
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(&file_path, file.contents())?;
}
let migrator = Migrator::new(temp_dir.path()).await?;
migrator.run(&pool).await?;
log::info!("Migrations are done.");

Ok(())
}

pub async fn fetch_count(&self, table_name: &str) -> Result<u64> {
Expand Down

0 comments on commit 21f49f2

Please sign in to comment.