Skip to content

Commit

Permalink
chore: add backoff for sql execution (#947)
Browse files Browse the repository at this point in the history
  • Loading branch information
dengmingtong authored Mar 6, 2024
1 parent fc2bffb commit 65109f5
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 38 deletions.
17 changes: 2 additions & 15 deletions src/analytics/lambdas/load-data-workflow/check-load-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { DeleteCommand } from '@aws-sdk/lib-dynamodb';
import { Context } from 'aws-lambda';
import { logger } from '../../../common/powertools';
import { aws_sdk_client_common_config } from '../../../common/sdk-client-config';
import { calculateWaitTime, WaitTimeInfo } from '../../../common/workflow';
import { ManifestBody } from '../../private/model';
import { getRedshiftClient } from '../redshift-data';

Expand Down Expand Up @@ -50,11 +51,6 @@ export interface CheckLoadStatusEvent {
waitTimeInfo: WaitTimeInfo;
}

/**
 * Polling state carried on the event between iterations of the load-status check loop.
 * Both fields are the inputs taken by calculateWaitTime() in this file.
 */
interface WaitTimeInfo {
/** Current wait time between status checks (presumably seconds — confirm against the state machine's Wait state). */
waitTime: number;
/** Number of loop iterations counted so far; calculateWaitTime() returns it incremented by one. */
loopCount: number;
}

const redshiftDataApiClient = getRedshiftClient(REDSHIFT_DATA_API_ROLE_ARN);

/**
Expand Down Expand Up @@ -202,13 +198,4 @@ export const delFinishedJobInDynamodb = async (tableName: string, s3Uri: string)

const response = await ddbClient.send(new DeleteCommand(params));
return response;
};

/**
 * Derives the wait time and loop counter for the next polling iteration.
 *
 * While `loopCount` is 4 or less the wait time is passed through unchanged. Beyond that,
 * 10 is added for every loop past the fourth. The resulting wait time never exceeds
 * `maxWaitTime`, and the returned `loopCount` is always the input plus one.
 *
 * @param waitTime - wait time used for the current iteration
 * @param loopCount - number of iterations counted so far
 * @param maxWaitTime - upper bound applied to the returned wait time (default 600)
 * @returns the wait time and loop count to carry into the next iteration
 */
function calculateWaitTime(waitTime: number, loopCount: number, maxWaitTime = 600) {
  // Linear back-off only kicks in after the fourth loop.
  const grownWaitTime = loopCount > 4
    ? waitTime + (loopCount - 4) * 10
    : waitTime;

  return {
    waitTime: Math.min(grownWaitTime, maxWaitTime),
    loopCount: loopCount + 1,
  };
}
};
23 changes: 16 additions & 7 deletions src/analytics/lambdas/sql-execution-sfn/sql-execution-step-fn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@

import { DescribeStatementCommand, RedshiftDataClient } from '@aws-sdk/client-redshift-data';
import { logger } from '../../../common/powertools';
import { calculateWaitTime, WaitTimeInfo } from '../../../common/workflow';
import { exeucteBySqlorS3File, getRedshiftClient } from '../redshift-data';

interface EventType {
waitTimeInfo: WaitTimeInfo;
queryId?: string;
sql?: string;
}

interface SubmitSqlResponse {
queryId: string;
waitTimeInfo: WaitTimeInfo;
}

interface QueryResponse {
status: string;
queryId: string;
reason?: string;
waitTimeInfo: WaitTimeInfo;
}

type ResponseType = SubmitSqlResponse | QueryResponse;
Expand All @@ -52,18 +56,18 @@ export const handler = async (event: EventType): Promise<ResponseType> => {
async function _handler(event: EventType): Promise<ResponseType> {

const redShiftClient = getRedshiftClient(dataAPIRole);
const waitTimeInfo = calculateWaitTime(event.waitTimeInfo.waitTime, event.waitTimeInfo.loopCount);
if (event.sql) {

return submitSql(event.sql, redShiftClient);
return submitSql(event.sql, redShiftClient, waitTimeInfo);
} else if (event.queryId) {
return queryStatus(event.queryId, redShiftClient);
return queryStatus(event.queryId, redShiftClient, waitTimeInfo);
} else {
logger.error('event', { event });
throw new Error('Invalid event');
}
}

async function submitSql(sqlOrs3File: string, redShiftClient: RedshiftDataClient): Promise<SubmitSqlResponse> {
async function submitSql(sqlOrs3File: string, redShiftClient: RedshiftDataClient, waitTimeInfo: WaitTimeInfo): Promise<SubmitSqlResponse> {
logger.info('submitSql() sqlOrs3File: ' + sqlOrs3File);

let provisionedRedshiftProps = undefined;
Expand All @@ -85,10 +89,13 @@ async function submitSql(sqlOrs3File: string, redShiftClient: RedshiftDataClient
}
const res = await exeucteBySqlorS3File(sqlOrs3File, redShiftClient, serverlessRedshiftProps, provisionedRedshiftProps, databaseName);
logger.info('submitSql() return queryId: ' + res.queryId);
return res;
return {
queryId: res.queryId,
waitTimeInfo,
};
}

async function queryStatus(queryId: string, redShiftClient: RedshiftDataClient): Promise<QueryResponse> {
async function queryStatus(queryId: string, redShiftClient: RedshiftDataClient, waitTimeInfo: WaitTimeInfo): Promise<QueryResponse> {
logger.info('queryStatus() queryId: ' + queryId);

const checkParams = new DescribeStatementCommand({
Expand All @@ -106,6 +113,7 @@ async function queryStatus(queryId: string, redShiftClient: RedshiftDataClient):
status: 'FINISHED',
queryId: queryId,
reason: errorMsg,
waitTimeInfo,
};
}
}
Expand All @@ -114,5 +122,6 @@ async function queryStatus(queryId: string, redShiftClient: RedshiftDataClient):
status: response.Status!,
queryId: queryId,
reason: errorMsg,
waitTimeInfo,
};
}
}
21 changes: 17 additions & 4 deletions src/analytics/private/sql-exectution-stepfuncs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { Function, Runtime } from 'aws-cdk-lib/aws-lambda';
import { RetentionDays } from 'aws-cdk-lib/aws-logs';
import {
StateMachine, TaskInput, Wait, WaitTime, Succeed, Choice, Map,
Condition, Fail, DefinitionBody, JsonPath, LogLevel,
Condition, Fail, DefinitionBody, JsonPath, LogLevel, Pass,
} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { Construct } from 'constructs';
Expand Down Expand Up @@ -54,13 +54,14 @@ export function createSQLExecutionStepFunctions(scope: Construct, props: SQLExec
});

const wait1 = new Wait(scope, 'Wait #1', {
time: WaitTime.duration(Duration.seconds(2)),
time: WaitTime.secondsPath('$.waitTimeInfo.waitTime'),
});

const checkStatus = new LambdaInvoke(scope, 'Check Status', {
lambdaFunction: fn,
payload: TaskInput.fromObject({
'queryId.$': '$.queryId',
'waitTimeInfo.$': '$.waitTimeInfo',
}),
outputPath: '$.Payload',
});
Expand All @@ -75,10 +76,22 @@ export function createSQLExecutionStepFunctions(scope: Construct, props: SQLExec
const isDoneChoice = new Choice(scope, 'Is Done?');

const checkStatusAgain = new Wait(scope, 'Wait #2', {
time: WaitTime.duration(Duration.seconds(2)),
time: WaitTime.secondsPath('$.waitTimeInfo.waitTime'),
}).next(checkStatus);

const definition = submitSQL.next(wait1).next(checkStatus).next(isDoneChoice);
const initWaitTimeInfo = new Pass(scope, 'Init wait time info', {
parameters: {
waitTime: 2,
loopCount: 0,
},
resultPath: '$.waitTimeInfo',
});

const definition = initWaitTimeInfo
.next(submitSQL)
.next(wait1)
.next(checkStatus)
.next(isDoneChoice);

isDoneChoice.when(Condition.stringEquals('$.status', 'FAILED'), new Fail(scope, 'Fail'));
isDoneChoice.when(Condition.stringEquals('$.status', 'FINISHED'), new Succeed(scope, 'Succeed'));
Expand Down
27 changes: 27 additions & 0 deletions src/common/workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
* with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
* OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

/**
 * Polling state threaded through a wait-and-check workflow loop.
 * It has the same two fields that calculateWaitTime() takes as inputs and returns updated.
 */
export interface WaitTimeInfo {
/** Current wait time between status checks (presumably seconds — confirm against the consuming Wait state). */
waitTime: number;
/** Number of loop iterations counted so far; calculateWaitTime() returns it incremented by one. */
loopCount: number;
}

/**
 * Computes the wait time and loop counter for the next iteration of a polling loop,
 * applying a capped linear back-off.
 *
 * While `loopCount` is at or below `backoffStartLoop` the wait time is returned unchanged.
 * Once it exceeds that threshold, `backoffStepSeconds` is added for every loop past the
 * threshold. The returned wait time never exceeds `maxWaitTime`, and the returned
 * `loopCount` is always the input plus one.
 *
 * With the defaults this behaves exactly as before the threshold and step were
 * parameterized (threshold 4, step 10, cap 600).
 *
 * @param waitTime - wait time used for the current iteration
 * @param loopCount - number of iterations counted so far
 * @param maxWaitTime - upper bound applied to the returned wait time (default 600)
 * @param backoffStartLoop - loop count after which the wait time starts growing (default 4)
 * @param backoffStepSeconds - amount added per loop past the threshold (default 10)
 * @returns the wait time and loop count to carry into the next iteration
 */
export function calculateWaitTime(
  waitTime: number,
  loopCount: number,
  maxWaitTime = 600,
  backoffStartLoop = 4,
  backoffStepSeconds = 10,
): { waitTime: number; loopCount: number } {
  // Inputs are not reassigned; the next values are derived into fresh locals.
  const nextWaitTime = loopCount > backoffStartLoop
    ? waitTime + (loopCount - backoffStartLoop) * backoffStepSeconds
    : waitTime;

  return {
    waitTime: Math.min(nextWaitTime, maxWaitTime),
    loopCount: loopCount + 1,
  };
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ beforeEach(async () => {
s3Mock.reset();
});

const waitTimeInfo = {
waitTime: 2,
loopCount: 0,
};

test('handler submit sql - s3 file', async () => {
const event = { sql: 's3://test/test.sql' };
const event = {
sql: 's3://test/test.sql',
waitTimeInfo,
};
s3Mock.on(GetObjectCommand).resolves({
Body: {
transformToString: () => { return 'select * from test'; },
Expand All @@ -47,7 +55,13 @@ test('handler submit sql - s3 file', async () => {

const response = await handler(event);

expect(response).toEqual({ queryId: 'id-1' });
expect(response).toEqual({
queryId: 'id-1',
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});
expect(s3Mock).toHaveReceivedCommandTimes(GetObjectCommand, 1);
expect(redshiftDataMock).toHaveReceivedCommandTimes(ExecuteStatementCommand, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ beforeEach(async () => {
s3Mock.reset();
});

const waitTimeInfo = {
waitTime: 2,
loopCount: 0,
};

test('handler submit sql - s3 file', async () => {
const event = { sql: 's3://test/test.sql' };
const event = {
sql: 's3://test/test.sql',
waitTimeInfo,
};
s3Mock.on(GetObjectCommand).resolves({
Body: {
transformToString: () => { return 'select * from test'; },
Expand All @@ -47,7 +55,13 @@ test('handler submit sql - s3 file', async () => {

const response = await handler(event);

expect(response).toEqual({ queryId: 'id-1' });
expect(response).toEqual({
queryId: 'id-1',
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});
expect(s3Mock).toHaveReceivedCommandTimes(GetObjectCommand, 1);
expect(redshiftDataMock).toHaveReceivedCommandTimes(ExecuteStatementCommand, 1);

Expand All @@ -63,12 +77,21 @@ test('handler submit sql - s3 file', async () => {


test('handler submit sql - raw sql', async () => {
const event = { sql: 'select * from test1' };
const event = {
sql: 'select * from test1',
waitTimeInfo,
};
redshiftDataMock.on(ExecuteStatementCommand).resolves({ Id: 'id-1' });

const response = await handler(event);

expect(response).toEqual({ queryId: 'id-1' });
expect(response).toEqual({
queryId: 'id-1',
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});
expect(s3Mock).toHaveReceivedCommandTimes(GetObjectCommand, 0);
expect(redshiftDataMock).toHaveReceivedCommandTimes(ExecuteStatementCommand, 1);

Expand All @@ -84,37 +107,58 @@ test('handler submit sql - raw sql', async () => {


test('handler query result - finished', async () => {
const event = { queryId: 'queryId-111' };
const event = {
queryId: 'queryId-111',
waitTimeInfo,
};
redshiftDataMock.on(DescribeStatementCommand).resolves({ Status: 'FINISHED' });
const response = await handler(event);
expect(response).toEqual({
status: 'FINISHED',
queryId: 'queryId-111',
reason: undefined,
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});

});

test('handler query result - failed', async () => {
const event = { queryId: 'queryId-111' };
const event = {
queryId: 'queryId-111',
waitTimeInfo,
};
redshiftDataMock.on(DescribeStatementCommand).resolves({ Status: 'FAILED', Error: 'error' });
const response = await handler(event);
expect(response).toEqual({
status: 'FAILED',
queryId: 'queryId-111',
reason: 'error',
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});
});


test('handler query result - object already exists', async () => {
const event = { queryId: 'queryId-111' };
const event = {
queryId: 'queryId-111',
waitTimeInfo,
};
redshiftDataMock.on(DescribeStatementCommand).resolves({ Status: 'FAILED', Error: 'table xxxx already exists' });
const response = await handler(event);
expect(response).toEqual({
status: 'FINISHED',
queryId: 'queryId-111',
reason: 'table xxxx already exists',
waitTimeInfo: {
waitTime: 2,
loopCount: 1,
},
});
});

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"Fn::Join": [
"",
[
"{\"StartAt\":\"Execute SQL statements\",\"States\":{\"Execute SQL statements\":{\"Type\":\"Map\",\"End\":true,\"Parameters\":{\"sql.$\":\"$$.Map.Item.Value\"},\"Iterator\":{\"StartAt\":\"Submit SQL\",\"States\":{\"Submit SQL\":{\"Next\":\"Wait #1\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ClientExecutionTimeoutException\",\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2},{\"ErrorEquals\":[\"Lambda.TooManyRequestsException\"],\"IntervalSeconds\":3,\"MaxAttempts\":10,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
"{\"StartAt\":\"Execute SQL statements\",\"States\":{\"Execute SQL statements\":{\"Type\":\"Map\",\"End\":true,\"Parameters\":{\"sql.$\":\"$$.Map.Item.Value\"},\"Iterator\":{\"StartAt\":\"Init wait time info\",\"States\":{\"Init wait time info\":{\"Type\":\"Pass\",\"ResultPath\":\"$.waitTimeInfo\",\"Parameters\":{\"waitTime\":2,\"loopCount\":0},\"Next\":\"Submit SQL\"},\"Submit SQL\":{\"Next\":\"Wait #1\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ClientExecutionTimeoutException\",\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2},{\"ErrorEquals\":[\"Lambda.TooManyRequestsException\"],\"IntervalSeconds\":3,\"MaxAttempts\":10,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand All @@ -13,7 +13,7 @@
"Arn"
]
},
"\",\"Payload.$\":\"$\"}},\"Wait #1\":{\"Type\":\"Wait\",\"Seconds\":2,\"Next\":\"Check Status\"},\"Check Status\":{\"Next\":\"Is Done?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ClientExecutionTimeoutException\",\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2},{\"ErrorEquals\":[\"Lambda.TooManyRequestsException\"],\"IntervalSeconds\":3,\"MaxAttempts\":10,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
"\",\"Payload.$\":\"$\"}},\"Wait #1\":{\"Type\":\"Wait\",\"SecondsPath\":\"$.waitTimeInfo.waitTime\",\"Next\":\"Check Status\"},\"Check Status\":{\"Next\":\"Is Done?\",\"Retry\":[{\"ErrorEquals\":[\"Lambda.ClientExecutionTimeoutException\",\"Lambda.ServiceException\",\"Lambda.AWSLambdaException\",\"Lambda.SdkClientException\"],\"IntervalSeconds\":2,\"MaxAttempts\":6,\"BackoffRate\":2},{\"ErrorEquals\":[\"Lambda.TooManyRequestsException\"],\"IntervalSeconds\":3,\"MaxAttempts\":10,\"BackoffRate\":2}],\"Type\":\"Task\",\"OutputPath\":\"$.Payload\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
Expand All @@ -24,7 +24,7 @@
"Arn"
]
},
"\",\"Payload\":{\"queryId.$\":\"$.queryId\"}}},\"Wait #2\":{\"Type\":\"Wait\",\"Seconds\":2,\"Next\":\"Check Status\"},\"Is Done?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Fail\"},{\"Variable\":\"$.status\",\"StringEquals\":\"FINISHED\",\"Next\":\"Succeed\"}],\"Default\":\"Wait #2\"},\"Fail\":{\"Type\":\"Fail\"},\"Succeed\":{\"Type\":\"Succeed\"}}},\"ItemsPath\":\"$.sqls\",\"MaxConcurrency\":1}},\"TimeoutSeconds\":7200,\"Comment\":\"Execute SQL in Redshift using Redshift Data API\"}"
"\",\"Payload\":{\"queryId.$\":\"$.queryId\",\"waitTimeInfo.$\":\"$.waitTimeInfo\"}}},\"Wait #2\":{\"Type\":\"Wait\",\"SecondsPath\":\"$.waitTimeInfo.waitTime\",\"Next\":\"Check Status\"},\"Is Done?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Fail\"},{\"Variable\":\"$.status\",\"StringEquals\":\"FINISHED\",\"Next\":\"Succeed\"}],\"Default\":\"Wait #2\"},\"Fail\":{\"Type\":\"Fail\"},\"Succeed\":{\"Type\":\"Succeed\"}}},\"ItemsPath\":\"$.sqls\",\"MaxConcurrency\":1}},\"TimeoutSeconds\":7200,\"Comment\":\"Execute SQL in Redshift using Redshift Data API\"}"
]
]
}

0 comments on commit 65109f5

Please sign in to comment.