# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

"""EGI NetStation Load Function."""

import datetime
import math
import os.path as op
import re
from collections import OrderedDict
from pathlib import Path

import numpy as np

from ..._fiff.constants import FIFF
from ..._fiff.meas_info import _empty_info, _ensure_meas_date_none_or_dt, create_info
from ..._fiff.proj import setup_proj
from ..._fiff.utils import _create_chs, _mult_cal_one
from ...annotations import Annotations
from ...channels.montage import make_dig_montage
from ...evoked import EvokedArray
from ...utils import _check_fname, _check_option, _soft_import, logger, verbose, warn
from ..base import BaseRaw
from .events import _combine_triggers, _read_events, _triage_include_exclude
from .general import (
    _block_r,
    _extract,
    _get_blocks,
    _get_ep_info,
    _get_gains,
    _get_signalfname,
)
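
# Names EGI uses for the vertex reference electrode; if such a channel is in
# the montage, RawMff.__init__ copies its position into each EEG channel's
# reference location slot (loc[3:6]).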
REFERENCE_NAMES = ("VREF", "Vertex Reference")


def _read_mff_header(filepath):
    """Read mff header."""
    _soft_import("defusedxml", "reading EGI MFF data")
    from defusedxml.minidom import parse

    all_files = _get_signalfname(filepath)
    eeg_file = all_files["EEG"]["signal"]
    eeg_info_file = all_files["EEG"]["info"]

    info_filepath = op.join(filepath, "info.xml")  # add with filepath
    tags = ["mffVersion", "recordTime"]
    version_and_date = _extract(tags, filepath=info_filepath)
    version = ""
    if len(version_and_date["mffVersion"]):
        version = version_and_date["mffVersion"][0]

    fname = op.join(filepath, eeg_file)
    signal_blocks = _get_blocks(fname)
    epochs = _get_ep_info(filepath)
    summaryinfo = dict(eeg_fname=eeg_file, info_fname=eeg_info_file)
    summaryinfo.update(signal_blocks)

    # sanity check and update relevant values
    record_time = version_and_date["recordTime"][0]
    # e.g.,
    # 2018-07-30T10:47:01.021673-04:00
    # 2017-09-20T09:55:44.072000000+01:00
    g = re.match(
        r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.(\d{6}(?:\d{3})?)[+-]\d{2}:\d{2}",  # noqa: E501
        record_time,
    )
    if g is None:
        raise RuntimeError(f"Could not parse recordTime {repr(record_time)}")
    frac = g.groups()[0]
    assert len(frac) in (6, 9) and all(f.isnumeric() for f in frac)  # regex
    div = 1000 if len(frac) == 6 else 1000000
    for key in ("last_samps", "first_samps"):
        # convert from times in µs to samples
        for ei, e in enumerate(epochs[key]):
            if e % div != 0:
                raise RuntimeError(f"Could not parse epoch time {e}")
            epochs[key][ei] = e // div
        epochs[key] = np.array(epochs[key], np.uint64)
        # I guess they refer to times in milliseconds?
        # What we really need to do here is:
        # epochs[key] *= signal_blocks['sfreq']
        # epochs[key] //= 1000
        # But that multiplication risks an overflow, so let's only multiply
        # by what we need to (e.g., a sample rate of 500 means we can multiply
        # by 1 and divide by 2 rather than multiplying by 500 and dividing by
        # 1000)
        numerator = int(signal_blocks["sfreq"])
        denominator = 1000
        this_gcd = math.gcd(numerator, denominator)
        numerator = numerator // this_gcd
        denominator = denominator // this_gcd
        with np.errstate(over="raise"):
            epochs[key] *= numerator
        epochs[key] //= denominator
        # Should be safe to cast to int now, which makes things later not
        # upbroadcast to float
        epochs[key] = epochs[key].astype(np.int64)

    n_samps_block = signal_blocks["samples_block"].sum()
    n_samps_epochs = (epochs["last_samps"] - epochs["first_samps"]).sum()
    bad = (
        n_samps_epochs != n_samps_block
        or not (epochs["first_samps"] < epochs["last_samps"]).all()
        or not (epochs["first_samps"][1:] >= epochs["last_samps"][:-1]).all()
    )
    if bad:
        raise RuntimeError(
            "EGI epoch first/last samps could not be parsed:\n"
            f"{list(epochs['first_samps'])}\n{list(epochs['last_samps'])}"
        )
    summaryinfo.update(epochs)

    # index which samples in raw are actually readable from disk (i.e., not
    # in a skip)
    disk_samps = np.full(epochs["last_samps"][-1], -1)
    offset = 0
    for first, last in zip(epochs["first_samps"], epochs["last_samps"]):
        n_this = last - first
        disk_samps[first:last] = np.arange(offset, offset + n_this)
        offset += n_this
    summaryinfo["disk_samps"] = disk_samps

    # Add the sensor info.
    sensor_layout_file = op.join(filepath, "sensorLayout.xml")
    sensor_layout_obj = parse(sensor_layout_file)
    summaryinfo["device"] = sensor_layout_obj.getElementsByTagName("name")[
        0
    ].firstChild.data
    sensors = sensor_layout_obj.getElementsByTagName("sensor")
    chan_type = list()
    chan_unit = list()
    n_chans = 0
    numbers = list()  # used for identification
    for sensor in sensors:
        sensortype = int(sensor.getElementsByTagName("type")[0].firstChild.data)
        if sensortype in [0, 1]:
            sn = sensor.getElementsByTagName("number")[0].firstChild.data
            sn = sn.encode()
            numbers.append(sn)
            chan_type.append("eeg")
            chan_unit.append("uV")
            n_chans = n_chans + 1
    if n_chans != summaryinfo["n_channels"]:
        raise RuntimeError(
            f"Number of defined channels ({n_chans}) did not match the "
            f"expected channels ({summaryinfo['n_channels']})."
        )

    # Check presence of PNS data
    pns_names = []
    if "PNS" in all_files:
        pns_fpath = op.join(filepath, all_files["PNS"]["signal"])
        pns_blocks = _get_blocks(pns_fpath)
        pns_samples = pns_blocks["samples_block"]
        signal_samples = signal_blocks["samples_block"]
        same_blocks = np.array_equal(
            pns_samples[:-1], signal_samples[:-1]
        ) and pns_samples[-1] in (signal_samples[-1] - np.arange(2))
        if not same_blocks:
            raise RuntimeError(
                "PNS and signals samples did not match:\n"
                f"{list(pns_samples)}\nvs\n{list(signal_samples)}"
            )

        pns_file = op.join(filepath, "pnsSet.xml")
        pns_obj = parse(pns_file)
        sensors = pns_obj.getElementsByTagName("sensor")
        pns_types = []
        pns_units = []
        for sensor in sensors:
            # sensor number:
            # sensor.getElementsByTagName('number')[0].firstChild.data
            name = sensor.getElementsByTagName("name")[0].firstChild.data
            unit_elem = sensor.getElementsByTagName("unit")[0].firstChild
            unit = ""
            if unit_elem is not None:
                unit = unit_elem.data

            if name == "ECG":
                ch_type = "ecg"
            elif "EMG" in name:
                ch_type = "emg"
            else:
                ch_type = "bio"
            pns_types.append(ch_type)
            pns_units.append(unit)
            pns_names.append(name)

        summaryinfo.update(
            pns_types=pns_types,
            pns_units=pns_units,
            pns_fname=all_files["PNS"]["signal"],
            pns_sample_blocks=pns_blocks,
        )
    summaryinfo.update(
        pns_names=pns_names,
        version=version,
        date=version_and_date["recordTime"][0],
        chan_type=chan_type,
        chan_unit=chan_unit,
        numbers=numbers,
    )
    return summaryinfo


def _read_header(input_fname):
    """Obtain the headers from the file package mff.

    Parameters
    ----------
    input_fname : path-like
        Path to the file.

    Returns
    -------
    info : dict
        Main headers set.
    """
    input_fname = str(input_fname)  # cast any Paths to str
    mff_hdr = _read_mff_header(input_fname)
    with open(input_fname + "/signal1.bin", "rb") as fid:
        version = np.fromfile(fid, np.int32, 1)[0]

| """ | |
| the datetime.strptime .f directive (milleseconds) | |
| will only accept up to 6 digits. if there are more than | |
| six millesecond digits in the provided timestamp string | |
| (i.e. because of trailing zeros, as in test_egi_pns.mff) | |
| then slice both the first 26 elements and the last 6 | |
| elements of the timestamp string to truncate the | |
| milleseconds to 6 digits and extract the timezone, | |
| and then piece these together and assign back to mff_hdr['date'] | |
| """ | |
| if len(mff_hdr["date"]) > 32: | |
| dt, tz = [mff_hdr["date"][:26], mff_hdr["date"][-6:]] | |
| mff_hdr["date"] = dt + tz | |
| time_n = datetime.datetime.strptime(mff_hdr["date"], "%Y-%m-%dT%H:%M:%S.%f%z") | |
| info = dict( | |
| version=version, | |
| meas_dt_local=time_n, | |
| utc_offset=time_n.strftime("%z"), | |
| gain=0, | |
| bits=0, | |
| value_range=0, | |
| ) | |
| info.update( | |
| n_categories=0, | |
| n_segments=1, | |
| n_events=0, | |
| event_codes=[], | |
| category_names=[], | |
| category_lengths=[], | |
| pre_baseline=0, | |
| ) | |
| info.update(mff_hdr) | |
| return info | |


def _get_eeg_calibration_info(filepath, egi_info):
    """Calculate calibration info for EEG channels."""
    gains = _get_gains(op.join(filepath, egi_info["info_fname"]))
    if egi_info["value_range"] != 0 and egi_info["bits"] != 0:
        cals = [egi_info["value_range"] / 2 ** egi_info["bits"]] * len(
            egi_info["chan_type"]
        )
    else:
        cal_scales = {"uV": 1e-6, "V": 1}
        cals = [cal_scales[t] for t in egi_info["chan_unit"]]
    if "gcal" in gains:
        cals *= gains["gcal"]
    return cals


def _read_locs(filepath, egi_info, channel_naming):
    """Read channel locations."""
    _soft_import("defusedxml", "reading EGI MFF data")
    from defusedxml.minidom import parse

    fname = op.join(filepath, "coordinates.xml")
    if not op.exists(fname):
        warn("File coordinates.xml not found, not setting channel locations")
        ch_names = [channel_naming % (i + 1) for i in range(egi_info["n_channels"])]
        return ch_names, None
    dig_ident_map = {
        "Left periauricular point": "lpa",
        "Right periauricular point": "rpa",
        "Nasion": "nasion",
    }
    numbers = np.array(egi_info["numbers"])
    coordinates = parse(fname)
    sensors = coordinates.getElementsByTagName("sensor")

    ch_pos = OrderedDict()
    hsp = list()
    nlr = dict()
    ch_names = list()
    for sensor in sensors:
        name_element = sensor.getElementsByTagName("name")[0].firstChild
        num_element = sensor.getElementsByTagName("number")[0].firstChild
        name = (
            channel_naming % int(num_element.data)
            if name_element is None
            else name_element.data
        )
        nr = num_element.data.encode()
        coords = [
            float(sensor.getElementsByTagName(coord)[0].firstChild.data)
            for coord in "xyz"
        ]
        loc = np.array(coords) / 100  # cm -> m
        # create dig entry
        if name in dig_ident_map:
            nlr[dig_ident_map[name]] = loc
        else:
            # id_ is the index of the channel in egi_info['numbers']
            id_ = np.flatnonzero(numbers == nr)
            # if it's not in egi_info['numbers'], it's a headshape point
            if len(id_) == 0:
                hsp.append(loc)
            # not HSP, must be a data or reference channel
            else:
                ch_names.append(name)
                ch_pos[name] = loc
    mon = make_dig_montage(ch_pos=ch_pos, hsp=hsp, **nlr)
    return ch_names, mon


def _add_pns_channel_info(chs, egi_info, ch_names):
    """Add info for PNS channels to channel info dict."""
    for i_ch, ch_name in enumerate(egi_info["pns_names"]):
        idx = ch_names.index(ch_name)
        ch_type = egi_info["pns_types"][i_ch]
        type_to_kind_map = {"ecg": FIFF.FIFFV_ECG_CH, "emg": FIFF.FIFFV_EMG_CH}
        ch_kind = type_to_kind_map.get(ch_type, FIFF.FIFFV_BIO_CH)
        ch_unit = FIFF.FIFF_UNIT_V
        ch_cal = 1e-6
        if egi_info["pns_units"][i_ch] != "uV":
            ch_unit = FIFF.FIFF_UNIT_NONE
            ch_cal = 1.0
        chs[idx].update(
            cal=ch_cal, kind=ch_kind, coil_type=FIFF.FIFFV_COIL_NONE, unit=ch_unit
        )
    return chs


def _read_raw_egi_mff(
    input_fname,
    eog=None,
    misc=None,
    include=None,
    exclude=None,
    preload=False,
    channel_naming="E%d",
    *,
    events_as_annotations=True,
    verbose=None,
):
    """Read EGI mff binary as raw object."""
    return RawMff(
        input_fname,
        eog,
        misc,
        include,
        exclude,
        preload,
        channel_naming,
        events_as_annotations=events_as_annotations,
        verbose=verbose,
    )


class RawMff(BaseRaw):
    """RawMff class."""

    _extra_attributes = ("event_id",)

    def __init__(
        self,
        input_fname,
        eog=None,
        misc=None,
        include=None,
        exclude=None,
        preload=False,
        channel_naming="E%d",
        *,
        events_as_annotations=True,
        verbose=None,
    ):
        """Init the RawMff class."""
        input_fname = str(
            _check_fname(
                input_fname,
                "read",
                True,
                "input_fname",
                need_dir=True,
            )
        )
        logger.info(f"Reading EGI MFF Header from {input_fname}...")
        egi_info = _read_header(input_fname)
        if eog is None:
            eog = []
        if misc is None:
            misc = np.where(np.array(egi_info["chan_type"]) != "eeg")[0].tolist()

        logger.info(" Reading events ...")
        egi_events, egi_info, mff_events = _read_events(input_fname, egi_info)
        cals = _get_eeg_calibration_info(input_fname, egi_info)
        logger.info(" Assembling measurement info ...")
        event_codes = egi_info["event_codes"]
        include = _triage_include_exclude(include, exclude, egi_events, egi_info)
        if egi_info["n_events"] > 0 and not events_as_annotations:
            logger.info(' Synthesizing trigger channel "STI 014" ...')
            if all(ch.startswith("D") for ch in include):
                # support the DIN format DIN1, DIN2, ..., DIN9, DI10, DI11, ... DI99,
                # D100, D101, ..., D255 that we get when sending 0-255 triggers on a
                # parallel port.
                events_ids = list()
                for ch in include:
                    while not ch[0].isnumeric():
                        ch = ch[1:]
                    events_ids.append(int(ch))
            else:
                events_ids = np.arange(len(include)) + 1
            egi_info["new_trigger"] = _combine_triggers(
                egi_events[[c in include for c in event_codes]], remapping=events_ids
            )
            self.event_id = dict(
                zip([e for e in event_codes if e in include], events_ids)
            )
            if egi_info["new_trigger"] is not None:
                egi_events = np.vstack([egi_events, egi_info["new_trigger"]])
        else:
            self.event_id = None
            egi_info["new_trigger"] = None
        assert egi_events.shape[1] == egi_info["last_samps"][-1]

        meas_dt_utc = egi_info["meas_dt_local"].astimezone(datetime.timezone.utc)
        info = _empty_info(egi_info["sfreq"])
        info["meas_date"] = _ensure_meas_date_none_or_dt(meas_dt_utc)
        info["utc_offset"] = egi_info["utc_offset"]
        info["device_info"] = dict(type=egi_info["device"])

        # read in the montage, if it exists
        ch_names, mon = _read_locs(input_fname, egi_info, channel_naming)

        # Second: Stim
        ch_names.extend(list(egi_info["event_codes"]))
        n_extra = len(event_codes) + len(misc) + len(eog) + len(egi_info["pns_names"])
        if egi_info["new_trigger"] is not None:
            ch_names.append("STI 014")  # channel for combined events
            n_extra += 1

        # Third: PNS
        ch_names.extend(egi_info["pns_names"])
        cals = np.concatenate([cals, np.ones(n_extra)])
        assert len(cals) == len(ch_names), (len(cals), len(ch_names))

        # Actually create channels as EEG, then update stim and PNS
        ch_coil = FIFF.FIFFV_COIL_EEG
        ch_kind = FIFF.FIFFV_EEG_CH
        chs = _create_chs(ch_names, cals, ch_coil, ch_kind, eog, (), (), misc)
        sti_ch_idx = [
            i
            for i, name in enumerate(ch_names)
            if name.startswith("STI") or name in event_codes
        ]
        for idx in sti_ch_idx:
            chs[idx].update(
                {
                    "unit_mul": FIFF.FIFF_UNITM_NONE,
                    "cal": cals[idx],
                    "kind": FIFF.FIFFV_STIM_CH,
                    "coil_type": FIFF.FIFFV_COIL_NONE,
                    "unit": FIFF.FIFF_UNIT_NONE,
                }
            )
        chs = _add_pns_channel_info(chs, egi_info, ch_names)
        info["chs"] = chs
        info._unlocked = False
        info._update_redundant()
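
        # If a montage was found, apply it; when a vertex reference (VREF) is
        # present, copy its position into loc[3:6] of every EEG channel so the
        # reference location is stored alongside each electrode position.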
        if mon is not None:
            info.set_montage(mon, on_missing="ignore")
            ref_idx = np.flatnonzero(np.isin(mon.ch_names, REFERENCE_NAMES))
            if len(ref_idx):
                ref_idx = ref_idx.item()
                ref_coords = info["chs"][int(ref_idx)]["loc"][:3]
                for chan in info["chs"]:
                    if chan["kind"] == FIFF.FIFFV_EEG_CH:
                        chan["loc"][3:6] = ref_coords

        file_bin = op.join(input_fname, egi_info["eeg_fname"])
        egi_info["egi_events"] = egi_events

        # Check how many channels to read are from EEG
        keys = ("eeg", "sti", "pns")
        idx = dict()
        idx["eeg"] = np.where([ch["kind"] == FIFF.FIFFV_EEG_CH for ch in chs])[0]
        idx["sti"] = np.where([ch["kind"] == FIFF.FIFFV_STIM_CH for ch in chs])[0]
        idx["pns"] = np.where(
            [
                ch["kind"] in (FIFF.FIFFV_ECG_CH, FIFF.FIFFV_EMG_CH, FIFF.FIFFV_BIO_CH)
                for ch in chs
            ]
        )[0]
        # By construction this should always be true, but check anyway
        if not np.array_equal(
            np.concatenate([idx[key] for key in keys]), np.arange(len(chs))
        ):
            raise ValueError(
                "Currently interlacing EEG and PNS channels is not supported"
            )
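
        # Record the boundaries between the EEG, stim, and PNS channel groups
        # so that _read_segment_file can split requested channel indices by
        # type.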
| egi_info["kind_bounds"] = [0] | |
| for key in keys: | |
| egi_info["kind_bounds"].append(len(idx[key])) | |
| egi_info["kind_bounds"] = np.cumsum(egi_info["kind_bounds"]) | |
| assert egi_info["kind_bounds"][0] == 0 | |
| assert egi_info["kind_bounds"][-1] == info["nchan"] | |
        first_samps = [0]
        last_samps = [egi_info["last_samps"][-1] - 1]

        annot = dict(onset=list(), duration=list(), description=list())

        if len(idx["pns"]):
            # PNS Data is present and should be read:
            egi_info["pns_filepath"] = op.join(input_fname, egi_info["pns_fname"])
            # Check for PNS bug immediately
            pns_samples = np.sum(egi_info["pns_sample_blocks"]["samples_block"])
            eeg_samples = np.sum(egi_info["samples_block"])
            if pns_samples == eeg_samples - 1:
                warn("This file has the EGI PSG sample bug")
                annot["onset"].append(last_samps[-1] / egi_info["sfreq"])
                annot["duration"].append(1 / egi_info["sfreq"])
                annot["description"].append("BAD_EGI_PSG")
            elif pns_samples != eeg_samples:
                raise RuntimeError(
                    f"PNS samples ({pns_samples}) did not match EEG samples "
                    f"({eeg_samples})."
                )

        super().__init__(
            info,
            preload=preload,
            orig_format="single",
            filenames=[file_bin],
            first_samps=first_samps,
            last_samps=last_samps,
            raw_extras=[egi_info],
            verbose=verbose,
        )

        # Annotate acquisition skips
        for first, prev_last in zip(
            egi_info["first_samps"][1:], egi_info["last_samps"][:-1]
        ):
            gap = first - prev_last
            assert gap >= 0
            if gap:
                annot["onset"].append((prev_last - 0.5) / egi_info["sfreq"])
                annot["duration"].append(gap / egi_info["sfreq"])
                annot["description"].append("BAD_ACQ_SKIP")
        # create annotations from events
        if events_as_annotations:
            for code, samples in mff_events.items():
                if code not in include:
                    continue
                annot["onset"].extend(np.array(samples) / egi_info["sfreq"])
                annot["duration"].extend([0.0] * len(samples))
                annot["description"].extend([code] * len(samples))

        if len(annot["onset"]):
            self.set_annotations(Annotations(**annot))

    def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
        """Read a chunk of data."""
        logger.debug(f"Reading MFF {start:6d} ... {stop:6d} ...")
        dtype = "<f4"  # Data read in four byte floats.

        egi_info = self._raw_extras[fi]
        one = np.zeros((egi_info["kind_bounds"][-1], stop - start))

        # info about the binary file structure
        n_channels = egi_info["n_channels"]
        samples_block = egi_info["samples_block"]

        # Check how many channels to read are from each type
        bounds = egi_info["kind_bounds"]
        if isinstance(idx, slice):
            idx = np.arange(idx.start, idx.stop)
        eeg_out = np.where(idx < bounds[1])[0]
        eeg_one = idx[eeg_out, np.newaxis]
        eeg_in = idx[eeg_out]
        stim_out = np.where((idx >= bounds[1]) & (idx < bounds[2]))[0]
        stim_one = idx[stim_out]
        stim_in = idx[stim_out] - bounds[1]
        pns_out = np.where((idx >= bounds[2]) & (idx < bounds[3]))[0]
        pns_in = idx[pns_out] - bounds[2]
        pns_one = idx[pns_out, np.newaxis]
        del eeg_out, stim_out, pns_out

        # take into account events (already extended to correct size)
        one[stim_one, :] = egi_info["egi_events"][stim_in, start:stop]

        # Convert start and stop to limits in terms of the data
        # actually on disk, plus an indexer (disk_use_idx) that populates
        # the potentially larger `data` with it, taking skips into account
        disk_samps = egi_info["disk_samps"][start:stop]
        disk_use_idx = np.where(disk_samps > -1)[0]
        # short circuit in case we don't need any samples
        if not len(disk_use_idx):
            _mult_cal_one(data, one, idx, cals, mult)
            return

        start = disk_samps[disk_use_idx[0]]
        stop = disk_samps[disk_use_idx[-1]] + 1
        assert len(disk_use_idx) == stop - start

        # Get starting/stopping block/samples
        block_samples_offset = np.cumsum(samples_block)
        offset_blocks = np.sum(block_samples_offset <= start)
        offset_samples = start - (
            block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0
        )

        # TODO: Refactor this reading with the PNS reading in a single function
        # (DRY)
        samples_to_read = stop - start
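
        # The reader walks signal1.bin block by block: _block_r returns a new
        # header for blocks that carry one and None for blocks that reuse the
        # previous header, so current_block_info is carried across iterations
        # while seeking to the first needed block and then reading onward.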
        with open(self.filenames[fi], "rb", buffering=0) as fid:
            # Go to starting block
            current_block = 0
            current_block_info = None
            current_data_sample = 0
            while current_block < offset_blocks:
                this_block_info = _block_r(fid)
                if this_block_info is not None:
                    current_block_info = this_block_info
                fid.seek(current_block_info["block_size"], 1)
                current_block += 1

            # Start reading samples
            while samples_to_read > 0:
                logger.debug(f" Reading from block {current_block}")
                this_block_info = _block_r(fid)
                current_block += 1
                if this_block_info is not None:
                    current_block_info = this_block_info

                to_read = current_block_info["nsamples"] * current_block_info["nc"]
                block_data = np.fromfile(fid, dtype, to_read)
                block_data = block_data.reshape(n_channels, -1, order="C")

                # Compute indexes
                samples_read = block_data.shape[1]
                logger.debug(f" Read {samples_read} samples")
                logger.debug(f" Offset {offset_samples} samples")
                if offset_samples > 0:
                    # First block read, skip to the offset:
                    block_data = block_data[:, offset_samples:]
                    samples_read = samples_read - offset_samples
                    offset_samples = 0
                if samples_to_read < samples_read:
                    # Last block to read, skip the last samples
                    block_data = block_data[:, :samples_to_read]
                    samples_read = samples_to_read
                logger.debug(f" Keep {samples_read} samples")

                s_start = current_data_sample
                s_end = s_start + samples_read

                one[eeg_one, disk_use_idx[s_start:s_end]] = block_data[eeg_in]
                samples_to_read = samples_to_read - samples_read
                current_data_sample = current_data_sample + samples_read

        if len(pns_one) > 0:
            # PNS Data is present and should be read:
            pns_filepath = egi_info["pns_filepath"]
            pns_info = egi_info["pns_sample_blocks"]
            n_channels = pns_info["n_channels"]
            samples_block = pns_info["samples_block"]

            # Get starting/stopping block/samples
            block_samples_offset = np.cumsum(samples_block)
            offset_blocks = np.sum(block_samples_offset < start)
            offset_samples = start - (
                block_samples_offset[offset_blocks - 1] if offset_blocks > 0 else 0
            )

            samples_to_read = stop - start
            with open(pns_filepath, "rb", buffering=0) as fid:
                # Check file size
                fid.seek(0, 2)
                file_size = fid.tell()
                fid.seek(0)

                # Go to starting block
                current_block = 0
                current_block_info = None
                current_data_sample = 0
                while current_block < offset_blocks:
                    this_block_info = _block_r(fid)
                    if this_block_info is not None:
                        current_block_info = this_block_info
                    fid.seek(current_block_info["block_size"], 1)
                    current_block += 1

                # Start reading samples
                while samples_to_read > 0:
                    if samples_to_read == 1 and fid.tell() == file_size:
                        # We are in the presence of the EEG bug
                        # fill with zeros and break the loop
                        one[pns_one, -1] = 0
                        break

                    this_block_info = _block_r(fid)
                    if this_block_info is not None:
                        current_block_info = this_block_info

                    to_read = current_block_info["nsamples"] * current_block_info["nc"]
                    block_data = np.fromfile(fid, dtype, to_read)
                    block_data = block_data.reshape(n_channels, -1, order="C")

                    # Compute indexes
                    samples_read = block_data.shape[1]
                    if offset_samples > 0:
                        # First block read, skip to the offset:
                        block_data = block_data[:, offset_samples:]
                        samples_read = samples_read - offset_samples
                        offset_samples = 0

                    if samples_to_read < samples_read:
                        # Last block to read, skip the last samples
                        block_data = block_data[:, :samples_to_read]
                        samples_read = samples_to_read

                    s_start = current_data_sample
                    s_end = s_start + samples_read

                    one[pns_one, disk_use_idx[s_start:s_end]] = block_data[pns_in]
                    samples_to_read = samples_to_read - samples_read
                    current_data_sample = current_data_sample + samples_read

        # do the calibration
        _mult_cal_one(data, one, idx, cals, mult)


def read_evokeds_mff(
    fname, condition=None, channel_naming="E%d", baseline=None, verbose=None
):
    """Read averaged MFF file as EvokedArray or list of EvokedArray.

    Parameters
    ----------
    fname : path-like
        File path to averaged MFF file. Should end in ``.mff``.
    condition : int or str | list of int or str | None
        The index (indices) or category (categories) from which to read in
        data. Averaged MFF files can contain separate averages for different
        categories. These can be indexed by the block number or the category
        name. If ``condition`` is a list or None, a list of EvokedArray objects
        is returned.
    channel_naming : str
        Channel naming convention for EEG channels. Defaults to 'E%%d'
        (resulting in channel names 'E1', 'E2', 'E3'...).
    baseline : None (default) or tuple of length 2
        The time interval to apply baseline correction. If None do not apply
        it. If baseline is (a, b) the interval is between "a (s)" and "b (s)".
        If a is None the beginning of the data is used and if b is None then b
        is set to the end of the interval. If baseline is equal to (None, None)
        all the time interval is used. Correction is applied by computing mean
        of the baseline period and subtracting it from the data. The baseline
        (a, b) includes both endpoints, i.e. all timepoints t such that
        a <= t <= b.
    %(verbose)s

    Returns
    -------
    evoked : EvokedArray or list of EvokedArray
        The evoked dataset(s); one EvokedArray if condition is int or str,
        or list of EvokedArray if condition is None or list.

    Raises
    ------
    ValueError
        If ``fname`` has file extension other than '.mff'.
    ValueError
        If the MFF file specified by ``fname`` is not averaged.
    ValueError
        If no categories.xml file in MFF directory specified by ``fname``.

    See Also
    --------
    Evoked, EvokedArray, create_info

    Notes
    -----
    .. versionadded:: 0.22
    """
    mffpy = _import_mffpy()
    # Confirm `fname` is a path to an MFF file
    fname = Path(fname)  # should be replaced with _check_fname
    if not fname.suffix == ".mff":
        raise ValueError('fname must be an MFF file with extension ".mff".')
    # Confirm the input MFF is averaged
    mff = mffpy.Reader(fname)
    try:
        flavor = mff.mff_flavor
    except AttributeError:  # < 6.3
        flavor = mff.flavor
    if flavor not in ("averaged", "segmented"):  # old, new names
        raise ValueError(
            f"{fname} is a {flavor} MFF file. "
            "fname must be the path to an averaged MFF file."
        )
    # Check for categories.xml file
    if "categories.xml" not in mff.directory.listdir():
        raise ValueError(
            "categories.xml not found in MFF directory. "
            f"{fname} may not be an averaged MFF file."
        )
    return_list = True
    if condition is None:
        categories = mff.categories.categories
        condition = list(categories.keys())
    elif not isinstance(condition, list):
        condition = [condition]
        return_list = False
    logger.info(f"Reading {len(condition)} evoked datasets from {fname} ...")
    output = [
        _read_evoked_mff(
            fname, c, channel_naming=channel_naming, verbose=verbose
        ).apply_baseline(baseline)
        for c in condition
    ]
    return output if return_list else output[0]


def _read_evoked_mff(fname, condition, channel_naming="E%d", verbose=None):
    """Read evoked data from MFF file."""
    import mffpy

    egi_info = _read_header(fname)
    mff = mffpy.Reader(fname)
    categories = mff.categories.categories

    if isinstance(condition, str):
        # Condition is interpreted as category name
        category = _check_option(
            "condition", condition, categories, extra="provided as category name"
        )
        epoch = mff.epochs[category]
    elif isinstance(condition, int):
        # Condition is interpreted as epoch index
        try:
            epoch = mff.epochs[condition]
        except IndexError:
            raise ValueError(
                f'"condition" parameter ({condition}), provided '
                "as epoch index, is out of range for available "
                f"epochs ({len(mff.epochs)})."
            )
        category = epoch.name
    else:
        raise TypeError('"condition" parameter must be either int or str.')

    # Read in signals from the target epoch
    data = mff.get_physical_samples_from_epoch(epoch)
    eeg_data, t0 = data["EEG"]
    if "PNSData" in data:
        pns_data, t0 = data["PNSData"]
        all_data = np.vstack((eeg_data, pns_data))
        ch_types = egi_info["chan_type"] + egi_info["pns_types"]
    else:
        all_data = eeg_data
        ch_types = egi_info["chan_type"]
    all_data *= 1e-6  # convert to volts

    # Load metadata into info object
    # Exclude info['meas_date'] because record time info in
    # averaged MFF is the time of the averaging, not true record time.
    ch_names, mon = _read_locs(fname, egi_info, channel_naming)
    ch_names.extend(egi_info["pns_names"])
    info = create_info(ch_names, mff.sampling_rates["EEG"], ch_types)
    with info._unlock():
        info["device_info"] = dict(type=egi_info["device"])
        info["nchan"] = sum(mff.num_channels.values())

    # Add individual channel info
    # Get calibration info for EEG channels
    cals = _get_eeg_calibration_info(fname, egi_info)
    # Initialize calibration for PNS channels, will be updated later
    cals = np.concatenate([cals, np.repeat(1, len(egi_info["pns_names"]))])
    ch_coil = FIFF.FIFFV_COIL_EEG
    ch_kind = FIFF.FIFFV_EEG_CH
    chs = _create_chs(ch_names, cals, ch_coil, ch_kind, (), (), (), ())
    # Update PNS channel info
    chs = _add_pns_channel_info(chs, egi_info, ch_names)
    with info._unlock():
        info["chs"] = chs
    if mon is not None:
        info.set_montage(mon, on_missing="ignore")

    # Add bad channels to info
    info["description"] = category
    try:
        channel_status = categories[category][0]["channelStatus"]
    except KeyError:
        warn(
            f"Channel status data not found for condition {category}. "
            "No channels will be marked as bad.",
            category=UserWarning,
        )
        channel_status = None
    bads = []
    if channel_status:
        for entry in channel_status:
            if entry["exclusion"] == "badChannels":
                if entry["signalBin"] == 1:
                    # Add bad EEG channels
                    for ch in entry["channels"]:
                        bads.append(ch_names[ch - 1])
                elif entry["signalBin"] == 2:
                    # Add bad PNS channels
                    for ch in entry["channels"]:
                        bads.append(egi_info["pns_names"][ch - 1])
    info["bads"] = bads

    # Add EEG reference to info
    try:
        fp = mff.directory.filepointer("history")
    except (ValueError, FileNotFoundError):  # old (<=0.6.3) vs new mffpy
        pass
    else:
        with fp:
            history = mffpy.XML.from_file(fp)
            for entry in history.entries:
                if entry["method"] == "Montage Operations Tool":
                    if "Average Reference" in entry["settings"]:
                        # Average reference has been applied
                        _, info = setup_proj(info)

    # Get nave from categories.xml
    try:
        nave = categories[category][0]["keys"]["#seg"]["data"]
    except KeyError:
        warn(
            f"Number of averaged epochs not found for condition {category}. "
            "nave will default to 1.",
            category=UserWarning,
        )
        nave = 1

    # Let tmin default to 0
    return EvokedArray(
        all_data, info, tmin=0.0, comment=category, nave=nave, verbose=verbose
    )


def _import_mffpy(why="read averaged .mff files"):
    """Import and return module mffpy."""
    try:
        import mffpy
    except ImportError as exp:
        msg = f"mffpy is required to {why}, got:\n{exp}"
        raise ImportError(msg)
    return mffpy