webssh

Web based ssh client https://github.com/huashengdun/webssh webssh.huashengdun.org/
git clone http://git.hanabi.in/repos/webssh.git
Log | Files | Refs | README | LICENSE

worker.py (3771B)


      1 import logging
      2 import tornado.websocket
      3 
      4 from tornado.ioloop import IOLoop
      5 from tornado.iostream import _ERRNO_CONNRESET
      6 from tornado.util import errno_from_exception
      7 
      8 
      9 BUF_SIZE = 32 * 1024
     10 clients = {}  # {ip: {id: worker}}
     11 
     12 
     13 def clear_worker(worker, clients):
     14     ip = worker.src_addr[0]
     15     workers = clients.get(ip)
     16     assert worker.id in workers
     17     workers.pop(worker.id)
     18 
     19     if not workers:
     20         clients.pop(ip)
     21         if not clients:
     22             clients.clear()
     23 
     24 
     25 def recycle_worker(worker):
     26     if worker.handler:
     27         return
     28     logging.warning('Recycling worker {}'.format(worker.id))
     29     worker.close(reason='worker recycled')
     30 
     31 
     32 class Worker(object):
     33     def __init__(self, loop, ssh, chan, dst_addr):
     34         self.loop = loop
     35         self.ssh = ssh
     36         self.chan = chan
     37         self.dst_addr = dst_addr
     38         self.fd = chan.fileno()
     39         self.id = str(id(self))
     40         self.data_to_dst = []
     41         self.handler = None
     42         self.mode = IOLoop.READ
     43         self.closed = False
     44 
     45     def __call__(self, fd, events):
     46         if events & IOLoop.READ:
     47             self.on_read()
     48         if events & IOLoop.WRITE:
     49             self.on_write()
     50         if events & IOLoop.ERROR:
     51             self.close(reason='error event occurred')
     52 
     53     def set_handler(self, handler):
     54         if not self.handler:
     55             self.handler = handler
     56 
     57     def update_handler(self, mode):
     58         if self.mode != mode:
     59             self.loop.update_handler(self.fd, mode)
     60             self.mode = mode
     61         if mode == IOLoop.WRITE:
     62             self.loop.call_later(0.1, self, self.fd, IOLoop.WRITE)
     63 
     64     def on_read(self):
     65         logging.debug('worker {} on read'.format(self.id))
     66         try:
     67             data = self.chan.recv(BUF_SIZE)
     68         except (OSError, IOError) as e:
     69             logging.error(e)
     70             if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
     71                 self.close(reason='chan error on reading')
     72         else:
     73             logging.debug('{!r} from {}:{}'.format(data, *self.dst_addr))
     74             if not data:
     75                 self.close(reason='chan closed')
     76                 return
     77 
     78             logging.debug('{!r} to {}:{}'.format(data, *self.handler.src_addr))
     79             try:
     80                 self.handler.write_message(data, binary=True)
     81             except tornado.websocket.WebSocketClosedError:
     82                 self.close(reason='websocket closed')
     83 
     84     def on_write(self):
     85         logging.debug('worker {} on write'.format(self.id))
     86         if not self.data_to_dst:
     87             return
     88 
     89         data = ''.join(self.data_to_dst)
     90         logging.debug('{!r} to {}:{}'.format(data, *self.dst_addr))
     91 
     92         try:
     93             sent = self.chan.send(data)
     94         except (OSError, IOError) as e:
     95             logging.error(e)
     96             if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
     97                 self.close(reason='chan error on writing')
     98             else:
     99                 self.update_handler(IOLoop.WRITE)
    100         else:
    101             self.data_to_dst = []
    102             data = data[sent:]
    103             if data:
    104                 self.data_to_dst.append(data)
    105                 self.update_handler(IOLoop.WRITE)
    106             else:
    107                 self.update_handler(IOLoop.READ)
    108 
    109     def close(self, reason=None):
    110         if self.closed:
    111             return
    112         self.closed = True
    113 
    114         logging.info(
    115             'Closing worker {} with reason: {}'.format(self.id, reason)
    116         )
    117         if self.handler:
    118             self.loop.remove_handler(self.fd)
    119             self.handler.close(reason=reason)
    120         self.chan.close()
    121         self.ssh.close()
    122         logging.info('Connection to {}:{} lost'.format(*self.dst_addr))
    123 
    124         clear_worker(self, clients)
    125         logging.debug(clients)