forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_writer.hh
146 lines (132 loc) · 5.28 KB
/
stream_writer.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Modified by Cloudius Systems.
* Copyright 2015 Cloudius Systems.
*/
#pragma once
#include "streaming/stream_session.hh"
#include "sstables/sstables.hh"
#include <map>
namespace streaming {
/**
* StreamWriter writes given section of the SSTable to given channel.
*/
class stream_writer {
// Default transfer-chunk size (64 KiB); per the disabled Cassandra code
// below, this is the transfer buffer size used when no checksum validator
// (CRC component) is present for the SSTable.
static constexpr int DEFAULT_CHUNK_SIZE = 64 * 1024;
protected:
// SSTable whose data file sections are to be streamed. Non-owning
// reference — the sstable must outlive this writer.
sstables::sstable& sstable;
// Byte ranges of the data file to stream. Mirrors Cassandra's
// Collection<Pair<Long,Long>>: key = section start offset,
// mapped value = section end offset (see the disabled write() below,
// where length is computed as section.right - start).
std::map<int64_t, int64_t> sections;
//StreamRateLimiter limiter;
// Owning stream session; used (in the eventual port) to report transfer
// progress. Non-owning reference — must outlive this writer.
stream_session& session;
// NOTE(review): the #if 0 regions in this class are the original Cassandra
// Java implementation, kept verbatim as a reference for the ongoing port.
// They are never compiled.
#if 0
private OutputStream compressedOutput;
// allocate buffer to use for transfers only once
private byte[] transferBuffer;
#endif
public:
// Constructs a writer for the given sstable / section map / session.
// The sections map is taken by value and moved in; sstable and session
// are captured by reference and not owned.
stream_writer(sstables::sstable& sstable_, std::map<int64_t, int64_t> sections_, stream_session& session_)
    : sstable(sstable_)
    , sections(std::move(sections_))
    , session(session_) {
    //this.limiter = StreamManager.getRateLimiter(session.peer);
}
#if 0
/**
 * Stream file of specified sections to given channel.
 *
 * StreamWriter uses LZF compression on wire to decrease size to transfer.
 *
 * @param channel where this writes data to
 * @throws IOException on any I/O error
 */
public void write(WritableByteChannel channel) throws IOException
{
    long totalSize = totalSize();
    RandomAccessReader file = sstable.openDataReader();
    ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
                                ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
                                : null;
    transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
    // setting up data compression stream
    compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
    long progress = 0L;
    try
    {
        // stream each of the required sections of the file
        for (Pair<Long, Long> section : sections)
        {
            long start = validator == null ? section.left : validator.chunkStart(section.left);
            int readOffset = (int) (section.left - start);
            // seek to the beginning of the section
            file.seek(start);
            if (validator != null)
                validator.seek(start);
            // length of the section to read
            long length = section.right - start;
            // tracks write progress
            long bytesRead = 0;
            while (bytesRead < length)
            {
                long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
                bytesRead += lastBytesRead;
                progress += (lastBytesRead - readOffset);
                session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                readOffset = 0;
            }
            // make sure that current section is send
            compressedOutput.flush();
        }
    }
    finally
    {
        // no matter what happens close file
        FileUtils.closeQuietly(file);
        FileUtils.closeQuietly(validator);
    }
}
#endif
protected:
// Total number of bytes this writer will stream. Declared here, defined
// out of line; presumably sums the lengths of all entries in `sections`
// (as Cassandra's StreamWriter.totalSize() does) — confirm in the .cc.
int64_t total_size();
#if 0
/**
 * Sequentially read bytes from the file and write them to the output stream
 *
 * @param reader The file reader to read from
 * @param validator validator to verify data integrity
 * @param start number of bytes to skip transfer, but include for validation.
 * @param length The full length that should be read from {@code reader}
 * @param bytesTransferred Number of bytes already read out of {@code length}
 *
 * @return Number of bytes read
 *
 * @throws java.io.IOException on any I/O error
 */
protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
{
    int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
    int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
    reader.readFully(transferBuffer, 0, minReadable);
    if (validator != null)
        validator.validate(transferBuffer, 0, minReadable);
    limiter.acquire(toTransfer - start);
    compressedOutput.write(transferBuffer, start, (toTransfer - start));
    return toTransfer;
}
#endif
};
} // namespace streaming