webssh

Web based ssh client https://github.com/huashengdun/webssh webssh.huashengdun.org/
git clone http://git.hanabi.in/repos/webssh.git
Log | Files | Refs | README | LICENSE

sshserver.py (8028B)


      1 #!/usr/bin/env python
      2 
      3 # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com>
      4 #
      5 # This file is part of paramiko.
      6 #
      7 # Paramiko is free software; you can redistribute it and/or modify it under the
      8 # terms of the GNU Lesser General Public License as published by the Free
      9 # Software Foundation; either version 2.1 of the License, or (at your option)
     10 # any later version.
     11 #
     12 # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
     13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     14 # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
     15 # details.
     16 #
     17 # You should have received a copy of the GNU Lesser General Public License
     18 # along with Paramiko; if not, write to the Free Software Foundation, Inc.,
     19 # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
     20 
     21 import random
     22 import socket
     23 # import sys
     24 import threading
     25 # import traceback
     26 import paramiko
     27 
     28 from binascii import hexlify
     29 from paramiko.py3compat import u, decodebytes
     30 from tests.utils import make_tests_data_path
     31 
     32 
     33 # setup logging
     34 paramiko.util.log_to_file(make_tests_data_path('sshserver.log'))
     35 
     36 host_key = paramiko.RSAKey(filename=make_tests_data_path('test_rsa.key'))
     37 # host_key = paramiko.DSSKey(filename='test_dss.key')
     38 
     39 print('Read key: ' + u(hexlify(host_key.get_fingerprint())))
     40 
     41 banner = u'\r\n\u6b22\u8fce\r\n'
     42 event_timeout = 5
     43 
     44 
     45 class Server(paramiko.ServerInterface):
     46     # 'data' is the output of base64.b64encode(key)
     47     # (using the "user_rsa_key" files)
     48     data = (b'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp'
     49             b'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC'
     50             b'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT'
     51             b'UWT10hcuO4Ks8=')
     52     good_pub_key = paramiko.RSAKey(data=decodebytes(data))
     53 
     54     commands = [
     55         b'$SHELL -ilc "locale charmap"',
     56         b'$SHELL -ic "locale charmap"'
     57     ]
     58     encodings = ['UTF-8', 'GBK', 'UTF-8\r\n', 'GBK\r\n']
     59 
     60     def __init__(self, encodings=[]):
     61         self.shell_event = threading.Event()
     62         self.exec_event = threading.Event()
     63         self.cmd_to_enc = self.get_cmd2enc(encodings)
     64         self.password_verified = False
     65         self.key_verified = False
     66 
     67     def get_cmd2enc(self, encodings):
     68         n = len(self.commands)
     69         while len(encodings) < n:
     70             encodings.append(random.choice(self.encodings))
     71         return dict(zip(self.commands, encodings[0:n]))
     72 
     73     def check_channel_request(self, kind, chanid):
     74         if kind == 'session':
     75             return paramiko.OPEN_SUCCEEDED
     76         return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
     77 
     78     def check_auth_password(self, username, password):
     79         print('Auth attempt with username: {!r} & password: {!r}'.format(username, password)) # noqa
     80         if (username in ['robey', 'bar', 'foo']) and (password == 'foo'):
     81             return paramiko.AUTH_SUCCESSFUL
     82         return paramiko.AUTH_FAILED
     83 
     84     def check_auth_publickey(self, username, key):
     85         print('Auth attempt with username: {!r} & key: {!r}'.format(username, u(hexlify(key.get_fingerprint())))) # noqa
     86         if (username in ['robey', 'keyonly']) and (key == self.good_pub_key):
     87             return paramiko.AUTH_SUCCESSFUL
     88         if username == 'pkey2fa' and key == self.good_pub_key:
     89             self.key_verified = True
     90             return paramiko.AUTH_PARTIALLY_SUCCESSFUL
     91         return paramiko.AUTH_FAILED
     92 
     93     def check_auth_interactive(self, username, submethods):
     94         if username in ['pass2fa', 'pkey2fa']:
     95             self.username = username
     96             prompt = 'Verification code: ' if self.password_verified else 'Password: '  # noqa
     97             print(username, prompt)
     98             return paramiko.InteractiveQuery('', '', prompt)
     99         return paramiko.AUTH_FAILED
    100 
    101     def check_auth_interactive_response(self, responses):
    102         if self.username in ['pass2fa', 'pkey2fa']:
    103             if not self.password_verified:
    104                 if responses[0] == 'password':
    105                     print('password verified')
    106                     self.password_verified = True
    107                     if self.username == 'pkey2fa':
    108                         return self.check_auth_interactive(self.username, '')
    109                 else:
    110                     print('wrong password: {}'.format(responses[0]))
    111                     return paramiko.AUTH_FAILED
    112             else:
    113                 if responses[0] == 'passcode':
    114                     print('totp verified')
    115                     return paramiko.AUTH_SUCCESSFUL
    116                 else:
    117                     print('wrong totp: {}'.format(responses[0]))
    118                     return paramiko.AUTH_FAILED
    119         else:
    120             return paramiko.AUTH_FAILED
    121 
    122     def get_allowed_auths(self, username):
    123         if username == 'keyonly':
    124             return 'publickey'
    125         if username == 'pass2fa':
    126             return 'keyboard-interactive'
    127         if username == 'pkey2fa':
    128             if not self.key_verified:
    129                 return 'publickey'
    130             else:
    131                 return 'keyboard-interactive'
    132         return 'password,publickey'
    133 
    134     def check_channel_exec_request(self, channel, command):
    135         if command not in self.commands:
    136             ret = False
    137         else:
    138             ret = True
    139             self.encoding = self.cmd_to_enc[command]
    140             channel.send(self.encoding)
    141             channel.shutdown(1)
    142         self.exec_event.set()
    143         return ret
    144 
    145     def check_channel_shell_request(self, channel):
    146         self.shell_event.set()
    147         return True
    148 
    149     def check_channel_pty_request(self, channel, term, width, height,
    150                                   pixelwidth, pixelheight, modes):
    151         return True
    152 
    153     def check_channel_window_change_request(self, channel, width, height,
    154                                             pixelwidth, pixelheight):
    155         channel.send('resized')
    156         return True
    157 
    158 
    159 def run_ssh_server(port=2200, running=True, encodings=[]):
    160     # now connect
    161     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    162     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    163     sock.bind(('127.0.0.1', port))
    164     sock.listen(100)
    165 
    166     while running:
    167         client, addr = sock.accept()
    168         print('Got a connection!')
    169 
    170         t = paramiko.Transport(client)
    171         t.load_server_moduli()
    172         t.add_server_key(host_key)
    173         server = Server(encodings)
    174         try:
    175             t.start_server(server=server)
    176         except Exception as e:
    177             print(e)
    178             continue
    179 
    180         # wait for auth
    181         chan = t.accept(2)
    182         if chan is None:
    183             print('*** No channel.')
    184             continue
    185 
    186         username = t.get_username()
    187         print('{} Authenticated!'.format(username))
    188 
    189         server.shell_event.wait(timeout=event_timeout)
    190         if not server.shell_event.is_set():
    191             print('*** Client never asked for a shell.')
    192             continue
    193 
    194         server.exec_event.wait(timeout=event_timeout)
    195         if not server.exec_event.is_set():
    196             print('*** Client never asked for a command.')
    197             continue
    198 
    199         # chan.send('\r\n\r\nWelcome!\r\n\r\n')
    200         print(server.encoding)
    201         try:
    202             banner_encoded = banner.encode(server.encoding)
    203         except (ValueError, LookupError):
    204             continue
    205 
    206         chan.send(banner_encoded)
    207         if username == 'bar':
    208             msg = chan.recv(1024)
    209             chan.send(msg)
    210         elif username == 'foo':
    211             lst = []
    212             while True:
    213                 msg = chan.recv(32 * 1024)
    214                 lst.append(msg)
    215                 if msg.endswith(b'\r\n\r\n'):
    216                     break
    217             data = b''.join(lst)
    218             while data:
    219                 s = chan.send(data)
    220                 data = data[s:]
    221         else:
    222             chan.close()
    223             t.close()
    224             client.close()
    225 
    226     try:
    227         sock.close()
    228     except Exception:
    229         pass
    230 
    231 
    232 if __name__ == '__main__':
    233     run_ssh_server()