/* 天天看点 */

/* Producer/consumer thread synchronization program (消费者与生产者线程同步程序) */

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<fcntl.h>
#include<pthread.h>
#include<errno.h>
#include<semaphore.h>
#include<time.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<sys/ipc.h>

#define MYFIFO  "/tmp/myfifo" // path of the named pipe (FIFO) used as the buffer

#define BUFFER_SIZE  3   // number of units in the buffer

#define UNIT_SIZE  5   // size of each unit: bytes moved per write()/read() on the FIFO

// Number of seconds the program runs: added to time(NULL) to obtain end_time.
#define RUN_TIME  30

// Scale factor applied to rand() when the threads compute their random sleep time.
#define DELAY_TIME_LEVELS 5.0

/* File descriptor of the FIFO; opened in main and used by both threads. */
int fd;

/* Time after which the producer and consumer loops stop iterating. */
time_t end_time;

/*
 * mutex: initialised to 1, held around each FIFO access;
 * full:  initialised to 0, counts units written but not yet read;
 * avail: initialised to BUFFER_SIZE, counts free units in the buffer.
 */
sem_t mutex,full,avail;

/* Thread entry point that writes units into the FIFO. */
void *producer(void *arg);

/* Thread entry point that reads units from the FIFO. */
void *customer(void *arg);

/*
 * Program entry point.
 *
 * Creates (or reuses) the FIFO named by MYFIFO, opens it read/write,
 * initialises the three semaphores, then starts one producer and one
 * consumer thread and waits for both to finish. The FIFO is closed and
 * removed before returning.
 *
 * Returns 0 on success and a non-zero value if the FIFO, the semaphores
 * or either thread could not be set up.
 */
int main(void)
{
    pthread_t thrd_prd_id,thrd_cst_id;
    int ret;

    srand((unsigned)time(NULL));
    end_time=time(NULL)+RUN_TIME;

    /*
     * The second argument of mkfifo() is a permission mode, not open()
     * flags. Passing O_CREAT|O_EXCL produced a FIFO without read
     * permission, so the open(O_RDWR) below failed for ordinary users.
     */
    if((mkfifo(MYFIFO,0666)<0)&&(errno!=EEXIST))
    {
        ret=errno;      /* save before printf() can overwrite it */
        printf("cannot create fifo\n");
        return ret;
    }

    /* O_RDWR lets this one process act as both reader and writer. */
    fd=open(MYFIFO,O_RDWR);
    if(fd==-1)
    {
        printf("open fifo error\n");
        return fd;
    }

    ret=sem_init(&mutex,0,1);             /* FIFO access lock         */
    ret+=sem_init(&avail,0,BUFFER_SIZE);  /* free units in the buffer */
    ret+=sem_init(&full,0,0);             /* filled units             */
    if(ret!=0)
    {
        printf("any semaphore initalization failed\n");
        close(fd);
        unlink(MYFIFO);
        return ret;
    }

    ret=pthread_create(&thrd_prd_id,NULL,producer,NULL);
    if(ret!=0)
    {
        printf("Create producer thread error\n");
        close(fd);
        unlink(MYFIFO);
        return ret;
    }

    ret=pthread_create(&thrd_cst_id,NULL,customer,NULL);
    if(ret!=0)
    {
        /* Returning from main ends the process, including the producer. */
        printf("Create customer thread error\n");
        unlink(MYFIFO);
        return ret;
    }

    pthread_join(thrd_prd_id,NULL);
    pthread_join(thrd_cst_id,NULL);

    sem_destroy(&mutex);
    sem_destroy(&avail);
    sem_destroy(&full);

    close(fd);
    unlink(MYFIFO);

    return 0;
}

void *producer(void *arg)

{

 int real_write;

 int delay_time=0;

 while(time(NULL)<end_time)

 {

  delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1;

  sleep(delay_time);

  sem_wait(&avail);

  sem_wait(&mutex);

  printf("\nproducer:delay=%d\n",delay_time);

  if((real_write=write(fd,"hello",UNIT_SIZE))==-1)

  {

   if(errno==EAGAIN)

    printf("the fifo has not ben read yet.please try later");

  }

  else

  {

   printf("Write %d to the fifo\n",real_write);

  }

  sem_post(&full);

  sem_post(&mutex);

 }

 pthread_exit(NULL);

void *customer(void *arg)

{

 int real_read;

 unsigned char read_buffer[UNIT_SIZE];

 int delay_time=0;

 while(time(NULL)<end_time)

 {

  delay_time=(int)(rand()*DELAY_TIME_LEVELS/(RAND_MAX))+1;

  sleep(delay_time);

  sem_wait(&full);

  sem_wait(&mutex);

  printf("\ncustomer:delay=%d\n",delay_time);

  if((real_read=read(fd,read_buffer,UNIT_SIZE))==-1)

  {

   if(errno==EAGAIN)

    printf("no data yet\n");

  }

  printf("read %s from FIFO\n",read_buffer);

  sem_post(&avail);

  sem_post(&mutex);

 }

 pthread_exit(NULL);

}  

/* 继续阅读 (continue reading) */