Coverage for rta_reconstruction/dl1_to_dl2.py: 75%
109 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-11 10:03 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-11 10:03 +0000
1"""DL1 to DL2 processing script."""
3import argparse
4import json
5from pathlib import Path
6from typing import Dict
8import astropy.units as u
9import joblib
10import numpy as np
11import pandas as pd
12from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
14from rta_reconstruction.dl1_reader import (
15 convert_az_and_sin_az_angle_to_degree,
16 filter_dl1,
17 interpolate_missing_alt_az,
18 read_dl1,
19 read_telescope_optics,
20)
21from rta_reconstruction.dl2_io import init_dl2_file, write_dl2_df
22from rta_reconstruction.image_displacement import (
23 disp_to_pos,
24 disp_vector_pol2cart,
25 update_disp_with_effective_focal_length,
26)
27from rta_reconstruction.utils.coordinates import camera_to_altaz, camera_to_shower_coordinates
28from rta_reconstruction.utils.rta_argparse import EnumAction
29from rta_reconstruction.utils.rta_logging import LOGGING_LEVELS, init_logging
32# TODO: move somewhere else ?
33def reco_source_position_sky(cog_x, cog_y, disp_dx, disp_dy, focal_length, pointing_alt, pointing_az):
34 """
35 Compute the reconstructed source position in the sky
37 Parameters
38 ----------
39 cog_x: `astropy.units.Quantity`
40 cog_y: `astropy.units.Quantity`
41 disp_dx: `astropy.units.Quantity`
42 disp_dy: `astropy.units.Quantity`
43 focal_length: `astropy.units.Quantity`
44 pointing_alt: `astropy.units.Quantity`
45 pointing_az: `astropy.units.Quantity`
47 Returns
48 -------
49 sky frame: `astropy.coordinates.sky_coordinate.SkyCoord`
50 """
51 src_x, src_y = disp_to_pos(disp_dx, disp_dy, cog_x, cog_y)
52 return camera_to_altaz(src_x, src_y, focal_length, pointing_alt, pointing_az)
55# TODO: use pydantic configuration instead of json to Dict
56# TODO: use more generic type hints than specific class (sklearn regressor/classifier maybe ?)
57# TODO: refactor in several steps to isolate the "predict" calls, to make it simpler to implement training
58def dl1_to_dl2(
59 dl1_df: pd.DataFrame,
60 dl1_dl2_config: Dict,
61 effective_focal_length,
62 energy_regressor: RandomForestRegressor,
63 gamma_classifier: RandomForestClassifier,
64 image_displacement_vector_regressor: RandomForestRegressor | None = None,
65 disp_norm_regressor: RandomForestRegressor | None = None,
66 disp_sign_classifier: RandomForestClassifier | None = None,
67) -> pd.DataFrame:
68 # TODO: make very clear in doc that this is "inplace" operations
69 # (this doesn't copy, simply we will add column to the df)
70 dl2_df = dl1_df
72 # TODO: refactor model computation to individual functions (if surrounding operations are really required)
74 # TODO: is log energy or energy used by disp vector, disp norm or disp sign ? then need concatenate
75 # to avoid copy ? Can't use "out" since predict doesn't support that API, could copy only vector of
76 # size (#samples) instead of everything though
77 # what could do, since fortran indexing may be best (TEST!?)
78 # load everything as a fortran order rec-array (pre-allocated with dl2 columns.)
79 # that way we can in with column names, and avoid pandas copy
80 # pytables read has an "out" parameters, but it doesn't check the types between out and datadisk (only copies)
81 dl2_df["log_reco_energy"] = energy_regressor.predict(dl2_df.loc[:, dl1_dl2_config["energy_regressor_features"]])
82 dl2_df["reco_energy"] = 10 ** dl2_df["log_reco_energy"]
84 if image_displacement_vector_regressor is not None: 84 ↛ 89line 84 didn't jump to line 89 because the condition on line 84 was always true
85 disp_vector = image_displacement_vector_regressor.predict(
86 dl2_df.loc[:, dl1_dl2_config["disp_vector_regressor_features"]]
87 )
88 else:
89 dl2_df["reco_disp_norm"] = disp_norm_regressor.predict(
90 dl2_df.loc[:, dl1_dl2_config["disp_norm_regressor_features"]]
91 )
92 disp_sign_proba = disp_sign_classifier.predict_proba(
93 dl2_df[:, dl1_dl2_config["disp_sign_classifier_features"]]
94 )
95 # TODO: since we only care about something been gamma or not, 1 probability should be enough
96 # we need to change the models for it to predict "gamma" or "not-gamma" with single proba.
97 col = list(disp_sign_classifier.classes_).index(1)
98 disp_sign = np.where(disp_sign_proba[:, col] > 0.5, 1, -1)
99 dl2_df["reco_disp_sign"] = disp_sign
100 dl2_df["reco_disp_sign_proba"] = disp_sign_proba[:, 0]
102 disp_vector = disp_vector_pol2cart(dl2_df["reco_disp_norm"], dl2_df["psi"], disp_sign)
104 dl2_df["reco_disp_dx"] = disp_vector[:, 0]
105 dl2_df["reco_disp_dy"] = disp_vector[:, 1]
106 dl2_df["reco_src_x"], dl2_df["reco_src_y"] = disp_to_pos(
107 dl2_df.reco_disp_dx,
108 dl2_df.reco_disp_dy,
109 dl2_df.x,
110 dl2_df.y,
111 )
113 # TODO: stack coordinates and project in // as in pyhiperta ?
114 longi, _ = camera_to_shower_coordinates(
115 dl2_df["reco_src_x"], dl2_df["reco_src_y"], dl2_df["x"], dl2_df["y"], dl2_df["psi"]
116 )
118 # TODO: check sign of longitudinal coordinate with HiPeRTA
119 # TODO: is this required ?
120 # Obtain the time gradient with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y)
121 # Defined positive if light arrival times increase with distance to it. Negative otherwise:
122 dl2_df["signed_time_gradient"] = -1 * np.sign(longi) * dl2_df["time_gradient"]
123 # Obtain skewness with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y)
124 # Defined on the major image axis; sign is such that it is typically positive for gammas:
125 dl2_df["signed_skewness"] = -1 * np.sign(longi) * dl2_df["skewness"]
127 # TODO: what if we use simulations but still want to use the reconstructed alt az ?
128 # TODO: better default value (nan ?)
129 if "mc_alt_tel" in dl2_df.columns: 129 ↛ 130line 129 didn't jump to line 130 because the condition on line 129 was never true
130 alt_tel = dl2_df["mc_alt_tel"].values
131 az_tel = dl2_df["mc_az_tel"].values
132 elif "alt_tel" in dl2_df.columns: 132 ↛ 136line 132 didn't jump to line 136 because the condition on line 132 was always true
133 alt_tel = dl2_df["alt_tel"].values
134 az_tel = dl2_df["az_tel"].values
135 else:
136 alt_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
137 az_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
139 # TODO: this calls astropy coordinates changing routines, can it be optimized ?
140 src_pos_reco = reco_source_position_sky(
141 dl2_df.x.values * u.m,
142 dl2_df.y.values * u.m,
143 dl2_df.reco_disp_dx.values * u.m,
144 dl2_df.reco_disp_dy.values * u.m,
145 effective_focal_length,
146 alt_tel * u.rad,
147 az_tel * u.rad,
148 )
149 dl2_df["reco_alt"] = src_pos_reco.alt.rad
150 dl2_df["reco_az"] = src_pos_reco.az.rad
152 gammaness = gamma_classifier.predict_proba(dl2_df.loc[:, dl1_dl2_config["gamma_classifier_features"]])
154 # TODO: replace this with a single proba predictor like disp sign, so no hardcoded class values!
155 # gammaness is the prediction probability for the class 0 (proton: class 101)
156 mc_type_gamma, mc_type_proton = 0, 101
157 col = list(gamma_classifier.classes_).index(mc_type_gamma)
158 dl2_df["gammaness"] = gammaness[:, col]
159 dl2_df["reco_type"] = np.where(gammaness[:, col] > 0.5, mc_type_gamma, mc_type_proton)
161 return dl2_df
164def dl1_dl2_arg_parser() -> argparse.ArgumentParser:
165 parser = argparse.ArgumentParser(
166 description="Stand-alone DL1 to DL2 processing from ctapipe-lstchain for Real Time Analysis",
167 formatter_class=argparse.ArgumentDefaultsHelpFormatter,
168 )
169 parser.add_argument(
170 "--config",
171 "-c",
172 action="store",
173 type=Path,
174 dest="config_path",
175 help="Path to the configuration file",
176 required=True,
177 )
178 parser.add_argument(
179 "--energy_regressor",
180 "-e",
181 action="store",
182 type=Path,
183 required=True,
184 dest="energy_regressor_model_path",
185 help="Path to the energy regressor model (.sav)",
186 )
187 parser.add_argument(
188 "--gamma_classifier",
189 "-g",
190 action="store",
191 type=Path,
192 required=True,
193 dest="gamma_classifier_path",
194 help="Path to the gamma/hadron classifier model (.sav)",
195 )
196 parser.add_argument(
197 "--image_displacement_vector_regressor",
198 "-v",
199 action="store",
200 type=Path,
201 default=None,
202 dest="disp_vector_regressor_model_path",
203 help="Path to the image displacement vector regressor model (.sav). "
204 "Either this argument must be specified, or `image_displacement_norm_regressor` and "
205 "`image_displacement_sign_classifier`",
206 )
207 parser.add_argument(
208 "--image_displacement_norm_regressor",
209 "-n",
210 action="store",
211 type=Path,
212 default=None,
213 dest="disp_norm_regressor_model_path",
214 help="Path to the image displacement norm regressor model (.sav)"
215 "Either this argument and `image_displacement_sign_classifier` must be specified, or "
216 "`image_displacement_vector_regressor`",
217 )
218 parser.add_argument(
219 "--image_displacement_sign_classifier",
220 "-s",
221 action="store",
222 type=Path,
223 default=None,
224 dest="disp_sign_classifier_model_path",
225 help="Path to the image displacement sign classifier model (.sav)"
226 "Either this argument and `image_displacement_norm_regressor` must be specified, or "
227 "`image_displacement_vector_regressor`",
228 )
229 parser.add_argument(
230 "--input_dl1",
231 "-i",
232 type=Path,
233 nargs="+",
234 dest="dl1_file_paths",
235 help="Path(s) to DL1 file(s) to process to DL2.",
236 required=True,
237 )
238 parser.add_argument(
239 "--log_file",
240 "-l",
241 type=Path,
242 dest="log_file_path",
243 help="Path to a file where this process will log its execution",
244 )
245 parser.add_argument(
246 "--log_level",
247 type=LOGGING_LEVELS,
248 action=EnumAction,
249 dest="logging_level",
250 help="Logging level: all messages with a level severity above or equal the specified level will be logged.",
251 default=LOGGING_LEVELS.WARNING,
252 )
253 parser.add_argument(
254 "--output_dl2",
255 "-o",
256 type=Path,
257 nargs="+",
258 dest="dl2_file_paths",
259 help="Path(s) to DL2 file(s) to write the output.",
260 required=True,
261 )
263 return parser
266def main():
267 parser = dl1_dl2_arg_parser()
268 args = parser.parse_args()
270 init_logging(log_header_str="RTA_DL1_DL2", log_filename=args.log_file_path, log_level=args.logging_level)
272 if len(args.dl1_file_paths) != len(args.dl2_file_paths): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 raise argparse.ArgumentTypeError(
274 "The number {} of input dl1 files must match the number {} of output DL2 files.".format(
275 len(args.dl1_file_paths), len(args.dl2_file_paths)
276 )
277 )
279 if args.disp_vector_regressor_model_path is not None and args.disp_norm_regressor_model_path is not None: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true
280 raise argparse.ArgumentTypeError(
281 "`image_displacement_vector_regressor` and `image_displacement_norm_regressor` are mutually exclusive arguments. "
282 "Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_norm_regressor_model_path)
283 )
284 if args.disp_vector_regressor_model_path is not None and args.disp_sign_classifier_model_path is not None: 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true
285 raise argparse.ArgumentTypeError(
286 "`image_displacement_vector_regressor` and `image_displacement_sign_classifier` are mutually exclusive arguments. "
287 "Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_sign_classifier_model_path)
288 )
289 if ( 289 ↛ 294line 289 didn't jump to line 294 because the condition on line 289 was never true
290 args.disp_vector_regressor_model_path is None
291 and args.disp_norm_regressor_model_path is None
292 and args.disp_sign_classifier_model_path is None
293 ):
294 raise argparse.ArgumentTypeError(
295 "Either `image_displacement_vector_regressor` or `image_displacement_norm_regressor` and "
296 "`image_displacement_sign_classifier` must be specified, got none of them."
297 )
298 if (args.disp_norm_regressor_model_path is not None and args.disp_sign_classifier_model_path is None) or ( 298 ↛ 301line 298 didn't jump to line 301 because the condition on line 298 was never true
299 args.disp_norm_regressor_model_path is None and args.disp_sign_classifier_model_path is not None
300 ):
301 raise argparse.ArgumentError(
302 "`image_displacement_vector_regressor` and `image_displacement_sign_classifier` must both be specified if used. Got {} and {}".format(
303 args.disp_norm_regressor_model_path, args.disp_sign_classifier_model_path
304 )
305 )
307 with open(args.config_path, "r") as config_f:
308 config = json.load(config_f)
310 energy_regressor = joblib.load(args.energy_regressor_model_path)
311 gamma_classifier = joblib.load(args.gamma_classifier_path)
312 all_features = config["energy_regressor_features"] + config["gamma_classifier_features"]
314 using_disp_vector = args.disp_vector_regressor_model_path is not None
315 if using_disp_vector: 315 ↛ 321line 315 didn't jump to line 321 because the condition on line 315 was always true
316 disp_vector_regressor = joblib.load(args.disp_vector_regressor_model_path)
317 disp_norm_regressor = None
318 disp_sign_classifier = None
319 all_features += config["disp_vector_regressor_features"]
320 else:
321 disp_vector_regressor = None
322 disp_norm_regressor = joblib.load(args.disp_norm_regressor_model_path)
323 disp_sign_classifier = joblib.load(args.disp_sign_classifier_model_path)
324 all_features += config["disp_norm_regressor_features"] + config["disp_sign_classifier_features"]
326 for dl1_file_path, dl2_file_path in zip(args.dl1_file_paths, args.dl2_file_paths):
327 # TODO: read dl1 straight in a numpy array (avoid pandas to avoid copy before passing to sklearn)
328 # should be Fortran order ? -> benchmark
329 dl1_df = read_dl1(dl1_file_path) # TODO: make table "key" configurable
330 interpolate_missing_alt_az(dl1_df) # TODO: required after dl1 alt-az maker ? what about simple dropna
331 tel_optics = read_telescope_optics(dl1_file_path) # TODO: make sure correct
332 convert_az_and_sin_az_angle_to_degree(
333 dl1_df
334 ) # TODO: is this required ? (angles come from arctan2, and sin is not used ?)
335 dl1_df = filter_dl1(
336 dl1_df, filters=config["events_filters"], finite_params=all_features
337 ) # TODO: config ? (filter events based on column names, filters, ~isnan, etc.)
339 # TODO: is this required ?
340 # Update parameters related to target direction on camera frame for MC data
341 # taking into account of the aberration effect using effective focal length
342 if "disp_norm" in dl1_df.columns: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true
343 update_disp_with_effective_focal_length(dl1_df, effective_focal_length=tel_optics.effective_focal_length)
345 # TODO: make dl1_to_dl2 not return anything - inplace operations in the df
346 dl2_df = dl1_to_dl2(
347 dl1_df,
348 config,
349 tel_optics.effective_focal_length,
350 energy_regressor,
351 gamma_classifier,
352 disp_vector_regressor,
353 disp_norm_regressor,
354 disp_sign_classifier,
355 )
357 init_dl2_file(dl2_file_path, dl1_file_path)
358 write_dl2_df(
359 dl2_file_path,
360 dl2_df,
361 attributes={"config": config},
362 )
365if __name__ == "__main__": 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true
366 main()