Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made consumer count dynamic per actor #52

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/main/java/io/appform/dropwizard/actors/actor/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.Set;
import java.util.function.Supplier;

/**
* A simpler derivation of {@link BaseActor} to be used in most common actor use cases. This is managed by dropwizard.
Expand All @@ -42,28 +43,28 @@ public abstract class Actor<MessageType extends Enum<MessageType>, Message> exte
@Deprecated
protected Actor(
MessageType type,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
RMQConnection connection,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
ExceptionHandlingFactory exceptionHandlingFactory,
Class<? extends Message> clazz,
Set<Class<?>> droppedExceptionTypes) {
super(type.name(), config, connection, mapper, retryStrategyFactory, exceptionHandlingFactory,
super(type.name(), configSupplier, connection, mapper, retryStrategyFactory, exceptionHandlingFactory,
clazz, droppedExceptionTypes);
this.type = type;
}

protected Actor(
MessageType type,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
ConnectionRegistry connectionRegistry,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
ExceptionHandlingFactory exceptionHandlingFactory,
Class<? extends Message> clazz,
Set<Class<?>> droppedExceptionTypes) {
super(type.name(), config, connectionRegistry, mapper, retryStrategyFactory, exceptionHandlingFactory,
super(type.name(), configSupplier, connectionRegistry, mapper, retryStrategyFactory, exceptionHandlingFactory,
clazz, droppedExceptionTypes);
this.type = type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

/**
* This is a managed wrapper for {@link UnmanagedBaseActor} this is managed and therefore started by D/W.
Expand Down Expand Up @@ -65,7 +66,7 @@ protected BaseActor(UnmanagedPublisher<Message> produceActor,
@Deprecated
protected BaseActor(
String name,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
RMQConnection connection,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
Expand All @@ -75,15 +76,15 @@ protected BaseActor(
this.droppedExceptionTypes
= null == droppedExceptionTypes
? Collections.emptySet() : droppedExceptionTypes;
actorImpl = new UnmanagedBaseActor<>(name, config, connection, mapper, retryStrategyFactory,
actorImpl = new UnmanagedBaseActor<>(name, configSupplier, connection, mapper, retryStrategyFactory,
exceptionHandlingFactory, clazz,
this::handle,
this::isExceptionIgnorable);
}

protected BaseActor(
String name,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
ConnectionRegistry connectionRegistry,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
Expand All @@ -93,7 +94,7 @@ protected BaseActor(
this.droppedExceptionTypes
= null == droppedExceptionTypes
? Collections.emptySet() : droppedExceptionTypes;
actorImpl = new UnmanagedBaseActor<>(name, config, connectionRegistry, mapper, retryStrategyFactory,
actorImpl = new UnmanagedBaseActor<>(name, configSupplier, connectionRegistry, mapper, retryStrategyFactory,
exceptionHandlingFactory, clazz,
this::handle,
this::isExceptionIgnorable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.commons.lang3.NotImplementedException;

import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
Expand All @@ -62,35 +63,35 @@ public UnmanagedBaseActor(UnmanagedPublisher<Message> publishActor,

public UnmanagedBaseActor(
String name,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
RMQConnection connection,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
ExceptionHandlingFactory exceptionHandlingFactory,
Class<? extends Message> clazz,
MessageHandlingFunction<Message, Boolean> handlerFunction,
Function<Throwable, Boolean> errorCheckFunction) {
this(new UnmanagedPublisher<>(name, config, connection, mapper),
this(new UnmanagedPublisher<>(name, configSupplier, connection, mapper),
new UnmanagedConsumer<>(
name, config, connection, mapper, retryStrategyFactory, exceptionHandlingFactory, clazz,
name, configSupplier, connection, mapper, retryStrategyFactory, exceptionHandlingFactory, clazz,
handlerFunction, errorCheckFunction));
}

public UnmanagedBaseActor(
String name,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
ConnectionRegistry connectionRegistry,
ObjectMapper mapper,
RetryStrategyFactory retryStrategyFactory,
ExceptionHandlingFactory exceptionHandlingFactory,
Class<? extends Message> clazz,
MessageHandlingFunction<Message, Boolean> handlerFunction,
Function<Throwable, Boolean> errorCheckFunction) {
val consumerConnection = connectionRegistry.createOrGet(consumerConnectionName(config.getConsumer()));
val producerConnection = connectionRegistry.createOrGet(producerConnectionName(config.getProducer()));
this.publishActor = new UnmanagedPublisher<>(name, config, producerConnection, mapper);
val consumerConnection = connectionRegistry.createOrGet(consumerConnectionName(configSupplier.get().getConsumer()));
val producerConnection = connectionRegistry.createOrGet(producerConnectionName(configSupplier.get().getProducer()));
this.publishActor = new UnmanagedPublisher<>(name, configSupplier, producerConnection, mapper);
this.consumeActor = new UnmanagedConsumer<>(
name, config, consumerConnection, mapper, retryStrategyFactory, exceptionHandlingFactory, clazz,
name, configSupplier, consumerConnection, mapper, retryStrategyFactory, exceptionHandlingFactory, clazz,
handlerFunction, errorCheckFunction);
}

Expand Down
122 changes: 83 additions & 39 deletions src/main/java/io/appform/dropwizard/actors/base/UnmanagedConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,32 @@
import io.appform.dropwizard.actors.retry.RetryStrategyFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

@Slf4j
public class UnmanagedConsumer<Message> {

private final String name;
private final ActorConfig config;
private final Supplier<ActorConfig> configSupplier;
private final RMQConnection connection;
private final ObjectMapper mapper;
private final Class<? extends Message> clazz;
private final int prefetchCount;
private final MessageHandlingFunction<Message, Boolean> handlerFunction;
private final Function<Throwable, Boolean> errorCheckFunction;
private final String queueName;
private final RetryStrategy retryStrategy;
private final ExceptionHandler exceptionHandler;

private final List<Handler<Message>> handlers = Lists.newArrayList();
private final List<Handler<Message>> handlers = Collections.synchronizedList(Lists.newArrayList());
private final ScheduledExecutorService consumerRefresher;

public UnmanagedConsumer(final String name,
final ActorConfig config,
final Supplier<ActorConfig> configSupplier,
final RMQConnection connection,
final ObjectMapper mapper,
final RetryStrategyFactory retryStrategyFactory,
Expand All @@ -43,54 +47,94 @@ public UnmanagedConsumer(final String name,
final MessageHandlingFunction<Message, Boolean> handlerFunction,
final Function<Throwable, Boolean> errorCheckFunction) {
this.name = NamingUtils.prefixWithNamespace(name);
this.config = config;
this.configSupplier = configSupplier;
this.connection = connection;
this.mapper = mapper;
this.clazz = clazz;
this.prefetchCount = config.getPrefetchCount();
this.handlerFunction = handlerFunction;
this.errorCheckFunction = errorCheckFunction;
this.queueName = NamingUtils.queueName(config.getPrefix(), name);
this.retryStrategy = retryStrategyFactory.create(config.getRetryConfig());
this.exceptionHandler = exceptionHandlingFactory.create(config.getExceptionHandlerConfig());
this.queueName = NamingUtils.queueName(configSupplier.get().getPrefix(), name);
this.retryStrategy = retryStrategyFactory.create(configSupplier.get().getRetryConfig());
this.exceptionHandler = exceptionHandlingFactory.create(configSupplier.get().getExceptionHandlerConfig());
this.consumerRefresher = Executors.newSingleThreadScheduledExecutor();
}

public void start() throws Exception {
final ActorConfig config = configSupplier.get();
this.consumerRefresher.scheduleWithFixedDelay(() -> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

18% of developers fix this issue

FutureReturnValueIgnored: Return value of methods returning Future must be checked. Ignoring returned Futures suppresses exceptions thrown from the code that completes the Future.


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

try {
refreshHandlers();
} catch (Exception e) {
log.error("Consumer list refresh failed for [{}] with prefix [{}]", name, config.getPrefix());
}
}, 30, 30, TimeUnit.SECONDS);
for (int i = 1; i <= config.getConcurrency(); i++) {
Channel consumeChannel = connection.newChannel();
final Handler<Message> handler =
new Handler<>(consumeChannel, mapper, clazz, prefetchCount, errorCheckFunction, retryStrategy,
exceptionHandler, handlerFunction);
String queueNameForConsumption;
if (config.isSharded()) {
queueNameForConsumption = NamingUtils.getShardedQueueName(queueName, i % config.getShardCount());
createHandler();
}
}

public void stop() {
handlers.forEach(this::destroyHandler);
this.consumerRefresher.shutdown();
}

private void createHandler() throws Exception {
final ActorConfig config = configSupplier.get();
final Channel consumeChannel = connection.newChannel();
final Handler<Message> handler =
new Handler<>(consumeChannel, mapper, clazz, config.getPrefetchCount(), errorCheckFunction, retryStrategy,
exceptionHandler, handlerFunction);
String queueNameForConsumption;
if (config.isSharded()) {
queueNameForConsumption = NamingUtils.getShardedQueueName(queueName,
handlers.size() % config.getShardCount());
} else {
queueNameForConsumption = queueName;
}
final String tag = consumeChannel.basicConsume(queueNameForConsumption, false, handler);
handler.setTag(tag);
handlers.add(handler);
log.info("Consumer channel started for [{}] with prefix [{}]", name, config.getPrefix());
}

private void destroyHandler(final Handler<Message> handler) {
final ActorConfig config = configSupplier.get();
try {
final Channel channel = handler.getChannel();
if(channel.isOpen()) {
channel.basicCancel(handler.getTag());
//Wait till the handler completes consuming and ack'ing the current message.
log.info("Waiting for handler to complete processing the current message..");
while(handler.isRunning());
channel.close();
log.info("Consumer channel closed for [{}] with prefix [{}]", name, config.getPrefix());
} else {
queueNameForConsumption = queueName;
log.warn("Consumer channel already closed for [{}] with prefix [{}]", name, config.getPrefix());
}
final String tag = consumeChannel.basicConsume(queueNameForConsumption, false, handler);
handler.setTag(tag);
handlers.add(handler);
log.info("Started consumer {} of type {}", i, name);
} catch (Exception e) {
log.error(String.format("Error closing consumer channel [%s] for [%s] with prefix [%s]", handler.getTag(), name, config.getPrefix()), e);
}
}

public void stop() {
handlers.forEach(handler -> {
try {
final Channel channel = handler.getChannel();
if(channel.isOpen()) {
channel.basicCancel(handler.getTag());
//Wait till the handler completes consuming and ack'ing the current message.
log.info("Waiting for handler to complete processing the current message..");
while(handler.isRunning());
channel.close();
log.info("Consumer channel closed for [{}] with prefix [{}]", name, config.getPrefix());
} else {
log.warn("Consumer channel already closed for [{}] with prefix [{}]", name, config.getPrefix());
}
} catch (Exception e) {
log.error(String.format("Error closing consumer channel [%s] for [%s] with prefix [%s]", handler.getTag(), name, config.getPrefix()), e);
private synchronized void refreshHandlers() throws Exception {
final ActorConfig config = configSupplier.get();
if (handlers.size() == config.getConcurrency()) {
log.info("Current handler list has same size as concurrency for [{}] with prefix [{}]", name,
config.getPrefix());
return;
}

if (handlers.size() < config.getConcurrency()) {
int consumersToBeAdded = config.getConcurrency() - handlers.size();
for (int i = 0; i < consumersToBeAdded; i++) {
createHandler();
}
});
} else {
int consumersToBeRemoved = handlers.size() - config.getConcurrency();
for (int i = 0; i < consumersToBeRemoved; i++) {
final Handler<Message> handler = handlers.remove(0);
destroyHandler(handler);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

import java.io.IOException;
import java.util.Collections;
import java.util.function.Supplier;

@Slf4j
public class UnmanagedPublisher<Message> {

private final String name;
private final ActorConfig config;
private final Supplier<ActorConfig> configSupplier;
private final RMQConnection connection;
private final ObjectMapper mapper;
private final String queueName;
Expand All @@ -28,17 +29,18 @@ public class UnmanagedPublisher<Message> {

public UnmanagedPublisher(
String name,
ActorConfig config,
Supplier<ActorConfig> configSupplier,
RMQConnection connection,
ObjectMapper mapper) {
this.name = NamingUtils.prefixWithNamespace(name);
this.config = config;
this.configSupplier = configSupplier;
this.connection = connection;
this.mapper = mapper;
this.queueName = NamingUtils.queueName(config.getPrefix(), name);
this.queueName = NamingUtils.queueName(configSupplier.get().getPrefix(), name);
}

public final void publishWithDelay(Message message, long delayMilliseconds) throws Exception {
final ActorConfig config = configSupplier.get();
log.info("Publishing message to exchange with delay: {}", delayMilliseconds);
if (!config.isDelayed()) {
log.warn("Publishing delayed message to non-delayed queue queue:{}", queueName);
Expand Down Expand Up @@ -66,23 +68,23 @@ public final void publish(Message message) throws Exception {

public final void publish(Message message, AMQP.BasicProperties properties) throws Exception {
String routingKey;
if (config.isSharded()) {
if (configSupplier.get().isSharded()) {
routingKey = NamingUtils.getShardedQueueName(queueName, getShardId());
} else {
routingKey = queueName;
}
publishChannel.basicPublish(config.getExchange(), routingKey, properties, mapper().writeValueAsBytes(message));
publishChannel.basicPublish(configSupplier.get().getExchange(), routingKey, properties, mapper().writeValueAsBytes(message));
}

private final int getShardId() {
return RandomUtils.nextInt(0, config.getShardCount());
return RandomUtils.nextInt(0, configSupplier.get().getShardCount());
}

public final long pendingMessagesCount() {
try {
if (config.isSharded()) {
if (configSupplier.get().isSharded()) {
long messageCount = 0 ;
for (int i = 0; i < config.getShardCount(); i++) {
for (int i = 0; i < configSupplier.get().getShardCount(); i++) {
String shardedQueueName = NamingUtils.getShardedQueueName(queueName, i);
messageCount += publishChannel.messageCount(shardedQueueName);
}
Expand All @@ -107,6 +109,7 @@ public final long pendingSidelineMessagesCount() {
}

public void start() throws Exception {
final ActorConfig config = configSupplier.get();
final String exchange = config.getExchange();
final String dlx = config.getExchange() + "_SIDELINE";
if (config.isDelayed()) {
Expand Down Expand Up @@ -150,7 +153,8 @@ private void ensureExchange(String exchange) throws IOException {
log.info("Created exchange: {}", exchange);
}

private void ensureDelayedExchange(String exchange) throws IOException {
private void ensureDelayedExchange(final String exchange) throws IOException {
final ActorConfig config = configSupplier.get();
if (config.getDelayType() == DelayType.TTL) {
ensureExchange(ttlExchange(config));
} else {
Expand All @@ -177,6 +181,7 @@ private String ttlQueue(String queueName) {
}

public void stop() throws Exception {
final ActorConfig config = configSupplier.get();
try {
if(publishChannel.isOpen()) {
publishChannel.close();
Expand Down
Loading