Commit 38e1f26b authored by Yoann Pigné's avatar Yoann Pigné

test files

parent 83638257
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.graphstream.graph.Graph;
import org.graphstream.graph.implementations.AdjacencyListGraph;
import org.graphstream.graph.implementations.DefaultGraph;
import org.graphstream.stream.GraphReplay;
import org.graphstream.stream.netstream.NetStreamReceiver;
import org.graphstream.stream.netstream.NetStreamSender;
import org.graphstream.stream.netstream.packing.Base64Packer;
import org.graphstream.stream.netstream.packing.Base64Unpacker;
import org.graphstream.stream.thread.ThreadProxyPipe;
/**
*
* Copyright (c) 2012 University of Le Havre
*
* @file NetSteamWebSocketTestApp.java
* @date May 21, 2012
*
* @author Yoann Pigné
*
*/
/**
*
*/
public class W3SinkDemo {
boolean alive;
ServerSocket serverSocket;
Graph g;
ConcurrentLinkedQueue<Connection> pending;
LinkedList<Connection> active;
public W3SinkDemo() throws IOException {
this.serverSocket = new ServerSocket(2001);
this.alive = true;
this.pending = new ConcurrentLinkedQueue<Connection>();
this.g = new AdjacencyListGraph("w3sink-demo");
this.active = new LinkedList<Connection>();
Runnable r = new Runnable() {
public void run() {
W3SinkDemo.this.listen();
}
};
Thread t = new Thread(r);
t.setDaemon(true);
t.start();
r = new Runnable() {
public void run() {
try {
W3SinkDemo.this.handleGraph();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
};
t = new Thread(r);
t.start();
}
private void handleGraph() throws UnknownHostException, IOException {
System.out.printf(" * graph running ...\n");
NetStreamReceiver receiver = new NetStreamReceiver("localhost", 2002, true);
receiver.setUnpacker(new Base64Unpacker());
ThreadProxyPipe pipe = receiver.getDefaultStream();
pipe.addSink(g);
// send events to clients through node.js
NetStreamSender sender = new NetStreamSender(2000);
sender.setPacker(new Base64Packer());
g.addSink(sender);
while (alive) {
// get events from clients
pipe.pump();
// get new clients and send replay
while (pending.size() > 0)
register(pending.poll());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
private void register(Connection conn) {
GraphReplay replay = new GraphReplay("replay-" + g.getId());
replay.addSink(conn.nss);
replay.replay(g);
replay.removeSink(conn.nss);
g.addSink(conn.nss);
active.add(conn);
System.out.printf("[generator] new connection registered'\n");
}
private void listen() {
System.out.printf(" * Server is listening ...\n");
while (alive) {
Socket s = null;
Connection c = null;
try {
s = serverSocket.accept();
c = new Connection(s);
pending.add(c);
System.out.printf("[server] new connection from '%s'\n", s.getInetAddress().getHostName());
} catch (IOException e) {
e.printStackTrace();
try {
s.close();
} catch (IOException e2) {
}
}
}
try {
serverSocket.close();
} catch (Exception e) {
}
System.out.printf(" * Server is closed ...\n");
}
private class Connection {
BufferedReader in;
NetStreamSender nss;
Connection(Socket socket) throws IOException {
int port;
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
port = Integer.parseInt(in.readLine());
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
nss = new NetStreamSender(port);
nss.setPacker(new Base64Packer());
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new W3SinkDemo();
}
}
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>Web Socket Example</title>
</head>
<body>
<div id="debug"></div>
<div id="holder" style="width:600px; height:300px"></div>
<script src="/lib/netstream_constants.js" type="text/javascript"></script>
<script src="/lib/netstream_commons.js" type="text/javascript"></script>
<script src="/lib/data_view.js" type="text/javascript"></script>
<script src="/lib/netstream_transport.js" type="text/javascript"></script>
<script src="/lib/netstream_receiver.js" type="text/javascript"></script>
<script src="/lib/netstream_sender.js" type="text/javascript"></script>
<script>
(function() {
// ------- the socket itself (and a bit more)
var transport = new netstream.Transport({
'host': "localhost",
'port': 2003,
'uri': '/',
'base64': true,
'debug': false
});
// ------- receives events from the socket
var sink = new netstream.DOMSink(document.getElementById("debug"));
var receiver = new netstream.Receiver({
'debug': false,
'transport': transport,
'sink': sink
});
// -------- send events through the socket
var sender = new netstream.Sender({
'debug': false,
'transport': transport
});
// -------- a utility objects that handles sourceIds and timeIds for us
source = new netstream.Source({
'sender': sender
});
var ss = "node{fill-mode:plain;fill-color:#567;size:6px;}";
source.addAttribute("stylesheet", ss);
source.addAttribute("ui.antialias", true);
source.addAttribute("layout.stabilization-limit", 0);
source.addNode('0');
source.addNode('1');
source.addNode('2');
source.addNode('3');
} ());
</script>
</body>
</html>
var SERVER_IP = "127.0.0.1";
var http = require('http')
, fs = require('fs')
, WebSocketServer = require('ws').Server
, WebSocket = require('ws')
, sourceID = "nodeServer"
, timeID = 0
, net = require('net')
, netstream = {}
, queue=[]
, gs_comm_initiated=false
, clients=[]
, gs_client=null;
netstream.constants = require("../netstream_constants").netstream.constants;
//
// Classical Http server to serve the files...
//
http.createServer(function(request, response) {
console.log(request.url);
fs.readFile('./' + request.url,
function(error, content) {
if (error) {
response.writeHead(500);
response.end();
//console.log(error);
}
else {
response.writeHead(200, {
'Content-Type': 'text/html'
});
response.end(content, 'utf-8');
}
});
}).listen(8080);
console.log('Http Server running at http://127.0.0.1:8080/');
var gs_client = null;
// back server to receive and broadcast events from GS to the clients
var gs_server = net.createServer(function(c) {
console.log('gs_server connected');
c.on('end',
function() {
console.log('gs_server disconnected');
});
c.on("data",
function(data) {
clients.forEach(function(ws){
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
});
});
gs_server.listen(2000, function() {
//'listening' listener
console.log('gs_server bound');
});
var wss = new WebSocketServer({
port: 2003,
host: SERVER_IP
});
wss.on('error',
function() {
console.log('WS error....');
});
wss.on('connection',
function(ws) {
console.log("WS Client connected to node");
// create a response server for gs. Random (free) port
// let's get a graph...
var events_server = net.createServer(function(c) {
console.log('events_server connected');
c.on('end',
function() {
console.log('events_server disconnected');
});
c.on("data",
function(data) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
});
events_server.listen(function() {
//'listening' listener
console.log('events_server bound');
});
var events_server_port = events_server.address().port;
// ask GS for a replay
var tmp_gs_client = net.connect(2001,
function() {
//'connect' listener
console.log('tmp_gs_client connected');
tmp_gs_client.write("" + events_server_port);
tmp_gs_client.end();
});
tmp_gs_client.on('end',
function() {
console.log('tmp_gs_client disconnected');
});
// this client closes the connection
ws.on('close',
function() {
console.log("WS client closed it.")
});
// this client sends somthing to GS
ws.on('message',
function(message) {
if (message === "Heartbeat"){
//console.log('Heartbeating...');
}
else{
console.log('received from WS: %s', message);
if(!gs_comm_initiated){
gs_comm_initiated=true;
do_gs_client();
}
if ( gs_client === null || typeof gs_client ==="undefined" ){
console.log('client undefined, queuing msg : %s', message);
console.log(typeof gs_client);
queue.push(message);
}
else if (gs_client.readyState !== 'open'){
console.log(gs_client.readyState);
console.log('gs_client not ready open, queuing msg : %s', message);
queue.push(message);
} else{
console.log('directly passing msg to GS : %s', message);
gs_client.write(message);
}
}
});
});
function do_gs_client(callback){
// one connection from node to GS'receiver
gs_client = net.connect(2002, function() { //'connect' listener
console.log('gs_client connected');
for(var i=0; i<queue.length; i++){
console.log('dequeuing: %s', queue[i]);
this.write(queue[i]);
}
delete queue;
});
gs_client.on('data', function(data) {
console.log("from gs_client: "+data.toString());
});
gs_client.on('end', function() {
console.log('gs_client disconnected');
gs_comm_initiated=false;
});
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment