
Commit f2d5a7d
Merge pull request #112 from provectus/DQ-154
[MAINTENANCE] Added unit tests for the rule-based algorithm
bvolodarskiy authored Aug 15, 2023
2 parents f1bcd77 + 33e82a3 commit f2d5a7d
Showing 2 changed files with 241 additions and 18 deletions.
35 changes: 19 additions & 16 deletions functions/data_test/data_test/profiling.py
@@ -1,6 +1,6 @@
import json
import math

import numpy as np
from ydata_profiling import ProfileReport
import os
import boto3
@@ -28,7 +28,7 @@
qa_bucket_name = os.environ['BUCKET']


-def generic_expectations_without_null(name, summary, batch, *args):
+def expectations_unique(name, summary, batch, *args):
batch.expect_column_to_exist(column=name)
if summary["p_unique"] >= 0.9:
batch.expect_column_values_to_be_unique(column=name)
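
Within this hunk only the function name changes; the rule the new test exercises stays the same: the column-existence expectation is always added, and the uniqueness expectation only when at least 90% of the profiled values are unique. A minimal standalone sketch of that threshold decision (illustration only, not the project code):

```python
# Uniqueness rule checked by test_expectations_unique: attach the expectation
# only when the profiled uniqueness ratio p_unique reaches 0.9.
def should_expect_unique(summary):
    return summary["p_unique"] >= 0.9

print(should_expect_unique({"p_unique": 0.95}))  # True  -> expectation applied
print(should_expect_unique({"p_unique": 0.9}))   # True  -> expectation applied
print(should_expect_unique({"p_unique": 0.1}))   # False -> only expect_column_to_exist
```
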
@@ -50,9 +50,8 @@ def expectations_mean(name, summary, batch, *args):

def expectations_median(name, summary, batch, *args):
min_median, max_median = calculate_median(summary)
-    if min_median and max_median:
-        batch.expect_column_median_to_be_between(
-            column=name, min_value=min_median, max_value=max_median)
+    batch.expect_column_median_to_be_between(
+        column=name, min_value=min_median, max_value=max_median)
return name, summary, batch


@@ -64,11 +63,10 @@ def expectations_stdev(name, summary, batch, *args):


def expectations_quantile(name, summary, batch, *args):
+    value_ranges = calculate_q_ranges(summary)
q_ranges = {
"quantiles": [0.05, 0.25, 0.5, 0.75, 0.95],
"value_ranges": [[summary["5%"], summary["25%"]], [summary["25%"], summary["50%"]],
[summary["50%"], summary["75%"]], [summary["75%"], summary["95%"]],
[summary["95%"], summary["max"]]]
"value_ranges": value_ranges
}
batch.expect_column_quantile_values_to_be_between(
column=name, quantile_ranges=q_ranges)
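
The expectation itself is unchanged here; the hunk only extracts the hard-coded value ranges into the new calculate_q_ranges helper (added near the end of this file) so the unit test can call it directly. A minimal sketch of what the helper produces for the percentile values used in the new quantile test (illustrative summary dict, not real profiling output):

```python
# Quantile bands derived from a ydata-profiling style summary; the percentile
# values below are the ones test_expectations_quantile feeds in.
summary = {"5%": 912.85, "25%": 996.25, "50%": 1100.5,
           "75%": 1204.75, "95%": 1288.15, "max": 1309}

value_ranges = [[summary["5%"], summary["25%"]], [summary["25%"], summary["50%"]],
                [summary["50%"], summary["75%"]], [summary["75%"], summary["95%"]],
                [summary["95%"], summary["max"]]]

# Paired with the fixed quantiles [0.05, 0.25, 0.5, 0.75, 0.95] in
# expect_column_quantile_values_to_be_between.
print(value_ranges)
# [[912.85, 996.25], [996.25, 1100.5], [1100.5, 1204.75],
#  [1204.75, 1288.15], [1288.15, 1309]]
```
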
@@ -100,7 +98,7 @@ def __init__(self, typeset, *args, **kwargs):
expectations_null,
],
"Numeric": [
-                generic_expectations_without_null,
+                expectations_unique,
expectations_null,
expectation_algorithms.numeric_expectations,
expectations_mean,
@@ -209,19 +207,18 @@ def calculate_mean(summary):


def calculate_median(summary):
-    min_median = None
-    max_median = None
raw_values = summary["value_counts_index_sorted"]
values = []
for key, v in raw_values.items():
key = [key] * v
values.extend(key)
q = 0.5
j = int(len(values) * q - 2.58 * math.sqrt(len(values) * q * (1 - q)))
-    k = int(len(values) * q + 2.58 * math.sqrt(len(values) * q * (1 - q)))
-    if j < len(values) and k < len(values):
-        min_median = values[j]
-        max_median = values[k]
+    k = int(len(values) * q + 2.58 * math.sqrt(len(values) * q * (1 - q))) - 1
+    if j >= 1:
+        j -= 1
+    min_median = values[j]
+    max_median = values[k]
return min_median, max_median
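
The reworked calculate_median builds a distribution-free confidence interval for the median from the profiled value counts: with n values and q = 0.5, the order-statistic indices are roughly n·q ∓ 2.58·sqrt(n·q·(1 − q)), where 2.58 is approximately the 99% normal quantile. The `- 1` on k and the `j -= 1` adjustment keep both indices inside the list, so the function now always returns concrete bounds, which is why the guard in expectations_median above could be dropped. A minimal sketch reproducing the bounds the new unit test expects:

```python
import math

# Rebuild the sorted value list from ydata-profiling's value_counts_index_sorted,
# using the counts from test_expectations_median.
value_counts = {892: 1, 893: 1, 894: 1, 1004: 2, 1500: 1}
values = []
for value, count in value_counts.items():
    values.extend([value] * count)               # [892, 893, 894, 1004, 1004, 1500]

q = 0.5
half_width = 2.58 * math.sqrt(len(values) * q * (1 - q))  # ~3.16 for n = 6
j = int(len(values) * q - half_width)            # lower order statistic -> 0
k = int(len(values) * q + half_width) - 1        # upper order statistic -> 5
if j >= 1:
    j -= 1                                       # widen the lower bound when possible

print(values[j], values[k])                      # 892 1500, as asserted in the test
```
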


@@ -244,10 +241,16 @@ def calculate_z_score(summary):
maximum = summary["max"]
significance_level = 0.005
threshold = (maximum - mean) / std
-    if std:
+    if std and not np.isnan(std):
return threshold + significance_level
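
The added NaN check matters because std can come through as NaN in a degenerate summary (the second case in the new z-score test feeds np.nan directly); with a usable std the threshold passed to the z-score expectation is simply (max − mean) / std plus the 0.005 significance margin. A quick check against the values used in test_expectations_z_score:

```python
import numpy as np

# Values from the first parametrized case of test_expectations_z_score.
mean, std, maximum = 418, 120.81045760473994, 1309
significance_level = 0.005

if std and not np.isnan(std):
    threshold = (maximum - mean) / std + significance_level
    print(threshold)          # 7.380189347557294, the bound the test asserts
# With std = np.nan the guard fails and calculate_z_score returns None, so the
# z-score expectation is not added (applied == False in the second test case).
```
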


def calculate_q_ranges(summary):
return [[summary["5%"], summary["25%"]], [summary["25%"], summary["50%"]],
[summary["50%"], summary["75%"]], [summary["75%"], summary["95%"]],
[summary["95%"], summary["max"]]]


def profile_data(df, suite_name, cloudfront, datasource_root, source_covered,
mapping_config, run_name):
qa_bucket = s3.Bucket(qa_bucket_name)
224 changes: 222 additions & 2 deletions functions/data_test/tests/test_profiling.py
@@ -1,15 +1,78 @@
import pytest
import numpy as np
from profiling import (add_local_s3_to_stores,
-read_gx_config_file)
+read_gx_config_file,
expectations_unique,
expectations_null,
expectations_mean,
calculate_mean,
calculate_stdev,
expectations_stdev,
calculate_z_score,
expectations_z_score,
expectations_quantile,
calculate_q_ranges,
calculate_median,
expectations_median,
change_ge_config)
import great_expectations as gx
import pandas as pd
from datetime import datetime

ENDPOINT_URL = "http://localhost:4566"
summary_template = {
"n_distinct": 418,
"p_distinct": 1.0,
"is_unique": True,
"n_unique": 418,
"p_unique": 1.0,
"type": "Numeric",
"hashable": True,
"value_counts_without_nan": "892",
"value_counts_index_sorted": pd.Series({892: 1, 893: 1, 894: 1, 1004: 2, 1500: 1}),
"ordering": True,
"n_missing": 0,
"n": 418,
"p_missing": 0.0,
"count": 418,
"memory_size": 3472,
"n_negative": "0",
"p_negative": 0.0,
"n_infinite": "0",
"n_zeros": 0,
"mean": 1100.5,
"std": 120.81045760473994,
"variance": 14595.166666666666,
"min": "892",
"max": "1309",
"kurtosis": -1.2,
"skewness": 0.0,
"sum": "460009",
"mad": 104.5,
"range": "417",
"5%": 912.85,
"25%": 996.25,
"50%": 1100.5,
"75%": 1204.75,
"95%": 1288.15,
"iqr": 208.5,
"cv": 0.1097777897362471,
"p_zeros": 0.0,
"p_infinite": 0.0,
"monotonic_increase": True,
"monotonic_decrease": False,
"monotonic_increase_strict": True,
"monotonic_decrease_strict": False,
"monotonic": 2,
"histogram": ["[9]"]
}


@pytest.mark.parametrize("stores, expected_output", [
({"store1": {"store_backend": {"type": "s3", "bucket": "my-bucket"}}},
{"store1": {"store_backend": {"type": "s3", "bucket": "my-bucket",
"boto3_options":
{"endpoint_url": ENDPOINT_URL}}}}),
{"endpoint_url": ENDPOINT_URL}}}}),
({}, {})
])
def test_add_local_s3_to_stores(stores, expected_output):
@@ -26,3 +89,160 @@ def test_gx_config_file_path_is_not_none(tmpdir):
p.write("config_version: 10.0")
config_file = read_gx_config_file(path=p)
assert config_file["config_version"] == 10.0


def change_template(params, params_name):
name_expected = "PassengerId"
summary_expected = summary_template
for param, name in zip(params, params_name):
summary_expected[name] = param
return name_expected, summary_expected
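
Note that change_template binds summary_expected directly to the module-level summary_template rather than copying it, so each parametrized case mutates the shared dict; that stays harmless as long as every test overrides the keys it reads. A copy-based variant (a sketch, not part of this PR) would keep the cases fully independent:

```python
import copy

def change_template_copied(params, params_name):
    """Hypothetical variant of change_template that works on a deep copy."""
    name_expected = "PassengerId"
    summary_expected = copy.deepcopy(summary_template)
    for param, name in zip(params, params_name):
        summary_expected[name] = param
    return name_expected, summary_expected
```
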


@pytest.fixture(autouse=True)
def before_and_after_test():
df = pd.DataFrame(columns=['PassengerId'])
context_gx = gx.get_context()
suite_name = f"test_{datetime.now()}"
datasource = context_gx.sources.add_pandas(name=suite_name)
data_asset = datasource.add_dataframe_asset(name=suite_name, dataframe=df)
batch_request = data_asset.build_batch_request()
batch_empty = context_gx.get_validator(
batch_request=batch_request,
)

yield batch_empty

context_gx.delete_datasource(suite_name)


@pytest.mark.parametrize("p_unique, applied", [(0.95, True), (0.9, True), (0.1, False)])
def test_expectations_unique(p_unique, applied, before_and_after_test):
p_unique = eval("p_unique")
applied = eval("applied")
name_expected, summary_expected = change_template([p_unique], ["p_unique"])
expectation_type = "expect_column_values_to_be_unique"
batch_empty = before_and_after_test

name, summary, batch = expectations_unique(name_expected, summary_expected, batch_empty)

assert name == name_expected
assert "expect_column_to_exist" in str(batch.expectation_suite)
assert (expectation_type in str(batch.expectation_suite)) == applied


@pytest.mark.parametrize("p_missing, applied", [(0.4, True), (0.2, True), (0.5, False)])
def test_expectations_null(p_missing, applied, before_and_after_test):
p_missing = eval("p_missing")
applied = eval("applied")
name_expected, summary_expected = change_template([p_missing], ["p_missing"])
expectation_type = "expect_column_values_to_not_be_null"
batch_empty = before_and_after_test

name, summary, batch = expectations_null(name_expected, summary_expected, batch_empty)

assert name == name_expected
assert (expectation_type in str(batch.expectation_suite)) == applied


@pytest.mark.parametrize("n,std,mean,max_mean,min_mean",
[(418, 120.81045760473994, 1100.5, 1106.349942307408, 1094.650057692592)])
def test_expectations_mean(n, std, mean, max_mean, min_mean, before_and_after_test):
n = eval("n")
std = eval("std")
mean = eval("mean")
max_mean_expected = eval("max_mean")
min_mean_expected = eval("min_mean")
name_expected, summary_expected = change_template([n, std, mean], ["n", "std", "mean"])
expectation_type = "expect_column_mean_to_be_between"
batch_empty = before_and_after_test

min_mean, max_mean = calculate_mean(summary_expected)
name, summary, batch = expectations_mean(name_expected, summary_expected, batch_empty)

assert (min_mean == min_mean_expected and max_mean == max_mean_expected)
assert name == name_expected
assert expectation_type in str(batch.expectation_suite)


@pytest.mark.parametrize("n,std,max_std,min_std",
[(418, 120.81045760473994, 136.10108739120102, 105.51982781827887)])
def test_expectations_stdev(n, std, max_std, min_std, before_and_after_test):
n = eval("n")
std = eval("std")
max_std_expected = eval("max_std")
min_std_expected = eval("min_std")
name_expected, summary_expected = change_template([n, std], ["n", "std"])
expectation_type = "expect_column_stdev_to_be_between"
batch_empty = before_and_after_test

min_std, max_std = calculate_stdev(summary_expected)
name, summary, batch = expectations_stdev(name_expected, summary_expected, batch_empty)

assert (min_std == min_std_expected and max_std == max_std_expected)
assert name == name_expected
assert expectation_type in str(batch.expectation_suite)


@pytest.mark.parametrize("mean,std,max,threshold,applied",
[(418, 120.81045760473994, 1309, 7.380189347557294, True),
(418, np.nan, 1309, None, False)])
def test_expectations_z_score(mean, std, max, threshold, applied, before_and_after_test):
mean = eval("mean")
std = eval("std")
max = eval("max")
threshold_expected = eval("threshold")
applied = eval("applied")
name_expected, summary_expected = change_template([mean, std, max], ["mean", "std", "max"])
expectation_type = "expect_column_value_z_scores_to_be_less_than"
batch_empty = before_and_after_test

threshold = calculate_z_score(summary_expected)
name, summary, batch = expectations_z_score(name_expected, summary_expected, batch_empty)

assert threshold == threshold_expected
assert name == name_expected
assert (expectation_type in str(batch.expectation_suite)) == applied


@pytest.mark.parametrize("q1,q2,q3,q4,q5,q6",
[(912.85, 996.25, 1100.5, 1204.75, 1288.15, 1309)])
def test_expectations_quantile(q1, q2, q3, q4, q5, q6, before_and_after_test):
q1 = eval("q1")
q2 = eval("q2")
q3 = eval("q3")
q4 = eval("q4")
q5 = eval("q5")
q6 = eval("q6")
expected_ranges = [[q1, q2], [q2, q3],
[q3, q4], [q4, q5],
[q5, q6]]
name_expected, summary_expected = change_template([q1, q2, q3, q4, q5, q6],
["5%", "25%", "50%", "75%", "95%", "max"])
expectation_type = "expect_column_quantile_values_to_be_between"
batch_empty = before_and_after_test

q_ranges = calculate_q_ranges(summary_expected)
name, summary, batch = expectations_quantile(name_expected, summary_expected, batch_empty)

assert expected_ranges == q_ranges
assert name == name_expected
assert expectation_type in str(batch.expectation_suite)

@pytest.mark.parametrize("min_median,max_median,value_counts_index_sorted,applied",
[(892, 1500, pd.Series({892: 1, 893: 1, 894: 1, 1004: 2, 1500: 1}), True)])
def test_expectations_median(min_median, max_median, value_counts_index_sorted, applied, before_and_after_test):
min_median_expected = eval("min_median")
max_median_expected = eval("max_median")
value_counts_index_sorted = eval("value_counts_index_sorted")
applied = eval("applied")
name_expected, summary_expected = change_template([value_counts_index_sorted], ["value_counts_index_sorted"])
expectation_type = "expect_column_median_to_be_between"
batch_empty = before_and_after_test

min_median, max_median = calculate_median(summary_expected)
name, summary, batch = expectations_median(name_expected, summary_expected, batch_empty)

assert (min_median == min_median_expected and max_median == max_median_expected)
assert name == name_expected
assert (expectation_type in str(batch.expectation_suite)) == applied
