Added the ability to turn on performance debugging through and env variable

This commit is contained in:
Jack Wright 2024-06-20 14:17:04 -07:00
parent 7d2d573eb8
commit e417bc7035
4 changed files with 154 additions and 29 deletions

View File

@ -13,7 +13,7 @@ use nu_plugin::{EngineInterface, PluginCommand};
use nu_protocol::{LabeledError, ShellError, Span}; use nu_protocol::{LabeledError, ShellError, Span};
use uuid::Uuid; use uuid::Uuid;
use crate::{plugin_debug, values::PolarsPluginObject, PolarsPlugin}; use crate::{plugin_debug, values::PolarsPluginObject, EngineWrapper, PolarsPlugin};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CacheValue { pub struct CacheValue {
@ -47,7 +47,7 @@ impl Cache {
/// * `force` - Delete even if there are multiple references /// * `force` - Delete even if there are multiple references
pub fn remove( pub fn remove(
&self, &self,
maybe_engine: Option<&EngineInterface>, engine: impl EngineWrapper,
key: &Uuid, key: &Uuid,
force: bool, force: bool,
) -> Result<Option<CacheValue>, ShellError> { ) -> Result<Option<CacheValue>, ShellError> {
@ -60,22 +60,23 @@ impl Cache {
let removed = if force || reference_count.unwrap_or_default() < 1 { let removed = if force || reference_count.unwrap_or_default() < 1 {
let removed = lock.remove(key); let removed = lock.remove(key);
plugin_debug!("PolarsPlugin: removing {key} from cache: {removed:?}"); plugin_debug!(
engine,
"PolarsPlugin: removing {key} from cache: {removed:?}"
);
removed removed
} else { } else {
plugin_debug!("PolarsPlugin: decrementing reference count for {key}"); plugin_debug!(
engine,
"PolarsPlugin: decrementing reference count for {key}"
);
None None
}; };
// Once there are no more entries in the cache // Once there are no more entries in the cache
// we can turn plugin gc back on // we can turn plugin gc back on
match maybe_engine { plugin_debug!(engine, "PolarsPlugin: Cache is empty enabling GC");
Some(engine) if lock.is_empty() => { engine.set_gc_disabled(false).map_err(LabeledError::from)?;
plugin_debug!("PolarsPlugin: Cache is empty enabling GC");
engine.set_gc_disabled(false).map_err(LabeledError::from)?;
}
_ => (),
};
drop(lock); drop(lock);
Ok(removed) Ok(removed)
} }
@ -84,23 +85,21 @@ impl Cache {
/// The maybe_engine parameter is required outside of testing /// The maybe_engine parameter is required outside of testing
pub fn insert( pub fn insert(
&self, &self,
maybe_engine: Option<&EngineInterface>, engine: impl EngineWrapper,
uuid: Uuid, uuid: Uuid,
value: PolarsPluginObject, value: PolarsPluginObject,
span: Span, span: Span,
) -> Result<Option<CacheValue>, ShellError> { ) -> Result<Option<CacheValue>, ShellError> {
let mut lock = self.lock()?; let mut lock = self.lock()?;
plugin_debug!("PolarsPlugin: Inserting {uuid} into cache: {value:?}"); plugin_debug!(
engine,
"PolarsPlugin: Inserting {uuid} into cache: {value:?}"
);
// turn off plugin gc the first time an entry is added to the cache // turn off plugin gc the first time an entry is added to the cache
// as we don't want the plugin to be garbage collected if there // as we don't want the plugin to be garbage collected if there
// is any live data // is any live data
match maybe_engine { plugin_debug!(engine, "PolarsPlugin: Cache has values disabling GC");
Some(engine) if lock.is_empty() => { engine.set_gc_disabled(true).map_err(LabeledError::from)?;
plugin_debug!("PolarsPlugin: Cache has values disabling GC");
engine.set_gc_disabled(true).map_err(LabeledError::from)?;
}
_ => (),
};
let cache_value = CacheValue { let cache_value = CacheValue {
uuid, uuid,
value, value,
@ -154,7 +153,7 @@ pub trait Cacheable: Sized + Clone {
span: Span, span: Span,
) -> Result<Self, ShellError> { ) -> Result<Self, ShellError> {
plugin.cache.insert( plugin.cache.insert(
Some(engine), engine,
self.cache_id().to_owned(), self.cache_id().to_owned(),
self.to_cache_value()?, self.to_cache_value()?,
span, span,

View File

@ -63,7 +63,7 @@ fn remove_cache_entry(
let key = as_uuid(key, span)?; let key = as_uuid(key, span)?;
let msg = plugin let msg = plugin
.cache .cache
.remove(Some(engine), &key, true)? .remove(engine, &key, true)?
.map(|_| format!("Removed: {key}")) .map(|_| format!("Removed: {key}"))
.unwrap_or_else(|| format!("No value found for key: {key}")); .unwrap_or_else(|| format!("No value found for key: {key}"));
Ok(Value::string(msg, span)) Ok(Value::string(msg, span))

View File

@ -1,5 +1,6 @@
use crate::{ use crate::{
dataframe::values::NuSchema, dataframe::values::NuSchema,
perf,
values::{CustomValueSupport, NuLazyFrame}, values::{CustomValueSupport, NuLazyFrame},
PolarsPlugin, PolarsPlugin,
}; };
@ -378,7 +379,10 @@ fn from_jsonl(
.get_flag("schema")? .get_flag("schema")?
.map(|schema| NuSchema::try_from(&schema)) .map(|schema| NuSchema::try_from(&schema))
.transpose()?; .transpose()?;
if call.has_flag("lazy")? { if call.has_flag("lazy")? {
let start_time = std::time::Instant::now();
let df = LazyJsonLineReader::new(file_path) let df = LazyJsonLineReader::new(file_path)
.with_infer_schema_length(infer_schema) .with_infer_schema_length(infer_schema)
.with_schema(maybe_schema.map(|s| s.into())) .with_schema(maybe_schema.map(|s| s.into()))
@ -390,6 +394,16 @@ fn from_jsonl(
help: None, help: None,
inner: vec![], inner: vec![],
})?; })?;
perf(
engine,
"Lazy json lines dataframe open",
start_time,
file!(),
line!(),
column!(),
);
let df = NuLazyFrame::new(false, df); let df = NuLazyFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head) df.cache_and_to_value(plugin, engine, call.head)
} else { } else {
@ -410,6 +424,8 @@ fn from_jsonl(
None => reader, None => reader,
}; };
let start_time = std::time::Instant::now();
let df: NuDataFrame = reader let df: NuDataFrame = reader
.finish() .finish()
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
@ -421,6 +437,15 @@ fn from_jsonl(
})? })?
.into(); .into();
perf(
engine,
"Eager json lines dataframe open",
start_time,
file!(),
line!(),
column!(),
);
df.cache_and_to_value(plugin, engine, call.head) df.cache_and_to_value(plugin, engine, call.head)
} }
} }
@ -484,6 +509,7 @@ fn from_csv(
Some(r) => csv_reader.with_skip_rows(r), Some(r) => csv_reader.with_skip_rows(r),
}; };
let start_time = std::time::Instant::now();
let df: NuLazyFrame = csv_reader let df: NuLazyFrame = csv_reader
.finish() .finish()
.map_err(|e| ShellError::GenericError { .map_err(|e| ShellError::GenericError {
@ -495,8 +521,18 @@ fn from_csv(
})? })?
.into(); .into();
perf(
engine,
"Lazy CSV dataframe open",
start_time,
file!(),
line!(),
column!(),
);
df.cache_and_to_value(plugin, engine, call.head) df.cache_and_to_value(plugin, engine, call.head)
} else { } else {
let start_time = std::time::Instant::now();
let df = CsvReadOptions::default() let df = CsvReadOptions::default()
.with_has_header(!no_header) .with_has_header(!no_header)
.with_infer_schema_length(infer_schema) .with_infer_schema_length(infer_schema)
@ -529,6 +565,16 @@ fn from_csv(
help: None, help: None,
inner: vec![], inner: vec![],
})?; })?;
perf(
engine,
"Eager CSV dataframe open",
start_time,
file!(),
line!(),
column!(),
);
let df = NuDataFrame::new(false, df); let df = NuDataFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head) df.cache_and_to_value(plugin, engine, call.head)
} }

View File

@ -8,25 +8,89 @@ use nu_plugin::{EngineInterface, Plugin, PluginCommand};
mod cache; mod cache;
pub mod dataframe; pub mod dataframe;
pub use dataframe::*; pub use dataframe::*;
use nu_protocol::{ast::Operator, CustomValue, LabeledError, Spanned, Value}; use nu_protocol::{ast::Operator, CustomValue, LabeledError, ShellError, Span, Spanned, Value};
use crate::{ use crate::{
eager::eager_commands, expressions::expr_commands, lazy::lazy_commands, eager::eager_commands, expressions::expr_commands, lazy::lazy_commands,
series::series_commands, values::PolarsPluginCustomValue, series::series_commands, values::PolarsPluginCustomValue,
}; };
pub trait EngineWrapper {
fn get_env_var(&self, key: &str) -> Option<String>;
fn use_color(&self) -> bool;
fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError>;
}
impl EngineWrapper for &EngineInterface {
fn get_env_var(&self, key: &str) -> Option<String> {
EngineInterface::get_env_var(self, key)
.ok()
.flatten()
.map(|x| match x {
Value::String { val, .. } => val,
_ => "".to_string(),
})
}
fn use_color(&self) -> bool {
self.get_config()
.ok()
.and_then(|config| config.color_config.get("use_color").cloned())
.unwrap_or(Value::bool(false, Span::unknown()))
.is_true()
}
fn set_gc_disabled(&self, disabled: bool) -> Result<(), ShellError> {
EngineInterface::set_gc_disabled(self, disabled)
}
}
#[macro_export] #[macro_export]
macro_rules! plugin_debug { macro_rules! plugin_debug {
($($arg:tt)*) => {{ ($env_var_provider:tt, $($arg:tt)*) => {{
if std::env::var("POLARS_PLUGIN_DEBUG") if $env_var_provider.get_env_var("POLARS_PLUGIN_DEBUG")
.ok() .filter(|s| s == "1" || s == "true")
.filter(|x| x == "1" || x == "true")
.is_some() { .is_some() {
eprintln!($($arg)*); eprintln!($($arg)*);
} }
}}; }};
} }
pub fn perf(
env: impl EngineWrapper,
msg: &str,
dur: std::time::Instant,
file: &str,
line: u32,
column: u32,
) {
if env
.get_env_var("POLARS_PLUGIN_PERF")
.filter(|s| s == "1" || s == "true")
.is_some()
{
if env.use_color() {
eprintln!(
"perf: {}:{}:{} \x1b[32m{}\x1b[0m took \x1b[33m{:?}\x1b[0m",
file,
line,
column,
msg,
dur.elapsed(),
);
} else {
eprintln!(
"perf: {}:{}:{} {} took {:?}",
file,
line,
column,
msg,
dur.elapsed(),
);
}
}
}
#[derive(Default)] #[derive(Default)]
pub struct PolarsPlugin { pub struct PolarsPlugin {
pub(crate) cache: Cache, pub(crate) cache: Cache,
@ -52,7 +116,7 @@ impl Plugin for PolarsPlugin {
) -> Result<(), LabeledError> { ) -> Result<(), LabeledError> {
if !self.disable_cache_drop { if !self.disable_cache_drop {
let id = CustomValueType::try_from_custom_value(custom_value)?.id(); let id = CustomValueType::try_from_custom_value(custom_value)?.id();
let _ = self.cache.remove(Some(engine), &id, false); let _ = self.cache.remove(engine, &id, false);
} }
Ok(()) Ok(())
} }
@ -193,6 +257,22 @@ pub mod test {
} }
} }
struct TestEngineWrapper;
impl EngineWrapper for TestEngineWrapper {
fn get_env_var(&self, key: &str) -> Option<String> {
std::env::var(key).ok()
}
fn use_color(&self) -> bool {
false
}
fn set_gc_disabled(&self, _disabled: bool) -> Result<(), ShellError> {
Ok(())
}
}
pub fn test_polars_plugin_command(command: &impl PluginCommand) -> Result<(), ShellError> { pub fn test_polars_plugin_command(command: &impl PluginCommand) -> Result<(), ShellError> {
test_polars_plugin_command_with_decls(command, vec![]) test_polars_plugin_command_with_decls(command, vec![])
} }
@ -212,7 +292,7 @@ pub mod test {
let id = obj.id(); let id = obj.id();
plugin plugin
.cache .cache
.insert(None, id, obj, Span::test_data()) .insert(TestEngineWrapper {}, id, obj, Span::test_data())
.unwrap(); .unwrap();
} }
} }