diff --git a/crates/nu_plugin_polars/src/cache.rs b/crates/nu_plugin_polars/src/cache.rs index a1763fbba2..d295f449a9 100644 --- a/crates/nu_plugin_polars/src/cache.rs +++ b/crates/nu_plugin_polars/src/cache.rs @@ -3,19 +3,28 @@ use std::{ sync::{Mutex, MutexGuard}, }; +use chrono::{DateTime, FixedOffset, Local}; use nu_plugin::EngineInterface; -use nu_protocol::{LabeledError, ShellError}; +use nu_protocol::{LabeledError, ShellError, Span}; use uuid::Uuid; use crate::{plugin_debug, values::PolarsPluginObject, PolarsPlugin}; +#[derive(Debug, Clone)] +pub struct CacheValue { + pub uuid: Uuid, + pub value: PolarsPluginObject, + pub created: DateTime, + pub span: Span, +} + #[derive(Default)] pub struct Cache { - cache: Mutex>, + cache: Mutex>, } impl Cache { - fn lock(&self) -> Result>, ShellError> { + fn lock(&self) -> Result>, ShellError> { self.cache.lock().map_err(|e| ShellError::GenericError { error: format!("error acquiring cache lock: {e}"), msg: "".into(), @@ -31,7 +40,7 @@ impl Cache { &self, maybe_engine: Option<&EngineInterface>, uuid: &Uuid, - ) -> Result, ShellError> { + ) -> Result, ShellError> { let mut lock = self.lock()?; let removed = lock.remove(uuid); plugin_debug!("PolarsPlugin: removing {uuid} from cache: {removed:?}"); @@ -55,7 +64,8 @@ impl Cache { maybe_engine: Option<&EngineInterface>, uuid: Uuid, value: PolarsPluginObject, - ) -> Result, ShellError> { + span: Span, + ) -> Result, ShellError> { let mut lock = self.lock()?; plugin_debug!("PolarsPlugin: Inserting {uuid} into cache: {value:?}"); // turn off plugin gc the first time an entry is added to the cache @@ -68,12 +78,18 @@ impl Cache { } _ => (), }; - let result = lock.insert(uuid, value); + let cache_value = CacheValue { + uuid, + value, + created: Local::now().into(), + span, + }; + let result = lock.insert(uuid, cache_value); drop(lock); Ok(result) } - pub fn get(&self, uuid: &Uuid) -> Result, ShellError> { + pub fn get(&self, uuid: &Uuid) -> Result, ShellError> { let lock = self.lock()?; let result = lock.get(uuid).cloned(); drop(lock); @@ -82,12 +98,11 @@ impl Cache { pub fn process_entries(&self, mut func: F) -> Result, ShellError> where - F: FnMut((&Uuid, &PolarsPluginObject)) -> Result, + F: FnMut((&Uuid, &CacheValue)) -> Result, { let lock = self.lock()?; let mut vals: Vec = Vec::new(); for entry in lock.iter() { - eprintln!("entry: {:?}", entry); let val = func(entry)?; vals.push(val); } @@ -103,18 +118,24 @@ pub trait Cacheable: Sized + Clone { fn from_cache_value(cv: PolarsPluginObject) -> Result; - fn cache(self, plugin: &PolarsPlugin, engine: &EngineInterface) -> Result { + fn cache( + self, + plugin: &PolarsPlugin, + engine: &EngineInterface, + span: Span, + ) -> Result { plugin.cache.insert( Some(engine), self.cache_id().to_owned(), self.to_cache_value()?, + span, )?; Ok(self) } fn get_cached(plugin: &PolarsPlugin, id: &Uuid) -> Result, ShellError> { if let Some(cache_value) = plugin.cache.get(id)? { - Ok(Some(Self::from_cache_value(cache_value)?)) + Ok(Some(Self::from_cache_value(cache_value.value)?)) } else { Ok(None) } diff --git a/crates/nu_plugin_polars/src/dataframe/eager/list.rs b/crates/nu_plugin_polars/src/dataframe/eager/list.rs index 1548013415..68390ae2bc 100644 --- a/crates/nu_plugin_polars/src/dataframe/eager/list.rs +++ b/crates/nu_plugin_polars/src/dataframe/eager/list.rs @@ -35,59 +35,86 @@ impl PluginCommand for ListDF { fn run( &self, plugin: &Self::Plugin, - _engine: &EngineInterface, + engine: &EngineInterface, call: &EvaluatedCall, _input: PipelineData, ) -> Result { - let vals = plugin.cache.process_entries(|(key, value)| match value { - PolarsPluginObject::NuDataFrame(df) => Ok(Some(Value::record( - record! { - "key" => Value::string(key.to_string(), call.head), - "columns" => Value::int(df.as_ref().width() as i64, call.head), - "rows" => Value::int(df.as_ref().height() as i64, call.head), - "type" => Value::string("NuDataFrame", call.head), - }, - call.head, - ))), - PolarsPluginObject::NuLazyFrame(lf) => { - let lf = lf.clone().collect(call.head)?; - Ok(Some(Value::record( + let vals = plugin.cache.process_entries(|(key, value)| { + let span_contents = engine.get_span_contents(value.span)?; + let span_contents = String::from_utf8_lossy(&span_contents); + match &value.value { + PolarsPluginObject::NuDataFrame(df) => Ok(Some(Value::record( record! { "key" => Value::string(key.to_string(), call.head), - "columns" => Value::int(lf.as_ref().width() as i64, call.head), - "rows" => Value::int(lf.as_ref().height() as i64, call.head), - "type" => Value::string("NuLazyFrame", call.head), + "created" => Value::date(value.created, call.head), + "columns" => Value::int(df.as_ref().width() as i64, call.head), + "rows" => Value::int(df.as_ref().height() as i64, call.head), + "type" => Value::string("NuDataFrame", call.head), + "estimated_size" => Value::filesize(df.to_polars().estimated_size() as i64, call.head), + "span_contents" => Value::string(span_contents, value.span), + "span_start" => Value::int(value.span.start as i64, call.head), + "span_end" => Value::int(value.span.end as i64, call.head), }, call.head, - ))) + ))), + PolarsPluginObject::NuLazyFrame(lf) => { + let lf = lf.clone().collect(call.head)?; + Ok(Some(Value::record( + record! { + "key" => Value::string(key.to_string(), call.head), + "created" => Value::date(value.created, call.head), + "columns" => Value::int(lf.as_ref().width() as i64, call.head), + "rows" => Value::int(lf.as_ref().height() as i64, call.head), + "type" => Value::string("NuLazyFrame", call.head), + "estimated_size" => Value::filesize(lf.to_polars().estimated_size() as i64, call.head), + "span_contents" => Value::string(span_contents, value.span), + "span_start" => Value::int(value.span.start as i64, call.head), + "span_end" => Value::int(value.span.end as i64, call.head), + }, + call.head, + ))) + } + PolarsPluginObject::NuExpression(_) => Ok(Some(Value::record( + record! { + "key" => Value::string(key.to_string(), call.head), + "created" => Value::date(value.created, call.head), + "columns" => Value::nothing(call.head), + "rows" => Value::nothing(call.head), + "type" => Value::string("NuExpression", call.head), + "estimated_size" => Value::nothing(call.head), + "span_contents" => Value::string(span_contents, value.span), + "span_start" => Value::int(value.span.start as i64, call.head), + "span_end" => Value::int(value.span.end as i64, call.head), + }, + call.head, + ))), + PolarsPluginObject::NuLazyGroupBy(_) => Ok(Some(Value::record( + record! { + "key" => Value::string(key.to_string(), call.head), + "columns" => Value::nothing(call.head), + "rows" => Value::nothing(call.head), + "type" => Value::string("NuLazyGroupBy", call.head), + "estimated_size" => Value::nothing(call.head), + "span_contents" => Value::string(span_contents, call.head), + "span_start" => Value::int(call.head.start as i64, call.head), + "span_end" => Value::int(call.head.end as i64, call.head), + }, + call.head, + ))), + PolarsPluginObject::NuWhen(_) => Ok(Some(Value::record( + record! { + "key" => Value::string(key.to_string(), call.head), + "columns" => Value::nothing(call.head), + "rows" => Value::nothing(call.head), + "type" => Value::string("NuWhen", call.head), + "estimated_size" => Value::nothing(call.head), + "span_contents" => Value::string(span_contents.to_string(), call.head), + "span_start" => Value::int(call.head.start as i64, call.head), + "span_end" => Value::int(call.head.end as i64, call.head), + }, + call.head, + ))), } - PolarsPluginObject::NuExpression(_) => Ok(Some(Value::record( - record! { - "key" => Value::string(key.to_string(), call.head), - "columns" => Value::nothing(call.head), - "rows" => Value::nothing(call.head), - "type" => Value::string("NuExpression", call.head), - }, - call.head, - ))), - PolarsPluginObject::NuLazyGroupBy(_) => Ok(Some(Value::record( - record! { - "key" => Value::string(key.to_string(), call.head), - "columns" => Value::nothing(call.head), - "rows" => Value::nothing(call.head), - "type" => Value::string("NuLazyGroupBy", call.head), - }, - call.head, - ))), - PolarsPluginObject::NuWhen(_) => Ok(Some(Value::record( - record! { - "key" => Value::string(key.to_string(), call.head), - "columns" => Value::nothing(call.head), - "rows" => Value::nothing(call.head), - "type" => Value::string("NuWhen", call.head), - }, - call.head, - ))), })?; let vals = vals.into_iter().flatten().collect(); let list = Value::list(vals, call.head); diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/collect.rs b/crates/nu_plugin_polars/src/dataframe/lazy/collect.rs index 9cfe903497..c14098e28b 100644 --- a/crates/nu_plugin_polars/src/dataframe/lazy/collect.rs +++ b/crates/nu_plugin_polars/src/dataframe/lazy/collect.rs @@ -66,14 +66,16 @@ impl PluginCommand for LazyCollect { PolarsPluginObject::NuLazyFrame(lazy) => { let eager = lazy.collect(call.head)?; Ok(PipelineData::Value( - eager.cache(plugin, engine)?.into_value(call.head), + eager + .cache(plugin, engine, call.head)? + .into_value(call.head), None, )) } PolarsPluginObject::NuDataFrame(df) => { // just return the dataframe, add to cache again to be safe Ok(PipelineData::Value( - df.cache(plugin, engine)?.into_value(call.head), + df.cache(plugin, engine, call.head)?.into_value(call.head), None, )) } diff --git a/crates/nu_plugin_polars/src/dataframe/lazy/to_lazy.rs b/crates/nu_plugin_polars/src/dataframe/lazy/to_lazy.rs index 2991a0fb33..5d279ee38a 100644 --- a/crates/nu_plugin_polars/src/dataframe/lazy/to_lazy.rs +++ b/crates/nu_plugin_polars/src/dataframe/lazy/to_lazy.rs @@ -54,7 +54,7 @@ impl PluginCommand for ToLazyFrame { let df = NuDataFrame::try_from_iter(plugin, input.into_iter(), maybe_schema)?; let lazy = NuLazyFrame::from_dataframe(df); Ok(PipelineData::Value( - lazy.cache(plugin, engine)?.into_value(call.head), + lazy.cache(plugin, engine, call.head)?.into_value(call.head), None, )) } diff --git a/crates/nu_plugin_polars/src/dataframe/values/mod.rs b/crates/nu_plugin_polars/src/dataframe/values/mod.rs index 6f984c6bf8..5fa26ff896 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/mod.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/mod.rs @@ -318,14 +318,14 @@ pub fn cache_and_to_value( // if it was from a lazy value, make it lazy again PolarsPluginObject::NuDataFrame(df) if df.from_lazy => { let df = df.lazy(); - Ok(df.cache(plugin, engine)?.into_value(span)) + Ok(df.cache(plugin, engine, span)?.into_value(span)) } // if it was from an eager value, make it eager again PolarsPluginObject::NuLazyFrame(lf) if lf.from_eager => { let lf = lf.collect(span)?; - Ok(lf.cache(plugin, engine)?.into_value(span)) + Ok(lf.cache(plugin, engine, span)?.into_value(span)) } - _ => Ok(cv.cache(plugin, engine)?.into_value(span)), + _ => Ok(cv.cache(plugin, engine, span)?.into_value(span)), } } diff --git a/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/custom_value.rs b/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/custom_value.rs index a76683c114..b94f812dac 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/custom_value.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/nu_dataframe/custom_value.rs @@ -81,7 +81,7 @@ impl PolarsPluginCustomValue for NuDataFrameCustomValue { let df = NuDataFrame::try_from_custom_value(plugin, self)?; Ok(df .compute_with_value(plugin, lhs_span, operator.item, operator.span, &right)? - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)) } @@ -105,7 +105,9 @@ impl PolarsPluginCustomValue for NuDataFrameCustomValue { ) -> Result { let df = NuDataFrame::try_from_custom_value(plugin, self)?; let column = df.column(&column_name.item, self_span)?; - Ok(column.cache(plugin, engine)?.into_value(self_span)) + Ok(column + .cache(plugin, engine, self_span)? + .into_value(self_span)) } fn custom_value_partial_cmp( diff --git a/crates/nu_plugin_polars/src/dataframe/values/nu_expression/custom_value.rs b/crates/nu_plugin_polars/src/dataframe/values/nu_expression/custom_value.rs index 8f5f478c8c..2aae66c36f 100644 --- a/crates/nu_plugin_polars/src/dataframe/values/nu_expression/custom_value.rs +++ b/crates/nu_plugin_polars/src/dataframe/values/nu_expression/custom_value.rs @@ -135,32 +135,32 @@ fn with_operator( Operator::Comparison(Comparison::Equal) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::eq) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), Operator::Comparison(Comparison::NotEqual) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::neq) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), Operator::Comparison(Comparison::GreaterThan) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::gt) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), Operator::Comparison(Comparison::GreaterThanOrEqual) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::gt_eq) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), Operator::Comparison(Comparison::LessThan) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::lt) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), Operator::Comparison(Comparison::LessThanOrEqual) => Ok(left .clone() .apply_with_expr(right.clone(), Expr::lt_eq) - .cache(plugin, engine)? + .cache(plugin, engine, lhs_span)? .into_value(lhs_span)), _ => Err(ShellError::OperatorMismatch { op_span, @@ -185,7 +185,7 @@ where { let expr: NuExpression = f(left.as_ref().clone(), right.as_ref().clone()).into(); - Ok(expr.cache(plugin, engine)?.into_value(span)) + Ok(expr.cache(plugin, engine, span)?.into_value(span)) } impl PolarsPluginCustomValue for NuExpressionCustomValue { diff --git a/crates/nu_plugin_polars/src/lib.rs b/crates/nu_plugin_polars/src/lib.rs index 3b8679b606..03685505b7 100644 --- a/crates/nu_plugin_polars/src/lib.rs +++ b/crates/nu_plugin_polars/src/lib.rs @@ -180,7 +180,7 @@ pub mod test { use crate::values::PolarsPluginObject; use nu_command::IntoDatetime; use nu_plugin_test_support::PluginTest; - use nu_protocol::ShellError; + use nu_protocol::{ShellError, Span}; pub fn test_polars_plugin_command(command: &impl PluginCommand) -> Result<(), ShellError> { let mut plugin = PolarsPlugin::default(); @@ -193,7 +193,10 @@ pub mod test { // if it's a polars plugin object, try to cache it if let Ok(obj) = PolarsPluginObject::try_from_value(&plugin, result) { let id = obj.id(); - plugin.cache.insert(None, id, obj).unwrap(); + plugin + .cache + .insert(None, id, obj, Span::test_data()) + .unwrap(); } } }