ThreadPool
Create a txt file and insert some sentences, and apply the thread pool to output the contents of the file to the terminal.
1. “ThreadPool.h”
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
typedef struct{
int head, tail, size, total;
void **data;
pthread_mutex_t mutex;
pthread_cond_t cond1, cond2;
}task_queue;
void task_queue_init(task_queue* Queue, int size);
void task_queue_push(task_queue* Queue, void* data);
void *task_queue_pop(task_queue* Queue);
void *thread_run(void *arg);
#endif
2. “ThreadPool.c”
#include <stdio.h>
#include "head.h"
void task_queue_init(task_queue* Queue, int size){
Queue->size = size;
Queue->total = Queue->head = Queue->tail = 0;
Queue->data = calloc(size, sizeof(void*));
pthread_mutex_init(&Queue->mutex, NULL);
pthread_cond_init(&Queue->cond1, NULL);
pthread_cond_init(&Queue->cond2, NULL);
return ;
}
void task_queue_push(task_queue* Queue, void* data){
pthread_mutex_lock(&Queue->mutex);
while (Queue->total == Queue->size) {
DBG(YELLOW"<push> Queue is full.\n"NONE);
pthread_cond_wait(&Queue->cond2, &Queue->mutex);
}
Queue->total++;
Queue->data[Queue->tail] = data;
DBG(PINK"<push> Queue push successfully.\n"NONE);
if (++Queue->tail == Queue->size) {
Queue->tail = 0;
DBG(PINK"<push> Queue change tail to zero.\n"NONE);
}
pthread_cond_signal(&Queue->cond1);
pthread_mutex_unlock(&Queue->mutex);
return ;
}
void *task_queue_pop(task_queue* Queue) {
pthread_mutex_lock(&Queue->mutex);
while (Queue->total == 0) {
DBG(YELLOW"<pop> Queue is empty.\n"NONE);
//pthread_mutex_unlock(&Queue->mutex);
//return ;
pthread_cond_wait(&Queue->cond1, &Queue->mutex);
}
//Queue->data[Queue->head] = 0;
void* data = Queue->data[Queue->head];
Queue->total--;
DBG(BLUE"<pop> Queue pop successfully.\n"NONE);
if (++Queue->head == Queue->size) {
Queue->head = 0;
DBG(BLUE"<pop> Queue change head to zero.\n"NONE);
}
pthread_cond_signal(&Queue->cond2);
pthread_mutex_unlock(&Queue->mutex);
//sleep(10);
return data;
}
void *thread_run(void *arg) {
task_queue* Queue = (task_queue*) arg;
pthread_detach(pthread_self());
while (1) {
void* data = task_queue_pop(Queue);
DBG(GREEN"Got a task.\n"NONE);
printf("%s", data);
}
}
3. “head.h”
#ifndef _HEAD_H
#define _HEAD_H
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include "color.h"
#include "common.h"
#include "ThreadPool.h"
#ifdef _D
#define DBG(fmt, args...) printf(fmt, ##args)
#else
#define DBG(fmt, args...)
#endif
#endif
4. test demo
#include <stdio.h>
#include "./common/head.h"
#define INS 5
#define MAX 100
int main() {
FILE* fp;
pthread_t tix[INS];
char buff[MAX][1024];
task_queue* Queue = (task_queue*)malloc(sizeof(task_queue));
task_queue_init(Queue, MAX);
for (int i = 0; i < INS; i++) {
pthread_create(&tix[i],NULL, thread_run, (void*)Queue);
}
int cnt = 1;
while(cnt--) {
int sub = 0;
if ((fp = fopen("./a.txt", "r")) == NULL) {
perror("fopen");
exit(1);
}
int ind = 0;
while(fgets(buff[sub], 1024, fp) != NULL) {
task_queue_push(Queue, buff[sub]);
sleep(1);
if (++sub == MAX) sub = 0;
}
fclose(fp);
printf("==========\n");
}
return 0;
}