3
dn                 @   s  d Z ddlmZmZ ddlZddljZddlm	Z	 ddl
Z
ddlZddlmZ ddlmZ ddlmZ ddlmZ d	d
lmZmZ d	dlmZmZmZmZ d	dlmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% d	dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/ d	dl0m1Z1 ddl2m3Z3m4Z5m6Z7m8Z9m:Z;m<Z=m>Z>m?Z?m@Z@mAZA ddlBmCZC ejDdZEG dd deFZGG dd deCZHG dd deHZIG dd deHZJdS )zDefines functionality for pipelined execution of interfaces

The `Node` class provides core functionality for batch processing.
    )OrderedDictdefaultdictN)Path)deepcopy)glob)INFO)mkdtemp   )configlogging)flatten	unflattenstr2bool	dict_diff)md5ensure_listsimplify_list	copyfilesfnames_presuffixloadpklsplit_filename	load_json	emptydirssavepklsilentrm)	traitsInputMultiPathCommandLine	UndefinedDynamicTraitedSpecBunchInterfaceResult	Interface	isdefined)get_filecopy_info   )
_parameterization_dirsave_hashfileload_resultfilesave_resultfilenodelist_runner
strip_tempwrite_node_reportclean_working_directory
merge_dictevaluate_connect_function)
EngineBaseznipype.workflowc               @   s   e Zd ZdZdS )NodeExecutionErrorz<A nipype-specific name for exceptions when executing a Node.N)__name__
__module____qualname____doc__ r6   r6   >/tmp/pip-build-7vycvbft/nipype/nipype/pipeline/engine/nodes.pyr1   C   s   r1   c                   s  e Zd ZdZd6 fdd	Zedd Zed	d
 Zedd Zedd Z	edd Z
e
jdd Z
edd Zedd Zejdd Zdd Zdd Zdd Zdd Zd7dd Zd8d!d"Zd9d#d$Zd%d& Zd'd( Zd)d* Zd:d,d-Zd.d/ Zd;d0d1Zd<d2d3Zd4d5 Z  ZS )=Nodea  
    Wraps interface objects for use in pipeline

    A Node creates a sandbox-like directory for executing the underlying
    interface. It will copy or link inputs into this directory to ensure that
    input data are not overwritten. A hash of the input state is used to
    determine if the Node inputs have changed and whether the node needs to be
    re-executed.

    Examples
    --------

    >>> from nipype import Node
    >>> from nipype.interfaces import spm
    >>> realign = Node(spm.Realign(), 'realign')
    >>> realign.inputs.in_files = 'functional.nii'
    >>> realign.inputs.register_to_mean = True
    >>> realign.run() # doctest: +SKIP

    NF皙?c                s   |dkrt dt|ts"t dtt| j||jd || _d| _d| _	d| _
d| _|| _|| _|| _|| _g | _i | _i | _|| _|
| _|	| _t| jjdr| jdk	r| j| jj_d| _d| _g | _|| _d| _t| jdrd| j_dS )	am  
        Parameters
        ----------

        interface : interface object
            node specific interface (fsl.Bet(), spm.Coregister())

        name : alphanumeric string
            node specific name

        iterables : generator
            Input field and list to iterate using the pipeline engine
            for example to iterate over different frac values in fsl.Bet()
            for a single field the input can be a tuple, otherwise a list
            of tuples ::

                node.iterables = ('frac',[0.5,0.6,0.7])
                node.iterables = [('fwhm',[2,4]),('fieldx',[0.5,0.6,0.7])]

            If this node has an itersource, then the iterables values
            is a dictionary which maps an iterable source field value
            to the target iterables field values, e.g.: ::

                inputspec.iterables = ('images',['img1.nii', 'img2.nii']])
                node.itersource = ('inputspec', ['frac'])
                node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                                           'img2.nii': [0.6, 0.7]})

            If this node's synchronize flag is set, then an alternate
            form of the iterables is a [fields, values] list, where
            fields is the list of iterated fields and values is the
            list of value tuples for the given fields, e.g.: ::

                node.synchronize = True
                node.iterables = [('frac', 'threshold'),
                                  [(0.5, True),
                                   (0.6, False)]]

        itersource: tuple
            The (name, fields) iterables source which specifies the name
            of the predecessor iterable node and the input fields to use
            from that source node. The output field values comprise the
            key to the iterables parameter value mapping dictionary.

        synchronize: boolean
            Flag indicating whether iterables are synchronized.
            If the iterables are synchronized, then this iterable
            node is expanded once per iteration over all of the
            iterables values.
            Otherwise, this iterable node is expanded once per
            each permutation of the iterables values.

        overwrite : Boolean
            Whether to overwrite contents of output directory if it already
            exists. If directory exists and hash matches it
            assumes that process has been executed

        needed_outputs : list of output_names
            Force the node to keep only specific outputs. By default all
            outputs are kept. Setting this attribute will delete any output
            files and directories from the node's working directory that are
            not part of the `needed_outputs`.

        run_without_submitting : boolean
            Run the node without submitting to a job engine or to a
            multiprocessing pool

        NzInterface must be providedz-interface must be an instance of an Interfacebase_dirFnum_threadswrite_cmdlineT)IOError
isinstancer"   superr8   __init__get
_interface
_hierarchy_got_inputs_originputs_output_dir	iterablessynchronize
itersource	overwriteparameterizationinput_sourceplugin_argsrun_without_submitting_mem_gb_n_procshasattrinputsr;   
_hashvalue_hashed_inputs_needed_outputsneeded_outputsr
   r<   )self	interfacenamerG   rI   rH   rJ   rV   rN   n_procsmem_gbkwargs)	__class__r6   r7   r@   ]   s:    S
zNode.__init__c             C   s   | j S )z&Return the underlying interface object)rB   )rW   r6   r6   r7   rX      s    zNode.interfacec             C   s   t tj| j d| j S )z6Get result from result file (do not hold it in memory)zresult_%s.pklz)_load_resultfileopjoin
output_dirrY   )rW   r6   r6   r7   result   s    zNode.resultc             C   s   | j jS )z-Return the inputs of the underlying interface)rB   rR   )rW   r6   r6   r7   rR      s    zNode.inputsc             C   s
   | j j S )z4Return the output fields of the underlying interface)rB   _outputs)rW   r6   r6   r7   outputs   s    zNode.outputsc             C   s   | j S )N)rU   )rW   r6   r6   r7   rV      s    zNode.needed_outputsc             C   s4   t tt|pg }|| jkr0d| _d| _|| _dS )z3Needed outputs changes the hash, refresh if changedN)sortedlistsetrU   rS   rT   )rW   Znew_outputsr6   r6   r7   rV      s
    
c             C   s&   t | jdr | jj| _tjd | jS )zGet estimated memory (GB)estimated_memory_gbziSetting "estimated_memory_gb" on Interfaces has been deprecated as of nipype 1.0, please use Node.mem_gb.)rQ   rB   rh   rO   loggerwarning)rW   r6   r6   r7   r[      s
    
zNode.mem_gbc             C   s:   | j dk	r| j S t| jjdr6t| jjjr6| jjjS dS )z-Get the estimated number of processes/threadsNr;   r%   )rP   rQ   rB   rR   r#   r;   )rW   r6   r6   r7   rZ     s    

zNode.n_procsc             C   s$   || _ t| jjdr | j | jj_dS )z,Set an estimated number of processes/threadsr;   N)rP   rQ   rB   rR   r;   )rW   valuer6   r6   r7   rZ     s    c             C   s   | j r| j S | jdkrt | _| j}| jrBtj|f| jjd }| jrdd | jD }t| j	d d sxdd |D }tj|f| }tj
tj|| j| _ | j S )z8Return the location of the output directory for the nodeN.c             S   s   g | ]}d j |qS )z{})format).0pr6   r6   r7   
<listcomp>(  s    z#Node.output_dir.<locals>.<listcomp>	executionZparameterize_dirsc             S   s   g | ]}t |qS r6   )r&   )rn   ro   r6   r6   r7   rp   *  s    )rF   r:   r   rC   r_   r`   splitrK   r   r
   realpathrY   )rW   Z	outputdirZ
params_strr6   r6   r7   ra     s    
zNode.output_dirc             C   s,   t jd| j|t| t| j|t| dS )zSet interface input valuez![Node] %s - setting input %s = %sN)ri   debugrY   strsetattrrR   r   )rW   	parametervalr6   r6   r7   	set_input0  s    zNode.set_inputc             C   s   t | jj|dS )z(Retrieve a particular output of the nodeN)getattrrb   rd   )rW   rw   r6   r6   r7   
get_output7  s    zNode.get_outputc             C   s   | j j  dS )zPrint interface helpN)rB   help)rW   r6   r6   r7   r|   ;  s    z	Node.helpc             C   s  | j  }tj| s.tjtj|d| j  r>tjd| dS ttj|d}dd |D }tt	|t	| }| j
 \}}tj|d| }tjd|||| ||k}	|	rt|d	kr||d
 ksttjd| j dS t|d	kr2|	r|j| tjdt|| j x|D ]}
tj|
 qW |	r.|gng }|sTtjd| |	 sPtdS ||d
 k}|s
d}	tjd| j tj }|tk rt|d
 d	 }|tdd }tj|d|| yt|d
 }W n tk
r   Y nX tj|t||d |r
tj|d
  |	st|	|fS )zt
        Check if the interface has been run previously, and whether
        cached results are up-to-date.
        zresult_%s.pklzz[Node] Not cached "%s".Fz	_0x*.jsonc             S   s   g | ]}|j d r|qS )z_unfinished.json)endswith)rn   pathr6   r6   r7   rp   O  s    z"Node.is_cached.<locals>.<listcomp>z
_0x%s.jsonz[Node] Hashes: %s, %s, %s, %sr%   r   z'[Node] Up-to-date cache found for "%s".Tzw[Node] Found %d previous hashfiles indicating that the working directory of node "%s" is stale, deleting old hashfiles.z"[Node] No hashfiles found in "%s".z%[Node] Outdated cache found for "%s".Z_0xNz[Node] Old/new hashes = %s/%s
   )FF)TT)FF)ra   r_   existsr`   rY   ri   rt   r   rf   rg   _get_hashvallenAssertionErrorfullnameremoverj   osinfogetEffectiveLevelr   r   logr   	Exceptionr   )rW   rm_outdatedoutdirZ
globhashes
unfinishedZ	hashfileshashed_inputs	hashvaluehashfilecachedZrmfileupdatedZloglevelZexp_hash_file_baseZexp_hashZprev_inputsr6   r6   r7   	is_cached?  sl    



zNode.is_cachedc             C   sp   | j dd\}}| j }tj|d| j }|r>d| j|| jfS |r`|r`tjd| j t|| j || j|| jfS )zu
        Decorate the new `is_cached` method with hash updating
        to maintain backwards compatibility.
        T)r   z
_0x%s.jsonz[Node] Updating hash: %s)	r   ra   r_   r`   rS   rT   ri   rt   _save_hashfile)rW   
updatehashr   r   r   r   r6   r6   r7   hash_exists  s    zNode.hash_existsc             C   s  | j dkri | _ ttt j| j | _ | j }| jpB| jdkoB| jj}t| t	sht
jd| j d| d | j \}}| o|s| o|r(t
jd tj|d}tj|st
jd| t|| jj  tj|d}tj|st
jd	| t||  | jd
|o| d}t
jd| jd|o|   |S |rt|rtt| t	 rtt
jd| j | rtt| j d d rttd|rx(ttj| j dD ]}	tj|	 qW | j  tj|d| j }
| jjotj|
 }|rt| t	 rt|dd nt
jddt t| t	 |
 t| t	rDx$ttj|dD ]}tj| q0W tj!|dd t"|
| j# t$| t| t	d ttj|d|  ttj|d| jj  y| jdd}W n> tk
r   t
j%d| j| t&|
st
j%d|
  Y nX t'j(|
|
j)dd t$| |t| t	d |S ) aT  
        Execute the node in its directory.

        Parameters
        ----------
        updatehash: boolean
            When the hash stored in the output directory as a result of a previous run
            does not match that calculated for this execution, updatehash=True only
            updates the hash without re-running.

        Nz[Node] Setting-up "z" in "z".z/Only updating node hashes or skipping executionz_inputs.pklzzCreating inputs file %sz
_node.pklzzCreating node file %sF)executer   z[Node] "%s" found cached%s.z (and hash updated)z-[Node] Rerunning cached, up-to-date node "%s"rq   Zstop_on_first_rerunz6Cannot rerun when "stop_on_first_rerun" is set to Truez	_0x*.jsonz_0x%s_unfinished.jsonT)
noexist_okz[%sNode] Resume - hashfile=%sZMap)exist_ok)
is_mapnode)r   z[Node] Error on "%s" (%s)zInterface finished unexpectedly and the corresponding unfinished hashfile %s does not exist. Another nipype instance may be running against the same work directory. Please ensure no other concurrent workflows are racingZ_unfinished )rb   r   )*r
   r.   r   	_sectionsra   rJ   rB   Z
always_runr>   MapNoderi   r   r   r   rt   r_   r`   r   r   rR   get_traitsfree_run_interfacer   r   r   r   r   r   rS   
can_resumeisfiler   intmakedirsr   rT   r,   rj   r   shutilmovereplace)rW   r   r   Z	force_runr   r   Zinputs_fileZ	node_filerb   outdatedhashZhashfile_unfinishedr   filenamer6   r6   r7   run  s    







zNode.runc             C   s   | j   | jdkr| jdkr| jj| jd d d\| _| _| jd d }t|r| jrt }|j	| jj
  |j	t| jj
  |j | _| jjd| jf | j| jfS )z Return a hash of the input stateNrq   hash_method)r   remove_unnecessary_outputsrV   )_get_inputsrS   rT   rR   get_hashvalr
   r   rV   r   updateencoderu   	hexdigestappend)rW   rm_extra
hashobjectr6   r6   r7   r     s    
zNode._get_hashvalc          -   C   s
  | j r
dS | jsd| _ dS tt}x4t| jj D ]"\}}||d  j||d f q2W tjdt| j| j	t| xt|j D ]v\}}d}yt
|j}W n. tk
r } ztjd| W Y dd}~X nX |dkrtd| j	|f x|D ]\}}t}	t|tr:t||d }
t|
r~t|d |d |
}	nD|}y|j | }	W n" tk
rp   |j | }	Y nX tjd	| y| j|t|	 W q tjk
r } zD|jd d
dd| j	 d| d| dt|	 f}dj|f|_ W Y dd}~X qX qW qW d| _ dS )z
        Retrieve inputs from pointers to results files.

        This mechanism can be easily extended/replaced to retrieve data from
        other data sources (e.g., XNAT, HTTP, etc.,.)
        NTr   r%   zG[Node] Setting %d connected inputs of node "%s" from %d previous nodes.z%szpError populating the inputs of node "%s": the results file of the source node (%s) does not contain any outputs.   z
output: %sr   zError setting node input:zNode: %sz	input: %szresults_file: %sz	value: %s
)rD   rL   r   rf   itemsr   ri   rt   r   rY   r^   rd   AttributeErrorcriticalr1   r   r>   tuplerz   r#   r/   	trait_getZdictcopyry   r   r   
TraitErrorargsru   r`   )rW   Zprev_resultskeyr   Zresults_fnameconnectionsrd   econnZoutput_valuerk   Zoutput_namemsgr6   r6   r7   r   /  sb    

zNode._get_inputsc             C   s:   x&t tj| j dD ]}tj| qW t| j| j d S )Nz	_0x*.json)	r   r_   r`   ra   r   r   r   rS   rT   )rW   r   r6   r6   r7   _update_hashy  s    zNode._update_hashTc             C   s   |r| j   | j S | j|S )N)r   _load_results_run_command)rW   r   r   r6   r6   r7   r   ~  s    zNode._run_interfacec             C   s4  | j  }yttj|d| j }W nv tjtfk
rF   tj	d Y nX t
tfk
r } z4tj	dt| ttj|d}| jjf | W Y d d }~X nX |S t| ts| jdd | jj| jd}t|dttjtj d	}t| jj|| jjj |d
}t||| jt | j!d d d ntj	d | j" }|S )Nzresult_%s.pklzz;Error populating inputs/outputs, (re)aggregating results...z?attribute error: %s probably using different trait pickled filez_inputs.pklzT)	linksonly)rV   r   )cwd
returncodeenvironhostname)rX   runtimerR   rd   rq   use_relative_paths)rebasezaggregating mapnode results)#ra   r^   r_   r`   rY   r   r   EOFErrorri   rt   r   ImportErrorru   r   rR   	trait_setr>   r   _copyfiles_to_wdrB   Zaggregate_outputsrV   r    dictr   r   socketgethostnamer!   r]   r   _save_resultfiler   r
   r   )rW   r   rb   err
old_inputsZaggoutsr   r6   r6   r7   r     sD    
 


zNode._load_resultsc             C   sf  |sNy| j  }W n* ttfk
r:   tjd d}d}Y nX tjd| j |S t| j }|rxt| j	j
| _| j|d tjd| j d| j	j d| j	jj d	 | j	j|dd
}tjd| j d|jj d t|jdd }|s"d }t| trtj|dg}t|j|| j	j
| j| j|d|_t||| jt| jd d d |rbtd| j d|jj  |S )Nz:[Node] Some of the outputs were not found: rerunning node.FTz3[Node] Cached "%s" - collecting precomputed outputs)r   z[Node] Executing "z" <rl   >)r   Zignore_exceptionz[Node] Finished "z", elapsed time zs.	tracebackmapflow)	dirs2keeprq   r   )r   z&Exception raised while executing Node z.

)!r   FileNotFoundErrorr   ri   r   r   r   ra   r   rB   rR   rE   r   rY   r3   r]   r2   r   r   durationrz   r>   r   r_   r`   r-   rd   rV   r
   r   r   r1   r   )rW   r   r   rb   r   exc_tbr   r6   r6   r7   r     sR    

&
zNode._run_commandc       
      C   sB  t | j}|sdS tjd|| | j }|rN|rN|}tj|d}tj|dd x|D ]}| j	j
 j|d }t| sT| r~qTt|}|r|r|d st||g|d dd}	nt||d	}	t|	tj|jtjd }	nt||g|d dd}	nt||d	}	t|tst|	}	t| j	|d |	 qTW |r>|r>t|dd dS )z%copy files over and change the inputsNz.copying files to wd [execute=%s, linksonly=%s]Z
_tempinputT)r   r   copy)r   Z
create_new)newpathr%   )r   )r$   rX   ri   rt   ra   r_   r`   r   r   rR   r   rA   r#   r   r   r   _strip_tempabspathrr   sepr>   rf   r   rv   r   )
rW   r   r   Zfilecopy_infor   Zolddirr   filesinfilesZnewfilesr6   r6   r7   r     s>    


zNode._copyfiles_to_wdc             K   s   | j jf | dS )zUpdate inputsN)rR   r   )rW   optsr6   r6   r7   r   "  s    zNode.update)NNFNNFNr9   )F)F)F)TF)T)TF)r2   r3   r4   r5   r@   propertyrX   rb   rR   rd   rV   setterr[   rZ   ra   ry   r{   r|   r   r   r   r   r   r   r   r   r   r   r   __classcell__r6   r6   )r]   r7   r8   G   sB          o
	
U

tJ
.
B
.r8   c                   s   e Zd ZdZd fdd	Zedd Zejdd Zed	d
 Zdd Z	dd Z
dd Zd fdd	Zdd Zdd Zdd Z  ZS )JoinNodea  Wraps interface objects that join inputs into a list.

    Examples
    --------

    >>> import nipype.pipeline.engine as pe
    >>> from nipype import Node, JoinNode, Workflow
    >>> from nipype.interfaces.utility import IdentityInterface
    >>> from nipype.interfaces import (ants, dcm2nii, fsl)
    >>> wf = Workflow(name='preprocess')
    >>> inputspec = Node(IdentityInterface(fields=['image']),
    ...                     name='inputspec')
    >>> inputspec.iterables = [('image',
    ...                        ['img1.nii', 'img2.nii', 'img3.nii'])]
    >>> img2flt = Node(fsl.ImageMaths(out_data_type='float'),
    ...                   name='img2flt')
    >>> wf.connect(inputspec, 'image', img2flt, 'in_file')
    >>> average = JoinNode(ants.AverageImages(), joinsource='inputspec',
    ...                       joinfield='images', name='average')
    >>> wf.connect(img2flt, 'out_file', average, 'images')
    >>> realign = Node(fsl.FLIRT(), name='realign')
    >>> wf.connect(img2flt, 'out_file', realign, 'in_file')
    >>> wf.connect(average, 'output_average_image', realign, 'reference')
    >>> strip = Node(fsl.BET(), name='strip')
    >>> wf.connect(realign, 'out_file', strip, 'in_file')

    NFc                sr   t t| j||f| d| _|| _|s4| jjj }nt|t	t
frH|g}|| _| j| jj| j| _|| _d| _dS )ae  

        Parameters
        ----------
        interface : interface object
            node specific interface (fsl.Bet(), spm.Coregister())
        name : alphanumeric string
            node specific name
        joinsource : node name
            name of the join predecessor iterable node
        joinfield : string or list of strings
            name(s) of list input fields that will be aggregated.
            The default is all of the join node input fields.
        unique : flag indicating whether to ignore duplicate input values

        See Node docstring for additional keyword arguments.
        Nr   )r?   r   r@   _joinsource
joinsourcerB   rR   copyable_trait_namesr>   ru   bytes	joinfield_override_join_traits_inputs_unique_next_slot_index)rW   rX   rY   r   r   uniquer\   )r]   r6   r7   r@   D  s    zJoinNode.__init__c             C   s   | j S )N)r   )rW   r6   r6   r7   r   q  s    zJoinNode.joinsourcec             C   s   t |tr|j}|| _dS )zxSet the joinsource property. If the given value is a Node,
        then the joinsource is set to the node name.
        N)r>   r8   rY   r   )rW   rk   r6   r6   r7   r   u  s    
c             C   s   | j S )z5The JoinNode inputs include the join field overrides.)r   )rW   r6   r6   r7   rR   ~  s    zJoinNode.inputsc                s@   j  t fddjD }tjd|  j d7  _ |S )a  Add new join item fields assigned to the next iterated
        input

        This method is intended solely for workflow graph expansion.

        Examples
        --------

        >>> from nipype.interfaces.utility import IdentityInterface
        >>> import nipype.pipeline.engine as pe
        >>> from nipype import Node, JoinNode, Workflow
        >>> inputspec = Node(IdentityInterface(fields=['image']),
        ...    name='inputspec'),
        >>> join = JoinNode(IdentityInterface(fields=['images', 'mask']),
        ...    joinsource='inputspec', joinfield='images', name='join')
        >>> join._add_join_item_fields()
        {'images': 'imagesJ1'}

        Return the {base field: slot field} dictionary
        c                s   g | ]}|j | fqS r6   )_add_join_item_field)rn   field)idxrW   r6   r7   rp     s    z2JoinNode._add_join_item_fields.<locals>.<listcomp>z!Added the %s join item fields %s.r%   )r   r   r   ri   rt   )rW   Z	newfieldsr6   )r   rW   r7   _add_join_item_fields  s    zJoinNode._add_join_item_fieldsc             C   s2   d||d f }| j j|dd}| j j|| |S )zaAdd new join item fields qualified by the given index

        Return the new field name
        z%sJ%dr%   FT)r   trait	add_trait)rW   r   indexrY   r   r6   r6   r7   r     s    zJoinNode._add_join_item_fieldc             C   s   t  }|dkr|j }n*x(|D ] }|j|std| j|f qW xt|j D ]x\}}||krt|jdkr|jd }|j	|| t
||t tjd| ||jj |j  qP|j	|tj t
||t qPW |S )zConvert the given join fields to accept an input that
        is a list item rather than a list. Non-join fields
        delegate to the interface traits.

        Return the override DynamicTraitedSpec
        Nz.The JoinNode %s does not have a field named %sr%   r   z<Converted the join node %s field %s trait type from %s to %s)r   r   r   
ValueErrorrY   rf   r   r   Zinner_traitsr   rv   r   ri   rt   
trait_typer   r   Any)rW   
basetraitsfieldsZ	dyntraitsr   rY   r   Z
item_traitr6   r6   r7   r     s,    



zJoinNode._override_join_traitsTc                s   | j   tt| j||S )z?Collates the join inputs prior to delegating to the superclass.)_collate_join_field_inputsr?   r   r   )rW   r   r   )r]   r6   r7   r     s    zJoinNode._run_commandc             C   s   x| j j D ]}|| jkr~| j|}yt| jj || W q tk
rz } z&td| ||| j j | j|f W Y dd}~X qX qt| jj |rt	| j
|}t|rt| jj || qW tjd| j|  dS )z\
        Collects each override join item field into the interface join
        field input.z>>JN %s %s %s %s %s: %sNz/Collated %d inputs into the %s node join fields)rR   r   r   _collate_input_valuerv   rB   r   r   rQ   rz   r   r#   ri   rt   r   )rW   r   rx   r   r6   r6   r7   r    s,    

z#JoinNode._collate_join_field_inputsc                sV    fddt jD }jjj }t|jtjr>t	|S j
rRttj|S |S )a  
        Collects the join item field values into a list or set value for
        the given field, as follows:

        - If the field trait is a Set, then the values are collected into
        a set.

        - Otherwise, the values are collected into a list which preserves
        the iterables order. If the ``unique`` flag is set, then duplicate
        values are removed but the iterables order is preserved.
        c                s   g | ]}j  |qS r6   )_slot_value)rn   r   )r   rW   r6   r7   rp     s    z1JoinNode._collate_input_value.<locals>.<listcomp>)ranger   rB   rR   r   r>   r   r   Setrg   r   rf   r   fromkeys)rW   r   rx   Z	basetraitr6   )r   rW   r7   r    s    zJoinNode._collate_input_valuec             C   sZ   d||d f }yt | j|S  tk
rT } ztd| ||||f W Y d d }~X nX d S )Nz%sJ%dr%   zSThe join node %s does not have a slot field %s to hold the %s value at index %d: %s)rz   r   r   )rW   r   r   Z
slot_fieldr   r6   r6   r7   r    s    zJoinNode._slot_value)NF)T)r2   r3   r4   r5   r@   r   r   r   rR   r   r   r   r   r  r  r  r   r6   r6   )r]   r7   r   '  s   ,	&!r   c                   s   e Zd ZdZd! fdd	Zd"ddZdd	 Zd
d Zdd Ze	dd Z
e	dd Zd#ddZdd Zdd Zdd Z fddZdd Zd$dd Z  ZS )%r   a  Wraps interface objects that need to be iterated on a list of inputs.

    Examples
    --------

    >>> from nipype import MapNode
    >>> from nipype.interfaces import fsl
    >>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
    >>> realign.inputs.in_file = ['functional.nii',
    ...                           'functional2.nii',
    ...                           'functional3.nii']
    >>> realign.run() # doctest: +SKIP

    Fc                sj   t t| j||f| t|ttfr*|g}|| _|| _| j| j	j
| jd| _| jj| j d| _|| _dS )a  

        Parameters
        ----------
        interface : interface object
            node specific interface (fsl.Bet(), spm.Coregister())
        iterfield : string or list of strings
            name(s) of input fields that will receive a list of whatever kind
            of input they take. the node will be run separately for each
            value in these lists. for more than one input, the values are
            paired (i.e. it does not compute a combinatorial product).
        name : alphanumeric string
            node specific name
        serial : boolean
            flag to enforce executing the jobs of the mapnode in a serial
            manner rather than parallel
        nested : boolean
            support for nested lists. If set, the input list will be flattened
            before running and the nested list structure of the outputs will
            be resored.

        See Node docstring for additional keyword arguments.
        )r   FN)r?   r   r@   r>   ru   r   	iterfieldnested_create_dynamic_traitsrB   rR   r   Zon_trait_change_set_mapnode_inputrD   _serial)rW   rX   r  rY   serialr  r\   )r]   r6   r7   r@   .  s    zMapNode.__init__Nc             C   s   t  }|dkr|j }xt|j D ]\}}||kr|dksD|dkrtjd| | jrl|j|tt	j
  q|j|t|j n|j|t	j| t||t t||}t|rt||| t||}q$W |S )z<Convert specific fields of a trait to accept multiple inputsNr%   zadding multipath trait: %s)r   r   rf   r   ri   rt   r  r   r   r   r   r   ZTraitrv   r   rz   r#   )rW   r   r   nitemsoutputrY   specrk   r6   r6   r7   r	  U  s     
zMapNode._create_dynamic_traitsc             C   s,   t jdt| |t| | j|t| dS )zh
        Set interface input value or nodewrapper attribute
        Priority goes to interface.
        z#setting nodelevel(%s) input %s = %sN)ri   rt   ru   r
  r   )rW   rw   rx   r6   r6   r7   ry   j  s    zMapNode.set_inputc             C   sF   t jdt| |t| || jkr2t| j|| nt| jj|| d S )Nz#setting mapnode(%s) input: %s -> %s)ri   rt   ru   r  rv   r   rB   rR   )rW   rY   Znewvaluer6   r6   r7   r
  t  s
    
zMapNode._set_mapnode_inputc             C   sV  | j   | jdk	r(| jdk	r(| j| jfS | j  t| jj}x| jD ]v}|j| |j	|t
| jjj | j tjd|t| j| | jrt||tt| j| qDt||t| j| qDW |j| jd d d\}}| jd d }t|o| jr<t }|j|j  t| j}|jt|j  |j }|jd|f || | _| _| j| jfS )z'Compute hash including iterfield lists.Nzsetting hashinput %s-> %srq   r   )r   r   rV   )r   rS   rT   _check_iterfieldr   rB   rR   r  Zremove_traitr   r   r   r   ri   rt   rz   r   r  rv   r   r   r
   r   rV   r   r   r   re   ru   r   r   )rW   Z
hashinputsrY   r   r   r   r   Zsorted_outputsr6   r6   r7   r   }  s2    

zMapNode._get_hashvalc             C   s   | j S )N)r   )rW   r6   r6   r7   rR     s    zMapNode.inputsc             C   s    | j j rt| j j j S d S )N)rB   rc   r    r   )rW   r6   r6   r7   rd     s    
zMapNode.outputsc             c   sH  |d kr| j  }| jr6tttt| j| jd }nttt| j| jd }xt|D ]}d| j	|f }t
t| j| j| j| j| j| jtj|d|d}| j|_|jjjf t| jjj  | jj|j_x`| jD ]V}| jrttt| j|}ntt| j|}tjd||||  t|j|||  qW | j|_||fV  qZW d S )Nr   z_%s%dr   )rZ   r[   rJ   rV   rN   r:   rY   zsetting input %d %s %s)ra   r  r   r   r   rz   rR   r  r  rY   r8   r   rB   rP   rO   rJ   rV   rN   r_   r`   rM   rX   r   r   Zresource_monitorri   rt   rv   r
   )rW   r   r  inodenamenoder   Z	fieldvalsr6   r6   r7   _make_nodes  s8     

zMapNode._make_nodesc             C   s>  t g g g g | jd}g }x@|D ]6\}}}|jj|d  |j|| |rt|dr|jj||j |jj||j |j|j|< t|dr|jj||j | jr xt| jj	 D ]\}}| j
d d }	t|	r| jr|| jkrqt|j|}
t|
sg }
|r|jr|
j||jj |  n|
j|d  dd |
D }t|r|jrt|j||
 qW q W | jrx\t| jj	 D ]J\}}t|j|}
t|
rt|
tt| j| jd }
t|j||
 qtW |r:td	d |D r:g }x@t|D ]4\}}|d k	r|d
| g7 }|dt| g7 }qW td| jdj|f |S )N)rX   r   
provenancerR   rd   r   r  rq   r   c             S   s   g | ]}t |qS r6   )r#   )rn   rx   r6   r6   r7   rp     s    z,MapNode._collate_results.<locals>.<listcomp>r   c             S   s   g | ]}|d k	qS )Nr6   )rn   coder6   r6   r7   rp     s    zSubnode %d failedz	Error: %szSubnodes of node: %s failed:
%sr   )r!   rd   r   insertrQ   rX   rR   r  rf   r   r
   r   rV   rz   r#   r   anyrv   r  r   r   r  	enumerateru   r1   rY   r`   )rW   ZnodesZfinalresultr   r  Znresultr   r   _r   valuesZdefined_valsr   r  r6   r6   r7   _collate_results  sX    




zMapNode._collate_resultsc             C   s0   | j   | j  t| ddd dd | j D S )z=Generate subnodes of a mapnode and write pre-execution reportNT)rb   r   c             S   s   g | ]\}}|qS r6   r6   )rn   r  r  r6   r6   r7   rp     s    z(MapNode.get_subnodes.<locals>.<listcomp>)r   r  r,   r  )rW   r6   r6   r7   get_subnodes   s    zMapNode.get_subnodesc             C   sX   | j   | j  | jrdS | jr>tttt| j| j	d S ttt| j| j	d S )z5Get the number of subnodes to iterate in this MapNoder%   r   )
r   r  r  r  r   r   r   rz   rR   r  )rW   r6   r6   r7   num_subnodes  s    zMapNode.num_subnodesc                s@   | j j }| j| jj| jd| _ | j jf | tt| j	  d S )N)r   )
r   r   r	  rB   rR   r  r   r?   r   r   )rW   r   )r]   r6   r7   r     s
    
zMapNode._get_inputsc             C   s   x*| j D ] }tt| j|std| qW t| j dkrttt| j| j d }x@| j dd D ].}|ttt| j|krdtdt| j qdW dS )zyChecks iterfield

        * iterfield must be in inputs
        * number of elements must match across iterfield
        z4Input %s was not set but it is listed in iterfields.r%   r   Nz<All iterfields of a MapNode have to have the same length. %s)r  r#   rz   rR   r   r   r   ru   )rW   r  Z	first_lenr6   r6   r7   r    s    zMapNode._check_iterfieldTc                s  | j   | j }|s| j S | jrBtttt| j| j	d }nttt| j| j	d }d| j
   fddt|D }| j }t| jjt|dttjtj d| jjj d}y,| jt| j||t| jd d	 d
}W n` tk
r@ } zBdjt|jdd||j_t||| j
t| jd d d  W Y dd}~X nX t||| j
dd g }	xHt t!j"|ddD ]2}
t!j#|
rl|
j$t!j%d |krl|	j&|
 qlW x$|	D ]}
t'j(d|
 t)j*|
 qW |S )zRun the mapnode interface

        This is primarily intended for serial execution of mapnode. A parallel
        execution requires creation of new nodes that can be spawned
        r   z_%s{}c                s   g | ]} j |qS r6   )rm   )rn   r  )nnametplr6   r7   rp   B  s    z*MapNode._run_interface.<locals>.<listcomp>r%   )r   r   r   r   )rX   r   rR   rq   Zstop_on_first_crash)r   Z
stop_firstz%s

%sstderrr   r   )r   NFr   *z[MapNode] Removing folder "%s".r   )+r  ra   r   r  r   r   r   rz   rR   r  rY   r  r!   rB   r]   r    r   r   r   r   r   r   r  _node_runnerr  r   r
   r   rm   r   r   r   r   r_   r`   isdirrr   r   r   ri   rt   r   rmtree)rW   r   r   r   r  Z	nodenamesr   rb   r   Zdirs2remover~   r6   )r  r7   r   1  sX     


zMapNode._run_interface)FF)NN)N)TF)r2   r3   r4   r5   r@   r	  ry   r
  r   r   rR   rd   r  r  r  r  r   r  r   r   r6   r6   )r]   r7   r     s   &

	!
"7
r   )Kr5   collectionsr   r   r   os.pathr~   r_   Zpathlibr   r   r   r   r   r   r   r   tempfiler   r   r
   Z
utils.miscr   r   r   r   Zutils.filemanipr   r   r   r   r   r   r   r   r   r   r   Zinterfaces.baser   r   r   r   r   r    r!   r"   r#   Zinterfaces.base.specsr$   utilsr&   r'   r   r(   r^   r)   r   r*   r"  r+   r   r,   r-   r.   r/   baser0   	getLoggerri   RuntimeErrorr1   r8   r   r   r6   r6   r6   r7   <module>   s8   
4,0
     e x