2727import java .rmi .registry .LocateRegistry ;
2828import java .rmi .server .UnicastRemoteObject ;
2929import java .util .ArrayList ;
30- import java .util .HashMap ;
3130import java .util .List ;
3231import java .util .Objects ;
3332import java .util .Properties ;
3433import java .util .Random ;
3534import java .util .Set ;
35+ import java .util .concurrent .ConcurrentHashMap ;
36+ import java .util .concurrent .Semaphore ;
3637import java .util .logging .Level ;
3738import java .util .logging .Logger ;
3839
@@ -50,21 +51,35 @@ public class NameNode extends UnicastRemoteObject implements INameNode {
5051 private static final Integer REP_FACTOR = 2 ; //Replication Factor in DNs
5152
5253 private static Integer DN_COUNT = -1 ;
54+ private static final Semaphore blockLock = new Semaphore (1 , true );
5355 private static Integer globalBlockCounter = 0 ;
56+ private static final Semaphore fileLock = new Semaphore (1 , true );
5457 private static Integer globalFileCounter = 0 ;
5558 private static Properties props = new Properties ();
5659
57- private static final HashMap <Integer , IDataNode > dns = new HashMap <>();
58- private static final HashMap <Integer , DataNodeLocation > dnLocations = new HashMap <>();
59- private static final HashMap <Integer , String > handleToOpenFileName = new HashMap <>();
60- private static final HashMap <String , Integer > fileNameToHandle = new HashMap <>();
61- private static final HashMap <Integer , ArrayList <Integer > > handleToBlocks = new HashMap <>();
62- private static final HashMap <Integer , ArrayList <DataNodeLocation > > blockToDnLocations = new HashMap <>();
60+ private static final ConcurrentHashMap <Integer , IDataNode > dns = new ConcurrentHashMap <>();
61+ private static final ConcurrentHashMap <Integer , DataNodeLocation > dnLocations = new ConcurrentHashMap <>();
62+ private static final ConcurrentHashMap <Integer , String > handleToOpenFileName = new ConcurrentHashMap <>();
63+ private static final ConcurrentHashMap <String , Integer > fileNameToHandle = new ConcurrentHashMap <>();
64+ private static final ConcurrentHashMap <Integer , ArrayList <Integer > > handleToBlocks = new ConcurrentHashMap <>();
65+ private static final ConcurrentHashMap <Integer , ArrayList <DataNodeLocation > > blockToDnLocations = new ConcurrentHashMap <>();
6366
6467 NameNode () throws RemoteException {
6568 super ();
6669 }
6770
71+ private static void acquireLock (Semaphore s ) {
72+ try {
73+ s .acquire ();
74+ } catch (InterruptedException ex ) {
75+ Logger .getLogger (NameNode .class .getName ()).log (Level .SEVERE , null , ex );
76+ }
77+ }
78+
79+ private static void releaseLock (Semaphore s ) {
80+ s .release ();
81+ }
82+
6883 public static void log (String s ) {
6984 String op = String .valueOf (System .currentTimeMillis ()) + " " ;
7085 op += "[NameNode] " ;
@@ -73,8 +88,10 @@ public static void log(String s) {
7388 }
7489
7590 private Integer createFile (String fileName ) throws IOException {
91+ acquireLock (fileLock );
7692 Integer handle = globalFileCounter ;
7793 globalFileCounter ++;
94+ releaseLock (fileLock );
7895 handleToOpenFileName .put (handle , fileName );
7996
8097 String blockFile = DIRECTORY + handle .toString () + ".txt" ;
@@ -180,11 +197,11 @@ public byte[] openFile(byte[] inp) throws RemoteException {
180197 try {
181198 openFileRequest = Hdfs .OpenFileRequest .parseFrom (inp );
182199 if (openFileRequest .getForRead () == false ) {
200+ Integer handle = createFile (openFileRequest .getFileName ());
183201 log ("Opening file '"
184202 + openFileRequest .getFileName ()
185203 + "' for Writing with handle "
186- + globalFileCounter .toString ());
187- Integer handle = createFile (openFileRequest .getFileName ());
204+ + handle .toString ());
188205 // ^ TODO Check for errors while creating file and send response accordingly
189206 byte [] openFileResponse = ProtoMessage .openFileResponse (1 , handle );
190207 return openFileResponse ;
@@ -248,7 +265,11 @@ public byte[] assignBlock(byte[] inp) throws RemoteException {
248265 try {
249266 assignBlockRequest = Hdfs .AssignBlockRequest .parseFrom (inp );
250267 Integer handle = assignBlockRequest .getHandle ();
251- addBlockToHandle (handle , globalBlockCounter );
268+ acquireLock (blockLock );
269+ Integer assignedBlockNumber = globalBlockCounter ;
270+ globalBlockCounter ++;
271+ releaseLock (blockLock );
272+ addBlockToHandle (handle , assignedBlockNumber );
252273 ArrayList <String > ips = new ArrayList <>();
253274 ArrayList <Integer > ports = new ArrayList <>();
254275 ArrayList <Integer > dnIds = new ArrayList <>();
@@ -263,15 +284,14 @@ public byte[] assignBlock(byte[] inp) throws RemoteException {
263284 DataNodeLocation dnl = dnLocations .get (dataNodeId );
264285 ips .add (dnl .ip );
265286 ports .add (dnl .port );
266- addDnLocationToBlock (globalBlockCounter , dnl );
287+ addDnLocationToBlock (assignedBlockNumber , dnl );
267288 }
268289 }
269290 log ("Handle " + handle .toString ()
270- + " assigned Block " + globalBlockCounter .toString ()
291+ + " assigned Block " + assignedBlockNumber .toString ()
271292 + " assigned DNs: " + dnIds .toString ());
272293
273- ret = ProtoMessage .assignBlockResponse (1 , globalBlockCounter , ips , ports );
274- globalBlockCounter ++;
294+ ret = ProtoMessage .assignBlockResponse (1 , assignedBlockNumber , ips , ports );
275295 } catch (InvalidProtocolBufferException ex ) {
276296 Logger .getLogger (NameNode .class .getName ()).log (Level .SEVERE , null , ex );
277297 } catch (IOException ex ) {
0 commit comments