Source code for africanus.experimental.rime.fused.transformers.parangle

import numpy as np
from numba.core import cgutils, types
from numba.core.errors import TypingError
from numba import njit, objmode

from africanus.rime.parangles_casa import casa_parallactic_angles
from africanus.experimental.rime.fused.transformers.core import Transformer


[docs] class ParallacticTransformer(Transformer): OUTPUTS = ["feed_parangle", "beam_parangle"] def __init__(self, process_pool): self.pool = process_pool def dask_schema(self, antenna_position, phase_dir, receptor_angle=None): dt = np.result_type(antenna_position, phase_dir, receptor_angle) inputs = {"antenna_position": ("antenna", "ant-comp"), "phase_dir": ("radec",)} if receptor_angle is not None: inputs["receptor_angle"] = ("feed", "receptor_angle") else: inputs["receptor_angle"] = None outputs = { "feed_parangle": np.empty((0,) * 5, dt), "beam_parangle": np.empty((0,) * 4, dt), } return inputs, outputs def init_fields( self, typingctx, init_state, antenna_position, phase_dir, receptor_angle=None, ): fdict = init_state.field_dict if not isinstance(antenna_position, types.Array) or antenna_position.ndim != 2: raise TypingError( f"antenna_position {antenna_position} should be a (antenna, 3) array" ) dt = typingctx.unify_types( fdict["utime"].dtype, fdict["ufeed"].dtype, antenna_position.dtype, phase_dir.dtype, ) fields = [ ("feed_parangle", dt[:, :, :, :, :]), ("beam_parangle", dt[:, :, :, :]), ] parangle_dt = types.Array(types.float64, 2, "C") HAVE_RA = not cgutils.is_nonelike(receptor_angle) if HAVE_RA and ( not isinstance(receptor_angle, types.Array) or receptor_angle.ndim != 2 ): raise TypingError("receptor_angle must be a (feed, 2) array") @njit(inline="never") def parangle_stub(time, antenna, phase_dir): with objmode(out=parangle_dt): out = self.pool.apply( casa_parallactic_angles, (time, antenna, phase_dir) ) return out def parangles(init_state, antenna_position, phase_dir, receptor_angle=None): (ntime,) = init_state.utime.shape (nant,) = init_state.uantenna.shape (nfeed,) = init_state.ufeed.shape # Select out the antennae we're interested in antenna_position = antenna_position[init_state.uantenna] parangles = parangle_stub(init_state.utime, antenna_position, phase_dir) feed_pa = np.empty((ntime, nfeed, nant, 2, 2), parangles.dtype) beam_pa = np.empty((ntime, nfeed, nant, 2), parangles.dtype) if HAVE_RA: if receptor_angle.ndim != 2: raise ValueError("receptor_angle.ndim != 2") if receptor_angle.shape[1] != 2: raise ValueError("Only 2 receptor angles currently supported") # Select out the feeds we're interested in receptor_angle = receptor_angle[init_state.ufeed, :] for t in range(ntime): for f in range(nfeed): ra1 = receptor_angle[f, 0] if HAVE_RA else dt(0) ra2 = receptor_angle[f, 1] if HAVE_RA else dt(0) for a in range(nant): pa1 = parangles[t, a] pa2 = parangles[t, a] beam_pa[t, f, a, 0] = np.sin(pa1) beam_pa[t, f, a, 1] = np.cos(pa1) pa1 += ra1 pa2 += ra2 feed_pa[t, f, a, 0, 0] = np.sin(pa1) feed_pa[t, f, a, 0, 1] = np.cos(pa1) feed_pa[t, f, a, 1, 0] = np.sin(pa2) feed_pa[t, f, a, 1, 1] = np.cos(pa2) return feed_pa, beam_pa return fields, parangles