Skip to content

Commit

Permalink
start adding compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Apr 28, 2024
1 parent 8f13731 commit f25e0ac
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
21 changes: 21 additions & 0 deletions src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,24 @@
// pub trait Compaction {
// fn compact();
// }

use std::fs;
use std::path::PathBuf;

pub struct CompactionConfig {
pub levels: usize,
pub file_limit: usize,
}

impl CompactionConfig {
pub fn new(levels: usize, file_limit: usize, sstable_dir: PathBuf) -> Self {
for level in 0..levels {
fs::create_dir(sstable_dir)
}
Self { levels, file_limit }
}

pub fn compact(&self, sstable_dir: PathBuf) {
for level in 0..self.levels {}
}
}
69 changes: 32 additions & 37 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,42 @@ use resp::{Decoder, Value};
use std::io::{BufReader, Read, Write};
use std::net::{TcpListener, TcpStream};

fn process_cmd<'a>(state: &mut GlobalState, cmd: &[u8]) -> Result<Vec<u8>, MemTableError> {
let mut memtable = &mut state.memtable;
fn process_cmd(state: &mut GlobalState, cmd: &[u8]) -> Result<Vec<u8>, MemTableError> {
let memtable = &mut state.memtable;
let indexes = &mut state.indexes;

let mut resp_parser = Decoder::new(BufReader::new(cmd));
let val = resp_parser.decode().expect("cannot decode value");

match val {
Value::Array(v) => {
let mut val_iter = v.iter();
while let Some(val) = val_iter.next() {
match val.to_owned() {
Value::Bulk(v) => match v.as_str() {
"get" => {
if let Some(key) = val_iter.next() {
let value = memtable.get(key.to_string_pretty(), indexes)?;
return Ok(format!("+{}\r\n", value.as_str()).into_bytes());
} else {
return Err(MemTableError::GetError);
}
}
"set" => {
let key = val_iter.next().expect("could not get key");
let value = val_iter.next().expect("could not get value");
memtable.set(key.to_string_pretty(), value.to_string_pretty())?;
if memtable.tree.len() > state.sstable_threshold {
memtable.flush_to_sstable(&mut state.indexes);
}
if let Value::Array(v) = val {
let mut val_iter = v.iter();
match val_iter.next() {
Some(Value::Bulk(v)) => match v.as_str() {
"get" => {
if let Some(key) = val_iter.next() {
let value = memtable.get(key.to_string_pretty(), indexes)?;
return Ok(format!("+{}\r\n", value.as_str()).into_bytes());
} else {
return Err(MemTableError::GetError);
}
}
"set" => {
let key = val_iter.next().expect("could not get key");
let value = val_iter.next().expect("could not get value");
memtable.set(key.to_string_pretty(), value.to_string_pretty())?;
if memtable.tree.len() > state.sstable_threshold {
memtable.flush_to_sstable(&mut state.indexes);
}

return Ok(Vec::from("+OK\r\n".as_bytes()));
}
"COMMAND" => {
return Ok(Vec::from("+OK\r\n".as_bytes()));
}
_ => unimplemented!(),
},
_ => unimplemented!(),
};
}
}
_ => unimplemented!(),
return Ok(Vec::from("+OK\r\n".as_bytes()));
}
"COMMAND" => {
return Ok(Vec::from("+OK\r\n".as_bytes()));
}
_ => unimplemented!(),
},
_ => unimplemented!(),
};
};

Err(MemTableError::SetError)
Expand All @@ -58,10 +53,10 @@ fn handle_client(mut stream: TcpStream, state: &mut GlobalState) {
let cmd = &buf[..bytes_read];
match process_cmd(state, cmd) {
Ok(ret) => {
stream.write(&*ret).expect("could not write to stream");
let _ = stream.write(&ret).expect("could not write to stream");
}
Err(_) => {
stream
let _ = stream
.write(b"-There was an error\r\n")
.expect("could not write to stream");
}
Expand Down

0 comments on commit f25e0ac

Please sign in to comment.