Commit c8147595 authored by Yoann Pigné's avatar Yoann Pigné

extracted transport layer from receiver

parent fe89f3b0
......@@ -12,6 +12,27 @@
global.netstream = {};
}
global.netstream.utf16to8 = function (str) {
var out, i, len, c;
out = "";
len = str.length;
for(i = 0; i < len; i++) {
c = str.charCodeAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
out += str.charAt(i);
} else if (c > 0x07FF) {
out += String.fromCharCode(0xE0 | ((c >> 12) & 0x0F));
out += String.fromCharCode(0x80 | ((c >> 6) & 0x3F));
out += String.fromCharCode(0x80 | ((c >> 0) & 0x3F));
} else {
out += String.fromCharCode(0xC0 | ((c >> 6) & 0x1F));
out += String.fromCharCode(0x80 | ((c >> 0) & 0x3F));
}
}
return out;
};
// Copyright (c) 2008 notmasteryet
global.netstream.TypedArrayReader = function(t_array) {
......@@ -227,7 +248,12 @@
},
encode: function(arrayBuffer) {
var base64 = '';
if('slice' in arrayBuffer){
var bytes = new Uint8Array(arrayBuffer);
} else {
var bytes = arrayBuffer;
}
var byteLength = bytes.byteLength;
var byteRemainder = byteLength % 3;
var mainLength = byteLength - byteRemainder;
......
......@@ -4,9 +4,9 @@
global.netstream = {};
}
global.netstream.DefaultGSSink = function () {
global.netstream.DefaultSink = function () {
};
global.netstream.DefaultGSSink.prototype = {
global.netstream.DefaultSink.prototype = {
nodeAdded: function () {},
nodeRemoved: function () {},
nodeAttributeAdded: function () {},
......@@ -24,11 +24,11 @@
stepBegins: function () {}
};
global.netstream.DOMGSSink = function (element) {
global.netstream.DOMSink = function (element) {
this.element = element;
};
global.netstream.DOMGSSink.prototype = {
global.netstream.DOMSink.prototype = {
element: "",
appendMsg: function (msg) {
var textNode = global.document.createTextNode(msg);
......@@ -85,9 +85,9 @@
global.netstream.LoggerGSSink = function () {
global.netstream.LoggerSink = function () {
};
global.netstream.LoggerGSSink.prototype = {
global.netstream.LoggerSink.prototype = {
nodeAdded: function (sourceId, timeId, nodeId) {
netstream.LOGGER("(GSSink, " + sourceId + ":" + timeId + ") : an \"" + nodeId + "\"");
},
......@@ -138,106 +138,42 @@
global.netstream.Receiver = function (scheme, server_ip, server_port, server_ws_path, gs_sink) {
// --------- constants
this.BUFFER_INITIAL_SIZE = 4096;
global.netstream.Receiver = function (options) {
// --------- parameters
this.SCHEME = scheme || "ws";
this.SERVER_IP = server_ip || global.location.hostname;
this.SERVER_PORT = server_port || 2003;
this.SERVER_WS_PATH = server_ws_path || "/gs/stream";
this.gs_sink = gs_sink || new netstream.DefaultGSSink();
this.sink = null;
this.transport = null;
this.debug = false;
this.stream = "Default";
// wrapping this
var gs = this;
// ----------- The WebSocket ------------
// Creation / initialization
if (typeof(WebSocket) == "undefined") {
this.socket = new MozWebSocket(this.SCHEME + "://" + this.SERVER_IP + ":" + this.SERVER_PORT + this.SERVER_WS_PATH);
// --------- init params
for(var prop in options) {
if(options.hasOwnProperty(prop) && this.hasOwnProperty(prop)) {
this[prop] = options[prop];
}
else {
this.socket = new WebSocket(this.SCHEME + "://" + this.SERVER_IP + ":" + this.SERVER_PORT + this.SERVER_WS_PATH);
}
// events
this.refreshIntervalId = 0;
this.socket.onopen = function () {
if (this.debug) {
netstream.LOGGER("(gs_transport): conection opened");
}
gs.setHeartbeating();
gs.onopen();
};
this.socket.onerror = function (e) {
netstream.LOGGER("(gs_transport): ERROR on the WebSocket");
};
this.socket.onmessage = function (e) {
if (e.data != "Heartbeat") {
if (gs.debug === true) {
netstream.LOGGER("(gs_transport): < Received a " + e.data.length + " bytes long data chunk.");
}
gs.handleMsg(e.data);
}
};
// ---------- Input buffer ------------
this.buffer = new ArrayBuffer(this.BUFFER_INITIAL_SIZE);
// wrapping this
var gs = this;
this.end = -1;
this.beg = 0;
this.pos = 0;
}
this.sink = this.sink || new netstream.DefaultGSSink();
global.netstream.Receiver.prototype = {
// link with the transport layer
this.transport = this.transport || new netstream.Transport(options);
// Small periodical send/receive of packet to keap the socket open.
setHeartbeating: function () {
var gs = this;
this.refreshIntervalId = setInterval(function (e) {
if (gs.socket.readyState == gs.socket.CLOSED) {
clearInterval(gs.refreshIntervalId);
}
else {
if (gs.debug) {
netstream.LOGGER("> Sending Hearbeat");
}
gs.socket.send("Heartbeat");
}
},
1000);
},
this.transport.onevent = function (e){
gs.receiveEvent(e);
};
this.transport.connect();
// Extra things to do as soon as the connection has established
onopen: function () {
if (this.debug) {
netstream.LOGGER("(gs_transport.onopen)");
}
},
// The size of an int as it is incoded (including the base64 packaging)
sizeOfInt: 8,
// Read "sizeOfInt" bytes from the buffer at the current position,
// unpack from base64 and this.read as a 4 bytes int.
unpackMessageSize: function (pos) {
//netstream.LOGGER("(GSTransport.unpackMessageSize): pos="+pos);
var arr = new Uint8Array(this.buffer, pos, 8);
//netstream.LOGGER(arr[0]+","+arr[1]+","+arr[2]+","+arr[3]+","+arr[4]+","+arr[5]+","+arr[6]+","+arr[7]);
var view = new DataView(netstream.base64.decode(arr).buffer);
//VIEW = view;
var size = view.getInt32(0);
//netstream.LOGGER("msg size = "+size);
return size;
},
global.netstream.Receiver.prototype = {
decodeMessage: function () {
receiveEvent: function (msg) {
//netstream.LOGGER(this.beg + " " + this.end);
var msg = new DataView(netstream.base64.decode(new Uint8Array(this.buffer, this.beg, this.end - this.beg)).buffer);
////////var msg = new DataView(netstream.base64.decode(new Uint8Array(this.buffer, this.beg, this.end - this.beg)).buffer);
var msgIndex = 0;
// get the Stream name
......@@ -245,18 +181,11 @@
var stream = res.data;
msgIndex = res.index;
if (this.debug) {
netstream.LOGGER("(GSTransport): Stream " + stream + " is addressed in this message");
// var m=[]
// ,arr = new Uint8Array(msg.buffer);
// for(var i =0; i< arr.length; i++){
// m[i]=arr[i]
// }
// netstream.LOGGER(m.join(', '));
netstream.LOGGER("(netstream.Receiver): Stream " + stream + " is addressed in this message");
}
// get the command
var cmd = -1;
cmd = msg.getInt8(msgIndex++);
if (cmd != -1) {
if (cmd == netstream.constants.EVENT_ADD_NODE) {
this.serve_EVENT_ADD_NODE(msg, msgIndex);
......@@ -290,142 +219,25 @@
this.serve_EVENT_DEL_EDGE_ATTR(msg, msgIndex);
} else if (cmd == netstream.constants.EVENT_END) {
if (this.debug) {
netstream.LOGGER("(GSTransport.decodeMessage): Client properly ended the connection.");
netstream.LOGGER("(netstream.Receiver.decodeMessage): Client properly ended the connection.");
}
return;
} else {
if (this.debug) {
netstream.LOGGER("(GSTransport.decodeMessage): Don't know this command: " + cmd);
netstream.LOGGER("(netstream.Receiver.decodeMessage): Don't know this command: " + cmd);
}
return;
}
}
},
compactBuffer: function (limit) {
if (this.beg > this.sizeOfInt) {
var off = this.beg;
var array = new Uint8Array(this.buffer);
for (var i = 0; i < limit - this.beg; i++) {
array[i] = array[this.beg + i];
}
this.pos -= this.beg;
this.end -= this.beg;
this.beg = 0;
return off;
}
return 0;
},
ensureBufferCapacity: function (size) {
if (size > this.buffer.byteLength) {
if (this.debug) {
console.log("Actual buffer size is " + this.buffer.byteLength + " resizing to " + (2 * size + this.pos) + " bytes", size, this.pos);
}
var newb = new ArrayBuffer(2 * size + this.pos)
, i=0;
for (; i < this.pos; i++) {
newb[i] = this.buffer[i];
}
delete this.buffer;
this.buffer = newb;
}
},
handleMsg: function (encodedMsg) {
var limit = 0;
// Index past the last byte this.read during the current
// invocation.
var nbytes = 0;
// Number of bytes this.read.
nbytes = encodedMsg.length;
this.ensureBufferCapacity(nbytes);
var encodedArray = new Uint8Array(this.buffer, this.pos, nbytes);
for (var i = 0; i < nbytes; i++) {
encodedArray[i] = encodedMsg.charCodeAt(i);
}
limit = this.pos + nbytes;
if (nbytes <= 0)
return;
// Read the first header.
if (this.end < 0) {
if ((limit - this.beg) >= this.sizeOfInt) {
// If no data has been read yet in the buffer or if the
// buffer was emptied completely at previous call: prepare to read
// a new message by decoding its header.
this.end = this.unpackMessageSize(0) + this.sizeOfInt;
this.beg = this.sizeOfInt;
} else {
// The header is incomplete, wait next call to complete it.
this.pos = limit;
}
}
// Read one or more messages or wait next call to buffers more.
if (this.end > 0) {
while (this.end < limit) {
// While the end of the message is in the limit of what was
// read, there are one or more complete messages. Decode
// them
// and read the header of the next message, until a message
// is
// incomplete or there are no more messages or a header is
// incomplete.
this.decodeMessage();
//this.buffer.position(end);
if (this.end + this.sizeOfInt <= limit) {
// There is a following message.
this.beg = this.end + this.sizeOfInt;
this.end = this.end +
this.unpackMessageSize(this.end) +
this.sizeOfInt;
} else {
// There is the beginning of a following message
// but the header is incomplete. Compact the buffer
// and stop here.
this.beg = this.end;
var p = limit - this.end;
this.compactBuffer(limit);
this.pos = p;
this.beg = 0;
this.end = -1;
break;
}
}
if (this.end == limit) {
// If the end of the message coincides with the limit of
// what
// was this.read we have one last complete message. We decode it
// and
// clear the buffer for the next call.
this.decodeMessage();
//////////buf.clear();
this.pos = 0;
this.beg = 0;
this.end = -1;
} else if (this.end > limit) {
// If the end of the message if after what was read, prepare
// to this.read more at next call when we will have buffered more
// data. If we are at the end of the buffer compact it (else
// no more space will be available for buffering).
this.pos = limit;
if (this.end > this.buffer.byteLength)
this.compactBuffer(limit);
}
}
},
readType: function (msg, msgIndex) {
var data = -1;
data = msg.getInt8(msgIndex);
if (data == -1) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readType): could not read type");
netstream.LOGGER("(netstream.Receiver.readType): could not read type");
}
return 0;
}
......@@ -534,7 +346,7 @@
readLong: function (msg, msgIndex) {
var head = msg.getInt32(msgIndex);
if (head !== 0) {
netstream.LOGGER("(GSTransport.readLong) : Read long does not fit in a 32bits int... Big problem !!!!");
netstream.LOGGER("(netstream.Receiver.readLong) : Read long does not fit in a 32bits int... Big problem !!!!");
}
var data = msg.getInt32(msgIndex + 4);
return {
......@@ -642,7 +454,7 @@
head = msg.getInt32(msgIndex);
msgIndex += 4;
if (head !== 0) {
netstream.LOGGER("(GSTransport.readLong) : Read long does not fit in a 32bits int... Big problem !!!!");
netstream.LOGGER("(netstream.Receiver.readLong) : Read long does not fit in a 32bits int... Big problem !!!!");
}
data[i] = msg.getInt32(msgIndex);
msgIndex += 4;
......@@ -671,7 +483,7 @@
//////////////////////////////////////////////////////////////////////////////
serve_EVENT_DEL_EDGE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received DEL_EDGE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received DEL_EDGE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -681,12 +493,12 @@
var edgeId = res.data;
res = this.readString(msg, res.index);
var attrId = res.data;
this.gs_sink.edgeAttributeRemoved(sourceId, timeId, edgeId, attrId);
this.sink.edgeAttributeRemoved(sourceId, timeId, edgeId, attrId);
},
serve_EVENT_CHG_EDGE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received CHG_EDGE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received CHG_EDGE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -707,12 +519,12 @@
res = this.readValue(msg, res.index + 1, valueType);
var newValue = res.data;
this.gs_sink.edgeAttributeChanged(sourceId, timeId, edgeId, attrId, oldValue, newValue);
this.sink.edgeAttributeChanged(sourceId, timeId, edgeId, attrId, oldValue, newValue);
},
serve_EVENT_ADD_EDGE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received ADD_EDGE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received ADD_EDGE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -727,12 +539,12 @@
res = this.readValue(msg, res.index + 1, this.readType(msg, res.index));
var value = res.data;
this.gs_sink.edgeAttributeAdded(sourceId, timeId, edgeId, attrId, value);
this.sink.edgeAttributeAdded(sourceId, timeId, edgeId, attrId, value);
},
serve_EVENT_DEL_NODE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received DEL_NODE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received DEL_NODE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -744,12 +556,12 @@
res = this.readString(msg, res.index);
var attrId = res.data;
this.gs_sink.nodeAttributeRemoved(sourceId, timeId, nodeId, attrId);
this.sink.nodeAttributeRemoved(sourceId, timeId, nodeId, attrId);
},
serve_EVENT_CHG_NODE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_CHG_NODE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_CHG_NODE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
......@@ -771,12 +583,12 @@
res = this.readValue(msg, res.index + 1, valueType);
var newValue = res.data;
this.gs_sink.nodeAttributeChanged(sourceId, timeId, nodeId, attrId, oldValue, newValue);
this.sink.nodeAttributeChanged(sourceId, timeId, nodeId, attrId, oldValue, newValue);
},
serve_EVENT_ADD_NODE_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_ADD_NODE_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_ADD_NODE_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -791,12 +603,12 @@
res = this.readValue(msg, res.index + 1, this.readType(msg, res.index));
var value = res.data;
this.gs_sink.nodeAttributeAdded(sourceId, timeId, nodeId, attrId, value);
this.sink.nodeAttributeAdded(sourceId, timeId, nodeId, attrId, value);
},
serve_EVENT_DEL_GRAPH_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_DEL_GRAPH_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_DEL_GRAPH_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -806,12 +618,12 @@
res = this.readString(msg, res.index);
var attrId = res.data;
this.gs_sink.graphAttributeRemoved(sourceId, timeId, attrId);
this.sink.graphAttributeRemoved(sourceId, timeId, attrId);
},
serve_EVENT_CHG_GRAPH_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_CHG_GRAPH_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_CHG_GRAPH_ATTR command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -831,13 +643,13 @@
res = this.readValue(msg, res.index + 1, valueType);
var newValue = res.data;
this.gs_sink.graphAttributeChanged(sourceId, timeId, attrId, oldValue, newValue);
this.sink.graphAttributeChanged(sourceId, timeId, attrId, oldValue, newValue);
},
serve_EVENT_ADD_GRAPH_ATTR: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_ADD_GRAPH_ATTR command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_ADD_GRAPH_ATTR command.");
}
var res = this.readString(msg, msgIndex);
......@@ -851,24 +663,24 @@
res = this.readValue(msg, res.index + 1, this.readType(msg, res.index));
var value = res.data;
this.gs_sink.graphAttributeAdded(sourceId, timeId, attrId, value);
this.sink.graphAttributeAdded(sourceId, timeId, attrId, value);
},
serve_EVENT_CLEARED: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_CLEARED command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_CLEARED command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
res = this.readLong(msg, res.index);
var timeId = res.data;
this.gs_sink.graphCleared(sourceId, timeId);
this.sink.graphCleared(sourceId, timeId);
},
serve_EVENT_STEP: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_STEP command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_STEP command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -878,12 +690,12 @@
res = this.readDouble(msg, res.index);
var time = res.data;
this.gs_sink.stepBegins(sourceId, timeId, time);
this.sink.stepBegins(sourceId, timeId, time);
},
serve_EVENT_DEL_EDGE: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_DEL_EDGE command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_DEL_EDGE command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -893,12 +705,12 @@
res = this.readString(msg, res.index);
var edgeId = res.data;
this.gs_sink.edgeRemoved(sourceId, timeId, edgeId);
this.sink.edgeRemoved(sourceId, timeId, edgeId);
},
serve_EVENT_ADD_EDGE: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received ADD_EDGE command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received ADD_EDGE command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -917,12 +729,12 @@
res = this.readBoolean(msg, res.index);
var directed = res.data;
this.gs_sink.edgeAdded(sourceId, timeId, edgeId, from, to, directed);
this.sink.edgeAdded(sourceId, timeId, edgeId, from, to, directed);
},
serve_DEL_NODE: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received DEL_NODE command.");
netstream.LOGGER("(netstream.Receiver.readLong) : Received DEL_NODE command.");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -932,12 +744,12 @@
res = this.readString(msg, res.index);
var nodeId = res.data;
this.gs_sink.nodeRemoved(sourceId, timeId, nodeId);
this.sink.nodeRemoved(sourceId, timeId, nodeId);
},
serve_EVENT_ADD_NODE: function (msg, msgIndex) {
if (this.debug) {
netstream.LOGGER("(GSTransport.readLong) : Received EVENT_ADD_NODE command");
netstream.LOGGER("(netstream.Receiver.readLong) : Received EVENT_ADD_NODE command");
}
var res = this.readString(msg, msgIndex);
var sourceId = res.data;
......@@ -947,7 +759,7 @@
res = this.readString(msg, res.index);
var nodeId = res.data;
this.gs_sink.nodeAdded(sourceId, timeId, nodeId);
this.sink.nodeAdded(sourceId, timeId, nodeId);
}
};
......
(function (global) {
"use strict";
if (!global.netstream) {
global.netstream = {};
}
global.netstream.Transport = function (options) {
// --------- constants
this.BUFFER_INITIAL_SIZE = 1048576;
// --------- parameters
this.scheme = "ws";
this.host = "localhost";
this.port = 2003;
this.uri = "/gs/stream";
this.socket = null;
this.verbose = true;
this.base64 = true;
this.onevent = null;
this.onopen = null;
// --------- init params
for(var prop in options) {
if(options.hasOwnProperty(prop