Converting eye-tracking into BIDS
Reading EyeLink's EDF recordings
Our ET device produces EyeLink's EDF recording files, which will be accessed with PyEDFRead.
PyEDFRead must be installed
Make sure PyEDFRead is installed on the computer where the conversion will be executed, as described in the software annex.
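A minimal check that the package is available (assuming a standard Python environment) is simply importing it:

# Raises ImportError if PyEDFRead is not installed in this environment
import pyedfread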
Open a run's EDF file:
from pyedfread import edf

recording, events, messages = edf.pread(
    str(DATA_PATH / "example-run.edf"), trial_marker=b""
)
As a result, three Pandas dataframes are obtained:

- recording contains the actual ET data,
- events contains the events reported by the device (most substantially, fixations, saccades, and blinks), and
- messages contains the logged textual messages sent to the device.
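For orientation, the returned dataframes can be inspected directly (a minimal sketch; the exact columns vary with the recording setup):

# Peek at what pyedfread returned for this run
print(recording.columns.tolist())   # sample columns (gaze, pupil, velocities, ...)
print(events["type"].unique())      # event categories (e.g., fixation, saccade)
print(messages.head())              # logged textual messages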
Processing the Pandas dataframes generated by PyEDFRead
The code snippets are derived from the EyeTrackingRun object.
Disclaimer: although this code aims to be general, some parts focus on single-eye recordings.
Parsing the messages dataframe
First, we clean the messages dataframe: column headers may end with spaces, so we strip them, and we drop duplicate rows:
messages = messages.rename(
    columns={c: c.strip() for c in messages.columns.values}
).drop_duplicates()
Extract and reserve calibration messages.
EyeLink reports the calibration information as messages, which are marked up with a !CAL prefix. We extract them into a separate variable to process them at a later time:
# Extract calibration headers
_cal_hdr = messages.trialid.str.startswith("!CAL")
calibration = messages[_cal_hdr]
messages = messages.drop(messages.index[_cal_hdr])
Extracting the StartTime and StopTime metadata.
We then search for messages signaling the start and stop events of the run.
These events are stored in the JSON sidecar file annotating our _eyetrack.tsv.gz file.
In our PsychoPy experiment files, we configure messages to be sent to the ET at these events.
The exact message changes a little with the RSfMRI experiment, but it is usually of the form hello <taskname> and bye <taskname>; the corresponding search patterns are stored in the message_first_trigger and message_last_trigger variables, respectively.
In the last line of the snippet below, we remove these consumed rows from the dataframe.
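For illustration, for a task named rest (a hypothetical name), the trigger patterns could be defined as:

# Hypothetical trigger patterns following the hello/bye convention above
message_first_trigger = "hello rest"
message_last_trigger = "bye rest"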
# Find the start and stop messages
start_rows = messages.trialid.str.contains(
    message_first_trigger, case=False, regex=True
)
stop_rows = messages.trialid.str.contains(
    message_last_trigger, case=False, regex=True
)
# Pick the LAST of the start messages
metadata["StartTime"] = (
    int(messages[start_rows].trialid_time.values[-1])
    if start_rows.any()
    else None
)

# Pick the FIRST of the stop messages
metadata["StopTime"] = (
    int(messages[stop_rows].trialid_time.values[0])
    if stop_rows.any()
    else None
)

# Drop start and stop messages from messages dataframe
messages = messages.loc[~start_rows & ~stop_rows, :]
Extracting basic metadata. Next, we extract basic metadata: the eye being sampled, the ET approach, and the sampling frequency. These are parsed from the !MODE RECORD message:
mode_record = messages.trialid.str.startswith("!MODE RECORD")
meta_record = {
    "freq": DEFAULT_FREQUENCY,
    "mode": DEFAULT_MODE,
    "eye": DEFAULT_EYE,
}
if mode_record.any():
    try:
        meta_record = re.match(
            r"\!MODE RECORD (?P<mode>\w+) (?P<freq>\d+) \d \d (?P<eye>[RL]+)",
            messages[mode_record].trialid.iloc[-1].strip(),
        ).groupdict()
        meta_record["eye"] = EYE_CODE_MAP[meta_record["eye"]]
        meta_record["mode"] = (
            "P-CR" if meta_record["mode"] == "CR" else meta_record["mode"]
        )
    except AttributeError:
        warn(
            "Error extracting !MODE RECORD message, "
            "using default frequency, mode, and eye"
        )
    finally:
        messages = messages.loc[~mode_record]

eye = (
    ("right", "left") if meta_record["eye"] == "both" else (meta_record["eye"],)
)
metadata["SamplingFrequency"] = int(meta_record["freq"])
metadata["EyeTrackingMethod"] = meta_record["mode"]
metadata["RecordedEye"] = meta_record["eye"]
Extracting screen parameters.
We then parse the GAZE_COORDS message, which has the format GAZE_COORDS 0.00 0.00 800.00 600.00:
# Extract GAZE_COORDS message signaling start of recording
gaze_msg = messages.trialid.str.startswith("GAZE_COORDS")
metadata["ScreenAOIDefinition"] = [
    "square",
    DEFAULT_SCREEN,
]
if gaze_msg.any():
    try:
        gaze_record = re.match(
            r"GAZE_COORDS (\d+\.\d+) (\d+\.\d+) (\d+\.\d+) (\d+\.\d+)",
            messages[gaze_msg].trialid.iloc[-1].strip(),
        ).groups()
        metadata["ScreenAOIDefinition"][1] = [
            int(round(float(gaze_record[0]))),
            int(round(float(gaze_record[2]))),
            int(round(float(gaze_record[1]))),
            int(round(float(gaze_record[3]))),
        ]
    except AttributeError:
        warn("Error extracting GAZE_COORDS")
    finally:
        messages = messages.loc[~gaze_msg]
Extracting parameters of the pupil fit model.
Next, we look into the messages containing ELCL_PROC and ELCL_EFIT_PARAMS, which specify the pupil fit.
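The raw messages are not reproduced here; judging from the parsing code below, they look approximately like the following (an illustrative reconstruction, not verbatim device output):

ELCL_PROC ELLIPSE (5)
ELCL_EFIT_PARAMS 1.01 4.00  0.15 0.05  0.65 0.65  0.00 0.00

These messages are parsed as follows: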
# Extract ELCL_PROC and ELCL_EFIT_PARAMS to determine the pupil fit method
pupilfit_msg = messages.trialid.str.startswith("ELCL_PROC")
if pupilfit_msg.any():
    try:
        pupilfit_method = [
            val
            for val in messages[pupilfit_msg]
            .trialid.iloc[-1]
            .strip()
            .split(" ")[1:]
            if val
        ]
        metadata["PupilFitMethod"] = pupilfit_method[0].lower()
        metadata["PupilFitMethodNumberOfParameters"] = int(
            pupilfit_method[1].strip("(").strip(")")
        )
    except AttributeError:
        warn("Error extracting ELCL_PROC (pupil fitting method)")
    finally:
        messages = messages.loc[~pupilfit_msg]
pupilfit_msg_params = messages.trialid.str.startswith("ELCL_EFIT_PARAMS")
if pupilfit_msg_params.any():
    rows = messages[pupilfit_msg_params]
    row = rows.trialid.values[-1].strip().split(" ")[1:]
    try:
        metadata["PupilFitParameters"] = [
            tuple(float(val) for val in vals)
            for k, vals in groupby(row, key=bool)
            if k
        ]
    except AttributeError:
        warn("Error extracting ELCL_EFIT_PARAMS (pupil fitting parameters)")
    finally:
        messages = messages.loc[~pupilfit_msg_params]
Calibration validation.
If a calibration was performed, a validation procedure was most likely executed next.
These messages take the format VALIDATE R 4POINT 4 RIGHT at 752,300 OFFSET 0.35 deg. -8.7,-3.8 pix.:
# Extract VALIDATE messages for a calibration validation
validation_msg = messages.trialid.str.startswith("VALIDATE")
if validation_msg.any():
    metadata["ValidationPosition"] = []
    metadata["ValidationErrors"] = []
    for i_row, validate_row in enumerate(messages[validation_msg].trialid.values):
        prefix, suffix = validate_row.split("OFFSET")
        validation_eye = (
            f"eye{eye.index('right') + 1}"
            if "RIGHT" in prefix
            else f"eye{eye.index('left') + 1}"
        )
        validation_coords = [
            int(val.strip())
            for val in prefix.rsplit("at", 1)[-1].split(",")
            if val.strip()
        ]
        metadata["ValidationPosition"].append(
            [validation_eye, validation_coords]
        )
        validate_values = [
            float(val)
            for val in re.match(
                r"(-?\d+\.\d+) deg\.\s+(-?\d+\.\d+),(-?\d+\.\d+) pix\.",
                suffix.strip(),
            ).groups()
        ]
        metadata["ValidationErrors"].append(
            (validation_eye, validate_values[0], tuple(validate_values[1:]))
        )
    messages = messages.loc[~validation_msg]
Extracting final bits of metadata.
Finally, we parse the last of the THRESHOLDS messages.
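These messages are not reproduced here; judging from the parsing code below, they look approximately like the following (illustrative values, with the eye code followed by the pupil and corneal-reflection thresholds):

THRESHOLDS R 100 200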
# Extract THRESHOLDS messages prior to the recording and process the last one
thresholds_msg = messages.trialid.str.startswith("THRESHOLDS")
if thresholds_msg.any():
    metadata["PupilThreshold"] = [None] * len(eye)
    metadata["CornealReflectionThreshold"] = [None] * len(eye)
    thresholds_chunks = (
        messages[thresholds_msg].trialid.iloc[-1].strip().split(" ")[1:]
    )
    eye_index = eye.index(EYE_CODE_MAP[thresholds_chunks[0]])
    metadata["PupilThreshold"][eye_index] = int(thresholds_chunks[-2])
    metadata["CornealReflectionThreshold"][eye_index] = int(
        thresholds_chunks[-1]
    )
    messages = messages.loc[~thresholds_msg]
Flush the remaining messages as a metadata entry.
To preserve all the information generated by the ET for further use, we dump the remainder of the messages dataframe into the LoggedMessages metadata key:
# Consume the remainder of messages
if not messages.empty:
    metadata["LoggedMessages"] = [
        (int(msg_timestamp), msg.strip())
        for msg_timestamp, msg in messages[["trialid_time", "trialid"]].values
    ]
Parsing the recording dataframe
Curation of the input dataframe.
The recording dataframe has several issues that we want to resolve from the beginning.
These include buggy and inconsistent column names, rows whose timestamp is set to zero, implausible values, empty columns, etc.
# Normalize timestamps (should be int and strictly positive)
recording = recording.astype({"time": int})
recording = recording[recording["time"] > 0]
recording = recording.rename(
    columns={
        # Fix buggy header names generated by pyedfread
        "fhxyvel": "fhxvel",
        "frxyvel": "frxvel",
        # Normalize weird header names generated by pyedfread
        "rx": "screen_ppdeg_x_coordinate",
        "ry": "screen_ppdeg_y_coordinate",
        # Convert some BIDS columns
        "time": "timestamp",
    }
)

# Split extra columns from the dataframe
extra = recording[["flags", "input", "htype"]]
recording = recording.drop(columns=["flags", "input", "htype"])

# Remove columns that are always very close to zero
recording = recording.loc[:, (recording.abs() > 1e-8).any(axis=0)]
# Remove columns that are always 1e8 or more
recording = recording.loc[:, (recording.abs() < 1e8).any(axis=0)]
# Replace unreasonably high values with NaNs
recording = recording.replace({1e8: np.nan})
Remove columns that do not apply (e.g., when only one eye was recorded). Next, we clean up these unnecessary columns:
# Drop one eye's columns if not interested in "both"
if remove_eye := set(("left", "right")) - set(eye):
    remove_eye = remove_eye.pop()  # Drop set decoration
    recording = recording.reindex(
        columns=[c for c in recording.columns if remove_eye not in c]
    )
Clean-up pupil size and gaze position. These are the parameters we most likely care about, so special curation is applied:
for eyenum, eyename in enumerate(eye):
    # Clean-up implausible values for pupil area (pa)
    recording.loc[
        recording[f"pa_{eyename}"] < 1, f"pa_{eyename}"
    ] = np.nan
    recording = recording.rename(
        columns={f"pa_{eyename}": f"eye{eyenum + 1}_pupil_size"}
    )

    # Clean-up implausible values for gaze x position
    recording.loc[
        (recording[f"gx_{eyename}"] < 0)
        | (recording[f"gx_{eyename}"] > screen_resolution[0]),
        f"gx_{eyename}",
    ] = np.nan

    # Clean-up implausible values for gaze y position
    recording.loc[
        (recording[f"gy_{eyename}"] <= 0)
        | (recording[f"gy_{eyename}"] > screen_resolution[1]),
        f"gy_{eyename}",
    ] = np.nan
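The snippet above assumes a screen_resolution variable holding the screen size in pixels. A plausible definition (an assumption, consistent with the GAZE_COORDS example earlier) is:

# Assumed screen extent in pixels (width, height); not shown in the original
screen_resolution = (800, 600)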
Munging columns to comply with BIDS. At this point, the dataframe is almost ready to be written out as BIDS:
# Interpolate BIDS column names
columns = list(
    set(recording.columns)
    - set(
        (
            "timestamp",
            "screen_ppdeg_x_coordinate",
            "screen_ppdeg_y_coordinate",
            "eye1_pupil_size",
            "eye2_pupil_size",
        )
    )
)
bids_columns = []
for eyenum, eyename in enumerate(eye):
    for name in columns:
        colprefix = f"eye{eyenum + 1}" if name.endswith(f"_{eyename}") else ""
        _newname = name.split("_")[0]
        _newname = re.sub(r"([xy])$", r"_\1_coordinate", _newname)
        _newname = re.sub(r"([xy])vel$", r"_\1_velocity", _newname)
        _newname = _newname.split("_", 1)
        _newname[0] = EDF2BIDS_COLUMNS[_newname[0]]
        _newname.insert(0, colprefix)
        bids_columns.append("_".join((_n for _n in _newname if _n)))

# Rename columns to be BIDS-compliant
recording = recording.rename(columns=dict(zip(columns, bids_columns)))

# Reorder columns to render nicely (tracking first, pupil size after)
columns = sorted(
    set(recording.columns.values).intersection(BIDS_COLUMNS_ORDER),
    key=lambda entry: BIDS_COLUMNS_ORDER.index(entry),
)
columns += [c for c in recording.columns.values if c not in columns]
recording = recording.reindex(columns=columns)
Parsing the calibration messages
We had reserved the calibration information for metadata extraction. In the case of our ET, the calibration information is textual and looks as follows:
!CAL
>>>>>>> CALIBRATION (HV9,P-CR) FOR RIGHT: <<<<<<<<<
!CAL Calibration points:
!CAL -18.8, -42.0 0, 0
!CAL -16.8, -58.4 0, -2457
!CAL -20.0, -25.9 0, 2457
!CAL -45.1, -42.6 -3474, 0
!CAL 10.2, -40.9 3474, 0
!CAL -45.4, -59.0 -3474, -2457
!CAL 11.1, -55.5 3474, -2457
!CAL -47.5, -27.1 -3474, 2457
!CAL 8.6, -25.6 3474, 2457
!CAL eye check box: (L,R,T,B)
-53 17 -62 -22
!CAL href cal range: (L,R,T,B)
-5211 5211 -3686 3686
!CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+hy+ixx+jyy)
5.882e-05 125.7 12.521 -0.22744 -0.18622
4.4115e-05 -4.6332 150.8 -0.048749 0.093667
!CAL Prenormalize: offx, offy = -18.822 -42.021
!CAL Quadrant center: centx, centy =
5.882e-05 4.4115e-05
!CAL Corner correction:
3.207e-05, -1.7627e-06
-1.6438e-05, 3.5065e-05
-2.0184e-05, -1.0586e-05
5.7616e-06, 1.5672e-05
!CAL Gains: cx:120.898 lx:115.734 rx:103.799
!CAL Gains: cy:164.746 ty:163.045 by:164.277
!CAL Resolution (upd) at screen center: X=2.2, Y=1.6
!CAL Gain Change Proportion: X: 0.115 Y: 0.008
!CAL Gain Ratio (Gy/Gx) = 1.363
!CAL Cross-Gain Ratios: X=0.100, Y=0.031
!CAL Quadrant fixup[0] = 0.059,0.003
!CAL Quadrant fixup[1] = 0.029,0.062
!CAL Quadrant fixup[2] = 0.037,0.019
!CAL Quadrant fixup[3] = 0.010,0.027
!CAL PCR gain ratio(x,y) = 2.442, 1.946
!CAL CR gain match(x,y) = 1.020, 1.020
!CAL Slip rotation correction OFF
!CAL CALIBRATION HV9 R RIGHT GOOD
!CAL VALIDATION HV9 R RIGHT GOOD ERROR 0.49 avg. 0.92 max OFFSET 0.17 deg. -2.4,-3.7 pix.
The !CAL messages are parsed as follows:
# Parse calibration metadata
metadata["CalibrationCount"] = 0
if not calibration.empty:
    warn("Calibration of more than one eye is not implemented")
    calibration.trialid = calibration.trialid.str.replace("!CAL", "")
    calibration.trialid = calibration.trialid.str.strip()
    metadata["CalibrationLog"] = list(
        zip(
            calibration.trialid_time.values.astype(int),
            calibration.trialid.values,
        )
    )
    calibrations_msg = calibration.trialid.str.startswith(
        "VALIDATION"
    ) & calibration.trialid.str.contains("ERROR")
    metadata["CalibrationCount"] = calibrations_msg.sum()
    calibration_last = calibration.index[calibrations_msg][-1]
    try:
        meta_calib = re.match(
            r"VALIDATION (?P<ctype>[\w\d]+) (?P<eyeid>[RL]+) (?P<eye>RIGHT|LEFT) "
            r"(?P<result>\w+) ERROR (?P<avg>-?\d+\.\d+) avg\. (?P<max>-?\d+\.\d+) max\s+"
            r"OFFSET (?P<offsetdeg>-?\d+\.\d+) deg\. "
            r"(?P<offsetxpix>-?\d+\.\d+),(?P<offsetypix>-?\d+\.\d+) pix\.",
            calibration.loc[calibration_last, "trialid"].strip(),
        ).groupdict()
        metadata["CalibrationType"] = meta_calib["ctype"]
        metadata["AverageCalibrationError"] = [float(meta_calib["avg"])]
        metadata["MaximalCalibrationError"] = [float(meta_calib["max"])]
        metadata["CalibrationResultQuality"] = [meta_calib["result"]]
        metadata["CalibrationResultOffset"] = [
            float(meta_calib["offsetdeg"]),
            (float(meta_calib["offsetxpix"]), float(meta_calib["offsetypix"])),
        ]
        metadata["CalibrationResultOffsetUnits"] = ["deg", "pixels"]
    except AttributeError:
        warn("Calibration data found but unsuccessfully parsed for results")
Parsing the events dataframe
The events dataframe contains several parameters calculated by the EyeLink software. It is important to remark that these are not the experimental events encoded in the dataset; they are specific to the recording of ET data with the EyeLink device.
# Process events: first generate empty columns
recording["eye1_fixation"] = 0
recording["eye1_saccade"] = 0
recording["eye1_blink"] = 0

# Add fixations
for _, fixation_event in events[
    events["type"] == "fixation"
].iterrows():
    recording.loc[
        (recording["timestamp"] >= fixation_event["start"])
        & (recording["timestamp"] <= fixation_event["end"]),
        "eye1_fixation",
    ] = 1

# Add saccades, and blinks, which are a sub-event of saccades
for _, saccade_event in events[
    events["type"] == "saccade"
].iterrows():
    recording.loc[
        (recording["timestamp"] >= saccade_event["start"])
        & (recording["timestamp"] <= saccade_event["end"]),
        "eye1_saccade",
    ] = 1

    if saccade_event["blink"] == 1:
        recording.loc[
            (recording["timestamp"] >= saccade_event["start"])
            & (recording["timestamp"] <= saccade_event["end"]),
            "eye1_blink",
        ] = 1
Writing the data into the BIDS structure
Finally, data can be written out if the reference run corresponding to the ET recording is provided:
import json
import re
from pathlib import Path
from typing import List


def write_bids(
    et_run: EyeTrackingRun,
    exp_run: str | Path,
) -> List[str]:
    """
    Save an eye-tracking run into an existing BIDS structure.

    Parameters
    ----------
    et_run : :obj:`EyeTrackingRun`
        An object representing an eye-tracking run.
    exp_run : :obj:`os.pathlike`
        The path of the corresponding neuroimaging experiment in BIDS.

    Returns
    -------
    List[str]
        A list of generated files.

    """
    exp_run = Path(exp_run)
    out_dir = exp_run.parent
    refname = exp_run.name
    extension = "".join(exp_run.suffixes)
    suffix = refname.replace(extension, "").rsplit("_", 1)[-1]
    refname = refname.replace(f"_{suffix}", "_eyetrack")

    # Remove undesired entities
    refname = re.sub(r"_part-(mag|phase)", "", refname)
    refname = re.sub(r"_echo-[\w\d]+", "", refname)

    # Write out sidecar JSON
    out_json = out_dir / refname.replace(extension, ".json")
    out_json.write_text(
        json.dumps(et_run.metadata, sort_keys=True, indent=2)
    )

    # Write out data
    out_tsvgz = out_dir / refname.replace(extension, ".tsv.gz")
    et_run.recording.to_csv(
        out_tsvgz,
        sep="\t",
        index=False,
        header=False,
        compression="gzip",
        na_rep="n/a",
    )

    return [str(out_tsvgz), str(out_json)]
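As a usage sketch (the file path and the et_run object are hypothetical), writing an ET run next to its matching BOLD run could look like:

# Hypothetical invocation: et_run is an EyeTrackingRun built from the snippets above
generated_files = write_bids(
    et_run,
    "sub-01/ses-01/func/sub-01_ses-01_task-rest_bold.nii.gz",
)
print(generated_files)  # the .tsv.gz recording and its .json sidecar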