Source code for nwb2bids._converters._dataset_converter

import collections
import itertools
import json
import traceback

import pandas
import pydantic
import typing_extensions
from tqdm import tqdm

from ._dandi_utils import get_bids_dataset_description
from ._run_config import RunConfig
from ._session_converter import SessionConverter
from .._converters._base_converter import BaseConverter
from ..bids_models import BidsSessionMetadata, DatasetDescription
from ..notifications import Notification


class DatasetConverter(BaseConverter):
    """
    Convert a collection of NWB sessions into a single BIDS dataset.

    Instances are normally created through :meth:`from_nwb_paths` or :meth:`from_remote_dandiset`
    and then driven by :meth:`convert_to_bids_dataset`. Failures inside the public entry points are
    captured as notifications rather than raised.
    """

    session_converters: list[SessionConverter] = pydantic.Field(
        description="List of session converters. Typically instantiated by calling `.from_nwb_paths()`."
    )
    dataset_description: DatasetDescription | None = pydantic.Field(
        description="The BIDS-compatible dataset description.",
        default=None,
    )

    @pydantic.computed_field
    @property
    def notifications(self) -> list[Notification]:
        """
        All notifications from contained session converters.

        These can accumulate over time based on which instance methods have been called.
        The result is ordered by descending category value, then descending severity value, then title.
        """
        notifications = [
            notification
            for session_converter in self.session_converters
            for notification in session_converter.notifications
        ] + self._internal_notifications
        notifications.sort(
            key=lambda notification: (-notification.category.value, -notification.severity.value, notification.title)
        )
        return notifications

    @property
    def _is_derivative(self) -> bool:
        """
        Return True if the dataset should be written as a BIDS derivative.

        This is the case when any session contains a units table but no electrodes table and no raw
        :class:`~pynwb.ecephys.ElectricalSeries` in the ``acquisition`` module, indicating the data is
        derived (e.g., spike-sorted) rather than raw.

        Metadata must be extracted before this property is meaningful.
        """
        return any(
            sc.session_metadata is not None
            and not sc.session_metadata.has_electrical_series_in_acquisition
            and sc.session_metadata.electrode_table is None
            and sc.session_metadata.has_units_table
            for sc in self.session_converters
        )

    @classmethod
    @pydantic.validate_call
    def from_remote_dandiset(
        cls,
        dandiset_id: str = pydantic.Field(pattern=r"^\d{6}$"),
        api_url: str | None = None,
        version_id: str = "draft",
        token: str | None = None,
        limit: int | None = None,
        run_config: RunConfig = pydantic.Field(default_factory=lambda: RunConfig()),
    ) -> typing_extensions.Self:
        """
        Initialize a converter of a Dandiset to BIDS format.

        Parameters
        ----------
        dandiset_id : str
            The dandiset ID of the Dandiset to be converted.
        api_url : str, optional
            The API URL of a custom DANDI instance to use.
            If not provided, the API URL of the DANDI instance specified by the
            :envvar:`DANDI_INSTANCE` environment variable is used.
            If the :envvar:`DANDI_INSTANCE` environment variable is not specified,
            The API URL of the `"dandi"` DANDI instance is used.
        version_id : str, default: "draft"
            The version ID of the Dandiset to be converted.
        token : str, optional
            The authentication token for accessing the DANDI instance.
            If not provided, will attempt to read from the environment variable `DANDI_API_KEY` if it exists.
            This is required for accessing embargoed Dandisets.
        limit : int, optional
            If specified, only the first `limit` assets of the Dandiset are inspected, which in turn
            limits the sessions that get converted.
            This is mainly useful for testing purposes.
        run_config : RunConfig, optional
            The configuration for this conversion run.

        Returns
        -------
        An instance of DatasetConverter.
        If initialization fails, the instance has no session converters and carries a
        `RemoteInitializationFailure` notification holding the traceback.
        """
        try:
            import dandi.dandiapi

            client = dandi.dandiapi.DandiAPIClient(api_url=api_url, token=token)
            dandiset = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

            dataset_description, _internal_notifications = get_bids_dataset_description(dandiset=dandiset)

            # Stop consuming the asset iterator as soon as the limit is reached instead of walking the
            # whole listing. A negative limit yields no assets, as it always has.
            stop = None if limit is None else max(limit, 0)
            assets = list(itertools.islice(dandiset.get_assets(), stop))

            # Group assets by the identifier of every "Session" entry in their `wasGeneratedBy` metadata
            session_id_to_assets = collections.defaultdict(list)
            for asset in assets:
                asset_metadata = asset.get_raw_metadata()
                for session in asset_metadata.get("wasGeneratedBy", []):
                    if session.get("schemaKey", "") != "Session":
                        continue
                    session_id = session.get("identifier", "")
                    if session_id == "":
                        continue
                    session_id_to_assets[session_id].append(asset)
            sorted_session_id_to_assets = dict(sorted(session_id_to_assets.items(), key=lambda item: item[0]))

            session_converters = [
                SessionConverter(
                    session_id=session_id,
                    nwbfile_paths=[
                        asset.get_content_url(follow_redirects=1, strip_query=True) for asset in session_assets
                    ],
                    run_config=run_config,
                )
                for session_id, session_assets in tqdm(
                    sorted_session_id_to_assets.items(),
                    desc="Initializing sessions",
                    unit="session",
                    disable=run_config.silent,
                )
            ]

            dataset_converter = cls(
                session_converters=session_converters, dataset_description=dataset_description, run_config=run_config
            )
            dataset_converter._internal_notifications = _internal_notifications
            return dataset_converter
        except Exception:  # noqa
            notification = Notification.from_definition(
                identifier="RemoteInitializationFailure", traceback=traceback.format_exc()
            )
            _internal_notifications = [notification]

            dataset_converter = cls(session_converters=[], dataset_description=None, run_config=run_config)
            dataset_converter._internal_notifications = _internal_notifications
            return dataset_converter

    @classmethod
    @pydantic.validate_call
    def from_nwb_paths(
        cls,
        nwb_paths: list[pydantic.FilePath | pydantic.DirectoryPath] = pydantic.Field(min_length=1),
        run_config: RunConfig = pydantic.Field(default_factory=lambda: RunConfig()),
    ) -> typing_extensions.Self:
        """
        Initialize a converter of NWB files to BIDS format.

        Parameters
        ----------
        nwb_paths : iterable of file and directory paths
            An iterable of NWB file paths and directories containing NWB files.
        run_config : RunConfig, optional
            The configuration for this conversion run.

        Returns
        -------
        An instance of DatasetConverter.
        If initialization fails, the instance has no session converters and carries a
        `LocalInitializationFailure` notification holding the traceback.
        """
        try:
            session_converters = SessionConverter.from_nwb_paths(nwb_paths=nwb_paths, run_config=run_config)

            dataset_description = None
            additional_metadata_file_path = run_config.additional_metadata_file_path
            if additional_metadata_file_path is not None:
                dataset_description = DatasetDescription.from_file_path(file_path=additional_metadata_file_path)

            # NOTE(review): these notifications are also reachable through `self.notifications` via the
            # session converters themselves; confirm whether the resulting double-reporting is intended.
            session_messages = [
                notification
                for session_converter in session_converters
                for notification in session_converter.notifications
            ]

            dataset_converter = cls(
                session_converters=session_converters, dataset_description=dataset_description, run_config=run_config
            )
            dataset_converter._internal_notifications = session_messages
            return dataset_converter
        except Exception:  # noqa
            notification = Notification.from_definition(
                identifier="LocalInitializationFailure", traceback=traceback.format_exc()
            )
            _internal_notifications = [notification]

            dataset_converter = cls(session_converters=[], dataset_description=None, run_config=run_config)
            dataset_converter._internal_notifications = _internal_notifications
            return dataset_converter

    def extract_metadata(self) -> None:
        """
        Extract metadata for every session converter that does not have any yet.

        Sessions whose `session_metadata` is already set are skipped, so calling this repeatedly is cheap.
        Any exception is recorded as a `MetadataExtractionFailure` notification instead of being raised.
        """
        try:
            sessions_needing_metadata = [sc for sc in self.session_converters if sc.session_metadata is None]
            if not sessions_needing_metadata:
                return

            for session_converter in tqdm(
                sessions_needing_metadata,
                desc="Extracting metadata",
                unit="session",
                disable=self.run_config.silent,
            ):
                session_converter.extract_metadata()
        except Exception:  # noqa
            notification = Notification.from_definition(
                identifier="MetadataExtractionFailure", traceback=traceback.format_exc()
            )
            self._internal_notifications.append(notification)

    def _set_use_session_labels(self) -> None:
        """
        Determine whether each session converter should include the `ses-` entity in file names.

        Rules:
          - If `use_session_labels` is True in the run config, all sessions use `ses-` labels.
          - A subject with more than one session always uses the `ses-` label for its sessions.
          - If more than 50% of all subjects have more than one session, all session converters
            use `ses-` labels to ensure dataset-level consistency.
          - Otherwise (when ≤50% of subjects have multiple sessions), single-session subjects
            do not use `ses-` labels.

        Session converters without metadata or without a sanitization record are left untouched
        unless the run config forces labels for everyone.
        """
        if self.run_config.use_session_labels:
            for session_converter in self.session_converters:
                session_converter.use_session_labels = True
            return

        # Pair each usable converter with its sanitized participant ID once, then reuse for both passes
        converter_participant_pairs = []
        for session_converter in self.session_converters:
            session_metadata = session_converter.session_metadata
            if session_metadata is None:
                continue
            sanitization = session_metadata.sanitization
            if sanitization is None:
                continue
            converter_participant_pairs.append((session_converter, sanitization.sanitized_participant_id))

        participant_session_counts: collections.Counter = collections.Counter(
            participant_id for _, participant_id in converter_participant_pairs
        )

        total_subjects = len(participant_session_counts)
        if total_subjects == 0:
            return

        subjects_with_multiple_sessions = sum(1 for count in participant_session_counts.values() if count > 1)
        use_labels_globally = (subjects_with_multiple_sessions / total_subjects) > 0.5

        for session_converter, participant_id in converter_participant_pairs:
            session_converter.use_session_labels = participant_session_counts[participant_id] > 1 or use_labels_globally

    def convert_to_bids_dataset(self) -> None:
        """
        Convert the directory of NWB files to a BIDS dataset.

        Exceptions are recorded as notifications rather than raised. Whatever happens, all notifications
        are dumped as JSON to the run config's `notifications_json_file_path` at the end.
        """
        try:
            # Ensure all metadata is extracted before determining session label usage
            self.extract_metadata()

            # If any session has a units table but no electrodes table, redirect all output
            # to a 'derivatives/nwb2bids' subfolder. DatasetType is set to 'derivative'
            # separately inside write_dataset_description.
            if self._is_derivative:
                derivatives_bids_directory = self.run_config.bids_directory / "derivatives" / "nwb2bids"
                derivatives_bids_directory.mkdir(parents=True, exist_ok=True)
                derivative_run_config = self.run_config.model_copy(
                    update={"bids_directory": derivatives_bids_directory}
                )
                self.run_config = derivative_run_config
                for session_converter in self.session_converters:
                    session_converter.run_config = derivative_run_config

            # Determine which sessions should use ses- labels (requires metadata for participant IDs)
            self._set_use_session_labels()

            for session_converter in tqdm(
                self.session_converters,
                desc="Converting sessions",
                unit="session",
                disable=self.run_config.silent,
            ):
                session_converter.convert_to_bids_session()

            self.write_participants_metadata()
            self.write_sessions_metadata()
            self.write_dataset_description()
            self.write_bidsignore()
        except Exception:  # noqa
            # NOTE(review): this reuses the "LocalInitializationFailure" identifier although the failure
            # happened during conversion; confirm whether a dedicated definition should be used instead.
            notification = Notification.from_definition(
                identifier="LocalInitializationFailure", traceback=traceback.format_exc()
            )
            self._internal_notifications.append(notification)
        finally:
            # Just in case it failed to create earlier. `parents=True` keeps this cleanup from raising
            # when the (possibly redirected) target directory sits below folders that do not exist yet.
            self.run_config.bids_directory.mkdir(parents=True, exist_ok=True)
            self.run_config._nwb2bids_directory.mkdir(parents=True, exist_ok=True)

            notifications_dump = [notification.model_dump(mode="json") for notification in self.notifications]
            self.run_config.notifications_json_file_path.write_text(data=json.dumps(obj=notifications_dump, indent=2))

    def write_bidsignore(self) -> None:
        """
        Write the `.bidsignore` file if an archive target of `"dandi"` or `"ember"` is specified.

        The `dandiset.yaml` entry is appended only when it is not already listed, so existing
        user-provided entries are preserved.
        """
        if self.run_config.archive_target not in ("dandi", "ember"):
            return

        bidsignore_file_path = self.run_config.bids_directory / ".bidsignore"
        entry = "dandiset.yaml"

        existing_text = bidsignore_file_path.read_text() if bidsignore_file_path.exists() else ""
        if entry in {line.strip() for line in existing_text.splitlines()}:
            return

        # An existing file may lack a trailing newline; without a separator the new entry would be
        # glued onto the last existing pattern.
        separator = "" if existing_text == "" or existing_text.endswith("\n") else "\n"
        with bidsignore_file_path.open(mode="a") as file_stream:
            file_stream.write(f"{separator}{entry}\n")

    def write_dataset_description(self) -> None:
        """
        Write the `dataset_description.json` file.

        A minimal description is created when none was provided, and `DatasetType` is set to
        `"derivative"` when the dataset qualifies as a derivative.
        """
        if self.dataset_description is None:
            self.dataset_description = DatasetDescription(BIDSVersion="1.10.1", HEDVersion="8.3.0")

        if self._is_derivative:
            self.dataset_description.DatasetType = "derivative"

        dataset_description_dictionary = self.dataset_description.model_dump()

        dataset_description_file_path = self.run_config.bids_directory / "dataset_description.json"
        with dataset_description_file_path.open(mode="w") as file_stream:
            json.dump(obj=dataset_description_dictionary, fp=file_stream, indent=4)

    def write_participants_metadata(self) -> None:
        """
        Write the `participants.tsv` and `participants.json` files.

        Nothing is written when no session provides any participant values.
        """
        participants = [
            sc.session_metadata.participant for sc in self.session_converters if sc.session_metadata is not None
        ]

        full_participants_data_frame = pandas.DataFrame.from_records(
            data=[
                {key: value for key, value in participant.model_dump().items() if value is not None}
                for participant in participants
            ]
        ).astype("string")
        if full_participants_data_frame.empty:
            return

        # Aggregate values across entries (such as species mismatches)
        cols_to_agg = [col for col in full_participants_data_frame.columns if col != "participant_id"]
        if cols_to_agg:
            aggregated_cols = {col: lambda val: ", ".join(val.dropna().unique()) for col in cols_to_agg}
            aggregated_data_frame = full_participants_data_frame.groupby(by="participant_id", as_index=False).agg(
                func=aggregated_cols
            )
        else:
            aggregated_data_frame = full_participants_data_frame.copy()

        # Deduplicate all rows of the frame
        deduplicated_data_frame = aggregated_data_frame.drop_duplicates(ignore_index=True).copy()

        # Save original IDs before sanitization (for potential original_participant_id column)
        original_participant_id_values = deduplicated_data_frame["participant_id"].copy()

        # Apply sanitization
        sanitizations = [
            sc.session_metadata.sanitization
            for sc in self.session_converters
            if sc.session_metadata is not None and sc.session_metadata.sanitization is not None
        ]
        sanitized_participant_ids = {
            sanitization.original_participant_id: sanitization.sanitized_participant_id
            for sanitization in sanitizations
        }
        with pandas.option_context("mode.chained_assignment", None):
            # A session may carry metadata without a sanitization record; keep its ID unchanged
            # rather than failing on the lookup.
            deduplicated_data_frame["participant_id"] = (
                deduplicated_data_frame["participant_id"]
                .apply(lambda participant_id: sanitized_participant_ids.get(participant_id, participant_id))
                .astype("string")
            )

        # BIDS requires sub- prefix in table values
        participants_data_frame = deduplicated_data_frame.copy(deep=True)
        participants_data_frame["participant_id"] = (
            participants_data_frame["participant_id"]
            .apply(lambda participant_id: f"sub-{participant_id}")
            .astype("string")
        )

        # Add original_participant_id column if sub_labels sanitization is enabled
        sanitization_config = sanitizations[0].sanitization_config if sanitizations else None
        include_original_ids = sanitization_config is not None and sanitization_config.sub_labels
        if include_original_ids:
            participants_data_frame.insert(
                loc=1,
                column="original_participant_id",
                value=original_participant_id_values.apply(lambda participant_id: f"sub-{participant_id}").astype(
                    "string"
                ),
            )

        # BIDS Validator is strict regarding column order
        table_columns = list(participants_data_frame.columns)
        required_column_order = [
            field
            for field in ["participant_id", "original_participant_id", "species", "sex", "strain"]
            if field in table_columns
        ]
        column_order = required_column_order + [
            field for field in table_columns if field not in required_column_order
        ]

        participants_tsv_file_path = self.run_config.bids_directory / "participants.tsv"
        participants_data_frame.to_csv(
            path_or_buf=participants_tsv_file_path, mode="w", index=False, sep="\t", columns=column_order
        )

        # Describe the columns using the schema of the first participant that actually has metadata;
        # the first session converter is not guaranteed to have any.
        example_participant = participants[0]
        participants_schema = example_participant.model_json_schema()
        participants_json = {
            field: info["description"]
            for field, info in participants_schema["properties"].items()
            if field in table_columns
        }
        if include_original_ids:
            participants_json["original_participant_id"] = (
                "The original participant identifier before sanitization."
            )

        participants_json_file_path = self.run_config.bids_directory / "participants.json"
        with participants_json_file_path.open(mode="w") as file_stream:
            json.dump(obj=participants_json, fp=file_stream, indent=4)

    def write_sessions_metadata(self) -> None:
        """
        Write the `_sessions.tsv` and `_sessions.json` files, then create empty participant and session directories.

        Sessions metadata files and `ses-` subdirectories are only written for subjects that use session labels
        (i.e., subjects with multiple sessions, or all subjects when more than 50% have multiple sessions).
        The `sub-` directory itself is created for every subject.
        """
        participant_id_to_sanitizations = collections.defaultdict(list)
        participant_id_to_use_session_labels = {}
        sanitization_config = None
        for session_converter in self.session_converters:
            session_metadata = session_converter.session_metadata
            if session_metadata is None:
                continue
            sanitization = session_metadata.sanitization
            if sanitization is None:
                continue

            participant_id = sanitization.sanitized_participant_id
            participant_id_to_sanitizations[participant_id].append(sanitization)
            participant_id_to_use_session_labels[participant_id] = session_converter.use_session_labels

            # The configuration of the first usable session is taken as representative for the dataset
            if len(participant_id_to_use_session_labels) == 1 and sanitization_config is None:
                sanitization_config = sanitization.sanitization_config

        include_original_ids = sanitization_config is not None and sanitization_config.ses_labels

        # TODO: expand beyond just session_id field (mainly via additional metadata)
        sessions_schema = BidsSessionMetadata.model_json_schema()
        sessions_json = {"session_id": sessions_schema["properties"]["session_id"]["description"]}
        if include_original_ids:
            sessions_json["original_session_id"] = "The original session identifier before sanitization."

        for sanitized_participant_id, sanitizations in participant_id_to_sanitizations.items():
            sanitized_session_ids = [sanitization.sanitized_session_id for sanitization in sanitizations]

            subject_directory = self.run_config.bids_directory / f"sub-{sanitized_participant_id}"
            subject_directory.mkdir(exist_ok=True)

            # Only write sessions metadata files for subjects that use ses- labels
            if not participant_id_to_use_session_labels.get(sanitized_participant_id, True):
                continue

            # BIDS requires ses- prefix in table values
            sessions_data_frame = pandas.DataFrame(
                {"session_id": [f"ses-{session_id}" for session_id in sanitized_session_ids]}
            )

            # Add original_session_id column if ses_labels sanitization is enabled
            if include_original_ids:
                original_session_ids = [sanitization.original_session_id for sanitization in sanitizations]
                sessions_data_frame["original_session_id"] = pandas.Series(
                    [f"ses-{session_id}" for session_id in original_session_ids]
                ).astype("string")

            session_tsv_file_path = subject_directory / f"sub-{sanitized_participant_id}_sessions.tsv"
            sessions_data_frame.to_csv(path_or_buf=session_tsv_file_path, mode="w", index=False, sep="\t")

            session_json_file_path = subject_directory / f"sub-{sanitized_participant_id}_sessions.json"
            with session_json_file_path.open(mode="w") as file_stream:
                json.dump(obj=sessions_json, fp=file_stream, indent=4)

            for session_id in sanitized_session_ids:
                session_directory = subject_directory / f"ses-{session_id}"
                session_directory.mkdir(exist_ok=True)