Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish with confirm listener added #56

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.appform.dropwizard.actors</groupId>
<artifactId>dropwizard-rabbitmq-actors</artifactId>
<version>2.0.28-1</version>
<version>3.0.2_Test</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why changing versions manually

<name>Dropwizard RabbitMQ Bundle</name>
<url>https://github.com/santanusinha/dropwizard-rabbitmq-actors</url>
<description>Provides actor abstraction on RabbitMQ for dropwizard based projects.</description>
Expand Down Expand Up @@ -92,6 +92,7 @@
<guava.version>31.0.1-jre</guava.version>
<test.container.version>1.0.6</test.container.version>
<okhttp.version>4.9.3</okhttp.version>
<mockito.version>4.4.0</mockito.version>
<amqp-client.version>5.14.1</amqp-client.version>
</properties>

Expand All @@ -112,6 +113,12 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
Expand Down Expand Up @@ -186,7 +193,7 @@
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<artifactId>junit-dep</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand All @@ -200,6 +207,11 @@
<artifactId>junit-vintage-engine</artifactId>
<version>5.8.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import io.appform.dropwizard.actors.connectivity.strategy.SharedConnectionStrategy;
import io.appform.dropwizard.actors.exceptionhandler.ExceptionHandlingFactory;
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand Down Expand Up @@ -124,6 +127,11 @@ public final void publish(Message message, AMQP.BasicProperties properties) thro
publishActor().publish(message, properties);
}

public final List<Message> publishWithConfirmListener(List<Message> messages, AMQP.BasicProperties properties,
long timeout, @NotNull TimeUnit unit) throws Exception {
return publishActor().publishWithConfirmListener(messages, properties, timeout, unit);
}

public final long pendingMessagesCount() {
return publishActor().pendingMessagesCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
import io.appform.dropwizard.actors.actor.DelayType;
import io.appform.dropwizard.actors.base.utils.NamingUtils;
import io.appform.dropwizard.actors.connectivity.RMQConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;

Expand Down Expand Up @@ -106,6 +114,93 @@ public final long pendingSidelineMessagesCount() {
return Long.MAX_VALUE;
}

/**
* @param messages : Messages to be published
* @param properties
* @param timeout : in MS timeout for waiting on countDownLatch
* @param unit : timeout unit
* @return : List of message nacked
* @throws Exception
*/
public List<Message> publishWithConfirmListener(List<Message> messages, AMQP.BasicProperties properties,
long timeout, @NotNull TimeUnit unit) throws Exception {
publishChannel.confirmSelect();
ConcurrentNavigableMap<Long, Message> outstandingConfirms = new ConcurrentSkipListMap<>();
List<Message> nackedMessages = new ArrayList<>();
CountDownLatch publishAckLatch = new CountDownLatch(messages.size());

publishChannel.addConfirmListener((sequenceNumber, multiple) -> {
messagesAck(sequenceNumber, multiple, outstandingConfirms, publishAckLatch);
}, (sequenceNumber, multiple) -> {
nackedMessages.addAll(messagesNack(sequenceNumber, multiple, outstandingConfirms, publishAckLatch));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just assign the return value rather than this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there can be multiple callback with multiple = true

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, my bad

});


long startTime = System.nanoTime();

for (Message message : messages) {
try {
String routingKey = NamingUtils.getRoutingKey(queueName, config);
outstandingConfirms.put(publishChannel.getNextPublishSeqNo(), message);
publishChannel.basicPublish(config.getExchange(), routingKey, properties,
mapper().writeValueAsBytes(message));
} catch (Exception e) {
log.error(String.format("Failed to publish Message : %s with exception %s", message, e));
publishAckLatch.countDown();
}
}

if (!publishAckLatch.await(timeout, unit)) {
log.error("Timed out waiting for publish acks");
}

long endTime = System.nanoTime();

log.info(String.format("Published %d messages with confirmListener in %d ms. Total Messages : %d", messages.size() - outstandingConfirms.size(),
Duration.ofNanos(startTime - endTime).toMillis(), messages.size()));
nackedMessages.addAll(outstandingConfirms.values());
return nackedMessages;
}


private void messagesAck(long sequenceNumber, boolean multiple, ConcurrentNavigableMap<Long, Message> outstandingConfirms, CountDownLatch publishAckLatch)
{
if (multiple) {
ConcurrentNavigableMap<Long, Message> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
for(int i =0;i<confirmed.size();i++)
publishAckLatch.countDown();
confirmed.clear();
} else {
publishAckLatch.countDown();
outstandingConfirms.remove(sequenceNumber);
}
}

private List<Message> messagesNack(long sequenceNumber, boolean multiple, ConcurrentNavigableMap<Long, Message> outstandingConfirms, CountDownLatch publishAckLatch)
{
List<Message> nackedMessages = new ArrayList<>();
if(multiple == true)
{
ConcurrentNavigableMap<Long, Message> nacked = outstandingConfirms.headMap(
sequenceNumber, true
);
for(int i =0;i<nacked.size();i++)
publishAckLatch.countDown();
nackedMessages.addAll(nacked.values());
nacked.clear();
}
else
{
publishAckLatch.countDown();
nackedMessages.add(outstandingConfirms.get(sequenceNumber));
outstandingConfirms.remove(sequenceNumber);
}
return nackedMessages;
}


public void start() throws Exception {
final String exchange = config.getExchange();
final String dlx = config.getExchange() + "_SIDELINE";
Expand Down Expand Up @@ -193,4 +288,8 @@ protected final RMQConnection connection() {
protected final ObjectMapper mapper() {
return mapper;
}

public void setPublishChannel(Channel publishChannel){
this.publishChannel = publishChannel;
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.appform.dropwizard.actors.base.utils;

import io.appform.dropwizard.actors.actor.ActorConfig;
import io.appform.dropwizard.actors.utils.CommonUtils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomUtils;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NamingUtils {
Expand All @@ -26,8 +28,21 @@ public static String prefixWithNamespace(String name) {
return String.format("%s.%s", namespace, name);
}

public static String getRoutingKey(String queueName, ActorConfig config) {
String routingKey = queueName;
if (config.isSharded()) {
int shardId = getShardId(config);
routingKey = getShardedQueueName(queueName, shardId);
}
return routingKey;
}

public static String getShardedQueueName(String queueName, int shardId) {
return queueName + "_" + shardId;
}

private static int getShardId(ActorConfig config) {
return RandomUtils.nextInt(0, config.getShardCount());
}

}
Loading