-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathpika_binlog.h
132 lines (94 loc) · 3.05 KB
/
pika_binlog.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef PIKA_BINLOG_H_
#define PIKA_BINLOG_H_
#include <atomic>
#include "pstd/include/env.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "include/pika_define.h"
using pstd::Slice;
using pstd::Status;
std::string NewFileName(const std::string name, const uint32_t current);
class Version {
public:
Version(pstd::RWFile* save);
~Version();
Status Init();
// RWLock should be held when access members.
Status StableSave();
uint32_t pro_num_ = 0;
uint64_t pro_offset_ = 0;
uint64_t logic_id_ = 0;
uint32_t term_ = 0;
pthread_rwlock_t rwlock_;
void debug() {
pstd::RWLock(&rwlock_, false);
printf("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
}
private:
pstd::RWFile* save_ = nullptr;
// No copying allowed;
Version(const Version&);
void operator=(const Version&);
};
class Binlog {
public:
Binlog(const std::string& Binlog_path, const int file_size = 100 * 1024 * 1024);
~Binlog();
void Lock() { mutex_.Lock(); }
void Unlock() { mutex_.Unlock(); }
Status Put(const std::string& item);
Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
/*
* Set Producer pro_num and pro_offset with lock
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset, uint32_t term = 0, uint64_t index = 0);
// Need to hold Lock();
Status Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index);
uint64_t file_size() { return file_size_; }
std::string filename() { return filename_; }
bool IsBinlogIoError() { return binlog_io_error_; }
// need to hold mutex_
void SetTerm(uint32_t term) {
pstd::RWLock(&(version_->rwlock_), true);
version_->term_ = term;
version_->StableSave();
}
uint32_t term() {
pstd::RWLock(&(version_->rwlock_), true);
return version_->term_;
}
void Close();
private:
Status Put(const char* item, int len);
static Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
// pstd::WritableFile *queue() { return queue_; }
void InitLogFile();
Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
/*
* Produce
*/
Status Produce(const Slice& item, int* pro_offset);
std::atomic<bool> opened_;
Version* version_ = nullptr;
pstd::WritableFile* queue_ = nullptr;
pstd::RWFile* versionfile_ = nullptr;
pstd::Mutex mutex_;
uint32_t pro_num_ = 0;
int block_offset_ = 0;
char* pool_ = nullptr;
bool exit_all_consume_ = false;
const std::string binlog_path_;
uint64_t file_size_ = 0;
std::string filename_;
std::atomic<bool> binlog_io_error_;
// Not use
// int32_t retry_;
// No copying allowed
Binlog(const Binlog&);
void operator=(const Binlog&);
};
#endif