Commit 558ab5a8 authored by Yoann Pigné's avatar Yoann Pigné

Varints implemented fo the sender.

parent 8a5d71b1
......@@ -54,7 +54,11 @@ void NetStreamSender::init()
else
std::cout<<"."<<std::flush;
wait_for_server++;
sleep(1);
#ifdef WIN32
Sleep(ms);
#else
usleep(1000);
#endif
}
}
}
......@@ -64,71 +68,71 @@ void NetStreamSender::init()
// ===========================================
int NetStreamSender::_getType(char object){
if(debug){
cerr<<"NetStreamSernder: _getType : char"<<endl;
cerr<<" NetStreamSender: _getType : char"<<endl;
}
return TYPE_BYTE;
}
int NetStreamSender::_getType(bool object){
if(debug){
cerr<<"NetStreamSernder: _getType : bool"<<endl;
cerr<<" NetStreamSender: _getType : bool"<<endl;
}
return TYPE_BOOLEAN;}
int NetStreamSender::_getType(int object){
if(debug){
cerr<<"NetStreamSernder: _getType : int"<<endl;
cerr<<" NetStreamSender: _getType : int"<<endl;
}
return TYPE_INT;}
int NetStreamSender::_getType(long object){
if(debug){
cerr<<"NetStreamSernder: _getType : long"<<endl;
cerr<<" NetStreamSender: _getType : long"<<endl;
}
return TYPE_LONG;}
int NetStreamSender::_getType(float object){
if(debug){
cerr<<"NetStreamSernder: _getType : float"<<endl;
cerr<<" NetStreamSender: _getType : float"<<endl;
}
return TYPE_FLOAT;}
int NetStreamSender::_getType(double object){
if(debug){
cerr<<"NetStreamSernder: _getType : double"<<endl;
cerr<<" NetStreamSender: _getType : double"<<endl;
}
return TYPE_DOUBLE;}
int NetStreamSender::_getType(const string & object){
if(debug){
cerr<<"NetStreamSernder: _getType : string"<<endl;
cerr<<" NetStreamSender: _getType : string"<<endl;
}
return TYPE_STRING;}
int NetStreamSender::_getType(const vector<char> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : char* "<<endl;
cerr<<" NetStreamSender: _getType : char* "<<endl;
}
return TYPE_BYTE_ARRAY;}
int NetStreamSender::_getType(const vector <bool> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : bool*"<<endl;
cerr<<" NetStreamSender: _getType : bool*"<<endl;
}
return TYPE_BOOLEAN_ARRAY;}
int NetStreamSender::_getType(const vector<int> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : int*"<<endl;
cerr<<" NetStreamSender: _getType : int*"<<endl;
}
return TYPE_INT_ARRAY;}
int NetStreamSender::_getType(const vector<long> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : long*"<<endl;
cerr<<" NetStreamSender: _getType : long*"<<endl;
}
return TYPE_LONG_ARRAY;}
int NetStreamSender::_getType(const vector<float> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : float*"<<endl;
cerr<<" NetStreamSender: _getType : float*"<<endl;
}
return TYPE_FLOAT_ARRAY;}
int NetStreamSender::_getType(const vector <double> & object){
if(debug){
cerr<<"NetStreamSernder: _getType : double*"<<endl;
cerr<<" NetStreamSender: _getType : double*"<<endl;
}
return TYPE_DOUBLE_ARRAY;}
......@@ -143,10 +147,10 @@ void NetStreamSender::_encode(NetStreamStorage & event, bool value){
event.writeByte(value?1:0);
}
void NetStreamSender::_encode(NetStreamStorage & event, int value){
event.writeInt(value);
event.writeVarint(value);
}
void NetStreamSender::_encode(NetStreamStorage & event, long value){
event.writeLong(value);
event.writeVarint(value);
}
void NetStreamSender::_encode(NetStreamStorage & event, float value){
event.writeFloat(value);
......@@ -158,37 +162,37 @@ void NetStreamSender::_encode(NetStreamStorage & event,const string & value){
event.writeString(value);
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<char> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<char>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeByte((*i));
}
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<bool> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<bool>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeByte((*i));
}
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<int> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<int>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeInt((*i));
event.writeVarint((*i));
}
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<long> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<long>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeLong((*i));
event.writeVarint((*i));
}
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<float> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<float>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeFloat((*i));
}
}
void NetStreamSender::_encode(NetStreamStorage & event, const vector<double> & value){
event.writeInt(value.size());
event.writeUnsignedVarint(value.size());
for(vector<double>::const_iterator i = value.begin(); i != value.end(); i++){
event.writeDouble((*i));
}
......@@ -223,7 +227,7 @@ void NetStreamSender::addNode(const string & source_id, long time_id, const stri
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_ADD_NODE);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(node_id);
_sendEvent(event);
}
......@@ -231,7 +235,7 @@ void NetStreamSender::removeNode(const string & source_id, long time_id, const s
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_DEL_NODE);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(node_id);
_sendEvent(event);
}
......@@ -239,7 +243,7 @@ void NetStreamSender::addEdge(const string & source_id, long time_id, const stri
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_ADD_EDGE);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(edge_id);
event.writeString(from_node);
event.writeString(to_node);
......@@ -250,7 +254,7 @@ void NetStreamSender::removeEdge(const string & source_id, long time_id, const s
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_DEL_EDGE);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(edge_id);
_sendEvent(event);
}
......@@ -258,7 +262,7 @@ void NetStreamSender::stepBegins(const string & source_id, long time_id, double
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_STEP);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeDouble(timestamp);
_sendEvent(event);
}
......@@ -266,7 +270,7 @@ void NetStreamSender::graphClear(const string & source_id, long time_id){
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_CLEARED);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
_sendEvent(event);
}
......@@ -278,7 +282,7 @@ void NetStreamSender::removeNodeAttribute(const string & source_id, long time_id
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_DEL_NODE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(node_id);
event.writeString(attribute);
_sendEvent(event);
......@@ -287,7 +291,7 @@ void NetStreamSender::removeGraphAttribute(const string & source_id, long time_i
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_DEL_GRAPH_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(attribute);
_sendEvent(event);
}
......@@ -295,7 +299,7 @@ void NetStreamSender::removeEdgeAttribute(const string & source_id, long time_id
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_DEL_EDGE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(edge_id);
event.writeString(attribute);
_sendEvent(event);
......
......@@ -13,10 +13,17 @@
#ifndef NETSTREAM_SENDER_H
#define NETSTREAM_SENDER_H
#include <iostream>
#include <sys/socket.h>
#include <errno.h>
#ifndef WIN32
#include <unistd.h>
#else
#include <windows.h>
#endif
#include <iostream>
#include "netstream-storage.h"
#include "netstream-socket.h"
......@@ -117,7 +124,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(netstream::EVENT_ADD_GRAPH_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(attribute);
event.writeByte(getType(value));
encode(event, value);
......@@ -129,7 +136,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_CHG_GRAPH_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(attribute);
event.writeByte(getType(oldValue));
encode(event, oldValue);
......@@ -145,7 +152,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_ADD_NODE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(node_id);
event.writeString(attribute);
event.writeByte(getType(value));
......@@ -158,7 +165,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_CHG_NODE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(node_id);
event.writeString(attribute);
event.writeByte(getType(oldValue));
......@@ -175,7 +182,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_ADD_EDGE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(edge_id);
event.writeString(attribute);
event.writeByte(getType(value));
......@@ -189,7 +196,7 @@ public:
NetStreamStorage event = NetStreamStorage();
event.writeByte(EVENT_CHG_EDGE_ATTR);
event.writeString(source_id);
event.writeLong(time_id);
event.writeUnsignedVarint(time_id);
event.writeString(edge_id);
event.writeString(attribute);
event.writeByte(getType(oldValue));
......
......@@ -24,6 +24,7 @@
#include <netdb.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#else
#ifdef ERROR
#undef ERROR
......
......@@ -36,9 +36,6 @@ namespace netstream
// ----------------------------------------------------------------------
NetStreamStorage::NetStreamStorage(unsigned char packet[], int length)
{
// Length is calculated, if -1, or given
if (length == -1) length = sizeof(packet) / sizeof(unsigned char);
store.reserve(length);
// Get the content
for(int i = 0; i < length; ++i) store.push_back(packet[i]);
......@@ -114,6 +111,75 @@ namespace netstream
}
size_t NetStreamStorage::varintSize(uint_fast64_t data){
// 7 bits -> 127
if(data < (1L << 7)){return 1;}
// 14 bits -> 16383
if(data < (1L << 14)){return 2;}
// 21 bits -> 2097151
if(data < (1L << 21)){return 3;}
// 28 bits -> 268435455
if(data < (1L << 28)){return 4;}
// 35 bits -> 34359738367
if(data < (1L << 35)){return 5;}
// 42 bits -> 4398046511103
if(data < (1L << 42)){return 6;}
// 49 bits -> 562949953421311
if(data < (1L << 49)){return 7;}
// 56 bits -> 72057594037927935
if(data < (1L << 56)){return 8;}
return 9;
}
// ----------------------------------------------------------------------
/**
* Reads a varint form the array
* @return The read varint
*/
int_fast64_t NetStreamStorage::readVarint() throw(std::invalid_argument)
{
uint_fast64_t number = readUnsignedVarint();
return (int_fast64_t)((number & 1) == 0) ? number >> 1 : -(number >> 1);
}
// ----------------------------------------------------------------------
/**
*
*/
void NetStreamStorage::writeVarint(int_fast64_t value) throw(std::invalid_argument)
{
writeUnsignedVarint((value << 1) ^ (value >> 63));
}
// ----------------------------------------------------------------------
/**
* Reads a unsigned varint form the array
* @return The read u_varint
*/
uint_fast64_t NetStreamStorage::readUnsignedVarint() throw(std::invalid_argument)
{
// TODO
return 0;
}
// ----------------------------------------------------------------------
/**
*
*/
void NetStreamStorage::writeUnsignedVarint(uint_fast64_t value) throw(std::invalid_argument)
{
size_t size = varintSize(value);
unsigned char buffer[size];
for(int i = 0; i < size; i++){
int head=128;
if(i==size-1) head = 0;
long b = ((value >> (7*i)) & 127) ^ head;
buffer[size-1-i] = ((unsigned char)(b & 255 ));
}
writeByEndianess(buffer, size);
}
// ----------------------------------------------------------------------
/**
* Reads a byte form the array
......@@ -190,8 +256,7 @@ namespace netstream
*/
void NetStreamStorage::writeString(const std::string &s) throw()
{
writeInt(static_cast<int>(s.length()));
writeUnsignedVarint(static_cast<size_t>(s.length()));
store.insert(store.end(), s.begin(), s.end());
iter_ = store.begin();
}
......@@ -222,7 +287,7 @@ namespace netstream
*/
void NetStreamStorage::writeStringList(const std::vector<std::string> &s) throw()
{
writeInt(static_cast<int>(s.size()));
writeUnsignedVarint(s.size());
for (std::vector<std::string>::const_iterator it = s.begin(); it!=s.end() ; it++)
{
writeString(*it);
......@@ -258,6 +323,7 @@ namespace netstream
short svalue = static_cast<short>(value);
unsigned char *p_svalue = reinterpret_cast<unsigned char*>(&svalue);
writeByEndianess(p_svalue, 2);
}
......
......@@ -17,6 +17,7 @@
#include <vector>
#include <string>
#include <stdexcept>
#include <cstdint>
namespace netstream
{
......@@ -66,6 +67,13 @@ public:
void reset();
virtual size_t varintSize(uint_fast64_t);
virtual uint_fast64_t readUnsignedVarint() throw(std::invalid_argument);
virtual void writeUnsignedVarint(uint_fast64_t) throw(std::invalid_argument);
virtual int_fast64_t readVarint() throw(std::invalid_argument);
virtual void writeVarint(int_fast64_t) throw(std::invalid_argument);
virtual unsigned char readChar() throw(std::invalid_argument);
virtual void writeChar(unsigned char) throw();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment