Make pipeline metadata available to plugins

This commit is contained in:
Jack Wright 2024-07-30 21:13:50 -07:00
parent e68f744dda
commit 382a09e413
7 changed files with 128 additions and 48 deletions

View File

@ -174,16 +174,19 @@ pub trait InterfaceManager {
) -> Result<PipelineData, ShellError> {
self.prepare_pipeline_data(match header {
PipelineDataHeader::Empty => PipelineData::Empty,
PipelineDataHeader::Value(value) => PipelineData::Value(value, None),
PipelineDataHeader::ListStream(info) => {
PipelineDataHeader::Value { value, metadata } => PipelineData::Value(value, metadata),
PipelineDataHeader::ListStream { info, metadata } => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?;
ListStream::new(reader, info.span, signals.clone()).into()
let ls = ListStream::new(reader, info.span, signals.clone());
PipelineData::ListStream(ls, metadata)
}
PipelineDataHeader::ByteStream(info) => {
PipelineDataHeader::ByteStream { info, metadata } => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(info.id, self.get_interface())?;
ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_).into()
let bs =
ByteStream::from_result_iter(reader, info.span, signals.clone(), info.type_);
PipelineData::ByteStream(bs, metadata)
}
})
}
@ -245,26 +248,33 @@ pub trait Interface: Clone + Send {
Ok::<_, ShellError>((id, writer))
};
match self.prepare_pipeline_data(data, context)? {
PipelineData::Value(value, ..) => {
Ok((PipelineDataHeader::Value(value), PipelineDataWriter::None))
}
PipelineData::Value(value, metadata) => Ok((
PipelineDataHeader::Value { value, metadata },
PipelineDataWriter::None,
)),
PipelineData::Empty => Ok((PipelineDataHeader::Empty, PipelineDataWriter::None)),
PipelineData::ListStream(stream, ..) => {
PipelineData::ListStream(stream, metadata) => {
let (id, writer) = new_stream(LIST_STREAM_HIGH_PRESSURE)?;
Ok((
PipelineDataHeader::ListStream(ListStreamInfo {
id,
span: stream.span(),
}),
PipelineDataHeader::ListStream {
info: ListStreamInfo {
id,
span: stream.span(),
},
metadata,
},
PipelineDataWriter::ListStream(writer, stream),
))
}
PipelineData::ByteStream(stream, ..) => {
PipelineData::ByteStream(stream, metadata) => {
let span = stream.span();
let type_ = stream.type_();
if let Some(reader) = stream.reader() {
let (id, writer) = new_stream(RAW_STREAM_HIGH_PRESSURE)?;
let header = PipelineDataHeader::ByteStream(ByteStreamInfo { id, span, type_ });
let header = PipelineDataHeader::ByteStream {
info: ByteStreamInfo { id, span, type_ },
metadata,
};
Ok((header, PipelineDataWriter::ByteStream(writer, reader)))
} else {
Ok((PipelineDataHeader::Empty, PipelineDataWriter::None))

View File

@ -139,10 +139,20 @@ fn read_pipeline_data_empty() -> Result<(), ShellError> {
fn read_pipeline_data_value() -> Result<(), ShellError> {
let manager = TestInterfaceManager::new(&TestCase::new());
let value = Value::test_int(4);
let header = PipelineDataHeader::Value(value.clone());
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foo".into()),
});
let header = PipelineDataHeader::Value {
value: value.clone(),
metadata,
};
match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::Value(read_value, ..) => assert_eq!(value, read_value),
PipelineData::Value(read_value, read_metadata) => {
assert_eq!(value, read_value);
assert_eq!(metadata, read_metadata);
}
PipelineData::ListStream(..) => panic!("unexpected ListStream"),
PipelineData::ByteStream(..) => panic!("unexpected ByteStream"),
PipelineData::Empty => panic!("unexpected Empty"),
@ -163,11 +173,19 @@ fn read_pipeline_data_list_stream() -> Result<(), ShellError> {
}
test.add(StreamMessage::End(7));
let header = PipelineDataHeader::ListStream(ListStreamInfo {
id: 7,
span: Span::test_data(),
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ListStream {
info: ListStreamInfo {
id: 7,
span: Span::test_data(),
},
metadata,
};
let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
assert!(
matches!(pipe, PipelineData::ListStream(..)),
@ -206,12 +224,21 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
test.add(StreamMessage::End(12));
let test_span = Span::new(10, 13);
let header = PipelineDataHeader::ByteStream(ByteStreamInfo {
id: 12,
span: test_span,
type_: ByteStreamType::Unknown,
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ByteStream {
info: ByteStreamInfo {
id: 12,
span: test_span,
type_: ByteStreamType::Unknown,
},
metadata,
};
let pipe = manager.read_pipeline_data(header, &Signals::empty())?;
// need to consume input
@ -253,10 +280,18 @@ fn read_pipeline_data_byte_stream() -> Result<(), ShellError> {
#[test]
fn read_pipeline_data_prepared_properly() -> Result<(), ShellError> {
let manager = TestInterfaceManager::new(&TestCase::new());
let header = PipelineDataHeader::ListStream(ListStreamInfo {
id: 0,
span: Span::test_data(),
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let header = PipelineDataHeader::ListStream {
info: ListStreamInfo {
id: 0,
span: Span::test_data(),
},
metadata,
};
match manager.read_pipeline_data(header, &Signals::empty())? {
PipelineData::ListStream(_, meta) => match meta {
Some(PipelineMetadata { data_source, .. }) => match data_source {
@ -303,7 +338,10 @@ fn write_pipeline_data_value() -> Result<(), ShellError> {
interface.init_write_pipeline_data(PipelineData::Value(value.clone(), None), &())?;
match header {
PipelineDataHeader::Value(read_value) => assert_eq!(value, read_value),
PipelineDataHeader::Value {
value: read_value,
metadata: None,
} => assert_eq!(value, read_value),
_ => panic!("unexpected header: {header:?}"),
}
@ -364,7 +402,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> {
let (header, writer) = interface.init_write_pipeline_data(pipe, &())?;
let info = match header {
PipelineDataHeader::ListStream(info) => info,
PipelineDataHeader::ListStream { info, .. } => info,
_ => panic!("unexpected header: {header:?}"),
};
@ -419,7 +457,10 @@ fn write_pipeline_data_byte_stream() -> Result<(), ShellError> {
let (header, writer) = interface.init_write_pipeline_data(data, &())?;
let info = match header {
PipelineDataHeader::ByteStream(info) => info,
PipelineDataHeader::ByteStream {
info,
metadata: None,
} => info,
_ => panic!("unexpected header: {header:?}"),
};

View File

@ -1,9 +1,9 @@
macro_rules! generate_tests {
($encoder:expr) => {
use nu_plugin_protocol::{
CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, PluginCall,
PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput,
StreamData,
CallInfo, CustomValueOp, DataSource, EvaluatedCall, PipelineDataHeader,
PipelineMetadata, PluginCall, PluginCallResponse, PluginCustomValue, PluginInput,
PluginOption, PluginOutput, StreamData,
};
use nu_protocol::{
LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value,
@ -123,10 +123,18 @@ macro_rules! generate_tests {
)],
};
let metadata = Some(PipelineMetadata {
data_source: DataSource::None,
content_type: Some("foobar".into()),
});
let plugin_call = PluginCall::Run(CallInfo {
name: name.clone(),
call: call.clone(),
input: PipelineDataHeader::Value(input.clone()),
input: PipelineDataHeader::Value {
value: input.clone(),
metadata,
},
});
let plugin_input = PluginInput::Call(1, plugin_call);
@ -144,7 +152,13 @@ macro_rules! generate_tests {
match returned {
PluginInput::Call(1, PluginCall::Run(call_info)) => {
assert_eq!(name, call_info.name);
assert_eq!(PipelineDataHeader::Value(input), call_info.input);
assert_eq!(
PipelineDataHeader::Value {
value: input,
metadata
},
call_info.input
);
assert_eq!(call.head, call_info.call.head);
assert_eq!(call.positional.len(), call_info.call.positional.len());

View File

@ -23,7 +23,7 @@ pub mod test_util;
use nu_protocol::{
ast::Operator, engine::Closure, ByteStreamType, Config, DeclId, LabeledError, PipelineData,
PluginMetadata, PluginSignature, ShellError, Span, Spanned, Value,
PipelineMetadata, PluginMetadata, PluginSignature, ShellError, Span, Spanned, Value,
};
use nu_utils::SharedCow;
use serde::{Deserialize, Serialize};
@ -78,15 +78,24 @@ pub enum PipelineDataHeader {
/// No input
Empty,
/// A single value
Value(Value),
Value {
value: Value,
metadata: Option<PipelineMetadata>,
},
/// Initiate [`nu_protocol::PipelineData::ListStream`].
///
/// Items are sent via [`StreamData`]
ListStream(ListStreamInfo),
ListStream {
info: ListStreamInfo,
metadata: Option<PipelineMetadata>,
},
/// Initiate [`nu_protocol::PipelineData::ByteStream`].
///
/// Items are sent via [`StreamData`]
ByteStream(ByteStreamInfo),
ByteStream {
info: ByteStreamInfo,
metadata: Option<PipelineMetadata>,
},
}
impl PipelineDataHeader {
@ -94,9 +103,9 @@ impl PipelineDataHeader {
pub fn stream_id(&self) -> Option<StreamId> {
match self {
PipelineDataHeader::Empty => None,
PipelineDataHeader::Value(_) => None,
PipelineDataHeader::ListStream(info) => Some(info.id),
PipelineDataHeader::ByteStream(info) => Some(info.id),
PipelineDataHeader::Value { .. } => None,
PipelineDataHeader::ListStream { info, .. } => Some(info.id),
PipelineDataHeader::ByteStream { info, .. } => Some(info.id),
}
}
}
@ -342,7 +351,10 @@ impl PluginCallResponse<PipelineDataHeader> {
if value.is_nothing() {
PluginCallResponse::PipelineData(PipelineDataHeader::Empty)
} else {
PluginCallResponse::PipelineData(PipelineDataHeader::Value(value))
PluginCallResponse::PipelineData(PipelineDataHeader::Value {
value,
metadata: None,
})
}
}
}

View File

@ -1,7 +1,9 @@
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
/// Metadata that is valid for the whole [`PipelineData`](crate::PipelineData)
#[derive(Debug, Default, Clone)]
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct PipelineMetadata {
pub data_source: DataSource,
pub content_type: Option<String>,
@ -27,7 +29,7 @@ impl PipelineMetadata {
///
/// This can either be a particular family of commands (useful so downstream commands can adjust
/// the presentation e.g. `Ls`) or the opened file to protect against overwrite-attempts properly.
#[derive(Debug, Default, Clone)]
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub enum DataSource {
Ls,
HtmlThemes,

View File

@ -133,7 +133,7 @@ def process_call [
# Create a Value of type List that will be encoded and sent to Nushell
let value = {
Value: {
Value: { value: {
List: {
vals: (0..9 | each { |x|
{
@ -157,6 +157,7 @@ def process_call [
}),
span: $span
}
}
}
}
@ -265,4 +266,4 @@ def start_plugin [] {
}) |
each { from json | handle_input } |
ignore
}
}

View File

@ -178,7 +178,7 @@ fn handle_message(
id,
{
"PipelineData": {
"Value": return_value
"Value": {"value": return_value}
}
}
]