Skip to content

[wip] Changeset index #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ set(ALL_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} ${EXPAT_LIBRARIES} ${ROCKSDB_LIBRARI
#----------------------------------------------------------------------
add_executable(build_tag_lookup build_tag_lookup.cpp)
add_executable(add_tags add_tags.cpp)
add_executable(build_changeset_lookup build_changeset_lookup.cpp)

target_link_libraries(build_changeset_lookup ${ALL_LIBRARIES})
target_link_libraries(build_tag_lookup ${ALL_LIBRARIES})
target_link_libraries(add_tags ${ALL_LIBRARIES})
#-----------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ make

## Run

First build up a changeset index (`http://planet.osm.org/planet/changesets-latest.osm.bz2`).

```
build_changeset_lookup INDEX_DIR OSM_CHANGESET_FILE
```

First build up a historic tag index.

```
Expand Down
4 changes: 2 additions & 2 deletions add_tags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ typedef std::map<std::string,std::string> StringStringMap;
typedef std::map<std::string,std::string> VersionTags;
typedef std::vector < std::map<std::string, std::string> > TagHistoryArray;

void write_with_history_tags(TagStore* store, const std::string line) {
void write_with_history_tags(osmwayback::TagStore* store, const std::string line) {
rapidjson::Document geojson_doc;
if(geojson_doc.Parse<0>(line.c_str()).HasParseError()) {
std::cerr << "ERROR" << std::endl;
Expand Down Expand Up @@ -213,7 +213,7 @@ int main(int argc, char* argv[]) {

std::string index_dir = argv[1];
std::cout << "init tag dir" << std::endl;
TagStore store(index_dir, false);
osmwayback::TagStore store(index_dir, false);

rapidjson::Document doc;
for (std::string line; std::getline(std::cin, line);) {
Expand Down
67 changes: 67 additions & 0 deletions build_changeset_lookup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <cstdlib> // for std::exit
#include <cstring> // for std::strncmp
#include <iostream> // for std::cout, std::cerr
#include <sstream>
#include <chrono>

#include <osmium/io/any_input.hpp>
#include <osmium/osm/types.hpp>
#include <osmium/handler.hpp>
#include <osmium/visitor.hpp>

#include "db.hpp"

class ChangesetHandler : public osmium::handler::Handler {
osmwayback::TagStore* m_store;

public:
ChangesetHandler(osmwayback::TagStore* store) : m_store(store) {}
void changeset(const osmium::Changeset& changeset) {
m_store->store_changeset(changeset);
}
};

std::atomic_bool stop_progress{false};

void report_progress(const osmwayback::TagStore* store) {
unsigned long last_changesets_count{0};
auto start = std::chrono::steady_clock::now();

while(true) {
if(stop_progress) {
auto end = std::chrono::steady_clock::now();
auto diff = end - start;

std::cerr << "Processed " << last_changesets_count << " changesets in " << std::chrono::duration <double, std::milli> (diff).count() << " ms" << std::endl;
break;
}

auto diff_changesets_count = store->stored_changesets_count - last_changesets_count;
std::cerr << "Processing " << diff_changesets_count << " changesets/s" << std::endl;

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
last_changesets_count += diff_changesets_count;
}
}

int main(int argc, char* argv[]) {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " INDEX_DIR CHANGESET_FILE" << std::endl;
std::exit(1);
}

std::string index_dir = argv[1];
std::string changeset_filename = argv[2];

osmwayback::TagStore store(index_dir, true);
ChangesetHandler handler(&store);

std::thread t_progress(report_progress, &store);

osmium::io::Reader reader{changeset_filename, osmium::osm_entity_bits::changeset};
osmium::apply(reader, handler);

stop_progress = true;
t_progress.join();
store.flush();
}
16 changes: 4 additions & 12 deletions build_tag_lookup.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/*

Build Tag History Database

(Based on osmium_pub_names example)

*/

#include <cstdlib> // for std::exit
#include <cstring> // for std::strncmp
#include <iostream> // for std::cout, std::cerr
Expand All @@ -20,10 +12,10 @@
#include "db.hpp"

class TagStoreHandler : public osmium::handler::Handler {
TagStore* m_store;
osmwayback::TagStore* m_store;

public:
TagStoreHandler(TagStore* store) : m_store(store) {}
TagStoreHandler(osmwayback::TagStore* store) : m_store(store) {}
long node_count = 0;
int way_count = 0;
int rel_count = 0;
Expand All @@ -45,7 +37,7 @@ class TagStoreHandler : public osmium::handler::Handler {

std::atomic_bool stop_progress{false};

void report_progress(const TagStore* store) {
void report_progress(const osmwayback::TagStore* store) {
unsigned long last_nodes_count{0};
unsigned long last_ways_count{0};
unsigned long last_relations_count{0};
Expand Down Expand Up @@ -85,7 +77,7 @@ int main(int argc, char* argv[]) {
std::string index_dir = argv[1];
std::string osm_filename = argv[2];

TagStore store(index_dir, true);
osmwayback::TagStore store(index_dir, false);
TagStoreHandler tag_handler(&store);

std::thread t_progress(report_progress, &store);
Expand Down
122 changes: 106 additions & 16 deletions db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
#include <osmium/visitor.hpp>

#include <chrono>
#include "pbf_encoder.hpp"

namespace osmwayback {

const std::string make_lookup(const int64_t osm_id, const int version){
return std::to_string(osm_id) + "!" + std::to_string(version);
}

class TagStore {
class RocksDBStore {
protected:

rocksdb::DB* m_db;
rocksdb::ColumnFamilyHandle* m_cf_ways;
rocksdb::ColumnFamilyHandle* m_cf_nodes;
rocksdb::ColumnFamilyHandle* m_cf_relations;
rocksdb::ColumnFamilyHandle* m_cf_changesets;
rocksdb::WriteOptions m_write_options;

rocksdb::WriteBatch m_buffer_batch;

void flush_family(const std::string type, rocksdb::ColumnFamilyHandle* cf) {
const auto start = std::chrono::steady_clock::now();
std::cerr << "Flushing " << type << std::endl;
Expand Down Expand Up @@ -62,21 +66,13 @@ class TagStore {
uint64_t relation_keys{0};
m_db->GetIntProperty(m_cf_relations, "rocksdb.estimate-num-keys", &relation_keys);
std::cerr << "Stored ~" << relation_keys << "/" << stored_relations_count << " relations" << std::endl;
}

public:
unsigned long empty_objects_count{0};
unsigned long stored_tags_count{0};

unsigned long stored_nodes_count{0};
unsigned long stored_ways_count{0};
unsigned long stored_relations_count{0};

unsigned long stored_objects_count() {
return stored_nodes_count + stored_ways_count + stored_relations_count;
uint64_t changeset_keys{0};
m_db->GetIntProperty(m_cf_changesets, "rocksdb.estimate-num-keys", &changeset_keys);
std::cerr << "Stored ~" << changeset_keys << "/" << stored_changesets_count << " changesets" << std::endl;
}

TagStore(const std::string index_dir, const bool create) {
RocksDBStore (const std::string index_dir, const bool create) {
rocksdb::Options db_options;
db_options.allow_mmap_writes = false;
db_options.max_background_flushes = 4;
Expand Down Expand Up @@ -104,6 +100,7 @@ class TagStore {
s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "ways", &m_cf_ways);
assert(s.ok());
s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "relations", &m_cf_relations);
s = m_db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "changesets", &m_cf_changesets);
assert(s.ok());
} else {
db_options.error_if_exists = false;
Expand All @@ -118,6 +115,7 @@ class TagStore {
column_families.push_back(rocksdb::ColumnFamilyDescriptor( "nodes", rocksdb::ColumnFamilyOptions()));
column_families.push_back(rocksdb::ColumnFamilyDescriptor( "ways", rocksdb::ColumnFamilyOptions()));
column_families.push_back(rocksdb::ColumnFamilyDescriptor( "relations", rocksdb::ColumnFamilyOptions()));
column_families.push_back(rocksdb::ColumnFamilyDescriptor( "changesets", rocksdb::ColumnFamilyOptions()));

std::vector<rocksdb::ColumnFamilyHandle*> handles;

Expand All @@ -127,7 +125,31 @@ class TagStore {
m_cf_nodes = handles[1];
m_cf_ways = handles[2];
m_cf_relations = handles[3];
m_cf_changesets = handles[4];
}

}

public:

unsigned long empty_objects_count{0};
unsigned long stored_tags_count{0};

unsigned long stored_nodes_count{0};
unsigned long stored_ways_count{0};
unsigned long stored_relations_count{0};
unsigned long stored_changesets_count{0};

unsigned long stored_objects_count() {
return stored_nodes_count + stored_ways_count + stored_relations_count;
}
};

class TagStore : public RocksDBStore {
rocksdb::WriteBatch m_buffer_batch;

public:
TagStore(const std::string index_dir, const bool create) : RocksDBStore(index_dir, create) {
}
rocksdb::Status get_tags(const int64_t osm_id, const int osm_type, const int version, std::string* json_value) {
const auto lookup = make_lookup(osm_id, version);
Expand All @@ -141,7 +163,52 @@ class TagStore {
}
}

void lookup_nodes(const osmium::Way& way, const int closed_at) {
const osmium::WayNodeList& node_refs = way.nodes();
for (const osmium::NodeRef& node_ref : node_refs) {
auto node_id = node_ref.ref();

// Find all the versions
int node_version{1};
for(int v = 1; v < 1000; v++) {
std::string node_json;

auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_nodes, make_lookup(node_id, v), &node_json);

if(read_status.ok()) {
rapidjson::Document node_doc;
if(!node_doc.Parse<0>(node_json.c_str()).HasParseError()) {
if(node_doc.HasMember("@timestamp")) {
auto ts = node_doc["@timestamp"].GetInt();
if (ts > closed_at) {
break;
} else {
node_version = v;
}
}
}
}
}

std::cout << "Found real node version " << node_version << std::endl;
}
}

void store_tags(const osmium::Way& way) {
// Add closed at if found
std::string changeset_json;
auto read_status = m_db->Get(rocksdb::ReadOptions(), m_cf_changesets, std::to_string(way.changeset()), &changeset_json);
if (read_status.ok()) {
rapidjson::Document changeset_doc;
std::cout << changeset_json << std::endl;
if(!changeset_doc.Parse<0>(changeset_json.c_str()).HasParseError()) {
if(changeset_doc.HasMember("@closed_at")) {
auto closed_at = changeset_doc["@closed_at"].GetInt();
lookup_nodes(way, closed_at);
}
}
}

if(store_tags(way, m_cf_ways)) {
stored_ways_count++;
}
Expand All @@ -159,7 +226,26 @@ class TagStore {
}
}

bool store_changeset(const osmium::Changeset& changeset) {
auto lookup = std::to_string(changeset.id());
auto value = osmwayback::encode_changeset(changeset);
rocksdb::Status stat = m_buffer_batch.Put(m_cf_changesets, lookup, value);

stored_changesets_count++;
if (m_buffer_batch.Count() > 1000) {
m_db->Write(m_write_options, &m_buffer_batch);
m_buffer_batch.Clear();
}

if (stored_changesets_count != 0 && (stored_changesets_count % 1000000) == 0) {
flush_family("changesets", m_cf_changesets);
report_count_stats();
}
return true;
}

bool store_tags(const osmium::OSMObject& object, rocksdb::ColumnFamilyHandle* cf) {

const auto lookup = make_lookup(object.id(), object.version());
if (object.tags().empty()) {
empty_objects_count++;
Expand All @@ -171,7 +257,7 @@ class TagStore {

rapidjson::Document::AllocatorType& a = doc.GetAllocator();

doc.AddMember("@timestamp", object.timestamp().to_iso(), a); //ISO is helpful for debugging, but we should leave it
doc.AddMember("@timestamp", static_cast<int>(object.timestamp().seconds_since_epoch()), a);
if (object.deleted()){
doc.AddMember("@deleted", object.deleted(), a);
}
Expand Down Expand Up @@ -228,11 +314,15 @@ class TagStore {
flush_family("nodes", m_cf_nodes);
flush_family("ways", m_cf_ways);
flush_family("relations", m_cf_relations);
flush_family("changesets", m_cf_changesets);

compact_family("nodes", m_cf_nodes);
compact_family("ways", m_cf_ways);
compact_family("relations", m_cf_relations);
compact_family("changesets", m_cf_changesets);

report_count_stats();
}
};

}
Loading