#! /usr/bin/env python3
# coding=utf-8
"""
Collection of common helper functions.
"""
"""
Author: radenz@tropos.de
"""
import datetime
import numpy as np
import scipy
from numba import jit
[docs]def list_of_elem(elem, length):
"""return a list of given length of given elements"""
return [elem for i in range(length)]
[docs]def epoch_to_timestamp(time_raw):
"""converts raw time (days since year 0) to unix timestamp
"""
offset = 719529 # offset between 1970-1-1 und 0000-1-1
time = (time_raw - offset) * 86400
return time
[docs]def dt_to_ts(dt):
"""convert a datetime to unix timestamp"""
# timestamp_midnight = int((datetime.datetime(self.dt[1].year, self.dt[1].month, self.dt[1].day) - datetime.datetime(1970, 1, 1)) / datetime.timedelta(seconds=1)) #python3
return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
[docs]def ts_to_dt(ts):
"""convert a unix timestamp to datetime"""
return datetime.datetime.utcfromtimestamp(ts)
[docs]def masked_to_plain(array):
"""unpack the masked array into plain versions"""
if isinstance(array, np.ma.MaskedArray):
mask = array.mask
mask = mask if mask.shape == array.data.shape else np.zeros_like(array).astype(bool)
return array.data, mask
else:
return array, np.zeros_like(array).astype(bool)
[docs]def divide_bounds(bounds):
"""
divide_bounds([[10,20],[20,25],[25,30],[40,50]])
=> noise_sep [[10, 30], [40, 50]], internal [20, 25]
"""
bounds = list(sorted(flatten(bounds)))
occ = dict((k, (bounds).count(k)) for k in set(bounds))
internal = [k for k in occ if occ[k] == 2]
noise_sep = [b for b in bounds if b not in internal]
noise_sep = [[noise_sep[i], noise_sep[i+1]] for i in \
range(0, len(noise_sep)-1,2)]
return noise_sep, internal
[docs]@jit(nopython=True, fastmath=True)
def lin2z(array):
"""calculate dB values from a array of linear"""
return 10*np.log10(array)
[docs]@jit(nopython=True, fastmath=True)
def z2lin(array):
"""calculate linear values from a array of dBs"""
return 10**(array/10.)
[docs]def fill_with(array, mask, fill):
"""fill array with fill value where mask is True"""
filled = array.copy()
filled[mask] = fill
return filled
def round_odd_old(f):
return int(np.ceil(f/2.) * 2 + 1)
[docs]def round_odd(f):
"""round to odd number
:param f: float number to be rounded to odd number
"""
return round(f) if round(f) % 2 == 1 else round(f) + 1
[docs]def flatten(xs):
"""flatten inhomogeneous deep lists
e.g. ``[[1,2,3],4,5,[6,[7,8],9],10]``
"""
result = []
if isinstance(xs, (list, tuple)):
for x in xs:
result.extend(flatten(x))
else:
result.append(xs)
return result
[docs]def filter_none_rec(e):
"""filter None values from nested list"""
return list(filter(None, [filter_none_rec(y) for y in e])) if isinstance(e, list) else e
[docs]@jit(nopython=True, fastmath=True)
def gauss_func(x, m, sd):
"""calculate the gaussian function on a given grid
Args:
x (array): grid
m (float): mean
sd (float): standard deviation
Returns:
array
"""
a = 1. / (sd * np.sqrt(2. * np.pi))
return a * np.exp(-(x - m) ** 2 / (2. * sd ** 2))
# slightly different formulation for the tail filter
[docs]@jit(nopython=True, fastmath=True)
def gauss_func_offset(x, H, A, x0, sigma):
"""x, H, A, x0, sigma"""
return H + A * np.exp(-(x - x0) ** 2 / (2 * sigma ** 2))
def gauss_fit(x, y):
mean = sum(x * y) / sum(y)
sigma = np.sqrt(sum(y * (x - mean) ** 2) / sum(y))
#print('init guess ', [min(y), max(y)-min(y), mean, sigma])
popt, pcov = scipy.optimize.curve_fit(
gauss_func_offset, x, y,
p0=[min(y), max(y)-min(y), mean, sigma],
#bounds=[(-57, 9, -4, 0.4), (-35, 18, 4, 2)],
bounds=[(-np.inf, 9, -np.inf, -np.inf), (np.inf, 18, np.inf, np.inf)]
)
return popt, pcov
[docs]@jit(nopython=False, fastmath=True)
def estimate_noise(spec, mov_avg=1):
"""
Noise estimate based on Hildebrand and Sekhon (1974)
"""
i_noise = len(spec)
spec_sort = np.sort(spec)
for i in range(spec_sort.shape[0]):
partial = spec_sort[:i+1]
mean = partial.mean()
var = partial.var()
if var * mov_avg * 2 < mean**2.:
i_noise = i
else:
# remaining part of spectrum has no noise characteristic
break
noise_part = spec_sort[:i_noise+1]
# case no signal above noise
noise_sep = spec_sort[i_noise] if i_noise < spec.shape[0] else np.mean(noise_part)
return {'noise_mean': np.mean(noise_part),
'noise_sep': noise_sep,
'noise_var': np.var(noise_part),
'no_noise_bins': i_noise}
@jit(nopython=True, fastmath=True)
def estimate_mean_noise(spec, mov_avg=1):
i_noise = len(spec)
spec_sort = np.sort(spec)
for i in range(spec_sort.shape[0]):
partial = spec_sort[:i+1]
mean = partial.mean()
var = partial.var()
if var * mov_avg * 2 < mean**2.:
i_noise = i
else:
break
noise_part = spec_sort[:i_noise+1]
return np.mean(noise_part)
[docs]def estimate_noise_array(spectra_array):
"""
Wrapper for estimate_noise, to apply to a chunk of Doppler spectra
Args:
spectra_array (ndarray): 3D array of Doppler spectra
Returns:
mean noise for each time-height
"""
print('estimating noise...')
out = np.zeros(spectra_array.shape[:2])-999.0
for ts in range(spectra_array.shape[0]):
for rg in range(spectra_array.shape[1]):
out[ts, rg] = estimate_mean_noise(spectra_array[ts, rg, :])
return out