diff --git a/crates/nu-command/src/default_context.rs b/crates/nu-command/src/default_context.rs index 91c13590f6..5890d791cb 100644 --- a/crates/nu-command/src/default_context.rs +++ b/crates/nu-command/src/default_context.rs @@ -7,7 +7,7 @@ use nu_protocol::{ use crate::{ where_::Where, Alias, Benchmark, BuildString, Def, Do, Each, External, For, Git, GitCheckout, - If, Length, Let, LetEnv, ListGitBranches, Ls, Table, + If, Length, Let, LetEnv, Lines, ListGitBranches, Ls, Table, }; pub fn create_default_context() -> Rc> { @@ -50,6 +50,8 @@ pub fn create_default_context() -> Rc> { working_set.add_decl(Box::new(External)); + working_set.add_decl(Box::new(Lines)); + // This is a WIP proof of concept working_set.add_decl(Box::new(ListGitBranches)); working_set.add_decl(Box::new(Git)); diff --git a/crates/nu-command/src/lib.rs b/crates/nu-command/src/lib.rs index 96f0839fb3..1f150c2c65 100644 --- a/crates/nu-command/src/lib.rs +++ b/crates/nu-command/src/lib.rs @@ -12,6 +12,7 @@ mod if_; mod length; mod let_; mod let_env; +mod lines; mod list_git_branches; mod ls; mod run_external; @@ -32,6 +33,7 @@ pub use if_::If; pub use length::Length; pub use let_::Let; pub use let_env::LetEnv; +pub use lines::Lines; pub use list_git_branches::ListGitBranches; pub use ls::Ls; pub use run_external::External; diff --git a/crates/nu-command/src/lines.rs b/crates/nu-command/src/lines.rs new file mode 100644 index 0000000000..3694f26e20 --- /dev/null +++ b/crates/nu-command/src/lines.rs @@ -0,0 +1,91 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use nu_protocol::ast::Call; +use nu_protocol::engine::{Command, EvaluationContext}; +use nu_protocol::{Signature, Span, Value, ValueStream}; + +pub struct Lines; + +const SPLIT_CHAR: char = '\n'; + +impl Command for Lines { + fn name(&self) -> &str { + "lines" + } + + fn usage(&self) -> &str { + "Converts input to lines" + } + + fn signature(&self) -> nu_protocol::Signature { + Signature::build("lines") + } + + fn run( + &self, + _context: &EvaluationContext, + _call: &Call, + input: Value, + ) -> Result { + let value = match input { + #[allow(clippy::needless_collect)] + // Collect is needed because the string may not live long enough for + // the Rc structure to continue using it. If split could take ownership + // of the split values, then this wouldn't be needed + Value::String { val, span } => { + let lines = val + .split(SPLIT_CHAR) + .map(|s| s.to_string()) + .collect::>(); + + let iter = lines.into_iter().filter_map(move |s| { + if !s.is_empty() { + Some(Value::String { val: s, span }) + } else { + None + } + }); + + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(iter))), + span: Span::unknown(), + } + } + Value::Stream { stream, span: _ } => { + let iter = stream + .into_iter() + .filter_map(|value| { + if let Value::String { val, span } = value { + let inner = val + .split(SPLIT_CHAR) + .filter_map(|s| { + if !s.is_empty() { + Some(Value::String { + val: s.into(), + span, + }) + } else { + None + } + }) + .collect::>(); + + Some(inner) + } else { + None + } + }) + .flatten(); + + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(iter))), + span: Span::unknown(), + } + } + _ => unimplemented!(), + }; + + Ok(value) + } +} diff --git a/crates/nu-command/src/run_external.rs b/crates/nu-command/src/run_external.rs index e89e471eda..557d9d7408 100644 --- a/crates/nu-command/src/run_external.rs +++ b/crates/nu-command/src/run_external.rs @@ -1,14 +1,22 @@ +use std::borrow::Cow; +use std::cell::RefCell; use std::env; -use std::process::Command as CommandSys; +use std::io::{BufRead, BufReader, Write}; +use std::process::{ChildStdin, Command as CommandSys, Stdio}; +use std::rc::Rc; +use std::sync::mpsc; use nu_protocol::{ ast::{Call, Expression}, engine::{Command, EvaluationContext}, ShellError, Signature, SyntaxShape, Value, }; +use nu_protocol::{Span, ValueStream}; use nu_engine::eval_expression; +const OUTPUT_BUFFER_SIZE: usize = 8192; + pub struct External; impl Command for External { @@ -21,7 +29,9 @@ impl Command for External { } fn signature(&self) -> nu_protocol::Signature { - Signature::build("run_external").rest("rest", SyntaxShape::Any, "external command to run") + Signature::build("run_external") + .switch("last_expression", "last_expression", None) + .rest("rest", SyntaxShape::Any, "external command to run") } fn run( @@ -39,6 +49,7 @@ pub struct ExternalCommand<'call, 'contex> { pub name: &'call Expression, pub args: &'call [Expression], pub context: &'contex EvaluationContext, + pub last_expression: bool, } impl<'call, 'contex> ExternalCommand<'call, 'contex> { @@ -54,6 +65,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { name: &call.positional[0], args: &call.positional[1..], context, + last_expression: call.has_flag("last_expression"), }) } @@ -70,7 +82,7 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { .collect() } - pub fn run_with_input(&self, _input: Value) -> Result { + pub fn run_with_input(&self, input: Value) -> Result { let mut process = self.create_command(); // TODO. We don't have a way to know the current directory @@ -81,18 +93,120 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { let envs = self.context.stack.get_env_vars(); process.envs(envs); + // If the external is not the last command, its output will get piped + // either as a string or binary + if !self.last_expression { + process.stdout(Stdio::piped()); + } + + // If there is an input from the pipeline. The stdin from the process + // is piped so it can be used to send the input information + if let Value::String { .. } = input { + process.stdin(Stdio::piped()); + } + + if let Value::Stream { .. } = input { + process.stdin(Stdio::piped()); + } + match process.spawn() { Err(err) => Err(ShellError::ExternalCommand( format!("{}", err), self.name.span, )), - Ok(mut child) => match child.wait() { - Err(err) => Err(ShellError::ExternalCommand( - format!("{}", err), - self.name.span, - )), - Ok(_) => Ok(Value::nothing()), - }, + Ok(mut child) => { + // if there is a string or a stream, that is sent to the pipe std + match input { + Value::Nothing { span: _ } => (), + Value::String { val, span: _ } => { + if let Some(mut stdin_write) = child.stdin.take() { + self.write_to_stdin(&mut stdin_write, val.as_bytes())? + } + } + Value::Binary { val, span: _ } => { + if let Some(mut stdin_write) = child.stdin.take() { + self.write_to_stdin(&mut stdin_write, &val)? + } + } + Value::Stream { stream, span: _ } => { + if let Some(mut stdin_write) = child.stdin.take() { + for value in stream { + match value { + Value::String { val, span: _ } => { + self.write_to_stdin(&mut stdin_write, val.as_bytes())? + } + Value::Binary { val, span: _ } => { + self.write_to_stdin(&mut stdin_write, &val)? + } + _ => continue, + } + } + } + } + _ => { + return Err(ShellError::ExternalCommand( + "Input is not string or binary".to_string(), + self.name.span, + )) + } + } + + // If this external is not the last expression, then its output is piped to a channel + // and we create a ValueStream that can be consumed + let value = if !self.last_expression { + let (tx, rx) = mpsc::sync_channel(0); + let stdout = child.stdout.take().ok_or_else(|| { + ShellError::ExternalCommand( + "Error taking stdout from external".to_string(), + self.name.span, + ) + })?; + + std::thread::spawn(move || { + // Stdout is read using the Buffer reader. It will do so until there is an + // error or there are no more bytes to read + let mut buf_read = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, stdout); + while let Ok(bytes) = buf_read.fill_buf() { + if bytes.is_empty() { + break; + } + + // The Cow generated from the function represents the conversion + // from bytes to String. If no replacements are required, then the + // borrowed value is a proper UTF-8 string. The Owned option represents + // a string where the values had to be replaced, thus marking it as bytes + let data = match String::from_utf8_lossy(bytes) { + Cow::Borrowed(s) => Data::String(s.into()), + Cow::Owned(_) => Data::Bytes(bytes.to_vec()), + }; + + let length = bytes.len(); + buf_read.consume(length); + + match tx.send(data) { + Ok(_) => continue, + Err(_) => break, + } + } + }); + + // The ValueStream is consumed by the next expression in the pipeline + Value::Stream { + stream: ValueStream(Rc::new(RefCell::new(ChannelReceiver::new(rx)))), + span: Span::unknown(), + } + } else { + Value::nothing() + }; + + match child.wait() { + Err(err) => Err(ShellError::ExternalCommand( + format!("{}", err), + self.name.span, + )), + Ok(_) => Ok(value), + } + } } } @@ -120,4 +234,54 @@ impl<'call, 'contex> ExternalCommand<'call, 'contex> { process } } + + fn write_to_stdin(&self, stdin_write: &mut ChildStdin, val: &[u8]) -> Result<(), ShellError> { + if stdin_write.write(val).is_err() { + Err(ShellError::ExternalCommand( + "Error writing input to stdin".to_string(), + self.name.span, + )) + } else { + Ok(()) + } + } +} + +// The piped data from stdout from the external command can be either String +// or binary. We use this enum to pass the data from the spawned process +enum Data { + String(String), + Bytes(Vec), +} + +// Receiver used for the ValueStream +// It implements iterator so it can be used as a ValueStream +struct ChannelReceiver { + rx: mpsc::Receiver, +} + +impl ChannelReceiver { + pub fn new(rx: mpsc::Receiver) -> Self { + Self { rx } + } +} + +impl Iterator for ChannelReceiver { + type Item = Value; + + fn next(&mut self) -> Option { + match self.rx.recv() { + Ok(v) => match v { + Data::String(s) => Some(Value::String { + val: s, + span: Span::unknown(), + }), + Data::Bytes(b) => Some(Value::Binary { + val: b, + span: Span::unknown(), + }), + }, + Err(_) => None, + } + } } diff --git a/crates/nu-engine/src/eval.rs b/crates/nu-engine/src/eval.rs index 30db23220d..29ba6fc3e3 100644 --- a/crates/nu-engine/src/eval.rs +++ b/crates/nu-engine/src/eval.rs @@ -73,6 +73,7 @@ fn eval_external( name: &Span, args: &[Span], input: Value, + last_expression: bool, ) -> Result { let engine_state = context.engine_state.borrow(); @@ -98,6 +99,10 @@ fn eval_external( }) .collect(); + if last_expression { + call.named.push(("last_expression".into(), None)) + } + command.run(context, &call, input) } @@ -158,7 +163,9 @@ pub fn eval_expression( } Expr::RowCondition(_, expr) => eval_expression(context, expr), Expr::Call(call) => eval_call(context, call, Value::nothing()), - Expr::ExternalCall(name, args) => eval_external(context, name, args, Value::nothing()), + Expr::ExternalCall(name, args) => { + eval_external(context, name, args, Value::nothing(), true) + } Expr::Operator(_) => Ok(Value::Nothing { span: expr.span }), Expr::BinaryOp(lhs, op, rhs) => { let op_span = op.span; @@ -239,9 +246,9 @@ pub fn eval_block( block: &Block, mut input: Value, ) -> Result { - for stmt in &block.stmts { + for stmt in block.stmts.iter() { if let Statement::Pipeline(pipeline) = stmt { - for elem in &pipeline.expressions { + for (i, elem) in pipeline.expressions.iter().enumerate() { match elem { Expression { expr: Expr::Call(call), @@ -253,7 +260,13 @@ pub fn eval_block( expr: Expr::ExternalCall(name, args), .. } => { - input = eval_external(context, name, args, input)?; + input = eval_external( + context, + name, + args, + input, + i == pipeline.expressions.len() - 1, + )?; } elem => { diff --git a/crates/nu-protocol/src/ty.rs b/crates/nu-protocol/src/ty.rs index 7e6c2aa5e9..6338a6e476 100644 --- a/crates/nu-protocol/src/ty.rs +++ b/crates/nu-protocol/src/ty.rs @@ -20,6 +20,7 @@ pub enum Type { ValueStream, Unknown, Error, + Binary, } impl Display for Type { @@ -43,6 +44,7 @@ impl Display for Type { Type::ValueStream => write!(f, "value stream"), Type::Unknown => write!(f, "unknown"), Type::Error => write!(f, "error"), + Type::Binary => write!(f, "binary"), } } } diff --git a/crates/nu-protocol/src/value/mod.rs b/crates/nu-protocol/src/value/mod.rs index 5d81e03f91..bdb31de395 100644 --- a/crates/nu-protocol/src/value/mod.rs +++ b/crates/nu-protocol/src/value/mod.rs @@ -59,6 +59,10 @@ pub enum Value { Error { error: ShellError, }, + Binary { + val: Vec, + span: Span, + }, } impl Value { @@ -83,6 +87,7 @@ impl Value { Value::Block { span, .. } => *span, Value::Stream { span, .. } => *span, Value::Nothing { span, .. } => *span, + Value::Binary { span, .. } => *span, } } @@ -100,6 +105,7 @@ impl Value { Value::Block { span, .. } => *span = new_span, Value::Nothing { span, .. } => *span = new_span, Value::Error { .. } => {} + Value::Binary { span, .. } => *span = new_span, } self @@ -121,6 +127,7 @@ impl Value { Value::Block { .. } => Type::Block, Value::Stream { .. } => Type::ValueStream, Value::Error { .. } => Type::Error, + Value::Binary { .. } => Type::Binary, } } @@ -159,6 +166,7 @@ impl Value { Value::Block { val, .. } => format!("", val), Value::Nothing { .. } => String::new(), Value::Error { error } => format!("{:?}", error), + Value::Binary { val, .. } => format!("{:?}", val), } } diff --git a/crates/nu-protocol/src/value/stream.rs b/crates/nu-protocol/src/value/stream.rs index ad5a27208f..ca56f39f05 100644 --- a/crates/nu-protocol/src/value/stream.rs +++ b/crates/nu-protocol/src/value/stream.rs @@ -30,8 +30,7 @@ impl Iterator for ValueStream { fn next(&mut self) -> Option { { - let mut iter = self.0.borrow_mut(); - iter.next() + self.0.borrow_mut().next() } } }