diff --git a/server/controller/src/main/java/ai/starwhale/mlops/common/TarFileUtil.java b/server/controller/src/main/java/ai/starwhale/mlops/common/TarFileUtil.java index 0710516564..3379a6a43b 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/common/TarFileUtil.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/common/TarFileUtil.java @@ -16,6 +16,7 @@ package ai.starwhale.mlops.common; +import ai.starwhale.mlops.storage.NopCloserInputStream; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -148,7 +149,8 @@ public static void extract( if (entry.isDirectory()) { continue; } - func.apply(entry.getName(), entry.getSize(), archive); + var is = new NopCloserInputStream(archive); + func.apply(entry.getName(), entry.getSize(), is); } } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java index 495e14e07e..b3c945d46b 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/datastore/impl/MemoryTableImpl.java @@ -126,7 +126,7 @@ var record = reader.read(); this.recordMap.put(record.get(this.schema.getKeyColumn()), record); } } - } catch (IOException e) { + } catch (IOException | RuntimeException e) { throw new SwProcessException(ErrorType.DATASTORE, "failed to load " + this.tableName, e); } } diff --git a/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/NopCloserInputStream.java b/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/NopCloserInputStream.java new file mode 100644 index 0000000000..e690d75f70 --- /dev/null +++ b/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/NopCloserInputStream.java @@ -0,0 +1,93 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class NopCloserInputStream extends InputStream { + private final InputStream inputStream; + + public NopCloserInputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public int read() throws IOException { + return this.inputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return inputStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); + } + + @Override + public byte[] readAllBytes() throws IOException { + return inputStream.readAllBytes(); + } + + @Override + public byte[] readNBytes(int len) throws IOException { + return inputStream.readNBytes(len); + } + + @Override + public int readNBytes(byte[] b, int off, int len) throws IOException { + return inputStream.readNBytes(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return inputStream.skip(n); + } + + @Override + public int available() throws IOException { + return inputStream.available(); + } + + @Override + public void close() throws IOException { + } + + @Override + public void mark(int readLimit) { + inputStream.mark(readLimit); + } + + @Override + public void reset() throws IOException { + inputStream.reset(); + } + + @Override + public boolean markSupported() { + return inputStream.markSupported(); + } + + @Override + public long transferTo(OutputStream out) throws IOException { + return inputStream.transferTo(out); + } +} diff --git a/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/aliyun/StorageAccessServiceAliyun.java b/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/aliyun/StorageAccessServiceAliyun.java index 2d5d4bea36..1f23b6edb1 100644 --- a/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/aliyun/StorageAccessServiceAliyun.java +++ b/server/storage-access-layer/src/main/java/ai/starwhale/mlops/storage/aliyun/StorageAccessServiceAliyun.java @@ -17,6 +17,7 @@ package ai.starwhale.mlops.storage.aliyun; import ai.starwhale.mlops.storage.LengthAbleInputStream; +import ai.starwhale.mlops.storage.NopCloserInputStream; import ai.starwhale.mlops.storage.StorageAccessService; import ai.starwhale.mlops.storage.StorageObjectInfo; import ai.starwhale.mlops.storage.s3.S3Config; @@ -76,7 +77,10 @@ public StorageObjectInfo head(String path) throws IOException { public void put(String path, InputStream inputStream, long size) throws IOException { var meta = new ObjectMetadata(); meta.setContentLength(size); - this.ossClient.putObject(this.bucket, path, inputStream, meta); + // aliyun oss sdk will close the original input stream at the end + // https://github.com/aliyun/aliyun-oss-java-sdk/blob/10727ab9f79efa2a4f2c7fbec348e44c04dd6c42/src/main/java/com/aliyun/oss/common/comm/ServiceClient.java#L89 + var is = new NopCloserInputStream(inputStream); + this.ossClient.putObject(this.bucket, path, is, meta); } @Override