diff --git a/crates/nu-cli/src/prelude.rs b/crates/nu-cli/src/prelude.rs index 93d59aafe9..ccc0a30ebd 100644 --- a/crates/nu-cli/src/prelude.rs +++ b/crates/nu-cli/src/prelude.rs @@ -86,7 +86,7 @@ pub(crate) use crate::shell::filesystem_shell::FilesystemShell; pub(crate) use crate::shell::help_shell::HelpShell; pub(crate) use crate::shell::shell_manager::ShellManager; pub(crate) use crate::shell::value_shell::ValueShell; -pub(crate) use crate::stream::{InputStream, OutputStream}; +pub(crate) use crate::stream::{InputStream, InterruptibleStream, OutputStream}; pub(crate) use async_stream::stream as async_stream; pub(crate) use bigdecimal::BigDecimal; pub(crate) use futures::stream::BoxStream; @@ -102,6 +102,7 @@ pub(crate) use num_traits::cast::ToPrimitive; pub(crate) use serde::Deserialize; pub(crate) use std::collections::VecDeque; pub(crate) use std::future::Future; +pub(crate) use std::sync::atomic::AtomicBool; pub(crate) use std::sync::Arc; pub(crate) use itertools::Itertools; @@ -160,3 +161,16 @@ where } } } + +pub trait Interruptible { + fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream; +} + +impl Interruptible for S +where + S: Stream + Send + 'static, +{ + fn interruptible(self, ctrl_c: Arc) -> InterruptibleStream { + InterruptibleStream::new(self, ctrl_c) + } +} diff --git a/crates/nu-cli/src/stream/interruptible.rs b/crates/nu-cli/src/stream/interruptible.rs new file mode 100644 index 0000000000..d938225293 --- /dev/null +++ b/crates/nu-cli/src/stream/interruptible.rs @@ -0,0 +1,35 @@ +use crate::prelude::*; +use futures::task::Poll; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub struct InterruptibleStream { + inner: BoxStream<'static, V>, + ctrl_c: Arc, +} + +impl InterruptibleStream { + pub fn new(inner: S, ctrl_c: Arc) -> InterruptibleStream + where + S: Stream + Send + 'static, + { + InterruptibleStream { + inner: inner.boxed(), + ctrl_c, + } + } +} + +impl Stream for InterruptibleStream { + type Item = V; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> core::task::Poll> { + if self.ctrl_c.load(Ordering::SeqCst) { + Poll::Ready(None) + } else { + Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) + } + } +} diff --git a/crates/nu-cli/src/stream/mod.rs b/crates/nu-cli/src/stream/mod.rs index 3daa62395f..ee08c663b3 100644 --- a/crates/nu-cli/src/stream/mod.rs +++ b/crates/nu-cli/src/stream/mod.rs @@ -1,5 +1,7 @@ mod input; +mod interruptible; mod output; pub use input::*; +pub use interruptible::*; pub use output::*;