Skip to content

Commit 24b5e2a

Browse files
authored
Merge pull request #9 from sakagg/multithread
Multithreading all the operations
2 parents f39b60b + be3d407 commit 24b5e2a

File tree

2 files changed

+197
-126
lines changed

2 files changed

+197
-126
lines changed

src/Client/Client.java

Lines changed: 163 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@
1010
import NameNode.NameNode;
1111
import Proto.Hdfs;
1212
import Proto.ProtoMessage;
13-
import com.google.protobuf.ByteString;
1413
import com.google.protobuf.InvalidProtocolBufferException;
1514
import java.io.BufferedReader;
1615
import java.io.FileInputStream;
1716
import java.io.FileNotFoundException;
1817
import java.io.FileOutputStream;
1918
import java.io.FileReader;
2019
import java.io.IOException;
21-
import java.io.InputStream;
2220
import java.io.OutputStream;
2321
import java.net.MalformedURLException;
2422
import java.rmi.Naming;
@@ -36,67 +34,25 @@
3634
*
3735
* @author saksham
3836
*/
39-
public class Client {
40-
private static final String NN_NAME = "NameNode";
41-
private static final String DN_PREFIX = "DataNode";
42-
private static final Integer CHUNK_SIZE = 128*1024*1024;
43-
private static final Properties props = new Properties();
37+
38+
class ClientOperation implements Runnable {
4439

45-
private INameNode nn = null;
40+
public enum OperationType { PUT, GET, LIST };
4641

47-
public static void log(String s) {
48-
String op = String.valueOf(System.currentTimeMillis()) + " ";
49-
op += "[Client] ";
50-
op += ": ";
51-
System.out.println(op + s);
52-
}
42+
private OperationType op;
43+
private INameNode nn;
44+
private String inputFilename;
45+
private String outputFilename;
5346

54-
public static void main(String args[]) {
55-
try {
56-
props.load(new BufferedReader(new FileReader("config.properties")));
57-
} catch (IOException ex) {
58-
Logger.getLogger(NameNode.class.getName()).log(Level.SEVERE, null, ex);
59-
}
60-
61-
Client client = new Client();
62-
client.findnn();
63-
client.mainloop();
47+
public ClientOperation(OperationType op, INameNode nn) {
48+
this.op = op;
49+
this.nn = nn;
6450
}
6551

66-
public void findnn() {
67-
while(nn == null)
68-
{
69-
try {
70-
nn = (INameNode) Naming.lookup("rmi://" + props.getProperty("rmi.namenode.ip")
71-
+ ":" + props.getProperty("rmi.namenode.port") + "/" + NN_NAME);
72-
log("Found Name Node");
73-
} catch (NotBoundException | MalformedURLException | RemoteException ex) {
74-
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
75-
}
76-
if (nn == null)
77-
try {
78-
Thread.sleep(1000);
79-
} catch (Exception e) {}
80-
}
81-
}
82-
83-
public void listFiles() {
84-
byte[] inp = "*".getBytes();
85-
try {
86-
byte[] res = nn.list(inp);
87-
Hdfs.ListFilesResponse listFilesResponse = Hdfs.ListFilesResponse.parseFrom(res);
88-
if(listFilesResponse.getStatus() == 1) {
89-
listFilesResponse.getFileNamesList().stream().forEach((fileName) -> {
90-
System.out.println(fileName);
91-
});
92-
System.out.println("");
93-
} else {
94-
System.err.println("Some Error Occured During Listing File");
95-
}
96-
} catch (RemoteException | InvalidProtocolBufferException ex) {
97-
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
98-
}
99-
52+
public ClientOperation(OperationType op, INameNode nn, String inputFilename, String outputFilename) {
53+
this(op, nn);
54+
this.inputFilename = inputFilename;
55+
this.outputFilename = outputFilename;
10056
}
10157

10258
public Integer openFileForWrite(String filename) {
@@ -111,37 +67,12 @@ public Integer openFileForWrite(String filename) {
11167
return handle;
11268
}
11369

114-
public List<Hdfs.BlockLocations> openFileForRead(String fileName) {
115-
byte[] openRequest = ProtoMessage.openFileRequest(fileName, Boolean.TRUE);
116-
List<Hdfs.BlockLocations> blockLocations = null;
117-
try {
118-
byte[] res = nn.openFile(openRequest);
119-
Hdfs.OpenFileResponse openResponse = Hdfs.OpenFileResponse.parseFrom(res);
120-
if(openResponse.getStatus() == 1) {
121-
byte[] blockLocationRequest = ProtoMessage.blockLocationRequest(openResponse.getBlockNumsList());
122-
byte[] blockLocationResponse = nn.getBlockLocations(blockLocationRequest);
123-
Hdfs.BlockLocationResponse response = Hdfs.BlockLocationResponse.parseFrom(blockLocationResponse);
124-
blockLocations = response.getBlockLocationsList();
125-
}
126-
} catch (RemoteException | InvalidProtocolBufferException ex) {
127-
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
128-
}
129-
return blockLocations;
130-
}
131-
132-
public void closeFile(Integer handle) {
133-
byte[] closeRequest = ProtoMessage.closeFileRequest(handle);
134-
try {
135-
nn.closeFile(closeRequest);
136-
} catch (Exception e) {}
137-
}
138-
13970
IDataNode getDataNode(String ip, Integer port) {
14071
IDataNode dn = null;
14172
try {
14273
dn = (IDataNode) Naming.lookup("rmi://" + ip
14374
+ ":" + port
144-
+ "/" + DN_PREFIX);
75+
+ "/" + Client.DN_PREFIX);
14576
} catch (NotBoundException | MalformedURLException | RemoteException ex) {
14677
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
14778
}
@@ -152,14 +83,14 @@ public void putFile(String inputFileName, String outputFileName) {
15283

15384
try {
15485
FileInputStream is = new FileInputStream(inputFileName);
155-
byte chunk_data[] = new byte[CHUNK_SIZE];
86+
byte chunk_data[] = new byte[Client.CHUNK_SIZE];
15687
Integer handle = openFileForWrite(outputFileName);
15788
byte[] assignBlockRequest = ProtoMessage.assignBlockRequest(handle);
15889
int read, chunk_number = 0;
15990

16091
while ((read = is.read(chunk_data)) != -1) {
16192
chunk_number++;
162-
log("Chunk number: " + chunk_number);
93+
Client.log("Chunk number: " + chunk_number);
16394
Hdfs.BlockLocations blockLocations = null;
16495
try {
16596
byte[] response = nn.assignBlock(assignBlockRequest);
@@ -169,7 +100,7 @@ public void putFile(String inputFileName, String outputFileName) {
169100
}
170101

171102
IDataNode dn = getDataNode(blockLocations.getLocations(0).getIp(), blockLocations.getLocations(0).getPort());
172-
if(read != CHUNK_SIZE)
103+
if(read != Client.CHUNK_SIZE)
173104
chunk_data = Arrays.copyOfRange(chunk_data, 0, read);
174105

175106
byte[] writeBlockRequest = ProtoMessage.writeBlockRequest(chunk_data, blockLocations);
@@ -189,32 +120,57 @@ public void putFile(String inputFileName, String outputFileName) {
189120
}
190121
}
191122

123+
public void closeFile(Integer handle) {
124+
byte[] closeRequest = ProtoMessage.closeFileRequest(handle);
125+
try {
126+
nn.closeFile(closeRequest);
127+
} catch (Exception e) {}
128+
}
129+
130+
public List<Hdfs.BlockLocations> openFileForRead(String fileName) {
131+
byte[] openRequest = ProtoMessage.openFileRequest(fileName, Boolean.TRUE);
132+
List<Hdfs.BlockLocations> blockLocations = null;
133+
try {
134+
byte[] res = nn.openFile(openRequest);
135+
Hdfs.OpenFileResponse openResponse = Hdfs.OpenFileResponse.parseFrom(res);
136+
if(openResponse.getStatus() == 1) {
137+
byte[] blockLocationRequest = ProtoMessage.blockLocationRequest(openResponse.getBlockNumsList());
138+
byte[] blockLocationResponse = nn.getBlockLocations(blockLocationRequest);
139+
Hdfs.BlockLocationResponse response = Hdfs.BlockLocationResponse.parseFrom(blockLocationResponse);
140+
blockLocations = response.getBlockLocationsList();
141+
}
142+
} catch (RemoteException | InvalidProtocolBufferException ex) {
143+
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
144+
}
145+
return blockLocations;
146+
}
147+
192148
public void getFile(String inputFileName, String outputFileName) {
193149
List<Hdfs.BlockLocations> blockLocations = openFileForRead(inputFileName);
194150
try {
195151
if(blockLocations != null) {
196-
OutputStream os = new FileOutputStream(outputFileName);
197-
for (Hdfs.BlockLocations block: blockLocations) {
198-
Random rand = new Random();
199-
Integer dataNodeInd = rand.nextInt(block.getLocationsCount());
200-
Hdfs.DataNodeLocation dnl = block.getLocations(dataNodeInd);
201-
log("Pulling block " + block.getBlockNumber() + " from DN " + dnl.getIp() + ":" + dnl.getPort());
202-
try {
203-
byte[] request = ProtoMessage.readBlockRequest(block.getBlockNumber());
204-
byte[] response = getDataNode(dnl.getIp(), dnl.getPort()).readBlock(request);
205-
Hdfs.ReadBlockResponse readBlockResponse = Hdfs.ReadBlockResponse.parseFrom(response);
206-
byte[] chunk_data = readBlockResponse.getData(0).toByteArray();
207-
os.write(chunk_data);
208-
} catch (RemoteException | InvalidProtocolBufferException ex) {
209-
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
210-
} catch (IOException ex) {
211-
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
212-
}
152+
try (OutputStream os = new FileOutputStream(outputFileName)) {
153+
blockLocations.stream().forEach((block) -> {
154+
Random rand = new Random();
155+
Integer dataNodeInd = rand.nextInt(block.getLocationsCount());
156+
Hdfs.DataNodeLocation dnl = block.getLocations(dataNodeInd);
157+
Client.log("Pulling block " + block.getBlockNumber() + " from DN " + dnl.getIp() + ":" + dnl.getPort());
158+
try {
159+
byte[] request = ProtoMessage.readBlockRequest(block.getBlockNumber());
160+
byte[] response = getDataNode(dnl.getIp(), dnl.getPort()).readBlock(request);
161+
Hdfs.ReadBlockResponse readBlockResponse = Hdfs.ReadBlockResponse.parseFrom(response);
162+
byte[] chunk_data = readBlockResponse.getData(0).toByteArray();
163+
os.write(chunk_data);
164+
} catch (RemoteException | InvalidProtocolBufferException ex) {
165+
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
166+
} catch (IOException ex) {
167+
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
168+
}
169+
});
213170
}
214-
os.close();
215171
System.out.println("File " + inputFileName + " pulled from mHdfs and stored on local system as " + outputFileName);
216172
} else {
217-
System.out.println("File : " + inputFileName + " does not exist.");
173+
System.out.println("File: " + inputFileName + " does not exist.");
218174
}
219175
} catch (FileNotFoundException ex) {
220176
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
@@ -223,25 +179,120 @@ public void getFile(String inputFileName, String outputFileName) {
223179
}
224180
}
225181

182+
public void listFiles() {
183+
byte[] inp = "*".getBytes();
184+
try {
185+
byte[] res = nn.list(inp);
186+
Hdfs.ListFilesResponse listFilesResponse = Hdfs.ListFilesResponse.parseFrom(res);
187+
if(listFilesResponse.getStatus() == 1) {
188+
listFilesResponse.getFileNamesList().stream().forEach((fileName) -> {
189+
System.out.println(fileName);
190+
});
191+
System.out.println("");
192+
} else {
193+
System.err.println("Some Error Occured During Listing File");
194+
}
195+
} catch (RemoteException | InvalidProtocolBufferException ex) {
196+
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
197+
}
198+
199+
}
200+
201+
@Override
202+
public void run() {
203+
switch(op) {
204+
case PUT:
205+
putFile(inputFilename, outputFilename);
206+
break;
207+
case GET:
208+
getFile(inputFilename, outputFilename);
209+
break;
210+
case LIST:
211+
listFiles();
212+
break;
213+
}
214+
}
215+
216+
}
217+
218+
public class Client {
219+
static final String NN_NAME = "NameNode";
220+
static final String DN_PREFIX = "DataNode";
221+
static final Integer CHUNK_SIZE = 128*1024*1024;
222+
223+
private static final Properties PROPS = new Properties();
224+
225+
private INameNode nn = null;
226+
227+
public static void log(String s) {
228+
String op = String.valueOf(System.currentTimeMillis()) + " ";
229+
op += "[Client] ";
230+
op += ": ";
231+
System.out.println(op + s);
232+
}
233+
234+
public static void main(String args[]) {
235+
try {
236+
PROPS.load(new BufferedReader(new FileReader("config.properties")));
237+
} catch (IOException ex) {
238+
Logger.getLogger(NameNode.class.getName()).log(Level.SEVERE, null, ex);
239+
}
240+
241+
Client client = new Client();
242+
client.findnn();
243+
client.mainloop();
244+
}
245+
246+
public void findnn() {
247+
while(nn == null)
248+
{
249+
try {
250+
nn = (INameNode) Naming.lookup("rmi://" + PROPS.getProperty("rmi.namenode.ip")
251+
+ ":" + PROPS.getProperty("rmi.namenode.port") + "/" + NN_NAME);
252+
log("Found Name Node");
253+
} catch (NotBoundException | MalformedURLException | RemoteException ex) {
254+
//Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
255+
}
256+
if (nn == null)
257+
try {
258+
Thread.sleep(1000);
259+
} catch (Exception e) {}
260+
}
261+
}
262+
226263
public void mainloop() {
264+
ClientOperation clientOperation = null;
227265
Scanner in = new Scanner(System.in);
228266
for(;;) {
267+
System.out.print("Command -> ");
229268
String line = in.nextLine();
230269
String[] ip = line.split(" ");
231-
switch (ip[0]) {
270+
Boolean quit = false;
271+
switch (ip[0].toLowerCase()) {
232272
case "put":
233-
putFile(ip[1], ip[2]);
273+
clientOperation = new ClientOperation(ClientOperation.OperationType.PUT, nn, ip[1], ip[2]);
234274
break;
235275
case "get":
236-
getFile(ip[1], ip[2]);
276+
clientOperation = new ClientOperation(ClientOperation.OperationType.GET, nn, ip[1], ip[2]);
237277
break;
238278
case "list":
239-
listFiles();
279+
clientOperation = new ClientOperation(ClientOperation.OperationType.LIST, nn);
240280
break;
241-
242-
default:
281+
case "exit":
282+
quit = true;
243283
break;
284+
285+
default:
286+
System.out.println("Command " + ip[0] + " not recognized!");
287+
continue;
244288
}
289+
if(quit)
290+
break;
291+
Thread t = new Thread(clientOperation);
292+
t.start();
245293
}
294+
System.out.println("Thank you for using mDFS!");
295+
System.out.println("Please hit Ctrl+C to kill all related services.");
296+
for(;;) {}
246297
}
247298
}

0 commit comments

Comments
 (0)