Commit f17d3943 authored by Neil Vaytet's avatar Neil Vaytet
Browse files

Merge branch 'add_old_peak_finding' into 'master'

Add old peak finding

See merge request !10
parents 7bbde84a b3a2cfbc
# Peak detection routine adapted from Marcos Duarte's detect_peaks
#
# """Detect peaks in data based on their amplitude and other features."""
#
#
# __author__ = "Marcos Duarte, https://github.com/demotu/BMC"
# __version__ = "1.0.5"
# __license__ = "MIT"
#
# Modified: 2019/01 - neil.vaytet@esss.se - European Spallation Source
import numpy as np
def detect_peaks(x, mph=None, mpd=1, threshold=0, edge="rising",
kpsh=False, valley=False):
"""Detect peaks in data based on their amplitude and other features.
Parameters
----------
x : 1D array_like
data.
mph : {None, number}, optional (default = None)
detect peaks that are greater than minimum peak height (if parameter
`valley` is False) or peaks that are smaller than maximum peak height
(if parameter `valley` is True).
mpd : positive integer, optional (default = 1)
detect peaks that are at least separated by minimum peak distance (in
number of data).
threshold : positive number, optional (default = 0)
detect peaks (valleys) that are greater (smaller) than `threshold`
in relation to their immediate neighbors.
edge : {None, 'rising', 'falling', 'both'}, optional (default = 'rising')
for a flat peak, keep only the rising edge ('rising'), only the
falling edge ('falling'), both edges ('both'), or don't detect a
flat peak (None).
kpsh : bool, optional (default = False)
keep peaks with same height even if they are closer than `mpd`.
valley : bool, optional (default = False)
if True (1), detect valleys (local minima) instead of peaks.
Returns
-------
ind : 1D array_like
indeces of the peaks in `x`.
Notes
-----
The detection of valleys instead of peaks is performed internally by simply
negating the data: `ind_valleys = detect_peaks(-x)`
The function can handle NaN's
See this IPython Notebook [1]_.
References
----------
.. [1] http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DetectPeaks.ipynb
Examples
--------
>>> from detect_peaks import detect_peaks
>>> x = np.random.randn(100)
>>> x[60:81] = np.nan
>>> # detect all peaks and plot data
>>> ind = detect_peaks(x, show=True)
>>> print(ind)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # set minimum peak height = 0 and minimum peak distance = 20
>>> detect_peaks(x, mph=0, mpd=20, show=True)
>>> x = [0, 1, 0, 2, 0, 3, 0, 2, 0, 1, 0]
>>> # set minimum peak distance = 2
>>> detect_peaks(x, mpd=2, show=True)
>>> x = np.sin(2*np.pi*5*np.linspace(0, 1, 200)) + np.random.randn(200)/5
>>> # detection of valleys instead of peaks
>>> detect_peaks(x, mph=-1.2, mpd=20, valley=True, show=True)
>>> x = [0, 1, 1, 0, 1, 1, 0]
>>> # detect both edges
>>> detect_peaks(x, edge='both', show=True)
>>> x = [-2, 1, -2, 2, 1, 1, 3, 0]
>>> # set threshold = 2
>>> detect_peaks(x, threshold = 2, show=True)
Version history
---------------
'1.0.5':
The sign of `mph` is inverted if parameter `valley` is True
"""
x = np.atleast_1d(x).astype("float64")
if x.size < 3:
return np.array([], dtype=int)
if valley:
x = -x
if mph is not None:
mph = -mph
# find indices of all peaks
dx = x[1:] - x[:-1]
# handle NaN's
indnan = np.where(np.isnan(x))[0]
if indnan.size:
x[indnan] = np.inf
dx[np.where(np.isnan(dx))[0]] = np.inf
ine, ire, ife = np.array([[], [], []], dtype=int)
if not edge:
ine = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) > 0))[0]
else:
if edge.lower() in ["rising", "both"]:
ire = np.where((np.hstack((dx, 0)) <= 0) & (np.hstack((0, dx)) > 0))[0]
if edge.lower() in ["falling", "both"]:
ife = np.where((np.hstack((dx, 0)) < 0) & (np.hstack((0, dx)) >= 0))[0]
ind = np.unique(np.hstack((ine, ire, ife)))
# handle NaN's
if ind.size and indnan.size:
# NaN's and values close to NaN's cannot be peaks
ind = ind[np.in1d(ind, np.unique(np.hstack((indnan, indnan-1, indnan+1))), invert=True)]
# first and last values of x cannot be peaks
if ind.size and ind[0] == 0:
ind = ind[1:]
if ind.size and ind[-1] == x.size-1:
ind = ind[:-1]
# remove peaks < minimum peak height
if ind.size and mph is not None:
ind = ind[x[ind] >= mph]
# remove peaks - neighbors < threshold
if ind.size and threshold > 0:
dx = np.min(np.vstack([x[ind]-x[ind-1], x[ind]-x[ind+1]]), axis=0)
ind = np.delete(ind, np.where(dx < threshold)[0])
# detect small peaks closer than minimum peak distance
if ind.size and mpd > 1:
ind = ind[np.argsort(x[ind])][::-1] # sort ind by peak height
idel = np.zeros(ind.size, dtype=bool)
for i in range(ind.size):
if not idel[i]:
# keep peaks with the same height if kpsh is True
idel = idel | (ind >= ind[i] - mpd) & (ind <= ind[i] + mpd) \
& (x[ind[i]] > x[ind] if kpsh else True)
idel[i] = 0 # Keep current peak
# remove the small peaks and sort back the indices by their occurrence
ind = np.sort(ind[~idel])
return ind
......@@ -4,9 +4,13 @@ from mantid.simpleapi import *
from scipy.ndimage import gaussian_filter1d
try:
from scipy.signal import find_peaks
peak_finding = "scipy"
except ImportError:
print("Warning: scipy.signal.find_peaks could not be imported. "
"Some functionality will be missing. "
from .detect_peaks import detect_peaks
peak_finding = "old"
print("Deprecation warning: scipy.signal.find_peaks could not be imported. "
"Reverting to the old peak finding method. "
"This will soon be deprecated. "
"Install scipy>=1.1.0 to fix this problem.")
################################################################################
......@@ -84,18 +88,18 @@ def check_peaks(peaks, nwindows):
"""
This function check if the number of peaks found is the correct one.
"""
nvalleys = len(peaks)-2
if nvalleys > 2*(nwindows - 1):
print("Way too many valleys: should be {}, found {}.".format(nwindows-1, nvalleys))
npeaks = len(peaks)-2
if npeaks > 2*(nwindows - 1):
print("Way too many peaks: should be {}, found {}.".format(nwindows-1, npeaks))
return 3 # Continue looping
elif nvalleys > (nwindows - 1):
print("Too many valleys: should be {}, found {}.".format(nwindows-1, nvalleys))
elif npeaks > (nwindows - 1):
print("Too many peaks: should be {}, found {}.".format(nwindows-1, npeaks))
return 2 # Continue looping
elif nvalleys < (nwindows - 1):
print("Too few valleys: should be {}, found {}.".format(nwindows-1, nvalleys))
elif npeaks < (nwindows - 1):
print("Too few peaks: should be {}, found {}.".format(nwindows-1, npeaks))
return 1 # Continue looping
else:
print("OK: Found {} valleys.".format(nvalleys))
print("OK: Found {} peaks.".format(npeaks))
return 0 # Exit the loop
################################################################################
......@@ -225,30 +229,56 @@ def get_wfm_windows(workspace=None, data=None, filename=None, nwindows=6,
break
print(i_start,i_end, lbound, rbound)
# Fine the valleys using the scipy find_peaks tool.
# The prominence parameter is probably the most relevant here.
prominence = 0.05
delta_prominence = 1.5
if peak_finding == "scipy":
# Find the valleys using the scipy find_peaks tool.
# The prominence parameter is probably the most relevant here.
prominence = 0.05
else:
# Determine minimum peak distance (mpd):
# We know there should be 6 windows between the leading and trailing edges.
# Since the windows have approximately all the same size, we can estimate a
# minimum peak distance to be close to the distance between leading and trailing
# edges divided by 6 (but slightly less to be on the safe side).
# Note that for the `detect_peaks` function, mpd is in units of data index, not
# time-of-flight.
mpd = int(0.75 * float(i_end - i_start) / nwindows)
delta = 1.5
loop = 2
max_loops = 10
nloops = 0
while loop > 0:
nloops += 1
print("The peak prominence is:", prominence)
# Find valleys using `detect_peaks` function from scipy.signal
peaks, _ = find_peaks(-y[lbound:rbound+1], prominence=prominence*(ymax-ymin))
# Added start and end to peaks array
peaks = np.concatenate(([i_start], peaks+lbound, [i_end]))
if peak_finding == "scipy":
print("The peak prominence is:", prominence)
# Find valleys using `find_peaks` function from scipy.signal
peaks, _ = find_peaks(-y[lbound:rbound+1], prominence=prominence*(ymax-ymin))
else:
# Find valleys using `detect_peaks` function from Marcos Duarte
print("The minimum peak distance (mpd) is:", mpd)
peaks = detect_peaks(y[lbound:rbound+1], mpd=mpd, valley=True)
# Make sure we only have peaks between left and right edges
peaks += lbound
peaks = peaks[np.where(np.logical_and(peaks > i_start, peaks < i_end))]
# Add start and end to peaks array
peaks = np.concatenate(([i_start], peaks, [i_end]))
# Check how many peaks were found
loop = check_peaks(peaks, nwindows)
# If loop is 3, it means way too many peaks were found, so we re-run
# the peak search having increased the prominence
if loop == 3:
print("Increasing the peak prominence")
prominence *= delta_prominence
if peak_finding == "scipy":
print("Increasing the peak prominence")
prominence *= delta
else:
print("Increasing the minimum peak distance")
mpd = int(mpd * delta)
# Try to remove peaks if there are too many
if loop == 2:
print("Too many peaks found. Attempting automatic dismissal")
print("Attempting automatic dismissal")
# if the value at the bottom of the valley is over the lowest window
# average, then this is not a real valley
removed_a_peak = True
......@@ -257,7 +287,7 @@ def get_wfm_windows(workspace=None, data=None, filename=None, nwindows=6,
for p in range(1, len(peaks)-1):
rmean = np.average(y[peaks[p]:peaks[p+1]])
lmean = np.average(y[peaks[p-1]:peaks[p]])
mean = min(lmean,rmean)
mean = min(lmean, rmean)
if y[peaks[p]] > mean:
print("Removed peak number {} at x,y = {},{}".format(p,x[peaks[p]],y[peaks[p]]))
peaks = np.delete(peaks, p)
......@@ -267,8 +297,12 @@ def get_wfm_windows(workspace=None, data=None, filename=None, nwindows=6,
loop = check_peaks(peaks, nwindows)
# Try to reduce the peak prominence if there are too few peaks
if loop == 1:
print("Reducing the peak prominence")
prominence /= delta_prominence
if peak_finding == "scipy":
print("Reducing the peak prominence")
prominence /= delta
else:
print("Reducing the minimum peak distance")
mpd = int(mpd / delta)
# Exit if maximum number of loops has been reached
if nloops > max_loops:
loop = 0
......@@ -344,7 +378,7 @@ def get_wfm_windows(workspace=None, data=None, filename=None, nwindows=6,
ax.plot(x[peaks[-1]], y[peaks[-1]], "o", color="yellow", label="Trailing edge")
ax.set_xlabel("Arrival time at detector")
ax.set_ylabel("Amplitude")
ax.set_ylim([0.0,1.05*np.amax(data[:,1])])
ax.set_ylim([ymin,1.05*np.amax(data[:,1])])
ax.plot(x[peaks[0]], -ymax, "o", color="r",label="Valleys")
ax.legend(loc=(0,1.02),ncol=4, fontsize=10)
ax2 = fig.add_subplot(122)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment