From d7b1480ad04e5deb43728a69b3bc399a63188044 Mon Sep 17 00:00:00 2001 From: Jonathan Turner Date: Sat, 13 Jun 2020 16:54:35 -0700 Subject: [PATCH] Another batch of removing async_stream (#1981) --- crates/nu-cli/src/commands/keep_until.rs | 133 +++++++++++---------- crates/nu-cli/src/commands/keep_while.rs | 133 +++++++++++---------- crates/nu-cli/src/commands/skip_until.rs | 135 +++++++++++----------- crates/nu-cli/src/commands/skip_while.rs | 135 +++++++++++----------- crates/nu-cli/src/stream/interruptible.rs | 8 +- 5 files changed, 266 insertions(+), 278 deletions(-) diff --git a/crates/nu-cli/src/commands/keep_until.rs b/crates/nu-cli/src/commands/keep_until.rs index 5c20677d2a..326a380807 100644 --- a/crates/nu-cli/src/commands/keep_until.rs +++ b/crates/nu-cli/src/commands/keep_until.rs @@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr; use crate::prelude::*; use log::trace; use nu_errors::ShellError; -use nu_protocol::{ - hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value}; pub struct KeepUntil; @@ -34,80 +32,81 @@ impl WholeStreamCommand for KeepUntil { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let scope = args.call_info.scope.clone(); - let stream = async_stream! { - let mut call_info = args.evaluate_once(®istry).await?; + let registry = Arc::new(registry.clone()); + let scope = Arc::new(args.call_info.scope.clone()); - let block = call_info.args.expect_nth(0)?.clone(); + let call_info = args.evaluate_once(®istry).await?; - let condition = match block { - Value { - value: UntaggedValue::Block(block), - tag, - } => { - if block.block.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - match block.block[0].list.get(0) { - Some(item) => match item { - ClassifiedCommand::Expr(expr) => expr.clone(), - _ => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - }, - None => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - } - } - Value { tag, .. } => { - yield Err(ShellError::labeled_error( + let block = call_info.args.expect_nth(0)?.clone(); + + let condition = Arc::new(match block { + Value { + value: UntaggedValue::Block(block), + tag, + } => { + if block.block.len() != 1 { + return Err(ShellError::labeled_error( "Expected a condition", "expected a condition", tag, )); - return; } - }; - - while let Some(item) = call_info.input.next().await { - let condition = condition.clone(); - trace!("ITEM = {:?}", item); - let result = - evaluate_baseline_expr(&*condition, ®istry, &item, &scope.vars, &scope.env) - .await; - trace!("RESULT = {:?}", result); - - let return_value = match result { - Ok(ref v) if v.is_true() => false, - _ => true, - }; - - if return_value { - yield ReturnSuccess::value(item); - } else { - break; + match block.block[0].list.get(0) { + Some(item) => match item { + ClassifiedCommand::Expr(expr) => expr.clone(), + _ => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }, + None => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } } } - }; + Value { tag, .. } => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }); - Ok(stream.to_output_stream()) + Ok(call_info + .input + .take_while(move |item| { + let condition = condition.clone(); + let registry = registry.clone(); + let scope = scope.clone(); + let item = item.clone(); + trace!("ITEM = {:?}", item); + + async move { + let result = evaluate_baseline_expr( + &*condition, + ®istry, + &item, + &scope.vars, + &scope.env, + ) + .await; + trace!("RESULT = {:?}", result); + + match result { + Ok(ref v) if v.is_true() => false, + _ => true, + } + } + }) + .to_output_stream()) } } diff --git a/crates/nu-cli/src/commands/keep_while.rs b/crates/nu-cli/src/commands/keep_while.rs index 8bf43ab46a..a449bf3813 100644 --- a/crates/nu-cli/src/commands/keep_while.rs +++ b/crates/nu-cli/src/commands/keep_while.rs @@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr; use crate::prelude::*; use log::trace; use nu_errors::ShellError; -use nu_protocol::{ - hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value}; pub struct KeepWhile; @@ -34,80 +32,81 @@ impl WholeStreamCommand for KeepWhile { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let scope = args.call_info.scope.clone(); - let stream = async_stream! { - let mut call_info = args.evaluate_once(®istry).await?; + let registry = Arc::new(registry.clone()); + let scope = Arc::new(args.call_info.scope.clone()); + let call_info = args.evaluate_once(®istry).await?; - let block = call_info.args.expect_nth(0)?.clone(); + let block = call_info.args.expect_nth(0)?.clone(); - let condition = match block { - Value { - value: UntaggedValue::Block(block), - tag, - } => { - if block.block.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - match block.block[0].list.get(0) { - Some(item) => match item { - ClassifiedCommand::Expr(expr) => expr.clone(), - _ => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - }, - None => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - } - } - Value { tag, .. } => { - yield Err(ShellError::labeled_error( + let condition = Arc::new(match block { + Value { + value: UntaggedValue::Block(block), + tag, + } => { + if block.block.len() != 1 { + return Err(ShellError::labeled_error( "Expected a condition", "expected a condition", tag, )); - return; } - }; - - while let Some(item) = call_info.input.next().await { - let condition = condition.clone(); - trace!("ITEM = {:?}", item); - let result = - evaluate_baseline_expr(&*condition, ®istry, &item, &scope.vars, &scope.env) - .await; - trace!("RESULT = {:?}", result); - - let return_value = match result { - Ok(ref v) if v.is_true() => true, - _ => false, - }; - - if return_value { - yield ReturnSuccess::value(item); - } else { - break; + match block.block[0].list.get(0) { + Some(item) => match item { + ClassifiedCommand::Expr(expr) => expr.clone(), + _ => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }, + None => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } } } - }; + Value { tag, .. } => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }); - Ok(stream.to_output_stream()) + Ok(call_info + .input + .take_while(move |item| { + let condition = condition.clone(); + let registry = registry.clone(); + let scope = scope.clone(); + let item = item.clone(); + + trace!("ITEM = {:?}", item); + + async move { + let result = evaluate_baseline_expr( + &*condition, + ®istry, + &item, + &scope.vars, + &scope.env, + ) + .await; + trace!("RESULT = {:?}", result); + + match result { + Ok(ref v) if v.is_true() => true, + _ => false, + } + } + }) + .to_output_stream()) } } diff --git a/crates/nu-cli/src/commands/skip_until.rs b/crates/nu-cli/src/commands/skip_until.rs index c69625e863..7cbcbb9f39 100644 --- a/crates/nu-cli/src/commands/skip_until.rs +++ b/crates/nu-cli/src/commands/skip_until.rs @@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr; use crate::prelude::*; use log::trace; use nu_errors::ShellError; -use nu_protocol::{ - hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value}; pub struct SkipUntil; @@ -34,83 +32,80 @@ impl WholeStreamCommand for SkipUntil { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let scope = args.call_info.scope.clone(); - let stream = async_stream! { - let mut call_info = args.evaluate_once(®istry).await?; + let registry = Arc::new(registry.clone()); + let scope = Arc::new(args.call_info.scope.clone()); + let call_info = args.evaluate_once(®istry).await?; - let block = call_info.args.expect_nth(0)?.clone(); + let block = call_info.args.expect_nth(0)?.clone(); - let condition = match block { - Value { - value: UntaggedValue::Block(block), - tag, - } => { - if block.block.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - match block.block[0].list.get(0) { - Some(item) => match item { - ClassifiedCommand::Expr(expr) => expr.clone(), - _ => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - }, - None => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - } - } - Value { tag, .. } => { - yield Err(ShellError::labeled_error( + let condition = Arc::new(match block { + Value { + value: UntaggedValue::Block(block), + tag, + } => { + if block.block.len() != 1 { + return Err(ShellError::labeled_error( "Expected a condition", "expected a condition", tag, )); - return; } - }; - - let mut skipping = true; - while let Some(item) = call_info.input.next().await { - let condition = condition.clone(); - trace!("ITEM = {:?}", item); - let result = - evaluate_baseline_expr(&*condition, ®istry, &item, &scope.vars, &scope.env) - .await; - trace!("RESULT = {:?}", result); - - let return_value = match result { - Ok(ref v) if v.is_true() => true, - _ => false, - }; - - if return_value { - skipping = false; - } - - if !skipping { - yield ReturnSuccess::value(item); + match block.block[0].list.get(0) { + Some(item) => match item { + ClassifiedCommand::Expr(expr) => expr.clone(), + _ => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }, + None => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } } } - }; + Value { tag, .. } => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }); - Ok(stream.to_output_stream()) + Ok(call_info + .input + .skip_while(move |item| { + let condition = condition.clone(); + let registry = registry.clone(); + let scope = scope.clone(); + let item = item.clone(); + trace!("ITEM = {:?}", item); + + async move { + let result = evaluate_baseline_expr( + &*condition, + ®istry, + &item, + &scope.vars, + &scope.env, + ) + .await; + trace!("RESULT = {:?}", result); + + match result { + Ok(ref v) if v.is_true() => true, + _ => false, + } + } + }) + .to_output_stream()) } } diff --git a/crates/nu-cli/src/commands/skip_while.rs b/crates/nu-cli/src/commands/skip_while.rs index 599f577c69..3abc8f06b9 100644 --- a/crates/nu-cli/src/commands/skip_while.rs +++ b/crates/nu-cli/src/commands/skip_while.rs @@ -3,9 +3,7 @@ use crate::evaluate::evaluate_baseline_expr; use crate::prelude::*; use log::trace; use nu_errors::ShellError; -use nu_protocol::{ - hir::ClassifiedCommand, ReturnSuccess, Signature, SyntaxShape, UntaggedValue, Value, -}; +use nu_protocol::{hir::ClassifiedCommand, Signature, SyntaxShape, UntaggedValue, Value}; pub struct SkipWhile; @@ -34,83 +32,80 @@ impl WholeStreamCommand for SkipWhile { args: CommandArgs, registry: &CommandRegistry, ) -> Result { - let registry = registry.clone(); - let scope = args.call_info.scope.clone(); - let stream = async_stream! { - let mut call_info = args.evaluate_once(®istry).await?; + let registry = Arc::new(registry.clone()); + let scope = Arc::new(args.call_info.scope.clone()); + let call_info = args.evaluate_once(®istry).await?; - let block = call_info.args.expect_nth(0)?.clone(); + let block = call_info.args.expect_nth(0)?.clone(); - let condition = match block { - Value { - value: UntaggedValue::Block(block), - tag, - } => { - if block.block.len() != 1 { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - match block.block[0].list.get(0) { - Some(item) => match item { - ClassifiedCommand::Expr(expr) => expr.clone(), - _ => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - }, - None => { - yield Err(ShellError::labeled_error( - "Expected a condition", - "expected a condition", - tag, - )); - return; - } - } - } - Value { tag, .. } => { - yield Err(ShellError::labeled_error( + let condition = Arc::new(match block { + Value { + value: UntaggedValue::Block(block), + tag, + } => { + if block.block.len() != 1 { + return Err(ShellError::labeled_error( "Expected a condition", "expected a condition", tag, )); - return; } - }; - - let mut skipping = true; - while let Some(item) = call_info.input.next().await { - let condition = condition.clone(); - trace!("ITEM = {:?}", item); - let result = - evaluate_baseline_expr(&*condition, ®istry, &item, &scope.vars, &scope.env) - .await; - trace!("RESULT = {:?}", result); - - let return_value = match result { - Ok(ref v) if v.is_true() => false, - _ => true, - }; - - if return_value { - skipping = false; - } - - if !skipping { - yield ReturnSuccess::value(item); + match block.block[0].list.get(0) { + Some(item) => match item { + ClassifiedCommand::Expr(expr) => expr.clone(), + _ => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }, + None => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } } } - }; + Value { tag, .. } => { + return Err(ShellError::labeled_error( + "Expected a condition", + "expected a condition", + tag, + )); + } + }); - Ok(stream.to_output_stream()) + Ok(call_info + .input + .skip_while(move |item| { + let item = item.clone(); + let condition = condition.clone(); + let registry = registry.clone(); + let scope = scope.clone(); + trace!("ITEM = {:?}", item); + + async move { + let result = evaluate_baseline_expr( + &*condition, + ®istry, + &item, + &scope.vars, + &scope.env, + ) + .await; + trace!("RESULT = {:?}", result); + + match result { + Ok(ref v) if v.is_true() => true, + _ => false, + } + } + }) + .to_output_stream()) } } diff --git a/crates/nu-cli/src/stream/interruptible.rs b/crates/nu-cli/src/stream/interruptible.rs index d938225293..439059d833 100644 --- a/crates/nu-cli/src/stream/interruptible.rs +++ b/crates/nu-cli/src/stream/interruptible.rs @@ -4,17 +4,17 @@ use std::sync::atomic::{AtomicBool, Ordering}; pub struct InterruptibleStream { inner: BoxStream<'static, V>, - ctrl_c: Arc, + interrupt_signal: Arc, } impl InterruptibleStream { - pub fn new(inner: S, ctrl_c: Arc) -> InterruptibleStream + pub fn new(inner: S, interrupt_signal: Arc) -> InterruptibleStream where S: Stream + Send + 'static, { InterruptibleStream { inner: inner.boxed(), - ctrl_c, + interrupt_signal, } } } @@ -26,7 +26,7 @@ impl Stream for InterruptibleStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> core::task::Poll> { - if self.ctrl_c.load(Ordering::SeqCst) { + if self.interrupt_signal.load(Ordering::SeqCst) { Poll::Ready(None) } else { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)