worker.py (3771B)
"""Worker that relays data between a channel and a websocket handler.

A module-level registry (``clients``) tracks live workers per source ip so
that they can be removed again when a worker is closed.
"""
import logging
import tornado.websocket

from tornado.ioloop import IOLoop
from tornado.iostream import _ERRNO_CONNRESET
from tornado.util import errno_from_exception


# Maximum number of bytes requested from the channel per read.
BUF_SIZE = 32 * 1024
clients = {}  # {ip: {id: worker}}


def clear_worker(worker, clients):
    """Remove ``worker`` from the ``clients`` registry.

    The registry has the shape ``{ip: {id: worker}}``; the ip is taken from
    ``worker.src_addr[0]``.  Per-ip entries that become empty are dropped.

    This runs as the last step of ``Worker.close()``, so it must not raise
    when the registry and the worker disagree: a missing source address, an
    unknown ip or an unknown worker id is logged as a warning instead.
    (The previous ``assert`` was skipped under ``python -O`` and failed with
    ``TypeError`` when the ip was unknown.)

    :param worker: object exposing ``id`` and, normally, ``src_addr``.
    :param clients: registry dict to update in place.
    """
    src_addr = getattr(worker, 'src_addr', None)
    if not src_addr:
        logging.warning(
            'Worker {} has no source address, nothing to clear'.format(
                worker.id
            )
        )
        return

    ip = src_addr[0]
    workers = clients.get(ip)
    if workers is None:
        logging.warning(
            'Worker {} not registered: unknown ip {}'.format(worker.id, ip)
        )
        return

    if workers.pop(worker.id, None) is None:
        logging.warning(
            'Worker {} not registered for ip {}'.format(worker.id, ip)
        )

    if not workers:
        clients.pop(ip, None)
        if not clients:
            # Semantically a no-op on an empty dict.
            # NOTE(review): presumably kept so the dict can release its
            # internal storage after heavy churn -- confirm before removing.
            clients.clear()


def recycle_worker(worker):
    """Close ``worker`` unless a handler has been attached to it.

    NOTE(review): presumably scheduled by the caller as a timeout for
    workers that never get a websocket attached -- the call site is not
    visible here.
    """
    if worker.handler:
        return
    logging.warning('Recycling worker {}'.format(worker.id))
    worker.close(reason='worker recycled')


class Worker(object):
    """Relay between a channel (``chan``) and a websocket-style handler.

    Instances are callable with ``(fd, events)`` so they can be used as an
    IOLoop event callback for the channel's file descriptor.
    """

    def __init__(self, loop, ssh, chan, dst_addr):
        """Store the collaborators and initialise the relay state.

        :param loop: IOLoop-like object (``update_handler``,
            ``remove_handler`` and ``call_later`` are used).
        :param ssh: connection object; only ``close()`` is called on it.
        :param chan: channel providing ``fileno()``, ``recv()``, ``send()``,
            ``close()`` and a ``closed`` attribute.
        :param dst_addr: destination address; its first two items are used
            as host and port in log messages.
        """
        self.loop = loop
        self.ssh = ssh
        self.chan = chan
        self.dst_addr = dst_addr
        # Read by clear_worker() on close.  Nothing in this module assigns
        # a real value, so the caller is expected to set it; defaulting to
        # None keeps close() from failing with AttributeError before then.
        self.src_addr = None
        self.fd = chan.fileno()
        self.id = str(id(self))
        # Chunks queued for the channel; flushed by on_write().
        self.data_to_dst = []
        self.handler = None
        # Event mask last passed to loop.update_handler().
        self.mode = IOLoop.READ
        self.closed = False

    def __call__(self, fd, events):
        """Dispatch IOLoop ``events`` for ``fd`` to the matching callbacks.

        Also invoked by the retry timer armed in ``update_handler()``, which
        may fire after the worker was closed; nothing is dispatched then.
        """
        if self.closed:
            return
        if events & IOLoop.READ:
            self.on_read()
        # on_read() may have closed the worker; do not write to a closed
        # channel in that case.
        if events & IOLoop.WRITE and not self.closed:
            self.on_write()
        if events & IOLoop.ERROR:
            self.close(reason='error event occurred')

    def set_handler(self, handler):
        """Attach ``handler`` unless one is already attached."""
        if not self.handler:
            self.handler = handler

    def update_handler(self, mode):
        """Switch the event mask watched on ``self.fd`` to ``mode``.

        The loop is only touched when the mode actually changes.  For
        ``IOLoop.WRITE`` an additional call of this worker with a WRITE
        event is scheduled 0.1 s later.
        NOTE(review): presumably because the fd cannot be relied upon to
        signal writability -- confirm.
        """
        if self.mode != mode:
            self.loop.update_handler(self.fd, mode)
            self.mode = mode
        if mode == IOLoop.WRITE:
            self.loop.call_later(0.1, self, self.fd, IOLoop.WRITE)

    def on_read(self):
        """Read up to ``BUF_SIZE`` bytes from the channel and forward them.

        Data is sent to the handler as a binary message.  The worker is
        closed when the channel returns no data, when reading fails on a
        closed channel or with a connection-reset errno, or when the
        websocket is already closed.  Other read errors are only logged.
        """
        logging.debug('worker {} on read'.format(self.id))
        try:
            data = self.chan.recv(BUF_SIZE)
        except (OSError, IOError) as e:
            logging.error(e)
            if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
                self.close(reason='chan error on reading')
            return

        logging.debug('{!r} from {}:{}'.format(data, *self.dst_addr))
        if not data:
            self.close(reason='chan closed')
            return

        logging.debug('{!r} to {}:{}'.format(data, *self.handler.src_addr))
        try:
            self.handler.write_message(data, binary=True)
        except tornado.websocket.WebSocketClosedError:
            self.close(reason='websocket closed')

    def on_write(self):
        """Flush the queued chunks in ``data_to_dst`` to the channel.

        Whatever the channel did not accept is re-queued and the worker
        stays in WRITE mode; once everything is sent it returns to READ
        mode.  A send error on a closed channel or with a connection-reset
        errno closes the worker; any other send error triggers a retry.
        """
        logging.debug('worker {} on write'.format(self.id))
        if not self.data_to_dst:
            return

        # NOTE(review): joining with '' requires the queued chunks to be
        # str; bytes chunks would raise TypeError on Python 3 -- verify
        # against whatever fills data_to_dst.
        data = ''.join(self.data_to_dst)
        logging.debug('{!r} to {}:{}'.format(data, *self.dst_addr))

        try:
            sent = self.chan.send(data)
        except (OSError, IOError) as e:
            logging.error(e)
            if self.chan.closed or errno_from_exception(e) in _ERRNO_CONNRESET:
                self.close(reason='chan error on writing')
            else:
                # Queue is left untouched; try again later.
                self.update_handler(IOLoop.WRITE)
            return

        self.data_to_dst = []
        data = data[sent:]
        if data:
            self.data_to_dst.append(data)
            self.update_handler(IOLoop.WRITE)
        else:
            self.update_handler(IOLoop.READ)

    def close(self, reason=None):
        """Tear the worker down; safe to call more than once.

        Removes the fd from the loop and closes the handler (only when a
        handler is attached), closes the channel and the connection, then
        removes the worker from the module-level ``clients`` registry.

        :param reason: text used in the log message and passed on to
            ``handler.close()``.
        """
        if self.closed:
            return
        self.closed = True

        logging.info(
            'Closing worker {} with reason: {}'.format(self.id, reason)
        )
        if self.handler:
            self.loop.remove_handler(self.fd)
            self.handler.close(reason=reason)
        self.chan.close()
        self.ssh.close()
        logging.info('Connection to {}:{} lost'.format(*self.dst_addr))

        clear_worker(self, clients)
        logging.debug(clients)