PR feedback changes

This commit is contained in:
Jack Wright 2024-08-01 09:48:43 -07:00
parent 0ae6c4fa67
commit de0428a448
7 changed files with 109 additions and 147 deletions

View File

@ -174,19 +174,19 @@ pub trait InterfaceManager {
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
self.prepare_pipeline_data(match header { self.prepare_pipeline_data(match header {
PipelineDataHeader::Empty => PipelineData::Empty, PipelineDataHeader::Empty => PipelineData::Empty,
PipelineDataHeader::Value { value, metadata } => PipelineData::Value(value, metadata), PipelineDataHeader::Value(value, metadata) => PipelineData::Value(value, metadata),
PipelineDataHeader::ListStream { info, metadata } => { PipelineDataHeader::ListStream(info) => {
let handle = self.stream_manager().get_handle(); let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?; let reader = handle.read_stream(info.id, self.get_interface())?;
let ls = ListStream::new(reader, info.span, signals.clone()); let ls = ListStream::new(reader, info.span, signals.clone());
PipelineData::ListStream(ls, metadata) PipelineData::ListStream(ls, info.metadata)
} }
PipelineDataHeader::ByteStream { info, metadata } => { PipelineDataHeader::ByteStream(info) => {
let handle = self.stream_manager().get_handle(); let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?; let reader = handle.read_stream(info.id, self.get_interface())?;
let bs = let bs =
ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_); ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
PipelineData::ByteStream(bs, metadata) PipelineData::ByteStream(bs, info.metadata)
} }
}) })
} }
@ -249,20 +249,18 @@ pub trait Interface: Clone + Send {
}; };
match self.prepare_pipeline_data(data, context)? { match self.prepare_pipeline_data(data, context)? {
PipelineData::Value(value, metadata) => Ok(( PipelineData::Value(value, metadata) => Ok((
PipelineDataHeader::Value { value, metadata }, PipelineDataHeader::Value(value, metadata),
PipelineDataWriter::None, PipelineDataWriter::None,
)), )),
PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)), PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
PipelineData::ListStream(stream, metadata) => { PipelineData::ListStream(stream, metadata) => {
let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?; let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
Ok(( Ok((
PipelineDataHeader::ListStream { PipelineDataHeader::ListStream(ListStreamInfo {
info: ListStreamInfo { id,
id, span: stream.span(),
span: stream.span(),
},
metadata, metadata,
}, }),
PipelineDataWriter::ListStream(writer, stream), PipelineDataWriter::ListStream(writer, stream),
)) ))
} }
@ -271,10 +269,12 @@ pub trait Interface: Clone + Send {
let type_ = stream.type_(); let type_ = stream.type_();
if let Some(reader) = stream.reader() { if let Some(reader) = stream.reader() {
let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?; let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
let header = PipelineDataHeader::ByteStream { let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
info: ByteStreamInfo { id, span, type_ }, id,
span,
type_,
metadata, metadata,
}; });
Ok((header, PipelineDataWriter::ByteStream(writer, reader))) Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
} else { } else {
Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)) Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))

View File

@ -143,11 +143,7 @@ fn read_pipeline_data_value() -> Result<(), ShellError> {
data_source: DataSource::FilePath("/test/path".into()), data_source: DataSource::FilePath("/test/path".into()),
content_type: None, content_type: None,
}); });
let header = PipelineDataHeader::Value { let header = PipelineDataHeader::Value(value.clone(), metadata.clone());
value: value.clone(),
metadata: metadata.clone(),
};
match manager.read_pipeline_data(header, &Signals::empty())? { match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::Value(read_value, read_metadata) => { PipelineData::Value(read_value, read_metadata) => {
assert_eq!(value, read_value); assert_eq!(value, read_value);
@ -178,13 +174,11 @@ fn read_pipeline_data_list_stream() -> Result<(), ShellError> {
content_type: Some("foobar".into()), content_type: Some("foobar".into()),
}); });
let header = PipelineDataHeader::ListStream { let header = PipelineDataHeader::ListStream(ListStreamInfo {
info: ListStreamInfo { id: 7,
id: 7, span: Span::test_data(),
span: Span::test_data(),
},
metadata, metadata,
}; });
let pipe = manager.read_pipeline_data(header, &Signals::empty())?; let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
assert!( assert!(
@ -230,14 +224,12 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
content_type: Some("foobar".into()), content_type: Some("foobar".into()),
}); });
let header = PipelineDataHeader::ByteStream { let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
info: ByteStreamInfo { id: 12,
id: 12, span: test_span,
span: test_span, type_: ByteStreamType::Unknown,
type_: ByteStreamType::Unknown,
},
metadata, metadata,
}; });
let pipe = manager.read_pipeline_data(header, &Signals::empty())?; let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
@ -285,13 +277,11 @@ fn read_pipeline_data_prepared_properly() -> Result<(), ShellError> {
content_type: Some("foobar".into()), content_type: Some("foobar".into()),
}); });
let header = PipelineDataHeader::ListStream { let header = PipelineDataHeader::ListStream(ListStreamInfo {
info: ListStreamInfo { id: 0,
id: 0, span: Span::test_data(),
span: Span::test_data(),
},
metadata, metadata,
}; });
match manager.read_pipeline_data(header, &Signals::empty())? { match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::ListStream(_, meta) => match meta { PipelineData::ListStream(_, meta) => match meta {
Some(PipelineMetadata { data_source, .. }) => match data_source { Some(PipelineMetadata { data_source, .. }) => match data_source {
@ -338,9 +328,7 @@ fn write_pipeline_data_value() -> Result<(), ShellError> {
interface.init_write_pipeline_data(PipelineData::Value(value.clone(), None), &())?; interface.init_write_pipeline_data(PipelineData::Value(value.clone(), None), &())?;
match header { match header {
PipelineDataHeader::Value { PipelineDataHeader::Value(read_value, _) => assert_eq!(value, read_value),
value: read_value, ..
} => assert_eq!(value, read_value),
_ => panic!("unexpected header: {header:?}"), _ => panic!("unexpected header: {header:?}"),
} }
@ -401,7 +389,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> {
let (header, writer) = interface.init_write_pipeline_data(pipe, &())?; let (header, writer) = interface.init_write_pipeline_data(pipe, &())?;
let info = match header { let info = match header {
PipelineDataHeader::ListStream { info, .. } => info, PipelineDataHeader::ListStream(info) => info,
_ => panic!("unexpected header: {header:?}"), _ => panic!("unexpected header: {header:?}"),
}; };
@ -456,7 +444,7 @@ fn write_pipeline_data_byte_stream() -> Result<(), ShellError> {
let (header, writer) = interface.init_write_pipeline_data(data, &())?; let (header, writer) = interface.init_write_pipeline_data(data, &())?;
let info = match header { let info = match header {
PipelineDataHeader::ByteStream { info, .. } => info, PipelineDataHeader::ByteStream(info) => info,
_ => panic!("unexpected header: {header:?}"), _ => panic!("unexpected header: {header:?}"),
}; };

View File

@ -132,10 +132,7 @@ macro_rules! generate_tests {
let plugin_call = PluginCall::Run(CallInfo { let plugin_call = PluginCall::Run(CallInfo {
name: name.clone(), name: name.clone(),
call: call.clone(), call: call.clone(),
input: PipelineDataHeader::Value { input: PipelineDataHeader::Value(input.clone(), metadata.clone()),
value: input.clone(),
metadata: metadata.clone(),
},
}); });
let plugin_input = PluginInput::Call(1, plugin_call); let plugin_input = PluginInput::Call(1, plugin_call);
@ -153,13 +150,7 @@ macro_rules! generate_tests {
match returned { match returned {
PluginInput::Call(1, PluginCall::Run(call_info)) => { PluginInput::Call(1, PluginCall::Run(call_info)) => {
assert_eq!(name, call_info.name); assert_eq!(name, call_info.name);
assert_eq!( assert_eq!(PipelineDataHeader::Value(input, metadata), call_info.input);
PipelineDataHeader::Value {
value: input,
metadata
},
call_info.input
);
assert_eq!(call.head, call_info.call.head); assert_eq!(call.head, call_info.call.head);
assert_eq!(call.positional.len(), call_info.call.positional.len()); assert_eq!(call.positional.len(), call_info.call.positional.len());
@ -320,10 +311,7 @@ macro_rules! generate_tests {
match returned { match returned {
PluginOutput::CallResponse( PluginOutput::CallResponse(
4, 4,
PluginCallResponse::PipelineData(PipelineDataHeader::Value { PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value, _)),
value: returned_value,
..
}),
) => { ) => {
assert_eq!(value, returned_value) assert_eq!(value, returned_value)
} }
@ -359,10 +347,7 @@ macro_rules! generate_tests {
match returned { match returned {
PluginOutput::CallResponse( PluginOutput::CallResponse(
5, 5,
PluginCallResponse::PipelineData(PipelineDataHeader::Value { PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value, _)),
value: returned_value,
..
}),
) => { ) => {
assert_eq!(span, returned_value.span()); assert_eq!(span, returned_value.span());

View File

@ -53,10 +53,7 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::list_stream(ListStreamInfo { PipelineDataHeader::list_stream(ListStreamInfo::new(0, Span::test_data())),
id: 0,
span: Span::test_data(),
}),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -109,10 +106,7 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::list_stream(ListStreamInfo { PipelineDataHeader::list_stream(ListStreamInfo::new(0, Span::test_data())),
id: 0,
span: Span::test_data(),
}),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -155,11 +149,11 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_output()); test.add(invalid_output());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::byte_stream(ByteStreamInfo { PipelineDataHeader::byte_stream(ByteStreamInfo::new(
id: 0, 0,
span: Span::test_data(), Span::test_data(),
type_: ByteStreamType::Unknown, ByteStreamType::Unknown,
}), )),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -332,10 +326,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data(
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo::new(
id: 0, 0,
span: Span::test_data(), Span::test_data(),
})), ))),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -376,18 +370,18 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> {
// Check list streams, byte streams // Check list streams, byte streams
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo::new(
id: 0, 0,
span: Span::test_data(), Span::test_data(),
})), ))),
))?; ))?;
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
1, 1,
PluginCallResponse::PipelineData(PipelineDataHeader::byte_stream(ByteStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::byte_stream(ByteStreamInfo::new(
id: 1, 1,
span: Span::test_data(), Span::test_data(),
type_: ByteStreamType::Unknown, ByteStreamType::Unknown,
})), ))),
))?; ))?;
// ListStream should have one // ListStream should have one
@ -443,10 +437,7 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re
span: Span::test_data(), span: Span::test_data(),
}, },
positional: vec![], positional: vec![],
input: PipelineDataHeader::list_stream(ListStreamInfo { input: PipelineDataHeader::list_stream(ListStreamInfo::new(2, Span::test_data())),
id: 2,
span: Span::test_data(),
}),
redirect_stdout: false, redirect_stdout: false,
redirect_stderr: false, redirect_stderr: false,
}, },
@ -832,7 +823,7 @@ fn interface_write_plugin_call_writes_run_with_value_input() -> Result<(), Shell
PluginCall::Run(CallInfo { name, input, .. }) => { PluginCall::Run(CallInfo { name, input, .. }) => {
assert_eq!("foo", name); assert_eq!("foo", name);
match input { match input {
PipelineDataHeader::Value { value, metadata } => { PipelineDataHeader::Value(value, metadata) => {
assert_eq!(-1, value.as_int()?); assert_eq!(-1, value.as_int()?);
assert_eq!(metadata0, metadata.expect("there should be metadata")); assert_eq!(metadata0, metadata.expect("there should be metadata"));
} }
@ -875,7 +866,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel
PluginCall::Run(CallInfo { name, input, .. }) => { PluginCall::Run(CallInfo { name, input, .. }) => {
assert_eq!("foo", name); assert_eq!("foo", name);
match input { match input {
PipelineDataHeader::ListStream { info, .. } => info, PipelineDataHeader::ListStream(info) => info,
_ => panic!("unexpected input header: {input:?}"), _ => panic!("unexpected input header: {input:?}"),
} }
} }

View File

@ -78,24 +78,15 @@ pub enum PipelineDataHeader {
/// No input /// No input
Empty, Empty,
/// A single value /// A single value
Value { Value(Value, Option<PipelineMetadata>),
value: Value,
metadata: Option<PipelineMetadata>,
},
/// Initiate [`nu_protocol::PipelineData::ListStream`]. /// Initiate [`nu_protocol::PipelineData::ListStream`].
/// ///
/// Items are sent via [`StreamData`] /// Items are sent via [`StreamData`]
ListStream { ListStream(ListStreamInfo),
info: ListStreamInfo,
metadata: Option<PipelineMetadata>,
},
/// Initiate [`nu_protocol::PipelineData::ByteStream`]. /// Initiate [`nu_protocol::PipelineData::ByteStream`].
/// ///
/// Items are sent via [`StreamData`] /// Items are sent via [`StreamData`]
ByteStream { ByteStream(ByteStreamInfo),
info: ByteStreamInfo,
metadata: Option<PipelineMetadata>,
},
} }
impl PipelineDataHeader { impl PipelineDataHeader {
@ -103,31 +94,22 @@ impl PipelineDataHeader {
pub fn stream_id(&self) -> Option<StreamId> { pub fn stream_id(&self) -> Option<StreamId> {
match self { match self {
PipelineDataHeader::Empty => None, PipelineDataHeader::Empty => None,
PipelineDataHeader::Value { .. } => None, PipelineDataHeader::Value(_, _) => None,
PipelineDataHeader::ListStream { info, .. } => Some(info.id), PipelineDataHeader::ListStream(info) => Some(info.id),
PipelineDataHeader::ByteStream { info, .. } => Some(info.id), PipelineDataHeader::ByteStream(info) => Some(info.id),
} }
} }
pub fn value(value: Value) -> Self { pub fn value(value: Value) -> Self {
PipelineDataHeader::Value { PipelineDataHeader::Value(value, None)
value,
metadata: None,
}
} }
pub fn list_stream(info: ListStreamInfo) -> Self { pub fn list_stream(info: ListStreamInfo) -> Self {
PipelineDataHeader::ListStream { PipelineDataHeader::ListStream(info)
info,
metadata: None,
}
} }
pub fn byte_stream(info: ByteStreamInfo) -> Self { pub fn byte_stream(info: ByteStreamInfo) -> Self {
PipelineDataHeader::ByteStream { PipelineDataHeader::ByteStream(info)
info,
metadata: None,
}
} }
} }
@ -136,6 +118,18 @@ impl PipelineDataHeader {
pub struct ListStreamInfo { pub struct ListStreamInfo {
pub id: StreamId, pub id: StreamId,
pub span: Span, pub span: Span,
pub metadata: Option<PipelineMetadata>,
}
impl ListStreamInfo {
/// Create a new `ListStreamInfo` with a unique ID
pub fn new(id: StreamId, span: Span) -> Self {
ListStreamInfo {
id,
span,
metadata: None,
}
}
} }
/// Additional information about byte streams /// Additional information about byte streams
@ -145,6 +139,19 @@ pub struct ByteStreamInfo {
pub span: Span, pub span: Span,
#[serde(rename = "type")] #[serde(rename = "type")]
pub type_: ByteStreamType, pub type_: ByteStreamType,
pub metadata: Option<PipelineMetadata>,
}
impl ByteStreamInfo {
/// Create a new `ByteStreamInfo` with a unique ID
pub fn new(id: StreamId, span: Span, type_: ByteStreamType) -> Self {
ByteStreamInfo {
id,
span,
type_,
metadata: None,
}
}
} }
/// Calls that a plugin can execute. The type parameter determines the input type. /// Calls that a plugin can execute. The type parameter determines the input type.

View File

@ -55,10 +55,7 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::list_stream(ListStreamInfo { PipelineDataHeader::list_stream(ListStreamInfo::new(0, Span::test_data())),
id: 0,
span: Span::test_data(),
}),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -111,10 +108,7 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::list_stream(ListStreamInfo { PipelineDataHeader::list_stream(ListStreamInfo::new(0, Span::test_data())),
id: 0,
span: Span::test_data(),
}),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -157,11 +151,11 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_input()); test.add(invalid_input());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::byte_stream(ByteStreamInfo { PipelineDataHeader::byte_stream(ByteStreamInfo::new(
id: 0, 0,
span: Span::test_data(), Span::test_data(),
type_: ByteStreamType::Unknown, ByteStreamType::Unknown,
}), )),
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -414,10 +408,7 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result<
positional: vec![], positional: vec![],
named: vec![], named: vec![],
}, },
input: PipelineDataHeader::list_stream(ListStreamInfo { input: PipelineDataHeader::list_stream(ListStreamInfo::new(6, Span::test_data())),
id: 6,
span: Span::test_data(),
}),
}), }),
))?; ))?;
@ -556,10 +547,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat
manager.consume(PluginInput::EngineCallResponse( manager.consume(PluginInput::EngineCallResponse(
0, 0,
EngineCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo { EngineCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo::new(
id: 0, 0,
span: Span::test_data(), Span::test_data(),
})), ))),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -707,7 +698,7 @@ fn interface_write_response_with_value() -> Result<(), ShellError> {
assert_eq!(33, id, "id"); assert_eq!(33, id, "id");
match response { match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::Value { value, .. } => assert_eq!(6, value.as_int()?), PipelineDataHeader::Value(value, _) => assert_eq!(6, value.as_int()?),
_ => panic!("unexpected pipeline data header: {header:?}"), _ => panic!("unexpected pipeline data header: {header:?}"),
}, },
_ => panic!("unexpected response: {response:?}"), _ => panic!("unexpected response: {response:?}"),
@ -739,7 +730,7 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> {
let info = match written { let info = match written {
PluginOutput::CallResponse(_, response) => match response { PluginOutput::CallResponse(_, response) => match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::ListStream { info, .. } => info, PipelineDataHeader::ListStream(info) => info,
_ => panic!("expected ListStream header: {header:?}"), _ => panic!("expected ListStream header: {header:?}"),
}, },
_ => panic!("wrong response: {response:?}"), _ => panic!("wrong response: {response:?}"),

View File

@ -3,7 +3,7 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Metadata that is valid for the whole [`PipelineData`](crate::PipelineData) /// Metadata that is valid for the whole [`PipelineData`](crate::PipelineData)
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct PipelineMetadata { pub struct PipelineMetadata {
pub data_source: DataSource, pub data_source: DataSource,
pub content_type: Option<String>, pub content_type: Option<String>,
@ -29,7 +29,7 @@ impl PipelineMetadata {
/// ///
/// This can either be a particular family of commands (useful so downstream commands can adjust /// This can either be a particular family of commands (useful so downstream commands can adjust
/// the presentation e.g. `Ls`) or the opened file to protect against overwrite-attempts properly. /// the presentation e.g. `Ls`) or the opened file to protect against overwrite-attempts properly.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub enum DataSource { pub enum DataSource {
Ls, Ls,
HtmlThemes, HtmlThemes,