from loguru import logger
from typing import List, Dict, Any
from pathlib import Path
import numpy as np
import pandas as pd
from resistics.time import TimeMetadata, TimeData, TimeReader
from resistics_readers.multifile import TimeMetadataSingle, TimeMetadataMerge
CHAN_MAP = {"Ex": "Ex", "Ey": "Ey", "Bx": "Hx", "By": "Hy", "Bz": "Hz"}
CHAN_TYPES = {
"Ex": "electric",
"Ey": "electric",
"Bx": "magnetic",
"By": "magnetic",
"Bz": "magnetic",
}
[docs]def update_xtr_data(
xtr_data: Dict[str, Any], section: str, key: str, val: str
) -> Dict[str, Any]:
"""
Decide how to deal with an XTR file entry
Parameters
----------
xtr_data : Dict[str, Any]
The dictionary with the existing XTR data
section : str
The section which has the key, value pair
key : str
The key
val : str
the value
Returns
-------
Dict[str, Any]
The updated XTR dictionary
"""
if key in xtr_data[section] and not isinstance(xtr_data[section][key], list):
# the key has already been encountered once but has now reappeared
# this now needs to be converted to a list
xtr_data[section][key] = [xtr_data[section][key], val]
elif key in xtr_data[section]:
# a new value for an existing key that is already a list
xtr_data[section][key].append(val)
else:
# a new value for a key that has not appeared before
# still unknown whether this will become a list later
xtr_data[section][key] = val
return xtr_data
[docs]def read_xtr(xtr_path: Path) -> Dict[str, Any]:
"""
Function to read an XTR file.
XTR files are similar to INI files but with duplicate entries making them
more annoying to read.
Parameters
----------
xtr_path : Path
Path to the XTR file
Returns
-------
Dict[str, Any]
Data from the XTR file
"""
with xtr_path.open("r") as f:
lines = f.readlines()
lines = [x.strip().replace("'", "").strip() for x in lines]
lines = [x for x in lines if x != ""]
xtr_data: Dict[str, Any] = {}
section = "GLOBAL"
for line in lines:
if line[0] == "[" and line[-1] == "]":
section = line[1:-1]
xtr_data[section] = {}
else:
key, val = line.split("=")
xtr_data = update_xtr_data(xtr_data, section, key.strip(), val.strip())
return xtr_data
[docs]class TimeReaderRAW(TimeReader):
"""
Parent reader for reading from .RAW SPAM files. Associated metadata can come
in XTR or XTRX (an XML style) format. Methods for reading from specific
metadata file formats should be added in child classes inheriting from this
one.
The raw data for SPAM is sensor Voltage in single precision float. However,
if there are multiple data files for a single continuous dataset, each one
may have a different gain. Therefore, a scaling has to be calculated for
each data file and channel. Applying these scalings will convert all
channels to mV.
More information about scalings can be found in readers for the various
metadata types, where the scalars are calculated. This class simply
implements the reading of data and not the calculation of the scalars.
"""
extension = ".RAW"
def _read_RAW_metadata(self, raw_path: Path) -> Dict[str, Any]:
"""
Read metadata directly from the .RAW data files' header bytes
First begin by reading the general metadata at the start of the file.
This can be followed by multiple event metadata. However, multiple event
metadata in RAW files are largely deprecated and only single event
metadata are supported in this reader.
Each .RAW file can have its own data byte offset
Notes
-----
Open with encoding ISO-8859-1 because it has a value for all bytes
unlike other encoding. In particular, want to find number of samples
and the size of the metadata. The extended metadata is ignored.
Parameters
----------
raw_path : Path
Path to RAW file
Returns
-------
Dict[str, Any]
Dictionary of metadata from RAW file
Raises
------
MetadataReadError
If the number of event metadata is greater than 1. This is not
currently supported
"""
from resistics.errors import MetadataReadError
f_size = raw_path.stat().st_size
f = raw_path.open("r", encoding="ISO-8859-1")
# read general metadata - provide enough bytes to read
raw_metadata = self._read_RAW_general_metadata(f.read(1000))
# read event metadata
event_metadata = []
record = raw_metadata["first_event"]
for _ir in range(raw_metadata["n_events"]):
# seek to the record from the beginning of the file
seek_pt = (record - 1) * raw_metadata["rec_length"]
if not seek_pt > f_size:
f.seek(seek_pt, 0)
event = self._read_RAW_event_metadata(f.read(1000))
event_metadata.append(event)
if event["next_event_metadata"] < raw_metadata["total_rec"]:
# byte location of next record
record = event["next_event_metadata"]
else:
break
f.close()
if len(event_metadata) > 1:
raise MetadataReadError(
raw_path, f"({len(event_metadata)}) > 1 events in RAW file."
)
raw_metadata.update(event_metadata[0])
raw_metadata["data_byte_start"] = (
raw_metadata["start_data"] - 1
) * raw_metadata["rec_length"]
return raw_metadata
def _read_RAW_general_metadata(self, general: str) -> Dict[str, Any]:
"""
Note that rec_chans is the number of channels recorded, not the number
of channels connected acquiring good data. This is usually five.
Parameters
----------
general : str
The general data as a string
Returns
-------
Dict[str, Any]
Dictionary with general metadata and values
"""
gen_split = general.split()
return {
"rec_length": int(gen_split[0]),
"file_type": gen_split[1],
"word_length": int(gen_split[2]),
"version": gen_split[3],
"proc_id": gen_split[4],
"rec_chans": int(gen_split[5]),
"total_rec": int(gen_split[6]),
"first_event": int(gen_split[7]),
"n_events": int(gen_split[8]),
"extend": int(gen_split[9]),
}
def _read_RAW_event_metadata(self, event: str) -> Dict[str, Any]:
"""
Parse the event metadata
Parameters
----------
event : str
The event data as a string
Returns
-------
Dict[str, Any]
Dictionary with event metadata and values
"""
event_split = event.split()
return {
"start": int(event_split[0]),
"startms": int(event_split[1]),
"stop": int(event_split[2]),
"stopms": int(event_split[3]),
"cvalue1": float(event_split[4]),
"cvalue2": float(event_split[5]),
"cvalue3": float(event_split[6]),
"event_metadata_infile": int(event_split[7]),
"next_event_metadata": int(event_split[8]),
"previous_event_metadata": int(event_split[9]),
"n_samples": int(event_split[10]),
"start_data": int(event_split[11]),
"extended": int(event_split[12]),
}
[docs] def read_data(
self, dir_path: Path, metadata: TimeMetadata, read_from: int, read_to: int
) -> TimeData:
"""
Get data from data file, returned in mV
Calling this applies scalings calculated when the metadata are read.
When a recording consists of multiple data files, each channel of each
data file might have a different scaling, therefore, gain removals and
other RAW file unique scalings need to be applied before the data is
stitched together.
This method returns the data in mV for all channels.
Parameters
----------
dir_path : path
The directory path to read from
metadata : TimeMetadata
Time series data metadata
read_from : int
Sample to read data from
read_to : int
Sample to read data to
Returns
-------
TimeData
Time data object
"""
from resistics_readers.multifile import samples_to_sources
dtype = np.float32
dtype_size = np.dtype(np.float32).itemsize
n_samples = read_to - read_from + 1
messages = [f"Reading raw data from {dir_path}"]
messages.append(f"Sampling rate {metadata.fs} Hz")
# loop over RAW files and read data
data_table = pd.DataFrame(data=metadata.data_table)
df_to_read = samples_to_sources(dir_path, data_table, read_from, read_to)
data = np.empty(shape=(metadata.n_chans, n_samples), dtype=dtype)
sample = 0
for data_file, info in df_to_read.iterrows():
file_from = info.loc["read_from"]
file_to = info.loc["read_to"]
n_samples_file = info.loc["n_samples_read"]
rec_chans = info.loc["rec_chans"]
data_byte_start = info.loc["data_byte_start"]
messages.append(f"{data_file}: Reading samples {file_from} to {file_to}")
logger.debug(f"{data_file}: Reading samples {file_from} to {file_to}")
n_samples_read = n_samples_file * rec_chans
byteoff = data_byte_start + (file_from * rec_chans * dtype_size)
data_path = dir_path / str(data_file)
data_read = np.memmap(
data_path, dtype=dtype, mode="r", offset=byteoff, shape=(n_samples_read)
)
for idx, chan in enumerate(metadata.chans):
scaling = info[f"{chan} scaling"]
data[idx, sample : sample + n_samples_file] = (
data_read[idx:n_samples_read:rec_chans] * scaling
)
sample = sample + n_samples_file
metadata = self._get_return_metadata(metadata, read_from, read_to)
messages.append(f"From sample, time: {read_from}, {str(metadata.first_time)}")
messages.append(f"To sample, time: {read_to}, {str(metadata.last_time)}")
metadata.history.add_record(self._get_record(messages))
logger.info(f"Data successfully read from {dir_path}")
return TimeData(metadata, data)
[docs] def scale_data(self, time_data: TimeData) -> TimeData:
"""
Scale data to physically meaningful units
Resistics uses field units, meaning physical samples will return the
following:
- Electrical channels in mV/km
- Magnetic channels in mV or nT depending on the sensor
- To convert magnetic in mV to nT, calibration is required
Notes
-----
Conversion to mV (gain removal) is performed on read as each RAW file in
a dataset can have a separate scalar. Because gain is removed when
reading the data and all channel data is in mV, the only calculation
that has to be done is to divide by the dipole lengths (east-west
spacing and north-south spacing).
Parameters
----------
time_data : TimeData
TimeData read in from files
Returns
-------
TimeData
Scaled TimeData
"""
logger.info("Applying scaling to data to give field units")
messages = ["Scaling raw data to physical units"]
for chan in time_data.metadata.chans:
chan_metadata = time_data.metadata.chans_metadata[chan]
if chan_metadata.electric():
dipole_dist_km = chan_metadata.dipole_dist / 1_000
time_data[chan] = time_data[chan] / dipole_dist_km
messages.append(f"Dividing {chan} by dipole length {dipole_dist_km} km")
record = self._get_record(messages)
time_data.metadata.history.add_record(record)
return time_data
[docs]class TimeReaderXTR(TimeReaderRAW):
"""Data reader for SPAM RAW data with XTR metadata"""
def _read_xtr(self, xtr_path: Path) -> TimeMetadataXTR:
"""
Read an individual XTR metadata file and return TimeMetadata
There is also metadata in RAW files associated with XTR files. To get
the full set of metadata and ensure the data makes sense, XTR metadata
are validated against those in the RAW file.
Parameters
----------
xtr_path : Path
The XTR metadata path to read in
Returns
-------
TimeMetadataXTR
Metadata for the XTR file
Raises
------
MetadataReadError
If there is a mismatch in samples between XTR file and RAW file
"""
from resistics.errors import MetadataReadError
xtr_data = read_xtr(xtr_path)
xtr_time_dict = self._read_xtr_dataset_metadata(xtr_data)
xtr_time_dict["xtr_file"] = xtr_path.name
xtr_time_dict["chans_metadata"] = self._read_xtr_chan_metadata(xtr_data)
xtr_time_dict["chans"] = list(xtr_time_dict["chans_metadata"].keys())
raw_path = xtr_path.parent / xtr_time_dict["data_file"]
raw_metadata = self._read_RAW_metadata(raw_path)
# check to make sure sample length in file matches calculated samples
if xtr_time_dict["n_samples"] != raw_metadata["n_samples"]:
raise MetadataReadError(
xtr_path,
f"Sample mismatch between XTR {xtr_path} and RAW in {raw_path}",
)
xtr_time_dict["data_byte_start"] = raw_metadata["data_byte_start"]
xtr_time_dict["rec_chans"] = raw_metadata["rec_chans"]
return TimeMetadataXTR(**xtr_time_dict)
def _read_xtr_dataset_metadata(self, xtr_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Read the data in the XTR file that relates to TimeData dataset metadata
Parameters
----------
xtr_data : Dict[str, Any]
Information from the xtr file
Returns
-------
Dict[str, Any]
Metadatas as a dictionary
"""
from resistics.sampling import to_datetime
xtr_time_metadata = {}
# raw file name and fs
split = xtr_data["FILE"]["NAME"].split()
xtr_time_metadata["data_file"] = split[0]
xtr_time_metadata["fs"] = np.absolute(float(split[-1]))
# first, last time info which are in unix timestamp
split = xtr_data["FILE"]["DATE"].split()
first_time = pd.to_datetime(int(split[0] + split[1]), unit="us", origin="unix")
last_time = pd.to_datetime(int(split[2] + split[3]), unit="us", origin="unix")
duration = (last_time - first_time).total_seconds()
n_samples = int((duration * xtr_time_metadata["fs"]) + 1)
xtr_time_metadata["first_time"] = to_datetime(first_time)
xtr_time_metadata["last_time"] = to_datetime(last_time)
xtr_time_metadata["n_samples"] = n_samples
# get number of channels
xtr_time_metadata["n_chans"] = xtr_data["CHANNAME"]["ITEMS"]
# location information
split = xtr_data["SITE"]["COORDS"].split()
xtr_time_metadata["wgs84_latitude"] = float(split[1])
xtr_time_metadata["wgs84_longitude"] = float(split[2])
xtr_time_metadata["elevation"] = float(split[3])
return xtr_time_metadata
def _read_xtr_chan_metadata(
self, xtr_data: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
"""
Get metadata for each channel
There are some tricky notes here regarding scalers that are extracted.
This information was provided by Reinhard.
For electric channels:
- Data is in raw voltage of sensors
- Scaling is taken from DATA section of XTR file and will be applied
- Polarity reversal is applied (multiply by -1)
- 1000x scaling is applied to convert Volts to mV
- A final unknown scaling of 1000 is applied
Mathematically this becomes:
.. math::
scaling = scaling extracted from DATA section of XTR file
scaling = scaling * -1000 (polarity reversal and convert V to mV),
scaling = scaling * 1000 (unknown 1000 scaling).
For magnetic channels:
- Scaling in DATA section ignored
- This scaling is applied as the static gain during calibration
- Polarity reversal is applied (multiply by -1)
- A final unknown scaling of 1000
Mathematically, this is,
.. math::
scaling = -1000 (Polarity reversal and unknown 1000 factor)
.. note::
Board LF is currently translated to chopper being True or On. Not
sure how applicable this is, but it's in there.
Parameters
----------
xtr_data : Dict[str, Any]
Data from the XTR file
Returns
-------
Dict[Dict[str, Any]]
Metadatas for channels as a dictionary
"""
chans = [x.split()[1] for x in xtr_data["CHANNAME"]["NAME"]]
xtr_chans_metadata = {}
for idx, chan in enumerate(chans):
chan_metadata = {
"name": CHAN_MAP[chan],
"data_files": xtr_data["FILE"]["NAME"].split()[0],
}
chan_metadata["chan_type"] = CHAN_TYPES[chan]
chan_metadata["chan_source"] = chan
data_split = xtr_data["DATA"]["CHAN"][idx].split()
chan_metadata["scaling"] = self._get_xtr_chan_scaling(
CHAN_TYPES[chan], data_split
)
chan_metadata["dipole_dist"] = float(data_split[3])
# sensors
sensor_section = f"200{idx + 1}003"
sensor_split = xtr_data[sensor_section]["MODULE"].split()
chan_metadata["serial"] = sensor_split[1]
# get the board and coil type from name of calibration file
cal_file = sensor_split[0]
split = cal_file.split("-")
info = split[split.index("TYPE") + 1]
chan_metadata["sensor"] = info.split("_")[0]
chan_metadata["chopper"] = "LF" in info
# add to main dictionary
xtr_chans_metadata[CHAN_MAP[chan]] = chan_metadata
return xtr_chans_metadata
def _get_xtr_chan_scaling(self, chan_type: str, data_split: List[str]) -> float:
"""Get the correction required for field units"""
scaling = float(data_split[-2])
if chan_type == "electric":
scaling = -1000.0 * scaling * 1000
if chan_type == "magnetic":
scaling = -1000
return scaling
def _generate_table(self, metadata_list: List[TimeMetadataXTR]) -> pd.DataFrame:
"""
Generate a table mapping RAW file to first time, last time, number of
samples, data byte offsets and scalings
Parameters
----------
metadata_list : List[TimeMetadataXTR]
List of TimeMetadataXTR, one for each XTR/RAW file combination
Returns
-------
pd.DataFrame
The table mapping data file to various properties
"""
from resistics_readers.multifile import add_cumulative_samples
df = pd.DataFrame()
df["data_file"] = [x.data_file for x in metadata_list]
df["first_time"] = [x.first_time for x in metadata_list]
df["last_time"] = [x.last_time for x in metadata_list]
df["n_samples"] = [x.n_samples for x in metadata_list]
df["data_byte_start"] = [x.data_byte_start for x in metadata_list]
df["rec_chans"] = [x.rec_chans for x in metadata_list]
# save scaling information
chans = metadata_list[0].chans
for chan in chans:
col = f"{chan} scaling"
df[col] = [x.chans_metadata[chan].scaling for x in metadata_list]
df = df.sort_values("first_time")
df = df.set_index("data_file")
return add_cumulative_samples(df)
def _merge_metadata(
self, metadata_list: List[TimeMetadataXTR], df: pd.DataFrame
) -> TimeMetadataMerge:
"""
Merge metadata from all the metadata files
The following assumptions are made:
- Assume no change in location over time (lat and long remain the same)
- Assume no change in sensors over time
- Assume no change in electrode spacing over time
As each data file can have different scalings, the most common scaling
(mode) for each channel is taken.
Parameters
----------
metadata_list : List[TimeMetadataXTR]
List of TimeMetadataXTR, one for each XTR/RAW file combination
df : pd.DataFrame
The data table with information about each raw file
Returns
-------
TimeMetadataMerge
Merged metadata
"""
data_files = df.index.values.tolist()
metadata = metadata_list[0]
for chan in metadata.chans:
metadata.chans_metadata[chan].data_files = data_files
scaling = df[f"{chan} scaling"].mode(dropna=True)
metadata.chans_metadata[chan].scaling = scaling
metadata_dict = metadata.dict()
metadata_dict["first_time"] = df["first_time"].min()
metadata_dict["last_time"] = df["last_time"].max()
metadata_dict["n_samples"] = df["n_samples"].sum()
metadata_dict["data_table"] = df.to_dict()
return TimeMetadataMerge(**metadata_dict)