一、UnlockQueue声明
#ifndef _UNLOCK_QUEUE_H #define _UNLOCK_QUEUE_H class UnlockQueue { public: UnlockQueue(int nSize); virtual ~UnlockQueue(); bool Initialize(); unsigned int Put(const unsigned char *pBuffer, unsigned int nLen); unsigned int Get(unsigned char *pBuffer, unsigned int nLen); inline void Clean() { m_nIn = m_nOut = 0; } inline unsigned int GetDataLen() const { return m_nIn - m_nOut; } private: inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); }; inline unsigned long roundup_power_of_two(unsigned long val); private: unsigned char *m_pBuffer; /* the buffer holding the data */ unsigned int m_nSize; /* the size of the allocated buffer */ unsigned int m_nIn; /* data is added at offset (in % size) */ unsigned int m_nOut; /* data is extracted from off. (out % size) */ }; #endif
UnlockQueue与kfifo 结构相同相同,也是由一下变量组成:
UnlockQueue | kfifo | 作用 |
m_pBuffer | buffer | 用于存放数据的缓存 |
m_nSize | size | 缓冲区空间的大小,圆整为2的次幂 |
m_nIn | in | 指向buffer中队头 |
m_nOut | out | 指向buffer中的队尾 |
UnlockQueue的设计是用在单生产者单消费者情况下,所以不需要锁 | lock | 如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。 |
二、UnlockQueue构造函数和初始化
UnlockQueue::UnlockQueue(int nSize) :m_pBuffer(NULL) ,m_nSize(nSize) ,m_nIn(0) ,m_nOut(0) { //round up to the next power of 2 if (!is_power_of_2(nSize)) { m_nSize = roundup_power_of_two(nSize); } } UnlockQueue::~UnlockQueue() { if(NULL != m_pBuffer) { delete[] m_pBuffer; m_pBuffer = NULL; } } bool UnlockQueue::Initialize() { m_pBuffer = new unsigned char[m_nSize]; if (!m_pBuffer) { return false; } m_nIn = m_nOut = 0; return true; } unsigned long UnlockQueue::roundup_power_of_two(unsigned long val) { if((val & (val-1)) == 0) return val; unsigned long maxulong = (unsigned long)((unsigned long)~0); unsigned long andv = ~(maxulong&(maxulong>>1)); while((andv & val) == 0) andv = andv>>1; return andv<<1; }
1.在构造函数中,对传入的size进行2的次幂圆整,圆整的好处是可以将m_nIn % m_nSize 可以转化为 m_nIn & (m_nSize – 1),取模运算”的效率并没有 “位运算” 的效率高。
2.在构造函数中,未给buffer分配内存,而在Initialize中分配,这样做的原因是:我们知道在new UnlockQueue的时候有两步操作,第一步分配内存,第二步调用构造函数,如果将buffer的分配放在构造函数中,那么就可能 buffer 就可能分配失败,而后面用到buffer,还需要判空。
三、UnlockQueue入队和出队操作
unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len) { unsigned int l; len = std::min(len, m_nSize - m_nIn + m_nOut); /* * Ensure that we sample the m_nOut index -before- we * start putting bytes into the UnlockQueue. */ __sync_synchronize(); /* first put the data starting from fifo->in to buffer end */ l = std::min(len, m_nSize - (m_nIn & (m_nSize - 1))); memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(m_pBuffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ __sync_synchronize(); m_nIn += len; return len; } unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len) { unsigned int l; len = std::min(len, m_nIn - m_nOut); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ __sync_synchronize(); /* first get the data from fifo->out until the end of the buffer */ l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1))); memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, m_pBuffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ __sync_synchronize(); m_nOut += len; return len; }
入队和出队操作与kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以参考前面一篇文章《眉目传情之匠心独运的kfifo》。这里需要指出的是__sync_synchronize()函数,由于linux并未开房出内存屏障函数,而在gcc4.2以上版本提供This
builtin issues a full memory barrier,有兴趣同学可以参考Built-in
functions for atomic memory access。
四、测试程序
如图所示,我们设计了两个线程,一个生产者随机生成学生信息放入队列,一个消费者从队列中取出学生信息并打印,可以看到整个代码是无锁的。
#include "UnlockQueue.h" #include <iostream> #include <algorithm> #include <pthread.h> #include <time.h> #include <stdio.h> #include <errno.h> #include <string.h> struct student_info { long stu_id; unsigned int age; unsigned int score; }; void print_student_info(const student_info *stu_info) { if(NULL == stu_info) return; printf("id:%ld\t",stu_info->stu_id); printf("age:%u\t",stu_info->age); printf("score:%u\n",stu_info->score); } student_info * get_student_info(time_t timer) { student_info *stu_info = (student_info *)malloc(sizeof(student_info)); if (!stu_info) { fprintf(stderr, "Failed to malloc memory.\n"); return NULL; } srand(timer); stu_info->stu_id = 10000 + rand() % 9999; stu_info->age = rand() % 30; stu_info->score = rand() % 101; //print_student_info(stu_info); return stu_info; } void * consumer_proc(void *arg) { UnlockQueue* queue = (UnlockQueue *)arg; student_info stu_info; while(1) { sleep(1); unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info)); if(len > 0) { printf("------------------------------------------\n"); printf("UnlockQueue length: %u\n", queue->GetDataLen()); printf("Get a student\n"); print_student_info(&stu_info); printf("------------------------------------------\n"); } } return (void *)queue; } void * producer_proc(void *arg) { time_t cur_time; UnlockQueue *queue = (UnlockQueue*)arg; while(1) { time(&cur_time); srand(cur_time); int seed = rand() % 11111; printf("******************************************\n"); student_info *stu_info = get_student_info(cur_time + seed); printf("put a student info to queue.\n"); queue->Put( (unsigned char *)stu_info, sizeof(student_info)); free(stu_info); printf("UnlockQueue length: %u\n", queue->GetDataLen()); printf("******************************************\n"); sleep(1); } return (void *)queue; } int main() { UnlockQueue unlockQueue(1024); if(!unlockQueue.Initialize()) { return -1; } pthread_t consumer_tid, producer_tid; printf("multi thread test.......\n"); if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue)) { fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n", errno, strerror(errno)); return -1; } if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue)) { fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n", errno, strerror(errno)); return -1; } pthread_join(producer_tid, NULL); pthread_join(consumer_tid, NULL); return 0; }