diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index e6247f5c..bd33430f 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -52,8 +52,11 @@ std::ostringstream& FairMQLogger::Log(int type) case ERROR: type_str = "\033[01;31mERROR\033[0m"; break; + case WARN: + type_str = "\033[01;33mWARN\033[0m"; + break; case STATE: - type_str = "\033[01;33mSTATE\033[0m"; + type_str = "\033[01;35mSTATE\033[0m"; default: break; } diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 6464b539..2846622f 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -31,6 +31,7 @@ class FairMQLogger DEBUG, INFO, ERROR, + WARN, STATE }; FairMQLogger(); @@ -46,5 +47,6 @@ typedef unsigned long long timestamp_t; timestamp_t get_timestamp(); #define LOG(type) FairMQLogger().Log(FairMQLogger::type) +#define MQLOG(type) FairMQLogger().Log(FairMQLogger::type) #endif /* FAIRMQLOGGER_H_ */ diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 5d61122e..a6c16f70 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -29,14 +29,15 @@ class FairMQSocket virtual void Bind(const string& address) = 0; virtual void Connect(const string& address) = 0; - virtual size_t Send(FairMQMessage* msg) = 0; - virtual size_t Receive(FairMQMessage* msg) = 0; + virtual size_t Send(FairMQMessage* msg, const string& flag="") = 0; + virtual size_t Receive(FairMQMessage* msg, const string& flag="") = 0; virtual void* GetSocket() = 0; virtual int GetSocket(int nothing) = 0; virtual void Close() = 0; virtual void SetOption(const string& option, const void* value, size_t valueSize) = 0; + virtual void GetOption(const string& option, void* value, size_t* valueSize) = 0; virtual unsigned long GetBytesTx() = 0; virtual unsigned long GetBytesRx() = 0; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 5a644b4c..868525d0 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -176,18 +176,18 @@ namespace FairMQFSM } // Transition table for FairMQFMS struct transition_table : mpl::vector< - // Start Event Next Action Guard - // +---------+---------+-------+---------+--------+ - msmf::Row, - msmf::Row, // this is an invalid transition... - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row > + // Start Event Next Action Guard + // +------------------+----------+------------------+-------------+---------+ + msmf::Row, + msmf::Row, // this is an invalid transition... + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row > { }; // Replaces the default no-transition response. diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 93983111..d535dffb 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -37,6 +37,12 @@ FairMQMessageNN::FairMQMessageNN(size_t size) fReceiving = false; } + +/* nanomsg does not offer support for creating a message out of an existing buffer, + * therefore the following method is using memcpy. For more efficient handling, + * create FairMQMessage object only with size parameter and fill it with data. + * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). +*/ FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { fMessage = nn_allocmsg(size, 0); diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index c67ea982..c4ab2330 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -69,7 +69,7 @@ void FairMQSocketNN::Connect(const string& address) } } -size_t FairMQSocketNN::Send(FairMQMessage* msg) +size_t FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) { void* ptr = msg->GetMessage(); int rc = nn_send(fSocket, &ptr, NN_MSG, 0); @@ -87,7 +87,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg) return rc; } -size_t FairMQSocketNN::Receive(FairMQMessage* msg) +size_t FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { void* ptr = NULL; int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); @@ -130,6 +130,14 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v } } +void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueSize) +{ + int rc = nn_getsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); + if (rc < 0) { + LOG(ERROR) << "failed getting socket option, reason: " << nn_strerror(errno); + } +} + unsigned long FairMQSocketNN::GetBytesTx() { return fBytesTx; @@ -152,6 +160,8 @@ unsigned long FairMQSocketNN::GetMessagesRx() int FairMQSocketNN::GetConstant(const string& constant) { + if (constant == "") + return 0; if (constant == "sub") return NN_SUB; if (constant == "pub") @@ -168,6 +178,14 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_SNDBUF; if (constant == "rcv-hwm") return NN_RCVBUF; + if (constant == "snd-more") { + LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; + return -1; + } + if (constant == "rcv-more") { + LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; + return -1; + } return -1; } diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 9ec083f8..8ae536e2 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -31,14 +31,15 @@ class FairMQSocketNN : public FairMQSocket virtual void Bind(const string& address); virtual void Connect(const string& address); - virtual size_t Send(FairMQMessage* msg); - virtual size_t Receive(FairMQMessage* msg); + virtual size_t Send(FairMQMessage* msg, const string& flag=""); + virtual size_t Receive(FairMQMessage* msg, const string& flag=""); virtual void* GetSocket(); virtual int GetSocket(int nothing); virtual void Close(); virtual void SetOption(const string& option, const void* value, size_t valueSize); + virtual void GetOption(const string& option, void* value, size_t* valueSize); unsigned long GetBytesTx(); unsigned long GetBytesRx(); diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 970f2cf8..43027f31 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -82,9 +82,9 @@ void FairMQSocketZMQ::Connect(const string& address) } } -size_t FairMQSocketZMQ::Send(FairMQMessage* msg) +size_t FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) { - int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, 0); + int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); if (nbytes >= 0) { fBytesTx += nbytes; @@ -99,9 +99,9 @@ size_t FairMQSocketZMQ::Send(FairMQMessage* msg) return nbytes; } -size_t FairMQSocketZMQ::Receive(FairMQMessage* msg) +size_t FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) { - int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, 0); + int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); if (nbytes >= 0) { fBytesRx += nbytes; @@ -152,6 +152,14 @@ void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t } } +void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* valueSize) +{ + int rc = zmq_getsockopt(fSocket, GetConstant(option), value, valueSize); + if (rc < 0) { + LOG(ERROR) << "failed getting socket option, reason: " << zmq_strerror(errno); + } +} + unsigned long FairMQSocketZMQ::GetBytesTx() { return fBytesTx; @@ -174,6 +182,8 @@ unsigned long FairMQSocketZMQ::GetMessagesRx() int FairMQSocketZMQ::GetConstant(const string& constant) { + if (constant == "") + return 0; if (constant == "sub") return ZMQ_SUB; if (constant == "pub") @@ -190,6 +200,10 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_SNDHWM; if (constant == "rcv-hwm") return ZMQ_RCVHWM; + if (constant == "snd-more") + return ZMQ_SNDMORE; + if (constant == "rcv-more") + return ZMQ_RCVMORE; return -1; } diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 2a9389f5..e42687d1 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -32,14 +32,15 @@ class FairMQSocketZMQ : public FairMQSocket virtual void Bind(const string& address); virtual void Connect(const string& address); - virtual size_t Send(FairMQMessage* msg); - virtual size_t Receive(FairMQMessage* msg); + virtual size_t Send(FairMQMessage* msg, const string& flag=""); + virtual size_t Receive(FairMQMessage* msg, const string& flag=""); virtual void* GetSocket(); virtual int GetSocket(int nothing); virtual void Close(); virtual void SetOption(const string& option, const void* value, size_t valueSize); + virtual void GetOption(const string& option, void* value, size_t* valueSize); virtual unsigned long GetBytesTx(); virtual unsigned long GetBytesRx();