Are you sure you want to delete this task? Once this task is deleted, it cannot be recovered.
wangyilin 715feb2d41 | 2 years ago | |
---|---|---|
.. | ||
img | 2 years ago | |
Accumulator.h | 2 years ago | |
AccumulatorClient.cpp | 2 years ago | |
AccumulatorClient.h | 2 years ago | |
AccumulatorManager.cpp | 2 years ago | |
AccumulatorManager.h | 2 years ago | |
AccumulatorServer.cpp | 2 years ago | |
AccumulatorServer.h | 2 years ago | |
Aggregator.h | 2 years ago | |
AggregatorFactory.h | 2 years ago | |
ArithmeticMaxAggregator.h | 2 years ago | |
ArithmeticMinAggregator.h | 2 years ago | |
AutoTimer.h | 2 years ago | |
AvgAggregator.h | 2 years ago | |
Design.md | 2 years ago | |
Design_cn.md | 2 years ago | |
README.md | 2 years ago | |
README_cn.md | 2 years ago | |
SumAggregator.h | 2 years ago | |
TimerAggregator.h | 2 years ago |
English version | 中文版
Accumulator
is a distributed component. Each node can push data, and Accumulator
server aggregates all the pushed data for query. Accumulator
supports custom aggregator while some aggregators are predifined.
The predefined aggregators are as follows:
The following is a simple distributed example.
Before using Accumulator
, each node must initialize prpc firstly.
_master = std::make_unique<Master>("127.0.0.1");
_master->initialize();
_mc = std::make_unique<TcpMasterClient>(_master->endpoint());
_mc->initialize();
RpcConfig rpc_config;
rpc_config.protocol = "tcp";
rpc_config.bind_ip = "127.0.0.1";
rpc_config.io_thread_num = 1;
_rpc = std::make_unique<RpcService>();
_rpc->initialize(_mc.get(), rpc_config);
Then, select a node as the server node, and the server node needs to be responsible for receiving data from all nodes. Here, the first node that is successfully initialized is selected as the server node.
if (_rpc->global_rank() == 0) {
AccumulatorServer::singleton().initialize(_rpc.get());
}
AccumulatorClient::singleton().initialize(_rpc.get());
Then define the same Accumulator
on all nodes. Here, "sum_int_count_single_ok" is the ID of the Accumulator
. Same ID in different nodes will point to the same Accumulator
entity. And SumAggregator<int64_t>
is the aggregator, which must be strictly consistent when Accumulator
defined on different nodes. Parameter 10 is the refresh frequency, which means that every 10 times write
the client aggregates and pushes data to the server.
Accumulator<SumAggregator<int64_t>> counter("sum_int_counter_single_ok", 10)
After that, each node pushes different data according to its rank.
const int count_max = 1000;
for (int i = 0; i < count_max; i++) {
ASSERT_TRUE(counter.write((i+1) * (_rpc.global_rank()+1)));
}
AccumulatorClient::singleton().wait_empty();
After the data of all nodes have been pushed, you can get the summary and check it.
ASSERT_TRUE(counter.try_read_to_string(cnt_res));
std::string right_res = boost::lexical_cast<std::string>((1+count_max)*count_max/2
* _rpc.global_rank() * (_rpc.global_rank() + 1) / 2);
EXPECT_STREQ(right_res.c_str(), cnt_res.c_str());
Finally, the resources need to be released in sequence.
if (_rpc.global_rank() == 0) {
AccumulatorClient::singleton().erase_all();
}
AccumulatorClient::singleton().finalize();
if (_rpc.global_rank() == 0) {
AccumulatorServer::singleton().finalize();
}
_rpc->finalize();
_master->exit();
_master->finalize();
An RPC framework that provides network communication for high-performance computing, with components such as accumulator.
C++ Markdown CMake Text other
Dear OpenI User
Thank you for your continuous support to the Openl Qizhi Community AI Collaboration Platform. In order to protect your usage rights and ensure network security, we updated the Openl Qizhi Community AI Collaboration Platform Usage Agreement in January 2024. The updated agreement specifies that users are prohibited from using intranet penetration tools. After you click "Agree and continue", you can continue to use our services. Thank you for your cooperation and understanding.
For more agreement content, please refer to the《Openl Qizhi Community AI Collaboration Platform Usage Agreement》