A cancellable connection routine. (remote_send_string): Return cancelled

2001-01-15  Not Zed  <NotZed@Ximian.com>

        * camel-remote-store.c (socket_connect): A cancellable connection
        routine.
        (remote_send_string): Return cancelled exception if we were.
        (remote_send_stream): "
        (remote_recv_line): "

        * camel-stream-fs.c (stream_read): First cut at cancellation
        stuff.  Its looking a bit ugly.

svn path=/trunk/; revision=7496
This commit is contained in:
Not Zed
2001-01-15 07:55:30 +00:00
committed by Michael Zucci
parent cff9059380
commit b64ffc9183
5 changed files with 434 additions and 24 deletions

View File

@ -1,3 +1,14 @@
2001-01-15 Not Zed <NotZed@Ximian.com>
* camel-remote-store.c (socket_connect): A cancellable connection
routine.
(remote_send_string): Return cancelled exception if we were.
(remote_send_stream): "
(remote_recv_line): "
* camel-stream-fs.c (stream_read): First cut at cancellation
stuff. Its looking a bit ugly.
2001-01-15 Jeffrey Stedfast <fejj@ximian.com>
* camel-tcp-stream-ssl.c (stream_connect): Uses an SSL socket now

View File

@ -25,6 +25,7 @@
#include <config.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
@ -205,6 +206,87 @@ timeout_cb (gpointer data)
return TRUE;
}
/* this is a 'cancellable' connect, cancellable from camel_cancel etc */
/* returns -1 & errno == EINTR if the connection was cancelled */
static int socket_connect(struct hostent *h, int port)
{
struct sockaddr_in sin;
int fd;
int ret;
socklen_t len;
struct timeval tv;
int cancel_fd;
/* see if we're cancelled yet */
if (camel_cancel_check(NULL)) {
errno = EINTR;
return -1;
}
/* setup connect, we do it using a nonblocking socket so we can poll it */
sin.sin_port = htons(port);
sin.sin_family = h->h_addrtype;
memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr));
fd = socket (h->h_addrtype, SOCK_STREAM, 0);
cancel_fd = camel_cancel_fd(NULL);
if (cancel_fd == -1) {
ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin));
if (ret == -1) {
close(fd);
return -1;
}
return fd;
} else {
fd_set rdset, wrset;
long flags;
fcntl(fd, F_GETFL, &flags);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
ret = connect(fd, (struct sockaddr *)&sin, sizeof (sin));
if (ret == 0)
return fd;
if (errno != EINPROGRESS) {
close(fd);
return -1;
}
FD_ZERO(&rdset);
FD_ZERO(&wrset);
FD_SET(fd, &wrset);
FD_SET(cancel_fd, &rdset);
tv.tv_usec = 0;
tv.tv_sec = 30;
if (select((fd+cancel_fd)/2+1, &rdset, &wrset, 0, &tv) == 0) {
close(fd);
errno = ETIMEDOUT;
return -1;
}
if (cancel_fd != -1 && FD_ISSET(cancel_fd, &rdset)) {
close(fd);
errno = EINTR;
return -1;
} else {
len = sizeof(int);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &ret, &len) == -1) {
close(fd);
return -1;
}
if (ret != 0) {
close(fd);
errno = ret;
return -1;
}
}
fcntl(fd, F_SETFL, flags);
}
return fd;
}
static gboolean
remote_connect (CamelService *service, CamelException *ex)
{
@ -225,6 +307,20 @@ remote_connect (CamelService *service, CamelException *ex)
port = service->url->port;
else
port = store->default_port;
#if 1
fd = socket_connect(h, port);
if (fd == -1) {
if (errno == EINTR)
camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Connection cancelled"));
else
camel_exception_setv (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
_("Could not connect to %s (port %d): %s"),
service->url->host ? service->url->host : _("(unknown host)"),
port, strerror (errno));
return FALSE;
}
#else
sin.sin_port = htons (port);
memcpy (&sin.sin_addr, h->h_addr, sizeof (sin.sin_addr));
@ -240,6 +336,7 @@ remote_connect (CamelService *service, CamelException *ex)
return FALSE;
}
#endif
/* parent class connect initialization */
if (CAMEL_SERVICE_CLASS (store_class)->connect (service, ex) == FALSE)
@ -322,9 +419,11 @@ remote_send_string (CamelRemoteStore *store, CamelException *ex, char *fmt, va_l
#endif
if (camel_stream_printf (store->ostream, "%s", cmdbuf) == -1) {
if (errno == EINTR)
camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
else
camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
g_free (cmdbuf);
camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
g_strerror (errno));
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
return -1;
@ -381,9 +480,11 @@ remote_send_stream (CamelRemoteStore *store, CamelStream *stream, CamelException
d(fprintf (stderr, "(sending stream)\n"));
ret = camel_stream_write_to_stream (stream, store->ostream);
if (ret < 0) {
camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
g_strerror (errno));
if (ret == -1) {
if (errno == EINTR)
camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
else
camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
}
@ -446,23 +547,22 @@ remote_recv_line (CamelRemoteStore *store, char **dest, CamelException *ex)
if (nread > 0)
g_byte_array_append (bytes, buf, nread);
} while (nread == sizeof (buf) - 1);
if (nread == -1) {
if (errno == EINTR)
camel_exception_set(ex, CAMEL_EXCEPTION_USER_CANCEL, _("Operation cancelled"));
else
camel_exception_set(ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE, strerror(errno));
g_byte_array_free(bytes, TRUE);
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
return -1;
}
g_byte_array_append (bytes, "", 1);
ret = bytes->data;
nread = bytes->len - 1;
g_byte_array_free (bytes, FALSE);
if (nread <= 0) {
g_free (ret);
ret = NULL;
camel_exception_set (ex, CAMEL_EXCEPTION_SERVICE_UNAVAILABLE,
nread ? g_strerror (errno) :
_("Server disconnected."));
camel_service_disconnect (CAMEL_SERVICE (store), FALSE, NULL);
return -1;
}
/* strip off the CRLF sequence */
while (nread > 0 && ret[nread] != '\r')
ret[nread--] = '\0';

View File

@ -482,3 +482,229 @@ camel_session_remove_timeout (CamelSession *session, guint handle)
{
return session->remover (handle);
}
/* ********************************************************************** */
struct _CamelCancel {
pthread_t id; /* id of running thread */
guint32 flags; /* cancelled ? */
int blocked; /* cancellation blocked depth */
int refcount;
#ifdef ENABLE_THREADS
EMsgPort *cancel_port;
int cancel_fd;
pthread_mutex_t lock;
#endif
};
#define CAMEL_CANCEL_CANCELLED (1<<0)
#ifdef ENABLE_THREADS
#define CAMEL_CANCEL_LOCK(cc) pthread_mutex_lock(&cc->lock)
#define CAMEL_CANCEL_UNLOCK(cc) pthread_mutex_lock(&cc->lock)
#define CAMEL_ACTIVE_LOCK() pthread_mutex_lock(&cancel_active_lock)
#define CAMEL_ACTIVE_UNLOCK() pthread_mutex_lock(&cancel_active_lock)
static pthread_mutex_t cancel_active_lock = PTHREAD_MUTEX_INITIALIZER;
#else
#define CAMEL_CANCEL_LOCK(cc)
#define CAMEL_CANCEL_UNLOCK(cc)
#define CAMEL_ACTIVE_LOCK()
#define CAMEL_ACTIVE_UNLOCK()
#endif
static GHashTable *cancel_active;
typedef struct _CamelCancelMsg {
EMsg msg;
} CamelCancelMsg ;
/* creates a new cancel handle */
CamelCancel *camel_cancel_new(void)
{
CamelCancel *cc;
cc = g_malloc0(sizeof(*cc));
cc->flags = 0;
cc->blocked = 0;
cc->refcount = 1;
#ifdef ENABLE_THREADS
cc->id = ~0;
cc->cancel_port = e_msgport_new();
cc->cancel_fd = e_msgport_fd(cc->cancel_port);
pthread_mutex_init(&cc->lock, NULL);
#endif
return cc;
}
void camel_cancel_reset(CamelCancel *cc)
{
#ifdef ENABLE_THREADS
CamelCancelMsg *msg;
while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port)))
g_free(msg);
#endif
cc->flags = 0;
cc->blocked = 0;
}
void camel_cancel_ref(CamelCancel *cc)
{
CAMEL_CANCEL_LOCK(cc);
cc->refcount++;
CAMEL_CANCEL_UNLOCK(cc);
}
void camel_cancel_unref(CamelCancel *cc)
{
#ifdef ENABLE_THREADS
CamelCancelMsg *msg;
if (cc->refcount == 1) {
while ((msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port)))
g_free(msg);
e_msgport_destroy(cc->cancel_port);
#endif
g_free(cc);
} else {
CAMEL_CANCEL_LOCK(cc);
cc->refcount--;
CAMEL_CANCEL_UNLOCK(cc);
}
}
/* block cancellation */
void camel_cancel_block(CamelCancel *cc)
{
CAMEL_CANCEL_LOCK(cc);
cc->blocked++;
CAMEL_CANCEL_UNLOCK(cc);
}
/* unblock cancellation */
void camel_cancel_unblock(CamelCancel *cc)
{
CAMEL_CANCEL_LOCK(cc);
cc->blocked--;
CAMEL_CANCEL_UNLOCK(cc);
}
/* cancels an operation */
void camel_cancel_cancel(CamelCancel *cc)
{
CamelCancelMsg *msg;
if ((cc->flags & CAMEL_CANCEL_CANCELLED) == 0) {
CAMEL_CANCEL_LOCK(cc);
msg = g_malloc0(sizeof(*msg));
e_msgport_put(cc->cancel_port, (EMsg *)msg);
cc->flags |= CAMEL_CANCEL_CANCELLED;
CAMEL_CANCEL_UNLOCK(cc);
}
}
/* register a thread for cancellation */
void camel_cancel_register(CamelCancel *cc)
{
pthread_t id = pthread_self();
CAMEL_ACTIVE_LOCK();
if (cancel_active == NULL)
cancel_active = g_hash_table_new(NULL, NULL);
if (cc == NULL) {
cc = g_hash_table_lookup(cancel_active, (void *)id);
if (cc == NULL) {
cc = camel_cancel_new();
}
}
cc->id = id;
g_hash_table_insert(cancel_active, (void *)id, cc);
camel_cancel_ref(cc);
CAMEL_ACTIVE_UNLOCK();
}
/* remove a thread from being able to be cancelled */
void camel_cancel_unregister(CamelCancel *cc)
{
CAMEL_ACTIVE_LOCK();
if (cancel_active == NULL)
cancel_active = g_hash_table_new(NULL, NULL);
if (cc == NULL) {
cc = g_hash_table_lookup(cancel_active, (void *)cc->id);
if (cc == NULL) {
g_warning("Trying to unregister a thread that was never registered for cancellation");
}
}
if (cc)
g_hash_table_remove(cancel_active, (void *)cc->id);
CAMEL_ACTIVE_UNLOCK();
if (cc)
camel_cancel_unref(cc);
}
/* test for cancellation */
gboolean camel_cancel_check(CamelCancel *cc)
{
CamelCancelMsg *msg;
if (cc == NULL) {
if (cancel_active) {
CAMEL_ACTIVE_LOCK();
cc = g_hash_table_lookup(cancel_active, (void *)pthread_self());
CAMEL_ACTIVE_UNLOCK();
}
if (cc == NULL)
return FALSE;
}
if (cc->blocked > 0)
return FALSE;
if (cc->flags & CAMEL_CANCEL_CANCELLED)
return TRUE;
msg = (CamelCancelMsg *)e_msgport_get(cc->cancel_port);
if (msg) {
CAMEL_CANCEL_LOCK(cc);
cc->flags |= CAMEL_CANCEL_CANCELLED;
CAMEL_CANCEL_UNLOCK(cc);
return TRUE;
}
return FALSE;
}
/* get the fd for cancellation waiting */
int camel_cancel_fd(CamelCancel *cc)
{
if (cc == NULL) {
if (cancel_active) {
CAMEL_ACTIVE_LOCK();
cc = g_hash_table_lookup(cancel_active, (void *)pthread_self());
CAMEL_ACTIVE_UNLOCK();
}
if (cc == NULL)
return -1;
}
if (cc->blocked)
return -1;
return cc->cancel_fd;
}

View File

@ -126,6 +126,24 @@ guint camel_session_register_timeout (CamelSession *session,
gboolean camel_session_remove_timeout (CamelSession *session,
guint handle);
/* cancellation helper stuff, not yet finalised */
typedef struct _CamelCancel CamelCancel;
/* main thread functions */
CamelCancel *camel_cancel_new(void);
void camel_cancel_ref(CamelCancel *cc);
void camel_cancel_unref(CamelCancel *cc);
void camel_cancel_reset(CamelCancel *cc);
void camel_cancel_cancel(CamelCancel *cc);
/* subthread functions */
void camel_cancel_register(CamelCancel *cc);
void camel_cancel_unregister(CamelCancel *cc);
/* called internally by camel, for the current thread */
void camel_cancel_block(CamelCancel *cc);
void camel_cancel_unblock(CamelCancel *cc);
gboolean camel_cancel_check(CamelCancel *cc);
int camel_cancel_fd(CamelCancel *cc);
#ifdef __cplusplus
}
#endif /* __cplusplus */

View File

@ -209,13 +209,39 @@ stream_read (CamelStream *stream, char *buffer, size_t n)
CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream);
CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream);
ssize_t nread;
int cancel_fd;
if (camel_cancel_check(NULL)) {
errno = EINTR;
return -1;
}
if (seekable->bound_end != CAMEL_STREAM_UNBOUND)
n = MIN (seekable->bound_end - seekable->position, n);
do {
nread = read (stream_fs->fd, buffer, n);
} while (nread == -1 && errno == EINTR);
cancel_fd = camel_cancel_fd(NULL);
if (cancel_fd == -1) {
do {
nread = read (stream_fs->fd, buffer, n);
} while (nread == -1 && errno == EINTR);
} else {
fd_set rdset;
long flags;
fcntl(stream_fs->fd, F_GETFL, &flags);
fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK);
FD_ZERO(&rdset);
FD_SET(stream_fs->fd, &rdset);
FD_SET(cancel_fd, &rdset);
select((stream_fs->fd+cancel_fd)/2+1, &rdset, 0, 0, NULL);
if (FD_ISSET(cancel_fd, &rdset)) {
fcntl(stream_fs->fd, F_SETFL, flags);
errno = EINTR;
return -1;
}
nread = read(stream_fs->fd, buffer, n);
fcntl(stream_fs->fd, F_SETFL, flags);
}
if (nread > 0)
seekable->position += nread;
@ -231,15 +257,44 @@ stream_write (CamelStream *stream, const char *buffer, size_t n)
CamelStreamFs *stream_fs = CAMEL_STREAM_FS (stream);
CamelSeekableStream *seekable = CAMEL_SEEKABLE_STREAM (stream);
ssize_t v, written = 0;
int cancel_fd;
if (camel_cancel_check(NULL)) {
errno = EINTR;
return -1;
}
if (seekable->bound_end != CAMEL_STREAM_UNBOUND)
n = MIN (seekable->bound_end - seekable->position, n);
do {
v = write (stream_fs->fd, buffer, n);
if (v > 0)
cancel_fd = camel_cancel_fd(NULL);
if (cancel_fd == -1) {
do {
v = write (stream_fs->fd, buffer, n);
if (v > 0)
written += v;
} while (v == -1 && errno == EINTR);
} else {
fd_set rdset, wrset;
long flags;
fcntl(stream_fs->fd, F_GETFL, &flags);
fcntl(stream_fs->fd, F_SETFL, flags | O_NONBLOCK);
FD_ZERO(&rdset);
FD_ZERO(&wrset);
FD_SET(stream_fs->fd, &wrset);
FD_SET(cancel_fd, &rdset);
select((stream_fs->fd+cancel_fd)/2+1, &rdset, &wrset, 0, NULL);
if (FD_ISSET(cancel_fd, &rdset)) {
fcntl(stream_fs->fd, F_SETFL, flags);
errno = EINTR;
return -1;
}
v = write(stream_fs->fd, buffer, n);
if (v>0)
written += v;
} while (v == -1 && errno == EINTR);
fcntl(stream_fs->fd, F_SETFL, flags);
}
if (written > 0)
seekable->position += written;