3
d4                 @   s   d Z ddlZddlZddlmZmZ ddlmZ ddl	Z	ddl
mZ ddlZddlmZ ddlZddlm
Z
 dd	lmZ d
dlmZ ddlmZ yddlmZ W n ek
r   dd ZY nX e
jdZdd Zdd ZG dd deZdS )zParallel workflow execution via multiprocessing

Support for child processes running as non-daemons based on
http://stackoverflow.com/a/8963618/1183453
    N)ProcessPoolExecutorwait)format_exception)INFO)deepcopy   )logging)get_system_total_memory_gb   )MapNode   )DistributedPluginBase)indentc             C   s    |s| S | j d}||j| S )z.A textwrap.indent replacement for Python < 3.3T)
splitlinesjoin)textprefixZ	splittext r   C/tmp/pip-build-7vycvbft/nipype/nipype/pipeline/plugins/multiproc.pyr      s    
r   znipype.workflowc          	   C   sN   t dd|d}y| j|d|d< W n&   ttj  |d< | j|d< Y nX |S )a  Function to execute node.run(), catch and log any errors and
    return the result dictionary

    Parameters
    ----------
    node : nipype Node instance
        the node to run
    updatehash : boolean
        flag for updating hash
    taskid : int
        an identifier for this task

    Returns
    -------
    result : dictionary
        dictionary containing the node runtime results and stats
    N)result	tracebacktaskid)
updatehashr   r   )dictrunr   sysexc_infor   )noder   r   r   r   r   r   run_node+   s    r   c             C   s   t j|  dt jd< dS )z0Initializes the environment of the child process1ZNIPYPE_NO_ETN)oschdirenviron)cwdr   r   r   process_initializerL   s    
r$   c                   sp   e Zd ZdZd fdd	Zdd Zdd Zd	d
 ZdddZdd Z	dd Z
dd ZdddZdddZ  ZS )MultiProcPlugina  
    Execute workflow with multiprocessing, not sending more jobs at once
    than the system can support.

    The plugin_args input to run can be used to control the multiprocessing
    execution and defining the maximum amount of memory and threads that
    should be used. When those parameters are not specified,
    the number of threads and memory of the system is used.

    System consuming nodes should be tagged::

      memory_consuming_node.mem_gb = 8
      thread_consuming_node.n_procs = 16

    The default number of threads and memory are set at node
    creation, and are 1 and 0.25GB respectively.

    Currently supported options are:

    - non_daemon: boolean flag to execute as non-daemon processes
    - n_procs: maximum number of threads to be executed in parallel
    - memory_gb: maximum memory (in GB) that can be used at once.
    - raise_insufficient: raise error if the requested resources for
        a node over the maximum `n_procs` and/or `memory_gb`
        (default is ``True``).
    - scheduler: sort jobs topologically (``'tsort'``, default value)
        or prioritize jobs by, first, memory consumption and, second,
        number of threads (``'mem_thread'`` option).
    - mp_context: name of multiprocessing context to use

    Nc                s   t t| j|d i | _i | _d| _tj | _| j	j
dtj | _| j	j
dt d | _| j	j
dd| _tjd| j| j| j y.tj| j	j
d	}t| jt| jf|d
| _W nD ttfk
r   t| jd| _| jjt| j}t|gdd Y nX d | _d S )N)plugin_argsr   n_procs	memory_gbg?raise_insufficientTz7[MultiProc] Starting (n_procs=%d, mem_gb=%0.2f, cwd=%s)
mp_context)max_workersZinitializerZinitargsr*   )r+      )timeout)superr%   __init___taskresult	_task_obj_taskidr    getcwdZ_cwdr&   getmp	cpu_count
processorsr	   r(   r)   loggerdebugZget_contextr   r$   poolAttributeError	TypeErrorsubmitr   _stats)selfr&   r*   result_future)	__class__r   r   r/   s   s6    
zMultiProcPlugin.__init__c             C   s   |j  }|| j|d < d S )Nr   )r   r0   )r?   argsr   r   r   r   _async_callback   s    zMultiProcPlugin._async_callbackc             C   s   | j j|S )N)r0   r4   )r?   r   r   r   r   _get_result   s    zMultiProcPlugin._get_resultc             C   s   | j |= d S )N)r1   )r?   r   r   r   r   _clear_task   s    zMultiProcPlugin._clear_taskFc             C   sl   |  j d7  _ t|jdddkr(d|j_| jjt||| j }|j| j || j	| j < t
jd|j| j  | j S )Nr   terminal_output streamZ	allatoncez*[MultiProc] Submitted task %s (taskid=%d).)r2   getattrZ	interfacerF   r:   r=   r   Zadd_done_callbackrC   r1   r8   r9   fullname)r?   r   r   r@   r   r   r   _submit_job   s    zMultiProcPlugin._submit_jobc             C   s   g }g }x(|j  D ]}|j|j |j|j qW tjtj|| jkrdtj	d| j | j
rdtdtjtj|| jkrtj	d| j | j
rtddS )z0Check if any node exeeds the available resourceszASome nodes exceed the total amount of memory available (%0.2fGB).z(Insufficient resources available for jobz7Some nodes demand for more threads than available (%d).N)Znodesappendmem_gbr'   npanyarrayr(   r8   warningr)   RuntimeErrorr7   )r?   graphZtasks_mem_gbZtasks_num_thr   r   r   r   _prerun_check   s"    zMultiProcPlugin._prerun_checkc             C   s   | j j  d S )N)r:   shutdown)r?   r   r   r   _postrun_check   s    zMultiProcPlugin._postrun_checkc             C   sR   | j }| j}x<|D ]4\}}|t| j| j|8 }|t| j| j|8 }qW ||fS )z9
        Make sure there are resources available
        )r(   r7   minprocsrM   r'   )r?   running_tasksfree_memory_gbfree_processors_jobidr   r   r   _check_resources   s    z MultiProcPlugin._check_resourcesc                s  t j j  jjdddkj @ } j j\}}t jt|| j	| j
f} j|krd}tjtkr fdd jD }|rd}|dj|7 }t|dd	 }tjd
t jt|| j	| j
| | _|dk s|dkrtjd dS t|t j dkrtjd dS  j| jjdd}tj  xR|D ]H}	t j|	 try j|	 j }
W nF tk
r   ttj  } j |	|d|dd d j!|	< w8Y nX |
dkrȈ j"|	}|sȐq8t# j|	 j$ j	}t# j|	 j% j
}||ks||krtjd|	|| q8||8 }||8 }tjd j|	 j&|	|||| d j|	< d j!|	<  j'|	|rlq8|s j|	 j(rtjd j|	  y j|	 j)|d W n8 tk
r   ttj  } j |	|d|dd Y nX  j*|	  j+  ||7 }||7 }d _tj  q8 j,r0 j, j|	 d  j-t. j|	 |d}|dkrhd j|	< d j!|	< n jj/d||	f d _q8W dS )zL
        Sends jobs to workers when system resources are available.
        r   )ZaxisrG   c                s    g | ]\}}d  j | j qS )z  * %s)rX   rJ   ).0r\   r]   )r?   r   r   
<listcomp>   s   z:MultiProcPlugin._send_procs_to_workers.<locals>.<listcomp>z
Currently running:

    zi[MultiProc] Running %d tasks, and %d jobs ready. Free memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%sg{Gz?zNo resources availableNzUNo tasks are being run, and no jobs can be submitted to the queue. Potential deadlock	scheduler)rd   )r   r   )r   Fr   z-Cannot allocate job %d (%0.2fGB, %d threads).zEAllocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.Tz Running node %s on master thread)r   start)0rN   ZflatnonzeroZ	proc_doneZdepidxsumZ	__array__r^   Zpending_taskslenr(   r7   r>   r8   levelr   r   r   infor9   
_sort_jobsr&   r4   gcZcollect
isinstancerX   r   num_subnodes	Exceptionr   r   r   Z_clean_queueZproc_pendingZ_submit_mapnoderW   rM   r'   rJ   Z_local_hash_checkZrun_without_submittingr   Z_task_finished_cbZ_remove_node_dirsZ_status_callbackrK   r   insert)r?   r   rS   jobidsrZ   r[   statsZtasks_list_msgrY   r]   rm   r   r=   Znext_job_gbZnext_job_thtidr   )r?   r   _send_procs_to_workers   s     














z&MultiProcPlugin._send_procs_to_workerstsortc                s    |dkrt | fdddS |S )NZ
mem_threadc                s    j |  j j |  jfS )N)rX   rM   r'   )item)r?   r   r   <lambda>|  s    z,MultiProcPlugin._sort_jobs.<locals>.<lambda>)key)sorted)r?   rp   rd   r   )r?   r   rj   x  s
    zMultiProcPlugin._sort_jobs)N)F)FN)rt   )__name__
__module____qualname____doc__r/   rC   rD   rE   rK   rT   rV   r^   rs   rj   __classcell__r   r   )rA   r   r%   R   s   +

 r%   ) r|   r    multiprocessingr5   concurrent.futuresr   r   r   r   r   r   r   rk   copyr   numpyrN   rG   Zutils.profilerr	   Zenginer   baser   textwrapr   ImportError	getLoggerr8   r   r$   r%   r   r   r   r   <module>   s*   	
!