Skip to content

Commit cb525d4

Browse files
zmxdream, chao9527, DesmonDay, huwei02, yangjunchao
authored
[Pglbox2.0] merge gpugraph to develop (#49946)
* add set slot_num for psgpuwraper (#177) * add set slot_num_for_pull_feature for psgpuwarper * Add get_epoch_finish python interface (#182) * add get_epoch_finish interface * add return * delete return * add unzip op (#183) * fix miss key for error dataset (#186) * fix miss key for error dataset * fix miss key for error dataset Co-authored-by: yangjunchao <yangjunchao@baidu.com> * add excluded_train_pair and infer_node_type (#187) * support return of degree (#188) * fix task stuck in barrier (#189) Co-authored-by: yangjunchao <yangjunchao@baidu.com> * check node/feature format when loading (#190) * check node&feature format when loading * check node&feature format when loading (2) * degrade log (#191) * [PGLBOX]fix conflict * [PGLBOX]fix conflict * [PGLBOX]replace LodTensor with phi::DenseTensor * [PGLBOX]fix gpu_primitives.h include path * [PGLBOX]from platform::PADDLE_CUDA_NUM_THREADS to phi::PADDLE_CUDA_NUM_THREADS * [PGLBOX]fix unzip example code * [PGLBOX]fix unzip example code * [PGLBOX]fix unzip example code * [PGLBOX]fix unzip example code * [PGLBOX]fix unzip ut * [PGLBOX]fix unzip ut * [PGLBOX]fix code style * [PGLBOX]fix code style * [PGLBOX]fix code style * fix code style * fix code style * fix unzip ut * fix unzip ut * fix unzip ut * fix unzip * fix code style * add ut * add c++ ut & fix train_mode_ set * fix load into memory * fix c++ ut * fix c++ ut * fix c++ ut * fix c++ ut * fix code style * fix collective * fix unzip_op.cc * fix barrier * fix code style * fix barrier * fix barrier * fix code style * fix unzip * add unzip.py * add unzip.py * fix unzip.py --------- Co-authored-by: chao9527 <33347532+chao9527@users.noreply.github.com> Co-authored-by: Siming Dai <908660116@qq.com> Co-authored-by: huwei02 <53012141+huwei02@users.noreply.github.com> Co-authored-by: yangjunchao <yangjunchao@baidu.com>
1 parent 382e9a0 commit cb525d4

34 files changed

+1261
-189
lines changed

paddle/fluid/distributed/ps/service/ps_local_client.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,10 +306,10 @@ ::std::future<int32_t> PsLocalClient::SaveCacheTable(uint32_t table_id,
306306
size_t threshold) {
307307
auto* table_ptr = GetTable(table_id);
308308
std::pair<int64_t, int64_t> ret = table_ptr->PrintTableStat();
309-
VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first
309+
VLOG(1) << "table id: " << table_id << ", feasign size: " << ret.first
310310
<< ", mf size: " << ret.second;
311311
if (ret.first > (int64_t)threshold) {
312-
VLOG(0) << "run cache table";
312+
VLOG(1) << "run cache table";
313313
table_ptr->CacheTable(pass_id);
314314
}
315315
return done();

paddle/fluid/distributed/ps/table/common_graph_table.cc

Lines changed: 96 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,13 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
124124
}
125125
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
126126

127-
std::stringstream ss;
128-
for (int k = 0; k < slot_num; ++k) {
129-
ss << slot_feature_num_map_[k] << " ";
127+
if (FLAGS_v > 0) {
128+
std::stringstream ss;
129+
for (int k = 0; k < slot_num; ++k) {
130+
ss << slot_feature_num_map_[k] << " ";
131+
}
132+
VLOG(1) << "slot_feature_num_map: " << ss.str();
130133
}
131-
VLOG(0) << "slot_feature_num_map: " << ss.str();
132134

133135
tasks.clear();
134136

@@ -137,7 +139,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
137139
for (size_t i = 0; i < shard_num; i++) {
138140
tot_len += feature_array[i].size();
139141
}
140-
VLOG(0) << "Loaded feature table on cpu, feature_list_size[" << tot_len
142+
VLOG(1) << "Loaded feature table on cpu, feature_list_size[" << tot_len
141143
<< "] node_ids_size[" << node_ids.size() << "]";
142144
res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num);
143145
unsigned int offset = 0, ind = 0;
@@ -494,6 +496,8 @@ void GraphTable::export_partition_files(int idx, std::string file_path) {
494496

495497
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
496498
}
499+
#endif
500+
497501
void GraphTable::clear_graph(int idx) {
498502
for (auto p : edge_shards[idx]) {
499503
p->clear();
@@ -506,6 +510,7 @@ void GraphTable::clear_graph(int idx) {
506510
}
507511
}
508512

513+
#ifdef PADDLE_WITH_HETERPS
509514
void GraphTable::release_graph() {
510515
// Before releasing graph, prepare for sampling ids and embedding keys.
511516
build_graph_type_keys();
@@ -545,6 +550,7 @@ void GraphTable::release_graph_node() {
545550
feature_shrink_to_fit();
546551
}
547552
}
553+
#endif
548554

549555
void GraphTable::clear_edge_shard() {
550556
VLOG(0) << "begin clear edge shard";
@@ -590,6 +596,7 @@ void GraphTable::clear_feature_shard() {
590596
VLOG(0) << "finish clear feature shard";
591597
}
592598

599+
#ifdef PADDLE_WITH_HETERPS
593600
void GraphTable::feature_shrink_to_fit() {
594601
std::vector<std::future<int>> tasks;
595602
for (auto &type_shards : feature_shards) {
@@ -619,13 +626,16 @@ void GraphTable::merge_feature_shard() {
619626
feature_shards.resize(1);
620627
}
621628

629+
#endif
630+
622631
void GraphTable::clear_graph() {
623632
VLOG(0) << "begin clear_graph";
624633
clear_edge_shard();
625634
clear_feature_shard();
626635
VLOG(0) << "finish clear_graph";
627636
}
628637

638+
#ifdef PADDLE_WITH_HETERPS
629639
int32_t GraphTable::load_next_partition(int idx) {
630640
if (next_partition >= static_cast<int>(partitions[idx].size())) {
631641
VLOG(0) << "partition iteration is done";
@@ -1203,11 +1213,21 @@ int32_t GraphTable::Load(const std::string &path, const std::string &param) {
12031213
if (load_edge) {
12041214
bool reverse_edge = (param[1] == '<');
12051215
std::string edge_type = param.substr(2);
1206-
return this->load_edges(path, reverse_edge, edge_type);
1216+
int ret = this->load_edges(path, reverse_edge, edge_type);
1217+
if (ret != 0) {
1218+
VLOG(0) << "Fail to load edges, path[" << path << "] edge_type["
1219+
<< edge_type << "]";
1220+
return -1;
1221+
}
12071222
}
12081223
if (load_node) {
12091224
std::string node_type = param.substr(1);
1210-
return this->load_nodes(path, node_type);
1225+
int ret = this->load_nodes(path, node_type);
1226+
if (ret != 0) {
1227+
VLOG(0) << "Fail to load nodes, path[" << path << "] node_type["
1228+
<< node_type << "]";
1229+
return -1;
1230+
}
12111231
}
12121232
return 0;
12131233
}
@@ -1319,10 +1339,19 @@ int32_t GraphTable::parse_node_and_load(std::string ntype2files,
13191339
return 0;
13201340
}
13211341
if (FLAGS_graph_load_in_parallel) {
1322-
this->load_nodes(npath_str, "");
1342+
int ret = this->load_nodes(npath_str, "");
1343+
if (ret != 0) {
1344+
VLOG(0) << "Fail to load nodes, path[" << npath << "]";
1345+
return -1;
1346+
}
13231347
} else {
13241348
for (size_t j = 0; j < ntypes.size(); j++) {
1325-
this->load_nodes(npath_str, ntypes[j]);
1349+
int ret = this->load_nodes(npath_str, ntypes[j]);
1350+
if (ret != 0) {
1351+
VLOG(0) << "Fail to load nodes, path[" << npath << "], ntypes["
1352+
<< ntypes[j] << "]";
1353+
return -1;
1354+
}
13261355
}
13271356
}
13281357
return 0;
@@ -1397,17 +1426,30 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files,
13971426
return 0;
13981427
}
13991428
if (FLAGS_graph_load_in_parallel) {
1400-
this->load_nodes(npath_str, "");
1429+
int ret = this->load_nodes(npath_str, "");
1430+
if (ret != 0) {
1431+
VLOG(0) << "Fail to load nodes, path[" << npath_str << "]";
1432+
return -1;
1433+
}
14011434
} else {
14021435
for (size_t j = 0; j < ntypes.size(); j++) {
1403-
this->load_nodes(npath_str, ntypes[j]);
1436+
int ret = this->load_nodes(npath_str, ntypes[j]);
1437+
if (ret != 0) {
1438+
VLOG(0) << "Fail to load nodes, path[" << npath_str
1439+
<< "], ntypes[" << ntypes[j] << "]";
1440+
return -1;
1441+
}
14041442
}
14051443
}
14061444
}
14071445
return 0;
14081446
}));
14091447
}
14101448
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
1449+
if (is_parse_node_fail_) {
1450+
VLOG(0) << "Fail to load node_and_edge_file";
1451+
return -1;
1452+
}
14111453
return 0;
14121454
}
14131455

@@ -1499,7 +1541,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
14991541
node->set_feature_size(feat_name[idx].size());
15001542
for (int i = 1; i < num; ++i) {
15011543
auto &v = vals[i];
1502-
parse_feature(idx, v.ptr, v.len, node);
1544+
int ret = parse_feature(idx, v.ptr, v.len, node);
1545+
if (ret != 0) {
1546+
VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
1547+
is_parse_node_fail_ = true;
1548+
return {0, 0};
1549+
}
15031550
}
15041551
}
15051552
local_valid_count++;
@@ -1551,7 +1598,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
15511598
if (node != NULL) {
15521599
for (int i = 2; i < num; ++i) {
15531600
auto &v = vals[i];
1554-
parse_feature(idx, v.ptr, v.len, node);
1601+
int ret = parse_feature(idx, v.ptr, v.len, node);
1602+
if (ret != 0) {
1603+
VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
1604+
is_parse_node_fail_ = true;
1605+
return {0, 0};
1606+
}
15551607
}
15561608
}
15571609
local_valid_count++;
@@ -1603,6 +1655,11 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
16031655
valid_count += res.second;
16041656
}
16051657
}
1658+
if (is_parse_node_fail_) {
1659+
VLOG(0) << "Fail to load nodes, path[" << paths[0] << ".."
1660+
<< paths[paths.size() - 1] << "] node_type[" << node_type << "]";
1661+
return -1;
1662+
}
16061663

16071664
VLOG(0) << valid_count << "/" << count << " nodes in node_type[ " << node_type
16081665
<< "] are loaded successfully!";
@@ -2103,36 +2160,56 @@ int GraphTable::parse_feature(int idx,
21032160
if (dtype == "feasign") {
21042161
// string_vector_2_string(fields.begin() + 1, fields.end(), ' ',
21052162
// fea_ptr);
2106-
FeatureNode::parse_value_to_bytes<uint64_t>(
2163+
int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
21072164
fea_fields.begin(), fea_fields.end(), fea_ptr);
2165+
if (ret != 0) {
2166+
VLOG(0) << "Fail to parse value";
2167+
return -1;
2168+
}
21082169
return 0;
21092170
} else if (dtype == "string") {
21102171
string_vector_2_string(
21112172
fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
21122173
return 0;
21132174
} else if (dtype == "float32") {
2114-
FeatureNode::parse_value_to_bytes<float>(
2175+
int ret = FeatureNode::parse_value_to_bytes<float>(
21152176
fea_fields.begin(), fea_fields.end(), fea_ptr);
2177+
if (ret != 0) {
2178+
VLOG(0) << "Fail to parse value";
2179+
return -1;
2180+
}
21162181
return 0;
21172182
} else if (dtype == "float64") {
2118-
FeatureNode::parse_value_to_bytes<double>(
2183+
int ret = FeatureNode::parse_value_to_bytes<double>(
21192184
fea_fields.begin(), fea_fields.end(), fea_ptr);
2185+
if (ret != 0) {
2186+
VLOG(0) << "Fail to parse value";
2187+
return -1;
2188+
}
21202189
return 0;
21212190
} else if (dtype == "int32") {
2122-
FeatureNode::parse_value_to_bytes<int32_t>(
2191+
int ret = FeatureNode::parse_value_to_bytes<int32_t>(
21232192
fea_fields.begin(), fea_fields.end(), fea_ptr);
2193+
if (ret != 0) {
2194+
VLOG(0) << "Fail to parse value";
2195+
return -1;
2196+
}
21242197
return 0;
21252198
} else if (dtype == "int64") {
2126-
FeatureNode::parse_value_to_bytes<uint64_t>(
2199+
int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
21272200
fea_fields.begin(), fea_fields.end(), fea_ptr);
2201+
if (ret != 0) {
2202+
VLOG(0) << "Fail to parse value";
2203+
return -1;
2204+
}
21282205
return 0;
21292206
}
21302207
} else {
21312208
VLOG(2) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
21322209
<< idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
21332210
}
21342211

2135-
return -1;
2212+
return 0;
21362213
}
21372214
// thread safe shard vector merge
21382215
class MergeShardVector {

paddle/fluid/distributed/ps/table/common_graph_table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,7 @@ class GraphTable : public Table {
789789
std::string slot_feature_separator_ = std::string(" ");
790790
std::string feature_separator_ = std::string(" ");
791791
std::vector<int> slot_feature_num_map_;
792+
bool is_parse_node_fail_ = false;
792793
};
793794

794795
/*

paddle/fluid/distributed/ps/table/graph/graph_node.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ class FeatureNode : public Node {
255255
}
256256

257257
template <typename T>
258-
static void parse_value_to_bytes(
258+
static int parse_value_to_bytes(
259259
std::vector<paddle::string::str_ptr>::iterator feat_str_begin,
260260
std::vector<paddle::string::str_ptr>::iterator feat_str_end,
261261
std::string *output) {
@@ -269,8 +269,14 @@ class FeatureNode : public Node {
269269
thread_local paddle::string::str_ptr_stream ss;
270270
for (size_t i = 0; i < feat_str_size; i++) {
271271
ss.reset(*(feat_str_begin + i));
272+
int len = ss.end - ss.ptr;
273+
char *old_ptr = ss.ptr;
272274
ss >> fea_ptrs[i];
275+
if (ss.ptr - old_ptr != len) {
276+
return -1;
277+
}
273278
}
279+
return 0;
274280
}
275281

276282
protected:

paddle/fluid/framework/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -866,7 +866,8 @@ if(WITH_DISTRIBUTE)
866866
fleet
867867
heter_server
868868
brpc
869-
fleet_executor)
869+
fleet_executor
870+
flags)
870871
set(DISTRIBUTE_COMPILE_FLAGS
871872
"-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=parentheses"
872873
)

paddle/fluid/framework/barrier.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414

1515
#pragma once
1616

17+
#if defined _WIN32 || defined __APPLE__
18+
#else
19+
#define __LINUX__
20+
#endif
21+
1722
#ifdef __LINUX__
1823
#include <pthread.h>
1924
#include <semaphore.h>
@@ -48,7 +53,7 @@ class Barrier {
4853
void wait() {
4954
#ifdef __LINUX__
5055
int err = pthread_barrier_wait(&_barrier);
51-
if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD)) {
56+
if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) {
5257
CHECK_EQ(1, 0);
5358
}
5459
#endif

paddle/fluid/framework/data_feed.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,15 +2112,24 @@ void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
21122112
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
21132113
gpu_graph_data_generator_.SetConfig(data_feed_desc);
21142114
#endif
2115+
if (gpu_graph_mode_) {
2116+
train_mode_ = true;
2117+
} else {
2118+
train_mode_ = data_feed_desc.graph_config().gpu_graph_training();
2119+
}
21152120
}
21162121

21172122
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
21182123
void SlotRecordInMemoryDataFeed::InitGraphResource() {
2124+
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
21192125
gpu_graph_data_generator_.AllocResource(thread_id_, feed_vec_);
2126+
#endif
21202127
}
21212128

21222129
void SlotRecordInMemoryDataFeed::InitGraphTrainResource() {
2130+
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
21232131
gpu_graph_data_generator_.AllocTrainResource(thread_id_);
2132+
#endif
21242133
}
21252134
#endif
21262135

0 commit comments

Comments
 (0)