Coverage for rta_reconstruction/dl1_to_dl2.py: 75%

109 statements  

coverage.py v7.6.10, created at 2025-01-11 10:03 +0000

1"""DL1 to DL2 processing script.""" 

2 

3import argparse 

4import json 

5from pathlib import Path 

6from typing import Dict 

7 

8import astropy.units as u 

9import joblib 

10import numpy as np 

11import pandas as pd 

12from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor 

13 

14from rta_reconstruction.dl1_reader import ( 

15 convert_az_and_sin_az_angle_to_degree, 

16 filter_dl1, 

17 interpolate_missing_alt_az, 

18 read_dl1, 

19 read_telescope_optics, 

20) 

21from rta_reconstruction.dl2_io import init_dl2_file, write_dl2_df 

22from rta_reconstruction.image_displacement import ( 

23 disp_to_pos, 

24 disp_vector_pol2cart, 

25 update_disp_with_effective_focal_length, 

26) 

27from rta_reconstruction.utils.coordinates import camera_to_altaz, camera_to_shower_coordinates 

28from rta_reconstruction.utils.rta_argparse import EnumAction 

29from rta_reconstruction.utils.rta_logging import LOGGING_LEVELS, init_logging 

30 

31 

32# TODO: move somewhere else ? 

33def reco_source_position_sky(cog_x, cog_y, disp_dx, disp_dy, focal_length, pointing_alt, pointing_az): 

34 """ 

35 Compute the reconstructed source position in the sky 

36 

37 Parameters 

38 ---------- 

39 cog_x: `astropy.units.Quantity` 

40 cog_y: `astropy.units.Quantity` 

41 disp_dx: `astropy.units.Quantity` 

42 disp_dy: `astropy.units.Quantity` 

43 focal_length: `astropy.units.Quantity` 

44 pointing_alt: `astropy.units.Quantity` 

45 pointing_az: `astropy.units.Quantity` 

46 

47 Returns 

48 ------- 

49 sky frame: `astropy.coordinates.sky_coordinate.SkyCoord` 

50 """ 

51 src_x, src_y = disp_to_pos(disp_dx, disp_dy, cog_x, cog_y) 

52 return camera_to_altaz(src_x, src_y, focal_length, pointing_alt, pointing_az) 
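For orientation, a minimal usage sketch of the function above; the numbers are made-up single-event values and only illustrate the expected `astropy` units, not a real reconstruction:

import astropy.units as u

# Hypothetical single-event values, chosen only to show the expected units.
src_altaz = reco_source_position_sky(
    cog_x=0.12 * u.m,          # image centre of gravity in the camera frame
    cog_y=-0.05 * u.m,
    disp_dx=0.30 * u.m,        # displacement from the image cog to the source position
    disp_dy=0.10 * u.m,
    focal_length=28.0 * u.m,   # effective focal length of the telescope
    pointing_alt=70.0 * u.deg,
    pointing_az=180.0 * u.deg,
)
print(src_altaz.alt.deg, src_altaz.az.deg)  # reconstructed source direction in the sky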

53
54
55 # TODO: use pydantic configuration instead of json to Dict
56 # TODO: use more generic type hints than specific class (sklearn regressor/classifier maybe ?)
57 # TODO: refactor in several steps to isolate the "predict" calls, to make it simpler to implement training
58 def dl1_to_dl2(
59     dl1_df: pd.DataFrame,
60     dl1_dl2_config: Dict,
61     effective_focal_length,
62     energy_regressor: RandomForestRegressor,
63     gamma_classifier: RandomForestClassifier,
64     image_displacement_vector_regressor: RandomForestRegressor | None = None,
65     disp_norm_regressor: RandomForestRegressor | None = None,
66     disp_sign_classifier: RandomForestClassifier | None = None,
67 ) -> pd.DataFrame:
68     # TODO: make very clear in the doc that these are "inplace" operations
69     # (this doesn't copy, we simply add columns to the df)
70     dl2_df = dl1_df
71
72     # TODO: refactor model computation to individual functions (if surrounding operations are really required)
73
74     # TODO: is log energy or energy used by disp vector, disp norm or disp sign ? then need to concatenate
75     # to avoid a copy ? Can't use "out" since predict doesn't support that API; could copy only a vector of
76     # size (#samples) instead of everything though.
77     # What we could do, since Fortran indexing may be best (TEST!?):
78     # load everything as a Fortran-order rec-array (pre-allocated with the dl2 columns),
79     # so that we can index it with column names, and avoid pandas copies.
80     # pytables read has an "out" parameter, but it doesn't check the types between out and the data on disk (only copies)
81     dl2_df["log_reco_energy"] = energy_regressor.predict(dl2_df.loc[:, dl1_dl2_config["energy_regressor_features"]])
82     dl2_df["reco_energy"] = 10 ** dl2_df["log_reco_energy"]
83
84     if image_displacement_vector_regressor is not None:  [84 ↛ 89: line 84 didn't jump to line 89 because the condition on line 84 was always true]
85         disp_vector = image_displacement_vector_regressor.predict(
86             dl2_df.loc[:, dl1_dl2_config["disp_vector_regressor_features"]]
87         )
88     else:
89         dl2_df["reco_disp_norm"] = disp_norm_regressor.predict(
90             dl2_df.loc[:, dl1_dl2_config["disp_norm_regressor_features"]]
91         )
92         disp_sign_proba = disp_sign_classifier.predict_proba(
93             dl2_df.loc[:, dl1_dl2_config["disp_sign_classifier_features"]]
94         )
95         # TODO: since we only care about something being gamma or not, 1 probability should be enough;
96         # we need to change the models for it to predict "gamma" or "not-gamma" with a single proba.
97         col = list(disp_sign_classifier.classes_).index(1)
98         disp_sign = np.where(disp_sign_proba[:, col] > 0.5, 1, -1)
99         dl2_df["reco_disp_sign"] = disp_sign
100         dl2_df["reco_disp_sign_proba"] = disp_sign_proba[:, 0]
101
102         disp_vector = disp_vector_pol2cart(dl2_df["reco_disp_norm"], dl2_df["psi"], disp_sign)
103
104     dl2_df["reco_disp_dx"] = disp_vector[:, 0]
105     dl2_df["reco_disp_dy"] = disp_vector[:, 1]
106     dl2_df["reco_src_x"], dl2_df["reco_src_y"] = disp_to_pos(
107         dl2_df.reco_disp_dx,
108         dl2_df.reco_disp_dy,
109         dl2_df.x,
110         dl2_df.y,
111     )
112
113     # TODO: stack coordinates and project in // as in pyhiperta ?
114     longi, _ = camera_to_shower_coordinates(
115         dl2_df["reco_src_x"], dl2_df["reco_src_y"], dl2_df["x"], dl2_df["y"], dl2_df["psi"]
116     )
117
118     # TODO: check sign of longitudinal coordinate with HiPeRTA
119     # TODO: is this required ?
120     # Obtain the time gradient with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y).
121     # Defined positive if light arrival times increase with distance to it, negative otherwise:
122     dl2_df["signed_time_gradient"] = -1 * np.sign(longi) * dl2_df["time_gradient"]
123     # Obtain skewness with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y).
124     # Defined on the major image axis; the sign is such that it is typically positive for gammas:
125     dl2_df["signed_skewness"] = -1 * np.sign(longi) * dl2_df["skewness"]
126
127     # TODO: what if we use simulations but still want to use the reconstructed alt az ?
128     # TODO: better default value (nan ?)
129     if "mc_alt_tel" in dl2_df.columns:  [129 ↛ 130: line 129 didn't jump to line 130 because the condition on line 129 was never true]
130         alt_tel = dl2_df["mc_alt_tel"].values
131         az_tel = dl2_df["mc_az_tel"].values
132     elif "alt_tel" in dl2_df.columns:  [132 ↛ 136: line 132 didn't jump to line 136 because the condition on line 132 was always true]
133         alt_tel = dl2_df["alt_tel"].values
134         az_tel = dl2_df["az_tel"].values
135     else:
136         alt_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
137         az_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
138
139     # TODO: this calls astropy coordinates changing routines, can it be optimized ?
140     src_pos_reco = reco_source_position_sky(
141         dl2_df.x.values * u.m,
142         dl2_df.y.values * u.m,
143         dl2_df.reco_disp_dx.values * u.m,
144         dl2_df.reco_disp_dy.values * u.m,
145         effective_focal_length,
146         alt_tel * u.rad,
147         az_tel * u.rad,
148     )
149     dl2_df["reco_alt"] = src_pos_reco.alt.rad
150     dl2_df["reco_az"] = src_pos_reco.az.rad
151
152     gammaness = gamma_classifier.predict_proba(dl2_df.loc[:, dl1_dl2_config["gamma_classifier_features"]])
153
154     # TODO: replace this with a single proba predictor like disp sign, so no hardcoded class values!
155     # gammaness is the prediction probability for the gamma class 0 (proton: class 101)
156     mc_type_gamma, mc_type_proton = 0, 101
157     col = list(gamma_classifier.classes_).index(mc_type_gamma)
158     dl2_df["gammaness"] = gammaness[:, col]
159     dl2_df["reco_type"] = np.where(gammaness[:, col] > 0.5, mc_type_gamma, mc_type_proton)
160
161     return dl2_df
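For reference, a sketch of the configuration dictionary that dl1_to_dl2 (and filter_dl1 in main below) reads; only the keys are taken from the code above, while the feature names and filter values are placeholders, not the actual training configuration:

dl1_dl2_config_example = {
    # Feature columns fed to each model (placeholder names).
    "energy_regressor_features": ["intensity", "width", "length"],
    "gamma_classifier_features": ["intensity", "width", "length", "log_reco_energy"],
    # Either the single image-displacement-vector regressor is used ...
    "disp_vector_regressor_features": ["intensity", "width", "length", "psi"],
    # ... or the norm regressor + sign classifier pair, never both.
    "disp_norm_regressor_features": ["intensity", "width", "length"],
    "disp_sign_classifier_features": ["intensity", "width", "length", "time_gradient"],
    # Event selection passed to filter_dl1 by main(); the shape of these values is an assumption.
    "events_filters": {"intensity": [50, float("inf")]},
}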

162
163
164 def dl1_dl2_arg_parser() -> argparse.ArgumentParser:
165     parser = argparse.ArgumentParser(
166         description="Stand-alone DL1 to DL2 processing from ctapipe-lstchain for Real Time Analysis",
167         formatter_class=argparse.ArgumentDefaultsHelpFormatter,
168     )
169     parser.add_argument(
170         "--config",
171         "-c",
172         action="store",
173         type=Path,
174         dest="config_path",
175         help="Path to the configuration file",
176         required=True,
177     )
178     parser.add_argument(
179         "--energy_regressor",
180         "-e",
181         action="store",
182         type=Path,
183         required=True,
184         dest="energy_regressor_model_path",
185         help="Path to the energy regressor model (.sav)",
186     )
187     parser.add_argument(
188         "--gamma_classifier",
189         "-g",
190         action="store",
191         type=Path,
192         required=True,
193         dest="gamma_classifier_path",
194         help="Path to the gamma/hadron classifier model (.sav)",
195     )
196     parser.add_argument(
197         "--image_displacement_vector_regressor",
198         "-v",
199         action="store",
200         type=Path,
201         default=None,
202         dest="disp_vector_regressor_model_path",
203         help="Path to the image displacement vector regressor model (.sav). "
204         "Either this argument must be specified, or `image_displacement_norm_regressor` and "
205         "`image_displacement_sign_classifier`",
206     )
207     parser.add_argument(
208         "--image_displacement_norm_regressor",
209         "-n",
210         action="store",
211         type=Path,
212         default=None,
213         dest="disp_norm_regressor_model_path",
214         help="Path to the image displacement norm regressor model (.sav). "
215         "Either this argument and `image_displacement_sign_classifier` must be specified, or "
216         "`image_displacement_vector_regressor`",
217     )
218     parser.add_argument(
219         "--image_displacement_sign_classifier",
220         "-s",
221         action="store",
222         type=Path,
223         default=None,
224         dest="disp_sign_classifier_model_path",
225         help="Path to the image displacement sign classifier model (.sav). "
226         "Either this argument and `image_displacement_norm_regressor` must be specified, or "
227         "`image_displacement_vector_regressor`",
228     )
229     parser.add_argument(
230         "--input_dl1",
231         "-i",
232         type=Path,
233         nargs="+",
234         dest="dl1_file_paths",
235         help="Path(s) to DL1 file(s) to process to DL2.",
236         required=True,
237     )
238     parser.add_argument(
239         "--log_file",
240         "-l",
241         type=Path,
242         dest="log_file_path",
243         help="Path to a file where this process will log its execution",
244     )
245     parser.add_argument(
246         "--log_level",
247         type=LOGGING_LEVELS,
248         action=EnumAction,
249         dest="logging_level",
250         help="Logging level: all messages with a severity above or equal to the specified level will be logged.",
251         default=LOGGING_LEVELS.WARNING,
252     )
253     parser.add_argument(
254         "--output_dl2",
255         "-o",
256         type=Path,
257         nargs="+",
258         dest="dl2_file_paths",
259         help="Path(s) to DL2 file(s) to write the output.",
260         required=True,
261     )
262
263     return parser
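A short sketch of the two command-line combinations the parser (together with the validation in main below) accepts, exercised through parse_args; all file names are placeholders:

parser = dl1_dl2_arg_parser()

# Single image-displacement-vector regressor:
args = parser.parse_args([
    "-c", "config.json",
    "-e", "energy_regressor.sav",
    "-g", "gamma_classifier.sav",
    "-v", "disp_vector_regressor.sav",
    "-i", "run123.dl1.h5",
    "-o", "run123.dl2.h5",
])

# Or the norm regressor + sign classifier pair (mutually exclusive with -v):
args = parser.parse_args([
    "-c", "config.json",
    "-e", "energy_regressor.sav",
    "-g", "gamma_classifier.sav",
    "-n", "disp_norm_regressor.sav",
    "-s", "disp_sign_classifier.sav",
    "-i", "run123.dl1.h5",
    "-o", "run123.dl2.h5",
])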

264
265
266 def main():
267     parser = dl1_dl2_arg_parser()
268     args = parser.parse_args()
269
270     init_logging(log_header_str="RTA_DL1_DL2", log_filename=args.log_file_path, log_level=args.logging_level)
271
272     if len(args.dl1_file_paths) != len(args.dl2_file_paths):  [272 ↛ 273: line 272 didn't jump to line 273 because the condition on line 272 was never true]
273         raise argparse.ArgumentTypeError(
274             "The number {} of input DL1 files must match the number {} of output DL2 files.".format(
275                 len(args.dl1_file_paths), len(args.dl2_file_paths)
276             )
277         )
278
279     if args.disp_vector_regressor_model_path is not None and args.disp_norm_regressor_model_path is not None:  [279 ↛ 280: line 279 didn't jump to line 280 because the condition on line 279 was never true]
280         raise argparse.ArgumentTypeError(
281             "`image_displacement_vector_regressor` and `image_displacement_norm_regressor` are mutually exclusive arguments. "
282             "Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_norm_regressor_model_path)
283         )
284     if args.disp_vector_regressor_model_path is not None and args.disp_sign_classifier_model_path is not None:  [284 ↛ 285: line 284 didn't jump to line 285 because the condition on line 284 was never true]
285         raise argparse.ArgumentTypeError(
286             "`image_displacement_vector_regressor` and `image_displacement_sign_classifier` are mutually exclusive arguments. "
287             "Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_sign_classifier_model_path)
288         )
289     if (  [289 ↛ 294: line 289 didn't jump to line 294 because the condition on line 289 was never true]
290         args.disp_vector_regressor_model_path is None
291         and args.disp_norm_regressor_model_path is None
292         and args.disp_sign_classifier_model_path is None
293     ):
294         raise argparse.ArgumentTypeError(
295             "Either `image_displacement_vector_regressor` or `image_displacement_norm_regressor` and "
296             "`image_displacement_sign_classifier` must be specified, got none of them."
297         )
298     if (args.disp_norm_regressor_model_path is not None and args.disp_sign_classifier_model_path is None) or (  [298 ↛ 301: line 298 didn't jump to line 301 because the condition on line 298 was never true]
299         args.disp_norm_regressor_model_path is None and args.disp_sign_classifier_model_path is not None
300     ):
301         raise argparse.ArgumentTypeError(
302             "`image_displacement_norm_regressor` and `image_displacement_sign_classifier` must both be specified if used. Got {} and {}".format(
303                 args.disp_norm_regressor_model_path, args.disp_sign_classifier_model_path
304             )
305         )
306
307     with open(args.config_path, "r") as config_f:
308         config = json.load(config_f)
309
310     energy_regressor = joblib.load(args.energy_regressor_model_path)
311     gamma_classifier = joblib.load(args.gamma_classifier_path)
312     all_features = config["energy_regressor_features"] + config["gamma_classifier_features"]
313
314     using_disp_vector = args.disp_vector_regressor_model_path is not None
315     if using_disp_vector:  [315 ↛ 321: line 315 didn't jump to line 321 because the condition on line 315 was always true]
316         disp_vector_regressor = joblib.load(args.disp_vector_regressor_model_path)
317         disp_norm_regressor = None
318         disp_sign_classifier = None
319         all_features += config["disp_vector_regressor_features"]
320     else:
321         disp_vector_regressor = None
322         disp_norm_regressor = joblib.load(args.disp_norm_regressor_model_path)
323         disp_sign_classifier = joblib.load(args.disp_sign_classifier_model_path)
324         all_features += config["disp_norm_regressor_features"] + config["disp_sign_classifier_features"]
325
326     for dl1_file_path, dl2_file_path in zip(args.dl1_file_paths, args.dl2_file_paths):
327         # TODO: read dl1 straight into a numpy array (avoid pandas to avoid a copy before passing to sklearn)
328         # should it be Fortran order ? -> benchmark
329         dl1_df = read_dl1(dl1_file_path)  # TODO: make table "key" configurable
330         interpolate_missing_alt_az(dl1_df)  # TODO: required after dl1 alt-az maker ? what about simple dropna
331         tel_optics = read_telescope_optics(dl1_file_path)  # TODO: make sure correct
332         convert_az_and_sin_az_angle_to_degree(
333             dl1_df
334         )  # TODO: is this required ? (angles come from arctan2, and sin is not used ?)
335         dl1_df = filter_dl1(
336             dl1_df, filters=config["events_filters"], finite_params=all_features
337         )  # TODO: config ? (filter events based on column names, filters, ~isnan, etc.)
338
339         # TODO: is this required ?
340         # Update parameters related to the target direction in the camera frame for MC data,
341         # taking into account the aberration effect using the effective focal length
342         if "disp_norm" in dl1_df.columns:  [342 ↛ 343: line 342 didn't jump to line 343 because the condition on line 342 was never true]
343             update_disp_with_effective_focal_length(dl1_df, effective_focal_length=tel_optics.effective_focal_length)
344
345         # TODO: make dl1_to_dl2 not return anything - inplace operations in the df
346         dl2_df = dl1_to_dl2(
347             dl1_df,
348             config,
349             tel_optics.effective_focal_length,
350             energy_regressor,
351             gamma_classifier,
352             disp_vector_regressor,
353             disp_norm_regressor,
354             disp_sign_classifier,
355         )
356
357         init_dl2_file(dl2_file_path, dl1_file_path)
358         write_dl2_df(
359             dl2_file_path,
360             dl2_df,
361             attributes={"config": config},
362         )
363
364
365 if __name__ == "__main__":  [365 ↛ 366: line 365 didn't jump to line 366 because the condition on line 365 was never true]
366     main()
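As a quick reference, a sketch listing the columns dl1_to_dl2 adds to the DL1 dataframe, collected from the assignments above; the reco_disp_norm / reco_disp_sign / reco_disp_sign_proba columns only exist when the norm + sign models are used instead of the vector regressor:

DL2_ADDED_COLUMNS = [
    "log_reco_energy", "reco_energy",
    "reco_disp_norm", "reco_disp_sign", "reco_disp_sign_proba",  # norm + sign mode only
    "reco_disp_dx", "reco_disp_dy", "reco_src_x", "reco_src_y",
    "signed_time_gradient", "signed_skewness",
    "reco_alt", "reco_az",
    "gammaness", "reco_type",
]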