sshserver.py (8028B)
1 #!/usr/bin/env python 2 3 # Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 4 # 5 # This file is part of paramiko. 6 # 7 # Paramiko is free software; you can redistribute it and/or modify it under the 8 # terms of the GNU Lesser General Public License as published by the Free 9 # Software Foundation; either version 2.1 of the License, or (at your option) 10 # any later version. 11 # 12 # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 14 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 15 # details. 16 # 17 # You should have received a copy of the GNU Lesser General Public License 18 # along with Paramiko; if not, write to the Free Software Foundation, Inc., 19 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 20 21 import random 22 import socket 23 # import sys 24 import threading 25 # import traceback 26 import paramiko 27 28 from binascii import hexlify 29 from paramiko.py3compat import u, decodebytes 30 from tests.utils import make_tests_data_path 31 32 33 # setup logging 34 paramiko.util.log_to_file(make_tests_data_path('sshserver.log')) 35 36 host_key = paramiko.RSAKey(filename=make_tests_data_path('test_rsa.key')) 37 # host_key = paramiko.DSSKey(filename='test_dss.key') 38 39 print('Read key: ' + u(hexlify(host_key.get_fingerprint()))) 40 41 banner = u'\r\n\u6b22\u8fce\r\n' 42 event_timeout = 5 43 44 45 class Server(paramiko.ServerInterface): 46 # 'data' is the output of base64.b64encode(key) 47 # (using the "user_rsa_key" files) 48 data = (b'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp' 49 b'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' 50 b'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' 51 b'UWT10hcuO4Ks8=') 52 good_pub_key = paramiko.RSAKey(data=decodebytes(data)) 53 54 commands = [ 55 b'$SHELL -ilc "locale charmap"', 56 b'$SHELL -ic "locale charmap"' 57 ] 58 encodings = ['UTF-8', 'GBK', 'UTF-8\r\n', 'GBK\r\n'] 59 60 def __init__(self, encodings=[]): 61 self.shell_event = threading.Event() 62 self.exec_event = threading.Event() 63 self.cmd_to_enc = self.get_cmd2enc(encodings) 64 self.password_verified = False 65 self.key_verified = False 66 67 def get_cmd2enc(self, encodings): 68 n = len(self.commands) 69 while len(encodings) < n: 70 encodings.append(random.choice(self.encodings)) 71 return dict(zip(self.commands, encodings[0:n])) 72 73 def check_channel_request(self, kind, chanid): 74 if kind == 'session': 75 return paramiko.OPEN_SUCCEEDED 76 return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED 77 78 def check_auth_password(self, username, password): 79 print('Auth attempt with username: {!r} & password: {!r}'.format(username, password)) # noqa 80 if (username in ['robey', 'bar', 'foo']) and (password == 'foo'): 81 return paramiko.AUTH_SUCCESSFUL 82 return paramiko.AUTH_FAILED 83 84 def check_auth_publickey(self, username, key): 85 print('Auth attempt with username: {!r} & key: {!r}'.format(username, u(hexlify(key.get_fingerprint())))) # noqa 86 if (username in ['robey', 'keyonly']) and (key == self.good_pub_key): 87 return paramiko.AUTH_SUCCESSFUL 88 if username == 'pkey2fa' and key == self.good_pub_key: 89 self.key_verified = True 90 return paramiko.AUTH_PARTIALLY_SUCCESSFUL 91 return paramiko.AUTH_FAILED 92 93 def check_auth_interactive(self, username, submethods): 94 if username in ['pass2fa', 'pkey2fa']: 95 self.username = username 96 prompt = 'Verification code: ' if self.password_verified else 'Password: ' # noqa 97 print(username, prompt) 98 return paramiko.InteractiveQuery('', '', prompt) 99 return paramiko.AUTH_FAILED 100 101 def check_auth_interactive_response(self, responses): 102 if self.username in ['pass2fa', 'pkey2fa']: 103 if not self.password_verified: 104 if responses[0] == 'password': 105 print('password verified') 106 self.password_verified = True 107 if self.username == 'pkey2fa': 108 return self.check_auth_interactive(self.username, '') 109 else: 110 print('wrong password: {}'.format(responses[0])) 111 return paramiko.AUTH_FAILED 112 else: 113 if responses[0] == 'passcode': 114 print('totp verified') 115 return paramiko.AUTH_SUCCESSFUL 116 else: 117 print('wrong totp: {}'.format(responses[0])) 118 return paramiko.AUTH_FAILED 119 else: 120 return paramiko.AUTH_FAILED 121 122 def get_allowed_auths(self, username): 123 if username == 'keyonly': 124 return 'publickey' 125 if username == 'pass2fa': 126 return 'keyboard-interactive' 127 if username == 'pkey2fa': 128 if not self.key_verified: 129 return 'publickey' 130 else: 131 return 'keyboard-interactive' 132 return 'password,publickey' 133 134 def check_channel_exec_request(self, channel, command): 135 if command not in self.commands: 136 ret = False 137 else: 138 ret = True 139 self.encoding = self.cmd_to_enc[command] 140 channel.send(self.encoding) 141 channel.shutdown(1) 142 self.exec_event.set() 143 return ret 144 145 def check_channel_shell_request(self, channel): 146 self.shell_event.set() 147 return True 148 149 def check_channel_pty_request(self, channel, term, width, height, 150 pixelwidth, pixelheight, modes): 151 return True 152 153 def check_channel_window_change_request(self, channel, width, height, 154 pixelwidth, pixelheight): 155 channel.send('resized') 156 return True 157 158 159 def run_ssh_server(port=2200, running=True, encodings=[]): 160 # now connect 161 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 162 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 163 sock.bind(('127.0.0.1', port)) 164 sock.listen(100) 165 166 while running: 167 client, addr = sock.accept() 168 print('Got a connection!') 169 170 t = paramiko.Transport(client) 171 t.load_server_moduli() 172 t.add_server_key(host_key) 173 server = Server(encodings) 174 try: 175 t.start_server(server=server) 176 except Exception as e: 177 print(e) 178 continue 179 180 # wait for auth 181 chan = t.accept(2) 182 if chan is None: 183 print('*** No channel.') 184 continue 185 186 username = t.get_username() 187 print('{} Authenticated!'.format(username)) 188 189 server.shell_event.wait(timeout=event_timeout) 190 if not server.shell_event.is_set(): 191 print('*** Client never asked for a shell.') 192 continue 193 194 server.exec_event.wait(timeout=event_timeout) 195 if not server.exec_event.is_set(): 196 print('*** Client never asked for a command.') 197 continue 198 199 # chan.send('\r\n\r\nWelcome!\r\n\r\n') 200 print(server.encoding) 201 try: 202 banner_encoded = banner.encode(server.encoding) 203 except (ValueError, LookupError): 204 continue 205 206 chan.send(banner_encoded) 207 if username == 'bar': 208 msg = chan.recv(1024) 209 chan.send(msg) 210 elif username == 'foo': 211 lst = [] 212 while True: 213 msg = chan.recv(32 * 1024) 214 lst.append(msg) 215 if msg.endswith(b'\r\n\r\n'): 216 break 217 data = b''.join(lst) 218 while data: 219 s = chan.send(data) 220 data = data[s:] 221 else: 222 chan.close() 223 t.close() 224 client.close() 225 226 try: 227 sock.close() 228 except Exception: 229 pass 230 231 232 if __name__ == '__main__': 233 run_ssh_server()