score_non_recom_plans.py
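# Scores plans that did not come from a ReCom ensemble: proposed plans (one
# block-assignment CSV per plan) and citizen-submitted plans (CSVs with one
# column per plan). Output is a JSONL file per plan category, with a header
# record from summary_data() followed by one plan_summary() record per plan.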
from plan_metrics import PlanMetrics
from gerrychain import Graph, Election, Partition
from gerrychain.updaters import Tally
import pandas as pd
import argparse
import json
import warnings
from configuration import *
from glob import glob
from functools import reduce
import os
SUPPORTED_METRIC_IDS = list(SUPPORTED_METRICS.keys())
HOMEDIR = os.path.expanduser('~')
parser = argparse.ArgumentParser(description="VTD Plan Scorer",
                                 prog="score_non_recom_plans.py")
parser.add_argument("st", metavar="state", type=str,
                    choices=SUPPORTED_STATES,
                    help="which state's plans to score")
parser.add_argument("map", metavar="map", type=str,
                    choices=SUPPORTED_PLAN_TYPES,
                    help="the map to redistrict")
parser.add_argument("--proposed_plan_dirs", type=str, nargs="*", metavar="proposed plan directory",
                    help="list of directories with proposed plan CSVs; doesn't need to be in brackets", default=[])
parser.add_argument("--citizen_plans_files", type=str, nargs="*", metavar="citizen plan files",
                    help="list of files with citizen plan CSVs", default=[])
parser.add_argument("--dropbox", action="store_true",
                    help="save the output files to the Dropbox folder? (default: save in this repo)")
args = parser.parse_args()
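# Example invocation (state, map, and paths here are hypothetical; valid choices
# come from SUPPORTED_STATES and SUPPORTED_PLAN_TYPES in configuration.py):
#   python score_non_recom_plans.py VA congress \
#       --proposed_plan_dirs proposed_plans/VA/congress \
#       --citizen_plans_files citizen_plans/va_congress.csv \
#       --dropbox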
STATE = args.st
PLAN_TYPE = args.map
DROPBOX = args.dropbox
proposed_dirs = args.proposed_plan_dirs
citizen_paths = args.citizen_plans_files
with open("{}/{}.json".format(STATE_SPECS_DIR, STATE)) as fin:
    state_specification = json.load(fin)
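# Rough sketch of the state specification this script reads; the keys are
# inferred from the lookups below, and the values are purely illustrative:
# {
#   "districts":      {"congress": 11, ...},
#   "dual_graph":     {"congress": "va_vtds.json", ...},
#   "pop_col":        "TOTPOP",
#   "county_col":     "COUNTY",
#   "municipal_col":  "MUNI",
#   "incumbent_cols": {"congress": "INCUMBENT", ...},
#   "pov_party":      "Dem",
#   "elections":      [{"name": ..., "candidates": [{"name": ..., "key": ...}, ...]}, ...],
#   "metrics":        [{"id": "TOTPOP", "type": "col_tally"}, {"id": "num_split_municipalities"}, ...]
# }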
k = state_specification["districts"][PLAN_TYPE]
dual_graph_file = "{}/{}".format(DUAL_GRAPH_DIR, state_specification["dual_graph"][PLAN_TYPE])
pop_col = state_specification["pop_col"]
county_col = state_specification["county_col"]
party = state_specification["pov_party"]
elections = state_specification["elections"]
demographic_cols = [m["id"] for m in state_specification["metrics"] if "type" in m and m["type"] == "col_tally"]
state_metric_ids = set([m["id"] for m in state_specification["metrics"] if "type" not in m or m["type"] != "col_tally"])
state_metrics = [
    {**m, "type": SUPPORTED_METRICS["col_tally"]} if ("type" in m and m["type"] == "col_tally")
    else {**m, "type": SUPPORTED_METRICS[m["id"]]}
    for m in state_specification["metrics"]
    if m["id"] in SUPPORTED_METRIC_IDS or ("type" in m and m["type"] == "col_tally")
]
municipality_col = (state_specification["municipal_col"]
                    if "num_municipal_pieces" in state_metric_ids or "num_split_municipalities" in state_metric_ids
                    else None)
incumbent_col = (state_specification["incumbent_cols"][PLAN_TYPE]
                 if "num_double_bunked" in state_metric_ids or "num_zero_bunked" in state_metric_ids
                 else None)
if len(state_metric_ids - set(SUPPORTED_METRIC_IDS)) > 0:
    warnings.warn("Some state metrics are not supported. Will continue without tracking them.\n"
                  "Unsupported metrics: {}".format(state_metric_ids - set(SUPPORTED_METRIC_IDS)))
election_names = [e["name"] for e in elections]
## Sort candidates alphabetically so that the "first" party is consistent.
election_updaters = {e["name"]: Election(e["name"], {c["name"]: c["key"]
                                                     for c in sorted(e["candidates"], key=lambda c: c["name"])})
                     for e in elections}
## Only tally the incumbent column when the plan type defines one (incumbent_col may be None).
tally_cols = demographic_cols + ([incumbent_col] if incumbent_col is not None else [])
demographic_updaters = {demo_col: Tally(demo_col, alias=demo_col) for demo_col in tally_cols}
graph = Graph.from_json(dual_graph_file)
scores = PlanMetrics(graph, election_names, party, pop_col, state_metrics, updaters=election_updaters,
                     county_col=county_col, demographic_cols=demographic_cols,
                     municipality_col=municipality_col, incumbent_col=incumbent_col)
if DROPBOX:
    output_path_proposed = f"{HOMEDIR}/Dropbox/PlanAnalysis/proposed_plans/{STATE}/{PLAN_TYPE}/proposed_plans.jsonl"
    output_path_citizen = f"{STATE}/plan_stats/{STATE.lower()}_{PLAN_TYPE}_citizen_plans.jsonl"
else:
    output_path_proposed = f"{STATE}/plan_stats/{PLAN_TYPE}_proposed_plans.jsonl"
    output_path_citizen = f"{STATE}/plan_stats/{PLAN_TYPE}_citizen_plans.jsonl"
if proposed_dirs:
    with open(output_path_proposed, "w") as fout:
        print(json.dumps(scores.summary_data(elections, num_districts=k, ensemble=False)), file=fout)
        plans = reduce(lambda acc, proposed_dir: acc + glob(f"{proposed_dir}/*.csv"), proposed_dirs, [])
        for plan_path in plans:
            name = plan_path.split("/")[-1].split(".csv")[0]
            # Each proposed-plan CSV maps GEOID20 -> district assignment, one row per unit.
            plan = pd.read_csv(plan_path, dtype={"GEOID20": "str", "assignment": int}).set_index("GEOID20").to_dict()["assignment"]
            ddict = {n: plan[graph.nodes()[n]["GEOID20"]] for n in graph.nodes()}
            part = Partition(graph, ddict, {**election_updaters, **demographic_updaters})
            print(json.dumps(scores.plan_summary(part, plan_type="proposed_plan",
                                                 plan_name=name)), file=fout)
if citizen_paths:
    with open(output_path_citizen, "w") as fout:
        print(json.dumps(scores.summary_data(elections, num_districts=k, ensemble=False)), file=fout)
        # Merge all citizen submission CSVs on GEOID20; each remaining column is one plan.
        citizen_ens = reduce(lambda acc, df: pd.merge(left=acc, right=df, on="GEOID20"),
                             [pd.read_csv(path, dtype={"GEOID20": "str"}).set_index("GEOID20")
                              for path in citizen_paths])
        plans = citizen_ens.to_dict()
        for plan_id, plan in plans.items():
            try:
                ddict = {n: int(plan[graph.nodes()[n]["GEOID20"]]) for n in graph.nodes()}
                part = Partition(graph, ddict, {**election_updaters, **demographic_updaters})
                print(json.dumps(scores.plan_summary(part, plan_type="citizen_plan", plan_name=plan_id)), file=fout)
            except Exception as e:
                # Skip plans that do not cover every unit or contain malformed assignments.
                warnings.warn(f"Skipping citizen plan {plan_id}: {e}")