精品伊人久久大香线蕉,开心久久婷婷综合中文字幕,杏田冲梨,人妻无码aⅴ不卡中文字幕

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
多種方法實現(xiàn) python 線程池

最近在做一個爬蟲相關的項目,單線程的整站爬蟲,耗時真的不是一般的巨大,運行一次也是心累,,,所以,要想實現(xiàn)整站爬蟲,多線程是不可避免的,那么python多線程又應該怎樣實現(xiàn)呢?這里主要要幾個問題(關于python多線程的GIL問題就不再說了,網(wǎng)上太多了)。

一、 既然多線程可以縮短程序運行時間,那么,是不是線程數(shù)量越多越好呢?

顯然,并不是,每一個線程的從生成到消亡也是需要時間和資源的,太多的線程會占用過多的系統(tǒng)資源(內存開銷,cpu開銷),而且生成太多的線程時間也是可觀的,很可能會得不償失,這里給出一個最佳線程數(shù)量的計算方式:

最佳線程數(shù)的獲取:

1、通過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數(shù),也即是最大吞吐能力。),響應時間

2、根據(jù)公式計算:服務器端最佳線程數(shù)量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數(shù)量

3、單用戶壓測,查看CPU的消耗,然后直接乘以百分比,再進行壓測,一般這個值的附近應該就是最佳線程數(shù)量。

二、為什么要使用線程池?

對于任務數(shù)量不斷增加的程序,每有一個任務就生成一個線程,最終會導致線程數(shù)量的失控,例如,整站爬蟲,假設初始只有一個鏈接a,那么,這個時候只啟動一個線程,運行之后,得到這個鏈接對應頁面上的b,c,d,,,等等新的鏈接,作為新任務,這個時候,就要為這些新的鏈接生成新的線程,線程數(shù)量暴漲。在之后的運行中,線程數(shù)量還會不停的增加,完全無法控制。所以,對于任務數(shù)量不端增加的程序,固定線程數(shù)量的線程池是必要的。

三、如何實現(xiàn)線程池?

這里,我分別介紹三種實現(xiàn)方式:

1、過去:

使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式如下:

#! /usr/bin/env python# -*- coding: utf-8 -*-import threadpoolimport timedef sayhello (a):    print("hello: "+a)    time.sleep(2)def main():    global result    seed=["a","b","c"]    start=time.time()    task_pool=threadpool.ThreadPool(5)    requests=threadpool.makeRequests(sayhello,seed)    for req in requests:        task_pool.putRequest(req)    task_pool.wait()    end=time.time()    time_m = end-start    print("time: "+str(time_m))    start1=time.time()    for each in seed:        sayhello(each)    end1=time.time()    print("time1: "+str(end1-start1))if __name__ == '__main__':    main()

運行結果如下:

threadpool是一個比較老的模塊了,現(xiàn)在雖然還有一些人在用,但已經(jīng)不再是主流了,關于python多線程,現(xiàn)在已經(jīng)開始步入未來(future模塊)了

2、未來:

使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:

#! /usr/bin/env python# -*- coding: utf-8 -*-from concurrent.futures import ThreadPoolExecutorimport timedef sayhello(a):    print("hello: "+a)    time.sleep(2)def main():    seed=["a","b","c"]    start1=time.time()    for each in seed:        sayhello(each)    end1=time.time()    print("time1: "+str(end1-start1))    start2=time.time()    with ThreadPoolExecutor(3) as executor:        for each in seed:            executor.submit(sayhello,each)    end2=time.time()    print("time2: "+str(end2-start2))    start3=time.time()    with ThreadPoolExecutor(3) as executor1:        executor1.map(sayhello,seed)    end3=time.time()    print("time3: "+str(end3-start3))if __name__ == '__main__':    main()

運行結果如下:

注意到一點:

concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數(shù),另一種是map()函數(shù),兩者的主要區(qū)別在于:

2.1、map可以保證輸出的順序, submit輸出的順序是亂的

2.2、如果你要提交的任務的函數(shù)是一樣的,就可以簡化成map。但是假如提交的任務函數(shù)是不一樣的,或者執(zhí)行的過程之可能出現(xiàn)異常(使用map執(zhí)行過程中發(fā)現(xiàn)問題會直接拋出錯誤)就要用到submit()

2.3、submit和map的參數(shù)是不同的,submit每次都需要提交一個目標函數(shù)和對應的參數(shù),map只需要提交一次目標函數(shù),目標函數(shù)的參數(shù)放在一個迭代器(列表,字典)里就可以。

3.現(xiàn)在?

這里要考慮一個問題,以上兩種線程池的實現(xiàn)都是封裝好的,任務只能在線程池初始化的時候添加一次,那么,假設我現(xiàn)在有這樣一個需求,需要在線程池運行時,再往里面添加新的任務(注意,是新任務,不是新線程),那么要怎么辦?

其實有兩種方式:

3.1、重寫threadpool或者future的函數(shù):

這個方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現(xiàn)機制才能正確的根據(jù)自己的需要重寫其中的方法。

3.2、自己構建一個線程池:

這個方法就需要對線程池的有一個清晰的了解了,附上我自己構建的一個線程池:

#! /usr/bin/env python# -*- coding: utf-8 -*-import threadingimport Queueimport hashlibimport loggingfrom utils.progress import PrintProgressfrom utils.save import SaveToSqliteclass ThreadPool(object):    def __init__(self, thread_num, args):        self.args = args        self.work_queue = Queue.Queue()        self.save_queue = Queue.Queue()        self.threads = []        self.running = 0        self.failure = 0        self.success = 0        self.tasks = {}        self.thread_name = threading.current_thread().getName()        self.__init_thread_pool(thread_num)    # 線程池初始化    def __init_thread_pool(self, thread_num):        # 下載線程        for i in range(thread_num):            self.threads.append(WorkThread(self))        # 打印進度信息線程        self.threads.append(PrintProgress(self))        # 保存線程        self.threads.append(SaveToSqlite(self, self.args.dbfile))    # 添加下載任務    def add_task(self, func, url, deep):        # 記錄任務,判斷是否已經(jīng)下載過        url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()        if not url_hash in self.tasks:            self.tasks[url_hash] = url            self.work_queue.put((func, url, deep))            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))    # 獲取下載任務    def get_task(self):        # 從隊列里取元素,如果block=True,則一直阻塞到有可用元素為止。        task = self.work_queue.get(block=False)        return task    def task_done(self):        # 表示隊列中的某個元素已經(jīng)執(zhí)行完畢。        self.work_queue.task_done()    # 開始任務    def start_task(self):        for item in self.threads:            item.start()        logging.debug("Work start")    def increase_success(self):        self.success += 1    def increase_failure(self):        self.failure += 1    def increase_running(self):        self.running += 1    def decrease_running(self):        self.running -= 1    def get_running(self):        return self.running    # 打印執(zhí)行信息    def get_progress_info(self):        progress_info = {}        progress_info['work_queue_number'] = self.work_queue.qsize()        progress_info['tasks_number'] = len(self.tasks)        progress_info['save_queue_number'] = self.save_queue.qsize()        progress_info['success'] = self.success        progress_info['failure'] = self.failure        return progress_info    def add_save_task(self, url, html):        self.save_queue.put((url, html))    def get_save_task(self):        save_task = self.save_queue.get(block=False)        return save_task    def wait_all_complete(self):        for item in self.threads:            if item.isAlive():                # join函數(shù)的意義,只有當前執(zhí)行join函數(shù)的線程結束,程序才能接著執(zhí)行下去                item.join()# WorkThread 繼承自threading.Threadclass WorkThread(threading.Thread):    # 這里的thread_pool就是上面的ThreadPool類    def __init__(self, thread_pool):        threading.Thread.__init__(self)        self.thread_pool = thread_pool    #定義線程功能方法,即,當thread_1,...,thread_n,調用start()之后,執(zhí)行的操作。    def run(self):        print (threading.current_thread().getName())        while True:            try:                # get_task()獲取從工作隊列里獲取當前正在下載的線程,格式為func,url,deep                do, url, deep = self.thread_pool.get_task()                self.thread_pool.increase_running()                # 判斷deep,是否獲取新的鏈接                flag_get_new_link = True                if deep >= self.thread_pool.args.deep:                    flag_get_new_link = False                # 此處do為工作隊列傳過來的func,返回值為一個頁面內容和這個頁面上所有的新鏈接                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)                if html == '':                    self.thread_pool.increase_failure()                else:                    self.thread_pool.increase_success()                    # html添加到待保存隊列                    self.thread_pool.add_save_task(url, html)                # 添加新任務,即,將新頁面上的不重復的鏈接加入工作隊列。                if new_link:                    for url in new_link:                        self.thread_pool.add_task(do, url, deep + 1)                self.thread_pool.decrease_running()                # self.thread_pool.task_done()            except Queue.Empty:                if self.thread_pool.get_running() <= 0:                    break            except Exception, e:                self.thread_pool.decrease_running()                # print str(e)                break

 

本站僅提供存儲服務,所有內容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權內容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
linux下C實現(xiàn)多線程
進程池和線程池、協(xié)程
python多進程并發(fā)與pool多線程
Python爬蟲(五)
線程池是怎么一回事
MariaDB線程池源碼分析
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服

主站蜘蛛池模板: 巢湖市| 富平县| 和田市| 江都市| 湟源县| 西峡县| 大化| 蕉岭县| 瓦房店市| 宝鸡市| 梧州市| 天祝| 梁山县| 晋城| 汨罗市| 民勤县| 偃师市| 波密县| 安泽县| 读书| 台州市| 东源县| 达日县| 东辽县| 上饶市| 普安县| 高唐县| 珠海市| 怀安县| 北安市| 延津县| 普兰店市| 灵寿县| 五台县| 绥中县| 永嘉县| 太康县| 商河县| 沧州市| 镇宁| 蓬莱市|