SELECTORS模块实现并发简单版FTP

环境:windows, python 3.5功能:使用SELECTORS模块实现并发简单版FTP允许多用户并发上传下载文件

结构:ftp_client   ---|                bin ---|                        start_client.py     ......启动客户端                conf---|                        config.py           ......客户端参数配置                        system.ini          ......客户端参数配置文件                core---|                        clients.py          ......客户端主程序                home                        ......默认下载路径ftp_server  ---|                bin ---|                        start_server.py     ......启动服务端                conf---|                        config.py           ......服务端参数配置                        system.ini          ......服务端参数配置文件                core---|                        servers.py          ......服务端主程序                db  ---|                        data.py             ......存取用户数据                home                        ......默认上传路径

功能实现:客户端输入命令,根据命令通过映射进入相应方法,上传或者下载;服务端用SELECTORS模块实现并发,接收数据,通过action用映射进入相应方法;

如何使用:启动start_server.py,启动start_client.py;在客户端输入get pathname/put pathname进行上传下载文件,开启多个客户端可并发上传下载。

core:

#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import socket
import os
import json
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep   # 默认下载存放路径为当前路径的home目录下

class FtpClient(object):
    def __init__(self):
        self.client = socket.socket()

    def connect(self, ip, port):
        self.client.connect((ip, port))

    def interactive(self):
        # 入口
        while True:
            cmd = input(">>").strip()
            if len(cmd) == 0:
                continue
            cmd_list = cmd.split()
            if len(cmd_list) == 2:
                cmd_str = cmd_list[0]
                if hasattr(self, cmd_str):
                    func = getattr(self, cmd_str)
                    func(cmd)
                else:
                    print("输入格式错误")
                    self.help()
            else:
                print("输入格式错误")
                self.help()

    @staticmethod
    def help():
        msg = ‘‘‘
get filepath(下载文件)
put filepath(上传文件)
        ‘‘‘
        print(msg)

    def get(self, *args):
        # 下载
        data = args[0].split()
        action = data[0]
        path = data[1]
        send_msg = {
            "action": action,  # 第一次请求,需要服务端返回文件信息
            "file_path": path  # F:\oracle课程\(必读)11gOCP考试流程与须知.rar
        }
        self.client.send(json.dumps(send_msg).encode())
        data = self.client.recv(1024)
        data = json.loads(data.decode())
        if data["sign"] == "0":
            file_length = data["length"]
            file_path = PATH+data["filename"]
            if os.path.isfile(file_path):
                file_path = file_path + str(random.randint(1,1000))
            file = open(file_path, "wb")
            send_msg_2 = {
                "action": "get_file_data",  # 第二次请求,请求客户端给发送文件数据
                "file_path": path  # 第二次请求依然需要传入文件路径,也可以在服务端利用全局变量保存
            }
            self.client.send(json.dumps(send_msg_2).encode())
            get_length = 0
            while get_length < int(file_length):
                data = self.client.recv(1024)
                file.write(data)
                get_length += len(data)
            else:
                file.close()
            if get_length == int(file_length):
                print("下载成功")
            else:
                print("文件传输失败")
                os.remove(file_path)  # 如果传输过程出现问题,删除文件
        else:
            print("文件不存在")

    def put(self, *args):
        # 上传
        data = args[0].split()
        action = data[0]
        file_path = data[1]
        if os.path.isfile(file_path):
            msg = {
                "action": action,  # 上传第一次发送,发送文件大小和姓名
                "length": os.stat(file_path).st_size,
                "filename": file_path[file_path.rfind("\\") + 1:]
            }
            self.client.send(json.dumps(msg).encode())
            self.client.recv(1024)  # 避免黏包
            file = open(file_path, "rb")
            for line in file:
                self.client.send(line)  # 上传第二次发送,发送文件数据,没有action,在服务端通过try,json报错的进入接收数据的方法里
            else:
                file.close()
            sign = self.client.recv(1024)
            if sign == b‘0‘:
                print("上传成功")
            else:
                print("上传失败")
        else:
            print("文件不存在")

def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = FtpClient()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))
    obj.interactive()

clients

core:

#!/usr/bin/env python
# -*-coding:utf-8-*-
# Author:zh
import selectors
import socket
import json
import os
import random
import conf
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))+os.sep+"home"+os.sep  # 默认上传存放路径为当前路径的home目录下

class SelectorFtp(object):

    def __init__(self):
        self.sel = selectors.DefaultSelector()

    def connect(self, ip, port):
        sock = socket.socket()
        sock.bind((ip, port))
        sock.listen(100)
        sock.setblocking(False)
        self.sel.register(sock, selectors.EVENT_READ, self.accept)
        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

    def accept(self, sock, mask):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        self.sel.register(conn, selectors.EVENT_READ, self.interactive)

    def interactive(self, conn, mask):
        # 回调函数,每次接收消息都判断action,根据action分别调用不同函数信息交互
        try:
            data = conn.recv(1024)
            if data:
                try:
                    cmd_dict = json.loads(data.decode())
                    action = cmd_dict[‘action‘]
                    if hasattr(self, action):
                        func = getattr(self, action)
                        func(cmd_dict, conn)
                except (UnicodeDecodeError, json.decoder.JSONDecodeError, TypeError) as e:
                    # put发送文件数据,无法通过json打包,采取json报错后进入方法循环接收
                    self.put_file_data(data, conn)
            else:
                self.sel.unregister(conn)
                conn.close()
        except ConnectionResetError as e:
            print(e)
            # 客户端意外断开时注销链接
            self.sel.unregister(conn)
            conn.close()

    def get(self, *args):
        # 下载第一步:判断文件是否存在并返回文件大小和文件名
        conn = args[1]
        file_path = args[0]["file_path"]
        if os.path.isfile(file_path):
            sign = "0"
            length = os.stat(file_path).st_size
        else:
            sign = "1"
            length = None
        msg = {
            "sign": sign,
            "length": length,
            "filename": file_path[file_path.rfind("\\")+1:]
        }
        conn.send(json.dumps(msg).encode())

    def get_file_data(self, *args):
        # 下载第二步,传送文件数据
        conn = args[1]
        file_path = args[0]["file_path"]
        file = open(file_path, "rb")
        for line in file:
            conn.send(line)
        else:
            file.close()

    def put(self, *args):
        # 上传第一步,接收文件大小和文件名
        conn = args[1]
        self.length = args[0]["length"]
        self.filename = PATH + args[0]["filename"]
        if os.path.isfile(self.filename):
            self.filename = self.filename + str(random.randint(1, 1000))
        self.file = open(self.filename, "wb")
        self.recv_size = 0
        conn.send(b"0")  # 避免客户端黏包

    def put_file_data(self, *args):
        # json报错后进入此循环接收数据
        conn = args[1]
        self.recv_size += len(args[0])
        self.file.write(args[0])
        if int(self.length) == self.recv_size:
            self.file.close()
            conn.send(b"0")

def run():
    data = conf.config.Configuration()
    conf_data = data.get_config()
    obj = SelectorFtp()
    obj.connect(conf_data[0][1], int(conf_data[1][1]))

servers

config:

#!/usr/bin/env python
# -*-coding:utf-8-*-
# _author_=zh
import os
import configparser
PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

class Configuration(object):
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.name = PATH+os.sep+"conf"+os.sep+"system.ini"

    def init_config(self):
        # 初始化配置文件,ip :客户端IP,port:客户端端口
        if not os.path.exists(self.name):
            self.config["config"] = {"ip": "localhost", "port": 1234}
            self.config.write(open(self.name, "w", encoding="utf-8", ))

    def get_config(self, head="config"):
        ‘‘‘
        获取配置文件数据
        :param head: 配置文件的section,默认取初始化文件config的数据
        :return:返回head中的所有数据(列表)
        ‘‘‘
        self.init_config()  # 取文件数据之前生成配置文件
        self.config.read(self.name, encoding="utf-8")
        if self.config.has_section(head):
            section = self.config.sections()
            return self.config.items(section[0])

config



原文地址:https://www.cnblogs.com/zh-20170913/p/8298166.html

时间: 2024-11-13 09:03:38

SELECTORS模块实现并发简单版FTP的相关文章

python 协程, 异步IO Select 和 selectors 模块 多并发演示

主要内容 Gevent协程 Select\Poll\Epoll异步IO与事件驱动 selectors 模块 多并发演示 协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开

多线程并发简单版

server------------------- #!/usr/bin/python3# -*- coding: utf-8 -*-# @Time    : 2018/6/19 9:33# @File    : server1.py from socket import *from threading import Thread def communicate(conn):    while True:        try:            data = conn.recv(1024)

SELECT版FTP

功能: 1.使用SELECT或SELECTORS模块实现并发简单版FTP 2.允许多用户并发上传下载文件环境: python 3.5特性: select 实现并发效果运行: get 文件名 #从服务器下载文件 put 文件名 #向服务器上传文件 helps #帮助信息 其他命令 #变大写返回给客户端主要知识点: os模块的应用 json模块的运用 select模块的运用 socket通信 queue数据交互 粘包原理: 这个程序通过select实现了并发,主要原理为它通过一个select()系统

网络编程之IO模型——selectors模块

网络编程之IO模型--selectors模块 一.了解select,poll,epoll IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象, 为此,咱们来理解下复用在通信领域的使用,在通信领域中为了充分利用网络连接的物理介质. 往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里我们就基本上理解了复用的含义, 即公用某个"介质"来尽可能多的做同一类(性质)的事,那IO复用的"介质"是什

# 进程/线程/协程 # IO:同步/异步/阻塞/非阻塞 # greenlet gevent # 事件驱动与异步IO # Select\Poll\Epoll异步IO 以及selectors模块 # Python队列/RabbitMQ队列

1 # 进程/线程/协程 2 # IO:同步/异步/阻塞/非阻塞 3 # greenlet gevent 4 # 事件驱动与异步IO 5 # Select\Poll\Epoll异步IO 以及selectors模块 6 # Python队列/RabbitMQ队列 7 8 ############################################################################################## 9 1.什么是进程?进程和程序之间有什么

{python之IO多路复用} IO模型介绍 阻塞IO(blocking IO) 非阻塞IO(non-blocking IO) 多路复用IO(IO multiplexing) 异步IO(Asynchronous I/O) IO模型比较分析 selectors模块

阅读目录 一 IO模型介绍 二 阻塞IO(blocking IO) 三 非阻塞IO(non-blocking IO) 四 多路复用IO(IO multiplexing) 五 异步IO(Asynchronous I/O) 六 IO模型比较分析 七 selectors模块 一 IO模型介绍 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能

python之selectors模块

selectors模块是在python3.4版本中引进的,它封装了IO多路复用中的select和epoll,能够更快,更方便的实现多并发效果. 官方文档见:https://docs.python.org/3/library/selectors.html 以下是一个selectors模块的代码示范: #!/usr/bin/python #Author:sean import selectors import socket #selectors模块默认会用epoll,如果你的系统中没有epoll(比

简单的FTP上传下载(java实现)

/** *阅读前请自己在win7上建立FTP主机 *具体步骤如:http://jingyan.baidu.com/article/574c5219d466c36c8d9dc138.html * 然后将以下FTP,username,password分别改成你的FTP ip地址 用户名 密码即可 * 本例子用了apche的commons-net-3.3.jar以方便FTP的访问 请手动buid -path * 待完成版 刷新按钮 登录 都还没有做 而且上传 下载 完成后都需要重新运行 * 2014-

名片管理系统---简单版

先写一个名片管理系统简单版,后续改进#!/usr/bin/env python3# -*- conding:utf-8 -*-# @Time: 2017/12/15 12:27# @Author:Luke# 名片系统..持续开发中...还有函数,模块等=没加进来,预先演练cardSys = [{"name":"张三","telephone":"15892233331","QQ":"12345&qu