[WIP] [COMMENTS?] [QUARKS-230] Add timer triggered window aggregations #167
base: master

Changes from all commits
@@ -19,9 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one
 package quarks.topology;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import quarks.function.BiFunction;
 import quarks.function.Function;
+import quarks.window.Window;
 
 /**
  * Partitioned window of tuples. Logically a window
@@ -36,6 +38,26 @@ Licensed to the Apache Software Foundation (ASF) under one
  * then each key has its own window containing the last
  * three tuples with the same key obtained from the tuple's identity using {@code getId()}.
  * </P>
+ * <P>
+ * A variety of aggregation processing models are provided.
+ * <UL>
+ * <LI>Partition content change triggered aggregations or timer triggered aggregations.</LI>
+ * <LI>Sliding aggregators or batch aggregators.
+ * A sliding aggregator does not affect the contents of a partition;
+ * a particular tuple may be included in multiple aggregations.
+ * Sliding aggregators are typically used to perform "signal smoothing"
+ * over the specified window.
+ * <BR>
+ * Batch aggregators process partition contents in batches, clearing
+ * the contents after a batch is processed; a particular tuple is
+ * present in at most one aggregation.
+ * Batch aggregators are often used to perform data reduction
+ * - e.g., reducing a stream of 1KHz sampled sensor values down
+ * to a 1Hz stream of aggregated sensor values.</LI>
+ * </UL>
+ * Additionally, {@link #process(Window, BiFunction)} enables use of the
+ * lower level {@link Window} API to define and use additional
+ * aggregation processing models.
+ *
  * @param <T> Tuple type
  * @param <K> Partition key type
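For illustration, the two content change triggered models might be used as in the following hedged sketch (not part of this diff; the SensorReading type and its getSensorId()/getValue() accessors are hypothetical, and the windows are assumed to come from TStream.last(...)):

    // Assumed imports: java.util.List, quarks.topology.TStream, quarks.topology.TWindow
    static void aggregationModels(TStream<SensorReading> readings) {
        // Sliding (content change triggered): each insertion re-aggregates the
        // window's current contents, so a tuple contributes to many averages.
        TStream<Double> smoothed = readings
            .last(100, SensorReading::getSensorId)
            .aggregate((tuples, key) -> tuples.stream()
                .mapToDouble(SensorReading::getValue).average().orElse(0.0));

        // Batch: contents are aggregated and then cleared, so a tuple
        // contributes to at most one result -- e.g., reducing 1KHz samples
        // to one aggregated value per 1000 readings.
        TStream<Double> reduced = readings
            .last(1000, SensorReading::getSensorId)
            .batch((tuples, key) -> tuples.stream()
                .mapToDouble(SensorReading::getValue).average().orElse(0.0));
    }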
@@ -45,7 +67,8 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 public interface TWindow<T, K> extends TopologyElement {
     /**
-     * Declares a stream that is a continuous, sliding, aggregation of
+     * Declares a stream that is a continuous, sliding,
+     * content change triggered aggregation of
      * partitions in this window.
      * <P>
      * Changes in a partition's contents trigger an invocation of
@@ -78,8 +101,9 @@ public interface TWindow<T, K> extends TopologyElement {
     <U> TStream<U> aggregate(BiFunction<List<T>, K, U> aggregator);
 
     /**
-     * Declares a stream that represents a batched aggregation of
-     * partitions in this window.
+     * Declares a stream that represents a
+     * content change triggered batched aggregation of
+     * partitions in this window.
      * <P>
      * Each partition "batch" triggers an invocation of
      * {@code batcher.apply(tuples, key)}, where {@code tuples} is
@@ -105,7 +129,84 @@ public interface TWindow<T, K> extends TopologyElement {
      * @return A stream that contains the latest aggregations of partitions in this window.
      */
     <U> TStream<U> batch(BiFunction<List<T>, K, U> batcher);
 
+    /**
+     * Declares a stream that is a continuous, sliding,
+     * timer triggered aggregation of
+     * partitions in this window.
+     * <P>
+     * Periodically triggers an invocation of
+     * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+     * a {@code List<T>} containing all the tuples in the partition in
+     * insertion order from oldest to newest. The list is stable
+     * during the aggregator invocation.
+     * The list will be empty if the partition is empty.
+     * </P>
+     * <P>
+     * A non-null {@code aggregator} result is added to the returned stream.
+     * </P>
+     * <P>
+     * Thus the returned stream will contain a sequence of tuples where the
+     * most recent tuple represents the most up to date aggregation of a
+     * partition.
+     *
+     * @param <U> Tuple type
+     * @param period how often to invoke the aggregator
+     * @param unit TimeUnit for {@code period}
+     * @param aggregator
+     *            Logic to aggregate a partition.
+     * @return A stream that contains the latest aggregations of partitions in this window.
+     *
+     * @see #aggregate(BiFunction)
+     */
+    <U> TStream<U> timedAggregate(long period, TimeUnit unit, BiFunction<List<T>, K, U> aggregator);
+
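For example, timedAggregate might be used for periodic signal smoothing, as in this hedged sketch (not part of this diff; SensorReading and its accessors are hypothetical):

    // Every second, emit the running average of the last 30 seconds of
    // readings for each sensor. An empty partition yields null, which is
    // not added to the returned stream.
    static TStream<Double> smoothedAverage(TStream<SensorReading> readings) {
        return readings
            .last(30, TimeUnit.SECONDS, SensorReading::getSensorId)
            .timedAggregate(1, TimeUnit.SECONDS, (tuples, key) ->
                tuples.isEmpty() ? null
                    : tuples.stream().mapToDouble(SensorReading::getValue)
                        .average().getAsDouble());
    }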
+    /**
+     * Declares a stream that represents a
+     * timer triggered batched aggregation of
+     * partitions in this window.
+     * <P>
+     * Periodically triggers an invocation of
+     * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+     * a {@code List<T>} containing all the tuples in the partition in
+     * insertion order from oldest to newest. The list is stable
+     * during the batcher invocation.
+     * The list will be empty if the partition is empty.
+     * <P>
+     * A non-null {@code batcher} result is added to the returned stream.
+     * The partition's contents are cleared after a batch is processed.
+     * </P>
+     * <P>
+     * Thus the returned stream will contain a sequence of tuples where the
+     * most recent tuple represents the most up to date aggregation of a
+     * partition.
+     *
+     * @param <U> Tuple type
+     * @param period how often to invoke the batcher
+     * @param unit TimeUnit for {@code period}
+     * @param batcher
+     *            Logic to aggregate a partition.
+     * @return A stream that contains the latest aggregations of partitions in this window.
+     *
+     * @see #batch(BiFunction)
+     */
+    <U> TStream<U> timedBatch(long period, TimeUnit unit, BiFunction<List<T>, K, U> batcher);
Review thread:

Comment: Not sure I understand this. Batch processing means processing all the tuples as a batch and then discarding them; how would this be different from timedAggregate?

Reply: A timedAggregate() aggregation invocation doesn't evict anything afterwards (only the window's count/time configuration affects eviction, just like the non-timed aggregate()). Hence a tuple, if it isn't evicted between aggregator invocations, will be included in multiple aggregations. A batched aggregation, timer triggered or content change triggered, always evicts everything, hence a tuple will be included in at most one aggregation.

Comment: Still a bit lost. What does this do that is different, e.g. how does this behave against a last(3) or last(10, SECONDS)?

Reply: So you're asking "why have timedBatch", right? A timedBatch and a timedAggregate yield the same aggregations when the trigger interval is >= the effectiveWindowWidth, because the effectiveWindowWidth will have flushed all tuples since the last batch/aggregation in both cases. "effectiveWindowWidth": with predictable/regular tuple arrival rates, one can specify a last(N) and a last(S sec) that yield the same window content at any time; e.g., with a 1 tuple/sec arrival rate, last(10) and last(10, SECONDS) are equivalent. The behavior of the two differs if the trigger period is shorter than the effectiveWindowWidth. That said, observe that last(10)-timedBatch(3sec) with a 1 tuple/sec arrival yields the same result as either last(3)-batch() or last(3sec)-batch(). Right? So if timedBatch yields the same result as timedAggregate when the trigger period is >= the effectiveWindowWidth, and the same result as the untimed batch() when the trigger period is < the effectiveWindowWidth, then why have timedBatch()? I believe those equivalences hold only when tuple arrival is regular/reliable -- not bursty or lossy. In other cases I don't think you can come up with a last(N) and a last(S sec) that are equivalent, hence none of these equivalences are possible, hence timedBatch() isn't redundant. E.g., even in the trigger period >= effectiveWindowWidth case, when there is burstiness a last(10sec) window can contain different collections of tuples than a last(N) window, for any N (more or less, depending on the burstiness and the value of N). Right? Am I missing/misthinking something, and there are always equivalent configs for any timedBatch() config? Where's a signal-processing guy when you need them? :-)
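To make the eviction distinction above concrete, consider this hedged sketch (not part of this diff; SensorReading is hypothetical). With a 10-second window triggered every 3 seconds, a tuple can be counted by several timedAggregate invocations but by at most one timedBatch invocation:

    static void timedModels(TStream<SensorReading> readings) {
        // timedAggregate: the trigger does not evict, so a tuple that stays
        // in the window across triggers appears in successive counts.
        TStream<Integer> sliding = readings
            .last(10, TimeUnit.SECONDS, SensorReading::getSensorId)
            .timedAggregate(3, TimeUnit.SECONDS, (tuples, key) -> tuples.size());

        // timedBatch: the partition is cleared after each invocation, so a
        // tuple is counted in at most one batch result.
        TStream<Integer> batched = readings
            .last(10, TimeUnit.SECONDS, SensorReading::getSensorId)
            .timedBatch(3, TimeUnit.SECONDS, (tuples, key) -> tuples.size());
    }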
+
+    /**
+     * Declares a stream that represents an aggregation of
+     * partitions in this window using the specified {@link Window}.
+     * <P>
+     * This method makes it easier to create aggregation streams
+     * using the lower level {@code Window} API to construct and
+     * supply windows with configurations not directly exposed by {@code TWindow}.
+     *
+     * @param window the window implementation
+     * @param aggregator the aggregation function
+     *
+     * @return A stream that contains the latest aggregations of partitions in this window.
+     */
+    <U, L extends List<T>> TStream<U> process(Window<T,K,L> window, BiFunction<List<T>, K, U> aggregator);
Review thread:

Comment: Can this be explained a bit more? How does the passed-in window relate to the window this is defined on?

Reply: Uh... :-) Yeah, something's not quite right, in that TWindow.last(...) established the time-based/count-based nature of the TWindow, and as presented above the supplied Window can violate that. Maybe it's more like TStream.last(window, processor)? Mostly I was just trying to expose a way to make it easier to utilize the Window API without having to know about other implementation details like synchronizing the processor, creating an Aggregator oplet, etc.
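For instance, process might be used as in this hedged sketch (not part of this diff; it assumes a quarks.window.Windows factory method along the lines of lastNProcessOnInsert -- the name and signature should be verified against the actual Window API):

    static TStream<Double> viaProcess(TWindow<SensorReading, String> tWindow) {
        // Assumed factory: a count-based window that triggers on each insert.
        Window<SensorReading, String, ? extends List<SensorReading>> window =
            Windows.lastNProcessOnInsert(100, SensorReading::getSensorId);
        return tWindow.process(window, (tuples, key) -> tuples.stream()
            .mapToDouble(SensorReading::getValue).average().orElse(0.0));
    }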
+
     /**
      * Returns the key function used to map tuples to partitions.
      * @return Key function used to map tuples to partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the issues I had with this in a previous system was that with many partitions the behaviour was not desired in that if the partition did not change the window still fired, thus wasting cpu cycles to produce the same result. Thus I wonder if it should be more along the lines of:
Aggregation of window partition on any partition change with a minimum period of
period
between aggregations.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A reasonable question.
In the particular use case that came up, it was OK/desirable that an unchanged partition still yielded an aggregation. e.g., the, less than perfect, interface that was desired between the device and the iothub was to publish events on the "current location" of the device even if it hadn't moved a meaningful distance.
A more efficient, less chatty, device/iothub interface would have been to only publish under that condition.
So maybe a timed-aggregator interface that only supported timed-trigger-if-changed semantics might not be acceptible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was wondering if the continue sending updates even if nothing has changed might be better handled by a separate operation, then it could be applied to anything, rather than just a window.
Something like pass any input tuple to the output, but send the last tuple if nothing has been received for the declared period.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... per-partition. Sounds like that separate operation could be a count(1)-timedAggregate-evenIfDidntChange. Does it make sense to force such a user to have to use a timed-trigger-if-changed window followed by this separate operation rather than just use a count(N)-timedAggregate-evenIfDidntChange? (I can imagine it would be ok, just want to be sure)