Processing#

All data processing in Arcana is performed by the Pydra dataflow engine, which provides a declarative domain-specific language (DSL) for creating workflows in Python and enables the execution of workflows to be split over multiple cores or submitted to high-performance clusters (e.g. SLURM and SGE).
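
For illustration, the following minimal sketch (independent of Arcana) shows the Pydra DSL itself: a single Python function task is wired into a workflow and executed with the default “cf” (concurrent futures) plugin. The task and workflow names are arbitrary and chosen purely for this example.

import pydra

# A toy function task (the name and logic are arbitrary)
@pydra.mark.task
def double(x: int) -> int:
    return x * 2

# Wire the task into a workflow and expose its output
wf = pydra.Workflow(name='toy', input_spec=['x'], x=3)
wf.add(double(name='double', x=wf.lzin.x))
wf.set_output([('out', wf.double.lzout.out)])

# Execute the workflow, splitting execution over multiple processes
with pydra.Submitter(plugin='cf') as sub:
    sub(wf)

print(wf.result().output.out)  # 6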

Processed derivatives are computed by “pipelines”, modular Pydra workflows that connect dataset columns (see Frames: Rows and Columns). Pipeline outputs are always connected to sink columns, whereas pipeline inputs can draw data from either source columns or sink columns containing derivatives generated by prerequisite pipelines.

By connecting pipeline inputs to the outputs of other pipelines, complex processing chains/webs can be created (reminiscent of a makefile), in which intermediate products will be stored in the dataset for subsequent analysis. Alternatively, Analysis classes can be used to implement processing chains/webs independently of a specific dataset so that they can be applied to new studies in a reproducible way. If required, Analysis classes can be customised for particular use cases via a combination of new parameterisations and overridden methods in subclasses (see Designing Analyses).
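
For instance, here is a minimal sketch of such a chain using the Dataset.apply_pipeline() method described below. The second pipeline draws its input from the preproc/brain sink column populated by the first, so deriving stats/volumes will also trigger the preprocessing pipeline. The Preprocess and MeasureVolumes task interfaces and the column names are hypothetical placeholders.

from arcana.core.data.set import Dataset
from arcana.data.formats import common, medimage
from myworkflows import Preprocess, MeasureVolumes  # hypothetical Pydra tasks

dataset = Dataset.load('myuni-xnat//myproject:training')

dataset.add_source('T1w', format=medimage.Dicom, path='.*mprage.*', is_regex=True)
dataset.add_sink('preproc/brain', medimage.NiftiGz)
dataset.add_sink('stats/volumes', common.Json)

# First pipeline: source column -> intermediate sink column
dataset.apply_pipeline(
    workflow=Preprocess(name='preproc'),
    inputs=[('T1w', 'in_file', medimage.NiftiGz)],
    outputs=[('preproc/brain', 'out_file', medimage.NiftiGz)])

# Second pipeline: draws its input from the sink column of the first,
# so 'preproc' will run as a prerequisite when its outputs are requested
dataset.apply_pipeline(
    workflow=MeasureVolumes(name='volumes'),
    inputs=[('preproc/brain', 'in_file', medimage.NiftiGz)],
    outputs=[('stats/volumes', 'out_file', common.Json)])

dataset.save()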

Note

While Pydra is a general-purpose dataflow engine, it was developed in the neuroimaging field, so the existing task interfaces wrap common neuroimaging tools. You will therefore need to create your own Pydra wrappers for tools used in other domains.
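
As a rough sketch of what such a wrapper can look like, tasks can be defined either as plain Python functions or as shell commands. The tool, file name and argument values below are made up for illustration and would need to be adapted.

import pydra

# Wrap a pure-Python function as a Pydra task
@pydra.mark.task
def threshold(in_value: float, cutoff: float = 0.5) -> bool:
    return in_value > cutoff

# Wrap a command-line tool as a shell-command task (richer input/output
# specifications can be defined with pydra.specs.SpecInfo)
compress = pydra.ShellCommandTask(
    name='compress',
    executable='gzip',
    args='-k data.txt')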

Pydra workflows#

Pydra workflows, or individual Pydra tasks, can be applied to a dataset in order to operate on the data within it. Workflows are connected from source or sink columns to sink columns. During the application process, Pipeline objects are created to wrap the workflow and to prepend and append additional tasks that

  • iterate over the relevant rows in the dataset

  • manage storage and retrieval of data to and from the data store

  • convert between mismatching file formats

  • write provenance metadata

  • check saved provenance metadata to ensure prerequisite derivatives were generated with equivalent parameterisations and software versions (and potentially reprocess them if not)

To connect a workflow via the CLI, map the inputs and outputs of the Pydra workflow/task (in_file, peel and out_file in the example below) to appropriate columns in the dataset (T1w, T2w and freesurfer/recon-all respectively)

$ arcana dataset add-source 'myuni-xnat//myproject:training' T1w \
  medimage:Dicom --path '.*mprage.*' --regex

$ arcana dataset add-source 'myuni-xnat//myproject:training' T2w \
  medimage:Dicom --path '.*t2spc.*' --regex

$ arcana dataset add-sink 'myuni-xnat//myproject:training' freesurfer/recon-all \
  common:Zip

$ arcana apply pipeline 'myuni-xnat//myproject:training' freesurfer \
  pydra.tasks.freesurfer:Freesurfer \
  --input T1w in_file medimage:NiftiGz \
  --input T2w peel medimage:NiftiGz \
  --output freesurfer/recon-all out_file common:Directory \
  --parameter param1 10 \
  --parameter param2 20

If there is a mismatch in the data format (see Items) between the workflow inputs/outputs and the columns they are connected to, a format conversion task will be inserted into the pipeline, provided a converter method between the two formats exists (see File formats).

To add a workflow to a dataset via the API, use the Dataset.apply_pipeline() method

from pydra.tasks.freesurfer import Freesurfer
from arcana.core.data.set import Dataset
from arcana.data.formats import common, medimage

dataset = Dataset.load('myuni-xnat//myproject:training')

dataset.add_source('T1w', format=medimage.Dicom, path='.*mprage.*',
                   is_regex=True)
dataset.add_source('T2w', format=medimage.Dicom, path='.*t2spc.*',
                   is_regex=True)

dataset.add_sink('freesurfer/recon-all', common.Directory)

dataset.apply_pipeline(
    workflow=Freesurfer(
        name='freesurfer',
        param1=10.0,
        param2=20.0),
    inputs=[('T1w', 'in_file', medimage.NiftiGz),
            ('T2w', 'peel', medimage.NiftiGz)],
    outputs=[('freesurfer/recon-all', 'out_file', common.Directory)])

dataset.save()

If the source can be referenced by its path alone and the formats of the source and sink columns match those expected and produced by the workflow, then you can add the sources and sinks in one step

$ arcana apply pipeline 'file///data/enigma/alzheimers:test' segmentation \
  pydra.tasks.fsl.preprocess.fast:FAST \
  --source T1w in_file medimage:NiftiGz \
  --sink fast/gm gm medimage:NiftiGz \
  --parameter method a-method

By default, pipelines iterate over all “leaf rows” of the data tree (e.g. session for datasets in the Clinical space). However, pipelines can be run at any row_frequency of the dataset (see Spaces), e.g. per subject, per timepoint, or on the dataset as a whole (to create single templates/statistics).

Pipeline outputs must be connected to sinks of the same row_frequency. However, inputs can be drawn from columns of any row_frequency. In this case, inputs from more frequent rows will be provided to the pipeline as a list sorted by their ID.

For example, when the pipeline in the following code-block runs, it will receive a list of T1w filenames, run a single workflow over them, and then sink a single template back to the dataset.

from myworkflows import vbm_template
from arcana.core.data.set import Dataset
from arcana.data.formats import common, medimage
from arcana.data.spaces.medimage import Clinical

dataset = Dataset.load('bids///data/openneuro/ds00014')

# Add sink column with "dataset" row_frequency
dataset.add_sink(
    name='vbm_template',
    format=medimage.NiftiGz,
    row_frequency='dataset')

# NB: we don't need to add the T1w source as it is automatically detected
#     when using BIDS

# Connect the pipeline to the "dataset" row_frequency sink column. The
# pipeline needs to be of 'dataset' row_frequency itself or Arcana will
# raise an error
dataset.apply_pipeline(
    name='vbm_template',
    workflow=vbm_template(),
    inputs=[('T1w', 'in_file')],
    outputs=[('vbm_template', 'out_file')],
    row_frequency='dataset')

Analysis classes#

Analysis classes are used to implement pipeline chains/webs that can be applied to types of datasets in a reproducible manner. The syntax used is an extension of the attrs package (see https://www.attrs.org/en/stable/extending.html). In this syntax, member attributes are either free parameters or placeholders for columns in the dataset the analysis is applied to. Decorated “pipeline builder” methods construct the pipelines to perform the analysis.

The following toy example has two column placeholders, recorded_datafile and recorded_metadata, to be linked to source data (Lines 14 & 15), and three column placeholders, preprocessed, derived_image and summary_metric (Lines 16-18), that can be derived by pipelines created by one of the two implemented pipeline builder methods, preprocess_pipeline (Line 29) and create_image_pipeline (Line 60).

The arcana.core.mark.analysis() decorator is used to specify an analysis class (Line 7), taking the dataset space that the class operates on as an argument. By default, class attributes are assumed to be column placeholders of arcana.core.mark.column() type (Lines 14-18). Class attributes can also be free parameters of the analysis by using arcana.core.mark.parameter() instead (Lines 22 & 23).

The arcana.core.mark.pipeline() decorator specifies pipeline builder methods and takes the columns the pipeline outputs are connected to as arguments (Lines 28 & 58). For more details on the design of analysis classes, see Designing Analyses.

 1import pydra
 2from some.example.pydra.tasks import Preprocess, ExtractFromJson, MakeImage
 3from arcana.core.mark import analysis, pipeline, parameter
 4from arcana.data.spaces.example import ExampleDataSpace
 5from arcana.data.formats.common import Zip, Directory, Json, Png, Gif
 6
 7@analysis(ExampleDataSpace)
 8class ExampleAnalysis():
 9
10    # Define the columns for the dataset along with their formats.
11    # The `column` decorator can be used to specify additional options but
12    # is not required by default. The data formats specify the format
13    # that the column data will be stored in
14    recorded_datafile: Zip  # Not derived by a pipeline, should be linked to existing dataset column
15    recorded_metadata: Json  # "     "     "     "
16    preprocessed: Zip  # Derived by 'preprocess_pipeline' pipeline
17    derived_image: Png  # Derived by 'create_image_pipeline' pipeline
18    summary_metric: float  # Derived by 'create_image_pipeline' pipeline
19
20    # Define analysis-wide parameters that can be used in multiple
21    # pipelines/tasks
22    contrast: float = parameter(default=0.5)
23    kernel_fwhms: list[float] = parameter(default=[0.5, 0.3, 0.1])
24
25    # Define a "pipeline builder method" to generate the 'preprocessed'
26    # derivative. Arcana automagically maps column names to arguments of the
27    # builder methods.
28    @pipeline(preprocessed)
29    def preprocess_pipeline(
30            self,
31            wf: pydra.Workflow,
32            recorded_datafile: Directory,  # Automatic conversion from stored Zip format before pipeline is run
33            recorded_metadata):  # Format is the same as in the class definition so can be omitted
34
35        # A simple task to extract the "temperature" field from a JSON
36        # metadata
37        wf.add(
38            ExtractFromJson(
39                name='extract_metadata',
40                in_file=recorded_metadata,
41                field='temperature'))
42
43        # Add tasks to the pipeline using Pydra workflow syntax
44        wf.add(
45            Preprocess(
46                name='preprocess',
47                in_file=recorded_datafile,
48                temperature=wf.extract_metadata.lzout.out_field))
49
50        # Map the output of the pipeline to the "preprocessed" column specified
51        # in the @pipeline decorator
52        return wf.preprocess.lzout.out_file
53
54    # The 'create_image' pipeline derives two columns 'derived_image' (in GIF format) and
55    # 'summary_metric'. Since the output format of the derived image created by the pipeline ('Gif')
56    # differs from that specified for the column ('Png'), an automatic conversion
57    # step will be added by Arcana before the image is stored.
58    @pipeline((derived_image, Gif),
59              summary_metric)
60    def create_image_pipeline(
61            self,
62            wf,
63            preprocessed: Directory,  # Automatic conversion from stored Zip format before pipeline is run
64            contrast: float):  # Parameters are also automagically mapped to method args
65
66        # Add a task that creates an image from the preprocessed data, using
67        # the 'contrast' parameter
68        wf.add(
69            MakeImage(
70                name="create_image",
71                in_file=preprocessed,
72                contrast=contrast))
73
74        return wf.create_image.lzout.out_file, wf.create_image.lzout.summary

To apply an analysis via the command-line, use the --column flag to connect column specs in the class with existing columns in the dataset.

$ arcana apply analysis 'file///data/a-dataset' example:ExampleAnalysis \
  --column recorded_datafile datafile \
  --column recorded_metadata metadata \
  --parameter contrast 0.75

Analyses are applied to datasets via the Python API with the Dataset.apply() method, which takes an Analysis object instantiated with the names of the dataset columns to link the placeholders to, along with any parameters.

from arcana.core.data.set import Dataset
from arcana.data.formats.common import Zip, Yaml
from arcana.analyses.example import ExampleAnalysis

dataset = Dataset.load('file///data/a-dataset')

dataset.add_source(
    name='datafile',
    path='a-long-arbitrary-name',
    format=Zip)

dataset.add_source(
    name='metadata',
    path='another-long-arbitrary-name',
    format=Yaml)  # The format the data is stored in within the dataset; it will be converted automatically

dataset.apply(
    ExampleAnalysis(
        recorded_datafile='datafile',
        recorded_metadata='metadata',
        contrast=0.75))

Generating derivatives#

After workflows and/or analysis classes have been connected to a dataset, derivatives can be generated using Dataset.derive(), or alternatively DataColumn.derive() for single columns. These methods check the data store to see whether the source data is present and execute the pipelines over all rows of the dataset with available source data. If pipeline inputs are sink columns to be derived by prerequisite pipelines, then the prerequisite pipelines will be prepended onto the execution stack.

To generate derivatives via the CLI

$ arcana derive column 'myuni-xnat//myproject:training' freesurfer/recon-all

To generate derivatives via the API

from arcana.core.data.set import Dataset

dataset = Dataset.load('file///data/openneuro/ds00014:test')

dataset.derive('fast/gm', cache_dir='/work/temp-dir')

# Print the URI of the derivative generated for subject 'sub11'
print(dataset['fast/gm']['sub11'].uri)
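
A single column can also be derived via the DataColumn object itself. The following is a minimal sketch, assuming that indexing the dataset by column name (as in the snippet above) returns the DataColumn.

# Derive just the 'fast/gm' column via the column object (hedged sketch)
gm_column = dataset['fast/gm']
gm_column.derive()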

By default, Pydra uses the “concurrent-futures” (‘cf’) plugin, which splits workflow execution over multiple processes. You can specify which plugin is used, and thereby how the workflow is executed, via the --plugin option, and pass options through to it with --pydra-option.

$ arcana derive column 'myuni-xnat//myproject:training' freesurfer/recon-all \
  --plugin slurm --pydra-option poll_delay 5 --pydra-option max_jobs 10

To list the derivatives that can be derived from a dataset after workflows have been applied, you can use the menu command

$ arcana derive menu 'file///data/a-dataset'

Derivatives
-----------
recorded_datafile (zip)
recorded_metadata (json)
preprocessed (zip)
derived_image (png)
summary_metric (float)

Parameters
----------
contrast (float) default=0.5
kernel_fwhms (list[float]) default=[0.5, 0.3, 0.1]

For large analysis classes with many column specs this list could become overwhelming, so when designing an analysis class it is good practice to set the “salience” of columns and parameters (see DataColumn and parameter specification). The menu can then be filtered to show only the more salient columns (the default is to only show “supplementary” and above). Parameters can similarly be filtered by their salience (see ParameterSalience), by default only showing parameters “check” and above. For example, the following menu call will show all columns and parameters with salience >= ‘qa’ and ‘recommended’, respectively.

$ arcana derive menu 'file///data/another-dataset' --columns qa --parameters recommended

The salience_threshold argument can also be used to limit which derivatives are stored in the data store when applying an analysis to a dataset. This allows the user to control how much derivative data is saved, to avoid filling up (potentially expensive) storage. The following call will only attempt to store data columns with “qa” or greater salience in XNAT, keeping the remainder only in the local cache.

$ arcana apply analysis 'my-unis-xnat//MYPROJECT:test' example:ExampleAnalysis \
  --column recorded_datafile datafile \
  --column recorded_metadata metadata \
  --parameter contrast 0.75 \
  --salience-threshold qa

Provenance#

Provenance metadata is saved alongside derivatives in the data store. The metadata includes:

  • MD5 checksums of all pipeline inputs and outputs

  • Full workflow graph with connections between, and parameterisations of, Pydra tasks

  • Container image tags for tasks that ran inside containers

  • Python dependencies and versions used.

How these provenance metadata are stored depends on the type of data store, but often they will be stored in a JSON file. For example, a provenance JSON file might look like

{
  "store": {
    "class": "<arcana.data.stores.medimage.xnat.api:Xnat>",
    "server": "https://central.xnat.org"
  },
  "dataset": {
    "id": "MYPROJECT",
    "name": "passed-dwi-qc",
    "exclude": ['015', '101']
    "id_inference": [
      ["subject", "(?P<group>TEST|CONT)(?P<member>\d+3)"]
    ]
  },
  "pipelines": [
    {
      "name": "anatomically_constrained_tractography",
      "inputs": {
        // MD5 Checksums for all files in the file group. "." refers to the
        // "primary file" in the file group.
        "T1w_reg_dwi": {
          "format": "<arcana.data.formats.medimage:NiftiGzX>",
          "checksums": {
            ".": "4838470888DBBEADEAD91089DD4DFC55",
            "json": "7500099D8BE29EF9057D6DE5D515DFFE"
          }
        },
        "T2w_reg_dwi": {
          "format": "<arcana.data.formats.medimage:NiftiGzX>",
          "checksums": {
            ".": "4838470888DBBEADEAD91089DD4DFC55",
            "json": "5625E881E32AE6415E7E9AF9AEC59FD6"
          }
        },
        "dwi_fod": {
          "format": "<arcana.data.formats.medimage:MrtrixImage>",
          "checksums": {
            ".": "92EF19B942DD019BF8D32A2CE2A3652F"
          }
        }
      },
      "outputs": {
        "wm_tracks": {
          "pydra_task": "tckgen",
          "pydra_field": "out_file",
          "format": "<arcana.data.formats.medimage:MrtrixTrack>",
          "checksums": {
            ".": "D30073044A7B1239EFF753C85BC1C5B3"
          }
        }
      },
      "workflow": {
        "name": "workflow",
        "class": "<pydra.engine.core:Workflow>",
        "tasks": {
          "5ttgen": {
            "class": "<pydra.tasks.mrtrix3.preprocess:FiveTissueTypes>",
            "package": "pydra-mrtrix",
            "version": "0.1.1",
            "inputs": {
              "in_file": {
                "pydra_field": "T1w_reg_dwi"
              },
              "t2": {
                "pydra_field": "T2w_reg_dwi"
              },
              "sgm_amyg_hipp": true
            },
            "container": {
              "type": "docker",
              "image": "mrtrix3/mrtrix3:3.0.3"
            }
          },
          "tckgen": {
            "class": "<pydra.tasks.mrtrix3.tractography:TrackGen>",
            "package": "pydra-mrtrix",
            "version": "0.1.1",
            "inputs": {
              "in_file": {
                "pydra_field": "dwi_fod"
              },
              "act": {
                "pydra_task": "5ttgen",
                "pydra_field": "out_file"
              },
              "select": 100000000,
            },
            "container": {
              "type": "docker",
              "image": "mrtrix3/mrtrix3:3.0.3"
            }
          }
        },
      },
      "execution": {
        "machine": "hpc.myuni.edu",
        "processor": "intel9999",
        "python-packages": {
          "pydra-mrtrix3": "0.1.0",
          "arcana-medimage": "0.1.0"
        }
      }
    }
  ]
}

Before derivatives are generated, provenance metadata of prerequisite derivatives (i.e. inputs of the pipeline and prerequisite pipelines, etc…) are checked to see if there have been any alterations to the configuration of the pipelines that generated them. If so, any affected rows will not be processed, and a warning will be generated by default. To override this behaviour and reprocess the derivatives, set the reprocess flag when calling Dataset.derive()

dataset.derive('fast/gm', reprocess=True)

or

$ arcana derive column 'myuni-xnat//myproject:training' freesurfer/recon-all  --reprocess

To ignore specific differences between pipeline configurations, you can use the Dataset.ignore_diff() method

dataset.ignore_diff('freesurfer', ('freesurfer_task', 'num_iterations', 3))

or via the CLI

$ arcana derive ignore-diff 'myuni-xnat//myproject:training' freesurfer --param freesurfer_task num_iterations 3