python搭建ssserver限制端口连接数

新建文件,编写下面内容,保存为socket.py文件。放到ssserver.exe所在文件夹

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2015 Falseen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# 功能:限制客户端数量(基于ip判断)
#
# 使用说明:1.将此文件放在ss根目录中即可生效,不用做其他设置(需要重新运行ss)。
#          2.修改53、54行的 clean_time 和 ip_numbers 为你想要的数值。
#          3.如果你的服务器有ipv6,并且你想让ip4和ip6分开限制,则可以设置 only_port 为 False 。
#
#
# 原理:默认情况下,ss在运行的时候会引用系统中的socket文件,但是把这个socket文件放到ss目录之后,ss就会引用这个我们自定义的socket文件。
#      然后在这个文件中再引用真正的socket包,并在原socket的基础上加以修改,最终ss调用的就是经过我们修改的socket文件了。
#
# 所以理论上任何引用了socket包的python程序都可以用这个文件来达到限制连接ip数量的目的。
#

from __future__ import absolute_import, division, print_function, \
    with_statement, nested_scopes

import sys

del sys.modules[‘socket‘]

import sys
import time
import logging
import types

path = sys.path[0]
sys.path.pop(0)

import socket    # 导入真正的socket包

sys.path.insert(0, path)

clean_time = 60        # 设置清理ip的时间间隔,在此时间内无连接的ip会被清理
ip_numbers = 1         # 设置每个端口的允许通过的ip数量,即设置客户端ip数量
only_port = True       # 设置是否只根据端口判断。如果为 True ,则只根据端口判断。如果为 False ,则会严格的根据 ip+端口进行判断。

# 动态path类方法
def re_class_method(_class, method_name, re_method):
    method = getattr(_class, method_name)
    info = sys.version_info
    if info[0] >= 3:
        setattr(_class, method_name,
                types.MethodType(lambda *args, **kwds: re_method(method, *args, **kwds), _class))
    else:
        setattr(_class, method_name,
                types.MethodType(lambda *args, **kwds: re_method(method, *args, **kwds), None, _class))

# 动态path实例方法
def re_self_method(self, method_name, re_method):
    method = getattr(self, method_name)
    setattr(self, method_name, types.MethodType(lambda *args, **kwds: re_method(method, *args, **kwds), self, self))

# 处理Tcp连接
def re_accept(old_method, self, *args, **kwds):

while True:

return_value = old_method(self, *args, **kwds)
        self_socket = return_value[0]
        if only_port:
            server_ip_port = ‘%s‘ % self.getsockname()[1]
        else:
            server_ip_port = ‘%s_%s‘ % (self.getsockname()[0], self.getsockname()[1])

client_ip = return_value[1][0]

client_ip_list = [x[0].split(‘#‘)[0] for x in self._list_client_ip[server_ip_port]]
        if len(self._list_client_ip[server_ip_port]) == 0:
            logging.debug("[re_socket] first add %s" % client_ip)
            self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
            return return_value

if client_ip in client_ip_list:
            logging.debug("[re_socket] update socket in %s" % client_ip)
            _ip_index = client_ip_list.index(client_ip)
            self._list_client_ip[server_ip_port][_ip_index][0] = ‘%s#%s‘ % (client_ip, time.time())
            self._list_client_ip[server_ip_port][_ip_index].append(self_socket)
            return return_value

else:
            if len(self._list_client_ip[server_ip_port]) < ip_numbers:
                logging.debug("[re_socket] add %s" % client_ip)
                self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
                return return_value

for x in [x for x in self._list_client_ip[server_ip_port]]:
                is_closed = True
                if time.time() - float(x[0].split(‘#‘)[1]) > clean_time:

for y in x[1:]:
                        try:
                            y.getpeername()     # 判断连接是否关闭
                            is_closed = False
                            break
                        except:                # 如果抛出异常,则说明连接已经关闭,这时可以关闭套接字
                            logging.debug("[re_socket] close and remove the time out socket 1/%s" % (len(x[1:])))
                            x.remove(y)

if not is_closed:
                        logging.debug(‘[re_socket] the %s still exists and update last_time‘ % str(x[1].getpeername()[0]))
                        _ip_index = client_ip_list.index(x[0].split(‘#‘)[0])
                        self._list_client_ip[server_ip_port][_ip_index][0] = ‘%s#%s‘ % (x[0].split(‘#‘)[0], time.time())

else:
                        logging.info("[re_socket] remove time out ip and add new ip %s" % client_ip )
                        self._list_client_ip[server_ip_port].remove(x)
                        self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
                        return return_value

if int(time.time()) % 5 == 0:
            logging.debug("[re_socket] the port %s client more then the %s" % (server_ip_port, ip_numbers))

# 处理Udp连接
def re_recvfrom(old_method, self, *args, **kwds):

while True:
        return_value = old_method(*args, **kwds)
        self_socket = ‘‘
        if only_port:
            server_ip_port = ‘%s‘ % self.getsockname()[1]
        else:
            server_ip_port = ‘%s_%s‘ % (self.getsockname()[0], self.getsockname()[1])
        client_ip = return_value[1][0]
        client_ip_list = [x[0].split(‘#‘)[0] for x in self._list_client_ip[server_ip_port]]

if len(self._list_client_ip[server_ip_port]) == 0:
            logging.debug("[re_socket] first add %s" % client_ip)
            self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
            return return_value

if client_ip in client_ip_list:
            logging.debug("[re_socket] update socket in %s" % client_ip)
            _ip_index = client_ip_list.index(client_ip)
            self._list_client_ip[server_ip_port][_ip_index][0] = ‘%s#%s‘ % (client_ip, time.time())
            return return_value
        else:
            if len(self._list_client_ip[server_ip_port]) < ip_numbers:
                logging.debug("[re_socket] add %s" % client_ip)
                self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
                return return_value

for x in [x for x in self._list_client_ip[server_ip_port]]:
                is_closed = True
                if time.time() - float(x[0].split(‘#‘)[1]) > clean_time:

for y in x[1:]:
                        try:
                            y.getpeername()     # 判断连接是否关闭
                            is_closed = False
                            break
                        except:                # 如果抛出异常,则说明连接已经关闭,这时可以关闭套接字
                            logging.debug("[re_socket] close and remove the time out socket 1/%s" % (len(x[1:])))
                            x.remove(y)

if not is_closed:
                        logging.debug(‘[re_socket] the %s still exists and update last_time‘ % str(x[1].getpeername()[0]))
                        _ip_index = client_ip_list.index(x[0].split(‘#‘)[0])
                        self._list_client_ip[server_ip_port][_ip_index][0] = ‘%s#%s‘ % (x[0].split(‘#‘)[0], time.time())

else:
                        logging.info("[re_socket] remove time out ip and add new ip %s" % client_ip )
                        self._list_client_ip[server_ip_port].remove(x)
                        self._list_client_ip[server_ip_port].append([‘%s#%s‘ % (client_ip, time.time()), self_socket])
                        return return_value

if int(time.time()) % 5 == 0:
            logging.debug("[re_socket] the port %s client more then the %s" % (server_ip_port, ip_numbers))
        new_tuple = [b‘‘, return_value[1]]
        return_value = tuple(new_tuple)
        return return_value

def re_bind(old_method, self, *args, **kwds):
    if only_port:
        port = ‘%s‘ % args[0][1]
    else:
        port = ‘%s_%s‘ % (args[0][0], args[0][1])
    self._list_client_ip[port] = []
    re_self_method(self, ‘recvfrom‘, re_recvfrom)
    old_method(self, *args, **kwds)

setattr(socket.socket, ‘_list_client_ip‘, {})
re_class_method(socket.socket, ‘bind‘, re_bind)
re_class_method(socket.socket, ‘accept‘, re_accept)

时间: 2024-10-25 21:03:01

python搭建ssserver限制端口连接数的相关文章

python搭建httpserver

因为手机要下载电脑上的文件,使用手机助手什么的经常出没反应,于是网上查了下,直接使用python搭建简单的HTTP服务器,之后在其运行目录下扔文件就行了.浏览器访问时可以直接显示相关的文件列表.参考了下面一篇文章:http://www.cnblogs.com/yili16438/p/d3209323913c6d53e6060fcd8d27e4c0.html 若你使用的是2.x,那么你只需要简单运行命令: python -m SimpleHTTPServer 80 最后一个参数是端口号,这里使用8

Python搭建网站框架

1. 机器上安装python 省略 2. 机器上安装python的easy_install 下载一个ez_setup.py文件 进入该文件路径下,运行该文件:运行结束后,文件下<python路径>\Scripts多了easy_install.exe 将这个exe文件的路径加入环境变量path下,这个时候在任何情况下都可以使用easy_install解析python框架了 3. 在你的web框架下,使用easy_install安装你的frame框架. 运行结束frame框架就安装在你的机器上了.

拥抱Android:编译python搭建移动的无线服务器平台

你想用废旧的Android手机作家庭服务器嘛? 其实并不难.以前,用Android SDK开发一个手机应用,安装下apk就可以host服务了,而现在就直接native化吧. 这篇文章会带你体验编译Python的过程,并用Python搭建可以带着跑的服务器. 首先,我们要开始在Arm的Android平台上编译Python.当然,你需要先准备好一台Linux的机器,然后从Android的官方网站下载并安装好Android NDK(最好SDK也装了). 下载一些必要的代码包: openssl-1.0.

python搭建简易服务器实例参考

有关python搭建简易服务器的方法. 需求分析: 省油宝用户数 已经破了6000,原有的静态报表 已经变得臃肿不堪, 每次打开都要缓上半天,甚至浏览器直接挂掉 采用python搭建一个最最简易的 web 服务 请求一个nick 就返回 对应的 报表数据 参数用GET方式传送 调研与实现: 园里没找到靠谱的,google了半天,最终还是成功了. 以下是源码,里面记录了 其中的 一些问题 复制代码 代码如下: #! /usr/bin/env python # -*- coding: utf-8 -

zabbix自动发现使用中端口并图形展示各个端口连接数

zabbix自动发现当前服务器使用中的端口并图形展示各个端口连接数 1.修改配置文件 vi /data/server/zabbix_agent/conf/zabbix_agentd.conf UserParameter=tcp.port.discovery,/data/server/zabbix/bin/discover_tcp_port_count tcp_port_discovery UserParameter=tcp.port.count[*],/data/server/zabbix/bi

Coco2d-x android win7 Python 搭建游戏开发环境

1:我用电脑配置 win7 3 核 内存8G 台式机,一直想研究Coco2d 游戏开发,所以经过一周的需找,终于把环境搭建好了 2:我用的版本是该版本,至于搭建android开发环境省略了, 3: 2.2>安装ndk,为了使用c++/c进行android开发 下载android-ndk-r8e,然后在eclipse或adt bundle中配置ndk路径. 4: 5:采用VS 编译环境 我之前用的VS2010 感觉用着不爽,所以改成现在的VS 2012 , 6:一般用python建立项目: 用py

使用Python编写简单的端口扫描器的实例分享【转】

转自 使用Python编写简单的端口扫描器的实例分享_python_脚本之家 http://www.jb51.net/article/76630.htm -*- coding:utf8 -*- #!/usr/bin/python # Python: 2.7.8 # Platform: Windows # Authro: wucl # Program: 端口扫描 # History: 2015.6.1 import socket, time, thread socket.setdefaulttim

python 自动发现Java端口

python 自动发现Java端口 #!/usr/bin/env python #encoding: utf8 import subprocess try: import json except ImportError: import simplejson as json command = "netstat -tpln | grep -v '127.0.0.1' | awk -F '[ :]+' '/java/ && / / {print $5}'" p = subp

运维脚本:python实现批量IP端口扫描

运维脚本:python实现批量IP端口扫描 专注网络运维,只发实用干货 扫描二维码关注公众 今天不想更新,发一篇存货,分享一小段python代码给大家,能实现简单的批量端口扫描,废话不多说,先上代码: =========================================================== # -*- coding: utf-8 -*- import socket import time import xlrd import threading hostfile =