# Source code for nwb2bids._converters._dataset_converter
import collections
import itertools
import json
import traceback

import pandas
import pydantic
import typing_extensions
from tqdm import tqdm

from ._dandi_utils import get_bids_dataset_description
from ._run_config import RunConfig
from ._session_converter import SessionConverter
from .._converters._base_converter import BaseConverter
from ..bids_models import BidsSessionMetadata, DatasetDescription
from ..notifications import Notification
[docs]
class DatasetConverter(BaseConverter):
    """
    Convert a collection of NWB sessions into a single BIDS dataset.

    Instances are typically created through `from_nwb_paths()` (local NWB files) or
    `from_remote_dandiset()` (a Dandiset hosted on a DANDI instance); `convert_to_bids_dataset()`
    then performs the conversion and writes the dataset-level files.
    """

    # One converter per session; each converts its own session via `convert_to_bids_session()`.
    session_converters: list[SessionConverter] = pydantic.Field(
        description="List of session converters. Typically instantiated by calling `.from_nwb_paths()`."
    )
    # When left as None, `write_dataset_description()` substitutes a minimal default description.
    dataset_description: DatasetDescription | None = pydantic.Field(
        description="The BIDS-compatible dataset description.",
        default=None,
    )
@pydantic.computed_field
@property
def notifications(self) -> list[Notification]:
    """
    Every notification known to this dataset converter, most important first.

    Combines the notifications of each contained session converter with the converter's own
    internal notifications. The list can grow as instance methods are called.
    Ordering: descending category value, then descending severity value, then title (ascending).
    """
    collected: list[Notification] = []
    for session_converter in self.session_converters:
        collected.extend(session_converter.notifications)
    collected.extend(self._internal_notifications)

    def _priority(notification: Notification) -> tuple:
        # Negated values put higher categories/severities first; the title breaks ties alphabetically.
        return (-notification.category.value, -notification.severity.value, notification.title)

    return sorted(collected, key=_priority)
@property
def _is_derivative(self) -> bool:
    """
    Whether the dataset should be written as a BIDS derivative.

    True as soon as one session has a units table while lacking both an electrodes table and a raw
    :class:`~pynwb.ecephys.ElectricalSeries` in its ``acquisition`` module, which indicates derived
    (e.g., spike-sorted) rather than raw data. Sessions without extracted metadata are ignored, so
    this is only meaningful once metadata has been extracted.
    """
    for session_converter in self.session_converters:
        metadata = session_converter.session_metadata
        if metadata is None:
            continue
        if metadata.has_electrical_series_in_acquisition:
            continue
        if metadata.electrode_table is not None:
            continue
        if metadata.has_units_table:
            return True
    return False
[docs]
@classmethod
@pydantic.validate_call
def from_remote_dandiset(
    cls,
    dandiset_id: str = pydantic.Field(pattern=r"^\d{6}$"),
    api_url: str | None = None,
    version_id: str = "draft",
    token: str | None = None,
    limit: int | None = None,
    run_config: RunConfig = pydantic.Field(default_factory=lambda: RunConfig()),
) -> typing_extensions.Self:
    """
    Initialize a converter of a Dandiset to BIDS format.

    Assets are grouped into sessions using the "Session" entries of each asset's `wasGeneratedBy`
    metadata; one SessionConverter is created per session identifier, in sorted order.
    Any exception raised while doing so is caught, and an empty converter carrying a single
    "RemoteInitializationFailure" notification is returned instead.

    Parameters
    ----------
    dandiset_id : str
        The six-digit dandiset ID of the Dandiset to be converted.
    api_url : str, optional
        The API URL of a custom DANDI instance to use. If not provided, the API URL of the
        DANDI instance specified by the :envvar:`DANDI_INSTANCE` environment variable
        is used. If the :envvar:`DANDI_INSTANCE` environment variable is not specified,
        the API URL of the `"dandi"` DANDI instance is used.
    version_id : str, default: "draft"
        The version ID of the Dandiset to be converted.
    token : str, optional
        The authentication token for accessing the DANDI instance.
        If not provided, will attempt to read from the environment variable `DANDI_API_KEY` if it exists.
        This is required for accessing embargoed Dandisets.
    limit : int, optional
        If specified, only the first `limit` assets of the Dandiset are inspected (zero or a negative
        value inspects none). This indirectly bounds the number of sessions converted and is mainly
        useful for testing purposes.
    run_config : RunConfig, optional
        The configuration for this conversion run.

    Returns
    -------
    An instance of DatasetConverter.
    """
    try:
        import dandi.dandiapi

        client = dandi.dandiapi.DandiAPIClient(api_url=api_url, token=token)
        dandiset = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)
        dataset_description, _internal_notifications = get_bids_dataset_description(dandiset=dandiset)

        # Stop consuming the asset iterator once `limit` assets have been taken, instead of walking
        # every asset of the Dandiset and discarding the surplus.
        asset_iterator = dandiset.get_assets()
        if limit is None:
            assets = list(asset_iterator)
        else:
            assets = list(itertools.islice(asset_iterator, max(limit, 0)))

        # Group assets by the identifier of every "Session" activity listed in `wasGeneratedBy`.
        session_id_to_assets = collections.defaultdict(list)
        for asset in assets:
            asset_metadata = asset.get_raw_metadata()
            for session in asset_metadata.get("wasGeneratedBy", []):
                if session.get("schemaKey", "") != "Session":
                    continue
                session_id = session.get("identifier", "")
                if session_id == "":
                    continue
                session_id_to_assets[session_id].append(asset)
        sorted_session_id_to_assets = dict(sorted(session_id_to_assets.items(), key=lambda item: item[0]))

        session_converters = [
            SessionConverter(
                session_id=session_id,
                nwbfile_paths=[
                    session_asset.get_content_url(follow_redirects=1, strip_query=True)
                    for session_asset in session_assets
                ],
                run_config=run_config,
            )
            for session_id, session_assets in tqdm(
                sorted_session_id_to_assets.items(),
                desc="Initializing sessions",
                unit="session",
                disable=run_config.silent,
            )
        ]

        dataset_converter = cls(
            session_converters=session_converters, dataset_description=dataset_description, run_config=run_config
        )
        dataset_converter._internal_notifications = _internal_notifications
        return dataset_converter
    except Exception:  # noqa
        notification = Notification.from_definition(
            identifier="RemoteInitializationFailure", traceback=traceback.format_exc()
        )
        _internal_notifications = [notification]

        dataset_converter = cls(session_converters=[], dataset_description=None, run_config=run_config)
        dataset_converter._internal_notifications = _internal_notifications
        return dataset_converter
[docs]
@classmethod
@pydantic.validate_call
def from_nwb_paths(
    cls,
    nwb_paths: list[pydantic.FilePath | pydantic.DirectoryPath] = pydantic.Field(min_length=1),
    run_config: RunConfig = pydantic.Field(default_factory=lambda: RunConfig()),
) -> typing_extensions.Self:
    """
    Initialize a converter of NWB files to BIDS format.

    If `run_config.additional_metadata_file_path` is set, the dataset description is loaded from it.
    Any exception raised during initialization is caught, and an empty converter carrying a single
    "LocalInitializationFailure" notification is returned instead.

    Parameters
    ----------
    nwb_paths : list of file and directory paths
        NWB file paths and directories containing NWB files (at least one).
    run_config : RunConfig, optional
        The configuration for this conversion run.

    Returns
    -------
    An instance of DatasetConverter.
    """
    try:
        session_converters = SessionConverter.from_nwb_paths(nwb_paths=nwb_paths, run_config=run_config)

        dataset_description = None
        additional_metadata_file_path = run_config.additional_metadata_file_path
        if additional_metadata_file_path is not None:
            dataset_description = DatasetDescription.from_file_path(file_path=additional_metadata_file_path)

        dataset_converter = cls(
            session_converters=session_converters, dataset_description=dataset_description, run_config=run_config
        )
        # Session-level notifications are already surfaced by the `notifications` property, which
        # concatenates each session converter's notifications with `_internal_notifications`.
        # Copying them here as well would report every one of them twice.
        dataset_converter._internal_notifications = []
        return dataset_converter
    except Exception:  # noqa
        notification = Notification.from_definition(
            identifier="LocalInitializationFailure", traceback=traceback.format_exc()
        )
        _internal_notifications = [notification]

        dataset_converter = cls(session_converters=[], dataset_description=None, run_config=run_config)
        dataset_converter._internal_notifications = _internal_notifications
        return dataset_converter
[docs]
def extract_metadata(self) -> None:
    """
    Extract metadata for every session converter that does not have it yet.

    Session converters whose `session_metadata` is already populated are skipped.
    Any exception escaping the loop stops it and is recorded as a "MetadataExtractionFailure"
    notification on this converter instead of being raised.
    """
    try:
        sessions_needing_metadata = [sc for sc in self.session_converters if sc.session_metadata is None]
        if not sessions_needing_metadata:
            return

        # Plain loop: each session's extraction is called purely for its side effect.
        for session_converter in tqdm(
            sessions_needing_metadata,
            desc="Extracting metadata",
            unit="session",
            disable=self.run_config.silent,
        ):
            session_converter.extract_metadata()
    except Exception:  # noqa
        notification = Notification.from_definition(
            identifier="MetadataExtractionFailure", traceback=traceback.format_exc()
        )
        self._internal_notifications.append(notification)
def _set_use_session_labels(self) -> None:
    """
    Decide, per session converter, whether file names carry the `ses-` entity.

    Policy:
    - `run_config.use_session_labels` set: every session converter uses `ses-` labels.
    - A subject with two or more sessions always uses `ses-` labels.
    - If strictly more than half of all subjects have two or more sessions, every identifiable
      session converter uses `ses-` labels, for dataset-level consistency.
    - Otherwise sessions of single-session subjects are written without `ses-` labels.

    Session converters lacking metadata or sanitization information are left untouched, and nothing
    is changed when no subject can be identified at all.
    """
    if self.run_config.use_session_labels:
        for session_converter in self.session_converters:
            session_converter.use_session_labels = True
        return

    # Pair every identifiable session converter with its sanitized participant ID.
    identified: list[tuple] = []
    for session_converter in self.session_converters:
        metadata = session_converter.session_metadata
        sanitization = None if metadata is None else metadata.sanitization
        if sanitization is not None:
            identified.append((session_converter, sanitization.sanitized_participant_id))

    sessions_per_subject = collections.Counter(participant_id for _, participant_id in identified)
    if not sessions_per_subject:
        return

    multi_session_subjects = len([count for count in sessions_per_subject.values() if count > 1])
    label_everything = multi_session_subjects / len(sessions_per_subject) > 0.5

    for session_converter, participant_id in identified:
        session_converter.use_session_labels = label_everything or sessions_per_subject[participant_id] > 1
[docs]
def convert_to_bids_dataset(self) -> None:
    """
    Convert the directory of NWB files to a BIDS dataset.

    Steps: extract any missing session metadata; when `_is_derivative` holds, redirect all output (for this
    converter and every session converter) to `<bids_directory>/derivatives/nwb2bids`; decide which sessions
    use `ses-` labels; convert every session; then write the participants, sessions, dataset description and
    `.bidsignore` files.

    Exceptions raised during these steps are recorded as notifications rather than propagated, and the
    accumulated notifications are always written to `run_config.notifications_json_file_path`.
    """
    try:
        # Ensure all metadata is extracted before determining session label usage
        self.extract_metadata()

        # If any session has a units table but no electrodes table, redirect all output
        # to a 'derivatives/nwb2bids' subfolder. DatasetType is set to 'derivative'
        # separately inside write_dataset_description.
        if self._is_derivative:
            derivatives_bids_directory = self.run_config.bids_directory / "derivatives" / "nwb2bids"
            derivatives_bids_directory.mkdir(parents=True, exist_ok=True)
            derivative_run_config = self.run_config.model_copy(
                update={"bids_directory": derivatives_bids_directory}
            )
            self.run_config = derivative_run_config
            for session_converter in self.session_converters:
                session_converter.run_config = derivative_run_config

        # Determine which sessions should use ses- labels (requires metadata for participant IDs)
        self._set_use_session_labels()

        for session_converter in tqdm(
            self.session_converters,
            desc="Converting sessions",
            unit="session",
            disable=self.run_config.silent,
        ):
            session_converter.convert_to_bids_session()

        self.write_participants_metadata()
        self.write_sessions_metadata()
        self.write_dataset_description()
        self.write_bidsignore()
    except Exception:  # noqa
        # NOTE(review): this reuses the "LocalInitializationFailure" definition although the failure occurs
        # during conversion, not initialization; confirm whether a dedicated conversion-failure definition
        # exists before changing the identifier (an unknown identifier could itself raise here).
        notification = Notification.from_definition(
            identifier="LocalInitializationFailure", traceback=traceback.format_exc()
        )
        self._internal_notifications.append(notification)
    finally:
        # `parents=True` so a missing parent directory cannot raise inside `finally`; such an error would
        # propagate to the caller and prevent the notifications file below from ever being written.
        self.run_config.bids_directory.mkdir(parents=True, exist_ok=True)
        self.run_config._nwb2bids_directory.mkdir(parents=True, exist_ok=True)
        notifications_dump = [notification.model_dump(mode="json") for notification in self.notifications]
        self.run_config.notifications_json_file_path.write_text(data=json.dumps(obj=notifications_dump, indent=2))
[docs]
def write_bidsignore(self) -> None:
    """
    Write the `.bidsignore` file if an archive target of `"dandi"` or `"ember"` is specified.

    Ensures `dandiset.yaml` is listed in `<bids_directory>/.bidsignore`. An existing file is preserved:
    the entry is appended only when it is not already present, on its own line. For any other archive
    target (including None) nothing is written.
    """
    if self.run_config.archive_target not in ("dandi", "ember"):
        return

    bidsignore_file_path = self.run_config.bids_directory / ".bidsignore"
    entry = "dandiset.yaml"

    separator = ""
    if bidsignore_file_path.exists():
        existing_text = bidsignore_file_path.read_text()
        if entry in {line.strip() for line in existing_text.splitlines()}:
            return
        # Without this, appending to a file whose last line has no trailing newline would glue the entry
        # onto that line (e.g. "sourcedata/dandiset.yaml").
        if existing_text and not existing_text.endswith("\n"):
            separator = "\n"

    with bidsignore_file_path.open(mode="a") as file_stream:
        file_stream.write(separator + entry + "\n")
[docs]
def write_dataset_description(self) -> None:
    """
    Write the `dataset_description.json` file.

    When no description was supplied, a minimal one (BIDSVersion "1.10.1", HEDVersion "8.3.0") is created
    and stored on `self.dataset_description`. For derivative datasets (see `_is_derivative`) the
    description's `DatasetType` is set to "derivative" before it is serialized with a 4-space indent.
    """
    if self.dataset_description is None:
        self.dataset_description = DatasetDescription(BIDSVersion="1.10.1", HEDVersion="8.3.0")
    description = self.dataset_description

    if self._is_derivative:
        description.DatasetType = "derivative"

    serialized = description.model_dump()
    target_path = self.run_config.bids_directory / "dataset_description.json"
    with target_path.open(mode="w") as output:
        json.dump(obj=serialized, fp=output, indent=4)
[docs]
def write_participants_metadata(self) -> None:
    """
    Write the `participants.tsv` and `participants.json` files.

    One row is written per participant. When several sessions describe the same participant, the distinct
    non-missing values of every other column are joined with ", " (e.g. to surface species mismatches).
    Participant IDs are replaced by their sanitized form and prefixed with `sub-`; if the sanitization
    configuration enables `sub_labels`, an `original_participant_id` column keeps the unsanitized IDs.
    `participants.json` describes only the columns present in the table. Nothing is written when no
    session converter has metadata.

    Raises
    ------
    KeyError
        If a participant ID from the metadata has no matching sanitization entry.
    """
    # Only session converters whose metadata has been extracted contribute participant information.
    sessions_metadata = [sc.session_metadata for sc in self.session_converters if sc.session_metadata is not None]

    full_participants_data_frame = pandas.DataFrame.from_records(
        data=[
            {
                key: value
                for key, value in session_metadata.participant.model_dump().items()
                if value is not None
            }
            for session_metadata in sessions_metadata
        ]
    ).astype("string")
    if full_participants_data_frame.empty:
        return

    # Aggregate values across entries (such as species mismatches)
    cols_to_agg = [col for col in full_participants_data_frame.columns if col != "participant_id"]
    if cols_to_agg:
        aggregated_cols = {col: lambda val: ", ".join(val.dropna().unique()) for col in cols_to_agg}
        aggregated_data_frame = full_participants_data_frame.groupby(by="participant_id", as_index=False).agg(
            func=aggregated_cols
        )
    else:
        aggregated_data_frame = full_participants_data_frame.copy()

    # Deduplicate all rows of the frame
    deduplicated_data_frame = aggregated_data_frame.drop_duplicates(ignore_index=True).copy()

    # Save original IDs before sanitization (for the optional original_participant_id column)
    original_participant_id_values = deduplicated_data_frame["participant_id"].copy()

    # Map each original participant ID to its sanitized counterpart and apply it
    sanitizations = [
        session_metadata.sanitization
        for session_metadata in sessions_metadata
        if session_metadata.sanitization is not None
    ]
    sanitized_participant_ids = {
        sanitization.original_participant_id: sanitization.sanitized_participant_id
        for sanitization in sanitizations
    }
    with pandas.option_context("mode.chained_assignment", None):
        deduplicated_data_frame["participant_id"] = (
            deduplicated_data_frame["participant_id"]
            .apply(lambda participant_id: sanitized_participant_ids[participant_id])
            .astype("string")
        )

    # BIDS requires sub- prefix in table values
    participants_data_frame = deduplicated_data_frame.copy(deep=True)
    participants_data_frame["participant_id"] = (
        participants_data_frame["participant_id"]
        .apply(lambda participant_id: f"sub-{participant_id}")
        .astype("string")
    )

    # Add original_participant_id column if sub_labels sanitization is enabled
    sanitization_config = sanitizations[0].sanitization_config if sanitizations else None
    include_original_ids = sanitization_config is not None and sanitization_config.sub_labels
    if include_original_ids:
        participants_data_frame.insert(
            loc=1,
            column="original_participant_id",
            value=original_participant_id_values.apply(lambda participant_id: f"sub-{participant_id}").astype(
                "string"
            ),
        )

    present_columns = set(participants_data_frame.columns)

    # BIDS Validator is strict regarding column order: known columns first (in this order), then the rest
    required_column_order = [
        field
        for field in ["participant_id", "original_participant_id", "species", "sex", "strain"]
        if field in present_columns
    ]
    column_order = required_column_order + [
        field for field in participants_data_frame.columns if field not in required_column_order
    ]
    participants_tsv_file_path = self.run_config.bids_directory / "participants.tsv"
    participants_data_frame.to_csv(
        path_or_buf=participants_tsv_file_path, mode="w", index=False, sep="\t", columns=column_order
    )

    # Column descriptions come from the participant model's JSON schema. Use the first session that HAS
    # metadata: the first session converter may have none, and at least one such session is guaranteed
    # here because the table above was non-empty.
    participants_schema = sessions_metadata[0].participant.model_json_schema()
    participants_json = {
        field: info["description"]
        for field, info in participants_schema["properties"].items()
        if field in present_columns
    }
    if include_original_ids:
        participants_json["original_participant_id"] = "The original participant identifier before sanitization."

    participants_json_file_path = self.run_config.bids_directory / "participants.json"
    with participants_json_file_path.open(mode="w") as file_stream:
        json.dump(obj=participants_json, fp=file_stream, indent=4)
[docs]
def write_sessions_metadata(self) -> None:
    """
    Write the `_sessions.tsv` and `_sessions.json` files, then create empty participant and session directories.

    A `sub-<label>` directory is created for every participant with sanitization information. The sessions
    metadata files and `ses-` subdirectories are only written for participants whose session converters use
    session labels (i.e., subjects with multiple sessions, or all subjects when more than 50% have multiple
    sessions). When the sanitization configuration enables `ses_labels`, an `original_session_id` column
    and its description are added.
    """
    # Gather sanitization records per sanitized participant, skipping sessions without metadata/sanitization.
    grouped_sanitizations = collections.defaultdict(list)
    label_flags = {}
    first_sanitization = None
    for session_converter in self.session_converters:
        session_metadata = session_converter.session_metadata
        if session_metadata is None:
            continue
        sanitization = session_metadata.sanitization
        if sanitization is None:
            continue
        if first_sanitization is None:
            first_sanitization = sanitization
        participant_label = sanitization.sanitized_participant_id
        grouped_sanitizations[participant_label].append(sanitization)
        # The last session converter seen for a participant determines its label usage.
        label_flags[participant_label] = session_converter.use_session_labels

    # The sanitization configuration is taken from the first sanitized session.
    sanitization_config = None if first_sanitization is None else first_sanitization.sanitization_config
    include_original_ids = sanitization_config is not None and sanitization_config.ses_labels

    # TODO: expand beyond just session_id field (mainly via additional metadata)
    sessions_json = {
        "session_id": BidsSessionMetadata.model_json_schema()["properties"]["session_id"]["description"]
    }
    if include_original_ids:
        sessions_json["original_session_id"] = "The original session identifier before sanitization."

    for participant_label, participant_sanitizations in grouped_sanitizations.items():
        session_labels = [record.sanitized_session_id for record in participant_sanitizations]

        subject_directory = self.run_config.bids_directory / f"sub-{participant_label}"
        subject_directory.mkdir(exist_ok=True)

        # Only write sessions metadata files for subjects that use ses- labels
        if not label_flags.get(participant_label, True):
            continue

        # BIDS requires ses- prefix in table values
        sessions_data_frame = pandas.DataFrame({"session_id": [f"ses-{label}" for label in session_labels]})
        if include_original_ids:
            sessions_data_frame["original_session_id"] = pandas.Series(
                [f"ses-{record.original_session_id}" for record in participant_sanitizations]
            ).astype("string")

        sessions_data_frame.to_csv(
            path_or_buf=subject_directory / f"sub-{participant_label}_sessions.tsv",
            mode="w",
            index=False,
            sep="\t",
        )
        with (subject_directory / f"sub-{participant_label}_sessions.json").open(mode="w") as file_stream:
            json.dump(obj=sessions_json, fp=file_stream, indent=4)

        for label in session_labels:
            (subject_directory / f"ses-{label}").mkdir(exist_ok=True)