Skip to content

Commit

Permalink
move cached info from TachyonFS to WorkerClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingfei committed Dec 22, 2014
1 parent c0cfee5 commit 59a0071
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 46 deletions.
50 changes: 4 additions & 46 deletions core/src/main/java/tachyon/client/TachyonFS.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,6 @@ public static synchronized TachyonFS get(String masterHost, int masterPort, bool

// Mapping from Id to Available space of each StorageDir.
private final Map<Long, Long> mIdToAvailableSpaceBytes = new HashMap<Long, Long>();
// Mapping from Id to root path of each StorageDir.
private final Map<Long, String> mIdToWorkerDirPath = new HashMap<Long, String>();
// Mapping from Id to under file system of each StorageDir
private final Map<Long, UnderFileSystem> mIdToDirFS = new HashMap<Long, UnderFileSystem>();
// Mapping from Id to user temporary path of each StorageDir.
private final Map<Long, String> mIdToUserLocalTempFolder = new HashMap<Long, String>();

Expand All @@ -165,7 +161,6 @@ private TachyonFS(InetSocketAddress masterAddress, boolean zookeeperMode) throws
mMasterClient =
mCloser.register(new MasterClient(mMasterAddress, mZookeeperMode, mExecutorService));
mWorkerClient = mCloser.register(new WorkerClient(mMasterClient, mExecutorService));
initializeDirFS(getWorkerDirInfos());
}

/**
Expand Down Expand Up @@ -278,10 +273,10 @@ public synchronized String createAndGetUserLocalTempFolder(long storageDirId) th
return mIdToUserLocalTempFolder.get(storageDirId);
}

if (mIdToWorkerDirPath.containsKey(storageDirId)) {
String dirPath = mIdToWorkerDirPath.get(storageDirId);
String dirPath = mWorkerClient.getWorkerDirPath(storageDirId);
if (dirPath != null) {
String userLocalTempFolder = CommonUtils.concat(dirPath, userTempFolder);
UnderFileSystem dirFS = mIdToDirFS.get(storageDirId);
UnderFileSystem dirFS = mWorkerClient.getWorkerDirFS(storageDirId);
boolean ret = false;
if (dirFS.exists(userLocalTempFolder)) {
if (!dirFS.isFile(userLocalTempFolder)) {
Expand Down Expand Up @@ -752,7 +747,7 @@ synchronized String getLocalBlockFilePath(ClientBlockInfo blockInfo) throws IOEx
* @throws IOException
*/
synchronized String getLocalBlockFilePath(long storageDirId, long blockId) throws IOException {
String dirPath = mIdToWorkerDirPath.get(storageDirId);
String dirPath = mWorkerClient.getWorkerDirPath(storageDirId);
if (dirPath != null) {
String dataFolder = getLocalDataFolder();
return CommonUtils.concat(dirPath, dataFolder, blockId);
Expand Down Expand Up @@ -848,16 +843,6 @@ public synchronized List<WorkerDirInfo> getWorkerDirInfos() throws IOException {
return mWorkerClient.getWorkerDirInfos();
}

/**
* Get path of specified StorageDir
*
* @param storageDirId the id of the StorageDir
* @return path of the StorageDir
*/
public String getWorkerDirPath(long storageDirId) {
return mIdToWorkerDirPath.get(storageDirId);
}

/**
* @return all the works' info
* @throws IOException
Expand All @@ -874,33 +859,6 @@ public synchronized boolean hasLocalWorker() throws IOException {
return mWorkerClient.isLocal();
}

/**
* Used to initialize file system of StorageDirs
*
* @param workerDirInfos information of StorageDirs on the worker
* @throws IOException
*/
private void initializeDirFS(List<WorkerDirInfo> workerDirInfos) throws IOException {
if (workerDirInfos == null) {
return;
}
for (WorkerDirInfo dirInfo : workerDirInfos) {
long storageDirId = dirInfo.getStorageDirId();
mIdToWorkerDirPath.put(storageDirId, dirInfo.getDirPath());

UnderFileSystem fs;
try {
fs =
UnderFileSystem.get(dirInfo.getDirPath(),
CommonUtils.byteArrayToObject(dirInfo.getConf()));
} catch (ClassNotFoundException e) {
throw new IOException(e.getMessage());
}
mIdToDirFS.put(storageDirId, fs);
}
return;
}

/**
* @return true if this client is connected to master, false otherwise
*/
Expand Down
52 changes: 52 additions & 0 deletions core/src/main/java/tachyon/worker/WorkerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand All @@ -35,6 +37,7 @@
import tachyon.Constants;
import tachyon.HeartbeatExecutor;
import tachyon.HeartbeatThread;
import tachyon.UnderFileSystem;
import tachyon.conf.UserConf;
import tachyon.master.MasterClient;
import tachyon.thrift.BlockInfoException;
Expand All @@ -46,6 +49,7 @@
import tachyon.thrift.TachyonException;
import tachyon.thrift.WorkerDirInfo;
import tachyon.thrift.WorkerService;
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;

/**
Expand All @@ -64,9 +68,14 @@ public class WorkerClient implements Closeable {
private NetAddress mWorkerNetAddress;
private boolean mConnected = false;
private boolean mIsLocal = false;
private boolean mDirFSInitialized = false;
private String mDataFolder = null;
private final ExecutorService mExecutorService;
private Future<?> mHeartbeat;
// Mapping from Id to root path of each StorageDir.
private final Map<Long, String> mIdToDirPath = new HashMap<Long, String>();
// Mapping from Id to under file system of each StorageDir
private final Map<Long, UnderFileSystem> mIdToDirFS = new HashMap<Long, UnderFileSystem>();

/**
* Create a WorkerClient, with a given MasterClient.
Expand Down Expand Up @@ -289,6 +298,22 @@ public synchronized NetAddress getNetAddress() {
return mWorkerNetAddress;
}

public synchronized String getWorkerDirPath(long storageDirId) throws IOException {
if (!mDirFSInitialized) {
initializeDirFS(getWorkerDirInfos());
mDirFSInitialized = true;
}
return mIdToDirPath.get(storageDirId);
}

public synchronized UnderFileSystem getWorkerDirFS(long storageDirId) throws IOException {
if (!mDirFSInitialized) {
initializeDirFS(getWorkerDirInfos());
mDirFSInitialized = true;
}
return mIdToDirFS.get(storageDirId);
}

/**
* Get the local user temporary folder of the specified user.
*
Expand Down Expand Up @@ -340,6 +365,33 @@ public synchronized List<WorkerDirInfo> getWorkerDirInfos() throws IOException {
}
}

/**
* Used to initialize file system of StorageDirs
*
* @param workerDirInfos information of StorageDirs on the worker
* @throws IOException
*/
private void initializeDirFS(List<WorkerDirInfo> workerDirInfos) throws IOException {
if (workerDirInfos == null) {
return;
}
for (WorkerDirInfo dirInfo : workerDirInfos) {
long storageDirId = dirInfo.getStorageDirId();
mIdToDirPath.put(storageDirId, dirInfo.getDirPath());

UnderFileSystem fs;
try {
fs =
UnderFileSystem.get(dirInfo.getDirPath(),
CommonUtils.byteArrayToObject(dirInfo.getConf()));
} catch (ClassNotFoundException e) {
throw new IOException(e.getMessage());
}
mIdToDirFS.put(storageDirId, fs);
}
return;
}

/**
* @return true if it's connected to the worker, false otherwise.
*/
Expand Down

0 comments on commit 59a0071

Please sign in to comment.