diff --git a/crates/nu-plugin/src/plugin/interface/engine.rs b/crates/nu-plugin/src/plugin/interface/engine.rs index 547c57a1f8..2cceecbda3 100644 --- a/crates/nu-plugin/src/plugin/interface/engine.rs +++ b/crates/nu-plugin/src/plugin/interface/engine.rs @@ -252,64 +252,58 @@ impl InterfaceManager for EngineInterfaceManager { }) } PluginInput::Stream(message) => self.consume_stream_message(message), - PluginInput::Call(id, call) => match call { - // We just let the receiver handle it rather than trying to store signature here - // or something - PluginCall::Signature => self.send_plugin_call(ReceivedPluginCall::Signature { - engine: self.interface_for_context(id), - }), - // Set up the streams from the input and reformat to a ReceivedPluginCall - PluginCall::Run(CallInfo { - name, - mut call, - input, - }) => { - let interface = self.interface_for_context(id); - // If there's an error with initialization of the input stream, just send - // the error response rather than failing here - match self.read_pipeline_data(input, None) { - Ok(input) => { - // Deserialize custom values in the arguments - if let Err(err) = deserialize_call_args(&mut call) { - return interface.write_response(Err(err))?.write(); - } - // Send the plugin call to the receiver - self.send_plugin_call(ReceivedPluginCall::Run { - engine: interface, - call: CallInfo { name, call, input }, - }) + PluginInput::Call(id, call) => { + let interface = self.interface_for_context(id); + // Read streams in the input + let call = match call.map_data(|input| self.read_pipeline_data(input, None)) { + Ok(call) => call, + Err(err) => { + // If there's an error with initialization of the input stream, just send + // the error response rather than failing here + return interface.write_response(Err(err))?.write(); + } + }; + match call { + // We just let the receiver handle it rather than trying to store signature here + // or something + PluginCall::Signature => { + self.send_plugin_call(ReceivedPluginCall::Signature { engine: interface }) + } + // Parse custom values and send a ReceivedPluginCall + PluginCall::Run(mut call_info) => { + // Deserialize custom values in the arguments + if let Err(err) = deserialize_call_args(&mut call_info.call) { + return interface.write_response(Err(err))?.write(); } - err @ Err(_) => interface.write_response(err)?.write(), + // Send the plugin call to the receiver + self.send_plugin_call(ReceivedPluginCall::Run { + engine: interface, + call: call_info, + }) + } + // Send request with the custom value + PluginCall::CustomValueOp(custom_value, op) => { + self.send_plugin_call(ReceivedPluginCall::CustomValueOp { + engine: interface, + custom_value, + op, + }) } } - // Send request with the custom value - PluginCall::CustomValueOp(custom_value, op) => { - self.send_plugin_call(ReceivedPluginCall::CustomValueOp { - engine: self.interface_for_context(id), - custom_value, - op, - }) - } - }, + } PluginInput::Goodbye => { // Remove the plugin call sender so it hangs up drop(self.plugin_call_sender.take()); Ok(()) } PluginInput::EngineCallResponse(id, response) => { - let response = match response { - EngineCallResponse::Error(err) => EngineCallResponse::Error(err), - EngineCallResponse::Config(config) => EngineCallResponse::Config(config), - EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map), - EngineCallResponse::PipelineData(header) => { + let response = response + .map_data(|header| self.read_pipeline_data(header, None)) + .unwrap_or_else(|err| { // If there's an error with initializing this stream, change it to an engine // call error response, but send it anyway - match self.read_pipeline_data(header, None) { - Ok(data) => EngineCallResponse::PipelineData(data), - Err(err) => EngineCallResponse::Error(err), - } - } - }; + EngineCallResponse::Error(err) + }); self.send_engine_call_response(id, response) } } @@ -442,36 +436,13 @@ impl EngineInterface { let (tx, rx) = mpsc::channel(); // Convert the call into one with a header and handle the stream, if necessary - let (call, writer) = match call { - EngineCall::EvalClosure { - closure, - positional, - input, - redirect_stdout, - redirect_stderr, - } => { - let (header, writer) = self.init_write_pipeline_data(input)?; - ( - EngineCall::EvalClosure { - closure, - positional, - input: header, - redirect_stdout, - redirect_stderr, - }, - writer, - ) - } - // These calls have no pipeline data, so they're just the same on both sides - EngineCall::GetConfig => (EngineCall::GetConfig, Default::default()), - EngineCall::GetPluginConfig => (EngineCall::GetPluginConfig, Default::default()), - EngineCall::GetEnvVar(name) => (EngineCall::GetEnvVar(name), Default::default()), - EngineCall::GetEnvVars => (EngineCall::GetEnvVars, Default::default()), - EngineCall::GetCurrentDir => (EngineCall::GetCurrentDir, Default::default()), - EngineCall::AddEnvVar(name, value) => { - (EngineCall::AddEnvVar(name, value), Default::default()) - } - }; + let mut writer = None; + + let call = call.map_data(|input| { + let (input_header, input_writer) = self.init_write_pipeline_data(input)?; + writer = Some(input_writer); + Ok(input_header) + })?; // Register the channel self.state @@ -486,7 +457,7 @@ impl EngineInterface { self.write(PluginOutput::EngineCall { context, id, call })?; self.flush()?; - Ok((writer, rx)) + Ok((writer.unwrap_or_default(), rx)) } /// Perform an engine call. Input and output streams are handled. diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index 3e678c9440..5d1288166f 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -460,15 +460,8 @@ impl InterfaceManager for PluginInterfaceManager { }, PluginOutput::CallResponse(id, response) => { // Handle reading the pipeline data, if any - let response = match response { - PluginCallResponse::Error(err) => PluginCallResponse::Error(err), - PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs), - PluginCallResponse::Ordering(ordering) => { - PluginCallResponse::Ordering(ordering) - } - PluginCallResponse::PipelineData(data) => { - // If there's an error with initializing this stream, change it to a plugin - // error response, but send it anyway + let response = response + .map_data(|data| { let ctrlc = self.get_ctrlc(id)?; // Register the streams in the response @@ -476,12 +469,13 @@ impl InterfaceManager for PluginInterfaceManager { self.recv_stream_started(id, stream_id); } - match self.read_pipeline_data(data, ctrlc.as_ref()) { - Ok(data) => PluginCallResponse::PipelineData(data), - Err(err) => PluginCallResponse::Error(err.into()), - } - } - }; + self.read_pipeline_data(data, ctrlc.as_ref()) + }) + .unwrap_or_else(|err| { + // If there's an error with initializing this stream, change it to a plugin + // error response, but send it anyway + PluginCallResponse::Error(err.into()) + }); let result = self.send_plugin_call_response(id, response); if result.is_ok() { // When a call ends, it releases a lock on the GC @@ -493,36 +487,19 @@ impl InterfaceManager for PluginInterfaceManager { } PluginOutput::EngineCall { context, id, call } => { // Handle reading the pipeline data, if any - let ctrlc = self.get_ctrlc(context)?; - let call = match call { - EngineCall::GetConfig => Ok(EngineCall::GetConfig), - EngineCall::GetPluginConfig => Ok(EngineCall::GetPluginConfig), - EngineCall::GetEnvVar(name) => Ok(EngineCall::GetEnvVar(name)), - EngineCall::GetEnvVars => Ok(EngineCall::GetEnvVars), - EngineCall::GetCurrentDir => Ok(EngineCall::GetCurrentDir), - EngineCall::AddEnvVar(name, value) => Ok(EngineCall::AddEnvVar(name, value)), - EngineCall::EvalClosure { - closure, - mut positional, - input, - redirect_stdout, - redirect_stderr, - } => { - // Add source to any plugin custom values in the arguments - for arg in positional.iter_mut() { - PluginCustomValue::add_source(arg, &self.state.source); - } - self.read_pipeline_data(input, ctrlc.as_ref()).map(|input| { - EngineCall::EvalClosure { - closure, - positional, - input, - redirect_stdout, - redirect_stderr, - } - }) + let mut call = call.map_data(|input| { + let ctrlc = self.get_ctrlc(context)?; + self.read_pipeline_data(input, ctrlc.as_ref()) + }); + // Add source to any plugin custom values in the arguments + if let Ok(EngineCall::EvalClosure { + ref mut positional, .. + }) = call + { + for arg in positional.iter_mut() { + PluginCustomValue::add_source(arg, &self.state.source); } - }; + } match call { Ok(call) => self.send_engine_call(context, id, call), // If there was an error with setting up the call, just write the error @@ -603,16 +580,12 @@ impl PluginInterface { response: EngineCallResponse, ) -> Result<(), ShellError> { // Set up any stream if necessary - let (response, writer) = match response { - EngineCallResponse::PipelineData(data) => { - let (header, writer) = self.init_write_pipeline_data(data)?; - (EngineCallResponse::PipelineData(header), Some(writer)) - } - // No pipeline data: - EngineCallResponse::Error(err) => (EngineCallResponse::Error(err), None), - EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None), - EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None), - }; + let mut writer = None; + let response = response.map_data(|data| { + let (data_header, data_writer) = self.init_write_pipeline_data(data)?; + writer = Some(data_writer); + Ok(data_header) + })?; // Write the response, including the pipeline data header if present self.write(PluginInput::EngineCallResponse(id, response))?; @@ -753,20 +726,18 @@ impl PluginInterface { let resp = handle_engine_call(engine_call, context).unwrap_or_else(EngineCallResponse::Error); // Handle stream - let (resp, writer) = match resp { - EngineCallResponse::Error(error) => (EngineCallResponse::Error(error), None), - EngineCallResponse::Config(config) => (EngineCallResponse::Config(config), None), - EngineCallResponse::ValueMap(map) => (EngineCallResponse::ValueMap(map), None), - EngineCallResponse::PipelineData(data) => { - match self.init_write_pipeline_data(data) { - Ok((header, writer)) => { - (EngineCallResponse::PipelineData(header), Some(writer)) - } - // just respond with the error if we fail to set it up - Err(err) => (EngineCallResponse::Error(err), None), - } - } - }; + let mut writer = None; + let resp = resp + .map_data(|data| { + let (data_header, data_writer) = self.init_write_pipeline_data(data)?; + writer = Some(data_writer); + Ok(data_header) + }) + .unwrap_or_else(|err| { + // If we fail to set up the response write, change to an error response here + writer = None; + EngineCallResponse::Error(err) + }); // Write the response, then the stream self.write(PluginInput::EngineCallResponse(engine_call_id, resp))?; self.flush()?; diff --git a/crates/nu-plugin/src/protocol/mod.rs b/crates/nu-plugin/src/protocol/mod.rs index d6ad73e857..6a636f268a 100644 --- a/crates/nu-plugin/src/protocol/mod.rs +++ b/crates/nu-plugin/src/protocol/mod.rs @@ -43,6 +43,20 @@ pub struct CallInfo { pub input: D, } +impl CallInfo { + /// Convert the type of `input` from `D` to `T`. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(CallInfo { + name: self.name, + call: self.call, + input: f(self.input)?, + }) + } +} + /// The initial (and perhaps only) part of any [`nu_protocol::PipelineData`] sent over the wire. /// /// This may contain a single value, or may initiate a stream with a [`StreamId`]. @@ -128,6 +142,23 @@ pub enum PluginCall { CustomValueOp(Spanned, CustomValueOp), } +impl PluginCall { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + PluginCall::Signature => PluginCall::Signature, + PluginCall::Run(call) => PluginCall::Run(call.map_data(f)?), + PluginCall::CustomValueOp(custom_value, op) => { + PluginCall::CustomValueOp(custom_value, op) + } + }) + } +} + /// Operations supported for custom values. #[derive(Serialize, Deserialize, Debug, Clone)] pub enum CustomValueOp { @@ -337,6 +368,22 @@ pub enum PluginCallResponse { PipelineData(D), } +impl PluginCallResponse { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + PluginCallResponse::Error(err) => PluginCallResponse::Error(err), + PluginCallResponse::Signature(sigs) => PluginCallResponse::Signature(sigs), + PluginCallResponse::Ordering(ordering) => PluginCallResponse::Ordering(ordering), + PluginCallResponse::PipelineData(input) => PluginCallResponse::PipelineData(f(input)?), + }) + } +} + impl PluginCallResponse { /// Construct a plugin call response with a single value pub fn value(value: Value) -> PluginCallResponse { @@ -494,6 +541,35 @@ impl EngineCall { EngineCall::EvalClosure { .. } => "EvalClosure", } } + + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + EngineCall::GetConfig => EngineCall::GetConfig, + EngineCall::GetPluginConfig => EngineCall::GetPluginConfig, + EngineCall::GetEnvVar(name) => EngineCall::GetEnvVar(name), + EngineCall::GetEnvVars => EngineCall::GetEnvVars, + EngineCall::GetCurrentDir => EngineCall::GetCurrentDir, + EngineCall::AddEnvVar(name, value) => EngineCall::AddEnvVar(name, value), + EngineCall::EvalClosure { + closure, + positional, + input, + redirect_stdout, + redirect_stderr, + } => EngineCall::EvalClosure { + closure, + positional, + input: f(input)?, + redirect_stdout, + redirect_stderr, + }, + }) + } } /// The response to an [EngineCall]. The type parameter determines the output type for pipeline @@ -506,6 +582,22 @@ pub enum EngineCallResponse { ValueMap(HashMap), } +impl EngineCallResponse { + /// Convert the data type from `D` to `T`. The function will not be called if the variant does + /// not contain data. + pub(crate) fn map_data( + self, + f: impl FnOnce(D) -> Result, + ) -> Result, ShellError> { + Ok(match self { + EngineCallResponse::Error(err) => EngineCallResponse::Error(err), + EngineCallResponse::PipelineData(data) => EngineCallResponse::PipelineData(f(data)?), + EngineCallResponse::Config(config) => EngineCallResponse::Config(config), + EngineCallResponse::ValueMap(map) => EngineCallResponse::ValueMap(map), + }) + } +} + impl EngineCallResponse { /// Build an [`EngineCallResponse::PipelineData`] from a [`Value`] pub(crate) fn value(value: Value) -> EngineCallResponse {