From fb57ec71bb01083c4403dded4a056b05c9a84e01 Mon Sep 17 00:00:00 2001
From: Dale LaBossiere
+ * A variety of aggregation processing models are provided. + *
* Changes in a partition's contents trigger an invocation of
@@ -78,8 +101,9 @@ public interface TWindow
* Each partition "batch" triggers an invocation of
* {@code batcher.apply(tuples, key)}, where {@code tuples} is
@@ -105,7 +129,84 @@ public interface TWindow
+ * Periodically trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List
+ * A non-null {@code aggregator} result is added to the returned stream.
+ *
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ *
+ * @param Tuple type
+ * @param period how often to invoke the aggregator
+ * @param unit TimeUnit for {@code period}
+ * @param aggregator
+ * Logic to aggregation a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #aggregate(BiFunction)
+ */
+ TStream timedAggregate(long period, TimeUnit unit, BiFunction
+ * Periodically trigger an invocation of
+ * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List
+ * A non-null {@code batcher} result is added to the returned stream.
+ * The partition's contents are cleared after a batch is processed.
+ *
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
+ *
+ * @param Tuple type
+ * @param period how often to invoke the batcher
+ * @param unit TimeUnit for {@code period}
+ * @param batcher
+ * Logic to aggregation a partition.
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ *
+ * @see #batch(BiFunction)
+ */
+ TStream timedBatch(long period, TimeUnit unit, BiFunction
+ * This method makes it easier to create aggregation streams
+ * using the lower level {@code Window} API to construct and
+ * supply windows with configurations not directly exposed by {@code TWindow}.
+ *
+ * @param window the window implementation
+ * @param aggregator the aggregation function
+ *
+ * @return A stream that contains the latest aggregations of partitions in this window.
+ */
+ > TStream process(Window, K, U> aggregator);
/**
- * Declares a stream that represents a batched aggregation of
- * partitions in this window.
+ * Declares a stream that represents a
+ * content change triggered batched aggregation of
+ * partitions in this window.
*
, K, U> batcher);
+
+ /**
+ * Declares a stream that is a continuous, sliding,
+ * timer triggered aggregation of
+ * partitions in this window.
+ *
, K, U> aggregator);
+ /**
+ * Declares a stream that represents a
+ * timer triggered batched aggregation of
+ * partitions in this window.
+ *
, K, U> batcher);
+
+ /**
+ * Declares a stream that represents an aggregation of
+ * partitions in this window using the specified {@link Window}.
+ *
, K, U> aggregator);
+
/**
* Returns the key function used to map tuples to partitions.
* @return Key function used to map tuples to partitions.
diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java b/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java
index 64c0b5ea..2c2b941d 100644
--- a/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java
+++ b/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java
@@ -18,10 +18,16 @@ Licensed to the Apache Software Foundation (ASF) under one
*/
package quarks.topology.spi.graph;
+import java.util.List;
+
+import quarks.function.BiFunction;
import quarks.function.Function;
+import quarks.function.Functions;
+import quarks.oplet.window.Aggregate;
import quarks.topology.TStream;
import quarks.topology.TWindow;
import quarks.topology.Topology;
+import quarks.window.Window;
public abstract class AbstractTWindow
, K, U> aggregator) {
+ aggregator = Functions.synchronizedBiFunction(aggregator);
+ Aggregate
,K, U> processor) {
- processor = Functions.synchronizedBiFunction(processor);
Window
, K, U> batcher) {
- batcher = Functions.synchronizedBiFunction(batcher);
Window
, K, U> batcher) {
Policies.processWhenFullAndEvict(size),
getKeyFunction(),
() -> new ArrayList
,K, U> processor) {
+ return null; // TODO the window impl
+// Window
, K, U> batcher) {
+ return null; // TODO the window impl
+// Window
,K, U> processor) {
- processor = Functions.synchronizedBiFunction(processor);
Window
,K, U> processor) {
processOnInsert(),
getKeyFunction(),
insertionTimeList());
-
- Aggregate
, K, U> batcher) {
- batcher = Functions.synchronizedBiFunction(batcher);
Window
, K, U> batcher) {
(partition, tuple) -> {},
getKeyFunction(),
() -> new ArrayList
,K, U> aggregator) {
+ return null; // TODO the window impl
+// Window
, K, U> batcher) {
+ return null; // TODO the window impl
+// Window