Add filter and sink plugins

This commit is contained in:
Jonathan Turner 2019-07-02 19:56:20 +12:00
parent ea18583e30
commit 75ddfe9f5a
12 changed files with 268 additions and 194 deletions

View File

@ -80,8 +80,7 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
Arc::new(Config), Arc::new(Config),
Arc::new(SkipWhile), Arc::new(SkipWhile),
command("sort-by", sort_by::sort_by), command("sort-by", sort_by::sort_by),
command("inc", |x| plugin::plugin("inc".into(), x)), command("inc", |x| plugin::filter_plugin("inc".into(), x)),
command("sum", |x| plugin::plugin("sum".into(), x)),
]); ]);
context.add_sinks(vec![ context.add_sinks(vec![
@ -91,6 +90,7 @@ pub async fn cli() -> Result<(), Box<dyn Error>> {
sink("table", table::table), sink("table", table::table),
sink("tree", tree::tree), sink("tree", tree::tree),
sink("vtable", vtable::vtable), sink("vtable", vtable::vtable),
sink("sum", |x| plugin::sink_plugin("sum".into(), x)),
]); ]);
} }

View File

@ -82,6 +82,10 @@ pub trait Command {
optional_positional: vec![], optional_positional: vec![],
rest_positional: true, rest_positional: true,
named: indexmap::IndexMap::new(), named: indexmap::IndexMap::new(),
is_filter: true,
is_sink: false,
can_load: vec![],
can_save: vec![],
} }
} }
} }
@ -97,6 +101,10 @@ pub trait Sink {
optional_positional: vec![], optional_positional: vec![],
rest_positional: true, rest_positional: true,
named: indexmap::IndexMap::new(), named: indexmap::IndexMap::new(),
is_filter: false,
is_sink: true,
can_load: vec![],
can_save: vec![],
} }
} }
} }

View File

@ -33,6 +33,10 @@ impl Command for Config {
optional_positional: vec![], optional_positional: vec![],
rest_positional: false, rest_positional: false,
named, named,
is_sink: true,
is_filter: false,
can_load: vec![],
can_save: vec![],
} }
} }
} }

View File

@ -28,11 +28,19 @@ impl Command for Open {
optional_positional: vec![], optional_positional: vec![],
rest_positional: false, rest_positional: false,
named, named,
is_filter: true,
is_sink: false,
can_load: vec![],
can_save: vec![],
} }
} }
} }
pub fn fetch(cwd: &PathBuf, location: &str, span: Span) -> Result<(Option<String>, String), ShellError> { pub fn fetch(
cwd: &PathBuf,
location: &str,
span: Span,
) -> Result<(Option<String>, String), ShellError> {
let mut cwd = cwd.clone(); let mut cwd = cwd.clone();
if location.starts_with("http:") || location.starts_with("https:") { if location.starts_with("http:") || location.starts_with("https:") {
let response = reqwest::get(location); let response = reqwest::get(location);
@ -154,9 +162,7 @@ pub fn parse_as_value(
name_span, name_span,
) )
}), }),
_ => { _ => Ok(Value::string(contents)),
Ok(Value::string(contents))
}
} }
} }

View File

@ -1,3 +1,4 @@
use crate::commands::command::SinkCommandArgs;
use crate::errors::ShellError; use crate::errors::ShellError;
use crate::prelude::*; use crate::prelude::*;
use serde::{self, Deserialize, Serialize}; use serde::{self, Deserialize, Serialize};
@ -26,21 +27,23 @@ impl<T> JsonRpc<T> {
#[serde(tag = "method")] #[serde(tag = "method")]
#[allow(non_camel_case_types)] #[allow(non_camel_case_types)]
pub enum NuResult { pub enum NuResult {
response { params: VecDeque<ReturnValue> }, response {
params: Result<VecDeque<ReturnValue>, ShellError>,
},
} }
pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, ShellError> { pub fn filter_plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, ShellError> {
let input = args.input;
let args = if let Some(ref positional) = args.args.positional {
positional.clone()
} else {
vec![]
};
let mut path = std::path::PathBuf::from("."); let mut path = std::path::PathBuf::from(".");
path.push("target"); path.push("target");
path.push("debug"); path.push("debug");
path.push(format!("nu_plugin_{}", plugin_name)); path.push(format!("nu_plugin_{}", plugin_name));
path = if path.exists() {
path
} else {
std::path::PathBuf::from(format!("nu_plugin_{}", plugin_name))
};
let mut child = std::process::Command::new(path) let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped()) .stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
@ -53,7 +56,7 @@ pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, Sh
let _reader = BufReader::new(stdout); let _reader = BufReader::new(stdout);
let request = JsonRpc::new("init", args.clone()); let request = JsonRpc::new("begin_filter", args.args);
let request_raw = serde_json::to_string(&request).unwrap(); let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes())?; stdin.write(format!("{}\n", request_raw).as_bytes())?;
} }
@ -61,7 +64,8 @@ pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, Sh
let mut eos = VecDeque::new(); let mut eos = VecDeque::new();
eos.push_back(Value::Primitive(Primitive::EndOfStream)); eos.push_back(Value::Primitive(Primitive::EndOfStream));
let stream = input let stream = args
.input
.chain(eos) .chain(eos)
.map(move |v| match v { .map(move |v| match v {
Value::Primitive(Primitive::EndOfStream) => { Value::Primitive(Primitive::EndOfStream) => {
@ -90,20 +94,30 @@ pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, Sh
Ok(_) => { Ok(_) => {
let response = serde_json::from_str::<NuResult>(&input); let response = serde_json::from_str::<NuResult>(&input);
match response { match response {
Ok(NuResult::response { params }) => params, Ok(NuResult::response { params }) => match params {
Err(_) => { Ok(params) => params,
Err(e) => {
let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new(e))));
result
}
},
Err(e) => {
let mut result = VecDeque::new(); let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new( result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"), ShellError::string(format!(
"Error while processing input: {:?} {}",
e, input
)),
)))); ))));
result result
} }
} }
} }
Err(_) => { Err(e) => {
let mut result = VecDeque::new(); let mut result = VecDeque::new();
result.push_back(ReturnValue::Value(Value::Error(Box::new( result.push_back(ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Error while processing input"), ShellError::string(format!("Error while processing input: {:?}", e)),
)))); ))));
result result
} }
@ -114,3 +128,31 @@ pub fn plugin(plugin_name: String, args: CommandArgs) -> Result<OutputStream, Sh
Ok(stream.boxed()) Ok(stream.boxed())
} }
pub fn sink_plugin(plugin_name: String, args: SinkCommandArgs) -> Result<(), ShellError> {
let mut path = std::path::PathBuf::from(".");
path.push("target");
path.push("debug");
path.push(format!("nu_plugin_{}", plugin_name));
path = if path.exists() {
path
} else {
std::path::PathBuf::from(format!("nu_plugin_{}", plugin_name))
};
let mut child = std::process::Command::new(path)
.stdin(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn child process");
let stdin = child.stdin.as_mut().expect("Failed to open stdin");
let request = JsonRpc::new("sink", (args.args, args.input));
let request_raw = serde_json::to_string(&request).unwrap();
stdin.write(format!("{}\n", request_raw).as_bytes())?;
let _ = child.wait();
Ok(())
}

View File

@ -20,6 +20,10 @@ impl Command for SkipWhile {
optional_positional: vec![], optional_positional: vec![],
rest_positional: false, rest_positional: false,
named: indexmap::IndexMap::new(), named: indexmap::IndexMap::new(),
is_filter: true,
is_sink: false,
can_load: vec![],
can_save: vec![],
} }
} }
} }

View File

@ -19,6 +19,10 @@ impl Command for Where {
optional_positional: vec![], optional_positional: vec![],
rest_positional: false, rest_positional: false,
named: indexmap::IndexMap::new(), named: indexmap::IndexMap::new(),
is_filter: true,
is_sink: false,
can_load: vec![],
can_save: vec![],
} }
} }
} }

View File

@ -15,6 +15,7 @@ mod format;
mod git; mod git;
mod object; mod object;
mod parser; mod parser;
mod plugin;
mod prelude; mod prelude;
mod shell; mod shell;
mod stream; mod stream;
@ -22,7 +23,9 @@ mod stream;
pub use crate::commands::command::ReturnValue; pub use crate::commands::command::ReturnValue;
pub use crate::parser::parse::span::SpannedItem; pub use crate::parser::parse::span::SpannedItem;
pub use crate::parser::Spanned; pub use crate::parser::Spanned;
pub use crate::plugin::{serve_plugin, Plugin};
pub use cli::cli; pub use cli::cli;
pub use errors::ShellError; pub use errors::ShellError;
pub use object::base::{Primitive, Value}; pub use object::base::{Primitive, Value};
pub use parser::parse::text::Text; pub use parser::parse::text::Text;
pub use parser::registry::{Args, CommandConfig};

View File

@ -5,17 +5,18 @@ use derive_new::new;
use getset::Getters; use getset::Getters;
use indexmap::IndexMap; use indexmap::IndexMap;
use log::trace; use log::trace;
use serde::{Deserialize, Serialize};
use std::fmt; use std::fmt;
#[allow(unused)] #[allow(unused)]
#[derive(Debug)] #[derive(Debug, Serialize, Deserialize)]
pub enum NamedType { pub enum NamedType {
Switch, Switch,
Mandatory(NamedValue), Mandatory(NamedValue),
Optional(NamedValue), Optional(NamedValue),
} }
#[derive(Debug)] #[derive(Debug, Serialize, Deserialize)]
pub enum NamedValue { pub enum NamedValue {
Single, Single,
@ -33,7 +34,7 @@ impl NamedValue {
} }
#[allow(unused)] #[allow(unused)]
#[derive(Debug, Clone)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PositionalType { pub enum PositionalType {
Value(String), Value(String),
Block(String), Block(String),
@ -55,17 +56,21 @@ impl PositionalType {
} }
} }
#[derive(Debug, Getters)] #[derive(Debug, Getters, Serialize, Deserialize)]
#[get = "crate"] #[get = "crate"]
pub struct CommandConfig { pub struct CommandConfig {
crate name: String, pub name: String,
crate mandatory_positional: Vec<PositionalType>, pub mandatory_positional: Vec<PositionalType>,
crate optional_positional: Vec<PositionalType>, pub optional_positional: Vec<PositionalType>,
crate rest_positional: bool, pub rest_positional: bool,
crate named: IndexMap<String, NamedType>, pub named: IndexMap<String, NamedType>,
pub is_filter: bool,
pub is_sink: bool,
pub can_load: Vec<String>,
pub can_save: Vec<String>,
} }
#[derive(Debug, Default, new)] #[derive(Debug, Default, new, Serialize, Deserialize)]
pub struct Args { pub struct Args {
pub positional: Option<Vec<Spanned<Value>>>, pub positional: Option<Vec<Spanned<Value>>>,
pub named: Option<IndexMap<String, Spanned<Value>>>, pub named: Option<IndexMap<String, Spanned<Value>>>,

101
src/plugin.rs Normal file
View File

@ -0,0 +1,101 @@
use crate::{Args, CommandConfig, ReturnValue, ShellError, Value};
use serde::{Deserialize, Serialize};
use std::io;
pub trait Plugin {
fn config(&mut self) -> Result<CommandConfig, ShellError> {
Err(ShellError::string("`config` not implemented in plugin"))
}
#[allow(unused)]
fn begin_filter(&mut self, args: Args) -> Result<(), ShellError> {
Err(ShellError::string(
"`begin_filter` not implemented in plugin",
))
}
#[allow(unused)]
fn filter(&mut self, input: Value) -> Result<Vec<ReturnValue>, ShellError> {
Err(ShellError::string("`filter` not implemented in plugin"))
}
#[allow(unused)]
fn sink(&mut self, args: Args, input: Vec<Value>) {}
fn quit(&mut self) {
return;
}
}
pub fn serve_plugin(plugin: &mut dyn Plugin) {
loop {
let mut input = String::new();
match io::stdin().read_line(&mut input) {
Ok(_) => {
let command = serde_json::from_str::<NuCommand>(&input);
match command {
Ok(NuCommand::config) => {
send_response(plugin.config());
}
Ok(NuCommand::begin_filter { params }) => {
let _ = plugin.begin_filter(params);
}
Ok(NuCommand::filter { params }) => {
send_response(plugin.filter(params));
}
Ok(NuCommand::sink { params }) => {
plugin.sink(params.0, params.1);
break;
}
Ok(NuCommand::quit) => {
plugin.quit();
break;
}
e => {
send_response(ShellError::string(format!(
"Could not handle plugin message: {:?}",
e,
)));
break;
}
}
}
e => {
send_response(ShellError::string(format!(
"Could not handle plugin message: {:?}",
e,
)));
break;
}
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
jsonrpc: String,
pub method: String,
pub params: T,
}
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: T) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
}
}
fn send_response<T: Serialize>(result: T) {
let response = JsonRpc::new("response", result);
let response_raw = serde_json::to_string(&response).unwrap();
println!("{}", response_raw);
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuCommand {
config,
begin_filter { params: Args },
filter { params: Value },
sink { params: (Args, Vec<Value>) },
quit,
}

View File

@ -1,102 +1,49 @@
use nu::{Primitive, ReturnValue, ShellError, Spanned, Value}; use nu::{serve_plugin, Args, Plugin, Primitive, ReturnValue, ShellError, Spanned, Value};
use serde::{Deserialize, Serialize};
use std::io;
/// A wrapper for proactive notifications to the IDE (eg. diagnostics). These must struct Inc {
/// follow the JSON 2.0 RPC spec inc_by: i64,
#[derive(Debug, Serialize, Deserialize)]
pub struct JsonRpc<T> {
jsonrpc: String,
pub method: String,
pub params: Vec<T>,
} }
impl<T> JsonRpc<T> { impl Inc {
pub fn new<U: Into<String>>(method: U, params: Vec<T>) -> Self { fn new() -> Inc {
JsonRpc { Inc { inc_by: 1 }
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
} }
} }
fn send_response<T: Serialize>(result: Vec<T>) { impl Plugin for Inc {
let response = JsonRpc::new("response", result); fn begin_filter(&mut self, args: Args) -> Result<(), ShellError> {
let response_raw = serde_json::to_string(&response).unwrap(); if let Some(args) = args.positional {
println!("{}", response_raw); for arg in args {
} match arg {
#[derive(Debug, Serialize, Deserialize)] Spanned {
#[serde(tag = "method")] item: Value::Primitive(Primitive::Int(i)),
#[allow(non_camel_case_types)] ..
pub enum NuCommand { } => {
init { params: Vec<Spanned<Value>> }, self.inc_by = i;
filter { params: Value },
quit,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut inc_by = 1;
loop {
let mut input = String::new();
match io::stdin().read_line(&mut input) {
Ok(_) => {
let command = serde_json::from_str::<NuCommand>(&input);
match command {
Ok(NuCommand::init { params }) => {
for param in params {
match param {
Spanned {
item: Value::Primitive(Primitive::Int(i)),
..
} => {
inc_by = i;
}
_ => {
send_response(vec![ReturnValue::Value(Value::Error(
Box::new(ShellError::string("Unrecognized type in params")),
))]);
}
}
}
}
Ok(NuCommand::filter { params }) => match params {
Value::Primitive(Primitive::Int(i)) => {
send_response(vec![ReturnValue::Value(Value::int(i + inc_by))]);
}
Value::Primitive(Primitive::Bytes(b)) => {
send_response(vec![ReturnValue::Value(Value::bytes(
b + inc_by as u64,
))]);
}
x => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string(format!("Unrecognized type in stream: {:?}", x)),
)))]);
}
},
Ok(NuCommand::quit) => {
break;
}
Err(e) => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string(format!(
"Unrecognized type in stream: {} {:?}",
input, e
)),
)))]);
} }
_ => return Err(ShellError::string("Unrecognized type in params")),
} }
} }
Err(_) => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string(format!("Unrecognized type in stream: {}", input)),
)))]);
}
} }
Ok(())
} }
Ok(()) fn filter(&mut self, input: Value) -> Result<Vec<ReturnValue>, ShellError> {
match input {
Value::Primitive(Primitive::Int(i)) => {
Ok(vec![ReturnValue::Value(Value::int(i + self.inc_by))])
}
Value::Primitive(Primitive::Bytes(b)) => Ok(vec![ReturnValue::Value(Value::bytes(
b + self.inc_by as u64,
))]),
x => Err(ShellError::string(format!(
"Unrecognized type in stream: {:?}",
x
))),
}
}
}
fn main() {
serve_plugin(&mut Inc::new());
} }

View File

@ -1,83 +1,33 @@
use nu::{Primitive, ReturnValue, ShellError, Spanned, Value}; use nu::{serve_plugin, Args, Plugin, Primitive, Value};
use serde::{Deserialize, Serialize};
use std::io;
/// A wrapper for proactive notifications to the IDE (eg. diagnostics). These must struct Sum;
/// follow the JSON 2.0 RPC spec
#[derive(Debug, Serialize, Deserialize)] impl Sum {
pub struct JsonRpc<T> { fn new() -> Sum {
jsonrpc: String, Sum
pub method: String,
pub params: Vec<T>,
}
impl<T> JsonRpc<T> {
pub fn new<U: Into<String>>(method: U, params: Vec<T>) -> Self {
JsonRpc {
jsonrpc: "2.0".into(),
method: method.into(),
params,
}
} }
} }
fn send_response<T: Serialize>(result: Vec<T>) { impl Plugin for Sum {
let response = JsonRpc::new("response", result); fn sink(&mut self, _args: Args, input: Vec<Value>) {
let response_raw = serde_json::to_string(&response).unwrap(); let mut total = 0i64;
println!("{}", response_raw);
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
#[allow(non_camel_case_types)]
pub enum NuCommand {
init { params: Vec<Spanned<Value>> },
filter { params: Value },
quit,
}
fn main() -> Result<(), Box<dyn std::error::Error>> { for v in input {
let mut total = 0i64; match v {
Value::Primitive(Primitive::Int(i)) => {
loop { total += i;
let mut input = String::new();
match io::stdin().read_line(&mut input) {
Ok(_) => {
let command = serde_json::from_str::<NuCommand>(&input);
match command {
Ok(NuCommand::init { .. }) => {}
Ok(NuCommand::filter { params }) => match params {
Value::Primitive(Primitive::Int(i)) => {
total += i as i64;
send_response(vec![ReturnValue::Value(Value::int(total))]);
}
Value::Primitive(Primitive::Bytes(b)) => {
total += b as i64;
send_response(vec![ReturnValue::Value(Value::bytes(total as u64))]);
}
_ => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Unrecognized type in stream"),
)))]);
}
},
Ok(NuCommand::quit) => {
break;
}
Err(_) => {
send_response(vec![ReturnValue::Value(Value::Error(Box::new(
ShellError::string("Unrecognized type in stream"),
)))]);
}
} }
} Value::Primitive(Primitive::Bytes(i)) => {
Err(_) => { total += i as i64;
send_response(vec![ReturnValue::Value(Value::Error(Box::new( }
ShellError::string("Unrecognized type in stream"), _ => {}
)))]);
} }
} }
}
Ok(()) println!("Result: {}", total);
}
}
fn main() {
serve_plugin(&mut Sum::new());
} }