# basic dependencies
import os
import sys
# main dependencies: numpy, nibabel
import numpy as np
import nibabel as nb
# nighresjava and nighres functions
import nighresjava
from ..io import load_volume, save_volume
from ..utils import _output_dir_4saving, _fname_4saving, \
_check_topology_lut_dir
# convenience labels
X=0
Y=1
Z=2
T=3
[docs]def embedded_antsreg(source_image, target_image,
run_rigid=False,
rigid_iterations=1000,
run_affine=False,
affine_iterations=1000,
run_syn=True,
coarse_iterations=40,
medium_iterations=50, fine_iterations=40,
cost_function='MutualInformation',
interpolation='NearestNeighbor',
regularization='Medium',
convergence=1e-6,
ignore_affine=False, ignore_header=False,
save_data=False, overwrite=False, output_dir=None,
file_name=None):
""" Embedded ANTS Registration
Runs the rigid and/or Symmetric Normalization (SyN) algorithm of ANTs and
formats the output deformations into voxel coordinate mappings as used in
CBSTools registration and transformation routines.
Parameters
----------
source_image: niimg
Image to register
target_image: niimg
Reference image to match
run_rigid: bool
Whether or not to run a rigid registration first (default is False)
rigid_iterations: float
Number of iterations in the rigid step (default is 1000)
run_affine: bool
Whether or not to run a affine registration first (default is False)
affine_iterations: float
Number of iterations in the affine step (default is 1000)
run_syn: bool
Whether or not to run a SyN registration (default is True)
coarse_iterations: float
Number of iterations at the coarse level (default is 40)
medium_iterations: float
Number of iterations at the medium level (default is 50)
fine_iterations: float
Number of iterations at the fine level (default is 40)
cost_function: {'CrossCorrelation', 'MutualInformation'}
Cost function for the registration (default is 'MutualInformation')
interpolation: {'NearestNeighbor', 'Linear'}
Interpolation for the registration result (default is 'NearestNeighbor')
regularization: {'Low', 'Medium', 'High'}
Regularization preset for the SyN deformation (default is 'Medium')
convergence: float
Threshold for convergence, can make the algorithm very slow
(default is convergence)
ignore_affine: bool
Ignore the affine matrix information extracted from the image header
(default is False)
ignore_header: bool
Ignore the orientation information and affine matrix information
extracted from the image header (default is False)
save_data: bool
Save output data to file (default is False)
overwrite: bool
Overwrite existing results (default is False)
output_dir: str, optional
Path to desired output directory, will be created if it doesn't exist
file_name: str, optional
Desired base name for output files with file extension
(suffixes will be added)
Returns
----------
dict
Dictionary collecting outputs under the following keys
(suffix of output files in brackets)
* transformed_source (niimg): Deformed source image (_ants-def)
* mapping (niimg): Coordinate mapping from source to target (_ants-map)
* inverse (niimg): Inverse coordinate mapping from target to source
(_ants-invmap)
Notes
----------
Port of the CBSTools Java module by Pierre-Louis Bazin. The main algorithm
is part of the ANTs software by Brian Avants and colleagues [1]_. The
interfacing with ANTs is performed through Nipype [2]_. Parameters have
been set to values commonly found in neuroimaging scripts online, but not
necessarily optimal.
References
----------
.. [1] Avants et al (2008), Symmetric diffeomorphic
image registration with cross-correlation: evaluating automated labeling
of elderly and neurodegenerative brain, Med Image Anal. 12(1):26-41
.. [2] Gorgolewski et al (2011) Nipype: a flexible, lightweight and
extensible neuroimaging data processing framework in python. Front
Neuroinform 5. doi:10.3389/fninf.2011.00013
"""
print('\nEmbedded ANTs Registration')
# for external tools: nipype
try:
from nipype.interfaces.ants import Registration
from nipype.interfaces.ants import ApplyTransforms
except ImportError:
print('Error: Nipype and/or ANTS could not be imported, they are required'
+' in order to run this module. \n (aborting)')
return None
# make sure that saving related parameters are correct
output_dir = _output_dir_4saving(output_dir, source_image) # needed for intermediate results
if save_data:
transformed_source_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-def'))
mapping_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-map'))
inverse_mapping_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-invmap'))
if overwrite is False \
and os.path.isfile(transformed_source_file) \
and os.path.isfile(mapping_file) \
and os.path.isfile(inverse_mapping_file) :
print("skip computation (use existing results)")
output = {'transformed_source': load_volume(transformed_source_file),
'mapping': load_volume(mapping_file),
'inverse': load_volume(inverse_mapping_file)}
return output
# load and get dimensions and resolution from input images
source = load_volume(source_image)
src_affine = source.affine
src_header = source.header
nsx = source.header.get_data_shape()[X]
nsy = source.header.get_data_shape()[Y]
nsz = source.header.get_data_shape()[Z]
rsx = source.header.get_zooms()[X]
rsy = source.header.get_zooms()[Y]
rsz = source.header.get_zooms()[Z]
target = load_volume(target_image)
trg_affine = target.affine
trg_header = target.header
ntx = target.header.get_data_shape()[X]
nty = target.header.get_data_shape()[Y]
ntz = target.header.get_data_shape()[Z]
rtx = target.header.get_zooms()[X]
rty = target.header.get_zooms()[Y]
rtz = target.header.get_zooms()[Z]
# in case the affine transformations are not to be trusted: make them equal
if ignore_affine or ignore_header:
# create generic affine aligned with the orientation for the source
mx = np.argmax(np.abs(src_affine[0][0:3]))
my = np.argmax(np.abs(src_affine[1][0:3]))
mz = np.argmax(np.abs(src_affine[2][0:3]))
new_affine = np.zeros((4,4))
if ignore_header:
new_affine[0][0] = rsx
new_affine[1][1] = rsy
new_affine[2][2] = rsz
new_affine[0][3] = -rsx*nsx/2.0
new_affine[1][3] = -rsy*nsy/2.0
new_affine[2][3] = -rsz*nsz/2.0
else:
new_affine[0][mx] = rsx*np.sign(src_affine[0][mx])
new_affine[1][my] = rsy*np.sign(src_affine[1][my])
new_affine[2][mz] = rsz*np.sign(src_affine[2][mz])
if (np.sign(src_affine[0][mx])<0):
new_affine[0][3] = rsx*nsx/2.0
else:
new_affine[0][3] = -rsx*nsx/2.0
if (np.sign(src_affine[1][my])<0):
new_affine[1][3] = rsy*nsy/2.0
else:
new_affine[1][3] = -rsy*nsy/2.0
if (np.sign(src_affine[2][mz])<0):
new_affine[2][3] = rsz*nsz/2.0
else:
new_affine[2][3] = -rsz*nsz/2.0
#if (np.sign(src_affine[0][mx])<0): new_affine[mx][3] = rsx*nsx
#if (np.sign(src_affine[1][my])<0): new_affine[my][3] = rsy*nsy
#if (np.sign(src_affine[2][mz])<0): new_affine[mz][3] = rsz*nsz
#new_affine[0][3] = nsx/2.0
#new_affine[1][3] = nsy/2.0
#new_affine[2][3] = nsz/2.0
new_affine[3][3] = 1.0
src_img = nb.Nifti1Image(source.get_data(), new_affine, source.header)
src_img.update_header()
src_img_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_srcimg'))
save_volume(src_img_file, src_img)
source = load_volume(src_img_file)
src_affine = source.affine
src_header = source.header
# create generic affine aligned with the orientation for the target
mx = np.argmax(np.abs(trg_affine[0][0:3]))
my = np.argmax(np.abs(trg_affine[1][0:3]))
mz = np.argmax(np.abs(trg_affine[2][0:3]))
new_affine = np.zeros((4,4))
if ignore_header:
new_affine[0][0] = rtx
new_affine[1][1] = rty
new_affine[2][2] = rtz
new_affine[0][3] = -rtx*ntx/2.0
new_affine[1][3] = -rty*nty/2.0
new_affine[2][3] = -rtz*ntz/2.0
else:
new_affine[0][mx] = rtx*np.sign(trg_affine[0][mx])
new_affine[1][my] = rty*np.sign(trg_affine[1][my])
new_affine[2][mz] = rtz*np.sign(trg_affine[2][mz])
if (np.sign(trg_affine[0][mx])<0):
new_affine[0][3] = rtx*ntx/2.0
else:
new_affine[0][3] = -rtx*ntx/2.0
if (np.sign(trg_affine[1][my])<0):
new_affine[1][3] = rty*nty/2.0
else:
new_affine[1][3] = -rty*nty/2.0
if (np.sign(trg_affine[2][mz])<0):
new_affine[2][3] = rtz*ntz/2.0
else:
new_affine[2][3] = -rtz*ntz/2.0
#if (np.sign(trg_affine[0][mx])<0): new_affine[mx][3] = rtx*ntx
#if (np.sign(trg_affine[1][my])<0): new_affine[my][3] = rty*nty
#if (np.sign(trg_affine[2][mz])<0): new_affine[mz][3] = rtz*ntz
#new_affine[0][3] = ntx/2.0
#new_affine[1][3] = nty/2.0
#new_affine[2][3] = ntz/2.0
new_affine[3][3] = 1.0
trg_img = nb.Nifti1Image(target.get_data(), new_affine, target.header)
trg_img.update_header()
trg_img_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_trgimg'))
save_volume(trg_img_file, trg_img)
target = load_volume(trg_img_file)
trg_affine = target.affine
trg_header = target.header
# build coordinate mapping matrices and save them to disk
src_coord = np.zeros((nsx,nsy,nsz,3))
trg_coord = np.zeros((ntx,nty,ntz,3))
for x in range(nsx):
for y in range(nsy):
for z in range(nsz):
src_coord[x,y,z,X] = x
src_coord[x,y,z,Y] = y
src_coord[x,y,z,Z] = z
src_map = nb.Nifti1Image(src_coord, source.affine, source.header)
src_map_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_srccoord'))
save_volume(src_map_file, src_map)
for x in range(ntx):
for y in range(nty):
for z in range(ntz):
trg_coord[x,y,z,X] = x
trg_coord[x,y,z,Y] = y
trg_coord[x,y,z,Z] = z
trg_map = nb.Nifti1Image(trg_coord, target.affine, target.header)
trg_map_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_trgcoord'))
save_volume(trg_map_file, trg_map)
# run the main ANTS software
reg = Registration()
reg.inputs.dimension = 3
# add a prefix to avoid multiple names?
prefix = _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_syn')
prefix = os.path.basename(prefix)
prefix = prefix.split(".")[0]
reg.inputs.output_transform_prefix = prefix
reg.inputs.fixed_image = [target.get_filename()]
reg.inputs.moving_image = [source.get_filename()]
print("registering "+source.get_filename()+"\n to "+target.get_filename())
if run_syn is True:
if regularization is 'Low': syn_param = (0.2, 1.0, 0.0)
elif regularization is 'Medium': syn_param = (0.2, 3.0, 0.0)
elif regularization is 'High': syn_param - (0.2, 4.0, 3.0)
else: syn_param = (0.2, 3.0, 0.0)
if run_rigid is True and run_affine is True and run_syn is True:
reg.inputs.transforms = ['Rigid','Affine','SyN']
reg.inputs.transform_parameters = [(0.1,), (0.1,), syn_param]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[affine_iterations, affine_iterations,
affine_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5, 5]
else :
reg.inputs.metric = ['MI', 'MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is True and run_affine is False and run_syn is True:
reg.inputs.transforms = ['Rigid','SyN']
reg.inputs.transform_parameters = [(0.1,), syn_param]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is True and run_syn is True:
reg.inputs.transforms = ['Affine','SyN']
reg.inputs.transform_parameters = [(0.1,), syn_param]
reg.inputs.number_of_iterations = [[affine_iterations, affine_iterations,
affine_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [64, 64]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
if run_rigid is True and run_affine is True and run_syn is False:
reg.inputs.transforms = ['Rigid','Affine']
reg.inputs.transform_parameters = [(0.1,), (0.1,)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[affine_iterations, affine_iterations,
affine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [10]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is True and run_affine is False and run_syn is False:
reg.inputs.transforms = ['Rigid']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is True and run_syn is False:
reg.inputs.transforms = ['Affine']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[affine_iterations, affine_iterations,
affine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is False and run_syn is True:
reg.inputs.transforms = ['SyN']
reg.inputs.transform_parameters = [syn_param]
reg.inputs.number_of_iterations = [[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is False and run_syn is False:
reg.inputs.transforms = ['Rigid']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[0]]
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
reg.inputs.shrink_factors = [[1]]
reg.inputs.smoothing_sigmas = [[1]]
reg.inputs.args = '--float 0'
print(reg.cmdline)
result = reg.run()
# Transforms the moving image
at = ApplyTransforms()
at.inputs.dimension = 3
at.inputs.input_image = source.get_filename()
at.inputs.reference_image = target.get_filename()
at.inputs.interpolation = interpolation
at.inputs.transforms = result.outputs.forward_transforms
at.inputs.invert_transform_flags = result.outputs.forward_invert_flags
transformed = at.run()
# Create coordinate mappings
src_at = ApplyTransforms()
src_at.inputs.dimension = 3
src_at.inputs.input_image_type = 3
src_at.inputs.input_image = src_map.get_filename()
src_at.inputs.reference_image = target.get_filename()
src_at.inputs.interpolation = 'Linear'
src_at.inputs.transforms = result.outputs.forward_transforms
src_at.inputs.invert_transform_flags = result.outputs.forward_invert_flags
mapping = src_at.run()
trg_at = ApplyTransforms()
trg_at.inputs.dimension = 3
trg_at.inputs.input_image_type = 3
trg_at.inputs.input_image = trg_map.get_filename()
trg_at.inputs.reference_image = source.get_filename()
trg_at.inputs.interpolation = 'Linear'
trg_at.inputs.transforms = result.outputs.reverse_transforms
trg_at.inputs.invert_transform_flags = result.outputs.reverse_invert_flags
inverse = trg_at.run()
# pad coordinate mapping outside the image? hopefully not needed...
# collect outputs and potentially save
transformed_img = nb.Nifti1Image(nb.load(transformed.outputs.output_image).get_data(),
target.affine, target.header)
mapping_img = nb.Nifti1Image(nb.load(mapping.outputs.output_image).get_data(),
target.affine, target.header)
inverse_img = nb.Nifti1Image(nb.load(inverse.outputs.output_image).get_data(),
source.affine, source.header)
outputs = {'transformed_source': transformed_img, 'mapping': mapping_img,
'inverse': inverse_img}
# clean-up intermediate files
os.remove(src_map_file)
os.remove(trg_map_file)
if ignore_affine or ignore_header:
os.remove(src_img_file)
os.remove(trg_img_file)
for name in result.outputs.forward_transforms:
if os.path.exists(name): os.remove(name)
for name in result.outputs.reverse_transforms:
if os.path.exists(name): os.remove(name)
os.remove(transformed.outputs.output_image)
os.remove(mapping.outputs.output_image)
os.remove(inverse.outputs.output_image)
if save_data:
save_volume(transformed_source_file, transformed_img)
save_volume(mapping_file, mapping_img)
save_volume(inverse_mapping_file, inverse_img)
return outputs
[docs]def embedded_antsreg_2d(source_image, target_image,
run_rigid=False,
rigid_iterations=1000,
run_affine=False,
affine_iterations=1000,
run_syn=True,
coarse_iterations=40,
medium_iterations=50, fine_iterations=40,
cost_function='MutualInformation',
interpolation='NearestNeighbor',
convergence=1e-6,
ignore_affine=False, ignore_header=False,
save_data=False, overwrite=False, output_dir=None,
file_name=None):
""" Embedded ANTS Registration 2D
Runs the rigid and/or Symmetric Normalization (SyN) algorithm of ANTs and
formats the output deformations into voxel coordinate mappings as used in
CBSTools registration and transformation routines.
Parameters
----------
source_image: niimg
Image to register
target_image: niimg
Reference image to match
run_rigid: bool
Whether or not to run a rigid registration first (default is False)
rigid_iterations: float
Number of iterations in the rigid step (default is 1000)
run_affine: bool
Whether or not to run a affine registration first (default is False)
affine_iterations: float
Number of iterations in the affine step (default is 1000)
run_syn: bool
Whether or not to run a SyN registration (default is True)
coarse_iterations: float
Number of iterations at the coarse level (default is 40)
medium_iterations: float
Number of iterations at the medium level (default is 50)
fine_iterations: float
Number of iterations at the fine level (default is 40)
cost_function: {'CrossCorrelation', 'MutualInformation'}
Cost function for the registration (default is 'MutualInformation')
interpolation: {'NearestNeighbor', 'Linear'}
Interpolation for the registration result (default is 'NearestNeighbor')
convergence: flaot
Threshold for convergence, can make the algorithm very slow
(default is convergence)
ignore_affine: bool
Ignore the affine matrix information extracted from the image header
(default is False)
ignore_header: bool
Ignore the orientation information and affine matrix information
extracted from the image header (default is False)
save_data: bool
Save output data to file (default is False)
overwrite: bool
Overwrite existing results (default is False)
output_dir: str, optional
Path to desired output directory, will be created if it doesn't exist
file_name: str, optional
Desired base name for output files with file extension
(suffixes will be added)
Returns
----------
dict
Dictionary collecting outputs under the following keys
(suffix of output files in brackets)
* transformed_source (niimg): Deformed source image (_ants-def)
* mapping (niimg): Coordinate mapping from source to target (_ants-map)
* inverse (niimg): Inverse coordinate mapping from target to source
(_ants-invmap)
Notes
----------
Port of the CBSTools Java module by Pierre-Louis Bazin. The main algorithm
is part of the ANTs software by Brian Avants and colleagues [1]_. The
interfacing with ANTs is performed through Nipype [2]_. Parameters have been
set to values commonly found in neuroimaging scripts online, but not
necessarily optimal.
References
----------
.. [1] Avants et al (2008), Symmetric diffeomorphic
image registration with cross-correlation: evaluating automated labeling
of elderly and neurodegenerative brain, Med Image Anal. 12(1):26-41
.. [2] Gorgolewski et al (2011) Nipype: a flexible, lightweight and
extensible neuroimaging data processing framework in python.
Front Neuroinform 5. doi:10.3389/fninf.2011.00013
"""
print('\nEmbedded ANTs Registration')
# for external tools: nipype
try:
from nipype.interfaces.ants import Registration
from nipype.interfaces.ants import ApplyTransforms
except ImportError:
print('Error: Nipype and/or ANTS could not be imported, they are required'
+' in order to run this module. \n (aborting)')
return None
# make sure that saving related parameters are correct
output_dir = _output_dir_4saving(output_dir, source_image) # needed for intermediate results
if save_data:
transformed_source_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-def'))
mapping_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-map'))
inverse_mapping_file = os.path.join(output_dir,
_fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='ants-invmap'))
if overwrite is False \
and os.path.isfile(transformed_source_file) \
and os.path.isfile(mapping_file) \
and os.path.isfile(inverse_mapping_file) :
print("skip computation (use existing results)")
output = {'transformed_source': load_volume(transformed_source_file),
'mapping': load_volume(mapping_file),
'inverse': load_volume(inverse_mapping_file)}
return output
# load and get dimensions and resolution from input images
source = load_volume(source_image)
src_affine = source.affine
src_header = source.header
nsx = source.header.get_data_shape()[X]
nsy = source.header.get_data_shape()[Y]
nsz = 1
rsx = source.header.get_zooms()[X]
rsy = source.header.get_zooms()[Y]
rsz = 1
target = load_volume(target_image)
trg_affine = target.affine
trg_header = target.header
ntx = target.header.get_data_shape()[X]
nty = target.header.get_data_shape()[Y]
ntz = 1
rtx = target.header.get_zooms()[X]
rty = target.header.get_zooms()[Y]
rtz = 1
# in case the affine transformations are not to be trusted: make them equal
if ignore_affine or ignore_header:
mx = np.argmax(np.abs(src_affine[0][0:3]))
my = np.argmax(np.abs(src_affine[1][0:3]))
mz = np.argmax(np.abs(src_affine[2][0:3]))
new_affine = np.zeros((4,4))
if ignore_header:
new_affine[0][0] = rsx
new_affine[1][1] = rsy
new_affine[2][2] = rsz
new_affine[0][3] = -rsx*nsx/2.0
new_affine[1][3] = -rsy*nsy/2.0
new_affine[2][3] = -rsz*nsz/2.0
else:
new_affine[0][mx] = rsx*np.sign(src_affine[0][mx])
new_affine[1][my] = rsy*np.sign(src_affine[1][my])
new_affine[2][mz] = rsz*np.sign(src_affine[2][mz])
if (np.sign(src_affine[0][mx])<0):
new_affine[0][3] = rsx*nsx/2.0
else:
new_affine[0][3] = -rsx*nsx/2.0
if (np.sign(src_affine[1][my])<0):
new_affine[1][3] = rsy*nsy/2.0
else:
new_affine[1][3] = -rsy*nsy/2.0
if (np.sign(src_affine[2][mz])<0):
new_affine[2][3] = rsz*nsz/2.0
else:
new_affine[2][3] = -rsz*nsz/2.0
#if (np.sign(src_affine[0][mx])<0): new_affine[mx][3] = rsx*nsx
#if (np.sign(src_affine[1][my])<0): new_affine[my][3] = rsy*nsy
#if (np.sign(src_affine[2][mz])<0): new_affine[mz][3] = rsz*nsz
#new_affine[0][3] = nsx/2.0
#new_affine[1][3] = nsy/2.0
#new_affine[2][3] = nsz/2.0
new_affine[3][3] = 1.0
src_img = nb.Nifti1Image(source.get_data(), new_affine, source.header)
src_img.update_header()
src_img_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_srcimg'))
save_volume(src_img_file, src_img)
source = load_volume(src_img_file)
src_affine = source.affine
src_header = source.header
# create generic affine aligned with the orientation for the target
mx = np.argmax(np.abs(trg_affine[0][0:3]))
my = np.argmax(np.abs(trg_affine[1][0:3]))
mz = np.argmax(np.abs(trg_affine[2][0:3]))
new_affine = np.zeros((4,4))
if ignore_header:
new_affine[0][0] = rtx
new_affine[1][1] = rty
new_affine[2][2] = rtz
new_affine[0][3] = -rtx*ntx/2.0
new_affine[1][3] = -rty*nty/2.0
new_affine[2][3] = -rtz*ntz/2.0
else:
new_affine[0][mx] = rtx*np.sign(trg_affine[0][mx])
new_affine[1][my] = rty*np.sign(trg_affine[1][my])
new_affine[2][mz] = rtz*np.sign(trg_affine[2][mz])
if (np.sign(trg_affine[0][mx])<0):
new_affine[0][3] = rtx*ntx/2.0
else:
new_affine[0][3] = -rtx*ntx/2.0
if (np.sign(trg_affine[1][my])<0):
new_affine[1][3] = rty*nty/2.0
else:
new_affine[1][3] = -rty*nty/2.0
if (np.sign(trg_affine[2][mz])<0):
new_affine[2][3] = rtz*ntz/2.0
else:
new_affine[2][3] = -rtz*ntz/2.0
#if (np.sign(trg_affine[0][mx])<0): new_affine[mx][3] = rtx*ntx
#if (np.sign(trg_affine[1][my])<0): new_affine[my][3] = rty*nty
#if (np.sign(trg_affine[2][mz])<0): new_affine[mz][3] = rtz*ntz
#new_affine[0][3] = ntx/2.0
#new_affine[1][3] = nty/2.0
#new_affine[2][3] = ntz/2.0
new_affine[3][3] = 1.0
trg_img = nb.Nifti1Image(target.get_data(), new_affine, target.header)
trg_img.update_header()
trg_img_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_trgimg'))
save_volume(trg_img_file, trg_img)
target = load_volume(trg_img_file)
trg_affine = target.affine
trg_header = target.header
# build coordinate mapping matrices and save them to disk
src_coord = np.zeros((nsx,nsy,2))
trg_coord = np.zeros((ntx,nty,2))
for x in range(nsx):
for y in range(nsy):
src_coord[x,y,X] = x
src_coord[x,y,Y] = y
src_map = nb.Nifti1Image(src_coord, source.affine, source.header)
src_map_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_srccoord'))
save_volume(src_map_file, src_map)
for x in range(ntx):
for y in range(nty):
trg_coord[x,y,X] = x
trg_coord[x,y,Y] = y
trg_map = nb.Nifti1Image(trg_coord, target.affine, target.header)
trg_map_file = os.path.join(output_dir, _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_trgcoord'))
save_volume(trg_map_file, trg_map)
# run the main ANTS software
reg = Registration()
reg.inputs.dimension = 2
# add a prefix to avoid multiple names?
prefix = _fname_4saving(file_name=file_name,
rootfile=source_image,
suffix='tmp_syn')
prefix = os.path.basename(prefix)
prefix = prefix.split(".")[0]
reg.inputs.output_transform_prefix = prefix
reg.inputs.fixed_image = [target.get_filename()]
reg.inputs.moving_image = [source.get_filename()]
print("registering "+source.get_filename()+"\n to "+target.get_filename())
if run_rigid is True and run_affine is True and run_syn is True:
reg.inputs.transforms = ['Rigid','Affine','SyN']
reg.inputs.transform_parameters = [(0.1,), (0.1,), (0.2, 3.0, 0.0)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[affine_iterations, affine_iterations,
affine_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5, 5]
else :
reg.inputs.metric = ['MI', 'MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is True and run_affine is False and run_syn is True:
reg.inputs.transforms = ['Rigid','SyN']
reg.inputs.transform_parameters = [(0.1,), (0.2, 3.0, 0.0)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is True and run_syn is True:
reg.inputs.transforms = ['Affine','SyN']
reg.inputs.transform_parameters = [(0.1,), (0.2, 3.0, 0.0)]
reg.inputs.number_of_iterations = [[affine_iterations, affine_iterations,
affine_iterations],
[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [64, 64]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [5]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
if run_rigid is True and run_affine is True and run_syn is False:
reg.inputs.transforms = ['Rigid','Affine']
reg.inputs.transform_parameters = [(0.1,), (0.1,)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations],
[affine_iterations, affine_iterations,
affine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC', 'CC']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [5, 5]
else :
reg.inputs.metric = ['MI', 'MI']
reg.inputs.metric_weight = [1.0, 1.0]
reg.inputs.radius_or_number_of_bins = [32, 32]
reg.inputs.shrink_factors = [[4, 2, 1]] + [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]] + [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random'] + ['Random']
reg.inputs.sampling_percentage = [0.3] + [0.3]
reg.inputs.convergence_threshold = [convergence] + [convergence]
reg.inputs.convergence_window_size = [10] + [10]
reg.inputs.use_histogram_matching = [False] + [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is True and run_affine is False and run_syn is False:
reg.inputs.transforms = ['Rigid']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[rigid_iterations, rigid_iterations,
rigid_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is True and run_syn is False:
reg.inputs.transforms = ['Affine']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[affine_iterations, affine_iterations,
affine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[4, 2, 1]]
reg.inputs.smoothing_sigmas = [[3, 2, 1]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is False and run_syn is True:
reg.inputs.transforms = ['SyN']
reg.inputs.transform_parameters = [(0.2, 3.0, 0.0)]
reg.inputs.number_of_iterations = [[coarse_iterations, coarse_iterations,
medium_iterations, fine_iterations]]
if (cost_function=='CrossCorrelation'):
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
else :
reg.inputs.metric = ['MI']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [32]
reg.inputs.shrink_factors = [[8, 4, 2, 1]]
reg.inputs.smoothing_sigmas = [[2, 1, 0.5, 0]]
reg.inputs.sampling_strategy = ['Random']
reg.inputs.sampling_percentage = [0.3]
reg.inputs.convergence_threshold = [convergence]
reg.inputs.convergence_window_size = [10]
reg.inputs.use_histogram_matching = [False]
reg.inputs.winsorize_lower_quantile = 0.001
reg.inputs.winsorize_upper_quantile = 0.999
elif run_rigid is False and run_affine is False and run_syn is False:
reg.inputs.transforms = ['Rigid']
reg.inputs.transform_parameters = [(0.1,)]
reg.inputs.number_of_iterations = [[0]]
reg.inputs.metric = ['CC']
reg.inputs.metric_weight = [1.0]
reg.inputs.radius_or_number_of_bins = [5]
reg.inputs.shrink_factors = [[1]]
reg.inputs.smoothing_sigmas = [[1]]
print(reg.cmdline)
result = reg.run()
# Transforms the moving image
at = ApplyTransforms()
at.inputs.dimension = 2
at.inputs.input_image = source.get_filename()
at.inputs.reference_image = target.get_filename()
at.inputs.interpolation = interpolation
at.inputs.transforms = result.outputs.forward_transforms
at.inputs.invert_transform_flags = result.outputs.forward_invert_flags
print(at.cmdline)
transformed = at.run()
# Create coordinate mappings
src_at = ApplyTransforms()
src_at.inputs.dimension = 2
src_at.inputs.input_image_type = 3
src_at.inputs.input_image = src_map.get_filename()
src_at.inputs.reference_image = target.get_filename()
src_at.inputs.interpolation = 'Linear'
src_at.inputs.transforms = result.outputs.forward_transforms
src_at.inputs.invert_transform_flags = result.outputs.forward_invert_flags
mapping = src_at.run()
trg_at = ApplyTransforms()
trg_at.inputs.dimension = 2
trg_at.inputs.input_image_type = 3
trg_at.inputs.input_image = trg_map.get_filename()
trg_at.inputs.reference_image = source.get_filename()
trg_at.inputs.interpolation = 'Linear'
trg_at.inputs.transforms = result.outputs.reverse_transforms
trg_at.inputs.invert_transform_flags = result.outputs.reverse_invert_flags
inverse = trg_at.run()
# pad coordinate mapping outside the image? hopefully not needed...
# collect outputs and potentially save
transformed_img = nb.Nifti1Image(nb.load(transformed.outputs.output_image).get_data(),
target.affine, target.header)
mapping_img = nb.Nifti1Image(nb.load(mapping.outputs.output_image).get_data(),
target.affine, target.header)
inverse_img = nb.Nifti1Image(nb.load(inverse.outputs.output_image).get_data(),
source.affine, source.header)
outputs = {'transformed_source': transformed_img, 'mapping': mapping_img,
'inverse': inverse_img}
# clean-up intermediate files
os.remove(src_map_file)
os.remove(trg_map_file)
if ignore_affine or ignore_header:
os.remove(src_img_file)
os.remove(trg_img_file)
for name in result.outputs.forward_transforms:
if os.path.exists(name): os.remove(name)
for name in result.outputs.reverse_transforms:
if os.path.exists(name): os.remove(name)
os.remove(transformed.outputs.output_image)
os.remove(mapping.outputs.output_image)
os.remove(inverse.outputs.output_image)
if save_data:
save_volume(transformed_source_file, transformed_img)
save_volume(mapping_file, mapping_img)
save_volume(inverse_mapping_file, inverse_img)
return outputs