Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tree-based-model #31696

Merged
merged 33 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a431b31
first_commit for index_dataset
123malin Mar 17, 2021
4849f6f
add basic index_sampler
123malin Mar 17, 2021
16fc441
tmp
123malin Mar 19, 2021
87837ac
test=develop, fix index_sampler
123malin Mar 22, 2021
56c5c15
tmp, add tree_learning & tdm model(testing)
123malin Mar 29, 2021
d05f645
tmp, fix shared_ptr
123malin Mar 30, 2021
bca106f
test=develop, update
123malin Apr 1, 2021
f2af4d0
test=develop, update
123malin Apr 1, 2021
d9fd612
test=develop, update
123malin Apr 1, 2021
99f48c8
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
123malin Apr 1, 2021
b7358d7
test=develop, add unittest
123malin Apr 1, 2021
1c0a781
test=develop, fix
123malin Apr 1, 2021
c4b83e0
test=develop, add unittest
123malin Apr 1, 2021
1904595
test=develop, syntax
123malin Apr 1, 2021
51d138c
test=develop, fix py3 bug
123malin Apr 2, 2021
480134b
test=develop, paddle_enforce error message optimize
123malin Apr 2, 2021
23aa883
test=develop, mv index_dataset to distributed
123malin Apr 6, 2021
99aeb61
test=develop, fix
123malin Apr 6, 2021
e425fe2
test=develop, fix
123malin Apr 6, 2021
b044e55
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
123malin Apr 7, 2021
26c0b23
test=develop, fix
123malin Apr 7, 2021
b74c633
test=develop, fix
123malin Apr 7, 2021
cabb3d6
test=develop, rename test_tree_index
123malin Apr 7, 2021
32f704c
test=develop, format
123malin Apr 7, 2021
943c846
test=develop, resolve conflict
123malin Apr 7, 2021
5205faa
test=develop, fix
123malin Apr 7, 2021
85ee165
test=develop, update format
123malin Apr 7, 2021
20c0de6
test=develop, fix
123malin Apr 7, 2021
f758c1f
test=develop, add python_proto
123malin Apr 8, 2021
63a32ce
test=develop, fix cmake
123malin Apr 8, 2021
bc719d2
test=develop, fix
123malin Apr 14, 2021
952ac68
test=develop, fix
123malin Apr 14, 2021
91f91ba
test=develop, fix
123malin Apr 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
first_commit for index_dataset
  • Loading branch information
123malin committed Mar 17, 2021
commit a431b31780d3af87e4ef9217ba5c846212507b05
14 changes: 8 additions & 6 deletions paddle/fluid/framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ add_subdirectory(io)
proto_library(framework_proto SRCS framework.proto)
proto_library(heter_service_proto SRCS heter_service.proto)
proto_library(data_feed_proto SRCS data_feed.proto)
proto_library(index_dataset_proto SRCS index_dataset.proto)
proto_library(trainer_desc_proto SRCS trainer_desc.proto DEPS framework_proto
data_feed_proto)

Expand Down Expand Up @@ -189,10 +190,11 @@ if(WITH_PYTHON)
py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto)
py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto)
py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto)
py_proto_compile(index_dataset_py_proto SRCS index_dataset.proto)
#Generate an empty \
#__init__.py to make framework_py_proto as a valid python module.
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto)
add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto index_dataset_py_proto)
if (NOT WIN32)
add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
Expand Down Expand Up @@ -237,7 +239,7 @@ if(WITH_DISTRIBUTE)
fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer
lod_rank_table feed_fetch_method collective_helper ${GLOB_DISTRIBUTE_DEPS}
graph_to_program_pass variable_helper data_feed_proto timer monitor
heter_service_proto pslib_brpc)
heter_service_proto pslib_brpc index_dataset_proto)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
elseif(WITH_PSCORE)
Expand All @@ -249,7 +251,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet)
graph_to_program_pass variable_helper timer monitor heter_service_proto fleet index_dataset_proto)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(multi_trainer.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
Expand All @@ -263,7 +265,7 @@ if(WITH_DISTRIBUTE)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor index_dataset_proto)
endif()
elseif(WITH_PSLIB)
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
Expand All @@ -274,7 +276,7 @@ elseif(WITH_PSLIB)
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor pslib_brpc )
graph_to_program_pass variable_helper timer monitor pslib_brpc index_dataset_proto)
else()
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
Expand All @@ -284,7 +286,7 @@ else()
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
device_context scope framework_proto data_feed_proto heter_service_proto trainer_desc_proto glog
lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper lodtensor_printer feed_fetch_method
graph_to_program_pass variable_helper timer monitor)
graph_to_program_pass variable_helper timer monitor index_dataset_proto)
endif()

target_link_libraries(executor while_op_helper executor_gc_helper recurrent_op_helper conditional_block_op_helper)
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/fleet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ else()
endif(WITH_GLOO)

cc_library(heter_wrapper SRCS heter_wrapper.cc DEPS framework_proto device_context heter_service_proto)
cc_library(index_wrapper SRCS index_wrapper.cc DEPS index_dataset_proto)
cc_library(index_sampler SRCS index_sampler.cc DEPS index_wrapper)

cc_test(test_fleet_cc SRCS test_fleet.cc DEPS fleet_wrapper gloo_wrapper fs shell)

Expand Down
72 changes: 72 additions & 0 deletions paddle/fluid/framework/fleet/index_sampler.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include "paddle/fluid/framework/fleet/index_sampler.h"

namespace paddle {
namespace framework {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

namespace upgrade.


// Layer-wise negative sampling over the tree index (TDM-style training).
// NOTE(review): this is currently a stub — it always returns an empty
// result; the draft implementation below is commented out. Callers get
// no samples until it is finished. TODO: implement per-layer positive +
// negative sampling as sketched in the commented draft.
std::vector<std::vector<uint64_t>> LayerWiseSampler::sample(std::vector<std::vector<uint64_t>>& user_inputs, std::vector<uint64_t>& target_ids) {
std::vector<std::vector<uint64_t>> outputs;
return outputs;
//auto ids_num = target_ids->size();
//(ids_num * layer_counts_sum_, std::vector<uint64_t>(inputs[0].size() + 1));

// std::vector<std::vector<uint64_t>> ancestors(ids_num);
// tree_->Ancestors(*input_ids, ancestors);
// int i = 0;
// for (auto it = ancestors.begin(); it != ancestors.end(); ++it) {
// auto& ancs = *it;
// if (!ancs.empty()) {
// if (ancs.size() > layer_counts_.size()) {
// ancs.resize(layer_counts_.size());
// }

// auto level = tree_->tree_height();
// for (size_t j = 0; j < ancs.size(); ++j) {
// level --;
// // sample +
// outputs->at(i).first = ancs[j];
// outputs->at(i).second = 1;
// i ++;

// // sample: -
// auto layer_nodes_info = tree_->layer_nodes(level);

// size_t cur_layer_count = layer_counts_.at(level);
// std::unordered_set<int> neighbor_indices_set;
// std::vector<int> neighbor_indices;
// neighbor_indices_set.reserve(cur_layer_count);
// neighbor_indices.reserve(cur_layer_count);
// auto neighbors_count = layer_nodes_info.second;

// static __thread std::hash<std::thread::id> hasher;
// static __thread std::mt19937 rng(
// clock() + hasher(std::this_thread::get_id()));
// std::uniform_int_distribution<int> distrib(0, neighbors_count);

// while (neighbor_indices_set.size() < cur_layer_count) {
// int q = distrib(rng);
// auto id = layer_nodes_info.first[q].id;
// if (neighbor_indices_set.find(q) != neighbor_indices_set.end() || id == ancs[j]) {
// continue;
// }

// neighbor_indices_set.insert(id);
// neighbor_indices.push_back(id);
// }
// for (size_t k = 0; k < cur_layer_count; ++k) {
// outputs->at(i).first = neighbor_indices[k];
// outputs->at(i).second = 0;
// i++;
// }
// }
// }
// }
// return;
}

// Beam-search candidate retrieval over the tree index.
// NOTE(review): stub — no candidates are generated yet; always returns
// an empty result.
std::vector<std::vector<uint64_t>> BeamSearchSampler::sample(std::vector<std::vector<uint64_t>>& user_inputs, std::vector<uint64_t>& target_ids) {
  return {};
}

} // end namespace framework
} // end namespace paddle
62 changes: 62 additions & 0 deletions paddle/fluid/framework/fleet/index_sampler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include <vector>
#include "paddle/fluid/framework/fleet/index_wrapper.h"
#include "paddle/fluid/framework/program_desc.h"

namespace paddle {
namespace framework {

// Abstract base class for samplers that draw training examples from a
// tree index (layer-wise negative sampling or beam search).
class Sampler {
 public:
  virtual ~Sampler() {}
  Sampler() {}

  // Factory: creates a sampler of concrete type T bound to the tree
  // index registered under `name`. T's constructor takes the name.
  template <typename T>
  static std::shared_ptr<Sampler> Init(const std::string& name) {
    // make_shared: one allocation and exception-safe
    // (was: shared_ptr + reset(new T(name))).
    return std::make_shared<T>(name);
  }

  // Configures per-layer sample counts; overridden by LayerWiseSampler.
  virtual void init_layerwise_conf(
      const std::vector<int64_t>& layer_sample_counts) {}
  // Configures the beam width k; overridden by BeamSearchSampler.
  virtual void init_beamsearch_conf(const int64_t k) {}
  // Produces sampled rows for the given user inputs / target ids.
  virtual std::vector<std::vector<uint64_t>> sample(
      std::vector<std::vector<uint64_t>>& user_inputs,
      std::vector<uint64_t>& input_targets) = 0;
};

// Draws positive + negative samples layer by layer from the tree index
// (TDM-style training).
class LayerWiseSampler : public Sampler {
 public:
  virtual ~LayerWiseSampler() {}
  // Binds this sampler to the tree index registered under `name`.
  LayerWiseSampler(const std::string& name) {
    tree_ = IndexWrapper::GetInstance()->GetTreeIndex(name);
  }

  // Records the negative-sample count for each tree level.
  // NOTE(review): assumes layer_sample_counts has at least
  // tree_->height() entries — precondition is not enforced here.
  void init_layerwise_conf(
      const std::vector<int64_t>& layer_sample_counts) override {
    for (int i = 0; i < tree_->height(); ++i) {
      // +1 accounts for the one positive example per level.
      layer_counts_sum_ += layer_sample_counts[i] + 1;
      layer_counts_.push_back(layer_sample_counts[i]);
      VLOG(0) << "[INFO] level " << i
              << " layer_counts.push_back: " << layer_sample_counts[i];
    }
  }
  std::vector<std::vector<uint64_t>> sample(
      std::vector<std::vector<uint64_t>>& user_inputs,
      std::vector<uint64_t>& target_ids) override;

 private:
  std::vector<int64_t> layer_counts_;
  // BUG FIX: was uninitialized; the += in init_layerwise_conf read an
  // indeterminate value (undefined behavior).
  int64_t layer_counts_sum_{0};
  TreePtr tree_{nullptr};
};

// Retrieves candidates by beam search over the tree index with width k_.
class BeamSearchSampler : public Sampler {
 public:
  virtual ~BeamSearchSampler() {}
  // Binds this sampler to the tree index registered under `name`.
  BeamSearchSampler(const std::string& name) {
    tree_ = IndexWrapper::GetInstance()->GetTreeIndex(name);
  }

  // Sets the beam width used during sampling.
  void init_beamsearch_conf(const int64_t k) override { k_ = k; }

  std::vector<std::vector<uint64_t>> sample(
      std::vector<std::vector<uint64_t>>& user_inputs,
      std::vector<uint64_t>& target_ids) override;

 private:
  // BUG FIX: was uninitialized; reading it before init_beamsearch_conf()
  // was undefined behavior.
  int64_t k_{0};
  TreePtr tree_{nullptr};
};

} // end namespace framework
} // end namespace paddle
115 changes: 115 additions & 0 deletions paddle/fluid/framework/fleet/index_wrapper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <thread>
#include <stdio.h>

#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/fleet/index_wrapper.h"

namespace paddle {
namespace framework {

using paddle::distributed::Communicator;

std::shared_ptr<IndexWrapper> IndexWrapper::s_instance_(nullptr);

int TreeIndex::load(std::string filename) {
FILE* fp = fopen(filename.c_str(), "rb");
if (fp == NULL) {
fprintf(stderr, "Can not open file: %s\n", filename.c_str());
return -1;
}

int num = 0;
size_t ret = fread(&num, sizeof(num), 1, fp);
while (ret == 1 && num > 0) {
std::string content(num, '\0');
if (fread(const_cast<char*>(content.data()), 1, num, fp)
!= static_cast<size_t>(num)) {
fprintf(stderr, "Read from file: %s failed, invalid format.\n",
filename.c_str());
break;
}
KVItem item;
if (!item.ParseFromString(content)) {
fprintf(stderr, "Parse from file: %s failed.\n", filename.c_str());
break;
}
if (item.key() == ".tree_meta") {
meta_.ParseFromString(item.value());
} else {
auto code = boost::lexical_cast<uint64_t>(item.key());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need boost?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to parse from bytes to uint64.

Node node;
node.ParseFromString(item.value());
if (node.is_leaf()) {
id_codes_map_[node.id()] = code;
}
data_[code] = node;
}
ret = fread(&num, sizeof(num), 1, fp);
}
fclose(fp);
total_nodes_num_ = data_.size();
return 0;
}

// Collects every node that exists at the given tree level (root = 0).
// Returns node codes when ret_code is true, otherwise the nodes' ids.
std::vector<uint64_t> TreeIndex::get_nodes_given_level(int level, bool ret_code) {
  // A complete level of a `branch`-ary tree holds branch^level slots,
  // starting at code branch^level - 1.
  const uint64_t width = static_cast<uint64_t>(std::pow(meta_.branch(), level));
  const uint64_t first_code = width - 1;

  std::vector<uint64_t> nodes;
  nodes.reserve(width);
  for (uint64_t offset = 0; offset < width; ++offset) {
    const uint64_t code = first_code + offset;
    if (data_.find(code) != data_.end()) {
      nodes.push_back(code);
    }
  }
  if (!ret_code) {
    // Map each collected code to the stored node's id.
    for (auto& entry : nodes) {
      entry = data_[entry].id();
    }
  }
  return nodes;
}

// Returns the ancestor path of leaf `id`, leaf first, stopping at
// start_level. Empty if `id` is unknown. Entries are node codes when
// ret_code is true, otherwise node ids.
std::vector<uint64_t> TreeIndex::get_travel_path(uint64_t id, bool ret_code, int start_level) {
  std::vector<uint64_t> path;
  auto it = id_codes_map_.find(id);
  if (it == id_codes_map_.end()) {
    return path;
  }
  path.reserve(meta_.height());
  uint64_t code = it->second;
  path.push_back(code);
  // Walk upward: the parent of code c in a branch-ary heap layout is
  // (c - 1) / branch. The leaf sits at level height - 1.
  for (int level = meta_.height() - 2; level >= start_level; --level) {
    code = (code - 1) / meta_.branch();
    path.push_back(code);
  }
  if (!ret_code) {
    // Translate codes to the stored node ids.
    for (auto& entry : path) {
      entry = data_[entry].id();
    }
  }
  return path;
}

} // end namespace framework
} // end namespace paddle
Loading