diff --git a/crates/nu-cli/src/commands/debug.rs b/crates/nu-cli/src/commands/debug.rs index 13361ff83b..f46f79bd81 100644 --- a/crates/nu-cli/src/commands/debug.rs +++ b/crates/nu-cli/src/commands/debug.rs @@ -29,26 +29,27 @@ impl WholeStreamCommand for Debug { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - debug_value(args, registry) + debug_value(args, registry).await } } -fn debug_value(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn debug_value( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (DebugArgs { raw }, mut input) = args.process(®istry).await?; - while let Some(v) = input.next().await { + let (DebugArgs { raw }, input) = args.process(®istry).await?; + Ok(input + .map(move |v| { if raw { - yield ReturnSuccess::value( + ReturnSuccess::value( UntaggedValue::string(format!("{:#?}", v)).into_untagged_value(), - ); + ) } else { - yield ReturnSuccess::debug_value(v); + ReturnSuccess::debug_value(v) } - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/default.rs b/crates/nu-cli/src/commands/default.rs index ffabe0bc2e..b18882126a 100644 --- a/crates/nu-cli/src/commands/default.rs +++ b/crates/nu-cli/src/commands/default.rs @@ -39,7 +39,7 @@ impl WholeStreamCommand for Default { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - default(args, registry) + default(args, registry).await } fn examples(&self) -> Vec { @@ -51,11 +51,15 @@ impl WholeStreamCommand for Default { } } -fn default(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn default( + args: CommandArgs, + registry: &CommandRegistry, +) -> Result { let registry = registry.clone(); - let stream = async_stream! { - let (DefaultArgs { column, value }, mut input) = args.process(®istry).await?; - while let Some(item) = input.next().await { + let (DefaultArgs { column, value }, input) = args.process(®istry).await?; + + Ok(input + .map(move |item| { let should_add = match item { Value { value: UntaggedValue::Row(ref r), @@ -66,17 +70,14 @@ fn default(args: CommandArgs, registry: &CommandRegistry) -> Result yield ReturnSuccess::value(new_value), - None => yield ReturnSuccess::value(item), + Some(new_value) => ReturnSuccess::value(new_value), + None => ReturnSuccess::value(item), } } else { - yield ReturnSuccess::value(item); + ReturnSuccess::value(item) } - - } - }; - - Ok(stream.to_output_stream()) + }) + .to_output_stream()) } #[cfg(test)] diff --git a/crates/nu-cli/src/commands/drop.rs b/crates/nu-cli/src/commands/drop.rs index baf2c40205..746eb9492d 100644 --- a/crates/nu-cli/src/commands/drop.rs +++ b/crates/nu-cli/src/commands/drop.rs @@ -2,7 +2,7 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::prelude::*; use nu_errors::ShellError; -use nu_protocol::{ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value}; +use nu_protocol::{Signature, SyntaxShape, UntaggedValue}; use nu_source::Tagged; pub struct Drop; @@ -35,7 +35,20 @@ impl WholeStreamCommand for Drop { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - drop(args, registry) + let (DropArgs { rows }, input) = args.process(®istry).await?; + let mut v: Vec<_> = input.into_vec().await; + + let rows_to_drop = if let Some(quantity) = rows { + *quantity as usize + } else { + 1 + }; + + for _ in 0..rows_to_drop { + v.pop(); + } + + Ok(futures::stream::iter(v).to_output_stream()) } fn examples(&self) -> Vec { @@ -57,29 +70,6 @@ impl WholeStreamCommand for Drop { } } -fn drop(args: CommandArgs, registry: &CommandRegistry) -> Result { - let registry = registry.clone(); - let stream = async_stream! { - let (DropArgs { rows }, mut input) = args.process(®istry).await?; - let v: Vec<_> = input.into_vec().await; - - let rows_to_drop = if let Some(quantity) = rows { - *quantity as usize - } else { - 1 - }; - - if rows_to_drop < v.len() { - let k = v.len() - rows_to_drop; - for x in v[0..k].iter() { - let y: Value = x.clone(); - yield ReturnSuccess::value(y) - } - } - }; - Ok(stream.to_output_stream()) -} - #[cfg(test)] mod tests { use super::Drop; diff --git a/crates/nu-cli/src/commands/du.rs b/crates/nu-cli/src/commands/du.rs index db26fbb811..4b3cd6aa85 100644 --- a/crates/nu-cli/src/commands/du.rs +++ b/crates/nu-cli/src/commands/du.rs @@ -78,7 +78,7 @@ impl WholeStreamCommand for Du { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - du(args, registry) + du(args, registry).await } fn examples(&self) -> Vec { @@ -90,76 +90,75 @@ impl WholeStreamCommand for Du { } } -fn du(args: CommandArgs, registry: &CommandRegistry) -> Result { +async fn du(args: CommandArgs, registry: &CommandRegistry) -> Result { let registry = registry.clone(); let tag = args.call_info.name_tag.clone(); let ctrl_c = args.ctrl_c.clone(); let ctrl_c_copy = ctrl_c.clone(); - let stream = async_stream! { - let (args, mut input): (DuArgs, _) = args.process(®istry).await?; - let exclude = args.exclude.map_or(Ok(None), move |x| { - Pattern::new(&x.item) - .map(Option::Some) - .map_err(|e| ShellError::labeled_error(e.msg, "glob error", x.tag.clone())) - })?; + let (args, _): (DuArgs, _) = args.process(®istry).await?; + let exclude = args.exclude.map_or(Ok(None), move |x| { + Pattern::new(&x.item) + .map(Option::Some) + .map_err(|e| ShellError::labeled_error(e.msg, "glob error", x.tag.clone())) + })?; - let include_files = args.all; - let paths = match args.path { - Some(p) => { - let p = p.item.to_str().expect("Why isn't this encoded properly?"); - glob::glob_with(p, GLOB_PARAMS) - } - None => glob::glob_with("*", GLOB_PARAMS), + let include_files = args.all; + let paths = match args.path { + Some(p) => { + let p = p.item.to_str().expect("Why isn't this encoded properly?"); + glob::glob_with(p, GLOB_PARAMS) } - .map_err(|e| ShellError::labeled_error(e.msg, "glob error", tag.clone()))? - .filter(move |p| { - if include_files { - true - } else { - match p { - Ok(f) if f.is_dir() => true, - Err(e) if e.path().is_dir() => true, - _ => false, - } - } - }) - .map(|v| v.map_err(glob_err_into)); - - let all = args.all; - let deref = args.deref; - let max_depth = args.max_depth.map(|f| f.item); - let min_size = args.min_size.map(|f| f.item); - - let params = DirBuilder { - tag: tag.clone(), - min: min_size, - deref, - exclude, - all, - }; - - let mut inp = futures::stream::iter(paths).interruptible(ctrl_c.clone()); - - while let Some(path) = inp.next().await { - match path { - Ok(p) => { - if p.is_dir() { - yield Ok(ReturnSuccess::Value( - DirInfo::new(p, ¶ms, max_depth, ctrl_c.clone()).into(), - )) - } else { - for v in FileInfo::new(p, deref, tag.clone()).into_iter() { - yield Ok(ReturnSuccess::Value(v.into())); - } - } - } - Err(e) => yield Err(e), + None => glob::glob_with("*", GLOB_PARAMS), + } + .map_err(|e| ShellError::labeled_error(e.msg, "glob error", tag.clone()))? + .filter(move |p| { + if include_files { + true + } else { + match p { + Ok(f) if f.is_dir() => true, + Err(e) if e.path().is_dir() => true, + _ => false, } } + }) + .map(|v| v.map_err(glob_err_into)); + + let all = args.all; + let deref = args.deref; + let max_depth = args.max_depth.map(|f| f.item); + let min_size = args.min_size.map(|f| f.item); + + let params = DirBuilder { + tag: tag.clone(), + min: min_size, + deref, + exclude, + all, }; - Ok(stream.interruptible(ctrl_c_copy).to_output_stream()) + let inp = futures::stream::iter(paths); + + Ok(inp + .flat_map(move |path| match path { + Ok(p) => { + let mut output = vec![]; + if p.is_dir() { + output.push(Ok(ReturnSuccess::Value( + DirInfo::new(p, ¶ms, max_depth, ctrl_c.clone()).into(), + ))); + } else { + for v in FileInfo::new(p, deref, tag.clone()).into_iter() { + output.push(Ok(ReturnSuccess::Value(v.into()))); + } + } + futures::stream::iter(output) + } + Err(e) => futures::stream::iter(vec![Err(e)]), + }) + .interruptible(ctrl_c_copy) + .to_output_stream()) } pub struct DirBuilder {