diff --git a/api/topology/src/main/java/quarks/topology/TWindow.java b/api/topology/src/main/java/quarks/topology/TWindow.java index 91a4a17b..924e0b18 100644 --- a/api/topology/src/main/java/quarks/topology/TWindow.java +++ b/api/topology/src/main/java/quarks/topology/TWindow.java @@ -19,9 +19,11 @@ Licensed to the Apache Software Foundation (ASF) under one package quarks.topology; import java.util.List; +import java.util.concurrent.TimeUnit; import quarks.function.BiFunction; import quarks.function.Function; +import quarks.window.Window; /** * Partitioned window of tuples. Logically a window @@ -36,6 +38,26 @@ Licensed to the Apache Software Foundation (ASF) under one * then each key has its own window containing the last * three tuples with the same key obtained from the tuple's identity using {@code getId()}. *

+ *

+ * A variety of aggregation processing models are provided. + *

+ * Additionally, {@link #process(Window, BiFunction)} enables use of the + * lower level {@link Window} API to define and use additional + * aggregation processing models. * * @param Tuple type * @param Partition key type @@ -45,7 +67,8 @@ Licensed to the Apache Software Foundation (ASF) under one */ public interface TWindow extends TopologyElement { /** - * Declares a stream that is a continuous, sliding, aggregation of + * Declares a stream that is a continuous, sliding, + * content change triggered aggregation of * partitions in this window. *

* Changes in a partition's contents trigger an invocation of @@ -78,8 +101,9 @@ public interface TWindow extends TopologyElement { TStream aggregate(BiFunction, K, U> aggregator); /** - * Declares a stream that represents a batched aggregation of - * partitions in this window. + * Declares a stream that represents a + * content change triggered batched aggregation of + * partitions in this window. *

* Each partition "batch" triggers an invocation of * {@code batcher.apply(tuples, key)}, where {@code tuples} is @@ -105,7 +129,84 @@ public interface TWindow extends TopologyElement { * @return A stream that contains the latest aggregations of partitions in this window. */ TStream batch(BiFunction, K, U> batcher); + + /** + * Declares a stream that is a continuous, sliding, + * timer triggered aggregation of + * partitions in this window. + *

+ * Periodically trigger an invocation of + * {@code aggregator.apply(tuples, key)}, where {@code tuples} is + * a {@code List} containing all the tuples in the partition in + * insertion order from oldest to newest. The list is stable + * during the aggregator invocation. + * The list will be empty if the partition is empty. + *

+ *

+ * A non-null {@code aggregator} result is added to the returned stream. + *

+ *

+ * Thus the returned stream will contain a sequence of tuples where the + * most recent tuple represents the most up to date aggregation of a + * partition. + * + * @param Tuple type + * @param period how often to invoke the aggregator + * @param unit TimeUnit for {@code period} + * @param aggregator + * Logic to aggregation a partition. + * @return A stream that contains the latest aggregations of partitions in this window. + * + * @see #aggregate(BiFunction) + */ + TStream timedAggregate(long period, TimeUnit unit, BiFunction, K, U> aggregator); + /** + * Declares a stream that represents a + * timer triggered batched aggregation of + * partitions in this window. + *

+ * Periodically trigger an invocation of + * {@code batcher.apply(tuples, key)}, where {@code tuples} is + * a {@code List} containing all the tuples in the partition in + * insertion order from oldest to newest The list is stable + * during the batcher invocation. + * The list will be empty if the partition is empty. + *

+ * A non-null {@code batcher} result is added to the returned stream. + * The partition's contents are cleared after a batch is processed. + *

+ *

+ * Thus the returned stream will contain a sequence of tuples where the + * most recent tuple represents the most up to date aggregation of a + * partition. + * + * @param Tuple type + * @param period how often to invoke the batcher + * @param unit TimeUnit for {@code period} + * @param batcher + * Logic to aggregation a partition. + * @return A stream that contains the latest aggregations of partitions in this window. + * + * @see #batch(BiFunction) + */ + TStream timedBatch(long period, TimeUnit unit, BiFunction, K, U> batcher); + + /** + * Declares a stream that represents an aggregation of + * partitions in this window using the specified {@link Window}. + *

+ * This method makes it easier to create aggregation streams + * using the lower level {@code Window} API to construct and + * supply windows with configurations not directly exposed by {@code TWindow}. + * + * @param window the window implementation + * @param aggregator the aggregation function + * + * @return A stream that contains the latest aggregations of partitions in this window. + */ + > TStream process(Window window, BiFunction, K, U> aggregator); + /** * Returns the key function used to map tuples to partitions. * @return Key function used to map tuples to partitions. diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java b/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java index 64c0b5ea..2c2b941d 100644 --- a/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java +++ b/spi/topology/src/main/java/quarks/topology/spi/graph/AbstractTWindow.java @@ -18,10 +18,16 @@ Licensed to the Apache Software Foundation (ASF) under one */ package quarks.topology.spi.graph; +import java.util.List; + +import quarks.function.BiFunction; import quarks.function.Function; +import quarks.function.Functions; +import quarks.oplet.window.Aggregate; import quarks.topology.TStream; import quarks.topology.TWindow; import quarks.topology.Topology; +import quarks.window.Window; public abstract class AbstractTWindow implements TWindow { private final TStream feed; @@ -45,4 +51,12 @@ public Function getKeyFunction() { public TStream feeder() { return feed; } + + @Override + public > TStream process(Window window, BiFunction, K, U> aggregator) { + aggregator = Functions.synchronizedBiFunction(aggregator); + Aggregate op = new Aggregate(window, aggregator); + return feeder().pipe(op); + } + } diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowImpl.java b/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowImpl.java index cd9fee20..e97cbdbf 100644 --- a/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowImpl.java +++ b/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowImpl.java @@ -23,11 +23,10 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import quarks.function.BiFunction; import quarks.function.Function; -import quarks.function.Functions; -import quarks.oplet.window.Aggregate; import quarks.topology.TStream; import quarks.window.Policies; import quarks.window.Window; @@ -43,15 +42,12 @@ public class TWindowImpl extends AbstractTWindow { @Override public TStream aggregate(BiFunction,K, U> processor) { - processor = Functions.synchronizedBiFunction(processor); Window> window = Windows.lastNProcessOnInsert(size, getKeyFunction()); - Aggregate op = new Aggregate(window, processor); - return feeder().pipe(op); + return process(window, processor); } @Override public TStream batch(BiFunction, K, U> batcher) { - batcher = Functions.synchronizedBiFunction(batcher); Window> window = Windows.window( alwaysInsert(), @@ -60,9 +56,29 @@ public TStream batch(BiFunction, K, U> batcher) { Policies.processWhenFullAndEvict(size), getKeyFunction(), () -> new ArrayList(size)); - - Aggregate op = new Aggregate(window, batcher); - return feeder().pipe(op); + return process(window, batcher); + } + + @Override + public TStream timedAggregate(long period, TimeUnit unit, BiFunction,K, U> processor) { + return null; // TODO the window impl +// Window> window = +// Windows.window( +// alwaysInsert(), +// ... +// ); +// return process(window, aggregator); + } + + @Override + public TStream timedBatch(long period, TimeUnit unit, BiFunction, K, U> batcher) { + return null; // TODO the window impl +// Window> window = +// Windows.window( +// alwaysInsert(), +// ... +// ); +// return process(window, batcher); } /** diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowTimeImpl.java b/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowTimeImpl.java index a3ef0f3e..accc2331 100644 --- a/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowTimeImpl.java +++ b/spi/topology/src/main/java/quarks/topology/spi/graph/TWindowTimeImpl.java @@ -24,20 +24,18 @@ Licensed to the Apache Software Foundation (ASF) under one import static quarks.window.Policies.processOnInsert; import static quarks.window.Policies.scheduleEvictIfEmpty; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + import quarks.function.BiFunction; import quarks.function.Function; -import quarks.function.Functions; -import quarks.oplet.window.Aggregate; import quarks.topology.TStream; import quarks.window.InsertionTimeList; import quarks.window.Policies; import quarks.window.Window; import quarks.window.Windows; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class TWindowTimeImpl extends AbstractTWindow { private long time; private TimeUnit unit; @@ -66,7 +64,6 @@ public class TWindowTimeImpl extends AbstractTWindow { */ @Override public TStream aggregate(BiFunction,K, U> processor) { - processor = Functions.synchronizedBiFunction(processor); Window> window = Windows.window( alwaysInsert(), @@ -75,14 +72,11 @@ public TStream aggregate(BiFunction,K, U> processor) { processOnInsert(), getKeyFunction(), insertionTimeList()); - - Aggregate op = new Aggregate(window, processor); - return feeder().pipe(op); + return process(window, processor); } @Override public TStream batch(BiFunction, K, U> batcher) { - batcher = Functions.synchronizedBiFunction(batcher); Window> window = Windows.window( alwaysInsert(), @@ -91,9 +85,29 @@ public TStream batch(BiFunction, K, U> batcher) { (partition, tuple) -> {}, getKeyFunction(), () -> new ArrayList()); - - Aggregate op = new Aggregate(window, batcher); - return feeder().pipe(op); + return process(window, batcher); + } + + @Override + public TStream timedAggregate(long period, TimeUnit unit, BiFunction,K, U> aggregator) { + return null; // TODO the window impl +// Window> window = +// Windows.window( +// alwaysInsert(), +// ... +// ); +// return process(window, aggregator); + } + + @Override + public TStream timedBatch(long period, TimeUnit unit, BiFunction, K, U> batcher) { + return null; // TODO the window impl +// Window> window = +// Windows.window( +// alwaysInsert(), +// ... +// ); +// return process(window, batcher); } /**