Make HTTP requests cancellable when trying to connect (#8591)
Closes #8585. Prior to this change, the `http` commands could get stuck for 30s while attempting to make a connection to a remote server. After this change, `ctrl+c` works as expected:  To make this work, we perform blocking `ureq` calls in a background thread and poll the channel while checking `ctrl+c`.
This commit is contained in:
parent
dd22647fcd
commit
b4b68afa17
|
@ -13,6 +13,9 @@ use std::collections::HashMap;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::mpsc::{self, RecvTimeoutError};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
|
@ -146,13 +149,11 @@ pub fn send_request(
|
||||||
request: Request,
|
request: Request,
|
||||||
body: Option<Value>,
|
body: Option<Value>,
|
||||||
content_type: Option<String>,
|
content_type: Option<String>,
|
||||||
|
ctrl_c: Option<Arc<AtomicBool>>,
|
||||||
) -> Result<Response, ShellErrorOrRequestError> {
|
) -> Result<Response, ShellErrorOrRequestError> {
|
||||||
let request_url = request.url().to_string();
|
let request_url = request.url().to_string();
|
||||||
let error_handler = |err: Error| -> ShellErrorOrRequestError {
|
|
||||||
ShellErrorOrRequestError::RequestError(request_url, err)
|
|
||||||
};
|
|
||||||
if body.is_none() {
|
if body.is_none() {
|
||||||
return request.call().map_err(error_handler);
|
return send_cancellable_request(&request_url, Box::new(|| request.call()), ctrl_c);
|
||||||
}
|
}
|
||||||
let body = body.expect("Should never be none.");
|
let body = body.expect("Should never be none.");
|
||||||
|
|
||||||
|
@ -162,11 +163,19 @@ pub fn send_request(
|
||||||
_ => BodyType::Unknown,
|
_ => BodyType::Unknown,
|
||||||
};
|
};
|
||||||
match body {
|
match body {
|
||||||
Value::Binary { val, .. } => request.send_bytes(&val).map_err(error_handler),
|
Value::Binary { val, .. } => send_cancellable_request(
|
||||||
Value::String { val, .. } => request.send_string(&val).map_err(error_handler),
|
&request_url,
|
||||||
|
Box::new(move || request.send_bytes(&val)),
|
||||||
|
ctrl_c,
|
||||||
|
),
|
||||||
|
Value::String { val, .. } => send_cancellable_request(
|
||||||
|
&request_url,
|
||||||
|
Box::new(move || request.send_string(&val)),
|
||||||
|
ctrl_c,
|
||||||
|
),
|
||||||
Value::Record { .. } if body_type == BodyType::Json => {
|
Value::Record { .. } if body_type == BodyType::Json => {
|
||||||
let data = value_to_json_value(&body);
|
let data = value_to_json_value(&body);
|
||||||
request.send_json(data).map_err(error_handler)
|
send_cancellable_request(&request_url, Box::new(|| request.send_json(data)), ctrl_c)
|
||||||
}
|
}
|
||||||
Value::Record { cols, vals, .. } if body_type == BodyType::Form => {
|
Value::Record { cols, vals, .. } if body_type == BodyType::Form => {
|
||||||
let mut data: Vec<(String, String)> = Vec::with_capacity(cols.len());
|
let mut data: Vec<(String, String)> = Vec::with_capacity(cols.len());
|
||||||
|
@ -176,12 +185,15 @@ pub fn send_request(
|
||||||
data.push((col.clone(), val_string))
|
data.push((col.clone(), val_string))
|
||||||
}
|
}
|
||||||
|
|
||||||
let data = data
|
let request_fn = move || {
|
||||||
.iter()
|
// coerce `data` into a shape that send_form() is happy with
|
||||||
.map(|(a, b)| (a.as_str(), b.as_str()))
|
let data = data
|
||||||
.collect::<Vec<(&str, &str)>>();
|
.iter()
|
||||||
|
.map(|(a, b)| (a.as_str(), b.as_str()))
|
||||||
request.send_form(&data[..]).map_err(error_handler)
|
.collect::<Vec<(&str, &str)>>();
|
||||||
|
request.send_form(&data)
|
||||||
|
};
|
||||||
|
send_cancellable_request(&request_url, Box::new(request_fn), ctrl_c)
|
||||||
}
|
}
|
||||||
Value::List { vals, .. } if body_type == BodyType::Form => {
|
Value::List { vals, .. } if body_type == BodyType::Form => {
|
||||||
if vals.len() % 2 != 0 {
|
if vals.len() % 2 != 0 {
|
||||||
|
@ -200,12 +212,15 @@ pub fn send_request(
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<(String, String)>, ShellErrorOrRequestError>>()?;
|
.collect::<Result<Vec<(String, String)>, ShellErrorOrRequestError>>()?;
|
||||||
|
|
||||||
let data = data
|
let request_fn = move || {
|
||||||
.iter()
|
// coerce `data` into a shape that send_form() is happy with
|
||||||
.map(|(a, b)| (a.as_str(), b.as_str()))
|
let data = data
|
||||||
.collect::<Vec<(&str, &str)>>();
|
.iter()
|
||||||
|
.map(|(a, b)| (a.as_str(), b.as_str()))
|
||||||
request.send_form(&data).map_err(error_handler)
|
.collect::<Vec<(&str, &str)>>();
|
||||||
|
request.send_form(&data)
|
||||||
|
};
|
||||||
|
send_cancellable_request(&request_url, Box::new(request_fn), ctrl_c)
|
||||||
}
|
}
|
||||||
_ => Err(ShellErrorOrRequestError::ShellError(ShellError::IOError(
|
_ => Err(ShellErrorOrRequestError::ShellError(ShellError::IOError(
|
||||||
"unsupported body input".into(),
|
"unsupported body input".into(),
|
||||||
|
@ -213,6 +228,46 @@ pub fn send_request(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper method used to make blocking HTTP request calls cancellable with ctrl+c
|
||||||
|
// ureq functions can block for a long time (default 30s?) while attempting to make an HTTP connection
|
||||||
|
fn send_cancellable_request(
|
||||||
|
request_url: &str,
|
||||||
|
request_fn: Box<dyn FnOnce() -> Result<Response, Error> + Sync + Send>,
|
||||||
|
ctrl_c: Option<Arc<AtomicBool>>,
|
||||||
|
) -> Result<Response, ShellErrorOrRequestError> {
|
||||||
|
let (tx, rx) = mpsc::channel::<Result<Response, Error>>();
|
||||||
|
|
||||||
|
// Make the blocking request on a background thread...
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("HTTP requester".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let ret = request_fn();
|
||||||
|
let _ = tx.send(ret); // may fail if the user has cancelled the operation
|
||||||
|
})
|
||||||
|
.expect("Failed to create thread");
|
||||||
|
|
||||||
|
// ...and poll the channel for responses
|
||||||
|
loop {
|
||||||
|
if nu_utils::ctrl_c::was_pressed(&ctrl_c) {
|
||||||
|
// Return early and give up on the background thread. The connection will either time out or be disconnected
|
||||||
|
return Err(ShellErrorOrRequestError::ShellError(
|
||||||
|
ShellError::InterruptedByUser { span: None },
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 100ms wait time chosen arbitrarily
|
||||||
|
match rx.recv_timeout(Duration::from_millis(100)) {
|
||||||
|
Ok(result) => {
|
||||||
|
return result.map_err(|e| {
|
||||||
|
ShellErrorOrRequestError::RequestError(request_url.to_string(), e)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Timeout) => continue,
|
||||||
|
Err(RecvTimeoutError::Disconnected) => panic!("http response channel disconnected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn request_set_timeout(
|
pub fn request_set_timeout(
|
||||||
timeout: Option<Value>,
|
timeout: Option<Value>,
|
||||||
mut request: Request,
|
mut request: Request,
|
||||||
|
|
|
@ -184,6 +184,7 @@ fn helper(
|
||||||
args: Arguments,
|
args: Arguments,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
let client = http_client(args.insecure);
|
let client = http_client(args.insecure);
|
||||||
|
@ -193,7 +194,7 @@ fn helper(
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, args.data, args.content_type);
|
let response = send_request(request, args.data, args.content_type, ctrl_c);
|
||||||
|
|
||||||
let request_flags = RequestFlags {
|
let request_flags = RequestFlags {
|
||||||
raw: args.raw,
|
raw: args.raw,
|
||||||
|
|
|
@ -163,6 +163,7 @@ fn helper(
|
||||||
args: Arguments,
|
args: Arguments,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
let client = http_client(args.insecure);
|
let client = http_client(args.insecure);
|
||||||
|
@ -172,7 +173,7 @@ fn helper(
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, None, None);
|
let response = send_request(request, None, None, ctrl_c);
|
||||||
|
|
||||||
let request_flags = RequestFlags {
|
let request_flags = RequestFlags {
|
||||||
raw: args.raw,
|
raw: args.raw,
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use nu_engine::CallExt;
|
use nu_engine::CallExt;
|
||||||
use nu_protocol::ast::Call;
|
use nu_protocol::ast::Call;
|
||||||
use nu_protocol::engine::{Command, EngineState, Stack};
|
use nu_protocol::engine::{Command, EngineState, Stack};
|
||||||
|
@ -126,13 +129,18 @@ fn run_head(
|
||||||
password: call.get_flag(engine_state, stack, "password")?,
|
password: call.get_flag(engine_state, stack, "password")?,
|
||||||
timeout: call.get_flag(engine_state, stack, "max-time")?,
|
timeout: call.get_flag(engine_state, stack, "max-time")?,
|
||||||
};
|
};
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
|
|
||||||
helper(call, args)
|
helper(call, args, ctrl_c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper function that actually goes to retrieve the resource from the url given
|
// Helper function that actually goes to retrieve the resource from the url given
|
||||||
// The Option<String> return a possible file extension which can be used in AutoConvert commands
|
// The Option<String> return a possible file extension which can be used in AutoConvert commands
|
||||||
fn helper(call: &Call, args: Arguments) -> Result<PipelineData, ShellError> {
|
fn helper(
|
||||||
|
call: &Call,
|
||||||
|
args: Arguments,
|
||||||
|
ctrlc: Option<Arc<AtomicBool>>,
|
||||||
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
|
@ -143,7 +151,7 @@ fn helper(call: &Call, args: Arguments) -> Result<PipelineData, ShellError> {
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, None, None);
|
let response = send_request(request, None, None, ctrlc);
|
||||||
request_handle_response_headers(span, response)
|
request_handle_response_headers(span, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -174,6 +174,7 @@ fn helper(
|
||||||
args: Arguments,
|
args: Arguments,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
let client = http_client(args.insecure);
|
let client = http_client(args.insecure);
|
||||||
|
@ -183,7 +184,7 @@ fn helper(
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, Some(args.data), args.content_type);
|
let response = send_request(request, Some(args.data), args.content_type, ctrl_c);
|
||||||
|
|
||||||
let request_flags = RequestFlags {
|
let request_flags = RequestFlags {
|
||||||
raw: args.raw,
|
raw: args.raw,
|
||||||
|
|
|
@ -174,6 +174,7 @@ fn helper(
|
||||||
args: Arguments,
|
args: Arguments,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
let client = http_client(args.insecure);
|
let client = http_client(args.insecure);
|
||||||
|
@ -183,7 +184,7 @@ fn helper(
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, Some(args.data), args.content_type);
|
let response = send_request(request, Some(args.data), args.content_type, ctrl_c);
|
||||||
|
|
||||||
let request_flags = RequestFlags {
|
let request_flags = RequestFlags {
|
||||||
raw: args.raw,
|
raw: args.raw,
|
||||||
|
|
|
@ -174,6 +174,7 @@ fn helper(
|
||||||
args: Arguments,
|
args: Arguments,
|
||||||
) -> Result<PipelineData, ShellError> {
|
) -> Result<PipelineData, ShellError> {
|
||||||
let span = args.url.span()?;
|
let span = args.url.span()?;
|
||||||
|
let ctrl_c = engine_state.ctrlc.clone();
|
||||||
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
let (requested_url, _) = http_parse_url(call, span, args.url)?;
|
||||||
|
|
||||||
let client = http_client(args.insecure);
|
let client = http_client(args.insecure);
|
||||||
|
@ -183,7 +184,7 @@ fn helper(
|
||||||
request = request_add_authorization_header(args.user, args.password, request);
|
request = request_add_authorization_header(args.user, args.password, request);
|
||||||
request = request_add_custom_headers(args.headers, request)?;
|
request = request_add_custom_headers(args.headers, request)?;
|
||||||
|
|
||||||
let response = send_request(request, Some(args.data), args.content_type);
|
let response = send_request(request, Some(args.data), args.content_type, ctrl_c);
|
||||||
|
|
||||||
let request_flags = RequestFlags {
|
let request_flags = RequestFlags {
|
||||||
raw: args.raw,
|
raw: args.raw,
|
||||||
|
|
|
@ -1056,6 +1056,13 @@ pub enum ShellError {
|
||||||
#[label("Could not access '{column_name}' on this record")]
|
#[label("Could not access '{column_name}' on this record")]
|
||||||
span: Span,
|
span: Span,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Operation interrupted by user
|
||||||
|
#[error("Operation interrupted by user")]
|
||||||
|
InterruptedByUser {
|
||||||
|
#[label("This operation was interrupted")]
|
||||||
|
span: Option<Span>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<std::io::Error> for ShellError {
|
impl From<std::io::Error> for ShellError {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user