Source code for baseband_tasks.base

# Licensed under the GPLv3 - see LICENSE

import inspect
import operator
import types
import warnings

import numpy as np
from astropy import units as u
from astropy.utils.metadata import MetaData


__all__ = ['Base', 'BaseTaskBase', 'TaskBase', 'PaddedTaskBase',
           'SetAttribute', 'Task']


META_ATTRIBUTES = {'frequency', 'sideband', 'polarization'}


def check_broadcast_to(value, sample_shape):
    """Broadcast values to the given shape.

    Like `~numpy.broadcast_to`, but with an addition to any error message.
    """
    try:
        broadcast = np.broadcast_to(value, sample_shape, subok=True)
    except ValueError as exc:
        exc.args += ("value cannot be broadcast to sample shape",)
        raise
    return broadcast


def simplify_shape(value):
    """Replace axes that contain only duplicates with broadcast values.

    For each axis, get first element of the sample, and keep only it if
    all other elements are the same (numpy broadcasting rules will ensure
    any operations using the result will work correctly).
    """
    for axis in range(value.ndim):
        value_0 = value[(slice(None),) * axis + (slice(0, 1),)]
        if value.strides[axis] == 0 or np.all(value == value_0):
            value = value_0
    # Remove leading ones, which are not needed in numpy broadcasting.
    first_not_unity = next((i for (i, s) in enumerate(value.shape)
                            if s > 1), value.ndim)
    # Make copy to ensure we're not a view of the input value (so
    # things cannot get changed under our feet).
    return value.reshape(value.shape[first_not_unity:]).copy()


def getattr_if_none(ih, attr, value=None, *, required=True, **kwargs):
    """Get an attribute if no default is provided.

    Like `getattr`, but look-up will only happen if the default is `None`
    and ``attr`` is not provided as a keyword argument either.

    Parameters
    ----------
    ih : stream
        Object to get the attribute from.
    attr : str
        Attribute to get if default is None.
    value : object
        Default value.  If not None, directly returned.
    required : bool
        Whether the attribute should exist the value passed in is `None`.
        Default `True`.
    **kwargs
        Keyword arguments.  If attr is among them, will be returned.
    """
    if value is None:
        value = kwargs.get(attr, None)
        if value is None:
            value = getattr(ih, attr, None)

    if required and value is None:
        raise TypeError(f"{attr!r} should either be defined by the "
                        "underlying stream or passed in.")
    return value


[docs]class Base: """Base class of all tasks and generators. Following the design of `baseband` stream readers, features properties describing the size, shape, data type, sample rate and start/stop times of the task's output. Also defines methods to move a sample pointer across the output data in units of either complete samples or time. Subclasses should define ``_read_frame``: method to read (or generate) a single block of data. Parameters ---------- shape : tuple, optional Overall shape of the stream, with first entry the total number of complete samples, and the remainder the sample shape. start_time : `~astropy.time.Time` Start time of the stream. sample_rate : `~astropy.units.Quantity` Rate at which complete samples are produced. samples_per_frame : int, optional Number of samples dealt with in one go. The number of complete samples (``shape[0]``) should be an integer multiple of this. dtype : `~numpy.dtype`, optional Dtype of the samples. --- **kwargs : meta data for the stream, which usually include frequency : `~astropy.units.Quantity`, optional Frequencies for each channel. Should be broadcastable to the sample shape. sideband : array, optional Whether frequencies are upper (+1) or lower (-1) sideband. Should be broadcastable to the sample shape. polarization : array or (nested) list of char, optional Polarization labels. Should broadcast to the sample shape, i.e., the labels are in the correct axis. For instance, ``['X', 'Y']``, or ``[['L'], ['R']]``. """ # Initial values for sample and frame pointers, etc. offset = 0 _frame_index = None _frame = None closed = False meta = MetaData() def __init__(self, shape, start_time, sample_rate, *, samples_per_frame=1, dtype=np.complex64, **kwargs): self._shape = shape self._start_time = start_time self._samples_per_frame = operator.index(samples_per_frame) self._sample_rate = sample_rate self._dtype = np.dtype(dtype, copy=False) if len({'frequency', 'sideband'}.difference(kwargs)) == 1: raise ValueError('frequency and sideband should both ' 'be passed in.') attributes = {} for attr, value in kwargs.items(): if attr in META_ATTRIBUTES: if value is not None: if attr == 'sideband': value = np.where(value > 0, np.int8(1), np.int8(-1)) attributes[attr] = self._check_shape(value) else: raise TypeError('__init__() got unexpected keyword argument ' f'{attr!r}') if attributes: self.meta.setdefault('__attributes__', {}).update(attributes) def __getattr__(self, attr): if attr in META_ATTRIBUTES: value = self.meta.get('__attributes__', {}).get(attr, None) if value is None: raise AttributeError(f"{attr} not set.") else: return value else: return super().__getattr__(attr) def __dir__(self): return sorted(META_ATTRIBUTES.union(super().__dir__())) def _repr_item(self, key, default, value=None): """Representation of one argument. Subclasses can override this, either to return something else than the base key=value or to set a different default for specific keys. """ if value is None: value = getattr(self, key, None) if value is None: value = getattr(self, '_' + key, None) if value is None: return None if default is not inspect._empty: try: if np.all(value == default): return None except Exception: pass return f"{key}={value}".replace('\n', ',') def __str__(self): name = self.__class__.__name__ pars = inspect.signature(self.__class__).parameters overrides = [self._repr_item(key, par.default) for key, par in pars.items()] overrides = ', '.join([override for override in overrides if override]) return f"{name}({overrides})" def __repr__(self): """Representation which lists non-default arguments. Finds possible arguments by inspection of the whole class hierarchy (as long as kwargs are passed along) and creates a list of all whose values on the instance are different from the default. Subclasses can override the assumed default and what to return in _repr_item. """ name = self.__class__.__name__ pars = {} for cls in self.__class__.__mro__: for key, par in inspect.signature(cls).parameters.items(): pars.setdefault(key, par) if 'kwargs' not in pars or cls is Base: break overrides = [self._repr_item(key, par.default) for key, par in pars.items()] if cls is Base and '__attributes__' in self.meta: overrides.extend([self._repr_item(key, None) for key in self.meta['__attributes__'].keys() if key not in pars]) overrides = (',\n '+' '*len(name)).join( [override for override in overrides if override]) return f"{name}({overrides})" def _check_shape(self, value): """Check that value can be broadcast to the sample shape.""" broadcast = check_broadcast_to(value, self.sample_shape) return simplify_shape(broadcast) @property def shape(self): """Shape of the output.""" return self._shape @property def sample_shape(self): """Shape of a complete sample.""" return self.shape[1:] @property def samples_per_frame(self): """Number of samples per frame of data. For compatibility with file readers, to help indicate what a nominal chunk of data is. """ return self._samples_per_frame @property def size(self): """Number of component samples in the output.""" prod = 1 for dim in self.shape: prod *= dim return prod @property def ndim(self): """Number of dimensions of the output.""" return len(self.shape) @property def dtype(self): """Data type of the output.""" return self._dtype @property def complex_data(self): return self._dtype.kind == 'c' @property def sample_rate(self): """Number of complete samples per second.""" return self._sample_rate @property def start_time(self): """Start time of the output. See also `time` and `stop_time`. """ # We don't just return self._start_time so classes like Integrate # can get correct results by just overriding _tell_time. return self._tell_time(0) @property def time(self): """Time of the sample pointer's current offset in the output. See also `start_time` and `stop_time`. """ return self._tell_time(self.offset) @property def stop_time(self): """Time at the end of the output, just after the last sample. See also `start_time` and `time`. """ return self._tell_time(self.shape[0])
[docs] def seek(self, offset, whence=0): """Change the sample pointer position. This works like a normal filehandle seek, but the offset is in samples (or a relative or absolute time). Parameters ---------- offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time` Offset to move to. Can be an (integer) number of samples, an offset in time units, or an absolute time. For the latter two, the pointer will be moved to the nearest integer sample. whence : {0, 1, 2, 'start', 'current', or 'end'}, optional Like regular seek, the offset is taken to be from the start if ``whence=0`` (default), from the current position if 1, and from the end if 2. One can alternativey use 'start', 'current', or 'end' for 0, 1, or 2, respectively. Ignored if ``offset`` is a time. """ try: offset = operator.index(offset) except Exception: try: offset = offset - self.start_time except Exception: pass else: whence = 0 offset = int((offset * self.sample_rate).to(u.one).round()) if whence == 0 or whence == 'start': self.offset = offset elif whence == 1 or whence == 'current': self.offset += offset elif whence == 2 or whence == 'end': self.offset = self.shape[0] + offset else: raise ValueError("invalid 'whence'; should be 0 or 'start', 1 or " "'current', or 2 or 'end'.") return self.offset
[docs] def tell(self, unit=None): """Current offset in the file. Parameters ---------- unit : `~astropy.units.Unit` or str, optional Time unit the offset should be returned in. By default, no unit is used, i.e., an integer enumerating samples is returned. For the special string 'time', the absolute time is calculated. Returns ------- offset : int, `~astropy.units.Quantity`, or `~astropy.time.Time` Offset in current file (or time at current position). """ if unit is None: return self.offset # "isinstance" avoids costly comparisons of an actual unit with 'time'. if not isinstance(unit, u.UnitBase) and unit == 'time': return self._tell_time(self.offset) return (self.offset / self.sample_rate).to(unit)
def _tell_time(self, offset): """Calculate time for given offset. Used for ``start_time``, ``time``, ``stop_time`` and ``tell(unit='time')``. Simple implementation is present mostly so subclasses like Integration and Stack can override as appropriate. """ return self._start_time + offset / self.sample_rate
[docs] def read(self, count=None, out=None): """Read a number of complete samples. Parameters ---------- count : int or None, optional Number of complete samples to read. If `None` (default) or negative, the number of samples left. Ignored if ``out`` is given. out : None or array, optional Array to store the samples in. If given, ``count`` will be inferred from the first dimension; the remaining dimensions should equal `sample_shape`. Returns ------- out : `~numpy.ndarray` of float or complex The first dimension is sample-time, and the remaining ones are as given by `sample_shape`. """ if self.closed: raise ValueError("I/O operation on closed stream.") samples_left = self.shape[0] - self.offset if out is None: if count is None or count < 0: count = max(0, samples_left) out = np.empty((count,) + self.sample_shape, dtype=self.dtype) else: assert out.shape[1:] == self.sample_shape, ( "'out' must have trailing shape {}".format(self.sample_shape)) count = out.shape[0] if count > samples_left: raise EOFError("cannot read from beyond end of input.") offset0 = self.offset sample = 0 while sample < count: # For current position, get frame plus offset in that frame. frame, sample_offset = self._get_frame(self.offset) nsample = min(count - sample, len(frame) - sample_offset) data = frame[sample_offset:sample_offset + nsample] # Copy to relevant part of output. out[sample:sample + nsample] = data sample += nsample # Explicitly set offset (leaving get_frame free to adjust it). self.offset = offset0 + sample return out
def _get_frame(self, offset): """Get a frame that includes given offset. Finds the index corresponding to the needed frame, assuming frames are all the same length. If not already cached, it retrieves a frame by calling ``self._read_frame(index)``. Parameters ---------- offset : int Offset in the stream for which a frame should be found. Returns ------- frame : `~baseband.base.frame.FrameBase` Frame holding the sample at ``offset``. sample_offset : int Offset within the frame corresponding to ``offset``. """ frame_index, sample_offset = divmod(offset, self.samples_per_frame) if frame_index != self._frame_index: # Read the frame required. Set offset to start so that _read_frame # can count on tell() being correct. self.offset = frame_index * self.samples_per_frame self._frame = self._read_frame(frame_index) self._frame_index = frame_index return self._frame, sample_offset def __getitem__(self, item): from .shaping import GetSlice return GetSlice(self, item) def __array__(self, dtype=None): old_offset = self.tell() try: self.seek(0) return np.array(self.read(), dtype=dtype, copy=False) finally: self.seek(old_offset) def __array_ufunc__(self, *args, **kwargs): return NotImplemented def __array_function__(self, *args, **kwargs): return NotImplemented def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.closed = True self._frame = None # clear possibly cached frame
[docs]class BaseTaskBase(Base): """Base for all classes that operate on underlying streams. Following the design of `baseband` stream readers, features properties describing the size, shape, data type, sample rate and start/stop times of the task's output. Also defines methods to move a sample pointer across the output data in units of either complete samples or time. Subclasses should define ``_read_frame``: method to read and process a single block of data By default, all parameters are taken from the underlying stream. Note that no consistency checks are done between the parameters passed in and those of the underlying stream; the appropriate base class for most tasks is `~baseband_tasks.base.TaskBase`. Parameters ---------- ih : stream handle Handle of a stream reader or another task. ih_samples_per_frame : int, optional Number of input samples which should be dealt with in one go. shape : tuple, optional Overall shape of the stream, with first entry the total number of complete samples, and the remainder the sample shape. start_time : `~astropy.time.Time`, optional Start time of the stream. sample_rate : `~astropy.units.Quantity`, optional With units of a rate. samples_per_frame : int, optional Number of output samples produced per frame. By default, equal to the number of input samples. dtype : `~numpy.dtype`, optional Output dtype. --- **kwargs : meta data for the stream, which usually include frequency : `~astropy.units.Quantity`, optional Frequencies for each channel. Should be broadcastable to the sample shape. sideband : array, optional Whether frequencies are upper (+1) or lower (-1) sideband. Should be broadcastable to the sample shape. polarization : array or (nested) list of char, optional Polarization labels. Should broadcast to the sample shape, i.e., the labels are in the correct axis. For instance, ``['X', 'Y']``, or ``[['L'], ['R']]``. """ def __init__(self, ih, *, ih_samples_per_frame=None, start_time=None, shape=None, sample_rate=None, samples_per_frame=None, dtype=None, **kwargs): self.ih = ih if ih_samples_per_frame is None: ih_samples_per_frame = ih.samples_per_frame self._ih_samples_per_frame = ih_samples_per_frame shape = getattr_if_none(ih, 'shape', shape) start_time = getattr_if_none(ih, 'start_time', start_time) sample_rate = getattr_if_none(ih, 'sample_rate', sample_rate) dtype = getattr_if_none(ih, 'dtype', dtype) if samples_per_frame is None: samples_per_frame = ih_samples_per_frame self.meta = getattr(ih, 'meta', {}) # Get possible metadata, but giving preference to what is passed in, # except that any None are removed. for attr in META_ATTRIBUTES: value = getattr_if_none(ih, attr, kwargs.pop(attr, None), required=False) if value is not None: kwargs[attr] = value super().__init__(shape=shape, start_time=start_time, sample_rate=sample_rate, samples_per_frame=samples_per_frame, dtype=dtype, **kwargs) def _repr_item(self, key, default, value=None): if key == 'ih': return 'ih' if default is None: if key == 'samples_per_frame': default = self._ih_samples_per_frame elif key == 'ih_samples_per_frame': default = self.ih.samples_per_frame else: default = getattr(self.ih, key, None) return super()._repr_item(key, default=default, value=value) def __repr__(self): base = super().__repr__() if base.count('\n') == 1: base = ' '.join(b.strip() for b in base.split('\n')) return (base + "\nih: " + "\n ".join(repr(self.ih).split('\n')))
[docs] def close(self): """Close task. Note that this does not explicitly close the underlying source; instead, it just deletes the reference to it. """ super().close() # Delete the reference to the underlying filehandle, so that it # can be freed if used nowhere else. del self.ih
[docs]class TaskBase(BaseTaskBase): """Base class of all tasks. Following the design of `baseband` stream readers, features properties describing the size, shape, data type, sample rate and start/stop times of the task's output. Also defines methods to move a sample pointer across the output data in units of either complete samples or time. This class provides a base ``_read_frame`` method that will read a frame worth of data from the underlying stream and pass it on to a task method. Hence, subclasses should define: ``task(self, data)`` : return processed data from one frame. Parameters ---------- ih : stream handle Handle of a stream reader or another task. ih_samples_per_frame : int, optional Number of input samples which should be dealt with in one go. If not given, inferred from ``samples_per_frame``; if that is also not given, taken from the underlying stream. shape : tuple, optional Overall shape of the stream, with first entry the total number of complete samples, and the remainder the sample shape. If not given, the sample shape is that of the underlying stream, and the number of complete samples is inferred from the number of frames implied by ``ih_samples_per_frame``, combined with ``samples_per_frame``. The latter inference also happens if the first entry is ``-1``. If a shape inconsistent with an integer number of frames is given, the task should be able to deal with a partial frame (which, when the sample rate is reduced, will always contain an integer multiple of the reduction factor). sample_rate : `~astropy.units.Quantity`, optional With units of a rate. If not given, taken from the underlying stream. samples_per_frame : int, optional Number of samples the task produces per frame. If not given, inferred from the input number of samples using the ratio of the ``sample_rate`` passed in and that of the underlying stream. **kwargs Possible further arguments and metadata; see `~baseband_tasks.base.BaseTaskBase`. """ def __init__(self, ih, *, ih_samples_per_frame=None, shape=None, sample_rate=None, samples_per_frame=None, **kwargs): if sample_rate is None: sample_rate = ih.sample_rate sample_rate_ratio = 1. else: sample_rate_ratio = (ih.sample_rate / sample_rate).to(1).value if samples_per_frame is None: if ih_samples_per_frame is None: ih_samples_per_frame = ih.samples_per_frame samples_per_frame = ih_samples_per_frame / sample_rate_ratio assert samples_per_frame % 1 == 0, ( "inferred samples per frame must be integer") samples_per_frame = int(samples_per_frame) elif ih_samples_per_frame is None: ih_samples_per_frame = samples_per_frame * sample_rate_ratio assert ih_samples_per_frame % 1 == 0, ( "inferred input samples per frame must be integer") ih_samples_per_frame = int(ih_samples_per_frame) assert ih_samples_per_frame <= ih.shape[0], ( "time per frame larger than total time in stream") if shape is None or shape[0] == -1: ns = ((ih.shape[0] // ih_samples_per_frame) * samples_per_frame) shape = (ns,) + (ih.shape[1:] if shape is None else shape[1:]) super().__init__(ih=ih, ih_samples_per_frame=ih_samples_per_frame, shape=shape, sample_rate=sample_rate, samples_per_frame=samples_per_frame, **kwargs) alignment = max(1, int(sample_rate_ratio)) self._ih_stop = (self.ih.shape[0] // alignment) * alignment def _seek_frame(self, frame_index): return self.ih.seek(frame_index * self._ih_samples_per_frame) def _read_frame(self, frame_index): # Read data from underlying filehandle. start = self._seek_frame(frame_index) stop = min(start + self._ih_samples_per_frame, self._ih_stop) data = self.ih.read(stop-start) # Apply function to the data. Note that the _get_frame() function # in base ensures that our offset pointer is correct. return self.task(data)
[docs]class PaddedTaskBase(TaskBase): """Base for tasks which need more points than they produce. Like `~baseband_tasks.base.TaskBase`, subclasses should define: ``task(self, data)`` : return processed data from one frame. Where ``data`` will contain extra padding. The ``task`` method has to ensure the right selection is returned, and can use the ``_pad_start`` and ``_pad_end`` attributes for this purpose. Parameters ---------- ih : stream handle Handle of a stream reader or another task. pad_start, pad_end : int Padding to apply at the start and end. Default: 0. samples_per_frame : int, optional Number of output samples which should be produced in each frame. The number of input samples will be larger by the padding. If not given, the minimum power of 2 needed to get at least 75% efficiency. **kwargs Possible further arguments; see `~baseband_tasks.base.BaseTaskBase`. """ def __init__(self, ih, pad_start=0, pad_end=0, *, samples_per_frame=None, **kwargs): self._pad_start = operator.index(pad_start) self._pad_end = operator.index(pad_end) if self._pad_start < 0 or self._pad_end < 0: raise ValueError("padding values must be 0 or positive.") pad = self._pad_start + self._pad_end if samples_per_frame is None: # Calculate the number of samples that ensures >75% efficiency: # use 4 times power of two just above pad. ih_samples_per_frame = ih.samples_per_frame if pad > 0: ih_samples_per_frame = max(ih_samples_per_frame, 2 ** ( int((np.ceil(np.log2(pad)))) + 2)) samples_per_frame = ih_samples_per_frame - pad else: ih_samples_per_frame = samples_per_frame + pad if pad > samples_per_frame: warnings.warn("task will be inefficient; for {} samples " "per frame, more ({}) will be added for padding." .format(samples_per_frame, pad)) n_sample = ih.shape[0] - pad shape = (n_sample,) + ih.sample_shape kwargs['start_time'] = (getattr_if_none(ih, 'start_time', **kwargs) + self._pad_start / ih.sample_rate) super().__init__(ih, ih_samples_per_frame=ih_samples_per_frame, shape=shape, samples_per_frame=samples_per_frame, **kwargs) def _seek_frame(self, frame_index): """Seek to the start of the given frame in the underlying file. If the frame is a partial frame, then seek to the last complete chunk of data, setting ``_frame_offset`` to indicate the offset used. Returns the position in the underlying file. """ ih_index = frame_index * self.samples_per_frame max_start = self.ih.shape[0] - self._ih_samples_per_frame if ih_index > max_start: self._frame_offset = ih_index - max_start return self.ih.seek(max_start) else: self._frame_offset = 0 return self.ih.seek(ih_index) def _get_frame(self, offset): # Override to add the possible sample offset in the last frame. self._frame, sample_offset = super()._get_frame(offset) return self._frame, sample_offset + self._frame_offset
[docs]class Task(TaskBase): """Apply a user-supplied callable to a stream. The task can either behave like a function or a method. If a function, it will be passed just the frame data read from the underlying file or task; if a method, it will be passed the Task instance (with its offset at the correct sample) as well as the frame data read. Note that for common functions it is recommended to instead define a new subclass of `~baseband_tasks.base.TaskBase` in which a ``task`` (static)method is defined. Parameters ---------- ih : filehandle Source of data, or another task, from which samples are read. task : callable The function or method-like callable. method : bool, optional Whether ``task`` is a method (two arguments) or a function (one argument). Default: inferred by inspection. **kwargs Additional arguments and metadata to be passed on to the base class. --- Possible arguments : (see `~baseband_tasks.base.TaskBase`) shape : tuple, optional Overall shape of the stream, with first entry the total number of complete samples, and the remainder the sample shape. By default, the shape of the underlying stream, possibly adjusted for a difference of sample rate. sample_rate : `~astropy.units.Quantity`, optional With units of a rate. If not given, taken from the underlying stream. Should be passed in if the function reduces or expands the number of elements. samples_per_frame : int, optional Number of samples which should be fed to the function in one go. If not given, by default the number from the underlying file, possibly adjusted for a difference in sample rate. dtype : `~numpy.dtype`, optional Output dtype. If not given, the dtype of the underlying file. --- Possible metadata : (see `~baseband_tasks.base.BaseTaskBase`) frequency : `~astropy.units.Quantity`, optional Frequencies for each channel. Should be broadcastable to the sample shape. Default: taken from the underlying stream, if available. sideband : array, optional Whether frequencies are upper (+1) or lower (-1) sideband. Should be broadcastable to the sample shape. Default: taken from the underlying stream, if available. polarization : array or (nested) list of char, optional Polarization labels. Should broadcast to the sample shape, i.e., the labels are in the correct axis. For instance, ``['X', 'Y']``, or ``[['L'], ['R']]``. Default: taken from the underlying stream, if available. Raises ------ TypeError If inspection of the task does not work. AssertionError If the task has zero or more than 2 arguments. """ def __init__(self, ih, task, method=None, **kwargs): if method is None: try: argspec = inspect.getfullargspec(task) narg = len(argspec.args) if argspec.defaults: narg -= len(argspec.defaults) if inspect.ismethod(task): narg -= 1 assert 1 <= narg <= 2 method = narg == 2 except Exception as exc: exc.args += ("cannot determine whether ``task`` is a " "function or method. Pass in ``method``.",) raise if method: self.task = types.MethodType(task, self) else: self.task = task super().__init__(ih, **kwargs) def _repr_item(self, key, default, value=None): if key == 'task' and isinstance(self.task, types.MethodType): value = self.task.__func__ return super()._repr_item(key, default=default, value=value)
[docs]class SetAttribute(TaskBase): """Wrapper for streams that allows one to set or change attributes. Can be used to add ``frequency``, ``sideband`` and ``polarization`` attributes, which most baseband readers do not provide, checking that the values broadcast properly to the sample shape. Can also be used to apply a clock correction by changing ``start_time``. The ``sample_rate`` can also be set, but no check is done to ensure it remains consistent with the ``frequency``. One can also change the ``shape``, ``samples_per_frames``, or ``dtype`` but that will be slightly slower, as the class can then not read directly from the underlying stream. By default, all parameters are taken from the underlying stream. Parameters ---------- ih : stream handle Handle of a stream reader or another task. start_time : `~astropy.time.Time`, optional Start time of the stream. sample_rate : `~astropy.units.Quantity`, optional With units of a rate. **kwargs Other parameters and metadata. See `~baseband_tasks.base.TaskBase`. --- Possible metadata : (see `~baseband_tasks.base.BaseTaskBase`) frequency : `~astropy.units.Quantity`, optional Frequencies for each channel. Should be broadcastable to the sample shape. sideband : array, optional Whether frequencies are upper (+1) or lower (-1) sideband. Should be broadcastable to the sample shape. polarization : array or (nested) list of char, optional Polarization labels. Should broadcast to the sample shape, i.e., the labels are in the correct axis. For instance, ``['X', 'Y']``, or ``[['L'], ['R']]``. """ def __init__(self, ih, *, start_time=None, sample_rate=None, **kwargs): super().__init__(ih, start_time=start_time, sample_rate=sample_rate, **kwargs) if not set(kwargs).difference(META_ATTRIBUTES): # No overrides of anything related to data, so can use read of # underlying file directly. self.read = self.simple_read
[docs] def simple_read(self, *args, **kwargs): """Read data from the underlying stream at the current offset.""" # Used as ``read`` if the data aspects are not changed. See above. self.ih.seek(self.offset) return self.ih.read(*args, **kwargs)
[docs] def task(self, data): return data