Guide to Ganga


Introduction

This is a guide to installing and configuring the Ganga Job Management tool for use with both local batch systems and the DIRAC workload management system. It's maintained by Ulrik Egede (ulrik<AT>monash.edu) - please email if you have any comments/problems!

General documentation on Ganga, as well as a detailed user guide, can be found on the GitHub site https://github.com/ganga-devs/ganga . For general Ganga issues (i.e. anything beyond problems with this guide :)) please open an issue on the GitHub site.

Requirements

Before you start using Ganga (assuming you want to use it to submit jobs to the grid rather than just for local batch system submission), there are a few steps you need to go through: you will need a valid grid certificate and membership of a virtual organisation (VO), as the DIRAC configuration below assumes both.

Installation and Configuration with CVMFS

It is by far easiest to run Ganga if you have the /cvmfs file system available.

  • Create an alias
alias ganga=/cvmfs/ganga.cern.ch/runGanga-dirac.sh
  • Run Ganga with the -g flag to create the default .gangarc file. Replace <VO> with the name of your virtual organisation. Note the _user after the VO.
ganga -g -o[Configuration]RUNTIME_PATH=GangaDirac -o[defaults_DiracProxy]group=<VO>_user
  • Now edit your .gangarc file and set the following options. Again replace <VO> with the name of your virtual organisation. Take care to edit within the relevant sections of the file and make sure to remove the # at the beginning of each line.

[DIRAC]
proxyInitCmd = dirac-proxy-init -g <VO> -M
userVO = <VO>
[defaults_DiracProxy]
group = <VO>_user
  • To test that all is working, start Ganga and try to submit a basic job to the local machine you're running on, and then to DIRAC:
Job().submit()
Job( backend=Dirac() ).submit()

Installation and Configuration without CVMFS

  • Make sure that you have python3 available, and that it is version 3.6 or higher. Also make sure that you have pip available.
  • Create a virtual environment for the code and activate it
python3 -m venv gangacode
cd gangacode
source bin/activate
  • Install Ganga and its dependencies
pip install ganga
  • Run Ganga with the -g flag to create the default .gangarc file. Replace <VO> with the name of your virtual organisation:
ganga -g -o[Configuration]RUNTIME_PATH=GangaDirac -o[defaults_DiracProxy]group=<VO>_user
  • Now edit your .gangarc file and set the following options. Again replace <VO> with the name of your virtual organisation and insert the correct path for your Dirac client installation. Take care to edit within the relevant sections of the file and make sure to remove the # at the beginning of each line.

[Configuration]
RUNTIME_PATH = GangaDirac
[DIRAC]
DiracEnvSource = <PATH-TO-YOUR-DIRAC-INSTALLATION>/bashrc
proxyInitCmd = dirac-proxy-init -g <VO> -M
userVO = <VO>
[defaults_DiracProxy]
group = <VO>_user
  • To test that all is working, try to submit a basic job to the local machine you're running on, and then to DIRAC:
Job().submit()
Job( backend=Dirac() ).submit()

Getting Started

Ganga is a general job management tool to help with the submission, monitoring and manipulation of jobs on different systems. It is based on the idea of plugins that tell a Job what to run (Application), where to run (Backend), how to run (Splitter and PostProcessor) and what data to use (InputFiles and OutputFiles). It is written almost entirely in Python, and either the modified IPython prompt or scripts can be used to control it.

To start, we'll submit a default job that will go to the 'Local' backend (i.e. the machine you are using at present). Start ganga as above and then enter the following:

j = Job()
j.submit()

You should (almost immediately) have the job submit, start running and then complete. By default, the stdout/err are copied back with your job and stored in the Ganga workspace. To view them, you can use the following:

j.peek("stdout", "emacs")    # open any file in the j.outputdir with the given command
!emacs $j.outputdir/stdout   # Use '!' to give a shell command and '$' for an IPython command

This default job object uses the 'Executable' application with the exe set to 'echo' and the arguments set to 'Hello World'. To run your own scripts, do the following:

j = Job()
j.application = Executable()
j.application.exe = '/path/to/script'
j.application.args = [ ... ]
j.submit()

To view the jobs that you have created, use the 'jobs' command. This gives a list of the job objects along with their status. You can also use this to access the jobs themselves and view all the information about them, e.g.

jobs
j = jobs(0)    # grab the job object with ID 0
j              # view the object
j.application
j.backend

To get more information about the different objects and plugins, use the 'help' system:

help()
help(Job)
help(Executable)
plugins("applications")
plugins("backends")

Ganga will automatically ask you to renew the proxy when required. However, it might be better to renew it in advance. In that case you can do

credential_store.create(DiracProxy())

To view your valid credentials, simply look at the credential_store:

credential_store

Input and Output Data

Ganga tries to hide as many of the problems associated with data transfer and access as possible. The basic use case is providing a list of files that you want to be present on the worker node where your job runs (input files) and a list of files that the job will create which you wish to be transferred elsewhere (output files). There are various file types that you can use, but the most basic is the 'LocalFile'. This will transfer local files to the worker node and make sure they are transferred back when the job finishes. As an example:

j.inputfiles = [ LocalFile("myinputfile.txt") ]
j.outputfiles = [ LocalFile("myoutputfile.txt") ]

Add these lines to a typical job script before submission and Ganga will transfer the file 'myinputfile.txt' to the worker node and attempt to transfer the file 'myoutputfile.txt' back to the output directory of the job. Importantly, this is independent of backend - whether you're using a Batch backend, Dirac or the LCG WMS, Ganga will figure out how to transfer the files itself.

There is a lot of power in the input/output file system, including a number of different file types for accessing Dirac files, mass storage files (e.g. CASTOR), LCG SE files and Google Drive files, and all of them can accept wildcards.
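
As an illustrative sketch (the file names here are hypothetical, and DiracFile is the file type provided by GangaDirac), a job could keep a small log file locally while sending larger output to Dirac storage:

j = Job( backend=Dirac() )
j.inputfiles = [ LocalFile("myinputfile.txt") ]
# Wildcards are accepted: every matching .root file the job produces is
# uploaded to Dirac storage, while the (hypothetical) log file is brought
# back to the job's output directory.
j.outputfiles = [ DiracFile("*.root"), LocalFile("mylog.txt") ]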


Splitting into Subjobs

Quite often you need to run the same job with different arguments or input data, or wish to take advantage of a backend's bulk submission capabilities. The Splitter in Ganga is what you use to achieve this. In GangaCore there is a single splitter that should serve most purposes - the GenericSplitter. A simple example of its use is shown below:

j = Job()
j.splitter = GenericSplitter()
j.splitter.attribute = "application.args"
j.splitter.values = [ 'arg 1', 'arg 2', 'arg 3' ]
j.submit()

This will create one master job with 3 subjobs, each with a different argument. To view these subjobs, do:

j.subjobs
j.subjobs(0)

Note that these will have been submitted in bulk if the backend supports it (Dirac does not at the time of writing).

As a second example, if you want to submit several subjobs but change multiple parameters for each subjob, you can also do this with the GenericSplitter:

j = Job()
j.splitter = GenericSplitter()
j.splitter.multi_args = { "application.args": ["hello1", "hello2"],
                          "application.env": [{"MYENV": "test1"}, {"MYENV": "test2"}] }

The multi_args field takes a dictionary with, as keys, the names of the parameters you want to change and, as values, a list of what the parameter should be set to for each subjob. In the example above, 2 subjobs are created, the first with:

application.args = "hello1"
application.env = {"MYENV":"test1"}

and the second with:

application.args = "hello2"
application.env = {"MYENV":"test2"}

For both of these examples, you can split on any property of the job, e.g. inputfiles, backend.requirements, etc.
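
For instance, a minimal sketch (with illustrative file names) that splits on the input files, giving each subjob its own list of files:

j = Job()
j.splitter = GenericSplitter()
j.splitter.attribute = "inputfiles"
# each entry is the full inputfiles list for one subjob
j.splitter.values = [ [ LocalFile("data1.txt") ], [ LocalFile("data2.txt") ] ]
j.submit()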

Submitting to Different Backends

One of the main benefits of Ganga is that you can submit to different backends with very little change to your submission scripts. For example, above we were submitting to the Dirac backend, but if you wanted to submit to a local PBS batch system, you would only need to change the backend line:

j.backend = PBS()

All input and output data will be handled by Ganga. The supported Core backends include Local, LSF, PBS, Condor, ARC, CREAM, LCG and Dirac. Note that some requirements will be backend dependent, so do check the associated requirements object, e.g.

j.backend.requirements

to see what options are available.
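
As a minimal sketch of this backend independence (the echo command and arguments are just placeholders), the same job definition can be duplicated with the copy() method of a Job and sent to a second backend:

j = Job()
j.application = Executable( exe='/bin/echo', args=['hello'] )
j.backend = Local()
j.submit()

# copy() duplicates the job definition; only the backend is changed
j2 = j.copy()
j2.backend = Dirac()
j2.submit()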


Using Queues to Speed Up Submission

When submitting to some backends, DIRAC included, it can take a bit of time to go through the whole submission process. When you have 10s-1000s of jobs to submit, this can become a significant problem. You can greatly speed things up by using the Ganga queues system to submit your jobs in parallel, e.g.:

for i in range(10):
    j = Job( backend=Dirac() )
    queues.add(j.submit)

You can view the threads Ganga knows about by using the 'queues' command. To configure the number of worker threads, set the following option in your .gangarc:

[Queues]
NumWorkerThreads = <number-of-threads>

You can add any function call to the queues system to run in the background. To get more info, use help(queues).
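
For example, a minimal sketch (the function and file name are illustrative; see help(queues) for the exact signature of queues.add):

def count_lines(path):
    # trivial example of work done in a background worker thread
    print(len(open(path).readlines()))

queues.add(count_lines, args=('myfile.txt',))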

Using Tasks for Automated Submission and Job Chaining

Introduction to GangaTasks

Even with Ganga, you may find managing a large set of jobs and steps in an analysis a bit cumbersome. The GangaTasks package was developed to help with these larger scale analyses and remove as much of the 'busy work' as possible. It can automatically submit jobs to keep a set number running, create new jobs when others complete and chain their data together, automatically transfer data around as required, and a number of other things as well. As with all of Ganga, it is based on the plugin system, so you can easily extend some elements of it to better suit your requirements.

GangaTasks essentially adds 3 new objects that control all aspects of the overall task:

  • Task: This is the overall 'container' for the steps in your analysis. It is fairly lightweight but is used to aggregate the overall status of the task and control overall settings, numbers of jobs, etc.
  • Transform: This is where most things occur. It is in some ways analogous to a Job Template in that it mostly contains the objects that will be assigned to the created jobs. This is where new Units are created and data is transferred between steps. You will generally have a Transform per 'step' or 'type' of job that you want to run.
  • Unit: This is the 'control class' for any created jobs and contains all the job-specific information (e.g. input data, application settings, etc.) that each actual Job will be set up with. After all the units in a Transform are created, each unit creates a new Job and attempts to submit it. It monitors the status of the job and performs any necessary actions (e.g. downloading output data) upon completion. If the job fails and it seems sensible to do so, it will also resubmit or recreate the job.

A typical example of how this structure works would be a two-stage analysis where you generate some MC in the first step and then run some analysis code on the output of this data. You would create an overall Task to manage both steps. Each step would have an associated Transform, with the first set up for MC generation and the second doing the analysis. You would set the input data of the second transform to be the output data of the first. Then, while your Task is running, Units will be created to cover the number of events you wanted to generate and jobs will be submitted for each of these. As these complete, new units and jobs will be created by the analysis Transform to cover that step.

Basic Core Usage

It's quite likely you will want to develop your own plugins to get the most out of GangaTasks; however, there is a set of generalised classes that can get you started. Typical use of these is shown below:

# First create the overall Task
t = CoreTask()
# Now create the Transform ( -> Job template)
trf = CoreTransform()
trf.application = Executable()
trf.backend = Dirac()
# Set the unit splitter (unique to CoreTransform - you may have better ways of creating units in your own plugins)
# This will create a unit based on the splitting of any given splitter
# If you put in your own splitter here, use the trf.fields_to_copy string list to tell Tasks which fields of a Job to preserve from the split
# Here, Tasks already knows about GenericSplitter and knows that we want to change the 'application' object for each Unit/Master Job
trf.unit_splitter = GenericSplitter()
trf.unit_splitter.attribute = "application.args"
trf.unit_splitter.values = [ 'arg 1', 'arg 2', 'arg 3' ]
# Append the transform
t.appendTransform( trf )
# set the maximum number of active jobs to have running (allows for throttling)
t.float = 100
# run the Task
t.run()

After running the above commands, you won't see much happen initially as Tasks runs on a separate monitoring loop that triggers every 30s (configurable in .gangarc). Eventually, though, you will see the units created, and then jobs for each of these units will be submitted. To see the progress of your tasks, use:

tasks
tasks(0).overview()

Tasks can also take advantage of queues for submission. Simply add:

# note - done at the transform level rather than task level as different backends may not need it
trf.max_active_threads = 10  # optional - specifies the max number of submissions to queue up
trf.submit_with_threads = True

Job Chaining
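
A minimal sketch of chaining two transforms is shown below, assuming the TaskChainInput dataset type that GangaCore provides for feeding the output of one transform into the next (see the Ganga documentation for a complete example):

t = CoreTask()

# First transform: produces some output
trf1 = CoreTransform()
trf1.application = Executable()
trf1.unit_splitter = GenericSplitter()
trf1.unit_splitter.attribute = "application.args"
trf1.unit_splitter.values = [ 'arg 1', 'arg 2' ]
t.appendTransform( trf1 )

# Second transform: consumes the output of the first,
# linked via the ID of the first transform
trf2 = CoreTransform()
trf2.application = Executable()
trf2.inputdata = [ TaskChainInput() ]
trf2.inputdata[0].input_trf_id = trf1.getID()
t.appendTransform( trf2 )

t.float = 100
t.run()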

FAQ

Submission seems quite slow - how can I speed it up?

During submission, Ganga is simply acting as a nice frontend/wrapper for whatever underlying submission command the backend uses. Consequently, if that command takes a while to submit, Ganga can't magically speed it up. However, there are a couple of things you can do to alleviate the problem:

  • Use queues to submit several jobs in parallel
  • Use a splitter to take advantage of bulk submission (note that the backend needs to support this, which is only really LCG/WMS at present)

I'm using Tasks and it takes ~1min per job to submit - can I submit more jobs at once?

The Task monitoring loop runs every 60s and, by default, it will abort the loop after a submission attempt. This is because, with a slow backend, you don't want to be waiting for all the jobs (of which there could be very many) to submit before you can quit Ganga. However, this may not be what you want if you have a fast backend, or you may just want to speed things up as much as possible anyway. In that case, you can do the following:

  • Get Tasks to use queues by specifying:
trf.submit_with_threads = True
  • Have Tasks *not* abort the monitoring loop after a submission:
trf.abort_loop_on_submit = False 
  • Speed up the monitoring loop by reducing its period in the config (the default is 60 seconds):
[Tasks]
TaskLoopFrequency = 60


