nushell/crates/nu-cmd-dataframe/src/dataframe/lazy/aggregate.rs
Jack Wright f879c00f9d
The ability to specify a schema when using dfr open and dfr into-df (#11634)
# Description

There are times where explicitly specifying a schema for a dataframe is
needed such as:
- Opening CSV and JSON lines files and needing provide more information
to polars to keep it from failing or in a desire to override default
type conversion
- When converting a nushell value to a dataframe and wanting to override
the default conversion behaviors.

This pull requests provides:
- A flag to allow specifying a schema when using dfr into-df
- A flag to allow specifying a schema when using dfr open that works for
CSV and JSON types
- A new command `dfr schema` which displays schema information and will
allow display support schema dtypes

Schema is specified creating a record that has the key value and the
dtype. Examples usages:

```
{a:1, b:{a:2}} | dfr into-df -s {a: u8, b: {a: i32}} | dfr schema
{a: 1, b: {a: [1 2 3]}, c: [a b c]} | dfr into-df -s {a: u8, b: {a: list<u64>}, c: list<str>} | dfr schema
 dfr open -s {pid: i32, ppid: i32, name: str, status: str, cpu: f64, mem: i64, virtual: i64} /tmp/ps.jsonl  | dfr schema
```

Supported dtypes:
null                                                   
bool                                                   
u8                                                     
u16                                                    
u32                                                    
u64                                                    
i8                                                     
i16                                                    
i32                                                    
i64                                                    
f32                                                    
f64                                                    
str                                                    
binary                                                 
date                                                   
datetime[time_unit: (ms, us, ns) timezone (optional)]  
duration[time_unit: (ms, us, ns)]                      
time                                                   
object                                                 
unknown                                                
list[dtype]


structs are also supported but are specified via another record:
{a: u8, b: {d: str}}

Another feature with the dfr schema command is that it returns the data
back in a format that can be passed to provide a valid schema that can
be passed in as schema argument:

<img width="638" alt="Screenshot 2024-01-29 at 10 23 58"
src="https://github.com/nushell/nushell/assets/56345/b49c3bff-5cda-4c86-975a-dfd91d991373">

---------

Co-authored-by: Jack Wright <jack.wright@disqo.com>
2024-01-29 13:26:04 -06:00

222 lines
7.9 KiB
Rust

use crate::dataframe::values::{Column, NuDataFrame, NuExpression, NuLazyFrame, NuLazyGroupBy};
use nu_engine::CallExt;
use nu_protocol::{
ast::Call,
engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Span, SyntaxShape, Type, Value,
};
use polars::{datatypes::DataType, prelude::Expr};
#[derive(Clone)]
pub struct LazyAggregate;
impl Command for LazyAggregate {
fn name(&self) -> &str {
"dfr agg"
}
fn usage(&self) -> &str {
"Performs a series of aggregations from a group-by."
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.rest(
"Group-by expressions",
SyntaxShape::Any,
"Expression(s) that define the aggregations to be applied",
)
.input_output_type(
Type::Custom("dataframe".into()),
Type::Custom("dataframe".into()),
)
.category(Category::Custom("lazyframe".into()))
}
fn examples(&self) -> Vec<Example> {
vec![
Example {
description: "Group by and perform an aggregation",
example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]]
| dfr into-df
| dfr group-by a
| dfr agg [
(dfr col b | dfr min | dfr as "b_min")
(dfr col b | dfr max | dfr as "b_max")
(dfr col b | dfr sum | dfr as "b_sum")
]"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(2)],
),
Column::new(
"b_min".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"b_max".to_string(),
vec![Value::test_int(4), Value::test_int(6)],
),
Column::new(
"b_sum".to_string(),
vec![Value::test_int(6), Value::test_int(10)],
),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
Example {
description: "Group by and perform an aggregation",
example: r#"[[a b]; [1 2] [1 4] [2 6] [2 4]]
| dfr into-lazy
| dfr group-by a
| dfr agg [
(dfr col b | dfr min | dfr as "b_min")
(dfr col b | dfr max | dfr as "b_max")
(dfr col b | dfr sum | dfr as "b_sum")
]
| dfr collect"#,
result: Some(
NuDataFrame::try_from_columns(
vec![
Column::new(
"a".to_string(),
vec![Value::test_int(1), Value::test_int(2)],
),
Column::new(
"b_min".to_string(),
vec![Value::test_int(2), Value::test_int(4)],
),
Column::new(
"b_max".to_string(),
vec![Value::test_int(4), Value::test_int(6)],
),
Column::new(
"b_sum".to_string(),
vec![Value::test_int(6), Value::test_int(10)],
),
],
None,
)
.expect("simple df for test should not fail")
.into_value(Span::test_data()),
),
},
]
}
fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let vals: Vec<Value> = call.rest(engine_state, stack, 0)?;
let value = Value::list(vals, call.head);
let expressions = NuExpression::extract_exprs(value)?;
let group_by = NuLazyGroupBy::try_from_pipeline(input, call.head)?;
if let Some(schema) = &group_by.schema {
for expr in expressions.iter() {
if let Some(name) = get_col_name(expr) {
let dtype = schema.get(name.as_str());
if matches!(dtype, Some(DataType::Object(..))) {
return Err(ShellError::GenericError {
error: "Object type column not supported for aggregation".into(),
msg: format!("Column '{name}' is type Object"),
span: Some(call.head),
help: Some("Aggregations cannot be performed on Object type columns. Use dtype command to check column types".into()),
inner: vec![],
});
}
}
}
}
let lazy = NuLazyFrame {
from_eager: group_by.from_eager,
lazy: Some(group_by.into_polars().agg(&expressions)),
schema: None,
};
let res = lazy.into_value(call.head)?;
Ok(PipelineData::Value(res, None))
}
}
fn get_col_name(expr: &Expr) -> Option<String> {
match expr {
Expr::Column(column) => Some(column.to_string()),
Expr::Agg(agg) => match agg {
polars::prelude::AggExpr::Min { input: e, .. }
| polars::prelude::AggExpr::Max { input: e, .. }
| polars::prelude::AggExpr::Median(e)
| polars::prelude::AggExpr::NUnique(e)
| polars::prelude::AggExpr::First(e)
| polars::prelude::AggExpr::Last(e)
| polars::prelude::AggExpr::Mean(e)
| polars::prelude::AggExpr::Implode(e)
| polars::prelude::AggExpr::Count(e, _)
| polars::prelude::AggExpr::Sum(e)
| polars::prelude::AggExpr::AggGroups(e)
| polars::prelude::AggExpr::Std(e, _)
| polars::prelude::AggExpr::Var(e, _) => get_col_name(e.as_ref()),
polars::prelude::AggExpr::Quantile { expr, .. } => get_col_name(expr.as_ref()),
},
Expr::Filter { input: expr, .. }
| Expr::Slice { input: expr, .. }
| Expr::Cast { expr, .. }
| Expr::Sort { expr, .. }
| Expr::Gather { expr, .. }
| Expr::SortBy { expr, .. }
| Expr::Exclude(expr, _)
| Expr::Alias(expr, _)
| Expr::KeepName(expr)
| Expr::Explode(expr) => get_col_name(expr.as_ref()),
Expr::Ternary { .. }
| Expr::AnonymousFunction { .. }
| Expr::Function { .. }
| Expr::Columns(_)
| Expr::DtypeColumn(_)
| Expr::Literal(_)
| Expr::BinaryExpr { .. }
| Expr::Window { .. }
| Expr::Wildcard
| Expr::RenameAlias { .. }
| Expr::Count
| Expr::Nth(_)
| Expr::SubPlan(_, _)
| Expr::Selector(_) => None,
}
}
#[cfg(test)]
mod test {
use super::super::super::test_dataframe::test_dataframe;
use super::*;
use crate::dataframe::expressions::{ExprAlias, ExprMax, ExprMin, ExprSum};
use crate::dataframe::lazy::groupby::ToLazyGroupBy;
#[test]
fn test_examples() {
test_dataframe(vec![
Box::new(LazyAggregate {}),
Box::new(ToLazyGroupBy {}),
Box::new(ExprAlias {}),
Box::new(ExprMin {}),
Box::new(ExprMax {}),
Box::new(ExprSum {}),
])
}
}