File size: 3,898 Bytes
92f0e98
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
'''
Utility for simple distribution of work on multiple processes, by
making sure only one process is working on a job at once.
'''

import os, errno, socket, atexit, time, sys

def exclusive_dirfn(*args):
    '''
    Convenience function to get exclusive access to an unfinished
    experiment directory.

    The positional args are converted with str() and joined with
    os.path.join to form the directory path.  Exits the program if the
    directory is already done or busy (using exit_if_job_done).
    Otherwise, returns a function that creates filenames within that
    directory.  The returned function also carries two attributes:

      dirfn.dir     -- the directory path
      dirfn.done()  -- marks the job finished (via mark_job_done)
    '''
    directory = os.path.join(*[str(a) for a in args])
    # Exits the process (status 0) if the job is locked or already done.
    exit_if_job_done(directory)
    def dirfn(*fn):
        return os.path.join(directory, *fn)
    dirfn.dir = directory
    def done():
        mark_job_done(directory)
    dirfn.done = done
    print('Working in %s' % directory)
    return dirfn

def exit_if_job_done(directory, redo=False, force=False, verbose=True):
    '''
    Claim directory for this process, or exit quietly.

    Calls sys.exit(0) if pidfile_taken reports that
    directory/lockfile.pid is held by someone else.  It also exits if
    directory/done.txt exists and neither redo nor force is set.  With
    redo or force, an existing done.txt is deleted so the job runs again.
    force and verbose are also passed through to pidfile_taken.  Returns
    None when the caller should go ahead with the job.
    '''
    lock_path = os.path.join(directory, 'lockfile.pid')
    if pidfile_taken(lock_path, force=force, verbose=verbose):
        sys.exit(0)

    done_path = os.path.join(directory, 'done.txt')
    if not os.path.isfile(done_path):
        return

    with open(done_path) as handle:
        contents = handle.read()

    rerun = redo or force
    if verbose:
        template = 'Removing %s %s' if rerun else '%s %s'
        print(template % (done_path, contents))
    if not rerun:
        sys.exit(0)
    # Clearing the marker lets this run redo the job.
    os.remove(done_path)

def mark_job_done(directory):
    '''
    Record that the job in directory has finished.

    Writes (overwriting) directory/done.txt with a single line naming
    this process id and host name, the STY environment variable (empty
    if unset; presumably a GNU screen session name), and the current
    local time.
    '''
    done_path = os.path.join(directory, 'done.txt')
    with open(done_path, 'w') as out:
        stamp = (os.getpid(),
                 socket.gethostname(),
                 os.getenv('STY', ''),
                 time.strftime('%c'))
        out.write('done by %d@%s %s at %s' % stamp)

def pidfile_taken(path, verbose=False, force=False):
    '''
    Usage.  To grab an exclusive lock for the remaining duration of the
    current process (and exit if another process already has the lock),
    do this:

    if pidfile_taken('job_423/lockfile.pid', verbose=True):
        sys.exit(0)

    To do a batch of jobs, just run a script that does them all on
    each available machine, sharing a network filesystem.  When each
    job grabs a lock, then this will automatically distribute the
    jobs so that each one is done just once on one machine.

    Returns None when the lock was acquired.  The file is created with
    O_CREAT | O_EXCL, stamped with "pid@hostname STY", and removed again
    at interpreter exit.  Otherwise returns a non-empty string describing
    the holder: the lock file's stripped contents, 'empty' if it has no
    contents, or 'race' if it could not be read (e.g. it vanished in
    between).

    Missing parent directories are created.  verbose prints what
    happens.  force (for manual one-time use) deletes an existing lock
    file and tries once more.  OS errors other than "file exists" from
    creating the lock file are raised.
    '''
    parent = os.path.dirname(path)
    # A bare filename has no directory part, and os.makedirs('') raises.
    # This runs outside the try below so that an EEXIST from makedirs
    # (e.g. the parent is a regular file) is not mistaken for a held lock.
    if parent:
        os.makedirs(parent, exist_ok=True)

    # Try to create the file exclusively; fails with EEXIST if it exists.
    try:
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except OSError as e:
        if e.errno != errno.EEXIST:
            # Other problems get an exception.
            raise
        # Someone else holds (or just grabbed) the lock; report who.
        conflicter = 'race'
        try:
            with open(path, 'r') as lockfile:
                conflicter = lockfile.read().strip() or 'empty'
        except (OSError, ValueError):
            # Deleted in between, unreadable, or undecodable: keep 'race'.
            pass
        # Force is for manual one-time use, for deleting stale lockfiles.
        if force:
            if verbose:
                print('Removing %s from %s' % (path, conflicter))
            try:
                os.remove(path)
            except FileNotFoundError:
                pass  # Already gone; just retry the acquisition.
            return pidfile_taken(path, verbose=verbose, force=False)
        if verbose:
            print('%s held by %s' % (path, conflicter))
        return conflicter

    # Register to delete this file on exit.  Use an absolute path so the
    # cleanup still targets the right file if the process later chdir()s.
    lockfile = os.fdopen(fd, 'r+')
    atexit.register(delete_pidfile, lockfile, os.path.abspath(path))
    # Write my pid into the open file and push it to disk.
    lockfile.write('%d@%s %s\n' % (os.getpid(), socket.gethostname(),
        os.getenv('STY', '')))
    lockfile.flush()
    os.fsync(lockfile)
    # Return 'None' to say there was not a conflict.
    return None

def delete_pidfile(lockfile, path):
    '''
    Close lockfile (if not None) and remove the lock file at path.

    Runs at exit after pidfile_taken succeeds.  This is best-effort:
    OSErrors (e.g. the file was already removed) are ignored so that
    interpreter shutdown is not interrupted.
    '''
    if lockfile is not None:
        try:
            lockfile.close()
        except OSError:
            pass
    try:
        os.unlink(path)
    except OSError:
        pass