Source code for resistics_readers.spam.calibration

from typing import List, Dict, Any, Tuple
from pathlib import Path
import re
import numpy as np
import pandas as pd
from xml.etree.ElementTree import Element  # noqa: S405
import defusedxml.ElementTree as ET
from resistics.time import ChanMetadata
from resistics.calibrate import SensorCalibrationReader, CalibrationData
from resistics.spectra import SpectraMetadata


[docs]class SensorCalibration_RSP_RSPX_Base(SensorCalibrationReader): """Base class for RSP and RSPX calibration data readers""" file_str: str = "Metronix_Coil-----TYPE-$sensor_$chopper-ID-$serial$extension" """The file string to search for. Various parameters will be substituted""" def _get_path(self, dir_path: Path, metadata: SpectraMetadata, chan: str) -> Path: """ Get the path to the calibration file Parameters ---------- dir_path : Path The directory path to look for calibration files metadata : SpectraMetadata SpectraMetadata with data information chan : str The channel to calibrate Returns ------- Path The path to the calibration file """ chan_metadata = metadata.chans_metadata[chan] chopper_str = "LF" if chan_metadata.chopper else "HF" sensor_str = re.sub("[^0-9]", "", chan_metadata.sensor) sensor_str = f"{int(sensor_str):03d}" serial_str = f"{int(chan_metadata.serial):06d}" file_name = self.file_str.replace("$sensor", sensor_str) file_name = file_name.replace("$serial", serial_str) file_name = file_name.replace("$chopper", chopper_str) file_name = file_name.replace("$extension", self.extension) return dir_path / file_name def _get_chopper(self, file_path: Path) -> bool: """Get whether the calibration is chopper on or off""" if "LF" in file_path.stem or "BB" in file_path.stem: return True return False
[docs]class SensorCalibrationRSP(SensorCalibration_RSP_RSPX_Base): """ Reader for RSP calibration files RSP data is in units: - F [Hz] - Magnitude [mv/nT] - Phase [deg] Data is returned with units: - F [Hz] - Magnitude [mV/nT] - Phase [radians] The static gain for RSP files is applied to the magnitude as it is read in """ extension: str = ".RSP"
[docs] def read_calibration_data( self, file_path: Path, chan_metadata: ChanMetadata ) -> CalibrationData: """ Read data from a RSP calibration file Parameters ---------- file_path : Path The file path of the calibration file chan_metadata : ChanMetadata The channel metadata for the channel to be calibrated Returns ------- CalibrationData The calibration data """ with file_path.open("r") as f: lines = f.readlines() lines = [x.strip() for x in lines] data_dict = self._read_metadata(lines) data_dict["chopper"] = self._get_chopper(file_path) df = self._read_data(lines) df["magnitude"] = df["magnitude"] * data_dict["static_gain"] df["phase"] = df["phase"] * (np.pi / 180) data_dict["frequency"] = df.index.values.tolist() data_dict["magnitude"] = df["magnitude"].values.tolist() data_dict["phase"] = df["phase"].values.tolist() data_dict["file_path"] = file_path return CalibrationData(**data_dict)
def _read_metadata(self, lines: List[str]) -> Dict[str, Any]: """Read the calibration file metadata""" serial, sensor = self._get_sensor_details(lines) static_gain = self._get_static_gain(lines) return { "serial": serial, "sensor": sensor, "static_gain": static_gain, "magnitude_unit": "mV/nT", "phase_unit": "radians", } def _get_sensor_details(self, lines: List[str]) -> Tuple[int, str]: """Get sensor details from the file""" serial: int = 1 sensor: str = "" for line in lines: if "induction coil no" in line: split1 = line.split(":")[1] serial = int(split1.split("-")[0].strip()) if "SensorType" in line: sensor = line.split()[1] return serial, sensor def _get_static_gain(self, lines: List[str]) -> float: static_gain: float = 1.0 for line in lines: if "StaticGain" in line: static_gain = float(line.split()[1]) return static_gain return static_gain def _read_data(self, lines: List[str]) -> pd.DataFrame: """Read data from calibration file""" read_from = self._get_read_from(lines) data_lines = self._get_data_lines(lines, read_from) data = np.array([x.split() for x in data_lines], dtype=np.float32) df = pd.DataFrame(data=data, columns=["frequency", "magnitude", "phase"]) return df.set_index("frequency").sort_index() def _get_read_from(self, lines: List[str]) -> int: """Get the line number to read from""" for idx, line in enumerate(lines): if "FREQUENCY" in line: return idx + 2 raise ValueError("Unable to determine location of data in file") def _get_data_lines(self, lines: List[str], idx: int) -> List[str]: """Get the data lines out of the file""" data_lines: List[str] = [] while idx < len(lines) and lines[idx] != "": data_lines.append(lines[idx]) idx += 1 return data_lines
[docs]class SensorCalibrationRSPX(SensorCalibration_RSP_RSPX_Base): """ Read data from RSPX calibration file RSPX data is in units: - F [Hz] - Magnitude [mv/nT] - Phase [deg] Data is returned with units: - F [Hz] - Magnitude [mV/nT] - Phase [radians] Static gain is applied to the magnitude """ extension: str = ".RSPX"
[docs] def read_calibration_data( self, file_path: Path, chan_metadata: ChanMetadata ) -> CalibrationData: """ Read RSPX file Parameters ---------- file_path : Path The file path of the calibration file chan_metadata : ChanMetadata The channel metadata for the channel to be calibrated Returns ------- CalibrationData The calibration data """ root = ET.parse(file_path).getroot() data_dict = self._read_metadata(root) data_dict["chopper"] = self._get_chopper(file_path) df = self._read_data(root) df["magnitude"] = df["magnitude"] * data_dict["static_gain"] df["phase"] = df["phase"] * (np.pi / 180) data_dict["frequency"] = df.index.values.tolist() data_dict["magnitude"] = df["magnitude"].values.tolist() data_dict["phase"] = df["phase"].values.tolist() data_dict["file_path"] = file_path return CalibrationData(**data_dict)
def _read_metadata(self, root: Element) -> Dict[str, Any]: """Read the calibration file metadata""" serial, sensor = self._get_sensor_details(root) static_gain = self._get_static_gain(root) return { "serial": serial, "sensor": sensor, "static_gain": static_gain, "magnitude_unit": "mV/nT", "phase_unit": "radians", } def _get_sensor_details(self, root: Element) -> Tuple[int, str]: """Get sensor details""" serial: int = 1 if root.find("SensorId") is not None: serial = int(root.find("SensorId").text) sensor: str = "" if root.find("SensorSpecification") is not None: sensor = root.find("SensorSpecification").text return serial, sensor def _get_static_gain(self, root) -> float: """Get the static gain""" static_gain: float = 1.0 if root.find("StaticGain") is not None: static_gain = float(root.find("StaticGain").text) return static_gain def _read_data(self, root: Element) -> pd.DataFrame: """Get data in a DataFrame""" data = [] for resp in root.findall("ResponseData"): data.append( [ np.float32(resp.get("Frequency")), np.float32(resp.get("Magnitude")), np.float32(resp.get("Phase")), ] ) df = pd.DataFrame(data=data, columns=["frequency", "magnitude", "phase"]) return df.set_index("frequency").sort_index()