现在的位置: 首页 > 综合 > 正文

linux下线程的同步

2013年09月15日 ⁄ 综合 ⁄ 共 10359字 ⁄ 字号 评论关闭

    同步也是一种互斥,但还需要通过一定的机制来实现访问者对资源的有序访问。下面是linux下线程同步的代码:线程2先打印,然后才轮到线程1打印,两者交替进行。

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>
/* Counter shared by both worker threads; each loop iteration adds one. */
int value = 0;

/*
 * Hand-off semaphores: mythread1 waits on sem1 and posts sem2,
 * mythread2 waits on sem2 and posts sem1.
 */
sem_t sem1;
sem_t sem2;

/* Old-style declaration; no function of this name appears in this listing. */
void *mythread();
/**
 * Worker 1: ten times, wait on sem1, increment the shared counter, print it,
 * then post sem2 so that mythread2 can take the next turn.
 *
 * @param arg  unused; present so the function has the start-routine
 *             signature that pthread_create() expects.
 * @return     NULL (delivered through pthread_exit).
 */
void *mythread1(void *arg)
{
    int i;

    (void)arg;

    for (i = 0; i < 10; i++)
    {
        /* Block until the other thread hands the turn over. */
        if (sem_wait(&sem1) != 0)
        {
            perror("sem_wait(sem1)");
            break;
        }

        value = value + 1;
        printf("in thread1 i=%d value1 = %d\n", i, value);

        /* Pass the turn back. */
        if (sem_post(&sem2) != 0)
        {
            perror("sem_post(sem2)");
            break;
        }
    }

    pthread_exit(NULL);
}
/**
 * Worker 2: sleeps two seconds, then ten times waits on sem2, increments the
 * shared counter, prints it, and posts sem1 so that mythread1 can take the
 * next turn.
 *
 * @param arg  unused; present so the function has the start-routine
 *             signature that pthread_create() expects.
 * @return     NULL (delivered through pthread_exit).
 */
void *mythread2(void *arg)
{
    int i;

    (void)arg;

    /* Delay the start; the semaphores, not this sleep, enforce the ordering. */
    sleep(2);

    for (i = 0; i < 10; i++)
    {
        /* Block until it is this thread's turn. */
        if (sem_wait(&sem2) != 0)
        {
            perror("sem_wait(sem2)");
            break;
        }

        value = value + 1;
        printf("in thread2 i=%d value1 = %d\n", i, value);

        /* Pass the turn to the other thread. */
        if (sem_post(&sem1) != 0)
        {
            perror("sem_post(sem1)");
            break;
        }
    }

    pthread_exit(NULL);
}
/**
 * Start the two workers and let them alternate through sem1/sem2.
 *
 * sem1 starts at 0 and sem2 at 1, so the thread waiting on sem2 (mythread2)
 * is the first one allowed to proceed.
 *
 * @return 0 on success, 1 if a semaphore or thread could not be set up.
 */
int main(void)
{
    pthread_t tid1;
    pthread_t tid2;
    int retval;

    if (sem_init(&sem1, 0, 0) != 0)
    {
        perror("sem_init(sem1)");
        return 1;
    }
    if (sem_init(&sem2, 0, 1) != 0)
    {
        perror("sem_init(sem2)");
        sem_destroy(&sem1);
        return 1;
    }

    /* pthread functions return the error number instead of setting errno. */
    retval = pthread_create(&tid1, NULL, mythread1, NULL);
    if (retval != 0)
    {
        fprintf(stderr, "pthread_create(thread1) failed: %d\n", retval);
        return 1;
    }

    retval = pthread_create(&tid2, NULL, mythread2, NULL);
    if (retval != 0)
    {
        fprintf(stderr, "pthread_create(thread2) failed: %d\n", retval);
        /* thread1 is blocked on sem1; returning from main ends the process. */
        return 1;
    }

    retval = pthread_join(tid1, NULL);
    if (retval != 0)
    {
        fprintf(stderr, "pthread_join(thread1) failed: %d\n", retval);
    }

    retval = pthread_join(tid2, NULL);
    if (retval != 0)
    {
        fprintf(stderr, "pthread_join(thread2) failed: %d\n", retval);
    }

    printf("value3 = %d\n", value);

    /* Both workers have finished, so nobody is waiting on the semaphores. */
    sem_destroy(&sem1);
    sem_destroy(&sem2);
    return 0;
}

 (二)其实线程的同步也可以用互斥锁或其他机制来实现,最终的目的都是保证执行顺序。看看下面这段代码,共3个文件(queue.c、queue.h、main.c),另附一个Makefile,代码有点长。

 /*queue.c*/

/*----------------------------------------------*
 * 包含头文件                                   *
 *----------------------------------------------*/
#include<queue.h>
#include <malloc.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>

struct  RING_BUFFER_QUEUE  * QueueInit(int nMaxCount,int frame_size)
{
       struct  RING_BUFFER_QUEUE  *queue =NULL;
       struct RING_BUFFER_ITEM *ItemList = NULL;
       int i =0;
       if((0 ==nMaxCount)||(0==frame_size))
        {
            return -1;
        }
 queue = (struct RING_BUFFER_QUEUE *)malloc (sizeof(struct RING_BUFFER_QUEUE));
        if(queue  == NULL)
        {
            return -1;
        }

        queue->m_nMaxCount = nMaxCount ;
        queue->m_nReadCount=0;
  queue->m_nDelayCount=1;
 
       queue->m_pHead = NULL;
       queue->m_pRead = NULL;
       queue->m_pWrite = NULL;
       pthread_mutex_init(&queue->m_critical,NULL);
      
       ItemList  =(struct RING_BUFFER_ITEM *)malloc(nMaxCount * sizeof(RING_BUFFER_ITEM));

 for(i=0;i<nMaxCount;i++)
 {
      memset(&(ItemList[i]),0,sizeof( RING_BUFFER_ITEM));
             ItemList[i].data =(char *)malloc(frame_size);
  if(i>0)
  {   
   ItemList[i].prev=&(ItemList[i-1]);
   ItemList[i-1].next=&(ItemList[i]);
  }

  if(i<(nMaxCount - 1))
  {
   ItemList[i].next=&(ItemList[i+1]);
   ItemList[i+1].prev=&(ItemList[i]);
  }
 }

 //link head and tail
 ItemList[0].prev=&(ItemList[nMaxCount-1]);
 ItemList[nMaxCount-1].next=&(ItemList[0]);

     queue->m_pHead=&ItemList[0];
     queue->m_pWrite = queue->m_pHead;
     queue->m_pRead  = queue->m_pHead;
         return queue;
}
/**
 * Free a queue: every slot's data buffer, the item array, the mutex and the
 * queue object itself.
 *
 * Walks the ring from the head, freeing data buffers until it meets a slot
 * without one, a NULL link, or the head again. Prints a diagnostic when the
 * number of buffers freed differs from m_nMaxCount.
 *
 * @param queue  queue to destroy
 * @return 0 on completion, -1 if queue is NULL.
 */
int QueueRelease(struct RING_BUFFER_QUEUE *queue)
{
    RING_BUFFER_ITEM *item;
    int freed = 0;

    if (queue == NULL)
    {
        return -1;
    }

    for (item = queue->m_pHead; item->data != NULL; )
    {
        free(item->data);
        freed++;

        item = item->next;
        if (item == NULL || item == queue->m_pHead)
        {
            break;
        }
    }

    if (freed != queue->m_nMaxCount)
    {
        printf("release queue fail\n");
    }

    /* m_pHead is the start of the item array. */
    free(queue->m_pHead);
    pthread_mutex_destroy(&queue->m_critical);
    free(queue);

    return 0;
}
/**
 * Copy one frame into the slot at the write cursor and advance the cursor.
 *
 * @param queue  queue to write to
 * @param data   source buffer holding at least size bytes
 * @param size   number of bytes to store, must be > 0. The queue does not
 *               record the per-slot capacity, so the caller must make sure
 *               size does not exceed the slot buffer size.
 * @param tv     timestamp stored alongside the frame
 * @return 0 on success, -1 on invalid arguments or when the queue is full.
 */
int QueueWrite(struct RING_BUFFER_QUEUE *queue, char *data, int size, struct timeval tv)
{
    if (data == NULL || queue == NULL || size <= 0)
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);

    /*
     * Full means every slot holds an unread frame. The test must be ">=":
     * with ">" one more write would land on the slot at the read cursor,
     * overwriting the oldest unread frame and pushing the count past the
     * number of slots.
     */
    if (queue->m_nReadCount >= queue->m_nMaxCount)
    {
        pthread_mutex_unlock(&queue->m_critical);
        return -1;
    }

    memcpy(queue->m_pWrite->data, data, (size_t)size);
    queue->m_pWrite->size = size;
    queue->m_pWrite->tv = tv;

    queue->m_pWrite = queue->m_pWrite->next;
    queue->m_nReadCount++;

    /* The write cursor caught up with the read cursor: this frame filled the ring. */
    if (queue->m_pWrite == queue->m_pRead)
    {
        printf("put queue is full\n");
    }

    pthread_mutex_unlock(&queue->m_critical);
    return 0;
}
/**
 * Copy the frame at the read cursor into the caller's buffer and advance
 * the cursor.
 *
 * @param queue  queue to read from
 * @param data   destination buffer
 * @param size   capacity of data in bytes, must be > 0; a longer frame is
 *               truncated to size bytes
 * @param tv     receives the frame's timestamp; may be NULL if not wanted
 * @return number of bytes copied; 0 when the queue is empty or holds fewer
 *         frames than the delay threshold; -1 on invalid arguments.
 */
int QueueRead(struct RING_BUFFER_QUEUE *queue, char *data, int size, struct timeval *tv)
{
    int len;

    if (queue == NULL || data == NULL || size <= 0)
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);

    if (queue->m_nReadCount == 0 || queue->m_nReadCount < queue->m_nDelayCount)
    {
        pthread_mutex_unlock(&queue->m_critical);
        return 0;
    }

    len = queue->m_pRead->size;
    if (len > size)
    {
        len = size;
    }

    memcpy(data, queue->m_pRead->data, (size_t)len);
    if (tv != NULL)
    {
        *tv = queue->m_pRead->tv;
    }

    queue->m_pRead = queue->m_pRead->next;
    queue->m_nReadCount--;

    pthread_mutex_unlock(&queue->m_critical);
    return len;
}
/**
 * Report how many frames are currently readable.
 *
 * @param queue  queue to inspect
 * @return the readable-frame count read under the queue lock, or -1 if
 *         queue is NULL (same convention as QueueGetDelayCount and GetSize).
 */
int CanRead(struct RING_BUFFER_QUEUE *queue)
{
    int ret;

    if (queue == NULL)
    {
        return -1;
    }

    pthread_mutex_lock(&queue->m_critical);
    ret = queue->m_nReadCount;
    pthread_mutex_unlock(&queue->m_critical);

    return ret;
}

/**
 * Set the delay threshold, i.e. the number of queued frames QueueRead
 * requires before it hands anything out.
 *
 * Values of m_nMaxCount or more are clamped to m_nMaxCount - 1.
 * Does nothing when queue is NULL.
 *
 * @param queue   queue to configure
 * @param nCount  requested threshold
 */
void QueueSetDelayCount(struct RING_BUFFER_QUEUE *queue, int nCount)
{
    if (queue != NULL)
    {
        pthread_mutex_lock(&queue->m_critical);

        queue->m_nDelayCount = (nCount < queue->m_nMaxCount)
                                   ? nCount
                                   : queue->m_nMaxCount - 1;

        pthread_mutex_unlock(&queue->m_critical);
    }
}

/**
 * Read the current delay threshold under the queue lock.
 *
 * @param queue  queue to inspect
 * @return the threshold, or -1 if queue is NULL.
 */
int QueueGetDelayCount(struct RING_BUFFER_QUEUE *queue)
{
    int delay = -1;

    if (queue != NULL)
    {
        pthread_mutex_lock(&queue->m_critical);
        delay = queue->m_nDelayCount;
        pthread_mutex_unlock(&queue->m_critical);
    }

    return delay;
}

/**
 * Return the number of slots in the queue.
 *
 * @param queue  queue to inspect
 * @return m_nMaxCount (read without taking the lock), or -1 if queue is NULL.
 */
int GetSize(struct RING_BUFFER_QUEUE *queue)
{
    return (queue == NULL) ? -1 : queue->m_nMaxCount;
}

/*************************queue.h***************/

#include<time.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>
/*----------------------------------------------*
 * Queue sizing constants                       *
 *----------------------------------------------*/
 /* Per-slot buffer size in bytes; the sample main passes it to QueueInit as frame_size. */
 #define  FRAME_MAX_SIZE   1024
 /* Number of slots; the sample main passes it to QueueInit as nMaxCount. */
 #define  QUEUE_MAX_COUNT 40
/*
 * One slot of the ring buffer. QueueInit allocates the slots as a single
 * array and links them into a circle through next/prev.
 */
typedef struct RING_BUFFER_ITEM
{
 char *data;                                     /* payload buffer (video or audio data), allocated by QueueInit */
 int  size;                                         /* byte count recorded by the last QueueWrite into this slot */
 //int  flag;
 struct timeval tv;                              /* timestamp supplied by the writer */
 struct RING_BUFFER_ITEM *next;                  /* following slot; the last slot points back to the first */
 struct RING_BUFFER_ITEM *prev;                  /* preceding slot; the first slot points to the last */
}RING_BUFFER_ITEM;

 /*
  * Ring-buffer queue state. The queue functions take m_critical before
  * touching the cursors, the readable count or the delay threshold.
  */
 typedef struct  RING_BUFFER_QUEUE
  { 
 int m_nMaxCount;                          /* maximum number of elements in the queue */
 //int m_nSize;
 int m_nReadCount;                        /* number of elements currently readable */
 int m_nDelayCount;                       /* minimum threshold: QueueRead returns 0 while m_nReadCount is below it */
       RING_BUFFER_ITEM *m_pHead;      /* first slot of the item array; freed by QueueRelease */
 RING_BUFFER_ITEM *m_pWrite;             /* slot the next QueueWrite fills */
 RING_BUFFER_ITEM *m_pRead;              /* slot the next QueueRead drains */
      pthread_mutex_t m_critical;            /* mutex guarding the fields above */
 }RING_BUFFER_QUEUE;

/* Create a queue of nMaxCount slots, each with a frame_size-byte buffer. */
struct RING_BUFFER_QUEUE *QueueInit(int nMaxCount, int frame_size);

/* Store size bytes of data plus timestamp tv; returns 0 on success, -1 on failure. */
int QueueWrite(struct RING_BUFFER_QUEUE *queue, char *data, int size, struct timeval tv);

/*
 * Copy the next frame into data (at most size bytes) and its timestamp into
 * *tv; returns the bytes copied, 0 when nothing is readable yet, -1 on bad
 * arguments.
 */
int QueueRead(struct RING_BUFFER_QUEUE *queue, char *data, int size, struct timeval *tv);

/* Free the queue and all slot buffers; returns 0, or -1 if queue is NULL. */
int QueueRelease(struct RING_BUFFER_QUEUE *queue);

/* Set the minimum number of queued frames QueueRead requires before returning data. */
void QueueSetDelayCount(struct RING_BUFFER_QUEUE *queue, int nCount);

/* Return the current delay threshold, or -1 if queue is NULL. */
int QueueGetDelayCount(struct RING_BUFFER_QUEUE *queue);

/* Return the number of frames currently readable. */
int CanRead(struct RING_BUFFER_QUEUE *queue);

/* Return the number of slots in the queue, or -1 if queue is NULL. */
int GetSize(struct RING_BUFFER_QUEUE *queue);

/**********************实例程序************/

//main.c

#include"queue.h"
#include <malloc.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <termios.h>
#include<linux/soundcard.h>
#include<poll.h>
#include<fcntl.h>
#include<time.h>
/**
 * Producer thread: once per second, for 40 rounds, formats a greeting,
 * timestamps it with gettimeofday() and writes it to the queue.
 *
 * @param param  the struct RING_BUFFER_QUEUE * to write to
 * @return NULL
 */
void *WriteQueue(void *param)
{
    struct RING_BUFFER_QUEUE *Audio_Queue = (struct RING_BUFFER_QUEUE *)param;
    struct timeval tv;
    char buf[100] = {0};
    int ret;
    int i;

    for (i = 0; i < 40; i++)
    {
        memset(buf, 0, sizeof buf);
        snprintf(buf, sizeof buf, "hello,queue%d\n", i);

        gettimeofday(&tv, NULL);

        /* The whole zero-padded buffer is queued, not just the text. */
        ret = QueueWrite(Audio_Queue, buf, (int)sizeof buf, tv);
        if (ret < 0)
        {
            printf("the queue is full,write fail\n");
        }
        sleep(1);
    }

    printf("thread2 is exit\n");
    /* A pthread start routine must return a value; nothing to report here. */
    return NULL;
}

/**
 * Consumer thread: waits until the queue reports data, then makes 50 read
 * attempts. A successful read is printed and followed by a 2 s pause; a
 * failed attempt on an empty queue waits 1 s before the next one.
 *
 * @param param  the struct RING_BUFFER_QUEUE * to read from
 * @return NULL
 */
void *ReadQueue(void *param)
{
    struct RING_BUFFER_QUEUE *Audio_Queue = (struct RING_BUFFER_QUEUE *)param;
    struct timeval tv;
    char buf[100] = {0};
    int readlen;
    int i;

    /* Wait for the first frame; sleep briefly instead of spinning on the lock. */
    while (CanRead(Audio_Queue) == 0)
    {
        usleep(1000);
    }

    for (i = 0; i < 50; i++)
    {
        memset(buf, 0, sizeof buf);
        readlen = QueueRead(Audio_Queue, buf, (int)sizeof buf, &tv);
        if (readlen <= 0)
        {
            printf("queue readcount =%d\n", CanRead(Audio_Queue));
            if (!CanRead(Audio_Queue))
            {
                usleep(1000 * 1000);
                printf("the queue is null,break\n");
            }
            /* Nothing was dequeued, so there is no buffer content to print. */
            continue;
        }

        printf("read:buf=%s\n", buf);
        sleep(2);
    }

    printf("the thread1 is exit\n");
    /* A pthread start routine must return a value; nothing to report here. */
    return NULL;
}

int main(char argc ,char *argv[])

 int nMaxCount = 20;
 
  char buf[100]={0};
  int i =0 ;
  int ret =0 ;
  pthread_t thread_writeid1;
  pthread_t thread_readid2;
  struct RING_BUFFER_QUEUE *Audio_Queue =NULL;
  struct timeval  tv;

  Audio_Queue = QueueInit(QUEUE_MAX_COUNT,FRAME_MAX_SIZE);
   if(NULL == Audio_Queue)
   {
         printf("create audio queue fail\n");
         return -1;
   }
  
   //QueueSetDelayCount(Audio_Queue, 2);
  
   ret =pthread_create(&thread_writeid1, NULL, WriteQueue, (void *)Audio_Queue);
    if(0== ret)
   {
          printf("create pthread writeQueue success\n");
    }
   if(0==pthread_create(&thread_readid2, NULL, ReadQueue, (void *)Audio_Queue))
    {
          printf("create pthread ReadQueue success\n");
    }
   pthread_join(thread_writeid1,NULL);
   pthread_join(thread_readid2,NULL);
  QueueRelease(Audio_Queue);
  printf("the audio queue is release success\n");
  return 0;
}

/*******************Makefile******/

# Build every .c file in this directory into one executable.
SRCS     = $(wildcard *.c)
# No spaces are allowed inside the substitution reference, otherwise the
# pattern never matches and OBJS stays a list of .c files.
OBJS     = $(SRCS:.c=.o)
TARGET   = my_app
CC       = gcc
INCLUDES = -I ./
LIBS     = -lpthread
CCFLAGS  = -g -Wall -O2

# Link step: libraries go after the objects that use them.
$(TARGET): $(OBJS)
	$(CC) $^ -o $@ $(LIBS)

# Compile step: -I ./ is needed here because queue.c uses #include <queue.h>.
%.o: %.c
	$(CC) $(CCFLAGS) $(INCLUDES) -c $< -o $@

.PHONY: clean
clean:
	rm -rf *.o *.bak
	rm -rf $(TARGET)
/**********************************/

结果分析:在实例程序中,首先创建了一个队列,然后创建了两个线程:一个读线程,一个写线程。从代码上来看,两个线程对队列的访问是互斥的(都要先获得队列的互斥锁)。WriteQueue线程写入,ReadQueue线程读取;读取线程先判断队列里是否已有数据写入,没有则等待。如果读取太快导致队列为空,则延时1s后再去读取,从而保证依次读取写入队列中的数据,实现数据的同步。

抱歉!评论已关闭.