forked from miztiik/my-first-cdk-project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
custom_cloudwatch_metrics.py
78 lines (69 loc) · 4.18 KB
/
custom_cloudwatch_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from aws_cdk import aws_cloudwatch as _cloudwatch
from aws_cdk import aws_lambda as _lambda
from aws_cdk import aws_logs as _logs
from aws_cdk import core
class CustomMetricsStack(core.Stack):
    """CDK stack wiring a Lambda's JSON logs to a custom CloudWatch metric and alarm.

    The stack defines, in order:

    * a Lambda function whose inline code is read from a local source file,
    * a log group named ``/aws/lambda/<function name>`` with one-day retention,
    * a metric filter that emits ``1`` for every log event whose
      ``$.third_party_api_error`` field is ``true`` (default value ``0``),
    * an alarm on the resulting ``third_party_error_metric`` metric.
    """

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        """Define all resources of the stack.

        Args:
            scope: Parent construct this stack is attached to.
            id: Logical identifier of the stack within ``scope``.
            **kwargs: Forwarded unchanged to ``core.Stack.__init__``.

        Raises:
            OSError: If the Lambda source file cannot be read. The path is
                relative, so it is resolved against the current working
                directory of the process running the app.
        """
        super().__init__(scope, id, **kwargs)

        # Read Lambda code.
        try:
            with open(
                "serverless_stacks/lambda_src/konstone_custom_metric_log_generator.py",
                mode="r",
                encoding="utf-8",
            ) as f:
                konstone_custom_metric_fn_code = f.read()
        except OSError:
            print("Unable to read Lambda Function Code")
            # Without the source there is nothing to deploy; re-raise so the
            # real I/O error surfaces instead of an UnboundLocalError on the
            # first use of `konstone_custom_metric_fn_code` below.
            raise

        konstone_custom_metric_fn = _lambda.Function(
            self,
            "konstoneFunction",
            function_name="konstone_custom_metric_fn",
            runtime=_lambda.Runtime.PYTHON_3_7,
            handler="index.lambda_handler",
            code=_lambda.InlineCode(konstone_custom_metric_fn_code),
            timeout=core.Duration.seconds(3),
            reserved_concurrent_executions=1,
            environment={
                "LOG_LEVEL": "INFO",
                "PERCENTAGE_ERRORS": "75",
            },
        )

        # Create custom log group using the /aws/lambda/<function-name>
        # naming scheme so the metric filter below can be attached to it.
        konstone_custom_metric_lg = _logs.LogGroup(
            self,
            "konstoneLoggroup",
            log_group_name=f"/aws/lambda/{konstone_custom_metric_fn.function_name}",
            removal_policy=core.RemovalPolicy.DESTROY,
            retention=_logs.RetentionDays.ONE_DAY,
        )

        # Describe the custom metric (namespace + name) once; both the metric
        # filter and the alarm below take their identifiers from this object.
        third_party_error_metric = _cloudwatch.Metric(
            namespace="third-party-error-metric",
            metric_name="third_party_error_metric",
            label="Total No. of Third Party API Errors",
            period=core.Duration.minutes(1),
            statistic="Sum",
        )

        # Create custom metric log filter: each log event with
        # `$.third_party_api_error == true` contributes a value of 1.
        # The construct is created with `self` as its scope; no local
        # reference is needed afterwards.
        _logs.MetricFilter(
            self,
            "thirdPartyApiErrorMetricFilter",
            filter_pattern=_logs.FilterPattern.boolean_value(
                "$.third_party_api_error", True
            ),
            log_group=konstone_custom_metric_lg,
            metric_namespace=third_party_error_metric.namespace,
            metric_name=third_party_error_metric.metric_name,
            default_value=0,
            metric_value="1",
        )

        # Create third party error alarm.
        # NOTE(review): the description says "more than 2 errors in the last
        # two minutes", but the settings below fire when a single one-minute
        # datapoint out of the last two is >= 2 - confirm which is intended.
        _cloudwatch.Alarm(
            self,
            "thirdPartyApiErrorAlarm",
            alarm_description="Alert if 3rd party API has more than 2 errors in the last two minutes",
            alarm_name="third-party-api-alarm",
            metric=third_party_error_metric,
            comparison_operator=_cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
            threshold=2,
            evaluation_periods=2,
            datapoints_to_alarm=1,
            period=core.Duration.minutes(1),
            treat_missing_data=_cloudwatch.TreatMissingData.NOT_BREACHING,
        )