"""DL1 to DL2 processing script."""
import argparse
import json
from pathlib import Path
from typing import Dict
import astropy.units as u
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from rta_reconstruction.dl1_reader import (
convert_az_and_sin_az_angle_to_degree,
filter_dl1,
interpolate_missing_alt_az,
read_dl1,
read_telescope_optics,
)
from rta_reconstruction.dl2_io import init_dl2_file, write_dl2_df
from rta_reconstruction.image_displacement import (
disp_to_pos,
disp_vector_pol2cart,
update_disp_with_effective_focal_length,
)
from rta_reconstruction.utils.coordinates import camera_to_altaz, camera_to_shower_coordinates
from rta_reconstruction.utils.rta_argparse import EnumAction
from rta_reconstruction.utils.rta_logging import LOGGING_LEVELS, init_logging
# TODO: move somewhere else ?
[docs]
def reco_source_position_sky(cog_x, cog_y, disp_dx, disp_dy, focal_length, pointing_alt, pointing_az):
"""
Compute the reconstructed source position in the sky
Parameters
----------
cog_x: `astropy.units.Quantity`
cog_y: `astropy.units.Quantity`
disp_dx: `astropy.units.Quantity`
disp_dy: `astropy.units.Quantity`
focal_length: `astropy.units.Quantity`
pointing_alt: `astropy.units.Quantity`
pointing_az: `astropy.units.Quantity`
Returns
-------
sky frame: `astropy.coordinates.sky_coordinate.SkyCoord`
"""
src_x, src_y = disp_to_pos(disp_dx, disp_dy, cog_x, cog_y)
return camera_to_altaz(src_x, src_y, focal_length, pointing_alt, pointing_az)
# TODO: use pydantic configuration instead of json to Dict
# TODO: use more generic type hints than specific class (sklearn regressor/classifier maybe ?)
# TODO: refactor in several steps to isolate the "predict" calls, to make it simpler to implement training
[docs]
def dl1_to_dl2(
dl1_df: pd.DataFrame,
dl1_dl2_config: Dict,
effective_focal_length,
energy_regressor: RandomForestRegressor,
gamma_classifier: RandomForestClassifier,
image_displacement_vector_regressor: RandomForestRegressor | None = None,
disp_norm_regressor: RandomForestRegressor | None = None,
disp_sign_classifier: RandomForestClassifier | None = None,
) -> pd.DataFrame:
# TODO: make very clear in doc that this is "inplace" operations
# (this doesn't copy, simply we will add column to the df)
dl2_df = dl1_df
# TODO: refactor model computation to individual functions (if surrounding operations are really required)
# TODO: is log energy or energy used by disp vector, disp norm or disp sign ? then need concatenate
# to avoid copy ? Can't use "out" since predict doesn't support that API, could copy only vector of
# size (#samples) instead of everything though
# what could do, since fortran indexing may be best (TEST!?)
# load everything as a fortran order rec-array (pre-allocated with dl2 columns.)
# that way we can in with column names, and avoid pandas copy
# pytables read has an "out" parameters, but it doesn't check the types between out and datadisk (only copies)
dl2_df["log_reco_energy"] = energy_regressor.predict(dl2_df.loc[:, dl1_dl2_config["energy_regressor_features"]])
dl2_df["reco_energy"] = 10 ** dl2_df["log_reco_energy"]
if image_displacement_vector_regressor is not None:
disp_vector = image_displacement_vector_regressor.predict(
dl2_df.loc[:, dl1_dl2_config["disp_vector_regressor_features"]]
)
else:
dl2_df["reco_disp_norm"] = disp_norm_regressor.predict(
dl2_df.loc[:, dl1_dl2_config["disp_norm_regressor_features"]]
)
disp_sign_proba = disp_sign_classifier.predict_proba(
dl2_df[:, dl1_dl2_config["disp_sign_classifier_features"]]
)
# TODO: since we only care about something been gamma or not, 1 probability should be enough
# we need to change the models for it to predict "gamma" or "not-gamma" with single proba.
col = list(disp_sign_classifier.classes_).index(1)
disp_sign = np.where(disp_sign_proba[:, col] > 0.5, 1, -1)
dl2_df["reco_disp_sign"] = disp_sign
dl2_df["reco_disp_sign_proba"] = disp_sign_proba[:, 0]
disp_vector = disp_vector_pol2cart(dl2_df["reco_disp_norm"], dl2_df["psi"], disp_sign)
dl2_df["reco_disp_dx"] = disp_vector[:, 0]
dl2_df["reco_disp_dy"] = disp_vector[:, 1]
dl2_df["reco_src_x"], dl2_df["reco_src_y"] = disp_to_pos(
dl2_df.reco_disp_dx,
dl2_df.reco_disp_dy,
dl2_df.x,
dl2_df.y,
)
# TODO: stack coordinates and project in // as in pyhiperta ?
longi, _ = camera_to_shower_coordinates(
dl2_df["reco_src_x"], dl2_df["reco_src_y"], dl2_df["x"], dl2_df["y"], dl2_df["psi"]
)
# TODO: check sign of longitudinal coordinate with HiPeRTA
# TODO: is this required ?
# Obtain the time gradient with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y)
# Defined positive if light arrival times increase with distance to it. Negative otherwise:
dl2_df["signed_time_gradient"] = -1 * np.sign(longi) * dl2_df["time_gradient"]
# Obtain skewness with sign relative to the reconstructed shower direction (reco_src_x, reco_src_y)
# Defined on the major image axis; sign is such that it is typically positive for gammas:
dl2_df["signed_skewness"] = -1 * np.sign(longi) * dl2_df["skewness"]
# TODO: what if we use simulations but still want to use the reconstructed alt az ?
# TODO: better default value (nan ?)
if "mc_alt_tel" in dl2_df.columns:
alt_tel = dl2_df["mc_alt_tel"].values
az_tel = dl2_df["mc_az_tel"].values
elif "alt_tel" in dl2_df.columns:
alt_tel = dl2_df["alt_tel"].values
az_tel = dl2_df["az_tel"].values
else:
alt_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
az_tel = -np.pi / 2.0 * np.ones(len(dl2_df))
# TODO: this calls astropy coordinates changing routines, can it be optimized ?
src_pos_reco = reco_source_position_sky(
dl2_df.x.values * u.m,
dl2_df.y.values * u.m,
dl2_df.reco_disp_dx.values * u.m,
dl2_df.reco_disp_dy.values * u.m,
effective_focal_length,
alt_tel * u.rad,
az_tel * u.rad,
)
dl2_df["reco_alt"] = src_pos_reco.alt.rad
dl2_df["reco_az"] = src_pos_reco.az.rad
gammaness = gamma_classifier.predict_proba(dl2_df.loc[:, dl1_dl2_config["gamma_classifier_features"]])
# TODO: replace this with a single proba predictor like disp sign, so no hardcoded class values!
# gammaness is the prediction probability for the class 0 (proton: class 101)
mc_type_gamma, mc_type_proton = 0, 101
col = list(gamma_classifier.classes_).index(mc_type_gamma)
dl2_df["gammaness"] = gammaness[:, col]
dl2_df["reco_type"] = np.where(gammaness[:, col] > 0.5, mc_type_gamma, mc_type_proton)
return dl2_df
[docs]
def dl1_dl2_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Stand-alone DL1 to DL2 processing from ctapipe-lstchain for Real Time Analysis",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--config",
"-c",
action="store",
type=Path,
dest="config_path",
help="Path to the configuration file",
required=True,
)
parser.add_argument(
"--energy_regressor",
"-e",
action="store",
type=Path,
required=True,
dest="energy_regressor_model_path",
help="Path to the energy regressor model (.sav)",
)
parser.add_argument(
"--gamma_classifier",
"-g",
action="store",
type=Path,
required=True,
dest="gamma_classifier_path",
help="Path to the gamma/hadron classifier model (.sav)",
)
parser.add_argument(
"--image_displacement_vector_regressor",
"-v",
action="store",
type=Path,
default=None,
dest="disp_vector_regressor_model_path",
help="Path to the image displacement vector regressor model (.sav). "
"Either this argument must be specified, or `image_displacement_norm_regressor` and "
"`image_displacement_sign_classifier`",
)
parser.add_argument(
"--image_displacement_norm_regressor",
"-n",
action="store",
type=Path,
default=None,
dest="disp_norm_regressor_model_path",
help="Path to the image displacement norm regressor model (.sav)"
"Either this argument and `image_displacement_sign_classifier` must be specified, or "
"`image_displacement_vector_regressor`",
)
parser.add_argument(
"--image_displacement_sign_classifier",
"-s",
action="store",
type=Path,
default=None,
dest="disp_sign_classifier_model_path",
help="Path to the image displacement sign classifier model (.sav)"
"Either this argument and `image_displacement_norm_regressor` must be specified, or "
"`image_displacement_vector_regressor`",
)
parser.add_argument(
"--input_dl1",
"-i",
type=Path,
nargs="+",
dest="dl1_file_paths",
help="Path(s) to DL1 file(s) to process to DL2.",
required=True,
)
parser.add_argument(
"--log_file",
"-l",
type=Path,
dest="log_file_path",
help="Path to a file where this process will log its execution",
)
parser.add_argument(
"--log_level",
type=LOGGING_LEVELS,
action=EnumAction,
dest="logging_level",
help="Logging level: all messages with a level severity above or equal the specified level will be logged.",
default=LOGGING_LEVELS.WARNING,
)
parser.add_argument(
"--output_dl2",
"-o",
type=Path,
nargs="+",
dest="dl2_file_paths",
help="Path(s) to DL2 file(s) to write the output.",
required=True,
)
return parser
[docs]
def main():
parser = dl1_dl2_arg_parser()
args = parser.parse_args()
init_logging(log_header_str="RTA_DL1_DL2", log_filename=args.log_file_path, log_level=args.logging_level)
if len(args.dl1_file_paths) != len(args.dl2_file_paths):
raise argparse.ArgumentTypeError(
"The number {} of input dl1 files must match the number {} of output DL2 files.".format(
len(args.dl1_file_paths), len(args.dl2_file_paths)
)
)
if args.disp_vector_regressor_model_path is not None and args.disp_norm_regressor_model_path is not None:
raise argparse.ArgumentTypeError(
"`image_displacement_vector_regressor` and `image_displacement_norm_regressor` are mutually exclusive arguments. "
"Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_norm_regressor_model_path)
)
if args.disp_vector_regressor_model_path is not None and args.disp_sign_classifier_model_path is not None:
raise argparse.ArgumentTypeError(
"`image_displacement_vector_regressor` and `image_displacement_sign_classifier` are mutually exclusive arguments. "
"Got {} and {}".format(args.disp_vector_regressor_model_path, args.disp_sign_classifier_model_path)
)
if (
args.disp_vector_regressor_model_path is None
and args.disp_norm_regressor_model_path is None
and args.disp_sign_classifier_model_path is None
):
raise argparse.ArgumentTypeError(
"Either `image_displacement_vector_regressor` or `image_displacement_norm_regressor` and "
"`image_displacement_sign_classifier` must be specified, got none of them."
)
if (args.disp_norm_regressor_model_path is not None and args.disp_sign_classifier_model_path is None) or (
args.disp_norm_regressor_model_path is None and args.disp_sign_classifier_model_path is not None
):
raise argparse.ArgumentError(
"`image_displacement_vector_regressor` and `image_displacement_sign_classifier` must both be specified if used. Got {} and {}".format(
args.disp_norm_regressor_model_path, args.disp_sign_classifier_model_path
)
)
with open(args.config_path, "r") as config_f:
config = json.load(config_f)
energy_regressor = joblib.load(args.energy_regressor_model_path)
gamma_classifier = joblib.load(args.gamma_classifier_path)
all_features = config["energy_regressor_features"] + config["gamma_classifier_features"]
using_disp_vector = args.disp_vector_regressor_model_path is not None
if using_disp_vector:
disp_vector_regressor = joblib.load(args.disp_vector_regressor_model_path)
disp_norm_regressor = None
disp_sign_classifier = None
all_features += config["disp_vector_regressor_features"]
else:
disp_vector_regressor = None
disp_norm_regressor = joblib.load(args.disp_norm_regressor_model_path)
disp_sign_classifier = joblib.load(args.disp_sign_classifier_model_path)
all_features += config["disp_norm_regressor_features"] + config["disp_sign_classifier_features"]
for dl1_file_path, dl2_file_path in zip(args.dl1_file_paths, args.dl2_file_paths):
# TODO: read dl1 straight in a numpy array (avoid pandas to avoid copy before passing to sklearn)
# should be Fortran order ? -> benchmark
dl1_df = read_dl1(dl1_file_path) # TODO: make table "key" configurable
interpolate_missing_alt_az(dl1_df) # TODO: required after dl1 alt-az maker ? what about simple dropna
tel_optics = read_telescope_optics(dl1_file_path) # TODO: make sure correct
convert_az_and_sin_az_angle_to_degree(
dl1_df
) # TODO: is this required ? (angles come from arctan2, and sin is not used ?)
dl1_df = filter_dl1(
dl1_df, filters=config["events_filters"], finite_params=all_features
) # TODO: config ? (filter events based on column names, filters, ~isnan, etc.)
# TODO: is this required ?
# Update parameters related to target direction on camera frame for MC data
# taking into account of the aberration effect using effective focal length
if "disp_norm" in dl1_df.columns:
update_disp_with_effective_focal_length(dl1_df, effective_focal_length=tel_optics.effective_focal_length)
# TODO: make dl1_to_dl2 not return anything - inplace operations in the df
dl2_df = dl1_to_dl2(
dl1_df,
config,
tel_optics.effective_focal_length,
energy_regressor,
gamma_classifier,
disp_vector_regressor,
disp_norm_regressor,
disp_sign_classifier,
)
init_dl2_file(dl2_file_path, dl1_file_path)
write_dl2_df(
dl2_file_path,
dl2_df,
attributes={"config": config},
)
if __name__ == "__main__":
main()