U
    h                     @   s   d dl mZ d dlmZ d dlmZ d dlmZmZ d dlm	Z	m
Z
mZ d dlmZmZmZ d dlmZ zd dlZW n ek
r   d dlZY nX zd d	lmZ W n ek
r   ed
Y nX G dd de	ZdS )    )absolute_import)datetime)utc)NoNodeErrorNodeExistsError)BaseJobStoreJobLookupErrorConflictingIdError)	maybe_refdatetime_to_utc_timestamputc_timestamp_to_datetime)JobN)KazooClientz*ZooKeeperJobStore requires Kazoo installedc                       s   e Zd ZdZdddejf fdd	Zdd Z fd	d
Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Z  ZS )#ZooKeeperJobStorea  
    Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
    kazoo's `KazooClient
    <http://kazoo.readthedocs.io/en/latest/api/client.html>`_.

    Plugin alias: ``zookeeper``

    :param str path: path to store jobs in
    :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
        providing connection arguments
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    z/apschedulerNFc                    sR   t t|   || _|| _|s&td|| _|r<t|| _nt	f || _d| _
d S )Nz&The "path" parameter must not be emptyF)superr   __init__pickle_protocolclose_connection_on_exit
ValueErrorpathr
   clientr   _ensured_path)selfr   r   r   r   Zconnect_args	__class__ C/tmp/pip-unpacked-wheel-ehb4gh6l/apscheduler/jobstores/zookeeper.pyr   &   s    zZooKeeperJobStore.__init__c                 C   s   | j s| j| j d| _ d S )NT)r   r   Zensure_pathr   r   r   r   r   _ensure_paths7   s    zZooKeeperJobStore._ensure_pathsc                    s(   t t| || | jjs$| j  d S N)r   r   startr   	connected)r   Z	scheduleraliasr   r   r   r    <   s    zZooKeeperJobStore.startc                 C   sd   |    | jd t| }z.| j|\}}t|}| |d }|W S  tk
r^   Y d S X d S )N/	job_state)	r   r   strr   getpickleloads_reconstitute_jobBaseException)r   job_id	node_pathcontent_docjobr   r   r   
lookup_jobA   s    
zZooKeeperJobStore.lookup_jobc                    s"   t |  fdd|  D }|S )Nc                    s,   g | ]$}|d  dk	r|d   kr|d qS )next_run_timeNr0   r   .0job_def	timestampr   r   
<listcomp>N   s     z2ZooKeeperJobStore.get_due_jobs.<locals>.<listcomp>)r   	_get_jobs)r   nowjobsr   r6   r   get_due_jobsL   s    zZooKeeperJobStore.get_due_jobsc                 C   s.   dd |   D }t|dkr*tt|S d S )Nc                 S   s    g | ]}|d  dk	r|d  qS )r2   Nr   r3   r   r   r   r8   S   s    z7ZooKeeperJobStore.get_next_run_time.<locals>.<listcomp>r   )r9   lenr   min)r   Z	next_runsr   r   r   get_next_run_timeR   s    z#ZooKeeperJobStore.get_next_run_timec                 C   s    dd |   D }| | |S )Nc                 S   s   g | ]}|d  qS )r0   r   r3   r   r   r   r8   X   s     z2ZooKeeperJobStore.get_all_jobs.<locals>.<listcomp>)r9   Z_fix_paused_jobs_sorting)r   r;   r   r   r   get_all_jobsW   s    
zZooKeeperJobStore.get_all_jobsc                 C   sv   |    | jd t|j }t|j| d}t|| j	}z| j
j||d W n tk
rp   t|jY nX d S Nr#   )r2   r$   )value)r   r   r%   idr   r2   __getstate__r'   dumpsr   r   creater   r	   )r   r0   r,   rB   datar   r   r   add_job\   s    zZooKeeperJobStore.add_jobc                 C   sv   |    | jd t|j }t|j| d}t|| j	}z| j
j||d W n tk
rp   t|jY nX d S rA   )r   r   r%   rC   r   r2   rD   r'   rE   r   r   setr   r   )r   r0   r,   changesrG   r   r   r   
update_jobi   s    zZooKeeperJobStore.update_jobc                 C   sL   |    | jd t| }z| j| W n tk
rF   t|Y nX d S )Nr#   )r   r   r%   r   deleter   r   )r   r+   r,   r   r   r   
remove_jobv   s    zZooKeeperJobStore.remove_jobc                 C   s6   z| j j| jdd W n tk
r*   Y nX d| _d S )NT)	recursiveF)r   rL   r   r   r   r   r   r   r   remove_all_jobs~   s
    z!ZooKeeperJobStore.remove_all_jobsc                 C   s   | j r| j  | j  d S r   )r   r   stopcloser   r   r   r   shutdown   s    
zZooKeeperJobStore.shutdownc                 C   s,   |}t t }|| | j|_| j|_|S r   )r   __new____setstate__Z
_scheduler_aliasZ_jobstore_alias)r   r$   r0   r   r   r   r)      s    

z#ZooKeeperJobStore._reconstitute_jobc              	      s   |    g }g }| j| j}|D ]}zf| jd | }| j|\}}t|}||d rb|d nd |d | |d |jd}	|	|	 W q" t
k
r   | jd|  |	| Y q"X q"|r|D ]}
| |
 qtdddtd	 t| fd
ddS )Nr#   r2   r$   )r+   r2   r$   r0   creation_timez)Unable to restore job "%s" -- removing iti'        )tzinfoc                    s   | d j p | d fS )Nr0   rV   )r2   )r5   Zpaused_sort_keyr   r   <lambda>   s   z-ZooKeeperJobStore._get_jobs.<locals>.<lambda>)key)r   r   Zget_childrenr   r&   r'   r(   r)   ctimeappendr*   _logger	exceptionrM   r   r   sorted)r   r;   Zfailed_job_idsZall_idsZ	node_namer,   r-   r.   r/   r5   Z	failed_idr   rZ   r   r9      s0    
zZooKeeperJobStore._get_jobsc                 C   s,   | j d| jj| jf  d| jj| jf S )Nz<%s (client=%s)>)r_   r`   r   __name__r   r   r   r   r   __repr__   s    zZooKeeperJobStore.__repr__)rb   
__module____qualname____doc__r'   HIGHEST_PROTOCOLr   r   r    r1   r<   r?   r@   rH   rK   rM   rO   rR   r)   r9   rc   __classcell__r   r   r   r   r      s$   r   )
__future__r   r   Zpytzr   Zkazoo.exceptionsr   r   Zapscheduler.jobstores.baser   r   r	   Zapscheduler.utilr
   r   r   Zapscheduler.jobr   cPickler'   ImportErrorZkazoo.clientr   r   r   r   r   r   <module>   s   