Skip to content

Commit 765bb0e

Browse files
authored
Merge pull request #107 from DICL/refactor-update
Refactor update
2 parents 33b61d6 + da78eff commit 765bb0e

16 files changed

+321
-21
lines changed

Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ bin_PROGRAMS = eclipse_node dfs
88
messages_files = src/messages/boundaries.cc \
99
src/messages/message.cc \
1010
src/messages/keyvalue.cc \
11+
src/messages/offsetkv.cc \
1112
src/messages/keyrequest.cc \
1213
src/messages/control.cc \
1314
src/messages/boost_impl.cc \
1415
src/messages/factory.cc \
1516
src/messages/fileinfo.cc \
1617
src/messages/blockinfo.cc \
18+
src/messages/blockupdate.cc \
1719
src/messages/task.cc \
1820
src/messages/reply.cc \
1921
src/messages/filerequest.cc \

src/fs/dfs.cc

Lines changed: 114 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -441,10 +441,10 @@ namespace eclipse{
441441
return EXIT_SUCCESS;
442442
}
443443

444-
int DFS::partial_get(int argc, char* argv[]) {
444+
int DFS::pget(int argc, char* argv[]) {
445445
string file_name = "";
446446
if (argc < 5) {
447-
cout << "[INFO] dfs partial_get file_name start_offset read_byte" << endl;
447+
cout << "[INFO] dfs pget file_name start_offset read_byte" << endl;
448448
return EXIT_FAILURE;
449449
} else {
450450
Histogram boundaries(NUM_NODES, 0);
@@ -474,7 +474,7 @@ namespace eclipse{
474474
cerr << "[ERR] Wrong read byte." << endl;
475475
return EXIT_FAILURE;
476476
}
477-
string outfile = "partial_" + file_name;
477+
string outfile = "p_" + file_name;
478478
ofstream f(outfile);
479479
int block_seq = 0;
480480
uint64_t passed_byte = 0;
@@ -502,12 +502,12 @@ namespace eclipse{
502502
} else {
503503
start_pos = 0;
504504
}
505-
string sub_str = block_content.substr(start_pos);
506-
if (read_byte_cnt + sub_str.length() > read_byte) {
505+
uint32_t read_length = block_content.length();
506+
if (read_byte_cnt + read_length > read_byte) {
507507
final_block = true;
508-
uint32_t sub_length = read_byte - read_byte_cnt;
509-
sub_str = block_content.substr(start_pos, sub_length);
508+
read_length = read_byte - read_byte_cnt;
510509
}
510+
string sub_str = msg->content.substr(start_pos, read_length);
511511
f << sub_str;
512512
read_byte_cnt += sub_str.length();
513513
tmp_socket->close();
@@ -521,4 +521,111 @@ namespace eclipse{
521521
cout << "[INFO] " << file_name << " is read." << endl;
522522
return EXIT_SUCCESS;
523523
}
524+
525+
int DFS::update(int argc, char* argv[]) {
526+
string ori_file_name = "";
527+
if (argc < 5) {
528+
cout << "[INFO] dfs update original_file new_file start_offset" << endl;
529+
return EXIT_FAILURE;
530+
} else {
531+
Histogram boundaries(NUM_NODES, 0);
532+
boundaries.initialize();
533+
534+
ori_file_name = argv[2];
535+
string new_file_name = argv[3];
536+
uint64_t start_offset = (uint64_t)atoi(argv[4]);
537+
uint32_t file_hash_key = h(ori_file_name);
538+
auto socket = connect (file_hash_key);
539+
FileExist fe;
540+
fe.name = ori_file_name;
541+
send_message(socket.get(), &fe);
542+
auto rep = read_reply<Reply> (socket.get());
543+
544+
if (rep->message != "TRUE") {
545+
cerr << "[ERR] " << ori_file_name << " doesn't exist." << endl;
546+
return EXIT_FAILURE;
547+
}
548+
FileRequest fr;
549+
fr.name = ori_file_name;
550+
551+
ifstream myfile(new_file_name);
552+
myfile.seekg(0, myfile.end);
553+
uint64_t new_file_size = myfile.tellg();
554+
555+
send_message(socket.get(), &fr);
556+
auto fd = read_reply<FileDescription> (socket.get());
557+
socket->close();
558+
if (start_offset + new_file_size > fd->size) {
559+
cerr << "[ERR] Wrong file size." << endl;
560+
return EXIT_FAILURE;
561+
}
562+
myfile.seekg(0, myfile.beg);
563+
char *buffer = new char[new_file_size];
564+
myfile.read(buffer, new_file_size);
565+
string sbuffer(buffer);
566+
delete[] buffer;
567+
myfile.close();
568+
569+
int block_seq = 0;
570+
uint64_t passed_byte = 0;
571+
uint64_t write_byte_cnt = 0;
572+
uint32_t ori_start_pos = 0;
573+
uint32_t to_write_byte = new_file_size;
574+
bool first_block = true;
575+
bool final_block = false;
576+
for (auto block_name : fd->blocks) {
577+
// pass until find the block which has start_offset
578+
if (passed_byte + fd->block_size[block_seq] < start_offset) {
579+
passed_byte += fd->block_size[block_seq];
580+
block_seq++;
581+
continue;
582+
} else {
583+
// If this block is the first one of updating blocks,
584+
// start position will be start_offset - passed_byte.
585+
// Otherwise, start position will be 0.
586+
uint32_t hash_key = fd->hash_keys[block_seq];
587+
if (first_block) {
588+
first_block = false;
589+
ori_start_pos = start_offset - passed_byte;
590+
} else {
591+
ori_start_pos = 0;
592+
}
593+
// write length means the lenght which should be repliaced in THIS block.
594+
// to_write_byte means remaining total bytes to write
595+
// If this block is the last one, write_length should be same as to_write_byte
596+
// Otherwise, write_length should be same as block_size - start position
597+
uint32_t write_length = fd->block_size[block_seq] - ori_start_pos;
598+
block_seq++;
599+
if (to_write_byte < write_length) {
600+
final_block = true;
601+
write_length = to_write_byte;
602+
}
603+
// send message
604+
BlockUpdate bu;
605+
bu.name = block_name;
606+
bu.replica = fd->replica;
607+
bu.hash_key = hash_key;
608+
bu.pos = ori_start_pos;
609+
bu.len = write_length;
610+
bu.content = sbuffer.substr(write_byte_cnt, write_length);
611+
auto tmp_socket = connect(boundaries.get_index(hash_key));
612+
send_message(tmp_socket.get(), &bu);
613+
auto reply = read_reply<Reply> (tmp_socket.get());
614+
tmp_socket->close();
615+
if (reply->message != "OK") {
616+
cerr << "[ERR] Failed to upload file. Details: " << reply->details << endl;
617+
return EXIT_FAILURE;
618+
}
619+
// calculate total write bytes and remaining write bytes
620+
write_byte_cnt += write_length;
621+
if (final_block) {
622+
break;
623+
}
624+
to_write_byte -= write_length;
625+
}
626+
}
627+
}
628+
cout << "[INFO] " << ori_file_name << " is updated." << endl;
629+
return EXIT_SUCCESS;
630+
}
524631
}

src/fs/dfs.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "../messages/factory.hh"
77
#include "../messages/fileinfo.hh"
88
#include "../messages/blockinfo.hh"
9+
#include "../messages/blockupdate.hh"
910
#include "../messages/fileexist.hh"
1011
#include "../messages/filerequest.hh"
1112
#include "../messages/filelist.hh"
@@ -57,7 +58,7 @@ namespace eclipse {
5758
int rm(int argc, char* argv[]);
5859
int format(int argc, char* argv[]);
5960
int show(int argc, char* argv[]);
60-
int partial_get(int argc, char* argv[]);
61+
int pget(int argc, char* argv[]);
6162
int update(int argc, char* argv[]);
6263
};
6364
}

src/fs/main.cc

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,42 @@ int main(int argc, char* argv[]) {
1111
string op = argv[1];
1212
if (op.compare("put") == 0) {
1313
dfs.put(argc, argv);
14-
return 0;
14+
return EXIT_SUCCESS;
1515
}
1616
else if (op.compare("get") == 0) {
1717
dfs.get(argc, argv);
18-
return 0;
18+
return EXIT_SUCCESS;
1919
}
2020
else if (op.compare("cat") == 0) {
2121
dfs.cat(argc, argv);
22-
return 0;
22+
return EXIT_SUCCESS;
2323
}
2424
else if (op.compare("ls") == 0) {
2525
dfs.ls(argc, argv);
26-
return 0;
26+
return EXIT_SUCCESS;
2727
}
2828
else if (op.compare("rm") == 0) {
2929
dfs.rm(argc, argv);
30-
return 0;
30+
return EXIT_SUCCESS;
3131
}
3232
else if (op.compare("format") == 0) {
3333
dfs.format(argc, argv);
34-
return 0;
34+
return EXIT_SUCCESS;
3535
}
3636
else if (op.compare("show") == 0) {
3737
dfs.show(argc, argv);
38-
return 0;
38+
return EXIT_SUCCESS;
3939
}
40-
else if (op.compare("partial_get") == 0) {
41-
dfs.DFS::partial_get(argc, argv);
42-
return 0;
40+
else if (op.compare("pget") == 0) {
41+
dfs.DFS::pget(argc, argv);
42+
return EXIT_SUCCESS;
43+
}
44+
else if (op.compare("update") == 0) {
45+
dfs.DFS::update(argc, argv);
46+
return EXIT_SUCCESS;
4347
}
4448
}
4549
cerr << "[ERR] Unknown operation" << endl;
46-
cout << "[INFO] dfs put|get|cat|ls|rm|format|partial_get" << endl;
47-
return -1;
50+
cout << "[INFO] dfs put|get|cat|ls|rm|format|pget|update" << endl;
51+
return EXIT_FAILURE;
4852
}

src/messages/blockupdate.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "blockupdate.hh"
2+
3+
using namespace eclipse::messages;
4+
5+
std::string BlockUpdate::get_type() const { return "BlockUpdate"; }

src/messages/blockupdate.hh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
#include "message.hh"
3+
#include <cstdint>
4+
5+
namespace eclipse {
6+
namespace messages {
7+
struct BlockUpdate: public Message {
8+
std::string get_type() const override;
9+
10+
std::string name;
11+
uint32_t hash_key;
12+
int replica;
13+
uint32_t pos;
14+
uint32_t len;
15+
std::string content;
16+
};
17+
}
18+
}

src/messages/boost_impl.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
//! 4) Also here
77
BOOST_CLASS_EXPORT(eclipse::messages::Boundaries);
88
BOOST_CLASS_EXPORT(eclipse::messages::KeyValue);
9+
BOOST_CLASS_EXPORT(eclipse::messages::OffsetKeyValue);
910
BOOST_CLASS_EXPORT(eclipse::messages::Control);
1011
BOOST_CLASS_EXPORT(eclipse::messages::KeyRequest);
1112
BOOST_CLASS_EXPORT(eclipse::messages::Task);
1213
BOOST_CLASS_EXPORT(eclipse::messages::FileInfo);
1314
BOOST_CLASS_EXPORT(eclipse::messages::FileList);
1415
BOOST_CLASS_EXPORT(eclipse::messages::BlockInfo);
16+
BOOST_CLASS_EXPORT(eclipse::messages::BlockUpdate);
1517
BOOST_CLASS_EXPORT(eclipse::messages::Reply);
1618
BOOST_CLASS_EXPORT(eclipse::messages::CacheInfo);
1719
BOOST_CLASS_EXPORT(eclipse::messages::FileRequest);

src/messages/boost_impl.hh

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
#include "filedel.hh"
2525
#include "formatrequest.hh"
2626
#include "fileexist.hh"
27+
#include "offsetkv.hh"
28+
#include "blockupdate.hh"
2729

2830
#include <boost/serialization/export.hpp>
2931
#include <boost/serialization/access.hpp>
@@ -62,6 +64,16 @@ template <typename Archive>
6264
ar & BOOST_SERIALIZATION_NVP(k.value);
6365
}
6466

67+
template <typename Archive>
68+
void serialize (Archive& ar, eclipse::messages::OffsetKeyValue& k, unsigned int) {
69+
ar & BASE_OBJECT(Message, k);
70+
ar & BOOST_SERIALIZATION_NVP(k.key);
71+
ar & BOOST_SERIALIZATION_NVP(k.name);
72+
ar & BOOST_SERIALIZATION_NVP(k.value);
73+
ar & BOOST_SERIALIZATION_NVP(k.pos);
74+
ar & BOOST_SERIALIZATION_NVP(k.len);
75+
}
76+
6577
template <typename Archive>
6678
void serialize (Archive& ar, eclipse::messages::Control& c, unsigned int) {
6779
ar & BASE_OBJECT(Message, c);
@@ -102,6 +114,17 @@ template <typename Archive>
102114
ar & BOOST_SERIALIZATION_NVP(c.content);
103115
}
104116

117+
template <typename Archive>
118+
void serialize (Archive& ar, eclipse::messages::BlockUpdate& c, unsigned int) {
119+
ar & BASE_OBJECT(Message, c);
120+
ar & BOOST_SERIALIZATION_NVP(c.name);
121+
ar & BOOST_SERIALIZATION_NVP(c.hash_key);
122+
ar & BOOST_SERIALIZATION_NVP(c.replica);
123+
ar & BOOST_SERIALIZATION_NVP(c.pos);
124+
ar & BOOST_SERIALIZATION_NVP(c.len);
125+
ar & BOOST_SERIALIZATION_NVP(c.content);
126+
}
127+
105128
template <typename Archive>
106129
void serialize (Archive& ar, eclipse::messages::Task& c, unsigned int) {
107130
ar & BASE_OBJECT(Message, c);
@@ -189,10 +212,12 @@ BOOST_SERIALIZATION_ASSUME_ABSTRACT(eclipse::messages::Message);
189212
BOOST_CLASS_TRACKING(eclipse::messages::Message, boost::serialization::track_never);
190213
BOOST_CLASS_TRACKING(eclipse::messages::Boundaries, boost::serialization::track_never);
191214
BOOST_CLASS_TRACKING(eclipse::messages::KeyValue, boost::serialization::track_never);
215+
BOOST_CLASS_TRACKING(eclipse::messages::OffsetKeyValue, boost::serialization::track_never);
192216
BOOST_CLASS_TRACKING(eclipse::messages::Control, boost::serialization::track_never);
193217
BOOST_CLASS_TRACKING(eclipse::messages::KeyRequest, boost::serialization::track_never);
194218
BOOST_CLASS_TRACKING(eclipse::messages::FileInfo, boost::serialization::track_never);
195219
BOOST_CLASS_TRACKING(eclipse::messages::BlockInfo, boost::serialization::track_never);
220+
BOOST_CLASS_TRACKING(eclipse::messages::BlockUpdate, boost::serialization::track_never);
196221
BOOST_CLASS_TRACKING(eclipse::messages::Task, boost::serialization::track_never);
197222
BOOST_CLASS_TRACKING(eclipse::messages::FileList, boost::serialization::track_never);
198223
BOOST_CLASS_TRACKING(eclipse::messages::Reply, boost::serialization::track_never);

src/messages/offsetkv.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#include "offsetkv.hh"
2+
3+
namespace eclipse {
4+
namespace messages {
5+
6+
OffsetKeyValue::OffsetKeyValue (uint32_t k, std::string n, std::string v, uint32_t p, uint32_t l) : key(k), name(n), value(v), pos(p), len(l) { }
7+
8+
std::string OffsetKeyValue::get_type() const { return "OffsetKeyValue"; }
9+
10+
}
11+
}

src/messages/offsetkv.hh

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#pragma once
2+
3+
#include "message.hh"
4+
#include <string>
5+
6+
namespace eclipse {
7+
namespace messages {
8+
9+
struct OffsetKeyValue: public Message {
10+
OffsetKeyValue () = default;
11+
OffsetKeyValue (uint32_t, std::string, std::string, uint32_t, uint32_t);
12+
13+
std::string get_type() const override;
14+
uint32_t key;
15+
std::string name, value;
16+
uint32_t pos, len;
17+
};
18+
19+
}
20+
}

0 commit comments

Comments
 (0)