Source code for vlf4ions.class_definition

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 22 14:10:40 2025

@author: pteysseyre
"""

import vlf4ions.read_files as rf
import vlf4ions.detect_change_points as dcp
import vlf4ions.correct_phase as correct_phase
import vlf4ions.flag as fl
import vlf4ions.analyse_change_points as acp
import vlf4ions.manage_statefile as msf
import vlf4ions.polarisation as polar
import vlf4ions.sunrise_sunset as srss

import numpy as np
import datetime as dt  # Necessary to use timezone
from datetime import datetime, timedelta, timedelta
import logging

import pandas as pd

# For alerts
import smtplib
from pathlib import Path
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
from email import encoders


# ----------------------- Receiver description -----------------
[docs] class receiver: """Characteristics of the receiver (for plotting purposes mainly and for sending alerts) :param name: Name of the receiver. Must match the name in the VLF data file names :param lat: Latitude of the receiver (for plotting) :param lon: Longitude of the receiver (for plotting) :param threshold: Amplitude threshold bellow which we declare a station not transmitting (default: 0.5) :param nb_errors: Number of reading errors, we keep track of them to alert in case of malfunction :param file_endings: List of strings found at the end of the data files. Default (for AWESOME): ["_100A", "_100B", "_101A", "_101B"] """ def __init__( self, name, lat, lon, threshold=0.5, nb_error=0, file_endings=["_100A", "_100B", "_101A", "_101B"], ): self.name = name self.lat = lat self.lon = lon self.threshold = threshold self.nb_error = nb_error self.file_endings = file_endings
# ---------------------- Alerts, warnings and such --------------
[docs] class email_alerts: """Alerts sent in case of flare detection, antenna problems or such :param subject: Subject of the email alert :param body: Body of the email alert :param sender: Email adress of the sender :param password: Password of the email address of the sender :param recipiends: Recipients of the email alert (must be a list of strings) :param threshold: Flux threshold. If the probability of the flux being above this threshold is 10% or more, the alert will be sent (default: 0) :param files: List of files to include as attachment (default: empty) :param send_last_time: Computed internally; used to keep track of alerts already sent to avoid spamming people :param send_each_change: Boolean. If true, the recipients will receive an alert each time the flux estimation changes, as long as there is at least a 10% probability of the flux being above the threshold""" def __init__( self, subject, body, sender, password, recipients, threshold=0, files=[], sent_last_time=0, send_each_change=False, ): self.subject = subject self.body = body self.sender = sender self.password = password self.recipients = recipients self.threshold = threshold # Only send the alerts if the proba of the flare is 10% above a threshold self.files = files # Files to attach to the email self.sent_last_time = ( sent_last_time # Boolean, only useful for alerts on probas ) self.send_each_change = ( send_each_change # Boolean, if True send each time the proba change )
[docs] def update_detection_body(self, callsign, p1): """Just updates the body of the email with the relevant informations in case of flare detection""" self.body = "Detection with " + callsign + "\n Slope : " + str(p1) + " deg/hr."
[docs] def get_path_to_fluxestimate(self, path): """Get the path to the file used by the 'craft_newsletter' function bellow :param path: Path to the flux estimate recap :return: filename, full name and path of the file with the flux estimations""" today = dt.datetime.now(dt.timezone.utc) if today.hour == 0: # This is the beginning of the day, so we want to send the recap # for the previous day today = today - dt.timedelta(days=1) la_date = today.strftime("%Y_%m_%d") filename = path + la_date + "_nowcast.csv" return filename
[docs] def craft_newsletter(self, filename, completenamegiven=False): """Craft a newsletter based on the recapitulative file indicated by `filename'. It should include the highest flux estimated, its time, and each time the estimated flux was above M2 (with start, end time and peak times). :param filename: Path to the recapitulative file created with the 'write_fluxestimate' function. It can be found by calling the 'get_path_to_fileestimate' method just before :param completenamegiven: Boolean (default: False). If it is false, it assumes that the filename given is the path to the folder, and that the true filename must be computed (because it depends on the date) :return: - max_flux: List of peak fluxes for each flare detected during the day - max_times: List of each time of peak flux previously returned - starts: Start times of each detected flare (quiet < 1) - ends: End times of each detected flare (quiet == 1) """ if not completenamegiven: filename = self.get_path_to_fluxestimate(filename) # Read the file and convert the data into a numpy array df = pd.read_csv(filename) data = df.to_numpy() # Convert data to convenient variables flux = data[:, 1] the_time = data[:, 0] quiet = data[:, 2] nb_stations = data[:, 3] # For each time when quiet is not one, get the beginning/end time and maximum estimated flux max_flux = [] starts = [] ends = [] max_times = [] i_start = -1 i_end = 0 # If quiet[0] < 1, it means that the first station in daytime has # detected a flare before the second station becomes in daytime if quiet[0] < 1: starts.append(the_time[0]) i_start = 0 # There must be a more elegant way to do this, but this works too # NOTE: The 'initial' flag in max allows it to run on empty arrays at the beginning of the day for i in range(1, len(flux)): if ( quiet[i] <= 0.5 and quiet[i - 1] > quiet[i] and np.max(starts, initial=0) <= np.max(ends, initial=0) ): # Flare beginning if i_start < i - 1: starts.append(the_time[i]) i_start = i elif ( (quiet[i] > quiet[i - 1] and quiet[i] >= 0.5) or (quiet[i] < 1 and i == len(flux)) ) and np.max(starts, initial=0.1) > np.max( ends, initial=0.1 ): # Flare ending i_end = i ends.append(the_time[i]) i_max = np.argmax(flux[i_start : i + 1]) max_flux.append(flux[i_max + i_start]) max_times.append(the_time[i_max + i_start]) nb_flares = len(max_flux) # Get date in correct format today = dt.datetime.now(dt.timezone.utc) if today.hour == 0: # This is the beginning of the day, so we want to send the recap # for the previous day today = today - dt.timedelta(days=1) la_date = today.strftime("%Y/%m/%d") # Write the body of the newletter body = "Detections on " + la_date + ": " + "\n" # Initialisation body += "\n" body += ( "From " + self._good_format_time(the_time[0]) + " to " + self._good_format_time(the_time[-1]) + " UT \n \n" ) if nb_flares > 0: nb_flares_above_M1 = 0 for f in range(nb_flares): if max_flux[f] > 1e-5: nb_flares_above_M1 += 1 body += "Flare " + str(nb_flares_above_M1) + " : \n" body += "-------------------- \n" body += ( "Start time : " + self._good_format_time(starts[f]) + " UT \n" ) body += ( "Max flux : " + "{flux:.1e} W/m^2".format(flux=max_flux[f]) + "at " + self._good_format_time(max_times[f]) + " UT \n" ) body += ( "End time : " + self._good_format_time(ends[f]) + " UT \n \n" ) else: body += "No detection today !" # Reminder body += "\n Note: Please remember that the quality of the flux estimates depends on the data used to compute the probability functions" body += " If only M and weak X flares are used, the estimates will be skewed for C flares and strong X flares" self.body = body return max_flux, max_times, starts, ends
def _good_format_time(self, the_time): """Convert a float like 13.1222222 to a string like '13:07' :param the_time: Time to convert to string :return: The time, converted to a string in the format '%H:%M'""" hour = np.floor(the_time) minute = np.round((the_time - hour) * 60) the_time = dt.time(int(hour), int(minute)) the_time_str = the_time.strftime("%H:%M") return the_time_str
[docs] def send_email(self): """Sends the email from the sender email adress to the recipients""" msg = MIMEMultipart() msg["From"] = self.sender msg["To"] = COMMASPACE.join(self.recipients) msg["Date"] = formatdate(localtime=True) msg["Subject"] = self.subject msg.attach(MIMEText(self.body)) for path in self.files: part = MIMEBase("application", "octet-stream") with open(path, "rb") as file: part.set_payload(file.read()) encoders.encode_base64(part) part.add_header( "Content-Disposition", "attachment; filename={}".format(Path(path).name) ) msg.attach(part) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp_server: smtp_server.login(self.sender, self.password) smtp_server.sendmail(self.sender, self.recipients, msg.as_string()) smtp_server.quit() self.sent_last_time = 1
# ------------------- Stations and related functions -------------- def sliding_median(arr, window): return np.median(np.lib.stride_tricks.sliding_window_view(arr, (window,)), axis=1)
[docs] class station: """Characteristics of the transmitter. :param name: call-sign (e.g. 'GVT') :param lat: latitude of the transmitter :param lon: latitude of the transmitter :param freq: Frequency (kHz) :param detection_threshold: Phase slope above which we detect a solar flare :param reading_time: second after the minute at which we read the data. NOTE: They should be unique for each station :param time_average: Time resolution of the smoothed signal (default: 60 s) :param delta: Delta parameter of the incremental algorithm (Guralnik & Srivastava, 1999. Default:0.1) :param nb_data_points: number of data points last time the file was read (default : 0, used for flagging) :param Ne: Average electron density computed in the path (not done yet, but I needed this for data analysis) :param DA/DP: Computed internally, variations of the amplitude and phase compared to quiet times :param mu/sigma: This is computed internally, and is the average and std of the flux probability density function given DA and DP :param orientation: Is 1 if the phase considered is E/W, 0 if it is N/S. The idea is that the phase can be much noisier in one direction, so the optimal orientation is left as a choice for the user. If it is set to 2, the polarisation start phase is used to combine both orientations. (Default:2) :param sza: Solar zenith angle (in °) at the middle of the path. Updated in the code automatically :param sza_threshold: Solar zenith angle threshold below which it is daytime (Default: 85°) --------------- History: Written by P. Teysseyre in January 2025 Re-written under current format in April 2025 """ def __init__( self, name, lat, lon, freq, detection_threshold, reading_time, Cal_NS=0.14, Cal_EW=0.08, df=0, time_average=60, delta=0.1, nb_data_points=0, nb_error=0, Ne=1e8, DP=0, DA=0, p1=0, quiet=1, flag=0, mu=-6, sigma=1, orientation=2, sza=0, sza_threshold=85, last_time_problem=-1, ): self.name = name # Call-sign self.lat = lat # Latitude self.lon = lon # Longitude self.freq = freq # Frequency self.detection_threshold = detection_threshold self.reading_time = reading_time # When the data is read (must not be the same for two stations) self.Cal_NS = Cal_NS # Calibration number in the N/S orientation self.Cal_EW = Cal_EW # Calibration number in the E/W orientation self.time_average = time_average # Time-resolution self.delta = ( delta # Delta parameters for the detection (see Guralnik & Srivasta, 1999) ) self.nb_data_points = nb_data_points self.Ne = Ne self.df = df # For phase correction self.DP = DP # Phase increase compared to quiet self.DA = DA # Amplitude increase compared to quiet self.p1 = p1 # Last breakpoint slope self.quiet = quiet # Is 1 if it is quiet self.flag = flag # Is 1 if the station is transmitting self.mu = mu # Average of the flux pdf self.sigma = sigma # Std of the flux pdf self.orientation = orientation self.sza = sza self.sza_threshold = sza_threshold self.last_time_problem = last_time_problem # Last time the flag was not 0 or 3
[docs] def update_df(self, this_receiver, path_to_data, epsilon=5, nb_days=30): """Updates the value of df for the station, as it depends on the transmitter and may vary in time :param receiver: Receiver class instance :param path_to_data: Path to the data files :param epsilon: Maximum phase jump (in °) that we allow on average (default: 5) :param nb_days: Number of days on which we want to compute df (default: 30)""" # Get today's date, date_inf (yesterday - nb_days) and date_sup (yesterday) today = dt.datetime.now( dt.timezone.utc ) # The time zone is necessary to use get_sza later date_sup = today - dt.timedelta(days=1) date_inf = date_sup - dt.timedelta(days=nb_days) # Compute the new df df = correct_phase.compute_df( date_inf, date_sup, self, path_to_data, this_receiver.file_endings, epsilon ) # Update the df value self.df = df
[docs] def update_breakpoints( self, path_breakpoint, path_mau, path_sunrise, alerts_antenna, alerts_detection, this_receiver, ): """Checks the phase and detects perturbations due to flares. If needed, send alerts to users :param path_breakpoint: path to the files where the breakpoints are stores :param path_mau: path to the data being written :param path_sunrise: path to the files with precomputed sunrise and sunset times :param alerts_antenna: instance of 'alert' class, alert to send if the antenna is down :param alerts_detection: instance of 'alert' class, alert to send if there is a detection :param this_receiver: Receiver class instance """ try: # Get today's date in a good format today = dt.datetime.now( dt.timezone.utc ) # The time zone is necessary to use get_sza later la_date = today.strftime("%Y_%m_%d") # Read data & corrects the phase ( amp_NS, amp_EW, phase_NS, phase_EW, the_time, nb_error, ) = rf.read_Narrowband_real_time(self, path_mau, this_receiver, today) # Correct the phase if self.orientation == 2: phase = polar.EllipseParams(amp_NS, amp_EW, phase_NS, phase_EW) amp = np.sqrt(np.square(amp_NS) + np.square(amp_EW)) elif self.orientation == 1: phase = phase_NS amp = amp_NS else: phase = phase_EW amp = amp_EW phase = correct_phase.correctionphaseparDw(self, phase, the_time) # Flag the data if needed # CAREFUL: amp is the median of the amplitude recorded over the last minute at most path_sunrise = path_sunrise + self.name + "_sunrise_sunset.txt" flag, nb_data_points, amp = fl.flags( amp, self.nb_data_points, today, self, this_receiver, phase, ) self.nb_data_points = nb_data_points self.flag = flag if flag == 1: # The antenna has stopped self.last_time_problem = the_time[-1] this_receiver.nb_error += 1 if alerts_antenna.sent_last_time == 0 and self.nb_error == 5: alerts_antenna.send_email() # We only look for breakpoints if there weren't problems in the last 15 min elif flag == 0 and np.abs(the_time[-1] - self.last_time_problem) > 15 / 60: this_receiver.nb_error = 0 alerts_antenna.sent_last_time = 1 # Time average the data & apply median filter time_average = np.min([self.time_average, len(the_time)]) the_time = sliding_median(the_time, self.time_average) the_time = the_time[0::time_average] phase = sliding_median(phase, self.time_average) phase = phase[0::time_average] # Read the last breakpoint value OR create a new file if it is the first of the day ( last_breakpoint, last_p1, last_amp, last_phase, quiet, DP, ) = msf.get_lastbreakpoint(self.name, la_date, path_breakpoint) # Keep the data after last breakpoint mask_time = the_time > last_breakpoint the_time = the_time[mask_time] phase = phase[mask_time] # Shorten the data if it is too long # Also consider that it is quiet time, as there would have been more changes if it was flare time if np.max(the_time) - the_time[0] > 2: two_hours = int( np.floor(7200 / self.time_average) ) # Number of indices in those two hours phase = phase[-1 - two_hours :] the_time = the_time[-1 - two_hours :] # Look if there are new breakpoints now detec, p1 = dcp.find_break_point_realtime( the_time, phase, self.delta ) # NOTE: Here, the p1 value is the one corresponding to the last nine minutes of data # Write new breakpoint in file msf.write_newbreakpoint( self.name, la_date, path_breakpoint, the_time[-1], p1, amp, phase[-1], 1, 0, ) print("New breakpoint, after two quiet hours") # Change the station's characteritics self.quiet = 1 self.DA = 0 self.DP = 0 self.p1 = p1 else: # Look if there are new breakpoints now detec, p1 = dcp.find_break_point_realtime( the_time, phase, self.delta ) if p1 >= 1e4: # This is an error self.quiet = 1 self.DP = 0 self.DA = 0 # If there is a new breakpoint, write it in a file if detec and p1 < 1e4: # Get last QUIET breakpoint ( quiet_breakpoint, quiet_p1, quiet_amp, quiet_phase, quiet_quiet, quiet_DP, ) = msf.get_lastbreakpoint( self.name, la_date, path_breakpoint, quiet=True ) # NOTE: quiet_quiet is useless and is always 1 (same for quiet_DP who is always 0) quiet_now, prev_timedecay, DP, DA = acp.analyse_breakpoint( the_time[-1], p1, amp, phase, the_time, quiet, quiet_phase, quiet_amp, last_breakpoint, last_p1, self.detection_threshold, quiet_p1, quiet_breakpoint, this_receiver, ) if ( DP > 1e3 ): # This is an error (very noisy data or bad phase correction) self.flag == 2 self.DP = 0 self.DA = 0 self.quiet = 1 if quiet_now == 1: DP = 0 DA = 0 # Write new breakpoint in file msf.write_newbreakpoint( self.name, la_date, path_breakpoint, the_time[-1], p1, amp, phase[-1], quiet_now, DP, ) print("New breakpoint !") # Update the station self.p1 = p1 self.quiet = quiet_now self.DP = DP self.DA = DA # Send alerts if needed if quiet_now == 0 and quiet == 1: alerts_detection.update_detection_body(self.name, p1) alerts_detection.send_email() elif not detec: self.compute_DA_DP( path_breakpoint, la_date, amp, phase, this_receiver ) print("Done - ", today.strftime("%H:%M:%S"), "-", self.name) elif flag == 3: print(self.name, " at ", today.strftime("%H:%M:%S"), " : nighttime") self.quiet = 1 self.DP = 0 self.DA = 0 this_receiver.nb_error = 0 alerts_antenna.sent_last_time = 1 else: print(self.name, " is not transmitting at ", today.strftime("%H:%M:%S")) self.quiet = 1 alerts_antenna.sent_last_time = 1 self.last_time_problem = the_time[-1] except Exception as e: # If there was an error, we update the log file this_receiver.nb_error += 1 # Configuration of the logging file logging.basicConfig(filename="detection.log", encoding="utf-8") logging.error("Error at %s", "division", exc_info=e) print(self.name, " - error at ", today.strftime("%H:%M:%S"), " - see log")
[docs] def get_pdf_coeff(self, path_to_probas): """Returns the mu and sigma coefficients, defining a gaussian probabiility distribution function associated to a station and a value of DP :param path_to_probas: path to the files containing the mu and sigma values for the porbability computations""" if self.DP < 0: self.DP = 0 if ( self.DP > 5 ): # Only do it for high DP, the other ones may be due to small noise probas = pd.read_csv(path_to_probas + self.name + "_probas.csv") # Get the correct row rows = probas[probas["Low"] <= self.DP] row = rows.tail(1) self.mu = row["mu"].iloc[0] self.sigma = row["sigma"].iloc[0] else: # This is quiet time, we return the quiet proba centered in C1 self.mu = -6 # log(1e-6) self.sigma = 1
[docs] def compute_DA_DP(self, path_breakpoint, la_date, amp, phase, this_receiver): """This function computes the values of DA and DP. Before this was only done in case of a new breakpoint, but that actually should be done as soon as quiet==0 :param path_breakpoint: Path to the breakpoint file :param la_date: Date, in the 'yyyy_mm_dd' format :param amp: Median amplitude over the last minute (given by flag.flags) :param phase: Phase array :param this_receiver: Receiver class instance""" # Read the last breakpoint value OR create a new file if it is the first of the day ( current_time, current_p1, current_amp, current_phase, quiet, DP, ) = msf.get_lastbreakpoint(self.name, la_date, path_breakpoint) # Get last QUIET breakpoint ( quiet_breakpoint, quiet_p1, quiet_amp, quiet_phase, quiet_quiet, quiet_DP, ) = msf.get_lastbreakpoint(self.name, la_date, path_breakpoint, quiet=True) # If the station has not seen a quiet point in three hour, we force it to return to quiet if ( current_time - quiet_breakpoint > 3 and current_p1 < self.detection_threshold ): self.quiet = 1 self.DP = 0 self.DA = 0 print("Forced return to quiet after three hours") else: if quiet == 1: self.DA = 0 self.DP = 0 else: # No breakpoint has been detected, but we update DA and DP # Get last QUIET breakpoint self.DA = amp - quiet_amp phase_nowifquiet = quiet_phase + quiet_p1 * ( current_time - quiet_breakpoint ) self.DP = phase[-1] - phase_nowifquiet return