[broadway] Stream data over websocket

The zlib compressed xmlhttprequest thing was a nice hack, but it doesn't
really work in production. Its not portable, doesn't have enought API
(missing notification for closed sockets) and having to synchronize
between two different connections in a reliable way is a pain.

So, we're going everything over the websocket. This is a pure switch,
but after this we want to modify the protocol to work better over
the uncompressed utf8 transport of websockets.
This commit is contained in:
Alexander Larsson 2011-04-18 19:52:05 +02:00
parent 2cb9ce954e
commit 0abd5e2767
4 changed files with 34 additions and 132 deletions

View File

@ -4,10 +4,6 @@
#include <assert.h>
#include <errno.h>
#include <zlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include "broadway.h"
@ -450,126 +446,50 @@ to_png_a (int w, int h, int byte_stride, guint8 *data)
************************************************************************/
struct BroadwayOutput {
int fd;
gzFile *zfd;
GOutputStream *out;
int error;
guint32 serial;
};
static void
broadway_output_write_raw (BroadwayOutput *output,
const void *buf, gsize count)
broadway_output_write_header (BroadwayOutput *output)
{
gssize res;
int errsave;
const char *ptr = (const char *)buf;
if (output->error)
return;
while (count > 0)
{
res = write(output->fd, ptr, count);
if (res == -1)
{
errsave = errno;
if (errsave == EINTR)
continue;
output->error = TRUE;
return;
}
if (res == 0)
{
output->error = TRUE;
return;
}
count -= res;
ptr += res;
}
g_output_stream_write (output->out, "\0", 1, NULL, NULL);
}
static void
broadway_output_write (BroadwayOutput *output,
const void *buf, gsize count)
{
gssize res;
const char *ptr = (const char *)buf;
if (output->error)
return;
while (count > 0)
{
res = gzwrite(output->zfd, ptr, count);
if (res == -1)
{
output->error = TRUE;
return;
}
if (res == 0)
{
output->error = TRUE;
return;
}
count -= res;
ptr += res;
}
}
static void
broadway_output_write_header (BroadwayOutput *output)
{
char *header;
header =
"HTTP/1.1 200 OK\r\n"
"Content-type: multipart/x-mixed-replace;boundary=x\r\n"
"Content-Encoding: gzip\r\n"
"\r\n";
broadway_output_write_raw (output,
header, strlen (header));
g_output_stream_write_all (output->out, buf, count, NULL, NULL, NULL);
}
static void
send_boundary (BroadwayOutput *output)
{
char *boundary =
"--x\r\n"
"\r\n";
broadway_output_write (output, boundary, strlen (boundary));
broadway_output_write (output, "\xff", 1);
broadway_output_write (output, "\0", 1);
}
BroadwayOutput *
broadway_output_new(int fd, guint32 serial)
broadway_output_new(GOutputStream *out, guint32 serial)
{
BroadwayOutput *output;
int flag = 1;
output = g_new0 (BroadwayOutput, 1);
output->fd = fd;
output->out = g_object_ref (out);
output->serial = serial;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
broadway_output_write_header (output);
output->zfd = gzdopen(fd, "wb");
/* Need an initial multipart boundary */
send_boundary (output);
return output;
}
void
broadway_output_free (BroadwayOutput *output)
{
if (output->zfd)
gzclose (output->zfd);
else
close (output->fd);
g_object_unref (output->out);
free (output);
}
@ -583,7 +503,6 @@ int
broadway_output_flush (BroadwayOutput *output)
{
send_boundary (output);
gzflush (output->zfd, Z_SYNC_FLUSH);
return !output->error;
}

View File

@ -1,4 +1,5 @@
#include <glib.h>
#include <gio/gio.h>
typedef struct BroadwayOutput BroadwayOutput;
@ -7,7 +8,7 @@ typedef struct {
int width, height;
} BroadwayRect;
BroadwayOutput *broadway_output_new (int fd,
BroadwayOutput *broadway_output_new (GOutputStream *out,
guint32 serial);
void broadway_output_free (BroadwayOutput *output);
int broadway_output_flush (BroadwayOutput *output);

View File

@ -888,10 +888,10 @@ function handleOutstanding()
}
}
function handleLoad(event)
function handleMessage(message)
{
var cmdObj = {};
cmdObj.data = event.target.responseText;
cmdObj.data = message;
cmdObj.pos = 0;
outstandingCommands.push(cmdObj);
@ -2768,22 +2768,10 @@ function connect()
if (params[0].indexOf("toplevel") != -1)
useToplevelWindows = true;
}
var xhr = createXHR();
if (xhr) {
if (typeof xhr.multipart == 'undefined') {
alert("Sorry, this example only works in browsers that support multipart.");
return;
}
xhr.multipart = true;
xhr.open("GET", "/output", true);
xhr.onload = handleLoad;
xhr.send(null);
}
if ("WebSocket" in window) {
var loc = window.location.toString().replace("http:", "ws:");
loc = loc.substr(0, loc.lastIndexOf('/')) + "/input";
loc = loc.substr(0, loc.lastIndexOf('/')) + "/socket";
var ws = new WebSocket(loc, "broadway");
ws.onopen = function() {
inputSocket = ws;
@ -2806,6 +2794,9 @@ function connect()
ws.onclose = function() {
inputSocket = null;
};
ws.onmessage = function(event) {
handleMessage(event.data);
};
} else {
alert("WebSocket not supported, input will not work!");
}

View File

@ -40,6 +40,10 @@
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
static void gdk_broadway_display_dispose (GObject *object);
static void gdk_broadway_display_finalize (GObject *object);
@ -124,6 +128,8 @@ typedef struct HttpRequest {
GString *request;
} HttpRequest;
static void start_output (HttpRequest *request);
static void
http_request_free (HttpRequest *request)
{
@ -495,21 +501,6 @@ _gdk_broadway_display_block_for_input (GdkDisplay *display, char op,
}
}
#include <unistd.h>
#include <fcntl.h>
static void
set_fd_blocking (int fd)
{
glong arg;
if ((arg = fcntl (fd, F_GETFL, NULL)) < 0)
arg = 0;
arg = arg & ~O_NONBLOCK;
fcntl (fd, F_SETFL, arg);
}
static char *
parse_line (char *line, char *key)
{
@ -657,7 +648,7 @@ start_input (HttpRequest *request)
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Origin: %s\r\n"
"Sec-WebSocket-Location: ws://%s/input\r\n"
"Sec-WebSocket-Location: ws://%s/socket\r\n"
"Sec-WebSocket-Protocol: broadway\r\n"
"\r\n",
origin, host);
@ -679,6 +670,8 @@ start_input (HttpRequest *request)
broadway_display->input = input;
start_output (request);
/* This will free and close the data input stream, but we got all the buffered content already */
http_request_free (request);
@ -699,14 +692,13 @@ start_output (HttpRequest *request)
{
GSocket *socket;
GdkBroadwayDisplay *broadway_display;
int fd;
int flag = 1;
socket = g_socket_connection_get_socket (request->connection);
setsockopt(g_socket_get_fd (socket), IPPROTO_TCP,
TCP_NODELAY, (char *) &flag, sizeof(int));
broadway_display = GDK_BROADWAY_DISPLAY (request->display);
fd = g_socket_get_fd (socket);
set_fd_blocking (fd);
/* We dup this because otherwise it'll be closed with the request SocketConnection */
if (broadway_display->output)
{
@ -714,15 +706,16 @@ start_output (HttpRequest *request)
broadway_output_free (broadway_display->output);
}
broadway_display->output = broadway_output_new (dup(fd), broadway_display->saved_serial);
broadway_display->output =
broadway_output_new (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
broadway_display->saved_serial);
_gdk_broadway_resync_windows ();
if (broadway_display->pointer_grab_window)
broadway_output_grab_pointer (broadway_display->output,
GDK_WINDOW_IMPL_BROADWAY (broadway_display->pointer_grab_window->impl)->id,
broadway_display->pointer_grab_owner_events);
http_request_free (request);
}
static void
@ -787,9 +780,7 @@ got_request (HttpRequest *request)
send_data (request, "text/html", client_html, G_N_ELEMENTS(client_html) - 1);
else if (strcmp (escaped, "/broadway.js") == 0)
send_data (request, "text/javascript", broadway_js, G_N_ELEMENTS(broadway_js) - 1);
else if (strcmp (escaped, "/output") == 0)
start_output (request);
else if (strcmp (escaped, "/input") == 0)
else if (strcmp (escaped, "/socket") == 0)
start_input (request);
else
send_error (request, 404, "File not found");