U
    dRf%                     @   s  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZ ddlmZ d	d
lmZ d	dlmZmZmZ d	dlmZmZ d	dlmZ zddlmZ W nL e k
r   zddl!mZ W n$ e k
r   e"d#eY nX Y nX e$e%Z&G dd deZ'G dd dej(Z)G dd deZ*e dde'j+fddZ,ej-dde dde'j.e'j+dfddZ/dS )a  
>>> pingpong = thriftpy2.load("pingpong.thrift")
>>>
>>> class Dispatcher(object):
>>>     def ping(self):
>>>         return "pong"

>>> server = make_server(pingpong.PingPong, Dispatcher())
>>> server.listen(6000)
>>> client = ioloop.IOLoop.current().run_sync(
    lambda: make_client(pingpong.PingPong, '127.0.0.1', 6000))
>>> ioloop.IOLoop.current().run_sync(client.ping)
'pong'
    )absolute_importN)contextmanager)	timedelta)BytesIO)geniostream	tcpserver)version   )TBinaryProtocolFactory)TApplicationExceptionTClient
TProcessor)TTransportBaseTTransportException)TMemoryBuffer)Lockz+With tornado {}, you need to install "toro"c                   @   s   e Zd ZdZeddZeddZdddefddZedkrDdd	 Z	nd
d	 Z	e
jefddZdd Zdd Zdd Zedd Ze
jdd Zdd Zdd Zdd ZdS )TTornadoStreamTransportz2a framed, buffered transport over a Tornado streamr
   )secondsNc                 C   sV   || _ || _|| _|| _d| _g | _t | _t | _	|| _
|| _| jd k	rR|   d S )NF)hostportio_loopread_timeoutZis_queuing_readsZ
read_queuer   _TTornadoStreamTransport__wbufr   
_read_lockssl_optionsstream_set_close_callback)selfr   r   r   r   r   r    r   5/tmp/pip-unpacked-wheel-jqs7l_7o/thriftpy2/tornado.py__init__6   s    
z TTornadoStreamTransport.__init__5.0c                 C   s   t ||S N)r   with_timeoutr   timeoutfuturer   r   r    r$   H   s    z$TTornadoStreamTransport.with_timeoutc                 C   s   t ||| jS r#   )r   r$   r   r%   r   r   r    r$   K   s    c              
   c   s   t d ttjtjd}| jd kr4t|| _ntj	|| jd| _z"| 
|| j| j| jfV  W n: tjttfk
r   d| j| j}ttj|dY nX |   t| d S )Nzsocket connectingr   )r   zcould not connect to {}:{}typemessage)loggerdebugsocketAF_INETSOCK_STREAMr   r   ZIOStreamr   ZSSLIOStreamr$   connectr   r   errorOSErrorIOErrorformatr   ZNOT_OPENr   r   Return)r   r&   sockr*   r   r   r    openN   s(    

 
zTTornadoStreamTransport.openc                 C   s   | j | j d S r#   r   Zset_close_callbackcloser   r   r   r    r   d   s    z+TTornadoStreamTransport._set_close_callbackc                 C   s   | j d  | j   d S r#   r8   r:   r   r   r    r9   g   s    zTTornadoStreamTransport.closec                 C   s   dst dd S )NFzyou're doing it wrong)AssertionError)r   _r   r   r    readl   s    zTTornadoStreamTransport.readc              
   c   s   z
d V  W n t jttfk
rF } zttjt|dW 5 d }~X Y nj tjk
rz } zttj	t|dW 5 d }~X Y n6 t
jk
r } zttjt|dW 5 d }~X Y nX d S )Nr(   )r-   r1   r2   r3   r   END_OF_FILEstrr   ZStreamBufferFullErrorUNKNOWNr   TimeoutErrorZ	TIMED_OUT)r   er   r   r    io_exception_contextq   s"    
z,TTornadoStreamTransport.io_exception_contextc              
   c   s   | j  V x |  d | dV }t|dkr:tdtd|\}t	
d| | |V }t	
d| t|W 5 Q R X W 5 Q R X d S )N   r   zRead zero bytes from stream!iz(received frame header, frame length = %dzreceived frame payload: %r)r   acquirerC   _read_byteslenr   ZStreamClosedErrorstructunpackr+   r,   r   r5   )r   Zframe_headerframe_lengthframer   r   r    
read_frame   s    
z"TTornadoStreamTransport.read_framec                 C   s   |  | j| j|S r#   )r$   r   r   
read_bytes)r   nr   r   r    rG      s    z#TTornadoStreamTransport._read_bytesc                 C   s   | j | d S r#   )r   write)r   bufr   r   r    rP      s    zTTornadoStreamTransport.writec              
   C   sR   | j  }tdt|}t | _ |   | j|| W  5 Q R  S Q R X d S )NrE   )	r   getvaluerI   packrH   r   rC   r   rP   )r   rL   rK   r   r   r    flush   s
    

zTTornadoStreamTransport.flush)__name__
__module____qualname____doc__r   DEFAULT_CONNECT_TIMEOUTDEFAULT_READ_TIMEOUTr!   tornado_versionr$   r   	coroutiner7   r   r9   r=   r   rC   rM   rG   rP   rT   r   r   r   r    r   1   s*   





r   c                       s2   e Zd Zdejf fdd	Zejdd Z  Z	S )TTornadoServerNc                    sH   t t| j|| || _|| _|d k	r*|n|| _|| _t| dd | _d S )Nr   )	superr]   r!   
_processor_iprot_factory_oprot_factorytransport_read_timeoutgetattr_TTornadoServer__io_loop)r   	processoriprot_factoryZoprot_factoryrb   argskwargs	__class__r   r    r!      s    zTTornadoServer.__init__c              
   c   sp  |\}}t |||| j| jd}z| j|}| jt }|j s2z|	 V }W n< t
k
r }	 z|	jt
jkrW Y q2n W 5 d }	~	X Y nX |j| | j|\}
}}}t|tr| j||
|| q<zt| V |_W n6 tk
r }	 z| j|	|s W 5 d }	~	X Y nX | j||
|| q<W n( tk
r\   td |  Y nX td|| d S )N)r   r   r   r   r   z!thrift exception in handle_streamzclient disconnected %s:%d)r   rd   rb   ra   get_protocolr`   r   r   closedrM   r   r)   r>   transsetvaluer_   Z
process_in
isinstancer   Zsend_exceptionr   Zmaybe_futuresuccess	ExceptionZhandle_exceptionZsend_resultr+   	exceptionr9   info)r   r   addressr   r   rm   oprotiprotrL   rB   apiZseqidresultcallr   r   r    handle_stream   s@       

zTTornadoServer.handle_stream)
rU   rV   rW   r   rZ   r!   r   r\   rz   __classcell__r   r   ri   r    r]      s
   r]   c                       s*   e Zd Zej fddZdd Z  ZS )TTornadoClientc                 #   s:   | j j V }| jj| tt| |}t	|d S r#   )
_oprotrm   rM   Z_iprotrn   r^   r|   _recvr   r5   )r   rw   rL   rx   ri   r   r    r~      s    zTTornadoClient._recvc                 C   s   | j j  d S r#   )r}   rm   r9   r:   r   r   r    r9      s    zTTornadoClient.close)rU   rV   rW   r   r\   r~   r9   r{   r   r   ri   r    r|      s   r|   c                 C   s:   t | |}tdkr$t||||d}nt|||||d}|S )Nr"   )rf   rb   r   )rf   rb   r   r   )r   r[   r]   )servicehandlerproto_factoryr   r   rb   re   serverr   r   r    make_server   s    
 r   	localhosti#   c	                 c   sr   |r$t j|}	|	jp|}|	jp"|}t|||||d}
|t }||
}|
|V  t	| ||}t
|d S )N)r   r   r   )urllibparseurlparsehostnamer   r   rk   r   r7   r|   r   r5   )r   r   r   r   r   r   connect_timeoutr   url
parsed_url	transportrv   ru   clientr   r   r    make_client   s     	

   
r   )0rX   
__future__r   loggingr-   rI   r   
contextlibr   datetimer   ior   Ztornador   r   r   r	   r[   Zprotocol.binaryr   Zthriftr   r   r   r   r   r   Ztransport.memoryr   Ztornado.locksr   ImportErrorZtoroRuntimeErrorr4   	getLoggerrU   r+   r   	TCPServerr]   r|   rZ   r   r\   rY   r   r   r   r   r    <module>   sT   
p8 
 