Skip to content

Commit

Permalink
Merge pull request #212 from danmiller192/fix_postgres_indexer_async_…
Browse files Browse the repository at this point in the history
…issue

Updated Postgres indexer to handle out of order updates
  • Loading branch information
v1r3n authored Jul 18, 2024
2 parents 4e604c7 + dc3a2e1 commit 55a31af
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ public PostgresIndexDAO(
@Override
public void indexWorkflow(WorkflowSummary workflow) {
String INSERT_WORKFLOW_INDEX_SQL =
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > workflow_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

Expand All @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) {
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
Expand Down Expand Up @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) {
"INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > task_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_index
ADD update_time TIMESTAMP WITH TIME ZONE NULL;

UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone;

ALTER TABLE workflow_index
ALTER COLUMN update_time SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}
Expand Down Expand Up @@ -142,13 +143,15 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException {

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
wfs.setUpdateTime("2023-02-07T08:44:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:45:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
Expand All @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException {

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
ts.setUpdateTime("2023-02-07T10:43:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.COMPLETED);
return wfs;
}
Expand Down Expand Up @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException {

@Test
public void testIndexNewWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new");

indexDAO.indexWorkflow(wfs);

Expand All @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException {

@Test
public void testIndexExistingWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:44:45Z");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);
}

@Test
public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed()
throws SQLException {

WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

// Set the update time to the past
wfs.setUpdateTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.FAILED);

indexDAO.indexWorkflow(wfs);

// Reset the workflow to check it's not been updated
wfs = getMockWorkflowSummary("workflow-id-existing-no-index");
compareWorkflowSummary(wfs);
}

@Test
public void testIndexNewTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-new");

indexDAO.indexTask(ts);

Expand All @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException {

@Test
public void testIndexExistingTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-existing");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

ts.setUpdateTime("2023-02-07T09:43:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

compareTaskSummary(ts);
}

@Test
public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

// Set the update time to the past
ts.setUpdateTime("2023-02-07T09:41:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

// Reset the task to check it's not been updated
ts = getMockTaskSummary("task-id-exiting-no-update");
compareTaskSummary(ts);
}

Expand Down Expand Up @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() {

@Test
public void testJsonSearchWorkflowSummary() {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary");
wfs.setVersion(3);

indexDAO.indexWorkflow(wfs);
Expand All @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() {
@Test
public void testSearchWorkflowSummaryPagination() {
for (int i = 0; i < 5; i++) {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i);
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i);
indexDAO.indexWorkflow(wfs);
}

List<String> orderBy = Arrays.asList(new String[] {"workflowId:DESC"});
SearchResult<WorkflowSummary> results =
indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy);
indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy);
assertEquals("Wrong totalHits returned", 3, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-4",
"workflow-id-pagination-4",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-3",
"workflow-id-pagination-3",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-2",
"workflow-id-pagination-2",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-1",
"workflow-id-pagination-1",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-0",
"workflow-id-pagination-0",
results.getResults().get(0).getWorkflowId());
}

Expand All @@ -351,7 +394,7 @@ public void testSearchTaskSummary() {
@Test
public void testSearchTaskSummaryPagination() {
for (int i = 0; i < 5; i++) {
TaskSummary ts = getMockTaskSummary("task-id-" + i);
TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i);
indexDAO.indexTask(ts);
}

Expand All @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() {
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-4",
"task-id-pagination-4",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-3",
"task-id-pagination-3",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-2",
"task-id-pagination-2",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-1",
"task-id-pagination-1",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-0",
"task-id-pagination-0",
results.getResults().get(0).getTaskId());
}

Expand Down

0 comments on commit 55a31af

Please sign in to comment.