同步也是一种互斥,但需要通过一定机制实现访问者对资源的有序访问。下面是linux下线程同步的代码:线程2先打印,然后才能轮到线程1打印。
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>
/* Shared counter incremented by both worker threads. */
int value = 0;
/* sem1 gates thread 1 and sem2 gates thread 2; each thread posts the other's
 * semaphore once it has finished its turn. */
sem_t sem1,sem2;
/* NOTE(review): no function named mythread is defined in the visible code;
 * this declaration appears to be unused. */
void *mythread();
/*
 * Start routine for thread 1.
 *
 * Ten times in a row: wait for its turn on sem1, increment the shared
 * counter `value`, print it, then post sem2 so that thread 2 may run.
 *
 * arg is unused; it exists so the function has the exact
 * void *(*)(void *) type that pthread_create() expects.
 *
 * Always returns NULL. The thread stops early if a semaphore call fails
 * for any reason other than being interrupted by a signal.
 */
void *mythread1(void *arg)
{
    int i;

    (void)arg;

    for (i = 0; i < 10; i++) {
        /* sem_wait() may return early with EINTR; that is not a turn. */
        while (sem_wait(&sem1) == -1) {
            if (errno != EINTR) {
                perror("sem_wait(sem1)");
                return NULL;
            }
        }

        value = value + 1;
        printf("in thread1 i=%d value1 = %d\n", i, value);

        if (sem_post(&sem2) == -1) {
            perror("sem_post(sem2)");
            return NULL;
        }
    }
    return NULL;
}
/*
 * Start routine for thread 2.
 *
 * Sleeps for two seconds first, then ten times in a row: waits for its
 * turn on sem2, increments the shared counter `value`, prints it, and
 * posts sem1 so that thread 1 may run.
 *
 * arg is unused; it exists so the function has the exact
 * void *(*)(void *) type that pthread_create() expects.
 *
 * Always returns NULL. The thread stops early if a semaphore call fails
 * for any reason other than being interrupted by a signal.
 */
void *mythread2(void *arg)
{
    int i;

    (void)arg;

    /* Delay on purpose: thread 1 still cannot print until sem1 is posted
     * below, which shows the ordering comes from the semaphores. */
    sleep(2);

    for (i = 0; i < 10; i++) {
        /* sem_wait() may return early with EINTR; that is not a turn. */
        while (sem_wait(&sem2) == -1) {
            if (errno != EINTR) {
                perror("sem_wait(sem2)");
                return NULL;
            }
        }

        value = value + 1;
        printf("in thread2 i=%d value1 = %d\n", i, value);

        if (sem_post(&sem1) == -1) {
            perror("sem_post(sem1)");
            return NULL;
        }
    }
    return NULL;
}
/*
 * Entry point of the semaphore demo.
 *
 * Initialises sem1 to 0 and sem2 to 1 (so the thread waiting on sem2 gets
 * the first turn), starts both worker threads, waits for them to finish
 * and prints the final value of the shared counter.
 *
 * Returns 0 on success, 1 if a semaphore or thread could not be created.
 */
int main(void)
{
    pthread_t tid1;
    pthread_t tid2;
    int rc;

    if (sem_init(&sem1, 0, 0) != 0) {
        perror("sem_init(sem1)");
        return 1;
    }
    if (sem_init(&sem2, 0, 1) != 0) {
        perror("sem_init(sem2)");
        sem_destroy(&sem1);
        return 1;
    }

    /* pthread_create() reports failure through its return value, not errno. */
    rc = pthread_create(&tid1, NULL, mythread1, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(thread1) failed: %d\n", rc);
        sem_destroy(&sem2);
        sem_destroy(&sem1);
        return 1;
    }

    rc = pthread_create(&tid2, NULL, mythread2, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(thread2) failed: %d\n", rc);
        /* Thread 1 is blocked on sem1 and nobody will ever post it, so it
         * cannot be joined; returning from main ends the whole process. */
        return 1;
    }

    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);

    printf("value3 = %d\n", value);

    sem_destroy(&sem2);
    sem_destroy(&sem1);
    return 0;
}
(二)其实线程的同步也可以用互斥锁或者其他机制来实现,最终的目的都是保证执行顺序。看看下面这段代码,共3个文件(queue.c、queue.h、main.c),另附一个Makefile,代码有点长。
/*queue.c*/
/*----------------------------------------------*
* 包含头文件 *
*----------------------------------------------*/
#include<queue.h>
#include <malloc.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>
struct RING_BUFFER_QUEUE * QueueInit(int nMaxCount,int frame_size)
{
struct RING_BUFFER_QUEUE *queue =NULL;
struct RING_BUFFER_ITEM *ItemList = NULL;
int i =0;
if((0 ==nMaxCount)||(0==frame_size))
{
return -1;
}
queue = (struct RING_BUFFER_QUEUE *)malloc (sizeof(struct RING_BUFFER_QUEUE));
if(queue == NULL)
{
return -1;
}
queue->m_nMaxCount = nMaxCount ;
queue->m_nReadCount=0;
queue->m_nDelayCount=1;
queue->m_pHead = NULL;
queue->m_pRead = NULL;
queue->m_pWrite = NULL;
pthread_mutex_init(&queue->m_critical,NULL);
ItemList =(struct RING_BUFFER_ITEM *)malloc(nMaxCount * sizeof(RING_BUFFER_ITEM));
for(i=0;i<nMaxCount;i++)
{
memset(&(ItemList[i]),0,sizeof( RING_BUFFER_ITEM));
ItemList[i].data =(char *)malloc(frame_size);
if(i>0)
{
ItemList[i].prev=&(ItemList[i-1]);
ItemList[i-1].next=&(ItemList[i]);
}
if(i<(nMaxCount - 1))
{
ItemList[i].next=&(ItemList[i+1]);
ItemList[i+1].prev=&(ItemList[i]);
}
}
//link head and tail
ItemList[0].prev=&(ItemList[nMaxCount-1]);
ItemList[nMaxCount-1].next=&(ItemList[0]);
queue->m_pHead=&ItemList[0];
queue->m_pWrite = queue->m_pHead;
queue->m_pRead = queue->m_pHead;
return queue;
}
/*
 * Destroy a queue created by QueueInit().
 *
 * Walks the ring once starting at m_pHead and frees every item's data
 * buffer, then frees the item array (m_pHead is its first element),
 * destroys the mutex and frees the queue header. If the number of items
 * visited differs from m_nMaxCount a diagnostic is printed, but the
 * release still proceeds.
 *
 * Returns 0 on success, -1 if queue is NULL.
 *
 * The caller must make sure no other thread is still using the queue.
 */
int QueueRelease(struct RING_BUFFER_QUEUE *queue)
{
    RING_BUFFER_ITEM *item = NULL;
    int released = 0;

    if (NULL == queue)
    {
        return -1;
    }

    item = queue->m_pHead;
    /* Bounded by m_nMaxCount so a damaged ring cannot loop forever; a NULL
     * data pointer is simply skipped by free() instead of ending the walk. */
    while ((item != NULL) && (released < queue->m_nMaxCount))
    {
        free(item->data);
        item->data = NULL;
        released++;
        item = item->next;
        if (item == queue->m_pHead)
        {
            break;
        }
    }

    if (released != queue->m_nMaxCount)
    {
        printf("release queue fail\n");
    }

    free(queue->m_pHead);
    pthread_mutex_destroy(&queue->m_critical);
    free(queue);
    return 0;
}
/*
 * Append one frame to the queue.
 *
 * Copies size bytes from data and the timestamp tv into the slot at the
 * write cursor, advances the cursor and increments the unread count, all
 * under the queue mutex.
 *
 * queue  queue to write to
 * data   source bytes
 * size   number of bytes to copy; must be > 0
 * tv     timestamp stored alongside the frame
 *
 * Returns 0 on success, -1 on invalid arguments or when every slot
 * already holds an unread frame (nothing is overwritten in that case).
 *
 * NOTE(review): the queue does not record the frame_size passed to
 * QueueInit(), so this function cannot verify that size fits the slot's
 * buffer; the caller must guarantee size <= that frame_size.
 */
int QueueWrite(struct RING_BUFFER_QUEUE *queue ,char * data, int size, struct timeval tv)
{
    if ((NULL == data) || (NULL == queue) || (size <= 0))
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);

    /* Full means unread == capacity. Accepting a write in that state would
     * overwrite the oldest unread frame at the read cursor. */
    if (queue->m_nReadCount >= queue->m_nMaxCount)
    {
        pthread_mutex_unlock(&queue->m_critical);
        return -1;
    }

    memcpy(queue->m_pWrite->data, data, (size_t)size);
    queue->m_pWrite->size = size;
    queue->m_pWrite->tv = tv;
    queue->m_pWrite = queue->m_pWrite->next;

    /* The write cursor caught up with the read cursor: this write filled
     * the last free slot. */
    if (queue->m_pWrite == queue->m_pRead)
    {
        printf("put queue is full\n");
    }

    queue->m_nReadCount++;
    pthread_mutex_unlock(&queue->m_critical);
    return 0;
}
/*
 * Remove the oldest unread frame from the queue.
 *
 * Under the queue mutex: if the unread count is zero or below the delay
 * threshold, nothing is read. Otherwise up to size bytes of the frame at
 * the read cursor are copied into data, its timestamp is copied into *tv,
 * the cursor advances and the unread count is decremented. A frame longer
 * than size is truncated and the remainder is discarded.
 *
 * queue  queue to read from
 * data   destination buffer
 * size   capacity of data in bytes; must be > 0
 * tv     receives the frame's timestamp; may be NULL if not wanted
 *
 * Returns the number of bytes copied, 0 when no frame is readable yet,
 * or -1 on invalid arguments.
 */
int QueueRead(struct RING_BUFFER_QUEUE * queue, char * data, int size, struct timeval * tv)
{
    int len = 0;

    if ((NULL == queue) || (NULL == data) || (size <= 0))
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);

    if ((queue->m_nReadCount < queue->m_nDelayCount) || (queue->m_nReadCount == 0))
    {
        pthread_mutex_unlock(&queue->m_critical);
        return 0;
    }

    len = queue->m_pRead->size;
    if (len > size)
    {
        len = size;
    }

    memcpy(data, queue->m_pRead->data, (size_t)len);
    if (tv != NULL)
    {
        *tv = queue->m_pRead->tv;
    }

    queue->m_pRead = queue->m_pRead->next;
    queue->m_nReadCount--;
    pthread_mutex_unlock(&queue->m_critical);
    return len;
}
/*
 * Report how many unread frames the queue currently holds.
 *
 * The count is sampled under the queue mutex; it may already be stale by
 * the time the caller looks at it if other threads are reading or writing.
 *
 * Returns the unread count, or -1 if queue is NULL (same convention as
 * QueueGetDelayCount() and GetSize()).
 */
int CanRead(struct RING_BUFFER_QUEUE *queue)
{
    int ret = 0;

    if (NULL == queue)
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);
    ret = queue->m_nReadCount;
    pthread_mutex_unlock(&queue->m_critical);
    return ret;
}
/*
 * Set the delay threshold: the minimum number of unread frames that must
 * be queued before QueueRead() hands one out.
 *
 * A request of m_nMaxCount or more is clamped to m_nMaxCount - 1.
 * Does nothing when queue is NULL.
 */
void QueueSetDelayCount(struct RING_BUFFER_QUEUE *queue, int nCount)
{
    if (queue != NULL)
    {
        pthread_mutex_lock(&queue->m_critical);
        queue->m_nDelayCount = (nCount < queue->m_nMaxCount)
                                   ? nCount
                                   : queue->m_nMaxCount - 1;
        pthread_mutex_unlock(&queue->m_critical);
    }
}
/*
 * Read the queue's delay threshold under the queue mutex.
 *
 * Returns the threshold, or -1 when queue is NULL.
 */
int QueueGetDelayCount(struct RING_BUFFER_QUEUE *queue)
{
    int delay = -1;

    if (queue != NULL)
    {
        pthread_mutex_lock(&queue->m_critical);
        delay = queue->m_nDelayCount;
        pthread_mutex_unlock(&queue->m_critical);
    }
    return delay;
}
/*
 * Return the queue's capacity (m_nMaxCount), or -1 when queue is NULL.
 * The field is read without taking the queue mutex.
 */
int GetSize(struct RING_BUFFER_QUEUE *queue)
{
    return (queue != NULL) ? queue->m_nMaxCount : -1;
}
/*************************queue.h***************/
#include<time.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>
/*----------------------------------------------*
 * Queue sizing constants                        *
 *----------------------------------------------*/
/* Byte size of each item's data buffer; the sample main() passes this to
 * QueueInit() as frame_size. */
#define FRAME_MAX_SIZE 1024
/* Number of items in the ring; the sample main() passes this to
 * QueueInit() as nMaxCount. */
#define QUEUE_MAX_COUNT 40
/* One slot of the ring buffer. Slots are linked into a circular
 * doubly-linked list by QueueInit(). */
typedef struct RING_BUFFER_ITEM
{
char *data; /* buffer holding the frame payload (video or audio data) */
int size; /* number of payload bytes stored in data by the last QueueWrite() */
//int flag;
struct timeval tv; /* timestamp supplied by the writer */
struct RING_BUFFER_ITEM *next; /* following slot in the ring */
struct RING_BUFFER_ITEM *prev; /* preceding slot in the ring */
}RING_BUFFER_ITEM;
/* Ring-buffer queue header. QueueWrite(), QueueRead(), CanRead() and the
 * delay-count accessors take m_critical before touching the counters and
 * cursors. */
typedef struct RING_BUFFER_QUEUE
{
int m_nMaxCount; /* maximum number of items in the queue */
//int m_nSize;
int m_nReadCount; /* number of items currently available to read */
int m_nDelayCount; /* minimum threshold: QueueRead() returns nothing while m_nReadCount is below it */
RING_BUFFER_ITEM *m_pHead; /* first item of the ring; also the start of the item array */
RING_BUFFER_ITEM *m_pWrite; /* slot the next QueueWrite() fills */
RING_BUFFER_ITEM *m_pRead; /* slot the next QueueRead() consumes */
pthread_mutex_t m_critical; /* lock */
}RING_BUFFER_QUEUE;
/* Create a queue of nMaxCount items, each with a frame_size-byte data
 * buffer. See queue.c for the failure behaviour. */
struct RING_BUFFER_QUEUE * QueueInit(int nMaxCount,int frame_size);
/* Copy size bytes from data, plus the timestamp tv, into the next write
 * slot. Returns 0 on success, -1 on invalid arguments or when the queue
 * rejects the write. */
int QueueWrite(struct RING_BUFFER_QUEUE *queue,char *data ,int size , struct timeval tv);
/* Copy at most size bytes of the oldest unread item into data and its
 * timestamp into *tv. Returns the number of bytes copied, 0 when nothing
 * is readable yet, -1 when data is NULL or size is 0. */
int QueueRead(struct RING_BUFFER_QUEUE *queue,char * data, int size, struct timeval *tv);
/* Free the item buffers, the item array, the mutex and the queue itself.
 * Returns 0, or -1 when queue is NULL. */
int QueueRelease(struct RING_BUFFER_QUEUE *queue);
//int QueueGetReadCount(void);
/* Set the minimum number of unread items required before QueueRead()
 * returns data; values >= the queue capacity are clamped to capacity - 1. */
void QueueSetDelayCount(struct RING_BUFFER_QUEUE *queue,int nCount);
/* Return the current delay threshold, or -1 when queue is NULL. */
int QueueGetDelayCount(struct RING_BUFFER_QUEUE *queue);
/* Return the number of unread items currently in the queue. */
int CanRead(struct RING_BUFFER_QUEUE *queue);
/**********************实例程序************/
//main.c
#include"queue.h"
#include <malloc.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>
#include<time.h>
/*
 * Writer thread: produces 40 text frames, one per second.
 *
 * param is the struct RING_BUFFER_QUEUE * to write to. Each frame is the
 * whole zero-padded buffer containing "hello,queue<i>\n", stamped with
 * the current time of day. A rejected write is reported and the loop
 * carries on with the next frame.
 *
 * Always returns NULL.
 */
void *WriteQueue(void *param)
{
    struct RING_BUFFER_QUEUE *Audio_Queue = (struct RING_BUFFER_QUEUE *)param;
    char buf[100] = {0};
    struct timeval tv;
    int ret = 0;
    int i = 0;

    for (i = 0; i < 40; i++)
    {
        memset(buf, 0, sizeof buf);
        snprintf(buf, sizeof buf, "hello,queue%d\n", i);
        gettimeofday(&tv, NULL);

        ret = QueueWrite(Audio_Queue, buf, (int)sizeof buf, tv);
        if (ret < 0)
        {
            printf("the queue is full,write fail\n");
        }
        sleep(1);
    }

    printf("thread2 is exit\n");
    /* A thread start routine must return a value; the thread library
     * stores it for pthread_join(). */
    return NULL;
}
/*
 * Reader thread: consumes frames from the queue and prints them.
 *
 * param is the struct RING_BUFFER_QUEUE * to read from. The thread first
 * waits until the queue reports at least one unread frame, then makes 50
 * read attempts. After a successful read it prints the frame and sleeps
 * two seconds. When a read returns nothing and the queue is empty it
 * sleeps one second and moves on to the next attempt.
 *
 * Always returns NULL.
 */
void *ReadQueue(void *param)
{
    struct RING_BUFFER_QUEUE *Audio_Queue = (struct RING_BUFFER_QUEUE *)param;
    char buf[100] = {0};
    struct timeval tv;
    int readlen = 0;
    int i = 0;

    /* Poll with a short sleep instead of spinning at full CPU while the
     * writer has not produced anything yet. */
    while (!CanRead(Audio_Queue))
    {
        usleep(1000);
    }

    for (i = 0; i < 50; i++)
    {
        memset(buf, 0, sizeof buf);
        /* Leave the last byte untouched so buf is always NUL-terminated
         * for the %s print below, whatever the writer stored. */
        readlen = QueueRead(Audio_Queue, buf, (int)sizeof buf - 1, &tv);
        if (readlen <= 0)
        {
            printf("queue readcount =%d\n", CanRead(Audio_Queue));
            if (!CanRead(Audio_Queue))
            {
                usleep(1000 * 1000);
                printf("the queue is null,break\n");
                continue;
            }
        }
        printf("read:buf=%s\n", buf);
        sleep(2);
    }

    printf("the thread1 is exit\n");
    /* A thread start routine must return a value; the thread library
     * stores it for pthread_join(). */
    return NULL;
}
/*
 * Entry point of the queue demo.
 *
 * Creates a queue of QUEUE_MAX_COUNT frames of FRAME_MAX_SIZE bytes,
 * starts one writer thread and one reader thread on it, waits for both to
 * finish and releases the queue.
 *
 * Returns 0 on success, -1 if the queue or a thread could not be created.
 */
int main(int argc, char *argv[])
{
    struct RING_BUFFER_QUEUE *Audio_Queue = NULL;
    pthread_t thread_writeid1;
    pthread_t thread_readid2;
    int ret = 0;

    (void)argc;
    (void)argv;

    Audio_Queue = QueueInit(QUEUE_MAX_COUNT, FRAME_MAX_SIZE);
    if (NULL == Audio_Queue)
    {
        printf("create audio queue fail\n");
        return -1;
    }
    //QueueSetDelayCount(Audio_Queue, 2);

    ret = pthread_create(&thread_writeid1, NULL, WriteQueue, (void *)Audio_Queue);
    if (0 != ret)
    {
        printf("create pthread writeQueue fail\n");
        QueueRelease(Audio_Queue);
        return -1;
    }
    printf("create pthread writeQueue success\n");

    ret = pthread_create(&thread_readid2, NULL, ReadQueue, (void *)Audio_Queue);
    if (0 != ret)
    {
        printf("create pthread ReadQueue fail\n");
        /* The writer is already running on the queue; let it finish before
         * the queue is freed. Only threads that were actually created may
         * be joined. */
        pthread_join(thread_writeid1, NULL);
        QueueRelease(Audio_Queue);
        return -1;
    }
    printf("create pthread ReadQueue success\n");

    pthread_join(thread_writeid1, NULL);
    pthread_join(thread_readid2, NULL);

    QueueRelease(Audio_Queue);
    printf("the audio queue is release success\n");
    return 0;
}
/*******************Makefile******/
# Build every .c file in this directory into one executable.
SRCS     = $(wildcard *.c)
# No spaces are allowed inside a substitution reference; with spaces the
# ".c" suffix never matches and OBJS would still list the .c files.
OBJS     = $(SRCS:.c=.o)
TARGET   = my_app
CC       = gcc
# queue.c includes <queue.h>, so the current directory must be searched.
INCLUDES = -I ./
LIBS     = -lpthread
CCFLAGS  = -g -Wall -O2

# clean does not produce a file named "clean".
.PHONY: clean

# Recipe lines below must begin with a TAB character.
$(TARGET): $(OBJS)
	$(CC) $^ -o $@ $(LIBS)

%.o: %.c
	$(CC) $(CCFLAGS) $(INCLUDES) -c $< -o $@

clean:
	rm -rf *.o *.bak
	rm -rf $(TARGET)
/**********************************/
结果分析:在实例程序中,首先创建了一个队列,然后创建了两个线程:一个读线程,一个写线程。从代码上来看,两个线程对队列的访问是互斥的(由队列内部的互斥锁保证)。WriteQueue线程写入,ReadQueue线程读取;读取线程先判断队列里是否已有数据写入,没有则等待。如果读取太快导致队列为空,则延时1s后再去读取,从而保证依次读取写入队列中的数据,实现数据的同步。