# Licensed under the GPLv3 - see LICENSE
"""Resampling of baseband signals."""
import numpy as np
from astropy import units as u
from astropy.utils import lazyproperty
from astropy.time import Time
from baseband_tasks.base import TaskBase, check_broadcast_to, PaddedTaskBase
from baseband_tasks.convolution import Convolve
__all__ = ['seek_float', 'ShiftAndResample', 'Resample', 'TimeDelay',
'ShiftSamples']
def to_sample(ih, offset):
"""The offset in units of samples."""
return u.Quantity(offset, copy=False).to_value(u.one, equivalencies=[
(u.one, u.Unit(1/ih.sample_rate))])
[docs]def seek_float(ih, offset, whence=0):
"""Get a float sample position.
Similar to ``ih.seek()``, but without rounding, and allowing
offsets that are different for different sample streams.
Parameters
----------
ih : stream handle
Handle of a stream reader or task.
offset : float, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset to move to. Can be an (float) number of samples,
an offset in time units, or an absolute time. Should be
broadcastable to the stream sample shape.
whence : {0, 1, 2, 'start', 'current', or 'end'}, optional
Like regular seek, the offset is taken to be from the start if
``whence=0`` (default), from the current position if 1,
and from the end if 2. One can alternativey use 'start',
'current', or 'end' for 0, 1, or 2, respectively. Ignored if
``offset`` is a time.
"""
if isinstance(offset, Time):
offset = (offset - ih.start_time).to(1./ih.sample_rate)
whence = 0
offset = to_sample(ih, offset)
check_broadcast_to(offset, ih.sample_shape)
if whence == 0 or whence == 'start':
return offset
elif whence == 1 or whence == 'current':
return ih.offset + offset
elif whence == 2 or whence == 'end':
return ih.shape[0] + offset
else:
raise ValueError("invalid 'whence'; should be 0 or 'start', 1 or "
"'current', or 2 or 'end'.")
[docs]class ShiftAndResample(Convolve):
r"""Shift and optionally resample a stream in time.
The shift is added to the sample times, and the stream is optionally
resampled to ensure a sample falls on the given offset.
If the shift corresponds to a time delay, and the signal was recorded
after mixing, the phases should be adjusted, which can be done by
passing in the local oscillator frequency. For an upper sideband, the
phase correction is,
.. math:: \phi = - \tau f_{lo}.
For the lower sideband, the rotation is in the opposite direction.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
shift : float, float array-like, or `~astropy.units.Quantity`
Amount by which to shift samples in time, as (float) samples, or a
quantity with units of time. Should broadcast to the sample shape.
offset : float, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset that the output stream should include. Can be an absolute
time, or a (float) number of samples or time offset relative to the
start of the underlying stream. The default of `None` implies that
the output stream is free to adjust. Hence, if a single shift is
given, all that will happen is a change in ``start_time``. To
ensure the grid stays fixed, pass in ``0``.
whence : {0, 1, 2, 'start', 'current', or 'end'}, optional
Like regular seek, the offset is taken to be from the start if
``whence=0`` (default), from the current position if 1, and from the
end if 2. One can alternativey use 'start', 'current', or 'end' for
0, 1, or 2, respectively. Ignored if ``offset`` is a time.
lo : `~astropy.units.Quantity`, or `None`, optional
Local oscillator frequency. Should be passed in when the signal was
mixed down before sampling and the shift should be interpreted as a
time delay (rather than a clock correction). For raw data, this can
usually just be ``if.frequency``. But for channelized data, the
actual frequency needs to be passed in. If data were recorded
without mixing (like for CHIME), no phase correction is necessary
and one should pass in `None`. Default: `None`.
pad : int, optional
Padding to apply on each side when shifting data. This sets the size
of the sinc function which which the data is convolved (see below).
Numerical tests suggest that with the default of 64, the accuracy
is better than 0.1%.
samples_per_frame : int, optional
Number of resampled samples which should be produced in one go.
The number of input samples used will be larger by ``2*pad+1``.
If not given, works on the larger of the samples per frame from
If not given, the larger of the sampler per frame in the underlying
stream or 14 times the padding (to ensure ~87.5% efficiency).
Notes
-----
The precision of the shifting and resampling is controlled by ``pad``,
as it sets the length of the response, which consists of a sinc function
combined with a Hann window,
.. math:: R(x) &= {\rm sinc}(x-s) \cos^2(\pi(x-s)/L),\\
-{\rm pad} &\le x \le {\rm pad},\quad L = 2{\rm pad} + 2
Here, :math:`s` is the fractional pixel offset. Note that :math:`L`
is chosen such that :math:`R(\pm{\rm pad})` is still non-zero.
The convolution is done in the Fourier domain, and the Hann window,
combined with with the padding done when reading, ensures there is no
aliasing between the front and the back of each frame. There is,
however, aliasing near the Nyquist frequency. For most recorded signals,
this will not be important, as the band-pass filter will likely have
ensured there is very little signal near the band edges. For
channelized data, however, it may be more problematic and some
pre-filtering stage may be necessary.
See Also
--------
Resample : resample a stream to a new grid, without time shifts
ShiftSamples : shift channels by integer number of samples
TimeDelay : delay a complex stream, changing phases but no resampling
baseband_tasks.base.SetAttribute : change start time without resampling
"""
def __init__(self, ih, shift, offset=None, whence='start', *,
lo=None, pad=64, samples_per_frame=None):
self._shift = to_sample(ih, shift)
shift_mean = np.mean(self._shift)
if offset is None:
# Just use the average shift as a time shift, and do
# the individual shifts relative to it.
d_time = shift_mean
self._offset = None
else:
# Ensure the time shift lands on the grid given by offset,
# but remove as many integer cycles as possible, such that
# individual shifts are as symmetric around 0 as possible.
self._offset = seek_float(ih, offset, whence)
d_time = self._offset + np.around(shift_mean - self._offset)
# The remainder we actually need to shift.
sample_shift = np.array(self._shift - d_time, ndmin=ih.ndim-1,
copy=False, subok=True)
response = self._windowed_sinc(pad, sample_shift)
if samples_per_frame is None:
samples_per_frame = max(ih.samples_per_frame, pad * 14)
super().__init__(ih, response,
offset=pad - int(round(sample_shift.min())),
samples_per_frame=samples_per_frame)
self._lo = lo
self._start_time += d_time / ih.sample_rate
def _windowed_sinc(self, pad, sample_shift):
"""Response for shifting samples.
A combination of a sinc function, and a Hanning filter that ensures
the response drops to zero at the edges.
"""
ishift_max = int(round(sample_shift.max()))
ishift_min = int(round(sample_shift.min()))
n_result = 2*pad + 1 + ishift_max - ishift_min
result = np.zeros((n_result,) + sample_shift.shape)
for shift, res in zip(sample_shift.ravel(),
result.reshape(n_result, -1).T):
ishift = int(round(shift.item()))
x = np.arange(-pad, pad+1) - (shift - ishift)
res[ishift - ishift_min:ishift - ishift_max + n_result] = (
np.sinc(x) * np.cos(np.pi*x/(2*pad+2))**2)
return result
# def _window(self, pad):
# # Note: tried various window functions (ignoring the shift in them),
# # but Hann was clearly superior for the test_sampling test cases.
# # return np.hanning(2*pad+3)[1:-1] # Remove final zeros.
# # lanczos
# # return np.sinc(np.arange(-pad, pad+1)/pad)
# # blackman
# # a0, a1, a2, a3 = 0.42, 0.5, 0.08, 0.
# # nuttall
# a0, a1, a2, a3 = 0.355768, 0.487396, 0.144232, 0.012604
# nbyN = np.arange(2*pad+1)/(2*pad)
# return (a0
# - a1*np.cos(2*np.pi*nbyN)
# + a2*np.cos(4*np.pi*nbyN)
# - a3*np.cos(6*np.pi*nbyN))
@lazyproperty
def _ft_response(self):
"""Phase offsets of the Fourier-transformed frame."""
if self._lo is None:
return super()._ft_response
else:
lo_phase_delay = (self._shift / self.sample_rate * u.cycle
* self._lo * self.sideband)
return (super()._ft_response
* np.exp(-1j * lo_phase_delay.to_value(u.rad)))
def _repr_item(self, key, default, value=None):
# Our 'offset' input argument should not be looked up as 'offset',
# since that can vary.
if key == 'offset':
value = self._offset
return super()._repr_item(key, default=default, value=value)
[docs]class Resample(ShiftAndResample):
"""Resample a stream such that a sample occurs at the given offset.
The offset pointer is left at the requested time, so one can think of
this task as a precise version of the ``seek()`` method.
Generally, the stream start time will change, by up to one sample, and
the stream length reduced by one frame. The precision with which the
resampling is done depends on ``pad``.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
offset : float, `~astropy.units.Quantity`, or `~astropy.time.Time`
Offset to ensure the output stream includes. Can an absolute time,
or a (float) number of samples or time offset relative to the start
of the underlying stream.
whence : {0, 1, 2, 'start', 'current', or 'end'}, optional
Like regular seek, the offset is taken to be from the start if
``whence=0`` (default), from the current position if 1,
and from the end if 2. One can alternativey use 'start',
'current', or 'end' for 0, 1, or 2, respectively. Ignored if
``offset`` is a time.
pad : int, optional
Padding to apply on each side when shifting data. This sets the size
of the sinc function which which the data is convolved (see above).
The default of 64 ensures an accuracy of better than 0.1%.
samples_per_frame : int, optional
Number of resampled samples which should be produced in one go.
The number of input samples used will be larger by ``2*pad+1``.
If not given, works on the larger of the samples per frame from
If not given, the larger of the sampler per frame in the underlying
stream or 14 times the padding (to ensure ~87.5% efficiency).
See Also
--------
ShiftAndResample : also shift a stream, possibly including phase delay
Examples
--------
Suppose one wanted to read the 8 samples surrounding a precise time::
>>> from baseband_tasks.sampling import Resample
>>> from astropy.time import Time
>>> from baseband import data, vdif
>>> fh = vdif.open(data.SAMPLE_VDIF)
>>> texact = Time('2014-06-16T05:56:07.000123456')
>>> ((texact - fh.start_time) * fh.sample_rate).to(1)
... # doctest: +FLOAT_CMP
<Quantity 3950.59201992>
>>> rh = Resample(fh, texact)
>>> rh.time.isot
'2014-06-16T05:56:07.000123456'
>>> rh.seek(-4, 1)
3883
>>> data = rh.read(8)
>>> data[:, 4] # doctest: +SKIP
array([ 3.8278387 , 2.0259624 , -0.3738842 , -1.2480919 , -0.04606577,
2.6100893 , 3.4867156 , 3.2312815 ], dtype=float32)
For comparison, if one uses the underlying filehandle directly, one gets
the data only at the approximate time::
>>> fh.seek(texact)
3951
>>> fh.time.isot
'2014-06-16T05:56:07.000123469'
>>> fh.seek(-4, 1)
3947
>>> data = fh.read(8)
>>> data[:, 4] # doctest: +FLOAT_CMP
array([ 3.316505, 1. , -1. , -1. , 1. , 3.316505,
3.316505, 3.316505], dtype=float32)
>>> fh.close()
"""
def __init__(self, ih, offset, whence='start', *,
pad=64, samples_per_frame=None):
super().__init__(ih, shift=0., offset=offset,
pad=pad, samples_per_frame=samples_per_frame)
self.seek(ih.start_time + self._offset / ih.sample_rate)
[docs]class TimeDelay(TaskBase):
r"""Delay a stream by a given amount, taking care of phase rotations.
The delay is added to the sample times (by adding to the ``start_time``
of the stream), and the sample phases are rotated as needed if the
signal was recorded after mixing with a local oscillator. For an upper
sideband, the phases are rotated by
.. math:: \phi = - \tau f_{lo}.
For the lower sideband, the rotation is in the opposite direction.
Note that the input data stream must be complex. For real-valued
streams, use `~baseband_tasks.sampling.ShiftAndResample` with
``shift`` as the delay, no ``offset``, and ``pad=0`` (this works for
complex data as well, but is slower as it involves fourier transforms).
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
delay : float, `~astropy.units.Quantity`
Delay to apply to all times. Can be (float) samples, or a
quantity with units of time.
lo : `~astropy.units.Quantity`, or `None`
Local oscillator frequency. For raw data, this can just be
``if.frequency``. But for channelized data, the actual
frequency needs to be passed in. If data were recorded without
mixing (like for CHIME), pass in `None`.
frequency : `~astropy.units.Quantity`, optional
Frequencies for each channel. Should be broadcastable to the
sample shape. By default, taken from the underlying stream.
(Note that these frequencies are not used in the calculations here.)
sideband : array, optional
Whether frequencies are upper (+1) or lower (-1) sideband.
Should be broadcastable to the sample shape. By default, taken
from the underlying stream. Assumed to be correct for the lo.
See Also
--------
ShiftAndResample : also resample a stream, or delay a real-valued stream
baseband_tasks.base.SetAttribute : change start time without phase delay
"""
def __init__(self, ih, delay, *, lo, frequency=None, sideband=None):
assert ih.complex_data, "Time delay only works on complex data."
self._delay = to_sample(ih, delay)
self._lo = lo
delay = self._delay / ih.sample_rate
super().__init__(ih, frequency=frequency, sideband=sideband)
self._start_time += delay
if lo is None:
self._phase_factor = None
else:
lo_phase_delay = delay * lo * self.sideband * u.cycle
self._phase_factor = np.exp(-1j * lo_phase_delay.to_value(u.rad)
).astype(ih.dtype)
[docs] def task(self, data):
if self._phase_factor is not None:
data *= self._phase_factor
return data
[docs]class ShiftSamples(PaddedTaskBase):
"""Shift channels in a stream by integer numbers of samples.
The shifts are interpreted as additive to the sample times, i.e.,
positive shifts imply that a stream will appear delayed in time.
Note that no resampling is done; for shifting by non-integer numbers of
samples, use `~baseband_tasks.sampling.ShiftAndResample`.
Parameters
----------
ih : task or `baseband` stream reader
Input data stream, with time as the first axis.
shift : int or float array-like, or `~astropy.units.Quantity`
Amount by which to shift samples along the stream. Normally integer,
but can also be float or quantity with units of time, in which case
the shifts will be rounded to the nearest integer. Should broadcast
to the sample shape. For instance, to shift along the one-but-last
axis with length ``N``, the shape of shift should be ``(N, 1)``.
samples_per_frame : int
Number of shifted samples which should be produced in one go.
The number of input samples used will be larger to avoid wrapping.
If not given, as produced by the minimum power of 2 of input
samples that yields at least 75% efficiency.
See Also
--------
ShiftAndResample : shift channels by fractional samples and resample
Resample : resample a stream to a new grid, without time shifts
"""
def __init__(self, ih, shift, *, samples_per_frame=None):
shift = self._shift = np.round(to_sample(ih, shift)).astype(int)
check_broadcast_to(shift, ih.sample_shape)
start_time = ih.start_time + shift.max() / ih.sample_rate
super().__init__(ih, pad_start=0, pad_end=shift.ptp(),
samples_per_frame=samples_per_frame,
start_time=start_time)
# Form the advanced index used to select the shifted samples.
# Start with one that just takes unshifted output elements for a
# single frame, then add shifts for the first (sample) dimension.
indices = np.ix_(np.arange(self.samples_per_frame),
*[np.arange(sh) for sh in self.sample_shape])
self._indices = (shift.max() - shift + indices[0],) + indices[1:]
[docs] def task(self, data):
return data[self._indices]