一、作业要求
开发简单的FTP:
1. 用户登陆
2. 上传/下载文件
3. 不同用户家目录不同
4. 查看当前目录下文件
5. 充分使用面向对象知识
二、程序目录说明
FTP/ |-- FTPClient/ #客户端文件夹 | |-- 示例文件夹/ #客户端上传/下载示例文件夹 | |-- Client_start.py #客户端启动程序 | |-- FTPServer/ #服务端文件夹 | |-- bin/ | | |-- __init__.py | | |-- Server_start.py #程序启动的主入口 | | | |-- conf/ | | |-- setting.py #配置文件 | | | |-- db/ #用户数据 | | |-- alex #用户名alex的数据文件夹 | | |-- japhi #用户名japhi的数据文件夹 | | | |-- home/ | | |-- alex/ #用户alex用户家目录 | | |-- japhi/ #用户japhi用户家目录 | | |-- log/ | |-- log_sys.log #日志文件(未启用) | |-- src/ | |-- __init__.py | |-- common.py #公共功能 | |-- Server_start.py #程序启动的主入口 | |-- user.py #用户类及方法 | |-- FTP.png #流程图 |-- README.txt
三、流程图
四、作业代码
1、FTPClient客户端文件夹下客户端启动程序Client_start.py:
import socket,os,sys,time Basedir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),"FTPServer") updir = os.path.join(os.path.dirname(os.path.abspath(__file__)),"示例文件夹") sys.path.append(Basedir) from conf import settings def upload(client,user_info,name): ‘‘‘ 客户端上传文件的函数 :param client:scoket客户端标志 :param user_info:客户端登陆用户的信息 :param name:客户端登陆用户的名字 :return:none ‘‘‘ print("\033[1;37m当前可选上传\033[0m".center(40,"*")) dic = {} for root, dirs, files in os.walk(updir): for i,j in enumerate(files): k = i+1 dic[k] = j print("\033[1;37m%s:%s\033[0m"%(k,j)) choice = input("请输入要上传的文件序号:>>>") command = "upload+"+user_info+"+"+dic[int(choice)] client.sendall(bytes(command,encoding="utf-8")) res = client.recv(1024) if str(res,encoding="utf-8") == "True": dir = os.path.join(updir,dic[int(choice)]) with open(dir,"rb") as f: data = f.read() a = str(len(data)) client.sendall(bytes(a,encoding="utf-8")) client.sendall(data) time.sleep(0.5) print("\033[1;37m文件上传成功\033[0m") def download(client,user_info,name): ‘‘‘ 客户端下载文件的函数 :param client: scoket客户端标志 :param user_info: 客户端登陆的用户信息 :param name:客户端登陆的用户名字 :return: none ‘‘‘ dic = {} command = "download+"+user_info client.sendall(bytes(command, encoding="utf-8")) data = client.recv(4069) res = eval(str(data, encoding="utf-8")) if len(res) == 0: print("\033[1;31m当前目录下暂无文件\033[0m".center(40, "-")) else: for i,j in enumerate(res): k = i + 1 dic[k] = j print("\033[1;37m%s:%s\033[0m" % (k, j)) choice = input("请选择要下载的文件序号:>>>") cm = "downloadfile+"+dic[int(choice)]+"+"+name client.sendall(bytes(cm, encoding="utf-8")) print("\033[1;37m准备开始下载文件\033[0m") dir = os.path.join(updir, dic[int(choice)]) res_length = str(client.recv(1024).decode()) length = 0 with open(dir, "wb") as f: while True: data = client.recv(1024) length += len(data) f.write(data) if length >= int(res_length): print("\033[1;37m文件下载成功\033[0m") time.sleep(0.5) break def view_file(client,user_info,name): ‘‘‘ 客户端查看当前目录下文件的函数 :param client: scoket客户端标志 :param user_info: 客户端登陆的用户信息 :param name: 客户端登陆的用户名字 :return: none ‘‘‘ command = "view+"+user_info client.sendall(bytes(command,encoding="utf-8")) data = client.recv(1024) res = eval(str(data,encoding="utf-8")) if len(res) == 0: print("\033[1;31m当前目录下暂无文件\033[0m".center(40, "-")) else: for i in res: print("\033[1;35m%s\033[0m"%i) def operate(client,user_info,name): ‘‘‘ 客户端操作主函数 :param client: scoket客户端标志 :param user_info: 客户端登陆的用户信息 :param name: 客户端登陆的用户名字 :return: none ‘‘‘ dic = {"1":upload,"2":download,"3":view_file} info = ‘‘‘------操作指令------ 1、上传文件 2、下载文件 3、查看目录下文件 4、退出 ‘‘‘ while True: print("\033[1;33m%s\033[0m" % info) choice = input("请输入你要操作的命令:>>>").strip() if choice.isdigit() and 0 < int(choice) <= len(dic): dic.get(choice)(client,user_info,name) elif int(choice) == 4: break else: print("\033[1;31m输出错误\033[0m".center(40, "-")) def com_parse(client,com): ‘‘‘ 客户端用户登陆注册命中解析函数 :param client: 客户端scoket标志 :param com: 命令 :return: 登陆成功返回True,否则False ‘‘‘ # print(com) client.sendall(bytes(com,encoding="utf-8")) re = client.recv(4096) if str(re,encoding="utf-8") == "Success": return True elif str(re, encoding="utf-8") == "Success": return False def login(client,data): ‘‘‘ 客户端用户登陆函数 :param client: 客户端scoket标志 :param data: 数据 :return: none ‘‘‘ name = input("请输入您的名字:>>>").strip() psd = input("请输入密码:>>>").strip() user_info = name+"+"+psd com = "login+"+user_info if com_parse(client,com): operate(client,user_info,name) else: print("\033[1;31m登陆出现异常\033[0m") def register(client,data): ‘‘‘ 客户端用户注册函数 :param client: 客户端scoket标志 :param data: 数据 :return: none ‘‘‘ name = input("请输入您的名字:>>>").strip() psd = input("请输入密码:>>>").strip() com = "register+" + name + "+" + psd if com_parse(client, com): user_info = name + "+" + psd print("\033[1;31m注册成功\033[0m") command = "view+" + user_info client.sendall(bytes(command, encoding="utf-8")) res1 = client.recv(1024) operate(client, user_info, name) else: print("\033[1;31m注册出现异常\033[0m") def quit(client,data): ‘‘‘ 程序退出函数 :param client: 客户端scoket标志 :param data: 用户数据 :return: none ‘‘‘ exit() def main_func(client,data): ‘‘‘ 客户端主菜单函数 :param client: 客户端scoket标志 :param data: 数据 :return: none ‘‘‘ dic = {"1":login,"2":register,"3":quit} info = ‘‘‘------用户登录界面------*{0}* 1、登陆 2、注册 3、退出 ‘‘‘.format(str(data,encoding="utf-8")) print("\033[1;33m%s\033[0m"%info) what = input("你要干嘛?>>>").strip() if what.isdigit() and 0 < int(what) <= len(dic): dic.get(what)(client,data) else: print("\033[1;31m输出错误\033[0m".center(40,"-")) if __name__ == ‘__main__‘: client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(("localhost",settings.PORT)) main_func(client,client.recv(1024)) client.close()
2、FTPServer服务端文件下conf文件下配置程序settings.py:
import os basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) user_home = "%s/home"%basedir user_info = "%s/db"%basedir HOST = "0.0.0.0" PORT = 9999
3、FTPServer服务端文件下src文件下公共功能程序common.py:
import logging,os,pickle,sys,uuid frame = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(frame) # from conf import setting # # def sys_logging(content,levelname): # ‘‘‘ # 程序记录日志函数 # :param content: 日志的内容 # :param levelname: 日志的等级 # :return: none # ‘‘‘ # _filename = os.path.join(setting.log_dir,"log_sys.log") # log = logging.getLogger(_filename) # logging.basicConfig(filename=_filename,level=logging.INFO,format=‘%(asctime)s-%(levelname)s-%(message)s‘, datefmt=‘%m/%d/%Y %I:%M:%S %p‘) # if levelname == ‘debug‘: # logging.debug(content) # elif levelname == ‘info‘: # logging.info(content) # elif levelname == ‘warning‘: # logging.warning(content) # elif levelname == ‘error‘: # logging.error(content) # elif levelname == ‘critical‘: # logging.critical(content) def show(msg,msg_type): ‘‘‘ 程序不同信息打印的字体颜色 :param msg: 打印信息 :param msg_type: 打印信息的类型 :return: none ‘‘‘ if msg_type == "info": show_msg = "\033[1;35m%s\033[0m"%msg elif msg_type == "error": show_msg = "\033[1;31m%s\033[0m"%msg elif msg_type == "msg": show_msg = "\033[1;37m%s\033[0m"%msg else: show_msg = "\033[1;32m%s\033[0m"%msg print(show_msg)
src文件夹下Server_start.py服务端启动主函数:
import socket,os,sys Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(Base_dir) from conf import settings from common import show from user import User def fuck_tcp(con,addr): ‘‘‘ 服务端数据解析主函数 :param con: :param addr: :return: ‘‘‘ show("收到{0}的连接请求,正在通信中。。。".format(addr),"info") con.sendall(bytes("连接成功",encoding="utf-8")) while True: cmd = con.recv(4096) if not cmd: break else: # try: data = str(cmd.decode(encoding="utf-8")) res = data.split("+") if res[0] == "login": show("收到客户端登陆的请求,正在登陆。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) sign = user.login() if sign: con.sendall(bytes("Success", encoding="utf-8")) else: con.sendall(bytes("Failure", encoding="utf-8")) elif res[0] == "register": show("收到客户端注册的请求,正在注册。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) if user.register(): con.sendall(bytes("Success", encoding="utf-8")) else: con.sendall(bytes("Failure", encoding="utf-8")) elif res[0] == "view": show("收到客户端创建或查看当前目录文件的请求。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) res = user.view_file() file = str(res) con.sendall(bytes(file, encoding="utf-8")) show("当前目录文件查看或创建成功", "info") elif res[0] == "upload": show("收到客户端上传文件的请求。。。", "msg") name = res[1] filename = res[3] con.sendall(bytes("True",encoding="utf-8")) res_length = str(con.recv(1024).decode()) User.receive(filename,name,res_length,con) elif res[0] == "download": show("收到客户端下载文件的请求。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) res = user.view_file() file = str(res) con.sendall(bytes(file, encoding="utf-8")) elif res[0] == "downloadfile": show("开始下载文件", "msg") User.download_file(res[1], res[2], con) show("文件下载成功", "info") if __name__ == ‘__main__‘: server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(("localhost",settings.PORT)) server.listen(5) show("正在监听[%s]地址[%s]端口,等待连接。。。"%(settings.HOST,settings.PORT),"info") con,addr = server.accept() fuck_tcp(con,addr)
src文件下用户类程序user.py:
import os,sys,pickle,socket,time Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(Base_dir) from conf import settings from common import show class User(object): ‘‘‘ 用户类 ‘‘‘ def __init__(self,username,psd): self.name = username self.password = psd self.home_path = settings.user_home + "/" +self.name def login(self): ‘‘‘ 用户登陆方法 :return: ‘‘‘ user_dic = User.info_read(self.name) if user_dic.get(self.name) == self.password: show("登陆成功","info") return True else: show("登陆失败,用户名或密码错误","error") return False def register(self): ‘‘‘ 用户注册方法 :return: ‘‘‘ dic = {} dic[self.name] = self.password if User.info_write(self.name,dic): show("注册成功","info") return True else: show("注册失败","error") return False def view_file(self): ‘‘‘ 查看当前目录下文件 :return: 目录下文件名组成的列表 ‘‘‘ if not os.path.exists(self.home_path): os.mkdir(self.home_path) with open("%s\空白文件"%self.home_path,"w") as f: f.write("空白文件") for root, dirs, files in os.walk(self.home_path): return files @staticmethod def download_file(filename,name,con): ‘‘‘ 下载文件静态方法 :param filename: 文件名 :param name: 用户名 :param con: 标志 :return: none ‘‘‘ dir = os.path.join(os.path.join(os.path.join(Base_dir, "home"), name), filename) with open(dir,"rb") as f: # while True: data = f.read() a = str(len(data)) # print(type(a)) con.sendall(bytes(a,encoding="utf-8")) con.sendall(data) @staticmethod def receive(filename,name,res_length,con): ‘‘‘ 接收文件静态方法 :param filename: 文件名 :param name: 用户名 :param con: 标志 :return: none ‘‘‘ dir = os.path.join(os.path.join(os.path.join(Base_dir,"home"),name),filename) length = 0 f = open(dir, "wb") while True: data = con.recv(1024) length += len(data) f.write(data) # print(length) if length == int(res_length): show("文件下载成功","info") f.flush() f.close() return True @staticmethod def info_read(name): ‘‘‘ 读取用户数据的静态方法 :param name: 用户名 :return: 字典 ‘‘‘ user_dir = os.path.join(settings.user_info,name) if os.path.exists(user_dir): with open(user_dir,"rb") as f: dic = pickle.load(f) return dic else: print("用户数据为空") @staticmethod def info_write(name,dic): ‘‘‘ 写入用户数据的静态方法 :param name:用户名 :param dic:用户信息字典 :return:True ‘‘‘ user_dir = os.path.join(settings.user_info, name) with open(user_dir,"wb") as f: pickle.dump(dic,f) return True
五、程序界面说明
时间: 2024-11-06 23:16:37