Refactor window
This commit is contained in:
parent
12838e6286
commit
5778a64ddf
|
@ -1,5 +1,6 @@
|
|||
use nu_engine::command_prelude::*;
|
||||
use nu_protocol::ListStream;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Chunks;
|
||||
|
@ -89,26 +90,33 @@ impl Command for Chunks {
|
|||
span: chunk_size.span(),
|
||||
})?;
|
||||
|
||||
if size == 0 {
|
||||
return Err(ShellError::IncorrectValue {
|
||||
msg: "`chunk_size` cannot be zero".into(),
|
||||
val_span: chunk_size.span(),
|
||||
call_span: head,
|
||||
});
|
||||
}
|
||||
let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
|
||||
msg: "`chunk_size` cannot be zero".into(),
|
||||
val_span: chunk_size.span(),
|
||||
call_span: head,
|
||||
})?;
|
||||
|
||||
match input {
|
||||
PipelineData::Value(Value::List { vals, .. }, metadata) => {
|
||||
let chunks = ChunksIter::new(vals, size, head);
|
||||
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
PipelineData::ListStream(stream, metadata) => {
|
||||
let stream = stream.modify(|iter| ChunksIter::new(iter, size, head));
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
input => Err(input.unsupported_input_error("list", head)),
|
||||
chunks(engine_state, input, size, head)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunks(
|
||||
engine_state: &EngineState,
|
||||
input: PipelineData,
|
||||
chunk_size: NonZeroUsize,
|
||||
span: Span,
|
||||
) -> Result<PipelineData, ShellError> {
|
||||
match input {
|
||||
PipelineData::Value(Value::List { vals, .. }, metadata) => {
|
||||
let chunks = ChunksIter::new(vals, chunk_size, span);
|
||||
let stream = ListStream::new(chunks, span, engine_state.signals().clone());
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
PipelineData::ListStream(stream, metadata) => {
|
||||
let stream = stream.modify(|iter| ChunksIter::new(iter, chunk_size, span));
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
input => Err(input.unsupported_input_error("list", span)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,10 +127,10 @@ struct ChunksIter<I: Iterator<Item = Value>> {
|
|||
}
|
||||
|
||||
impl<I: Iterator<Item = Value>> ChunksIter<I> {
|
||||
fn new(iter: impl IntoIterator<IntoIter = I>, size: usize, span: Span) -> Self {
|
||||
fn new(iter: impl IntoIterator<IntoIter = I>, size: NonZeroUsize, span: Span) -> Self {
|
||||
Self {
|
||||
iter: iter.into_iter(),
|
||||
size,
|
||||
size: size.into(),
|
||||
span,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use nu_engine::command_prelude::*;
|
||||
use nu_protocol::ValueIterator;
|
||||
use nu_protocol::ListStream;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Window;
|
||||
|
@ -12,8 +13,8 @@ impl Command for Window {
|
|||
fn signature(&self) -> Signature {
|
||||
Signature::build("window")
|
||||
.input_output_types(vec![(
|
||||
Type::List(Box::new(Type::Any)),
|
||||
Type::List(Box::new(Type::List(Box::new(Type::Any)))),
|
||||
Type::list(Type::Any),
|
||||
Type::list(Type::list(Type::Any)),
|
||||
)])
|
||||
.required("window_size", SyntaxShape::Int, "The size of each window.")
|
||||
.named(
|
||||
|
@ -77,116 +78,169 @@ impl Command for Window {
|
|||
input: PipelineData,
|
||||
) -> Result<PipelineData, ShellError> {
|
||||
let head = call.head;
|
||||
let group_size: Spanned<usize> = call.req(engine_state, stack, 0)?;
|
||||
let metadata = input.metadata();
|
||||
let stride: Option<usize> = call.get_flag(engine_state, stack, "stride")?;
|
||||
let window_size: Value = call.req(engine_state, stack, 0)?;
|
||||
let stride: Option<Value> = call.get_flag(engine_state, stack, "stride")?;
|
||||
let remainder = call.has_flag(engine_state, stack, "remainder")?;
|
||||
|
||||
let stride = stride.unwrap_or(1);
|
||||
let size =
|
||||
usize::try_from(window_size.as_int()?).map_err(|_| ShellError::NeedsPositiveValue {
|
||||
span: window_size.span(),
|
||||
})?;
|
||||
|
||||
//FIXME: add in support for external redirection when engine-q supports it generally
|
||||
let size = NonZeroUsize::try_from(size).map_err(|_| ShellError::IncorrectValue {
|
||||
msg: "`window_size` cannot be zero".into(),
|
||||
val_span: window_size.span(),
|
||||
call_span: head,
|
||||
})?;
|
||||
|
||||
let each_group_iterator = EachWindowIterator {
|
||||
group_size: group_size.item,
|
||||
input: Box::new(input.into_iter()),
|
||||
span: head,
|
||||
previous: None,
|
||||
stride,
|
||||
remainder,
|
||||
let stride = if let Some(stride_val) = stride {
|
||||
let stride = usize::try_from(stride_val.as_int()?).map_err(|_| {
|
||||
ShellError::NeedsPositiveValue {
|
||||
span: stride_val.span(),
|
||||
}
|
||||
})?;
|
||||
|
||||
NonZeroUsize::try_from(stride).map_err(|_| ShellError::IncorrectValue {
|
||||
msg: "`stride` cannot be zero".into(),
|
||||
val_span: stride_val.span(),
|
||||
call_span: head,
|
||||
})?
|
||||
} else {
|
||||
NonZeroUsize::MIN
|
||||
};
|
||||
|
||||
Ok(each_group_iterator.into_pipeline_data_with_metadata(
|
||||
head,
|
||||
engine_state.signals().clone(),
|
||||
metadata,
|
||||
))
|
||||
if remainder && size == stride {
|
||||
super::chunks::chunks(engine_state, input, size, head)
|
||||
} else if stride >= size {
|
||||
match input {
|
||||
PipelineData::Value(Value::List { vals, .. }, metadata) => {
|
||||
let chunks = WindowGapIter::new(vals, size, stride, remainder, head);
|
||||
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
PipelineData::ListStream(stream, metadata) => {
|
||||
let stream = stream
|
||||
.modify(|iter| WindowGapIter::new(iter, size, stride, remainder, head));
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
input => Err(input.unsupported_input_error("list", head)),
|
||||
}
|
||||
} else {
|
||||
match input {
|
||||
PipelineData::Value(Value::List { vals, .. }, metadata) => {
|
||||
let chunks = WindowOverlapIter::new(vals, size, stride, remainder, head);
|
||||
let stream = ListStream::new(chunks, head, engine_state.signals().clone());
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
PipelineData::ListStream(stream, metadata) => {
|
||||
let stream = stream
|
||||
.modify(|iter| WindowOverlapIter::new(iter, size, stride, remainder, head));
|
||||
Ok(PipelineData::ListStream(stream, metadata))
|
||||
}
|
||||
input => Err(input.unsupported_input_error("list", head)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct EachWindowIterator {
|
||||
group_size: usize,
|
||||
input: ValueIterator,
|
||||
span: Span,
|
||||
previous: Option<Vec<Value>>,
|
||||
struct WindowOverlapIter<I: Iterator<Item = Value>> {
|
||||
iter: I,
|
||||
window: Vec<Value>,
|
||||
stride: usize,
|
||||
remainder: bool,
|
||||
span: Span,
|
||||
}
|
||||
|
||||
impl Iterator for EachWindowIterator {
|
||||
impl<I: Iterator<Item = Value>> WindowOverlapIter<I> {
|
||||
fn new(
|
||||
iter: impl IntoIterator<IntoIter = I>,
|
||||
size: NonZeroUsize,
|
||||
stride: NonZeroUsize,
|
||||
remainder: bool,
|
||||
span: Span,
|
||||
) -> Self {
|
||||
Self {
|
||||
iter: iter.into_iter(),
|
||||
window: Vec::with_capacity(size.into()),
|
||||
stride: stride.into(),
|
||||
remainder,
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: Iterator<Item = Value>> Iterator for WindowOverlapIter<I> {
|
||||
type Item = Value;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let mut group = self.previous.take().unwrap_or_else(|| {
|
||||
let mut vec = Vec::new();
|
||||
|
||||
// We default to a Vec of capacity size + stride as striding pushes n extra elements to the end
|
||||
vec.try_reserve(self.group_size + self.stride).ok();
|
||||
|
||||
vec
|
||||
});
|
||||
let mut current_count = 0;
|
||||
|
||||
if group.is_empty() {
|
||||
loop {
|
||||
let item = self.input.next();
|
||||
|
||||
match item {
|
||||
Some(v) => {
|
||||
group.push(v);
|
||||
|
||||
current_count += 1;
|
||||
if current_count >= self.group_size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.remainder {
|
||||
break;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let len = if self.window.is_empty() {
|
||||
self.window.capacity()
|
||||
} else {
|
||||
// our historic buffer is already full, so stride instead
|
||||
self.stride
|
||||
};
|
||||
|
||||
loop {
|
||||
let item = self.input.next();
|
||||
self.window.extend((&mut self.iter).take(len));
|
||||
|
||||
match item {
|
||||
Some(v) => {
|
||||
group.push(v);
|
||||
|
||||
current_count += 1;
|
||||
if current_count >= self.stride {
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.remainder {
|
||||
break;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We now have elements + stride in our group, and need to
|
||||
// drop the skipped elements. Drain to preserve allocation and capacity
|
||||
// Dropping this iterator consumes it.
|
||||
group.drain(..self.stride.min(group.len()));
|
||||
if self.window.len() == self.window.capacity()
|
||||
|| (self.remainder && !self.window.is_empty())
|
||||
{
|
||||
let mut next = Vec::with_capacity(self.window.len());
|
||||
next.extend(self.window.iter().skip(self.stride).cloned());
|
||||
let window = std::mem::replace(&mut self.window, next);
|
||||
Some(Value::list(window, self.span))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if group.is_empty() {
|
||||
return None;
|
||||
struct WindowGapIter<I: Iterator<Item = Value>> {
|
||||
iter: I,
|
||||
size: usize,
|
||||
skip: usize,
|
||||
first: bool,
|
||||
remainder: bool,
|
||||
span: Span,
|
||||
}
|
||||
|
||||
impl<I: Iterator<Item = Value>> WindowGapIter<I> {
|
||||
fn new(
|
||||
iter: impl IntoIterator<IntoIter = I>,
|
||||
size: NonZeroUsize,
|
||||
stride: NonZeroUsize,
|
||||
remainder: bool,
|
||||
span: Span,
|
||||
) -> Self {
|
||||
let size = size.into();
|
||||
Self {
|
||||
iter: iter.into_iter(),
|
||||
size,
|
||||
skip: stride.get() - size,
|
||||
first: true,
|
||||
remainder,
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let return_group = group.clone();
|
||||
self.previous = Some(group);
|
||||
impl<I: Iterator<Item = Value>> Iterator for WindowGapIter<I> {
|
||||
type Item = Value;
|
||||
|
||||
Some(Value::list(return_group, self.span))
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let mut window = Vec::with_capacity(self.size);
|
||||
window.extend(
|
||||
(&mut self.iter)
|
||||
.skip(if self.first { 0 } else { self.skip })
|
||||
.take(self.size),
|
||||
);
|
||||
|
||||
self.first = false;
|
||||
|
||||
if window.len() == self.size || (self.remainder && !window.is_empty()) {
|
||||
Some(Value::list(window, self.span))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user