/* 3000pc-rendezvous.c More complex producer-consumer using mmap shared memory and pthread_cond_wait * Original Version Copyright (C) 2017 Anil Somayaji * Modified Version Copyright (C) 2020 William Findlay * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ /* You really shouldn't be incorporating parts of this in any other code, it is meant for teaching, not production */ #include #include #include #include #include #include #include #include #include #include #include #include #include #define QUEUESIZE 32 #define WORDSIZE 16 const int wordlist_size = 27; const char *wordlist[] = { "Alpha", "Bravo", "Charlie", "Delta", "Echo", "Foxtrot", "Golf", "Hotel", "India", "Juliet", "Kilo", "Lima", "Mike", "November", "Oscar", "Papa", "Quebec", "Romeo", "Sierra", "Tango", "Uniform", "Victor", "Whiskey", "X-ray", "Yankee", "Zulu", "Dash" }; typedef struct entry { char word[WORDSIZE]; sem_t lock; } entry; typedef struct shared { //pthread_mutex_t cond_mutex; //pthread_cond_t queue_nonempty; bool predicate_nonempty; //pthread_cond_t queue_nonfull; bool predicate_nonfull; sem_t queue_nonempty; sem_t queue_nonfull; entry queue[QUEUESIZE]; int last_produced; int last_consumed; pid_t prod_pid; pid_t con_pid; int prod_count; int con_count; } shared; void report_error(char *error) { fprintf(stderr, "Error: %s\n", error); } void usage_exit(char *progname) { fprintf(stderr, "Usage: %s \n", progname); exit(-1); } void pick_word(char *word) { unsigned int pick; /* Open /dev/urandom for reading */ int fd = open("/dev/urandom", O_RDONLY); if (fd < 0) { fprintf(stderr, "Error: Unable to open /dev/urandom for reading: %s\n", strerror(errno)); pick = 0; } else if (read(fd, (void *)&pick, sizeof(pick)) == -1) { fprintf(stderr, "Error: Unable to read from /dev/urandom: %s\n",strerror(errno)); pick = 0; } pick = pick % wordlist_size; strcpy(word, wordlist[pick]); close(fd); } void wait_for_producer(shared *s) { //pthread_mutex_lock(&s->cond_mutex); fprintf(stderr, "Waiting for producer...\n"); while(s->predicate_nonempty == false) //pthread_cond_wait(&s->queue_nonempty, &s->cond_mutex); //pthread_mutex_unlock(&s->cond_mutex); sem_wait(&s->queue_nonempty); } void wait_for_consumer(shared *s) { //pthread_mutex_lock(&s->cond_mutex); fprintf(stderr, "Waiting for consumer...\n"); while(s->predicate_nonfull == false) //pthread_cond_wait(&s->queue_nonfull, &s->cond_mutex); //pthread_mutex_unlock(&s->cond_mutex); sem_wait(&s->queue_nonfull); } void output_word(int c, char *w) { printf("Word %d: %s\n", c, w); } int queue_word(char *word, shared *s) { entry *e; int current, retval; current = (s->last_produced + 1) % QUEUESIZE; e = &s->queue[current]; sem_wait(&e->lock); if (e->word[0] != '\0') { /* consumer hasn't consumed this entry yet */ sem_post(&e->lock); wait_for_consumer(s); sem_wait(&e->lock); } if (e->word[0] != '\0') { fprintf(stderr, "ERROR: No room for producer after waiting!\n"); retval = -1;exit(1); goto done; } else { strncpy(e->word, word, WORDSIZE); s->last_produced = current; s->prod_count++; s->predicate_nonempty = true; s->predicate_nonfull = false; /* Notify that queue is nonempty */ //pthread_cond_signal(&s->queue_nonempty); sem_post(&s->queue_nonempty); retval = 0; goto done; } done: sem_post(&e->lock); return retval; } int get_next_word(char *word, shared *s) { entry *e; int current, retval; current = (s->last_consumed + 1) % QUEUESIZE; e = &s->queue[current]; sem_wait(&e->lock); if (e->word[0] == '\0') { /* producer hasn't filled in this entry yet */ sem_post(&e->lock); wait_for_producer(s); sem_wait(&e->lock); } if (e->word[0] == '\0') { fprintf(stderr, "ERROR: Nothing for consumer after waiting!\n"); retval = -1;exit(1); goto done; } else { strncpy(word, e->word, WORDSIZE); e->word[0] = '\0'; s->last_consumed = current; s->con_count++; s->predicate_nonfull = true; s->predicate_nonempty = false; /* Notify that queue is nonfull */ //pthread_cond_signal(&s->queue_nonfull); sem_post(&s->queue_nonfull); retval = 0; goto done; } done: sem_post(&e->lock); return retval; } void producer(shared *s, int event_count, int prod_interval) { char word[WORDSIZE]; int i; for (i=0; i < event_count; i++) { pick_word(word); queue_word(word, s); /* Don't sleep if interval <= 0 */ if (prod_interval <= 0) continue; /* Sleep if we hit our interval */ if (i % prod_interval == 0) { fprintf(stderr, "Producer sleeping for 1 second...\n"); sleep(1); } } fprintf(stderr, "Producer finished.\n"); exit(0); } void consumer(shared *s, int event_count, int con_interval) { char word[WORDSIZE]; int i; for (i=0; i < event_count; i++) { get_next_word(word, s); output_word(s->con_count, word); /* Don't sleep if interval <= 0 */ if (con_interval <= 0) continue; /* Sleep if we hit our interval */ if (i % con_interval == 0) { fprintf(stderr, "Consumer sleeping for 1 second...\n"); sleep(1); } } fprintf(stderr, "Consumer finished.\n"); exit(0); } void init_shared(shared *s) { int i; /* We need to explicitly mark the mutex as shared or risk undefined behavior */ //pthread_mutexattr_t mattr = {}; //pthread_mutexattr_setpshared(&mattr, 1); //pthread_mutex_init(&s->cond_mutex, &mattr); /* We need to explicitly mark the conditions as shared or risk undefined behavior */ //pthread_condattr_t cattr = {}; //pthread_condattr_setpshared(&cattr, 1); //pthread_cond_init(&s->queue_nonempty, &cattr); //pthread_cond_init(&s->queue_nonfull, &cattr); sem_init(&s->queue_nonempty, 1, 0); sem_init(&s->queue_nonfull, 1, 0); s->last_consumed = -1; s->last_produced = -1; s->prod_pid = -1; s->con_pid = -1; s->prod_count = 0; s->con_count = 0; s->predicate_nonfull = false; s->predicate_nonempty = false; for (i=0; iqueue[i].word[0] = '\0'; /* semaphore is shared between processes, and initial value is 1 (unlocked) */ sem_init(&s->queue[i].lock, 1, 1); } } int main(int argc, char *argv[]) { int pid, count, prod_interval, con_interval; shared *s; if (argc < 4) { if (argc < 1) { report_error("no command line"); usage_exit(argv[0]); } else { report_error("Not enough arguments"); usage_exit(argv[0]); } } count = atoi(argv[1]); prod_interval = atoi(argv[2]); con_interval = atoi(argv[3]); s = (shared *) mmap(NULL, sizeof(shared), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); if (s == MAP_FAILED) { fprintf(stderr, "Error: Unable to mmap: %s\n", strerror(errno)); exit(-1); } init_shared(s); pid = fork(); if (pid == 0) { /* Producer */ s->prod_pid = getpid(); producer(s, count, prod_interval); } else { /* Consumer */ s->con_pid = getpid(); consumer(s, count, con_interval); } /* This line should never be reached */ return -1; }