http://blog.csdn.net/denny_233/article/details/40484365
2014
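A thread pool means that, during program initialization, a batch of threads is created in advance, each one ready to work;
then a task list is initialized, and whenever a task arrives it is appended to that list;
once the list holds tasks, the waiting threads race to grab them. How do they race? Through thread synchronization primitives (mutexes, critical sections, and so on). The lucky thread that wins takes a task off the list and starts working. When it finishes, it goes back to its initial waiting state, ready to grab the next task.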
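The wait, grab, work, wait-again cycle described above can be sketched with a mutex and a condition variable. This is only a minimal illustration under stated assumptions: every name in it (`mini_pool`, `mini_task`, `mini_pool_submit`, and so on) is hypothetical, and it is not the thpool library listed later in this post.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

/* Hypothetical names throughout; not the thpool library shown later. */
typedef struct mini_task {
    void (*fn)(void *);              /* work to run */
    void *arg;                       /* its argument */
    struct mini_task *next;
} mini_task;

typedef struct mini_pool {
    pthread_mutex_t lock;            /* guards the task list and the stop flag */
    pthread_cond_t has_work;         /* signalled on new work and on shutdown */
    mini_task *head, *tail;          /* FIFO task list */
    int stop;                        /* 1 asks the workers to exit */
    int nthreads;                    /* threads actually created */
    pthread_t *threads;
} mini_pool;

static void *mini_worker(void *p) {
    mini_pool *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->head == NULL && !pool->stop)
            pthread_cond_wait(&pool->has_work, &pool->lock);  /* idle */
        if (pool->head == NULL) {    /* stop requested and list drained */
            pthread_mutex_unlock(&pool->lock);
            return NULL;
        }
        mini_task *t = pool->head;   /* this thread won the task */
        pool->head = t->next;
        if (pool->head == NULL) pool->tail = NULL;
        pthread_mutex_unlock(&pool->lock);
        t->fn(t->arg);               /* run the task outside the lock */
        free(t);
    }
}

mini_pool *mini_pool_create(int nthreads) {
    assert(nthreads > 0);
    mini_pool *pool = calloc(1, sizeof *pool);
    if (!pool) return NULL;
    pool->threads = calloc((size_t)nthreads, sizeof *pool->threads);
    if (!pool->threads) { free(pool); return NULL; }
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->has_work, NULL);
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&pool->threads[i], NULL, mini_worker, pool) != 0)
            break;
        pool->nthreads++;
    }
    return pool;
}

/* Appends a task to the list and wakes one waiting worker. */
int mini_pool_submit(mini_pool *pool, void (*fn)(void *), void *arg) {
    mini_task *t = malloc(sizeof *t);
    if (!t) return -1;
    t->fn = fn; t->arg = arg; t->next = NULL;
    pthread_mutex_lock(&pool->lock);
    if (pool->tail) pool->tail->next = t; else pool->head = t;
    pool->tail = t;
    pthread_cond_signal(&pool->has_work);
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

/* Lets the workers drain the remaining tasks, then joins them. */
void mini_pool_destroy(mini_pool *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->stop = 1;
    pthread_cond_broadcast(&pool->has_work);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->nthreads; i++)
        pthread_join(pool->threads[i], NULL);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->has_work);
    free(pool->threads);
    free(pool);
}

/* Example task: atomically adds 1 to the counter that arg points to. */
void mini_incr(void *arg) { atomic_fetch_add((atomic_int *)arg, 1); }
```

A caller would create the pool once, submit tasks such as `mini_incr` as they arrive, and call `mini_pool_destroy` at shutdown. In this sketch the workers finish every queued task before exiting, which is a design choice of the example rather than a property of thread pools in general.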
/**********************************************************************************************************************************************/
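It is like having a crew of helpers lined up and waiting. As soon as a task shows up they compete for it in an orderly way; each task is taken by exactly one helper, and when he is done he does not rest but goes straight back to waiting for the next one. So when tasks keep arriving, your helpers scramble to pick them up one after another and never slack off.
Conversely, without a thread pool, a thread is created on the spot every time a task arrives. That is like hiring a new helper for each task and sending him home once it is finished. When the next task comes, you hire another helper, who again goes home afterwards. Compared with a thread pool, all that hiring costs extra time and effort (the CPU time and memory allocation spent creating and destroying threads).
/*********************************************************************************************************************************************/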
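For contrast, the hire-one-helper-per-task approach might look like the sketch below. The helper `run_thread_per_task` and the task `bump` are hypothetical names, and each thread is joined right away to keep the example short, so the tasks run one after another here.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical helper: runs fn(arg) n times, creating and joining one
 * fresh thread per task. Returns the number of threads created. */
int run_thread_per_task(void *(*fn)(void *), void *arg, int n) {
    assert(fn != NULL && n >= 0);
    int created = 0;
    for (int i = 0; i < n; i++) {
        pthread_t tid;
        if (pthread_create(&tid, NULL, fn, arg) != 0)  /* creation cost, every task */
            break;
        pthread_join(tid, NULL);                        /* teardown cost, every task */
        created++;
    }
    return created;
}

/* Example task: increments the int that arg points to. No lock is needed
 * here because each thread is joined before the next one starts. */
void *bump(void *arg) {
    ++*(int *)arg;
    return NULL;
}
```

Every task pays for one `pthread_create` and one `pthread_join`, which is exactly the cost a pool avoids by reusing its threads.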
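In summary, a thread pool has four components:
Thread pool manager (ThreadPoolManager): creates and manages the thread pool.
Worker thread (WorkThread): a thread in the pool.
Task interface (Task): the interface every task must implement so that worker threads can schedule and run it.
Task queue: holds the tasks that have not been processed yet, providing a buffering mechanism.
So using a thread pool removes that extra per-thread overhead and lets all tasks be completed back to back. The synchronization inside the pool has a cost of its own, but it is comparatively small. In addition, the number of threads in the pool can be adjusted dynamically according to the current workload, which saves further resources.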
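The four components above map naturally onto a handful of C types. The sketch below is one possible layout, with hypothetical names (`task`, `task_queue`, `worker`, `pool_manager`); it shows only the data model and a plain FIFO queue, without any threading logic.

```c
#include <assert.h>
#include <stddef.h>
#include <pthread.h>

/* Task interface: every task is a function plus one opaque argument. */
typedef struct task {
    void *(*run)(void *arg);
    void *arg;
    struct task *next;           /* link used by the task queue */
} task;

/* Task queue: FIFO buffer of tasks that have not been processed yet. */
typedef struct task_queue {
    task *head;                  /* oldest task, taken next */
    task *tail;                  /* newest task */
    size_t count;
} task_queue;

/* Worker thread: one pooled thread. */
typedef struct worker {
    pthread_t tid;
} worker;

/* Pool manager: owns the workers and the queue. */
typedef struct pool_manager {
    worker *workers;
    size_t nworkers;
    task_queue queue;
    pthread_mutex_t lock;        /* would serialize queue access between workers */
} pool_manager;

/* Appends a task at the tail of the queue. */
void task_queue_push(task_queue *q, task *t) {
    assert(q != NULL && t != NULL);
    t->next = NULL;
    if (q->tail) q->tail->next = t; else q->head = t;
    q->tail = t;
    q->count++;
}

/* Removes and returns the oldest task, or NULL if the queue is empty. */
task *task_queue_pop(task_queue *q) {
    assert(q != NULL);
    task *t = q->head;
    if (t == NULL) return NULL;
    q->head = t->next;
    if (q->head == NULL) q->tail = NULL;
    q->count--;
    t->next = NULL;
    return t;
}
```

In a real pool, calls to `task_queue_push` and `task_queue_pop` would be made while holding `pool_manager.lock`.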
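When a thread pool is a good fit:
1. A large number of tasks each need a thread, and each task finishes quickly. A web server handling page requests is a very good match. For long-running tasks, such as telnet connections, a thread pool offers little advantage.
2. Applications with strict performance requirements, for example a server that must respond quickly to client requests.
3. Applications that receive sudden bursts of requests but must not let the server create a huge number of threads in response. Creating many threads in a short time can exhaust memory and lead to an "OutOfMemory" condition.
Below is a code example found online:
thpool.h is as follows: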
- /**********************************
- * @author Johan Hanssen Seferidis
- * @date 12/08/2011
- * Last update: 01/11/2011
- * License: LGPL
- *
- **********************************/
-
- /* Description: Library providing a threading pool where you can add work on the fly. The number
- * of threads in the pool is adjustable when creating the pool. In most cases
- * this should equal the number of threads supported by your cpu.
- *
- * For an example on how to use the threadpool, check the main.c file or just read
- * the documentation.
- *
- * In this header file a detailed overview of the functions and the threadpool logical
- * scheme is present in case tweaking of the pool is needed.
- * */
-
- /*
- * Fast reminders:
- *
- * tp = threadpool
- * thpool = threadpool
- * thpool_t = threadpool type
- * tp_p = threadpool pointer
- * sem = semaphore
- * xN = x can be any string. N stands for amount
- *
- * */
-
- /* _______________________________________________________
- * / \
- * | JOB QUEUE | job1 | job2 | job3 | job4 | .. |
- * | |
- * | threadpool | thread1 | thread2 | .. |
- * \_______________________________________________________/
- *
- * Description: Jobs are added to the job queue. Once a thread in the pool
- * is idle, it is assigned with the first job from the queue(and
- * erased from the queue). It's each thread's job to read from
- * the queue serially(using lock) and executing each job
- * until the queue is empty.
- * Summary: every new job is added to the queue; when a thread in the pool is idle, it takes a job from the queue.
- *
- * Scheme:
- *
- * thpool______ jobqueue____ ______
- * | | | | .----------->|_job0_| Newly added job
- * | | | head------------' |_job1_|
- * | jobqueue----------------->| | |_job2_|
- * | | | tail------------. |__..__|
- * |___________| |___________| '----------->|_jobn_| Job for thread to take
- *
- *
- * job0________
- * | |
- * | function----> // every job has this common interface, which worker threads use to dispatch it
- * | |
- * | arg------->
- * | | job1________
- * | next-------------->| |
- * |___________| | |..
- */
-
- #ifndef THPOOL_H
- #define THPOOL_H
-
- #include <pthread.h>
- #include <semaphore.h>
-
- /* ================================= STRUCTURES ================================================ */
- /* Individual job */
- typedef struct thpool_job_t{
- void* (*function)(void* arg); /**< function pointer */
- void* arg; /**< function's argument */
- struct thpool_job_t* next; /**< pointer to next job */
- struct thpool_job_t* prev; /**< pointer to previous job */
- }thpool_job_t;
-
- /* Job queue as doubly linked list */
- typedef struct thpool_jobqueue{
- thpool_job_t *head; /**< pointer to head of queue */
- thpool_job_t *tail; /**< pointer to tail of queue */
- int jobsN; /**< amount of jobs in queue */
- sem_t *queueSem; /**< semaphore(this is probably just holding the same as jobsN) */
- }thpool_jobqueue;
-
- /* The threadpool */
- typedef struct thpool_t{
- pthread_t* threads; /**< pointer to threads' ID */
- int threadsN; /**< amount of threads */
- thpool_jobqueue* jobqueue; /**< pointer to the job queue */
- }thpool_t;
-
- /* Container for all things that each thread is going to need */
- typedef struct thread_data{
- pthread_mutex_t *mutex_p;
- thpool_t *tp_p;
- }thread_data;
-
-
- /* =========================== FUNCTIONS ================================================ */
- /* ----------------------- Threadpool specific --------------------------- */
-
- /**
- * @brief Initialize threadpool
- *
- * Allocates memory for the threadpool, jobqueue, semaphore and fixes
- * pointers in jobqueue.
- *
- * @param number of threads to be used
- * @return threadpool struct on success,
- * NULL on error
- */
- thpool_t* thpool_init(int threadsN);
-
- /**
- * @brief What each thread is doing
- *
- * In principle this is an endless loop. The only time this loop gets interrupted is once
- * thpool_destroy() is invoked.
- *
- * @param threadpool to use
- * @return nothing
- */
- void thpool_thread_do(thpool_t* tp_p);
-
- /**
- * @brief Add work to the job queue
- *
- * Takes an action and its argument and adds it to the threadpool's job queue.
- * If you want to add to work a function with more than one arguments then
- * a way to implement this is by passing a pointer to a structure.
- *
- * ATTENTION: You have to cast both the function and argument to not get warnings.
- *
- * @param threadpool to where the work will be added to
- * @param function to add as work
- * @param argument to the above function
- * @return int
- */
- int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p);
-
- /**
- * @brief Destroy the threadpool
- *
- * This will 'kill' the threadpool and free up memory. If threads are active when this
- * is called, they will finish what they are doing and then they will get destroyed.
- *
- * @param threadpool a pointer to the threadpool structure you want to destroy
- */
- void thpool_destroy(thpool_t* tp_p);
-
-
- /* ------------------------- Queue specific ------------------------------ */
- /**
- * @brief Initialize queue
- * @param pointer to threadpool
- * @return 0 on success,
- * -1 on memory allocation error
- */
- int thpool_jobqueue_init(thpool_t* tp_p);
-
- /**
- * @brief Add job to queue
- *
- * A new job will be added to the queue. The new job MUST be allocated
- * before passed to this function or else other functions like thpool_jobqueue_empty()
- * will be broken.
- *
- * @param pointer to threadpool
- * @param pointer to the new job(MUST BE ALLOCATED)
- * @return nothing
- */
- void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p);
-
- /**
- * @brief Remove last job from queue.
- *
- * This does not free allocated memory so be sure to have peeked() \n
- * before invoking this as else there will result lost memory pointers.
- *
- * @param pointer to threadpool
- * @return 0 on success,
- * -1 if queue is empty
- */
- int thpool_jobqueue_removelast(thpool_t* tp_p);
-
- /**
- * @brief Get last job in queue (tail)
- *
- * Gets the last job that is inside the queue. This will work even if the queue
- * is empty.
- *
- * @param pointer to threadpool structure
- * @return job a pointer to the last job in queue,
- * a pointer to NULL if the queue is empty
- */
- thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p);
-
- /**
- * @brief Remove and deallocate all jobs in queue
- *
- * This function will deallocate all jobs in the queue and set the
- * jobqueue to its initialization values, thus tail and head pointing
- * to NULL and amount of jobs equal to 0.
- *
- * @param pointer to threadpool structure
- * */
- void thpool_jobqueue_empty(thpool_t* tp_p);
-
- #endif
thpool.c is as follows:
- /* ********************************
- *
- * Author: Johan Hanssen Seferidis
- * Date: 12/08/2011
- * Update: 01/11/2011
- * License: LGPL
- *
- *
- *//** @file thpool.h *//*
- ********************************/
-
- /* Library providing a threading pool where you can add work. For an example on
- * usage you refer to the main file found in the same package */
-
- /*
- * Fast reminders:
- *
- * tp = threadpool
- * thpool = threadpool
- * thpool_t = threadpool type
- * tp_p = threadpool pointer
- * sem = semaphore
- * xN = x can be any string. N stands for amount
- *
- * */
-
- #include <stdio.h>
- #include <stdlib.h>
- #include <pthread.h>
- #include <semaphore.h>
- #include <errno.h>
-
- #include "thpool.h" /* here you can also find the interface to each function */
-
-
- static int thpool_keepalive=1;
-
- /* Create mutex variable */
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
-
-
- /* Initialise thread pool */
- thpool_t* thpool_init(int threadsN){
- thpool_t* tp_p;
-
- if (threadsN<1) threadsN=1; /* fall back to a single thread */
-
- /* Make new thread pool */
- tp_p=(thpool_t*)malloc(sizeof(thpool_t)); /* MALLOC thread pool */
- if (tp_p==NULL){
- fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
- return NULL;
- }
- tp_p->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t)); /* MALLOC thread IDs */
- if (tp_p->threads==NULL){
- fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n");
- free(tp_p); /* do not leak the pool on failure */
- return NULL;
- }
- tp_p->threadsN=threadsN;
-
- /* Initialise the job queue */
- if (thpool_jobqueue_init(tp_p)==-1){
- fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
- free(tp_p->threads);
- free(tp_p);
- return NULL;
- }
-
- /* Initialise semaphore (see the POSIX semaphore documentation for details on these calls) */
- tp_p->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC job queue semaphore */
- if (tp_p->jobqueue->queueSem==NULL || sem_init(tp_p->jobqueue->queueSem, 0, 0)!=0){ /* not shared between processes, initial value 0 */
- fprintf(stderr, "thpool_init(): Could not initialise job queue semaphore\n");
- free(tp_p->jobqueue->queueSem);
- free(tp_p->jobqueue);
- free(tp_p->threads);
- free(tp_p);
- return NULL;
- }
-
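- /* Make threads in pool */
- int t;
- for (t=0; t<threadsN; t++){
- /* thpool_thread_do() takes a thpool_t* and returns void, so it is cast to the start-routine type that pthread_create() expects */
- if (pthread_create(&(tp_p->threads[t]), NULL, (void *(*)(void *))thpool_thread_do, (void *)tp_p)!=0){ /* MALLOCS INSIDE PTHREAD HERE */
- fprintf(stderr, "thpool_init(): Could not create thread %d\n", t);
- break;
- }
- printf("Created thread %d in pool \n", t);
- }
- tp_p->threadsN=t; /* count only the threads that were actually created, so thpool_destroy() joins valid IDs */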
-
- return tp_p;
- }
-
-
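- /* What each individual thread is doing. This is the entry function of every pool thread: it takes tasks that share a common interface from the queue,
- here the first two members of thpool_job_t: void* (*function)(void* arg) and void* arg. */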
- /* There are two scenarios here. One is everything works as it should and second if
- * the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */
- void thpool_thread_do(thpool_t* tp_p){
-
- while(thpool_keepalive){
- /* block here until a sem_post() signals the semaphore */
- if (sem_wait(tp_p->jobqueue->queueSem)) {/* WAITING until there is work in the queue */
- perror("thpool_thread_do(): Waiting for semaphore");
- exit(1);
- }
-
- if (thpool_keepalive){
-
- /* Read job from queue and execute it */
- void*(*func_buff)(void* arg);
- void* arg_buff;
- thpool_job_t* job_p;
-
- pthread_mutex_lock(&mutex); /* LOCK */
-
- job_p = thpool_jobqueue_peek(tp_p);
- func_buff=job_p->function;
- arg_buff =job_p->arg; /* the interface that every queued task provides */
- thpool_jobqueue_removelast(tp_p);
-
- pthread_mutex_unlock(&mutex); /* UNLOCK */
-
- func_buff(arg_buff); /* run function */
- free(job_p); /* DEALLOC job */
- }
- else
- {
- return; /* EXIT thread*/
- }
- }
- return;
- }
-
-
- /* Add work to the thread pool */
- int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
- thpool_job_t* newJob;
-
- newJob=(thpool_job_t*)malloc(sizeof(thpool_job_t)); /* MALLOC job */
- if (newJob==NULL){
- fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
- return -1; /* report the failure to the caller instead of terminating the process */
- }
-
- /* add function and argument */
- newJob->function=function_p;
- newJob->arg=arg_p;
-
- /* add job to queue */
- pthread_mutex_lock(&mutex); /* LOCK */
- thpool_jobqueue_add(tp_p, newJob);
- pthread_mutex_unlock(&mutex); /* UNLOCK */
-
- return 0;
- }
-
-
- /* Destroy the threadpool */
- void thpool_destroy(thpool_t* tp_p){
- int t;
-
- /* End each thread's infinite loop */
- thpool_keepalive=0;
-
- /* Awake idle threads waiting at semaphore */
- for (t=0; t<(tp_p->threadsN); t++){
- if (sem_post(tp_p->jobqueue->queueSem)){
- fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
- }
- }
-
- /* Wait for threads to finish */
- for (t=0; t<(tp_p->threadsN); t++){
- pthread_join(tp_p->threads[t], NULL);
- }
-
- /* Kill semaphore (only after every thread has been joined, so none can still be blocked on it) */
- if (sem_destroy(tp_p->jobqueue->queueSem)!=0){
- fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
- }
-
- thpool_jobqueue_empty(tp_p);
-
- /* Dealloc */
- free(tp_p->threads); /* DEALLOC threads */
- free(tp_p->jobqueue->queueSem); /* DEALLOC job queue semaphore */
- free(tp_p->jobqueue); /* DEALLOC job queue */
- free(tp_p); /* DEALLOC thread pool */
- }
-
-
-
- /* =================== JOB QUEUE OPERATIONS ===================== */
-
- /* Initialise queue */
- int thpool_jobqueue_init(thpool_t* tp_p){
- tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */
- if (tp_p->jobqueue==NULL) return -1;
- tp_p->jobqueue->tail=NULL;
- tp_p->jobqueue->head=NULL;
- tp_p->jobqueue->jobsN=0;
- return 0;
- }
-
- /* Add job to queue */
- void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p){ /* remember that job prev and next point to NULL */
-
- newjob_p->next=NULL;
- newjob_p->prev=NULL;
-
- thpool_job_t *oldFirstJob;
- oldFirstJob = tp_p->jobqueue->head;
-
- /* fix jobs' pointers */
- switch(tp_p->jobqueue->jobsN){
-
- case 0: /* if there are no jobs in queue */
- tp_p->jobqueue->tail=newjob_p;
- tp_p->jobqueue->head=newjob_p;
- break;
-
- default: /* if there are already jobs in queue */
- oldFirstJob->prev=newjob_p;
- newjob_p->next=oldFirstJob;
- tp_p->jobqueue->head=newjob_p;
-
- }
-
- (tp_p->jobqueue->jobsN)++; /* increment amount of jobs in queue */
- sem_post(tp_p->jobqueue->queueSem);
-
- int sval;
- sem_getvalue(tp_p->jobqueue->queueSem, &sval);
- }
-
-
- /* Remove job from queue */
- int thpool_jobqueue_removelast(thpool_t* tp_p){
- thpool_job_t *oldLastJob;
- oldLastJob = tp_p->jobqueue->tail;
-
- /* fix jobs' pointers */
- switch(tp_p->jobqueue->jobsN){
-
- case 0: /* if there are no jobs in queue */
- return -1;
- break;
-
- case 1: /* if there is only one job in queue */
- tp_p->jobqueue->tail=NULL;
- tp_p->jobqueue->head=NULL;
- break;
-
- default: /* if there are more than one jobs in queue */
- oldLastJob->prev->next=NULL; /* the almost last item */
- tp_p->jobqueue->tail=oldLastJob->prev;
-
- }
-
- (tp_p->jobqueue->jobsN)--;
-
- int sval;
- sem_getvalue(tp_p->jobqueue->queueSem, &sval);
- return 0;
- }
-
-
- /* Get last job in queue (the tail), which is the next job a thread will take */
- thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p){
- return tp_p->jobqueue->tail;
- }
-
- /* Remove and deallocate all jobs in queue */
- void thpool_jobqueue_empty(thpool_t* tp_p){
-
- thpool_job_t* curjob;
- curjob=tp_p->jobqueue->tail;
-
- while(tp_p->jobqueue->jobsN){
- tp_p->jobqueue->tail=curjob->prev;
- free(curjob);
- curjob=tp_p->jobqueue->tail;
- tp_p->jobqueue->jobsN--;
- }
-
- /* Fix head and tail */
- tp_p->jobqueue->tail=NULL;
- tp_p->jobqueue->head=NULL;
- }
main.c is as follows:
- /*
- * This is just an example on how to use the thpool library
- *
- * We create a pool of 4 threads and then add 20 tasks to the pool(10 task1
- * functions and 10 task2 functions).
- *
- * Task1 doesn't take any arguments. Task2 takes an integer. Task2 is used to show
- * how to add work to the thread pool with an argument.
- *
- * As soon as we add the tasks to the pool, the threads will run them. One thread
- * may run x tasks in a row so if you see as output the same thread running several
- * tasks, it's not an error.
- *
- * Not all jobs are guaranteed to complete, and possibly none will, because the pool is
- * destroyed right after the jobs are queued. You can add a sleep() call before
- * thpool_destroy() if you want every task to finish and see more clearly what is going on.
- *
- * */
-
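- #include <stdio.h>
- #include <stdint.h> /* intptr_t, for passing an int through void* */
-
- #include "thpool.h"
-
-
-
- /* Some arbitrary task 1; it has the void* (*)(void*) type that thpool_add_work() expects */
- void* task1(void* arg){
- (void)arg; /* unused */
- printf("# Thread working: %lu\n", (unsigned long)pthread_self()); /* assumes pthread_t is an integer type, as on Linux */
- printf(" Task 1 running..\n");
- return NULL;
- }
-
-
-
- /* Some arbitrary task 2; the integer argument travels inside the void* */
- void* task2(void* arg){
- int a=(int)(intptr_t)arg;
- printf("# Thread working: %lu\n", (unsigned long)pthread_self());
- printf(" Task 2 running..\n");
- printf("%d\n", a);
- return NULL;
- }
-
-
-
- int main(void){
- int i;
-
- thpool_t* threadpool; /* make a new thread pool structure */
- threadpool=thpool_init(4); /* initialise it with 4 threads */
- if (threadpool==NULL) return 1;
-
-
- puts("Adding 20 tasks to threadpool");
- int a=54;
- for (i=0; i<10; i++){
- thpool_add_work(threadpool, task1, NULL);
- thpool_add_work(threadpool, task2, (void*)(intptr_t)a);
- }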
-
-
- puts("Will kill threadpool");
- thpool_destroy(threadpool);
-
- return 0;
- }
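The comment at the top of main.c notes that the jobs may not complete, because the pool is destroyed immediately, and suggests adding a `sleep()`. A more deterministic alternative is to count outstanding jobs and wait until the count reaches zero. The sketch below assumes a small helper type, `job_tracker`, which is hypothetical and not part of the thpool API.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical completion tracker, not part of the thpool API. */
typedef struct job_tracker {
    pthread_mutex_t lock;
    pthread_cond_t all_done;
    int pending;                 /* jobs added but not yet finished */
} job_tracker;

void tracker_init(job_tracker *t) {
    pthread_mutex_init(&t->lock, NULL);
    pthread_cond_init(&t->all_done, NULL);
    t->pending = 0;
}

/* Call once per job, before handing the job to the pool. */
void tracker_add(job_tracker *t) {
    pthread_mutex_lock(&t->lock);
    t->pending++;
    pthread_mutex_unlock(&t->lock);
}

/* Call as the last step of each job. */
void tracker_done(job_tracker *t) {
    pthread_mutex_lock(&t->lock);
    assert(t->pending > 0);
    if (--t->pending == 0)
        pthread_cond_broadcast(&t->all_done);   /* wake every waiter */
    pthread_mutex_unlock(&t->lock);
}

/* Blocks until every added job has called tracker_done(). */
void tracker_wait(job_tracker *t) {
    pthread_mutex_lock(&t->lock);
    while (t->pending > 0)
        pthread_cond_wait(&t->all_done, &t->lock);
    pthread_mutex_unlock(&t->lock);
}

/* Example job body: does nothing except mark itself finished.
 * arg is a pointer to the job_tracker. */
void *tracked_job(void *arg) {
    tracker_done(arg);
    return NULL;
}
```

In main.c this would mean calling `tracker_add()` before each `thpool_add_work()`, having each task call `tracker_done()` as its last statement, and calling `tracker_wait()` just before `thpool_destroy()`.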