Python实现的Google IP 可用性检测脚本
需要Python3.4+,一个参数用来选择测试搜索服务还是GAE服务。测试GAE服务的话需要先修改开头的两个变量。从标准输入读取IP地址或者IP段(形如192.168.0.0/16)列表,每行一个。可用IP输出到标准输出。实时测试结果输出到标准错误。50线程并发。
checkgoogleip
#!/usr/bin/envpython3
importsys
fromipaddressimportIPv4Network
importhttp.clientasclient
fromconcurrent.futuresimportThreadPoolExecutor
importargparse
importssl
importsocket
#先按自己的情况修改以下几行
APP_ID='your_id_here'
APP_PATH='/fetch.py'
context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode=ssl.CERT_REQUIRED
context.load_verify_locations('/etc/ssl/certs/ca-certificates.crt')
classHTTPSConnection(client.HTTPSConnection):
def__init__(self,*args,hostname=None,**kwargs):
self._hostname=hostname
super().__init__(*args,**kwargs)
defconnect(self):
super(client.HTTPSConnection,self).connect()
ifself._tunnel_host:
server_hostname=self._tunnel_host
else:
server_hostname=self._hostnameorself.host
sni_hostname=server_hostnameifssl.HAS_SNIelseNone
self.sock=self._context.wrap_socket(self.sock,
server_hostname=sni_hostname)
ifnotself._context.check_hostnameandself._check_hostname:
try:
ssl.match_hostname(self.sock.getpeercert(),server_hostname)
exceptException:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
defcheck_ip_p(ip,func):
iffunc(ip):
print(ip,flush=True)
defcheck_for_gae(ip):
return_check(APP_ID+'.appspot.com',APP_PATH,ip)
defcheck_for_search(ip):
return_check('www.google.com','/',ip)
def_check(host,path,ip):
forchanceinrange(1,-1,-1):
try:
conn=HTTPSConnection(
ip,timeout=5,
context=context,
hostname=host,
)
conn.request('GET',path,headers={
'Host':host,
})
response=conn.getresponse()
ifresponse.status<400:
print('GOOD:',ip,file=sys.stderr)
else:
raiseException('HTTPError%s%s'%(
response.status,response.reason))
returnTrue
exceptKeyboardInterrupt:
raise
exceptExceptionase:
ifisinstance(e,ssl.CertificateError):
print('WARN:%sisnotGoogle\'s!'%ip,file=sys.stderr)
chance=0
ifchance==0:
print('BAD:',ip,e,file=sys.stderr)
returnFalse
else:
print('RE:',ip,e,file=sys.stderr)
defmain():
parser=argparse.ArgumentParser(description='CheckGoogleIPs')
parser.add_argument('service',choices=['search','gae'],
help='servicetocheck')
args=parser.parse_args()
func=globals()['check_for_'+args.service]
count=0
withThreadPoolExecutor(max_workers=50)asexecutor:
forlinsys.stdin:
l=l.strip()
if'/'inl:
foripinIPv4Network(l).hosts():
executor.submit(check_ip_p,str(ip),func)
count+=1
else:
executor.submit(check_ip_p,l,func)
count+=1
print('%dIPchecked.'%count)
if__name__=='__main__':
main()