From c3e0e8eb5c95b9fc4071dd9eb149b33b9f07d548 Mon Sep 17 00:00:00 2001
From: JT <547158+jntrnr@users.noreply.github.com>
Date: Sun, 6 Feb 2022 19:28:09 -0500
Subject: [PATCH] Add par-each group (#965)

---
 crates/nu-command/src/default_context.rs      |   1 +
 crates/nu-command/src/filters/mod.rs          |   2 +
 .../nu-command/src/filters/par_each_group.rs  | 135 ++++++++++++++++++
 3 files changed, 138 insertions(+)
 create mode 100644 crates/nu-command/src/filters/par_each_group.rs

diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs
index 74c6c7e142..73bf92ea6f 100644
--- a/crates/nu-command/src/default_context.rs
+++ b/crates/nu-command/src/default_context.rs
@@ -85,6 +85,7 @@ pub fn create_default_context(cwd: impl AsRef<Path>) -> EngineState {
             Lines,
             Nth,
             ParEach,
+            ParEachGroup,
             Prepend,
             Range,
             Reduce,
diff --git a/crates/nu-command/src/filters/mod.rs b/crates/nu-command/src/filters/mod.rs
index adfe753083..66f20a5501 100644
--- a/crates/nu-command/src/filters/mod.rs
+++ b/crates/nu-command/src/filters/mod.rs
@@ -23,6 +23,7 @@ mod merge;
 mod move_;
 mod nth;
 mod par_each;
+mod par_each_group;
 mod prepend;
 mod range;
 mod reduce;
@@ -68,6 +69,7 @@ pub use merge::Merge;
 pub use move_::Move;
 pub use nth::Nth;
 pub use par_each::ParEach;
+pub use par_each_group::ParEachGroup;
 pub use prepend::Prepend;
 pub use range::Range;
 pub use reduce::Reduce;
diff --git a/crates/nu-command/src/filters/par_each_group.rs b/crates/nu-command/src/filters/par_each_group.rs
new file mode 100644
index 0000000000..48ec051df9
--- /dev/null
+++ b/crates/nu-command/src/filters/par_each_group.rs
@@ -0,0 +1,135 @@
+use nu_engine::{eval_block_with_redirect, CallExt};
+use nu_protocol::ast::Call;
+use nu_protocol::engine::{CaptureBlock, Command, EngineState, Stack};
+use nu_protocol::{
+    Category, Example, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData, Signature,
+    Spanned, SyntaxShape, Value,
+};
+use rayon::prelude::*;
+
+/// Implements `par-each group`: runs a block on groups of input values in parallel.
+#[derive(Clone)]
+pub struct ParEachGroup;
+
+impl Command for ParEachGroup {
+    fn name(&self) -> &str {
+        "par-each group"
+    }
+
+    fn signature(&self) -> Signature {
+        Signature::build("par-each group")
+            .required("group_size", SyntaxShape::Int, "the size of each group")
+            .required(
+                "block",
+                SyntaxShape::Block(Some(vec![SyntaxShape::Any])),
+                "the block to run on each group",
+            )
+            .category(Category::Filters)
+    }
+
+    fn usage(&self) -> &str {
+        "Runs a block on groups of `group_size` rows of a table at a time."
+    }
+
+    fn examples(&self) -> Vec<Example> {
+        vec![Example {
+            example: "echo [1 2 3 4] | par-each group 2 { $it.0 + $it.1 }",
+            description: "Adds the two elements of each group",
+            result: None,
+        }]
+    }
+
+    /// Splits `input` into groups of `group_size` values and runs the block once per group.
+    ///
+    /// Each group is bound, as a list, to the block's first positional parameter when the
+    /// block declares one. Groups are fed to rayon through `par_bridge`, which does not
+    /// guarantee that results keep the input order. A block that fails contributes a
+    /// `Value::Error` to the output instead of aborting the whole command.
+    fn run(
+        &self,
+        engine_state: &EngineState,
+        stack: &mut Stack,
+        call: &Call,
+        input: PipelineData,
+    ) -> Result<nu_protocol::PipelineData, nu_protocol::ShellError> {
+        let group_size: Spanned<usize> = call.req(engine_state, stack, 0)?;
+        let capture_block: CaptureBlock = call.req(engine_state, stack, 1)?;
+        let ctrlc = engine_state.ctrlc.clone();
+        let span = call.head;
+
+        let stack = stack.captures_to_stack(&capture_block.captures);
+
+        //FIXME: add in support for external redirection when engine-q supports it generally
+
+        let each_group_iterator = EachGroupIterator {
+            group_size: group_size.item,
+            input: Box::new(input.into_iter()),
+        };
+
+        Ok(each_group_iterator
+            .par_bridge()
+            .map(move |x| {
+                let block = engine_state.get_block(capture_block.block_id);
+
+                // Every group evaluates against its own copy of the stack.
+                let mut stack = stack.clone();
+
+                if let Some(var) = block.signature.get_positional(0) {
+                    if let Some(var_id) = &var.var_id {
+                        stack.add_var(*var_id, Value::List { vals: x, span });
+                    }
+                }
+
+                match eval_block_with_redirect(
+                    engine_state,
+                    &mut stack,
+                    block,
+                    PipelineData::new(span),
+                ) {
+                    Ok(v) => v,
+                    Err(error) => Value::Error { error }.into_pipeline_data(),
+                }
+            })
+            .collect::<Vec<_>>()
+            .into_iter()
+            .flatten()
+            .into_pipeline_data(ctrlc))
+    }
+}
+
+/// Adapter that yields the values of `input` in `Vec`s of up to `group_size` items.
+struct EachGroupIterator {
+    group_size: usize,
+    input: Box<dyn Iterator<Item = Value> + Send>,
+}
+
+impl Iterator for EachGroupIterator {
+    type Item = Vec<Value>;
+
+    /// Returns the next group, or `None` when `input` yields no further value.
+    ///
+    /// The final group may hold fewer than `group_size` values.
+    fn next(&mut self) -> Option<Self::Item> {
+        // `max(1)`: a `group_size` of 0 still takes one value per group, so every call
+        // either makes progress or reports the end of the input.
+        let group: Vec<Value> = self.input.by_ref().take(self.group_size.max(1)).collect();
+
+        if group.is_empty() {
+            None
+        } else {
+            Some(group)
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_examples() {
+        use crate::test_examples;
+
+        test_examples(ParEachGroup {})
+    }
+}