Skip to content

Solve issue with losing track of SLURM jobs using cephfs #5976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,10 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
* http://superuser.com/questions/422061/how-to-determine-whether-a-directory-is-on-an-nfs-mounted-drive
*/
workDirList = FileHelper.listDirectory(task.workDir)
log.trace "JobId `$jobId`: running FileHelper.listDirectory()"
}


/*
* when the file does not exist return null, to force the monitor to continue to wait
*/
Expand All @@ -340,6 +342,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
else
log.trace "JobId `$jobId` exit file: ${exitFile.toUriString()} - lastModified: ${exitAttrs?.lastModifiedTime()} - size: ${exitAttrs?.size()}"
}

// -- fetch the job status before returning a result
final active = executor.checkActiveStatus(jobId, queue)

Expand Down Expand Up @@ -382,6 +385,18 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
return Integer.MAX_VALUE
}

// refresh cephfs metadata on workDir
// run sync on the filesystem that holds the task.workDir
// run simpleFind with depth 0 (stay on level) on both,
// the task.workDir parent and the task.workDir itself
if(FileHelper.getPathFsType(task.workDir) == 'ceph') {
log.trace "JobId `$jobId`: force refresh CEPH metadata"
final syncStatus = FileHelper.syncFS(task.workDir)
final findStatusParent = FileHelper.simpleFind(task.workDir, 0, true)
final findStatus = FileHelper.simpleFind(task.workDir, 0, false)
}


/*
* read the exit file, it should contain the executed process exit status
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.SysEnv
import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.file.FileHelper
import nextflow.util.Duration
import nextflow.util.SysHelper
import nextflow.util.Threads
Expand Down Expand Up @@ -421,7 +422,19 @@ class TaskPollingMonitor implements TaskMonitor {

// if no task has been submitted wait for a new slot to be available
if( !processed ) {
Throttle.after(dumpInterval) { dumpSubmitQueue() }
Throttle.after(dumpInterval) {
dumpSubmitQueue()

// Try to refresh CEPH metadata
// run simpleFind on session.workDir down to 2 levels
if(FileHelper.getPathFsType(session.workDir) == 'ceph') {
log.trace "Refresh CEPH metadata: submitLoop()"
FileHelper.syncFS(session.workDir)
FileHelper.simpleFind(session.workDir, 2, false)
}

}

awaitSlots()
}
}
Expand All @@ -444,6 +457,14 @@ class TaskPollingMonitor implements TaskMonitor {
previous = sz
}

// Try to refresh CEPH metadata
// run simpleFind on session.workDir down to 2 levels
if(FileHelper.getPathFsType(session.workDir) == 'ceph') {
log.trace "Refresh CEPH metadata: pollLoop()"
FileHelper.syncFS(session.workDir)
FileHelper.simpleFind(session.workDir, 2, false)
}

// check all running tasks for termination
checkAllTasks(tasks)

Expand Down
79 changes: 77 additions & 2 deletions modules/nf-commons/src/main/nextflow/file/FileHelper.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,7 @@ class FileHelper {
return false

final type = getPathFsType(path)
final result = type == 'nfs' || type == 'lustre'
log.debug "FS path type ($result): $path"
final result = type == 'nfs' || type == 'lustre' || type == 'ceph'
return result
}

Expand Down Expand Up @@ -559,6 +558,7 @@ class FileHelper {

final begin = System.currentTimeMillis()
final path = (self.parent ?: '/').toString()

while( true ) {
final process = Runtime.runtime.exec("ls -la ${path}")
final status = process.waitFor()
Expand Down Expand Up @@ -1041,6 +1041,7 @@ class FileHelper {

})
}

/**
* List the content of a file system path
*
Expand Down Expand Up @@ -1077,6 +1078,80 @@ class FileHelper {
return result
}

/**
 * Run {@code sync -f} on the file system that holds the given path.
 *
 * The command is executed through {@code sh -c} and is given at most
 * 10 seconds to complete. This is a best-effort operation: failures are
 * logged at debug level and reported through the return value, never thrown.
 *
 * @param path
 *      A path located on the file system to sync
 * @return
 *      {@code 0} when the sync command terminated with a zero exit status;
 *      {@code 1} when it reported an error, could not be launched or was
 *      still running after the timeout
 */
static int syncFS(Path path) {

    int result = 1
    Process process = null
    final target = Escape.path(path)

    try {
        process = Runtime.runtime.exec(['sh', '-c', "sync -f ${target}"] as String[])
        // wait up to 10 seconds, then request the termination of the command
        process.waitForOrKill(10_000)
        log.trace "Syncing filesystem: -- path: ${target}\n"

        // The kill request issued on timeout is not guaranteed to have taken
        // effect yet (e.g. `sync` blocked on an unresponsive mount). Reading
        // the exit value of a live process throws IllegalThreadStateException,
        // therefore report the failure instead of querying the status.
        if( process.isAlive() ) {
            log.debug "Can't sync filesystem: ${target} -- Cause: sync command did not terminate within the timeout"
            return result
        }

        final syncStatus = process.exitValue()
        if( syncStatus>0 ) {
            log.debug "Can't sync filesystem: ${target} -- Exit status: $syncStatus"
        }
        else {
            result = syncStatus
        }
    }
    catch( IOException e ) {
        log.debug "Can't sync filesystem: $target -- Cause: ${e.message ?: e.toString()}"
    }
    finally {
        // release the process resources; also runs on the early return above
        process?.destroy()
    }

    return result
}

/**
 * Simple 'find': lists all files and directories under a start directory up to maxDepth.
 *
 * This is a best-effort operation: no exception is propagated to the caller,
 * errors are logged at debug level instead.
 *
 * @param basePath The root directory to search
 * @param maxDepth Maximum depth of recursion (0 = only the start directory itself)
 * @param parent When {@code true} the traversal starts from the parent of {@code basePath}
 *      instead of {@code basePath} itself
 * @return The list of visited paths. The list is empty when the start directory is missing
 *      or not readable; when an error occurs during the traversal it holds the entries
 *      collected up to that point
 */
static List<Path> simpleFind(Path basePath, int maxDepth = Integer.MAX_VALUE, boolean parent = false) {
    List<Path> result = []

    try {
        final path = parent ? basePath?.parent : basePath

        // a root path has no parent (and basePath itself may be null)
        if (path == null) {
            log.debug "[ERROR] Path does not exist: $basePath (parent: $parent)"
            return result
        }
        if (!Files.exists(path)) {
            log.debug "[ERROR] Path does not exist: $path"
            return result
        }
        if (!Files.isReadable(path)) {
            log.debug "[ERROR] Path is not readable: $path"
            return result
        }

        // Files.walk keeps directory handles open until the stream is closed,
        // therefore it must be released explicitly once the traversal is done
        final stream = Files.walk(path, maxDepth)
        try {
            stream.forEach { p ->
                result << p
            }
        }
        finally {
            stream.close()
        }
    } catch (IOException e) {
        log.debug "[ERROR] IO exception occurred: ${e.message}"
    } catch (SecurityException e) {
        log.debug "[ERROR] Security exception: ${e.message}"
    } catch (Exception e) {
        // includes UncheckedIOException raised while iterating the stream
        log.debug "[ERROR] Unexpected error: ${e.message}"
    }

    return result
}


static BasicFileAttributes readAttributes(Path path, LinkOption... options) {
try {
Files.readAttributes(path,BasicFileAttributes,options)
Expand Down