之前写的一个脚本,用于自动查找并检验可用的 VPN Gate 服务器。
#!/usr/bin/python #-*- encoding: utf-8 -*- import re import os import imaplib import email import sys import base64 import tempfile import subprocess import time #----------------------------------------------------------------------------- vpn_gate = [] mail_user = ‘[email protected]‘ mail_pass = ‘123123‘ mail_ser = ‘imap.gmail.com‘ vpn_tmp_dir = "/tmp/vpn" vpn_ser_config_dir = vpn_tmp_dir + "/config" vpn_ser_list = vpn_ser_config_dir + "/vpn.list" vpn_ser_wget = vpn_tmp_dir + "/vpn_wget.tmp" vpn_ser_old = "/etc/vpngate/vpn_ser.list" vpn_bad_ser = "/etc/vpngate/bad_ser.list" dns_gfw = ["118.5.49.6","128.121.126.139","159.106.121.75","169.132.13.103","188.5.4.96","189.163.17.5","192.67.198.6","197.4.4.12","202.106.1.2","202.181.7.85","203.161.230.171","203.98.7.65","207.12.88.98","208.56.31.43","209.145.54.50","209.220.30.174","209.36.73.33","209.85.229.138","211.94.66.147","213.169.251.35","216.221.188.182","216.234.179.13","23.89.5.60","243.185.187.39","249.129.46.48","253.157.14.165","37.61.54.158","4.36.66.178","46.82.174.68","49.2.123.56","54.76.135.1","59.24.3.173","64.33.88.161","64.33.99.47","64.66.163.251","65.104.202.252","65.160.219.113","66.45.252.237","72.14.205.104","72.14.205.99","74.125.127.102","74.125.155.102","74.125.39.102","74.125.39.113","77.4.7.92","78.16.49.15","8.7.198.45","93.46.8.89"] #----------------------------------------------------------------------------- def write_file(filename, content): f = open(filename, ‘w‘) f.write(content) f.close() def getMail(): try: imapServer = imaplib.IMAP4_SSL(mail_ser, 993) imapServer.login(mail_user, mail_pass) imapServer.select() #Message statues = ‘All,Unseen,Seen,Recent,Answered, Flagged‘ resp, items = imapServer.search(None, "Unseen") number = 1 website = ‘‘ for i in items[0].split(): #get information of email resp, mailData = imapServer.fetch(i, "(RFC822)") mailText = mailData[0][1] msg = email.message_from_string(mailText) website = re.findall(‘http://[a-zA-Z0-9.:-]*/‘, str(msg)) if website == 
‘‘: continue else: break return list(set(website)) imapServer.close() imapServer.logout() except: print "MAIL SERVER ERROR" return [] def check_wget(filename, filesize): if not os.path.exists(filename): return False else: if os.path.getsize(filename) != int(filesize): os.system(‘rm -f %s‘ % filename) return False else: return True def get_vpn_list(filename, config_dir, host_list): filea = open(filename,‘r‘) lines = filea.readlines() hostname = {} for i in lines: if not str.startswith(i,‘v‘): continue try: tmp = i.split(‘,‘) result = base64.decodestring(tmp[-1]) result = result.replace(‘\r\n‘,‘\n‘) if tmp[1] in bad_vpn_ser.keys(): continue hostname[tmp[1]] = ‘‘ write_file(config_dir + "/" + tmp[1] + ".ovpn", result) except: pass return hostname filea.close() def del_tmp_exit(): #os.system(‘rm -f %s‘ % vpn_ser_wget) #os.system(‘rm -rf %s‘ % tmp_file) sys.exit() def check_vpn_old_ser(filename, content=‘‘, del_bad = False): tmp = {} if content == ‘‘: # read vpn ser list if os.path.getsize(filename) != 0: f = open(filename,‘r‘) for i in f: tmp[i.strip()] = ‘‘ f.close() return(tmp.keys()) else: return [] else: # write vpn ser list if os.path.getsize(filename) != 0: f = open(filename,‘r‘) for i in f: tmp[i.strip()] = ‘‘ f.close() if del_bad: if content in tmp: del tmp[content.strip()] else: tmp[content.strip()] = ‘‘ write_file(filename,"\n".join(tmp.keys())) else: write_file(filename,content.strip()) def check_vpn_status(): child = subprocess.Popen("ping -c 4 twitter.com", shell=True, stdout=subprocess.PIPE) child.wait() log = child.stdout.read() if re.search("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", log): if re.findall("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", log)[0].strip() in dns_gfw: return False else: if re.findall("\d%", log)[0].strip() == "0%": return True else: return False else: # can‘t ping dst return False def check_vpn_connect(): tun = subprocess.Popen("ip a | grep tun | wc -l", shell=True,stdout=subprocess.PIPE) if int(tun.stdout.read()) == 0: return False else: 
return True def vpn_connect(ip): os.system(‘(openvpn --client --config %s --route-noexec> /dev/null 2>&1 &)‘ % ip) def status_check(ser): #, child): time.sleep(11) if check_vpn_connect(): print "VPN Connect OK" try: subprocess.call(["/etc/init.d/nscd", "restart"]) except: subprocess.call(["/etc/init.d/dns-clean", "restart"]) return check_vpn_status() else: return False def vpn_check_net(): child2 = subprocess.Popen("wget -T 5 -t 2 --spider https://twitter.com 2>&1 | grep Length", shell=True, stdout=subprocess.PIPE) child2.wait() if child2.stdout.read().strip() == ‘‘: return True else: return False def read_ini(filename): tmp = {} if os.path.exists(filename): f = open(filename, ‘r‘) for i in f: tmp[i.strip()] = ‘‘ f.close() else: os.system(‘touch %s‘ % filename) return tmp def vpn_check_line(): ping_child = subprocess.Popen(‘ping -c 4 twitter.com‘, shell=True, stdout=subprocess.PIPE) ping_child.wait() try: ping_value = ping_child.stdout.read().split(‘\n‘)[-2].split(‘/‘)[-3] print "Ping avg is %s" % ping_value if float(ping_value) < 400: return False else: return True except: return True def vpn_tun_ip(): tun = subprocess.Popen("ip r | grep tun | awk ‘{print $1}‘", shell=True,stdout=subprocess.PIPE) tun.wait() return tun.stdout.read() def vpn_route(remote_ser): os.system(‘route del 199.59.150.39‘) os.system(‘route add %s gw 192.168.100.1‘ % remote_ser) os.system(‘route add -net 0.0.0.0 gw %s‘ % remote_ser) os.system(‘route add -net 128.0.0.0 gw %s‘ % remote_ser) #----------------------------------------------------------------------------- if os.getuid() != 0: print "Please Run As Root!" 
del_tmp_exit() bad_vpn_ser = read_ini(vpn_bad_ser) if not os.path.exists(vpn_ser_list): os.system(‘rm -rf %s‘ % vpn_tmp_dir) else: curr_ser = check_vpn_old_ser(vpn_ser_list) if len(curr_ser) == 0: os.system(‘rm -rf %s‘ % vpn_tmp_dir) if os.path.exists(vpn_tmp_dir): for ser in curr_ser: if ser.strip() in bad_vpn_ser.keys(): continue print "Current Test SERVER %s" % ser.strip() vpn_connect(vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") if status_check(ser.strip()): #, vpn_child): route_ip = vpn_tun_ip() os.system(‘route add 199.59.150.39 gw %s‘ % route_ip) print "Current Useable Server %s" % ser.strip() ok_times = 0 for j in range(1,11): time.sleep(60) if vpn_check_net(): bad_vpn_ser[ser] = ‘‘ os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") break if vpn_check_line(): bad_vpn_ser[ser] = ‘‘ os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") break ok_times += 1 print "wget twitter.com %s times OK" % j if ok_times == 10: vpn_route(route_ip) break else: os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") bad_vpn_ser[ser] = ‘‘ os.system("for i in $(ps aux | grep %s | grep -v grep | awk ‘{print $2}‘); do kill -9 $i; done" % ser.strip()) time.sleep(3) if len(bad_vpn_ser) != 0: new_list = list(set(curr_ser).difference(set(bad_vpn_ser.keys()))) write_file(vpn_bad_ser, "\n".join(bad_vpn_ser.keys())) if len(curr_ser) == 0: os.chdir(‘/tmp‘) os.system(‘rm -rf %s‘ % vpn_tmp_dir) del_tmp_exit() else: write_file(vpn_ser_list, "\n".join(new_list)) del_tmp_exit() else: vpn_gate = getMail() os.mkdir(vpn_tmp_dir) os.chdir(vpn_tmp_dir) os.mkdir(vpn_ser_config_dir) if len(vpn_gate) == 0: print "Can‘t Get VPN Mirror Site" if not os.path.exists(vpn_ser_old): os.system("mkdir /etc/vpngate") os.system("touch %s" % vpn_ser_old) del_tmp_exit() if raw_input("Use Old Mirror Server? 
[y|n] ").lower().strip() == ‘y‘: vpn_gate = check_vpn_old_ser(vpn_ser_old) if vpn_gate == []: print "Can‘t Get VPN Mirror Site And No Old Server" del_tmp_exit() else: del_tmp_exit() wget_ok = 0 for i in vpn_gate: # csv list htt://www.vpngate.net/api/iphone/ if "vpngate" in i: continue wget_file_size = os.popen("wget -T 5 -t 2 --spider %sapi/iphone/ 2>&1 | grep Length" % i).read() if wget_file_size.strip() == ‘‘: check_vpn_old_ser(vpn_ser_old, i, True) continue wget_file_size = wget_file_size.strip().split()[1] if wget_ok == 1: continue else: child = subprocess.Popen(‘wget -T 5 -t 2 -q %sapi/iphone/ -O %s‘ % (i, vpn_ser_wget),shell=True) child.wait() if check_wget(vpn_ser_wget, wget_file_size): print "Current Mirrors Useable Is %s" % i check_vpn_old_ser(vpn_ser_old, i) wget_ok == 1 if not os.path.exists(vpn_ser_wget): print "Wget File ERROR!" del_tmp_exit() vpn_ser_list_tmp = get_vpn_list(vpn_ser_wget, vpn_ser_config_dir, vpn_ser_list) for ser in vpn_ser_list_tmp: if ser.strip() in bad_vpn_ser.keys(): continue print "Current Test SERVER %s" % ser.strip() vpn_connect(vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") if status_check(ser.strip()): #, vpn_child): route_ip = vpn_tun_ip() os.system(‘route add 199.59.150.39 gw %s‘ % route_ip) print "Current Useable Server %s" % ser.strip() ok_times = 0 for j in range(1,11): time.sleep(60) if vpn_check_net(): bad_vpn_ser[ser] = ‘‘ os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") break if vpn_check_line(): bad_vpn_ser[ser] = ‘‘ os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") break ok_times += 1 print "wget twitter.com %s times OK" % j if ok_times == 10: vpn_route(route_ip) break else: os.system(‘rm -f %s‘ % vpn_ser_config_dir + "/" + ser.strip() + ".ovpn") bad_vpn_ser[ser] = ‘‘ os.system("for i in $(ps aux | grep %s | grep -v grep | awk ‘{print $2}‘); do kill -9 $i; done" % ser.strip()) time.sleep(3) if len(bad_vpn_ser) != 0: for i in bad_vpn_ser: if i in vpn_ser_list_tmp: del 
vpn_ser_list_tmp[i] write_file(vpn_bad_ser, "\n".join(bad_vpn_ser.keys())) if len(vpn_ser_list) == 0: os.chdir(‘/tmp‘) os.system(‘rm -rf %s‘ % vpn_tmp_dir) else: write_file(vpn_ser_list, "\n".join(vpn_ser_list_tmp.keys())) print "VPN TEST OK" del_tmp_exit()
时间: 2024-11-09 11:31:23