Source code for GaugiKernel.MultiProcessing

__all__ = [ "Pool","Slot", "chunks"]


from GaugiKernel import LoggingLevel, Logger
from GaugiKernel.macros import *
from pprint import pprint
import argparse
import os, time
import subprocess

[docs] def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n]
[docs] class Slot(object): def __init__( self ): self.__proc = None self.__lock = False
[docs] def lock(self): self.__lock=True
[docs] def unlock(self): self.__lock=False
[docs] def update(self): if self.__proc and not self.__proc.poll(): self.unlock()
[docs] def run(self, command, dry_run=False): pprint(command) time.sleep(2) if not dry_run: self.__proc = subprocess.Popen(command, shell=True, env=os.environ) self.lock()
[docs] def isAvailable(self): if self.__proc: if not self.__proc.poll() is None: self.unlock() return not self.__lock
[docs] class Pool( Logger ): def __init__(self, func, inputs,maxJobs, output, dry_run): Logger.__init__(self) self.__inputs = inputs self.__func = func self.__slots = [Slot() for _ in range(maxJobs)] self.__output = output self.__outputs = [] self.dry_run = dry_run
[docs] def getAvailable(self): for slot in self.__slots: if slot.isAvailable(): return slot return None
[docs] def busy(self): for slot in self.__slots: if not slot.isAvailable(): return True return False
[docs] def generate(self): # prepare the command job inputs = self.__inputs.pop() idx = len(self.__inputs) # output label number output = self.__output + '.' + str(idx) command = self.__func(inputs, output + '.tmp') # remove tmp name when complete the job command += f' && mv {output}.tmp {output}' self.__outputs.append(output) print(command) return command, output
# # Run jobs #
[docs] def run( self ): while len(self.__inputs) > 0: slot = self.getAvailable() if slot: command, output = self.generate() if os.path.exists(output): continue slot.run( command , self.dry_run) while self.busy(): continue
[docs] def merge(self): command = "hadd -f "+self.__output for fname in self.__outputs: command += ' '+fname os.system(command) for fname in self.__outputs: os.system( 'rm -rf '+fname)