diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 335b8fbd79..72f39d358f 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -12,7 +12,9 @@ */ package com.netflix.conductor.core.reconciliation; +import java.time.Instant; import java.util.Optional; +import java.util.Random; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -95,19 +97,23 @@ public void sweep(String workflowId) { Monitors.error(CLASS_NAME, "sweep"); LOGGER.error("Error running sweep for " + workflowId, e); } + long workflowOffsetTimeout = + workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds()); if (workflow != null) { - unack(workflow); + long startTime = Instant.now().toEpochMilli(); + unack(workflow, workflowOffsetTimeout); + long endTime = Instant.now().toEpochMilli(); + Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime); } else { LOGGER.warn( "Workflow with {} id can not be found. Attempting to unack using the id", workflowId); - queueDAO.setUnackTimeout( - DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis()); + queueDAO.setUnackTimeout(DECIDER_QUEUE, workflowId, workflowOffsetTimeout * 1000); } } @VisibleForTesting - void unack(WorkflowModel workflowModel) { + void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { long postponeDurationSeconds = 0; for (TaskModel taskModel : workflowModel.getTasks()) { if (taskModel.getStatus() == Status.IN_PROGRESS) { @@ -116,16 +122,15 @@ void unack(WorkflowModel workflowModel) { postponeDurationSeconds = (taskModel.getWaitTimeout() != 0) ? 
taskModel.getWaitTimeout() + 1 - : properties.getWorkflowOffsetTimeout().getSeconds(); + : workflowOffsetTimeout; } else { postponeDurationSeconds = (taskModel.getResponseTimeoutSeconds() != 0) ? taskModel.getResponseTimeoutSeconds() + 1 - : properties.getWorkflowOffsetTimeout().getSeconds(); + : workflowOffsetTimeout; } break; - } - if (taskModel.getStatus() == Status.SCHEDULED) { + } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition(); if (taskDefinition.isPresent()) { TaskDef taskDef = taskDefinition.get(); @@ -137,13 +142,13 @@ void unack(WorkflowModel workflowModel) { (workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0) ? workflowModel.getWorkflowDefinition().getTimeoutSeconds() + 1 - : properties.getWorkflowOffsetTimeout().getSeconds(); + : workflowOffsetTimeout; } } else { postponeDurationSeconds = (workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0) ? workflowModel.getWorkflowDefinition().getTimeoutSeconds() + 1 - : properties.getWorkflowOffsetTimeout().getSeconds(); + : workflowOffsetTimeout; } break; } @@ -151,4 +156,18 @@ void unack(WorkflowModel workflowModel) { queueDAO.setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), postponeDurationSeconds * 1000); } + + /** + * Applies a random jitter of +/- (workflowOffsetTimeout / 3) to the given timeout. For example, + * if workflowOffsetTimeout is 45 seconds, this method returns a value in [30, 60] seconds. + * + * @param workflowOffsetTimeout the base offset timeout, in seconds + * @return the offset timeout with jitter applied, in seconds + */ + @VisibleForTesting + long workflowOffsetWithJitter(long workflowOffsetTimeout) { + long range = workflowOffsetTimeout / 3; + long jitter = new Random().nextInt((int) (2 * range + 1)) - range; + return workflowOffsetTimeout + jitter; + } } diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 77b98d12ea..9107907877 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ 
b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -394,6 +394,11 @@ public static void recordWorkflowCompletion( .record(duration, TimeUnit.MILLISECONDS); } + public static void recordUnackTime(String workflowType, long duration) { + getTimer(classQualifier, "workflow_unack", "workflowName", workflowType) + .record(duration, TimeUnit.MILLISECONDS); + } + public static void recordTaskRateLimited(String taskDefName, int limit) { gauge(classQualifier, "task_rate_limited", limit, "taskType", taskDefName); } diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index aa51bf8907..cc1ce59146 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -31,6 +31,7 @@ import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -67,7 +68,7 @@ public void testPostponeDurationForWaitTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, @@ -88,7 +89,7 @@ public void testPostponeDurationForWaitTaskTypeWithWaitTime() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), 
(waitTimeout + 1) * 1000); @@ -105,7 +106,7 @@ public void testPostponeDurationForTaskInProgress() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, @@ -126,7 +127,7 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000); @@ -146,7 +147,7 @@ public void testPostponeDurationForTaskInScheduled() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, @@ -169,7 +170,7 @@ public void testPostponeDurationForTaskInScheduledWithWorkflowTimeoutSet() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), (workflowTimeout + 1) * 1000); @@ -190,7 +191,7 @@ public void testPostponeDurationForTaskInScheduledWithWorkflowTimeoutSetAndNoPol when(taskModel.getStatus()).thenReturn(Status.SCHEDULED); when(properties.getWorkflowOffsetTimeout()) 
.thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), (workflowTimeout + 1) * 1000); @@ -209,7 +210,7 @@ public void testPostponeDurationForTaskInScheduledWithNoWorkflowTimeoutSetAndNoP when(taskModel.getStatus()).thenReturn(Status.SCHEDULED); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, @@ -230,7 +231,7 @@ public void testPostponeDurationForTaskInScheduledWithNoPollTimeoutSet() { when(taskModel.getTaskDefinition()).thenReturn(Optional.of(taskDef)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, @@ -252,9 +253,19 @@ public void testPostponeDurationForTaskInScheduledWithPollTimeoutSet() { when(taskModel.getTaskDefinition()).thenReturn(Optional.of(taskDef)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); - workflowSweeper.unack(workflowModel); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), (pollTimeout + 1) * 1000); } + + @Test + public void testWorkflowOffsetJitter() { + long offset = 45; + for (int i = 0; i < 10; i++) { + long offsetWithJitter = workflowSweeper.workflowOffsetWithJitter(offset); + assertTrue(offsetWithJitter >= 30); + assertTrue(offsetWithJitter <= 60); + } + } }