Source code for vlf4ions.detect_change_points

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Aug 27 11:42:43 2024

@author: pteysseyre
"""

import numpy as np


[docs] def find_break_point_realtime(the_time, data, delta=0.1): """ Based on the incremental algorithm presented by Guralnik & Srivastava (1999) :param the_time: time-array :param data: the phase data that has just been read (time-averaged & smoothed if needed) :param delta: in percent/100 (default : 0.1). This is the threshold over which we decide that a breakpoint is indeed a breakpoint and not due to noise. In Guralnik & Srivastava's paper, this is also called delta and only appears for the incremental algorithm. :return: - detec: Boolean, 'True' if a breakpoint was detected, 'False' if not - p1: Slope of the detected breakpoint """ L = len(data) if L > 9: # At least 9 points are necessary for this # Compute likelihood assuming no breakpoints: likelihood_without_bp, n = _compute_likelihood_criteria(0, L, the_time, data) # Compute the likelihood criteria, assuming a breakpoint 5 points ago: likelihood_with_bp, p1 = _compute_likelihood_criteria_several_change_points( the_time, data, np.array([0, L - 6, L - 1]) ) # Determine whether there is a new breakpoint detec = ( likelihood_without_bp - likelihood_with_bp ) / likelihood_without_bp > delta else: detec = False p1 = 0 return detec, p1
[docs] def detect_change_points(the_time, data, threshold): """Based on Guralnik, V. & Srivastava, J. (1999) Detect change_points in the 'data' array in post_processing (bash algorithm) :param the_time: time-array :param data: Data array :param threshold: Criterion used to end the search for breakpoints. (s in the article) :return: change_points: time of breakpoints found in the data array""" # Initialisation change_points = np.array([0, len(data)]) likelihood, dummy = _compute_likelihood_criteria( change_points[0], change_points[1], the_time, data ) last_likelihood = likelihood stopping_criterion = 1 loop_counter = 0 while stopping_criterion > threshold and loop_counter < 1e3: loop_counter += 1 N = len(change_points) - 1 # Number of segments for s in range(0, N): if change_points[s + 1] - change_points[s] > 5: new_candidate = _find_candidate_detection( change_points[s], change_points[s + 1], the_time, data ) temp_change_points = np.insert(change_points, s + 1, new_candidate) new_likelihood_criterion = ( _compute_likelihood_criteria_several_change_points( the_time, data, temp_change_points ) ) if new_likelihood_criterion < likelihood: likelihood = new_likelihood_criterion new_change_points = temp_change_points # Prepare for next iteration change_points = new_change_points stopping_criterion = (last_likelihood - likelihood) / last_likelihood return change_points
def _compute_likelihood_criteria_several_change_points(the_time, data, change_points): """Based on Guralnik, V., & Srivastava, J. (1999). Computes the total likelihood criterion for data, knowing that there are change_points at specified locations :param the_time: time-array :param data: Data in which to look for change points :param change_points: list of indices of the changepoints already computed. The first index is 0 (for the beggining of the 'data' array), the last is the number of elements in the 'data' array. :return: - total_L: total likelihood criterion - a: slope associated with the last breakpoint""" N = len(change_points) - 1 total_L = 0 for k in range(0, N): likelihood_criteria, a = _compute_likelihood_criteria( change_points[k], change_points[k + 1], the_time, data ) total_L += likelihood_criteria return total_L, a def _find_candidate_detection(i, j, the_time, data): """Based on Guralnik, V., & Srivastava, J. (1999)., Finds the best candidate to be a change-point in the array data[i:j]. :param i: indice of the start of the interval of interest :param j: indice of the end of the interval :param the_time: time-array :param data: data array :return: the indice in the array data (! not data[i:j]!) of this best candidate. """ optimal_likelihood_criteria = 1e14 # Minimum likelihood criteria. The point with the lowest likelihood criteria will be the best candidate split = 0 for k in range(i + 2, j - 2): likelihood_first_segment, dummy = _compute_likelihood_criteria( i, k, the_time, data ) likelihood_second_segment, dummy = _compute_likelihood_criteria( k + 1, j, the_time, data ) likelihood_all_segments = likelihood_first_segment + likelihood_second_segment if likelihood_all_segments < optimal_likelihood_criteria: optimal_likelihood_criteria = likelihood_all_segments split = k return split def _compute_likelihood_criteria(i, j, the_time, data): """Computes the likelihood criteria assuming an heteroscedastic error model. Based on Guralnik, V., & Srivastava, J. (1999, August). Event detection from time series data. In Proceedings of the fifth ACM SIGKDD international conference on Knowledge discovery and data mining (pp. 33-42). :param i: indice of the start of the interval of interest :param j: indice of the end of the interval :param the_time: time-array :param data: data array :return: - L, Likelihood criteria on the interval - a, slope of the linear fit for the data""" m = j - i + 1 # Number of elements in the the_time & data arrays a, b = np.polyfit( the_time[i : j + 1], data[i : j + 1], 1 ) # Use a linear model to fit the segment. THIS CAN BE CHANGED ! rss = np.sum(np.square(a * the_time[i : j + 1] + b - data[i : j + 1])) L = m * np.log(rss / m) return L, a