import collections
import json
import pathlib
import typing
import warnings
import pandas
import pydantic
import pynwb
import typing_extensions
from ._model_utils import _build_json_sidecar
from ..bids_models._base_metadata_model import BaseMetadataContainerModel, BaseMetadataModel
from ..notifications import Notification
def _infer_scalar_field(
    electrode_name_to_series: dict[str, list[pynwb.icephys.PatchClampSeries]], field_name: str
) -> dict[str, typing.Any]:
    """
    Resolve a single scalar attribute (e.g., rate, gain) per icephys electrode name.

    Parameters
    ----------
    electrode_name_to_series : dict
        Maps each electrode name to the series objects associated with it.
    field_name : str
        Name of the attribute to read from every series via `getattr`.

    Returns
    -------
    dict
        Maps electrode name to the attribute value, only for electrodes whose series agree on exactly one
        non-None value. Electrodes with no value at all are omitted silently; electrodes with conflicting
        values are omitted after emitting a warning.
    """
    # Pass 1: gather the distinct non-None values observed for each electrode.
    # An electrode only gets an entry once at least one non-None value has been seen.
    observed_values: dict[str, set] = {}
    for electrode_name, series_list in electrode_name_to_series.items():
        for series in series_list:
            field_value = getattr(series, field_name, None)
            if field_value is None:
                continue
            observed_values.setdefault(electrode_name, set()).add(field_value)

    # Pass 2: keep unambiguous electrodes, warn about (and skip) the conflicting ones.
    resolved: dict[str, typing.Any] = {}
    for electrode_name, field_values in observed_values.items():
        if len(field_values) == 1:
            resolved[electrode_name] = next(iter(field_values))
            continue

        message = (
            f"Some PatchClampSeries associated with electrode {electrode_name} have conflicting "
            f"{field_name}s: {field_values}. Automatic detection of channel {field_name} for this "
            "case is not yet implemented."
        )
        # stacklevel=2 attributes the warning to the caller of this helper
        warnings.warn(message=message, stacklevel=2)
    return resolved
# [docs]  (Sphinx "viewcode" link marker left over from HTML extraction; not part of the module)
class Channel(BaseMetadataModel):
    # One channel entry; every field carries its own title/description through `pydantic.Field`.
    # Documented with comments (not a class docstring) so the generated model schema is left untouched.
    # The declaration order of the fields below is preserved on purpose: it is the order in which
    # `model_dump()` emits them.

    # --- Required fields ---
    name: str = pydantic.Field(
        title="Channel name",
        description="Label of the channel.",
    )
    electrode_name: str = pydantic.Field(
        title="Electrode name",
        description=(
            "Name of the electrode contact point. "
            "The value MUST match a name entry in the corresponding *_electrodes.tsv file, "
            "linking this channel to its associated electrode contact point. "
            "For channels not associated with an electrode, use n/a."
        ),
    )
    type: str = pydantic.Field(
        title="Type",
        description=(
            "Type of channel; MUST use the channel types listed below. "
            "Note that the type MUST be in upper-case."
        ),
    )
    units: str = pydantic.Field(
        title="Units",
        description=(
            "Physical unit of the value represented in this channel, "
            "for example, V for Volt, or fT/cm for femto Tesla per centimeter (see Units)."
        ),
    )
    sampling_frequency: float = pydantic.Field(
        title="Sampling frequency",
        description="Sampling rate of the channel in Hz.",
    )

    # --- Optional fields (all default to None) ---
    low_cutoff: float | None = pydantic.Field(
        title="Low cutoff",
        description=(
            "Frequencies used for the high-pass filter applied to the channel in Hz. "
            "If no high-pass filter applied, use n/a."
        ),
        default=None,
    )
    high_cutoff: float | None = pydantic.Field(
        title="High cutoff",
        description=(
            "Frequencies used for the low-pass filter applied to the channel in Hz. "
            "If no low-pass filter applied, use n/a. "
            "Note that hardware anti-aliasing in A/D conversion of all MEG/EEG/EMG electronics "
            "applies a low-pass filter; specify its frequency here if applicable."
        ),
        default=None,
    )
    reference: str | None = pydantic.Field(
        title="Reference",
        description=(
            "The reference for the given channel. "
            "When the reference is an electrode in *_electrodes.tsv, use the name of that electrode. "
            'If a corresponding electrode is not applicable, use "n/a".'
        ),
        default=None,
    )
    notch: str | None = pydantic.Field(
        title="Notch",
        description=(
            "Frequencies used for the notch filter applied to the channel, in Hz. "
            "If notch filters are applied at multiple frequencies, "
            "these frequencies MAY be specified as a list, for example, [60, 120, 180]. "
            "If no notch filter was applied, use n/a."
        ),
        default=None,
    )
    channel_label: str | None = pydantic.Field(
        title="Channel label",
        description=(
            "Human readable identifier. "
            "Use this name to specify the content of signals not generated by electrodes. "
            "For example, 'DAQ internal synchronization signals', 'behavioral signals', 'behavioral cues'."
        ),
        default=None,
    )
    stream_id: str | None = pydantic.Field(
        title="Stream ID",
        description="Data stream of the recording the signal.",
        default=None,
    )
    description: str | None = pydantic.Field(
        title="Description",
        description="Brief free-text description of the channel, or other information of interest.",
        default=None,
    )
    software_filter_types: str | None = pydantic.Field(
        title="Software filter types",
        description=(
            "The types of software filters applied to this channel. "
            "The Levels for this column SHOULD be defined in the accompanying *_channels.json file, "
            "mapping each filter type key to its description. "
            "Use n/a if no software filters were applied to this channel."
        ),
        default=None,
    )
    status: typing.Literal["good", "bad", "n/a"] | None = pydantic.Field(
        title="Status",
        description=(
            "Data quality observed on the channel. "
            "A channel is considered bad if its data quality is compromised by excessive noise. "
            "If quality is unknown, then a value of n/a may be used. "
            "Description of noise type SHOULD be provided in [status_description]."
        ),
        default=None,
    )
    status_description: str | None = pydantic.Field(
        title="Status description",
        description=(
            "Freeform text description of noise or artifact affecting data quality on the channel. "
            "It is meant to explain why the channel was declared bad in the status column."
        ),
        default=None,
    )
    gain: float | None = pydantic.Field(
        title="Gain",
        description=(
            "Amplification factor applied from signal detection at the electrode "
            "to the signal stored in the data file. "
            "If no gain factor is provided it is assumed to be 1."
        ),
        default=None,
    )
    time_offset: float | None = pydantic.Field(
        title="Time offset",
        description="Time shift between signal of this channel to a reference channel in seconds.",
        default=None,
    )
    time_reference_channel: str | None = pydantic.Field(
        title="Time reference channel",
        description="Name of the channel that is used for time alignment of signals.",
        default=None,
    )
    ground: str | None = pydantic.Field(
        title="Ground",
        description=(
            "Information on the ground. "
            "For example, 'chamber screw', 'head post', 'ear clip'. "
            "Only should be used to optionally override the global ground "
            "in the _ecephys.json or _icephys.json file."
        ),
        default=None,
    )
    recording_mode: str | None = pydantic.Field(
        title="Recording mode",
        description="The mode of recording for patch clamp datasets (for example, voltage clamp, current clamp).",
        default=None,
    )
# [docs]  (Sphinx "viewcode" link marker left over from HTML extraction; not part of the module)
class ChannelTable(BaseMetadataContainerModel):
    # Collection of `Channel` entries plus the modality they were extracted for; it can be written out as a
    # TSV table (`to_tsv`) and as a JSON sidecar (`to_json`).
    # Documented with comments (not a class docstring) so the generated model schema is left untouched.
    channels: list[Channel]
    modality: typing.Literal["ecephys", "icephys"]

    @pydantic.computed_field
    @property
    def notifications(self) -> list[Notification]:
        """
        All notifications from the contained channels.

        These can accumulate over time based on which instance methods have been called.
        """
        notifications = [notification for channel in self.channels for notification in channel.notifications]
        # Descending category value, then descending severity value, then title (ascending)
        notifications.sort(
            key=lambda notification: (-notification.category.value, -notification.severity.value, notification.title)
        )
        return notifications

    @classmethod
    @pydantic.validate_call
    def from_nwbfiles(cls, nwbfiles: list[pydantic.InstanceOf[pynwb.NWBFile]]) -> typing_extensions.Self | None:
        """
        Build the channel table from the electrode metadata of an NWB file.

        Parameters
        ----------
        nwbfiles : list of pynwb.NWBFile
            The NWB files to convert. Only a single file is currently supported.

        Returns
        -------
        ChannelTable or None
            The populated table, or None when `nwbfiles` is empty or when the file has neither an
            `electrodes` table nor any `icephys_electrodes`.

        Raises
        ------
        NotImplementedError
            If more than one NWB file is given, or if the file holds both ecephys and icephys electrodes.
        """
        # Nothing to convert; previously this fell through to `nwbfiles[0]` and raised a bare IndexError
        if not nwbfiles:
            return None

        if len(nwbfiles) > 1:
            message = "Conversion of multiple NWB files per session is not yet supported."
            raise NotImplementedError(message)
        nwbfile = nwbfiles[0]

        has_ecephys_electrodes = nwbfile.electrodes is not None
        # Test for a non-empty mapping rather than for the truthiness of its keys (the electrode names)
        has_icephys_electrodes = len(nwbfile.icephys_electrodes) > 0
        if not has_ecephys_electrodes and not has_icephys_electrodes:
            return None

        if has_ecephys_electrodes and has_icephys_electrodes:
            message = (
                "Converting electrode metadata when there are both ecephys and icephys types "
                "has not yet been implemented."
            )
            raise NotImplementedError(message)

        modality = "ecephys" if has_ecephys_electrodes else "icephys"
        if modality == "ecephys":
            # Fallback values used whenever rate/stream/gain cannot be determined unambiguously
            sampling_frequency = -1.0
            stream_id = None
            gain = None

            # NOTE(review): the previous comment here stated that only series under acquisition are scanned
            # (since those under processing can downsample the rate), but this iterates over every object
            # in the file - confirm which of the two is intended.
            raw_electrical_series = [
                neurodata_object
                for neurodata_object in nwbfile.objects.values()
                if isinstance(neurodata_object, pynwb.ecephys.ElectricalSeries)
            ]

            if len(raw_electrical_series) > 1:
                # TODO: form a map of electrode to rate/gain based on ElectricalSeries linkage
                message = (
                    "Support for automatic extraction of rates/gains from multiple ElectricalSeries is not yet "
                    "implemented. Skipping `sampling_frequency`, `stream_id`, and `gain` extraction."
                )
                warnings.warn(message=message, stacklevel=2)
            elif len(raw_electrical_series) == 1:
                electrical_series = raw_electrical_series[0]

                if electrical_series.rate is None:
                    message = (
                        "Support for automatic extraction of rate from ElectricalSeries with timestamps "
                        "is not yet implemented. Skipping `sampling_frequency`, `stream_id`, and `gain` extraction."
                    )
                    warnings.warn(message=message, stacklevel=2)
                else:
                    # Only overwrite the fallbacks when a rate is available; assigning a None rate would
                    # contradict the warning above and fail validation of `Channel.sampling_frequency`
                    sampling_frequency = electrical_series.rate
                    stream_id = electrical_series.name
                    gain = electrical_series.conversion

            # TODO: `channel_label`, `description`, `status`, `status_description`, `time_reference_channel`
            #  and `ground` can only be supported with additional metadata
            channels = []
            for electrode in nwbfile.electrodes:
                electrode_id = str(electrode.index[0]).zfill(3)
                channel_name = electrode.get("channel_name", None)
                # Special extraction from SpikeInterface field
                inter_sample_shift = electrode.get("inter_sample_shift", None)

                channels.append(
                    Channel(
                        name=f"{channel_name.values[0]}" if channel_name is not None else f"ch{electrode_id}",
                        electrode_name=f"e{electrode_id}",
                        type="n/a",  # TODO: in dedicated follow-up, could classify LFP based on container
                        units="V",
                        sampling_frequency=sampling_frequency,
                        stream_id=stream_id,
                        gain=gain,
                        # Positional access: `[0]` on the column would be a lookup of the *label* 0 and
                        # fail for any entry whose index value is not 0
                        time_offset=inter_sample_shift.values[0] if inter_sample_shift is not None else None,
                    )
                )
        else:
            icephys_series = [
                neurodata_object
                for neurodata_object in nwbfile.acquisition.values()
                if isinstance(neurodata_object, pynwb.icephys.PatchClampSeries)
            ]

            # Group the series by the name of the electrode they reference
            electrode_name_to_series = collections.defaultdict(list)
            for series in icephys_series:
                electrode_name_to_series[series.electrode.name].append(series)

            electrode_name_to_sampling_frequency = _infer_scalar_field(
                electrode_name_to_series=electrode_name_to_series, field_name="rate"
            )
            electrode_name_to_gain = _infer_scalar_field(
                electrode_name_to_series=electrode_name_to_series, field_name="gain"
            )
            # Reuse the same "single unambiguous value" logic to find the series class per electrode
            electrode_name_to_class = _infer_scalar_field(
                electrode_name_to_series=electrode_name_to_series, field_name="__class__"
            )

            class_name_to_type = {
                "VoltageClampSeries": "VM",
                "CurrentClampSeries": "IM",
            }
            electrode_name_to_type = {
                electrode_name: class_name_to_type.get(series_class.__name__, "n/a")
                for electrode_name, series_class in electrode_name_to_class.items()
            }
            type_to_recording_mode = {
                "VM": "voltage-clamp",
                "IM": "current-clamp",
                "n/a": "n/a",
            }
            electrode_name_to_stream_ids = {
                electrode_name: ",".join([series.name for series in series_list])
                for electrode_name, series_list in electrode_name_to_series.items()
            }

            # TODO: `channel_label`, `description`, `status`, `status_description`, `time_reference_channel`
            #  and `ground` can only be supported with additional metadata; add extra columns
            channels = []
            for electrode in nwbfile.icephys_electrodes.values():
                channel_type = electrode_name_to_type.get(electrode.name, "n/a")
                channels.append(
                    Channel(
                        name=electrode.name,
                        electrode_name=electrode.name,
                        type=channel_type,
                        units="V",
                        sampling_frequency=electrode_name_to_sampling_frequency.get(electrode.name, -1.0),
                        stream_id=electrode_name_to_stream_ids.get(electrode.name, None),
                        gain=electrode_name_to_gain.get(electrode.name, None),
                        recording_mode=type_to_recording_mode[channel_type],
                    )
                )

        return cls(channels=channels, modality=modality)

    @pydantic.validate_call
    def to_tsv(self, file_path: str | pathlib.Path) -> None:
        """
        Write the channels data to a TSV file.

        Columns that are empty for every channel are omitted; values missing for only some channels
        are written as `n/a`.

        Parameters
        ----------
        file_path : path
            The path where the TSV file will be saved.
        """
        data = [channel.model_dump() for channel in self.channels]

        data_frame = pandas.DataFrame(data=data)
        data_frame = data_frame.dropna(axis=1, how="all")
        # Code the remaining missing cells as "n/a" instead of leaving them empty
        data_frame.to_csv(path_or_buf=file_path, sep="\t", index=False, na_rep="n/a")

    @pydantic.validate_call
    def to_json(self, file_path: str | pathlib.Path) -> None:
        """
        Save the channels information to a JSON file.

        Parameters
        ----------
        file_path : path
            The path to the output JSON file.
        """
        file_path = pathlib.Path(file_path)

        json_content = _build_json_sidecar(models=self.channels)

        with file_path.open(mode="w") as file_stream:
            json.dump(obj=json_content, fp=file_stream, indent=4)