This repository has been archived by the owner on Nov 15, 2019. It is now read-only.

[WIP] [COMMENTS?] [QUARKS-230] Add timer triggered window aggregations #167

Open · wants to merge 1 commit into base: master
107 changes: 104 additions & 3 deletions api/topology/src/main/java/quarks/topology/TWindow.java
@@ -19,9 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one
package quarks.topology;

import java.util.List;
import java.util.concurrent.TimeUnit;

import quarks.function.BiFunction;
import quarks.function.Function;
import quarks.window.Window;

/**
* Partitioned window of tuples. Logically a window
@@ -36,6 +38,26 @@ Licensed to the Apache Software Foundation (ASF) under one
* then each key has its own window containing the last
* three tuples with the same key obtained from the tuple's identity using {@code getId()}.
* </P>
* <P>
* A variety of aggregation processing models are provided.
* <UL>
* <LI>Partition content change triggered aggregations or timer triggered aggregations.</LI>
* <LI>Sliding aggregators or batch aggregators.
* A sliding aggregator does not affect the contents of a partition;
* a particular tuple may be included in multiple aggregations.
* Sliding aggregators are typically used to perform "signal smoothing"
* over the specified window.
* <BR>
* Batch aggregators process partition contents in batches, clearing
* the contents after a batch is processed; a particular tuple is
* present in at most one aggregation.
* Batch aggregators are often used to perform data reduction
* - e.g., reducing a stream of 1kHz sampled sensor values down
* to a 1Hz stream of aggregated sensor values.</LI>
* </UL>
* Additionally, {@link #process(Window, BiFunction)} enables use of the
* lower level {@link Window} API to define and use additional
* aggregation processing models.
*
* @param <T> Tuple type
* @param <K> Partition key type
@@ -45,7 +67,8 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
public interface TWindow<T, K> extends TopologyElement {
/**
* Declares a stream that is a continuous, sliding, aggregation of
* Declares a stream that is a continuous, sliding,
* content change triggered aggregation of
* partitions in this window.
* <P>
* Changes in a partition's contents trigger an invocation of
@@ -78,8 +101,9 @@ public interface TWindow<T, K> extends TopologyElement {
<U> TStream<U> aggregate(BiFunction<List<T>, K, U> aggregator);

/**
* Declares a stream that represents a batched aggregation of
* partitions in this window.
* Declares a stream that represents a
* content change triggered batched aggregation of
* partitions in this window.
* <P>
* Each partition "batch" triggers an invocation of
* {@code batcher.apply(tuples, key)}, where {@code tuples} is
@@ -105,7 +129,84 @@ public interface TWindow<T, K> extends TopologyElement {
* @return A stream that contains the latest aggregations of partitions in this window.
*/
<U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
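By way of illustration, a minimal sketch contrasting the two content change triggered models above. The SensorReading tuple type and its accessors are hypothetical; last(), aggregate(), and batch() are the API declared here:

```java
// Sketch only: SensorReading (with getId()/getValue()) is a hypothetical
// tuple type, not part of this PR.
static TStream<Double> smoothed(TStream<SensorReading> readings) {
    // Per-sensor window over the last 10 readings.
    TWindow<SensorReading, String> window = readings.last(10, SensorReading::getId);

    // Sliding: every insertion triggers an aggregation over the current
    // contents; a given reading can contribute to up to 10 aggregations.
    return window.aggregate((tuples, key) ->
        tuples.stream().mapToDouble(SensorReading::getValue).average().orElse(0.0));
}

static TStream<Double> reduced(TStream<SensorReading> readings) {
    TWindow<SensorReading, String> window = readings.last(10, SensorReading::getId);

    // Batch: the window fills to 10 tuples, is aggregated once, then
    // cleared; a given reading contributes to at most one aggregation.
    return window.batch((tuples, key) ->
        tuples.stream().mapToDouble(SensorReading::getValue).average().orElse(0.0));
}
```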

/**
* Declares a stream that is a continuous, sliding,
* timer triggered aggregation of
* partitions in this window.
* <P>
* Periodically triggers an invocation of
* {@code aggregator.apply(tuples, key)}, where {@code tuples} is
* a {@code List<T>} containing all the tuples in the partition in
* insertion order from oldest to newest. The list is stable
* during the aggregator invocation.
* The list will be empty if the partition is empty.
* </P>
* <P>
* A non-null {@code aggregator} result is added to the returned stream.
* </P>
* <P>
* Thus the returned stream will contain a sequence of tuples where the
* most recent tuple represents the most up to date aggregation of a
* partition.
*
* @param <U> Tuple type
* @param period how often to invoke the aggregator
* @param unit TimeUnit for {@code period}
* @param aggregator
* Logic to aggregate a partition.
* @return A stream that contains the latest aggregations of partitions in this window.
*
* @see #aggregate(BiFunction)
*/
<U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
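A usage sketch (the LocationEvent tuple type is hypothetical): publish each device's most recent location every 30 seconds, whether or not the partition changed since the last trigger:

```java
// Sketch only: LocationEvent (with getDeviceId()) is a hypothetical tuple type.
static TStream<LocationEvent> currentLocation(TStream<LocationEvent> events) {
    // Keep only the newest event per device.
    TWindow<LocationEvent, String> latest = events.last(1, LocationEvent::getDeviceId);

    // Fires every 30 seconds per partition; an unchanged partition still
    // yields its (unchanged) aggregation. Null results (empty partition)
    // are not added to the returned stream.
    return latest.timedAggregate(30, TimeUnit.SECONDS,
        (tuples, key) -> tuples.isEmpty() ? null : tuples.get(0));
}
```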
Contributor:
One of the issues I had with this in a previous system was that, with many partitions, the behaviour was not desirable: if a partition did not change, the window still fired, wasting CPU cycles to produce the same result. Thus I wonder if it should be more along the lines of:

Aggregation of a window partition on any partition change, with a minimum period of period between aggregations.

Member Author:
A reasonable question.
In the particular use case that came up, it was OK/desirable that an unchanged partition still yielded an aggregation. E.g., the (less than perfect) interface that was desired between the device and the iothub was to publish events on the "current location" of the device even if it hadn't moved a meaningful distance.
A more efficient, less chatty, device/iothub interface would have been to publish only under that condition.
So maybe a timed-aggregator interface that only supported timed-trigger-if-changed semantics might not be acceptable?

Contributor:

I was wondering if continuing to send updates even when nothing has changed might be better handled by a separate operation; then it could be applied to anything, rather than just a window.

Something like: pass any input tuple through to the output, but re-send the last tuple if nothing has been received for the declared period.

Member Author:

... per-partition. Sounds like that separate operation could be a count(1)-timedAggregate-evenIfDidntChange. Does it make sense to force such a user to have to use a timed-trigger-if-changed window followed by this separate operation rather than just use a count(N)-timedAggregate-evenIfDidntChange? (I can imagine it would be ok, just want to be sure)
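For concreteness, one hypothetical shape for that separate operation (nothing like this exists in the API; the name and placement are invented):

```java
// HYPOTHETICAL, sketch only: a per-key pass-through on TStream<T> that
// forwards every input tuple immediately and, if no tuple arrives for a
// key within the declared period, re-emits the last tuple seen for that key.
<K> TStream<T> heartbeat(long period, TimeUnit unit, Function<T, K> keyFunction);
```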


/**
* Declares a stream that represents a
* timer triggered batched aggregation of
* partitions in this window.
* <P>
* Periodically triggers an invocation of
* {@code batcher.apply(tuples, key)}, where {@code tuples} is
* a {@code List<T>} containing all the tuples in the partition in
* insertion order from oldest to newest. The list is stable
* during the batcher invocation.
* The list will be empty if the partition is empty.
* </P>
* <P>
* A non-null {@code batcher} result is added to the returned stream.
* The partition's contents are cleared after a batch is processed.
* </P>
* <P>
* Thus the returned stream will contain a sequence of tuples where the
* most recent tuple represents the most up to date aggregation of a
* partition.
*
* @param <U> Tuple type
* @param period how often to invoke the batcher
* @param unit TimeUnit for {@code period}
* @param batcher
Logic to aggregate a partition.
* @return A stream that contains the latest aggregations of partitions in this window.
*
* @see #batch(BiFunction)
*/
<U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
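A usage sketch of the data reduction case from the class javadoc (the Sample tuple type is hypothetical): reduce a nominally 1kHz sample stream to one aggregated value per second per sensor:

```java
// Sketch only: Sample (with getSensorId()/getValue()) is a hypothetical tuple type.
static TStream<Double> oneHz(TStream<Sample> samples) {
    // The window just needs to retain at least one trigger period's worth
    // of tuples; timedBatch clears the partition on every trigger anyway.
    TWindow<Sample, String> window = samples.last(5, TimeUnit.SECONDS, Sample::getSensorId);

    // Every second, aggregate and clear: each sample contributes to
    // exactly one output value.
    return window.timedBatch(1, TimeUnit.SECONDS,
        (tuples, key) -> tuples.isEmpty() ? null
            : tuples.stream().mapToDouble(Sample::getValue).average().orElse(0.0));
}
```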
Contributor:

Not sure I understand this. Batch processing is: process all the tuples as a batch and then discard them. How would it be different from timedAggregate?

Member Author:

a timedAggregate() aggregation invocation doesn't evict anything afterwards (only the window's count/time config affects eviction, just like the non-timed aggregate()). Hence a tuple, if it isn't evicted between aggregator invocations, will be included in multiple aggregations. A batched aggregation, timer triggered or content change triggered, always evicts everything, hence a tuple will be included in at most one aggregation.

Contributor:

Still a bit lost: batch against last(N) or last(time) processes the tuples once, in batch mode.

What does this do that is different? E.g., how does this behave against a last(3) or last(10, SECONDS)?

Member Author:

So you're asking "why have timedBatch", right?

A timedBatch and timedAggregate yield the same aggregations when the trigger interval is >= the effectiveWindowWidth... because the effectiveWindowWidth will have flushed all tuples since the last batch/agg in both cases. "effectiveWindowWidth" meaning: with predictable/regular tuple arrival rates, one can specify a last(N) and a last(S sec) that yield the same window content at any time. E.g., with a 1 tup/sec arrival rate, last(10) and last(10sec) are equivalent.

The behavior of the two is different if the trigger rate is shorter than the effectiveWindowWidth.
e.g., last(10) with a 1 tup/sec arrival rate
timedBatch(3sec) - agg1[1-3], agg2[4-6], agg3[7-9],agg4[10-12],agg5[13-15]
timedAgg(3sec) - agg1[1-3], agg2[1-6], agg3[1-9],agg4[3-12],agg5[6-15]
Right?

That said, observe that the last(10)-timedBatch(3sec) with 1tps arrival yields the same result as either last(3)-batch() or last(3sec)-batch(). Right?

So if timedBatch yields the same result as timedAggregate when the trigger period is >= effectiveWindowWidth, and it yields the same result as those untimed batch() when timedBatch trigger period is < effectiveWindowWidth, then why have timedBatch()?

I believe there are only equivalences in the cases where the tuple arrival is regular/reliable -- not bursty or lossy. In other cases I don't think you can come up with a last(N) and a last(S sec) that are equivalent, hence none of these equivalences are possible, hence timedBatch() isn't redundant. E.g., even in the trigger period >= effectiveWindowWidth case, when there is burstiness, a last(10sec) window can contain different collections of tuples than a last(N) window, for any N (more or less depending on the burstiness and values of N). Right?

Am I missing/misthinking something and there are always equiv configs to any timedBatch() config? Where's a signal-processing guy when you need them? :-)
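A standalone simulation of the last(10), 1 tup/sec, 3 sec trigger scenario above (plain Java, no Quarks types), for checking those sequences:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the example above: last(10) window, one tuple per "second",
// timer trigger every 3 "seconds".
public class TimedWindowSim {
    public static void main(String[] args) {
        List<Integer> sliding = new ArrayList<>(); // timedAggregate: eviction by count only
        List<Integer> batch = new ArrayList<>();   // timedBatch: cleared on each trigger
        for (int t = 1; t <= 15; t++) {
            sliding.add(t);
            if (sliding.size() > 10) sliding.remove(0); // last(10) eviction
            batch.add(t); // never reaches 10 here since it is cleared every 3 ticks
            if (t % 3 == 0) {
                System.out.println("t=" + t
                        + "  timedAggregate sees " + sliding
                        + "  timedBatch sees " + batch);
                batch.clear(); // batch semantics: contents evicted after processing
            }
        }
        // Prints agg1[1-3], agg2[1-6], agg3[1-9], agg4[3-12], agg5[6-15] for
        // timedAggregate and [1-3],[4-6],[7-9],[10-12],[13-15] for timedBatch.
    }
}
```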


/**
* Declares a stream that represents an aggregation of
* partitions in this window using the specified {@link Window}.
* <P>
* This method makes it easier to create aggregation streams
* using the lower level {@code Window} API to construct and
* supply windows with configurations not directly exposed by {@code TWindow}.
*
* @param window the window implementation
* @param aggregator the aggregation function
*
* @return A stream that contains the latest aggregations of partitions in this window.
*/
<U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
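A minimal sketch of using process() with a window built via the lower level API; Windows.lastNProcessOnInsert is the same factory that TWindowImpl.aggregate uses below:

```java
// Sketch only: aggregate an existing TWindow with an explicitly
// constructed count-based Window.
static <T, K> TStream<Integer> countLastN(TWindow<T, K> tWindow, int n) {
    Window<T, K, java.util.LinkedList<T>> window =
        Windows.lastNProcessOnInsert(n, tWindow.getKeyFunction());
    // process() handles synchronizing the aggregator and wiring the
    // Aggregate oplet onto the window's feeder stream.
    return tWindow.process(window, (tuples, key) -> tuples.size());
}
```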
Contributor:

Can this be explained a bit more, how does the passed in window relate to the window this is defined on?

Member Author:

Uh... :-) Yeah, something's not quite right, in that TWindow.last(...) established the time-based/count-based nature of the TWindow and, as presented above, the supplied Window can violate that. Maybe it's more like TStream.last(window, processor)? Mostly I was just trying to expose a way to make it easier to utilize the Window API without having to know about other impl details like synchronizing the processor, creating an Aggregate oplet, ...


/**
* Returns the key function used to map tuples to partitions.
* @return Key function used to map tuples to partitions.
quarks/topology/spi/graph/AbstractTWindow.java
@@ -18,10 +18,16 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package quarks.topology.spi.graph;

import java.util.List;

import quarks.function.BiFunction;
import quarks.function.Function;
import quarks.function.Functions;
import quarks.oplet.window.Aggregate;
import quarks.topology.TStream;
import quarks.topology.TWindow;
import quarks.topology.Topology;
import quarks.window.Window;

public abstract class AbstractTWindow<T, K> implements TWindow<T, K> {
private final TStream<T> feed;
@@ -45,4 +51,12 @@ public Function<T, K> getKeyFunction() {
public TStream<T> feeder() {
return feed;
}

@Override
public <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator) {
aggregator = Functions.synchronizedBiFunction(aggregator);
Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, aggregator);
return feeder().pipe(op);
}

}
quarks/topology/spi/graph/TWindowImpl.java
@@ -23,11 +23,10 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import quarks.function.BiFunction;
import quarks.function.Function;
import quarks.function.Functions;
import quarks.oplet.window.Aggregate;
import quarks.topology.TStream;
import quarks.window.Policies;
import quarks.window.Window;
@@ -43,15 +42,12 @@ public class TWindowImpl<T, K> extends AbstractTWindow<T, K> {

@Override
public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) {
processor = Functions.synchronizedBiFunction(processor);
Window<T, K, LinkedList<T>> window = Windows.lastNProcessOnInsert(size, getKeyFunction());
Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, processor);
return feeder().pipe(op);
return process(window, processor);
}

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
batcher = Functions.synchronizedBiFunction(batcher);
Window<T, K, List<T>> window =
Windows.window(
alwaysInsert(),
@@ -60,9 +56,29 @@ public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
Policies.processWhenFullAndEvict(size),
getKeyFunction(),
() -> new ArrayList<T>(size));

Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
return feeder().pipe(op);
return process(window, batcher);
}

@Override
public <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>,K, U> processor) {
return null; // TODO the window impl
// Window<T, K, List<T>> window =
// Windows.window(
// alwaysInsert(),
// ...
// );
// return process(window, aggregator);
}

@Override
public <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher) {
return null; // TODO the window impl
// Window<T, K, List<T>> window =
// Windows.window(
// alwaysInsert(),
// ...
// );
// return process(window, batcher);
}
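Both TODOs need the same missing piece: a time-based trigger. A standalone illustration of what a timer triggered sliding aggregation does (no Quarks types; this is hypothetical scaffolding, not the proposed implementation):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;

// Standalone illustration: a periodic task snapshots each partition's
// current contents and applies the aggregator, evicting nothing.
public class TimedAggregateSketch {
    public static <T, K, U> ScheduledFuture<?> timedAggregate(
            Map<K, List<T>> partitions,            // window contents per key
            long period, TimeUnit unit,
            BiFunction<List<T>, K, U> aggregator,
            Consumer<U> sink,                      // stands in for the output stream
            ScheduledExecutorService executor) {
        return executor.scheduleAtFixedRate(() -> {
            synchronized (partitions) {
                for (Map.Entry<K, List<T>> e : partitions.entrySet()) {
                    // Stable copy for the aggregator invocation.
                    U result = aggregator.apply(new ArrayList<>(e.getValue()), e.getKey());
                    if (result != null) sink.accept(result); // non-null results only
                }
            }
        }, period, period, unit);
    }
}
```

A timedBatch variant would additionally clear each partition's list after invoking the aggregator.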

/**
quarks/topology/spi/graph/TWindowTimeImpl.java
@@ -24,20 +24,18 @@ Licensed to the Apache Software Foundation (ASF) under one
import static quarks.window.Policies.processOnInsert;
import static quarks.window.Policies.scheduleEvictIfEmpty;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import quarks.function.BiFunction;
import quarks.function.Function;
import quarks.function.Functions;
import quarks.oplet.window.Aggregate;
import quarks.topology.TStream;
import quarks.window.InsertionTimeList;
import quarks.window.Policies;
import quarks.window.Window;
import quarks.window.Windows;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TWindowTimeImpl<T, K> extends AbstractTWindow<T, K> {
private long time;
private TimeUnit unit;
@@ -66,7 +64,6 @@ public class TWindowTimeImpl<T, K> extends AbstractTWindow<T, K> {
*/
@Override
public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) {
processor = Functions.synchronizedBiFunction(processor);
Window<T, K, InsertionTimeList<T>> window =
Windows.window(
alwaysInsert(),
@@ -75,14 +72,11 @@ public <U> TStream<U> aggregate(BiFunction<List<T>,K, U> processor) {
processOnInsert(),
getKeyFunction(),
insertionTimeList());

Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, processor);
return feeder().pipe(op);
return process(window, processor);
}

@Override
public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
batcher = Functions.synchronizedBiFunction(batcher);
Window<T, K, List<T>> window =
Windows.window(
alwaysInsert(),
@@ -91,9 +85,29 @@ public <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher) {
(partition, tuple) -> {},
getKeyFunction(),
() -> new ArrayList<T>());

Aggregate<T,U,K> op = new Aggregate<T,U,K>(window, batcher);
return feeder().pipe(op);
return process(window, batcher);
}

@Override
public <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>,K, U> aggregator) {
return null; // TODO the window impl
// Window<T, K, List<T>> window =
// Windows.window(
// alwaysInsert(),
// ...
// );
// return process(window, aggregator);
}

@Override
public <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher) {
return null; // TODO the window impl
// Window<T, K, List<T>> window =
// Windows.window(
// alwaysInsert(),
// ...
// );
// return process(window, batcher);
}

/**