diff --git a/src/commands/pick.rs b/src/commands/pick.rs index a4a35aa26a..39700adc09 100644 --- a/src/commands/pick.rs +++ b/src/commands/pick.rs @@ -2,8 +2,9 @@ use crate::commands::WholeStreamCommand; use crate::context::CommandRegistry; use crate::data::base::select_fields; use crate::prelude::*; +use futures_util::pin_mut; use nu_errors::ShellError; -use nu_protocol::{Signature, SyntaxShape}; +use nu_protocol::{Primitive, ReturnSuccess, ReturnValue, Signature, SyntaxShape, UntaggedValue}; use nu_source::Tagged; #[derive(Deserialize)] @@ -49,9 +50,33 @@ fn pick( let fields: Vec<_> = fields.iter().map(|f| f.item.clone()).collect(); - let objects = input - .values - .map(move |value| select_fields(&value, &fields, value.tag.clone())); + let stream = async_stream! { + let values = input.values; + pin_mut!(values); - Ok(objects.from_input_stream()) + let mut empty = true; + + while let Some(value) = values.next().await { + let new_value = select_fields(&value, &fields, value.tag.clone()); + + if let UntaggedValue::Row(dict) = &new_value.value { + if dict + .entries + .values() + .any(|v| v.value != UntaggedValue::Primitive(Primitive::Nothing)) + { + empty = false; + yield ReturnSuccess::value(new_value); + } + } + } + + if empty { + yield Err(ShellError::labeled_error("None of the columns were found in the input", "could not find columns given", name)); + } + }; + + let stream: BoxStream<'static, ReturnValue> = stream.boxed(); + + Ok(stream.to_output_stream()) }