Commit 6fcc3570 authored by Yoann Pigné's avatar Yoann Pigné

clean up. a little

parent 38e1f26b
......@@ -17,7 +17,7 @@
this.debug = true;
this.sender = null;
this.stream = null;
this.source = null;
this.id = null;
for(var prop in options) {
if(options.hasOwnProperty(prop) && this.hasOwnProperty(prop)) {
......@@ -28,72 +28,72 @@
if (this.stream){
this.sender.stream = this.stream;
}
this.source = this.source || "s"+Math.floor(Math.random()*1000);
this.id = this.id || "s"+Math.floor(Math.random()*1000);
this._timeId = 0;
};
global.netstream.Source.prototype = {
addNode: function (node){
this.sender.nodeAdded(this.source, this._timeId, node);
this.sender.nodeAdded(this.id, this._timeId, node);
this._timeId+=1;
},
removeNode: function (node){
this.sender.nodeRemoved(this.source, this._timeId, node);
this.sender.nodeRemoved(this.id, this._timeId, node);
this._timeId+=1;
},
addEdge: function (edge, from_node, to_node, directed){
this.sender.edgeAdded(this.source, this._timeId, edge, from_node, to_node, directed);
this.sender.edgeAdded(this.id, this._timeId, edge, from_node, to_node, directed);
this._timeId+=1;
},
removeEdge: function(edge){
this.sender.edgeRemoved(this.source, this._timeId, edge);
this.sender.edgeRemoved(this.id, this._timeId, edge);
this._timeId+=1;
},
addAttribute: function (attribute, value){
this.sender.graphAttributeAdded(this.source, this._timeId, attribute, value);
this.sender.graphAttributeAdded(this.id, this._timeId, attribute, value);
this._timeId+=1;
},
removeAttribute: function (attribute){
this.sender.graphAttributeRemoved(this.source, this._timeId, attribute);
this.sender.graphAttributeRemoved(this.id, this._timeId, attribute);
this._timeId+=1;
},
changeAttribute: function (attribute, oldValue, newValue){
this.sender.graphAttributeChanged(this.source, this._timeId, attribute, oldValue, newValue);
this.sender.graphAttributeChanged(this.id, this._timeId, attribute, oldValue, newValue);
this._timeId+=1;
},
addNodeAttribute: function (node, attribute, value){
this.sender.nodeAttributeAdded(this.source, this._timeId, node, attribute, value);
this.sender.nodeAttributeAdded(this.id, this._timeId, node, attribute, value);
this._timeId+=1;
},
removeNodeAttibute: function(node, attribute){
this.sender.nodeAttributeRemoved(this.source, this._timeId, node, attribute);
this.sender.nodeAttributeRemoved(this.id, this._timeId, node, attribute);
this._timeId+=1;
},
changeNodeAttribute: function(node, attribute, oldValue, newValue){
this.sender.nodeAttributeChanged(this.source, this._timeId, node, attribute, oldValue, newValue);
this.sender.nodeAttributeChanged(this.id, this._timeId, node, attribute, oldValue, newValue);
this._timeId+=1;
},
addEdgeAttribute: function (edge, attribute, value){
this.sender.edgeAttributeAdded(this.source, this._timeId, edge, attribute, value);
this.sender.edgeAttributeAdded(this.id, this._timeId, edge, attribute, value);
this._timeId+=1;
},
removeEdgeAttribute: function (edge, attribute){
this.sender.edgeAttributeRemoved(this.source, this._timeId, edge, attribute);
this.sender.edgeAttributeRemoved(this.id, this._timeId, edge, attribute);
this._timeId+=1;
},
changeEdgeAttrivute: function (edge, attribute, oldValue, newValue){
this.sender.edgeAttributeChanged(this.source, this._timeId, edge, attribute, oldValue, newValue);
this.sender.edgeAttributeChanged(this.id, this._timeId, edge, attribute, oldValue, newValue);
this._timeId+=1;
},
clearGraph: function (){
this.sender.graphCleared(this.source, this._timeId);
this.sender.graphCleared(this.id, this._timeId);
this._timeId+=1;
},
stepBegins: function (time){
this.sender.stepBegins(this.source, this._timeId, time);
this.sender.stepBegins(this.id, this._timeId, time);
this._timeId+=1;
}
};
......
......@@ -116,7 +116,7 @@
},
sendEvent: function(buffer, start, length){
netstream.LOGGER("(netstream.transport.sendEvent) with buffer:"+buffer.byteLength+" ("+start+","+length+")");
//netstream.LOGGER("(netstream.transport.sendEvent) with buffer:"+buffer.byteLength+" ("+start+","+length+")");
// prepare buffer
......
......@@ -26,7 +26,7 @@ import org.graphstream.stream.netstream.packing.Base64Packer;
/**
*
*/
public class NetStreamWebSocketTest {
public class Test1 {
String clientSentence;
String capitalizedSentence;
......@@ -35,9 +35,9 @@ public class NetStreamWebSocketTest {
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new NetStreamWebSocketTest();
new Test1();
}
public NetStreamWebSocketTest() throws IOException{
public Test1() throws IOException{
ServerSocket welcomeSocket = new ServerSocket(2001);
System.out.println("Listening?...");
while(true)
......@@ -51,7 +51,15 @@ public class NetStreamWebSocketTest {
new Thread(){
@Override
public void run() {
try {
sendGraph(new Integer(clientSentence));
} catch (NumberFormatException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}.start();
......@@ -59,13 +67,13 @@ public class NetStreamWebSocketTest {
}
public void sendGraph(int port){
public void sendGraph(int port) throws IOException{
Graph g = new MultiGraph("G");
// - the sender
NetStreamSender nsc = null;
try {
System.err.printf("Trying to connect to port %d%n",port);
nsc = new NetStreamSender(port);
nsc = new NetStreamSender("D", "localhost", port);
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
......@@ -84,18 +92,19 @@ public class NetStreamWebSocketTest {
// - generate some events on the client side
String style = "node{fill-mode:plain;fill-color:#567;size:6px;}";
g.addAttribute("stylesheet", style);
for (int i = 0; i < 50; i++) {
for (int i = 0; i < 5000; i++) {
try {
Thread.sleep(100);
//Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
g.addNode(i + "");
g.addNode("n"+i);
if (i > 0) {
g.addEdge(i + "-" + (i - 1), i + "", (i - 1) + "");
g.addEdge(i + "--" + (i / 2), i + "", (i / 2) + "");
g.addEdge("n"+i + "_" + "n"+(i - 1), "n"+i, "n"+(i - 1));
g.addEdge("n"+i + "__" + "n"+(i / 2), "n"+i , "n"+(i / 2));
}
}
g.write("ok.dgs");
System.err.printf("Done.");
}
......
......@@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.graphstream.graph.Graph;
import org.graphstream.graph.implementations.AdjacencyListGraph;
import org.graphstream.graph.implementations.DefaultGraph;
import org.graphstream.graph.implementations.MultiGraph;
import org.graphstream.stream.GraphReplay;
import org.graphstream.stream.netstream.NetStreamReceiver;
import org.graphstream.stream.netstream.NetStreamSender;
......@@ -31,23 +32,23 @@ import org.graphstream.stream.thread.ThreadProxyPipe;
/**
*
*/
public class W3SinkDemo {
public class TestApp {
boolean alive;
ServerSocket serverSocket;
Graph g;
ConcurrentLinkedQueue<Connection> pending;
LinkedList<Connection> active;
public W3SinkDemo() throws IOException {
public TestApp() throws IOException {
this.serverSocket = new ServerSocket(2001);
this.alive = true;
this.pending = new ConcurrentLinkedQueue<Connection>();
this.g = new AdjacencyListGraph("w3sink-demo");
this.g = new MultiGraph("w3sink-demo", false, true);
this.active = new LinkedList<Connection>();
Runnable r = new Runnable() {
public void run() {
W3SinkDemo.this.listen();
TestApp.this.listen();
}
};
......@@ -58,7 +59,7 @@ public class W3SinkDemo {
r = new Runnable() {
public void run() {
try {
W3SinkDemo.this.handleGraph();
TestApp.this.handleGraph();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
......@@ -75,7 +76,7 @@ public class W3SinkDemo {
System.out.printf(" * graph running ...\n");
NetStreamReceiver receiver = new NetStreamReceiver("localhost", 2002, true);
NetStreamReceiver receiver = new NetStreamReceiver("localhost", 2002);
receiver.setUnpacker(new Base64Unpacker());
ThreadProxyPipe pipe = receiver.getDefaultStream();
......@@ -174,7 +175,7 @@ public class W3SinkDemo {
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new W3SinkDemo();
new TestApp();
}
......
var SERVER_IP = "127.0.0.1";
var http = require('http')
, fs = require('fs')
, WebSocketServer = require('ws').Server
, WebSocket = require('ws')
, sourceID="nodeServer"
, timeID=0
, net = require('net')
, netstream = {};
netstream.constants = require("../netstream_constants").netstream.constants;
//
// Classical Http server to serve the files...
//
http.createServer(function (request, response) {
console.log('request starting... '+request.url);
fs.readFile('./'+request.url, function(error, content) {
if (error) {
response.writeHead(500);
response.end();
//console.log(error);
}
else {
response.writeHead(200, { 'Content-Type': 'text/html' });
response.end(content, 'utf-8');
}
});
}).listen(8080);
console.log('Http Server running at http://127.0.0.1:8080/');
var wss = new WebSocketServer({port: 2003, host:SERVER_IP});
wss.on('error', function(){
console.log('WS error....');
});
wss.on('connection', function(ws) {
console.log("WS Client connected to node");
// create a response server for gs. Random (free) port
// let's get a graph...
var events_server = net.createServer(function(c) {
console.log('events_server connected');
c.on('end', function() {
console.log('events_server disconnected');
});
c.on("data", function (data) {
if(ws.readyState === WebSocket.OPEN){
ws.send(data);
}
});
});
events_server.listen(function() { //'listening' listener
console.log('events_server bound');
});
var events_server_port = events_server.address().port;
// ask GS
var gs_client = net.connect(2001, function() { //'connect' listener
console.log('gs_client connected');
gs_client.write(""+events_server_port);
gs_client.end();
});
gs_client.on('end', function() {
console.log('gs_client disconnected');
});
// this client closes the connection
ws.on('close', function(){
console.log("WS client closed it.")
});
// this client sends somthing to GS
ws.on('message', function(message) {
//console.log('received: %s', message);
// if (message === "Heartbeat"){
// console.log('Heartbeating...');
//
// }
});
});
......@@ -13,6 +13,8 @@
<script src="/lib/netstream_constants.js" type="text/javascript"></script>
<script src="/lib/netstream_commons.js" type="text/javascript"></script>
<script src="/lib/data_view.js" type="text/javascript"></script>
<script src="/lib/netstream_transport.js" type="text/javascript"></script>
<script src="/lib/netstream_receiver.js" type="text/javascript"></script>
<script>
......@@ -20,8 +22,9 @@
var elmt = document.getElementById("debug")
var handler = new netstream.DOMGSSink(elmt);
//var handler = new netstream.DefaultGSSink();
var transport = new netstream.Receiver("ws", "127.0.0.1", "2003", "/gs/stream", handler);
var transport = new netstream.Receiver({'debug':false,"sink":handler});
// transport.debug = true;
})();
</script>
......
var SERVER_IP = "127.0.0.1";
var http = require('http')
, fs = require('fs')
, WebSocketServer = require('ws').Server
, WebSocket = require('ws')
, sourceID = "nodeServer"
, timeID = 0
, net = require('net')
, netstream = {};
netstream.constants = require("../netstream_constants").netstream.constants;
//
// Classical Http server to serve the files...
//
http.createServer(function(request, response) {
console.log(request.url);
fs.readFile('./' + request.url,
function(error, content) {
if (error) {
response.writeHead(500);
response.end();
//console.log(error);
}
else {
response.writeHead(200, {
'Content-Type': 'text/html'
});
response.end(content, 'utf-8');
}
});
}).listen(8080);
console.log('Http Server running at http://127.0.0.1:8080/');
var wss = new WebSocketServer({
port: 2003,
host: SERVER_IP
});
wss.on('error',
function() {
console.log('WS error....');
});
wss.on('connection',
function(ws) {
console.log("WS Client connected to node");
// create a response server for gs. Random (free) port
// let's get a graph...
var events_server = net.createServer(function(c) {
console.log('events_server connected');
c.on('end',
function() {
console.log('events_server disconnected');
});
c.on("data",
function(data) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
});
events_server.listen(function() {
//'listening' listener
console.log('events_server bound');
});
var events_server_port = events_server.address().port;
// ask GS
var gs_client = net.connect(2001,
function() {
//'connect' listener
console.log('gs_client connected');
gs_client.write("" + events_server_port);
gs_client.end();
});
gs_client.on('end',
function() {
console.log('gs_client disconnected');
});
// this client closes the connection
ws.on('close',
function() {
console.log("WS client closed it.")
});
// this client sends somthing to GS
ws.on('message',
function(message) {
//console.log('received: %s', message);
// if (message === "Heartbeat"){
// console.log('Heartbeating...');
//
// }
});
});
......@@ -30,6 +30,7 @@
'base64': true,
'debug': false
});
transport.connect();
// ------- receives events from the socket
var sink = new netstream.DOMSink(document.getElementById("debug"));
......@@ -51,7 +52,7 @@
'sender': sender
});
console.log("My name is %s", source.id);
var ss = "node{fill-mode:plain;fill-color:#567;size:6px;}";
source.addAttribute("stylesheet", ss);
source.addAttribute("ui.antialias", true);
......@@ -60,7 +61,14 @@
source.addNode('1');
source.addNode('2');
source.addNode('3');
source.addNode(source.id+'0');
source.addNode(source.id+'1');
source.addNode(source.id+'2');
source.addNode(source.id+'3');
source.addEdge(source.id+'00', source.id+'0', ''+Math.floor(Math.random()*4), false);
source.addEdge(source.id+'01', source.id+'1', ''+Math.floor(Math.random()*4), false);
source.addEdge(source.id+'02', source.id+'2', ''+Math.floor(Math.random()*4), false);
source.addEdge(source.id+'03', source.id+'3', ''+Math.floor(Math.random()*4), false);
} ());
</script>
......
......@@ -14,6 +14,7 @@ var http = require('http')
, gs_client=null;
netstream.constants = require("../netstream_constants").netstream.constants;
netstream.sender = require("../netstream_sender").netstream.sender;
......@@ -22,7 +23,6 @@ netstream.constants = require("../netstream_constants").netstream.constants;
// Classical Http server to serve the files...
//
http.createServer(function(request, response) {
console.log(request.url);
fs.readFile('./' + request.url,
function(error, content) {
......@@ -38,18 +38,16 @@ http.createServer(function(request, response) {
response.end(content, 'utf-8');
}
});
}).listen(8080);
}).listen(8080, function(){console.log('Http Server running at http://127.0.0.1:8080/')});
console.log('Http Server running at http://127.0.0.1:8080/');
var gs_client = null;
// back server to receive and broadcast events from GS to the clients
var gs_server = net.createServer(function(c) {
console.log('gs_server connected');
//console.log('gs_server connected');
c.on('end',
function() {
console.log('gs_server disconnected');
//console.log('gs_server disconnected');
});
c.on("data",
function(data) {
......@@ -63,7 +61,7 @@ var gs_server = net.createServer(function(c) {
gs_server.listen(2000, function() {
//'listening' listener
console.log('gs_server bound');
//console.log('gs_server bound');
});
......@@ -81,16 +79,13 @@ function() {
wss.on('connection',
function(ws) {
console.log("WS Client connected to node");
// create a response server for gs. Random (free) port
// let's get a graph...
var events_server = net.createServer(function(c) {
console.log('events_server connected');
//console.log('events_server connected');
c.on('end',
function() {
console.log('events_server disconnected');
//console.log('events_server disconnected');
});
c.on("data",
function(data) {
......@@ -101,7 +96,7 @@ function(ws) {
});
events_server.listen(function() {
//'listening' listener
console.log('events_server bound');
//console.log('events_server bound');
});
var events_server_port = events_server.address().port;
......@@ -125,7 +120,6 @@ function(ws) {
// this client closes the connection
ws.on('close',
function() {
console.log("WS client closed it.")
});
......@@ -136,23 +130,22 @@ function(ws) {
//console.log('Heartbeating...');
}
else{
console.log('received from WS: %s', message);
console.log('received from WS');
if(!gs_comm_initiated){
gs_comm_initiated=true;
do_gs_client();
}
if ( gs_client === null || typeof gs_client ==="undefined" ){
console.log('client undefined, queuing msg : %s', message);
console.log(typeof gs_client);
console.log('queuing message (client undefined )');
queue.push(message);
}
else if (gs_client.readyState !== 'open'){
console.log(gs_client.readyState);
console.log('gs_client not ready open, queuing msg : %s', message);
console.log('queuing message (client opening)');
queue.push(message);
} else{
console.log('directly passing msg to GS : %s', message);
//console.log('directly passing msg to GS : %s', message);
gs_client.write(message);
}
}
......@@ -165,21 +158,20 @@ function(ws) {
function do_gs_client(callback){
// one connection from node to GS'receiver
gs_client = net.connect(2002, function() { //'connect' listener
console.log('gs_client connected');