Skip to content

Commit

Permalink
fix(su): fix message fetch not working before rocksdb ready
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Sep 5, 2024
1 parent a4775ee commit 33a48b6
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 76 deletions.
192 changes: 116 additions & 76 deletions servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,82 +695,122 @@ impl DataStore for StoreClient {
limit_val
};

let db_messages_result: Result<Vec<DbMessageWithoutData>, DieselError> = query
.select((
row_id,
process_id,
message_id,
assignment_id,
epoch,
nonce,
timestamp,
hash_chain,
))
.order(timestamp.asc())
.limit(adjusted_limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > adjusted_limit_val;

// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(adjusted_limit_val as usize)]
} else {
&db_messages[..]
};

let mut messages_mapped: Vec<Message> = vec![];

// Include the process as the first message if determined to be on the first page and has assignment
if include_process {
let process_message = Message::from_process(process_in.clone())?;
messages_mapped.push(process_message);
}

// Map database messages to the Message struct
let message_ids: Vec<(String, Option<String>, String, String)> = messages_o
.iter()
.map(|msg| {
(
msg.message_id.clone(),
msg.assignment_id.clone(),
msg.process_id.clone(),
msg.timestamp.to_string().clone(),
)
})
.collect();

let binaries = self.bytestore.clone().read_binaries(message_ids).await?;

for db_message in messages_o.iter() {
match binaries.get(&(
db_message.message_id.clone(),
db_message.assignment_id.clone(),
db_message.process_id.clone(),
db_message.timestamp.to_string().clone(),
)) {
Some(bytes_result) => {
let mapped = Message::from_bytes(bytes_result.clone())?;
messages_mapped.push(mapped);
}
None => {
// Fall back to the database if the binary isn't available
let full_message = self.get_message_internal(
&db_message.message_id,
&db_message.assignment_id,
)?;
messages_mapped.push(full_message);
}
}
}

// Create paginated result
let paginated = PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
if self.bytestore.clone().is_ready() {
let db_messages_result: Result<Vec<DbMessageWithoutData>, DieselError> = query
.select((
row_id,
process_id,
message_id,
assignment_id,
epoch,
nonce,
timestamp,
hash_chain,
))
.order(timestamp.asc())
.limit(adjusted_limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > adjusted_limit_val;

// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(adjusted_limit_val as usize)]
} else {
&db_messages[..]
};

let mut messages_mapped: Vec<Message> = vec![];

// Include the process as the first message if determined to be on the first page and has assignment
if include_process {
let process_message = Message::from_process(process_in.clone())?;
messages_mapped.push(process_message);
}

// Map database messages to the Message struct
let message_ids: Vec<(String, Option<String>, String, String)> = messages_o
.iter()
.map(|msg| {
(
msg.message_id.clone(),
msg.assignment_id.clone(),
msg.process_id.clone(),
msg.timestamp.to_string().clone(),
)
})
.collect();

let binaries = self.bytestore.clone().read_binaries(message_ids).await?;

for db_message in messages_o.iter() {
match binaries.get(&(
db_message.message_id.clone(),
db_message.assignment_id.clone(),
db_message.process_id.clone(),
db_message.timestamp.to_string().clone(),
)) {
Some(bytes_result) => {
let mapped = Message::from_bytes(bytes_result.clone())?;
messages_mapped.push(mapped);
}
None => {
// Fall back to the database if the binary isn't available
let full_message = self.get_message_internal(
&db_message.message_id,
&db_message.assignment_id,
)?;
messages_mapped.push(full_message);
}
}
}

// Create paginated result
let paginated = PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
}
} else {
let db_messages_result: Result<Vec<DbMessage>, DieselError> = query
.order(timestamp.asc())
.limit(adjusted_limit_val + 1) // Fetch one extra record to determine if a next page exists
.load(conn);

match db_messages_result {
Ok(db_messages) => {
let has_next_page = db_messages.len() as i64 > adjusted_limit_val;

// Take only up to the limit if there's an extra indicating a next page
let messages_o = if has_next_page {
&db_messages[..(adjusted_limit_val as usize)]
} else {
&db_messages[..]
};

let mut messages_mapped: Vec<Message> = vec![];

// Include the process as the first message if determined to be on the first page and has assignment
if include_process {
let process_message = Message::from_process(process_in.clone())?;
messages_mapped.push(process_message);
}

for db_message in messages_o.iter() {
let json = serde_json::from_value(db_message.message_data.clone())?;
let bytes: Vec<u8> = db_message.bundle.clone();
let mapped = Message::from_val(&json, bytes)?;
messages_mapped.push(mapped);
}

let paginated =
PaginatedMessages::from_messages(messages_mapped, has_next_page)?;
Ok(paginated)
}
Err(e) => Err(StoreErrorType::from(e)),
}
}
}

Expand Down
Binary file modified servers/su/su
Binary file not shown.

0 comments on commit 33a48b6

Please sign in to comment.