Source code for arcana.core.data.format

import os
from pathlib import Path
import typing as ty
from itertools import chain
from copy import copy
import hashlib
import logging
import shutil
from abc import ABCMeta, abstractmethod
import attrs
from attrs.converters import optional
from pydra.engine.core import LazyField, Workflow
from arcana.core.utils import (
    class_location,
    parse_value,
    func_task,
    path2varname,
    CONVERTER_ANNOTATIONS,
)
from arcana.exceptions import (
    ArcanaUsageError,
    ArcanaNameError,
    ArcanaDataNotDerivedYetError,
    ArcanaFileFormatError,
    ArcanaFormatConversionError,
)
from ..enum import DataQuality


logger = logging.getLogger("arcana")


@attrs.define
class DataItem(metaclass=ABCMeta):
    """
    Abstract base class for items (file-groups and fields) within a dataset.

    Parameters
    ----------
    path : str
        The path to the relative location of the item, i.e. excluding
        information about which row in the data tree it belongs to
    uri : str | None
        Identifier of the item within the store, if applicable
    order : int | None
        The order in which the item appears in the row it belongs to
        (starting at 0). Typically corresponds to the acquisition order for
        scans within an imaging session. Can be used to distinguish between
        scans with the same series description (e.g. multiple BOLD or T1w
        scans) in the same imaging sessions.
    quality : DataQuality
        The quality label assigned to the item (e.g. as is saved on XNAT)
    exists : bool
        Whether the item exists or is just a placeholder for a sink
    provenance : Provenance | None
        The provenance for the pipeline that generated the item,
        if applicable
    row : DataRow
        The data row within a dataset that the item belongs to
    """

    path: str = attrs.field()
    uri: str = attrs.field(default=None)
    order: int = attrs.field(default=None)
    quality: DataQuality = attrs.field(default=DataQuality.usable)
    exists: bool = attrs.field(default=True)
    provenance: ty.Dict[str, ty.Any] = attrs.field(default=None)
    row = attrs.field(default=None)

    @abstractmethod
    def get(self, assume_exists=False):
        """Pulls data from the store (if remote) and caches locally

        Parameters
        ----------
        assume_exists: bool
            If set, checks to see whether the item exists are skipped (used
            to pull data after a successful workflow run)
        """
        raise NotImplementedError

    @abstractmethod
    def put(self, value):
        """Updates the value of the item in the store to the provided value,
        pushing remotely if necessary.

        Parameters
        ----------
        value : ty.Any
            The value to update
        """
        raise NotImplementedError

    @property
    def recorded_checksums(self):
        """Checksums recorded for this item in its provenance, or None if the
        item has no provenance"""
        if self.provenance is None:
            return None
        # Look up by 'path', the same key 'check_provenance' validates against
        # (there is no 'name_path' attribute on this class)
        return self.provenance.outputs[self.path]

    @provenance.validator
    def check_provenance(self, _, provenance):
        "Checks that the data item path is present in the provenance"
        if provenance is not None:
            if self.path not in provenance.outputs:
                raise ArcanaNameError(
                    self.path,
                    f"{self.path} was not found in outputs "
                    f"{provenance.outputs.keys()} of provenance "
                    f"{provenance}",
                )

    def _check_exists(self):
        """Raise ArcanaDataNotDerivedYetError if the item is only a placeholder"""
        if not self.exists:
            raise ArcanaDataNotDerivedYetError(
                self.path, f"Cannot access {self} as it hasn't been derived yet"
            )

    def _check_part_of_row(self):
        """Raise ArcanaUsageError if the item isn't attached to a data row"""
        if self.row is None:
            raise ArcanaUsageError(
                f"Cannot 'get' {self} as it is not part of a dataset"
            )

    @classmethod
    def class_name(cls):
        """Lower-cased name of the class"""
        return cls.__name__.lower()

    @classmethod
    def location(cls, relative=True):
        """Returns the location of the format class definition

        Parameters
        ----------
        relative : bool, optional
            return the module location relative to `arcana.data.formats`,
            if applicable, by default True

        Returns
        -------
        str
            the location of the class format in <module-path>:<class-name>
        """
        loc = class_location(cls)
        if relative and loc.startswith("arcana.data.formats."):
            loc = loc[len("arcana.data.formats.") :]
        return loc
@attrs.define
class Field(DataItem):
    """
    A representation of a value field in the dataset.

    Parameters
    ----------
    path : str
        The path to the relative location of the field, i.e. excluding
        information about which row in the data tree it belongs to
    row : DataRow
        The data row that the field belongs to
    exists : bool
        Whether the field exists or is just a placeholder for a sink
    provenance : Provenance | None
        The provenance for the pipeline that generated the field,
        if applicable
    value : int | float | str | None
        The value of the field, passed through ``parse_value`` on assignment
    """

    # NB: 'int or float or str' would evaluate to plain 'int' at class creation
    value: ty.Union[int, float, str] = attrs.field(
        converter=parse_value, default=None
    )

    # NOTE(review): 'self.format' is used below but is not defined in this
    # class or in DataItem as visible here -- presumably supplied by
    # sub-classes; confirm.

    def get(self, assume_exists=False):
        """Load the value of the field from the store

        Parameters
        ----------
        assume_exists : bool
            If set, the check that the field has been derived is skipped
        """
        if not assume_exists:
            self._check_exists()
        self._check_part_of_row()
        self.value = self.row.dataset.store.get_field_value(self)

    def put(self, value):
        """Push ``value`` (converted with ``self.format``) to the store and
        mark the field as existing"""
        self._check_part_of_row()
        self.row.dataset.store.put_field_value(self, self.format(value))
        self.exists = True

    def __int__(self):
        return int(self.value)

    def __float__(self):
        return float(self.value)

    def __str__(self):
        # Parameterised (sequence) types carry a non-empty '__args__'; plain
        # types such as 'str' have no such attribute at all, so guard the
        # lookup instead of raising AttributeError
        if getattr(self.format, "__args__", None):  # Sequence type
            val = "[" + ",".join(self._to_str(v) for v in self.value) + "]"
        else:
            val = self._to_str(self.value)
        return val

    def _to_str(self, val):
        """Render a single value, double-quoting it when the format is str"""
        if self.format is str:
            val = '"{}"'.format(val)
        else:
            val = str(val)
        return val

    def get_checksums(self):
        """
        For duck-typing with file_groups in checksum management. Instead of a
        checksum, just the value of the field is used
        """
        return self.value
def absolute_path(path):
    """Return ``path`` as an absolute ``pathlib.Path`` (symlinks are not
    resolved)"""
    as_path = Path(path)
    return as_path.absolute()


def absolute_paths_dict(dct):
    """Return a new dict with the same keys as ``dct`` (a mapping or an
    iterable of key/value pairs) and every value made absolute via
    ``absolute_path``"""
    absolute = {}
    for name, fs_path in dict(dct).items():
        absolute[name] = absolute_path(fs_path)
    return absolute
@attrs.define
class FileGroup(DataItem, metaclass=ABCMeta):
    """
    A representation of a file_group within the dataset.

    Parameters
    ----------
    path : str
        The path to the relative location of the file group, i.e. excluding
        information about which row in the data tree it belongs to
    order : int | None
        The order in which the file-group appears in the row it belongs to
        (starting at 0). Typically corresponds to the acquisition order for
        scans within an imaging session. Can be used to distinguish between
        scans with the same series description (e.g. multiple BOLD or T1w
        scans) in the same imaging sessions.
    quality : str
        The quality label assigned to the file_group (e.g. as is saved on XNAT)
    row : DataRow
        The data row within a dataset that the file-group belongs to
    exists : bool
        Whether the file_group exists or is just a placeholder for a sink
    provenance : Provenance | None
        The provenance for the pipeline that generated the file-group,
        if applicable
    fs_path : str | None
        Path to the primary file or directory on the local file system
    """

    fs_path: str = attrs.field(default=None, converter=optional(absolute_path))
    # Cache of checksums keyed by relative file path, populated lazily by
    # 'get_checksums' (not settable through the constructor)
    _checksums: ty.Dict[str, str] = attrs.field(default=None, repr=False, init=False)

    # Alternative names for the file format, empty by default overridden in
    # sub-classes where necessary
    alternative_names = ()

    HASH_CHUNK_SIZE = 2**20  # 1MB in calc. checksums to avoid mem. issues

    @fs_path.validator
    def validate_fs_path(self, _, fs_path):
        """Check that a provided path exists on disk and that the file group
        isn't just a placeholder"""
        if fs_path is not None:
            # 'exists' is a method: it must be called, the bound method
            # object itself is always truthy
            if not fs_path.exists():
                raise ArcanaUsageError(
                    "Attempting to set a path that doesn't exist " f"({fs_path})"
                )
            if not self.exists:
                raise ArcanaUsageError(
                    "Attempting to set a path to a file group that hasn't "
                    f"been derived yet ({fs_path})"
                )

    def get(self, assume_exists=False):
        """Fetch the file-system paths of the file group from the store and
        set them on this object

        Parameters
        ----------
        assume_exists : bool
            If set, the file group is flagged as existing before the store is
            queried
        """
        if assume_exists:
            self.exists = True
        self._check_part_of_row()
        fs_paths = self.row.dataset.store.get_file_group_paths(self)
        self.exists = True
        self.set_fs_paths(fs_paths)
        self.validate_file_paths()

    def put(self, *fs_paths):
        """Push the given file-system paths to the store and point this file
        group at the paths returned by the store

        Parameters
        ----------
        *fs_paths : str or Path
            the paths to store, at most one of which may be a directory

        Raises
        ------
        ArcanaFileFormatError
            if more than one directory is provided
        """
        self._check_part_of_row()
        fs_paths = [Path(p) for p in fs_paths]
        dir_paths = [p for p in fs_paths if p.is_dir()]
        if len(dir_paths) > 1:
            dir_paths_str = "', '".join(str(p) for p in dir_paths)
            raise ArcanaFileFormatError(
                f"Cannot put more than one directory, {dir_paths_str}, as part "
                f"of the same file group {self}"
            )
        # Make a copy of the file-group to validate the local paths and auto-gen
        # any defaults before they are pushed to the store
        cpy = copy(self)
        cpy.exists = True
        cpy.set_fs_paths(fs_paths)
        cache_paths = self.row.dataset.store.put_file_group_paths(self, cpy.fs_paths)
        # Set the paths to the cached files
        self.exists = True
        self.set_fs_paths(cache_paths)
        self.validate_file_paths()
        # Save provenance
        if self.provenance:
            self.row.dataset.store.put_provenance(self)

    @property
    def fs_paths(self):
        """All base paths (i.e. not nested within directories) in the file group"""
        if self.fs_path is None:
            raise ArcanaUsageError(
                f"Attempting to access file path of {self} before it is set"
            )
        return [self.fs_path]

    @classmethod
    def fs_names(cls):
        """Return names for each top-level file-system path in the file group,
        used when generating Pydra task interfaces.

        Returns
        -------
        tuple[str]
            sequence of names for top-level file-system paths in the file group"""
        return ("fs_path",)

    @classmethod
    def matches_format_name(cls, name: str):
        """Checks to see whether the provided name is a valid name for the
        file format. Alternative names can be provided for format-specific
        subclasses, or this method can be overridden. Matches are case
        insensitive.

        Parameters
        ----------
        name : str
            Name to match

        Returns
        -------
        bool
            whether or not the name matches the format
        """
        return name.lower() in [
            n.lower() for n in (cls.class_name(),) + cls.alternative_names
        ]

    @property
    def value(self):
        """The primary file-system path as a string"""
        return str(self.fs_path)

    @property
    def checksums(self):
        """Checksums of the files in the group, loaded on first access"""
        if self._checksums is None:
            self.get_checksums()
        return self._checksums

    def get_checksums(self, force_calculate=False):
        """Load the checksums from the store, or calculate them locally, and
        cache them on the object

        Parameters
        ----------
        force_calculate : bool
            calculate the checksums from the local files even if the file
            group belongs to a row

        Returns
        -------
        dict[str, str]
            the checksums (returned for parity with ``Field.get_checksums``)
        """
        self._check_exists()
        # Load checksums from store (e.g. via API)
        if self.row is not None and not force_calculate:
            self._checksums = self.row.dataset.store.get_checksums(self)
        # If the store cannot calculate the checksums do them manually
        else:
            self._checksums = self.calculate_checksums()
        return self._checksums

    def calculate_checksums(self):
        """Calculate MD5 checksums for every file returned by
        ``all_file_paths`` and generalise their keys

        Returns
        -------
        dict[str, str]
            checksums keyed as produced by ``generalise_checksum_keys``
        """
        self._check_exists()
        checksums = {}
        for fpath in self.all_file_paths():
            fhash = hashlib.md5()
            with open(fpath, "rb") as f:
                # Calculate hash in chunks so we don't run out of memory for
                # large files.
                for chunk in iter(lambda: f.read(self.HASH_CHUNK_SIZE), b""):
                    fhash.update(chunk)
            checksums[fpath] = fhash.hexdigest()
        checksums = self.generalise_checksum_keys(checksums)
        return checksums

    def contents_equal(self, other, **kwargs):
        """
        Test the equality of the file_group contents with another file_group
        by comparing the checksums stored under the names of their primary
        paths.

        Parameters
        ----------
        other : FileGroup
            The other file_group to compare to
        **kwargs
            accepted but not used by this implementation
        """
        self._check_exists()
        other._check_exists()
        return self.checksums[self.fs_path.name] == other.checksums[other.fs_path.name]

    @classmethod
    def resolve(cls, unresolved):
        """Resolve file group loaded from a repository to the specific format

        Parameters
        ----------
        unresolved : UnresolvedFileGroup
            A file group loaded from a repository that has not been resolved to
            a specific format yet

        Returns
        -------
        FileGroup
            The resolved file-group object

        Raises
        ------
        ArcanaFileFormatError
            If there doesn't exist a resolution from the unresolved file group
            to the given format
        """
        # Perform matching based on resource names in multi-format
        # file-group
        if unresolved.uris is not None:
            item = None
            # NB: if several resource names match, the last one wins
            for format_name, uri in unresolved.uris.items():
                if cls.matches_format_name(format_name):
                    item = cls(uri=uri, **unresolved.item_kwargs)
            if item is None:
                raise ArcanaFileFormatError(
                    f"Could not find a matching resource in {unresolved.path} for"
                    f" the given format ({cls.class_name()}), found "
                    "('{}')".format("', '".join(unresolved.uris))
                )
        else:
            item = cls(**unresolved.item_kwargs)
            item.set_fs_paths(unresolved.file_paths)
        return item

    @abstractmethod
    def set_fs_paths(self, fs_paths: ty.List[Path]):
        """Set the file paths of the file group

        Parameters
        ----------
        fs_paths : list[Path]
            The candidate paths from which to set the paths of the
            file group from. Note that not all paths need to be set if
            they are not relevant.

        Raises
        ------
        ArcanaFileFormatError
            is raised if the required the paths cannot be set from the provided
        """

    @classmethod
    def from_fs_paths(cls, *fs_paths: Path, path=None):
        """Create a FileGroup object from a set of file-system paths

        Parameters
        ----------
        *fs_paths : str or Path
            The candidate paths from which to set the paths of the
            file group from. Note that not all paths need to be set if
            they are not relevant.
        path : str, optional
            the location of the file-group relative to the node it (will)
            belong to. Defaults to the stem of the first path provided

        Returns
        -------
        FileGroup
            The created file-group

        Raises
        ------
        ArcanaUsageError
            if neither a path nor any file-system paths are provided
        """
        if path is None:
            if not fs_paths:
                raise ArcanaUsageError(
                    "Either 'path' or at least one file-system path needs to "
                    f"be provided to create a {cls.__name__}"
                )
            # Wrap in Path so plain strings are accepted too
            path = Path(fs_paths[0]).stem
        obj = cls(path)
        obj.set_fs_paths(fs_paths)
        return obj

    @classmethod
    def matches_ext(cls, *paths, ext=None):
        """Returns the path out of the candidates provided that matches the
        given extension (by default the extension of the class)

        Parameters
        ----------
        *paths: list[Path]
            The paths to select from
        ext: str or None
            the extension to match (defaults to 'ext' attribute of class)

        Returns
        -------
        str
            the matching path, as a string

        Raises
        ------
        ArcanaFileFormatError
            When no paths match or more than one path matches the given extension"""
        if ext is None:
            ext = cls.ext
        matches = [str(p) for p in paths if str(p).endswith("." + ext)]
        if not matches:
            paths_str = ", ".join(str(p) for p in paths)
            raise ArcanaFileFormatError(
                f"No matching files with '{ext}' extension found in "
                f"file group {paths_str}"
            )
        elif len(matches) > 1:
            matches_str = ", ".join(matches)
            raise ArcanaFileFormatError(
                f"Multiple files with '{ext}' extension found in : {matches_str}"
            )
        return matches[0]

    def validate_file_paths(self):
        """Run all attrs validators on the object and flag it as existing"""
        attrs.validate(self)
        self.exists = True

    def _check_paths_exist(self, fs_paths: ty.List[Path]):
        """Raise ArcanaFileFormatError, listing the neighbouring files to help
        debugging, if any of the provided paths is empty or missing"""
        if missing := [p for p in fs_paths if not p or not Path(p).exists()]:
            missing_str = "\n".join(str(p) for p in missing)
            all_str = "\n".join(str(p) for p in fs_paths)
            msg = (
                f"The following file system paths provided to {self} do not "
                f"exist:\n{missing_str}\n\nFrom full list:\n{all_str}"
            )
            for fs_path in missing:
                if fs_path:
                    # Entries may be plain strings (they are wrapped in Path
                    # for the existence check above), so wrap here as well
                    parent = Path(fs_path).parent
                    if parent.exists():
                        msg += f"\n\nFiles in the directory '{str(parent)}' are:\n"
                        msg += "\n".join(str(p) for p in parent.iterdir())
            raise ArcanaFileFormatError(msg)

    def convert_to(self, to_format, **kwargs):
        """Convert the FileGroup to a new format

        Parameters
        ----------
        to_format : type
            the file-group format to convert to
        **kwargs
            args to pass to the conversion process

        Returns
        -------
        FileGroup
            the converted file-group
        """
        task = to_format.converter_task(
            from_format=type(self), name="converter", **kwargs
        )
        task.inputs.to_convert = self
        result = task(plugin="serial")
        return result.output.converted

    @classmethod
    def converter_task(cls, from_format, name, **kwargs):
        """Adds a converter row to a workflow

        Parameters
        ----------
        from_format : type
            the file-group class to convert from
        name : str
            the name for the converter task
        **kwargs: dict[str, ty.Any]
            keyword arguments passed through to the converter

        Returns
        -------
        Workflow
            Pydra workflow to perform the conversion with an input called
            'to_convert' and an output called 'converted', which take and
            produce file-groups in `from_format` and `cls` types respectively.
        """
        wf = Workflow(name=name, input_spec=["to_convert"])

        # Get row to extract paths from file-group lazy field. The task is
        # looked up below as 'wf.access_paths' (presumably the task name
        # defaults to the function name -- confirm against 'func_task')
        wf.add(
            func_task(
                access_paths,
                in_fields=[("from_format", type), ("file_group", from_format)],
                out_fields=[(i, Path) for i in from_format.fs_names()],
                from_format=from_format,
                file_group=wf.lzin.to_convert,
            )
        )

        # Aggregate converter inputs and combine with fixed keyword args
        conv_inputs = {
            n: getattr(wf.access_paths.lzout, n) for n in from_format.fs_names()
        }
        conv_inputs.update(kwargs)
        # Create converter node
        converter, output_lfs = cls.find_converter(from_format)(**conv_inputs)
        # If there is only one output lazy field, place it in a tuple so it can
        # be zipped with cls.fs_names()
        if isinstance(output_lfs, LazyField):
            output_lfs = (output_lfs,)
        wf.add(converter)

        # Encapsulate output paths from converter back into a file group object
        to_encapsulate = dict(zip(cls.fs_names(), output_lfs))

        logger.debug("Paths to encapsulate are:\n%s", to_encapsulate)

        wf.add(
            func_task(
                encapsulate_paths,
                in_fields=[("to_format", type), ("to_convert", from_format)]
                + [(o, ty.Union[str, Path]) for o in cls.fs_names()],
                out_fields=[("converted", cls)],
                to_format=cls,
                to_convert=wf.lzin.to_convert,
                **to_encapsulate,
            )
        )

        wf.set_output(("converted", wf.encapsulate_paths.lzout.converted))

        return wf

    @classmethod
    def find_converter(cls, from_format):
        """Selects the converter method from the given format. Will select the
        most specific conversion.

        Parameters
        ----------
        from_format : type
            The format type to convert from

        Returns
        -------
        function
            The bound method that adds rows to a given workflow

        Raises
        ------
        ArcanaFormatConversionError
            if no converter is defined from ``from_format``, or if two
            converters match and neither source format is a sub-class of the
            other
        """
        converter = None
        for attr_name in dir(cls):
            meth = getattr(cls, attr_name)
            try:
                converts_from = meth.__annotations__[CONVERTER_ANNOTATIONS]
            except (AttributeError, KeyError):
                # Not a converter method
                pass
            else:
                if from_format is converts_from or issubclass(
                    from_format, converts_from
                ):
                    if converter:
                        prev_converts_from = converter.__annotations__[
                            CONVERTER_ANNOTATIONS
                        ]
                        # Prefer the converter with the more specific source
                        if issubclass(converts_from, prev_converts_from):
                            converter = meth
                        elif not issubclass(prev_converts_from, converts_from):
                            raise ArcanaFormatConversionError(
                                f"Ambiguous converters between {from_format} "
                                f"and {cls}: {converter} and {meth}. Please "
                                f"define a specific converter from {from_format} "
                                f"(i.e. instead of from {prev_converts_from} "
                                f"and {converts_from} respectively)"
                            )
                    else:
                        converter = meth
        if not converter:
            raise ArcanaFormatConversionError(
                f"No converters defined between {from_format} and {cls}"
            )
        return converter

    def generalise_checksum_keys(
        self, checksums: ty.Dict[str, str], base_path: Path = None
    ):
        """Generalises the paths used for the file paths in a checksum dictionary
        so that they are the same irrespective of that the top-level file-system
        paths are

        Parameters
        ----------
        checksums: dict[str, str]
            The checksum dict mapping file paths to checksums
        base_path : Path, optional
            the path the keys are made relative to, defaults to ``fs_path``

        Returns
        -------
        dict[str, str]
            The checksum dict with file paths generalised"""
        if base_path is None:
            base_path = self.fs_path
        return {str(Path(k).relative_to(base_path)): v for k, v in checksums.items()}

    @classmethod
    def access_contents_task(cls, file_group_lf: LazyField):
        """Access the fs paths of the file group"""

    @classmethod
    def from_fs_path(cls, fs_path):
        """Create a file group from a single file-system path, using the stem
        of the path as the path of the file group"""
        file_group = cls(path=Path(fs_path).stem)
        file_group.set_fs_paths([fs_path])
        return file_group

    @classmethod
    def append_ext(cls, path: Path):
        """Return ``path`` with its suffix set to the extension of the format,
        if the format defines one

        NOTE(review): previously this read the non-existent ``Path.ext``
        attribute (always an AttributeError) and passed the extension without
        its leading '.'; the intent is presumed to be keyed on ``cls.ext`` --
        confirm.
        """
        ext = getattr(cls, "ext", None)
        if ext:
            path = Path(path).with_suffix("." + ext)
        return path

    @classmethod
    def all_exts(cls):
        """Extensions of all the files in the file group"""
        return [""]
def access_paths(from_format, file_group):
    """Copies files into the CWD renaming so the basenames match
    except for extensions

    Parameters
    ----------
    from_format : type
        the format of the file group (only used for logging)
    file_group : FileGroup
        the file group to extract the paths from

    Returns
    -------
    list[Path] or Path
        all top-level paths of the (symlinked) copy if there is more than one,
        otherwise just its primary path
    """
    logger.debug(
        "Extracting paths from %s (%s format) before conversion",
        file_group,
        from_format,
    )
    cpy = file_group.copy_to(path2varname(file_group.path), symlink=True)
    # Materialise once: 'fs_paths' may be a one-shot iterator without a length
    fs_paths = list(cpy.fs_paths)
    return fs_paths if len(fs_paths) > 1 else cpy.fs_path


def encapsulate_paths(
    to_format: type, to_convert: FileGroup, **fs_paths: ty.List[Path]
):
    """Wraps the file-system paths produced by a converter in a new file group
    of the target format

    Parameters
    ----------
    to_format : type
        the file-group class to create
    to_convert : FileGroup
        the file group that was converted, its path is used to name the new
        file group
    **fs_paths
        the converted file-system paths, keyed by their names in
        ``to_format.fs_names()``

    Returns
    -------
    FileGroup
        the file group, of ``to_format`` type, pointing at the given paths
    """
    logger.debug(
        "Encapsulating %s into %s format after conversion", fs_paths, to_format
    )
    file_group = to_format(to_convert.path + "_" + to_format.class_name())
    # Pass a real list, as documented by 'set_fs_paths', not a dict view
    file_group.set_fs_paths(list(fs_paths.values()))
    return file_group
@attrs.define
class BaseFile(FileGroup):
    """Base class for file groups consisting of a single file, identified by
    the ``ext`` extension that sub-classes are expected to define"""

    is_dir = False

    def set_fs_paths(self, fs_paths: ty.List[Path]):
        """Set the primary path to the one candidate matching the extension of
        the format

        Raises
        ------
        ArcanaFileFormatError
            if any candidate doesn't exist, or if none or more than one of
            them match the extension
        """
        self._check_paths_exist(fs_paths)
        fs_path = absolute_path(self.matches_ext(*fs_paths))
        # Flag as existing first, the 'fs_path' validator requires it
        self.exists = True
        self.fs_path = fs_path

    def all_file_paths(self):
        """The paths of all nested files within the file-group"""
        if self.fs_path is None:
            raise ArcanaUsageError(
                f"Attempting to access file paths of {self} before they are set"
            )
        return self.fs_paths

    def copy_to(self, fs_path: ty.Union[str, Path], symlink: bool = False):
        """Copies the file-group to the new path, with auxiliary files saved
        alongside the primary-file path.

        Parameters
        ----------
        fs_path : str or Path
            Path to save the file-group to excluding file extensions
        symlink : bool
            Use symbolic links instead of copying files to new location

        Returns
        -------
        BaseFile
            A copy of the file object at the new file system path
        """
        if symlink:
            copy_file = os.symlink
        else:
            copy_file = shutil.copyfile
        dest_path = Path(str(fs_path) + "." + self.ext)
        copy_file(self.fs_path, dest_path)
        cpy = copy(self)
        cpy.set_fs_paths([dest_path])
        return cpy

    @classmethod
    def copy_ext(cls, old_path, new_path):
        """Copy extension from the old path to the new path, ensuring that all
        of the extension is used (e.g. 'nii.gz' instead of 'gz')

        Parameters
        ----------
        old_path: Path or str
            The path from which to copy the extension from
        new_path: Path or str
            The path to append the extension to

        Returns
        -------
        Path
            The new path with the copied extension

        Raises
        ------
        ArcanaFileFormatError
            if the old path doesn't have the extension of the format
        """
        # 'matches_ext' raises rather than returning a falsy value when
        # nothing matches, so translate its error into the specific message
        try:
            cls.matches_ext(old_path)
        except ArcanaFileFormatError as e:
            raise ArcanaFileFormatError(
                f"Extension of old path ('{str(old_path)}') does not match that "
                f"of file, '{cls.ext}'"
            ) from e
        return Path(new_path).with_suffix("." + cls.ext)

    @classmethod
    def all_exts(cls):
        """Extensions of all the files in the file group"""
        return [cls.ext]
@attrs.define
class WithSideCars(BaseFile):
    """Base class for file-groups with a primary file and several header or
    side car files

    Parameters
    ----------
    side_cars : dict[str, str | Path], optional
        paths to the side-car files keyed by their extensions (must match the
        ``side_car_exts`` that sub-classes are expected to define). Defaults
        to paths alongside the primary file if it is set, otherwise empty
    """

    side_cars: ty.Dict[str, str] = attrs.field(converter=optional(absolute_paths_dict))

    @side_cars.default
    def default_side_cars(self):
        if self.fs_path is None:
            return {}
        return self.default_side_car_paths(self.fs_path)

    @side_cars.validator
    def validate_side_cars(self, _, side_cars):
        """Check that the side cars match the format and exist on disk"""
        if side_cars:
            if self.fs_path is None:
                raise ArcanaUsageError(
                    "Auxiliary files can only be provided to a FileGroup "
                    f"of '{self.path}' ({side_cars}) if the local path is "
                    "as well"
                )
            if set(self.side_car_exts) != set(side_cars.keys()):
                raise ArcanaUsageError(
                    "Keys of provided auxiliary files ('{}') don't match "
                    "format ('{}')".format(
                        "', '".join(side_cars.keys()), "', '".join(self.side_car_exts)
                    )
                )
            missing_side_cars = [(n, f) for n, f in side_cars.items() if not f.exists()]
            if missing_side_cars:
                msg = (
                    f"Attempting to set paths of auxiliary files for {self} "
                    "that don't exist: "
                )
                for name, fpath in missing_side_cars:
                    if fpath.parent.exists():
                        info = "neighbouring files: " + ", ".join(
                            p.name for p in fpath.parent.iterdir()
                        )
                    else:
                        info = "parent directory doesn't exist"
                    msg += f"\n    {name}: {str(fpath)} - {info}"
                raise ArcanaUsageError(msg)

    @classmethod
    def fs_names(cls):
        """Return names for each top-level file-system path in the file group,
        used when generating Pydra task interfaces.

        Returns
        -------
        tuple[str]
            sequence of names for top-level file-system paths in the file group"""
        return super().fs_names() + cls.side_car_exts

    def set_fs_paths(self, paths: ty.List[Path]):
        """Set the primary path and the side-car paths from the candidates,
        falling back to the default location alongside the primary file for
        any side car that isn't matched

        Raises
        ------
        ArcanaFileFormatError
            if the primary path cannot be set from the provided paths
        """
        super().set_fs_paths(paths)
        # Start from a fresh dict so that a shallow copy of this object (see
        # 'copy_to' and 'FileGroup.put') doesn't overwrite the side cars of
        # the original, which shares the same dict. An empty dict passes the
        # 'side_cars' validator
        self.side_cars = {}
        # Begin with default side_car paths and override if provided
        default_side_cars = self.default_side_car_paths(self.fs_path)
        for sc_ext in self.side_car_exts:
            try:
                self.side_cars[sc_ext] = absolute_path(
                    self.matches_ext(*paths, ext=sc_ext)
                )
            except ArcanaFileFormatError:
                self.side_cars[sc_ext] = default_side_cars[sc_ext]

    @property
    def fs_paths(self):
        """The primary path followed by the side-car paths, as a list so it
        can be measured and iterated repeatedly like the base-class value"""
        return list(chain(super().fs_paths, self.side_cars.values()))

    def side_car(self, name):
        """Path to the side car with the given name"""
        return self.side_cars[name]

    def copy_to(self, fs_path: ty.Union[str, Path], symlink: bool = False):
        """Copies the file-group to the new path, with auxiliary files saved
        alongside the primary-file path.

        Parameters
        ----------
        fs_path : str or Path
            Path to save the file-group to excluding file extensions
        symlink : bool
            Use symbolic links instead of copying files to new location

        Returns
        -------
        WithSideCars
            A copy of the file group at the new file system path
        """
        if symlink:
            copy_file = os.symlink
        else:
            copy_file = shutil.copyfile
        dest_path = Path(str(fs_path) + "." + self.ext)
        copy_file(self.fs_path, dest_path)
        dest_side_cars = self.default_side_car_paths(dest_path)
        for sc_ext, sc_path in self.side_cars.items():
            copy_file(sc_path, dest_side_cars[sc_ext])
        cpy = copy(self)
        cpy.set_fs_paths([dest_path] + list(dest_side_cars.values()))
        return cpy

    @classmethod
    def default_side_car_paths(cls, primary_path):
        """
        Get the default paths for auxiliary files relative to the path of the
        primary file, i.e. the same name as the primary path with a different
        extension

        Parameters
        ----------
        primary_path : str
            Path to the primary file in the file_group

        Returns
        -------
        aux_paths : ty.Dict[str, Path]
            A dictionary of side-car extensions and default paths
        """
        return {
            e: Path(str(primary_path)[: -len(cls.ext)] + e) for e in cls.side_car_exts
        }

    @classmethod
    def copy_ext(cls, old_path, new_path):
        """Copy extension from the old path to the new path, ensuring that all
        of the extension is used (e.g. 'nii.gz' instead of 'gz'). If the old
        path extension doesn't match the primary path, the methods loops
        through all side-car extensions and selects the longest matching.

        Parameters
        ----------
        old_path: Path or str
            The path from which to copy the extension from
        new_path: Path or str
            The path to append the extension to

        Returns
        -------
        Path
            The new path with the copied extension

        Raises
        ------
        ArcanaFileFormatError
            if the old path matches neither the primary nor any side-car
            extension
        """
        try:
            # Check to see if the path it matches the primary path extension
            return super().copy_ext(old_path, new_path)
        except ArcanaFileFormatError:
            pass
        matches = []
        for ext in cls.side_car_exts:
            try:
                cls.matches_ext(old_path, ext=ext)
            except ArcanaFileFormatError:
                pass
            else:
                matches.append(ext)
        if not matches:
            sc_exts_str = "', '".join(cls.side_car_exts)
            raise ArcanaFileFormatError(
                f"Extension of old path ('{str(old_path)}') does not match any "
                f" in {cls}: '{cls.ext}', {sc_exts_str}"
            )
        longest_match = max(matches, key=len)
        return Path(new_path).with_suffix("." + longest_match)

    def generalise_checksum_keys(
        self, checksums: ty.Dict[str, str], base_path: Path = None
    ):
        """Generalises the paths used for the file paths in a checksum dictionary
        so that they are the same irrespective of that the top-level file-system
        paths are

        Parameters
        ----------
        checksums: dict[str, str]
            The checksum dict mapping file paths to checksums
        base_path : Path, optional
            the path non-side-car keys are made relative to, defaults to
            ``fs_path``

        Returns
        -------
        dict[str, str]
            The checksum dict with side cars keyed by their extension and the
            other files by their path relative to ``base_path``"""
        if base_path is None:
            base_path = self.fs_path
        generalised = {}
        # Map the (stringified) checksum key of each side car to its extension
        fs_name_dict = {
            self.matches_ext(*checksums.keys(), ext=e): e for e in self.side_car_exts
        }
        mapped_exts = list(fs_name_dict.values())
        duplicates = {e for e in mapped_exts if mapped_exts.count(e) > 1}
        if duplicates:
            raise ArcanaUsageError(
                f"Multiple files with same extensions found in {self}: "
                + ", ".join(str(k) for k in checksums.keys())
            )
        for key, chksum in checksums.items():
            try:
                rel_key = fs_name_dict[str(key)]
            except KeyError:
                try:
                    rel_key = Path(key).relative_to(base_path)
                except ValueError:
                    continue  # skip these files
            generalised[str(rel_key)] = chksum
        return generalised

    @classmethod
    def all_exts(cls):
        """Extensions of all the files in the file group"""
        return [cls.ext] + list(cls.side_car_exts)
@attrs.define
class BaseDirectory(FileGroup):
    """Base class for file groups consisting of a single directory, optionally
    required to contain files of the formats listed in ``content_types``"""

    is_dir = True

    content_types = ()  # By default, don't check contents for any types

    def set_fs_paths(self, fs_paths: ty.List[Path]):
        """Set the primary path to the one candidate that is a directory with
        matching contents

        Raises
        ------
        ArcanaFileFormatError
            if any candidate doesn't exist, or if none or more than one of
            them are matching directories
        """
        self._check_paths_exist(fs_paths)
        matches = [p for p in fs_paths if Path(p).is_dir() and self.contents_match(p)]
        types_str = ", ".join(t.__name__ for t in self.content_types)
        if not matches:
            raise ArcanaFileFormatError(
                f"No matching directories with contents matching {types_str} amongst "
                f"{fs_paths}"
            )
        elif len(matches) > 1:
            matches_str = ", ".join(str(m) for m in matches)
            raise ArcanaFileFormatError(
                f"Multiple directories with contents matching {types_str}: "
                f"{matches_str}"
            )
        # Flag as existing first, the 'fs_path' validator requires it
        self.exists = True
        self.fs_path = absolute_path(matches[0])

    @classmethod
    def contents_match(cls, path: Path):
        """Check that, for every format in ``content_types``, at least one of
        the unresolved file groups found in the directory resolves to it

        Parameters
        ----------
        path : Path or str
            the directory to check

        Returns
        -------
        bool
            whether all content types were found in the directory
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import -- confirm)
        from arcana.core.data.row import UnresolvedFileGroup

        path = Path(path)  # Ensure a Path object not a string
        contents = UnresolvedFileGroup.from_paths(path, path.iterdir())
        for content_type in cls.content_types:
            resolved = False
            for unresolved in contents:
                try:
                    content_type.resolve(unresolved)
                except ArcanaFileFormatError:
                    pass
                else:
                    resolved = True
                    break
            if not resolved:
                return False
        return True

    def all_file_paths(self):
        "Iterates through all files in the group and returns their file paths"
        if self.fs_path is None:
            raise ArcanaUsageError(
                f"Attempting to access file paths of {self} before they are set"
            )
        # Join each file with its own 'root' while walking. Deferring the join
        # to lazily-evaluated nested generators (unpacked with '*') made every
        # file pick up the last directory walked
        return [
            Path(root) / f for root, _, files in os.walk(self.fs_path) for f in files
        ]

    def copy_to(self, fs_path: str, symlink: bool = False):
        """Copies the directory to the new path

        Parameters
        ----------
        fs_path : str
            Path to copy (or symlink) the directory to
        symlink : bool
            Use symbolic links instead of copying files to new location

        Returns
        -------
        BaseDirectory
            A copy of the file group pointing at the new file system path
        """
        if symlink:
            copy_dir = os.symlink
        else:
            copy_dir = shutil.copytree
        copy_dir(self.fs_path, fs_path)
        cpy = copy(self)
        cpy.set_fs_paths([fs_path])
        return cpy