mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Monitor Meta socket connection and insert address vector
This commit is contained in:
committed by
Mohammad Al-Turany
parent
92320604a9
commit
df5d5d4086
@@ -11,6 +11,7 @@
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <netinet/in.h>
|
||||
#include <rdma/fabric.h>
|
||||
@@ -79,26 +80,16 @@ auto Context::GetOfiApiVersion() const -> std::string
|
||||
return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version));
|
||||
}
|
||||
|
||||
auto Context::InitOfi(ConnectionType type, std::string address) -> void
|
||||
auto Context::InitOfi(ConnectionType type, std::string addr) -> void
|
||||
{
|
||||
// Parse address
|
||||
string protocol, ip;
|
||||
unsigned int port = 0;
|
||||
regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*");
|
||||
smatch address_result;
|
||||
if (regex_match(address, address_result, address_regex)) {
|
||||
protocol = address_result[1];
|
||||
ip = address_result[2];
|
||||
port = stoul(address_result[3]);
|
||||
LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'";
|
||||
}
|
||||
if (protocol != "tcp") throw ContextError{"Wrong protocol: Supplied address must be in format tcp://ip:port"};
|
||||
auto addr2 = ConvertAddress(addr);
|
||||
if (addr2.Protocol != "tcp")
|
||||
throw ContextError{"Wrong protocol: Supplied address must be in format tcp://ip:port"};
|
||||
|
||||
if (!fOfiInfo) {
|
||||
sockaddr_in* sa = static_cast<sockaddr_in*>(malloc(sizeof(sockaddr_in)));
|
||||
inet_pton(AF_INET, ip.c_str(), &(sa->sin_addr));
|
||||
sa->sin_port = htons(port);
|
||||
sa->sin_family = AF_INET;
|
||||
auto sa2 = ConvertAddress(addr2);
|
||||
memcpy(sa, &sa2, sizeof(sockaddr_in));
|
||||
|
||||
// Prepare fi_getinfo query
|
||||
unique_ptr<fi_info, void(*)(fi_info*)> ofi_hints(fi_allocinfo(), fi_freeinfo);
|
||||
@@ -124,7 +115,7 @@ auto Context::InitOfi(ConnectionType type, std::string address) -> void
|
||||
if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."};
|
||||
|
||||
// for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) {
|
||||
// LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO);
|
||||
LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO);
|
||||
// }
|
||||
//
|
||||
} else {
|
||||
@@ -224,6 +215,45 @@ auto Context::CreateOfiCompletionQueue(Direction dir) -> fid_cq*
|
||||
return cq;
|
||||
}
|
||||
|
||||
auto Context::InsertAddressVector(sockaddr_in address) -> fi_addr_t
|
||||
{
|
||||
fi_addr_t mappedAddress;
|
||||
auto ret = fi_av_insert(fOfiAddressVector, &address, 1, &mappedAddress, 0, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed to insert address into ofi address vector, reason: ", fi_strerror(ret))};
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
auto Context::ConvertAddress(std::string address) -> Address
|
||||
{
|
||||
string protocol, ip;
|
||||
unsigned int port = 0;
|
||||
regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*");
|
||||
smatch address_result;
|
||||
if (regex_match(address, address_result, address_regex)) {
|
||||
protocol = address_result[1];
|
||||
ip = address_result[2];
|
||||
port = stoul(address_result[3]);
|
||||
// LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'";
|
||||
} else {
|
||||
throw ContextError(tools::ToString("Wrong format: Address must be in format prot://ip:port"));
|
||||
}
|
||||
|
||||
return { protocol, ip, port };
|
||||
}
|
||||
|
||||
auto Context::ConvertAddress(Address address) -> sockaddr_in
|
||||
{
|
||||
sockaddr_in sa;
|
||||
if (inet_pton(AF_INET, address.Ip.c_str(), &(sa.sin_addr)) != 1)
|
||||
throw ContextError(tools::ToString("Failed to convert given IP '", address.Ip, "' to struct in_addr, reason: ", strerror(errno)));
|
||||
sa.sin_port = htons(address.Port);
|
||||
sa.sin_family = AF_INET;
|
||||
|
||||
return sa;
|
||||
}
|
||||
|
||||
} /* namespace ofi */
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
Reference in New Issue
Block a user