From ad6deadf247164016318ed8002fd9b310fe8f8b8 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Thu, 2 May 2024 16:51:16 -0700 Subject: [PATCH] Flush on every plugin `Data` message (#12728) # Description This helps to ensure data produced on a stream is immediately available to the consumer of the stream. The BufWriter introduced for performance reasons in 0.93 exposed the behavior that data messages wouldn't make it to the other side until they filled the buffer in @cablehead's [`nu_plugin_from_sse`](https://github.com/cablehead/nu_plugin_from_sse). I had originally not flushed on every `Data` message because I figured that it isn't really critical that the other side sees those messages immediately, since they're not used for control and they are flushed when waiting for acknowledgement or when the buffer is too full anyway. Increasing the amount of data that can be sent with a single underlying write increases performance, but this interferes with some plugins that want to use streams in a more real-time way. In the future I would like to make this configurable, maybe even per-command, so that a command can decide what the priority is. But for now I think this is reasonable. In the worst case, this decreases performance by about 40%, when sending very small values (just numbers). But for larger values, this PR actually increases performance by about 20%, because I've increased the buffer size about 2x to 16,384 bytes. The previous value of 8,192 bytes was too small to fit a full buffer coming from an external command, so doubling it makes sense, and now a write of a buffer from an external command can be done in exactly one write call, which I think makes sense. I'm doing this at the same time because flushing each data message would make it very likely that each individual data message from an external stream would require exactly two writes rather than approximately one (amortized). Again, hopefully the tradeoff isn't too bad, and if it is I'll just make it configurable. # User-Facing Changes - Performance of plugin streams will be a bit different - Plugins that expect to send streams in real-time will work again # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` --- crates/nu-plugin-core/src/interface/stream/mod.rs | 7 +++++-- crates/nu-plugin-engine/src/init.rs | 5 ++++- crates/nu-plugin/src/plugin/mod.rs | 5 ++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/nu-plugin-core/src/interface/stream/mod.rs b/crates/nu-plugin-core/src/interface/stream/mod.rs index e8f48939dd..541e1176a0 100644 --- a/crates/nu-plugin-core/src/interface/stream/mod.rs +++ b/crates/nu-plugin-core/src/interface/stream/mod.rs @@ -202,10 +202,13 @@ where if !self.ended { self.writer .write_stream_message(StreamMessage::Data(self.id, data.into()))?; + // Flush after each data message to ensure they do predictably appear on the other side + // when they're generated + // + // TODO: make the buffering configurable, as this is a factor for performance + self.writer.flush()?; // This implements flow control, so we don't write too many messages: if !self.signal.notify_sent()? { - // Flush the output, and then wait for acknowledgements - self.writer.flush()?; self.signal.wait_for_drain() } else { Ok(()) diff --git a/crates/nu-plugin-engine/src/init.rs b/crates/nu-plugin-engine/src/init.rs index 2092935fad..0ba70b49c0 100644 --- a/crates/nu-plugin-engine/src/init.rs +++ b/crates/nu-plugin-engine/src/init.rs @@ -24,7 +24,10 @@ use crate::{ PluginSource, }; -pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192; +/// This should be larger than the largest commonly sent message to avoid excessive fragmentation. +/// +/// The buffers coming from external streams are typically each 8192 bytes, so double that. +pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384; /// Spawn the command for a plugin, in the given `mode`. After spawning, it can be passed to /// [`make_plugin_interface()`] to get a [`PluginInterface`]. diff --git a/crates/nu-plugin/src/plugin/mod.rs b/crates/nu-plugin/src/plugin/mod.rs index c7283d10f3..0ec170f4cd 100644 --- a/crates/nu-plugin/src/plugin/mod.rs +++ b/crates/nu-plugin/src/plugin/mod.rs @@ -28,8 +28,11 @@ mod interface; pub use command::{create_plugin_signature, PluginCommand, SimplePluginCommand}; pub use interface::{EngineInterface, EngineInterfaceManager}; +/// This should be larger than the largest commonly sent message to avoid excessive fragmentation. +/// +/// The buffers coming from external streams are typically each 8192 bytes, so double that. #[allow(dead_code)] -pub(crate) const OUTPUT_BUFFER_SIZE: usize = 8192; +pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384; /// The API for a Nushell plugin ///