Coverage for lisacattools/utils.py: 39%

154 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-02-06 17:36 +0000

1# -*- coding: utf-8 -*- 

2# Copyright (C) 2021 - James I. Thorpe, Tyson B. Littenberg, Jean-Christophe 

3# Malapert 

4# 

5# This file is part of lisacattools. 

6# 

7# lisacattools is free software: you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation, either version 3 of the License, or 

10# (at your option) any later version. 

11# 

12# lisacattools is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with lisacattools. If not, see <https://www.gnu.org/licenses/>. 

19import logging 

20from enum import Enum 

21from functools import partial 

22from functools import wraps 

23from typing import List 

24from typing import Union 

25 

26import healpy as hp 

27import matplotlib.transforms as transforms 

28import numpy as np 

29import pandas as pd 

30from astropy import units as u 

31from astropy.coordinates import SkyCoord 

32from matplotlib.patches import Ellipse 

33 

34from .monitoring import UtilsMonitoring 

35 

36 

37class DocEnum(Enum): 

38 """Enum where we can add documentation.""" 

39 

40 def __new__(cls, value, doc=None): 

41 # calling super().__new__(value) here would fail 

42 self = object.__new__(cls) 

43 self._value_ = value 

44 if doc is not None: 

45 self.__doc__ = doc 

46 return self 

47 

48 

49class FrameEnum(DocEnum): 

50 """Supported reference frame""" 

51 

52 GALACTIC = "Galactic", "Galactic frame" 

53 ECLIPTIC = "Ecliptic", "Ecliptic frame" 

54 

55 

56class CacheManager(object): 

57 memory_cache = dict() 

58 

59 @staticmethod 

60 def get_cache_pandas( 

61 func=None, 

62 keycache_argument: Union[int, List[int]] = 0, 

63 level=logging.INFO, 

64 ): 

65 """Cache a pandas DataFrame. 

66 

67 Parameters 

68 ---------- 

69 func: func 

70 Function to cache (default: {None}) 

71 keycache_argument: Union[int, List[int]] 

72 Argument number that is used for the cache (default: 0) 

73 level: int 

74 Level from which the log is displayed (default: {logging.INFO}) 

75 

76 Returns 

77 ------- 

78 object : the result of the function 

79 """ 

80 if func is None: 

81 return partial( 

82 CacheManager.get_cache_pandas, 

83 keycache_argument=keycache_argument, 

84 level=level, 

85 ) 

86 

87 @wraps(func) 

88 def newfunc(*args, **kwargs): 

89 name = func.__name__ 

90 logger = logging.getLogger(__name__ + "." + name) 

91 if ( 

92 type(keycache_argument) is not list 

93 and keycache_argument > len(args) - 1 

94 ): 

95 logger.warning( 

96 f"Error during the configuration of keycache_argument, \ 

97 the value should be in [0, {len(args)-1}]" 

98 ) 

99 result = func(*args, **kwargs) 

100 return result 

101 elif ( 

102 type(keycache_argument) is list 

103 and len( 

104 [key for key in keycache_argument if key > len(args) - 1] 

105 ) 

106 > 0 

107 ): 

108 logger.warning( 

109 f"Error during the configuration of keycache_argument, \ 

110 each value of the list should be in [0, {len(args)-1}]" 

111 ) 

112 result = func(*args, **kwargs) 

113 return result 

114 

115 key_cache = ( 

116 "-".join( 

117 [args[arg_number] for arg_number in keycache_argument] 

118 ) 

119 if type(keycache_argument) is list 

120 else args[keycache_argument] 

121 ) 

122 

123 if key_cache in CacheManager.memory_cache: 

124 if logger.getEffectiveLevel() >= level: 

125 logger.log(level, f"Retrieve {key_cache} from cache") 

126 result = CacheManager.memory_cache[key_cache].copy() 

127 else: 

128 result = func(*args, **kwargs) 

129 if logger.getEffectiveLevel() >= level: 

130 logger.log(level, "Init the memory cache") 

131 CacheManager.memory_cache.clear() 

132 if logger.getEffectiveLevel() >= level: 

133 logger.log(level, f"Cache {key_cache}") 

134 CacheManager.memory_cache[key_cache] = result.copy() 

135 return result 

136 

137 return newfunc 

138 

139 

140def HPbin(df, nside, system: FrameEnum = FrameEnum.GALACTIC): 

141 # Assigns each lat/lon coordinate to a HEALPPIX Bin. Optional argument 

142 # 'system' can be 'FrameEnum.GALACTIC' [default] or 'Ecliptic' 

143 

144 # load in the coordinates in radians 

145 if system == FrameEnum.GALACTIC: 

146 if not ("Galactic Latitude" in df.columns): 

147 convert_ecliptic_to_galactic(df) 

148 

149 lat = np.deg2rad(np.array(df["Galactic Latitude"])) 

150 lon = np.deg2rad(np.array(df["Galactic Longitude"])) 

151 elif system == FrameEnum.ECLIPTIC: 

152 lat = np.array(df["Ecliptic Latitude"]) 

153 lon = np.array(df["Ecliptic Longitude"]) 

154 else: 

155 raise Exception( 

156 f"{system} ({type(system)}) is not a valid coordinate system, \ 

157 please choose FrameEnum.GALACTIC or FrameEnum.ECLIPTIC" 

158 ) 

159 return 

160 

161 # make the healpix map and insert into the passed dataframe 

162 # the latitude/co-latitude convention in HEALPY 

163 hpidx = hp.ang2pix(nside, np.pi / 2.0 - lat, lon) 

164 if not ("HEALPix bin" in df.columns): 

165 df.insert(len(df.columns), "HEALPix bin", hpidx, True) 

166 else: 

167 df["HEALPix bin"] = hpidx 

168 return 

169 

170 

171@UtilsMonitoring.time_spend(level=logging.DEBUG) 

172def HPhist(source, nside, system: FrameEnum = FrameEnum.GALACTIC): 

173 

174 HPbin(source, nside, system) 

175 

176 # make an empty array for all the bins 

177 npix = hp.nside2npix(nside) 

178 hp_map = np.zeros(npix, dtype=int) 

179 # count samples in the non-empty bins 

180 hp_idx, hp_cnts = np.unique(source["HEALPix bin"], return_counts=True) 

181 # fill in the non-empty bins 

182 hp_map[hp_idx] = hp_cnts 

183 return hp_map 

184 

185 

186@UtilsMonitoring.time_spend(level=logging.DEBUG) 

187def convert_galactic_to_cartesian( 

188 data: pd.DataFrame, long_name, lat_name, distance_name 

189): 

190 longitude = data[long_name].to_list() 

191 latitude = data[lat_name].to_list() 

192 distance = data[distance_name].to_list() 

193 galactic_coord = SkyCoord( 

194 longitude * u.degree, 

195 latitude * u.degree, 

196 frame="galactic", 

197 distance=distance, 

198 ) 

199 cartesian = galactic_coord.cartesian 

200 data["X"] = cartesian.x 

201 data["Y"] = cartesian.y 

202 data["Z"] = cartesian.z 

203 

204 

205@UtilsMonitoring.time_spend(level=logging.DEBUG) 

206def convert_ecliptic_to_galactic(data: pd.DataFrame): 

207 lamb = None 

208 try: 

209 lamb = ( 

210 data["Ecliptic Longitude"] 

211 if "Ecliptic Longitude" in data.columns 

212 else data["ecliptic longitude"] 

213 ) 

214 except: # noqa: E722 

215 raise Exception( 

216 "ERROR: Unable to find ecliptic longitude in data frame" 

217 ) 

218 lamb = lamb.to_list() 

219 try: 

220 beta = ( 

221 data["Ecliptic Latitude"] 

222 if "Ecliptic Latitude" in data.columns 

223 else data["ecliptic latitude"] 

224 ) 

225 beta = beta.to_list() 

226 except: # noqa: E722 

227 try: 

228 beta = ( 

229 np.pi / 2 - np.arccos(np.array(data["coslat"])) 

230 if "cos ecliptic colatitude" in data.columns 

231 or "coslat" in data.columns 

232 else np.pi / 2 

233 - np.arccos(np.array(data["cos ecliptic colatitude"])) 

234 ) 

235 except: # noqa: E722 

236 raise Exception( 

237 "ERROR: Unable to find ecliptic latitude in data frame" 

238 ) 

239 ecliptic_coord = SkyCoord( 

240 lamb * u.rad, beta * u.rad, frame="barycentrictrueecliptic" 

241 ) 

242 galactic_coord = ecliptic_coord.galactic 

243 gal_longitude = galactic_coord.l 

244 gal_latitude = galactic_coord.b 

245 # gal_latitude.wrap_angle = 180 * u.deg 

246 gal_longitude.wrap_angle = 180 * u.deg 

247 if not ("Galactic Latitude" in data.columns): 

248 data.insert( 

249 len(data.columns), 

250 "Galactic Longitude", 

251 gal_longitude.to_value(), 

252 True, 

253 ) 

254 data.insert( 

255 len(data.columns), 

256 "Galactic Latitude", 

257 gal_latitude.to_value(), 

258 True, 

259 ) 

260 else: 

261 data["Galactic Longitude"] = gal_longitude 

262 data["Galactic Latitude"] = gal_latitude 

263 return 

264 

265 

266def getSciRD(f, Tobs): 

267 # Compute the LISA Sensitivity curve given a set of Fourier frequencies 

268 # and an observation time 

269 # Curve is in units of strain amplitude and is defined by SciRD available 

270 # at: https://www.cosmos.esa.int/documents/678316/1700384/SciRD.pdf 

271 

272 f1 = np.array(0.4 * 1e-3) 

273 f2 = np.array(25 * 1e-3) 

274 R = 1 + np.power((f / f2), 2) 

275 S2 = 2.6 * 1e-41 

276 S1 = 5.76 * 1e-48 * (1 + np.power((f1 / f), 2)) 

277 Sh = (1 / 2) * (20 / 3) * (S1 / (np.power((2 * np.pi * f), 4)) + S2) * R 

278 Sha = np.sqrt(Sh / Tobs) 

279 return Sha 

280 

281 

282def get_DL(df): 

283 # Estimate luminosity distance (in kpc) from GW amplitude, frequency, and 

284 # frequency derivative 

285 c = 2.99e8 # speed of light in m/s 

286 kpc2m = 3.086e19 # 1 kiloparsec in meters 

287 r = ( 

288 (5 / (96 * (np.pi**2))) 

289 * (c / df["Amplitude"]) 

290 * (df["Frequency Derivative"]) 

291 / np.power(df["Frequency"], 3) 

292 * (1 / kpc2m) 

293 ) 

294 df.insert(len(df.columns), "Luminosity Distance", r, True) 

295 return 

296 

297 

298def get_Mchirp(df): 

299 TSUN = 4.9169e-6 # Mass of the Sun \f$M_\odot G c^{-3}\f$ [s] 

300 Mc = ( 

301 np.power( 

302 df["Frequency Derivative"] 

303 / (96.0 / 5.0) 

304 / np.power(np.pi, 8.0 / 3.0) 

305 / np.power(df["Frequency"], 11.0 / 3.0), 

306 3.0 / 5.0, 

307 ) 

308 / TSUN 

309 ) 

310 df.insert(len(df.columns), "Chirp Mass", Mc, True) 

311 return 

312 

313 

314# ellipse_area 

315def ellipse_area(df): 

316 # Compute 90% ellipse area from chain samples 

317 cov = np.array(df.cov()) 

318 ev = np.linalg.eigvals(cov) 

319 ax = 2.0 * np.sqrt( 

320 4.605 * ev 

321 ) # the 4.605 corresponds to 90%, for 95% we'd use 5.991 

322 return np.pi * ax[0] * ax[1] 

323 

324 

325# confidence_ellipse 

326@UtilsMonitoring.time_spend(level=logging.DEBUG) 

327def confidence_ellipse(df, ax, n_std=1.0, facecolor="none", **kwargs): 

328 # Plot an error ellipse on some axes. It takes a 2D data frame of 

329 # parameters as its input 

330 mean = np.array(df.mean()) 

331 

332 cov = np.array(df.cov()) 

333 pearson = cov[0, 1] / np.sqrt(cov[0, 0] * cov[1, 1]) 

334 # Using a special case to obtain the eigenvalues of this 

335 # two-dimensionl dataset. 

336 ell_radius_x = np.sqrt(1 + pearson) 

337 ell_radius_y = np.sqrt(1 - pearson) 

338 ellipse = Ellipse( 

339 (0, 0), 

340 width=ell_radius_x * 2, 

341 height=ell_radius_y * 2, 

342 facecolor=facecolor, 

343 **kwargs, 

344 ) 

345 

346 # Calculating the stdandard deviation of x from 

347 # the squareroot of the variance and multiplying 

348 # with the given number of standard deviations. 

349 scale_x = np.sqrt(cov[0, 0]) * n_std 

350 

351 # calculating the stdandard deviation of y ... 

352 scale_y = np.sqrt(cov[1, 1]) * n_std 

353 

354 transf = ( 

355 transforms.Affine2D() 

356 .rotate_deg(45) 

357 .scale(scale_x, scale_y) 

358 .translate(mean[0], mean[1]) 

359 ) 

360 

361 ellipse.set_transform(transf + ax.transData) 

362 return ax.add_patch(ellipse)