Commit 02e5f69e authored by Hugo Hromic's avatar Hugo Hromic

Refactored the sender class and added compatibility with newer version of gs-core (varints).

parent 5d8c3adf
# Created by https://www.gitignore.io
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
from gs_netstream.sender import NetStreamProxyGraph, NetStreamSender, Base64NetStreamTransport
import logging
from gs_netstream.sender import NetStreamProxyGraph, NetStreamSender
transport = Base64NetStreamTransport("default","localhost",2001)
sender = NetStreamSender(transport)
logging.basicConfig(level=logging.DEBUG)
sender = NetStreamSender(2012)
proxy = NetStreamProxyGraph(sender)
style = "node{fill-mode:plain;fill-color:gray;size:1px;}"
proxy.addAttribute("stylesheet", style)
proxy.add_attribute("stylesheet", style)
proxy.addAttribute("ui.antialias", True)
proxy.addAttribute("layout.stabilization-limit", 0)
proxy.add_attribute("ui.antialias", True)
proxy.add_attribute("layout.stabilization-limit", 0)
for i in range(0,500):
proxy.addNode(str(i))
if(i>0):
proxy.addEdge(str(i)+"_"+str(i-1), str(i), str(i-1),False)
proxy.addEdge(str(i)+"__"+str(i/2), str(i), str(i/2), False)
\ No newline at end of file
proxy.add_node(str(i))
if i > 0:
proxy.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False)
proxy.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False)
......@@ -9,54 +9,50 @@ Copyright (c) 2011 University of Luxembourg. All rights reserved.
import sys
import os
import unittest
class AttributeSink(object):
def graphAttributeAdded(self, source_id, time_id, attribute, value):
raise NotImplementedError
def graphAttributeChanged(self, source_id, time_id, attribute, old_value, new_value):
raise NotImplementedError
def graphAttributeRemoved(self, source_id, time_id, attribute):
raise NotImplementedError
def nodeAttributeAdded(self, source_id, time_id, node_id, attribute, value):
raise NotImplementedError
def nodeAttributeChanged(self, source_id, time_id, node_id, attribute, old_value, new_value):
raise NotImplementedError
def nodeAttributeRemoved(self, source_id, time_id, node_id, attribute):
raise NotImplementedError
def edgeAttributeAdded(self, source_id, time_id, edge_id, attribute, value):
raise NotImplementedError
def edgeAttributeChanged(self, source_id, time_id, edge_id, attribute, old_value, new_value):
raise NotImplementedError
def edgeAttributeRemoved(self, source_id, time_id, edge_id, attribute):
raise NotImplementedError
def graph_attribute_added(self, source_id, time_id, attribute, value):
raise NotImplementedError
def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value):
raise NotImplementedError
def graph_attribute_removed(self, source_id, time_id, attribute):
raise NotImplementedError
def node_attribute_added(self, source_id, time_id, node_id, attribute, value):
raise NotImplementedError
def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value):
raise NotImplementedError
def node_attribute_removed(self, source_id, time_id, node_id, attribute):
raise NotImplementedError
def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value):
raise NotImplementedError
def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value):
raise NotImplementedError
def edge_attribute_removed(self, source_id, time_id, edge_id, attribute):
raise NotImplementedError
class ElementSink(object):
def nodeAdded(self, source_id, time_id, node_id):
raise NotImplementedError
def nodeRemoved(self, source_id, time_id, node_id):
raise NotImplementedError
def edgeAdded(self, source_id, time_id, edge_id, from_node, to_node, directed):
raise NotImplementedError
def edgeRemoved(self, source_id, time_id, edge_id):
raise NotImplementedError
def graphCleared(self):
raise NotImplementedError
def stepBegins(self, source_id, time_id, timestamp):
raise NotImplementedError
def node_added(self, source_id, time_id, node_id):
raise NotImplementedError
def node_removed(self, source_id, time_id, node_id):
raise NotImplementedError
def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed):
raise NotImplementedError
def edge_removed(self, source_id, time_id, edge_id):
raise NotImplementedError
def step_begun(self, source_id, time_id, timestamp):
raise NotImplementedError
def graph_cleared(self, source_id, time_id):
raise NotImplementedError
"""
the NetStream constants module.
the NetStream constants module.
Contains the constant bytes used in the protocol to identifie data types and events.
Contains the constant bytes used in the protocol to identifie data types and events.
"""
"""Followed by an 32-bit signed integer for this protocol version. Certainly useless."""
......@@ -13,113 +13,104 @@ EVENT_START = 0x01
"""Not used."""
EVENT_END = 0x02
# ==============================
# = GraphStream's graph events =
# ==============================
"""Followed by a node id (TYPE_STRING format)"""
EVENT_ADD_NODE = 0x10
"""Followed by a node id (TYPE_STRING format)"""
EVENT_DEL_NODE = 0x11
"""
Followed by
- an edge id (TYPE_STRING format),
- an source node id (TYPE_STRING format),
Followed by
- an edge id (TYPE_STRING format),
- an source node id (TYPE_STRING format),
- a target node id (TYPE_STRING format
- a boolean indicating if directed (TYPE_BOOLEAN format)
- a boolean indicating if directed (TYPE_BOOLEAN format)
"""
EVENT_ADD_EDGE = 0x12
"""Followed by an edge id (TYPE_STRING format) """
EVENT_DEL_EDGE = 0x13
"""Followed by double (TYPE_DOUBLE format) """
EVENT_STEP = 0x14
EVENT_CLEARED = 0x15
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute value
- the attribute value
"""
EVENT_ADD_GRAPH_ATTR = 0x16
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute old value
- the attribute new value
- the attribute new value
"""
EVENT_CHG_GRAPH_ATTR = 0x17
"""
Followed by
Followed by
- the attribute id (TYPE_STRING format)
"""
EVENT_DEL_GRAPH_ATTR = 0x18
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute value
- the attribute value
"""
EVENT_ADD_NODE_ATTR = 0x19
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute old value
- the attribute new value
- the attribute new value
"""
EVENT_CHG_NODE_ATTR = 0x1a
"""
Followed by
Followed by
- the node id (TYPE_STRING format)
- the attribute id (TYPE_STRING format)
"""
EVENT_DEL_NODE_ATTR = 0x1b
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute value
- the attribute value
"""
EVENT_ADD_EDGE_ATTR = 0x1c
"""
Followed by
Followed by
- an attribute id (TYPE_STRING format)
- the attribute TYPE
- the attribute old value
- the attribute new value
- the attribute new value
"""
EVENT_CHG_EDGE_ATTR = 0x1d
"""
Followed by
Followed by
- the edge id (TYPE_STRING format)
- the attribute id (TYPE_STRING format)
"""
EVENT_DEL_EDGE_ATTR = 0x1e
# ===============
# = Value Types =
# ===============
......@@ -128,14 +119,14 @@ EVENT_DEL_EDGE_ATTR = 0x1e
Followed by a byte who's value is 0 or 1
"""
TYPE_BOOLEAN = 0x50
"""
An array of booleans. Followed by first, a 16-bits integer for the number
of booleans and then, a list of bytes who's value is 0 or 1
"""
TYPE_BOOLEAN_ARRAY = 0x51
"""Followed by a signed byte [-127,127]"""
"""Followed by a signed byte [-127,127]"""
TYPE_BYTE = 0x52
"""
......@@ -169,7 +160,7 @@ TYPE_LONG = 0x58
"""
An array of longs. Followed by first, a 16-bits integer for the number of
longs and then, a list of 62-bit signed integers
longs and then, a list of 64-bit signed integers
"""
TYPE_LONG_ARRAY = 0x59
......@@ -187,7 +178,6 @@ TYPE_FLOAT_ARRAY = 0x5b
"""Followed by a double precision 64-bits floating point number"""
TYPE_DOUBLE = 0x5c
"""
Array of double. Followed by first, a 16-bits integer for the number of
doubles and then, a list of 64-bit doubles
......@@ -206,11 +196,10 @@ Raw data, good for serialization. Followed by first, a 16-bits integer
indicating the length in bytes of the dataset, and then the data itself.
"""
TYPE_RAW = 0x5f
"""
An type-unspecified array. Followed by first, a
16-bits integer indicating the number of elements, and then, the elements
themselves. The elements themselves have to give their type.
"""
TYPE_ARRAY = 0x60
......@@ -8,492 +8,590 @@ This module implements a NetStream Sender
Created by Yoann Pigné on 2011-08-20.
Copyright (c) 2011 University of Luxembourg. All rights reserved.
Heavily modified to work with new GraphStream core versions.
Hugo Hromic <hugo.hromic@insight-centre.org>
"""
import socket
import struct
import types
import base64
import varint
import logging
from random import random
from common import AttributeSink, ElementSink
from constants import *
class DefaultNetStreamTransport(object):
"""Default transport class using TCP/IP networking."""
def __init__(self, host, port):
"""Initialize using host and port."""
self.host = host
self.port = port
self.socket = None
class NetStreamTransport(object):
"""Defines the general behaviour of the class that will be in charge of the actuall data sending"""
def __init__(self, stream, host, port):
raise NotImplementedError
def connect(self):
raise NotImplementedError
class DefaultNetStreamTransport(NetStreamTransport):
stream="Default"
host="localhost"
port=2001
socket=None
def __init__(self, stream, host, port):
self.stream = stream
self.stream_string=struct.pack("!i",len(stream))+stream
self.host = host
self.port = port
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def send(self, event):
if(self.socket == None):
self.connect()
self.socket.sendall(struct.pack("!i",+len(self.stream_string)+len(event))+self.stream_string+event)
class Base64NetStreamTransport(NetStreamTransport):
stream="Default"
host="localhost"
port=2001
socket=None
def __init__(self, stream, host, port):
self.stream = stream
self.stream_string=struct.pack("!i",len(stream))+stream
self.host = host
self.port = port
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def send(self, event):
if(self.socket == None):
self.connect()
encoded_msg = base64.b64encode(self.stream_string+event)
self.socket.sendall(base64.b64encode(struct.pack("!i",len(encoded_msg)))+encoded_msg)
class NetStreamSender(AttributeSink,ElementSink):
"""One client must send to only one identified stream (streamID, host, port)"""
transport = None
def __init__(self, *args, **kwargs):
"""Constructor can be with one NetStreamTransport object OR with 3 args: Stream ID, Host, and port number"""
if len(args) == 1 and isinstance(args[0], NetStreamTransport):
print("Initialize from transport object")
self.init_from_transport(args[0])
else:
if len(args) == 3 and isinstance(args[0], types.StringType) and isinstance(args[1], types.StringType) and isinstance(args[2], types.IntType):
print("Initialize from args")
self.init_from_args(args[0], args[1], args[2])
else:
print("Impossible to Initialize")
def init_from_args(self, stream, host, port):
self.transport = DefaultNetStreamTransport(stream, host,port)
self.transport.connect();
def init_from_transport(self,transport):
self.transport = transport
def encode_string(self, string):
"""Encodes a given string"""
return struct.pack("!i",len(string))+string
def _getType(self, value):
is_array = isinstance(value,types.ListType)
if is_array:
value = value[0]
if isinstance(value, types.BooleanType):
if is_array:
return TYPE_BOOLEAN_ARRAY
else:
return TYPE_BOOLEAN
elif isinstance(value, types.IntType):
if is_array:
return TYPE_INT_ARRAY
else:
return TYPE_INT
elif isinstance(value, types.LongType):
if is_array:
return TYPE_LONG_ARRAY
else:
return TYPE_LONG
elif isinstance(value, types.FloatType):
if is_array:
return TYPE_DOUBLE_ARRAY
else:
return TYPE_DOUBLE
elif isinstance(value, types.StringType) or isinstance(value, types.UnicodeType):
return TYPE_STRING
elif isinstance(value, types.DictType):
raise NotImplementedError("You tried to send a dictionary through the NetStream Protocol, though it is not yet implemented.")
def _encodeValue(self, value, value_type):
if TYPE_BOOLEAN == value_type:
return self._encodeBoolean(value)
elif TYPE_BOOLEAN_ARRAY == value_type:
return self._encodeBooleanArray(value)
elif TYPE_INT == value_type:
return self._encodeInt(value)
elif TYPE_INT_ARRAY == value_type:
return self._encodeIntArray(value)
elif TYPE_LONG == value_type:
return self._encodeLong(value)
elif TYPE_LONG_ARRAY == value_type:
return self._encodeLongArray(value)
elif TYPE_DOUBLE == value_type:
return self._encodeDouble(value)
elif TYPE_DOUBLE_ARRAY == value_type:
return self._encodeDoubleArray(value)
elif TYPE_STRING == value_type:
return self._encodeString(value);
return None
def _encodeString(self, value):
return struct.pack("!i",len(value))+value
def _encodeDoubleArray(self, value):
if not isinstance(value, types.ListType) or not isinstance(value[0], types.FloatType):
raise TypeError("You've specified an incorrect type")
event = struct.pack("!i",len(value))
for v in value:
if not isinstance(v,types.FloatType):
raise TypeError("You've specified an incorrect type")
event = event+struct.pack("!d",v)
return event
def _encodeDouble(self, value):
return struct.pack("!d",value)
def _encodeLongArray(self, value):
if not isinstance(value, types.ListType) or not isinstance(value[0], types.LongType):
raise TypeError("You've specified an incorrect type")
event = struct.pack("!i",len(value))
for v in value:
if not isinstance(v,types.LongType):
raise TypeError("You've specified an incorrect type")
event = event+struct.pack("!q",v)
return event
def _encodeLong(self, value):
return struct.pack("!q",value)
def _encodeIntArray(self, value):
if not isinstance(value, types.ListType) or not isinstance(value[0], types.IntType):
raise TypeError("You've specified an incorrect type")
event = struct.pack("!i",len(value))
for v in value:
if not isinstance(v,types.IntType):
raise TypeError("You've specified an incorrect type")
event = event+struct.pack("!i",v)
return event
def _encodeInt(self, value):
return struct.pack("!i",value)
def _encodeBooleanArray(self, value):
if not isinstance(value, types.ListType) or not isinstance(value[0], types.BooleanType):
raise TypeError("You've specified an incorrect type")
event = struct.pack("!i",len(value))
for v in value:
if not isinstance(v,types.BooleanType):
raise TypeError("You've specified an incorrect type")
event = event+struct.pack("!?",v)
return event
def _encodeBoolean(self, value):
return struct.pack("!?",value)
def _encodeByte(self, value):
return struct.pack("!b",value)
# =========================
# = AttributeSink methods =
# =========================
def graphAttributeAdded(self, source_id, time_id, attribute, value):
event= self._encodeByte(EVENT_ADD_GRAPH_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(attribute)
type = self._getType(value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(value,type)
self.transport.send(event)
def graphAttributeChanged(self, source_id, time_id, attribute, old_value, new_value):
event = self._encodeByte(EVENT_CHG_GRAPH_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(attribute)
type = self._getType(old_value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(old_value,type)
type = self._getType(new_value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(new_value,type)
self.transport.send(event)
def graphAttributeRemoved(self, source_id, time_id, attribute):
event = self._encodeByte(EVENT_DEL_GRAPH_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(attribute)
self.transport.send(event)
def nodeAttributeAdded(self, source_id, time_id, node_id, attribute, value):
event = self._encodeByte(EVENT_ADD_NODE_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(node_id)
event = event + self._encodeString(attribute)
type = self._getType(value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(value,type)
self.transport.send(event)
def nodeAttributeChanged(self, source_id, time_id, node_id, attribute, old_value, new_value):
event = self._encodeByte(EVENT_CHG_NODE_ATTR)
event = event + self._encodeString(source_id) + self._encodeLong(time_id)
event = event + self._encodeString(node_id)
event = event + self._encodeString(attribute)
type = self._getType(old_value)
event = event + self._encodeByte(type)
event = event + self._encodeValue(old_value,type)