Designing Analyses#

An great way to contribute to the development of Arcana is to implement new analysis classes or extend existing ones. The architecture of analysis classes is intended to facilitate the implementation of generic analysis suites for wide-spread use, which can then be tailored to meet the specific requirements of particular research studies via class inheritance (see Inheritance).

This page builds upon the description of analysis-class design introduced in Analysis classes. The basic building blocks of the design are described in detail in the Basics section, while more advanced concepts involved in extending existing classes are covered in the Advanced section.

Basics#

There are two main components of analysis classes, column specifications (DataColumn and parameter specification), which define the data to be provided to and derived by the class, and pipeline builder methods (Pipeline builders), which construct the Pydra workflows used to generate the derivatives. Parameter attributes (DataColumn and parameter specification) expose key parameters used by the workflow construction and output methods (Output methods) provide a convenient way to include the final steps analyses (e.g. plotting figures) all in the one place.

DataColumn and parameter specification#

While columns in an Analysis class can be specified using the dataclass-like syntax of column_name: Format, in most cases you will want to explicitly use the arcana.core.mark.column function to include some basic metadata for the column, such as a description of what the column represents in the desc keyword arg.

@analysis(ExampleDataSpace)
class ExampleAnalysis():

    recorded_datafile: Zip  = column(
        desc=("Datafile acquired from an example scanner. Contains key "
              "data to analyse"))
    recorded_metadata: Json = column(
        desc="Metadata accompanying the recorded data")
    preprocessed: Zip = column(
        desc="Preprocessed data file, corrected for distortions")
    derived_image: Png = column(
        desc="Map of the processed data")
    summary_metric: float = column(
        desc="A summary metric extracted from the derived image",
        row_frequency='dataset')

The column spec descriptions will be shown to the user when they use the Dataset.menu() or arcana menu CLI command. The row row_frequency of the column (e.g. per-session, per-subject, per-group, once per-dataset etc…, see Spaces and Frames: Rows and Columns) is specified by the row_frequency keyword argument. The row_frequency should be a member of the data space(see Spaces) provided to the arcana.core.mark.analysis() class decorator.

Not all columns specifications are created equal. Some refer to key inputs (e.g. the primary MRI image) or outputs (e.g. lesion load) and others just need to be sanity checked or useful in debugging. Therefore, to avoid the menu being cluttered up with non-salient specifications, the “salience” of the columns can be specified in addition to a description via the salience keyword arg. Values for salience must be drawn from the arcana.core.enum.ColumnSalience enum:

  • primary - Primary input data, e.g. raw data or data reconstructed on the scanner

  • output - Results that would typically be used as main outputs in publications

  • supplementary - Derivatives that would typically only be provided in supplementary material

  • qa - Derivatives that would typically be only kept for quality assurance of analysis workflows

  • debug - Derivatives that would typically only need to be checked when debugging analysis workflows

  • temp - Data only temporarily stored to pass between pipelines

Descriptions and saliences can also be set for parameter attributes, where the saliences are drawn from arcana.core.enum.ParameterSalience enum.

  • debug - typically only needs to be altered for debugging

  • recommended - recommended to keep default value

  • dependent - can be dependent on the context of the analysis but default should work for most cases

  • check - the default should be at checked for validity for particular use case

  • arbitrary - a default is provided, but it is not clear which value is best

  • required - no sensible default value, the parameter should be set manually

With the exception of required parameters, default values should be provided to the parameter specification via the default keyword. The default value should match the type of the parameter specification. Parameters can be any of the following types:

  • float

  • int

  • bool

  • str

  • list[float]

  • list[int]

  • list[bool]

  • list[str]

See comprehensive_example L4-29 for examples of these attributes of column and parameter specifications.

Pipeline builders#

“Pipeline builders” are called by Arcana to construct the Pydra workflows that derive data columns. The arcana.core.mark.pipeline() decorator is used to mark a method as a pipeline builder and specify the columns the workflow it builds derives.

The first argument to a builder method is the Pipeline object that is being constructed. The initialisation of the pipeline and rows to iteract with the data store are handled by Arcana, the builder method just needs to add the rows that actually perform the analysis. Pipeline rows are added using Pydra’s workflow syntax. (the only exception being that the newly added row is returned from Pipeline.add() for convenience).

The remaining arguments to the builder should be named after any columns and parameters that are required for the pipeline rows to be added. Arcana will automagically provide LazyField pointers to the arguments named after column specs, and values to the arguments named after parameter specs. For file formats with side cars, lazy-field pointers to side car files can be accessed as attributes of the primary LazyField, e.g.

from arcana.data.spaces.medimage import Clinical
from arcana.tasks.misc import ExtractFromJson
from arcana.data.salience import ColumnSalience as ds


@analysis(Clinical)
class AnotherExampleAnalysis():

    primary_image: Dicom = column(
        desc="The primary image to be analysed",
        salience=ds.primary)
    repetition_time: float = column(
        "The repetition time of the MR sequence used",
        salience=ds.debug)
    slice_timing_interval: float = column(
        "The time interval between slices",
        salience=ds.debug)

    @pipeline(repetition_time, slice_timing_interval)
    def preprocess_pipeline(self, wf, primary_image: NiftiGzX):

        wf.add(
            ExtractFromJson(
                name='extract_tr',
                # JSON side car is accessed by an attribute of the primary image
                in_file=primary_image.json,
                field='tr'))

        wf.add(
            ExtractFromJson(
                name='extract_st',
                # JSON side car is accessed by an attribute of the primary image
                in_file=primary_image.json,
                x=wf.extract_tr.lzout.out,
                field='SliceTiming'))

        return wf.extract_tr.lzout.out, wf.extract_st.lzout.out

The “row_frequency” (see Spaces and Frames: Rows and Columns) of a pipeline, (whether it is run per-session, per-subject, per-timepoint, etc… for example) is determined by the row_frequency of its output columns. Therefore, all columns derived from a single pipeline need to have the same row row_frequency. If the row_frequency of an input column provided to the builder method is higher than that of the pipeline then the lazy field provided will point to a list (sorted by the axis IDs they are combined over) rather than a single value. If the row_frequency of an input is lower than that of the pipeline then that value is simply repeated. For example, an analysis of flood levels using datasets in the Weather data space (see weather_example) to calculate the average rainfall per station, could look like

import numpy
import pydra.mark
from arcana.data.spaces.weather import Weather  # See example in Data spaces section


# A basic Pydra function task used in the analysis
@pydra.mark.task
def average(measurements: list[float]) -> float:
    "A simple function task to convert daily to yearly figures"
    return numpy.average(measurements)

# Another basic Pydra function task used in the analysis
@pydra.mark.task
def delta(measurements: list[float], average: float) -> list[float]:
    "A simple function task to convert daily to yearly figures"
    return list(numpy.asarray(measurements) - average)


@analysis(Weather)
class FloodAnalysis():

    record_time: datetime = column(
        desc="The time/date the recording was taken"
        row_frequency='recording')
    rain: float = column(
        desc="Daily rain measurements at different locations",
        row_frequency='recording')
    avg_rainfall: float  = column(
        desc="Average rainfall for a given location",
        row_frequency='station')
    delta_rain: float = column(
        desc="Deviation from average rainfall for a given month"
        row_frequency='recording')

    # Pipeline is of 'per-station' row_frequency due to row_frequency of output column
    # 'avg_rainfall'
    @pipeline(avg_rainfall)
    # 'rain' arg is a lazy-field to a list[float] over all dates since the
    # row_frequency of the 'rain' column ('recording') is higher than
    # the pipeline's row_frequency ('station')
    def average_rainfall_pipeline(self, wf: pydra.Workflow, rain: list[float]):

        wf.add(
            average(
                name='average_rain',
                measurements=rainfall))

        return wf.average_rain.lzout.out

    # Pipeline is of 'per-recording' row_frequency due to delta_rainfall
    # output column
    @pipeline(delta_rain)
    def delta_pipeline(self, wf: pydra.Workflow, rain: float,  avg_rainfall: float):

        pipeline.add(
            delta(
                name="delta_rain",
                measurements=rain,
                average=avg_rainfall))

        return wf.delta_rain.lzout.out

Output methods#

“Output methods” take derivatives and produce the visualisations or tables to be included in publications or reports. Since these methods typically rely on graphical libraries, they are executed on the local workstation/row and therefore should not contain any heavy computations. The feature that differentiates them from a regular method is that they are accessible from the CLI

$ arcana derive output 'file///data/my-dataset' connectivity_matrix_plot \
  --save '~/Documents/papers/my-connectivity-paper/' \
  --option figsize 10,10

The arcana.core.mark.output decorator is used to specify an output method and the outputs that are generated by it. Output methods should take the directory to save the outputs in as its first argument and use keyword arguments for “options” of the method following that. The save directory should have a default of None, and display the results in the case that it isn’t provided.

import matplotlib.pyplot as plt
from arcana.data.spaces.medimage import Clinical

@analysis(Clinical)
class ExampleAnalysis2():

    ...

    @output
    def connectivity_matrix_plot(self, save_dir: str=None, figsize: tuple[float]=(5, 5)):
        """Plots the connectivity matrix as an image
        """
        plt.figure(figsize=figsize)
        plt.imshow(self['connectivity_matrix'].data)
        if save_dir:
            plt.savefig(save_dir)
        else:
            plt.show()

Advanced#

In every software framework, there are always corner cases that are more complicated than the basic logic can handle. In designing informatics frameworks, these challenges often arise when attempting to write portable workflows, due to slight differences in the data and and end goals of the application. This is particularly true in academia, where novelty is a key criteria. To address these requirements, this section introduces some more complex concepts, which can be used to customise and combine analysis methods into powerful new classes: class inheritance (Inheritance), conditional pipelines (Conditionals and switches), quality-control checks (Quality-control checks) and sub-analyses (Sub-analyses).

Inheritance#

Given a toy example analysis class that has two text-file source columns, file1 and file2. The concat_pipeline builds a workflow that generates data for the sink column concatenated and can be modified by the duplicates parameter.

@analysis(Samples)
class Concat:

    # Source columns
    file1: Text = column("an arbitrary text file")
    file2: Text = column("another arbitrary text file")

    # Sink columns
    concatenated: Text = column("the output of concatenating file1 and file2")

    # Parameters
    duplicates: int = parameter(
        "the number of times to duplicate the concatenation", default=1
    )

    @pipeline(concatenated)
    def concat_pipeline(self, wf, file1: Text, file2: Text, duplicates: int):
        """Concatenates the contents of `file1` with the contents of `file2` to produce
        a new text file. The concatenation can be repeated multiple times within
        the produced text file by specifying the number of repeats to the `duplicates`
        parameter
        """

        wf.add(
            concatenate(
                name="concat", in_file1=file1, in_file2=file2, duplicates=duplicates
            )
        )

        return wf.concat.lzout.out  # Output Pydra LazyField for concatenated file

The Concat class can be subclassed to create the ExtendedConcat class, which adds one additional source column file3 and another sink column doubly_concatenated. Data for doubly_concatenated is generated by the doubly_concat_pipeline.

@analysis(Samples)
class ExtendedConcat(Concat):

    # Source columns
    file3: Text = column("Another file to concatenate")

    # Sink columns
    concatenated = inherit()
    doubly_concatenated: Text = column("The doubly concatenated file")

    # Parameters
    duplicates = inherit(default=3)

    @pipeline(doubly_concatenated)
    def doubly_concat_pipeline(
        self, wf, concatenated: Text, file3: Text, duplicates: int
    ):

        wf.add(
            concatenate(
                name="concat",
                in_file1=concatenated,
                in_file2=file3,
                duplicates=duplicates,
            )
        )

        return wf.concat.lzout.out

Because the concatenated column and duplicates parameter are used in the doubly_concat_pipeline, they are explicitly referenced in the subclass using the inherit_from function. Note, that this is enforced due a design decision to make it clear where columns and parameters are defined when reading the code. Columns that aren’t explicitly referenced in the class (e.g. file1 and file2) can be omitted from the subclass definition (but will still be present in the subclass). When explicitly inheriting columns and parameters it is possible to override their attributes, such as the default value for a given parameter (see duplicates in above example).

Conditionals and switches#

There are cases where different analysis methods need to be applied depending on the requirements of a particular study or to deal with idiosyncrasies of a particular dataset. There are two mechanisms for handling such cases in Arcana: “condition expressions” and “switches”.

Both condition expressions and switches are referenced within the @pipeline decorator. When a condition expression or switch is set on a pipeline builder, that pipeline will be used to generate data for a sink column only when certain criteria are met. If the criteria aren’t met, then either the default pipeline builder (one without either a switch or condition expression) will be used if it is present or an “not produced” error will be raised instead.

The difference between a condition expression and a switch is that a condition expression is true or false over a whole dataset given a specific parameterisation, whereas a switch can be true or false for different rows of the dataset depending on the nature of the input data.

Condition expressions are specified as using the functions value_of(parameter) and is_provided(column) as placeholders for parameter values or whether a column specification in the analysis is linked to a column in the dataset or not. In the following example, a condition is used to enable the user whether concatenated should be generated by the concat_pipeline method (default) or the reverse_concat_pipeline by setting the value of the order parameter.

@analysis(Samples)
class OverridenConcat(Concat):

    # Source columns
    file1: Zip = inherit()
    file2: Text = inherit()

    # Sinks columns
    concatenated: Text = inherit()

    # Parameters
    duplicates = inherit(default=2)  # default value changed because we can
    order: str = parameter(
        "perform the concatenation in reverse order, i.e. file2 and then file1",
        choices=["forward", "reversed"],
        default="forward",
    )

    @pipeline(
        concatenated,
        condition=(value_of(order) == "reversed"),
    )
    def reverse_concat_pipeline(
        self, wf, file1: Text, file2: Text, duplicates: int
    ):

        wf.add(
            concatenate_reverse(
                name="concat", in_file1=file1, in_file2=file2, duplicates=duplicates
            )
        )

        return wf.concat.lzout.out

Switches are defined in methods of the analysis class using the @switch decorator and are similar pipeline builders in that they add nodes to a Pydra workflow passed to the first argument. The sole output field of a switch must contain either be a boolean or string, which specifies which branch of processing is to be performed. The switch method is then passed to the @pipeline decorator via the switch keyword. If the switch returns a string then the value passed to the switch keyword must be tuple, with the first element the switch method and the second the value of the string that will activate that branch of the pipeline to be run.

In the following example, the contents of the files in the concatenated column are multiplied the value passed to the arbitrary multiplier parameter if the contents of the input files file1 and file2 are numeric for the corresponding row as determined by the inputs_are_numeric switch.

@analysis(Samples)
class ConcatWithSwitch(Concat):

    # Source columns
    file1: Zip = inherit()
    file2: Text = inherit()

    # Sink columns
    concatenated: Text = inherit()
    multiplied: Text = column("contents of the concatenated files are multiplied")

    # Parameters
    multiplier: int = parameter(
        "the multiplier used to apply", salience=ps.arbitrary
    )

    @switch
    def inputs_are_numeric(self, wf, file1: Text, file2: Text):

        wf.add(contents_are_numeric(in_file=file1, name="check_file1"))

        wf.add(contents_are_numeric(in_file=file2, name="check_file2"))

        @pydra.mark.task
        def boolean_and(val1, val2) -> bool:
            return val1 and val2

        wf.add(
            boolean_and(
                val1=wf.check_file1.out, val2=wf.check_file2.out, name="bool_and"
            )
        )

        return wf.bool_and.out

    @pipeline(multiplied, switch=inputs_are_numeric)
    def multiply_pipeline(self, wf, concatenated, multiplier):

        wf.add(
            multiply_contents(
                name="concat", in_file=concatenated, multiplier=multiplier
            )
        )

        return wf.concat.lzout.out

Quality-control checks#

When running complex analyses it is important to inspect generated derivatives to make sure the workflows completed properly. In Arcana, it is possible to semi-automate this process by adding quality-control “checks” to an analysis class.

In the following example the number of lines produced by the concatation step is checked to see if it matches the number expected given the value of the duplicates parameter.

@analysis(Samples)
class ConcatWithCheck(Concat):

    # Sink columns
    concatenated = inherit()

    # Parameters
    duplicates = inherit()

    @check(concatenated, salience=CheckSalience.recommended)
    def check_file3(self, wf, concatenated: Text, duplicates: int):
        """Checks the number of lines in the concatenated file to see whether they
        match what is expected for the number of duplicates specified"""
        @pydra.mark.task
        def num_lines_equals(in_file, num_lines):
            with open(in_file) as f:
                contents = f.read()
            if len(contents.splitlines()) == num_lines:
                status = CheckStatus.probable_pass
            else:
                status = CheckStatus.failed
            return status

        wf.add(
            num_lines_equals(
                in_file=concatenated, num_lines=2 * duplicates, name="num_lines_check"
            )
        )

        return wf.num_lines_check.out

Sub-analyses#

When dealing with separate data streams that can be largely analysed in parallel (e.g. multiple MRI contrasts), it can be convenient to combine multiple analyses tailored to each stream into a single conglomerate analysis. This pattern can implemented in Arcana using subanalysis attributes.

The type annotation of the subanalysis attribute specifies the analysis to be performed, and the keyword arguments of specify mappings from the column specs and parameters in the global namespace of the outer class to the namespace of the subanalysis. With these mappings, source columns linked to specs in the global namespace can be passed to the subanalysis, and sink columns generated by pipelines in the global namespace can be linked to any column within the subanalysis.

The mapped_from function is used to map columns and parameters from subanalyses into the global namespace, and takes two arguments, the name of the subanalysis and the name of the column/parameter to map. By mapping a column/parameter into the global namespace from one subanalysis and then mapping it back into another subanalysis the designer can be stitched together. For example, the cortical surface reconstruction column from a subanalysis for analysing anatomical MRI images could be mapped to a source column in another subanalysis for analysing white matter tracts diffusion-weighted contrast MRI images in order to constrain the potential endpoints of the tracts.

In the following example, two of the classes defined above, ExtendedConcat and ConcatWithSwitch are stitched together, so that the multiplied output column of ConcatWithSwitch is passed to the file3 input column of ExtendedConcat. The duplicates parameter in each subanalysis are linked together so they are always consistent by mapping it from the ExtendedConcat subanalysis to the global namespace and then back into the ConcatWithSwitch.

@analysis(Samples)
class _ConcatWithSubanalyses:

    # Source columns mapped from "sub1" subanalysis so they can be shared across
    # both sub-analyses. Note that they could just as easily have been mapped from
    # "sub1" or recreated from scratch and mapped into both
    file1 = map_from("sub1", "file1")
    file2 = map_from("sub1", "file2")

    # Sink columns generated within the subanalyses mapped back out to the global
    # namespace so they can be mapped into the other subanalysis
    concat_and_multiplied = map_from("sub2", "multiplied")

    # Link the duplicates parameter across both subanalyses so it is always the same
    # by mapping a global parameter into both subanalyses
    common_duplicates = map_from(
        "sub1", "duplicates", default=5, salience=ps.check
    )

    # Additional parameters such as "multiplier" can be accessed within the subanalysis
    # class after the analysis class has been initialised using the 'sub2.multiplier'

    sub1: ExtendedConcat = subanalysis(
        "sub-analysis to add the 'doubly_concat' pipeline",
        # Feed the multiplied sink column from sub2 into the source column file3 of
        # the extended class
        file3=concat_and_multiplied,
    )
    sub2: ConcatWithSwitch = subanalysis(
        "sub-analysis to add the 'multiply' pipeline",
        file1=file1,
        file2=file2,
        # Use the concatenated generated by sub1 to avoid running it twice
        duplicates=common_duplicates,
    )