Files
evolution/e-util/e-msgport.c
8 2ca8993160 Write the pipe notification outside the lock. This way if the pipe fills
2001-09-28    <NotZed@Ximian.com>

	* e-msgport.c (e_msgport_put): Write the pipe notification outside
	the lock.  This way, if the pipe fills up because of too many
	outstanding requests, the queue isn't deadlocked.  This only
	happens with 4096 outstanding messages, so something is getting
	VERY busy!  Fixes #11121.

svn path=/trunk/; revision=13218
2001-09-28 18:38:45 +00:00

869 lines
18 KiB
C

#include "e-msgport.h"
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#include <glib.h>
/* Debug hooks.  Both macros expand to nothing, so any statement wrapped in
   m(...) or t(...) below is compiled out.  Redefine them as "x" to enable
   the corresponding debug output. */
#define m(x) /* msgport debug */
#define t(x) /* thread debug */
/* Put the list header @v into its empty state: head refers to the tail slot,
   the tail slot is NULL and tailpred refers back to the head slot. */
void e_dlist_init(EDList *v)
{
	v->tail = NULL;
	v->tailpred = (EDListNode *)&v->head;
	v->head = (EDListNode *)&v->tail;
}
/* Link node @n in as the first element of list @l.  Returns @n. */
EDListNode *e_dlist_addhead(EDList *l, EDListNode *n)
{
	EDListNode *old_first = l->head;

	n->next = old_first;
	n->prev = (EDListNode *)&l->head;
	old_first->prev = n;
	l->head = n;

	return n;
}
/* Link node @n in as the last element of list @l.  Returns @n. */
EDListNode *e_dlist_addtail(EDList *l, EDListNode *n)
{
	EDListNode *old_last = l->tailpred;

	n->next = (EDListNode *)&l->tail;
	n->prev = old_last;
	old_last->next = n;
	l->tailpred = n;

	return n;
}
/* Unlink node @n from whatever list it is on by joining its two neighbours.
   @n's own next/prev pointers are left untouched.  Returns @n. */
EDListNode *e_dlist_remove(EDListNode *n)
{
	EDListNode *after = n->next;
	EDListNode *before = n->prev;

	after->prev = before;
	before->next = after;

	return n;
}
/* Unlink and return the first node of @l, or NULL if the list is empty. */
EDListNode *e_dlist_remhead(EDList *l)
{
	EDListNode *first = l->head;
	EDListNode *second = first->next;

	/* a NULL successor means "first" is really the tail slot: list is empty */
	if (second == NULL)
		return NULL;

	second->prev = first->prev;
	l->head = second;
	return first;
}
/* Unlink and return the last node of @l, or NULL if the list is empty. */
EDListNode *e_dlist_remtail(EDList *l)
{
	EDListNode *last = l->tailpred;
	EDListNode *before = last->prev;

	/* a NULL predecessor means "last" is really the head slot: list is empty */
	if (before == NULL)
		return NULL;

	before->next = last->next;
	l->tailpred = before;
	return last;
}
/* Returns non-zero when @l holds no nodes, i.e. when head still refers to
   the list's own tail slot. */
int e_dlist_empty(EDList *l)
{
	EDListNode *sentinel = (EDListNode *)&l->tail;

	if (l->head == sentinel)
		return 1;
	return 0;
}
/* Count the nodes currently linked on @l by walking it from head to tail.
   Returns 0 for an empty list.
   (Previously the computed count was discarded and 0 was always returned,
   which made every queue-limit comparison against it succeed.) */
int e_dlist_length(EDList *l)
{
	EDListNode *n;
	int count = 0;

	/* the walk stops at the tail slot, whose "next" pointer is NULL */
	for (n = l->head; n->next != NULL; n = n->next)
		count++;

	return count;
}
/* A message port: a locked queue of messages with two ways of waiting on it,
   a condition variable or (once e_msgport_fd() has been called) a pipe that
   receives one byte per queued message. */
struct _EMsgPort {
/* pending messages, linked through their embedded list node */
EDList queue;
int condwait; /* how many waiting in condwait */
/* notification pipe; the same two descriptors are reachable either as the
   array handed to pipe() or by name.  Both are -1 until a pipe is created. */
union {
int pipe[2];
struct {
int read;
int write;
} fd;
} pipe;
/* @#@$#$ glib stuff */
/* signalled by e_msgport_put() when condwait > 0 */
GCond *cond;
/* protects queue, condwait and the pipe descriptors */
GMutex *lock;
};
/* Allocate a new, empty message port.  No notification pipe exists yet
   (both descriptors are -1) and nobody is counted as waiting. */
EMsgPort *e_msgport_new(void)
{
	EMsgPort *port = g_malloc(sizeof(*port));

	port->condwait = 0;
	port->pipe.fd.read = -1;
	port->pipe.fd.write = -1;
	e_dlist_init(&port->queue);
	port->lock = g_mutex_new();
	port->cond = g_cond_new();

	return port;
}
/* Release a port: frees its lock and condition, closes both pipe ends if a
   pipe was ever created, then frees the port itself.  Messages still on the
   queue are not touched. */
void e_msgport_destroy(EMsgPort *mp)
{
	int rfd = mp->pipe.fd.read;

	g_mutex_free(mp->lock);
	g_cond_free(mp->cond);

	if (rfd != -1) {
		close(rfd);
		close(mp->pipe.fd.write);
	}

	g_free(mp);
}
/* Get a fd that can be used to wait on the port asynchronously.
   The notification pipe is created on first call; later calls return the
   same read descriptor.  Returns -1 if the pipe could not be created, in
   which case the port is left without a pipe and a later call will try
   again. */
int e_msgport_fd(EMsgPort *mp)
{
	int fd;

	g_mutex_lock(mp->lock);
	fd = mp->pipe.fd.read;
	if (fd == -1) {
		int fds[2];

		/* create into a scratch array so a failed pipe() can never leave
		   half-initialised descriptors in the port */
		if (pipe(fds) == 0) {
			mp->pipe.fd.read = fds[0];
			mp->pipe.fd.write = fds[1];
			fd = fds[0];
		}
	}
	g_mutex_unlock(mp->lock);

	return fd;
}
/* Append @msg to the port's queue and notify waiters.
   A thread blocked on the condition is signalled while the lock is held.
   If the port has a pipe, one byte is written to it AFTER the lock is
   released, so a full pipe blocks only this writer and not the whole queue. */
void e_msgport_put(EMsgPort *mp, EMsg *msg)
{
	int fd;

	m(printf("put:\n"));
	g_mutex_lock(mp->lock);
	e_dlist_addtail(&mp->queue, &msg->ln);
	if (mp->condwait > 0) {
		m(printf("put: condwait > 0, waking up\n"));
		g_cond_signal(mp->cond);
	}
	/* snapshot the descriptor under the lock, use it outside */
	fd = mp->pipe.fd.write;
	g_mutex_unlock(mp->lock);

	if (fd != -1) {
		ssize_t written;

		m(printf("put: have pipe, writing notification to it\n"));
		/* e_msgport_get() reads one byte per message, so a notification
		   lost to a signal interruption must be retried */
		do {
			written = write(fd, "", 1);
		} while (written == -1 && errno == EINTR);
	}
	m(printf("put: done\n"));
}
static void
msgport_cleanlock(void *data)
{
EMsgPort *mp = data;
g_mutex_unlock(mp->lock);
}
/* Block until at least one message is queued on @mp, then return the message
   at the head of the queue WITHOUT removing it (e_msgport_get() dequeues).
   If the port has no pipe the wait is on the condition variable; otherwise it
   is a select() on the read end of the pipe. */
EMsg *e_msgport_wait(EMsgPort *mp)
{
EMsg *msg;
m(printf("wait:\n"));
g_mutex_lock(mp->lock);
/* the queue is re-checked after every wakeup */
while (e_dlist_empty(&mp->queue)) {
if (mp->pipe.fd.read == -1) {
m(printf("wait: waiting on condition\n"));
/* e_msgport_put() only signals the condition when condwait > 0 */
mp->condwait++;
/* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */
pthread_cleanup_push(msgport_cleanlock, mp);
g_cond_wait(mp->cond, mp->lock);
pthread_cleanup_pop(0);
m(printf("wait: got condition\n"));
/* NOTE(review): the cleanup handler only unlocks, so this decrement is
   skipped if the thread is cancelled inside the wait -- confirm that is OK */
mp->condwait--;
} else {
fd_set rfds;
m(printf("wait: waitng on pipe\n"));
FD_ZERO(&rfds);
FD_SET(mp->pipe.fd.read, &rfds);
/* the lock is dropped for the duration of the select() */
g_mutex_unlock(mp->lock);
select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL);
/* explicit cancellation point, taken while the lock is not held */
pthread_testcancel();
g_mutex_lock(mp->lock);
m(printf("wait: got pipe\n"));
}
}
/* peek only: the message stays on the queue */
msg = (EMsg *)mp->queue.head;
m(printf("wait: message = %p\n", msg));
g_mutex_unlock(mp->lock);
m(printf("wait: done\n"));
return msg;
}
/* Remove and return the message at the head of @mp's queue, or NULL if the
   queue is empty.  When a message was removed and the port has a pipe, one
   notification byte is consumed from the pipe (while the lock is held). */
EMsg *e_msgport_get(EMsgPort *mp)
{
	char sink[1];
	EMsg *msg;

	g_mutex_lock(mp->lock);
	msg = (EMsg *)e_dlist_remhead(&mp->queue);
	if (msg != NULL) {
		int rfd = mp->pipe.fd.read;

		if (rfd != -1)
			read(rfd, sink, 1);
	}
	m(printf("get: message = %p\n", msg));
	g_mutex_unlock(mp->lock);

	return msg;
}
/* Send @msg back to the port recorded in its reply_port field.  A message
   with no reply port is silently left alone. */
void e_msgport_reply(EMsg *msg)
{
	if (!msg->reply_port) {
		/* else lost? */
		return;
	}

	e_msgport_put(msg->reply_port, msg);
}
/* Bookkeeping record for one child thread of an E_THREAD_NEW EThread; these
   are the elements of EThread.id_list. */
struct _thread_info {
/* pthread id of the child thread */
pthread_t id;
/* FALSE while waiting for a message, TRUE while one is being processed,
   2 once the thread has received its quit message */
int busy;
};
/* A message-driven worker: messages put on server_port are handed to the
   "received" callback by one or more dispatcher threads. */
struct _EThread {
/* queue the dispatcher thread(s) read from */
EMsgPort *server_port;
/* copied into every message by e_thread_put(); if non-NULL, processed
   messages are sent back here instead of being destroyed */
EMsgPort *reply_port;
/* protects the fields below */
pthread_mutex_t mutex;
/* queueing policy, one of the E_THREAD_* types */
e_thread_t type;
/* maximum queue depth (QUEUE/DROP) or maximum thread count (NEW) */
int queue_limit;
int waiting; /* if we are waiting for a new message, count of waiting processes */
pthread_t id; /* id of our running child thread */
GList *id_list; /* if THREAD_NEW, then a list of our child threads in thread_info structs */
/* callback + data used to dispose of a message */
EThreadFunc destroy;
void *destroy_data;
/* callback + data used to process a message */
EThreadFunc received;
void *received_data;
/* callback + data invoked when a message is dropped */
EThreadFunc lost;
void *lost_data;
};
/* sentinel thread id meaning "no thread" */
#define E_THREAD_NONE ((pthread_t)~0)
/* sentinel reply_port value that marks a message as a request for the
   dispatcher thread to exit */
#define E_THREAD_QUIT_REPLYPORT ((struct _EMsgPort *)~0)
/* forward declaration: used by e_thread_destroy() before its definition */
static void thread_destroy_msg(EThread *e, EMsg *m);
/* Look up the bookkeeping record for thread @id in @e's id_list.
   Returns NULL if the thread is not listed. */
static struct _thread_info *thread_find(EThread *e, pthread_t id)
{
	GList *link;

	for (link = e->id_list; link != NULL; link = link->next) {
		struct _thread_info *candidate = link->data;

		if (candidate->id == id)
			return candidate;
	}

	return NULL;
}
#if 0
/* Remove (and free) every bookkeeping record for thread @id from @e's
   id_list.  Currently compiled out. */
static void thread_remove(EThread *e, pthread_t id)
{
	GList *node, *next;
	struct _thread_info *info;

	node = e->id_list;
	while (node) {
		/* g_list_remove() frees the link holding info, so the successor
		   must be fetched before the removal */
		next = node->next;
		info = node->data;
		if (info->id == id) {
			e->id_list = g_list_remove(e->id_list, info);
			g_free(info);
		}
		node = next;
	}
}
#endif
/* Create an EThread of the given @type.  The structure starts zeroed, with
   its own server port, no running thread and an effectively unlimited
   queue.  No thread is started here. */
EThread *e_thread_new(e_thread_t type)
{
	EThread *thread = g_malloc0(sizeof(*thread));

	pthread_mutex_init(&thread->mutex, 0);
	thread->server_port = e_msgport_new();
	thread->type = type;
	thread->queue_limit = INT_MAX;
	thread->id = E_THREAD_NONE;

	return thread;
}
/* close down the threads & resources etc */
/* Drains and destroys any messages still queued, asks every dispatcher
   thread to exit by posting a quit message, joins them, and finally frees
   the server port and @e.  If threads are still considered busy afterwards
   the EThread is deliberately leaked with a warning. */
void e_thread_destroy(EThread *e)
{
int busy = FALSE;
EMsg *msg;
struct _thread_info *info;
GList *l;
/* make sure we soak up all the messages first */
while ( (msg = e_msgport_get(e->server_port)) ) {
thread_destroy_msg(e, msg);
}
pthread_mutex_lock(&e->mutex);
switch(e->type) {
case E_THREAD_QUEUE:
case E_THREAD_DROP:
/* if we have a thread, 'kill' it */
if (e->id != E_THREAD_NONE) {
pthread_t id = e->id;
t(printf("Sending thread '%d' quit message\n", id));
e->id = E_THREAD_NONE;
/* a quit message is a zeroed EMsg whose reply_port is the quit sentinel;
   the dispatcher frees it with g_free() */
msg = g_malloc0(sizeof(*msg));
msg->reply_port = E_THREAD_QUIT_REPLYPORT;
e_msgport_put(e->server_port, msg);
/* the mutex is released while joining because the dispatcher takes it too */
pthread_mutex_unlock(&e->mutex);
t(printf("Joining thread '%d'\n", id));
pthread_join(id, 0);
t(printf("Joined thread '%d'!\n", id));
pthread_mutex_lock(&e->mutex);
}
/* non-zero only if e->id was set again while the mutex was released */
busy = e->id != E_THREAD_NONE;
break;
case E_THREAD_NEW:
/* first, send everyone a quit message */
l = e->id_list;
while (l) {
info = l->data;
t(printf("Sending thread '%d' quit message\n", info->id));
msg = g_malloc0(sizeof(*msg));
msg->reply_port = E_THREAD_QUIT_REPLYPORT;
e_msgport_put(e->server_port, msg);
l = l->next;
}
/* then, wait for everyone to quit */
while (e->id_list) {
info = e->id_list->data;
/* unlink the record first, then join with the mutex released */
e->id_list = g_list_remove(e->id_list, info);
pthread_mutex_unlock(&e->mutex);
t(printf("Joining thread '%d'\n", info->id));
pthread_join(info->id, 0);
t(printf("Joined thread '%d'!\n", info->id));
pthread_mutex_lock(&e->mutex);
g_free(info);
}
busy = g_list_length(e->id_list) != 0;
break;
}
pthread_mutex_unlock(&e->mutex);
/* and clean up, if we can */
if (busy) {
g_warning("threads were busy, leaked EThread");
return;
}
/* NOTE(review): e->mutex is never passed to pthread_mutex_destroy() before
   the structure is freed */
e_msgport_destroy(e->server_port);
g_free(e);
}
/* Set the maximum queue depth for @e.  What happens once the queue is full
   depends on the queue type.  The value is stored without taking e->mutex. */
void e_thread_set_queue_limit(EThread *e, int limit)
{
	EThread *thread = e;

	thread->queue_limit = limit;
}
/* Install the callback (and its user data) used to dispose of messages.
   The callback must not call any e_thread functions on @e.  Both fields are
   updated together under e->mutex so readers never see a mismatched pair. */
void e_thread_set_msg_destroy(EThread *e, EThreadFunc destroy, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->destroy_data = data;
	e->destroy = destroy;
	pthread_mutex_unlock(&e->mutex);
}
/* Set a message lost callback, called if any message is discarded.
   @lost is the callback and @data is the user pointer passed back to it.
   Both are stored together under e->mutex so readers never see a mismatched
   pair.
   (Previously the callback pointer itself was stored as the user data and
   @data was ignored.) */
void e_thread_set_msg_lost(EThread *e, EThreadFunc lost, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->lost = lost;
	e->lost_data = data;
	pthread_mutex_unlock(&e->mutex);
}
/* Set the port that processed messages are sent back to.  When no reply
   port is set, messages are destroyed once processed.  The pointer is stored
   without taking e->mutex. */
void e_thread_set_reply_port(EThread *e, EMsgPort *reply_port)
{
	EThread *thread = e;

	thread->reply_port = reply_port;
}
/* Install the callback (and its user data) that processes each message
   taken off the server port.  Both fields are updated together under
   e->mutex. */
void e_thread_set_msg_received(EThread *e, EThreadFunc received, void *data)
{
	pthread_mutex_lock(&e->mutex);
	e->received_data = data;
	e->received = received;
	pthread_mutex_unlock(&e->mutex);
}
/* Hand message @m to the destroy callback, if one is installed. */
static void
thread_destroy_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	/* snapshot callback and data under the mutex so they always match;
	   the callback itself runs with the mutex released */
	pthread_mutex_lock(&e->mutex);
	cb_data = e->destroy_data;
	cb = e->destroy;
	pthread_mutex_unlock(&e->mutex);

	if (!cb)
		return;

	cb(e, m, cb_data);
}
/* Hand message @m to the received callback.  If none is installed a warning
   is logged and the message is left unprocessed. */
static void
thread_received_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	/* snapshot callback and data under the mutex so they always match;
	   the callback itself runs with the mutex released */
	pthread_mutex_lock(&e->mutex);
	cb_data = e->received_data;
	cb = e->received;
	pthread_mutex_unlock(&e->mutex);

	if (!cb) {
		g_warning("No processing callback for EThread, message unprocessed");
		return;
	}

	cb(e, m, cb_data);
}
/* Hand message @m to the lost callback, if one is installed. */
static void
thread_lost_msg(EThread *e, EMsg *m)
{
	EThreadFunc cb;
	void *cb_data;

	/* snapshot callback and data under the mutex so they always match;
	   the callback itself runs with the mutex released */
	pthread_mutex_lock(&e->mutex);
	cb_data = e->lost_data;
	cb = e->lost;
	pthread_mutex_unlock(&e->mutex);

	if (!cb)
		return;

	cb(e, m, cb_data);
}
/* the actual thread dispatcher */
/* Thread entry point; @din is the owning EThread.  Loops taking messages off
   e->server_port: each ordinary message is passed to the received callback
   and then either replied to (if it has a reply port) or destroyed.  A
   message whose reply_port is E_THREAD_QUIT_REPLYPORT ends the loop.
   Always returns NULL. */
static void *
thread_dispatch(void *din)
{
EThread *e = din;
EMsg *m;
struct _thread_info *info;
pthread_t self = pthread_self();
t(printf("dispatch thread started: %ld\n", pthread_self()));
while (1) {
pthread_mutex_lock(&e->mutex);
m = e_msgport_get(e->server_port);
if (m == NULL) {
/* nothing to do? If we are a 'new' type thread, just quit.
Otherwise, go into waiting (can be cancelled here) */
/* NOTE(review): as written, E_THREAD_NEW threads wait like the other
   types; the "just quit" variant below is compiled out with #if 0 */
info = NULL;
switch (e->type) {
case E_THREAD_NEW:
case E_THREAD_QUEUE:
case E_THREAD_DROP:
/* mark ourselves idle (only threads listed in id_list have a record) */
info = thread_find(e, self);
if (info)
info->busy = FALSE;
/* e->waiting is what e_thread_put() checks before spawning another thread */
e->waiting++;
pthread_mutex_unlock(&e->mutex);
/* blocks until something is queued; the message is fetched on the next
   pass through the loop */
e_msgport_wait(e->server_port);
pthread_mutex_lock(&e->mutex);
e->waiting--;
pthread_mutex_unlock(&e->mutex);
break;
#if 0
case E_THREAD_NEW:
e->id_list = g_list_remove(e->id_list, (void *)pthread_self());
pthread_mutex_unlock(&e->mutex);
return 0;
#endif
}
continue;
} else if (m->reply_port == E_THREAD_QUIT_REPLYPORT) {
t(printf("Thread %d got quit message\n", self));
/* Handle a quit message, say we're quitting, free the message, and break out of the loop */
info = thread_find(e, self);
if (info)
info->busy = 2;
pthread_mutex_unlock(&e->mutex);
g_free(m);
break;
} else {
/* ordinary message: mark ourselves busy while still holding the mutex */
info = thread_find(e, self);
if (info)
info->busy = TRUE;
}
pthread_mutex_unlock(&e->mutex);
t(printf("got message in dispatch thread\n"));
/* process it */
thread_received_msg(e, m);
/* if we have a reply port, send it back, otherwise, lose it */
if (m->reply_port) {
e_msgport_reply(m);
} else {
thread_destroy_msg(e, m);
}
}
return NULL;
}
/* Send a message to the thread, start thread if necessary.
   @msg is stamped with @e's reply port and queued according to the thread
   type:
     E_THREAD_QUEUE - queued unless the queue is full, in which case @msg
                      itself is dropped;
     E_THREAD_DROP  - always queued; if the queue was full the oldest
                      unprocessed message is dropped instead;
     E_THREAD_NEW   - always queued, and an extra dispatcher thread is
                      started if nobody is waiting and the thread limit
                      allows it.
   A dropped message is passed to the lost callback and then destroyed.
   If no received callback is installed the message is destroyed at once. */
void e_thread_put(EThread *e, EMsg *msg)
{
	pthread_t id;
	EMsg *dmsg = NULL;
	int err;

	pthread_mutex_lock(&e->mutex);

	/* the caller forgot to tell us what to do, well, we can't do anything can we */
	if (e->received == NULL) {
		pthread_mutex_unlock(&e->mutex);
		g_warning("EThread called with no receiver function, no work to do!");
		thread_destroy_msg(e, msg);
		return;
	}

	msg->reply_port = e->reply_port;

	switch(e->type) {
	case E_THREAD_QUEUE:
		/* if the queue is full, lose this new addition */
		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
			e_msgport_put(e->server_port, msg);
		} else {
			printf("queue limit reached, dropping new message\n");
			dmsg = msg;
		}
		break;
	case E_THREAD_DROP:
		/* if the queue is full, lose the oldest (unprocessed) message */
		if (e_dlist_length(&e->server_port->queue) < e->queue_limit) {
			e_msgport_put(e->server_port, msg);
		} else {
			printf("queue limit reached, dropping old message\n");
			e_msgport_put(e->server_port, msg);
			dmsg = e_msgport_get(e->server_port);
		}
		break;
	case E_THREAD_NEW:
		/* it is possible that an existing thread can catch this message, so
		   we might create a thread with no work to do.
		   but that doesn't matter, the other alternative that it be lost is worse */
		e_msgport_put(e->server_port, msg);
		if (e->waiting == 0
		    && g_list_length(e->id_list) < e->queue_limit
		    && pthread_create(&id, NULL, thread_dispatch, e) == 0) {
			struct _thread_info *info = g_malloc0(sizeof(*info));
			t(printf("created NEW thread %ld\n", id));
			info->id = id;
			info->busy = TRUE;
			e->id_list = g_list_append(e->id_list, info);
		}
		pthread_mutex_unlock(&e->mutex);
		return;
	}

	/* create the thread, if there is none to receive it yet */
	if (e->id == E_THREAD_NONE) {
		/* pthread_create() reports failure by returning a non-zero error
		   number; it never returns -1 and does not set errno */
		err = pthread_create(&e->id, NULL, thread_dispatch, e);
		if (err != 0) {
			g_warning("Could not create dispatcher thread, message queued?: %s", strerror(err));
			/* the id is undefined after a failed create */
			e->id = E_THREAD_NONE;
		}
	}

	pthread_mutex_unlock(&e->mutex);

	/* callbacks run with the mutex released */
	if (dmsg) {
		thread_lost_msg(e, dmsg);
		thread_destroy_msg(e, dmsg);
	}
}
/* yet-another-mutex interface */
/* A mutex that is either a plain pthread mutex (E_MUTEX_SIMPLE) or a
   recursive lock built from a pthread mutex plus a condition variable
   (E_MUTEX_REC). */
struct _EMutex {
/* the e_mutex_t this mutex was created with */
int type;
/* E_MUTEX_REC: thread currently holding the lock, or E_THREAD_NONE */
pthread_t owner;
/* E_MUTEX_REC: number of threads blocked waiting for the lock */
short waiters;
/* E_MUTEX_REC: how many times the owner has locked it */
short depth;
/* the lock itself (SIMPLE), or the guard for owner/waiters/depth (REC) */
pthread_mutex_t mutex;
/* E_MUTEX_REC only: signalled when the lock becomes free */
pthread_cond_t cond;
};
/* sigh, this is just painful to have to need, but recursive
   read/write, etc mutexes just aren't very common in thread
   implementations */
/* TODO: Just make it use recursive mutexes if they are available */
/* Allocate a new, unlocked EMutex of the given @type.  Both known types get
   a pthread mutex; E_MUTEX_REC additionally gets a condition variable.  Any
   other type leaves both uninitialised. */
EMutex *e_mutex_new(e_mutex_t type)
{
	struct _EMutex *mutex = g_malloc(sizeof(*mutex));

	mutex->owner = E_THREAD_NONE;
	mutex->depth = 0;
	mutex->waiters = 0;
	mutex->type = type;

	if (type == E_MUTEX_SIMPLE || type == E_MUTEX_REC)
		pthread_mutex_init(&mutex->mutex, 0);
	if (type == E_MUTEX_REC)
		pthread_cond_init(&mutex->cond, 0);
	/* read / write ? flags for same? */

	return mutex;
}
/* Destroy @m and free it.
   Returns 0 on success, otherwise the error number reported by the first
   pthread destroy call that failed (a warning is logged for each failure).
   The structure is freed even if a destroy call fails.  A mutex of unknown
   type is left untouched and 0 is returned. */
int e_mutex_destroy(EMutex *m)
{
	int ret = 0;
	int err;

	switch (m->type) {
	case E_MUTEX_SIMPLE:
		/* pthread_*_destroy() return an error number directly; they never
		   return -1 and do not set errno */
		ret = pthread_mutex_destroy(&m->mutex);
		if (ret != 0)
			g_warning("EMutex destroy failed: %s", strerror(ret));
		g_free(m);
		break;
	case E_MUTEX_REC:
		ret = pthread_mutex_destroy(&m->mutex);
		if (ret != 0)
			g_warning("EMutex destroy failed: %s", strerror(ret));
		err = pthread_cond_destroy(&m->cond);
		if (err != 0) {
			g_warning("EMutex destroy failed: %s", strerror(err));
			/* keep the first failure rather than overwriting it */
			if (ret == 0)
				ret = err;
		}
		g_free(m);
		break;
	}

	return ret;
}
/* Lock @m.
   E_MUTEX_SIMPLE: returns the result of pthread_mutex_lock() unchanged.
   E_MUTEX_REC: takes the lock if it is free, bumps the depth if the calling
   thread already owns it, and otherwise waits on the condition until the
   owner releases it.  On a pthread failure returns -1 with errno set to the
   error number; on success returns the result of releasing the guard mutex.
   Unknown type: returns -1 with errno set to EINVAL. */
int e_mutex_lock(EMutex *m)
{
	pthread_t id;
	int err;

	switch (m->type) {
	case E_MUTEX_SIMPLE:
		return pthread_mutex_lock(&m->mutex);
	case E_MUTEX_REC:
		id = pthread_self();
		/* pthread calls report failure as a non-zero error number, never
		   as -1, so translate it to this function's -1/errno convention */
		err = pthread_mutex_lock(&m->mutex);
		if (err != 0) {
			errno = err;
			return -1;
		}
		while (1) {
			if (m->owner == E_THREAD_NONE) {
				/* free: take ownership */
				m->owner = id;
				m->depth = 1;
				break;
			} else if (id == m->owner) {
				/* recursive acquisition by the current owner */
				m->depth++;
				break;
			} else {
				m->waiters++;
				err = pthread_cond_wait(&m->cond, &m->mutex);
				m->waiters--;
				if (err != 0) {
					/* don't return with the guard mutex still held */
					pthread_mutex_unlock(&m->mutex);
					errno = err;
					return -1;
				}
			}
		}
		return pthread_mutex_unlock(&m->mutex);
	}

	errno = EINVAL;
	return -1;
}
/* Unlock @m.
   E_MUTEX_SIMPLE: returns the result of pthread_mutex_unlock() unchanged.
   E_MUTEX_REC: asserts that the caller is the owner, drops one level of
   depth and, when the depth reaches zero, clears the owner and wakes one
   waiter if there is any.  On a pthread failure taking the guard mutex
   returns -1 with errno set to the error number.
   Unknown type: returns -1 with errno set to EINVAL. */
int e_mutex_unlock(EMutex *m)
{
	int err;

	switch (m->type) {
	case E_MUTEX_SIMPLE:
		return pthread_mutex_unlock(&m->mutex);
	case E_MUTEX_REC:
		/* pthread_mutex_lock() reports failure as a non-zero error number,
		   never as -1 */
		err = pthread_mutex_lock(&m->mutex);
		if (err != 0) {
			errno = err;
			return -1;
		}
		g_assert(m->owner == pthread_self());

		m->depth--;
		if (m->depth == 0) {
			m->owner = E_THREAD_NONE;
			if (m->waiters > 0)
				pthread_cond_signal(&m->cond);
		}
		return pthread_mutex_unlock(&m->mutex);
	}

	errno = EINVAL;
	return -1;
}
/* Debug aid: assert that the calling thread currently owns the recursive
   mutex @m.  Only meaningful for E_MUTEX_REC; any other type fails the
   precondition check and returns. */
void e_mutex_assert_locked(EMutex *m)
{
	g_return_if_fail (m->type == E_MUTEX_REC);

	/* the owner field may only be inspected under the guard mutex */
	pthread_mutex_lock(&m->mutex);
	g_assert(m->owner == pthread_self());
	pthread_mutex_unlock(&m->mutex);
}
#ifdef STANDALONE
/* port shared by the standalone test's server and client threads; created
   in main() */
EMsgPort *server_port;
/* Standalone test server, fd flavour: waits with select() on the server
   port's notification fd, then drains and replies to every queued message.
   @data carries the server's numeric id.  Never returns. */
void *fdserver(void *data)
{
	int id = (int)data;
	int fd = e_msgport_fd(server_port);
	fd_set rfds;
	EMsg *msg;

	for (;;) {
		int count = 0;

		printf("server %d: waiting on fd %d\n", id, fd);
		FD_ZERO(&rfds);
		FD_SET(fd, &rfds);
		select(fd+1, &rfds, NULL, NULL, NULL);
		printf("server %d: Got async notification, checking for messages\n", id);

		/* drain everything that is queued right now */
		msg = e_msgport_get(server_port);
		while (msg != NULL) {
			printf("server %d: got message\n", id);
			sleep(1);
			printf("server %d: replying\n", id);
			e_msgport_reply(msg);
			count++;
			msg = e_msgport_get(server_port);
		}
		printf("server %d: got %d messages\n", id, count);
	}
}
/* Standalone test server, blocking flavour: waits on the server port, takes
   one message, and replies to it after a one second pause.  @data carries
   the server's numeric id.  Never returns. */
void *server(void *data)
{
	int id = (int)data;
	EMsg *msg;

	for (;;) {
		printf("server %d: waiting\n", id);
		/* wait only peeks; the message is actually dequeued by get */
		e_msgport_wait(server_port);
		msg = e_msgport_get(server_port);
		if (msg == NULL) {
			printf("server %d: didn't get message\n", id);
			continue;
		}
		printf("server %d: got message\n", id);
		sleep(1);
		printf("server %d: replying\n", id);
		e_msgport_reply(msg);
	}
}
/* Standalone test client.  First sends one message ten times, waiting for
   the reply each time (synchronous use); then, after a pause, sends ten
   separate messages back to back and collects the ten replies.
   @data is unused.  Returns NULL. */
void *client(void *data)
{
	EMsg *msg;
	EMsgPort *replyport;
	int i;

	replyport = e_msgport_new();
	msg = g_malloc0(sizeof(*msg));
	msg->reply_port = replyport;
	for (i=0;i<10;i++) {
		/* synchronous operation */
		printf("client: sending\n");
		e_msgport_put(server_port, msg);
		printf("client: waiting for reply\n");
		e_msgport_wait(replyport);
		e_msgport_get(replyport);
		printf("client: got reply\n");
	}

	/* the synchronous message has come back to us for the last time;
	   release it before the pointer is reused below */
	g_free(msg);

	printf("client: sleeping ...\n");
	sleep(2);
	printf("client: sending multiple\n");

	for (i=0;i<10;i++) {
		msg = g_malloc0(sizeof(*msg));
		msg->reply_port = replyport;
		e_msgport_put(server_port, msg);
	}

	printf("client: receiving multiple\n");
	for (i=0;i<10;i++) {
		e_msgport_wait(replyport);
		msg = e_msgport_get(replyport);
		g_free(msg);
	}

	printf("client: done\n");
	/* a thread start routine must return a value */
	return NULL;
}
int main(int argc, char **argv)
{
pthread_t serverid, clientid;
g_thread_init(NULL);
server_port = e_msgport_new();
/*pthread_create(&serverid, NULL, server, (void *)1);*/
pthread_create(&serverid, NULL, fdserver, (void *)1);
pthread_create(&clientid, NULL, client, NULL);
sleep(60);
return 0;
}
#endif