webssh

Web based ssh client https://github.com/huashengdun/webssh webssh.huashengdun.org/
git clone http://git.hanabi.in/repos/webssh.git
Log | Files | Refs | README | LICENSE

commit 58feb5e76bb8c3fa88613283de8abd748995c57f
parent 688ca78bb2100376c9c1b56970c3b81abd2d715f
Author: Sheng <webmaster0115@gmail.com>
Date:   Tue, 28 Aug 2018 20:08:03 +0800

Enable xsrf for testing

Diffstat:
Mtests/test_app.py | 179+++++++++++++++++++++++++++----------------------------------------------------
1 file changed, 60 insertions(+), 119 deletions(-)

diff --git a/tests/test_app.py b/tests/test_app.py @@ -29,23 +29,23 @@ class TestApp(AsyncHTTPTestCase): running = [True] sshserver_port = 2200 - body = 'hostname=127.0.0.1&port={}&username=robey&password=foo'.format(sshserver_port) # noqa + body = 'hostname=127.0.0.1&port={}&_xsrf=yummy&username=robey&password=foo'.format(sshserver_port) # noqa + headers = {'Cookie': '_xsrf=yummy'} def get_app(self): self.body_dict = { 'hostname': '127.0.0.1', 'port': str(self.sshserver_port), 'username': 'robey', - 'password': '' + 'password': '', + '_xsrf': 'yummy' } loop = self.io_loop options.debug = False options.policy = random.choice(['warning', 'autoadd']) options.hostFile = '' options.sysHostFile = '' - settings = get_app_settings(options) - settings.update(xsrf_cookies=False) - app = make_app(make_handlers(loop, options), settings) + app = make_app(make_handlers(loop, options), get_app_settings(options)) return app @classmethod @@ -73,82 +73,87 @@ class TestApp(AsyncHTTPTestCase): else: self.assertIn(b'Bad Request', whole) + def sync_post(self, body, headers={}, url='/', method='POST'): + headers.update(self.headers) + return self.fetch(url, method=method, body=body, headers=headers) + + def async_post(self, url, body, headers={}, method='POST'): + client = self.get_http_client() + headers.update(self.headers) + return client.fetch(url, method=method, body=body, headers=headers) + def test_app_with_invalid_form_for_missing_argument(self): response = self.fetch('/') self.assertEqual(response.code, 200) - body = 'port=7000&username=admin&password' - response = self.fetch('/', method='POST', body=body) + body = 'port=7000&username=admin&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing argument hostname', response.body) - body = 'hostname=127.0.0.1&username=admin&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&username=admin&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing argument port', response.body) - body = 'hostname=127.0.0.1&port=7000&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=7000&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing argument username', response.body) - body = 'hostname=&port=&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=&port=&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing value hostname', response.body) - body = 'hostname=127.0.0.1&port=&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing value port', response.body) - body = 'hostname=127.0.0.1&port=7000&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=7000&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Missing value username', response.body) def test_app_with_invalid_form_for_invalid_value(self): - body = 'hostname=127.0.0&port=22&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0&port=22&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Invalid hostname', response.body) - body = 'hostname=http://www.googe.com&port=22&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=http://www.googe.com&port=22&username=&password&_xsrf=yummy' # noqa + response = self.sync_post(body) self.my_assertIn(b'Invalid hostname', response.body) - body = 'hostname=127.0.0.1&port=port&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=port&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Invalid port', response.body) - body = 'hostname=127.0.0.1&port=70000&username=&password' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=70000&username=&password&_xsrf=yummy' + response = self.sync_post(body) self.my_assertIn(b'Invalid port', response.body) def test_app_with_wrong_hostname_ip(self): - body = 'hostname=127.0.0.1&port=7000&username=admin' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy' + response = self.sync_post(body) self.assertEqual(response.code, 200) self.assertIn(b'Unable to connect to', response.body) def test_app_with_wrong_hostname_domain(self): - body = 'hostname=xxxxxxxxxxxx&port=7000&username=admin' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=xxxxxxxxxxxx&port=7000&username=admin&_xsrf=yummy' + response = self.sync_post(body) self.assertEqual(response.code, 200) self.assertIn(b'Unable to connect to', response.body) def test_app_with_wrong_port(self): - body = 'hostname=127.0.0.1&port=7000&username=admin' - response = self.fetch('/', method='POST', body=body) + body = 'hostname=127.0.0.1&port=7000&username=admin&_xsrf=yummy' + response = self.sync_post(body) self.assertEqual(response.code, 200) self.assertIn(b'Unable to connect to', response.body) def test_app_with_wrong_credentials(self): - response = self.fetch('/') - self.assertEqual(response.code, 200) - response = self.fetch('/', method='POST', body=self.body + 's') + response = self.sync_post(self.body + 's') data = json.loads(to_str(response.body)) self.assertIsNone(data['encoding']) self.assertIsNone(data['id']) self.assertIn('Authentication failed.', data['status']) def test_app_with_correct_credentials(self): - response = self.fetch('/') - self.assertEqual(response.code, 200) - response = self.fetch('/', method='POST', body=self.body) + response = self.sync_post(self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -157,11 +162,7 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_timeout(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) + response = yield self.async_post(url, self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -178,11 +179,7 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_user_robey(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) + response = yield self.async_post(url, self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -198,11 +195,7 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_but_without_id_argument(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) + response = yield self.async_post(url, self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -218,11 +211,7 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_but_empty_id(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) + response = yield self.async_post(url, self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -238,11 +227,7 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_but_wrong_id(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - - response = yield client.fetch(url, method='POST', body=self.body) + response = yield self.async_post(url, self.body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -257,13 +242,9 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_with_correct_credentials_user_bar(self): - url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - body = self.body.replace('robey', 'bar') - response = yield client.fetch(url, method='POST', body=body) + url = self.get_url('/') + response = yield self.async_post(url, body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -307,14 +288,10 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_auth_with_valid_pubkey_by_urlencoded_form(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = read_file(make_tests_data_path('user_rsa_key')) self.body_dict.update(privatekey=privatekey) body = urlencode(self.body_dict) - response = yield client.fetch(url, method='POST', body=body) + response = yield self.async_post(url, body) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -330,10 +307,6 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_auth_with_valid_pubkey_by_multipart_form(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = read_file(make_tests_data_path('user_rsa_key')) files = [('privatekey', 'user_rsa_key', privatekey)] content_type, body = encode_multipart_formdata(self.body_dict.items(), @@ -341,8 +314,7 @@ class TestApp(AsyncHTTPTestCase): headers = { 'Content-Type': content_type, 'content-length': str(len(body)) } - response = yield client.fetch(url, method='POST', headers=headers, - body=body) + response = yield self.async_post(url, body, headers=headers) data = json.loads(to_str(response.body)) self.assertIsNone(data['status']) self.assertIsNotNone(data['id']) @@ -358,10 +330,6 @@ class TestApp(AsyncHTTPTestCase): @tornado.testing.gen_test def test_app_auth_with_invalid_pubkey_for_user_robey(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = 'h' * 1024 files = [('privatekey', 'user_rsa_key', privatekey)] content_type, body = encode_multipart_formdata(self.body_dict.items(), @@ -371,22 +339,16 @@ class TestApp(AsyncHTTPTestCase): } if swallow_http_errors: - response = yield client.fetch(url, method='POST', headers=headers, - body=body) + response = yield self.async_post(url, body, headers=headers) self.assertIn(b'Invalid private key', response.body) else: with self.assertRaises(HTTPError) as ctx: - yield client.fetch(url, method='POST', headers=headers, - body=body) + yield self.async_post(url, body, headers=headers) self.assertIn('Bad Request', ctx.exception.message) @tornado.testing.gen_test def test_app_auth_with_pubkey_exceeds_key_max_size(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = 'h' * (handler.KEY_MAX_SIZE * 2) files = [('privatekey', 'user_rsa_key', privatekey)] content_type, body = encode_multipart_formdata(self.body_dict.items(), @@ -395,22 +357,16 @@ class TestApp(AsyncHTTPTestCase): 'Content-Type': content_type, 'content-length': str(len(body)) } if swallow_http_errors: - response = yield client.fetch(url, method='POST', headers=headers, - body=body) + response = yield self.async_post(url, body, headers=headers) self.assertIn(b'Invalid private key', response.body) else: with self.assertRaises(HTTPError) as ctx: - yield client.fetch(url, method='POST', headers=headers, - body=body) + yield self.async_post(url, body, headers=headers) self.assertIn('Bad Request', ctx.exception.message) @tornado.testing.gen_test def test_app_auth_with_pubkey_cannot_be_decoded_by_multipart_form(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = 'h' * 1024 files = [('privatekey', 'user_rsa_key', privatekey)] content_type, body = encode_multipart_formdata(self.body_dict.items(), @@ -422,22 +378,16 @@ class TestApp(AsyncHTTPTestCase): 'Content-Type': content_type, 'content-length': str(len(body)) } if swallow_http_errors: - response = yield client.fetch(url, method='POST', headers=headers, - body=body) + response = yield self.async_post(url, body, headers=headers) self.assertIn(b'Invalid unicode', response.body) else: with self.assertRaises(HTTPError) as ctx: - yield client.fetch(url, method='POST', headers=headers, - body=body) + yield self.async_post(url, body, headers=headers) self.assertIn('Bad Request', ctx.exception.message) @tornado.testing.gen_test def test_app_post_form_with_large_body_size_by_multipart_form(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = 'h' * (2 * max_body_size) files = [('privatekey', 'user_rsa_key', privatekey)] content_type, body = encode_multipart_formdata(self.body_dict.items(), @@ -447,33 +397,24 @@ class TestApp(AsyncHTTPTestCase): } with self.assertRaises(HTTPError) as ctx: - yield client.fetch(url, method='POST', headers=headers, - body=body) + yield self.async_post(url, body, headers=headers) self.assertIn('Bad Request', ctx.exception.message) @tornado.testing.gen_test def test_app_post_form_with_large_body_size_by_urlencoded_form(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - privatekey = 'h' * (2 * max_body_size) body = self.body + '&privatekey=' + privatekey with self.assertRaises(HTTPError) as ctx: - yield client.fetch(url, method='POST', body=body) + yield self.async_post(url, body) self.assertIn('Bad Request', ctx.exception.message) @tornado.testing.gen_test def test_app_with_user_keyonly_for_bad_authentication_type(self): url = self.get_url('/') - client = self.get_http_client() - response = yield client.fetch(url) - self.assertEqual(response.code, 200) - self.body_dict.update(username='keyonly', password='foo') body = urlencode(self.body_dict) - response = yield client.fetch(url, method='POST', body=body) + response = yield self.async_post(url, body) self.assertEqual(response.code, 200) data = json.loads(to_str(response.body)) self.assertIsNone(data['id'])