Thread pool usage scenarios and code implementation!
Foreword:
Today I'm bringing you a technical article about the implementation and usage scenarios of thread pools. You have probably seen thread pools used in your company's code, and you may also have heard terms such as memory pool, object pool, and connection pool. There are many technical terms that end in "pool", but they all share one feature: a "pool" is simply something used to store things. To give a simple example, a swimming pool is used to store water!
As for memory pools and connection pools, I will sort those out and share them with you later; today's topic is the thread pool.
1. Implementation of thread pool:
1. Why use a thread pool?
Multi-threaded programming should be familiar to everyone. Last time a friend asked how much memory a thread occupies. Generally speaking, on Linux the default stack size for a thread is about 8 MB, while our memory resources are limited. Now think about high concurrency, for example many clients sending requests to the server at the same time:
If you tried to allocate roughly 8 MB to each of those clients, it obviously would not work. Let's do the math:
- One thread: 8 MB
- 1024 MB (1 GB) of memory can hold 128 threads
- 16 GB of memory can hold 16 x 128, which works out to about 2048 threads
So if you gave every one of millions of clients its own thread, memory would certainly run out. This is exactly the scenario the thread pool is designed for, and it is why we use one here.
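As a side note (this sketch is mine, not from the original article): the ~8 MB figure is the default per-thread stack size on Linux, and POSIX lets you request a smaller stack per thread if memory is tight, for example:
#include <pthread.h>
#include <stdio.h>

static void *worker(void *arg) {
    (void)arg;   // this demo thread does nothing
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_attr_t attr;

    pthread_attr_init(&attr);
    // Request a 256 KB stack instead of the ~8 MB default
    // (the value must be at least PTHREAD_STACK_MIN).
    pthread_attr_setstacksize(&attr, 256 * 1024);
    pthread_create(&tid, &attr, worker, NULL);
    pthread_join(tid, NULL);
    pthread_attr_destroy(&attr);
    return 0;
}
Even so, shrinking stacks only goes so far, which is why pooling and reusing a fixed number of threads is the more general answer.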
To help everyone better understand the concept of a thread pool, let's take a real-life scene. Everyone is familiar with going to the bank to deposit money or handle other business: generally you queue at a window and wait for the people ahead of you to finish before it is your turn, the bank clerks behind the windows handle the various kinds of business for you, and above the windows there is usually an electronic display that calls your number so you know when it is your turn.
Mapping this onto the thread pool (loosely, it is only an analogy): the business you want to handle is a task. Because customers have to queue up and cannot all be served at once, the waiting customers form the task queue, and the tasks in it are executed one by one. A bank clerk takes a task from that queue and executes it, so you can think of the clerks as the execution queue. The electronic display is the management component: it decides which task is handled next and by which clerk, prevents one clerk from serving several customers at the same time (or several clerks from serving the same customer), and keeps the whole process orderly and efficient.
From the above you can see the advantages of using a thread pool:
- It avoids creating too many threads and running out of memory.
- Threads are created up front and placed in the pool; when we need one we take a thread from the pool, and when we are done we return it to the pool. This avoids the cost of repeatedly creating and destroying threads.
2. Thread pool implementation template steps:
In fact, the steps for implementing a thread pool are always much the same. Whether you read your company's code carefully or implement a thread pool yourself, the rough template consists of three parts:
- Task queue (the customers who come to handle business)
- Execution queue (the bank clerks who execute the tasks from the task queue)
- Management component (keeps the tasks executing in an orderly way)
3. Thread pool structure definitions:
- Task queue:
struct nTask
{
    // A function pointer is used to hold different kinds of tasks
    void (*task_func)(struct nTask *task);
    // Argument passed to the task when it is executed
    void *user_data;
    // The task queue is implemented as a doubly linked list
    struct nTask *prev;
    struct nTask *next;
};
- Execution queue:
struct nWorker
{
    pthread_t threadid;        // worker thread id
    int terminate;             // whether this worker should terminate
    // Like the bank clerk who coordinates with the management component,
    // each worker keeps a pointer back to the manager
    struct nManager *manager;
    // The execution queue is also implemented as a doubly linked list
    struct nWorker *prev;
    struct nWorker *next;
};
Note: if no customers come to handle business, the bank clerks (worker threads) can only wait for a task to arrive and then execute it.
- Management component:
typedef struct nManager
{
    struct nTask *tasks;       // task queue
    struct nWorker *workers;   // execution queue (worker threads)
    pthread_mutex_t mutex;     // mutex protecting the task queue
    pthread_cond_t cond;       // condition variable used to wake up waiting workers
}ThreadPool;
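Before going further, here is a minimal standalone sketch (my own illustration, not part of the pool code) of why the manager pairs a mutex with a condition variable: the worker sleeps on the condition until someone publishes work and signals it, which is exactly the pattern the thread pool callback uses later.
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
static int ready = 0;

static void *consumer(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    while (!ready)                       // loop guards against spurious wakeups
        pthread_cond_wait(&cond, &mutex);
    printf("consumer: got work\n");
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, consumer, NULL);

    pthread_mutex_lock(&mutex);
    ready = 1;                           // publish the "task"
    pthread_cond_signal(&cond);          // wake one waiting worker
    pthread_mutex_unlock(&mutex);

    pthread_join(tid, NULL);
    return 0;
}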
- Linked list insertion and deletion templates:
// Insert at the head of the list
#define LIST_INSERT(item, list) do {                          \
    item->prev = NULL;                                        \
    item->next = list;                                        \
    if ((list) != NULL) list->prev = item;                    \
    list = item;                                              \
} while(0)
// Remove from the list
#define LIST_REMOVE(item, list) do {                          \
    if (item->prev != NULL) item->prev->next = item->next;    \
    if (item->next != NULL) item->next->prev = item->prev;    \
    if (list == item) list = item->next;                      \
    item->prev = item->next = NULL;                           \
} while(0)
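A quick usage sketch for these two macros (assuming the corrected LIST_INSERT / LIST_REMOVE definitions above are pasted in; the struct node type here is just for illustration): build a list of three nodes by head insertion, then remove the middle one.
#include <stdio.h>
#include <stdlib.h>

struct node {
    int value;
    struct node *prev;
    struct node *next;
};

int main(void) {
    struct node *list = NULL;
    struct node *mid = NULL;

    for (int i = 0; i < 3; i++) {
        struct node *n = calloc(1, sizeof(struct node));
        n->value = i;
        if (i == 1) mid = n;
        LIST_INSERT(n, list);        // head insertion: newest node becomes the head
    }

    LIST_REMOVE(mid, list);          // unlink the node with value 1
    free(mid);

    for (struct node *n = list; n != NULL; n = n->next)
        printf("%d ", n->value);     // prints: 2 0
    printf("\n");
    return 0;                        // remaining nodes intentionally not freed in this tiny demo
}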
4. The thread pool interface is defined as follows:
- 1. Thread pool initialization interface:
int nThreadPoolCreate(ThreadPool *pool,int numWorkers)
{
    // pool is the thread pool; numWorkers is the number of worker threads to create
}
- 2. Thread pool destruction interface:
int nThreadPoolDestory(ThreadPool *pool,int nWorker)
{
}
- 3. Interface for adding a task to the thread pool:
int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
{
}
- 4. Thread callback function:
void *nThreadPoolCallback(void *arg)
{
}
2. Thread pool project code:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
// Linked list insertion (head insert)
#define LIST_INSERT(item, list) do {                          \
    item->prev = NULL;                                        \
    item->next = list;                                        \
    if ((list) != NULL) list->prev = item;                    \
    list = item;                                              \
} while(0)
// Linked list removal
#define LIST_REMOVE(item, list) do {                          \
    if (item->prev != NULL) item->prev->next = item->next;    \
    if (item->next != NULL) item->next->prev = item->prev;    \
    if (list == item) list = item->next;                      \
    item->prev = item->next = NULL;                           \
} while(0)
// Task queue node
struct nTask
{
    void (*task_func)(struct nTask *task);   // the task to execute
    void *user_data;                         // argument passed to the task
    struct nTask *prev;
    struct nTask *next;
};
// Execution queue (one node per worker thread)
struct nWorker
{
    pthread_t threadid;          // worker thread id
    int terminate;               // set to 1 when the worker should exit
    struct nManager *manager;    // back-pointer to the pool manager
    struct nWorker *prev;
    struct nWorker *next;
};
// Management component
typedef struct nManager
{
    struct nTask *tasks;         // task queue
    struct nWorker *workers;     // worker (execution) queue
    pthread_mutex_t mutex;       // protects the task queue
    pthread_cond_t cond;         // signals waiting workers when a task arrives
}ThreadPool;
// Worker thread callback function
void *nThreadPoolCallback(void *arg)
{
    struct nWorker *worker = (struct nWorker *)arg;
    while (1)
    {
        // Check whether there is a task to execute
        pthread_mutex_lock(&worker->manager->mutex);
        while (worker->manager->tasks == NULL)
        {
            if (worker->terminate)
                break;
            // No task yet: wait until a task arrives
            pthread_cond_wait(&worker->manager->cond, &worker->manager->mutex);
        }
        if (worker->terminate)
        {
            pthread_mutex_unlock(&worker->manager->mutex);
            break;
        }
        // Take one task off the queue and execute it outside the lock
        struct nTask *task = worker->manager->tasks;
        LIST_REMOVE(task, worker->manager->tasks);
        pthread_mutex_unlock(&worker->manager->mutex);
        task->task_func(task);
    }
    free(worker);
    return NULL;
}
// Create the thread pool
int nThreadPoolCreate(ThreadPool *pool, int numWorkers)
{
    if (pool == NULL) return -1;
    if (numWorkers < 1) numWorkers = 1;
    memset(pool, 0, sizeof(ThreadPool));
    // Initialize the condition variable and the mutex
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
    // Create numWorkers worker threads (the bank clerks)
    int i = 0;
    for (i = 0; i < numWorkers; i++)
    {
        struct nWorker *worker = (struct nWorker *)malloc(sizeof(struct nWorker));
        if (worker == NULL)
        {
            perror("malloc");
            return -2;
        }
        memset(worker, 0, sizeof(struct nWorker));
        worker->manager = pool;
        // Start the worker thread
        int ret = pthread_create(&worker->threadid, NULL, nThreadPoolCallback, worker);
        if (ret)
        {
            perror("pthread_create");
            free(worker);
            return -3;
        }
        LIST_INSERT(worker, pool->workers);
    }
    return 0;
}
// Destroy the thread pool
int nThreadPoolDestory(ThreadPool *pool, int nWorker)
{
    struct nWorker *worker = NULL;
    // Tell every worker to terminate
    for (worker = pool->workers; worker != NULL; worker = worker->next)
    {
        worker->terminate = 1;
    }
    pthread_mutex_lock(&pool->mutex);
    pthread_cond_broadcast(&pool->cond);   // broadcast so every waiting worker wakes up
    pthread_mutex_unlock(&pool->mutex);
    pool->workers = NULL;
    pool->tasks = NULL;
    return 0;
}
// Add a task to the thread pool
int nThreadPoolPushTask(ThreadPool *pool, struct nTask *task)
{
    pthread_mutex_lock(&pool->mutex);
    LIST_INSERT(task, pool->tasks);
    pthread_cond_signal(&pool->cond);   // signal one worker: a customer has arrived
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
#if 1
#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE 1000
// Task entry: print the index carried in user_data, then free the task
void task_entry(struct nTask *task) {
int idx = *(int *)task->user_data;
printf("idx: %d\n", idx);
free(task->user_data);
free(task);
}
int main(void) {
ThreadPool pool = {0};
nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
int i = 0;
for (i = 0;i < TASK_INIT_SIZE;i ++) {
struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
if (task == NULL) {
perror("malloc");
exit(1);
}
memset(task, 0, sizeof(struct nTask));
task->task_func = task_entry;
task->user_data = malloc(sizeof(int));
*(int*)task->user_data = i;
nThreadPoolPushTask(&pool, task);
}
getchar();
}
#endif
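If you want to try it yourself, the program should build on Linux with something along the lines of gcc thread_pool.c -o thread_pool -lpthread (the file name thread_pool.c is just an example), and you should see the worker threads print the task indexes in an interleaved order.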
The amount of code is a bit large, so you can read it a few times!
3. Summary:
That's all for today's sharing. This thread pool write-up took quite a lot of time to put together. Starting next week I will begin sharing audio and video topics, and other review material will be sorted out and shared gradually.
-END-