U
    e                     @   s   d dl mZ d dlZd dlZd dlZddlmZmZmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZ d	d
gZeeZG dd deZG dd
 d
eZG dd	 d	ejZdS )    )absolute_importN   )TTransportExceptionTTransportBaseTMemoryBuffer)BytesIO)deque)contextmanager)geniostreamioloop	tcpserver
concurrentTTornadoServerTTornadoStreamTransportc                   @   s>   e Zd Zdd Zdd Zejdd Zdd Ze	d	d
 Z
dS )_Lockc                 C   s   t  | _d S N)r   _waitersself r   /thrift/TTornado.py__init__&   s    z_Lock.__init__c                 C   s   t | jdkS )Nr   )lenr   r   r   r   r   acquired)   s    z_Lock.acquiredc                 c   sF   |   r| jd nd }t }| j| |r4|V  t|  d S )N)r   r   r   ZFutureappendr
   Return_lock_context)r   Zblockerfuturer   r   r   acquire,   s    z_Lock.acquirec                 C   s(   |   std| j }|d  d S )NzLock not aquired)r   AssertionErrorr   popleftZ
set_result)r   r   r   r   r   release6   s    
z_Lock.releasec                 c   s   z
d V  W 5 |    X d S r   )r#   r   r   r   r   r   ;   s    
z_Lock._lock_contextN)__name__
__module____qualname__r   r   r
   	coroutiner    r#   r	   r   r   r   r   r   r   %   s   
	r   c                   @   st   e Zd ZdZdddZdd ZejdddZd	d
 Z	dd Z
dd Zedd Zejdd Zdd Zdd ZdS )r   z2a framed, buffered transport over a Tornado streamNc                 C   s6   || _ || _|ptj | _t | _t | _	|| _
d S r   )hostportr   ZIOLoopcurrentio_loopr   _TTornadoStreamTransport__wbufr   
_read_lockstream)r   r(   r)   r.   r+   r   r   r   r   E   s    z TTornadoStreamTransport.__init__c                 C   s   t ||| jS r   )r
   with_timeoutr+   )r   timeoutr   r   r   r   r/   O   s    z$TTornadoStreamTransport.with_timeoutc              
   c   s   t d ttjtjd}t|| _z6| j| j	| j
f}|d k	rV| ||V  n|V  W nL tjttjfk
r } z$d| j	| j
|}ttj|dW 5 d }~X Y nX t| d S )Nzsocket connectingr   zcould not connect to {}:{} ({})typemessage)loggerdebugsocketZAF_INETZSOCK_STREAMr   ZIOStreamr.   connectr(   r)   r/   errorIOErrorr   TimeoutErrorformatr   ZNOT_OPENr
   r   )r   r0   Zsockr7   er3   r   r   r   openR   s    

zTTornadoStreamTransport.openc                 C   s   | j | dS )z<
        Should be called only after open() returns
        N)r.   set_close_callback)r   callbackr   r   r   r>   f   s    z*TTornadoStreamTransport.set_close_callbackc                 C   s   | j d  | j   d S r   )r.   r>   closer   r   r   r   r@   l   s    zTTornadoStreamTransport.closec                 C   s   dst dd S )NFzyou're doing it wrong)r!   )r   _r   r   r   readq   s    zTTornadoStreamTransport.readc              
   c   s~   z
d V  W nn t jtfk
rD } zttjt|dW 5 d }~X Y n6 tjk
rx } zttjt|dW 5 d }~X Y nX d S )Nr1   )	r6   r8   r9   r   END_OF_FILEstrr   ZStreamBufferFullErrorZUNKNOWN)r   r<   r   r   r   io_exception_contextv   s    
z,TTornadoStreamTransport.io_exception_contextc              
   c   sz   | j  V d |  P | jdV }t|dkr<tdt	d|\}| j|V }t
|W 5 Q R X W 5 Q R X d S )N   r   zRead zero bytes from stream!i)r-   r    rE   r.   Z
read_bytesr   r   ZStreamClosedErrorstructZunpackr
   r   )r   Zframe_headerframe_lengthframer   r   r   	readFrame   s    

z!TTornadoStreamTransport.readFramec                 C   s   | j | d S r   )r,   write)r   Zbufr   r   r   rL      s    zTTornadoStreamTransport.writec              
   C   sR   | j  }tdt|}t | _ |   | j|| W  5 Q R  S Q R X d S )NrG   )	r,   getvaluerH   Zpackr   r   rE   r.   rL   )r   rJ   rI   r   r   r   flush   s
    

zTTornadoStreamTransport.flush)NN)N)r$   r%   r&   __doc__r   r/   r
   r'   r=   r>   r@   rB   r	   rE   rK   rL   rN   r   r   r   r   r   C   s   



c                       s,   e Zd Zd fdd	Zejdd Z  ZS )r   Nc                    s4   t t| j|| || _|| _|d k	r*|n|| _d S r   )superr   r   
_processor_iprot_factory_oprot_factory)r   Z	processorZiprot_factoryZoprot_factoryargskwargs	__class__r   r   r      s    zTTornadoServer.__init__c              
   c   s   |d d \}}t |||| jd}| j|}z~|j sz| V }W n: tk
r } z|jtj	krpW Y 
qn W 5 d }~X Y nX t
|}	| j|	}
| j|
|V  q0W n& tk
r   td |  Y nX td|| d S )N   )r(   r)   r.   r+   z!thrift exception in handle_streamzclient disconnected %s:%d)r   r+   rS   ZgetProtocolr.   closedrK   r   r2   rC   r   rR   rQ   process	Exceptionr4   Z	exceptionr@   info)r   r.   Zaddressr(   r)   ZtransZoprotrJ   r<   ZtrZiprotr   r   r   handle_stream   s(    


zTTornadoServer.handle_stream)N)r$   r%   r&   r   r
   r'   r]   __classcell__r   r   rV   r   r      s   	)Z
__future__r   Zloggingr6   rH   Ztransport.TTransportr   r   r   ior   collectionsr   
contextlibr	   Ztornador
   r   r   r   r   __all__Z	getLoggerr$   r4   objectr   r   Z	TCPServerr   r   r   r   r   <module>   s   
X