Added the ability to send an http request from a reader

This commit is contained in:
Jack Wright 2024-06-28 10:56:54 -07:00
parent d41e13e539
commit 60e44ffbb0

View File

@ -216,13 +216,13 @@ pub fn send_request2(
send_cancellable_request(&request_url, Box::new(|| request.call()), ctrl_c)
}
HttpBody::ByteStream(byte_stream) => {
let bytes = byte_stream.into_bytes()?;
let req = if let Some(content_type) = content_type {
request.set("Content-Type", &content_type)
} else {
request
};
send_cancellable_request(
&request_url,
Box::new(move || request.send_bytes(&bytes)),
ctrl_c,
)
send_cancellable_request_bytes(&request_url, req, byte_stream, ctrl_c)
}
HttpBody::Value(body) => {
let body_type = match content_type {
@ -352,6 +352,61 @@ fn send_cancellable_request(
}
}
// Helper method used to make blocking HTTP request calls cancellable with ctrl+c
// ureq functions can block for a long time (default 30s?) while attempting to make an HTTP connection
fn send_cancellable_request_bytes(
request_url: &str,
request: Request,
byte_stream: ByteStream,
ctrl_c: Option<Arc<AtomicBool>>,
) -> Result<Response, ShellErrorOrRequestError> {
let (tx, rx) = mpsc::channel::<Result<Response, ShellErrorOrRequestError>>();
let request_url_string = request_url.to_string();
// Make the blocking request on a background thread...
std::thread::Builder::new()
.name("HTTP requester".to_string())
.spawn(move || {
let ret = byte_stream
.reader()
.ok_or_else(|| {
ShellErrorOrRequestError::ShellError(ShellError::GenericError {
error: "Could not read byte stream".to_string(),
msg: "".into(),
span: None,
help: None,
inner: vec![],
})
})
.and_then(|reader| {
request.send(reader).map_err(|e| {
ShellErrorOrRequestError::RequestError(request_url_string, Box::new(e))
})
});
// may fail if the user has cancelled the operation
let _ = tx.send(ret);
})
.map_err(ShellError::from)?;
// ...and poll the channel for responses
loop {
if nu_utils::ctrl_c::was_pressed(&ctrl_c) {
// Return early and give up on the background thread. The connection will either time out or be disconnected
return Err(ShellErrorOrRequestError::ShellError(
ShellError::InterruptedByUser { span: None },
));
}
// 100ms wait time chosen arbitrarily
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(result) => return result,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => panic!("http response channel disconnected"),
}
}
}
pub fn request_set_timeout(
timeout: Option<Value>,
mut request: Request,