Fixing old usages of PipelineDataHeader

This commit is contained in:
Jack Wright 2024-07-30 21:39:18 -07:00
parent fe6479983c
commit dccf597af1
4 changed files with 131 additions and 67 deletions

View File

@ -145,7 +145,7 @@ fn read_pipeline_data_value() -> Result<(), ShellError> {
}); });
let header = PipelineDataHeader::Value { let header = PipelineDataHeader::Value {
value: value.clone(), value: value.clone(),
metadata, metadata: metadata.clone(),
}; };
match manager.read_pipeline_data(header, &Signals::empty())? { match manager.read_pipeline_data(header, &Signals::empty())? {

View File

@ -1,12 +1,13 @@
macro_rules! generate_tests { macro_rules! generate_tests {
($encoder:expr) => { ($encoder:expr) => {
use nu_plugin_protocol::{ use nu_plugin_protocol::{
CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, PluginCall,
PluginCall, PluginCallResponse, PluginCustomValue, PluginInput, PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput,
PluginOption, PluginOutput, StreamData, StreamData,
}; };
use nu_protocol::{ use nu_protocol::{
DataSource, PipelineMetadata, LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value, DataSource, LabeledError, PipelineMetadata, PluginSignature, Signature, Span, Spanned,
SyntaxShape, Value,
}; };
#[test] #[test]
@ -133,7 +134,7 @@ macro_rules! generate_tests {
call: call.clone(), call: call.clone(),
input: PipelineDataHeader::Value { input: PipelineDataHeader::Value {
value: input.clone(), value: input.clone(),
metadata, metadata: metadata.clone(),
}, },
}); });
@ -319,7 +320,10 @@ macro_rules! generate_tests {
match returned { match returned {
PluginOutput::CallResponse( PluginOutput::CallResponse(
4, 4,
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value)), PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value: returned_value,
metadata: None,
}),
) => { ) => {
assert_eq!(value, returned_value) assert_eq!(value, returned_value)
} }
@ -339,7 +343,10 @@ macro_rules! generate_tests {
span, span,
); );
let response = PluginCallResponse::PipelineData(PipelineDataHeader::Value(value)); let response = PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value,
metadata: None,
});
let output = PluginOutput::CallResponse(5, response); let output = PluginOutput::CallResponse(5, response);
let encoder = $encoder; let encoder = $encoder;
@ -355,7 +362,10 @@ macro_rules! generate_tests {
match returned { match returned {
PluginOutput::CallResponse( PluginOutput::CallResponse(
5, 5,
PluginCallResponse::PipelineData(PipelineDataHeader::Value(returned_value)), PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value: returned_value,
metadata: None,
}),
) => { ) => {
assert_eq!(span, returned_value.span()); assert_eq!(span, returned_value.span());

View File

@ -17,8 +17,9 @@ use nu_plugin_protocol::{
use nu_protocol::{ use nu_protocol::{
ast::{Math, Operator}, ast::{Math, Operator},
engine::Closure, engine::Closure,
ByteStreamType, CustomValue, IntoInterruptiblePipelineData, IntoSpanned, PipelineData, ByteStreamType, CustomValue, DataSource, IntoInterruptiblePipelineData, IntoSpanned,
PluginMetadata, PluginSignature, ShellError, Signals, Span, Spanned, Value, PipelineData, PipelineMetadata, PluginMetadata, PluginSignature, ShellError, Signals, Span,
Spanned, Value,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
@ -52,10 +53,13 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream(ListStreamInfo { PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
}), span: Span::test_data(),
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -108,10 +112,13 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream(ListStreamInfo { PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
}), span: Span::test_data(),
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -154,11 +161,14 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_output()); test.add(invalid_output());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ByteStream(ByteStreamInfo { PipelineDataHeader::ByteStream {
id: 0, info: ByteStreamInfo {
span: Span::test_data(), id: 0,
type_: ByteStreamType::Unknown, span: Span::test_data(),
}), type_: ByteStreamType::Unknown,
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -331,10 +341,13 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data(
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
})), span: Span::test_data(),
},
metadata: None,
}),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -375,18 +388,24 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> {
// Check list streams, byte streams // Check list streams, byte streams
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
})), span: Span::test_data(),
},
metadata: None,
}),
))?; ))?;
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
1, 1,
PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream(ByteStreamInfo { PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream {
id: 1, info: ByteStreamInfo {
span: Span::test_data(), id: 1,
type_: ByteStreamType::Unknown, span: Span::test_data(),
})), type_: ByteStreamType::Unknown,
},
metadata: None,
}),
))?; ))?;
// ListStream should have one // ListStream should have one
@ -442,10 +461,13 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re
span: Span::test_data(), span: Span::test_data(),
}, },
positional: vec![], positional: vec![],
input: PipelineDataHeader::ListStream(ListStreamInfo { input: PipelineDataHeader::ListStream {
id: 2, info: ListStreamInfo {
span: Span::test_data(), id: 2,
}), span: Span::test_data(),
},
metadata: None,
},
redirect_stdout: false, redirect_stdout: false,
redirect_stderr: false, redirect_stderr: false,
}, },
@ -806,6 +828,11 @@ fn interface_write_plugin_call_writes_run_with_value_input() -> Result<(), Shell
let manager = test.plugin("test"); let manager = test.plugin("test");
let interface = manager.get_interface(); let interface = manager.get_interface();
let metadata0 = PipelineMetadata {
data_source: DataSource::None,
content_type: Some("baz".into()),
};
let result = interface.write_plugin_call( let result = interface.write_plugin_call(
PluginCall::Run(CallInfo { PluginCall::Run(CallInfo {
name: "foo".into(), name: "foo".into(),
@ -814,7 +841,7 @@ fn interface_write_plugin_call_writes_run_with_value_input() -> Result<(), Shell
positional: vec![], positional: vec![],
named: vec![], named: vec![],
}, },
input: PipelineData::Value(Value::test_int(-1), None), input: PipelineData::Value(Value::test_int(-1), Some(metadata0.clone())),
}), }),
None, None,
)?; )?;
@ -826,7 +853,10 @@ fn interface_write_plugin_call_writes_run_with_value_input() -> Result<(), Shell
PluginCall::Run(CallInfo { name, input, .. }) => { PluginCall::Run(CallInfo { name, input, .. }) => {
assert_eq!("foo", name); assert_eq!("foo", name);
match input { match input {
PipelineDataHeader::Value(value) => assert_eq!(-1, value.as_int()?), PipelineDataHeader::Value { value, metadata } => {
assert_eq!(-1, value.as_int()?);
assert_eq!(metadata0, metadata.expect("there should be metadata"));
}
_ => panic!("unexpected input header: {input:?}"), _ => panic!("unexpected input header: {input:?}"),
} }
} }
@ -866,7 +896,10 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel
PluginCall::Run(CallInfo { name, input, .. }) => { PluginCall::Run(CallInfo { name, input, .. }) => {
assert_eq!("foo", name); assert_eq!("foo", name);
match input { match input {
PipelineDataHeader::ListStream(info) => info, PipelineDataHeader::ListStream {
info,
metadata: None,
} => info,
_ => panic!("unexpected input header: {input:?}"), _ => panic!("unexpected input header: {input:?}"),
} }
} }

View File

@ -55,10 +55,13 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream(ListStreamInfo { PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
}), span: Span::test_data(),
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -111,10 +114,13 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream(ListStreamInfo { PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
}), span: Span::test_data(),
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -157,11 +163,14 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_input()); test.add(invalid_input());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ByteStream(ByteStreamInfo { PipelineDataHeader::ByteStream {
id: 0, info: ByteStreamInfo {
span: Span::test_data(), id: 0,
type_: ByteStreamType::Unknown, span: Span::test_data(),
}), type_: ByteStreamType::Unknown,
},
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -414,10 +423,13 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result<
positional: vec![], positional: vec![],
named: vec![], named: vec![],
}, },
input: PipelineDataHeader::ListStream(ListStreamInfo { input: PipelineDataHeader::ListStream {
id: 6, info: ListStreamInfo {
span: Span::test_data(), id: 6,
}), span: Span::test_data(),
},
metadata: None,
},
}), }),
))?; ))?;
@ -556,10 +568,13 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat
manager.consume(PluginInput::EngineCallResponse( manager.consume(PluginInput::EngineCallResponse(
0, 0,
EngineCallResponse::PipelineData(PipelineDataHeader::ListStream(ListStreamInfo { EngineCallResponse::PipelineData(PipelineDataHeader::ListStream {
id: 0, info: ListStreamInfo {
span: Span::test_data(), id: 0,
})), span: Span::test_data(),
},
metadata: None,
}),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -707,7 +722,10 @@ fn interface_write_response_with_value() -> Result<(), ShellError> {
assert_eq!(33, id, "id"); assert_eq!(33, id, "id");
match response { match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::Value(value) => assert_eq!(6, value.as_int()?), PipelineDataHeader::Value {
value,
metadata: None,
} => assert_eq!(6, value.as_int()?),
_ => panic!("unexpected pipeline data header: {header:?}"), _ => panic!("unexpected pipeline data header: {header:?}"),
}, },
_ => panic!("unexpected response: {response:?}"), _ => panic!("unexpected response: {response:?}"),
@ -739,7 +757,10 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> {
let info = match written { let info = match written {
PluginOutput::CallResponse(_, response) => match response { PluginOutput::CallResponse(_, response) => match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::ListStream(info) => info, PipelineDataHeader::ListStream {
info,
metadata: None,
} => info,
_ => panic!("expected ListStream header: {header:?}"), _ => panic!("expected ListStream header: {header:?}"),
}, },
_ => panic!("wrong response: {response:?}"), _ => panic!("wrong response: {response:?}"),