feat(su): boot loader #730
VinceJuliano committed Sep 4, 2024
1 parent 7465389 commit 7f83842
Showing 15 changed files with 575 additions and 1,110 deletions.
895 changes: 28 additions & 867 deletions servers/su/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion servers/su/Cargo.toml
@@ -7,7 +7,6 @@ default-run = "su"
[dependencies]
actix-web = "4"
async-trait = "0.1.74"
bundlr-sdk = "0.5.0"
reqwest = "0.11.22"
serde = "1.0.188"
serde_json = "1.0.107"
@@ -35,6 +34,8 @@ rocksdb = "0.22.0"
actix-web-prom = { version = "0.8.0", features = ["process"] }
prometheus = { version = "0.13.4", features = ["process"] }
lru = "0.12.4"
lazy_static = "1.5.0"
avro-rs = "0.13.0"

[[bin]]
name = "su"
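
The dependency changes swap out bundlr-sdk and bring in lazy_static and avro-rs. As a rough illustration of what lazy_static is for (the names below are hypothetical and not taken from this commit), it lets a value be built once on first use and then shared as a static:

use lazy_static::lazy_static;
use std::collections::HashMap;

lazy_static! {
    // Built lazily on first access, then reused for the life of the process.
    static ref DEFAULT_TAGS: HashMap<&'static str, &'static str> = {
        let mut m = HashMap::new();
        m.insert("Data-Protocol", "ao");
        m
    };
}

fn main() {
    assert_eq!(DEFAULT_TAGS.get("Data-Protocol"), Some(&"ao"));
}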
@@ -0,0 +1,5 @@
ALTER TABLE processes
DROP COLUMN epoch,
DROP COLUMN nonce,
DROP COLUMN hash_chain,
DROP COLUMN "timestamp";
@@ -0,0 +1,5 @@
ALTER TABLE processes
ADD COLUMN epoch INTEGER NULL,
ADD COLUMN nonce INTEGER NULL,
ADD COLUMN hash_chain TEXT NULL,
ADD COLUMN "timestamp" BIGINT NULL;
4 changes: 4 additions & 0 deletions servers/su/src/domain/clients/schema.rs
@@ -6,6 +6,10 @@ table! {
process_id -> Varchar,
process_data -> Jsonb,
bundle -> Bytea,
epoch -> Nullable<Int4>,
nonce -> Nullable<Int4>,
timestamp -> Nullable<BigInt>,
hash_chain -> Nullable<Text>,
}
}

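All four new columns are declared Nullable in the table! macro, so Diesel exposes them as Option values on the Rust side (Nullable<Int4> as Option<i32>, Nullable<BigInt> as Option<i64>, Nullable<Text> as Option<String>), and rows written before this change simply carry NULLs. A hypothetical query against the new columns, as a sketch of how they become usable in Diesel's DSL; it is not part of this commit:

use diesel::pg::PgConnection;
use diesel::prelude::*;
use super::schema::processes::dsl::*;

// Fetch only processes that already have scheduling data assigned.
fn assigned_processes(conn: &mut PgConnection) -> QueryResult<Vec<(String, Option<i32>)>> {
    processes
        .filter(nonce.is_not_null())
        .select((process_id, nonce))
        .load(conn)
}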
217 changes: 114 additions & 103 deletions servers/su/src/domain/clients/store.rs
@@ -497,10 +497,21 @@ impl DataStore for StoreClient {
use super::schema::processes::dsl::*;
let conn = &mut self.get_conn()?;

let (process_epoch, process_hash_chain, process_timestamp, process_nonce) = (
process.epoch().ok(),
process.hash_chain().ok(),
process.timestamp().ok(),
process.nonce().ok(),
);

let new_process = NewProcess {
process_id: &process.process_id,
process_id: &process.process.process_id,
process_data: serde_json::to_value(process).expect("Failed to serialize Process"),
bundle: bundle_in,
epoch: process_epoch,
hash_chain: process_hash_chain.as_deref(),
nonce: process_nonce,
timestamp: process_timestamp,
};

match diesel::insert_into(processes)
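
save_process now pulls the scheduling fields out up front with .ok(), so a Process that does not carry them simply stores NULL in the new columns, while the JSONB process_data and bundle are written as before; process_id is now read from process.process.process_id, reflecting what looks like a nested process field on the wrapper type. A minimal, self-contained sketch of the accessor shape this relies on, using simplified stand-in types rather than the real Process and StoreErrorType:

#[derive(Debug)]
struct StoreErrorType(String);

struct Process {
    epoch: Option<i32>,
}

impl Process {
    // The accessor returns Result, so callers can turn a missing value into
    // an Option with .ok(), which is what the NewProcess fields expect.
    fn epoch(&self) -> Result<i32, StoreErrorType> {
        self.epoch
            .ok_or_else(|| StoreErrorType("process has no epoch".to_string()))
    }
}

fn main() {
    let p = Process { epoch: None };
    let process_epoch: Option<i32> = p.epoch().ok();
    assert_eq!(process_epoch, None);
}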
@@ -533,7 +544,7 @@

match db_process_result {
Ok(Some(db_process)) => {
let process: Process = serde_json::from_value(db_process.process_data.clone())?;
let process: Process = Process::from_val(&db_process.process_data)?;
self.in_memory_cache
.insert_process(process_id_in.to_string(), process.clone())
.await;
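
Reading a process back now goes through Process::from_val instead of calling serde_json::from_value at the call site, and the deserialized value is pushed into the in-memory cache before being returned. A plausible shape for such a constructor, shown here as an assumption with a simplified struct and error type; the real implementation in this commit may do more, for example populating the new scheduling fields:

use serde::Deserialize;
use serde_json::Value;

#[derive(Deserialize)]
struct Process {
    process_id: String,
}

impl Process {
    // Thin wrapper over serde_json so callers depend on one constructor
    // rather than on serde_json directly.
    fn from_val(value: &Value) -> Result<Process, serde_json::Error> {
        serde_json::from_value(value.clone())
    }
}

fn main() -> Result<(), serde_json::Error> {
    let raw = serde_json::json!({ "process_id": "abc" });
    let p = Process::from_val(&raw)?;
    assert_eq!(p.process_id, "abc");
    Ok(())
}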
@@ -629,14 +640,16 @@

async fn get_messages(
&self,
process_id_in: &str,
process_in: &Process,
from: &Option<String>,
to: &Option<String>,
limit: &Option<i32>,
) -> Result<PaginatedMessages, StoreErrorType> {
use super::schema::messages::dsl::*;
let conn = &mut self.get_read_conn()?;
let mut query = messages.filter(process_id.eq(process_id_in)).into_boxed();
let mut query = messages
.filter(process_id.eq(process_in.process.process_id.clone()))
.into_boxed();

// Apply 'from' timestamp filtering if 'from' is provided
if let Some(from_timestamp_str) = from {
@@ -655,113 +668,103 @@
}

// Apply limit, converting Option<i32> to i64 and adding 1 to check for the next page
let limit_val = limit.unwrap_or(5000) as i64; // Default limit if none is provided

if self.bytestore.clone().is_ready() {
let db_messages_result: Result<Vec<DbMessageWithoutData>, DieselError> = query
.select((
row_id,
process_id,
message_id,
assignment_id,
epoch,
nonce,
timestamp,
hash_chain,
))
.order(timestamp.asc())
.limit(limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > limit_val;
// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(limit_val as usize)]
} else {
&db_messages[..]
};

let message_ids: Vec<(String, Option<String>, String, String)> = messages_o
.iter()
.map(|msg| {
(
msg.message_id.clone(),
msg.assignment_id.clone(),
msg.process_id.clone(),
msg.timestamp.to_string().clone(),
)
})
.collect();
let limit_val = limit.unwrap_or(100) as i64; // Default limit if none is provided

// Determine if the 'from' timestamp matches the process timestamp and if assignment is present
let include_process = process_in.assignment.is_some()
&& match from {
Some(from_timestamp_str) => {
let from_timestamp = from_timestamp_str
.parse::<i64>()
.map_err(StoreErrorType::from)?;
from_timestamp == process_in.process.timestamp
}
None => true, // No 'from' timestamp means it's the first page
};

let binaries = self.bytestore.clone().read_binaries(message_ids).await?;
// If including the process, reduce the limit for the database query by 1
let adjusted_limit_val = if include_process {
limit_val - 1
} else {
limit_val
};

let mut messages_mapped: Vec<Message> = vec![];
let db_messages_result: Result<Vec<DbMessageWithoutData>, DieselError> = query
.select((
row_id,
process_id,
message_id,
assignment_id,
epoch,
nonce,
timestamp,
hash_chain,
))
.order(timestamp.asc())
.limit(adjusted_limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

for db_message in messages_o.iter() {
/*
binaries is keyed by the tuple (message_id, assignment_id, process_id, timestamp)
*/
match binaries.get(&(
db_message.message_id.clone(),
db_message.assignment_id.clone(),
db_message.process_id.clone(),
db_message.timestamp.to_string().clone(),
)) {
Some(bytes_result) => {
let mapped = Message::from_bytes(bytes_result.clone())?;
messages_mapped.push(mapped);
}
None => {
/*
If for some reason we don't have a file available,
this is a fallback to the database.
*/
let full_message = self.get_message_internal(
&db_message.message_id,
&db_message.assignment_id,
)?;
messages_mapped.push(full_message);
}
match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > adjusted_limit_val;

// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(adjusted_limit_val as usize)]
} else {
&db_messages[..]
};

let mut messages_mapped: Vec<Message> = vec![];

// Include the process as the first message when this is the first page and the process has an assignment
if include_process {
let process_message = Message::from_process(process_in.clone())?;
messages_mapped.push(process_message);
}

// Map database messages to the Message struct
let message_ids: Vec<(String, Option<String>, String, String)> = messages_o
.iter()
.map(|msg| {
(
msg.message_id.clone(),
msg.assignment_id.clone(),
msg.process_id.clone(),
msg.timestamp.to_string().clone(),
)
})
.collect();

let binaries = self.bytestore.clone().read_binaries(message_ids).await?;

for db_message in messages_o.iter() {
match binaries.get(&(
db_message.message_id.clone(),
db_message.assignment_id.clone(),
db_message.process_id.clone(),
db_message.timestamp.to_string().clone(),
)) {
Some(bytes_result) => {
let mapped = Message::from_bytes(bytes_result.clone())?;
messages_mapped.push(mapped);
}
None => {
// Fall back to the database if the binary isn't available
let full_message = self.get_message_internal(
&db_message.message_id,
&db_message.assignment_id,
)?;
messages_mapped.push(full_message);
}
}
let paginated =
PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
}
} else {
let db_messages_result: Result<Vec<DbMessage>, DieselError> = query
.order(timestamp.asc())
.limit(limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > limit_val;
// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(limit_val as usize)]
} else {
&db_messages[..]
};

let mut messages_mapped: Vec<Message> = vec![];
for db_message in messages_o.iter() {
let json = serde_json::from_value(db_message.message_data.clone())?;
let bytes: Vec<u8> = db_message.bundle.clone();
let mapped = Message::from_val(&json, bytes)?;
messages_mapped.push(mapped);
}

let paginated =
PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
// Create paginated result
let paginated = PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
}
}
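
The reworked get_messages takes the whole Process instead of just an id, which is what enables the boot-loader behavior in the commit title: when the page has no 'from' cursor, or the cursor equals the process's own timestamp, and the process carries an assignment, the process itself is emitted as the first entry and the database limit is reduced by one so the page size stays constant. Paging still uses the fetch-one-extra-row trick to decide whether a next page exists. A small self-contained sketch of that arithmetic, not the project's PaginatedMessages type:

fn paginate(rows: Vec<u64>, limit: usize, include_process: bool) -> (Vec<u64>, bool) {
    // Reserve one slot for the synthetic "process as first message" entry.
    let adjusted = if include_process { limit - 1 } else { limit };
    // The query fetched up to adjusted + 1 rows; an extra row means another page exists.
    let has_next_page = rows.len() > adjusted;
    let page: Vec<u64> = rows.into_iter().take(adjusted).collect();
    (page, has_next_page)
}

fn main() {
    // Four rows came back for an adjusted limit of three, so a next page exists.
    let (page, more) = paginate(vec![10, 20, 30, 40], 4, true);
    assert_eq!(page, vec![10, 20, 30]);
    assert!(more);
}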

@@ -989,6 +992,10 @@ pub struct DbProcess {
pub process_id: String,
pub process_data: serde_json::Value,
pub bundle: Vec<u8>,
pub epoch: Option<i32>,
pub nonce: Option<i32>,
pub timestamp: Option<i64>,
pub hash_chain: Option<String>,
}

#[derive(Queryable, Selectable)]
@@ -1041,6 +1048,10 @@ pub struct NewProcess<'a> {
pub process_id: &'a str,
pub process_data: serde_json::Value,
pub bundle: &'a [u8],
pub epoch: Option<i32>, // New nullable field
pub nonce: Option<i32>, // New nullable field
pub hash_chain: Option<&'a str>, // New nullable field
pub timestamp: Option<i64>, // New nullable field
}
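
NewProcess now carries the four scheduling fields as Options, so rows written without boot-loader data (all None) and rows with the fields populated share the same Insertable type, with None becoming NULL on insert. A minimal sketch of the insert that the save_process hunk above performs; the real code matches on the result for error handling rather than returning it directly:

use diesel::pg::PgConnection;
use diesel::prelude::*;
use super::schema::processes::dsl::*;

fn insert_process(conn: &mut PgConnection, new_process: &NewProcess<'_>) -> QueryResult<usize> {
    // Any Option field left as None is written as NULL.
    diesel::insert_into(processes)
        .values(new_process)
        .execute(conn)
}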

