forked from vesoft-inc/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRowReaderWrapper.cpp
176 lines (160 loc) · 5.46 KB
/
RowReaderWrapper.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "codec/RowReaderWrapper.h"
namespace nebula {
// Builds a reader for a tag row. The schema version is decoded from the row
// header via getVersions() and then looked up through the schema manager.
//
// schemaMan: schema manager queried with (space, tag, version); may be nullptr,
//            in which case an error is logged and an empty wrapper is returned.
// space/tag: identify which tag schema to look up.
// row:       the encoded row data.
//
// Returns a default-constructed RowReaderWrapper when schemaMan is nullptr,
// when the row carries no valid schema version, or when the schema manager
// yields no schema for that version.
//
// NOTE(review): only the raw pointer from schema.get() is handed to the
// wrapper; presumably the schema manager keeps the schema alive for the
// lifetime of the reader -- confirm against SchemaManager.
// static
RowReaderWrapper RowReaderWrapper::getTagPropReader(meta::SchemaManager* schemaMan,
                                                    GraphSpaceID space,
                                                    TagID tag,
                                                    folly::StringPiece row) {
  // Same guard as getEdgePropReader(): never dereference a null manager.
  if (schemaMan == nullptr) {
    LOG(ERROR) << "schemaMan should not be nullptr!";
    return RowReaderWrapper();
  }
  SchemaVer schemaVer;
  int32_t readerVer;
  RowReaderWrapper::getVersions(row, schemaVer, readerVer);
  // getVersions() reports -1 when the header cannot be decoded.
  if (schemaVer < 0) {
    LOG(WARNING) << "Invalid schema version in the row data!";
    return RowReaderWrapper();
  }
  auto schema = schemaMan->getTagSchema(space, tag, schemaVer);
  if (schema == nullptr) {
    return RowReaderWrapper();
  }
  return RowReaderWrapper(schema.get(), row, readerVer);
}
// Builds a reader for an edge row. The schema version is taken from the row
// header (see getVersions()) and resolved through the schema manager with
// (space, edge, version).
//
// An empty, default-constructed wrapper is returned when the manager is
// nullptr, when the row has no valid schema version, or when no schema is
// found for the decoded version.
// static
RowReaderWrapper RowReaderWrapper::getEdgePropReader(meta::SchemaManager* schemaMan,
                                                     GraphSpaceID space,
                                                     EdgeType edge,
                                                     folly::StringPiece row) {
  if (!schemaMan) {
    LOG(ERROR) << "schemaMan should not be nullptr!";
    return RowReaderWrapper();
  }

  SchemaVer rowSchemaVer;
  int32_t rowReaderVer;
  RowReaderWrapper::getVersions(row, rowSchemaVer, rowReaderVer);

  // A negative version means the header could not be decoded.
  if (rowSchemaVer < 0) {
    LOG(WARNING) << "Invalid schema version in the row data!";
    return RowReaderWrapper();
  }

  auto edgeSchema = schemaMan->getEdgeSchema(space, edge, rowSchemaVer);
  if (edgeSchema == nullptr) {
    return RowReaderWrapper();
  }
  return RowReaderWrapper(edgeSchema.get(), row, rowReaderVer);
}
// Builds a reader for `row` against one specific schema.
//
// schema: the schema the row is expected to be encoded with; may be nullptr.
// row:    the encoded row data.
//
// Returns a default-constructed RowReaderWrapper when `schema` is nullptr or
// when the schema version decoded from the row header differs from
// schema->getVersion(); otherwise a wrapper bound to (schema, row).
// static
RowReaderWrapper RowReaderWrapper::getRowReader(const meta::NebulaSchemaProvider* schema,
                                                folly::StringPiece row) {
  // Mirror reset(schema, row): a missing schema is a soft failure, not a
  // null dereference.
  if (schema == nullptr) {
    return RowReaderWrapper();
  }
  SchemaVer schemaVer;
  int32_t readerVer;
  RowReaderWrapper::getVersions(row, schemaVer, readerVer);
  if (schemaVer != schema->getVersion()) {
    return RowReaderWrapper();
  }
  return RowReaderWrapper(schema, row, readerVer);
}
// Builds a reader for `row`, choosing the schema from `schemas` by using the
// schema version decoded from the row header as the vector index.
//
// Returns a default-constructed RowReaderWrapper when the decoded version is
// out of range for `schemas` (a negative version wraps to a huge index and is
// rejected here too) or when the schema stored at that index reports a
// different version.
// static
RowReaderWrapper RowReaderWrapper::getRowReader(
    const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
    folly::StringPiece row) {
  SchemaVer rowSchemaVer;
  int32_t rowReaderVer;
  RowReaderWrapper::getVersions(row, rowSchemaVer, rowReaderVer);

  const auto slot = static_cast<size_t>(rowSchemaVer);
  if (slot >= schemas.size()) {
    return RowReaderWrapper();
  }

  const auto& candidate = schemas[slot];
  if (rowSchemaVer != candidate->getVersion()) {
    return RowReaderWrapper();
  }
  return RowReaderWrapper(candidate.get(), row, rowReaderVer);
}
// Constructs a wrapper bound to (schema, row).
//
// schema:    schema used to decode the row; must not be nullptr.
// row:       the encoded row data.
// readerVer: reader version decoded from the row header; must be 2.
//
// Both preconditions are enforced with CHECK macros, so a violation is a
// fatal check failure rather than a recoverable error.
RowReaderWrapper::RowReaderWrapper(const meta::NebulaSchemaProvider* schema,
const folly::StringPiece& row,
int32_t& readerVer) {
// Only reader version 2 is handled here.
CHECK_EQ(readerVer, 2);
CHECK_NOTNULL(schema);
// Point the V2 reader at the row and make it the active reader.
readerV2_.resetImpl(schema, row);
currReader_ = &readerV2_;
}
// Re-binds this wrapper to (schema, row) using an already-decoded reader
// version.
//
// schema:    schema used to decode the row; must not be nullptr.
// row:       the encoded row data.
// readerVer: reader version decoded from the row header; must be 2.
//
// Both preconditions are enforced with CHECK macros (fatal on violation).
// Always returns true when it returns at all.
bool RowReaderWrapper::reset(meta::NebulaSchemaProvider const* schema,
folly::StringPiece row,
int32_t readerVer) {
// Only reader version 2 is handled here.
CHECK_EQ(readerVer, 2);
CHECK_NOTNULL(schema);
// Point the V2 reader at the row and make it the active reader.
readerV2_.resetImpl(schema, row);
currReader_ = &readerV2_;
return true;
}
// Re-binds this wrapper to `row`, decoded against one specific schema.
//
// The active reader is cleared up front, so after a `false` return the
// wrapper has no current reader. Returns false when `schema` is nullptr or
// when the schema version decoded from the row header differs from
// schema->getVersion(); otherwise forwards to the three-argument reset().
bool RowReaderWrapper::reset(meta::NebulaSchemaProvider const* schema, folly::StringPiece row) {
  currReader_ = nullptr;
  if (!schema) {
    return false;
  }

  SchemaVer rowSchemaVer;
  int32_t rowReaderVer;
  RowReaderWrapper::getVersions(row, rowSchemaVer, rowReaderVer);

  // Short-circuit: only re-arm the reader when the versions agree.
  return rowSchemaVer == schema->getVersion() && reset(schema, row, rowReaderVer);
}
// Re-binds this wrapper to `row`, choosing the schema from `schemas` by the
// schema version decoded from the row header.
//
// The active reader is cleared up front, so after a `false` return the
// wrapper has no current reader. Returns false when the decoded version is
// out of range for `schemas` or when the schema stored at that index reports
// a different version; otherwise forwards to the three-argument reset().
bool RowReaderWrapper::reset(
    const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
    folly::StringPiece row) {
  currReader_ = nullptr;

  SchemaVer rowSchemaVer;
  int32_t rowReaderVer;
  RowReaderWrapper::getVersions(row, rowSchemaVer, rowReaderVer);

  // Schemas are stored oldest to newest, so the version doubles as the index.
  const auto slot = static_cast<size_t>(rowSchemaVer);
  if (slot >= schemas.size()) {
    return false;
  }

  const auto& candidate = schemas[slot];
  if (rowSchemaVer != candidate->getVersion()) {
    return false;
  }
  return reset(candidate.get(), row, rowReaderVer);
}
// Decodes the reader version and the schema version from a row's header.
//
// Header byte layout, as decoded below:
//   - bits 3-4 hold (readerVer - 1);
//   - reader version 1: the top three bits hold the number of bytes used for
//     the schema version;
//   - reader version 2: the low three bits hold the number of bytes used for
//     the schema version.
// The schema version bytes follow the header byte. A byte count of zero
// means no schema version is present and schemaVer is reported as 0.
//
// row:       the encoded row data.
// schemaVer: out; the decoded schema version, or -1 when the row is empty,
//            the reader version is not 1 or 2, or the row is too short or
//            the version too wide to decode.
// readerVer: out; the decoded reader version. Set to 2 for an empty row.
// static
void RowReaderWrapper::getVersions(const folly::StringPiece& row,
                                   SchemaVer& schemaVer,
                                   int32_t& readerVer) {
  if (row.empty()) {
    LOG(WARNING) << "Row data is empty, so there is no version info";
    schemaVer = -1;
    readerVer = 2;
    return;
  }

  // Decode the header as an unsigned byte. Plain `char` may be signed, in
  // which case a header with the top bit set would shift to a negative value
  // and turn into an enormous size_t byte count below.
  const auto header = static_cast<uint8_t>(row[0]);
  readerVer = ((header & 0x18) >> 3) + 1;

  size_t verBytes = 0;
  if (readerVer == 1) {
    // The first three bits indicate the number of bytes for the
    // schema version. If the number is zero, no schema version
    // presents
    verBytes = header >> 5;
  } else if (readerVer == 2) {
    // The last three bits indicate the number of bytes for the
    // schema version. If the number is zero, no schema version
    // presents
    verBytes = header & 0x07;
  } else {
    LOG(WARNING) << "Invalid reader version: " << readerVer;
    schemaVer = -1;
    return;
  }

  schemaVer = 0;
  if (verBytes > 0) {
    if (verBytes + 1 > row.size()) {
      // Data is too short
      LOG(WARNING) << "Row data is too short: " << toHexStr(row);
      schemaVer = -1;
      return;
    }
    if (verBytes > sizeof(SchemaVer)) {
      // Defensive: never let the copy below write past schemaVer.
      LOG(WARNING) << "Schema version takes too many bytes: " << verBytes;
      schemaVer = -1;
      return;
    }
    // Schema Version is stored in Little Endian
    // NOTE(review): copying into the low bytes assumes a little-endian host.
    memcpy(reinterpret_cast<void*>(&schemaVer), &row[1], verBytes);
  }
}
} // namespace nebula