diff --git a/api/topology/src/main/java/quarks/topology/TStream.java b/api/topology/src/main/java/quarks/topology/TStream.java index 57888a6a..8f711d10 100644 --- a/api/topology/src/main/java/quarks/topology/TStream.java +++ b/api/topology/src/main/java/quarks/topology/TStream.java @@ -31,6 +31,7 @@ Licensed to the Apache Software Foundation (ASF) under one import quarks.function.ToIntFunction; import quarks.function.UnaryOperator; import quarks.oplet.core.FanIn; +import quarks.oplet.core.Peek; import quarks.oplet.core.Pipe; import quarks.oplet.core.Sink; @@ -249,6 +250,18 @@ else if (WARNING.equals(lr.getLevel())) */ TStream peek(Consumer peeker); + /** + * Declare a stream that contains the same contents as this stream while + * peeking at each element using {@code oplet}.
+ * For each tuple {@code t} on this stream, {@code oplet.peek(t)} will be + * called. + * + * @param oplet + * the {@link Peek} oplet. + * @return {@code this} + */ + TStream peek(Peek oplet); + /** * Sink (terminate) this stream using a function. For each tuple {@code t} on this stream * {@link Consumer#accept(Object) sinker.accept(t)} will be called. This is diff --git a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java index 51068ff7..1663e046 100644 --- a/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java +++ b/spi/topology/src/main/java/quarks/topology/spi/graph/ConnectorStream.java @@ -144,9 +144,13 @@ public > EnumMap> split(Class enumClass, Funct @Override public TStream peek(Consumer peeker) { - peeker = Functions.synchronizedConsumer(peeker); - connector.peek(new Peek(peeker)); - return this; + return peek(new Peek(Functions.synchronizedConsumer(peeker))); + } + + @Override + public TStream peek(quarks.oplet.core.Peek peek) { + connector.peek(peek); + return this; } @Override @@ -158,6 +162,8 @@ public TSink sink(Sink oplet) { @Override public TStream pipe(Pipe pipe) { + if (pipe instanceof quarks.oplet.core.Peek) + throw new IllegalArgumentException("Use peek() to add Peek oplets"); return connectPipe(pipe); } diff --git a/utils/metrics/src/main/java/quarks/metrics/Metrics.java b/utils/metrics/src/main/java/quarks/metrics/Metrics.java index e0c9d799..e5240f90 100644 --- a/utils/metrics/src/main/java/quarks/metrics/Metrics.java +++ b/utils/metrics/src/main/java/quarks/metrics/Metrics.java @@ -30,26 +30,34 @@ public class Metrics { /** * Increment a counter metric when peeking at each tuple. * + *

+ * Same as {@code stream.peek(new CounterOp())} + *

+ * * @param * TStream tuple type - * @param stream to stream to instrument - * @return a {@link TStream} containing the input tuples + * @param stream the stream to monitor + * @return the {@code stream} argument */ public static TStream counter(TStream stream) { - return stream.pipe(new CounterOp()); + return stream.peek(new CounterOp()); } /** * Measure current tuple throughput and calculate one-, five-, and * fifteen-minute exponentially-weighted moving averages. * + *

+ * Same as {@code stream.peek(new RateMeter())} + *

+ * * @param * TStream tuple type - * @param stream to stream to instrument - * @return a {@link TStream} containing the input tuples + * @param stream the stream to monitor + * @return the {@code stream} argument */ public static TStream rateMeter(TStream stream) { - return stream.pipe(new RateMeter()); + return stream.peek(new RateMeter()); } /** diff --git a/utils/metrics/src/test/java/quarks/test/metrics/MetricsEverywhereTest.java b/utils/metrics/src/test/java/quarks/test/metrics/MetricsEverywhereTest.java index 62cdf799..7da35225 100644 --- a/utils/metrics/src/test/java/quarks/test/metrics/MetricsEverywhereTest.java +++ b/utils/metrics/src/test/java/quarks/test/metrics/MetricsEverywhereTest.java @@ -74,7 +74,7 @@ public void automaticMetricCleanup1() throws Exception { Topology t = newTopology(); AtomicInteger n = new AtomicInteger(0); TStream ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); - ints.pipe(new TestOplet()); + ints.peek(new TestOplet()); // Submit job Future fj = getSubmitter().submit(t); @@ -104,8 +104,8 @@ public void automaticMetricCleanup2() throws Exception { Topology t = newTopology(); AtomicInteger n = new AtomicInteger(0); TStream ints = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); - TStream ints2 = ints.pipe(new TestOplet()); - ints2.pipe(new TestOplet()); + TStream ints2 = ints.peek(new TestOplet()); + ints2.peek(new TestOplet()); // Submit job Future fj = getSubmitter().submit(t); @@ -154,6 +154,20 @@ public void metricsEverywhereSplit() throws Exception { * OP_0 -- OP_1(Split) ----- OP_4 (Sink) * \ * -- OP_5 (Sink) + * + * Note, OP_2 (Counter) is a peek oplet and as such + * is part of the split[0] stream's peek-chain. + * Metrics insertion only occurs at the end of a peek-chain, + * so there is NOT an injected metric between + * OP_1(Split) -> OP_2 (Counter). + * + * The net is metrics are injected where the "#" are: + * + * -- OP_2 (Counter) -#- OP_3 (Sink) + * / + * OP_0 -#- OP_1(Split) ---#- OP_4 (Sink) + * \ + * -#- OP_5 (Sink) */ Topology t = newTopology(); Graph g = t.graph();