作业需求:
1. 用户登陆
2. 上传/下载文件
3. 不同用户家目录不同
4. 查看当前目录下文件
5. 充分使用面向对象知识
思维图:
待补充()
思维分析:
1.用户登陆保存到文件对比用户名密码
2.上传用json序列化文件名,文件路径,文件大小传给服务器端。根据得到的字段内容操作上传动作
3.下载代码和上传基本可以互换,因为文件名都一样所以传一个文件大小即可
4.查看当前目录下文件,调用cd命令,既然能分解get 和put动作就可以看cd动作
5.添加了LINUX和Windows不同系统的命令,路径,判断,命令有cd,ls,mkdir,等
6.添加了少部分的异常处理
7.添加了 IO多路复用SocketServer
8.未完成日志,md5校验的功能
README:
作者:yaobin 版本:简单Ftp示例版本 v0.1 开发环境: python3.6 程序介绍: 1. 用户登陆 2. 上传/下载文件 3. 不同用户家目录不同 4. 查看当前目录下文件 5. 充分使用面向对象知识 使用说明: 1.可以在Linux和Windows都可以运行 2.Linux调用了cd,mkdir,ls,rm,命令 3.Windows调用了cd,md,dir,del,命令 On Linux,Windows Client: Python3 ./Ftp_Client.py put 上传 get 下载 mkdir 创建目录 ls 查看文件信息 rm 删除文件 drm 删除目录 Server:Python3 ./Ftp_Server.py 文件目录结构: ├─简单Ftp #程序执行目录 │ README │ __init__.py │ ├─bin │ Ftp_Client.py #客户端程序 │ Ftp_Server.py #服务器端程序 │ __init__.py │ ├─conf │ │ setting.py #配置文件 │ │ __init__.py │ │ │ └─__pycache__ │ setting.cpython-36.pyc │ __init__.cpython-36.pyc │ ├─core │ │ auth.py #用户验证逻辑交互 │ │ commands.py #命令逻辑交互 │ │ ftp_client.py #sock_客户端逻辑交互 │ │ ftp_server.py #sock_服务端逻辑交互 │ │ logger.py #日志逻辑交互---未完成 │ │ main.py #客户端程序 │ │ __init__.py │ │ │ └─__pycache__ │ auth.cpython-36.pyc │ commands.cpython-36.pyc │ ftp_client.cpython-36.pyc │ ftp_server.cpython-36.pyc │ main.cpython-36.pyc │ __init__.cpython-36.pyc │ ├─db │ │ __init__.py │ │ │ ├─colin #用户目录 │ │ │ colin.bak │ │ │ colin.dat #用户账号密码文件 │ │ │ colin.dir │ │ │ __init__.py │ │ │ │ │ └─colin #用户ftp家目录 │ │ │ __init__.py │ │ │ │ │ └─aaa │ ├─pub #ftp程序模拟pub目录 │ │ FTP作业.7z │ │ socket通信client.py │ │ __init__.py │ │ 选课系统.png │ │ │ └─user_path #用户路径文件,判断用户进入系统后在哪里处理 │ path │ ├─logs #日志未完成 │ access.log │ transmission.log │ __init__.py │ ├─src │ │ auth_class.py #用户验证类 │ │ linux_cmd_class.py #linux命令类 │ │ server_class.py #server_socket类 │ │ windows_cmd_class.py #server命令类 │ │ __init__.py │ │ │ └─__pycache__ │ auth_class.cpython-36.pyc │ linux_cmd_class.cpython-36.pyc │ server_class.cpython-36.pyc │ windows_cmd_class.cpython-36.pyc │ __init__.cpython-36.pyc │ └─test #测试 args_test.py __init__.py
FTP程序代码:
bin/Ftp_client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 from core.main import Admin if __name__ == ‘__main__‘: Admin.run(‘‘)
bin/Ftp_Server.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 from core import ftp_server if __name__ == ‘__main__‘: ftp_server.socketserver.ThreadingTCPServer((‘127.0.0.1‘, 62000), ftp_server.Ftp_server).serve_forever()
conf/setting.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import logging import os,sys,platform,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 os_res = platform.system() if os_res == ‘Windows‘: data_path = os.path.join(BASE_DIR + ‘\db‘) ftp_path = os.path.join(BASE_DIR + ‘\db\pub‘) path_file = os.path.join(BASE_DIR + ‘\db\\user_path\path‘) else: data_path = os.path.join(BASE_DIR + ‘/db‘) ftp_path = os.path.join(BASE_DIR + ‘\db\pub‘) path_file = os.path.join(BASE_DIR + ‘/db/user_path/path‘) with open(path_file, ‘r‘, encoding=‘utf-8‘)as f: file = f.readlines() file_object=file[0]
core/auth.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys,shelve BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 from conf import setting from src.auth_class import Auth from core.commands import Commands class Auth_ftp(object): def __init__(self,username,user_passwd): self.user_data = {} self.username = username #用户名 self.username_passwd = user_passwd #密码 os_res = setting.platform.system() if os_res == ‘Windows‘: #用户密码文件 self.passwd_data_path = os.path.join(setting.data_path + ‘\\‘+ username + ‘\\‘+ username + ‘.‘ + ‘dat‘) #Windows用户密码文件路径 self.passwd_data = os.path.join(setting.data_path + ‘\\‘ + username+ ‘\\‘ + username) # Windows用户密码文件 self.file_object = os.path.join(setting.data_path + ‘\\‘ + self.username) else: self.passwd_data_path = os.path.join(setting.data_path + ‘/‘ + username + ‘/‘ + username + ‘.‘ + ‘dat‘) # Linux用户密码文件路径 self.passwd_data = os.path.join(setting.data_path + ‘/‘ + username + ‘/‘ + username) # Linux用户密码文件 self.file_object = os.path.join(setting.data_path + ‘/‘ + username) user_obj = (self.username,self.username_passwd,self.passwd_data) # ‘‘‘用户名作为key,把用户名,密码,目录,写到字典中方便调用‘‘‘ self.user_data[self.username] = user_obj ‘‘‘验证用户是否存在‘‘‘ def auth_user_passwd(self): ‘‘‘判断文件路径是否存在然后返回用户是否存在‘‘‘ os_res = os.path.exists(self.passwd_data_path) if os_res !=False: user_file = shelve.open(self.passwd_data) if self.user_data[self.username][0] in user_file and user_file[self.username][1] == self.username_passwd: print("Welcome,%s,您的身份验证成功"%self.username) user_file.close() else: return False else: return True def add_user_passwd(self): res = os.path.exists(self.file_object) if res != True: Commands(self.file_object).mkdir() #用户账号密码文件 Commands(self.passwd_data).mkdir() #用户上传下载目录 user_file = shelve.open(self.passwd_data) user_file.update(self.user_data) print("用户创建成功") else: print("账号信息出现问题,请联系管理员....")
core/commands.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys,platform BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 from conf import setting from src.linux_cmd_class import L_commands from src.windows_cmd_class import W_commands class Commands(object): def __init__(self,file_object): self.file_object=file_object def cd1(self): if setting.os_res == ‘Windows‘: res = W_commands(self.file_object).cd1() return res elif setting.os_res == ‘Linux‘: res = L_commands(self.file_object).cd1() return res else: print(‘不支持此操作系统‘) ‘‘‘进入目录‘‘‘ def cd(self): if setting.os_res ==‘Windows‘: res= W_commands(self.file_object).cd() return res elif setting.os_res == ‘Linux‘: res=L_commands(self.file_object).cd() return res else: print(‘不支持此操作系统‘) ‘‘‘创建目录‘‘‘ def mkdir(self): if setting.os_res ==‘Windows‘: res= W_commands(self.file_object).mkdir() return res elif setting.os_res == ‘Linux‘: res=L_commands(self.file_object).mkdir() return res else: print(‘不支持此操作系统‘) ‘‘‘删除文件‘‘‘ def rm(self): if setting.os_res == ‘Windows‘: res=W_commands(self.file_object).rm() return res elif setting.os_res == ‘Linux‘: res=L_commands(self.file_object).rm() return res else: print(‘不支持此操作系统‘) ‘‘‘删除目录‘‘‘ def drm(self): if setting.os_res == ‘Windows‘: res=W_commands(self.file_object).drm() return res elif setting.os_res == ‘Linux‘: res=L_commands(self.file_object).drm() return res else: print(‘不支持此操作系统‘)
core/ftp_client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os, sys import platform, socket import time,hashlib,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) # 添加环境变量 from conf import setting from core.commands import Commands from conf import setting # from src.server_class import Server_class ‘‘‘定义client连接的协议和方式后面需要补充‘‘‘ ‘‘‘创建连接输入IP地址和密码‘‘‘ class Ftp_client(object): def link(self): try: os.chdir(setting.file_object) #直接进入FTP指定的用户目录 self.sending_msg_list = [] self.ip_addr = ‘127.0.0.1‘ self.ip_port = 62000 self.client = socket.socket() self.client.connect((self.ip_addr, self.ip_port)) while True: self.sending_msg = None self.data = self.client.recv(1024) print("[+]Server>>>recv: %s" % self.data.decode()) self.menu() sending_msg = input(‘请输入命令>>>:‘) self.sending_msg_list = sending_msg.split() if len(sending_msg) == 0: data_header = { ‘test1‘: { ‘action‘: ‘‘, ‘file_name‘: ‘‘, ‘size‘: 0 } } self.client.send(json.dumps(data_header).encode()) elif len(sending_msg) >= 2 : if setting.os_res == ‘Windows‘: try : new_path = self.sending_msg_list[1].encode(‘utf-8‘) self.res_new = self.sending_msg_list[1].strip().split(‘\\‘) # 截取获得文件名 self.file_name1 = self.res_new[-1] except IndexError: pass elif setting.os_res == ‘Linux‘: try: self.res_new = self.sending_msg_list[1].strip().split(‘/‘) self.file_name1 = self.res_new[-1] except IndexError: pass if self.sending_msg_list[0] == "put": try: self.put(self.sending_msg_list[1]) except IndexError: self.client.send(‘put‘.encode()) if self.sending_msg_list[0] == "get": try: self.get(self.file_name1) except IndexError and ValueError: self.client.send(‘get‘.encode()) elif self.sending_msg_list[0] == "exit": break else:#cd ls rm drm mkdir 命令等 try: data_header = { ‘test1‘: { ‘action‘: self.sending_msg_list[0], ‘file_name‘: self.file_name1, ‘size‘: 0 } } self.client.send(json.dumps(data_header).encode()) except AttributeError: data_header = { ‘test1‘: { ‘action‘: self.sending_msg_list[0], ‘file_name‘: ‘‘, ‘size‘: 0 } } self.client.send(json.dumps(data_header).encode()) except ConnectionResetError and ConnectionAbortedError: print("[+]Server is Down ....Try to Reconnect......") self.link() def get(self,file_name): data_header = { ‘test1‘: { ‘action‘: ‘get‘, ‘file_name‘: file_name, ‘size‘: 0 } } #这里没做同名文件判断,下一次补充 self.client.send(json.dumps(data_header).encode()) #发送get请求 print(os.getcwd()) self.data = self.client.recv(1024) #拿到size self.client.send(b‘come on‘) file_size = int(self.data.decode()) def file_tr(): file_object = open(file_name, ‘wb‘) received_size = 0 while received_size < file_size: recv_data = self.client.recv(1024) file_object.write(recv_data) received_size += len(recv_data) # 规定多少但不一定收到那么多 print(file_size, received_size) else: print(‘[+]Client:File Recv Successful‘) file_object.close() if os.path.exists(file_name) == False: # 判断本地目录文件是否存在 file_tr() else: print(‘文件已经存在将要覆盖‘) file_tr() def put(self,file_name): if os.path.exists(file_name)== True: #判断文件路径# 是否存不存在 if os.path.isfile(file_name): file_obj = open(file_name, "rb") data_header = { ‘test1‘: { ‘action‘: ‘put‘, ‘file_name‘: self.file_name1, ‘size‘: os.path.getsize(self.sending_msg_list[1].encode()) } } self.client.send(json.dumps(data_header).encode()) for line in file_obj: self.client.send(line) file_obj.close() print("[+]-----send file down-----") else: print(‘[+]file is no valid.‘) self.client.send(‘cmd‘.encode()) else: print(‘[+] File Not Found‘) data_header = { ‘test1‘: { ‘action‘: ‘aaaa‘, ‘file_name‘: ‘‘, ‘size‘: 0 } } self.client.send(json.dumps(data_header).encode()) def menu(self): menu = ‘‘‘ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; 帮 助 请注意windows和linux的路径稍有不同 查看文件 ls(file) eg: ls /tmp/Python3.6.3/README 进入目录 cd eg: cd /tmp/python 创建目录 mkdir(dir) eg: mkdir /tmp/python 删除文件 rm eg: rm /tmp/python/README 删除目录 drm eg: drm /tmp/python 上 传 put eg: put /tmp/python/README 下 载 get eg: get /tmp/python/README 登 出 exit 默认上传文件夹 用户目录下用用户名的命名的文件夹 默认下载文件夹 db/pub ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ‘‘‘ print(menu)
core/ftp_server.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys,time import subprocess,subprocess,json import socketserver,socket,traceback BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) from core.main import Admin from core.auth import Auth_ftp from core.commands import Commands from conf import setting from src.server_class import Server_class class Ftp_server(socketserver.BaseRequestHandler): #都懵比了很多代码重复了、等着重新回炉下~~~ def parsecmd(self,data): data = json.loads(data.decode()) file_action = data[‘test1‘][‘action‘] file_path = data[‘test1‘][‘file_name‘] file_size = int(data[‘test1‘][‘size‘]) file_obj = setting.file_object print(‘from ip : %s information : %s‘ % (self.client_address[0], self.data.decode())) try: if file_action == ‘put‘: os.chdir(file_obj) # 默认用用户的目录作为上传目录 def file_tr(): self.request.send(b‘recv data‘) file_object = open(file_path, ‘wb‘) received_size = 0 while received_size < file_size: recv_data = self.request.recv(1024) file_object.write(recv_data) received_size += len(recv_data) # 规定多少但不一定收到那么多 print(file_size, received_size) else: print(‘[+]File Recv Successful‘) file_object.close() if os.path.exists(file_path) == False: # 判断文件是否存在 file_tr() else: self.request.send(b‘[+] this file[%s] will cover ....‘ % file_path.encode()) file_tr() elif file_action == ‘get‘: os.chdir(setting.ftp_path) # get对象工作的目录pub # # get文件首先需要其他目录,或者更改名字 ,然后进行getget的地址应该是自己家目录的地址 # # 下载文件时候客户端在用户目录下面去下载 #判断size不同os的路径不同 if setting.os_res == ‘Windows‘: file_size = os.path.getsize(setting.ftp_path+‘\\‘+file_path) if setting.os_res == ‘Linux‘: file_size = os.path.getsize(setting.ftp_path + ‘/‘ + file_path) try: if os.path.exists(file_path) == True: # 判断文件路径# 是否存在 if os.path.isfile(file_path): file_obj = open(file_path, "rb") #直接copy 客户端put的代码就可以,这里self.conn.send 改为request.send self.request.send(str(file_size).encode()) self.request.recv(1024) for line in file_obj: self.request.send(line) file_obj.close() self.request.send(b"-b:bash:[+]Server[%s]---send file down-----"%file_path.encode()) else: print(‘[+]file is no valid.‘) self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode()) #这里不写上下俩条的话客户端就会卡住、处于一直等待中 else: self.request.send(b"-b:bash:[+]Server[%s]---file is no valid.-----" % file_path.encode()) # except json.decoder.JSONDecodeError: # self.request.send(b‘-b:bash:[+]what are you doing???‘) except: pass elif file_action== ‘cd‘: if os.path.exists(file_path) == True: res = Commands(file_path).cd() self.request.send(b‘-bash: [%s] [%s]: ‘ % (file_action.encode(), file_path.encode())) else: self.request.send(b‘-bash:Directory Exitis‘) elif file_action == ‘mkdir‘: os.chdir(file_obj) # 文件夹mkdir命令处理,如果是windows默认应该是gbk生成的bytes类型‘‘‘ if os.path.exists(file_path) == True:# 进入要传递目录 self.request.send(b‘-bash: directory exitis ‘) else: res = Commands(file_path).mkdir() self.request.send(b‘-bash: [%s] [%s]:‘ % (file_action.encode(), file_path.encode())) elif file_action == ‘ls‘: #测试利用json返回命令结果、反正挺好玩的!!!!用subporcess 用的挺high的 res = os.listdir() res = json.dumps(res) self.request.send(res.encode()) # ‘‘‘文件删除命令处理, 如果是windows默认应该是gbk生成的bytes类型‘‘‘ elif file_action == ‘rm‘: os.chdir(file_obj) if os.path.isfile(file_path) == True: res = Commands(file_path).rm() self.request.send(b‘-bash: [%s] [%s]:‘ % (file_action.encode(), file_path.encode())) else: self.request.send(b‘-bash: [%s]: Not file ‘ % file_path.encode()) # 目录删除命令处理, 如果是windows默认应该是gbk生成的bytes类型‘‘‘ elif file_action == ‘drm‘: os.chdir(file_obj) # 进入要传递目录 if os.path.isdir(file_path) == True: # 判断是否是目录 Commands(file_path).drm() self.request.send(b‘-bash: %s: Delete OK ‘%file_path.encode()) else: self.request.send(b‘-bash: [%s]: No such File or Directory ‘ %file_path.encode()) else: self.request.send(b‘-bash:Command or File Not Found ‘) except Exception and UnicodeDecodeError: traceback.print_exc() def handle(self): print("[+] Server is running on port:62000", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) while True: try: # 调整一下socket.socket()的位置每次重新连接都生成新的socket实例,避免因为意外而导致socket断开连接 print("[+] Connect success -> %s at ", self.client_address, time.strftime("%Y%m%d %H:%M:%S", time.localtime())) self.request.send(b‘\033[34;1mWelcome,-bash version 0.0.1-release \033[0m ‘) while True: self.data = self.request.recv(1024) data = self.data self.parsecmd(data) if not self.data: print("[+]Error: Client is lost") break except socket.error: print("[+]Error get connect error") break continue socketserver.ThreadingTCPServer((‘127.0.0.1‘, 62000), Ftp_server).serve_forever()
core/main.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys,json BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) #添加环境变量 from conf import setting from core.auth import Auth_ftp from core.commands import Commands from core.ftp_client import Ftp_client class Admin(object): def run(self): exit_flag = False print(‘欢迎来到简单FTP程序,登陆程序后server会在本地自动启动‘) menu = u‘‘‘ \033[32;1m 1.登陆 2.注册 3.退出\033[0m ‘‘‘ while not exit_flag: print(menu) user_option = input(‘请输入您的操作,输入q退出>>>:‘).strip() ‘‘‘登陆‘‘‘ if user_option == ‘1‘: user_name = input(‘请输入用户名>>>:‘).strip() user_passwd = input(‘请输入您的密码>>>:‘).strip() file_object = (Auth_ftp(user_name, user_passwd).passwd_data) #传入路径变量 res = Auth_ftp(user_name,user_passwd).auth_user_passwd() if res == None: with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f: f.write(file_object);f.close() Ftp_client().link() elif res == False: print(‘%s用户密码不正确‘ % user_name) else: print(‘请先注册‘) elif user_option == ‘2‘: user_name = input(‘请输入用户名>>>:‘).strip() user_passwd = input(‘请输入您的密码>>>:‘).strip() user_res = Auth_ftp(user_name, user_passwd).auth_user_passwd() if user_res == None: print("%s用户不需要注册"%user_name) #不需要注册但是很是应该能进入ftp进行操作 file_object = (Auth_ftp(user_name, user_passwd).passwd_data) with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f: f.write(file_object);f.close() Ftp_client().link() elif user_res == False: print("%已注册用户,密码不正确" % user_name) elif user_res == True: Auth_ftp(user_name, user_passwd).add_user_passwd() #创建用户名密码文件等 file_object = (Auth_ftp(user_name, user_passwd).passwd_data) with open(setting.path_file, ‘w‘,encoding=‘utf-8‘) as f: f.write(file_object);f.close() Ftp_client().link() else: sys.exit(‘异常退出‘) elif user_option == ‘q‘ or user_option == ‘3‘: sys.exit() else: print(‘输入的选项不正确,请重新输入‘) #Admin().run()
db logs src/auth_class.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao class Auth(object): ‘‘‘用户验证类类为用户、密码家目录‘‘‘ def __init__(self,name,passwd): self.name = name self.passwd = passwd
src/linux_cmd.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao class Auth(object): ‘‘‘用户验证类类为用户、密码家目录‘‘‘ def __init__(self,name,passwd): self.name = name self.passwd = passwd
src/windows_cmd.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao import os,sys,subprocess class L_commands(object): def __init__(self,file_object): self.cwd = os.getcwd() # 获取当前目录 self.file_object = [file_object] #列表文件 #取0得到文件名/或目录路径 self.directory_object=[file_object] #目录文件 self.file = self.file_object[0] self.directory = self.file_object[0] def cd1(self): os.chdir(self.directory_object[0]) ‘‘‘进入目录‘‘‘ def cd(self): res = subprocess.Popen([‘cd %s‘ % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.msg = res.stdout.read() if len(self.msg) == 0: self.msg = res.stderr.read() return self.msg ‘‘‘创建目录‘‘‘ def mkdir(self): res = subprocess.call([‘mkdir -p %s‘ % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.msg = res.stdout.read() if len(self.msg) == 0: self.msg = res.stderr.read() return self.msg ‘‘‘删除文件‘‘‘ def rm(self): res = subprocess.call([‘rm -f %s‘ % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.msg = res.stdout.read() if len(self.msg) == 0: self.msg = res.stderr.read() return self.msg ‘‘‘删除目录‘‘‘ def drm(self): res = subprocess.call([‘rm -rf %s‘ % self.directory], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.msg = res.stdout.read() if len(self.msg) == 0: self.msg = res.stderr.read() return self.msg
src/server_class.py
#!/usr/bin/env python # -*- coding:utf-8 -*- # Author: Colin Yao ‘‘‘ftpserver类‘‘‘ import os,socket class Server_class(object): def __init__(self,ip_addr,port): self.ip_addr = ip_addr self.port = port self.server = socket.socket()
程序运行部分截图:
用户验证:
系统命令:
上传和下载:
Put
Get
时间: 2024-10-10 17:52:44