Skip to content

Commit

Permalink
Added ActorConfig based support to specify required message metadata …
Browse files Browse the repository at this point in the history
…generators
  • Loading branch information
manojsharma27 committed Dec 10, 2023
1 parent 9f18ad5 commit ebf0c38
Show file tree
Hide file tree
Showing 21 changed files with 750 additions and 67 deletions.
26 changes: 19 additions & 7 deletions src/main/java/io/appform/dropwizard/actors/actor/ActorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@
package io.appform.dropwizard.actors.actor;

import io.appform.dropwizard.actors.TtlConfig;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageDelayMetaGenerator;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageExpiredMetaGenerator;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageReDeliveredMetaGenerator;
import io.appform.dropwizard.actors.exceptionhandler.config.ExceptionHandlerConfig;
import io.appform.dropwizard.actors.retry.config.NoRetryConfig;
import io.appform.dropwizard.actors.retry.config.RetryConfig;
import io.dropwizard.validation.ValidationMethod;
import java.util.List;
import java.util.Objects;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Objects;

/**
* Configuration for an actor
*/
Expand All @@ -45,6 +48,13 @@
@NoArgsConstructor
@Builder
public class ActorConfig {

public static final List<String> DEFAULT_METADATA_GENERATORS = List.of(
MessageReDeliveredMetaGenerator.class.getName(),
MessageDelayMetaGenerator.class.getName(),
MessageExpiredMetaGenerator.class.getName()
);

@NotNull
@NotEmpty
private String exchange;
Expand Down Expand Up @@ -96,6 +106,8 @@ public class ActorConfig {
@Max(32)
private Integer shardCount;

private List<String> messageMetaGeneratorClasses;

public boolean isSharded() {
return Objects.nonNull(shardCount);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
package io.appform.dropwizard.actors.actor;

import com.rabbitmq.client.AMQP;
import java.util.Date;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public final class MessageMetadata {

private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date msgSentTimestamp;
private String type;
private String userId;
private String appId;
private String clusterId;

// custom fields
private boolean redelivered;
private long delayInMs;
private AMQP.BasicProperties properties;

private boolean expired;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.appform.dropwizard.actors.actor.metadata;

import java.util.Date;
import java.util.Map;
import lombok.Builder;
import lombok.Data;

/**
* Ref : <a href="https://www.rabbitmq.com/amqp-0-9-1-reference.html#class.basic">AMQP properties</a>
*/
@Data
@Builder
public class MessageMetaContext {

private boolean redelivered;
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.appform.dropwizard.actors.actor.metadata;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageMetadataGenerator;
import io.appform.dropwizard.actors.common.RabbitmqActorException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Objects;
import lombok.val;

public class MessageMetadataProvider {

private final List<MessageMetadataGenerator> messageMetadataGenerators;

public MessageMetadataProvider(List<String> messageMetaGeneratorClasses) {
messageMetadataGenerators = configureMessageMetadataGenerators(messageMetaGeneratorClasses);
}

public MessageMetadata createMetadata(final Envelope envelope, final AMQP.BasicProperties properties) {
Objects.requireNonNull(envelope, "Envelope should not be null");
Objects.requireNonNull(properties, "AMQP.BasicProperties should not be null");
val messageMetaContext = MessageMetaContext.builder()
.redelivered(envelope.isRedeliver())
.contentType(properties.getContentType())
.contentEncoding(properties.getContentEncoding())
.headers(properties.getHeaders())
.deliveryMode(properties.getDeliveryMode())
.priority(properties.getPriority())
.correlationId(properties.getCorrelationId())
.replyTo(properties.getReplyTo())
.expiration(properties.getExpiration())
.messageId(properties.getMessageId())
.timestamp(properties.getTimestamp())
.type(properties.getType())
.userId(properties.getUserId())
.appId(properties.getAppId())
.clusterId(properties.getClusterId())
.build();

val messageMetadata = MessageMetadata.builder().build();
messageMetadataGenerators.forEach(generator -> generator.generate(messageMetaContext, messageMetadata));
return messageMetadata;
}

private List<MessageMetadataGenerator> configureMessageMetadataGenerators(List<String> messageMetaGeneratorClasses) {
if (null == messageMetaGeneratorClasses) {
return List.of();
}

return messageMetaGeneratorClasses.stream()
.distinct()
.map(className -> {
try {
return (MessageMetadataGenerator) Class.forName(className)
.getDeclaredConstructor()
.newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException |
IllegalAccessException | InvocationTargetException e) {
throw RabbitmqActorException.propagate("Failed to initialize messageMetadataGenerator "
+ className, e);
}
})
.toList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;
import io.appform.dropwizard.actors.utils.CommonUtils;
import java.time.Instant;

public class MessageDelayMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setDelayInMs(calculateDelayInMs(messageMetaContext));
}

private long calculateDelayInMs(MessageMetaContext messageMetaContext) {
return CommonUtils.extractMessagePropertiesHeader(messageMetaContext.getHeaders(),
MESSAGE_PUBLISHED_TEXT, Long.class)
.map(publishedAt -> Math.max(Instant.now().toEpochMilli() - publishedAt, 0))
.orElse(0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_EXPIRY_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;
import io.appform.dropwizard.actors.utils.CommonUtils;
import java.time.Instant;

public class MessageExpiredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setExpired(isExpired(messageMetaContext));
}

private boolean isExpired(MessageMetaContext messageMetaContext) {
return CommonUtils.extractMessagePropertiesHeader(messageMetaContext.getHeaders(),
MESSAGE_EXPIRY_TEXT, Long.class)
.filter(expiry -> Instant.now().toEpochMilli() >= expiry)
.isPresent();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageHeadersMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setHeaders(messageMetaContext.getHeaders());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageMetaInfoGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setContentType(messageMetaContext.getContentType());
messageMetadata.setContentEncoding(messageMetaContext.getContentEncoding());
messageMetadata.setDeliveryMode(messageMetaContext.getDeliveryMode());
messageMetadata.setPriority(messageMetaContext.getPriority());
messageMetadata.setCorrelationId(messageMetaContext.getCorrelationId());
messageMetadata.setReplyTo(messageMetaContext.getReplyTo());
messageMetadata.setExpiration(messageMetaContext.getExpiration());
messageMetadata.setMessageId(messageMetaContext.getMessageId());
messageMetadata.setMsgSentTimestamp(messageMetaContext.getTimestamp());
messageMetadata.setType(messageMetaContext.getType());
messageMetadata.setUserId(messageMetaContext.getUserId());
messageMetadata.setAppId(messageMetaContext.getAppId());
messageMetadata.setClusterId(messageMetaContext.getClusterId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

@FunctionalInterface
public interface MessageMetadataGenerator {

void generate(final MessageMetaContext messageMetaContext, final MessageMetadata messageMetadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageReDeliveredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setRedelivered(messageMetaContext.isRedelivered());
}

}
Loading

0 comments on commit ebf0c38

Please sign in to comment.