本文共 9994 字,大约阅读时间需要 33 分钟。
本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并探讨编程时需要注意的事项。文中涉及的代码运行环境如下:
本文假定读者已具备线程同步的基础知识。
队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队)。新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素。
队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下:
即利用数组elem顺序存储队列中的所有元素,利用整型变量head和tail分别存储队首元素和队尾(或队尾下一个)元素的下标位置,分别称为队首指针和队尾指针。MaxSize指定顺序存储队列的最大长度,即队列中最多能够存储的元素个数。当队列的顺序存储空间静态分配时,MaxSize通常为宏定义;若动态分配时,还可为全局或局部变量。
单向顺序队列存在“假溢出”问题,即多次入队和出队操作后,队首空余许多位置而队尾指针指向队列中最后一个元素位置,从而无法使新的数据元素入队。假溢出本质在于队首和队尾指针值不能由所定义数组下界值自动转为数组上界值,解决办法是将顺序队列构造成一个逻辑上首尾相连的循环表。当队列的第MaxSize-1个位置被占用后,只要队首还有可用空间,则把新的数据元素插入队列第0号位置。因此,顺序队列通常都采用顺序循环队列结构。
下图所示为MaxSize=4的循环队列三种状态图:
其中,为区分队空与队满,在入队时少用一个数据(图中下标为3的位置)。本文约定队首指针head指向队首元素,队尾指针tail指向队尾元素的下一个元素,以队尾指针加1等于队首指针判断队满,即:
%为取模运算符,循环队列利用数学上的取模运算将首尾巧妙相连。
当约定head指向队首前一个元素,tail指向队尾元素时,元素数目仍满足上面的公式。当约定head指向队首元素,tail指向队尾元素时,元素数目为(tail - head + 1 + MaxSize) % MaxSize。
此外,也可另设一个标志以区别队空和队满。该标志可为布尔型空满标志位,也可为队列中元素个数。
通过队首元素位置和队列中元素个数,可计算出队尾元素所在位置。亦即,可用队列中元素个数代替队尾指针(或队首指针)。此时,队列满足:
本节将采用C语言实现一个简单但却标准的顺序循环队列函数集。
首先定义循环队列结构如下:
即:typedef struct{int aData[QUEUE_SIZE]; int dwHead; int dwTail; }T_QUEUE, *PT_QUEUE;
为求简单性,元素类型假定为整型。
基于上述结构,定义初始化、入队出队和显示函数:
然后提供判空、判满、查询等辅助函数:
最后,通过QueueTest()函数来测试队列函数集:
static T_QUEUE gtQueue;void QueueTest(void){InitQue(>Queue);printf("Enter Queue 1,2,3,4,5...\n");EnterQue(>Queue, 1);EnterQue(>Queue, 2);EnterQue(>Queue, 3);EnterQue(>Queue, 4);EnterQue(>Queue, 5);printf("Queue Status: Empty(%d), Full(%d)\n", IsQueEmpty(>Queue), IsQueFull(>Queue));printf("Queue Elem Num: %u\n", QueDataNum(>Queue));printf("Head: %u(%d)\n", GetQueHead(>Queue), GetQueHeadData(>Queue));printf("Tail: %u\n", GetQueTail(>Queue));DisplayQue(>Queue);printf("\nLeave Queue...\n");printf("Leave %d\n", LeaveQue(>Queue));printf("Leave %d\n", LeaveQue(>Queue));printf("Leave %d\n", LeaveQue(>Queue));DisplayQue(>Queue);printf("\nEnter Queue 6,7...\n");EnterQue(>Queue, 6);EnterQue(>Queue, 7);DisplayQue(>Queue);printf("\nLeave Queue...\n");printf("Leave %d\n", LeaveQue(>Queue));printf("Leave %d\n", LeaveQue(>Queue));printf("Leave %d\n", LeaveQue(>Queue));DisplayQue(>Queue);}
编译链接后,运行结果如下:
Enter Queue 1,2,3,4,5...Elem 5 cannot enter Queue 0x8053f9c(Full)!
Queue Status: Empty(0), Full(1)Queue Elem Num: 4Head: 0(1)Tail: 4Queue Element: 1 2 3 4Leave Queue...Leave 1Leave 2Leave 3Queue Element: 4Enter Queue 6,7...Queue Element: 4 6 7Leave Queue...Leave 4Leave 6Leave 7Queue 0x8053f9c is Empty!
本节将讨论Linux并发编程中经典的生产者/消费者(producer-consumer)问题。该问题涉及一个大小限定的有界缓冲区(bounded buffer)和两类线程或进程(生产者和消费者)。
在缓冲区中有可用空间时,一个或一组生产者(线程或进程)将创建的产品(数据条目)放入缓冲区,然后由一个或一组消费者(线程或进程)提取这些产品。产品在生产者和消费者之间通过某种类型的IPC传递。
在多个进程间共享一个公共数据缓冲区需要某种形式的共享内存区(如存储映射或共享内存),而多线程天然地共享存储空间。为简便起见,本节的讨论仅限于多线程。
多个生产者和消费者线程的场景如下图所示:
其中,生产者线程必须在缓冲区中有可用空间后才能向其中放置内容,否则将阻塞(进入休眠状态)直到出现下一个可用的空位置。生产者线程可使用互斥量原子性地检查缓冲区,而不受其他线程干扰。当发现缓冲区已满后,生产者阻塞自己并在缓冲区变为非满时被唤醒,这可由条件变量实现。
消费者线程必须在生产者向缓冲区中写入之后才能从中提取内容。同理,可用互斥量和条件变量以无竞争的方式等待缓冲区由空变为非空。
本节将采用队列模拟任务,给出生产者/消费者问题的多线程示例。
为简单起见,生产者将队列元素下标作为元素值入队,消费者使元素出队并验证元素值正确性。
首先定义一组全局数据:
T_QUEUE gtQueue;pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //Full->Not Fullpthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //Empty->Not Empty
此处,QUEUE_SIZE按照产品数重新定义。互斥量gtQueLock用于保护全局队列gtQueue和两个条件变量。条件变量gtPrdCond用于队列由满变为非满时通知(唤醒)生产者线程,而gtCsmCond用于队列由空变为非空时通知消费者线程。
若首先创建并启动生产者线程,再立即或稍候创建消费者线程,则不需要条件变量gtCsmCond(队列中始终有产品)。本节为全面展现线程间的同步,约定消费者线程创建和启动完毕之后,再创建生产者线程。这样,所有消费者线程将会阻塞,在条件变量gtCsmCond的线程列表里等待条件状态的改变。生产者线程启动并开始“产出”后,广播通知所有消费者线程。反之,因为消费者线程不会全部阻塞,可单播唤醒某个消费者。
初始化队列和创建线程的主函数如下:
int main(void){InitQue(>Queue);srand(getpid());pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM];int dwThrdIdx;for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++){pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)dwThrdIdx);}sleep(2);for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++){pthread_create(&aThrd[dwThrdIdx+CONSUMER_NUM], NULL, ProducerThread, (void*)dwThrdIdx);}while(1);return 0 ;}
生产者线程启动例程ProducerThread()实现如下:
void *ProducerThread(void *pvArg){pthread_detach(pthread_self());int dwThrdNo = (int)pvArg;while(1){pthread_mutex_lock(>QueLock);while(IsQueFull(>Queue)) //队列由满变为非满时,生产者线程被唤醒pthread_cond_wait(>PrdCond, >QueLock);EnterQue(>Queue, GetQueTail(>Queue)); //将队列元素下标作为元素值入队if(QueDataNum(>Queue) == 1) //当生产者开始产出后,通知(唤醒)消费者线程pthread_cond_broadcast(>CsmCond);printf("[Producer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));pthread_mutex_unlock(>QueLock);sleep(rand()%DELAY_TIME + 1);}}
队列变满时,生产者线程进入休眠状态。消费者线程取出产品,将队列由满变为非满时,生产者线程再次被唤醒。
消费者线程启动例程ConsumerThread()实现如下:
void *ConsumerThread(void *pvArg){pthread_detach(pthread_self());int dwThrdNo = (int)pvArg;while(1){pthread_mutex_lock(>QueLock);while(IsQueEmpty(>Queue)) //队列由空变为非空时,消费者线程将被唤醒pthread_cond_wait(>CsmCond, >QueLock);if(GetQueHead(>Queue) != GetQueHeadData(>Queue)) //当队列元素值不符合期望时{printf("[Consumer %2u]Product: %d, Expect: %d\n", dwThrdNo, GetQueHead(>Queue), GetQueHeadData(>Queue));exit(0);}LeaveQue(>Queue); //从循环队列中取出元素if(QueDataNum(>Queue) == (PRD_NUM-1)) //当队列由满变为非满时,通知(唤醒)生产者线程pthread_cond_broadcast(>PrdCond);printf("[Consumer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue));pthread_mutex_unlock(>QueLock);sleep(rand()%DELAY_TIME + 1);}}
编译链接后,截取部分运行结果如下:
[Producer 4]Current Product Num: 1[Consumer 1]Current Product Num: 0[Producer 3]Current Product Num: 1[Consumer 0]Current Product Num: 0[Producer 2]Current Product Num: 1[Consumer 2]Current Product Num: 0[Producer 1]Current Product Num: 1[Producer 0]Current Product Num: 2... ... [Consumer 0]Current Product Num: 17[Producer 3]Current Product Num: 18[Producer 1]Current Product Num: 19[Consumer 2]Current Product Num: 18[Producer 4]Current Product Num: 19[Producer 2]Current Product Num: 20[Consumer 1]Current Product Num: 19[Consumer 2]Current Product Num: 18[Producer 0]Current Product Num: 19[Producer 4]Current Product Num: 20//Ctrl + C
本节基于生产者/消费者问题的多线程实现,讨论一些编程注意事项。为简化书写,省略线程相关函数名的前缀"pthread_"。
互斥量实际上保护的是临界区中被多个线程或进程共享的数据(shared data)。此外,互斥量是协作性(cooperative)锁,无法禁止绕过这种机制的访问。例如,若共享数据为一个链表,则操作该链表的所有线程都应该在实际操作前获取该互斥量(但无法防止某个线程不首先获取该互斥量就操作链表)。
对本文实现而言,互斥量gtQueLock保护全局队列gtQueue。但因为各生产者(或消费者)线程启动例程相同,内部对该队列的操作逻辑和顺序相同,故gtQueLock看起来在对队列函数加锁。进而,在生产者与消费者线程之间,均在加锁期间操作队列,因此对队列函数加锁近似等效于对队列数据加锁。但仍需认识到,这种协作对于频繁调用的基础性函数(如库函数)而言并不现实。当某些调用未经加锁直接操作时,对于其他使用互斥量的调用而言,互斥保护就失去意义。本文也可考虑在队列函数集内加锁(互斥量或读写锁),但因为互斥量gtQueLock同时还保护条件变量,队列内再加锁就稍显复杂,而且线程内互斥量同名时容易产生死锁。
因此,设计时最好约定所有线程遵守相同的数据访问规则。
应尽量减少由一个互斥量锁住的代码量,以增强并发性。例如,生产者线程启动例程ProducerThread()中,对pvArg进行赋值操作时并不需要互斥量保护(not属于 critical section)。再者,若各生产者线程之间与生产者/消费者线程之间共享不同的数据,则可使用两个互斥量,但复杂度也随之上升。
多线程软件设计经常要考虑加锁粒度的折中。若使用粗粒度锁定(coarse-grained locking),即长期持有锁定以便尽可能降低获取和释放锁的开销,则可能有很多线程阻塞等待相同的锁,从而降低并发性;若使用细粒度锁定(fine-grained locking),即仅在必要时锁定并在不必要时尽快解锁以便尽可能提高并发性,则较多的锁开销可能会降低系统性能,而且代码变得相当复杂。因此,设计时应划分适当数目的锁来保护数据,以在代码复杂性和性能优化之间找好平衡点。
条件本身是由互斥量保护的。当线程调用cond_wait()函数基于条件变量阻塞之前,应先锁定互斥量以避免线程间的竞争和饥饿情况(否则将导致未定义的行为)。
调用cond_wait()时,线程将锁定的互斥量传递给该函数,对条件进行保护。该函数将调用线程放到等待条件的线程列表上,然后对互斥量解锁。这两个操作是原子性的,即关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化。该函数对互斥量解锁后,其它线程可以获得加锁的权利,并访问和修改临界资源(条件状态)。一旦其它某个线程改变了条件使其为真,该线程将通知相应的条件变量唤醒一个或多个正被该条件变量阻塞的线程。cond_wait()返回时,互斥量再次被锁定并被调用线程拥有,即使返回错误时也是如此。
可见,cond_wait()函数解锁和阻塞调用线程的“原子性”针对其他线程访问互斥量和条件变量而言。若线程B在即将阻塞的线程A解锁之后获得互斥量,则线程B随后调用cond_signal/broadcast()函数通告条件状态变化时,该信号必然在线程A阻塞之后发出(否则将遗漏该信号)。
使用多个互斥量保护基于相同条件变量的cond_wait()调用时,其效果未定义。当线程基于条件变量阻塞时,该条件变量绑定到唯一的互斥量上,且这种(动态)绑定将在cond_wait()调用返回时结束。
调用cond_signal()唤醒被阻塞在条件变量上的至少一个线程。在多处理器上,该函数在不同处理器上同时单播信号时可能唤醒多个线程。当多个线程阻塞在条件变量上时,哪个或哪些线程被唤醒由线程的调度策略(scheduling policy)所决定。
cond_broadcast()会唤醒阻塞在条件变量上的所有线程。这些线程被唤醒后将再次竞争相应的互斥量。唤醒多个线程的典型场景是读出者与写入者问题。当一个写入者完成访问并释放相应的锁后,它希望唤醒所有正在排队的读出者,因为同时允许多个读出者访问。
若已确定只有一个等待者需要唤醒,且唤醒哪个等待者无关紧要,则可使用单播发送;所有其他情况下都应使用广播发送(更为安全)。
若当前没有任何线程基于条件变量阻塞,则调用cond_signal/broadcast()不起作用。
调用cond_signal/broadcast()时若无线程等待条件变量,则该信号将被丢失。因此,发送信号前最好检查下是否有正在等待的线程。例如,可维护一个等待线程计数器,在触发条件变量前检查该计数器。本文线程代码内对队列元素数目的检查与之类似。
对于等待条件的线程而言,若错失信号(如启动过迟),则会一直阻塞到其它线程再次发送信号到该条件变量。
ProducerThread()中,cond_broadcast()函数由当前锁住互斥量的生产者线程调用,本函数将发送信号给该互斥量所关联的条件变量。当该条件变量被发送信号后,系统立即调度等待在其上的消费者线程;该线程开始运行后试图获取互斥量,但该互斥量仍由生产者线程所持有。因此被唤醒的消费者线程被迫进入休眠状态,直至生产者线程释放互斥量后再次被唤醒。这种不必要的上下文切换可能会严重影响性能。
为避免这种加锁冲突(以提高效率),可将判断队列元素数目的语句值赋给一个局部变量bHasOnePrd,直到释放互斥量gtQueLock后才判断bHasOnePrd并向与之关联的条件变量gtCsmCond发送信号。
Posix标准规定,调用cond_signal/broadcast()的线程不必是与之关联的互斥量的当前拥有者,即允许在释放互斥量后才给与之关联的条件变量发送信号。若程序不关心线程可预知的调度行为,最好在锁定区域以外调用cond_signal/broadcast()。
当只有一个生产者和一个消费者时,通过谨慎操作队列可避免线程间的原子性同步。例如,生产者线程仅更新队尾指针,消费者线程仅更新队首指针。简化的示例如下:
volatile unsigned int gdwPrdNum = 0, gdwCsmNum = 0;int gQueue[QUEUE_SIZE] = {0};
void *Producer(void *pvArg){while(1){while(gdwPrdNum - gdwCsmNum == QUEUE_SIZE);gQueue[gdwPrdNum % QUEUE_SIZE]++;gdwPrdNum++;}pthread_exit(0);}
void *Consumer(void *pvArg){while(1){while(gdwPrdNum - gdwCsmNum == 0);gQueue[gdwCsmNum % QUEUE_SIZE]--;gdwCsmNum++;}pthread_exit(0);}
该例中使用轮询(polling)方式,可在不依赖互斥量和条件变量的情况下高效地共享数据。判空和判满的循环保证两个线程不可能同时操作同一个队列元素。当线程被取消时,可观察到队列中元素值为全零(若无循环则为随机值)。
另一个实例则可参考《》一文。
当函数sem_wait()和sem_post()用于线程内时,两个调用间的区域就是所要保护的临界区代码;当用于线程间时,则与条件变量等效。
此外,信号量还可用作资源计数器,即初始化信号量的值作为某个资源当前可用的数量,使用时递减释放时递增。这样,原先一些保存队列状态的变量都不再需要。
最后,内核会记录信号的存在,不会将信号丢失;而唤醒条件变量时若没有线程在等待该条件变量,信号将被丢失。
转载地址:http://flkfk.baihongyu.com/