Skip to content

abhijithrm/hjhjh

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 

Repository files navigation

Drone handler:

package com.odafa.dronecloudapp.service;

import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;

import com.odafa.dronecloudapp.dto.DataPoint; import com.odafa.dronecloudapp.dto.DroneInfo; import com.odafa.dronecloudapp.utils.DataMapper;

import lombok.extern.slf4j.Slf4j;

/**
 * Handles the two TCP connections of a single drone: the control socket
 * (status messages in, command messages out) and the console-log socket
 * (log messages in). The blocking socket work runs on a private executor.
 */
@Slf4j
public class DroneHandler {

/** Longest tolerated gap, in milliseconds, between two status updates. */
private static final long MAX_WAIT_TIME = 10_000L;

/** Identifier read from the control socket while the handler is constructed. */
private final String droneId;

// The three fields below are written by executor threads and read by whichever
// thread calls the getters, so each of them is volatile to guarantee visibility.
private volatile long lastUpdateTime;
private volatile DroneInfo lastStatus;
private volatile String latestDroneConsoleLogMessage;

private final Socket droneSocket;
private final InputStream streamIn;
private final OutputStream streamOut;
private final InputStream clientLogStreamIn;
private final Socket clientLogStreamerSocket;

/** Bounded queue of encoded messages waiting to be written to the drone. */
private final BlockingQueue<byte[]> indoxMessageBuffer;
/** Bounded queue of console-log messages received from the drone. */
private final BlockingQueue<String> clientConsoleLogMessageBuffer;
private final ExecutorService handlerExecutor;
private final ControlManager manager;

/**
 * Binds the handler to an accepted control socket and log-streamer socket,
 * reads the drone ID from the control socket and registers the handler with
 * the manager under that ID.
 *
 * @param controlManager    manager holding the drone-ID-to-handler registry
 * @param clientSocket      accepted control connection of the drone
 * @param clientLogStreamer accepted console-log connection of the drone
 * @throws RuntimeException wrapping any failure while opening the streams or
 *                          reading the drone ID; both sockets are closed first
 */
public DroneHandler(ControlManager controlManager, Socket clientSocket, Socket clientLogStreamer) {
	this.manager = controlManager;
	this.droneSocket = clientSocket;
	this.clientLogStreamerSocket = clientLogStreamer;
	this.indoxMessageBuffer = new ArrayBlockingQueue<>(1024);
	this.clientConsoleLogMessageBuffer = new ArrayBlockingQueue<>(1024);
	this.handlerExecutor = Executors.newFixedThreadPool(3);

	try {
		this.streamIn  = droneSocket.getInputStream();
		this.streamOut = droneSocket.getOutputStream();
		this.clientLogStreamIn = clientLogStreamerSocket.getInputStream();
		droneId = DataMapper.extractDroneIdFromNetwork(droneSocket);
	} catch (Exception e) {
		// Do not go through close() here: droneId (and possibly the streams) are
		// still unassigned, and close() would hand that null ID to the manager,
		// replacing the real failure with a NullPointerException.
		releaseAfterFailedSetup();
		throw new RuntimeException(e);
	}

	// Start the inactivity clock now. Left at its default of 0, the first
	// getDroneLastStatus() call made before any status message arrives would
	// see a gap of decades and close a perfectly healthy connection.
	this.lastUpdateTime = System.currentTimeMillis();

	manager.setControlHadlerForDroneId(droneId, this);
	log.info("Control Connection Established ID {}, IP {} ", droneId, droneSocket.getInetAddress().toString());
}

/** Closes both sockets and stops the executor after a failed construction. */
private void releaseAfterFailedSetup() {
	for (Socket socket : new Socket[] { droneSocket, clientLogStreamerSocket }) {
		if (socket == null) {
			continue;
		}
		try {
			socket.close();
		} catch (Exception closeError) {
			log.error(closeError.getMessage());
		}
	}
	handlerExecutor.shutdownNow();
}

/**
 * Starts the two control-socket workers on the handler's executor:
 * a reader that keeps {@code lastStatus}/{@code lastUpdateTime} up to date,
 * and a writer that drains the outgoing message queue to the drone.
 */
public void activate() {

	// Reader: any failure ends the loop, and the handler is closed exactly once.
	handlerExecutor.execute( () -> {
		try {
			while (!droneSocket.isClosed()) {
				this.lastStatus = DataMapper.fromNetworkToDroneInfo(streamIn);
				this.lastUpdateTime = System.currentTimeMillis();
			}
		} catch (Exception e) {
			log.info("Control Connection with {} Closed, reason: {}", droneSocket.getInetAddress().toString(), e.getMessage());
		} finally {
			close();
		}
	});

	// Writer: blocks on the queue until a message is available.
	handlerExecutor.execute( () -> {
		while (!droneSocket.isClosed()) {
			try {
				streamOut.write( indoxMessageBuffer.take());
				streamOut.flush();
			} catch (InterruptedException ie) {
				// close() stops the executor with shutdownNow(), which interrupts
				// the blocked take(). Restore the flag and stop instead of logging
				// the interrupt as an error and looping again.
				Thread.currentThread().interrupt();
				return;
			} catch (SocketException se) {
				log.info("Socket has been closed: {}", se.getMessage());
				close();
			} catch (Exception e) {
				// Keep the writer alive for later messages, but keep the cause.
				log.error("Failed to send a message to drone {}: {}", droneId, e.getMessage(), e);
			}
		}
	});
}

/**
 * Starts the console-log worker on the handler's executor. The worker reads
 * log messages from the log-streamer socket, remembers the most recent one
 * and appends each message to the console-log queue. The handler is closed
 * when the loop ends for any reason.
 */
public void startLogStreaming() {
	handlerExecutor.execute(() -> {
		try {
			while (!clientLogStreamerSocket.isClosed()) {
				final String message = DataMapper.fromNetworkMessageToDroneConsoleLog(clientLogStreamIn);
				this.latestDroneConsoleLogMessage = message;
				// put() blocks while the bounded queue is full.
				this.clientConsoleLogMessageBuffer.put(message);
			}
		} catch (InterruptedException ie) {
			// Raised when close() shuts the executor down while put() is blocked;
			// keep the interrupt status visible to the pool instead of swallowing it.
			Thread.currentThread().interrupt();
			log.info("Log streaming socket connection with {} closed, reason: {}", clientLogStreamerSocket.getInetAddress().toString(), ie.getMessage());
		} catch (Exception e) {
			log.info("Log streaming socket connection with {} closed, reason: {}", clientLogStreamerSocket.getInetAddress().toString(), e.getMessage());
		} finally {
			close();
		}
	});
}

/**
 * Encodes the given mission points and queues the resulting message for the
 * writer worker.
 *
 * @param dataPoints mission points to encode
 * @throws IllegalStateException if the outgoing queue is full
 */
public void sendMissionData(List<DataPoint> dataPoints) {
	indoxMessageBuffer.add(DataMapper.toNetworkMessage(dataPoints));
	log.debug("Sending Mission Data: {}", dataPoints);
}

/**
 * Encodes a command code and queues the resulting message for the writer
 * worker.
 *
 * @param commandCode code to encode into the outgoing message
 * @throws IllegalStateException if the outgoing queue is full
 */
public void sendCommand(int commandCode) {
	indoxMessageBuffer.add(DataMapper.toNetworkMessage(commandCode));
	log.debug("Sending Command Code: {} For Drone ID {}", commandCode, droneId);
}

/**
 * Returns the most recently received status. As a side effect, closes the
 * handler when no status update has arrived within {@code MAX_WAIT_TIME}.
 *
 * @return the last status read from the drone; {@code null} if none was read yet
 */
public DroneInfo getDroneLastStatus() {
	final boolean timedOut = isMaxWaitTimeExceeded();
	if (timedOut) {
		log.warn("Maximum Wait Time for Drone ID {} exceeded. Control socket closed", droneId);
		close();
	}
	return lastStatus;
}

/**
 * @return the most recent console-log message read from the log-streamer
 *         socket; {@code null} if none was read yet
 */
public String getLatestClientConsoleLogMessage() {
	return latestDroneConsoleLogMessage;
}

/**
 * Exposes the live queue of console-log messages received from the drone.
 * The return type is now parameterized (it was a raw {@code BlockingQueue}),
 * so callers get {@code String} elements without unchecked casts.
 *
 * @return the handler's own queue instance, not a copy
 */
public BlockingQueue<String> getClientConsoleLogMessageQueue() {
	return this.clientConsoleLogMessageBuffer;
}

/**
 * @return {@code true} when more than {@code MAX_WAIT_TIME} milliseconds have
 *         passed since {@code lastUpdateTime}
 */
private boolean isMaxWaitTimeExceeded() {
	final long silentForMillis = System.currentTimeMillis() - lastUpdateTime;
	return silentForMillis > MAX_WAIT_TIME;
}

/**
 * Releases everything the handler owns: both sockets and their streams, the
 * registration in the manager, and the executor. Safe to call repeatedly and
 * on a partially constructed handler (streams or drone ID still null).
 */
private void close() {
	closeQuietly(droneSocket);
	closeQuietly(streamIn);
	closeQuietly(streamOut);
	closeQuietly(clientLogStreamerSocket);
	closeQuietly(clientLogStreamIn);

	// droneId is null when the constructor failed before reading it. The
	// manager keeps handlers in a ConcurrentHashMap, which rejects null keys.
	if (droneId != null) {
		manager.removeControlHadlerForDroneId(droneId);
	}
	handlerExecutor.shutdownNow();
}

/** Closes one resource, logging (with cause) instead of propagating a failure. */
private void closeQuietly(AutoCloseable resource) {
	if (resource == null) {
		return;
	}
	try {
		resource.close();
	} catch (Exception e) {
		log.error("Failed to release a resource of drone {}: {}", droneId, e.getMessage(), e);
	}
}

}

Control manager:

package com.odafa.dronecloudapp.service;

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;

import com.odafa.dronecloudapp.configuration.ConfigReader; import com.odafa.dronecloudapp.dto.DataPoint; import com.odafa.dronecloudapp.dto.DroneInfo; import com.odafa.dronecloudapp.utils.DataMapper;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Spring component that listens on two server sockets (drone control and
 * drone console-log streaming), creates a {@code DroneHandler} per accepted
 * connection pair and keeps a registry of handlers keyed by drone ID.
 */
@Slf4j
@Component
public class ControlManager implements Runnable {

/** Listening socket for drone control connections. */
private final ServerSocket serverSocket;

/** Listening socket for drone console-log connections. */
private final ServerSocket logStreamerSocket;

/** Single-thread executor that runs this manager's accept loop. */
private final ExecutorService serverRunner;

/** Registry of active handlers, keyed by drone ID. */
private final Map<String, DroneHandler> droneIdToHandler;

public ControlManager(ConfigReader configurations) {
	try {
		serverSocket = new ServerSocket( configurations.getControlServerPort());
		logStreamerSocket = new ServerSocket(configurations.getLogStreamerPort());
	} catch (IOException e) {
		log.error(e.getMessage());
		throw new RuntimeException(e);
	}
	serverRunner = Executors.newSingleThreadExecutor();
	droneIdToHandler = new ConcurrentHashMap<>();

	serverRunner.execute(this);
} 

/**
 * Accept loop. Each iteration accepts one control connection and then one
 * log-streamer connection (both calls block), wraps them in a
 * {@code DroneHandler}, activates it, reads the drone ID announced on the
 * log-streamer socket and starts log streaming on the handler registered
 * under that ID. Runs until either server socket is closed.
 */
@Override
public void run() {
	while (!serverSocket.isClosed() && !logStreamerSocket.isClosed()) {
		Socket clientSocket = null;
		Socket clientLogStreamerSocket = null;
		boolean ownedByHandler = false;
		try {
			clientSocket = serverSocket.accept();
			clientLogStreamerSocket = logStreamerSocket.accept();

			final DroneHandler handler = new DroneHandler(this, clientSocket, clientLogStreamerSocket);
			ownedByHandler = true;
			handler.activate();

			final String droneIdFromClientLogStreamer = DataMapper.extractDroneIdFromNetwork(clientLogStreamerSocket);

			// A single lookup replaces the former unconditional spin. In the code
			// visible here handlers are registered only by the DroneHandler
			// constructor, which has already completed on this thread, so an ID
			// that is absent now can never appear while this thread spins; the
			// spin would block the accept loop forever at full CPU.
			final DroneHandler registered = droneIdToHandler.get(droneIdFromClientLogStreamer);
			if (registered != null) {
				registered.startLogStreaming();
			} else {
				log.warn("No control handler registered for drone ID {} announced on the log stream; log streaming not started",
						droneIdFromClientLogStreamer);
			}
		} catch (Exception e) {
			log.error(e.getMessage());
			// Until a handler exists nothing else owns the accepted sockets, so
			// release them here (e.g. when the second accept() fails).
			if (!ownedByHandler) {
				closeQuietly(clientSocket);
				closeQuietly(clientLogStreamerSocket);
			}
		}
	}
}

/** Closes an accepted socket, logging instead of propagating a failure. */
private void closeQuietly(Socket socket) {
	if (socket == null) {
		return;
	}
	try {
		socket.close();
	} catch (IOException closeError) {
		log.error(closeError.getMessage());
	}
}

/**
 * Forwards mission data to the handler registered for the given drone.
 * Does nothing when no handler is registered under that ID.
 *
 * @param droneId    ID of the target drone
 * @param dataPoints mission points to send
 */
public void sendMissionDataToDrone(String droneId, List<DataPoint> dataPoints) {
	final DroneHandler target = droneIdToHandler.get(droneId);
	if (target == null) {
		return;
	}
	target.sendMissionData(dataPoints);
}

/**
 * Forwards a command code to the handler registered for the given drone.
 * Does nothing when no handler is registered under that ID.
 *
 * @param droneId     ID of the target drone
 * @param commandCode command code to send
 */
public void sendMessageFromUserIdToDrone(String droneId, int commandCode) {
	final DroneHandler target = droneIdToHandler.get(droneId);
	if (target == null) {
		return;
	}
	target.sendCommand(commandCode);
}

/**
 * Collects the last known status of every registered drone.
 * Handlers that have not received any status yet report {@code null};
 * those are skipped so the result never contains null elements.
 *
 * @return a new mutable list of statuses, possibly empty
 */
public List<DroneInfo> getDroneStatusAll() {
	final List<DroneInfo> drones = new ArrayList<>(droneIdToHandler.size());

	for (DroneHandler handler : droneIdToHandler.values()) {
		final DroneInfo status = handler.getDroneLastStatus();
		if (status != null) {
			drones.add(status);
		}
	}
	return drones;
}

/**
 * Registers a handler under a drone ID, replacing any handler previously
 * stored under the same ID.
 *
 * @param droneId registry key
 * @param handler handler to store
 */
public void setControlHadlerForDroneId(final String droneId, final DroneHandler handler) {
	this.droneIdToHandler.put(droneId, handler);
}

/**
 * Removes whichever handler is registered under the given drone ID, if any.
 *
 * @param droneId registry key to remove
 */
public void removeControlHadlerForDroneId(final String droneId) {
	this.droneIdToHandler.remove(droneId);
}

}

Data mapper:

package com.odafa.dronecloudapp.utils;

import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.odafa.dronecloudapp.dto.DataPoint;
import com.odafa.dronecloudapp.dto.DroneInfo;

/**
 * Static helpers that translate between network messages and the
 * application's DTOs.
 */
public class DataMapper {

/** Command code set on the Command message that carries mission data. */
private static final int START_MISSION_CODE = 14;

/**
 * Reads one network message from the socket's input stream and returns its
 * bytes as the drone ID.
 * <p>
 * The bytes are decoded as UTF-8 explicitly; the charset-less
 * {@code String(byte[])} constructor uses the platform default, which
 * differs between hosts. NOTE(review): assumes the drone sends its ID as
 * UTF-8/ASCII text — confirm against the drone-side client.
 *
 * @param droneSocket connected socket to read the ID from
 * @return the decoded drone ID
 * @throws Exception if the message cannot be read
 */
public static String extractDroneIdFromNetwork(Socket droneSocket) throws Exception {
	final byte[] rawId = NetworkFormatter.readNetworkMessage( droneSocket.getInputStream());
	return new String(rawId, StandardCharsets.UTF_8);
}

/**
 * Reads one network message from the stream, parses it as
 * {@code ProtoData.DroneData} and maps it onto a {@link DroneInfo}.
 * The reported speed is multiplied by 3.6 before it is stored (the local
 * name indicates km/h; the incoming unit is presumably m/s — confirm).
 *
 * @param streamIn stream positioned at the start of a status message
 * @return the mapped drone status
 * @throws Exception if the message cannot be read or parsed
 */
public static DroneInfo fromNetworkToDroneInfo(InputStream streamIn) throws Exception {
	final ProtoData.DroneData telemetry =
			ProtoData.DroneData.parseFrom(NetworkFormatter.readNetworkMessage(streamIn));

	final float speedInKmH = telemetry.getSpeed() * 3.6f;

	return new DroneInfo(
			telemetry.getDroneId(),
			telemetry.getLatitude(),
			telemetry.getLongitude(),
			speedInKmH,
			telemetry.getAltitude(),
			telemetry.getVoltage(),
			telemetry.getState());
}

/**
 * Reads one network message from the log stream and returns its content as
 * text.
 * <p>
 * The bytes are decoded into a String. Calling {@code toString()} on a
 * {@code byte[]} yields only the array's type-and-hash identity string
 * (for example {@code [B@1b6d3586}), never the message text.
 * NOTE(review): assumes log lines are sent as UTF-8 — confirm against the
 * drone-side client.
 *
 * @param clientLogInputStream stream positioned at the start of a log message
 * @return the decoded log message
 * @throws Exception if the message cannot be read
 */
public static String fromNetworkMessageToDroneConsoleLog(InputStream clientLogInputStream) throws Exception
{
	final byte[] result = NetworkFormatter.readNetworkMessage(clientLogInputStream);
	return new String(result, StandardCharsets.UTF_8);
}

/**
 * Encodes mission points as a {@code ProtoData.MissionData} payload, wraps
 * it in a {@code ProtoData.Command} carrying {@code START_MISSION_CODE} and
 * turns the result into a network message.
 *
 * @param dataPoints mission points, encoded in iteration order
 * @return the message bytes produced by {@code NetworkFormatter.createNetworkMessage}
 */
public static byte[] toNetworkMessage(List<DataPoint> dataPoints) {
	final ProtoData.MissionData.Builder mission = ProtoData.MissionData.newBuilder();

	dataPoints.forEach(waypoint ->
			mission.addPoint(
					ProtoData.DataPoint.newBuilder()
							.setLatitude(waypoint.getLat())
							.setLongitude(waypoint.getLng())
							.setSpeed(waypoint.getSpeed())
							.setAltitude(waypoint.getHeight())
							.setAction(waypoint.getAction())
							.build()));

	final byte[] encodedCommand = ProtoData.Command.newBuilder()
			.setCode(START_MISSION_CODE)
			.setPayload(mission.build().toByteString())
			.build()
			.toByteArray();

	return NetworkFormatter.createNetworkMessage(encodedCommand);
}

/**
 * Wraps a bare command code in a {@code ProtoData.Command} (no payload) and
 * turns it into a network message.
 *
 * @param commandCode code to set on the command
 * @return the message bytes produced by {@code NetworkFormatter.createNetworkMessage}
 */
public static byte[] toNetworkMessage(int commandCode) {
	return NetworkFormatter.createNetworkMessage(
			ProtoData.Command.newBuilder()
					.setCode(commandCode)
					.build()
					.toByteArray());
}

}

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published