fixing tests for unpivot and adding support for lazy dataframes
This commit is contained in:
parent
f1e5bcd2c2
commit
522e2cc141
|
@ -3,9 +3,12 @@ use nu_protocol::{
|
||||||
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
|
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Spanned,
|
||||||
SyntaxShape, Type, Value,
|
SyntaxShape, Type, Value,
|
||||||
};
|
};
|
||||||
|
use polars::frame::explode::UnpivotArgs;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
dataframe::values::utils::convert_columns_string, values::CustomValueSupport, PolarsPlugin,
|
dataframe::values::utils::convert_columns_string,
|
||||||
|
values::{CustomValueSupport, NuLazyFrame, PolarsPluginObject},
|
||||||
|
PolarsPlugin,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::super::values::{Column, NuDataFrame};
|
use super::super::values::{Column, NuDataFrame};
|
||||||
|
@ -54,65 +57,127 @@ impl PluginCommand for UnpivotDF {
|
||||||
Type::Custom("dataframe".into()),
|
Type::Custom("dataframe".into()),
|
||||||
Type::Custom("dataframe".into()),
|
Type::Custom("dataframe".into()),
|
||||||
)
|
)
|
||||||
|
.switch(
|
||||||
|
"streamable",
|
||||||
|
"Whether or not to use the polars streaming engine. Only valid for lazy dataframes",
|
||||||
|
Some('s'),
|
||||||
|
)
|
||||||
.category(Category::Custom("dataframe".into()))
|
.category(Category::Custom("dataframe".into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn examples(&self) -> Vec<Example> {
|
fn examples(&self) -> Vec<Example> {
|
||||||
vec![Example {
|
vec![
|
||||||
description: "unpivot dataframe",
|
Example {
|
||||||
example:
|
description: "unpivot on an eager dataframe",
|
||||||
"[[a b c d]; [x 1 4 a] [y 2 5 b] [z 3 6 c]] | polars into-df | polars unpivot -c [b c] -v [a d]",
|
example:
|
||||||
result: Some(
|
"[[a b c d]; [x 1 4 a] [y 2 5 b] [z 3 6 c]] | polars into-df | polars unpivot -c [b c] -v [a d]",
|
||||||
NuDataFrame::try_from_columns(vec![
|
result: Some(
|
||||||
Column::new(
|
NuDataFrame::try_from_columns(vec![
|
||||||
"b".to_string(),
|
Column::new(
|
||||||
vec![
|
"b".to_string(),
|
||||||
Value::test_int(1),
|
vec![
|
||||||
Value::test_int(2),
|
Value::test_int(1),
|
||||||
Value::test_int(3),
|
Value::test_int(2),
|
||||||
Value::test_int(1),
|
Value::test_int(3),
|
||||||
Value::test_int(2),
|
Value::test_int(1),
|
||||||
Value::test_int(3),
|
Value::test_int(2),
|
||||||
],
|
Value::test_int(3),
|
||||||
),
|
],
|
||||||
Column::new(
|
),
|
||||||
"c".to_string(),
|
Column::new(
|
||||||
vec![
|
"c".to_string(),
|
||||||
Value::test_int(4),
|
vec![
|
||||||
Value::test_int(5),
|
Value::test_int(4),
|
||||||
Value::test_int(6),
|
Value::test_int(5),
|
||||||
Value::test_int(4),
|
Value::test_int(6),
|
||||||
Value::test_int(5),
|
Value::test_int(4),
|
||||||
Value::test_int(6),
|
Value::test_int(5),
|
||||||
],
|
Value::test_int(6),
|
||||||
),
|
],
|
||||||
Column::new(
|
),
|
||||||
"variable".to_string(),
|
Column::new(
|
||||||
vec![
|
"variable".to_string(),
|
||||||
Value::test_string("a"),
|
vec![
|
||||||
Value::test_string("a"),
|
Value::test_string("a"),
|
||||||
Value::test_string("a"),
|
Value::test_string("a"),
|
||||||
Value::test_string("d"),
|
Value::test_string("a"),
|
||||||
Value::test_string("d"),
|
Value::test_string("d"),
|
||||||
Value::test_string("d"),
|
Value::test_string("d"),
|
||||||
],
|
Value::test_string("d"),
|
||||||
),
|
],
|
||||||
Column::new(
|
),
|
||||||
"value".to_string(),
|
Column::new(
|
||||||
vec![
|
"value".to_string(),
|
||||||
Value::test_string("x"),
|
vec![
|
||||||
Value::test_string("y"),
|
Value::test_string("x"),
|
||||||
Value::test_string("z"),
|
Value::test_string("y"),
|
||||||
Value::test_string("a"),
|
Value::test_string("z"),
|
||||||
Value::test_string("b"),
|
Value::test_string("a"),
|
||||||
Value::test_string("c"),
|
Value::test_string("b"),
|
||||||
],
|
Value::test_string("c"),
|
||||||
),
|
],
|
||||||
], None)
|
),
|
||||||
.expect("simple df for test should not fail")
|
], None)
|
||||||
.into_value(Span::test_data()),
|
.expect("simple df for test should not fail")
|
||||||
),
|
.into_value(Span::test_data()),
|
||||||
}]
|
),
|
||||||
|
},
|
||||||
|
Example {
|
||||||
|
description: "unpivot on a lazy dataframe",
|
||||||
|
example:
|
||||||
|
"[[a b c d]; [x 1 4 a] [y 2 5 b] [z 3 6 c]] | polars into-lazy | polars unpivot -c [b c] -v [a d] | polars collect",
|
||||||
|
result: Some(
|
||||||
|
NuDataFrame::try_from_columns(vec![
|
||||||
|
Column::new(
|
||||||
|
"b".to_string(),
|
||||||
|
vec![
|
||||||
|
Value::test_int(1),
|
||||||
|
Value::test_int(2),
|
||||||
|
Value::test_int(3),
|
||||||
|
Value::test_int(1),
|
||||||
|
Value::test_int(2),
|
||||||
|
Value::test_int(3),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
Column::new(
|
||||||
|
"c".to_string(),
|
||||||
|
vec![
|
||||||
|
Value::test_int(4),
|
||||||
|
Value::test_int(5),
|
||||||
|
Value::test_int(6),
|
||||||
|
Value::test_int(4),
|
||||||
|
Value::test_int(5),
|
||||||
|
Value::test_int(6),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
Column::new(
|
||||||
|
"variable".to_string(),
|
||||||
|
vec![
|
||||||
|
Value::test_string("a"),
|
||||||
|
Value::test_string("a"),
|
||||||
|
Value::test_string("a"),
|
||||||
|
Value::test_string("d"),
|
||||||
|
Value::test_string("d"),
|
||||||
|
Value::test_string("d"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
Column::new(
|
||||||
|
"value".to_string(),
|
||||||
|
vec![
|
||||||
|
Value::test_string("x"),
|
||||||
|
Value::test_string("y"),
|
||||||
|
Value::test_string("z"),
|
||||||
|
Value::test_string("a"),
|
||||||
|
Value::test_string("b"),
|
||||||
|
Value::test_string("c"),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
], None)
|
||||||
|
.expect("simple df for test should not fail")
|
||||||
|
.into_value(Span::test_data()),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run(
|
fn run(
|
||||||
|
@ -122,15 +187,26 @@ impl PluginCommand for UnpivotDF {
|
||||||
call: &EvaluatedCall,
|
call: &EvaluatedCall,
|
||||||
input: PipelineData,
|
input: PipelineData,
|
||||||
) -> Result<PipelineData, LabeledError> {
|
) -> Result<PipelineData, LabeledError> {
|
||||||
command(plugin, engine, call, input).map_err(LabeledError::from)
|
match PolarsPluginObject::try_from_pipeline(plugin, input, call.head)? {
|
||||||
|
PolarsPluginObject::NuDataFrame(df) => command_eager(plugin, engine, call, df),
|
||||||
|
PolarsPluginObject::NuLazyFrame(lazy) => command_lazy(plugin, engine, call, lazy),
|
||||||
|
_ => Err(ShellError::GenericError {
|
||||||
|
error: "Must be a dataframe or lazy dataframe".into(),
|
||||||
|
msg: "".into(),
|
||||||
|
span: Some(call.head),
|
||||||
|
help: None,
|
||||||
|
inner: vec![],
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
.map_err(LabeledError::from)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn command(
|
fn command_eager(
|
||||||
plugin: &PolarsPlugin,
|
plugin: &PolarsPlugin,
|
||||||
engine: &EngineInterface,
|
engine: &EngineInterface,
|
||||||
call: &EvaluatedCall,
|
call: &EvaluatedCall,
|
||||||
input: PipelineData,
|
df: NuDataFrame,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let id_col: Vec<Value> = call.get_flag("columns")?.expect("required value");
|
let id_col: Vec<Value> = call.get_flag("columns")?.expect("required value");
|
||||||
let val_col: Vec<Value> = call.get_flag("values")?.expect("required value");
|
let val_col: Vec<Value> = call.get_flag("values")?.expect("required value");
|
||||||
|
@ -141,14 +217,12 @@ fn command(
|
||||||
let (id_col_string, id_col_span) = convert_columns_string(id_col, call.head)?;
|
let (id_col_string, id_col_span) = convert_columns_string(id_col, call.head)?;
|
||||||
let (val_col_string, val_col_span) = convert_columns_string(val_col, call.head)?;
|
let (val_col_string, val_col_span) = convert_columns_string(val_col, call.head)?;
|
||||||
|
|
||||||
let df = NuDataFrame::try_from_pipeline_coerce(plugin, input, call.head)?;
|
|
||||||
|
|
||||||
check_column_datatypes(df.as_ref(), &id_col_string, id_col_span)?;
|
check_column_datatypes(df.as_ref(), &id_col_string, id_col_span)?;
|
||||||
check_column_datatypes(df.as_ref(), &val_col_string, val_col_span)?;
|
check_column_datatypes(df.as_ref(), &val_col_string, val_col_span)?;
|
||||||
|
|
||||||
let mut res = df
|
let mut res = df
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.unpivot(&id_col_string, &val_col_string)
|
.unpivot(&val_col_string, &id_col_string)
|
||||||
.map_err(|e| ShellError::GenericError {
|
.map_err(|e| ShellError::GenericError {
|
||||||
error: "Error calculating unpivot".into(),
|
error: "Error calculating unpivot".into(),
|
||||||
msg: e.to_string(),
|
msg: e.to_string(),
|
||||||
|
@ -183,6 +257,37 @@ fn command(
|
||||||
res.to_pipeline_data(plugin, engine, call.head)
|
res.to_pipeline_data(plugin, engine, call.head)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn command_lazy(
|
||||||
|
plugin: &PolarsPlugin,
|
||||||
|
engine: &EngineInterface,
|
||||||
|
call: &EvaluatedCall,
|
||||||
|
df: NuLazyFrame,
|
||||||
|
) -> Result<PipelineData, ShellError> {
|
||||||
|
let id_col: Vec<Value> = call.get_flag("columns")?.expect("required value");
|
||||||
|
let val_col: Vec<Value> = call.get_flag("values")?.expect("required value");
|
||||||
|
|
||||||
|
let (id_col_string, _id_col_span) = convert_columns_string(id_col, call.head)?;
|
||||||
|
let (val_col_string, _val_col_span) = convert_columns_string(val_col, call.head)?;
|
||||||
|
|
||||||
|
let value_name: Option<String> = call.get_flag("value-name")?;
|
||||||
|
let variable_name: Option<String> = call.get_flag("variable-name")?;
|
||||||
|
|
||||||
|
let streamable = call.has_flag("streamable")?;
|
||||||
|
|
||||||
|
let unpivot_args = UnpivotArgs {
|
||||||
|
on: val_col_string.iter().map(Into::into).collect(),
|
||||||
|
index: id_col_string.iter().map(Into::into).collect(),
|
||||||
|
value_name: value_name.map(Into::into),
|
||||||
|
variable_name: variable_name.map(Into::into),
|
||||||
|
streamable,
|
||||||
|
};
|
||||||
|
|
||||||
|
let polars_df = df.to_polars().unpivot(unpivot_args);
|
||||||
|
|
||||||
|
let res = NuLazyFrame::new(false, polars_df);
|
||||||
|
res.to_pipeline_data(plugin, engine, call.head)
|
||||||
|
}
|
||||||
|
|
||||||
fn check_column_datatypes<T: AsRef<str>>(
|
fn check_column_datatypes<T: AsRef<str>>(
|
||||||
df: &polars::prelude::DataFrame,
|
df: &polars::prelude::DataFrame,
|
||||||
cols: &[T],
|
cols: &[T],
|
||||||
|
|
|
@ -78,22 +78,16 @@ impl NuLazyFrame {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn schema(&mut self) -> Result<NuSchema, ShellError> {
|
pub fn schema(&mut self) -> Result<NuSchema, ShellError> {
|
||||||
let internal_schema = Arc::get_mut(&mut self.lazy)
|
let internal_schema =
|
||||||
.ok_or(ShellError::GenericError {
|
Arc::make_mut(&mut self.lazy)
|
||||||
error: "Error getting schema from lazy frame".into(),
|
.schema()
|
||||||
msg: "LazyFrame is not mutable".into(),
|
.map_err(|e| ShellError::GenericError {
|
||||||
span: None,
|
error: "Error getting schema from lazy frame".into(),
|
||||||
help: None,
|
msg: e.to_string(),
|
||||||
inner: vec![],
|
span: None,
|
||||||
})?
|
help: None,
|
||||||
.schema()
|
inner: vec![],
|
||||||
.map_err(|e| ShellError::GenericError {
|
})?;
|
||||||
error: "Error getting schema from lazy frame".into(),
|
|
||||||
msg: e.to_string(),
|
|
||||||
span: None,
|
|
||||||
help: None,
|
|
||||||
inner: vec![],
|
|
||||||
})?;
|
|
||||||
Ok(internal_schema.into())
|
Ok(internal_schema.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user