Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 105 additions & 118 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@

**Code Documentation**
"""
import os
import errno
import json
import stat
import os
import pexpect
import shutil
import errno
import stat
from urllib.parse import urlparse
from urllib.parse import quote
from urllib.parse import unquote
Expand Down Expand Up @@ -136,46 +137,36 @@ def __ssh_call(self, command, timeout):
if not timeout:
timeout = 999

ssh_newkey = "Are you sure you want to continue connecting"
try:
import pexpect

ssh_newkey = "Are you sure you want to continue connecting"
try:
child = pexpect.spawn(command, timeout=timeout, encoding="utf-8")
i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "])
child = pexpect.spawn(command, timeout=timeout, encoding="utf-8")
i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "])
if i == 0: # Timeout
return S_OK((-1, child.before, "SSH login failed"))

if i == 1: # SSH does not have the public key. Just accept it.
child.sendline("yes")
child.expect("assword: ")
i = child.expect([pexpect.TIMEOUT, "assword: "])
if i == 0: # Timeout
return S_OK((-1, child.before, "SSH login failed"))
elif i == 1: # SSH does not have the public key. Just accept it.
child.sendline("yes")
child.expect("assword: ")
i = child.expect([pexpect.TIMEOUT, "assword: "])
if i == 0: # Timeout
return S_OK((-1, str(child.before) + str(child.after), "SSH login failed"))
elif i == 1:
child.sendline(self.password)
child.expect(pexpect.EOF)
return S_OK((0, child.before, ""))
elif i == 2:
# Passwordless login, get the output
return S_OK((0, child.before, ""))

if self.password:
return S_OK((-1, str(child.before) + str(child.after), "SSH login failed"))
if i == 1:
child.sendline(self.password)
child.expect(pexpect.EOF)
return S_OK((0, child.before, ""))
return S_ERROR((-2, child.before, ""))
except Exception as x:
res = (-1, f"Encountered exception {Exception}: {str(x)}")
return S_ERROR(res)
except BaseException:
from DIRAC.Core.Utilities.Subprocess import shellCall

# Try passwordless login
result = shellCall(timeout, command)
# print ( "!!! SSH command: %s returned %s\n" % (command, result) )
if result["Value"][0] == 255:
return S_ERROR((-1, f"Cannot connect to host {self.host}", ""))
return result

if i == 2:
# Passwordless login, get the output
return S_OK((0, child.before, ""))

if self.password:
child.sendline(self.password)
child.expect(pexpect.EOF)
return S_OK((0, child.before, ""))

return S_ERROR(f"Unknown error: {child.before}")
except Exception as x:
return S_ERROR(f"Encountered exception: {str(x)}")

def sshCall(self, timeout, cmdSeq):
"""Execute remote command via a ssh remote call
Expand Down Expand Up @@ -423,7 +414,7 @@ def _prepareRemoteHost(self, host=None):
self.log.verbose(f"Creating working directories on {self.ceParameters['SSHHost']}")
result = ssh.sshCall(30, cmd)
if not result["OK"]:
self.log.error("Failed creating working directories", f"({result['Message'][1]})")
self.log.error("Failed creating working directories", f"({result['Message']})")
return result
status, output, _error = result["Value"]
if status == -1:
Expand All @@ -443,7 +434,7 @@ def _prepareRemoteHost(self, host=None):
remoteScript = f"{self.sharedArea}/execute_batch"
result = ssh.scpCall(30, localScript, remoteScript, postUploadCommand=f"chmod +x {remoteScript}")
if not result["OK"]:
self.log.warn(f"Failed uploading control script: {result['Message'][1]}")
self.log.warn(f"Failed uploading control script: {result['Message']}")
return result
status, output, _error = result["Value"]
if status != 0:
Expand Down Expand Up @@ -634,13 +625,11 @@ def killJob(self, jobIDList):

def _killJobOnHost(self, jobIDList, host=None):
"""Kill the jobs for the given list of job IDs"""
jobDict = {}
for job in jobIDList:
stamp = os.path.basename(urlparse(job).path)
jobDict[stamp] = job
stampList = list(jobDict)
batchSystemJobList = []
for jobID in jobIDList:
batchSystemJobList.append(os.path.basename(urlparse(jobID.split(":::")[0]).path))

commandOptions = {"JobIDList": stampList, "User": self.user}
commandOptions = {"JobIDList": batchSystemJobList, "User": self.user}
resultCommand = self.__executeHostCommand("killJob", commandOptions, host=host)
if not resultCommand["OK"]:
return resultCommand
Expand All @@ -654,18 +643,6 @@ def _killJobOnHost(self, jobIDList, host=None):

return S_OK(len(result["Successful"]))

def _getHostStatus(self, host=None):
    """Run the "getCEStatus" host command and return its result.

    :param host: forwarded unchanged to the host command executor (None by default)
    :return: the failed command result as-is if the command could not be executed,
             S_ERROR if the command reports a non-zero "Status",
             otherwise S_OK wrapping the command's result dictionary
    """
    commandResult = self.__executeHostCommand("getCEStatus", {}, host=host)
    if not commandResult["OK"]:
        return commandResult

    ceStatus = commandResult["Value"]
    # A zero "Status" is the only success case; anything else carries a "Message"
    if ceStatus["Status"] == 0:
        return S_OK(ceStatus)

    return S_ERROR(f"Failed to get CE status: {ceStatus['Message']}")

def getCEStatus(self):
"""Method to return information on running and pending jobs."""
result = S_OK()
Expand All @@ -686,21 +663,31 @@ def getCEStatus(self):

return result

def _getHostStatus(self, host=None):
    """Run the "getCEStatus" host command and return its result.

    :param host: forwarded unchanged to the host command executor (None by default)
    :return: the failed command result as-is if the command could not be executed,
             S_ERROR if the command reports a non-zero "Status",
             otherwise S_OK wrapping the command's result dictionary
    """
    commandResult = self.__executeHostCommand("getCEStatus", {}, host=host)
    if not commandResult["OK"]:
        return commandResult

    ceStatus = commandResult["Value"]
    # A zero "Status" is the only success case; anything else carries a "Message"
    if ceStatus["Status"] == 0:
        return S_OK(ceStatus)

    return S_ERROR(f"Failed to get CE status: {ceStatus['Message']}")

def getJobStatus(self, jobIDList):
    """Return the status information for the given list of job IDs.

    Thin entry point: the lookup is delegated to ``_getJobStatusOnHost``
    without specifying a host.

    :param jobIDList: job identifiers to query
    :return: whatever ``_getJobStatusOnHost`` returns for these jobs
    """
    statusResult = self._getJobStatusOnHost(jobIDList)
    return statusResult

def _getJobStatusOnHost(self, jobIDList, host=None):
"""Get the status information for the given list of jobs"""

resultDict = {}
jobDict = {}
for job in jobIDList:
stamp = os.path.basename(urlparse(job).path)
jobDict[stamp] = job
stampList = list(jobDict)
batchSystemJobDict = {}
for jobID in jobIDList:
batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path)
batchSystemJobDict[batchSystemJobID] = jobID

for jobList in breakListIntoChunks(stampList, 100):
for jobList in breakListIntoChunks(list(batchSystemJobDict), 100):
resultCommand = self.__executeHostCommand("getJobStatus", {"JobIDList": jobList}, host=host)
if not resultCommand["OK"]:
return resultCommand
Expand All @@ -709,14 +696,54 @@ def _getJobStatusOnHost(self, jobIDList, host=None):
if result["Status"] != 0:
return S_ERROR(f"Failed to get job status: {result['Message']}")

for stamp in result["Jobs"]:
resultDict[jobDict[stamp]] = result["Jobs"][stamp]
for batchSystemJobID in result["Jobs"]:
resultDict[batchSystemJobDict[batchSystemJobID]] = result["Jobs"][batchSystemJobID]

return S_OK(resultDict)

def getJobOutput(self, jobID, localDir=None):
    """Fetch the standard output and standard error of a job.

    :param jobID: identifier of the job whose output is requested
    :param localDir: optional directory; when given, the files are requested as
                     ``<localDir>/<batchSystemJobID>.out`` and ``.err``
    :return: the first failing intermediate result as-is, otherwise S_OK((output, error)),
             where both are local file paths if ``localDir`` is set, or the second
             element of the respective ``scpCall`` values if it is not
    """
    self.log.verbose("Getting output for jobID", jobID)
    filesResult = self._getJobOutputFiles(jobID)
    if not filesResult["OK"]:
        return filesResult

    batchSystemJobID, host, remoteOutput, remoteError = filesResult["Value"]

    # NOTE(review): "Memory" presumably tells scpCall to hand back the content
    # instead of writing a local file - confirm against SSH.scpCall
    localOutput = f"{localDir}/{batchSystemJobID}.out" if localDir else "Memory"
    localError = f"{localDir}/{batchSystemJobID}.err" if localDir else "Memory"

    # Take into account the SSHBatch possible SSHHost syntax
    sshHost = host.split("/")[0]
    ssh = SSH(host=sshHost, parameters=self.ceParameters)

    stdoutResult = ssh.scpCall(30, localOutput, remoteOutput, upload=False)
    if not stdoutResult["OK"]:
        return stdoutResult

    stderrResult = ssh.scpCall(30, localError, remoteError, upload=False)
    if not stderrResult["OK"]:
        return stderrResult

    if localDir:
        return S_OK((localOutput, localError))

    return S_OK((stdoutResult["Value"][1], stderrResult["Value"][1]))

def _getJobOutputFiles(self, jobID):
"""Get output file names for the specific CE"""
jobStamp = os.path.basename(urlparse(jobID).path)
batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path)
# host can be retrieved from the path of the jobID
# it might not be present, in this case host is an empty string and will be defined by the CE parameters later
host = os.path.dirname(urlparse(jobID).path).lstrip("/")
Expand All @@ -726,19 +753,19 @@ def _getJobOutputFiles(self, jobID):
self.errorTemplate = self.ceParameters["ErrorTemplate"]

if self.outputTemplate:
output = self.outputTemplate % jobStamp
error = self.errorTemplate % jobStamp
output = self.outputTemplate % batchSystemJobID
error = self.errorTemplate % batchSystemJobID
elif "OutputTemplate" in self.ceParameters:
self.outputTemplate = self.ceParameters["OutputTemplate"]
self.errorTemplate = self.ceParameters["ErrorTemplate"]
output = self.outputTemplate % jobStamp
error = self.errorTemplate % jobStamp
output = self.outputTemplate % batchSystemJobID
error = self.errorTemplate % batchSystemJobID
elif hasattr(self.batchSystem, "getJobOutputFiles"):
# numberOfNodes is treated as a string as it can contain values such as "2-4"
# where 2 would represent the minimum number of nodes to allocate, and 4 the maximum
numberOfNodes = self.ceParameters.get("NumberOfNodes", "1")
commandOptions = {
"JobIDList": [jobStamp],
"JobIDList": [batchSystemJobID],
"OutputDir": self.batchOutput,
"ErrorDir": self.batchError,
"NumberOfNodes": numberOfNodes,
Expand All @@ -755,50 +782,10 @@ def _getJobOutputFiles(self, jobID):
self.outputTemplate = result["OutputTemplate"]
self.errorTemplate = result["ErrorTemplate"]

output = result["Jobs"][jobStamp]["Output"]
error = result["Jobs"][jobStamp]["Error"]
else:
output = f"{self.batchOutput}/{jobStamp}.out"
error = f"{self.batchError}/{jobStamp}.err"

return S_OK((jobStamp, host, output, error))

def getJobOutput(self, jobID, localDir=None):
"""Get the specified job standard output and error files. If the localDir is provided,
the output is returned as file in this directory. Otherwise, the output is returned
as strings.
"""
self.log.verbose("Getting output for jobID", jobID)
result = self._getJobOutputFiles(jobID)
if not result["OK"]:
return result

jobStamp, host, outputFile, errorFile = result["Value"]

if localDir:
localOutputFile = f"{localDir}/{jobStamp}.out"
localErrorFile = f"{localDir}/{jobStamp}.err"
else:
localOutputFile = "Memory"
localErrorFile = "Memory"

# Take into account the SSHBatch possible SSHHost syntax
host = host.split("/")[0]

ssh = SSH(host=host, parameters=self.ceParameters)
resultStdout = ssh.scpCall(30, localOutputFile, outputFile, upload=False)
if not resultStdout["OK"]:
return resultStdout

resultStderr = ssh.scpCall(30, localErrorFile, errorFile, upload=False)
if not resultStderr["OK"]:
return resultStderr

if localDir:
output = localOutputFile
error = localErrorFile
output = result["Jobs"][batchSystemJobID]["Output"]
error = result["Jobs"][batchSystemJobID]["Error"]
else:
output = resultStdout["Value"][1]
error = resultStderr["Value"][1]
output = f"{self.batchOutput}/{batchSystemJobID}.out"
error = f"{self.batchError}/{batchSystemJobID}.err"

return S_OK((output, error))
return S_OK((batchSystemJobID, host, output, error))