3
d:K                 @   s   d Z ddlZddlZddlZddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZmZ e
jdZeeejd	d
Zdd ZG dd deZG dd deZdd ZG dd deZdS )z$Parallel workflow execution via SGE
    N   )logging)CommandLine   )SGELikeBatchManagerBaseloggerznipype.interfaced   i  c             C   s   t jtd d d |   dS )zPNeeded for debugging on big jobs.  Once this is fully vetted, it can be removed. z=!r   z  Nz=!=!=!)r   debugDEBUGGING_PREFIX)message r   =/tmp/pip-build-7vycvbft/nipype/nipype/pipeline/plugins/sge.pysge_debug_print   s    r   c               @   sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )QJobInfozInformation about a single job created by OGE/SGE or similar
    Each job is responsible for knowing it's own refresh state
    :author Hans J. Johnson
    c             C   s>   t || _t|| _|| _tj | _|| _t || _|| _	d S )N)
int_job_numstr_job_queue_state	_job_timetime_job_info_creation_time_job_queue_name
_job_slots_qsub_command_line)selfjob_numjob_queue_statejob_timejob_queue_name	job_slotsqsub_command_liner   r   r   __init__"   s    



zQJobInfo.__init__c             C   s.   dj | j| j| jtjdtj| j| j| j	S )Nz{:<8d}{:12}{:<3d}{:20}{:8}{}z%Y-%m-%dT%H:%M:%S)
formatr   r   r   r   strftimegmtimer   r   r   )r   r   r   r   __repr__:   s    zQJobInfo.__repr__c             C   s
   | j dkS )Ninitializing)r   )r   r   r   r   is_initializingD   s    zQJobInfo.is_initializingc             C   s   | j dkp| j dkS )Nzombiefinished)r   )r   r   r   r   	is_zombieG   s    zQJobInfo.is_zombiec             C   s
   | j dkS )NZrunning)r   )r   r   r   r   
is_runningJ   s    zQJobInfo.is_runningc             C   s
   | j dkS )Npending)r   )r   r   r   r   
is_pendingM   s    zQJobInfo.is_pendingc             C   sX   t j  | j }| j r*tdj|  d}n*| j rP|dkrPtdj| | d}nd}|S )z1Return True, unless job is in the "zombie" statuszVDONE! QJobInfo.IsPending found in 'zombie' list, returning False so claiming done!
{0}FiX  zrFAILURE! QJobInfo.IsPending found long running at {1} seconds'initializing' returning False for to break loop!
{0}T)r   r   r+   r   r#   r(   )r   Z	time_diffZis_pending_statusr   r   r   is_job_state_pendingP   s    
zQJobInfo.is_job_state_pendingc             C   s    || _ || _|| _t|| _d S )N)r   r   r   r   r   )r   r   r   r   r    r   r   r   update_infoh   s    zQJobInfo.update_infoc             C   s
   || _ d S )N)r   )r   Z	new_stater   r   r   	set_staten   s    zQJobInfo.set_stateN)__name__
__module____qualname____doc__r"   r&   r(   r+   r,   r.   r/   r0   r1   r   r   r   r   r      s   
r   c               @   sX   e Zd ZdZdddZdd Zdd Zed	d
 Zdd Z	dddZ
dd Zdd ZdS )QstatSubstitutezdA wrapper for Qstat to avoid overloading the
    SGE/OGS server with rapid continuous qstat requestsqstatc             C   s(   || _ || _t | _t | _| j  dS )zZ
        :param qstat_instant_executable:
        :param qstat_cached_executable:
        N)_qstat_instant_executable_qstat_cached_executablelist_out_of_scope_jobsdict_task_dictionary_remove_old_jobs)r   Zqstat_instant_executableZqstat_cached_executabler   r   r   r"   v   s
    zQstatSubstitute.__init__c             C   s   | j dd dS )zThis is only called during initialization of the function for the purpose
        of identifying jobs that are not part of this run of nipype.  They
        are jobs that existed prior to starting a new jobs, so they are irrelevant.
        ZQstatInitializationTN)
_run_qstat)r   r   r   r   r>      s    z QstatSubstitute._remove_old_jobsc             C   s(   t |}t|dtj dd|| j|< dS )z
        :param taskid: The job id
        :param qsub_command_line: When initializing, re-use the job_queue_name
        :return: NONE
        r'   ZnoQueuer   N)r   r   r   r=   )r   taskidr!   r   r   r   add_startup_job   s    zQstatSubstitute.add_startup_jobc             C   s   t djtj d d}d}d}x|dkr|d8 }ybtj|dtjtj d d	t	| gtj
tj
d
}|j \}}|jt	| rd}t dj| P W q"   t d tjd Y q"X q"W |S )zhrequest definitive job completion information for the current job
        from the qacct report
        z6WARNING:  CONTACTING qacct for finished jobs, {0}: {1}zVerifying CompletionZqacct
   Fr   r   z-oz-j)stdoutstderrTzNOTE: qacct for jobs
{0}zNOTE: qacct call failed   )r   r#   r   
subprocessPopenpwdgetpwuidosgetuidr   PIPEcommunicatefindsleep)r@   this_commandZqacct_retriesZis_completeprocZqacct_result_r   r   r   _qacct_verified_complete   s6    



z(QstatSubstitute._qacct_verified_completec             C   s  t  }x*|D ] }y|jdd jd j}W n   d}Y nX yt|jdd jd j}W n   d}Y nX |jd}t|jdd jd j}y0|jdd jd j}ttjtj	|d	}	W n   td
}	Y nX t|}
|
| j
kr$| j
|
 j||	|| tdj| j
|
  |j|
 q| jj|
 qW xt | j
j D ]}||kr| j|}|rt| j
| jd ntdj|| j
|  | j
| j rD| j|}|r| j
| jd ntdj|| j
|  qDW d S )NZ
queue_namer   unknownslotsr   stateZJB_job_numberZJAT_start_timez%Y-%m-%dT%H:%M:%Sg        zUpdating job:  {0}r)   zCERROR:  Job not in current parselist, and not in done list {0}: {1}zKERROR:  Job not in still in intializing mode, and not in done list {0}: {1})r:   getElementsByTagName
childNodesdatar   getAttributefloatr   mktimestrptimer=   r0   r   r#   appendr;   keysrS   r1   r(   )r   Zxml_job_listZcurrent_jobs_parsedZcurrent_job_elementr   r    r   r   Zjob_time_textr   task_idZdictionary_jobZis_completedr   r   r   _parse_qstat_job_list   s`    







z%QstatSubstitute._parse_qstat_job_listTc             C   s   t djtj | |r | j}n| j}d}x|dkr|d8 }yttj|dtjt	j
 d dddgtjtjd	}|j \}}tjjj|}|jd
}	|	d }
|
jd}| j| P W q, tk
r } z(djt||}t | tjd W Y dd}~X q,X q,W dS )ai  request all job information for the current user in xmlformat.
        See documentation from java documentation:
        http://arc.liv.ac.uk/SGE/javadocs/jgdi/com/sun/grid/jgdi/monitoring/filter/JobStateFilter.html
        -s r gives running jobs
        -s z gives recently completed jobs (**recently** is very ambiguous)
        -s s suspended jobs
        z/WARNING:  CONTACTING qmaster for jobs, {0}: {1}rB   r   r   z-uz-xmlz-sZpsrz)rC   rD   Zjob_infoZjob_listzQstatParsingError:
	{0}
	{1}
rE   N)r   r#   r   r8   r9   rF   rG   rH   rI   rJ   rK   rL   rM   xmldomminidomparseStringrX   rb   	ExceptiontyperO   )r   Zreason_for_qstatZforce_instantrP   Zqstat_retriesrQ   Zqstat_xml_resultrR   rd   jobsrunZrunjobsinstZexception_messager   r   r   r?     sB    





zQstatSubstitute._run_qstatc             C   s(   x"t | jj D ]}tt| qW dS )zFor debuggingN)r:   r=   valuesr   r   )r   vvr   r   r   print_dictionaryI  s    z QstatSubstitute.print_dictionaryc             C   s   t |}|| jkrF| j| j }|r| jdj|d | j| j }n>| jdj|d || jkrr| j| j }ntdj| d}|stdj| || jkrtdj| | jjt | | jj| ntdj| |S )Nzchecking job pending status {0}FTz9ERROR: Job {0} not in task list, even after forced qstat!z&DONE! Returning for {0} claiming done!z(NOTE: Adding {0} to OutOfScopeJobs list!z=ERROR: Job {0} not in task list, but attempted to be removed!)	r   r=   r/   r?   r#   r   r;   r_   pop)r   ra   Zjob_is_pendingr   r   r   is_job_pendingN  s2    


zQstatSubstitute.is_job_pendingN)r7   r7   )T)r2   r3   r4   r5   r"   r>   rA   staticmethodrS   rb   r?   rn   rp   r   r   r   r   r6   r   s   
	'X
1r6   c             C   s   | d j  r| S d|  S dS )zEnsure that qsub job names must begin with a letter.

    Numbers and punctuation are  not allowed.

    >>> qsub_sanitize_job_name('01')
    'J01'
    >>> qsub_sanitize_job_name('a01')
    'a01'
    r   JN)isalpha)Ztestjobnamer   r   r   qsub_sanitize_job_nameu  s    
rt   c                   s0   e Zd ZdZ fddZdd Zdd Z  ZS )	SGEPlugina?  Execute using SGE (OGE not tested)

    The plugin_args input to run can be used to control the SGE execution.
    Currently supported options are:

    - template : template to use for batch job submission
    - qsub_args : arguments to be prepended to the job execution script in the
                  qsub call

    c                s   d}d| _ d| _d}d}d|kr|d rd|d krB|d d | _ d|d kr\|d d | _d|d krt|d d }d|d kr|d d }t||| _tt| j|f| d S )	Nz
#$ -V
#$ -S /bin/sh
           r7   plugin_argsZretry_timeoutZ	max_triesZqstatProgramPathZqstatCachedProgramPath)_retry_timeout
_max_triesr6   _refQstatSubstitutesuperru   r"   )r   kwargstemplateZinstant_qstatZcached_qstat)	__class__r   r   r"     s     zSGEPlugin.__init__c             C   s   | j jt|S )N)rz   rp   r   )r   r@   r   r   r   _is_pending  s    zSGEPlugin._is_pendingc             C   s(  t dttjddd}tjj|}d}| jr2| j}d|jkrnd|jkr\|jd r\|jd }n|d|jd  7 }d	|krd
||f }d|krd||f }|jrdj	ttjd |j|j
f}ndj	ttjd |j
f}|jd}|j  dj	|}t|}d|||f |j_tj}tjtjd d}	t }
xy|j }
W nl tk
r } zN|	| jk rp|	d7 }	tj| j n&tj| tdj	d|j
 t|fW Y d d }~X nX P q.W tj| dd |
jjjdD }t t!j"d|d j# d }|j$ | j%|< | j&j'||j( t)j*d||j
|j(f  |S )NZqsubFZ	allatonce)environZresource_monitorZterminal_output Z	qsub_args	overwriter	   z-oz%s -o %sz-ez%s -e %s.LOGNAMEz%s -N %s %sCRITICALr   r   
z%Could not submit sge task for node %sc             S   s   g | ]}|r|qS r   r   ).0liner   r   r   
<listcomp>  s    z/SGEPlugin._submit_batchtask.<locals>.<listcomp>z'Your job ([0-9]*) .* has been submittedz*submitted sge task: %d for node %s with %srW   )+r   r<   rJ   r   pathdirnameZ
_qsub_argsrw   Z
_hierarchyjoin_idsplitreversert   inputsargsifloggerlevelsetLevelr   getLevelNamer:   rj   rg   ry   r   rO   rx   RuntimeErrorr   ZruntimerC   r   rematchgroups
output_dirZ_pendingrz   rA   Zcmdliner   r
   )r   Z
scriptfilenodecmdr   ZqsubargsZjobnameZjobnameitemsZoldleveltriesresultelinesr@   r   r   r   _submit_batchtask  sh    
 


 
zSGEPlugin._submit_batchtask)r2   r3   r4   r5   r"   r   r   __classcell__r   r   )r~   r   ru     s   
ru   )r5   rJ   rH   r   rF   r   Zxml.dom.minidomrc   randomr   r   Zinterfaces.baser   baser   r   	getLoggerr   r   r   uniformr   r   objectr   r6   rt   ru   r   r   r   r   <module>   s&   
V  