Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ startTango:

clean:
rm -f *~
rm -rf *.pyc
4 changes: 2 additions & 2 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ class Config:
# Unique prefix that defines VM name space for this Tango
# version. When working in development, this prefix should be your
# unique identifier. The "prod" prefix is reserved for production.
PREFIX = "dev"
PREFIX = "local"

# Default port for the RESTful server to listen on. Port 9090 is
# reserved for production. Port 8080 for the lead developer.
# Other developers should pick their own unique ports.
PORT = 8080
PORT = 3000

# Log file. Setting this to None sends the server output to stdout
LOGFILE = None
Expand Down
7 changes: 4 additions & 3 deletions jobQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time, threading, logging

from config import *
from tangoObjects import *
from worker import Worker

#
Expand Down Expand Up @@ -84,7 +85,7 @@ def add(self, job):
self.queueLock.acquire()
self.jobQueue[job.id] = job
job.trace.append("%s|Added job %s:%d to queue" %
(time.asctime(), job.name, job.id))
(time.ctime(time.time()+time.timezone), job.name, job.id))
self.queueLock.release()
self.log.debug("add|Releasing lock to job queue.")

Expand Down Expand Up @@ -224,7 +225,7 @@ def makeDead(self, id, reason):
del self.jobQueue[id]
if job.trace is None:
job.trace = []
job.trace.append("%s|%s" % (time.asctime(), reason))
job.trace.append("%s|%s" % (time.ctime(time.time()+time.timezone), reason))
self.log.info("Terminated job %s:%d: %s" %
(job.name, job.id, reason))
self.deadJobs[id] = job
Expand Down Expand Up @@ -275,7 +276,7 @@ def __manage(self):
self.log.info("Dispatched job %s:%d [try %d]" %
(job.name, job.id, job.retries))
job.trace.append("%s|Dispatched job %s:%d [try %d]" %
(time.asctime(), job.name, job.id,
(time.ctime(time.time()+time.timezone), job.name, job.id,
job.retries))
vmms = self.vmms[job.vm.vmms] # Create new vmms object
worker = Worker(job, vmms, self.jobQueue, self.preallocator, preVM).start()
Expand Down
2 changes: 1 addition & 1 deletion restful-tango/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/local/bin/python2.7
#!/usr/bin/python

from tornado.ioloop import IOLoop
import tornado.web
Expand Down
47 changes: 30 additions & 17 deletions restful-tango/tangoREST.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# tango-rest.py
# tangoREST.py
#
# Implements open, upload, addJob, and poll to be used for the RESTful
# interface of Tango.
Expand All @@ -9,7 +9,8 @@
from tangod import *
from jobQueue import *
from preallocator import *
from vmms.ec2SSH import *
from vmms.localSSH import *
from tangoObjects import *
from config import Config

class Status:
Expand Down Expand Up @@ -46,10 +47,10 @@ class TangoREST:

# Replace with choice of key store and override validateKey.
# This key is just for testing.
keys = ['403926033d001b5279df37cbbe5287b7c7c267bc']
keys = ['local']

def __init__(self):
self.vmms = {'ec2SSH':Ec2SSH()}
self.vmms = {'localSSH':LocalSSH()}
self.preallocator = Preallocator(self.vmms)
self.queue = JobQueue(self.preallocator)
self.jobManager = JobManager(self.queue, self.vmms, self.preallocator)
Expand Down Expand Up @@ -105,13 +106,13 @@ def computeMD5(self, directory, files):
continue
return result

def createTangoMachine(self, image,
def createTangoMachine(self, image, vmms = "localSSH",
vmObj={'cores': 1, 'memory' : 512}):
""" createTangoMachine - Creates a tango machine object from image
"""
return TangoMachine(
name = image,
vmms = "tashiSSH",
vmms = vmms,
image = "%s.img" % (image),
cores = vmObj["cores"],
memory = vmObj["memory"],
Expand All @@ -121,30 +122,41 @@ def createTangoMachine(self, image,
def convertJobObj(self, dirName, jobObj):
    """ convertJobObj - Converts a dictionary into a TangoJob object

    dirName is the courselab directory name; it is joined under
    self.COURSELABS to build the input and output paths.

    jobObj is the decoded job dictionary. Keys read here:
      jobName, output_file, timeout, files, image  (required)
      callback_url, max_kb                         (optional)

    Returns the newly built TangoJob. Raises KeyError if a required
    key is missing from jobObj.
    """
    # TODO: Find something better to do here for job name
    name = jobObj['jobName']
    outputFile = "%s/%s/%s/%s" % (self.COURSELABS, dirName,
                                  self.OUTPUT_FOLDER, jobObj['output_file'])
    timeout = jobObj['timeout']

    # Optional fields: no callback and a 512 output cap unless the
    # request says otherwise.
    notifyURL = jobObj.get('callback_url')
    maxOutputFileSize = jobObj.get('max_kb', 512)

    # List of input files; each is copied from the courselab directory
    # to a file of the same name on the destination machine.
    inputFiles = []
    for fileName in jobObj['files']:
        inputFiles.append(InputFile(
            localFile = "%s/%s/%s" % (self.COURSELABS, dirName, fileName),
            destFile = fileName))

    # VM object
    vm = self.createTangoMachine(jobObj["image"])

    job = TangoJob(
        name = name,
        vm = vm,
        outputFile = outputFile,
        input = inputFiles,
        timeout = timeout,
        notifyURL = notifyURL,
        # Pass the value taken from 'max_kb'; a hard-coded 512 here
        # would silently discard the caller's limit.
        maxOutputFileSize = maxOutputFileSize)
    self.log.debug("inputFile: %s" % inputFiles)
    self.log.debug("outputFile: %s" % outputFile)
    return job


def convertTangoMachineObj(self, tangoMachine):
""" convertVMObj - Converts a TangoMachine object into a dictionary
Expand Down Expand Up @@ -260,6 +272,7 @@ def addJob(self, key, courselab, jobStr):
jobObj = json.loads(jobStr)
job = self.convertJobObj(labName, jobObj)
jobId = self.tango.addJob(job)
self.log.debug("Done adding job")
if (jobId == -1):
self.log.info("Failed to add job to tango")
return self.status.create(-1, job.trace)
Expand Down
49 changes: 49 additions & 0 deletions tangoObjects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# tangoObjects.py
#
# Implements objects used to pass state within Tango.
#

class InputFile():
    """
    InputFile - Stores pointer to the path on the local machine and the
    name of the file on the destination machine

    localFile: path of the file on the local machine
    destFile:  name the file is given on the destination machine
    """
    def __init__(self, localFile, destFile):
        self.localFile = localFile
        self.destFile = destFile

    def __repr__(self):
        # Lists of InputFile objects are interpolated into log messages
        # with %s; without this they print as opaque instance addresses.
        return "InputFile(localFile=%r, destFile=%r)" % (
            self.localFile, self.destFile)

class TangoMachine():
    """
    TangoMachine - Plain record describing the virtual machine that an
    autograding job runs on. Each constructor argument is stored
    unchanged on the attribute of the same name.
    """
    def __init__(self, name = "LocalVM", image = None, vmms = "localSSH",
                 network = None, cores = None, memory = None, disk = None,
                 domain_name = None, ec2_id = None):
        # Which machine this is and which VMMS backend it belongs to
        self.name, self.image, self.vmms = name, image, vmms

        # Requested resources
        self.cores, self.memory, self.disk = cores, memory, disk

        # Network placement and backend-specific identifiers
        self.network = network
        self.domain_name = domain_name
        self.ec2_id = ec2_id

class TangoJob():
    """
    TangoJob - A job that is to be run on a TangoMachine

    vm:                TangoMachine the job runs on
    input:             list of InputFile objects; a new empty list is
                       created for each job when omitted
    outputFile:        path the job's output is written to
    name:              job name
    notifyURL:         optional callback URL
    timeout:           job timeout (0 by default)
    trace:             list of "time|message" strings; a new empty
                       list is created for each job when omitted
    maxOutputFileSize: cap on the output file size (512 by default)
    assigned, retries: scheduling state, stored as given
    """
    def __init__(self, assigned = False, retries = 0, vm = None,
                 outputFile = None, name = None, input = None,
                 notifyURL = None, timeout = 0, trace = None,
                 maxOutputFileSize = 512):
        self.assigned = assigned
        self.retries = retries
        self.vm = vm
        # A literal [] default would be one list shared by every job
        # built without an explicit input argument.
        self.input = [] if input is None else input
        self.outputFile = outputFile
        self.name = name
        self.notifyURL = notifyURL
        self.timeout = timeout
        # Code that records progress calls trace.append() directly, so
        # start from an empty list rather than None.
        self.trace = [] if trace is None else trace
        self.maxOutputFileSize = maxOutputFileSize
60 changes: 31 additions & 29 deletions tangod.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
from config import *
from preallocator import *
from jobQueue import *
from vmms.ec2SSH import *
from vmms.localSSH import *
from tangoObjects import *

class tangoServer:
""" tangoServer - Defines the RPC calls that the server accepts
Expand Down Expand Up @@ -217,63 +218,64 @@ def validateJob(job, vmms):
if not job.name:
log.error("validateJob: Missing job.name")
job.trace.append("%s|validateJob: Missing job.name" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1

# Check the virtual machine field
if not job.vm:
log.error("validateJob: Missing job.vm")
job.trace.append("%s|validateJob: Missing job.vm" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1
else:
if not job.vm.image:
log.error("validateJob: Missing job.vm.image")
job.trace.append("%s|validateJob: Missing job.vm.image" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1
else:
# Check if VM name exists in Tashi directory
imgList = os.listdir(Config.TASHI_IMAGE_PATH)
imgPath = Config.TASHI_IMAGE_PATH + job.vm.image
if job.vm.image not in imgList:
log.error("validateJob: Image not found: %s" % job.vm.image)
job.trace.append("%s|validateJob: Image not found: %s" %
(time.asctime(), job.vm.image))
errors += 1
# Check if image has read permissions
elif not (os.stat(imgPath).st_mode & stat.S_IRUSR):
log.error("validateJob: Not readable: %s" % job.vm.image)
job.trace.append("%s|validateJob: Not readable: %s" %
(time.asctime(), job.vm.image))
errors += 1
else:
(base, ext) = os.path.splitext(job.vm.image)
job.vm.name = base;
if job.vm.vmms == "tashiSSH":
# Check if VM name exists in Tashi directory
imgList = os.listdir(Config.TASHI_IMAGE_PATH)
imgPath = Config.TASHI_IMAGE_PATH + job.vm.image
if job.vm.image not in imgList:
log.error("validateJob: Image not found: %s" % job.vm.image)
job.trace.append("%s|validateJob: Image not found: %s" %
(time.ctime(time.time()+time.timezone), job.vm.image))
errors += 1
# Check if image has read permissions
elif not (os.stat(imgPath).st_mode & stat.S_IRUSR):
log.error("validateJob: Not readable: %s" % job.vm.image)
job.trace.append("%s|validateJob: Not readable: %s" %
(time.ctime(time.time()+time.timezone), job.vm.image))
errors += 1
else:
(base, ext) = os.path.splitext(job.vm.image)
job.vm.name = base;

if not job.vm.vmms:
log.error("validateJob: Missing job.vm.vmms")
job.trace.append("%s|validateJob: Missing job.vm.vmms" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1
else:
if job.vm.vmms not in vmms:
log.error("validateJob: Invalid vmms name: %s" % job.vm.vmms)
job.trace.append("%s|validateJob: Invalid vmms name: %s" %
(time.asctime(), job.vm.vmms))
(time.ctime(time.time()+time.timezone), job.vm.vmms))
errors += 1

# Check the output file
if not job.outputFile:
log.error("validateJob: Missing job.outputFile")
job.trace.append("%s|validateJob: Missing job.outputFile" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1
else:
if not os.path.exists(os.path.dirname(job.outputFile)):
log.error("validateJob: Bad output path: %s", job.outputFile)
job.trace.append("%s|validateJob: Bad output path: %s" %
(time.asctime(), job.outputFile))
(time.ctime(time.time()+time.timezone), job.outputFile))
errors += 1

# Check for max output file size parameter
Expand All @@ -287,14 +289,14 @@ def validateJob(job, vmms):
if not inputFile.localFile:
log.error("validateJob: Missing inputFile.localFile")
job.trace.append("%s|validateJob: Missing inputFile.localFile" %
(time.asctime()))
(time.ctime(time.time()+time.timezone)))
errors += 1
else:
if not os.path.exists(inputFile.localFile):
log.error("validateJob: Input file %s not found" %
(inputFile.localFile))
job.trace.append("%s|validateJob: Input file %s not found" %
(time.asctime(), inputFile.localFile))
(time.ctime(time.time()+time.timezone), inputFile.localFile))
errors += 1

# Check if job timeout has been set; If not set timeout to default
Expand All @@ -307,7 +309,7 @@ def validateJob(job, vmms):
if errors > 0:
log.error("validateJob: Job rejected: %d errors" % errors)
job.trace.append("%s|validateJob: Job rejected: %d errors" %
(time.asctime(), errors))
(time.ctime(time.time()+time.timezone), errors))
return -1
else:
return 0
Expand Down Expand Up @@ -336,7 +338,7 @@ def main():

# Initialize the hash of supported VMM systems.
# vmms = {'tashiSSH':TashiSSH()}
vmms = {'tashiSSH':TashiSSH()}
vmms = {'localSSH':LocalSSH()}

# Instantiate the main components in the service
preallocator = Preallocator(vmms)
Expand Down
2 changes: 1 addition & 1 deletion vmms/ec2SSH.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# ec2SSH.py - Implements the Tango VMMS interface.
# ec2SSH.py - Implements the Tango VMMS interface to run Tango jobs on Amazon EC2.
#
# This implementation uses the AWS EC2 SDK to manage the virtual machines and
# ssh and scp to access them. The following exceptions are raised back
Expand Down
Loading