nushell/crates/nu-protocol/src/value/stream.rs
2021-10-28 17:13:10 +13:00

122 lines
2.9 KiB
Rust

use crate::*;
use std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
/// A stream of [`Value`]s produced by a boxed iterator, paired with an
/// optional shared flag that can cut iteration short.
pub struct ValueStream {
/// The underlying source of values. Boxed as a trait object so that any
/// `Send + 'static` iterator yielding `Value` can be stored here.
pub stream: Box<dyn Iterator<Item = Value> + Send + 'static>,
/// Optional shared flag consulted by this type's `Iterator::next`: while it
/// reads `true`, `next` returns `None` without polling `stream`.
/// NOTE(review): presumably set by a Ctrl-C handler, going by the name —
/// confirm against the code that creates the flag.
pub ctrlc: Option<Arc<AtomicBool>>,
}
impl ValueStream {
pub fn into_string(self) -> String {
format!(
"[{}]",
self.map(|x: Value| x.into_string())
.collect::<Vec<String>>()
.join(", ")
)
}
pub fn collect_string(self) -> String {
self.map(|x: Value| x.collect_string())
.collect::<Vec<String>>()
.join("\n")
}
pub fn from_stream(
input: impl Iterator<Item = Value> + Send + 'static,
ctrlc: Option<Arc<AtomicBool>>,
) -> ValueStream {
ValueStream {
stream: Box::new(input),
ctrlc,
}
}
}
impl Debug for ValueStream {
    /// Writes only the type name, `ValueStream`.
    ///
    /// No fields are added to the debug builder, so formatting never polls
    /// or consumes the underlying iterator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ValueStream");
        builder.finish()
    }
}
impl Iterator for ValueStream {
    type Item = Value;

    /// Yields the next value from the underlying iterator, unless the
    /// interrupt flag is present and currently `true`.
    ///
    /// The flag is re-read (with `SeqCst` ordering) on every call. While it
    /// is set, `None` is returned without polling the source, so nothing is
    /// consumed; if the flag is later cleared, iteration picks up where it
    /// left off.
    fn next(&mut self) -> Option<Self::Item> {
        let interrupted = match &self.ctrlc {
            Some(flag) => flag.load(Ordering::SeqCst),
            None => false,
        };
        if interrupted {
            return None;
        }
        self.stream.next()
    }
}
// impl Serialize for ValueStream {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: serde::Serializer,
// {
// let mut seq = serializer.serialize_seq(None)?;
// for element in self.0.borrow_mut().into_iter() {
// seq.serialize_element(&element)?;
// }
// seq.end()
// }
// }
// impl<'de> Deserialize<'de> for ValueStream {
// fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
// where
// D: serde::Deserializer<'de>,
// {
// deserializer.deserialize_seq(MySeqVisitor)
// }
// }
// struct MySeqVisitor;
// impl<'a> serde::de::Visitor<'a> for MySeqVisitor {
// type Value = ValueStream;
// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
// formatter.write_str("a value stream")
// }
// fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
// where
// A: serde::de::SeqAccess<'a>,
// {
// let mut output: Vec<Value> = vec![];
// while let Some(value) = seq.next_element()? {
// output.push(value);
// }
// Ok(ValueStream(Rc::new(RefCell::new(output.into_iter()))))
// }
// }
// pub trait IntoValueStream {
// fn into_value_stream(self) -> ValueStream;
// }
// impl<T> IntoValueStream for T
// where
// T: Iterator<Item = Value> + 'static,
// {
// fn into_value_stream(self) -> ValueStream {
// ValueStream::from_stream(self)
// }
// }