Hello! I am a university student and need help completing an assignment about thread synchronization in C. The assignment is basically about a program that copies a given text file to another text file (character by character) using threads. The program has a circular array that stores a certain number of the struct:
/* One entry of the shared circular array. */
typedef struct{
char data; /* the character read from the source file */
off_t offset; /* the offset of that character in the source file */
}BufferItem;
It also has two sets of threads: IN threads and OUT threads. The purpose of the IN threads is to read a character and its offset from the source file and then store that information in the array as a BufferItem. The OUT threads, on the other hand, read from the array and write each stored character into the target/copy file at its corresponding offset.
Upon creation both threads go to sleep for a random time between 0 and 0.1 sec.
Along the way, each thread writes some information to a log file (one log for the IN threads and one for the OUT threads), so that its activity can be traced. Since the threads are reading and writing shared memory, thread synchronization is required. Therefore the IN threads have three critical sections where they cannot be preempted and thus have the CPU guaranteed; this is achieved using mutex locks. The same is the case for the OUT threads.
The above program(copy.c) will be compiled with:
%cc -Wall -o cpy copy.c -lpthread
It will be invoked as follows:
cpy <nIn> <nOut> <file> <copy> <bufSize> <IN_Log> <OUT_Log>
<nIn> is the number of IN threads to create. There should be at least 1.
<nOut> is the number of OUT threads to create. There should be at least 1.
<file> is the pathname of the file to be copied. It should exist and be readable.
<copy> is the name to be given to the copy. If a file with that name already exists, it should be overwritten.
<bufSize> is the capacity, in terms of BufferItems, of the shared array/buffer. It should be at least 1.
<IN_Log> is the file to which the IN threads write some trace information. If a file with that name already exists, it should be overwritten.
<OUT_Log> is the file to which the OUT threads write some trace information. If a file with that name already exists, it should be overwritten.
-------------------------------------------------------------------------------------------------------------------
I have completed almost 90% of the assignment; the only part I am stuck on is how to write to the copy file using the exact offsets from the source file. I have used fseeko(...), but my program still just writes to the copy file sequentially whatever the threads read from the buffer, ignoring the offset values, so the copy file is not an exact copy of the source file.
The following is the code for my program:
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
/*
 * One unit of work handed from an IN thread to an OUT thread through the
 * shared buffer: a single character plus the file offset that goes with it.
 */
typedef struct BufferItem {
    char  data;   /* the character itself */
    off_t offset; /* file offset associated with the character */
} BufferItem;
/*
 * Thread entry points. Both receive a pointer to an int holding the
 * thread's id (see the pthread_create calls in main).
 */
void *inFunc(void *param); /* contains operations that IN threads should perform */
void *outFunc(void *param); /* contains operations that OUT threads should perform */

/*
 * Shared state. Everything below is visible to every thread; the single
 * mutex declared here is the only synchronization object in the program.
 */
int m; /* slot index at which outFunc starts scanning the buffer; used only by outFunc in this file */
FILE *fd; /* this stream reads from the source file */
FILE *fd2; /* this stream writes to the copy file */
FILE *inLog; /* the IN threads report their activity using this stream */
FILE *outLog; /* the OUT threads report their activity using this stream */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* guards file reads, buffer updates, copy-file writes and log writes */
int bufferSize; /* number of slots in array, taken from argv[5] */
BufferItem **array; /* the shared buffer: bufferSize pointers; a NULL entry is treated as an empty slot */
int nIn; /* number of IN threads, taken from argv[1] */
int nFin; /* incremented by each IN thread just before it exits; outFunc compares it with nIn */
char *inFile; /* name of inLog file*/
char *outFile; /* name of outLog file */
char *copy; /* name of copied file */
/*
 * Returns a heap-allocated copy of the NUL-terminated string s (terminator
 * included), or NULL if the allocation fails. The caller owns the result.
 */
static char *duplicate_arg(const char *s)
{
    size_t len = strlen(s) + 1; /* +1: room for the terminating NUL */
    char *dup = malloc(len);

    if (dup != NULL)
        memcpy(dup, s, len);
    return dup;
}

/*
 * Program entry point.
 *
 * Usage: cpy <nIn> <nOut> <file> <copy> <bufSize> <IN_Log> <OUT_Log>
 *
 * Validates the command line, opens the source file, creates (truncates) the
 * copy and both log files, sets up the shared buffer, starts nIn IN threads
 * and nOut OUT threads, and waits for all of them to finish.
 *
 * The copy and log files are only created/truncated here and closed again
 * straight away: the worker threads open them by name (copy, inFile,
 * outFile) whenever they need to write.
 *
 * Returns 0 on success; exits with status 1 on any setup error.
 */
int main(int argc, char *argv[])
{
    int i;
    int status;

    if (argc < 8) {
        printf("Usage: %s <nIn> <nOut> <file> <copy> <bufSize> <IN_Log> <OUT_Log>\n",
               argc > 0 ? argv[0] : "cpy");
        exit(1);
    }

    nIn = atoi(argv[1]);
    int nOut = atoi(argv[2]);
    bufferSize = atoi(argv[5]);
    /* Also rejects non-numeric input (atoi yields 0) and keeps the VLA sizes below valid. */
    if (nIn < 1 || nOut < 1 || bufferSize < 1) {
        printf("nIn, nOut and bufSize must each be at least 1\n");
        exit(1);
    }

    fd = fopen(argv[3], "r");
    if (fd == NULL) {
        printf("File open error fd1\n");
        exit(1);
    }

    /* "w" creates the copy or truncates an existing one. */
    fd2 = fopen(argv[4], "w");
    if (fd2 == NULL) {
        printf("File open error fd2\n");
        exit(1);
    }
    fclose(fd2);
    fd2 = NULL;

    /* Same for the two logs: start each run with empty files. */
    inLog = fopen(argv[6], "w");
    if (inLog == NULL) {
        printf("File open error inLog\n");
        exit(1);
    }
    fclose(inLog);
    inLog = NULL;

    outLog = fopen(argv[7], "w");
    if (outLog == NULL) {
        printf("File open error outLog\n");
        exit(1);
    }
    fclose(outLog);
    outLog = NULL;

    /* File names the worker threads use when they reopen the files. */
    copy = duplicate_arg(argv[4]);
    inFile = duplicate_arg(argv[6]);
    outFile = duplicate_arg(argv[7]);
    if (copy == NULL || inFile == NULL || outFile == NULL) {
        printf("Out of memory\n");
        exit(1);
    }
    printf("%s\n", copy);
    printf("%s\n", outFile);

    /*
     * The threads treat a NULL slot as "empty", so every slot must start
     * out as NULL; calloc gives us a zero-filled array.
     */
    array = calloc((size_t)bufferSize, sizeof *array);
    if (array == NULL) {
        printf("Out of memory\n");
        exit(1);
    }

    /* Seed rand() so the per-thread delays differ from run to run. */
    srand((unsigned)time(NULL));

    int number[nIn];    /* ids handed to the IN threads */
    int number2[nOut];  /* ids handed to the OUT threads */
    pthread_t tid[nIn];
    pthread_t tid2[nOut];

    /* Create IN and OUT threads alternately until both sets are complete. */
    int size = (nIn > nOut) ? nIn : nOut;
    for (i = 0; i < size; i++) {
        if (i < nIn) {
            number[i] = i;
            status = pthread_create(&tid[i], NULL, inFunc, &number[i]);
            if (status) {
                printf("Create error = %s\n", strerror(status));
                exit(1);
            }
        }
        if (i < nOut) {
            number2[i] = i;
            status = pthread_create(&tid2[i], NULL, outFunc, &number2[i]);
            if (status) {
                printf("Create error = %s\n", strerror(status));
                exit(1);
            }
        }
    }

    /* Wait for the IN threads, then for the OUT threads. */
    for (i = 0; i < nIn; i++) {
        status = pthread_join(tid[i], NULL);
        if (status)
            printf("Join error = %s\n", strerror(status));
    }
    for (i = 0; i < nOut; i++) {
        status = pthread_join(tid2[i], NULL);
        if (status)
            printf("Join error = %s\n", strerror(status));
    }

    /* All threads are gone, so nobody else can touch these any more. */
    fclose(fd);
    fd = NULL;
    for (i = 0; i < bufferSize; i++)
        free(array[i]); /* normally NULL already; free(NULL) is a no-op */
    free(array);
    array = NULL;

    return 0;
}
/*
 * Body of an IN thread.
 *
 * param points to an int holding this thread's id.
 *
 * After an initial random sleep the thread repeats, until the source file is
 * exhausted:
 *   1. (critical section) read the current offset and the next character
 *      from the shared source stream fd;
 *   2. (critical section) place a newly allocated BufferItem holding them in
 *      an empty (NULL) slot of the shared array, waiting if the buffer is full;
 *   3. (critical section) append "id = .. offset = .. index = .." to the IN log;
 *   4. sleep again.
 * On exit it increments nFin so the OUT threads can tell when no more items
 * will arrive. Ownership of each BufferItem passes to whoever removes it
 * from the array.
 *
 * Always returns NULL.
 */
void *inFunc(void *param)
{
    const int id = *(int *)param;
    const long max_delay_ns = 100000000L; /* 0.1 s */

    /* Delay of this thread: a random time in [0, 0.1 s). */
    struct timespec tv;
    tv.tv_sec = 0;
    tv.tv_nsec = (long)rand() % max_delay_ns;

    /* Short pause between attempts while the buffer is full (1 ms). */
    const struct timespec retry = { .tv_sec = 0, .tv_nsec = 1000000L };

    if (nanosleep(&tv, NULL)) perror("nanosleep");

    for (;;) {
        /* Critical section 1: offset and character must be read as a pair. */
        pthread_mutex_lock(&mutex);
        off_t offset = ftello(fd);
        int c = fgetc(fd); /* int, not char: EOF must stay distinguishable from data bytes */
        pthread_mutex_unlock(&mutex);

        if (c == EOF)
            break; /* end of file (or read error): nothing to store or log */
        if (offset == (off_t)-1) {
            perror("ftello");
            break;
        }

        BufferItem *newItem = malloc(sizeof *newItem);
        if (newItem == NULL) {
            perror("malloc");
            break;
        }
        newItem->data = (char)c;
        newItem->offset = offset;

        /*
         * Critical section 2: find an empty slot and fill it. The test for
         * NULL and the store happen under the same lock so that two IN
         * threads can never claim the same slot. Once the lock is released
         * the item may be consumed at any moment, so newItem is not touched
         * again after this loop.
         */
        int index = -1;
        while (index < 0) {
            pthread_mutex_lock(&mutex);
            for (int i = 0; i < bufferSize; i++) {
                if (array[i] == NULL) {
                    array[i] = newItem;
                    index = i;
                    printf("1. c is %c offset is %lld id is %d\n",
                           newItem->data, (long long)newItem->offset, id);
                    break;
                }
            }
            pthread_mutex_unlock(&mutex);

            /* Buffer full: give the OUT threads a chance, then try again. */
            if (index < 0 && nanosleep(&retry, NULL)) perror("nanosleep");
        }

        /* Critical section 3: record the operation in the IN log. */
        pthread_mutex_lock(&mutex);
        FILE *log = fopen(inFile, "a");
        if (log == NULL) {
            perror("fopen");
        } else {
            if (fprintf(log, "id = %d offset = %lld index = %d\n",
                        id, (long long)offset, index) < 0)
                perror("fprintf error\n");
            if (fclose(log) == EOF)
                perror("fclose");
        }
        pthread_mutex_unlock(&mutex);

        if (nanosleep(&tv, NULL)) perror("nanosleep");
    }

    /* nFin is read by the OUT threads, so update it under the lock. */
    pthread_mutex_lock(&mutex);
    nFin++;
    pthread_mutex_unlock(&mutex);

    return NULL;
}
void *outFunc(void *param)
{
printf("-----------------------------------\n");
// the thread goes to sleep upon cretion for a random time between 0 and 0.1 sec.
int bufCheck = 1;
int id;
id = *((int *)param);
int arrCheck = 0;
int err;
//printf("Position 1a id %d\n", id);
int i;
int index;
char c;
off_t offset;
struct timespec tv;
tv.tv_sec = (time_t)(rand() % 1000) / 100000;
tv.tv_nsec = (long)(rand() % 1000000);
int loop = 1;
int k;
if(nanosleep(&tv, NULL)) perror("nanosleep");
//printf("Position 1 id %d\n", id);
while(loop)
{ bufCheck = 1;
printf("***** M = %d ID = %d *****\n",m, id);
arrCheck = 0;
//Checking if the buffer is empty.
for(i = 0; i < bufferSize; i++)
{
if(array[i] != NULL)
bufCheck = 0;
}
for( i = 0; i < bufferSize; i++)
{
if(array[i] != NULL)
bufCheck = 0;
}
//printf("Position 2 id %d\n", id);
if((bufCheck) && (nFin == nIn))
{
loop = 0;
//printf("****************************************\n");
}
pthread_mutex_lock(&mutex);
for( i = m; i < bufferSize; i++)
{
if(array[i] != NULL)
{ // Aquiring mutex lock for reading from the buffer
//printf(" Enter 1\n");
if(array[i] != NULL)
{
c = array[i]->data;
offset = array[i]->offset;
index = i;
arrCheck = 1;
array[i] = NULL;
printf("out c is %c offset is %d id is %d\n", c, offset, id);
// printf(" array pos i = %d is now NULL\n", i);
//if(array[i] == NULL)
//printf("Yes tested\n");
m = i;
if(((i+1) < bufferSize) && (array[i + 1] != NULL))
m++;
i = 1000000;
}
//printf("Exit 1\n");
}
}
if(arrCheck == 0)
m = 0;
printf("***** before mut unlock M = %d ID = %d *****\n",m, id);
pthread_mutex_unlock(&mutex);
//printf("Position 3 id %d\n", id);
if(arrCheck)
{
//writing to the copy file
//aquiring mutex lock for writing into the copy file
pthread_mutex_lock(&mutex);
//printf("2. c is %c offset is %d id is %d\n", c, offset, id);
fd2 = (FILE *)fopen(copy, "a");
fseeko(fd2, offset, SEEK_SET);
if(fwrite(&c,sizeof(char),1, fd2) == EOF)
perror("fputc error\n");
fclose(fd2);
pthread_mutex_unlock(&mutex); // releasing mutex lock.
//writing OUT thread info to log.
//Aquiring mutex lock for writing into outLog
pthread_mutex_lock(&mutex);
if((outLog = (FILE *)fopen(outFile, "a+")) == NULL)
printf("%s\n", strerror(errno));
//printf("Position 4a id %d\n", id);
if((err = fprintf(outLog, "%d ", id)) < 0)
printf("%s\n", strerror(err));
//printf("Position 4b id %d\n", id);
if(fprintf(outLog, "offset = %ld ", offset) < 0)
perror("fprintf error\n");
if(fprintf(outLog, "index = %d\n", index) < 0)
perror("fprintf error\n");
fclose(outLog);
pthread_mutex_unlock(&mutex); //mutex lock released.
}
//printf("Position 5 id %d\n", id);
if(nanosleep(&tv, NULL)) perror("nanosleep");
}
pthread_exit(0);
}
I would really appreciate anyone who would help me solve the problem.
Rossi.