added helper methods to PipelineDataHeader and cleaned up match statements

This commit is contained in:
Jack Wright 2024-07-31 09:55:54 -07:00
parent 2d560f7a88
commit 0ae6c4fa67
5 changed files with 81 additions and 115 deletions

View File

@ -339,8 +339,7 @@ fn write_pipeline_data_value() -> Result<(), ShellError> {
match header { match header {
PipelineDataHeader::Value { PipelineDataHeader::Value {
value: read_value, value: read_value, ..
metadata: None,
} => assert_eq!(value, read_value), } => assert_eq!(value, read_value),
_ => panic!("unexpected header: {header:?}"), _ => panic!("unexpected header: {header:?}"),
} }
@ -457,10 +456,7 @@ fn write_pipeline_data_byte_stream() -> Result<(), ShellError> {
let (header, writer) = interface.init_write_pipeline_data(data, &())?; let (header, writer) = interface.init_write_pipeline_data(data, &())?;
let info = match header { let info = match header {
PipelineDataHeader::ByteStream { PipelineDataHeader::ByteStream { info, .. } => info,
info,
metadata: None,
} => info,
_ => panic!("unexpected header: {header:?}"), _ => panic!("unexpected header: {header:?}"),
}; };

View File

@ -322,7 +322,7 @@ macro_rules! generate_tests {
4, 4,
PluginCallResponse::PipelineData(PipelineDataHeader::Value { PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value: returned_value, value: returned_value,
metadata: None, ..
}), }),
) => { ) => {
assert_eq!(value, returned_value) assert_eq!(value, returned_value)
@ -343,10 +343,7 @@ macro_rules! generate_tests {
span, span,
); );
let response = PluginCallResponse::PipelineData(PipelineDataHeader::Value { let response = PluginCallResponse::PipelineData(PipelineDataHeader::value(value));
value,
metadata: None,
});
let output = PluginOutput::CallResponse(5, response); let output = PluginOutput::CallResponse(5, response);
let encoder = $encoder; let encoder = $encoder;
@ -364,7 +361,7 @@ macro_rules! generate_tests {
5, 5,
PluginCallResponse::PipelineData(PipelineDataHeader::Value { PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value: returned_value, value: returned_value,
metadata: None, ..
}), }),
) => { ) => {
assert_eq!(span, returned_value.span()); assert_eq!(span, returned_value.span());

View File

@ -53,13 +53,10 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream { PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -112,13 +109,10 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream { PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -161,14 +155,11 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_output()); test.add(invalid_output());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ByteStream { PipelineDataHeader::byte_stream(ByteStreamInfo {
info: ByteStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown, type_: ByteStreamType::Unknown,
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -341,13 +332,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data(
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::ListStream { PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, })),
metadata: None,
}),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -388,24 +376,18 @@ fn manager_consume_call_response_registers_streams() -> Result<(), ShellError> {
// Check list streams, byte streams // Check list streams, byte streams
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
0, 0,
PluginCallResponse::PipelineData(PipelineDataHeader::ListStream { PluginCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, })),
metadata: None,
}),
))?; ))?;
manager.consume(PluginOutput::CallResponse( manager.consume(PluginOutput::CallResponse(
1, 1,
PluginCallResponse::PipelineData(PipelineDataHeader::ByteStream { PluginCallResponse::PipelineData(PipelineDataHeader::byte_stream(ByteStreamInfo {
info: ByteStreamInfo {
id: 1, id: 1,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown, type_: ByteStreamType::Unknown,
}, })),
metadata: None,
}),
))?; ))?;
// ListStream should have one // ListStream should have one
@ -461,13 +443,10 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re
span: Span::test_data(), span: Span::test_data(),
}, },
positional: vec![], positional: vec![],
input: PipelineDataHeader::ListStream { input: PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 2, id: 2,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
redirect_stdout: false, redirect_stdout: false,
redirect_stderr: false, redirect_stderr: false,
}, },
@ -896,10 +875,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel
PluginCall::Run(CallInfo { name, input, .. }) => { PluginCall::Run(CallInfo { name, input, .. }) => {
assert_eq!("foo", name); assert_eq!("foo", name);
match input { match input {
PipelineDataHeader::ListStream { PipelineDataHeader::ListStream { info, .. } => info,
info,
metadata: None,
} => info,
_ => panic!("unexpected input header: {input:?}"), _ => panic!("unexpected input header: {input:?}"),
} }
} }

View File

@ -108,6 +108,27 @@ impl PipelineDataHeader {
PipelineDataHeader::ByteStream { info, .. } => Some(info.id), PipelineDataHeader::ByteStream { info, .. } => Some(info.id),
} }
} }
pub fn value(value: Value) -> Self {
PipelineDataHeader::Value {
value,
metadata: None,
}
}
pub fn list_stream(info: ListStreamInfo) -> Self {
PipelineDataHeader::ListStream {
info,
metadata: None,
}
}
pub fn byte_stream(info: ByteStreamInfo) -> Self {
PipelineDataHeader::ByteStream {
info,
metadata: None,
}
}
} }
/// Additional information about list (value) streams /// Additional information about list (value) streams
@ -351,10 +372,7 @@ impl PluginCallResponse<PipelineDataHeader> {
if value.is_nothing() { if value.is_nothing() {
PluginCallResponse::PipelineData(PipelineDataHeader::Empty) PluginCallResponse::PipelineData(PipelineDataHeader::Empty)
} else { } else {
PluginCallResponse::PipelineData(PipelineDataHeader::Value { PluginCallResponse::PipelineData(PipelineDataHeader::value(value))
value,
metadata: None,
})
} }
} }
} }

View File

@ -55,13 +55,10 @@ fn manager_consume_all_exits_after_streams_and_interfaces_are_dropped() -> Resul
// Create a stream... // Create a stream...
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream { PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -114,13 +111,10 @@ fn manager_consume_all_propagates_io_error_to_readers() -> Result<(), ShellError
test.set_read_error(test_io_error()); test.set_read_error(test_io_error());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ListStream { PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -163,14 +157,11 @@ fn manager_consume_all_propagates_message_error_to_readers() -> Result<(), Shell
test.add(invalid_input()); test.add(invalid_input());
let stream = manager.read_pipeline_data( let stream = manager.read_pipeline_data(
PipelineDataHeader::ByteStream { PipelineDataHeader::byte_stream(ByteStreamInfo {
info: ByteStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
type_: ByteStreamType::Unknown, type_: ByteStreamType::Unknown,
}, }),
metadata: None,
},
&Signals::empty(), &Signals::empty(),
)?; )?;
@ -423,13 +414,10 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result<
positional: vec![], positional: vec![],
named: vec![], named: vec![],
}, },
input: PipelineDataHeader::ListStream { input: PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 6, id: 6,
span: Span::test_data(), span: Span::test_data(),
}, }),
metadata: None,
},
}), }),
))?; ))?;
@ -568,13 +556,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat
manager.consume(PluginInput::EngineCallResponse( manager.consume(PluginInput::EngineCallResponse(
0, 0,
EngineCallResponse::PipelineData(PipelineDataHeader::ListStream { EngineCallResponse::PipelineData(PipelineDataHeader::list_stream(ListStreamInfo {
info: ListStreamInfo {
id: 0, id: 0,
span: Span::test_data(), span: Span::test_data(),
}, })),
metadata: None,
}),
))?; ))?;
for i in 0..2 { for i in 0..2 {
@ -722,10 +707,7 @@ fn interface_write_response_with_value() -> Result<(), ShellError> {
assert_eq!(33, id, "id"); assert_eq!(33, id, "id");
match response { match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::Value { PipelineDataHeader::Value { value, .. } => assert_eq!(6, value.as_int()?),
value,
metadata: None,
} => assert_eq!(6, value.as_int()?),
_ => panic!("unexpected pipeline data header: {header:?}"), _ => panic!("unexpected pipeline data header: {header:?}"),
}, },
_ => panic!("unexpected response: {response:?}"), _ => panic!("unexpected response: {response:?}"),
@ -757,10 +739,7 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> {
let info = match written { let info = match written {
PluginOutput::CallResponse(_, response) => match response { PluginOutput::CallResponse(_, response) => match response {
PluginCallResponse::PipelineData(header) => match header { PluginCallResponse::PipelineData(header) => match header {
PipelineDataHeader::ListStream { PipelineDataHeader::ListStream { info, .. } => info,
info,
metadata: None,
} => info,
_ => panic!("expected ListStream header: {header:?}"), _ => panic!("expected ListStream header: {header:?}"),
}, },
_ => panic!("wrong response: {response:?}"), _ => panic!("wrong response: {response:?}"),