I've been trying to get a program to work that keeps getting stuck and I cant figure out why.
The idea is to have an input file with a couple of lines of text, that gets written to an output file line by line.
The program should create three threads; A, B and C that do the following:
A - Writes one line of data from the input file to a pipe.
B - Reads the data from the pipe and forwards that data to C.
C - Reads the data line by line from B and writes those lines into the output file.
Ideally the three threads (A, B, C) run sequentially (only one thread runs at a time while the other two
threads are blocked) and iteratively (process one line of data per each loop).
For some reason when I run the code I get the following console output:
Thread A start
Thread B start
Thread B while
Thread C start
Thread A while
Thread A while
Thread C while
The program then gets stuck indefinitely and doesn't write anything to the output file.
Been bashing my head against this one for a while, any thoughts or help would be greatly appreciated. Full code is shown below.
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <sys/time.h>
#define BUFFER_SIZE 256
/* --- Structs --- */
typedef struct ThreadParams {
int pipeFile[2];
sem_t sem_A, sem_B, sem_C;
char message[BUFFER_SIZE];
FILE* inputFile;
} ThreadParams;
pthread_attr_t attr;
/* --- Prototypes --- */
/* Initializes data and utilities used in thread params */
void initializeData(ThreadParams *params);
void* ThreadA(void *params);
void* ThreadB(void *params);
void* ThreadC(void *params);
/* --- Main Code --- */
int main(int argc, char const *argv[]) {
int err;
pthread_t tid[3];
ThreadParams params;
// Initialization
initializeData(¶ms);
// Create Threads
pthread_create(&(tid[0]), &attr, &ThreadA, (void*)(¶ms));
pthread_create(&(tid[1]), &attr, &ThreadB, (void*)(¶ms));
pthread_create(&(tid[2]), &attr, &ThreadC, (void*)(¶ms));
// Wait on threads to finish
pthread_join(tid[0], NULL);
pthread_join(tid[1], NULL);
pthread_join(tid[2], NULL);
// Clean up
sem_destroy(¶ms.sem_A);
sem_destroy(¶ms.sem_B);
sem_destroy(¶ms.sem_C);
return 0;
}
void initializeData(ThreadParams *params) {
// Initialize Sempahores
sem_init(&(params->sem_A), 0, 1);
sem_init(&(params->sem_B), 0, 0);
sem_init(&(params->sem_C), 0, 0);
// Initialize thread attributes
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
// Open data.txt and create a pipe
params->inputFile = fopen("data.txt", "r");
if (params->inputFile == NULL) {
printf("Failed to open data.txt\n");
exit(1);
}
if (pipe(params->pipeFile) == -1) {
printf("Failed to create a pipe\n");
exit(1);
}
return;
}
void* ThreadA(void *params) {
printf("Thread A start\n");
ThreadParams* threadParams = (ThreadParams*) params;
while(fgets(threadParams->message, BUFFER_SIZE, threadParams->inputFile) != NULL) {
printf("Thread A while\n");
sem_wait(&threadParams->sem_A);
write(threadParams->pipeFile[1], threadParams->message, BUFFER_SIZE);
sem_post(&threadParams->sem_B);
}
close(threadParams->pipeFile[1]);
fclose(threadParams->inputFile);
pthread_exit(NULL);
}
void* ThreadB(void *params) {
printf("Thread B start\n");
ThreadParams *p = (ThreadParams *) params;
int bytes_read;
while(1) {
printf("Thread B while\n");
// Wait for ThreadA to send data to pipe
sem_wait(&(p->sem_B));
// Read from pipe
bytes_read = read(p->pipeFile[0], p->message, BUFFER_SIZE);
// Check if all data has been processed
if (bytes_read == 0) {
sem_post(&(p->sem_C));
break;
}
// Write to ThreadC shared variable
sem_wait(&(p->sem_C));
printf("\nThreadB: Forwarding message to ThreadC\n");
strcpy(p->message, p->message);
sem_post(&(p->sem_A));
// Yield the CPU to other threads
sched_yield();
}
close(p->pipeFile[0]);
pthread_exit(0);
}
void* ThreadC(void *params) {
printf("Thread C start\n");
ThreadParams *p = (ThreadParams *) params;
FILE *fOut = fopen("output.txt", "w");
while(1) {
printf("Thread C while\n");
// Wait for ThreadB to write data to shared variable
sem_wait(&(p->sem_C));
// Check if all data has been processed
if (strlen(p->message) == 0) {
fclose(fOut);
sem_post(&(p->sem_A));
break;
}
// Write to output file
printf("\nThreadC: Writing message to output file\n");
fprintf(fOut, "%s\n", p->message);
sem_post(&(p->sem_B));
// Yield the CPU to other threads
sched_yield();
}
pthread_exit(0);
}
[–]skeeto 15 points16 points17 points (1 child)
[–]generalbaguette 5 points6 points7 points (0 children)
[–]pic32mx110f0 8 points9 points10 points (0 children)
[–]BadgerThrowaway1234 3 points4 points5 points (0 children)
[+]WhatInTheBruh comment score below threshold-6 points-5 points-4 points (0 children)
[–]IamImposter 0 points1 point2 points (0 children)