Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making message properties accessible while handling/consuming the message #69

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/main/java/io/appform/dropwizard/actors/actor/ActorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@
package io.appform.dropwizard.actors.actor;

import io.appform.dropwizard.actors.TtlConfig;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageDelayMetaGenerator;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageExpiredMetaGenerator;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageReDeliveredMetaGenerator;
import io.appform.dropwizard.actors.exceptionhandler.config.ExceptionHandlerConfig;
import io.appform.dropwizard.actors.retry.config.NoRetryConfig;
import io.appform.dropwizard.actors.retry.config.RetryConfig;
import io.dropwizard.validation.ValidationMethod;
import java.util.List;
import java.util.Objects;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Objects;

/**
* Configuration for an actor
*/
Expand All @@ -45,6 +48,13 @@
@NoArgsConstructor
@Builder
public class ActorConfig {

public static final List<String> DEFAULT_METADATA_GENERATORS = List.of(
MessageReDeliveredMetaGenerator.class.getName(),
MessageDelayMetaGenerator.class.getName(),
MessageExpiredMetaGenerator.class.getName()
);

@NotNull
@NotEmpty
private String exchange;
Expand Down Expand Up @@ -96,6 +106,8 @@ public class ActorConfig {
@Max(32)
private Integer shardCount;

private List<String> messageMetaGeneratorClasses;

public boolean isSharded() {
return Objects.nonNull(shardCount);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
package io.appform.dropwizard.actors.actor;

import java.util.Date;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public final class MessageMetadata {

private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
santanusinha marked this conversation as resolved.
Show resolved Hide resolved
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date msgSentTimestamp;
private String type;
private String userId;
private String appId;
private String clusterId;

// custom fields
private boolean redelivered;
private long delayInMs;

private boolean expired;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.appform.dropwizard.actors.actor.metadata;

import java.util.Date;
import java.util.Map;
import lombok.Builder;
import lombok.Data;

/**
* Ref : <a href="https://www.rabbitmq.com/amqp-0-9-1-reference.html#class.basic">AMQP properties</a>
*/
@Data
@Builder
public class MessageMetaContext {

private boolean redelivered;
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
manojsharma27 marked this conversation as resolved.
Show resolved Hide resolved
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package io.appform.dropwizard.actors.actor.metadata;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.generators.MessageMetadataGenerator;
import io.appform.dropwizard.actors.common.RabbitmqActorException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Objects;
import lombok.val;

public class MessageMetadataProvider {

private final List<MessageMetadataGenerator> messageMetadataGenerators;

public MessageMetadataProvider(List<String> messageMetaGeneratorClasses) {
messageMetadataGenerators = configureMessageMetadataGenerators(messageMetaGeneratorClasses);
}

public MessageMetadata createMetadata(final Envelope envelope, final AMQP.BasicProperties properties) {
Objects.requireNonNull(envelope, "Envelope should not be null");
Objects.requireNonNull(properties, "AMQP.BasicProperties should not be null");
val messageMetaContext = MessageMetaContext.builder()
.redelivered(envelope.isRedeliver())
.contentType(properties.getContentType())
.contentEncoding(properties.getContentEncoding())
.headers(properties.getHeaders())
.deliveryMode(properties.getDeliveryMode())
.priority(properties.getPriority())
.correlationId(properties.getCorrelationId())
.replyTo(properties.getReplyTo())
.expiration(properties.getExpiration())
.messageId(properties.getMessageId())
.timestamp(properties.getTimestamp())
.type(properties.getType())
.userId(properties.getUserId())
.appId(properties.getAppId())
.clusterId(properties.getClusterId())
.build();

val messageMetadata = MessageMetadata.builder().build();
manojsharma27 marked this conversation as resolved.
Show resolved Hide resolved
messageMetadataGenerators.forEach(generator -> generator.generate(messageMetaContext, messageMetadata));
return messageMetadata;
}

private List<MessageMetadataGenerator> configureMessageMetadataGenerators(List<String> messageMetaGeneratorClasses) {
if (null == messageMetaGeneratorClasses) {
return List.of();
}

return messageMetaGeneratorClasses.stream()
.distinct()
.map(className -> {
try {
return (MessageMetadataGenerator) Class.forName(className)
.getDeclaredConstructor()
.newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException |
IllegalAccessException | InvocationTargetException e) {
throw RabbitmqActorException.propagate("Failed to initialize messageMetadataGenerator "
+ className, e);
}
})
.toList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_PUBLISHED_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;
import io.appform.dropwizard.actors.utils.CommonUtils;
import java.time.Instant;

public class MessageDelayMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setDelayInMs(calculateDelayInMs(messageMetaContext));
}

private long calculateDelayInMs(MessageMetaContext messageMetaContext) {
return CommonUtils.extractMessagePropertiesHeader(messageMetaContext.getHeaders(),
MESSAGE_PUBLISHED_TEXT, Long.class)
.map(publishedAt -> Math.max(Instant.now().toEpochMilli() - publishedAt, 0))
.orElse(0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import static io.appform.dropwizard.actors.common.Constants.MESSAGE_EXPIRY_TEXT;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;
import io.appform.dropwizard.actors.utils.CommonUtils;
import java.time.Instant;

public class MessageExpiredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setExpired(isExpired(messageMetaContext));
}

private boolean isExpired(MessageMetaContext messageMetaContext) {
return CommonUtils.extractMessagePropertiesHeader(messageMetaContext.getHeaders(),
MESSAGE_EXPIRY_TEXT, Long.class)
.filter(expiry -> Instant.now().toEpochMilli() >= expiry)
.isPresent();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageHeadersMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setHeaders(messageMetaContext.getHeaders());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageMetaInfoGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setContentType(messageMetaContext.getContentType());
messageMetadata.setContentEncoding(messageMetaContext.getContentEncoding());
messageMetadata.setDeliveryMode(messageMetaContext.getDeliveryMode());
messageMetadata.setPriority(messageMetaContext.getPriority());
messageMetadata.setCorrelationId(messageMetaContext.getCorrelationId());
messageMetadata.setReplyTo(messageMetaContext.getReplyTo());
messageMetadata.setExpiration(messageMetaContext.getExpiration());
messageMetadata.setMessageId(messageMetaContext.getMessageId());
messageMetadata.setMsgSentTimestamp(messageMetaContext.getTimestamp());
messageMetadata.setType(messageMetaContext.getType());
messageMetadata.setUserId(messageMetaContext.getUserId());
messageMetadata.setAppId(messageMetaContext.getAppId());
messageMetadata.setClusterId(messageMetaContext.getClusterId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

@FunctionalInterface
public interface MessageMetadataGenerator {

void generate(final MessageMetaContext messageMetaContext, final MessageMetadata messageMetadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.appform.dropwizard.actors.actor.metadata.generators;

import io.appform.dropwizard.actors.actor.MessageMetadata;
import io.appform.dropwizard.actors.actor.metadata.MessageMetaContext;

public class MessageReDeliveredMetaGenerator implements MessageMetadataGenerator {

@Override
public void generate(final MessageMetaContext messageMetaContext, MessageMetadata messageMetadata) {
messageMetadata.setRedelivered(messageMetaContext.isRedelivered());
}

}
Loading