from abc import abstractmethod, ABCMeta
import re
import typing as ty
import attrs
from operator import attrgetter
from attrs.converters import optional
# from arcana.core.data.row import DataRow
from arcana.core.utils.serialize import ClassResolver
from arcana.core.exceptions import ArcanaDataMatchError
from ..analysis.salience import ColumnSalience
from fileformats.core.quality import DataQuality
from .space import DataSpace
# Type variable standing for the kind of item held in a column: it
# parameterises ``DataColumn`` (via ``ty.Generic``) and annotates the return
# value of ``DataColumn.__getitem__``.
ItemType = ty.TypeVar("ItemType")
@attrs.define
class DataColumn(ty.Generic[ItemType], metaclass=ABCMeta):
    """Abstract base class for a named column of items within a dataset.

    A column yields one item per row of ``dataset`` at ``row_frequency``; the
    item is looked up in each row under the column's ``name``. Concrete
    subclasses implement `match` to select the item from a row.

    Parameters
    ----------
    name : str
        the key used to look the column's item up in each row
    path : str
        the path (or path criteria) of the items the column refers to
    datatype : type
        the datatype of the items in the column
    row_frequency : DataSpace
        the frequency of the rows the column spans
    dataset : optional
        the dataset the column is attached to. Excluded from equality
        comparison, hashing and the repr, and tagged with ``asdict=False``
        metadata (presumably so it is skipped on serialisation -- confirm
        against the serialiser)
    """

    name: str = attrs.field()
    path: str = attrs.field()
    datatype = attrs.field()
    row_frequency: DataSpace = attrs.field()
    dataset = attrs.field(
        default=None, metadata={"asdict": False}, eq=False, hash=False, repr=False
    )

    def __iter__(self):
        """Iterate over the column's item in every row of the dataset."""
        rows = self.dataset.rows(self.row_frequency)
        return (row[self.name] for row in rows)

    def __getitem__(self, id) -> ItemType:
        """Return the column's item from the row with the given ID."""
        row = self.dataset.row(id=id, row_frequency=self.row_frequency)
        return row[self.name]

    def __len__(self):
        """Return the number of rows at the column's row frequency."""
        return sum(1 for _ in self.dataset.rows(self.row_frequency))

    @property
    def ids(self):
        """List of the IDs of the rows the column spans."""
        rows = self.dataset.rows(self.row_frequency)
        return [row.id for row in rows]

    @abstractmethod
    def match(self, row):
        """Pick out the single item in a data row that satisfies the
        criteria/path of the column.

        Parameters
        ----------
        row: DataRow
            the row to select the item from

        Returns
        -------
        DataType
            the item that satisfies the criteria/path

        Raises
        ------
        ArcanaDataMatchError
            if no item, or more than one item, in the row satisfies the
            criteria/path of the column
        FileFormatError
            if the row holds no files in the format of the column"""

    def assume_exists(self):
        """Call ``get(assume_exists=True)`` on every item in the column."""
        # Refreshes the locally cached sink paths without checking the store
        for entry in self:
            entry.get(assume_exists=True)
@attrs.define
class DataSource(DataColumn):
    """
    Specifies the criteria by which an item is selected from a data row to
    be a data source.

    Parameters
    ----------
    path : str
        The path to match item paths against. Compared for equality by
        default, or applied as a regular expression when ``is_regex`` is set.
        If None, items are not filtered by path.
    datatype : type
        The datatype that candidate items in the row are resolved to
    row_frequency : DataSpace
        The row_frequency of the file-group within the dataset tree, e.g. per
        'session', 'subject', 'timepoint', 'group', 'dataset'
    quality_threshold : DataQuality
        The acceptable quality (or above) that should be considered. Items
        of lower quality are excluded from matching
    order : int | None
        To be used to distinguish multiple items that match the other
        criteria in the same row. The position of the desired item within
        the list of matching items (1-indexed), e.g. to tell apart two scans
        of the same type taken before and after a task.
    header_vals : Dict[str, str]
        To be used to distinguish multiple items that match the other
        criteria. The provided dictionary contains header values that must
        match the item's stored header values exactly.
    is_regex : bool
        Flags whether ``path`` is a regular expression or not
    """

    # Accepts a DataQuality member name (or anything whose str() is one)
    quality_threshold: DataQuality = attrs.field(
        default=None, converter=optional(lambda q: DataQuality[str(q)])
    )
    order: int = attrs.field(
        default=None, converter=lambda x: int(x) if x is not None else None
    )
    header_vals: ty.Dict[str, ty.Any] = attrs.field(default=None)
    # String values are interpreted case-insensitively ("true" -> True)
    is_regex: bool = attrs.field(
        default=False,
        converter=lambda x: x.lower() == "true" if isinstance(x, str) else x,
    )

    is_sink = False

    def match(self, row):
        """Select the single item in ``row`` that satisfies the source criteria.

        Items in the row resolved to ``datatype`` are filtered in turn by
        path (exact or regex), quality threshold and header values; criteria
        that are None are skipped. If ``order`` is set, the item at that
        (1-indexed) position among the remaining matches is returned,
        otherwise exactly one match must remain.

        Parameters
        ----------
        row : DataRow
            the row to select the item from

        Returns
        -------
        DataType
            the item that matches the criteria

        Raises
        ------
        ArcanaDataMatchError
            if no items of the datatype are found, if a criterion filters out
            every candidate, if ``order`` is beyond the number of matches, or
            if several items match and ``order`` is not set
        """
        # Each filter function is paired with its argument; only one of the
        # two path filters is active, depending on ``is_regex``
        criteria = [
            (match_path, self.path if not self.is_regex else None),
            (match_path_regex, self.path if self.is_regex else None),
            (match_quality, self.quality_threshold),
            (match_header_vals, self.header_vals),
        ]
        # Get all items that match the data type of the source
        matches = row.resolved(self.datatype)
        if not matches:
            msg = (
                f"Did not find any items matching data datatype "
                f"{ClassResolver.tostr(self.datatype)} in '{row.id}' "
                f"{self.row_frequency} for the "
                f"'{self.name}' column, found unresolved items:"
            )
            for item in sorted(row.unresolved, key=attrgetter("path")):
                msg += (
                    f"\n {item.path}: paths="
                    + ",".join(p.name for p in item.file_paths)
                    + ((", uris=" + ",".join(item.uris.keys())) if item.uris else "")
                )
            msg += self._format_criteria()
            raise ArcanaDataMatchError(msg)
        # Apply all filters to find items that match criteria
        for func, arg in criteria:
            if arg is None:
                continue
            filtered = [m for m in matches if func(m, arg)]
            if not filtered:
                # The docstring of each filter function doubles as the
                # template describing the criterion that failed
                raise ArcanaDataMatchError(
                    "Did not find any items "
                    + func.__doc__.format(arg)
                    + self._error_msg(row, matches)
                )
            matches = filtered
        # Select a single item from the ones that match the criteria
        if self.order is not None:
            # NOTE(review): ``order`` is 1-indexed, so a value below 1 wraps
            # around to a negative index instead of failing -- confirm
            # whether such values should be rejected
            try:
                match = matches[self.order - 1]
            except IndexError as e:
                raise ArcanaDataMatchError(
                    "Not enough matching items to select one at index "
                    f"{self.order} (starting from 1), found:"
                    + self._format_matches(matches)
                ) from e
        elif len(matches) > 1:
            raise ArcanaDataMatchError(
                "Found multiple matches " + self._error_msg(row, matches)
            )
        else:
            match = matches[0]
        return match

    def _error_msg(self, row, matches):
        """Describe the selection being attempted, the candidate items and
        the criteria, for appending to a match-error message."""
        return (
            f" attempting to select {ClassResolver.tostr(self.datatype)} item for "
            f"the '{row.id}' {row.frequency} in the '{self.name}' column, found:"
            + self._format_matches(matches)
            + self._format_criteria()
        )

    def _format_criteria(self):
        """Render the source's selection criteria for an error message."""
        return (
            f"\n\n criteria: path='{self.path}', is_regex={self.is_regex}, "
            f"datatype={ClassResolver.tostr(self.datatype)}, "
            f"quality_threshold='{self.quality_threshold}', "
            f"header_vals={self.header_vals}, order={self.order}"
        )

    def _format_matches(self, matches):
        """Render the candidate items, one per line and sorted by path, as
        ``<order>: <path> (<quality>)`` (the order prefix is omitted when the
        item has no order)."""
        lines = []
        for match in sorted(matches, key=attrgetter("path")):
            # Formatted rather than concatenated so that integer orders
            # (including 0) are rendered instead of raising a TypeError
            prefix = f"{match.order}: " if match.order is not None else ""
            lines.append(f"\n {prefix}{match.path} ({match.quality})")
        return "".join(lines)
def match_path(item, path):
    "at the path '{}'"
    # NOTE: the docstring above is used as an error-message template by
    # DataSource.match (via ``__doc__.format``), so it must stay as is.
    # Filter predicate: does the item sit at exactly the requested path?
    same_path = item.path == path
    return same_path
def match_path_regex(item, pattern):
    "that matched the path pattern '{}'"
    # NOTE: the docstring above is used as an error-message template by
    # DataSource.match (via ``__doc__.format``), so it must stay as is.
    #
    # Filter predicate: does the whole of the item's path match the regular
    # expression? Returns the match object (truthy) or None.
    #
    # ``re.fullmatch`` anchors the complete pattern. Appending "$" to the
    # pattern text only anchored its last alternative (e.g. "a|b" became
    # "a|b$", which accepts any path starting with "a") and was skipped when
    # the pattern ended in an escaped, literal "\$".
    return re.fullmatch(pattern, item.path)
def match_quality(item, threshold):
    "with an acceptable quality '{}'"
    # NOTE: the docstring above is used as an error-message template by
    # DataSource.match (via ``__doc__.format``), so it must stay as is.
    # Filter predicate: is the item's quality at or above the threshold?
    quality = item.quality
    return quality >= threshold
def match_header_vals(item, header_vals):
    "with the header values '{}'"
    # NOTE: the docstring above is used as an error-message template by
    # DataSource.match (via ``__doc__.format``), so it must stay as is.
    # Filter predicate: every requested header value must equal the value
    # the item reports for that key; stop at the first mismatch.
    for key, expected in header_vals.items():
        if not item.header(key) == expected:
            return False
    return True
@attrs.define
class DataSink(DataColumn):
    """
    A specification for a file group within an analysis to be derived from a
    processing pipeline.

    Parameters
    ----------
    path : str
        The path to the relative location the corresponding data items will be
        stored within the rows of the data tree.
    datatype : type
        The file datatype or data type used to store the corresponding items
        in the store dataset.
    row_frequency : DataSpace
        The row_frequency of the file-group within the dataset tree, e.g. per
        'session', 'subject', 'timepoint', 'group', 'dataset'
    salience : Salience
        The salience of the specified file-group, i.e. whether it would be
        typically of interest for publication outputs or whether it is just
        a temporary file in a workflow, and stages in between
    pipeline_name : str
        The name of the workflow applied to the dataset to generate the data
        for the sink
    """

    # Accepts a ColumnSalience member name (or anything whose str() is one);
    # None is passed through unchanged
    salience: ColumnSalience = attrs.field(
        default=ColumnSalience.supplementary,
        converter=lambda s: ColumnSalience[str(s)] if s is not None else None,
    )
    pipeline_name: str = attrs.field(default=None)

    is_sink = True

    def match(self, row):
        """Select the item in ``row`` stored at the sink's path.

        Parameters
        ----------
        row : DataRow
            the row to select the item from

        Returns
        -------
        DataType
            the existing item at the sink's path, or a placeholder created
            with ``exists=False`` if there is none yet

        Raises
        ------
        ArcanaDataMatchError
            if more than one item of the sink's datatype is found at the path
        """
        matches = [i for i in row.resolved(self.datatype) if i.path == self.path]
        if not matches:
            # Return a placeholder data item that can be set
            return self.datatype(path=self.path, row=row, exists=False)
        if len(matches) > 1:
            raise ArcanaDataMatchError(
                "Found multiple matches " + self._error_msg(row, matches)
            )
        return matches[0]

    def _error_msg(self, row, matches):
        """Describe the selection being attempted and how many candidates
        were found, for appending to a match-error message.

        Defined here because neither this class nor its ``DataColumn`` base
        otherwise provides the ``_error_msg`` that `match` calls, so the
        multiple-match branch would fail with an AttributeError instead of
        the intended ArcanaDataMatchError.
        """
        return (
            f"attempting to select {ClassResolver.tostr(self.datatype)} item for "
            f"the '{row.id}' {self.row_frequency} in the '{self.name}' column, "
            f"found {len(matches)} items at path '{self.path}'"
        )

    def derive(self, ids=None):
        """Ask the dataset to derive this column, optionally only for the
        rows with the given IDs."""
        self.dataset.derive(self.name, ids=ids)