webssh

Web based ssh client https://github.com/huashengdun/webssh webssh.huashengdun.org/
git clone http://git.hanabi.in/repos/webssh.git
Log | Files | Refs | README | LICENSE

test_app.py (30545B)


      1 import json
      2 import random
      3 import threading
      4 import tornado.websocket
      5 import tornado.gen
      6 
      7 from tornado.testing import AsyncHTTPTestCase
      8 from tornado.httpclient import HTTPError
      9 from tornado.options import options
     10 from tests.sshserver import run_ssh_server, banner, Server
     11 from tests.utils import encode_multipart_formdata, read_file, make_tests_data_path  # noqa
     12 from webssh import handler
     13 from webssh.main import make_app, make_handlers
     14 from webssh.settings import (
     15     get_app_settings, get_server_settings, max_body_size
     16 )
     17 from webssh.utils import to_str
     18 from webssh.worker import clients
     19 
     20 try:
     21     from urllib.parse import urlencode
     22 except ImportError:
     23     from urllib import urlencode
     24 
     25 
     26 swallow_http_errors = handler.swallow_http_errors
     27 server_encodings = {e.strip() for e in Server.encodings}
     28 
     29 
     30 class TestAppBase(AsyncHTTPTestCase):
     31 
     32     def get_httpserver_options(self):
     33         return get_server_settings(options)
     34 
     35     def assert_response(self, bstr, response):
     36         if swallow_http_errors:
     37             self.assertEqual(response.code, 200)
     38             self.assertIn(bstr, response.body)
     39         else:
     40             self.assertEqual(response.code, 400)
     41             self.assertIn(b'Bad Request', response.body)
     42 
     43     def assert_status_in(self, status, data):
     44         self.assertIsNone(data['encoding'])
     45         self.assertIsNone(data['id'])
     46         self.assertIn(status, data['status'])
     47 
     48     def assert_status_equal(self, status, data):
     49         self.assertIsNone(data['encoding'])
     50         self.assertIsNone(data['id'])
     51         self.assertEqual(status, data['status'])
     52 
     53     def assert_status_none(self, data):
     54         self.assertIsNotNone(data['encoding'])
     55         self.assertIsNotNone(data['id'])
     56         self.assertIsNone(data['status'])
     57 
     58     def fetch_request(self, url, method='GET', body='', headers={}, sync=True):
     59         if not sync and url.startswith('/'):
     60             url = self.get_url(url)
     61 
     62         if isinstance(body, dict):
     63             body = urlencode(body)
     64 
     65         if not headers:
     66             headers = self.headers
     67         else:
     68             headers.update(self.headers)
     69 
     70         client = self if sync else self.get_http_client()
     71         return client.fetch(url, method=method, body=body, headers=headers)
     72 
     73     def sync_post(self, url, body, headers={}):
     74         return self.fetch_request(url, 'POST', body, headers)
     75 
     76     def async_post(self, url, body, headers={}):
     77         return self.fetch_request(url, 'POST', body, headers, sync=False)
     78 
     79 
     80 class TestAppBasic(TestAppBase):
     81 
     82     running = [True]
     83     sshserver_port = 2200
     84     body = 'hostname=127.0.0.1&port={}&_xsrf=yummy&username=robey&password=foo'.format(sshserver_port) # noqa
     85     headers = {'Cookie': '_xsrf=yummy'}
     86 
     87     def get_app(self):
     88         self.body_dict = {
     89             'hostname': '127.0.0.1',
     90             'port': str(self.sshserver_port),
     91             'username': 'robey',
     92             'password': '',
     93             '_xsrf': 'yummy'
     94         }
     95         loop = self.io_loop
     96         options.debug = False
     97         options.policy = random.choice(['warning', 'autoadd'])
     98         options.hostfile = ''
     99         options.syshostfile = ''
    100         options.tdstream = ''
    101         options.delay = 0.1
    102         app = make_app(make_handlers(loop, options), get_app_settings(options))
    103         return app
    104 
    105     @classmethod
    106     def setUpClass(cls):
    107         print('='*20)
    108         t = threading.Thread(
    109             target=run_ssh_server, args=(cls.sshserver_port, cls.running)
    110         )
    111         t.setDaemon(True)
    112         t.start()
    113 
    114     @classmethod
    115     def tearDownClass(cls):
    116         cls.running.pop()
    117         print('='*20)
    118 
    119     def test_app_with_invalid_form_for_missing_argument(self):
    120         response = self.fetch('/')
    121         self.assertEqual(response.code, 200)
    122 
    123         body = 'port=7000&username=admin&password&_xsrf=yummy'
    124         response = self.sync_post('/', body)
    125         self.assert_response(b'Missing argument hostname', response)
    126 
    127         body = 'hostname=127.0.0.1&port=7000&password&_xsrf=yummy'
    128         response = self.sync_post('/', body)
    129         self.assert_response(b'Missing argument username', response)
    130 
    131         body = 'hostname=&port=&username=&password&_xsrf=yummy'
    132         response = self.sync_post('/', body)
    133         self.assert_response(b'Missing value hostname', response)
    134 
    135         body = 'hostname=127.0.0.1&port=7000&username=&password&_xsrf=yummy'
    136         response = self.sync_post('/', body)
    137         self.assert_response(b'Missing value username', response)
    138 
    139     def test_app_with_invalid_form_for_invalid_value(self):
    140         body = 'hostname=127.0.0&port=22&username=&password&_xsrf=yummy'
    141         response = self.sync_post('/', body)
    142         self.assert_response(b'Invalid hostname', response)
    143 
    144         body = 'hostname=http://www.googe.com&port=22&username=&password&_xsrf=yummy'  # noqa
    145         response = self.sync_post('/', body)
    146         self.assert_response(b'Invalid hostname', response)
    147 
    148         body = 'hostname=127.0.0.1&port=port&username=&password&_xsrf=yummy'
    149         response = self.sync_post('/', body)
    150         self.assert_response(b'Invalid port', response)
    151 
    152         body = 'hostname=127.0.0.1&port=70000&username=&password&_xsrf=yummy'
    153         response = self.sync_post('/', body)
    154         self.assert_response(b'Invalid port', response)
    155 
    156     def test_app_with_wrong_hostname_ip(self):
    157         body = 'hostname=127.0.0.2&port=2200&username=admin&_xsrf=yummy'
    158         response = self.sync_post('/', body)
    159         self.assertEqual(response.code, 200)
    160         self.assertIn(b'Unable to connect to', response.body)
    161 
    162     def test_app_with_wrong_hostname_domain(self):
    163         body = 'hostname=xxxxxxxxxxxx&port=2200&username=admin&_xsrf=yummy'
    164         response = self.sync_post('/', body)
    165         self.assertEqual(response.code, 200)
    166         self.assertIn(b'Unable to connect to', response.body)
    167 
    168     def test_app_with_wrong_port(self):
    169         body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy'
    170         response = self.sync_post('/', body)
    171         self.assertEqual(response.code, 200)
    172         self.assertIn(b'Unable to connect to', response.body)
    173 
    174     def test_app_with_wrong_credentials(self):
    175         response = self.sync_post('/', self.body + 's')
    176         self.assert_status_in('Authentication failed.', json.loads(to_str(response.body))) # noqa
    177 
    178     def test_app_with_correct_credentials(self):
    179         response = self.sync_post('/', self.body)
    180         self.assert_status_none(json.loads(to_str(response.body)))
    181 
    182     def test_app_with_correct_credentials_but_with_no_port(self):
    183         default_port = handler.DEFAULT_PORT
    184         handler.DEFAULT_PORT = self.sshserver_port
    185 
    186         # with no port value
    187         body = self.body.replace(str(self.sshserver_port), '')
    188         response = self.sync_post('/', body)
    189         self.assert_status_none(json.loads(to_str(response.body)))
    190 
    191         # with no port argument
    192         body = body.replace('port=&', '')
    193         response = self.sync_post('/', body)
    194         self.assert_status_none(json.loads(to_str(response.body)))
    195 
    196         handler.DEFAULT_PORT = default_port
    197 
    198     @tornado.testing.gen_test
    199     def test_app_with_correct_credentials_timeout(self):
    200         url = self.get_url('/')
    201         response = yield self.async_post(url, self.body)
    202         data = json.loads(to_str(response.body))
    203         self.assert_status_none(data)
    204 
    205         url = url.replace('http', 'ws')
    206         ws_url = url + 'ws?id=' + data['id']
    207         yield tornado.gen.sleep(options.delay + 0.1)
    208         ws = yield tornado.websocket.websocket_connect(ws_url)
    209         msg = yield ws.read_message()
    210         self.assertIsNone(msg)
    211         self.assertEqual(ws.close_reason, 'Websocket authentication failed.')
    212 
    213     @tornado.testing.gen_test
    214     def test_app_with_correct_credentials_but_ip_not_matched(self):
    215         url = self.get_url('/')
    216         response = yield self.async_post(url, self.body)
    217         data = json.loads(to_str(response.body))
    218         self.assert_status_none(data)
    219 
    220         clients = handler.clients
    221         handler.clients = {}
    222         url = url.replace('http', 'ws')
    223         ws_url = url + 'ws?id=' + data['id']
    224         ws = yield tornado.websocket.websocket_connect(ws_url)
    225         msg = yield ws.read_message()
    226         self.assertIsNone(msg)
    227         self.assertEqual(ws.close_reason, 'Websocket authentication failed.')
    228         handler.clients = clients
    229 
    230     @tornado.testing.gen_test
    231     def test_app_with_correct_credentials_user_robey(self):
    232         url = self.get_url('/')
    233         response = yield self.async_post(url, self.body)
    234         data = json.loads(to_str(response.body))
    235         self.assert_status_none(data)
    236 
    237         url = url.replace('http', 'ws')
    238         ws_url = url + 'ws?id=' + data['id']
    239         ws = yield tornado.websocket.websocket_connect(ws_url)
    240         msg = yield ws.read_message()
    241         self.assertEqual(to_str(msg, data['encoding']), banner)
    242         ws.close()
    243 
    244     @tornado.testing.gen_test
    245     def test_app_with_correct_credentials_but_without_id_argument(self):
    246         url = self.get_url('/')
    247         response = yield self.async_post(url, self.body)
    248         data = json.loads(to_str(response.body))
    249         self.assert_status_none(data)
    250 
    251         url = url.replace('http', 'ws')
    252         ws_url = url + 'ws'
    253         ws = yield tornado.websocket.websocket_connect(ws_url)
    254         msg = yield ws.read_message()
    255         self.assertIsNone(msg)
    256         self.assertIn('Missing argument id', ws.close_reason)
    257 
    258     @tornado.testing.gen_test
    259     def test_app_with_correct_credentials_but_empty_id(self):
    260         url = self.get_url('/')
    261         response = yield self.async_post(url, self.body)
    262         data = json.loads(to_str(response.body))
    263         self.assert_status_none(data)
    264 
    265         url = url.replace('http', 'ws')
    266         ws_url = url + 'ws?id='
    267         ws = yield tornado.websocket.websocket_connect(ws_url)
    268         msg = yield ws.read_message()
    269         self.assertIsNone(msg)
    270         self.assertIn('Missing value id', ws.close_reason)
    271 
    272     @tornado.testing.gen_test
    273     def test_app_with_correct_credentials_but_wrong_id(self):
    274         url = self.get_url('/')
    275         response = yield self.async_post(url, self.body)
    276         data = json.loads(to_str(response.body))
    277         self.assert_status_none(data)
    278 
    279         url = url.replace('http', 'ws')
    280         ws_url = url + 'ws?id=1' + data['id']
    281         ws = yield tornado.websocket.websocket_connect(ws_url)
    282         msg = yield ws.read_message()
    283         self.assertIsNone(msg)
    284         self.assertIn('Websocket authentication failed', ws.close_reason)
    285 
    286     @tornado.testing.gen_test
    287     def test_app_with_correct_credentials_user_bar(self):
    288         body = self.body.replace('robey', 'bar')
    289         url = self.get_url('/')
    290         response = yield self.async_post(url, body)
    291         data = json.loads(to_str(response.body))
    292         self.assert_status_none(data)
    293 
    294         url = url.replace('http', 'ws')
    295         ws_url = url + 'ws?id=' + data['id']
    296         ws = yield tornado.websocket.websocket_connect(ws_url)
    297         msg = yield ws.read_message()
    298         self.assertEqual(to_str(msg, data['encoding']), banner)
    299 
    300         # messages below will be ignored silently
    301         yield ws.write_message('hello')
    302         yield ws.write_message('"hello"')
    303         yield ws.write_message('[hello]')
    304         yield ws.write_message(json.dumps({'resize': []}))
    305         yield ws.write_message(json.dumps({'resize': {}}))
    306         yield ws.write_message(json.dumps({'resize': 'ab'}))
    307         yield ws.write_message(json.dumps({'resize': ['a', 'b']}))
    308         yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}}))
    309         yield ws.write_message(json.dumps({'resize': [100]}))
    310         yield ws.write_message(json.dumps({'resize': [100]*10}))
    311         yield ws.write_message(json.dumps({'resize': [-1, -1]}))
    312         yield ws.write_message(json.dumps({'data': [1]}))
    313         yield ws.write_message(json.dumps({'data': (1,)}))
    314         yield ws.write_message(json.dumps({'data': {'a': 2}}))
    315         yield ws.write_message(json.dumps({'data': 1}))
    316         yield ws.write_message(json.dumps({'data': 2.1}))
    317         yield ws.write_message(json.dumps({'key-non-existed': 'hello'}))
    318         # end - those just for testing webssh websocket stablity
    319 
    320         yield ws.write_message(json.dumps({'resize': [79, 23]}))
    321         msg = yield ws.read_message()
    322         self.assertEqual(b'resized', msg)
    323 
    324         yield ws.write_message(json.dumps({'data': 'bye'}))
    325         msg = yield ws.read_message()
    326         self.assertEqual(b'bye', msg)
    327         ws.close()
    328 
    329     @tornado.testing.gen_test
    330     def test_app_auth_with_valid_pubkey_by_urlencoded_form(self):
    331         url = self.get_url('/')
    332         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    333         self.body_dict.update(privatekey=privatekey)
    334         response = yield self.async_post(url, self.body_dict)
    335         data = json.loads(to_str(response.body))
    336         self.assert_status_none(data)
    337 
    338         url = url.replace('http', 'ws')
    339         ws_url = url + 'ws?id=' + data['id']
    340         ws = yield tornado.websocket.websocket_connect(ws_url)
    341         msg = yield ws.read_message()
    342         self.assertEqual(to_str(msg, data['encoding']), banner)
    343         ws.close()
    344 
    345     @tornado.testing.gen_test
    346     def test_app_auth_with_valid_pubkey_by_multipart_form(self):
    347         url = self.get_url('/')
    348         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    349         files = [('privatekey', 'user_rsa_key', privatekey)]
    350         content_type, body = encode_multipart_formdata(self.body_dict.items(),
    351                                                        files)
    352         headers = {
    353             'Content-Type': content_type, 'content-length': str(len(body))
    354         }
    355         response = yield self.async_post(url, body, headers=headers)
    356         data = json.loads(to_str(response.body))
    357         self.assert_status_none(data)
    358 
    359         url = url.replace('http', 'ws')
    360         ws_url = url + 'ws?id=' + data['id']
    361         ws = yield tornado.websocket.websocket_connect(ws_url)
    362         msg = yield ws.read_message()
    363         self.assertEqual(to_str(msg, data['encoding']), banner)
    364         ws.close()
    365 
    366     @tornado.testing.gen_test
    367     def test_app_auth_with_invalid_pubkey_for_user_robey(self):
    368         url = self.get_url('/')
    369         privatekey = 'h' * 1024
    370         files = [('privatekey', 'user_rsa_key', privatekey)]
    371         content_type, body = encode_multipart_formdata(self.body_dict.items(),
    372                                                        files)
    373         headers = {
    374             'Content-Type': content_type, 'content-length': str(len(body))
    375         }
    376 
    377         if swallow_http_errors:
    378             response = yield self.async_post(url, body, headers=headers)
    379             self.assertIn(b'Invalid key', response.body)
    380         else:
    381             with self.assertRaises(HTTPError) as ctx:
    382                 yield self.async_post(url, body, headers=headers)
    383             self.assertIn('Bad Request', ctx.exception.message)
    384 
    385     @tornado.testing.gen_test
    386     def test_app_auth_with_pubkey_exceeds_key_max_size(self):
    387         url = self.get_url('/')
    388         privatekey = 'h' * (handler.PrivateKey.max_length + 1)
    389         files = [('privatekey', 'user_rsa_key', privatekey)]
    390         content_type, body = encode_multipart_formdata(self.body_dict.items(),
    391                                                        files)
    392         headers = {
    393             'Content-Type': content_type, 'content-length': str(len(body))
    394         }
    395         if swallow_http_errors:
    396             response = yield self.async_post(url, body, headers=headers)
    397             self.assertIn(b'Invalid key', response.body)
    398         else:
    399             with self.assertRaises(HTTPError) as ctx:
    400                 yield self.async_post(url, body, headers=headers)
    401             self.assertIn('Bad Request', ctx.exception.message)
    402 
    403     @tornado.testing.gen_test
    404     def test_app_auth_with_pubkey_cannot_be_decoded_by_multipart_form(self):
    405         url = self.get_url('/')
    406         privatekey = 'h' * 1024
    407         files = [('privatekey', 'user_rsa_key', privatekey)]
    408         content_type, body = encode_multipart_formdata(self.body_dict.items(),
    409                                                        files)
    410         body = body.encode('utf-8')
    411         # added some gbk bytes to the privatekey, make it cannot be decoded
    412         body = body[:-100] + b'\xb4\xed\xce\xf3' + body[-100:]
    413         headers = {
    414             'Content-Type': content_type, 'content-length': str(len(body))
    415         }
    416         if swallow_http_errors:
    417             response = yield self.async_post(url, body, headers=headers)
    418             self.assertIn(b'Invalid unicode', response.body)
    419         else:
    420             with self.assertRaises(HTTPError) as ctx:
    421                 yield self.async_post(url, body, headers=headers)
    422             self.assertIn('Bad Request', ctx.exception.message)
    423 
    424     def test_app_post_form_with_large_body_size_by_multipart_form(self):
    425         privatekey = 'h' * (2 * max_body_size)
    426         files = [('privatekey', 'user_rsa_key', privatekey)]
    427         content_type, body = encode_multipart_formdata(self.body_dict.items(),
    428                                                        files)
    429         headers = {
    430             'Content-Type': content_type, 'content-length': str(len(body))
    431         }
    432         response = self.sync_post('/', body, headers=headers)
    433         self.assertIn(response.code, [400, 599])
    434 
    435     def test_app_post_form_with_large_body_size_by_urlencoded_form(self):
    436         privatekey = 'h' * (2 * max_body_size)
    437         body = self.body + '&privatekey=' + privatekey
    438         response = self.sync_post('/', body)
    439         self.assertIn(response.code, [400, 599])
    440 
    441     @tornado.testing.gen_test
    442     def test_app_with_user_keyonly_for_bad_authentication_type(self):
    443         self.body_dict.update(username='keyonly', password='foo')
    444         response = yield self.async_post('/', self.body_dict)
    445         self.assertEqual(response.code, 200)
    446         self.assert_status_in('Bad authentication type', json.loads(to_str(response.body))) # noqa
    447 
    448     @tornado.testing.gen_test
    449     def test_app_with_user_pass2fa_with_correct_passwords(self):
    450         self.body_dict.update(username='pass2fa', password='password',
    451                               totp='passcode')
    452         response = yield self.async_post('/', self.body_dict)
    453         self.assertEqual(response.code, 200)
    454         data = json.loads(to_str(response.body))
    455         self.assert_status_none(data)
    456 
    457     @tornado.testing.gen_test
    458     def test_app_with_user_pass2fa_with_wrong_pkey_correct_passwords(self):
    459         url = self.get_url('/')
    460         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    461         self.body_dict.update(username='pass2fa', password='password',
    462                               privatekey=privatekey, totp='passcode')
    463         response = yield self.async_post(url, self.body_dict)
    464         data = json.loads(to_str(response.body))
    465         self.assert_status_none(data)
    466 
    467     @tornado.testing.gen_test
    468     def test_app_with_user_pkey2fa_with_correct_passwords(self):
    469         url = self.get_url('/')
    470         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    471         self.body_dict.update(username='pkey2fa', password='password',
    472                               privatekey=privatekey, totp='passcode')
    473         response = yield self.async_post(url, self.body_dict)
    474         data = json.loads(to_str(response.body))
    475         self.assert_status_none(data)
    476 
    477     @tornado.testing.gen_test
    478     def test_app_with_user_pkey2fa_with_wrong_password(self):
    479         url = self.get_url('/')
    480         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    481         self.body_dict.update(username='pkey2fa', password='wrongpassword',
    482                               privatekey=privatekey, totp='passcode')
    483         response = yield self.async_post(url, self.body_dict)
    484         data = json.loads(to_str(response.body))
    485         self.assert_status_in('Authentication failed', data)
    486 
    487     @tornado.testing.gen_test
    488     def test_app_with_user_pkey2fa_with_wrong_passcode(self):
    489         url = self.get_url('/')
    490         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    491         self.body_dict.update(username='pkey2fa', password='password',
    492                               privatekey=privatekey, totp='wrongpasscode')
    493         response = yield self.async_post(url, self.body_dict)
    494         data = json.loads(to_str(response.body))
    495         self.assert_status_in('Authentication failed', data)
    496 
    497     @tornado.testing.gen_test
    498     def test_app_with_user_pkey2fa_with_empty_passcode(self):
    499         url = self.get_url('/')
    500         privatekey = read_file(make_tests_data_path('user_rsa_key'))
    501         self.body_dict.update(username='pkey2fa', password='password',
    502                               privatekey=privatekey, totp='')
    503         response = yield self.async_post(url, self.body_dict)
    504         data = json.loads(to_str(response.body))
    505         self.assert_status_in('Need a verification code', data)
    506 
    507 
    508 class OtherTestBase(TestAppBase):
    509     sshserver_port = 3300
    510     headers = {'Cookie': '_xsrf=yummy'}
    511     debug = False
    512     policy = None
    513     xsrf = True
    514     hostfile = ''
    515     syshostfile = ''
    516     tdstream = ''
    517     maxconn = 20
    518     origin = 'same'
    519     encodings = []
    520     body = {
    521         'hostname': '127.0.0.1',
    522         'port': '',
    523         'username': 'robey',
    524         'password': 'foo',
    525         '_xsrf': 'yummy'
    526     }
    527 
    528     def get_app(self):
    529         self.body.update(port=str(self.sshserver_port))
    530         loop = self.io_loop
    531         options.debug = self.debug
    532         options.xsrf = self.xsrf
    533         options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd'])  # noqa
    534         options.hostfile = self.hostfile
    535         options.syshostfile = self.syshostfile
    536         options.tdstream = self.tdstream
    537         options.maxconn = self.maxconn
    538         options.origin = self.origin
    539         app = make_app(make_handlers(loop, options), get_app_settings(options))
    540         return app
    541 
    542     def setUp(self):
    543         print('='*20)
    544         self.running = True
    545         OtherTestBase.sshserver_port += 1
    546 
    547         t = threading.Thread(
    548             target=run_ssh_server,
    549             args=(self.sshserver_port, self.running, self.encodings)
    550         )
    551         t.setDaemon(True)
    552         t.start()
    553         super(OtherTestBase, self).setUp()
    554 
    555     def tearDown(self):
    556         self.running = False
    557         print('='*20)
    558         super(OtherTestBase, self).tearDown()
    559 
    560 
    561 class TestAppInDebugMode(OtherTestBase):
    562 
    563     debug = True
    564 
    565     def assert_response(self, bstr, response):
    566         if swallow_http_errors:
    567             self.assertEqual(response.code, 200)
    568             self.assertIn(bstr, response.body)
    569         else:
    570             self.assertEqual(response.code, 500)
    571             self.assertIn(b'Uncaught exception', response.body)
    572 
    573     def test_server_error_for_post_method(self):
    574         body = dict(self.body, error='raise')
    575         response = self.sync_post('/', body)
    576         self.assert_response(b'"status": "Internal Server Error"', response)
    577 
    578     def test_html(self):
    579         response = self.fetch('/', method='GET')
    580         self.assertIn(b'novalidate>', response.body)
    581 
    582 
    583 class TestAppWithLargeBuffer(OtherTestBase):
    584 
    585     @tornado.testing.gen_test
    586     def test_app_for_sending_message_with_large_size(self):
    587         url = self.get_url('/')
    588         response = yield self.async_post(url, dict(self.body, username='foo'))
    589         data = json.loads(to_str(response.body))
    590         self.assert_status_none(data)
    591 
    592         url = url.replace('http', 'ws')
    593         ws_url = url + 'ws?id=' + data['id']
    594         ws = yield tornado.websocket.websocket_connect(ws_url)
    595         msg = yield ws.read_message()
    596         self.assertEqual(to_str(msg, data['encoding']), banner)
    597 
    598         send = 'h' * (64 * 1024) + '\r\n\r\n'
    599         yield ws.write_message(json.dumps({'data': send}))
    600         lst = []
    601         while True:
    602             msg = yield ws.read_message()
    603             lst.append(msg)
    604             if msg.endswith(b'\r\n\r\n'):
    605                 break
    606         recv = b''.join(lst).decode(data['encoding'])
    607         self.assertEqual(send, recv)
    608         ws.close()
    609 
    610 
    611 class TestAppWithRejectPolicy(OtherTestBase):
    612 
    613     policy = 'reject'
    614     hostfile = make_tests_data_path('known_hosts_example')
    615 
    616     @tornado.testing.gen_test
    617     def test_app_with_hostname_not_in_hostkeys(self):
    618         response = yield self.async_post('/', self.body)
    619         data = json.loads(to_str(response.body))
    620         message = 'Connection to {}:{} is not allowed.'.format(self.body['hostname'], self.sshserver_port) # noqa
    621         self.assertEqual(message, data['status'])
    622 
    623 
    624 class TestAppWithBadHostKey(OtherTestBase):
    625 
    626     policy = random.choice(['warning', 'autoadd', 'reject'])
    627     hostfile = make_tests_data_path('test_known_hosts')
    628 
    629     def setUp(self):
    630         self.sshserver_port = 2222
    631         super(TestAppWithBadHostKey, self).setUp()
    632 
    633     @tornado.testing.gen_test
    634     def test_app_with_bad_host_key(self):
    635         response = yield self.async_post('/', self.body)
    636         data = json.loads(to_str(response.body))
    637         self.assertEqual('Bad host key.', data['status'])
    638 
    639 
    640 class TestAppWithTrustedStream(OtherTestBase):
    641     tdstream = '127.0.0.2'
    642 
    643     def test_with_forbidden_get_request(self):
    644         response = self.fetch('/', method='GET')
    645         self.assertEqual(response.code, 403)
    646         self.assertIn('Forbidden', response.error.message)
    647 
    648     def test_with_forbidden_post_request(self):
    649         response = self.sync_post('/', self.body)
    650         self.assertEqual(response.code, 403)
    651         self.assertIn('Forbidden', response.error.message)
    652 
    653     def test_with_forbidden_put_request(self):
    654         response = self.fetch_request('/', method='PUT', body=self.body)
    655         self.assertEqual(response.code, 403)
    656         self.assertIn('Forbidden', response.error.message)
    657 
    658 
    659 class TestAppNotFoundHandler(OtherTestBase):
    660 
    661     custom_headers = handler.MixinHandler.custom_headers
    662 
    663     def test_with_not_found_get_request(self):
    664         response = self.fetch('/pathnotfound', method='GET')
    665         self.assertEqual(response.code, 404)
    666         self.assertEqual(
    667             response.headers['Server'], self.custom_headers['Server']
    668         )
    669         self.assertIn(b'404: Not Found', response.body)
    670 
    671     def test_with_not_found_post_request(self):
    672         response = self.sync_post('/pathnotfound', self.body)
    673         self.assertEqual(response.code, 404)
    674         self.assertEqual(
    675             response.headers['Server'], self.custom_headers['Server']
    676         )
    677         self.assertIn(b'404: Not Found', response.body)
    678 
    679     def test_with_not_found_put_request(self):
    680         response = self.fetch_request('/pathnotfound', method='PUT',
    681                                       body=self.body)
    682         self.assertEqual(response.code, 404)
    683         self.assertEqual(
    684             response.headers['Server'], self.custom_headers['Server']
    685         )
    686         self.assertIn(b'404: Not Found', response.body)
    687 
    688 
    689 class TestAppWithHeadRequest(OtherTestBase):
    690 
    691     def test_with_index_path(self):
    692         response = self.fetch('/', method='HEAD')
    693         self.assertEqual(response.code, 200)
    694 
    695     def test_with_ws_path(self):
    696         response = self.fetch('/ws', method='HEAD')
    697         self.assertEqual(response.code, 405)
    698 
    699     def test_with_not_found_path(self):
    700         response = self.fetch('/notfound', method='HEAD')
    701         self.assertEqual(response.code, 404)
    702 
    703 
    704 class TestAppWithPutRequest(OtherTestBase):
    705 
    706     xsrf = False
    707 
    708     @tornado.testing.gen_test
    709     def test_app_with_method_not_supported(self):
    710         with self.assertRaises(HTTPError) as ctx:
    711             yield self.fetch_request('/', 'PUT', self.body, sync=False)
    712         self.assertIn('Method Not Allowed', ctx.exception.message)
    713 
    714 
    715 class TestAppWithTooManyConnections(OtherTestBase):
    716 
    717     maxconn = 1
    718 
    719     def setUp(self):
    720         clients.clear()
    721         super(TestAppWithTooManyConnections, self).setUp()
    722 
    723     @tornado.testing.gen_test
    724     def test_app_with_too_many_connections(self):
    725         clients['127.0.0.1'] = {'fake_worker_id': None}
    726 
    727         url = self.get_url('/')
    728         response = yield self.async_post(url, self.body)
    729         data = json.loads(to_str(response.body))
    730         self.assertEqual('Too many live connections.', data['status'])
    731 
    732         clients['127.0.0.1'].clear()
    733         response = yield self.async_post(url, self.body)
    734         self.assert_status_none(json.loads(to_str(response.body)))
    735 
    736 
    737 class TestAppWithCrossOriginOperation(OtherTestBase):
    738 
    739     origin = 'http://www.example.com'
    740 
    741     @tornado.testing.gen_test
    742     def test_app_with_wrong_event_origin(self):
    743         body = dict(self.body, _origin='localhost')
    744         response = yield self.async_post('/', body)
    745         self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body))) # noqa
    746 
    747     @tornado.testing.gen_test
    748     def test_app_with_wrong_header_origin(self):
    749         headers = dict(Origin='localhost')
    750         response = yield self.async_post('/', self.body, headers=headers)
    751         self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body)), ) # noqa
    752 
    753     @tornado.testing.gen_test
    754     def test_app_with_correct_event_origin(self):
    755         body = dict(self.body, _origin=self.origin)
    756         response = yield self.async_post('/', body)
    757         self.assert_status_none(json.loads(to_str(response.body)))
    758         self.assertIsNone(response.headers.get('Access-Control-Allow-Origin'))
    759 
    760     @tornado.testing.gen_test
    761     def test_app_with_correct_header_origin(self):
    762         headers = dict(Origin=self.origin)
    763         response = yield self.async_post('/', self.body, headers=headers)
    764         self.assert_status_none(json.loads(to_str(response.body)))
    765         self.assertEqual(
    766             response.headers.get('Access-Control-Allow-Origin'), self.origin
    767         )
    768 
    769 
    770 class TestAppWithBadEncoding(OtherTestBase):
    771 
    772     encodings = [u'\u7f16\u7801']
    773 
    774     @tornado.testing.gen_test
    775     def test_app_with_a_bad_encoding(self):
    776         response = yield self.async_post('/', self.body)
    777         dic = json.loads(to_str(response.body))
    778         self.assert_status_none(dic)
    779         self.assertIn(dic['encoding'], server_encodings)
    780 
    781 
    782 class TestAppWithUnknownEncoding(OtherTestBase):
    783 
    784     encodings = [u'\u7f16\u7801', u'UnknownEncoding']
    785 
    786     @tornado.testing.gen_test
    787     def test_app_with_a_unknown_encoding(self):
    788         response = yield self.async_post('/', self.body)
    789         self.assert_status_none(json.loads(to_str(response.body)))
    790         dic = json.loads(to_str(response.body))
    791         self.assert_status_none(dic)
    792         self.assertEqual(dic['encoding'], 'utf-8')