"""
Reformatting for miniseed data
Miniseed data is potentially multi sampling frequency. Resistics does not
support multi sampling frequency data files, therefore, miniseed data files
need to be reformatted before they can be used.
.. warning::
This an initial implementation and there are likely to be numerous scenarios
where this breaks. It is suggested to compare this with the output from
miniseed to ascii thoroughly before using.
https://github.com/iris-edu/mseed2ascii
Contributors are welcomed and ecouraged to improve the code and particularly
add tests to ensure that the reformatting is occuring as expected.
"""
from loguru import logger
from typing import Dict, List, Optional, Tuple
from pathlib import Path
import numpy as np
import pandas as pd
import obspy
from obspy.core.utcdatetime import UTCDateTime
from obspy.core.stream import Stream
from resistics.common import dir_files
from resistics.time import ChanMetadata, TimeMetadata, TimeData
from resistics.time import TimeProcess, TimeWriterNumpy
CHAN_TYPES = {
"Ex": "electric",
"Ey": "electric",
"Hx": "magnetic",
"Hy": "magnetic",
"Hz": "magnetic",
}
[docs]class NoDataInInterval(Exception):
def __init__(
self,
first_time: pd.Timestamp,
last_time: pd.Timestamp,
from_time: pd.Timestamp,
to_time: pd.Timestamp,
):
"""Exception for the no miniseed data in a time interval"""
self.first_time = first_time
self.last_time = last_time
self.from_time = from_time
self.to_time = to_time
def __str__(self):
"""The Exception string"""
outstr = f"Data between {self.first_time} to {self.last_time}"
outstr += f" does not intersect interval {self.from_time} to {self.to_time}"
return outstr
[docs]def get_miniseed_stream(data_path: Path):
"""Get the miniseed file stream"""
return obspy.read(str(data_path))
[docs]def get_streams(data_paths: List[Path]) -> Dict[Path, Stream]:
"""
Get the stream object for each data_path
Parameters
----------
data_paths : List[Path]
The data paths
Returns
-------
Dict[Path, Stream]
The stream objects
"""
streams = {}
for data_path in data_paths:
try:
logger.info(f"Attempting to read file {data_path}")
streams[data_path] = get_miniseed_stream(data_path)
traces = [trace.id for trace in streams[data_path]]
logger.info(f"Successfully read file with traces {traces}")
except Exception:
logger.error(f"Unable to read data from {data_path}. Skipping.")
return streams
[docs]def get_table(streams: Dict[Path, Stream], trace_ids: List[str]) -> pd.DataFrame:
"""
Get table with start and ends for each trace of interest in each file
The table additionally contains the trace index for each trace for every
file
Parameters
----------
streams : Dict[Path, Stream]
Dictionary of file paths to streams
trace_ids : List[str]
The ids of the traces that are of interest
Returns
-------
pd.DataFrame
The data table
"""
data = []
for data_path, stream in streams.items():
file_trace_ids = [x.id for x in stream.traces]
for trace_id in trace_ids:
trace_index = file_trace_ids.index(trace_id)
trace = stream[trace_index]
first_time = pd.to_datetime(
trace.stats.starttime.ns, unit="ns", origin="unix"
)
last_time = pd.to_datetime(trace.stats.endtime.ns, unit="ns", origin="unix")
data.append((data_path.name, trace_id, trace_index, first_time, last_time))
return pd.DataFrame(
data=data,
columns=["data_file", "trace_id", "trace_index", "first_time", "last_time"],
)
[docs]def get_first_last_times(table: pd.DataFrame) -> Tuple[pd.Timestamp, pd.Timestamp]:
"""
Get the minimum first time and maximum last time for the data
Each trace may have different date ranges in the miniseed files. This
function calculates the first and last times where data is present for each
requested trace.
Parameters
----------
table : pd.DataFrame
The information table with the details about trace duration in each data
file
Returns
-------
Tuple[pd.Timestamp, pd.Timestamp]
The first and last time
"""
grouped = table.groupby("data_file")
first_time = (grouped["first_time"].max()).min()
last_time = (grouped["last_time"].min()).max()
return first_time, last_time
[docs]def get_streams_to_read(
trace_id: str,
table: pd.DataFrame,
from_time: pd.Timestamp,
to_time: pd.Timestamp,
) -> pd.DataFrame:
"""
Get the streams to read and the time intervals to read for each stream
Note that this finds time intervals to cover from_time to to_time inclusive
Parameters
----------
trace_id : str
The trace id
table : pd.DataFrame
The table with details about date ranges covered by each miniseed file
from_time : pd.Timestamp
The time to get data from
to_time : pd.Timestamp
The time to get data to
Returns
-------
pd.DataFrame
A row for each data file to read and the time range to read from it
"""
table_trace = table[table["trace_id"] == trace_id]
to_exclude = table_trace["first_time"] > to_time
to_exclude = to_exclude | (table_trace["last_time"] < from_time)
table_trace = table_trace[~to_exclude]
# get the time ranges to read for each file
streams_to_read = []
for _idx, row in table_trace.iterrows():
# get the read from time for this data file
if row.loc["first_time"] > from_time:
read_from = row.loc["first_time"]
else:
read_from = from_time
# get the read to time for this data file
if row.loc["last_time"] > to_time:
read_to = to_time
else:
read_to = row.loc["last_time"]
# save the data to read
streams_to_read.append(
(row.loc["data_file"], row.loc["trace_index"], read_from, read_to)
)
return pd.DataFrame(
data=streams_to_read,
columns=["data_file", "trace_index", "read_from", "read_to"],
)
[docs]def get_stream_data(
dt: pd.Timedelta,
stream: Stream,
trace_index: int,
read_from: pd.Timestamp,
read_to: pd.Timestamp,
) -> np.ndarray:
"""
Get data for a single trace from a stream
Parameters
----------
dt : pd.Timedelta
The sampling rate
stream : Stream
The miniseed file stream
trace_index : int
The index of the trace
read_from : pd.Timestamp
The time to read from
read_to : pd.Timestamp
The time to read to
Returns
-------
np.ndarray
The trace data from the stream
Raises
------
ValueError
If the number of expected samples does not give an integer. This is
currently a safety first approach until more testing is done
ValueError
If the number of samples expected != the number of samples returned by
the trace in the time interval
"""
trace = stream[trace_index]
n_samples_expected = ((read_to - read_from) / dt) + 1
if not n_samples_expected.is_integer():
raise ValueError(
f"Number of samples expected {n_samples_expected} is not an integer"
)
n_samples_expected = int(n_samples_expected)
obspy_read_from = UTCDateTime(read_from.timestamp())
obspy_read_to = UTCDateTime(read_to.timestamp())
subtrace = trace.slice(starttime=obspy_read_from, endtime=obspy_read_to)
if subtrace.count() != n_samples_expected:
raise ValueError(
f"samples expected {n_samples_expected} != found {subtrace.count()}"
)
logger.debug(f"Expecting {n_samples_expected} samples, found {subtrace.count()}")
return subtrace.data
[docs]def get_trace_data(
fs: float,
streams: Dict[Path, Stream],
streams_to_read: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]],
from_time: pd.Timestamp,
n_samples: int,
) -> np.ndarray:
"""
Get data for a single trace beginning at from_time and for n_samples
Parameters
----------
fs : float
The sampling frequency
streams : Dict[Path, Stream]
The streams
streams_to_read : Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]
The streams to read for this trace and time interval
from_time : pd.Timestamp
The time to get the data from
n_samples : int
The number of samples to get
Returns
-------
np.ndarray
The data
Raises
------
ValueError
If converting read_from date to samples does not give an integer. This
is a safety first approach but problems could be encountered at very
high sampling frequencies. In this case, much more testing needs to be
done about expected behaviour
ValueError
If converting read_to date to samples does not give an integer. This
is a safety first approach but problems could be encountered at very
high sampling frequencies. In this case, much more testing needs to be
done about expected behaviour
"""
dt = pd.Timedelta(1 / fs, "s")
streams_from_name = {k.name: v for k, v in streams.items()}
data = np.empty(shape=(n_samples), dtype=np.float32)
data[:] = np.nan
for _idx, row in streams_to_read.iterrows():
data_file = row.loc["data_file"]
trace_index = row.loc["trace_index"]
read_from = row.loc["read_from"]
read_to = row.loc["read_to"]
data_sample_from = (read_from - from_time) / dt
data_sample_to = (read_to - from_time) / dt
if not data_sample_from.is_integer():
raise ValueError(f"Date sample from {data_sample_from} is not an integer")
if not data_sample_to.is_integer():
raise ValueError(f"Date sample to {data_sample_to} is not an integer")
data_sample_from = int(data_sample_from)
data_sample_to = int(data_sample_to)
logger.debug(f"Reading range {read_from} to {read_to} from stream {data_file}")
logger.debug(f"Data will cover samples {data_sample_from} to {data_sample_to}")
stream = streams_from_name[data_file]
data[data_sample_from : data_sample_to + 1] = get_stream_data(
dt, stream, trace_index, read_from, read_to
)
return data
[docs]def get_time_data(
fs: float,
id_map: Dict[str, str],
streams: Dict[Path, Stream],
table: pd.DataFrame,
first_time: pd.Timestamp,
last_time: pd.Timestamp,
from_time: pd.Timestamp,
to_time: pd.Timestamp,
) -> TimeData:
"""
Get time data covering from_time to to_time
Parameters
----------
fs : float
The sampling frequency
id_map : Dict[str, str]
The map from trace id to channel
streams : Dict[Path, Stream]
The streams
table : pd.DataFrame
The table with information about trace ranges for each file
first_time : pd.Timestamp
The common first_time for all traces and streams
last_time : pd.Timestamp
The common last_time for all traces and streams
from_time : pd.Timestamp
The from time for this interval of data
to_time : pd.Timestamp
The to time for this intervel of data
Returns
-------
TimeData
TimeData
Raises
------
NoDataInInterval
If there is no trace data in the interval from_time and to_time
ValueError
If the number of samples in the interval is not an integer. This is a
safety first approach for now that could fail at very high sampling
frequencies, in which case much more thorough testing would be better.
"""
# check there is actually data in the date range
if (first_time > last_time) or (to_time < first_time):
raise NoDataInInterval(first_time, last_time, from_time, to_time)
# correct the times if they are earlier than first time or after last time
if from_time < first_time:
logger.debug(f"Adjusted from time {from_time} to first time {first_time}")
from_time = first_time
if to_time > last_time:
logger.debug(f"Adjusted to time {to_time} to last time {last_time}")
to_time = last_time
logger.info(f"Extracting chunk between {from_time} and {to_time}")
n_samples = ((to_time - from_time) / pd.Timedelta(1 / fs, "s")) + 1
if not n_samples.is_integer():
raise ValueError("Number of calculated samples is not an integer")
n_samples = int(n_samples)
logger.debug(f"Expecting {n_samples} samples between {from_time} to {to_time}")
trace_ids = list(id_map.keys())
chans = [id_map[trace_id] for trace_id in trace_ids]
n_chans = len(chans)
chans_metadata: Dict[str, ChanMetadata] = {}
data = np.empty(shape=(n_chans, n_samples), dtype=np.float32)
for idx, trace_id in enumerate(trace_ids):
logger.info(f"Extracting data for trace {trace_id}, channel {chans[idx]}")
streams_to_read = get_streams_to_read(trace_id, table, from_time, to_time)
data[idx, :] = get_trace_data(
fs, streams, streams_to_read, from_time, n_samples
)
chans_metadata[chans[idx]] = ChanMetadata(
name=chans[idx],
data_files=list(streams_to_read["data_file"].values.tolist()),
chan_type=CHAN_TYPES[chans[idx]],
chan_source=trace_id,
)
metadata = TimeMetadata(
fs=fs,
first_time=from_time,
last_time=to_time,
n_samples=n_samples,
chans=chans,
n_chans=len(chans),
chans_metadata=chans_metadata,
)
return TimeData(metadata, data)
[docs]def get_processed_data(time_data: TimeData, processors: List[TimeProcess]) -> TimeData:
"""
Process time data
Parameters
----------
time_data : TimeData
TimeData to process
processors : List[TimeProcess]
The processors to run
Returns
-------
TimeData
The processed TimeData
"""
for process in processors:
time_data = process.run(time_data)
return time_data