import multiprocessing
import os
import pathlib
import re
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional

import networkx as nx

from common import Configuration
from include_analysis import IncludeAnalysisOutput


def get_worker_count():
    """Return the number of CPUs usable by this process, for sizing worker pools."""
    try:
        # sched_getaffinity respects the process CPU affinity mask, but it only
        # exists on some platforms (e.g. Linux)
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return multiprocessing.cpu_count()
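

# A typical (hypothetical, not used in this module) way to consume this is
# sizing a worker pool:
#
#   with multiprocessing.Pool(processes=get_worker_count()) as pool:
#       results = pool.map(work, items)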


def load_config(name: str):
    """Load the named config from the configs directory, merging in dependency ignores.

    Returns the merged Configuration, or 1 if a config file is missing.
    """
    config_file = pathlib.Path(__file__).parent.joinpath("configs", name).with_suffix(".json")

    if not config_file.exists():
        print(f"error: no config file found: {config_file}")
        return 1

    config = Configuration.parse_file(config_file)

    # TODO - Make it recursive so it can handle deeply nested configs
    # TODO - Maybe warn about duplicates when merging?

    # Merge the dependencies (maybe this should be made generic to sub-configs?)
    for dependency in config.dependencies:
        # Dependency paths are automatically added to include directories
        config.includeDirs.append(dependency)

        if isinstance(config.dependencies[dependency], str):
            dependency_config_file = config_file.parent.joinpath(config.dependencies[dependency])

            if not dependency_config_file.exists():
                print(f"error: no config file found: {dependency_config_file}")
                return 1

            dependency_config = Configuration.parse_file(dependency_config_file)
            dependency_ignores = dependency_config.ignores
        else:
            dependency_ignores = config.dependencies[dependency].ignores

        # Files to skip are relative to the source root
        for file_to_skip in dependency_ignores.skip:
            config.ignores.skip.append(str(pathlib.Path(dependency).joinpath(file_to_skip)))

        for op in ("add", "remove"):
            # Filenames are relative to the source root
            for filename in getattr(dependency_ignores, op).filenames:
                getattr(config.ignores, op).filenames.append(str(pathlib.Path(dependency).joinpath(filename)))

            # Headers are accessible both internally and externally, so include them as-is and
            # also include them relative to the source root for top-level inclusion
            for header in getattr(dependency_ignores, op).headers:
                headers = getattr(config.ignores, op).headers
                headers.append(header)
                headers.append(str(pathlib.Path(dependency).joinpath(header)))

            # Edges are only processed if their file is, and that file is relative to the source root
            for filename, header in getattr(dependency_ignores, op).edges:
                getattr(config.ignores, op).edges.append((str(pathlib.Path(dependency).joinpath(filename)), header))

    return config
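

# A hypothetical sketch of the config layout load_config() consumes. The real
# schema is defined by common.Configuration; the shape below is inferred from
# the merging code above, and all paths are made up for illustration:
#
#   {
#     "includeDirs": ["include"],
#     "dependencies": {
#       "third_party/foo": "foo.json",          # path to a nested config file
#       "third_party/bar": {"ignores": {...}}   # or an inline sub-config
#     },
#     "ignores": {
#       "skip": ["legacy.cc"],
#       "add": {"filenames": [], "headers": [], "edges": []},
#       "remove": {"filenames": [], "headers": [], "edges": []}
#     }
#   }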


def get_include_analysis_edge_sizes(
    include_analysis: IncludeAnalysisOutput, include_directories: Optional[List[str]] = None
):
    # Strip off the path prefix for generated file includes so matching will work
    generated_file_prefix = re.compile(r"^(?:out/\w+/gen/)?(.*)$")
    edge_sizes = {}

    if include_directories is None:
        include_directories = []

    for filename in include_analysis["esizes"]:
        edge_sizes[filename] = {}

        for include, size in include_analysis["esizes"][filename].items():
            includes = [include]

            # If an include is in an include directory, strip that prefix and add it to edge sizes for matching
            for include_directory in include_directories:
                include_directory = include_directory if include_directory.endswith("/") else f"{include_directory}/"

                if include.startswith(include_directory):
                    includes.append(include[len(include_directory) :])

            for include in includes:
                include = generated_file_prefix.match(include).group(1)
                edge_sizes[filename][include] = size

    return edge_sizes
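

# For example (hypothetical paths): an include recorded as
# "out/Debug/gen/base/buildflags.h" is keyed as "base/buildflags.h" (the
# generated prefix is stripped in place), while with
# include_directories=["third_party/foo/include"] an include recorded as
# "third_party/foo/include/foo/foo.h" is keyed both as-is and as "foo/foo.h",
# so lookups work regardless of how the include was spelled.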


def get_include_analysis_edge_prevalence(
    include_analysis: IncludeAnalysisOutput, include_directories: Optional[List[str]] = None
):
    # Strip off the path prefix for generated file includes so matching will work
    generated_file_prefix = re.compile(r"^(?:out/\w+/gen/)?(.*)$")
    files = include_analysis["files"]
    root_count = len(include_analysis["roots"])
    edge_prevalence: DefaultDict[str, Dict[str, float]] = defaultdict(dict)

    if include_directories is None:
        include_directories = []

    for filename in files:
        for include in include_analysis["includes"][filename]:
            includes = [include]

            # If an include is in an include directory, strip that prefix and add it for matching
            for include_directory in include_directories:
                include_directory = include_directory if include_directory.endswith("/") else f"{include_directory}/"

                if include.startswith(include_directory):
                    includes.append(include[len(include_directory) :])

            for include in includes:
                include = generated_file_prefix.match(include).group(1)

                # An edge is reached whenever its including file is, so the file's
                # prevalence is used for all of its edges, as a percentage of roots
                edge_prevalence[filename][include] = (100.0 * include_analysis["prevalence"][filename]) / root_count

    return edge_prevalence
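

# For example (hypothetical numbers): with 2000 roots and a file whose recorded
# prevalence is 500, every edge out of that file gets 100.0 * 500 / 2000 = 25.0.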


def create_graph_from_include_analysis(include_analysis: IncludeAnalysisOutput):
    DG = nx.DiGraph()
    files = include_analysis["files"]

    # Map filenames to node indices up front, so adding edges doesn't need a
    # linear files.index() lookup for every include
    file_indexes = {filename: idx for idx, filename in enumerate(files)}

    # Add nodes and edges to the graph
    for idx, filename in enumerate(files):
        DG.add_node(idx, filename=filename)

        for include in include_analysis["includes"][filename]:
            DG.add_edge(idx, file_indexes[include])

    return DG
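

# Node indices line up with include_analysis["files"], so a node's filename can
# be recovered via DG.nodes[idx]["filename"], and a query such as
# len(nx.descendants(DG, idx)) counts how many files a file transitively pulls in.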


def get_include_analysis_edges_centrality(
    include_analysis: IncludeAnalysisOutput, include_directories: Optional[List[str]] = None
):
    # Strip off the path prefix for generated file includes so matching will work
    generated_file_prefix = re.compile(r"^(?:out/\w+/gen/)?(.*)$")
    DG: nx.DiGraph = create_graph_from_include_analysis(include_analysis)
    nodes_in = nx.in_degree_centrality(DG)
    nodes_out = nx.out_degree_centrality(DG)
    files = include_analysis["files"]
    file_indexes = {filename: idx for idx, filename in enumerate(files)}
    edges_centrality: DefaultDict[str, Dict[str, float]] = defaultdict(dict)

    if include_directories is None:
        include_directories = []

    # Centrality is a metric for a node, but we want to create a metric for an edge.
    # For the moment, this uses a heuristic which combines the in-degree centrality
    # of the node where the edge starts, and the out-degree centrality of the node
    # the edge is pulling into the graph. This hopefully creates a metric which lets
    # us find edges in commonly included nodes, which pull lots of nodes into the graph.
    for idx, filename in enumerate(files):
        for absolute_include in include_analysis["includes"][filename]:
            includes = [absolute_include]

            # If an include is in an include directory, strip that prefix and add it for matching
            for include_directory in include_directories:
                include_directory = include_directory if include_directory.endswith("/") else f"{include_directory}/"

                if absolute_include.startswith(include_directory):
                    includes.append(absolute_include[len(include_directory) :])

            for include in includes:
                include = generated_file_prefix.match(include).group(1)

                # Scale the value up so it's more human-friendly instead of having lots of leading zeroes
                edges_centrality[filename][include] = 100000 * nodes_out[file_indexes[absolute_include]] * nodes_in[idx]

    return edges_centrality
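

# Worked example of the edge score above (hypothetical numbers): in networkx,
# in/out-degree centrality is a node's in/out-degree divided by N - 1. If
# "big_header.h" is included by 400 of 10001 files (in-degree centrality 0.04)
# and it includes "other.h", which itself includes 20 files (out-degree
# centrality 0.002), the edge big_header.h -> other.h scores
# 100000 * 0.002 * 0.04 = 8.0.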