Skip to content

Commit

Permalink
Added ActorConfig based support to specify required message metadata …
Browse files Browse the repository at this point in the history
…generators
  • Loading branch information
manojsharma27 committed Dec 1, 2023
1 parent add4660 commit 5f4b972
Show file tree
Hide file tree
Showing 21 changed files with 738 additions and 51 deletions.
23 changes: 16 additions & 7 deletions src/main/java/io/appform/dropwizard/actors/actor/ActorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
import io.appform.dropwizard.actors.retry.config.NoRetryConfig;
import io.appform.dropwizard.actors.retry.config.RetryConfig;
import io.dropwizard.validation.ValidationMethod;
import java.util.List;
import java.util.Objects;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Objects;

/**
* Configuration for an actor
*/
Expand All @@ -45,6 +45,7 @@
@NoArgsConstructor
@Builder
public class ActorConfig {

@NotNull
@NotEmpty
private String exchange;
Expand Down Expand Up @@ -96,6 +97,14 @@ public class ActorConfig {
@Max(32)
private Integer shardCount;

@Valid
@Builder.Default
private List<String> messageMetaGeneratorClasses = List.of(
"io.appform.dropwizard.actors.actor.metadata.MessageReDeliveredMetaGenerator",
"io.appform.dropwizard.actors.actor.metadata.MessageDelayMetaGenerator",
"io.appform.dropwizard.actors.actor.metadata.MessageExpiredMetaGenerator"
);

public boolean isSharded() {
return Objects.nonNull(shardCount);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
package io.appform.dropwizard.actors.actor;

import com.rabbitmq.client.AMQP;
import java.util.Date;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public final class MessageMetadata {

private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date msgSentTimestamp;
private String type;
private String userId;
private String appId;
private String clusterId;

// custom fields
private boolean redelivered;
private long delayInMs;
private AMQP.BasicProperties properties;

private boolean expired;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.appform.dropwizard.actors.actor.metadata;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.base.utils.MessageMetaUtils;
import java.time.Instant;
import java.util.Optional;

public class MessageDelayMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setDelayInMs(calculateDelayInMs(messageMetaContext));
}

private long calculateDelayInMs(MessageMetaContext messageMetaContext) {
Optional<Long> publishedTimestamp = MessageMetaUtils.extractMessagePropertiesHeader(
messageMetaContext.getHeaders(),
MESSAGE_PUBLISHED_TEXT, Long.class);
return publishedTimestamp.map(aLong -> Math.max(Instant.now().toEpochMilli() - aLong, 0))
.orElse(-1L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.appform.dropwizard.actors.actor.metadata;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_EXPIRY_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.base.utils.MessageMetaUtils;
import java.time.Instant;
import java.util.Optional;

public class MessageExpiredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setExpired(isExpired(messageMetaContext));
}

private boolean isExpired(MessageMetaContext messageMetaContext) {
Optional<Long> msgExpiry = MessageMetaUtils.extractMessagePropertiesHeader(messageMetaContext.getHeaders(),
MESSAGE_EXPIRY_TEXT, Long.class);
return msgExpiry.filter(expiry -> Instant.now().toEpochMilli() >= expiry).isPresent();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.appform.dropwizard.actors.actor.metadata;

import io.appform.dropwizard.actors.actor.MessageMetadata;

public class MessageHeadersMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setHeaders(messageMetaContext.getHeaders());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.appform.dropwizard.actors.actor.metadata;

import java.util.Date;
import java.util.Map;
import lombok.Builder;
import lombok.Data;

/**
* Ref : <a href="https://www.rabbitmq.com/amqp-0-9-1-reference.html#class.basic">AMQP properties</a>
*/
@Data
@Builder
public class MessageMetaContext {

private boolean redelivered;
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.appform.dropwizard.actors.actor.metadata;

import io.appform.dropwizard.actors.actor.MessageMetadata;

public class MessageMetaInfoGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setContentType(messageMetaContext.getContentType());
messageMetadata.setContentEncoding(messageMetaContext.getContentEncoding());
messageMetadata.setDeliveryMode(messageMetaContext.getDeliveryMode());
messageMetadata.setPriority(messageMetaContext.getPriority());
messageMetadata.setCorrelationId(messageMetaContext.getCorrelationId());
messageMetadata.setReplyTo(messageMetaContext.getReplyTo());
messageMetadata.setExpiration(messageMetaContext.getExpiration());
messageMetadata.setMessageId(messageMetaContext.getMessageId());
messageMetadata.setMsgSentTimestamp(messageMetaContext.getTimestamp());
messageMetadata.setType(messageMetaContext.getType());
messageMetadata.setUserId(messageMetaContext.getUserId());
messageMetadata.setAppId(messageMetaContext.getAppId());
messageMetadata.setClusterId(messageMetaContext.getClusterId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.appform.dropwizard.actors.actor.metadata;

import io.appform.dropwizard.actors.actor.MessageMetadata;

public interface MessageMetadataGenerator {

void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.appform.dropwizard.actors.actor.metadata;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.common.RabbitmqActorException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class MessageMetadataProvider {

private final List<MessageMetadataGenerator> messageMetadataGenerators;

public MessageMetadataProvider(List<String> messageMetaGeneratorClasses) {
messageMetadataGenerators = configureMessageMetadataGenerators(messageMetaGeneratorClasses);
}

private List<MessageMetadataGenerator> configureMessageMetadataGenerators(List<String> messageMetaGeneratorClasses) {
if (Objects.isNull(messageMetaGeneratorClasses)) {
return Collections.emptyList();
}

return messageMetaGeneratorClasses.stream()
.distinct()
.map(className -> {
try {
@SuppressWarnings("unchecked")
Class<MessageMetadataGenerator> clazz = (Class<MessageMetadataGenerator>) Class.forName(
className);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException |
IllegalAccessException | InvocationTargetException e) {
throw RabbitmqActorException.propagate("Failed to initialize messageMetadataGenerators", e);
}
})
.toList();
}

public MessageMetadata createMetadata(final Envelope envelope, final AMQP.BasicProperties properties) {
Objects.requireNonNull(envelope, "Envelope should not be null");
Objects.requireNonNull(properties, "AMQP.BasicProperties should not be null");
MessageMetaContext messageMetaContext = MessageMetaContext.builder()
.redelivered(envelope.isRedeliver())
.contentType(properties.getContentType())
.contentEncoding(properties.getContentEncoding())
.headers(properties.getHeaders())
.deliveryMode(properties.getDeliveryMode())
.priority(properties.getPriority())
.correlationId(properties.getCorrelationId())
.replyTo(properties.getReplyTo())
.expiration(properties.getExpiration())
.messageId(properties.getMessageId())
.timestamp(properties.getTimestamp())
.type(properties.getType())
.userId(properties.getUserId())
.appId(properties.getAppId())
.clusterId(properties.getClusterId())
.build();

MessageMetadata messageMetadata = MessageMetadata.builder().build();
messageMetadataGenerators.forEach(populator -> populator.generate(messageMetaContext, messageMetadata));
return messageMetadata;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.appform.dropwizard.actors.actor.metadata;

import io.appform.dropwizard.actors.actor.MessageMetadata;

public class MessageReDeliveredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setRedelivered(messageMetaContext.isRedelivered());
}

}
44 changes: 10 additions & 34 deletions src/main/java/io/appform/dropwizard/actors/base/Handler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,17 @@
import com.rabbitmq.client.Envelope;
import io.appform.dropwizard.actors.actor.MessageHandlingFunction;
import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetadataProvider;
import io.appform.dropwizard.actors.exceptionhandler.handlers.ExceptionHandler;
import io.appform.dropwizard.actors.retry.RetryStrategy;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Function;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.function.Function;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_EXPIRY_TEXT;
import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;

@Slf4j
public class Handler<Message> extends DefaultConsumer {

Expand All @@ -33,7 +29,7 @@ public class Handler<Message> extends DefaultConsumer {
private final ExceptionHandler exceptionHandler;
private final MessageHandlingFunction<Message, Boolean> messageHandlingFunction;
private final MessageHandlingFunction<Message, Boolean> expiredMessageHandlingFunction;

private final MessageMetadataProvider messageMetadataProvider;
@Getter
private volatile boolean running;

Expand All @@ -49,7 +45,8 @@ public Handler(final Channel channel,
final RetryStrategy retryStrategy,
final ExceptionHandler exceptionHandler,
final MessageHandlingFunction<Message, Boolean> messageHandlingFunction,
final MessageHandlingFunction<Message, Boolean> expiredMessageHandlingFunction) throws Exception {
final MessageHandlingFunction<Message, Boolean> expiredMessageHandlingFunction,
final MessageMetadataProvider messageMetadataProvider) throws Exception {
super(channel);
this.mapper = mapper;
this.clazz = clazz;
Expand All @@ -59,11 +56,12 @@ public Handler(final Channel channel,
this.exceptionHandler = exceptionHandler;
this.messageHandlingFunction = messageHandlingFunction;
this.expiredMessageHandlingFunction = expiredMessageHandlingFunction;
this.messageMetadataProvider = messageMetadataProvider;
}

private boolean handle(final Message message, final MessageMetadata messageMetadata) throws Exception {
running = true;
val result = isExpired(messageMetadata.getProperties())
val result = messageMetadata.isExpired()
? expiredMessageHandlingFunction.apply(message, messageMetadata)
: messageHandlingFunction.apply(message, messageMetadata);
running = false;
Expand Down Expand Up @@ -101,29 +99,7 @@ private Callable<Boolean> getHandleCallable(final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
val message = mapper.readValue(body, clazz);
return () -> handle(message, populateMessageMeta(envelope, properties));
}

private long getDelayInMs(final AMQP.BasicProperties properties) {
if (properties.getHeaders() != null
&& properties.getHeaders().containsKey(MESSAGE_PUBLISHED_TEXT)) {
val publishedAt = (long) properties.getHeaders().get(MESSAGE_PUBLISHED_TEXT);
return Math.max(Instant.now().toEpochMilli() - publishedAt, 0);
}
return -1;
return () -> handle(message, messageMetadataProvider.createMetadata(envelope, properties));
}

private boolean isExpired(final AMQP.BasicProperties properties) {
if (properties.getHeaders() != null
&& properties.getHeaders().containsKey(MESSAGE_EXPIRY_TEXT)) {
val expiresAt = (long) properties.getHeaders().get(MESSAGE_EXPIRY_TEXT);
return Instant.now().toEpochMilli() >= expiresAt;
}
return false;
}

private MessageMetadata populateMessageMeta(final Envelope envelope, final AMQP.BasicProperties properties) {
val delayInMs = getDelayInMs(properties);
return new MessageMetadata(envelope.isRedeliver(), delayInMs, properties);
}
}
Loading

0 comments on commit 5f4b972

Please sign in to comment.