C Job Queue - Linked List -

hell-student

Lieutenant
Registriert
Nov. 2007
Beiträge
671
Hallo Zusammen,

Ich hänge momentan an einem Problem und komme einfach nicht weiter. Ich habe einen Threadpool von 100 Threads und möchte Jobs zuweisen bzw. wenn ein Job in die Job Queue kommt, werden die wartenden Threads "geweckt" und führen den Job aus. Mein Problem ist, dass ich irgendwie das Füllen bzw. entnehmen eines Jobs nicht richtig hinbekommen habe. Habe mich hier an diesen Link gehalten. Wenn ich Ein Job Element hinzufüge, sollte ein Thread den Job bearbeiten. Da ich einfach nur in jedem Job herunterzähle und dies auch ausgebe, sollte das eigentich passen. Das Problem ist aber das ich viele Jobs hinzufügen kann, aber nur einer bearbeitet wird. Irgendwie hängst einfach beim einfügen und entfernen.

Ich möchte eigentlich eine Linked List haben, bei der der erst eingefügte Job als erstes bearbeitet wird, also einfach immer am Head die Jobs abgreifen. Sollte also eigentlich der einfachste fall sein, eine Liste zu füllen und zu entleeren.

Hier ist der Code:

Code:
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "pthread.h"
#include "runtime_system.h"

#define   THREADS_PER_SET  100

typedef enum {
    UNUSED = 0,
    PENDING,
    WORKING,
    SUCCESS,
    FAILURE,
    CANCELED
} status_t;

struct invade_job_struct {
	struct invade_job_struct *next;
	volatile int canceled;
	pthread_mutex_t mutex;
	status_t status;

	/* Job parameters. This one iterates n times sleep(1)
	 */
	unsigned long n;
};

static pthread_mutex_t       work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t        work_cond = PTHREAD_COND_INITIALIZER;

static struct invade_job_struct *invade_queue_head = NULL;
static struct invade_job_struct *invade_queue_ptr = NULL;
static struct invade_job_struct *finished = NULL;

/*
 * job worker function
 */
#define DEBUG 1
status_t invade(struct invade_job_struct *const job)
{
	volatile int *const canceld = &job->canceled;
	unsigned long n = job->n;

	while (n > 0UL) {
		if (*canceld) {
			return CANCELED;
		}

		n--;

		if (DEBUG) {
			printf("Thread %u : invade = %lu\n",  (unsigned int)pthread_self(), n);
			fflush(stdout);
		}
	}

	return SUCCESS;
}

/* Invade thread function
 *
 */
void *invade_thread_worker(void *payload __attribute__((unused)))
{
	struct invade_job_struct *job;
	status_t status;

	pthread_mutex_lock(&work_mutex);

	while (1) {

		/* no job in queue */
		if (!invade_queue_head) {
			pthread_cond_wait(&work_cond, &work_mutex);
			continue;
		}

		/* grep first job element of invade queue */
		job = invade_queue_head;
		if (invade_queue_head->next != NULL) {
			invade_queue_ptr = invade_queue_head->next;
			invade_queue_head = invade_queue_ptr;
		}
		else {
			invade_queue_ptr = NULL;
			invade_queue_head = NULL;
		}

		pthread_mutex_lock(&job->mutex);
		if (job->status == PENDING) {

			/* work on it */
		    job->status = WORKING;
		    pthread_mutex_unlock(&job->mutex);

		    status = invade(job);

		    pthread_mutex_lock(&job->mutex);
		    if (job->status == WORKING) {
		    	job->status = status;
		    }
		}
		pthread_mutex_unlock(&job->mutex);
	}
}

void cancel_invade_job(struct invade_job_struct *const job)
{
    pthread_mutex_lock(&job->mutex);
    if (job->status == PENDING || job->status == WORKING) {
        job->canceled = ~0;
        job->status = CANCELED;
    }
    pthread_mutex_unlock(&job->mutex);
}

void create_invade_job()
{
	struct invade_job_struct *job;
	/* empty invade job queue */
	if(invade_queue_head == NULL) {
		invade_queue_head = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
		invade_queue_head->status = PENDING;
		invade_queue_head->n = 1000;
		invade_queue_head->next = NULL;
		invade_queue_ptr = invade_queue_head;
		printf("created first element\n");
	}
	else {
		invade_queue_ptr = invade_queue_head;
		while (invade_queue_ptr->next != NULL) {
			invade_queue_ptr = invade_queue_ptr->next;
		}
		invade_queue_ptr->next = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
		invade_queue_ptr = invade_queue_ptr->next;
		invade_queue_ptr->status = PENDING;
		invade_queue_ptr->n = 1000;
		invade_queue_ptr->next = NULL;
		printf("created element\n");
	}
}

#define DEBUG 1
int main()
{
	int i;
	int recv_bytes;
	int fifo_fd;
	char fifo_name[100];
	char msg[100];
	pthread_attr_t attrs;
	pthread_t id[THREADS_PER_SET];


	sprintf(fifo_name, "runtime_system_fifo%i\0", 1);
	mkfifo(fifo_name, 0666);
	fifo_fd = open(fifo_name, O_RDONLY | O_NONBLOCK);

	pthread_attr_init(&attrs);
	pthread_attr_setstacksize(&attrs, 65536);

	for (i = 0; i < THREADS_PER_SET; i++) {
		pthread_create(&id[i], &attrs, invade_thread_worker, NULL);
	}

	while(1) {
		recv_bytes = read(fifo_fd, msg, sizeof(msg));
		msg[100] = '\0';
		if(strcmp(msg, "start") == 0) {
			pthread_mutex_lock(&work_mutex);

			create_invade_job();

			pthread_cond_signal(&work_cond);
			pthread_mutex_unlock(&work_mutex);
			sprintf(msg, "%i", 1);
		}
	}

	return EXIT_SUCCESS;
}

Per FIFO wird eine Nachricht gesendet, und falls diese "start" lautet, soll ein neuer Job erstellt werden und dann auch abgebarbeitet werden. Sende ich aber nacheinander 2 mal "start" printet mir nur ein Thread was aus, also gehe ich davon aus, dass ich die Jobs nicht richtig einfüge.
 
Code:
msg[100] = '\0';
Die Zeile ist eine Speicherzugriffsfehler und das Lock/Mutex sollte vor der Bedingungsvariablen entsperrt werden (d.h. 177 & 178 vertauschen). Wartende Threads sperren das Mutex vor Verlassen der wait-Funktion wieder (77).

Der Worker-Thread entsperrt work_mutex nicht. Er hat es also nach Verlassen der wait-Funkion in der HAnd und gibt's nie wieder her. Demnach kommt der main-Thread nicht weiter.
 
Zuletzt bearbeitet:
Danke für die Hilfe, aber dennoch kann ich nicht 2 Threads mit Aufgaben versorgen. Dachte die Ausgabe müsste so ungefähr interleaved ausgegeben werden so in etwa:

Thread 1570559744 : invade = 986
Thread 1570559711 : invade = 987
Thread 1570559744 : invade = 985
Thread 1570559711: invade = 986
Thread 1570559711 : invade = 985

Aber die Threads laufen wohl nacheinander
 
Ih hab's nicht übersetzt und nur aus dem Kopf "verbessert". Einiges ist da unklar:

Code:
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "pthread.h"
#include "runtime_system.h"
 
#define   THREADS_PER_SET  100
 
typedef enum {
    UNUSED = 0,
    PENDING,
    WORKING,
    SUCCESS,
    FAILURE,
    CANCELED
} status_t;
 
struct invade_job_struct {
  struct invade_job_struct *next;
	volatile int canceled;
	pthread_mutex_t mutex;
	status_t status;
 
	/* Job parameters. This one iterates n times sleep(1)
	 */
	unsigned long n;
};
 
static pthread_mutex_t       work_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t        work_cond = PTHREAD_COND_INITIALIZER;
 
static struct invade_job_struct *invade_queue_head = NULL;
static struct invade_job_struct *invade_queue_tail = NULL;
static struct invade_job_struct *finished = NULL;
 
/*
 * job worker function
 */
#define DEBUG 1
status_t invade(struct invade_job_struct *const job)
{
	volatile int *const canceld = &job->canceled;
	unsigned long n = job->n;
 
	while (n > 0UL) {
		if (*canceld) 
			return CANCELED;
 
		n--;
 
		if (DEBUG) {
			printf("Thread %u : invade = %lu\n",  (unsigned int)pthread_self(), n);
			fflush(stdout);
		}
	} 
	return SUCCESS;
}
 
/* Invade thread function
 *
 */
void *invade_thread_worker(void *payload __attribute__((unused)))
{
	struct invade_job_struct *job;
	status_t status;
  
	while (1) {
 		pthread_mutex_lock(&work_mutex);
		/* no job in queue */
		while(!invade_queue_head) {
			pthread_cond_wait(&work_cond, &work_mutex);
		}
 
		/* grab first job element of invade queue */
		job = invade_queue_head;
		invade_queue_head = invade_queue_head->next;
		/* we're done grabbing a job. unlock list */
		pthread_mutex_unlock(&work_mutex);
 
		/* these locks are quite obscure. are they actually necessary? */
		/* they aren't initialized as well */
		0 && pthread_mutex_lock(&job->mutex);
		if (job->status == PENDING) {
 
			/* work on it */
		    job->status = WORKING;
		    0 && pthread_mutex_unlock(&job->mutex);
 
		    status = invade(job);
 
		    0 && pthread_mutex_lock(&job->mutex);
		    if (job->status == WORKING) {
		    	job->status = status;
		    }
		}
		0 && pthread_mutex_unlock(&job->mutex);
	}
}
 
void cancel_invade_job(struct invade_job_struct *const job)
{
    pthread_mutex_lock(&job->mutex);
    if (job->status == PENDING || job->status == WORKING) {
        job->canceled = ~0;
        job->status = CANCELED;
    }
    pthread_mutex_unlock(&job->mutex);
}
 
void create_invade_job()
{
	struct invade_job_struct *job;
	/* empty invade job queue */
 
		job = (struct invade_job_struct *) calloc(1, sizeof(struct invade_job_struct));
		job->status = PENDING;
		job->n = 1000;
		job->next = NULL;
		// FIXME
		// initialize mutex?
 
		if(invade_queue_head == NULL) {
			/* list/deque is empty */
			invade_queue_head = job;
			invade_queue_tail = job;
		}
		else
			invade_queue_tail->next = job;
		printf("job element\n");
}
 
#define DEBUG 1
int main()
{
	int i;
	int recv_bytes;
	int fifo_fd;
	char fifo_name[100];
	char msg[100];
	pthread_attr_t attrs;
	pthread_t id[THREADS_PER_SET];
  
	sprintf(fifo_name, "runtime_system_fifo%i\0", 1);
	mkfifo(fifo_name, 0666);
	fifo_fd = open(fifo_name, O_RDONLY | O_NONBLOCK);
 
	pthread_attr_init(&attrs);
	pthread_attr_setstacksize(&attrs, 65536);
 
	for (i = 0; i < THREADS_PER_SET; i++) {
		pthread_create(&id[i], &attrs, invade_thread_worker, NULL);
	}
 
	while(1) {
		recv_bytes = read(fifo_fd, msg, sizeof(msg));
		msg[99] = '\0';
		if(strcmp(msg, "start") == 0) {
			/* since we'd like to modify the list we have to lock it */
			pthread_mutex_lock(&work_mutex);
 			create_invade_job();
			pthread_mutex_unlock(&work_mutex);
			/* signaled threads will grab that lock to acquire a job */ 
 			pthread_cond_signal(&work_cond);
			sprintf(msg, "%i", 1);
		}
	}
 
	return EXIT_SUCCESS;
}
 
Danke,

aber leider klappt dies auch nicht. Ich bin schon am verzweifeln =). Mir ist eigentlich schon klar was ich locken muss und was nicht und wann ich die locks wieder freigeben muss. job->mutex ist dazu da um zu verhindern, dass ein thread den status auf working ändert, während er gecancelt wird.

Ist das Einfügen und Löschen in die Liste richtig?
 
Nein, nicht ganz. Nach der Zeile 130 hab ich "invade_queue_tail = job" vergessen (besser gleich Z. 127 ans Ende der Fkt. schieben).
In der jetzigen Form sind die Job-Mutexes überflüssig, da ein Job stets genau einem Thread zugeordnet wird und sich daher niemand in die Quere kommt. Zumindest nahm ich an, dass es das ist, was du machen willst.
 
Zuletzt bearbeitet:
pthread_cond_wait(&work_cond, &work_mutex) Gibt doch den Mutex bereits frei, bei ersten aufruf und wartet auf die COND. Müsste nicht dannach wieder der Mutex gelockt werden: Zeile 70-80?

Ich möchte eigentlich gerne folgendes haben:

Ein Job soll genau einem Thread zugeordnet werden, also auch nur 1 mal bearbeitet werden. Dannach soll der Job nicht mehr zuweisbar sein (also kann ich den ja einfach aus der queue holen, und gegebenenfalls in eine finish-queue einhängen. Nur leider funktioniert dein tipp von heute auch nicht. Starte ich nacheinander, pass alles. Starte ich jedoch parallel und und Zeitversetzt immer mal wieder neue Jobs so hängt es irgendwann und es können keine neuen Jobs mehr einggestellt werden. Kann man am "printf("job element\n");" erkennen.



EDIT:

Doch noch nicht gelöst
 
Zuletzt bearbeitet:
Zurück
Oben