webssh

Web based ssh client https://github.com/huashengdun/webssh webssh.huashengdun.org/
git clone http://git.hanabi.in/repos/webssh.git
Log | Files | Refs | README | LICENSE

commit 049baad90948ce9774ca55ef208a79f06806926c
parent afcf8b52cc9c14f0c93671e379a8736f8df0c885
Author: Sheng <webmaster0115@gmail.com>
Date:   Sun,  6 Oct 2019 15:18:23 +0800

Try to detect the encoding set by the user

Diffstat:
Mtests/sshserver.py | 30++++++++++++++++++++++--------
Mtests/test_app.py | 19++++++++++++-------
Mwebssh/handler.py | 36+++++++++++++++++++++++-------------
3 files changed, 57 insertions(+), 28 deletions(-)

diff --git a/tests/sshserver.py b/tests/sshserver.py @@ -51,16 +51,24 @@ class Server(paramiko.ServerInterface): b'UWT10hcuO4Ks8=') good_pub_key = paramiko.RSAKey(data=decodebytes(data)) + commands = [ + b'$SHELL -ilc "locale charmap"', + b'$SHELL -ic "locale charmap"' + ] encodings = ['UTF-8', 'GBK', 'UTF-8\r\n', 'GBK\r\n'] - def __init__(self, encoding=None): + def __init__(self, encodings=[]): self.shell_event = threading.Event() self.exec_event = threading.Event() - self.encoding = random.choice(self.encodings) - self.bad_encoding = encoding + self.cmd_to_enc = self.get_cmd2enc(encodings) self.password_verified = False self.key_verified = False + def get_cmd2enc(self, encodings): + while len(encodings) < 2: + encodings.append(random.choice(self.encodings)) + return dict(zip(self.commands, encodings[0:2])) + def check_channel_request(self, kind, chanid): if kind == 'session': return paramiko.OPEN_SUCCEEDED @@ -123,11 +131,12 @@ class Server(paramiko.ServerInterface): return 'password,publickey' def check_channel_exec_request(self, channel, command): - if command != b'locale charmap': + if command not in self.commands: ret = False else: ret = True - channel.send(self.bad_encoding or self.encoding) + self.encoding = self.cmd_to_enc[command] + channel.send(self.encoding) channel.shutdown(1) self.exec_event.set() return ret @@ -146,7 +155,7 @@ class Server(paramiko.ServerInterface): return True -def run_ssh_server(port=2200, running=True, encoding=None): +def run_ssh_server(port=2200, running=True, encodings=[]): # now connect sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -160,7 +169,7 @@ def run_ssh_server(port=2200, running=True, encoding=None): t = paramiko.Transport(client) t.load_server_moduli() t.add_server_key(host_key) - server = Server(encoding) + server = Server(encodings) try: t.start_server(server=server) except Exception as e: @@ -188,7 +197,12 @@ def run_ssh_server(port=2200, running=True, encoding=None): # chan.send('\r\n\r\nWelcome!\r\n\r\n') print(server.encoding) - chan.send(banner.encode(server.encoding.strip())) + try: + banner_encoded = banner.encode(server.encoding) + except (ValueError, LookupError): + continue + + chan.send(banner_encoded) if username == 'bar': msg = chan.recv(1024) chan.send(msg) diff --git a/tests/test_app.py b/tests/test_app.py @@ -515,7 +515,7 @@ class OtherTestBase(TestAppBase): tdstream = '' maxconn = 20 origin = 'same' - encoding = '' + encodings = [] body = { 'hostname': '127.0.0.1', 'port': '', @@ -545,7 +545,7 @@ class OtherTestBase(TestAppBase): t = threading.Thread( target=run_ssh_server, - args=(self.sshserver_port, self.running, self.encoding) + args=(self.sshserver_port, self.running, self.encodings) ) t.setDaemon(True) t.start() @@ -768,19 +768,24 @@ class TestAppWithCrossOriginOperation(OtherTestBase): class TestAppWithBadEncoding(OtherTestBase): - encoding = b'\xe7\xbc\x96\xe7\xa0\x81' + encodings = [u'\u7f16\u7801'] @tornado.testing.gen_test def test_app_with_a_bad_encoding(self): response = yield self.async_post('/', self.body) - self.assertIn(b'Bad encoding', response.body) + dic = json.loads(to_str(response.body)) + self.assert_status_none(dic) + self.assertIn(dic['encoding'], ['UTF-8', 'GBK']) class TestAppWithUnknownEncoding(OtherTestBase): - encoding = u'UnknownEncoding' + encodings = [u'\u7f16\u7801', u'UnknownEncoding'] @tornado.testing.gen_test - def test_app_with_a_bad_encoding(self): + def test_app_with_a_unknown_encoding(self): response = yield self.async_post('/', self.body) - self.assertIn(b'Unknown encoding', response.body) + self.assert_status_none(json.loads(to_str(response.body))) + dic = json.loads(to_str(response.body)) + self.assert_status_none(dic) + self.assertEqual(dic['encoding'], 'utf-8') diff --git a/webssh/handler.py b/webssh/handler.py @@ -390,21 +390,31 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler): return args - def get_default_encoding(self, ssh): + def parse_encoding(self, data): try: - _, stdout, _ = ssh.exec_command('locale charmap') - except paramiko.SSHException as exc: - logging.warn(str(exc)) - return u'utf-8' - - try: - enc = to_str(stdout.read().strip(), 'ascii') + encoding = to_str(data, 'ascii') except UnicodeDecodeError: - raise ValueError('Bad encoding') - else: - if not is_valid_encoding(enc): - raise ValueError('Unknown encoding "{}"'.format(enc)) - return enc + return + + if is_valid_encoding(encoding): + return encoding + + def get_default_encoding(self, ssh): + commands = [ + '$SHELL -ilc "locale charmap"', + '$SHELL -ic "locale charmap"' + ] + + for command in commands: + _, stdout, _ = ssh.exec_command(command, get_pty=True) + data = stdout.read().strip() + logging.debug('encoding: {}'.format(data)) + result = self.parse_encoding(data) + if result: + return result + + logging.warn('Could not detect the default ecnoding.') + return 'utf-8' def ssh_connect(self, args): ssh = self.ssh_client