3
dj              
   @   sz   d Z ddlmZ ddlZddlmZmZmZ dZyddl	m
Z ddlmZ W n   d	ZY nX d
d ZG dd deZdS )z3Parallel workflow execution via IPython controller
    )dumpsN   )DistributedPluginBaseloggerreport_crashF)__version__)TimeoutErrorTc          	   C   s   ddl m} ddlm} ddlm}m} d }d }dd l}	|	j }
y8|j	| |j
| ddlm} || }|j|d}W n,   | }ddlm} || }|j}Y nX |	j|
 ||| fS )Nr   )gethostname)
format_exc)configlogging)loads)
updatehash)socketr	   	tracebackr
   Znipyper   r   osgetcwdZupdate_configZupdate_loggingpickler   runresultchdir)Z
pckld_taskZnode_configr   r	   r
   r   r   r   r   r   cwdr   Ztask r   A/tmp/pip-build-7vycvbft/nipype/nipype/pipeline/plugins/ipython.pyexecute_task   s(    


r   c                   sT   e Zd ZdZd fdd	Zd fdd	Zdd	 Zdd
dZdddZdd Z	  Z
S )IPythonPluginzExecute workflow with ipythonNc                sR   t rtdtt| j d d} fdd|D | _d | _d | _i | _d| _	d S )Nz.Please install ipyparallel to use this plugin.)plugin_argsurl_fileprofile
cluster_idcontextdebugtimeoutr   username	sshserversshkeypasswordparamikoc                s   i | ]}| kr | |qS r   r   ).0arg)r   r   r   
<dictcomp>C   s   z*IPythonPlugin.__init__.<locals>.<dictcomp>r   )r   r   r   r    r!   r"   r   r#   r$   r%   r&   r'   )
IPython_not_loadedImportErrorsuperr   __init__client_args	iparallel
taskclienttaskmap_taskid)selfr   Z
valid_args)	__class__)r   r   r.   1   s*               

zIPythonPlugin.__init__Fc          !      s   yd}t | tj| | _W n, tk
rH } ztd|W Y dd}~X nX y| jjf | j| _W nd tk
r } zHt	|t
rtd|t	|trtd|t	|trtd|n|W Y dd}~X nX tt| j|||dS )z~Executes a pre-defined pipeline is distributed approaches
        based on IPython's ipyparallel processing interface
        Zipyparallelz=ipyparallel not found. Parallel execution will be unavailableNzNo IPython clients found.z+ipcluster/ipcontroller has not been startedzIpython kernel not installed)r   )
__import__sysmodulesr0   r,   ZClientr/   r1   	Exception
isinstancer   IOError
ValueErrorr-   r   r   )r4   graphr   r   namee)r5   r   r   r   K   s&    




zIPythonPlugin.runc             C   sh   || j krtd| | j | j r`| j | j \}}}td d d}||d< ||d< ||d< |S d S d S )NzTask %d not in pending list)r   r   r   r   hostname)r2   r<   readygetdict)r4   taskidr   r   r@   Z
result_outr   r   r   _get_resulte   s    
zIPythonPlugin._get_resultc             C   sB   t |d}| jj jt||j|}|  jd7  _|| j| j< | jS )N   r   )r   r1   Zload_balanced_viewapplyr   r   r3   r2   )r4   noder   Z
pckld_nodeZresult_objectr   r   r   _submit_jobr   s    

zIPythonPlugin._submit_jobc             C   s<   |r0|d r0|d |_ |d |_t||d dS t|S d S )Nr   r   )r   )Z_resultZ
_tracebackr   )r4   rH   r   r   r   r   _report_crash{   s
    

zIPythonPlugin._report_crashc             C   s4   t dkr0tjd|  | jj| j|  | j|= d S )Nz0.11zClearing id: %d)
IPyversionr   r!   r1   Zpurge_resultsr2   )r4   rD   r   r   r   _clear_task   s    zIPythonPlugin._clear_task)N)F)F)N)__name__
__module____qualname____doc__r.   r   rE   rI   rJ   rL   __classcell__r   r   )r5   r   r   .   s   
	
r   )rP   r   r   r7   baser   r   r   r+   ZIPythonr   rK   Zipyparallel.errorr   r   r   r   r   r   r   <module>   s   
