#!/usr/bin/env python3
"""
Validate a PV forecast against PV_Live.
- Vlad Bondarenko <[email protected]>
- Jamie Taylor <[email protected]>
- First Authored: 2020-03-13
"""
from datetime import timedelta, time
import pandas as pd
import numpy as np
from pvlive_api import PVLive
from helper import shift_n_days


class Validation:
def __init__(self, logger):
self.logger = logger
self.pvlive = PVLive()
self.pes_list = self.pvlive._get_pes_list()
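        # Merge the GSP list with the PES list so each GSP row carries its
        # parent PES name (used when labelling entities for display).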
self.gsp_list = self.pvlive._get_gsp_list().merge(self.pes_list, how="left", on=("pes_id"))

    def run_validation(self, entity_type, entity_ids, forecast, min_yield=0.01, val="full"):
"""
Runs full validation for all entity_ids
Parameters
----------
entity_type: str
Either "pes" or "gsp".
entity_ids: list
A list of Entity IDs for which forecast data will be provided
forecast: pd.DataFrame, optional
Forecast data to be validated with index | entity_ids | Datetime | Horizon |
min_yield: float
Cut out all the values with yield below min_yield
val: {"full", "fast"}
Perform either full validation or fast returning only one value
Returns
-------
val_data: dict
Dictionary with all of the validation results ready for plotting
"""
val_data = {"data": {}}
display_data = val_data["data"]
for entity_id in entity_ids:
entity_name = self.__get_entity_name(entity_type, entity_id)
entity_id_str = f"{entity_type.upper()} {entity_id} ({entity_name})"
display_data[entity_id_str] = dict()
val_data[entity_id_str] = dict()
try:
entity_forecast = forecast.loc[entity_id]
except KeyError:
entity_forecast = forecast.loc[:, entity_id]
data = self.get_data(entity_forecast, entity_type, entity_id, min_yield)
val_data[entity_id_str]["wmape"] = self.wmape(data["forecast"], data["actual"], data["cap"]).mean()
val_data[entity_id_str]["r_squared"] = self.r_squared(data["forecast"], data["actual"])
if val == "full":
display_data[entity_id_str]["intraday"] = dict()
display_data[entity_id_str]["day1"] = dict()
horizons = data.index.get_level_values(1)
dt = data.index.get_level_values(0)
for base in dt.hour.unique():
base_str = str(time(hour=base))[:5]
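                # Horizons are half-hourly steps and `base` is the forecast base hour,
                # so the day1 window (48 - 2*base to 96 - 2*base half-hours ahead)
                # spans the next calendar day; the intraday window covers the
                # near-term horizons from the base time.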
intraday = data[(dt.hour == base) & (horizons.isin(np.arange(0, 48 - base)))]
dayp1 = data[(dt.hour == base) & (horizons.isin(np.arange(48 - (base * 2), 96 - (base * 2))))]
period_names = ["intraday", "day1"]
for i, period_data in enumerate([intraday, dayp1]):
display_data[entity_id_str][period_names[i]][base_str] = dict()
display_period = display_data[entity_id_str][period_names[i]][base_str]
pred, actual, cap = period_data["forecast"], period_data["actual"], period_data["cap"]
pred_u, actual_u, cap_u = pred.unstack(), actual.unstack(), cap.unstack()
errors = pd.DataFrame(index=pred_u.index)
errors["mape"] = self.wmape(pred_u, actual_u, cap_u, axis=1)
errors["r_squared"] = self.r_squared(pred, actual)
display_period["r_squared"] = self.r_squared(pred.reset_index().forecast,
actual.reset_index().actual)
display_period["mape"] = errors["mape"].values.tolist()
display_period["actual"] = actual.values.tolist()
display_period["predicted"] = pred.values.tolist()
display_period["heatmap"] = self.calc_heatmap(pred, actual, cap)
return val_data

    def __get_entity_name(self, entity_type, entity_id):
"""Get the name of a given entity."""
self.logger.debug("Getting entity name for %s %s", entity_type, entity_id)
if entity_type.lower() in ("national", "pes"):
return self.pes_list.query(f"`pes_id`=={entity_id}").pes_name.values[0]
        elif entity_type.lower() == "gsp":
            return self.gsp_list.query(f"`gsp_id`=={entity_id}")\
                                .apply(lambda x: f"{x.gsp_name} ({x.pes_name})", axis=1)\
                                .values[0]
else:
raise ValueError("entity_type must be 'national', 'pes' or 'gsp'")

    def get_data(self, forecast, entity_type, entity_id, min_yield):
"""
Get pvlive data, fixes indexes to be same and keeps day values
Parameters
----------
forecast: pd.DataFrame
A Pandas DataFrame with index | DateTime | Horizon |
entity_type: str
Entity type for which the forecast has been produced.
entity_id: int
Entity ID for which the forecast has been produced.
min_yield: float
Cut out all the values with yield below provided
Returns
-------
df: pd.DataFrame
DataFrame with actual and forecast data
"""
gen, cap = self.get_pvlive_data(forecast.index, entity_type, entity_id)
gen.dropna(inplace=True)
forecast = forecast.reindex(gen.index).dropna()
cap = cap.reindex(gen.index).dropna()
day_values = (gen.values.flatten() / cap.values.flatten()) > min_yield
df = forecast[day_values].copy()
if not isinstance(df, pd.DataFrame):
df = df.to_frame()
df.columns = ["forecast"]
df["actual"] = gen.loc[day_values]
df["cap"] = cap[day_values]
return df

    def get_pvlive_data(self, forecast_index, entity_type, entity_id):
"""
Extract pvlive data from api
Parameters
----------
forecast_index: pd.MultiIndex
A Pandas index | DateTime | Horizon | from the forecast data
entity_type: int
Entity type to get the pv data from.
entity_id: int
Entity ID to get the pv data from.
Returns
-------
pv_data
"""
datetimes = forecast_index.get_level_values(0).unique()
horizons = forecast_index.get_level_values(1).unique()
start = datetimes[0]
end = datetimes[-1] + timedelta(hours=73)
self.logger.debug(f"Fetching PV_Live data: {start}, {end}, {entity_type}, {entity_id}")
entity_type_ = "pes" if entity_type == "national" else entity_type
pvlive_data = self.pvlive.between(start, end, entity_type=entity_type_, entity_id=entity_id,
extra_fields="installedcapacity_mwp", dataframe=True)
pvlive_data.sort_values(by=[f"{entity_type_}_id", "datetime_gmt"], inplace=True)
pvlive_df = pvlive_data.rename(columns={"generation_mw": "gen", "installedcapacity_mwp": "cap"})
pvlive_df.index = pd.to_datetime(pvlive_df["datetime_gmt"])
pvlive_df.drop(columns=["datetime_gmt", f"{entity_type_}_id"], inplace=True)
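        # `shift_n_days` (from the helper module) turns the half-hourly series
        # into a (datetime x horizon) panel: column h is assumed to hold the
        # outturn h half-hours after each datetime, so stacking aligns it with
        # the forecast's | DateTime | Horizon | MultiIndex.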
pv_gen = shift_n_days(pvlive_df["gen"].values.reshape(-1, 1), horizons[0], horizons[-1]+1, reverse=True)
pv_gen = pd.DataFrame(data=pv_gen, index=pd.to_datetime(pvlive_df.index))\
.drop(columns=0).stack().reindex(forecast_index)
pv_cap = shift_n_days(pvlive_df["cap"].values.reshape(-1, 1), horizons[0], horizons[-1] + 1, reverse=True)
pv_cap = pd.DataFrame(data=pv_cap, index=pd.to_datetime(pvlive_df.index))\
.drop(columns=0).stack().reindex(forecast_index)
return pv_gen, pv_cap

    @staticmethod
def r_squared(predictions, actuals):
r"""
Calculate the coefficient of determination (a.k.a R-Squared) [1]_.
Parameters
----------
`predictions` : numpy array of floats
Predictions being tested.
`actuals`: numpy array of floats
Actual values corresponding to `predictions`. Must be same size as `predictions`.
Returns
-------
float
Coefficient of determination.
Notes
-----
.. math::
\begin{align*}
y=Actuals,\quad f&=Predictions,\quad \bar{y}=\frac{1}{n}\sum_{i=1}^n{y_i}\\
SS_{tot}&=\sum_i{(y_i-\bar{y})^2}\\
SS_{res}&=\sum_i{(y_i-f_i)^2}\\
R^2&=1-\frac{SS_{res}}{SS_{tot}}
\end{align*}
References
----------
.. [1] https://en.wikipedia.org/wiki/Coefficient_of_determination
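
        Examples
        --------
        A quick sanity check on toy arrays (a minimal sketch, not from the
        original test suite):

        >>> import numpy as np
        >>> float(Validation.r_squared(np.array([1., 2., 4.]), np.array([1., 2., 3.])))
        0.5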
"""
mean_actual = np.mean(actuals)
ss_tot = np.sum(np.power(actuals - mean_actual, 2))
ss_res = np.sum(np.power(actuals - predictions, 2))
r_sqr = 1 - ss_res / ss_tot
return np.squeeze(r_sqr)

    @staticmethod
def wmape(predictions, actuals, norms=None, weights=None, axis=0):
r"""
Calculate the weighted Mean Absolute Percent Error (MAPE).
Parameters
----------
`predictions` : numpy array of floats
Predictions being tested.
`actuals` : numpy array of floats
Actual values corresponding to `predictions`. Must be same size as `predictions`.
`norms` : numpy array of floats
Normalisation values. Must be same size as `predictions`. Default is to use `actuals`.
`weights` : numpy array of floats
Weighting values. Must be same size as `predictions`. Default is to use `actuals`.
Returns
-------
float
wMAPE.
Notes
-----
.. math::
\begin{gathered}
y=Actuals,\quad f=Predictions,\quad n=Normalisations,\quad w=Weights\\
\mathit{wMAPE}=
\frac{\sum_i{w_i\times\mathrm{abs}\left(\frac{f_i-y_i}{n_i}\right)\times100\%}}{\sum_i{w_i}}
\end{gathered}
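
        Examples
        --------
        Toy arrays (a minimal sketch): with the default ``axis=0`` the
        per-sample errors come back unaggregated, while ``axis=1`` takes the
        weighted mean across columns using `actuals` as the weights:

        >>> import numpy as np
        >>> Validation.wmape(np.array([1., 2., 4.]), np.array([1., 2., 2.]))
        array([  0.,   0., 100.])
        >>> float(Validation.wmape(np.array([[3., 6.]]), np.array([[2., 4.]]), axis=1)[0])
        50.0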
"""
        norms = actuals if norms is None else norms
        weights = actuals if weights is None else weights
        mapes = np.abs((predictions - actuals) / norms) * 100.
        if axis == 1:
            # Weighted mean across columns, weighted by `weights`.
            wmape = np.sum((weights * mapes), axis=axis) / np.sum(weights, axis=axis)
        else:
            # With axis=0 the per-sample errors are returned unaggregated;
            # callers apply their own aggregation (e.g. .mean() or a groupby).
            wmape = mapes
        return wmape

    def calc_heatmap(self, forecast, actuals, capacity):
        """
        Calculate the mean capacity-normalised absolute percent error for each
        (month, time-of-day) cell, ready for plotting as a heatmap.
        """
        heatmap = dict()
mapes = self.wmape(forecast, actuals, capacity, axis=0).to_frame()
times = mapes.index.get_level_values(0) + pd.to_timedelta(30*mapes.index.get_level_values(1), "m")
mapes["month"] = times.month
mapes["time"] = [t[:5] for t in times.time.astype(str)]
mapes.set_index(["month", "time"], inplace=True, drop=True)
mapes_mean = mapes.groupby(["month", "time"]).mean().squeeze()
heatmap_df = mapes_mean.unstack().fillna(0)
heatmap["xlabels"] = heatmap_df.columns.values.tolist()
heatmap["ylabels"] = heatmap_df.index.values.tolist()
heatmap["values"] = heatmap_df.values.tolist()
return heatmap
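

# A minimal usage sketch (an illustration, not part of the original module):
# it presumes a standard-library logger and a forecast DataFrame indexed by
# | entity_id | Datetime | Horizon | as described in `run_validation`; the
# input file name is hypothetical.
#
#     import logging
#     import pandas as pd
#
#     logging.basicConfig(level=logging.DEBUG)
#     validator = Validation(logging.getLogger("validation"))
#     forecast = pd.read_pickle("forecast.p")  # hypothetical forecast dump
#     results = validator.run_validation("pes", [0], forecast, val="fast")
#     for entity, metrics in results.items():
#         if entity != "data":
#             print(entity, metrics["wmape"], metrics["r_squared"])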