#!/usr/bin/python3
#File: /users/krowe/htcondor/plugin/gibson_plugin.py
#Author: K. Scott Rowe
#Time-stamp: <04/25/2022 15:49:05 krowe@nmpost-master>

# krowe Apr 30 2021: syntax of infile is
# scheme://[userinfo@]fqpn
# E.g.
# gibson://krowe@/users/krowe/.bashrc
# gibson:///user/krowe/RCS   # note the triple slash
# what about paths relative to ~userinfo
# I use subprocess.Popen because it works with a wide variety of python
# versions and can suppress stdout/stderr unlike subprocess.call.
# Apparently subprocess can't handle arguments to ssh that are then
# arguments to rsync.  E.g. rsync -a -e ssh -q -l userinfo...
# So I use the RSYNC_RSH environment variable.

# krowe May 3 2021: transfer plugins require the python2-condor
# or python3-condor RPM.

# krowe May 11 2021: apparently there is some code in HTCondor somewhere
# that keys off the underbar (_) in the name of the plugin.  In other words
# I got an error when this was named nrao_rsync_plugin.py and I used
# 'nrao_rsync://' as the scheme.  So I changed it to 'nraorsync://'

# krowe May 5 2021: Greg is right.  HTCondor is bad at naming things.
# I think the infile is a file containing the arguments to the plugin
# and not the files to import or download.  When called to
# upload files with -upload, the arguments will again be in the infile.
# I also think downloading files is the default, so if there isn't a
# -upload argument, the plugin assumes it is downloading.  The outfile
# I still don't quite get.

# krowe Apr 22 2022: Actually I think that since I set
# *output_destination = nraorsync://...* in the submit description
# file, the FTM *is* using the plugin.  It has to because
# output_destination requires that everything use the plugin.  So the
# FTM calls the plugin with _condor_stdout and _condor_stderr which
# activates the upload_file() function in the plugin and the files are
# copied.  This is why they aren't remapped.  I configured the plugin
# to also copy these files and rename them.  Perhaps instead I could
# watch for them in upload_file() and remap them there?
#
# Done
#
# - Done: I can use _CONDOR_JOB_AD and/or _CONDOR_MACHINE_AD environment
#   variables to find the job.ad and machine.ad files.
# - Done: change the name from gibson_plugin to something more generic
#   yet still informative like rsync_plugin or maybe nrao_rsync_plugin.
# - Done: If userinfo is not set in the URI, rely on the RemoteUser MachineAd
#   to get the userinfo.
# - Done: set/use a MachineAd like NRAO_TRANSFER_HOST as the host to
#   rsync to/from.  Or use cp if it is LOCAL or something like that.
# - Done: If we are at a third party like CHTC, NRAO_TRANSFER_HOST won't exist
#   but we will still need to know what host to use.  This could be decided
#   based on a JobAd like GlobalJobId or UidDomain or User
#   Or maybe a MachineAd like RemoteOwner or RemoteUser.
#   I think we will always flock to a third party like CHTC so we can use
#   RemoteOwner to tell which site we came from (AOC or CV).
# - Done: Look for an ssh key to use.  A known filename like
#   condor_transfer.key or defaulting to ~/.ssh.
# - Done: get -upload working so that files can be copied back
#   - How do I know the directory to upload to?
#     - environ:_CONDOR_JOB_IWD  This isn't available to the plugin apparently.
#     - .job.ad:OrigIwd  This is initialdir if set in the submit file,
#       else it is the CWD where condor_submit was run.
#     - actually I think I should use Url in the inputfile
#       instead of OrigIwd.  This gets set by the output_destination
#       attribute in the submit file.
# - Done: Set NRAO_TRANSFER_HOST on the nmpost nodes both AOC and NMT
#   - upload_file() assumes you are using ssh.  dest is always the form
#     rsynchost + fqpn.  This will fail if an ssh key is not provided
#     for example when using LOCAL hosts like nmpost040.
# - Done: krowe Apr 5 2022: remove --timeout=30 from rsync commands.  We would
#   rather rsync keep trying to download/upload data instead of giving
#   up so quickly.
#   Eventually, NOT_RESPONDING_TIMEOUT will kick in and
#   kill the job.
# - Done: krowe Apr 22 2022: I think that since I set
#   output_destination = nraorsync://... in the submit description file,
#   it forces the FTM to use the plugin.  So the FTM calls the plugin
#   with _condor_stdout and _condor_stderr which activates the
#   upload_file() function in the plugin and the files are copied.  This
#   is why they aren't remapped.  I configured the plugin to also copy
#   these files and rename them in upload_rsync() but I think the better
#   solution is to watch for them in upload_file() and remap them there?
#   version 0.91
#
# To Do
#
# - Can I produce useful error messages that will show up in
#   condor_q -hold?  I don't see how.
#   krowe Sep 7 2021: Perhaps TransferError and/or TransferUrl are used
#   in the hold error message.  Need to test.

import os
import sys
import socket
import time
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2
import posixpath
import json

import classad
import htcondor
import subprocess

#debugfile = open('/tmp/nraorsync_plugin.out', 'a')
#os.chmod('/tmp/nraorsync_plugin.out', 0o0777)
#debugfile.write("DEBUG: Starting nraorsync\n")

NRAORSYNC_PLUGIN_VERSION = '0.91' # change


def print_help(stream = sys.stderr):
    """Write this plugin's usage text to stream (default: stderr)."""
    help_msg = '''Usage: {0} [-upload] -infile -outfile
       {0} -classad

This plugin is expected to either be set with the FILETRANSFER_PLUGINS
in the condor config file like so

FILETRANSFER_PLUGINS = $(LIBEXEC)/nraorsync_plugin.py, $(FILETRANSFER_PLUGINS)

or with the transfer_plugins attribute in the submit description file like so

transfer_plugins = nraorsync=/users/krowe/htcondor/plugin/nraorsync_plugin.py

Then, you can use this plugin to transfer files by adding a URI to the
transfer_input_files attribute in the submit description file like so

transfer_input_files = nraorsync:///users/krowe/hpg

or

transfer_input_files = nraorsync://krowe@/users/krowe/hpg

If there is not @ in the URI, the plugin will use the RemoteUser found
in .machine.ad.  Finally, output files can be transfered back via this
plugin by using the custom classad nrao_output_files like so

+nrao_output_files = "software data"

Options:
  -classad Print a ClassAd containing the capablities of this file transfer plugin.
  -infile Input ClassAd file
  -outfile Output ClassAd file
  -upload Indicates this transfer is an upload (default is download)
'''
    stream.write(help_msg.format(sys.argv[0]))


def print_capabilities():
    """Print, in old ClassAd format, the capabilities HTCondor queries
    with -classad before using a transfer plugin."""
    capabilities = {
        'MultipleFileSupport': True,
        'PluginType': 'FileTransfer',
        'SupportedMethods': 'nraorsync',
        'Version': NRAORSYNC_PLUGIN_VERSION,
    }
    sys.stdout.write(classad.ClassAd(capabilities).printOld())


def parse_args():
    '''The optparse library can't handle the types of arguments that the
    file transfer plugin sends, the argparse library can't be expected
    to be found on machines running EL 6 (Python 2.6), and a plugin
    should not reach outside the standard library, so the plugin must
    roll its own argument parser.  The expected input is very rigid, so
    this isn't too awful.

    Returns a dict {'infile': str, 'outfile': str, 'upload': bool}.
    Exits -1 on any malformed command line, 0 after -classad.'''

    # The only argument lists that are acceptable are
    # -classad
    # -infile -outfile
    # -outfile -infile
    if not len(sys.argv) in [2, 5, 6]:
        print_help()
        sys.exit(-1)

    # If -classad, print the capabilities of the plugin and exit early
    if (len(sys.argv) == 2) and (sys.argv[1] == '-classad'):
        print_capabilities()
        sys.exit(0)

    # If -upload, set is_upload to True and remove it from the args list
    is_upload = False
    if '-upload' in sys.argv[1:]:
        is_upload = True
        sys.argv.remove('-upload')

    # -infile and -outfile must be in the first and third position
    if not (
            ('-infile' in sys.argv[1:]) and
            ('-outfile' in sys.argv[1:]) and
            (sys.argv[1] in ['-infile', '-outfile']) and
            (sys.argv[3] in ['-infile', '-outfile']) and
            (len(sys.argv) == 5)):
        print_help()
        sys.exit(-1)

    infile = None
    outfile = None
    try:
        for i, arg in enumerate(sys.argv):
            if i == 0:
                continue
            elif arg == '-infile':
                infile = sys.argv[i+1]
            elif arg == '-outfile':
                outfile = sys.argv[i+1]
    except IndexError:
        print_help()
        sys.exit(-1)

    return {'infile': infile, 'outfile': outfile, 'upload': is_upload}


def get_scheme_name(url):
    '''Return the scheme portion of a URI, e.g. http or file'''
    scheme = url.split('://')[0]
    # A "handle+provider" compound scheme maps to provider_handle.
    if '+' in scheme:
        (handle, provider) = scheme.split('+')
        scheme_name = '{0}_{1}'.format(provider, handle)
    else:
        scheme_name = scheme
    return scheme_name


def get_userinfo(url):
    '''Return the username portion of a URL, if there is one.
    Return None if there isn't one.'''
    userinfo = url.split('://')[1].split('/')[0]
    if '@' in userinfo:
        userinfo = userinfo.split('@')[0]
    else:
        userinfo = None
    return userinfo


def get_fqpn(url):
    '''Return the Fully Qualified Path Name of the file or directory
    portion of the URL'''
    fqpn = url.split('://')[1]
    if '@' in fqpn:
        fqpn = fqpn.split('@')[1]
    return fqpn


def format_error(error):
    """Format an exception as 'ExceptionType: message' for the outfile ad."""
    return '{0}: {1}'.format(type(error).__name__, str(error))


def get_error_dict(error, url = ''):
    """Build the failure ClassAd dict HTCondor expects in the outfile."""
    error_string = format_error(error)
    error_dict = {
        'TransferSuccess': False,
        'TransferError': error_string,
        'TransferUrl': url,
    }
    return error_dict


def download_file(url, local_file_path):
    '''Copies data from NRAO_TRANSFER_HOST via the best interface.

    Returns a transfer-stats dict on success; on failure logs via
    condor_chirp ulog and exits the process (exit code -1 or rsync's
    return code).'''
    userinfo = get_userinfo(url)
    fqpn = get_fqpn(url) # Fully Qualified Path Name
    dest = "." # should this be local_file_path?
    start_time = time.time()

    # Read RemoteUser (required) and NRAO_TRANSFER_HOST (optional)
    # from the machine ad the startd wrote for this slot.
    try:
        ads = classad.parseAds(open(os.environ['_CONDOR_MACHINE_AD'], 'r'))
        for ad in ads:
            try:
                remote_user = str(ad['RemoteUser'])
            except:
                subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: reading RemoteUser ad."'])
                sys.exit(-1)
            try:
                transfer_host = str(ad['NRAO_TRANSFER_HOST'])
            except:
                transfer_host = None
    except Exception:
        subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: parsing _CONDOR_MACHINE_AD"'])
        sys.exit(-1)

    # No userinfo in the URI means fall back to the slot's RemoteUser.
    if userinfo is None:
        userinfo = remote_user.split('@')[0]

    if transfer_host is None:
        # Third party E.g. CHTC: pick the public ssh host for the home
        # site based on the domain in RemoteUser.
        if 'aoc.nrao.edu' in remote_user:
            source = "gibson.aoc.nrao.edu" + ":" + fqpn
        elif 'cv.nrao.edu' in remote_user:
            source = "ssh.cv.nrao.edu" + ":" + fqpn
        else:
            subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: remote_user contains neither aoc.nrao.edu nor cv.nrao.edu"'])
            sys.exit(-1)
        # A non-empty ./condor_transfer ssh key must have been shipped
        # with the job for the remote rsync to authenticate.
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: condor_transfer not found or empty"'])
            sys.exit(-1)
    elif transfer_host == "LOCAL":
        # Home site E.g. AOC, CV: the path is directly visible, rsync
        # locally with no remote shell.
        if os.path.exists(fqpn):
            source = fqpn
            rsync_rsh = ''
        else:
            subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: fqpn not found"'])
            sys.exit(-1)
    else:
        # Near site E.g. NMT transfer_host=gibson-10g2
        source = userinfo + "@" + transfer_host + ":" + fqpn
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', '"nraorsync ERROR: condor_transfer not found or empty"'])
            sys.exit(-1)

    connection_start_time = time.time()
    # RSYNC_RSH instead of rsync -e because subprocess can't pass
    # ssh sub-arguments through rsync cleanly (see header notes).
    ret = subprocess.Popen(['rsync', '-aq', source, dest],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           env={"RSYNC_RSH": rsync_rsh})
    stdout, stderr = ret.communicate()
    if ret.returncode == 0:
        file_size = os.stat(local_file_path).st_size
        end_time = time.time()
        # NOTE(review): unlike upload_file(), no 'TransferUrl' key here,
        # and TransferHostName may be None at a third-party site — confirm
        # whether the FTM cares.
        transfer_stats = {
            'TransferSuccess': True,
            'TransferProtocol': 'rsync',
            'TransferType': 'download',
            'TransferFileName': local_file_path,
            'TransferFileBytes': file_size,
            'TransferTotalBytes': file_size,
            'TransferStartTime': int(start_time),
            'TransferEndTime': int(end_time),
            'ConnectionTimeSeconds': end_time - connection_start_time,
            'TransferHostName': transfer_host,
            'TransferLocalMachineName': socket.gethostname(),
        }
        return(transfer_stats)
    else:
        message = 'nraorsync ERROR: rsync download failed ' + str(ret.returncode)
        subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', message])
        message = 'nraorsync ERROR: ' + str(stderr)
        subprocess.call(['/usr/libexec/condor/condor_chirp', 'ulog', message])
        sys.exit(ret.returncode)


def upload_rsync():
    '''Rsyncs everything in a custom job ad to the appropriate data_mover host.

    Reads OutputDestination/GlobalJobId and the custom nrao_output_files
    attribute from the job ad, RemoteUser/NRAO_TRANSFER_HOST from the
    machine ad.  Returns 0 on success or when there is nothing to do,
    -1 on any failure.'''
    rsynchost = 'gibson.aoc.nrao.edu:'

    try:
        ads = classad.parseAds(open(os.environ['_CONDOR_JOB_AD'], 'r'))
    except Exception:
        return(-1)
    for ad in ads:
        try:
            output_destination = str(ad['OutputDestination'])
            globaljobid = str(ad['GlobalJobId'])
            fqpn = get_fqpn(output_destination)
        except:
            return(-1)
        try:
            nrao_output_files = str(ad['nrao_output_files'])
        except:
            # No custom file list means nothing to rsync; that is success.
            return(0)

    try:
        ads = classad.parseAds(open(os.environ['_CONDOR_MACHINE_AD'], 'r'))
        for ad in ads:
            try:
                remote_user = str(ad['RemoteUser'])
                userinfo = remote_user.split('@')[0]
            except:
                return(-1)
            try:
                transfer_host = str(ad['NRAO_TRANSFER_HOST'])
            except:
                transfer_host = None
    except Exception:
        return(-1)

    # Same three-way site selection as download_file(), but building
    # the rsync destination instead of the source.
    if transfer_host is None:
        # Third party E.g. CHTC
        if 'aoc.nrao.edu' in remote_user:
            dest = "gibson.aoc.nrao.edu" + ":" + fqpn
        elif 'cv.nrao.edu' in remote_user:
            dest = "ssh.cv.nrao.edu" + ":" + fqpn
        else:
            return(-1)
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            return(-1)
    elif transfer_host == "LOCAL":
        # Home site E.g. AOC, CV
        dest = fqpn
        rsync_rsh = ''
    else:
        # Near site E.g. NMT transfer_host=gibson-10g2
        dest = userinfo + "@" + transfer_host + ":" + fqpn
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            return(-1)

    # nrao_output_files is a space-separated list of paths relative to
    # the job sandbox; silently skip entries that don't exist.
    for path in nrao_output_files.split():
        if os.path.exists(path):
            # debugfile.write("DEBUG: upload_rsync(): " + "rsync_rsh is (" + rsync_rsh + ")\n")
            # debugfile.write("DEBUG: upload_rsync(): " + "rsync " + "-aq " + path + " " + dest + "\n")
            ret = subprocess.Popen(["rsync", "-aq", path, dest],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   env={"RSYNC_RSH": rsync_rsh})
            stdout, stderr = ret.communicate()
            if ret.returncode != 0:
                return(-1)
    return(0)


def upload_file(url, local_file_path):
    '''Copies a file/directory to the appropriate data_mover host.

    Returns a transfer-stats dict on success; exits the process on any
    failure (exit code -1 or rsync's return code).'''
    fqpn = get_fqpn(url)
    start_time = time.time()
    rsynchost = 'gibson.aoc.nrao.edu:'

    # OutputDestination (set by output_destination in the submit file)
    # overrides the fqpn from the per-file URL.
    try:
        ads = classad.parseAds(open(os.environ['_CONDOR_JOB_AD'], 'r'))
        for ad in ads:
            try:
                output_destination = str(ad['OutputDestination'])
                globaljobid = str(ad['GlobalJobId'])
                fqpn = get_fqpn(output_destination)
            except:
                return(-1)
    except Exception:
        return(-1)

    try:
        ads = classad.parseAds(open(os.environ['_CONDOR_MACHINE_AD'], 'r'))
        for ad in ads:
            try:
                remote_user = str(ad['RemoteUser'])
                userinfo = remote_user.split('@')[0]
            except:
                sys.exit(-1)
            try:
                transfer_host = str(ad['NRAO_TRANSFER_HOST'])
            except:
                transfer_host = None
    except Exception:
        sys.exit(-1)

    if transfer_host is None:
        # Third party E.g. CHTC
        if 'aoc.nrao.edu' in remote_user:
            dest = "gibson.aoc.nrao.edu" + ":" + fqpn
        elif 'cv.nrao.edu' in remote_user:
            dest = "ssh.cv.nrao.edu" + ":" + fqpn
        else:
            sys.exit(-1)
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            sys.exit(-1)
    elif transfer_host == "LOCAL":
        # Home site E.g. AOC, CV
        if os.path.exists(local_file_path):
            dest = fqpn
            rsync_rsh = ''
        else:
            sys.exit(-1)
    else:
        # Near site E.g. NMT transfer_host=gibson-10g2
        dest = userinfo + "@" + transfer_host + ":" + fqpn
        if os.path.exists('./condor_transfer') and (os.path.getsize('./condor_transfer') != 0):
            rsync_rsh = 'ssh -l ' + userinfo + ' -i ./condor_transfer -q'
        else:
            sys.exit(-1)

    if os.path.exists(local_file_path):
        file_size = os.stat(local_file_path).st_size
        connection_start_time = time.time()

        # krowe Apr 22 2022: remap _condor_stdout and _condor_stderr.
        # Ask the submit-side schedd what Out/Err the job actually
        # requested and append that to the destination path.
        filename = os.path.split(local_file_path)[1]
        if (filename == '_condor_stdout') or (filename == '_condor_stderr'):
            # debugfile.write("DEBUG: upload_file(): _condor_stdout or _condor_stderr\n")
            submithost = globaljobid.split('#')[0]
            collector = htcondor.Collector()
            schedd_ad = collector.locate(htcondor.DaemonTypes.Schedd, submithost)
            schedd = htcondor.Schedd(schedd_ad)
            for job in schedd.xquery(constraint='GlobalJobId=="' + globaljobid + '"', projection=['Out','Err']):
                if filename == '_condor_stdout':
                    dest += "/" + job.get('Out')
                if filename == '_condor_stderr':
                    dest += "/" + job.get('Err')

        # debugfile.write("DEBUG: upload_file(): " + "rsync_rsh is (" + rsync_rsh + ")\n")
        # debugfile.write("DEBUG: upload_file(): " + "rsync " + "-aq " + local_file_path + " " + dest + "\n")
        ret = subprocess.Popen(["rsync", "-aq", local_file_path, dest],
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               env={"RSYNC_RSH": rsync_rsh})
        stdout, stderr = ret.communicate()
        if ret.returncode == 0:
            end_time = time.time()
            transfer_stats = {
                'TransferSuccess': True,
                'TransferProtocol': 'rsync',
                'TransferType': 'upload',
                'TransferFileName': local_file_path,
                'TransferFileBytes': file_size,
                'TransferTotalBytes': file_size,
                'TransferStartTime': int(start_time),
                'TransferEndTime': int(end_time),
                'ConnectionTimeSeconds': end_time - connection_start_time,
                'TransferHostName': rsynchost,
                'TransferLocalMachineName': socket.gethostname(),
                'TransferUrl': 'https://gibson.aoc.nrao.edu/',
            }
            return(transfer_stats)
        else:
            sys.exit(ret.returncode)
    else:
        sys.exit(-1)


if __name__ == '__main__':
    # Per the design doc, all failures should result in exit code -1.
    # This is true even if we cannot write a ClassAd to the outfile,
    # so we catch all exceptions, try to write to the outfile if we can
    # and always exit -1 on error.
    #
    # Exiting -1 without an outfile thus means one of two things:
    # 1. Couldn't parse arguments.
    # 2. Couldn't open outfile for writing.

    # Start by parsing input arguments
    try:
        args = parse_args()
    except Exception:
        sys.exit(-1)

    # Parse in the classads stored in the input file.
    # Each ad represents a single file to be transferred.
    try:
        infile_ads = classad.parseAds(open(args['infile'], 'r'))
    except Exception as err:
        try:
            with open(args['outfile'], 'w') as outfile:
                outfile_dict = get_error_dict(err)
                outfile.write(str(classad.ClassAd(outfile_dict)))
        except Exception:
            pass
        sys.exit(-1)

    # Now iterate over the list of classads and perform the transfers.
    try:
        with open(args['outfile'], 'w') as outfile:
            # krowe Oct 28 2021: rsync nrao_output_files
            if args['upload']:
                if upload_rsync() != 0:
                    # FIX: this used to be "raise err", but err is unbound
                    # here when the infile parsed cleanly, so a failed
                    # upload_rsync() surfaced as a NameError instead of a
                    # meaningful error.  Raise an explicit exception.
                    raise RuntimeError('nraorsync ERROR: upload_rsync failed')
            for ad in infile_ads:
                # Retry each transfer up to 3 times on broken pipe.
                tries = 0
                while tries < 3:
                    tries += 1
                    try:
                        if not args['upload']:
                            outfile_dict = download_file(ad['Url'], ad['LocalFileName'])
                        else:
                            outfile_dict = upload_file(ad['Url'], ad['LocalFileName'])
                    except IOError as err:
                        # Retry on socket closed unexpectedly
                        if (err.errno == 32) and (tries < 3):
                            pass
                        else:
                            raise err
                    else:
                        break
                outfile.write(str(classad.ClassAd(outfile_dict)))
    except Exception as err:
        try:
            outfile_dict = get_error_dict(err, url = ad['Url'])
            outfile.write(str(classad.ClassAd(outfile_dict)))
        except Exception:
            pass
        # debugfile.close()
        sys.exit(-1)