commit d197133c95e274483925f89ca91d6ebe6f3828d9
parent c0eba0ebb3536413aac1f81415fd71209171e79b
Author: Sheng <webmaster0115@gmail.com>
Date: Sun, 7 Jul 2019 14:52:43 +0800
Support 2fa
Diffstat:
5 files changed, 202 insertions(+), 9 deletions(-)
diff --git a/tests/sshserver.py b/tests/sshserver.py
@@ -57,6 +57,8 @@ class Server(paramiko.ServerInterface):
self.shell_event = threading.Event()
self.exec_event = threading.Event()
self.encoding = random.choice(self.encodings)
+ self.password_verified = False
+ self.key_verified = False
def check_channel_request(self, kind, chanid):
if kind == 'session':
@@ -73,11 +75,50 @@ class Server(paramiko.ServerInterface):
print('Auth attempt with username: {!r} & key: {!r}'.format(username, u(hexlify(key.get_fingerprint())))) # noqa
if (username in ['robey', 'keyonly']) and (key == self.good_pub_key):
return paramiko.AUTH_SUCCESSFUL
+ if username == 'pkey2fa' and key == self.good_pub_key:
+ self.key_verified = True
+ return paramiko.AUTH_PARTIALLY_SUCCESSFUL
return paramiko.AUTH_FAILED
+ def check_auth_interactive(self, username, submethods):
+ if username in ['pass2fa', 'pkey2fa']:
+ self.username = username
+ prompt = 'Verification code: ' if self.password_verified else 'Password: ' # noqa
+ print(username, prompt)
+ return paramiko.InteractiveQuery('', '', prompt)
+ return paramiko.AUTH_FAILED
+
+ def check_auth_interactive_response(self, responses):
+ if self.username in ['pass2fa', 'pkey2fa']:
+ if not self.password_verified:
+ if responses[0] == 'password':
+ print('password verified')
+ self.password_verified = True
+ if self.username == 'pkey2fa':
+ return self.check_auth_interactive(self.username, '')
+ else:
+ print('wrong password: {}'.format(responses[0]))
+ return paramiko.AUTH_FAILED
+ else:
+ if responses[0] == 'passcode':
+ print('totp verified')
+ return paramiko.AUTH_SUCCESSFUL
+ else:
+ print('wrong totp: {}'.format(responses[0]))
+ return paramiko.AUTH_FAILED
+ else:
+ return paramiko.AUTH_FAILED
+
def get_allowed_auths(self, username):
if username == 'keyonly':
return 'publickey'
+ if username == 'pass2fa':
+ return 'keyboard-interactive'
+ if username == 'pkey2fa':
+ if not self.key_verified:
+ return 'publickey'
+ else:
+ return 'keyboard-interactive'
return 'password,publickey'
def check_channel_exec_request(self, channel, command):
diff --git a/tests/test_app.py b/tests/test_app.py
@@ -444,6 +444,73 @@ class TestAppBasic(TestAppBase):
self.assertEqual(response.code, 200)
self.assert_status_in(json.loads(to_str(response.body)), 'Bad authentication type') # noqa
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_correct_password_and_passcode(self):
+ self.body_dict.update(username='pass2fa', password='password',
+ totp='passcode')
+ response = yield self.async_post('/', self.body_dict)
+ self.assertEqual(response.code, 200)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_wrong_password(self):
+ self.body_dict.update(username='pass2fa', password='wrongpassword',
+ totp='passcode')
+ response = yield self.async_post('/', self.body_dict)
+ self.assertEqual(response.code, 200)
+ data = json.loads(to_str(response.body))
+ self.assertIn('Authentication failed', data['status'])
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_wrong_passcode(self):
+ self.body_dict.update(username='pass2fa', password='password',
+ totp='wrongpasscode')
+ response = yield self.async_post('/', self.body_dict)
+ self.assertEqual(response.code, 200)
+ data = json.loads(to_str(response.body))
+ self.assertIn('Authentication failed', data['status'])
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pass2fa_with_wrong_pkey_correct_passwords(self): # noqa
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pass2fa', password='password',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_correct_password_and_passcode(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='password',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assert_status_none(data)
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_wrong_password(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='wrongpassword',
+ privatekey=privatekey, totp='passcode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assertIn('Authentication failed', data['status'])
+
+ @tornado.testing.gen_test
+ def test_app_with_user_pkey2fa_with_wrong_passcode(self):
+ url = self.get_url('/')
+ privatekey = read_file(make_tests_data_path('user_rsa_key'))
+ self.body_dict.update(username='pkey2fa', password='password',
+ privatekey=privatekey, totp='wrongpasscode')
+ response = yield self.async_post(url, self.body_dict)
+ data = json.loads(to_str(response.body))
+ self.assertIn('Authentication failed', data['status'])
+
class OtherTestBase(TestAppBase):
sshserver_port = 3300
diff --git a/webssh/handler.py b/webssh/handler.py
@@ -36,6 +36,78 @@ swallow_http_errors = True
redirecting = None
+def make_handler(password, totp):
+
+ def handler(title, instructions, prompt_list):
+ answers = []
+ for prompt_, _ in prompt_list:
+ prompt = prompt_.strip().lower()
+ if prompt.startswith('password'):
+ answers.append(password)
+ elif prompt.startswith('verification'):
+ answers.append(totp)
+ else:
+ raise ValueError('Unknown prompt: {}'.format(prompt_))
+ return answers
+
+ return handler
+
+
+def auth_interactive(transport, username, handler):
+ if not handler:
+ raise ValueError('Need a verification code for 2fa.')
+ transport.auth_interactive(username, handler)
+
+
+def auth(self, username, password, pkey, *args):
+ handler = None
+ saved_exception = None
+ two_factor = False
+ allowed_types = set()
+ two_factor_types = {"keyboard-interactive", "password"}
+
+ if self._totp:
+ handler = make_handler(password, self._totp)
+
+ if pkey is not None:
+ logging.info('Trying public key authentication')
+ try:
+ allowed_types = set(
+ self._transport.auth_publickey(username, pkey)
+ )
+ two_factor = allowed_types & two_factor_types
+ if not two_factor:
+ return
+ except paramiko.SSHException as e:
+ saved_exception = e
+
+ if two_factor:
+ logging.info('Trying publickey 2fa')
+ return auth_interactive(self._transport, username, handler)
+
+ if password is not None:
+ logging.info('Trying password authentication')
+ try:
+ self._transport.auth_password(username, password)
+ return
+ except paramiko.SSHException as e:
+ saved_exception = e
+ allowed_types = set(getattr(e, 'allowed_types', []))
+ two_factor = allowed_types & two_factor_types
+
+ if two_factor:
+ logging.info('Trying password 2fa')
+ return auth_interactive(self._transport, username, handler)
+
+ # if we got an auth-failed exception earlier, re-raise it
+ if saved_exception is not None:
+ raise saved_exception
+ raise paramiko.SSHException("No authentication methods available")
+
+
+paramiko.client.SSHClient._auth = auth
+
+
class InvalidValueError(Exception):
pass
@@ -306,18 +378,24 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
def get_args(self):
hostname = self.get_hostname()
port = self.get_port()
- if isinstance(self.policy, paramiko.RejectPolicy):
- self.lookup_hostname(hostname, port)
username = self.get_value('username')
password = self.get_argument('password', u'')
- passphrase = self.get_argument('passphrase', u'')
privatekey, filename = self.get_privatekey()
+ passphrase = self.get_argument('passphrase', u'')
+ totp = self.get_argument('totp', u'')
+
+ if isinstance(self.policy, paramiko.RejectPolicy):
+ self.lookup_hostname(hostname, port)
+
if privatekey:
pkey = PrivateKey(privatekey, passphrase, filename).get_pkey_obj()
else:
pkey = None
+
+ self.ssh_client._totp = totp
args = (hostname, port, username, password, pkey)
logging.debug(args)
+
return args
def get_default_encoding(self, ssh):
@@ -336,9 +414,7 @@ class IndexHandler(MixinHandler, tornado.web.RequestHandler):
logging.info('Connecting to {}:{}'.format(*dst_addr))
try:
- ssh.connect(
- *args, timeout=6, allow_agent=False, look_for_keys=False
- )
+ ssh.connect(*args, timeout=6)
except socket.error:
raise ValueError('Unable to connect to {}:{}'.format(*dst_addr))
except paramiko.BadAuthenticationType:
diff --git a/webssh/static/js/main.js b/webssh/static/js/main.js
@@ -517,7 +517,7 @@ jQuery(function($){
function clean_data(data) {
var i, attr, val;
- var attrs = fields.concat(['password', 'privatekey', 'passphrase']);
+ var attrs = fields.concat(['password', 'privatekey', 'passphrase', 'totp']);
for (i = 0; i < attrs.length; i++) {
attr = attrs[i];
@@ -668,7 +668,7 @@ jQuery(function($){
}
- function connect(hostname, port, username, password, privatekey, passphrase) {
+ function connect(hostname, port, username, password, privatekey, passphrase, totp) {
// for console use
var result, opts;
@@ -687,7 +687,8 @@ jQuery(function($){
username: username,
password: password,
privatekey: privatekey,
- passphrase: passphrase
+ passphrase: passphrase,
+ totp: totp
};
} else {
opts = hostname;
diff --git a/webssh/templates/index.html b/webssh/templates/index.html
@@ -59,6 +59,14 @@
<input class="form-control" type="password" name="passphrase" value="">
</div>
</div>
+ <div class="row">
+ <div class="col">
+ <label for="totp">Totp (time-based one-time password)</label>
+ <input class="form-control" type="password" name="totp" value="">
+ </div>
+ <div class="col">
+ </div>
+ </div>
{% module xsrf_form_html() %}
<button type="submit" class="btn btn-primary">Connect</button>
<button type="reset" class="btn btn-danger">Reset</button>