Skip to content

Commit

Permalink
Merge pull request #2 from fetlife/distribute-to-indexes
Browse files Browse the repository at this point in the history
Distribute indexes
  • Loading branch information
Antti authored Mar 21, 2024
2 parents bd50a76 + ea1ac63 commit 98c1339
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lib/distributing_iterator/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module DistributingIterator
VERSION = "0.0.1"
VERSION = "0.0.2"
end
8 changes: 4 additions & 4 deletions src/distribute_csv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use std::collections::VecDeque;
use anyhow::{Result, Context};

use crate::distributing_iterator;
use csv::ByteRecord;
Expand All @@ -11,14 +11,14 @@ pub fn distribute(data: &str, field: &str, spread: u64) -> Result<String> {
let headers = csv.headers()?.clone();
let field_index = headers
.iter()
.position(|header| header == field).context(format!("field `{field}` not found in CSV headers"))?;
.position(|header| header == field)
.context(format!("field `{field}` not found in CSV headers"))?;
let data = csv
.into_byte_records()
.map(|record| record.map_err(anyhow::Error::from))
.collect::<Result<VecDeque<_>>>()?;
let id_func = move |item: &ByteRecord| item[field_index].to_vec();
let iterator =
distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
let iterator = distributing_iterator::DistributingIterator::new(data, spread as usize, id_func);
let data: Vec<_> = iterator.collect();
let mut wtr = csv::Writer::from_writer(vec![]);
wtr.write_record(&headers).context("writing headers")?;
Expand Down
133 changes: 133 additions & 0 deletions src/distribute_indexes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use fnv::FnvHashMap;
use indexmap::IndexMap;
use std::collections::VecDeque;

pub fn distribute<'a, T: 'a, ID>(
data: &'a [T],
mut spread: usize,
id_func: impl Fn(&'a T) -> ID + Send + 'static,
) -> Vec<usize>
where
ID: Eq + std::hash::Hash,
{
let mut result = Vec::with_capacity(data.len());
let mut queue_per_id: FnvHashMap<ID, VecDeque<usize>> = Default::default();
let mut last_pos: IndexMap<ID, usize> = Default::default();
let mut output_pos = 0;
let mut data_pos = 0;
let mut iterator_reached_end = false;

loop {
let item = loop {
let mut result = None;
let mut adjust_spread = false;
let sorted_spreadable_ids = last_pos
.iter()
.filter(|(_id, &last_pos)| output_pos - last_pos >= spread)
.map(|(id, _last_pos)| id);
for id in sorted_spreadable_ids {
match queue_per_id.get_mut(id) {
Some(queue) => {
if let Some(item) = queue.pop_front() {
if iterator_reached_end && queue.is_empty() {
queue_per_id.remove(id);
adjust_spread = true
}
result = Some(item);
break;
}
}
None => continue,
}
}
if result.is_some() {
if adjust_spread {
spread = calculate_spread(&queue_per_id);
}
break result;
}

if iterator_reached_end {
if queue_per_id.values().flatten().any(|_| true) {
panic!(
"Nothing can be returned even though the queue is not empty. This is a bug"
);
} else {
break None;
}
}

let current_data_pos = data_pos;
data_pos += 1;

match data.get(current_data_pos) {
Some(item) => {
let id = (id_func)(item);
if !last_pos.contains_key(&id) {
break Some(current_data_pos);
} else {
queue_per_id
.entry(id)
.or_insert_with(|| VecDeque::with_capacity(100))
.push_back(current_data_pos);
}
}
None => {
spread = calculate_spread(&queue_per_id);
iterator_reached_end = true;
}
}
};
if let Some(output_idx) = item {
let id = (id_func)(&data[output_idx]);
last_pos.shift_remove(&id);
last_pos.insert(id, output_pos);
result.push(output_idx);
output_pos += 1;
} else {
break;
}
}
result
}

/// Distribute items that are themselves IDs
pub fn distribute_ids<T>(data: &[T], spread: usize) -> Vec<usize> where T: Eq + std::hash::Hash, {
distribute(data, spread, |item| item)
}

fn calculate_spread<T, ID>(queue_per_id: &FnvHashMap<ID, VecDeque<T>>) -> usize {
queue_per_id
.iter()
.filter(|(_id, queue)| !queue.is_empty())
.count()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_distribute() {
let data = vec![
"1", "1", "1", "2", "3", "3", "2", "2", "3", "3", "2", "3", "2", "3", "3",
];
let result = distribute(&data, 3, |item| item.parse::<usize>().unwrap());
assert_eq!(
result,
vec![0, 3, 4, 1, 6, 5, 2, 7, 8, 10, 9, 12, 11, 13, 14]
);
}

#[test]
fn test_distribute2() {
let data = vec!["Picture", "Post", "Video", "Video", "Picture", "Post", "Picture", "Picture", "Video"];
let result = distribute_ids(&data, 3);
let result_with_labels = result
.iter()
.map(|idx| data[*idx])
.collect::<Vec<_>>();
assert_eq!(result_with_labels, vec!["Picture", "Post", "Video", "Picture", "Post", "Video", "Picture", "Video", "Picture"]);
assert_eq!(result, vec![0, 1, 2, 4, 5, 3, 6, 8, 7]);
}
}
6 changes: 3 additions & 3 deletions src/distributing_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ where
}
}
if result.is_some() {
if adjust_spread {
self.spread = Self::calculate_spread(&queue_per_id);
}
break result;
}

Expand Down Expand Up @@ -133,9 +136,6 @@ where
}
}
};
if adjust_spread {
self.spread = Self::calculate_spread(&queue_per_id);
}
self.queue_per_id = Some(queue_per_id);
result
}
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod distribute_csv;
mod distribute_indexes;
mod distributing_iterator;

#[cfg(feature = "magnus")]
mod ruby_ext;

pub use distribute_csv::distribute as distribute_csv;
pub use distribute_indexes::distribute;
pub use distribute_indexes::distribute_ids;
pub use distributing_iterator::DistributingIterator;
15 changes: 9 additions & 6 deletions src/ruby_ext.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use magnus::{error::Result, function, exception, Error};
use magnus::{error::Result, exception, function, Error, Value};

use crate::distribute_csv;
use crate::{distribute_ids, distribute_csv};

fn distribute_csv_ruby(data: String, field: String, spread: u64) -> Result<String> {
match distribute_csv(&data, &field, spread) {
Ok(result) => Ok(result),
Err(e) => {
Err(Error::new(exception::standard_error(), format!("{:?}", e)))
}
Err(e) => Err(Error::new(exception::standard_error(), format!("{:?}", e))),
}
}

fn distribute_indexes_ruby(data: Vec<String>, spread: usize) -> Result<Vec<usize>> {
Ok(distribute_ids(&data, spread))
}

#[magnus::init]
fn init(ruby: &magnus::Ruby) -> Result<()> {
let module = ruby.define_module("DistributingIterator")?;
module.define_module_function("distribute", function!(distribute_csv_ruby, 3))?;
module.define_module_function("distribute_csv", function!(distribute_csv_ruby, 3))?;
module.define_module_function("distribute_indexes", function!(distribute_indexes_ruby, 2))?;
Ok(())
}

0 comments on commit 98c1339

Please sign in to comment.