Skip to content

Commit

Permalink
Revert "Merge hierarchy store into Tachyon [TACHYON-33 sub-task 5 & 6]"
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyuan committed Dec 24, 2014
1 parent fe42906 commit dee5305
Show file tree
Hide file tree
Showing 55 changed files with 1,773 additions and 6,699 deletions.
6 changes: 1 addition & 5 deletions conf/tachyon-env.sh.template
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,12 @@ CONF_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export TACHYON_JAVA_OPTS+="
-Dlog4j.configuration=file:$CONF_DIR/log4j.properties
-Dtachyon.debug=false
-Dtachyon.worker.hierarchystore.level.max=1
-Dtachyon.worker.hierarchystore.level0.alias=MEM
-Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
-Dtachyon.underfs.address=$TACHYON_UNDERFS_ADDRESS
-Dtachyon.underfs.hdfs.impl=$TACHYON_UNDERFS_HDFS_IMPL
-Dtachyon.data.folder=$TACHYON_UNDERFS_ADDRESS/tmp/tachyon/data
-Dtachyon.workers.folder=$TACHYON_UNDERFS_ADDRESS/tmp/tachyon/workers
-Dtachyon.worker.memory.size=$TACHYON_WORKER_MEMORY_SIZE
-Dtachyon.worker.data.folder=/tachyonworker/
-Dtachyon.worker.data.folder=$TACHYON_RAM_FOLDER/tachyonworker/
-Dtachyon.master.worker.timeout.ms=60000
-Dtachyon.master.hostname=$TACHYON_MASTER_ADDRESS
-Dtachyon.master.journal.folder=$TACHYON_HOME/journal/
Expand Down
19 changes: 7 additions & 12 deletions core/src/main/java/tachyon/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,13 @@ public static void main(String[] args) throws IOException {
+ System.currentTimeMillis());
} else if (args[0].toUpperCase().equals("WORKER")) {
WorkerConf workerConf = WorkerConf.get();
for (int level = 0; level < workerConf.MAX_HIERARCHY_STORAGE_LEVEL; level ++) {
String[] dirPaths = workerConf.STORAGE_TIER_DIRS[level].split(",");
for (int i = 0; i < dirPaths.length; i ++) {
String dataPath = CommonUtils.concat(dirPaths[i].trim(), workerConf.DATA_FOLDER);
UnderFileSystem ufs = UnderFileSystem.get(dataPath);
System.out.println("Removing data under folder: " + dataPath);
if (ufs.exists(dataPath)) {
String[] files = ufs.list(dataPath);
for (String file : files) {
ufs.delete(CommonUtils.concat(dataPath, file), true);
}
}
String localFolder = workerConf.DATA_FOLDER;
UnderFileSystem ufs = UnderFileSystem.get(localFolder);
System.out.println("Removing local data under folder: " + localFolder);
if (ufs.exists(localFolder)) {
String[] files = ufs.list(localFolder);
for (String file : files) {
ufs.delete(CommonUtils.concat(localFolder, file), true);
}
}
} else {
Expand Down
13 changes: 0 additions & 13 deletions core/src/main/java/tachyon/StorageDirId.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,6 @@
public class StorageDirId {
static final long UNKNOWN = -1;

/**
* Compare storage level of StorageDirs
*
* @param storageDirIdLeft The left value of StorageDirId to be compared
* @param storageDirIdRight The right value of StorageDirId to be compared
* @return negative if storage level of left StorageDirId is higher than the right one, zero if
* equals, positive if lower.
*/
public static int compareStorageLevel(long storageDirIdLeft, long storageDirIdRight) {
return getStorageLevelAliasValue(storageDirIdLeft)
- getStorageLevelAliasValue(storageDirIdRight);
}

/**
* Generate StorageDirId from given information
*
Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/tachyon/Users.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class Users {
public static final int DATASERVER_USER_ID = -1;
public static final int CHECKPOINT_USER_ID = -2;
public static final int EVICT_USER_ID = -3;

private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* the License.
*/

package tachyon.worker;
package tachyon.client;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;

import tachyon.TachyonURI;

Expand Down Expand Up @@ -76,21 +75,6 @@ public int append(long blockOffset, byte[] buf, int offset, int length) throws I
*/
public abstract boolean delete() throws IOException;

/**
* Get channel used to access block file
*
* @return the channel bounded with the block file
*/
public abstract ByteChannel getChannel();

/**
* Get length of the block file
*
* @return the length of the block file
* @throws IOException
*/
public abstract long getLength() throws IOException;

/**
* Read data from block file
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
* the License.
*/

package tachyon.worker;
package tachyon.client;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

Expand Down Expand Up @@ -84,16 +83,6 @@ public boolean delete() throws IOException {
return new File(mFilePath).delete();
}

@Override
public ByteChannel getChannel() {
return mLocalFileChannel;
}

@Override
public long getLength() throws IOException {
return mLocalFile.length();
}

@Override
public ByteBuffer read(long blockOffset, int length) throws IOException {
long fileLength = mLocalFile.length();
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/java/tachyon/client/BlockInStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ public static BlockInStream get(TachyonFile tachyonFile, ReadType readType, int
Object ufsConf) throws IOException {
TachyonByteBuffer buf = tachyonFile.readLocalByteBuffer(blockIndex);
if (buf != null) {
if (readType.isPromote()) {
tachyonFile.promoteBlock(blockIndex);
}
return new LocalBlockInStream(tachyonFile, readType, blockIndex, buf);
}

Expand Down
71 changes: 22 additions & 49 deletions core/src/main/java/tachyon/client/BlockOutStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.slf4j.LoggerFactory;

import tachyon.Constants;
import tachyon.thrift.ClientLocationInfo;
import tachyon.util.CommonUtils;

/**
Expand All @@ -44,7 +43,6 @@ public class BlockOutStream extends OutStream {

private long mInFileBytes = 0;
private long mWrittenBytes = 0;
private ClientLocationInfo mLocationInfo = null;

private String mLocalFilePath = null;
private RandomAccessFile mLocalFile = null;
Expand Down Expand Up @@ -83,19 +81,28 @@ public class BlockOutStream extends OutStream {
throw new IOException(msg);
}

File localFolder = mTachyonFS.createAndGetUserLocalTempFolder();
if (localFolder == null) {
mCanWrite = false;
String msg = "Failed to create temp user folder for tachyon client.";
throw new IOException(msg);
}

mLocalFilePath = CommonUtils.concat(localFolder.getPath(), mBlockId);
mLocalFile = new RandomAccessFile(mLocalFilePath, "rw");
mLocalFileChannel = mLocalFile.getChannel();
// change the permission of the temporary file in order that the worker can move it.
CommonUtils.changeLocalFileToFullPermission(mLocalFilePath);
// use the sticky bit, only the client and the worker can write to the block
CommonUtils.setLocalFileStickyBit(mLocalFilePath);
LOG.info(mLocalFilePath + " was created!");

mBuffer = ByteBuffer.allocate(mUserConf.FILE_BUFFER_BYTES + 4);
}

private synchronized void appendCurrentBuffer(byte[] buf, int offset, int length)
throws IOException {
boolean reqResult = false;
if (mLocationInfo == null) {
mLocationInfo = mTachyonFS.requestSpace(length);
reqResult = (mLocationInfo != null);
} else {
reqResult = mTachyonFS.requestSpace(mLocationInfo.getStorageDirId(), length);
}
if (!reqResult) {
if (!mTachyonFS.requestSpace(length)) {
mCanWrite = false;

String msg =
Expand All @@ -104,17 +111,6 @@ private synchronized void appendCurrentBuffer(byte[] buf, int offset, int length

throw new IOException(msg);
}
if (mLocalFilePath == null) {
String localTempFolder = createUserLocalTempFolder();
mLocalFilePath = CommonUtils.concat(localTempFolder, mBlockId);
mLocalFile = new RandomAccessFile(mLocalFilePath, "rw");
mLocalFileChannel = mLocalFile.getChannel();
// change the permission of the temporary file in order that the worker can move it.
CommonUtils.changeLocalFileToFullPermission(mLocalFilePath);
// use the sticky bit, only the client and the worker can write to the block
CommonUtils.setLocalFileStickyBit(mLocalFilePath);
LOG.info(mLocalFilePath + " was created!");
}

MappedByteBuffer out = mLocalFileChannel.map(MapMode.READ_WRITE, mInFileBytes, length);
out.put(buf, offset, length);
Expand Down Expand Up @@ -147,39 +143,16 @@ public void close() throws IOException {
}

if (mCancel) {
if (mLocationInfo != null) { // if file was written
mTachyonFS.releaseSpace(mLocationInfo.getStorageDirId(),
mWrittenBytes - mBuffer.position());
new File(mLocalFilePath).delete();
LOG.info("Canceled output of block " + mBlockId + ", deleted local file "
+ mLocalFilePath);
}
} else if (mLocationInfo != null) {
mTachyonFS.cacheBlock(mLocationInfo.getStorageDirId(), mBlockId);
mTachyonFS.releaseSpace(mWrittenBytes - mBuffer.position());
new File(mLocalFilePath).delete();
LOG.info("Canceled output of block " + mBlockId + ", deleted local file " + mLocalFilePath);
} else {
mTachyonFS.cacheBlock(mBlockId);
}
}
mClosed = true;
}

/**
* Create temporary folder for the user in some StorageDir
*
* @return the path of the folder
* @throws IOException
*/
private String createUserLocalTempFolder() throws IOException {
String localTempFolder = null;
if (mLocationInfo != null) {
localTempFolder = mTachyonFS.createAndGetUserLocalTempFolder(mLocationInfo.getPath());
}

if (localTempFolder == null) {
mCanWrite = false;
throw new IOException("Failed to create temp user folder for tachyon client.");
}
return localTempFolder;
}

@Override
public void flush() throws IOException {
// Since this only writes to memory, this flush is not outside visible.
Expand Down
16 changes: 2 additions & 14 deletions core/src/main/java/tachyon/client/ReadType.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@ public enum ReadType {
/**
* Read the file and cache it.
*/
CACHE(2),

/**
* Read the file and promote it back to top StorageTier
*/
CACHE_PROMOTE(3);
CACHE(2);

private final int mValue;

Expand All @@ -52,13 +47,6 @@ public int getValue() {
* @return true if the read type is CACHE, false otherwise
*/
public boolean isCache() {
return mValue == CACHE.mValue || mValue == CACHE_PROMOTE.mValue;
}

/**
* @return true if the read type is CACHE_PROMOTE, false otherwise
*/
public boolean isPromote() {
return mValue == CACHE_PROMOTE.mValue;
return mValue == CACHE.mValue;
}
}
16 changes: 9 additions & 7 deletions core/src/main/java/tachyon/client/RemoteBlockInStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import tachyon.conf.UserConf;
import tachyon.thrift.ClientBlockInfo;
import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;
import tachyon.worker.nio.DataServerMessage;

Expand Down Expand Up @@ -237,18 +238,20 @@ public static ByteBuffer readRemoteByteBuffer(TachyonFS tachyonFS, ClientBlockIn
List<NetAddress> blockLocations = blockInfo.getLocations();
LOG.info("Block locations:" + blockLocations);

for (NetAddress blockLocation : blockLocations) {
String host = blockLocation.mHost;
int port = blockLocation.mSecondaryPort;
for (int k = 0; k < blockLocations.size(); k++) {
String host = blockLocations.get(k).mHost;
int port = blockLocations.get(k).mSecondaryPort;

// The data is not in remote machine's memory if port == -1.
// The data is not in remote machine's memory if port == -1
if (port == -1) {
continue;
}
if (host.equals(InetAddress.getLocalHost().getHostName())
|| host.equals(InetAddress.getLocalHost().getHostAddress())
|| host.equals(NetworkUtils.getLocalHostName())) {
LOG.warn("Master thinks the local machine has data, But not!");
String localFileName =
CommonUtils.concat(tachyonFS.getLocalDataFolder(), blockInfo.blockId);
LOG.warn("Master thinks the local machine has data " + localFileName + "! But not!");
}
LOG.info(host + ":" + port + " current host is " + NetworkUtils.getLocalHostName() + " "
+ NetworkUtils.getLocalIpAddress());
Expand Down Expand Up @@ -290,8 +293,7 @@ private static ByteBuffer retrieveByteBufferFromRemoteMachine(InetSocketAddress

LOG.info("Data " + blockId + " to remote machine " + address + " sent");

DataServerMessage recvMsg =
DataServerMessage.createBlockResponseMessage(false, blockId, null);
DataServerMessage recvMsg = DataServerMessage.createBlockResponseMessage(false, blockId);
while (!recvMsg.isMessageReady()) {
int numRead = recvMsg.recv(socketChannel);
if (numRead == -1) {
Expand Down
Loading

0 comments on commit dee5305

Please sign in to comment.