Commit 4849aab0 authored by Yoann Pigné's avatar Yoann Pigné

Mmm...

parent ae07dde0
......@@ -13,32 +13,88 @@ Copyright (c) 2011 University of Luxembourg. All rights reserved.
import socket
import struct
import types
import base64
from graphstream import AttributeSink, ElementSink
from netstream_constants import *
class NetStreamTransport(object):
"""Defines the general behaviour of the class that will be in charge of the actuall data sending"""
def __init__(self, stream, host, port):
raise NotImplementedError
def connect(self):
raise NotImplementedError
class DefaultNetStreamTransport(NetStreamTransport):
stream="Default"
host="localhost"
port=2001
socket=None
def __init__(self, stream, host, port):
self.stream = stream
self.stream_string=struct.pack("!i",len(stream))+stream
self.host = host
self.port = port
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def send(self, event):
if(self.socket == None):
self.connect()
self.socket.sendall(struct.pack("!i",+len(self.stream_string)+len(event))+self.stream_string+event)
class Base64NetStreamTransport(NetStreamTransport):
stream="Default"
host="localhost"
port=2001
socket=None
def __init__(self, stream, host, port):
self.stream = stream
self.stream_string=struct.pack("!i",len(stream))+stream
self.host = host
self.port = port
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def send(self, event):
if(self.socket == None):
self.connect()
encoded_msg = base64.b64encode(self.stream_string+event)
self.socket.sendall(base64.b64encode(struct.pack("!i",len(encoded_msg)))+encoded_msg)
class NetStreamSender(AttributeSink,ElementSink):
"""One client must send to only one identified stream (streamID, host, port)"""
stream="Default"
host="localhost"
port=2001
def __init__(self, stream, host, port):
"""Constructor with a Stream ID, a Host and a port number"""
super(NetStreamSender, self).__init__()
self.stream = stream
self.host = host
self._stream = self._encodeString(stream)
self._connect()
transport = None
def __init__(self, *args, **kwargs):
"""Constructor can be with one NetStreamTransport object OR with 3 args: Stream ID, Host, and port number"""
if len(args) == 1 and isinstance(args[0], NetStreamTransport):
print("Initialize from transport object")
self.init_from_transport(args[0])
else:
if len(args) == 3 and isinstance(args[0], types.StringType) and isinstance(args[1], types.StringType) and isinstance(args[2], types.IntType):
print("Initialize from args")
self.init_from_args(args[0], args[1], args[2])
else:
print("Impossible to Initialize")
def init_from_args(self, stream, host, port):
self.transport = DefaultNetStreamTransport(stream, host,port)
self.transport.connect();
def _connect(self):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((self.host, self.port))
def init_from_transport(self,transport):
self.transport = transport
def encode_string(self, string):
"""Encodes a given string"""
......@@ -153,8 +209,6 @@ class NetStreamSender(AttributeSink,ElementSink):
def _encodeByte(self, value):
return struct.pack("!b",value)
def _send(self, event):
self._socket.sendall(struct.pack("!i",+len(self._stream)+len(event))+self._stream+event)
# =========================
# = AttributeSink methods =
......@@ -166,7 +220,7 @@ class NetStreamSender(AttributeSink,ElementSink):
type = self._getType(value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(value,type)
self._send(event)
self.transport.send(event)
def graphAttributeChanged(self, source_id, time_id, attribute, old_value, new_value):
event = self._encodeByte(EVENT_CHG_GRAPH_ATTR)
......@@ -176,13 +230,13 @@ class NetStreamSender(AttributeSink,ElementSink):
event = event + self._encodeByte(type)
event = event + self._encodeValue(old_value,type)
event = event + self._encodeValue(new_value,type)
self._send(event)
self.transport.send(event)
def graphAttributeRemoved(self, source_id, time_id, attribute):
event = self._encodeByte(EVENT_DEL_GRAPH_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(attribute)
self._send(event)
self.transport.send(event)
def nodeAttributeAdded(self, source_id, time_id, node_id, attribute, value):
event = self._encodeByte(EVENT_ADD_NODE_ATTR)
......@@ -192,7 +246,7 @@ class NetStreamSender(AttributeSink,ElementSink):
type = self._getType(value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(value,type)
self._send(event)
self.transport.send(event)
def nodeAttributeChanged(self, source_id, time_id, node_id, attribute, old_value, new_value):
event = self._encodeByte(EVENT_CHG_NODE_ATTR)
......@@ -203,14 +257,14 @@ class NetStreamSender(AttributeSink,ElementSink):
event = event + self._encodeByte(type)
event = event + self._encodeValue(old_value,type)
event = event + self._encodeValue(new_value,type)
self._send(event)
self.transport.send(event)
def nodeAttributeRemoved(self, source_id, time_id, node_id, attribute):
event = self._encodeByte(EVENT_DEL_NODE_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(node_id)
event = event + self._encodeString(attribute)
self._send(event)
self.transport.send(event)
def edgeAttributeAdded(self, source_id, time_id, edge_id, attribute, value):
event = self._encodeByte(EVENT_ADD_EDGE_ATTR)
......@@ -220,7 +274,7 @@ class NetStreamSender(AttributeSink,ElementSink):
type = self._getType(value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(value,type)
self._send(event)
self.transport.send(event)
def edgeAttributeChanged(self, source_id, time_id, edge_id, attribute, old_value, new_value):
event = self._encodeByte(EVENT_CHG_EDGE_ATTR)
......@@ -231,26 +285,26 @@ class NetStreamSender(AttributeSink,ElementSink):
event = event + self._encodeByte(type)
event = event + self._encodeValue(old_value,type)
event = event + self._encodeValue(new_value,type)
self._send(event)
self.transport.send(event)
def edgeAttributeRemoved(self, source_id, time_id, edge_id, attribute):
event = self._encodeByte(EVENT_DEL_EDGE_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(edge_id)
event = event + self._encodeString(attribute)
self._send(event)
self.transport.send(event)
def nodeAdded(self, source_id, time_id, node_id):
event = self._encodeByte(EVENT_ADD_NODE)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(node_id)
self._send(event)
self.transport.send(event)
def nodeRemoved(self, source_id, time_id, node_id):
event = self._encodeByte(EVENT_DEL_NODE)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(node_id)
self._send(event)
self.transport.send(event)
def edgeAdded(self, source_id, time_id, edge_id, from_node, to_node, directed):
event = self._encodeByte(EVENT_ADD_EDGE)
......@@ -259,29 +313,29 @@ class NetStreamSender(AttributeSink,ElementSink):
event = event + self._encodeString(from_node)
event = event + self._encodeString(to_node)
event = event + self._encodeBoolean(directed)
self._send(event)
self.transport.send(event)
def edgeRemoved(self, source_id, time_id, edge_id):
event = self._encodeByte(EVENT_DEL_EDGE)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(edge_id)
self._send(event)
self.transport.send(event)
def graphCleared(self, source_id, time_id):
event = self._encodeByte(EVENT_CLEARED)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
self._send(event)
self.transport.send(event)
def stepBegins(self, source_id, time_id, timestamp):
event = self._encodeByte(EVENT_STEP)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeDouble(timestamp)
self._send(event)
self.transport.send(event)
def example():
"""docstring for main"""
stream = NetStreamSender("default","localhost",2001)
stream = NetStreamSender(Base64NetStreamTransport("default","localhost",2001))
source_id="xxx"
time_id=0L
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment