Coverage for lisacattools/utils.py: 39%
154 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-02-06 17:36 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-02-06 17:36 +0000
1# -*- coding: utf-8 -*-
2# Copyright (C) 2021 - James I. Thorpe, Tyson B. Littenberg, Jean-Christophe
3# Malapert
4#
5# This file is part of lisacattools.
6#
7# lisacattools is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# lisacattools is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with lisacattools. If not, see <https://www.gnu.org/licenses/>.
19import logging
20from enum import Enum
21from functools import partial
22from functools import wraps
23from typing import List
24from typing import Union
26import healpy as hp
27import matplotlib.transforms as transforms
28import numpy as np
29import pandas as pd
30from astropy import units as u
31from astropy.coordinates import SkyCoord
32from matplotlib.patches import Ellipse
34from .monitoring import UtilsMonitoring
37class DocEnum(Enum):
38 """Enum where we can add documentation."""
40 def __new__(cls, value, doc=None):
41 # calling super().__new__(value) here would fail
42 self = object.__new__(cls)
43 self._value_ = value
44 if doc is not None:
45 self.__doc__ = doc
46 return self
49class FrameEnum(DocEnum):
50 """Supported reference frame"""
52 GALACTIC = "Galactic", "Galactic frame"
53 ECLIPTIC = "Ecliptic", "Ecliptic frame"
56class CacheManager(object):
57 memory_cache = dict()
59 @staticmethod
60 def get_cache_pandas(
61 func=None,
62 keycache_argument: Union[int, List[int]] = 0,
63 level=logging.INFO,
64 ):
65 """Cache a pandas DataFrame.
67 Parameters
68 ----------
69 func: func
70 Function to cache (default: {None})
71 keycache_argument: Union[int, List[int]]
72 Argument number that is used for the cache (default: 0)
73 level: int
74 Level from which the log is displayed (default: {logging.INFO})
76 Returns
77 -------
78 object : the result of the function
79 """
80 if func is None:
81 return partial(
82 CacheManager.get_cache_pandas,
83 keycache_argument=keycache_argument,
84 level=level,
85 )
87 @wraps(func)
88 def newfunc(*args, **kwargs):
89 name = func.__name__
90 logger = logging.getLogger(__name__ + "." + name)
91 if (
92 type(keycache_argument) is not list
93 and keycache_argument > len(args) - 1
94 ):
95 logger.warning(
96 f"Error during the configuration of keycache_argument, \
97 the value should be in [0, {len(args)-1}]"
98 )
99 result = func(*args, **kwargs)
100 return result
101 elif (
102 type(keycache_argument) is list
103 and len(
104 [key for key in keycache_argument if key > len(args) - 1]
105 )
106 > 0
107 ):
108 logger.warning(
109 f"Error during the configuration of keycache_argument, \
110 each value of the list should be in [0, {len(args)-1}]"
111 )
112 result = func(*args, **kwargs)
113 return result
115 key_cache = (
116 "-".join(
117 [args[arg_number] for arg_number in keycache_argument]
118 )
119 if type(keycache_argument) is list
120 else args[keycache_argument]
121 )
123 if key_cache in CacheManager.memory_cache:
124 if logger.getEffectiveLevel() >= level:
125 logger.log(level, f"Retrieve {key_cache} from cache")
126 result = CacheManager.memory_cache[key_cache].copy()
127 else:
128 result = func(*args, **kwargs)
129 if logger.getEffectiveLevel() >= level:
130 logger.log(level, "Init the memory cache")
131 CacheManager.memory_cache.clear()
132 if logger.getEffectiveLevel() >= level:
133 logger.log(level, f"Cache {key_cache}")
134 CacheManager.memory_cache[key_cache] = result.copy()
135 return result
137 return newfunc
140def HPbin(df, nside, system: FrameEnum = FrameEnum.GALACTIC):
141 # Assigns each lat/lon coordinate to a HEALPPIX Bin. Optional argument
142 # 'system' can be 'FrameEnum.GALACTIC' [default] or 'Ecliptic'
144 # load in the coordinates in radians
145 if system == FrameEnum.GALACTIC:
146 if not ("Galactic Latitude" in df.columns):
147 convert_ecliptic_to_galactic(df)
149 lat = np.deg2rad(np.array(df["Galactic Latitude"]))
150 lon = np.deg2rad(np.array(df["Galactic Longitude"]))
151 elif system == FrameEnum.ECLIPTIC:
152 lat = np.array(df["Ecliptic Latitude"])
153 lon = np.array(df["Ecliptic Longitude"])
154 else:
155 raise Exception(
156 f"{system} ({type(system)}) is not a valid coordinate system, \
157 please choose FrameEnum.GALACTIC or FrameEnum.ECLIPTIC"
158 )
159 return
161 # make the healpix map and insert into the passed dataframe
162 # the latitude/co-latitude convention in HEALPY
163 hpidx = hp.ang2pix(nside, np.pi / 2.0 - lat, lon)
164 if not ("HEALPix bin" in df.columns):
165 df.insert(len(df.columns), "HEALPix bin", hpidx, True)
166 else:
167 df["HEALPix bin"] = hpidx
168 return
171@UtilsMonitoring.time_spend(level=logging.DEBUG)
172def HPhist(source, nside, system: FrameEnum = FrameEnum.GALACTIC):
174 HPbin(source, nside, system)
176 # make an empty array for all the bins
177 npix = hp.nside2npix(nside)
178 hp_map = np.zeros(npix, dtype=int)
179 # count samples in the non-empty bins
180 hp_idx, hp_cnts = np.unique(source["HEALPix bin"], return_counts=True)
181 # fill in the non-empty bins
182 hp_map[hp_idx] = hp_cnts
183 return hp_map
186@UtilsMonitoring.time_spend(level=logging.DEBUG)
187def convert_galactic_to_cartesian(
188 data: pd.DataFrame, long_name, lat_name, distance_name
189):
190 longitude = data[long_name].to_list()
191 latitude = data[lat_name].to_list()
192 distance = data[distance_name].to_list()
193 galactic_coord = SkyCoord(
194 longitude * u.degree,
195 latitude * u.degree,
196 frame="galactic",
197 distance=distance,
198 )
199 cartesian = galactic_coord.cartesian
200 data["X"] = cartesian.x
201 data["Y"] = cartesian.y
202 data["Z"] = cartesian.z
205@UtilsMonitoring.time_spend(level=logging.DEBUG)
206def convert_ecliptic_to_galactic(data: pd.DataFrame):
207 lamb = None
208 try:
209 lamb = (
210 data["Ecliptic Longitude"]
211 if "Ecliptic Longitude" in data.columns
212 else data["ecliptic longitude"]
213 )
214 except: # noqa: E722
215 raise Exception(
216 "ERROR: Unable to find ecliptic longitude in data frame"
217 )
218 lamb = lamb.to_list()
219 try:
220 beta = (
221 data["Ecliptic Latitude"]
222 if "Ecliptic Latitude" in data.columns
223 else data["ecliptic latitude"]
224 )
225 beta = beta.to_list()
226 except: # noqa: E722
227 try:
228 beta = (
229 np.pi / 2 - np.arccos(np.array(data["coslat"]))
230 if "cos ecliptic colatitude" in data.columns
231 or "coslat" in data.columns
232 else np.pi / 2
233 - np.arccos(np.array(data["cos ecliptic colatitude"]))
234 )
235 except: # noqa: E722
236 raise Exception(
237 "ERROR: Unable to find ecliptic latitude in data frame"
238 )
239 ecliptic_coord = SkyCoord(
240 lamb * u.rad, beta * u.rad, frame="barycentrictrueecliptic"
241 )
242 galactic_coord = ecliptic_coord.galactic
243 gal_longitude = galactic_coord.l
244 gal_latitude = galactic_coord.b
245 # gal_latitude.wrap_angle = 180 * u.deg
246 gal_longitude.wrap_angle = 180 * u.deg
247 if not ("Galactic Latitude" in data.columns):
248 data.insert(
249 len(data.columns),
250 "Galactic Longitude",
251 gal_longitude.to_value(),
252 True,
253 )
254 data.insert(
255 len(data.columns),
256 "Galactic Latitude",
257 gal_latitude.to_value(),
258 True,
259 )
260 else:
261 data["Galactic Longitude"] = gal_longitude
262 data["Galactic Latitude"] = gal_latitude
263 return
266def getSciRD(f, Tobs):
267 # Compute the LISA Sensitivity curve given a set of Fourier frequencies
268 # and an observation time
269 # Curve is in units of strain amplitude and is defined by SciRD available
270 # at: https://www.cosmos.esa.int/documents/678316/1700384/SciRD.pdf
272 f1 = np.array(0.4 * 1e-3)
273 f2 = np.array(25 * 1e-3)
274 R = 1 + np.power((f / f2), 2)
275 S2 = 2.6 * 1e-41
276 S1 = 5.76 * 1e-48 * (1 + np.power((f1 / f), 2))
277 Sh = (1 / 2) * (20 / 3) * (S1 / (np.power((2 * np.pi * f), 4)) + S2) * R
278 Sha = np.sqrt(Sh / Tobs)
279 return Sha
282def get_DL(df):
283 # Estimate luminosity distance (in kpc) from GW amplitude, frequency, and
284 # frequency derivative
285 c = 2.99e8 # speed of light in m/s
286 kpc2m = 3.086e19 # 1 kiloparsec in meters
287 r = (
288 (5 / (96 * (np.pi**2)))
289 * (c / df["Amplitude"])
290 * (df["Frequency Derivative"])
291 / np.power(df["Frequency"], 3)
292 * (1 / kpc2m)
293 )
294 df.insert(len(df.columns), "Luminosity Distance", r, True)
295 return
298def get_Mchirp(df):
299 TSUN = 4.9169e-6 # Mass of the Sun \f$M_\odot G c^{-3}\f$ [s]
300 Mc = (
301 np.power(
302 df["Frequency Derivative"]
303 / (96.0 / 5.0)
304 / np.power(np.pi, 8.0 / 3.0)
305 / np.power(df["Frequency"], 11.0 / 3.0),
306 3.0 / 5.0,
307 )
308 / TSUN
309 )
310 df.insert(len(df.columns), "Chirp Mass", Mc, True)
311 return
314# ellipse_area
315def ellipse_area(df):
316 # Compute 90% ellipse area from chain samples
317 cov = np.array(df.cov())
318 ev = np.linalg.eigvals(cov)
319 ax = 2.0 * np.sqrt(
320 4.605 * ev
321 ) # the 4.605 corresponds to 90%, for 95% we'd use 5.991
322 return np.pi * ax[0] * ax[1]
325# confidence_ellipse
326@UtilsMonitoring.time_spend(level=logging.DEBUG)
327def confidence_ellipse(df, ax, n_std=1.0, facecolor="none", **kwargs):
328 # Plot an error ellipse on some axes. It takes a 2D data frame of
329 # parameters as its input
330 mean = np.array(df.mean())
332 cov = np.array(df.cov())
333 pearson = cov[0, 1] / np.sqrt(cov[0, 0] * cov[1, 1])
334 # Using a special case to obtain the eigenvalues of this
335 # two-dimensionl dataset.
336 ell_radius_x = np.sqrt(1 + pearson)
337 ell_radius_y = np.sqrt(1 - pearson)
338 ellipse = Ellipse(
339 (0, 0),
340 width=ell_radius_x * 2,
341 height=ell_radius_y * 2,
342 facecolor=facecolor,
343 **kwargs,
344 )
346 # Calculating the stdandard deviation of x from
347 # the squareroot of the variance and multiplying
348 # with the given number of standard deviations.
349 scale_x = np.sqrt(cov[0, 0]) * n_std
351 # calculating the stdandard deviation of y ...
352 scale_y = np.sqrt(cov[1, 1]) * n_std
354 transf = (
355 transforms.Affine2D()
356 .rotate_deg(45)
357 .scale(scale_x, scale_y)
358 .translate(mean[0], mean[1])
359 )
361 ellipse.set_transform(transf + ax.transData)
362 return ax.add_patch(ellipse)