From d08212409f597dfc711deef0d7024bc5efd0f15c Mon Sep 17 00:00:00 2001 From: Aron Nopanen Date: Mon, 12 Sep 2022 16:30:20 -0700 Subject: [PATCH] Support Arrow IPC file format with dataframes (#6548) * Add support for Arrow IPC file format Add support for Arrow IPC file format to dataframes commands. Support opening of Arrow IPC-format files with extension '.arrow' or '.ipc' in the open-df command. Add a 'to arrow' command to write a dataframe to Arrow IPC format. * Add unit test for open-df on Arrow * Add -t flag to open-df command Add a `--type`/`-t` flag to the `open-df` command, to explicitly specify the type of file being used. Allowed values are the same as the set of allowed file extensions. --- Cargo.lock | 2 + crates/nu-command/Cargo.toml | 1 + crates/nu-command/src/dataframe/eager/mod.rs | 3 + crates/nu-command/src/dataframe/eager/open.rs | 108 ++++++++++++++++-- .../src/dataframe/eager/to_arrow.rs | 94 +++++++++++++++ crates/nu-command/tests/commands/open.rs | 16 +++ tests/fixtures/formats/caco3_plastics.arrow | Bin 0 -> 4663 bytes 7 files changed, 214 insertions(+), 10 deletions(-) create mode 100644 crates/nu-command/src/dataframe/eager/to_arrow.rs create mode 100644 tests/fixtures/formats/caco3_plastics.arrow diff --git a/Cargo.lock b/Cargo.lock index 5c5b83df8a..0ddec0d76c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,12 +167,14 @@ dependencies = [ "indexmap", "json-deserializer", "lexical-core", + "lz4", "multiversion", "num-traits", "parquet2", "simdutf8", "streaming-iterator", "strength_reduce", + "zstd", ] [[package]] diff --git a/crates/nu-command/Cargo.toml b/crates/nu-command/Cargo.toml index 9915f3e711..6b1c1179ed 100644 --- a/crates/nu-command/Cargo.toml +++ b/crates/nu-command/Cargo.toml @@ -115,6 +115,7 @@ features = [ "dtype-struct", "dtype-categorical", "dynamic_groupby", + "ipc", "is_in", "json", "lazy", diff --git a/crates/nu-command/src/dataframe/eager/mod.rs b/crates/nu-command/src/dataframe/eager/mod.rs index 
1b7442ac3b..bbeb551036 100644 --- a/crates/nu-command/src/dataframe/eager/mod.rs +++ b/crates/nu-command/src/dataframe/eager/mod.rs @@ -18,6 +18,7 @@ mod sample; mod shape; mod slice; mod take; +mod to_arrow; mod to_csv; mod to_df; mod to_nu; @@ -46,6 +47,7 @@ pub use sample::SampleDF; pub use shape::ShapeDF; pub use slice::SliceDF; pub use take::TakeDF; +pub use to_arrow::ToArrow; pub use to_csv::ToCSV; pub use to_df::ToDataFrame; pub use to_nu::ToNu; @@ -84,6 +86,7 @@ pub fn add_eager_decls(working_set: &mut StateWorkingSet) { ShapeDF, SliceDF, TakeDF, + ToArrow, ToCSV, ToDataFrame, ToNu, diff --git a/crates/nu-command/src/dataframe/eager/open.rs b/crates/nu-command/src/dataframe/eager/open.rs index de20048009..4c9d4e2860 100644 --- a/crates/nu-command/src/dataframe/eager/open.rs +++ b/crates/nu-command/src/dataframe/eager/open.rs @@ -9,8 +9,8 @@ use nu_protocol::{ use std::{fs::File, io::BufReader, path::PathBuf}; use polars::prelude::{ - CsvEncoding, CsvReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, ParquetReader, - ScanArgsParquet, SerReader, + CsvEncoding, CsvReader, IpcReader, JsonReader, LazyCsvReader, LazyFrame, ParallelStrategy, + ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader, }; #[derive(Clone)] @@ -22,7 +22,7 @@ impl Command for OpenDataFrame { } fn usage(&self) -> &str { - "Opens csv, json or parquet file to create dataframe" + "Opens csv, json, arrow, or parquet file to create dataframe" } fn signature(&self) -> Signature { @@ -33,6 +33,12 @@ impl Command for OpenDataFrame { "file path to load values from", ) .switch("lazy", "creates a lazy dataframe", Some('l')) + .named( + "type", + SyntaxShape::String, + "File type: csv, tsv, json, parquet, arrow. 
If omitted, derive from file extension", + Some('t'), + ) .named( "delimiter", SyntaxShape::String, @@ -93,15 +99,33 @@ fn command( ) -> Result { let file: Spanned = call.req(engine_state, stack, 0)?; - match file.item.extension() { - Some(e) => match e.to_str() { - Some("csv") | Some("tsv") => from_csv(engine_state, stack, call), - Some("parquet") => from_parquet(engine_state, stack, call), - Some("json") => from_json(engine_state, stack, call), - _ => Err(ShellError::FileNotFoundCustom( - "Not a csv, tsv, parquet or json file".into(), + let type_option: Option> = call.get_flag(engine_state, stack, "type")?; + + let type_id = match &type_option { + Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)), + None => match file.item.extension() { + Some(e) => Some(( + e.to_string_lossy().into_owned(), + "Invalid extension", file.span, )), + None => None, + }, + }; + + match type_id { + Some((e, msg, blamed)) => match e.as_str() { + "csv" | "tsv" => from_csv(engine_state, stack, call), + "parquet" => from_parquet(engine_state, stack, call), + "ipc" | "arrow" => from_ipc(engine_state, stack, call), + "json" => from_json(engine_state, stack, call), + _ => Err(ShellError::FileNotFoundCustom( + format!( + "{}. Supported values: csv, tsv, parquet, ipc, arrow, json", + msg + ), + blamed, + )), }, None => Err(ShellError::FileNotFoundCustom( "File without extension".into(), @@ -177,6 +201,70 @@ fn from_parquet( } } +fn from_ipc( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, +) -> Result { + if call.has_flag("lazy") { + let file: String = call.req(engine_state, stack, 0)?; + let args = ScanArgsIpc { + n_rows: None, + cache: true, + rechunk: false, + row_count: None, + }; + + let df: NuLazyFrame = LazyFrame::scan_ipc(file, args) + .map_err(|e| { + ShellError::GenericError( + "IPC reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? 
+ .into(); + + df.into_value(call.head) + } else { + let file: Spanned = call.req(engine_state, stack, 0)?; + let columns: Option> = call.get_flag(engine_state, stack, "columns")?; + + let r = File::open(&file.item).map_err(|e| { + ShellError::GenericError( + "Error opening file".into(), + e.to_string(), + Some(file.span), + None, + Vec::new(), + ) + })?; + let reader = IpcReader::new(r); + + let reader = match columns { + None => reader, + Some(columns) => reader.with_columns(Some(columns)), + }; + + let df: NuDataFrame = reader + .finish() + .map_err(|e| { + ShellError::GenericError( + "IPC reader error".into(), + format!("{:?}", e), + Some(call.head), + None, + Vec::new(), + ) + })? + .into(); + + Ok(df.into_value(call.head)) + } +} + fn from_json( engine_state: &EngineState, stack: &mut Stack, diff --git a/crates/nu-command/src/dataframe/eager/to_arrow.rs b/crates/nu-command/src/dataframe/eager/to_arrow.rs new file mode 100644 index 0000000000..f3c52c5dcc --- /dev/null +++ b/crates/nu-command/src/dataframe/eager/to_arrow.rs @@ -0,0 +1,94 @@ +use std::{fs::File, path::PathBuf}; + +use nu_engine::CallExt; +use nu_protocol::{ + ast::Call, + engine::{Command, EngineState, Stack}, + Category, Example, PipelineData, ShellError, Signature, Spanned, SyntaxShape, Type, Value, +}; +use polars::prelude::{IpcWriter, SerWriter}; + +use super::super::values::NuDataFrame; + +#[derive(Clone)] +pub struct ToArrow; + +impl Command for ToArrow { + fn name(&self) -> &str { + "to arrow" + } + + fn usage(&self) -> &str { + "Saves dataframe to arrow file" + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .required("file", SyntaxShape::Filepath, "file path to save dataframe") + .input_type(Type::Custom("dataframe".into())) + .output_type(Type::Any) + .category(Category::Custom("dataframe".into())) + } + + fn examples(&self) -> Vec { + vec![Example { + description: "Saves dataframe to arrow file", + example: "[[a b]; [1 2] [3 4]] | into df | to arrow 
test.arrow", + result: None, + }] + } + + fn run( + &self, + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, + ) -> Result { + command(engine_state, stack, call, input) + } +} + +fn command( + engine_state: &EngineState, + stack: &mut Stack, + call: &Call, + input: PipelineData, +) -> Result { + let file_name: Spanned = call.req(engine_state, stack, 0)?; + + let mut df = NuDataFrame::try_from_pipeline(input, call.head)?; + + let mut file = File::create(&file_name.item).map_err(|e| { + ShellError::GenericError( + "Error with file name".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + IpcWriter::new(&mut file).finish(df.as_mut()).map_err(|e| { + ShellError::GenericError( + "Error saving file".into(), + e.to_string(), + Some(file_name.span), + None, + Vec::new(), + ) + })?; + + let file_value = Value::String { + val: format!("saved {:?}", &file_name.item), + span: file_name.span, + }; + + Ok(PipelineData::Value( + Value::List { + vals: vec![file_value], + span: call.head, + }, + None, + )) +} diff --git a/crates/nu-command/tests/commands/open.rs b/crates/nu-command/tests/commands/open.rs index a6996da052..c0a1fd0cf1 100644 --- a/crates/nu-command/tests/commands/open.rs +++ b/crates/nu-command/tests/commands/open.rs @@ -208,6 +208,22 @@ fn parses_utf16_ini() { assert_eq!(actual.out, "-236") } +#[cfg(feature = "database")] +#[test] +fn parses_arrow_ipc() { + let actual = nu!( + cwd: "tests/fixtures/formats", pipeline( + r#" + open-df caco3_plastics.arrow + | into nu + | first 1 + | get origin + "# + )); + + assert_eq!(actual.out, "SPAIN") +} + #[test] fn errors_if_file_not_found() { let actual = nu!( diff --git a/tests/fixtures/formats/caco3_plastics.arrow b/tests/fixtures/formats/caco3_plastics.arrow new file mode 100644 index 0000000000000000000000000000000000000000..13b336a8e3764d93e60d5fa51dce6004d9c92291 GIT binary patch literal 4663 
zcmeHLO>Epm6rQ%Bq-l`S(gQ!DjQEi%*m`a6deYT%gBZ>45`MB~TAs5TZpwmFNK>4i!`?-aX8gVP z=9@QfW}fYFf+T6b;0SR8<-iUh280m5L*54*5ZiDa61&A9t^?v4?8C4O;zzB!aXq(P zh&NFWpk4ue1@$0y{T<^rmCGD$#Hi@cyA>hq?U5Bp$_}OLr^n1tVAX6+`?LU9| z<(b7h>_3Dt|4sbVf41N6y*_t*X7&!|PyJsz*;`z^!~Q$b|8&21qIYWRYezYCYIcE^ zU9Gzhl&EUokT;v{+aK7UF>sJZ$c$as!9kU8OSApn{QOLBX>MW5dw4H=={4wYxi4jW zZ(;Fte`&4{QQa@s5IfC{a%q_eC`d!tc8Fndzna%Dv|rFp`A200eTbvW5aa=HN5rGX z1O!DDb}2fnPcdF*9HYfSG5kr^4S$*S^O`1JGQ^xd$@&Vn8`@bGj6Y0E6*JBxO&-PY zJFFZ28P*SJn)oTlzmnto_h{l#<}b5u{GVan`2Q`(?@*GW82%LNhQGpkqG>`1MsjOwUl08%+D(>lEh=&Mgx&-OE%CY7oyb zri^bfe!{rI_%-8Y#!e$hvp8(0GKr4_4H?FPY-IJI8BN9EWYm;V*bVCKM5b0?X>xG1 z8%)bIxLMc3-MATtfehQtD1pC*)E1JUTFSrGAZyj-E)ZtW zs>dxL8Aw*GSv-+M8$wlzGV6)v*N^gm%0n8&6O3m4US$0h#t#@jWjxP_fMgOKiIOa8 zH>U#~ULavkR~rzfjCg+2QA17!&1O)qM~x_v^*EJT7$i|f%VJ?KthJ6CJX%3cm903F ze#y3Mw`!IA%E3w+k4KHLjg@b-o8#EYG)QmRNl#>~`ToZw$q)B@{!jSjJAeI5?Ki{x zIzLThl(Ggf!MMQqF5^dxKQdltG|xUP(RjNRWNlfGWEeEUxGl4|gSnF2X=D4HiY+q< z)lgT3wLXg7ZK9WU7h%JY-i{lx9(o z;3ku5Gt@9nCc_CiKAuX)w(U*4uXDAb)lB|2Mp{2mCy=fuTkZV*@FQK^$Lme!KPu+= z@`zT&lZ@thBgtKr>~hhuwXIyVE1KsNZKvqeur2E)$H}$o0sphO5o1}6 zhyXcmyH>P)Z9{-M9>+DUG*fMJJu@pzoq4WrY@5ez-ic=aALjKm`);vbWgKIS852g| zDPZ9&+jc7~tXh!hrmf0CrBd(|yHsL}=dLwrN7`|IolU%Zc-#jWA7gxqvBYTJk0I;k zjNz0EWv6O+K0>Oxh}$n9PM}jYp%q-uw`#WI7AhqVEtP^>C5B(9mCM9*3e~a?fn47z zPV>Hboa4wpA5A_-+-}Y`j*l>0t5imOc-%WaMlDy%mgl){!7G)ls^=9vpW*|xf&e|Y zR;bl#WWeH