Skip to content

Commit 164f5f5

Browse files
committed
Step 1/2: Overwrite SnapStream to remove deps of snappy in ZK client 3.6
1 parent 6b0166f commit 164f5f5

File tree

2 files changed

+325
-0
lines changed
  • kyuubi-relocated-zookeeper-parent/kyuubi-relocated-zookeeper-36

2 files changed

+325
-0
lines changed

kyuubi-relocated-zookeeper-parent/kyuubi-relocated-zookeeper-36/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ under the License.
3232
<description>Relocated Zookeeper 3.6 classes used by Kyuubi internally.</description>
3333

3434
<properties>
35+
<slf4j.version>1.7.36</slf4j.version>
3536
<zookeeper.version>3.6.4</zookeeper.version>
3637
<curator.version>5.4.0</curator.version>
3738
<netty.version>4.1.91.Final</netty.version>
@@ -56,6 +57,13 @@ under the License.
5657
</dependencyManagement>
5758

5859
<dependencies>
60+
<dependency>
61+
<!-- kyuubi distribution provides logging classes -->
62+
<groupId>org.slf4j</groupId>
63+
<artifactId>slf4j-api</artifactId>
64+
<version>${slf4j.version}</version>
65+
<scope>provided</scope>
66+
</dependency>
5967
<dependency>
6068
<groupId>org.apache.zookeeper</groupId>
6169
<artifactId>zookeeper</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.server.persistence;
20+
21+
import java.io.BufferedInputStream;
22+
import java.io.BufferedOutputStream;
23+
import java.io.File;
24+
import java.io.FileInputStream;
25+
import java.io.FileNotFoundException;
26+
import java.io.FileOutputStream;
27+
import java.io.IOException;
28+
import java.io.InputStream;
29+
import java.io.OutputStream;
30+
import java.io.RandomAccessFile;
31+
import java.nio.ByteBuffer;
32+
import java.util.Arrays;
33+
import java.util.zip.Adler32;
34+
import java.util.zip.CheckedInputStream;
35+
import java.util.zip.CheckedOutputStream;
36+
import java.util.zip.GZIPInputStream;
37+
import java.util.zip.GZIPOutputStream;
38+
import org.apache.jute.InputArchive;
39+
import org.apache.jute.OutputArchive;
40+
import org.apache.zookeeper.common.AtomicFileOutputStream;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
import org.xerial.snappy.SnappyCodec;
44+
import org.xerial.snappy.SnappyInputStream;
45+
import org.xerial.snappy.SnappyOutputStream;
46+
47+
/**
48+
* Represent the Stream used in serialize and deserialize the Snapshot.
49+
*/
50+
public class SnapStream {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
53+
54+
public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE = "zookeeper.snapshot.compression.method";
55+
56+
private static StreamMode streamMode = StreamMode.fromString(
57+
System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
58+
StreamMode.DEFAULT_MODE.getName()));
59+
60+
static {
61+
LOG.info("{} = {}", ZOOKEEPER_SHAPSHOT_STREAM_MODE, streamMode);
62+
}
63+
64+
public enum StreamMode {
65+
GZIP("gz"),
66+
SNAPPY("snappy"),
67+
CHECKED("");
68+
69+
public static final StreamMode DEFAULT_MODE = CHECKED;
70+
71+
private String name;
72+
73+
StreamMode(String name) {
74+
this.name = name;
75+
}
76+
77+
public String getName() {
78+
return name;
79+
}
80+
81+
public String getFileExtension() {
82+
return name.isEmpty() ? "" : "." + name;
83+
}
84+
85+
public static StreamMode fromString(String name) {
86+
for (StreamMode c : values()) {
87+
if (c.getName().compareToIgnoreCase(name) == 0) {
88+
return c;
89+
}
90+
}
91+
return DEFAULT_MODE;
92+
}
93+
}
94+
95+
/**
96+
* Return the CheckedInputStream based on the extension of the fileName.
97+
*
98+
* @param file the file the InputStream read from
99+
* @return the specific InputStream
100+
* @throws IOException
101+
*/
102+
public static CheckedInputStream getInputStream(File file) throws IOException {
103+
FileInputStream fis = new FileInputStream(file);
104+
InputStream is;
105+
switch (getStreamMode(file.getName())) {
106+
case GZIP:
107+
is = new GZIPInputStream(fis);
108+
break;
109+
case SNAPPY:
110+
is = new SnappyInputStream(fis);
111+
break;
112+
case CHECKED:
113+
default:
114+
is = new BufferedInputStream(fis);
115+
}
116+
return new CheckedInputStream(is, new Adler32());
117+
}
118+
119+
/**
120+
* Return the OutputStream based on predefined stream mode.
121+
*
122+
* @param file the file the OutputStream writes to
123+
* @param fsync sync the file immediately after write
124+
* @return the specific OutputStream
125+
* @throws IOException
126+
*/
127+
public static CheckedOutputStream getOutputStream(File file, boolean fsync) throws IOException {
128+
OutputStream fos = fsync ? new AtomicFileOutputStream(file) : new FileOutputStream(file);
129+
OutputStream os;
130+
switch (streamMode) {
131+
case GZIP:
132+
os = new GZIPOutputStream(fos);
133+
break;
134+
case SNAPPY:
135+
os = new SnappyOutputStream(fos);
136+
break;
137+
case CHECKED:
138+
default:
139+
os = new BufferedOutputStream(fos);
140+
}
141+
return new CheckedOutputStream(os, new Adler32());
142+
}
143+
144+
/**
145+
* Write specific seal to the OutputArchive and close the OutputStream.
146+
* Currently, only CheckedOutputStream will write it's checkSum to the
147+
* end of the stream.
148+
*
149+
*/
150+
public static void sealStream(CheckedOutputStream os, OutputArchive oa) throws IOException {
151+
long val = os.getChecksum().getValue();
152+
oa.writeLong(val, "val");
153+
oa.writeString("/", "path");
154+
}
155+
156+
/**
157+
* Verify the integrity of the seal, only CheckedInputStream will verify
158+
* the checkSum of the content.
159+
*
160+
*/
161+
static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
162+
long checkSum = is.getChecksum().getValue();
163+
long val = ia.readLong("val");
164+
ia.readString("path"); // Read and ignore "/" written by SealStream.
165+
if (val != checkSum) {
166+
throw new IOException("CRC corruption");
167+
}
168+
}
169+
170+
/**
171+
* Verifies that the file is a valid snapshot. Snapshot may be invalid if
172+
* it's incomplete as in a situation when the server dies while in the
173+
* process of storing a snapshot. Any files that are improperly formated
174+
* or corrupted are invalid. Any file that is not a snapshot is also an
175+
* invalid snapshot.
176+
*
177+
* @param file file to verify
178+
* @return true if the snapshot is valid
179+
* @throws IOException
180+
*/
181+
public static boolean isValidSnapshot(File file) throws IOException {
182+
if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
183+
return false;
184+
}
185+
186+
boolean isValid = false;
187+
switch (getStreamMode(file.getName())) {
188+
case GZIP:
189+
isValid = isValidGZipStream(file);
190+
break;
191+
case SNAPPY:
192+
isValid = isValidSnappyStream(file);
193+
break;
194+
case CHECKED:
195+
default:
196+
isValid = isValidCheckedStream(file);
197+
}
198+
return isValid;
199+
}
200+
201+
public static void setStreamMode(StreamMode mode) {
202+
streamMode = mode;
203+
}
204+
205+
public static StreamMode getStreamMode() {
206+
return streamMode;
207+
}
208+
209+
/**
210+
* Detect the stream mode from file name extension
211+
*
212+
* @param fileName
213+
* @return
214+
*/
215+
public static StreamMode getStreamMode(String fileName) {
216+
String[] splitSnapName = fileName.split("\\.");
217+
218+
// Use file extension to detect format
219+
if (splitSnapName.length > 1) {
220+
String mode = splitSnapName[splitSnapName.length - 1];
221+
return StreamMode.fromString(mode);
222+
}
223+
224+
return StreamMode.CHECKED;
225+
}
226+
227+
/**
228+
* Certify the GZip stream integrity by checking the header
229+
* for the GZip magic string
230+
*
231+
* @param f file to verify
232+
* @return true if it has the correct GZip magic string
233+
* @throws IOException
234+
*/
235+
private static boolean isValidGZipStream(File f) throws IOException {
236+
byte[] byteArray = new byte[2];
237+
try (FileInputStream fis = new FileInputStream(f)) {
238+
if (2 != fis.read(byteArray, 0, 2)) {
239+
LOG.error("Read incorrect number of bytes from {}", f.getName());
240+
return false;
241+
}
242+
ByteBuffer bb = ByteBuffer.wrap(byteArray);
243+
byte[] magicHeader = new byte[2];
244+
bb.get(magicHeader, 0, 2);
245+
int magic = magicHeader[0] & 0xff | ((magicHeader[1] << 8) & 0xff00);
246+
return magic == GZIPInputStream.GZIP_MAGIC;
247+
} catch (FileNotFoundException e) {
248+
LOG.error("Unable to open file {}", f.getName(), e);
249+
return false;
250+
}
251+
}
252+
253+
/**
254+
* Certify the Snappy stream integrity by checking the header
255+
* for the Snappy magic string
256+
*
257+
* @param f file to verify
258+
* @return true if it has the correct Snappy magic string
259+
* @throws IOException
260+
*/
261+
private static boolean isValidSnappyStream(File f) throws IOException {
262+
byte[] byteArray = new byte[SnappyCodec.MAGIC_LEN];
263+
try (FileInputStream fis = new FileInputStream(f)) {
264+
if (SnappyCodec.MAGIC_LEN != fis.read(byteArray, 0, SnappyCodec.MAGIC_LEN)) {
265+
LOG.error("Read incorrect number of bytes from {}", f.getName());
266+
return false;
267+
}
268+
ByteBuffer bb = ByteBuffer.wrap(byteArray);
269+
byte[] magicHeader = new byte[SnappyCodec.MAGIC_LEN];
270+
bb.get(magicHeader, 0, SnappyCodec.MAGIC_LEN);
271+
return Arrays.equals(magicHeader, SnappyCodec.getMagicHeader());
272+
} catch (FileNotFoundException e) {
273+
LOG.error("Unable to open file {}", f.getName(), e);
274+
return false;
275+
}
276+
}
277+
278+
/**
279+
* Certify the Checked stream integrity by checking the header
280+
* length and format
281+
*
282+
* @param f file to verify
283+
* @return true if it has the correct header
284+
* @throws IOException
285+
*/
286+
private static boolean isValidCheckedStream(File f) throws IOException {
287+
try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
288+
// including the header and the last / bytes
289+
// the snapshot should be at least 10 bytes
290+
if (raf.length() < 10) {
291+
return false;
292+
}
293+
294+
raf.seek(raf.length() - 5);
295+
byte[] bytes = new byte[5];
296+
int readlen = 0;
297+
int l;
298+
while (readlen < 5 && (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
299+
readlen += l;
300+
}
301+
if (readlen != bytes.length) {
302+
LOG.info("Invalid snapshot {}. too short, len = {} bytes", f.getName(), readlen);
303+
return false;
304+
}
305+
ByteBuffer bb = ByteBuffer.wrap(bytes);
306+
int len = bb.getInt();
307+
byte b = bb.get();
308+
if (len != 1 || b != '/') {
309+
LOG.info("Invalid snapshot {}. len = {}, byte = {}", f.getName(), len, (b & 0xff));
310+
return false;
311+
}
312+
}
313+
314+
return true;
315+
}
316+
317+
}

0 commit comments

Comments
 (0)