test_app.py (30545B)
1 import json 2 import random 3 import threading 4 import tornado.websocket 5 import tornado.gen 6 7 from tornado.testing import AsyncHTTPTestCase 8 from tornado.httpclient import HTTPError 9 from tornado.options import options 10 from tests.sshserver import run_ssh_server, banner, Server 11 from tests.utils import encode_multipart_formdata, read_file, make_tests_data_path # noqa 12 from webssh import handler 13 from webssh.main import make_app, make_handlers 14 from webssh.settings import ( 15 get_app_settings, get_server_settings, max_body_size 16 ) 17 from webssh.utils import to_str 18 from webssh.worker import clients 19 20 try: 21 from urllib.parse import urlencode 22 except ImportError: 23 from urllib import urlencode 24 25 26 swallow_http_errors = handler.swallow_http_errors 27 server_encodings = {e.strip() for e in Server.encodings} 28 29 30 class TestAppBase(AsyncHTTPTestCase): 31 32 def get_httpserver_options(self): 33 return get_server_settings(options) 34 35 def assert_response(self, bstr, response): 36 if swallow_http_errors: 37 self.assertEqual(response.code, 200) 38 self.assertIn(bstr, response.body) 39 else: 40 self.assertEqual(response.code, 400) 41 self.assertIn(b'Bad Request', response.body) 42 43 def assert_status_in(self, status, data): 44 self.assertIsNone(data['encoding']) 45 self.assertIsNone(data['id']) 46 self.assertIn(status, data['status']) 47 48 def assert_status_equal(self, status, data): 49 self.assertIsNone(data['encoding']) 50 self.assertIsNone(data['id']) 51 self.assertEqual(status, data['status']) 52 53 def assert_status_none(self, data): 54 self.assertIsNotNone(data['encoding']) 55 self.assertIsNotNone(data['id']) 56 self.assertIsNone(data['status']) 57 58 def fetch_request(self, url, method='GET', body='', headers={}, sync=True): 59 if not sync and url.startswith('/'): 60 url = self.get_url(url) 61 62 if isinstance(body, dict): 63 body = urlencode(body) 64 65 if not headers: 66 headers = self.headers 67 else: 68 headers.update(self.headers) 69 70 client = self if sync else self.get_http_client() 71 return client.fetch(url, method=method, body=body, headers=headers) 72 73 def sync_post(self, url, body, headers={}): 74 return self.fetch_request(url, 'POST', body, headers) 75 76 def async_post(self, url, body, headers={}): 77 return self.fetch_request(url, 'POST', body, headers, sync=False) 78 79 80 class TestAppBasic(TestAppBase): 81 82 running = [True] 83 sshserver_port = 2200 84 body = 'hostname=127.0.0.1&port={}&_xsrf=yummy&username=robey&password=foo'.format(sshserver_port) # noqa 85 headers = {'Cookie': '_xsrf=yummy'} 86 87 def get_app(self): 88 self.body_dict = { 89 'hostname': '127.0.0.1', 90 'port': str(self.sshserver_port), 91 'username': 'robey', 92 'password': '', 93 '_xsrf': 'yummy' 94 } 95 loop = self.io_loop 96 options.debug = False 97 options.policy = random.choice(['warning', 'autoadd']) 98 options.hostfile = '' 99 options.syshostfile = '' 100 options.tdstream = '' 101 options.delay = 0.1 102 app = make_app(make_handlers(loop, options), get_app_settings(options)) 103 return app 104 105 @classmethod 106 def setUpClass(cls): 107 print('='*20) 108 t = threading.Thread( 109 target=run_ssh_server, args=(cls.sshserver_port, cls.running) 110 ) 111 t.setDaemon(True) 112 t.start() 113 114 @classmethod 115 def tearDownClass(cls): 116 cls.running.pop() 117 print('='*20) 118 119 def test_app_with_invalid_form_for_missing_argument(self): 120 response = self.fetch('/') 121 self.assertEqual(response.code, 200) 122 123 body = 'port=7000&username=admin&password&_xsrf=yummy' 124 response = self.sync_post('/', body) 125 self.assert_response(b'Missing argument hostname', response) 126 127 body = 'hostname=127.0.0.1&port=7000&password&_xsrf=yummy' 128 response = self.sync_post('/', body) 129 self.assert_response(b'Missing argument username', response) 130 131 body = 'hostname=&port=&username=&password&_xsrf=yummy' 132 response = self.sync_post('/', body) 133 self.assert_response(b'Missing value hostname', response) 134 135 body = 'hostname=127.0.0.1&port=7000&username=&password&_xsrf=yummy' 136 response = self.sync_post('/', body) 137 self.assert_response(b'Missing value username', response) 138 139 def test_app_with_invalid_form_for_invalid_value(self): 140 body = 'hostname=127.0.0&port=22&username=&password&_xsrf=yummy' 141 response = self.sync_post('/', body) 142 self.assert_response(b'Invalid hostname', response) 143 144 body = 'hostname=http://www.googe.com&port=22&username=&password&_xsrf=yummy' # noqa 145 response = self.sync_post('/', body) 146 self.assert_response(b'Invalid hostname', response) 147 148 body = 'hostname=127.0.0.1&port=port&username=&password&_xsrf=yummy' 149 response = self.sync_post('/', body) 150 self.assert_response(b'Invalid port', response) 151 152 body = 'hostname=127.0.0.1&port=70000&username=&password&_xsrf=yummy' 153 response = self.sync_post('/', body) 154 self.assert_response(b'Invalid port', response) 155 156 def test_app_with_wrong_hostname_ip(self): 157 body = 'hostname=127.0.0.2&port=2200&username=admin&_xsrf=yummy' 158 response = self.sync_post('/', body) 159 self.assertEqual(response.code, 200) 160 self.assertIn(b'Unable to connect to', response.body) 161 162 def test_app_with_wrong_hostname_domain(self): 163 body = 'hostname=xxxxxxxxxxxx&port=2200&username=admin&_xsrf=yummy' 164 response = self.sync_post('/', body) 165 self.assertEqual(response.code, 200) 166 self.assertIn(b'Unable to connect to', response.body) 167 168 def test_app_with_wrong_port(self): 169 body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy' 170 response = self.sync_post('/', body) 171 self.assertEqual(response.code, 200) 172 self.assertIn(b'Unable to connect to', response.body) 173 174 def test_app_with_wrong_credentials(self): 175 response = self.sync_post('/', self.body + 's') 176 self.assert_status_in('Authentication failed.', json.loads(to_str(response.body))) # noqa 177 178 def test_app_with_correct_credentials(self): 179 response = self.sync_post('/', self.body) 180 self.assert_status_none(json.loads(to_str(response.body))) 181 182 def test_app_with_correct_credentials_but_with_no_port(self): 183 default_port = handler.DEFAULT_PORT 184 handler.DEFAULT_PORT = self.sshserver_port 185 186 # with no port value 187 body = self.body.replace(str(self.sshserver_port), '') 188 response = self.sync_post('/', body) 189 self.assert_status_none(json.loads(to_str(response.body))) 190 191 # with no port argument 192 body = body.replace('port=&', '') 193 response = self.sync_post('/', body) 194 self.assert_status_none(json.loads(to_str(response.body))) 195 196 handler.DEFAULT_PORT = default_port 197 198 @tornado.testing.gen_test 199 def test_app_with_correct_credentials_timeout(self): 200 url = self.get_url('/') 201 response = yield self.async_post(url, self.body) 202 data = json.loads(to_str(response.body)) 203 self.assert_status_none(data) 204 205 url = url.replace('http', 'ws') 206 ws_url = url + 'ws?id=' + data['id'] 207 yield tornado.gen.sleep(options.delay + 0.1) 208 ws = yield tornado.websocket.websocket_connect(ws_url) 209 msg = yield ws.read_message() 210 self.assertIsNone(msg) 211 self.assertEqual(ws.close_reason, 'Websocket authentication failed.') 212 213 @tornado.testing.gen_test 214 def test_app_with_correct_credentials_but_ip_not_matched(self): 215 url = self.get_url('/') 216 response = yield self.async_post(url, self.body) 217 data = json.loads(to_str(response.body)) 218 self.assert_status_none(data) 219 220 clients = handler.clients 221 handler.clients = {} 222 url = url.replace('http', 'ws') 223 ws_url = url + 'ws?id=' + data['id'] 224 ws = yield tornado.websocket.websocket_connect(ws_url) 225 msg = yield ws.read_message() 226 self.assertIsNone(msg) 227 self.assertEqual(ws.close_reason, 'Websocket authentication failed.') 228 handler.clients = clients 229 230 @tornado.testing.gen_test 231 def test_app_with_correct_credentials_user_robey(self): 232 url = self.get_url('/') 233 response = yield self.async_post(url, self.body) 234 data = json.loads(to_str(response.body)) 235 self.assert_status_none(data) 236 237 url = url.replace('http', 'ws') 238 ws_url = url + 'ws?id=' + data['id'] 239 ws = yield tornado.websocket.websocket_connect(ws_url) 240 msg = yield ws.read_message() 241 self.assertEqual(to_str(msg, data['encoding']), banner) 242 ws.close() 243 244 @tornado.testing.gen_test 245 def test_app_with_correct_credentials_but_without_id_argument(self): 246 url = self.get_url('/') 247 response = yield self.async_post(url, self.body) 248 data = json.loads(to_str(response.body)) 249 self.assert_status_none(data) 250 251 url = url.replace('http', 'ws') 252 ws_url = url + 'ws' 253 ws = yield tornado.websocket.websocket_connect(ws_url) 254 msg = yield ws.read_message() 255 self.assertIsNone(msg) 256 self.assertIn('Missing argument id', ws.close_reason) 257 258 @tornado.testing.gen_test 259 def test_app_with_correct_credentials_but_empty_id(self): 260 url = self.get_url('/') 261 response = yield self.async_post(url, self.body) 262 data = json.loads(to_str(response.body)) 263 self.assert_status_none(data) 264 265 url = url.replace('http', 'ws') 266 ws_url = url + 'ws?id=' 267 ws = yield tornado.websocket.websocket_connect(ws_url) 268 msg = yield ws.read_message() 269 self.assertIsNone(msg) 270 self.assertIn('Missing value id', ws.close_reason) 271 272 @tornado.testing.gen_test 273 def test_app_with_correct_credentials_but_wrong_id(self): 274 url = self.get_url('/') 275 response = yield self.async_post(url, self.body) 276 data = json.loads(to_str(response.body)) 277 self.assert_status_none(data) 278 279 url = url.replace('http', 'ws') 280 ws_url = url + 'ws?id=1' + data['id'] 281 ws = yield tornado.websocket.websocket_connect(ws_url) 282 msg = yield ws.read_message() 283 self.assertIsNone(msg) 284 self.assertIn('Websocket authentication failed', ws.close_reason) 285 286 @tornado.testing.gen_test 287 def test_app_with_correct_credentials_user_bar(self): 288 body = self.body.replace('robey', 'bar') 289 url = self.get_url('/') 290 response = yield self.async_post(url, body) 291 data = json.loads(to_str(response.body)) 292 self.assert_status_none(data) 293 294 url = url.replace('http', 'ws') 295 ws_url = url + 'ws?id=' + data['id'] 296 ws = yield tornado.websocket.websocket_connect(ws_url) 297 msg = yield ws.read_message() 298 self.assertEqual(to_str(msg, data['encoding']), banner) 299 300 # messages below will be ignored silently 301 yield ws.write_message('hello') 302 yield ws.write_message('"hello"') 303 yield ws.write_message('[hello]') 304 yield ws.write_message(json.dumps({'resize': []})) 305 yield ws.write_message(json.dumps({'resize': {}})) 306 yield ws.write_message(json.dumps({'resize': 'ab'})) 307 yield ws.write_message(json.dumps({'resize': ['a', 'b']})) 308 yield ws.write_message(json.dumps({'resize': {'a': 1, 'b': 2}})) 309 yield ws.write_message(json.dumps({'resize': [100]})) 310 yield ws.write_message(json.dumps({'resize': [100]*10})) 311 yield ws.write_message(json.dumps({'resize': [-1, -1]})) 312 yield ws.write_message(json.dumps({'data': [1]})) 313 yield ws.write_message(json.dumps({'data': (1,)})) 314 yield ws.write_message(json.dumps({'data': {'a': 2}})) 315 yield ws.write_message(json.dumps({'data': 1})) 316 yield ws.write_message(json.dumps({'data': 2.1})) 317 yield ws.write_message(json.dumps({'key-non-existed': 'hello'})) 318 # end - those just for testing webssh websocket stablity 319 320 yield ws.write_message(json.dumps({'resize': [79, 23]})) 321 msg = yield ws.read_message() 322 self.assertEqual(b'resized', msg) 323 324 yield ws.write_message(json.dumps({'data': 'bye'})) 325 msg = yield ws.read_message() 326 self.assertEqual(b'bye', msg) 327 ws.close() 328 329 @tornado.testing.gen_test 330 def test_app_auth_with_valid_pubkey_by_urlencoded_form(self): 331 url = self.get_url('/') 332 privatekey = read_file(make_tests_data_path('user_rsa_key')) 333 self.body_dict.update(privatekey=privatekey) 334 response = yield self.async_post(url, self.body_dict) 335 data = json.loads(to_str(response.body)) 336 self.assert_status_none(data) 337 338 url = url.replace('http', 'ws') 339 ws_url = url + 'ws?id=' + data['id'] 340 ws = yield tornado.websocket.websocket_connect(ws_url) 341 msg = yield ws.read_message() 342 self.assertEqual(to_str(msg, data['encoding']), banner) 343 ws.close() 344 345 @tornado.testing.gen_test 346 def test_app_auth_with_valid_pubkey_by_multipart_form(self): 347 url = self.get_url('/') 348 privatekey = read_file(make_tests_data_path('user_rsa_key')) 349 files = [('privatekey', 'user_rsa_key', privatekey)] 350 content_type, body = encode_multipart_formdata(self.body_dict.items(), 351 files) 352 headers = { 353 'Content-Type': content_type, 'content-length': str(len(body)) 354 } 355 response = yield self.async_post(url, body, headers=headers) 356 data = json.loads(to_str(response.body)) 357 self.assert_status_none(data) 358 359 url = url.replace('http', 'ws') 360 ws_url = url + 'ws?id=' + data['id'] 361 ws = yield tornado.websocket.websocket_connect(ws_url) 362 msg = yield ws.read_message() 363 self.assertEqual(to_str(msg, data['encoding']), banner) 364 ws.close() 365 366 @tornado.testing.gen_test 367 def test_app_auth_with_invalid_pubkey_for_user_robey(self): 368 url = self.get_url('/') 369 privatekey = 'h' * 1024 370 files = [('privatekey', 'user_rsa_key', privatekey)] 371 content_type, body = encode_multipart_formdata(self.body_dict.items(), 372 files) 373 headers = { 374 'Content-Type': content_type, 'content-length': str(len(body)) 375 } 376 377 if swallow_http_errors: 378 response = yield self.async_post(url, body, headers=headers) 379 self.assertIn(b'Invalid key', response.body) 380 else: 381 with self.assertRaises(HTTPError) as ctx: 382 yield self.async_post(url, body, headers=headers) 383 self.assertIn('Bad Request', ctx.exception.message) 384 385 @tornado.testing.gen_test 386 def test_app_auth_with_pubkey_exceeds_key_max_size(self): 387 url = self.get_url('/') 388 privatekey = 'h' * (handler.PrivateKey.max_length + 1) 389 files = [('privatekey', 'user_rsa_key', privatekey)] 390 content_type, body = encode_multipart_formdata(self.body_dict.items(), 391 files) 392 headers = { 393 'Content-Type': content_type, 'content-length': str(len(body)) 394 } 395 if swallow_http_errors: 396 response = yield self.async_post(url, body, headers=headers) 397 self.assertIn(b'Invalid key', response.body) 398 else: 399 with self.assertRaises(HTTPError) as ctx: 400 yield self.async_post(url, body, headers=headers) 401 self.assertIn('Bad Request', ctx.exception.message) 402 403 @tornado.testing.gen_test 404 def test_app_auth_with_pubkey_cannot_be_decoded_by_multipart_form(self): 405 url = self.get_url('/') 406 privatekey = 'h' * 1024 407 files = [('privatekey', 'user_rsa_key', privatekey)] 408 content_type, body = encode_multipart_formdata(self.body_dict.items(), 409 files) 410 body = body.encode('utf-8') 411 # added some gbk bytes to the privatekey, make it cannot be decoded 412 body = body[:-100] + b'\xb4\xed\xce\xf3' + body[-100:] 413 headers = { 414 'Content-Type': content_type, 'content-length': str(len(body)) 415 } 416 if swallow_http_errors: 417 response = yield self.async_post(url, body, headers=headers) 418 self.assertIn(b'Invalid unicode', response.body) 419 else: 420 with self.assertRaises(HTTPError) as ctx: 421 yield self.async_post(url, body, headers=headers) 422 self.assertIn('Bad Request', ctx.exception.message) 423 424 def test_app_post_form_with_large_body_size_by_multipart_form(self): 425 privatekey = 'h' * (2 * max_body_size) 426 files = [('privatekey', 'user_rsa_key', privatekey)] 427 content_type, body = encode_multipart_formdata(self.body_dict.items(), 428 files) 429 headers = { 430 'Content-Type': content_type, 'content-length': str(len(body)) 431 } 432 response = self.sync_post('/', body, headers=headers) 433 self.assertIn(response.code, [400, 599]) 434 435 def test_app_post_form_with_large_body_size_by_urlencoded_form(self): 436 privatekey = 'h' * (2 * max_body_size) 437 body = self.body + '&privatekey=' + privatekey 438 response = self.sync_post('/', body) 439 self.assertIn(response.code, [400, 599]) 440 441 @tornado.testing.gen_test 442 def test_app_with_user_keyonly_for_bad_authentication_type(self): 443 self.body_dict.update(username='keyonly', password='foo') 444 response = yield self.async_post('/', self.body_dict) 445 self.assertEqual(response.code, 200) 446 self.assert_status_in('Bad authentication type', json.loads(to_str(response.body))) # noqa 447 448 @tornado.testing.gen_test 449 def test_app_with_user_pass2fa_with_correct_passwords(self): 450 self.body_dict.update(username='pass2fa', password='password', 451 totp='passcode') 452 response = yield self.async_post('/', self.body_dict) 453 self.assertEqual(response.code, 200) 454 data = json.loads(to_str(response.body)) 455 self.assert_status_none(data) 456 457 @tornado.testing.gen_test 458 def test_app_with_user_pass2fa_with_wrong_pkey_correct_passwords(self): 459 url = self.get_url('/') 460 privatekey = read_file(make_tests_data_path('user_rsa_key')) 461 self.body_dict.update(username='pass2fa', password='password', 462 privatekey=privatekey, totp='passcode') 463 response = yield self.async_post(url, self.body_dict) 464 data = json.loads(to_str(response.body)) 465 self.assert_status_none(data) 466 467 @tornado.testing.gen_test 468 def test_app_with_user_pkey2fa_with_correct_passwords(self): 469 url = self.get_url('/') 470 privatekey = read_file(make_tests_data_path('user_rsa_key')) 471 self.body_dict.update(username='pkey2fa', password='password', 472 privatekey=privatekey, totp='passcode') 473 response = yield self.async_post(url, self.body_dict) 474 data = json.loads(to_str(response.body)) 475 self.assert_status_none(data) 476 477 @tornado.testing.gen_test 478 def test_app_with_user_pkey2fa_with_wrong_password(self): 479 url = self.get_url('/') 480 privatekey = read_file(make_tests_data_path('user_rsa_key')) 481 self.body_dict.update(username='pkey2fa', password='wrongpassword', 482 privatekey=privatekey, totp='passcode') 483 response = yield self.async_post(url, self.body_dict) 484 data = json.loads(to_str(response.body)) 485 self.assert_status_in('Authentication failed', data) 486 487 @tornado.testing.gen_test 488 def test_app_with_user_pkey2fa_with_wrong_passcode(self): 489 url = self.get_url('/') 490 privatekey = read_file(make_tests_data_path('user_rsa_key')) 491 self.body_dict.update(username='pkey2fa', password='password', 492 privatekey=privatekey, totp='wrongpasscode') 493 response = yield self.async_post(url, self.body_dict) 494 data = json.loads(to_str(response.body)) 495 self.assert_status_in('Authentication failed', data) 496 497 @tornado.testing.gen_test 498 def test_app_with_user_pkey2fa_with_empty_passcode(self): 499 url = self.get_url('/') 500 privatekey = read_file(make_tests_data_path('user_rsa_key')) 501 self.body_dict.update(username='pkey2fa', password='password', 502 privatekey=privatekey, totp='') 503 response = yield self.async_post(url, self.body_dict) 504 data = json.loads(to_str(response.body)) 505 self.assert_status_in('Need a verification code', data) 506 507 508 class OtherTestBase(TestAppBase): 509 sshserver_port = 3300 510 headers = {'Cookie': '_xsrf=yummy'} 511 debug = False 512 policy = None 513 xsrf = True 514 hostfile = '' 515 syshostfile = '' 516 tdstream = '' 517 maxconn = 20 518 origin = 'same' 519 encodings = [] 520 body = { 521 'hostname': '127.0.0.1', 522 'port': '', 523 'username': 'robey', 524 'password': 'foo', 525 '_xsrf': 'yummy' 526 } 527 528 def get_app(self): 529 self.body.update(port=str(self.sshserver_port)) 530 loop = self.io_loop 531 options.debug = self.debug 532 options.xsrf = self.xsrf 533 options.policy = self.policy if self.policy else random.choice(['warning', 'autoadd']) # noqa 534 options.hostfile = self.hostfile 535 options.syshostfile = self.syshostfile 536 options.tdstream = self.tdstream 537 options.maxconn = self.maxconn 538 options.origin = self.origin 539 app = make_app(make_handlers(loop, options), get_app_settings(options)) 540 return app 541 542 def setUp(self): 543 print('='*20) 544 self.running = True 545 OtherTestBase.sshserver_port += 1 546 547 t = threading.Thread( 548 target=run_ssh_server, 549 args=(self.sshserver_port, self.running, self.encodings) 550 ) 551 t.setDaemon(True) 552 t.start() 553 super(OtherTestBase, self).setUp() 554 555 def tearDown(self): 556 self.running = False 557 print('='*20) 558 super(OtherTestBase, self).tearDown() 559 560 561 class TestAppInDebugMode(OtherTestBase): 562 563 debug = True 564 565 def assert_response(self, bstr, response): 566 if swallow_http_errors: 567 self.assertEqual(response.code, 200) 568 self.assertIn(bstr, response.body) 569 else: 570 self.assertEqual(response.code, 500) 571 self.assertIn(b'Uncaught exception', response.body) 572 573 def test_server_error_for_post_method(self): 574 body = dict(self.body, error='raise') 575 response = self.sync_post('/', body) 576 self.assert_response(b'"status": "Internal Server Error"', response) 577 578 def test_html(self): 579 response = self.fetch('/', method='GET') 580 self.assertIn(b'novalidate>', response.body) 581 582 583 class TestAppWithLargeBuffer(OtherTestBase): 584 585 @tornado.testing.gen_test 586 def test_app_for_sending_message_with_large_size(self): 587 url = self.get_url('/') 588 response = yield self.async_post(url, dict(self.body, username='foo')) 589 data = json.loads(to_str(response.body)) 590 self.assert_status_none(data) 591 592 url = url.replace('http', 'ws') 593 ws_url = url + 'ws?id=' + data['id'] 594 ws = yield tornado.websocket.websocket_connect(ws_url) 595 msg = yield ws.read_message() 596 self.assertEqual(to_str(msg, data['encoding']), banner) 597 598 send = 'h' * (64 * 1024) + '\r\n\r\n' 599 yield ws.write_message(json.dumps({'data': send})) 600 lst = [] 601 while True: 602 msg = yield ws.read_message() 603 lst.append(msg) 604 if msg.endswith(b'\r\n\r\n'): 605 break 606 recv = b''.join(lst).decode(data['encoding']) 607 self.assertEqual(send, recv) 608 ws.close() 609 610 611 class TestAppWithRejectPolicy(OtherTestBase): 612 613 policy = 'reject' 614 hostfile = make_tests_data_path('known_hosts_example') 615 616 @tornado.testing.gen_test 617 def test_app_with_hostname_not_in_hostkeys(self): 618 response = yield self.async_post('/', self.body) 619 data = json.loads(to_str(response.body)) 620 message = 'Connection to {}:{} is not allowed.'.format(self.body['hostname'], self.sshserver_port) # noqa 621 self.assertEqual(message, data['status']) 622 623 624 class TestAppWithBadHostKey(OtherTestBase): 625 626 policy = random.choice(['warning', 'autoadd', 'reject']) 627 hostfile = make_tests_data_path('test_known_hosts') 628 629 def setUp(self): 630 self.sshserver_port = 2222 631 super(TestAppWithBadHostKey, self).setUp() 632 633 @tornado.testing.gen_test 634 def test_app_with_bad_host_key(self): 635 response = yield self.async_post('/', self.body) 636 data = json.loads(to_str(response.body)) 637 self.assertEqual('Bad host key.', data['status']) 638 639 640 class TestAppWithTrustedStream(OtherTestBase): 641 tdstream = '127.0.0.2' 642 643 def test_with_forbidden_get_request(self): 644 response = self.fetch('/', method='GET') 645 self.assertEqual(response.code, 403) 646 self.assertIn('Forbidden', response.error.message) 647 648 def test_with_forbidden_post_request(self): 649 response = self.sync_post('/', self.body) 650 self.assertEqual(response.code, 403) 651 self.assertIn('Forbidden', response.error.message) 652 653 def test_with_forbidden_put_request(self): 654 response = self.fetch_request('/', method='PUT', body=self.body) 655 self.assertEqual(response.code, 403) 656 self.assertIn('Forbidden', response.error.message) 657 658 659 class TestAppNotFoundHandler(OtherTestBase): 660 661 custom_headers = handler.MixinHandler.custom_headers 662 663 def test_with_not_found_get_request(self): 664 response = self.fetch('/pathnotfound', method='GET') 665 self.assertEqual(response.code, 404) 666 self.assertEqual( 667 response.headers['Server'], self.custom_headers['Server'] 668 ) 669 self.assertIn(b'404: Not Found', response.body) 670 671 def test_with_not_found_post_request(self): 672 response = self.sync_post('/pathnotfound', self.body) 673 self.assertEqual(response.code, 404) 674 self.assertEqual( 675 response.headers['Server'], self.custom_headers['Server'] 676 ) 677 self.assertIn(b'404: Not Found', response.body) 678 679 def test_with_not_found_put_request(self): 680 response = self.fetch_request('/pathnotfound', method='PUT', 681 body=self.body) 682 self.assertEqual(response.code, 404) 683 self.assertEqual( 684 response.headers['Server'], self.custom_headers['Server'] 685 ) 686 self.assertIn(b'404: Not Found', response.body) 687 688 689 class TestAppWithHeadRequest(OtherTestBase): 690 691 def test_with_index_path(self): 692 response = self.fetch('/', method='HEAD') 693 self.assertEqual(response.code, 200) 694 695 def test_with_ws_path(self): 696 response = self.fetch('/ws', method='HEAD') 697 self.assertEqual(response.code, 405) 698 699 def test_with_not_found_path(self): 700 response = self.fetch('/notfound', method='HEAD') 701 self.assertEqual(response.code, 404) 702 703 704 class TestAppWithPutRequest(OtherTestBase): 705 706 xsrf = False 707 708 @tornado.testing.gen_test 709 def test_app_with_method_not_supported(self): 710 with self.assertRaises(HTTPError) as ctx: 711 yield self.fetch_request('/', 'PUT', self.body, sync=False) 712 self.assertIn('Method Not Allowed', ctx.exception.message) 713 714 715 class TestAppWithTooManyConnections(OtherTestBase): 716 717 maxconn = 1 718 719 def setUp(self): 720 clients.clear() 721 super(TestAppWithTooManyConnections, self).setUp() 722 723 @tornado.testing.gen_test 724 def test_app_with_too_many_connections(self): 725 clients['127.0.0.1'] = {'fake_worker_id': None} 726 727 url = self.get_url('/') 728 response = yield self.async_post(url, self.body) 729 data = json.loads(to_str(response.body)) 730 self.assertEqual('Too many live connections.', data['status']) 731 732 clients['127.0.0.1'].clear() 733 response = yield self.async_post(url, self.body) 734 self.assert_status_none(json.loads(to_str(response.body))) 735 736 737 class TestAppWithCrossOriginOperation(OtherTestBase): 738 739 origin = 'http://www.example.com' 740 741 @tornado.testing.gen_test 742 def test_app_with_wrong_event_origin(self): 743 body = dict(self.body, _origin='localhost') 744 response = yield self.async_post('/', body) 745 self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body))) # noqa 746 747 @tornado.testing.gen_test 748 def test_app_with_wrong_header_origin(self): 749 headers = dict(Origin='localhost') 750 response = yield self.async_post('/', self.body, headers=headers) 751 self.assert_status_equal('Cross origin operation is not allowed.', json.loads(to_str(response.body)), ) # noqa 752 753 @tornado.testing.gen_test 754 def test_app_with_correct_event_origin(self): 755 body = dict(self.body, _origin=self.origin) 756 response = yield self.async_post('/', body) 757 self.assert_status_none(json.loads(to_str(response.body))) 758 self.assertIsNone(response.headers.get('Access-Control-Allow-Origin')) 759 760 @tornado.testing.gen_test 761 def test_app_with_correct_header_origin(self): 762 headers = dict(Origin=self.origin) 763 response = yield self.async_post('/', self.body, headers=headers) 764 self.assert_status_none(json.loads(to_str(response.body))) 765 self.assertEqual( 766 response.headers.get('Access-Control-Allow-Origin'), self.origin 767 ) 768 769 770 class TestAppWithBadEncoding(OtherTestBase): 771 772 encodings = [u'\u7f16\u7801'] 773 774 @tornado.testing.gen_test 775 def test_app_with_a_bad_encoding(self): 776 response = yield self.async_post('/', self.body) 777 dic = json.loads(to_str(response.body)) 778 self.assert_status_none(dic) 779 self.assertIn(dic['encoding'], server_encodings) 780 781 782 class TestAppWithUnknownEncoding(OtherTestBase): 783 784 encodings = [u'\u7f16\u7801', u'UnknownEncoding'] 785 786 @tornado.testing.gen_test 787 def test_app_with_a_unknown_encoding(self): 788 response = yield self.async_post('/', self.body) 789 self.assert_status_none(json.loads(to_str(response.body))) 790 dic = json.loads(to_str(response.body)) 791 self.assert_status_none(dic) 792 self.assertEqual(dic['encoding'], 'utf-8')