lazy dataframe reader (#6321)

* lazy dataframe reader

* correct space for polars dependencies
This commit is contained in:
Fernando Herrera 2022-08-14 14:06:31 +02:00 committed by GitHub
parent eb55fd2383
commit 9d8d305e9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 214 additions and 98 deletions

View File

@ -103,11 +103,30 @@ optional = true
version = "0.23.2" version = "0.23.2"
optional = true optional = true
features = [ features = [
"default", "to_dummies", "parquet", "json", "serde", "serde-lazy", "arg_where",
"object", "checked_arithmetic", "strings", "cum_agg", "is_in", "checked_arithmetic",
"rolling_window", "strings", "rows", "random", "concat_str",
"dtype-datetime", "dtype-struct", "lazy", "cross_join", "cross_join",
"dynamic_groupby", "dtype-categorical", "concat_str", "arg_where" "csv-file",
"cum_agg",
"default",
"dtype-datetime",
"dtype-struct",
"dtype-categorical",
"dynamic_groupby",
"is_in",
"json",
"lazy",
"object",
"parquet",
"random",
"rolling_window",
"rows",
"serde",
"serde-lazy",
"strings",
"strings",
"to_dummies",
] ]
[target.'cfg(windows)'.dependencies.windows] [target.'cfg(windows)'.dependencies.windows]

View File

@ -1,14 +1,17 @@
use super::super::values::NuDataFrame; use super::super::values::{NuDataFrame, NuLazyFrame};
use nu_engine::CallExt; use nu_engine::CallExt;
use nu_protocol::{ use nu_protocol::{
ast::Call, ast::Call,
engine::{Command, EngineState, Stack}, engine::{Command, EngineState, Stack},
Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value,
}; };
use std::{fs::File, io::BufReader, path::PathBuf}; use std::{fs::File, io::BufReader, path::PathBuf};
use polars::prelude::{CsvEncoding, CsvReader, JsonReader, ParquetReader, SerReader}; use polars::prelude::{
CsvEncoding, CsvReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, ParquetReader,
ScanArgsParquet, SerReader,
};
#[derive(Clone)] #[derive(Clone)]
pub struct OpenDataFrame; pub struct OpenDataFrame;
@ -29,6 +32,7 @@ impl Command for OpenDataFrame {
SyntaxShape::Filepath, SyntaxShape::Filepath,
"file path to load values from", "file path to load values from",
) )
.switch("lazy", "creates a lazy dataframe", Some('l'))
.named( .named(
"delimiter", "delimiter",
SyntaxShape::String, SyntaxShape::String,
@ -87,7 +91,6 @@ fn command(
stack: &mut Stack, stack: &mut Stack,
call: &Call, call: &Call,
) -> Result<PipelineData, ShellError> { ) -> Result<PipelineData, ShellError> {
let span = call.head;
let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?; let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
match file.item.extension() { match file.item.extension() {
@ -105,49 +108,80 @@ fn command(
file.span, file.span,
)), )),
} }
.map(|df| PipelineData::Value(NuDataFrame::dataframe_into_value(df, span), None)) .map(|value| PipelineData::Value(value, None))
} }
fn from_parquet( fn from_parquet(
engine_state: &EngineState, engine_state: &EngineState,
stack: &mut Stack, stack: &mut Stack,
call: &Call, call: &Call,
) -> Result<polars::prelude::DataFrame, ShellError> { ) -> Result<Value, ShellError> {
let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?; if call.has_flag("lazy") {
let columns: Option<Vec<String>> = call.get_flag(engine_state, stack, "columns")?; let file: String = call.req(engine_state, stack, 0)?;
let args = ScanArgsParquet {
n_rows: None,
cache: true,
parallel: ParallelStrategy::Auto,
rechunk: false,
row_count: None,
low_memory: false,
};
let r = File::open(&file.item).map_err(|e| { let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
ShellError::GenericError( .map_err(|e| {
"Error opening file".into(), ShellError::GenericError(
e.to_string(), "Parquet reader error".into(),
Some(file.span), format!("{:?}", e),
None, Some(call.head),
Vec::new(), None,
) Vec::new(),
})?; )
let reader = ParquetReader::new(r); })?
.into();
let reader = match columns { df.into_value(call.head)
None => reader, } else {
Some(columns) => reader.with_columns(Some(columns)), let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
}; let columns: Option<Vec<String>> = call.get_flag(engine_state, stack, "columns")?;
reader.finish().map_err(|e| { let r = File::open(&file.item).map_err(|e| {
ShellError::GenericError( ShellError::GenericError(
"Parquet reader error".into(), "Error opening file".into(),
format!("{:?}", e), e.to_string(),
Some(call.head), Some(file.span),
None, None,
Vec::new(), Vec::new(),
) )
}) })?;
let reader = ParquetReader::new(r);
let reader = match columns {
None => reader,
Some(columns) => reader.with_columns(Some(columns)),
};
let df: NuDataFrame = reader
.finish()
.map_err(|e| {
ShellError::GenericError(
"Parquet reader error".into(),
format!("{:?}", e),
Some(call.head),
None,
Vec::new(),
)
})?
.into();
Ok(df.into_value(call.head))
}
} }
fn from_json( fn from_json(
engine_state: &EngineState, engine_state: &EngineState,
stack: &mut Stack, stack: &mut Stack,
call: &Call, call: &Call,
) -> Result<polars::prelude::DataFrame, ShellError> { ) -> Result<Value, ShellError> {
let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?; let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
let file = File::open(&file.item).map_err(|e| { let file = File::open(&file.item).map_err(|e| {
ShellError::GenericError( ShellError::GenericError(
@ -162,86 +196,149 @@ fn from_json(
let buf_reader = BufReader::new(file); let buf_reader = BufReader::new(file);
let reader = JsonReader::new(buf_reader); let reader = JsonReader::new(buf_reader);
reader.finish().map_err(|e| { let df: NuDataFrame = reader
ShellError::GenericError( .finish()
"Json reader error".into(), .map_err(|e| {
format!("{:?}", e), ShellError::GenericError(
Some(call.head), "Json reader error".into(),
None, format!("{:?}", e),
Vec::new(), Some(call.head),
) None,
}) Vec::new(),
)
})?
.into();
Ok(df.into_value(call.head))
} }
fn from_csv( fn from_csv(
engine_state: &EngineState, engine_state: &EngineState,
stack: &mut Stack, stack: &mut Stack,
call: &Call, call: &Call,
) -> Result<polars::prelude::DataFrame, ShellError> { ) -> Result<Value, ShellError> {
let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
let delimiter: Option<Spanned<String>> = call.get_flag(engine_state, stack, "delimiter")?; let delimiter: Option<Spanned<String>> = call.get_flag(engine_state, stack, "delimiter")?;
let no_header: bool = call.has_flag("no-header"); let no_header: bool = call.has_flag("no-header");
let infer_schema: Option<usize> = call.get_flag(engine_state, stack, "infer-schema")?; let infer_schema: Option<usize> = call.get_flag(engine_state, stack, "infer-schema")?;
let skip_rows: Option<usize> = call.get_flag(engine_state, stack, "skip-rows")?; let skip_rows: Option<usize> = call.get_flag(engine_state, stack, "skip-rows")?;
let columns: Option<Vec<String>> = call.get_flag(engine_state, stack, "columns")?; let columns: Option<Vec<String>> = call.get_flag(engine_state, stack, "columns")?;
let csv_reader = CsvReader::from_path(&file.item) if call.has_flag("lazy") {
.map_err(|e| { let file: String = call.req(engine_state, stack, 0)?;
ShellError::GenericError( let csv_reader = LazyCsvReader::new(file);
"Error creating CSV reader".into(),
e.to_string(),
Some(file.span),
None,
Vec::new(),
)
})?
.with_encoding(CsvEncoding::LossyUtf8);
let csv_reader = match delimiter { let csv_reader = match delimiter {
None => csv_reader, None => csv_reader,
Some(d) => { Some(d) => {
if d.item.len() != 1 { if d.item.len() != 1 {
return Err(ShellError::GenericError( return Err(ShellError::GenericError(
"Incorrect delimiter".into(), "Incorrect delimiter".into(),
"Delimiter has to be one character".into(), "Delimiter has to be one character".into(),
Some(d.span), Some(d.span),
None,
Vec::new(),
));
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_delimiter(delimiter)
}
}
};
let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = match infer_schema {
None => csv_reader,
Some(r) => csv_reader.with_infer_schema_length(Some(r)),
};
let csv_reader = match skip_rows {
None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r),
};
let df: NuLazyFrame = csv_reader
.finish()
.map_err(|e| {
ShellError::GenericError(
"Parquet reader error".into(),
format!("{:?}", e),
Some(call.head),
None, None,
Vec::new(), Vec::new(),
)); )
} else { })?
let delimiter = match d.item.chars().next() { .into();
Some(d) => d as u8,
None => unreachable!(), df.into_value(call.head)
}; } else {
csv_reader.with_delimiter(delimiter) let file: Spanned<PathBuf> = call.req(engine_state, stack, 0)?;
let csv_reader = CsvReader::from_path(&file.item)
.map_err(|e| {
ShellError::GenericError(
"Error creating CSV reader".into(),
e.to_string(),
Some(file.span),
None,
Vec::new(),
)
})?
.with_encoding(CsvEncoding::LossyUtf8);
let csv_reader = match delimiter {
None => csv_reader,
Some(d) => {
if d.item.len() != 1 {
return Err(ShellError::GenericError(
"Incorrect delimiter".into(),
"Delimiter has to be one character".into(),
Some(d.span),
None,
Vec::new(),
));
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_delimiter(delimiter)
}
} }
} };
};
let csv_reader = csv_reader.has_header(!no_header); let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = match infer_schema { let csv_reader = match infer_schema {
None => csv_reader, None => csv_reader,
Some(r) => csv_reader.infer_schema(Some(r)), Some(r) => csv_reader.infer_schema(Some(r)),
}; };
let csv_reader = match skip_rows { let csv_reader = match skip_rows {
None => csv_reader, None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r), Some(r) => csv_reader.with_skip_rows(r),
}; };
let csv_reader = match columns { let csv_reader = match columns {
None => csv_reader, None => csv_reader,
Some(columns) => csv_reader.with_columns(Some(columns)), Some(columns) => csv_reader.with_columns(Some(columns)),
}; };
csv_reader.finish().map_err(|e| { let df: NuDataFrame = csv_reader
ShellError::GenericError( .finish()
"Parquet reader error".into(), .map_err(|e| {
format!("{:?}", e), ShellError::GenericError(
Some(call.head), "Parquet reader error".into(),
None, format!("{:?}", e),
Vec::new(), Some(call.head),
) None,
}) Vec::new(),
)
})?
.into();
Ok(df.into_value(call.head))
}
} }