3
dd                 @   s   d Z ddlZddlmZ ddlmZ ddlZddlZddlmZmZ ddl	m
Z
 ddlZddlmZ dd	lmZ d
dlmZmZ d
dlmZ ddlmZmZmZ ejdZG dd deZG dd deZG dd deZG dd deZ dS )z&Common graph operations for execution.    N)deepcopy)glob)sleeptime)format_exception   )logging)str2bool   )topological_sortload_resultfile)MapNode   )report_crashreport_nodes_not_runcreate_pyscriptznipype.workflowc               @   s$   e Zd ZdZdddZd	ddZdS )

PluginBasezBase class for plugins.Nc             C   s(   |d kri }|| _ d | _|jd| _d S )Nstatus_callback)plugin_args_configget_status_callback)selfr    r   >/tmp/pip-build-7vycvbft/nipype/nipype/pipeline/plugins/base.py__init__   s
    zPluginBase.__init__Fc             C   s   t dS )a  
        Instruct the plugin to execute the workflow graph.

        The core plugin member that should be implemented by
        all plugins.

        Parameters
        ----------
        graph :
            a networkx, flattened :abbr:`DAG (Directed Acyclic Graph)`
            to be executed
        config : :obj:`~nipype.config`
            a nipype.config object
        updatehash : :obj:`bool`
            whether cached nodes with stale hash should be just updated.

        N)NotImplementedError)r   graphconfig
updatehashr   r   r   run"   s    zPluginBase.run)N)F)__name__
__module____qualname____doc__r   r    r   r   r   r   r      s   
r   c                   s   e Zd ZdZd$ fdd	Zdd Zdd Zd%d
dZdd Zd&ddZ	d'ddZ
dd Zd(ddZdd Zd)ddZdd Zd*ddZdd Zd d! Zd"d# Z  ZS )+DistributedPluginBasea  
    Execute workflow with a distribution engine

    Combinations of ``proc_done`` and ``proc_pending``:

    +------------+---------------+--------------------------------+
    | proc_done  | proc_pending  | outcome                        |
    +============+===============+================================+
    | True       | False         | Process is finished            |
    +------------+---------------+--------------------------------+
    | True       | True          | Process is currently being run |
    +------------+---------------+--------------------------------+
    | False      | False         | Process is queued              |
    +------------+---------------+--------------------------------+
    | False      | True          | INVALID COMBINATION            |
    +------------+---------------+--------------------------------+

    Attributes
    ----------
    procs : :obj:`list`
        list (N) of underlying interface elements to be processed
    proc_done : :obj:`numpy.ndarray`
        a boolean numpy array (N,) signifying whether a process has been
        submitted for execution
    proc_pending : :obj:`numpy.ndarray`
        a boolean numpy array (N,) signifying whether a
        process is currently running.
    depidx : :obj:`numpy.matrix`
        a boolean matrix (NxN) storing the dependency structure accross
        processes. Process dependencies are derived from each column.

    Nc                sX   t t| j|d d| _d| _d| _d| _d| _d| _d| _	g | _
| jjdtj| _dS )z8
        Initialize runtime attributes to none

        )r   Nmax_jobs)superr%   r   procsdepidxrefidxmapnodesmapnodesubids	proc_doneproc_pendingpending_tasksr   r   npinfr&   )r   r   )	__class__r   r   r   Y   s    zDistributedPluginBase.__init__c             C   s   dS )z>Stub method to validate/massage graph and nodes before runningNr   )r   r   r   r   r   _prerun_checki   s    z#DistributedPluginBase._prerun_checkc             C   s   dS )z'Stub method to close any open resourcesNr   )r   r   r   r   _postrun_checkl   s    z$DistributedPluginBase._postrun_checkFc             C   s  t jd || _t|d d }| j| | j| g | _i | _g }g }d}d}xNtj	| j
 sptj| jrt }	tj| j
 | jjddk@ d }
t| j
tj| j
| jA tj| j
| j@ t|
t| jtj| j
 | j @ f}||k}|rt jd|  |}g }x| jr| jj \}}y| j|}W n@ tk
rp } z"|j| j|| |j| W Y dd}~X nX |r|d r|j| j|||d	 |jd
j|d  n| j| | j  | j| n,| j
| r| j| st|jd||f qW |r| jj| t| j}|tj| j
| j@ f}|p<||k}|rVt jd|  |}|| j k rr| j!||d n|rt jd |	| }t"t#d|t   qTW | j  t$| | j%  |r|d d }}t&|t'rt(|}t|dkrt(t| d| }}||dS )zN
        Executes a pre-defined pipeline using distributed approaches
        zRunning in parallel.	executionZpoll_sleep_durationNr   r   PProgress: %d jobs, %d/%d/%d (done/running/ready), %d/%d (pending_tasks/waiting).	traceback)result )Tasks currently running: %d. Pending: %d.)r   r   z!Not submitting (max jobs reached)z raised. Re-raising first.)r6   )r:   ))loggerinfor   floatr3   _generate_dependency_listr+   r,   r0   allr-   anyr.   r   nonzeror)   sumlenr/   debugpop_get_result	Exceptionappend_clean_queuejoin_task_finished_cb_remove_node_dirs_clear_taskAssertionErrorinsertextendr&   _send_procs_to_workersr   maxr   r4   
isinstancestrRuntimeError)r   r   r   r   Zpoll_sleep_secsZnotrunerrorsZold_progress_statsZold_presub_statsZ
loop_startZ
jobs_readyZprogress_statsZdisplay_statsZtoappendtaskidjobidr8   excnum_jobsZpresub_statsZ	sleep_tilerrorcauser   r   r   r    o   s    


 "




zDistributedPluginBase.runc             C   s   t d S )N)r   )r   rW   r   r   r   rF      s    z!DistributedPluginBase._get_resultc             C   s   t d S )N)r   )r   noder   r   r   r   _submit_job   s    z!DistributedPluginBase._submit_jobc             C   s0   d }|d k	r$|d |_ |d }||_t||dS )Nr8   r7   )r7   )Z_resultZ
_tracebackr   )r   r]   r8   tbr   r   r   _report_crash   s    
z#DistributedPluginBase._report_crashc             C   s   t d S )N)r   )r   rW   r   r   r   rM      s    z!DistributedPluginBase._clear_taskc             C   s   t jd| | jr$| j| j| d |d krDd djttj  d}| j| j| |d}t	| j
d d r|tdj|d	 || jkrd
| j|< d| j|< | j| }d
| j|< d| j|< | j|||S )NzClearing %d from queue	exception
)r8   r7   )r8   r5   Zstop_on_first_crashr9   r7   FT)r;   rD   r   r(   rJ   r   sysexc_infor`   r	   r   rU   r,   r.   r-   _remove_node_deps)r   rX   r   r8   	crashfiler   r   r   rI      s     





z"DistributedPluginBase._clean_queuec             C   s*  dd l j}|| jkrdS | jj| | j| j }t|}tjd|| j|  x&t	|D ]}|| j
| jjd | < qXW | jj| |j| j|jtj|| jjd ffd| _|j| j|jtj| jjd |ffd| _d| j| d |f< tj| jtj|tdf| _tj| jtj|tdf| _dS )Nr   TzAdding %d jobs for mapnode %sr   lil)dtypeF)Zscipy.sparsesparser+   rH   r(   Zget_subnodesrC   r;   rD   ranger,   r)   shaperP   ZvstackZ
lil_matrixr0   zerosZhstackZconcatenater-   boolr.   )r   rX   Zsspr,   Znumnodesir   r   r   _submit_mapnode   s,    

  z%DistributedPluginBase._submit_mapnodec       
      C   s\  xTt j| jsVt| j}t j| jr.d}ntd| j| }tj	d| || jks\|dkr^P t j
| j | jjddk@ d }t|dkrRtjd|t|d| |pd x|d| D ]}t| j| tr4y| j| j }W n. tk
r   | j|| d| j|< wY nX |dkr4| j|}|s4qd| j|< d| j|< tjd	| j| | | jrv| j| j| d
 | j||s8| j| jrtj	d| j|  y| j| j  W n" tk
r   | j|| Y nX | j| | j  nJ| jt| j| |d}	|	dkr&d| j|< d| j|< n| jjd|	|f tjd| j| | qW qP qW dS )z'
        Sends jobs to workers
        Nr   zSlots available: %sr   z)Pending[%d] Submitting[%d] jobs Slots[%s]r1   FTzSubmitting: %s ID: %dstartz Running node %s on master thread)r   zFinished submitting: %s ID: %d)r0   r?   r-   rC   r/   isinfr&   rR   r;   rD   rA   r)   rB   r<   rS   r(   r   num_subnodesrG   rI   r.   ro   r   _local_hash_checkZrun_without_submittingr    rK   rL   r^   r   rO   )
r   r   r   rZ   slotsZjobidsrX   rr   Zsubmittidr   r   r   rQ     sf    
"









z,DistributedPluginBase._send_procs_to_workersc             C   s\  t | j| jd d sdS y| j| j \}}W nP tk
r   tjd| j| | j| jj| j| jj	j
djttj   dS X tjd| j| || | j| j}| j| jj}|o|o|dks|d ko| rXtjd| j| | y| j|dd	 | j  W nP tk
rR   tjd
| j| |djttj   | j|| d| j|< Y nX dS dS )Nr5   Zlocal_hash_checkFa"  Error while checking node hash, forcing re-run. Although this error may not prevent the workflow from running, it could indicate a major problem. Please report a new issue at https://github.com/nipy/nipype/issues adding the following information:

	Node: %s
	Interface: %s.%s
	Traceback:
%srb   z2Checking hash "%s" locally: cached=%s, updated=%s.z#Skipping cached node %s with ID %s.T)cachedz'Error skipping cached node %s (%s).

%s)r	   r(   r   Z	is_cachedrG   r;   warningZ	interfacer"   r2   r!   rJ   r   rc   rd   rD   	overwrite
always_runrK   rL   rI   r.   )r   rX   r   rv   updatedrx   ry   r   r   r   rs   g  sJ    z'DistributedPluginBase._local_hash_checkc             C   s   t jd||rdnd| j|  | jr6| j| j| d d| j|< | jj|}d||j < || jkrd| j	| j	dd|f j d |f< dS )zqExtract outputs and assign to inputs of dependent tasks

        This is called when a job is completed.
        z[Job %d] %s (%s).ZCachedZ	CompletedendFr   N)
r;   r<   r(   r   r.   r)   Z
getrowviewrA   r,   r*   )r   rX   rv   Zrowviewr   r   r   rK     s    


z'DistributedPluginBase._task_finished_cbc             C   sh   ddl }t|\| _}|j|| jdd| _| jjt| _tj	t
| jtd| _tj	t
| jtd| _dS )z1Generates a dependency list for a list of graphs.r   Nrg   )Znodelistformat)rh   )networkxr   r(   Zto_scipy_sparse_matrixr)   Zastypeintr*   r0   rl   rC   rm   r-   r.   )r   r   nx_r   r   r   r>     s    z/DistributedPluginBase._generate_dependency_listc       	      C   s   dd l }y
|j}W n tk
r,   |j}Y nX dd ||| j| D }x,|D ]$}| jj|}d| j|< d| j|< qNW t| j| ||dS )Nr   c             S   s   g | ]}|qS r   r   ).0sr   r   r   
<listcomp>  s    z;DistributedPluginBase._remove_node_deps.<locals>.<listcomp>TF)r]   Z
dependentsrf   )	r}   dfs_preorderAttributeErrorZdfs_preorder_nodesr(   indexr-   r.   dict)	r   rX   rf   r   r   r   Zsubnodesr]   idxr   r   r   re     s    


z'DistributedPluginBase._remove_node_depsc             C   s   t | jd d rtj| jjdddkj d }xn|D ]f}|| jkrHq8| j| r8| j	|  r8d| j||f< | j
| j }tjd| j
| j|f  tj| q8W dS )	z;Removes directories whose outputs have already been used upr5   Zremove_node_directoriesr   )Zaxisr   z@[node dependencies finished] removing node: %s from directory %sN)r	   r   r0   rA   r*   rB   Z	__array__r,   r-   r.   r(   
output_dirr;   r<   _idshutilrmtree)r   indicesr   outdirr   r   r   rL     s     

z'DistributedPluginBase._remove_node_dirs)N)F)F)N)N)FN)F)r!   r"   r#   r$   r   r3   r4   r    rF   r^   r`   rM   rI   ro   rQ   rs   rK   r>   re   rL   __classcell__r   r   )r2   r   r%   7   s"    
g



J4
r%   c                   sL   e Zd ZdZd fdd	Zdd Zdd Zd	d
 ZdddZdd Z	  Z
S )SGELikeBatchManagerBasez3Execute workflow with SGE/OGE/PBS like batch systemNc          	      s~   t t| j|d || _d | _|rtd|krb|d | _tjj| jrbt| j}|j	 | _W d Q R X d|krt|d | _i | _
d S )N)r   templateZ	qsub_args)r'   r   r   	_templateZ
_qsub_argsospathisfileopenread_pending)r   r   r   Ztpl_file)r2   r   r   r     s    

z SGELikeBatchManagerBase.__init__c             C   s   t dS )z.Check if a task is pending in the batch systemN)r   )r   rW   r   r   r   _is_pending  s    z#SGELikeBatchManagerBase._is_pendingc             C   s   t dS )z!Submit a task to the batch systemN)r   )r   Z
scriptfiler]   r   r   r   _submit_batchtask  s    z)SGELikeBatchManagerBase._submit_batchtaskc             C   s  || j krtd| | j|r$d S | j | }t }t| jd d }d}xht | |k ry ttjj	|dj
  d}P W n, tk
r } ztj| W Y d d }~X nX td qLW |r"dd d d	}d }yd
j|||}	t|	W n: tk
r } zdj	ttj  |d< W Y d d }~X nX nttjj	|dd }t|}td d d}
t|tr|d |
d< |d |
d< |d |
d< |rtjj	|d}tj|| n||
d< |
S )NzTask %d not foundr5   Zjob_finished_timeoutTzresult_*.pklzFr
   unknown)hostnamer8   r7   zJob id ({0}) finished or terminated, but results file does not exist after ({1}) seconds. Batch dir contains crashdump file if node raised an exception.
Node working directory: ({2}) rb   r7   r   )r8   r7   r8   r   zcrashstore.pklz)r   rG   r   r   r=   r   r   r   r   rJ   rE   r;   rD   r   r|   IOErrorr   rc   rd   r   r   rS   rename)r   rW   node_dirttimeoutZ	timed_outeresult_dataresults_fileerror_message
result_out
crash_filer   r   r   rF     sJ    



*z#SGELikeBatchManagerBase._get_resultFc       	   
   C   s   t ||d}tjj|\}}dj|jddd	 }dj| jjddtj|f f}tjj|d| }t	|d}|j
| W dQ R X | j||S )
zsubmit job and return taskid)r   .Nr   rb   z%s %szbatchscript_%s.shwtr   )r   r   r   splitrJ   r   rstriprc   
executabler   
writelinesr   )	r   r]   r   ZpyscriptZ	batch_dirnameZbatchscriptZbatchscriptfilefpr   r   r   r^   *  s    z#SGELikeBatchManagerBase._submit_jobc             C   s   | j |= d S )N)r   )r   rW   r   r   r   rM   7  s    z#SGELikeBatchManagerBase._clear_task)N)F)r!   r"   r#   r$   r   r   r   rF   r^   rM   r   r   r   )r2   r   r     s   3
r   c                   sD   e Zd ZdZd fdd	ZdddZdd	 Zd
d Zdd Z  Z	S )GraphPluginBasez:Base class for plugins that distribute graphs to workflowsNc                s.   |r|j drtjd tt| j|d d S )Nr   z:status_callback not supported for Graph submission plugins)r   )r   r;   rw   r'   r   r   )r   r   )r2   r   r   r   >  s    zGraphPluginBase.__init__Fc       	         s   dd l }g }i }|| _t|j| tjd xHt D ]<\}}|jt||dd  fddt|j	|D ||< q8W | j
||  d S )Nr   z.Creating executable python files for each nodeF)r   Zstore_exceptionc                s   g | ]} j |qS r   )r   )r   Zprevnode)nodesr   r   r   R  s    z'GraphPluginBase.run.<locals>.<listcomp>)r}   r   listr   r;   rD   	enumeraterH   r   Zpredecessors_submit_graph)	r   r   r   r   r   pyfilesdependenciesr   r]   r   )r   r   r    E  s    
$zGraphPluginBase.runc             C   s   f }x|D ]}t | d| }|dkrLtjj|rLt|}|j }W d Q R X t|drt|jt	r||jkr|dkrtjj|j| rt|j| }|j }W d Q R X n
|j| }d|jkr|jd r|}n||7 }||f7 }q
W |S )Nr   r   r   rx   )
getattrr   r   r   r   r   hasattrrS   r   r   )r   r]   keywordsvalueskeywordvaluef	tmp_valuer   r   r   	_get_argsV  s$    




zGraphPluginBase._get_argsc             C   s   t dS )z
        pyfiles: list of files corresponding to a topological sort
        dependencies: dictionary of dependencies based on the toplogical sort
        N)r   )r   r   r   r   r   r   r   r   o  s    zGraphPluginBase._submit_graphc             C   s   || j krtd| | j|r$d S | j | }ttjj|dj  ttjj|dd }t|}t	d d d}t
|t	r|d |d< |d |d< |d |d< |rtjj|d}tj|| n||d< |S )	NzTask %d not foundzresult_*.pklzr   )r8   r7   r8   r7   r   zcrashstore.pklz)r   rG   r   r   r   r   rJ   rE   r   r   rS   r   )r   rW   r   r   r   r   r   r   r   r   rF   v  s$    



zGraphPluginBase._get_result)N)F)
r!   r"   r#   r$   r   r    r   r   rF   r   r   r   )r2   r   r   ;  s   
r   )!r$   rc   copyr   r   r   r   r   r   r7   r   numpyr0   r9   r   Z
utils.miscr	   Zengine.utilsr   r   Zenginer   toolsr   r   r   	getLoggerr;   objectr   r%   r   r   r   r   r   r   <module>   s*   
   *]