/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

#include <pbs_config.h> /* the master config generated by configure */
#ifdef PYTHON
#include <pbs_python_private.h>
#include <Python.h>
#endif

#include <unistd.h>
#include <dirent.h>
#include <pwd.h>
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "dis.h"
#include "libpbs.h"
#include "list_link.h"
#include "server_limits.h"
#include "attribute.h"
#include "resource.h"
#include "job.h"
#include "log.h"
#include "work_task.h"
#include "credential.h"
#include "batch_request.h"
#include "net_connect.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include "mom_mach.h"
#include "mom_func.h"
#include "mom_server.h"
#include "mom_vnode.h"
#include "pbs_error.h"
#include "tpp.h"
#include "mom_hook_func.h"
#include "placementsets.h"
#include "hook.h"
#include "renew_creds.h"
#include "mock_run.h"
#include <libutil.h>

/**
 * @file	catch_child.c
 */
/* External Functions */
/*
 * Function pointer defined (not merely declared) here and initialised to
 * NULL.  NOTE(review): presumably filled in by platform-dependent code to
 * release CPUs held by a job; it is not referenced in this part of the file.
 */
void (*free_job_CPUs)(job *) = NULL;

/* External Globals */

extern char mom_host[];			 /* host name handed to mom_process_hooks() */
extern char *path_epilog;		 /* epilogue script path, run via run_pelog() */
extern char *path_jobs;			 /* directory scanned for job files on recovery */
extern unsigned int default_server_port; /* starting port value when opening a server stream */
extern pbs_list_head svr_alljobs;	 /* list of all jobs known to this mom */
extern int exiting_tasks;		 /* set when a job is put in EXITING; cleared after a full scan_for_exiting() pass */
extern char *msg_daemonname;		 /* daemon name used as the id in log records */
extern char *mom_home;			 /* directory mom returns to after a foreground epilogue */
#ifndef WIN32
extern int termin_child;		 /* set to 1 by catch_child() on SIGCHLD */
#endif
extern int server_stream;		 /* stream recorded by send_hellosvr() once HELLO has been sent */
extern time_t time_now;			 /* current time as tracked by mom */
extern pbs_list_head mom_polljobs;	 /* jobs for which mom_do_poll() is true */
extern unsigned int pbs_mom_port;	 /* port number sent in the IS_HELLOSVR message */
extern int gen_nodefile_on_sister_mom;
#if MOM_ALPS
extern useconds_t alps_release_wait_time;
extern int alps_release_timeout;
extern useconds_t alps_release_jitter;
#endif

extern char *path_hooks_workdir;	 /* directory holding per-job hook output files */

#ifndef WIN32
/**
 * @brief
 *	catch_child() - signal handler installed for SIGCHLD.
 *
 * @param[in] sig - signal number delivered (not inspected)
 *
 * @par
 *	The handler is deliberately minimal: it only raises the
 *	termin_child flag to record that a child has terminated.
 *
 * @return Void
 *
 */
void
catch_child(int sig)
{
	(void) sig; /* the signal number itself is of no interest */
	termin_child = 1;
}
#endif

/**
 * @brief
 *	Look up the host entry of the vnode with the given node id in a job.
 *
 * @param[in] pjob - job whose vnode table (ji_vnods) is searched
 * @param[in] nodeid - node id to look for
 *
 * @return hnodent *
 * @retval  host entry (vn_host) of the first matching vnode
 * @retval  NULL  no vnode of the job carries this node id
 *
 */
hnodent *
get_node(job *pjob, tm_node_id nodeid)
{
	int remaining = pjob->ji_numvnod;
	vmpiprocs *cur;

	/* walk the ji_numvnod entries of the table in order */
	for (cur = pjob->ji_vnods; remaining > 0; remaining--, cur++) {
		if (cur->vn_node != nodeid)
			continue;
		return cur->vn_host;
	}

	return NULL;
}

/**
 * @brief
 *	Restart each task which has exited and has TI_FLAGS_CHKPT turned on.
 *	Once no checkpointed task is left waiting to exit, turn off
 *	MOM_CHKPT_POST and MOM_CHKPT_ACTIVE and put the checkpoint directory
 *	back the way it was before the failed checkpoint attempt.
 *
 * @par
 *	If a task cannot be restarted, or the checkpoint path of the job or
 *	task does not fit into the path buffer, the error is logged,
 *	MOM_CHKPT_POST is cleared and the job is killed with SIGKILL.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return Void
 *
 */
void
chkpt_partial(job *pjob)
{
	int rc;
	int len;
	char namebuf[MAXPATHLEN + 1];
	char *filnam;
	size_t filnam_room;
	const char *stem;
	pbs_task *ptask;
	int texit = 0;
	extern char task_fmt[];
	extern char *path_checkpoint;

	assert(pjob != NULL);

	/*
	 * Build "<path_checkpoint><prefix or jobid><suffix>" in one bounded
	 * step instead of a chain of unchecked strcat() calls.
	 */
	if (*pjob->ji_qs.ji_fileprefix != '\0')
		stem = pjob->ji_qs.ji_fileprefix;
	else
		stem = pjob->ji_qs.ji_jobid;
	len = snprintf(namebuf, sizeof(namebuf), "%s%s%s",
		       path_checkpoint, stem, JOB_CKPT_SUFFIX);
	if (len < 0 || (size_t) len >= sizeof(namebuf)) {
		errno = ENAMETOOLONG;
		goto fail;
	}

	/* per-task file names are written right behind the directory name */
	filnam = &namebuf[len];
	filnam_room = sizeof(namebuf) - (size_t) len;

	pjob->ji_sampletim = 0; /* reset sampletime for cpupercent */

	for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
	     ptask != NULL;
	     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
		/*
		 ** See if the task was marked as one of those that did
		 ** actually checkpoint.
		 */
		if ((ptask->ti_flags & TI_FLAGS_CHKPT) == 0)
			continue;
		texit++;
		/*
		 ** Now see if it was reaped.  We don't want to
		 ** fool with it until we see it die.
		 */
		if (ptask->ti_qs.ti_status != TI_STATE_EXITED)
			continue;
		texit--;

		rc = snprintf(filnam, filnam_room, task_fmt, ptask->ti_qs.ti_task);
		if (rc < 0 || (size_t) rc >= filnam_room) {
			errno = ENAMETOOLONG;
			goto fail;
		}

		/*
		 **	Try action script with no post function.
		 */
		rc = do_mom_action_script(RestartAction, pjob, ptask,
					  namebuf, NULL);
		if (rc != 0) { /* script failed */
			/* if there is no script, try native support */
			if (rc == -2)
				rc = mach_restart(ptask, namebuf);
			if (rc != 0) /* everything failed */
				goto fail;
		}

		ptask->ti_qs.ti_status = TI_STATE_RUNNING;
		/*
		 ** Turn off TI_FLAGS_CHKPT if TI_FLAGS_SAVECKP is off.
		 ** Turn off TI_FLAGS_SAVECKP if it is on.
		 */
		if ((ptask->ti_flags & TI_FLAGS_SAVECKP) == 0)
			ptask->ti_flags &= ~TI_FLAGS_CHKPT;
		else
			ptask->ti_flags &= ~TI_FLAGS_SAVECKP;
		(void) task_save(ptask);
	}

	/* texit counts checkpointed tasks that have not been seen to exit yet */
	if (texit == 0) {
		char oldname[MAXPATHLEN + 1];
		struct stat statbuf;

		/*
		 ** All tasks should now be running.
		 ** Turn off MOM_CHKPT_POST and MOM_CHKPT_ACTIVE flags.
		 ** Job is back to where it was before the bad checkpoint
		 ** attempt.
		 */
		pjob->ji_flags &= ~MOM_CHKPT_POST;
		pjob->ji_flags &= ~MOM_CHKPT_ACTIVE;
		/*
		 ** Get rid of incomplete checkpoint directory and
		 ** move old chkpt dir back to regular if it exists.
		 */
		*filnam = '\0';
		(void) remtree(namebuf);
		/*
		 * namebuf may already be close to MAXPATHLEN, so the ".old"
		 * name is built with a length check; a name that does not
		 * fit cannot refer to an existing directory and is skipped.
		 */
		rc = snprintf(oldname, sizeof(oldname), "%s.old", namebuf);
		if (rc > 0 && (size_t) rc < sizeof(oldname) &&
		    stat(oldname, &statbuf) == 0) {
			if (rename(oldname, namebuf) == -1)
				pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHKPT;
		}
	}
	return;

fail:
	/*
	 ** If we cannot restart a task from a partially failed checkpoint,
	 ** the job will be killed.
	 */
	log_joberr(errno, __func__, "failed to restart", pjob->ji_qs.ji_jobid);
	pjob->ji_flags &= ~MOM_CHKPT_POST;
	(void) kill_job(pjob, SIGKILL);
	return;
}

/**
 * @brief
 * 	update jobs rescused to the server
 *
 * @return void
 *
 */
void
update_jobs_status(void)
{
	job *pjob = (job *) GET_NEXT(svr_alljobs);
	for (; pjob; pjob = (job *) GET_NEXT(pjob->ji_alljobs)) {
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
			continue;
		if (!check_job_substate(pjob, JOB_SUBSTATE_RUNNING))
			continue;
		enqueue_update_for_send(pjob, IS_RESCUSED);
	}
}

/**
 * @brief
 * 	send_obit - routine called following completion of epilogue process.
 *	The job is moved into substate OBIT (and saved) if it is not there
 *	already, and a job obit update (IS_JOBOBIT) is queued for the server.
 *
 * @par
 *	On non-Windows builds, when epilogue hooks are eligible and a hook
 *	output file exists for the job, the hook results are read back first:
 *	a delete/requeue job action is requested if the hook asked for one (or
 *	rejected the event), the job's updates are queued with
 *	IS_RESCUSED_FROM_HOOK, vnode changes are pushed to the server and the
 *	output file is removed.
 *
 * @par
 *	"Obit sent" is only logged when the obit was actually queued; a
 *	queueing failure is logged as an error instead.
 *
 * @param[in] pjob - pointer to job structure
 * @param[in] exval - epilogue exit value; 2 requests a requeue for
 *		      checkpoint/restart when the job has JOB_SVFLG_CHKPT set
 *
 * @return Void
 *
 */
void
send_obit(job *pjob, int exval)
{
#ifndef WIN32
	/* update pjob with values set from an epilogue hook */
	/* since these are hooks that are executing in a child process */
	/* and changes inside the child will not be reflected in main */
	/* mom */
	if (num_eligible_hooks(HOOK_EVENT_EXECJOB_EPILOGUE) > 0) {
		char hook_outfile[MAXPATHLEN + 1];
		struct stat stbuf;

		snprintf(hook_outfile, MAXPATHLEN, FMT_HOOK_JOB_OUTFILE, path_hooks_workdir, pjob->ji_qs.ji_jobid);
		if (stat(hook_outfile, &stbuf) == 0) {
			pbs_list_head vnl_changes;
			int reject_deletejob = 0;
			int reject_rerunjob = 0;
			int accept_flag = 1;

			CLEAR_HEAD(vnl_changes);
			if (get_hook_results(hook_outfile, &accept_flag, NULL, NULL, 0,
					     &reject_rerunjob, &reject_deletejob, NULL,
					     NULL, 0, &vnl_changes, pjob,
					     NULL, 0, NULL) != 0) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_ERR, __func__, "Failed to get epilogue hook results");
				vna_list_free(vnl_changes);
			} else {
				/* Delete job or reject job actions */
				/* NOTE: Must appear here before vnode changes, */
				/* since this action will be sent whether hook  */
				/* script executed by PBSADMIN or PBSUSER.      */
				if (reject_deletejob) {
					/* deletejob takes precedence */
					new_job_action_req(pjob, HOOK_PBSADMIN, JOB_ACT_REQ_DELETE);
				} else if (reject_rerunjob) {
					new_job_action_req(pjob, HOOK_PBSADMIN, JOB_ACT_REQ_REQUEUE);
				} else if (!accept_flag) {
					/* Per EDD on a pbs.event().reject() from an */
					/* epilogue hook, must delete the job. */
					new_job_action_req(pjob, HOOK_PBSADMIN, JOB_ACT_REQ_DELETE);
				}

				/* Whether or not we accept or reject, we'll make */
				/* job changes, vnode changes, job actions */
				enqueue_update_for_send(pjob, IS_RESCUSED_FROM_HOOK);

				/* Push vnl hook changes to server */
				hook_requests_to_server(&vnl_changes);
			}
			/* need to clear out hook_outfile, */
			/* as epilogue hook processing  */
			/* in mom_process_hooks() will end up appending to */
			/* this same file when job is rerun, resulting in */
			/* duplicate actions. */
			unlink(hook_outfile);
		}
	}
#endif

	if (is_jattr_set(pjob, JOB_ATR_run_version)) {
		DBPRT(("send_obit: job %s run_version %ld exval %d\n",
		       pjob->ji_qs.ji_jobid, get_jattr_long(pjob, JOB_ATR_run_version), exval))
	} else {
		DBPRT(("send_obit: job %s runcount %ld exval %d\n",
		       pjob->ji_qs.ji_jobid, get_jattr_long(pjob, JOB_ATR_runcount), exval))
	}

	/* the post-epilogue callback has now run; clear it */
	pjob->ji_mompost = NULL;
	if (!check_job_substate(pjob, JOB_SUBSTATE_OBIT)) {
		set_job_substate(pjob, JOB_SUBSTATE_OBIT);
		job_save(pjob);
	}

	pjob->ji_sampletim = time_now; /* when obit sent to server */
	/* epilogue script exit of 2 means requeue for	*/
	/* chkpt/restart if job was checkpointed	*/
	if (exval == 2 && (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT))
		pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_QUERST;

	if (enqueue_update_for_send(pjob, IS_JOBOBIT) != 0) {
		/* do not claim the obit went out when queueing it failed */
		log_joberr(PBSE_SYSTEM, __func__, "Failed to enque job obit", pjob->ji_qs.ji_jobid);
		return;
	}
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "Obit sent");
}

/**
 * @brief
 * 	Look for job tasks that have terminated (see scan_for_terminating),
 *	and for each task, find which job the task was part, and if the top
 *	shell, start end of job processing by running the epilogue.
 *
 * @return Void
 *
 */

void
scan_for_exiting(void)
{

	pid_t cpid;
	int i;
	int extval;
	int found_one = 0;
	u_long hours, mins, secs;
	job *nxjob;
	job *pjob;
	pbs_task *ptask;
	obitent *pobit;
	char *cookie;
	u_long gettime(resource * pres);
	u_long getsize(resource * pres);
	int im_compose(int, char *, char *, int, tm_event_t, tm_task_id, int);
	mom_hook_input_t hook_input;
	int has_epilog = 0;
	int update_svr = 0;

#ifdef WIN32
	/* update the latest intelligence about the running jobs; */
	time_now = time(NULL);
	mom_set_use_all();
	update_svr = 1;
#endif
	if (num_eligible_hooks(HOOK_EVENT_EXECJOB_EPILOGUE) > 0 || file_exists(path_epilog))
		has_epilog = 1;

	/*
	 ** Look through the jobs.  Each one has it's tasks examined
	 ** and if the job is EXITING, it meets it's fate depending
	 ** on whether this is the Mother Superior or not.
	 */
	for (pjob = (job *) GET_NEXT(svr_alljobs); pjob; pjob = nxjob) {
		nxjob = (job *) GET_NEXT(pjob->ji_alljobs);

		if (pjob->ji_numnodes > 1 && !pjob->ji_msconnected && pjob->ji_nodeid) /* assume that MS has a connection to itself at all times */
			continue;

		/*
		 ** If a restart is active, skip this job since
		 ** not all of the tasks may have started yet.
		 */
		if (pjob->ji_flags & MOM_RESTART_ACTIVE) {
			continue;
		}
		/*
		 ** If a checkpoint with aborts is active,
		 ** skip it.  We don't want to report any obits
		 ** until we know that the whole thing worked.
		 */
		if ((pjob->ji_flags & MOM_CHKPT_ACTIVE) &&
		    (pjob->ji_mompost != NULL)) {
			continue;
		}
		/*
		 ** If the job has had an error doing a checkpoint with
		 ** abort, the MOM_CHKPT_POST flag will be on.
		 */
		if (pjob->ji_flags & MOM_CHKPT_POST) {
			chkpt_partial(pjob);
			continue;
		}

		if (is_jattr_set(pjob, JOB_ATR_Cookie))
			cookie = get_jattr_str(pjob, JOB_ATR_Cookie);
		else
			cookie = NULL;

		/*
		 ** Check each EXITED task.  They transistion to DEAD here.
		 */
		for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
		     ptask != NULL;
		     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
			if (ptask->ti_qs.ti_status != TI_STATE_EXITED)
				continue;
			/*
			 ** Check if it is the top shell.
			 */
			if (ptask->ti_qs.ti_parenttask == TM_NULL_TASK) {
				int *exitstat =
					&pjob->ji_qs.ji_un.ji_momt.ji_exitstat;

				set_job_state(pjob, JOB_STATE_LTR_EXITING);
				set_job_substate(pjob, JOB_SUBSTATE_KILLSIS);
				if (*exitstat >= 0)
					*exitstat = ptask->ti_qs.ti_exitstat;
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
					  LOG_INFO,
					  pjob->ji_qs.ji_jobid, "Terminated");

				if (send_sisters(pjob, IM_KILL_JOB, NULL) == 0) {
					set_job_substate(pjob, JOB_SUBSTATE_EXITING);
					/*
					 ** if the job was checkpointed ok,
					 ** reset ji_nodekill to prevent mom_comm
					 ** error on restart resulting in job
					 ** being killed.
					 */
					if ((pjob->ji_flags & MOM_CHKPT_ACTIVE) &&
					    !(pjob->ji_flags & MOM_CHKPT_POST) &&
					    (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT))
						pjob->ji_nodekill = TM_ERROR_NODE;
				}
			}
			/*
			 ** Go through any TM client obits waiting.
			 */
			for (pobit = (obitent *) GET_NEXT(ptask->ti_obits);
			     pobit != NULL;
			     pobit = (obitent *) GET_NEXT(ptask->ti_obits)) {

				hnodent *pnode;

				/* see if this is a batch request */
				if (pobit->oe_type == OBIT_TYPE_BREVENT) {
					pobit->oe_u.oe_preq->rq_reply.brp_code =
						PBSE_NONE;
					pobit->oe_u.oe_preq->rq_reply.brp_auxcode =
						ptask->ti_qs.ti_exitstat;
					pobit->oe_u.oe_preq->rq_reply.brp_choice =
						BATCH_REPLY_CHOICE_NULL;
					(void) reply_send(pobit->oe_u.oe_preq);
					goto end_loop;
				}

				pnode = get_node(pjob, pobit->oe_u.oe_tm.oe_node);

				/* see if this is mother superior or a sister */
				if (pjob->ji_nodeid == pnode->hn_node) {
					pbs_task *tp;

					/* Send response locally */
					tp = task_find(pjob, pobit->oe_u.oe_tm.oe_taskid);
					if (pobit->oe_u.oe_tm.oe_fd != -1) {
						assert(tp != NULL);
						(void) tm_reply(pobit->oe_u.oe_tm.oe_fd,
								tp->ti_protover, IM_ALL_OKAY,
								pobit->oe_u.oe_tm.oe_event);
						(void) diswsi(pobit->oe_u.oe_tm.oe_fd,
							      ptask->ti_qs.ti_exitstat);
						(void) dis_flush(pobit->oe_u.oe_tm.oe_fd);
					}
				} else if (pnode->hn_stream != -1 &&
					   cookie != NULL) {
					/*
					 * Send a response over to MOM
					 * whose child sent the request
					 */
					(void) im_compose(pnode->hn_stream,
							  pjob->ji_qs.ji_jobid,
							  cookie, IM_ALL_OKAY,
							  pobit->oe_u.oe_tm.oe_event,
							  pobit->oe_u.oe_tm.oe_taskid, IM_OLD_PROTOCOL_VER);
					(void) diswsi(pnode->hn_stream,
						      ptask->ti_qs.ti_exitstat);
					(void) dis_flush(pnode->hn_stream);
				}

			end_loop:
				delete_link(&pobit->oe_next);
				free(pobit);
			}
			ptask->ti_qs.ti_status = TI_STATE_DEAD;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
			AFSLOG_TERM(ptask);
#endif

			/*
			 ** KLUDGE
			 ** We need to save the value of the sid here just
			 ** in case it is exiting from a checkpoint/abort
			 ** and it will be restarted later.  Just set it
			 ** to the negative of itself.
			 */
			if (ptask->ti_qs.ti_sid <= 1) {
				ptask->ti_qs.ti_sid = 0;
			} else
				ptask->ti_qs.ti_sid = -ptask->ti_qs.ti_sid;
			task_save(ptask);
		}

		/*
		 ** Look to see if the job has terminated.  If it is
		 ** in any state other than EXITING continue on.
		 */
		if (!check_job_substate(pjob, JOB_SUBSTATE_EXITING))
			continue;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		/* job in state JOB_SUBSTATE_EXITING destroy creds */
		if (cred_by_job(pjob, CRED_DESTROY) != PBS_KRB5_OK) {
			log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid,
				   "failed to destroy credentials");
		}
#endif

		/*
		 ** This job is exiting.  If MOM_CHKPT_ACTIVE is on, it
		 ** is time to turn if off.
		 */
		pjob->ji_flags &= ~MOM_CHKPT_ACTIVE;

		/*
		 ** Once a job is exiting each task that is done running
		 ** gets a log message for the cpu and mem usage.
		 */
		ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
		while (ptask != NULL) {
			secs = ptask->ti_cput;
			hours = secs / 3600;
			secs -= hours * 3600;
			mins = secs / 60;
			secs -= mins * 60;
			sprintf(log_buffer,
				"task %8.8X cput=%02lu:%2.2lu:%2.2lu",
				ptask->ti_qs.ti_task,
				hours, mins, secs);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
				  LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
			ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask);
		}

		/*
		 ** Look to see if I am a regular sister.  If so,
		 ** check to see if there is a obit event to
		 ** send back to mother superior.
		 ** Otherwise, I need to wait for her to send a KILL_JOB
		 ** so I can send the obit (unless she died).
		 */
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
			int stream = (pjob->ji_hosts == NULL) ? -1 : pjob->ji_hosts[0].hn_stream;

			/*
			 ** Check to see if I'm still in touch with
			 ** the head office.  If not, I'm just going to
			 ** get rid of this job.
			 */
			if (stream == -1) {
				(void) kill_job(pjob, SIGKILL);
				if ((pjob->ji_qs.ji_svrflags &
				     (JOB_SVFLG_CHKPT | JOB_SVFLG_ChkptMig)) == 0) {
					mom_deljob(pjob);
				}
				continue;
			}

			/*
			 * No event waiting for sending info to MS
			 * so I'll just sit tight.
			 */
			if (pjob->ji_obit == TM_NULL_EVENT)
				continue;

			/* Check to see if any tasks are running */
			ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
			while (ptask != NULL) {
				if (ptask->ti_qs.ti_status == TI_STATE_RUNNING)
					break;
				ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask);
			}
			/* Still somebody there so don't send it yet. */
			if (ptask != NULL)
				continue;
			/* No tasks running. Format and send a reply to the mother superior */
			if (cookie != NULL) {
				(void) im_compose(stream, pjob->ji_qs.ji_jobid,
						  cookie, IM_ALL_OKAY,
						  pjob->ji_obit, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);
				(void) diswul(stream,
					      resc_used(pjob, "cput", gettime));
				(void) diswul(stream,
					      resc_used(pjob, "mem", getsize));
				(void) diswul(stream,
					      resc_used(pjob, "cpupercent", gettime));
				(void) send_resc_used_to_ms(stream, pjob);
				(void) dis_flush(stream);
				pjob->ji_obit = TM_NULL_EVENT;
			}
			continue;
		}

		/*
		 * At this point, we know we are Mother Superior for this
		 * job which is EXITING.  Time for it to die.
		 */
		pjob->ji_qs.ji_svrflags &= ~(JOB_SVFLG_Suspend |
					     JOB_SVFLG_Actsuspd);
		if (pjob->ji_qs.ji_un.ji_momt.ji_exitstat != JOB_EXEC_INITABT)
			(void) kill_job(pjob, SIGKILL);
		delete_link(&pjob->ji_jobque); /* unlink from poll list */

		/*
		 * The SISTER_KILLDONE flag needs to be reset so
		 * we can talk to the sisterhood.
		 */
		for (i = 0; i < pjob->ji_numnodes; i++) {
			hnodent *np = &pjob->ji_hosts[i];

			if (np->hn_node == pjob->ji_nodeid) /* me */
				continue;

			if (np->hn_sister == SISTER_KILLDONE)
				np->hn_sister = SISTER_OKAY;
		}

		/* Job termination begins */

		/* stop counting walltime */
		stop_walltime(pjob);

		/* summary for MS */
		secs = resc_used(pjob, "cput", gettime);
		hours = secs / 3600;
		secs -= hours * 3600;
		mins = secs / 60;
		secs -= mins * 60;
		sprintf(log_buffer,
			"%s cput=%02lu:%2.2lu:%2.2lu mem=%lukb",
			mom_short_name, hours, mins, secs,
			resc_used(pjob, "mem", getsize));
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
			  LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);

		/* summary for other nodes */
		for (i = 0; i < pjob->ji_numrescs; i++) {
			noderes *nr = &pjob->ji_resources[i];
			secs = nr->nr_cput;

			hours = secs / 3600;
			secs -= hours * 3600;
			mins = secs / 60;
			secs -= mins * 60;

			/*
			 ** ji_hosts starts with node 0 (MS)
			 ** ji_resource starts with node 1
			 */
			sprintf(log_buffer,
				"%s cput=%02lu:%2.2lu:%2.2lu mem=%lukb",
				pjob->ji_resources[i].nodehost ? pjob->ji_resources[i].nodehost : "",
				hours, mins, secs, nr->nr_mem);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
				  LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
		}

		/*
		 ** Do dependent end of job processing if it needs to be
		 ** done.
		 */
		if (job_end_final != NULL)
			job_end_final(pjob);

		if (mock_run || !has_epilog) {
			send_obit(pjob, 0);
			continue;
		}

		/*
		 * Parent:
		 *  +  fork child process to run epilogue,
		 *  +  look for more terminated jobs.
		 * Child:
		 *  +  Run the epilogue script
		 */

		cpid = fork_me(-1);
		if (cpid > 0) {
			pjob->ji_sampletim = 0;
			pjob->ji_momsubt = cpid;
			pjob->ji_actalarm = 0;
			pjob->ji_mompost = send_obit;
			set_job_substate(pjob, JOB_SUBSTATE_RUNEPILOG);

			if (found_one++ < 20) {
				continue; /* look for more exiting jobs */
			} else {
				break; /* 20 exiting jobs at a time is our limit */
			}
		} else if (cpid < 0 && errno != ENOSYS)
			continue; /* curses, failed again */

		if (pjob->ji_grpcache) {
			if ((is_jattr_set(pjob, JOB_ATR_sandbox)) && (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
				/* in "sandbox=PRIVATE" mode so run epilogue in PBS_JOBDIR */
				if (chdir(jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir)) == -1) 
					log_errf(-1, __func__, "chdir failed. ERR : %s", strerror(errno));
			} else {
				/* else run in usr's home */
				if (chdir(pjob->ji_grpcache->gc_homedir) == -1) 
					log_errf(-1, __func__, "chdir failed. ERR : %s", strerror(errno));
			}
		}

		extval = 0;
		if (num_eligible_hooks(HOOK_EVENT_EXECJOB_EPILOGUE) > 0) {
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;
			(void) mom_process_hooks(HOOK_EVENT_EXECJOB_EPILOGUE, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, NULL, NULL, 0, update_svr);
		} else {
			if ((is_jattr_set(pjob, JOB_ATR_interactive)) && get_jattr_long(pjob, JOB_ATR_interactive)) {
				extval = run_pelog(PE_EPILOGUE, path_epilog, pjob, PE_IO_TYPE_NULL);
			} else {
				extval = run_pelog(PE_EPILOGUE, path_epilog, pjob, PE_IO_TYPE_STD);
			}
		}
		if (extval != 2)
			extval = 0;

		if (!cpid)
			/* if we are child exit and parent will do send_obit() */
			exit(extval);

		send_obit(pjob, i);
		/* restore MOM's home if we are foreground */
		if (chdir(mom_home) == -1) 
			log_errf(-1, __func__, "chdir failed. ERR : %s", strerror(errno));
	}
	if (pjob == NULL)
		exiting_tasks = 0; /* went through all jobs */
}

/**
 * @brief
 * 		Pick the server to connect to, taking a configured failover
 * 		(secondary) server into account.  With a secondary configured,
 * 		successive calls alternate between the name returned by
 * 		get_servername() and the secondary, starting with the former.
 *
 * @param[out] port - handed on to get_servername()/parse_servername();
 *		      not touched directly by this function.
 *
 * @return char *
 * @retval NULL - failure
 * @retval !NULL - pointer to server name
 */
static char *
get_servername_failover(unsigned int *port)
{
	/* toggled on every call that has a secondary to choose from */
	static int use_primary = 0;

	/* no failover configured: nothing to alternate */
	if (!pbs_conf.pbs_secondary)
		return get_servername(port);

	use_primary = !use_primary;
	if (!use_primary)
		return parse_servername(pbs_conf.pbs_secondary, port);

	return get_servername(port);
}

/**
 * @brief
 * 	Send the IS_HELLOSVR message to the Server.
 *
 * @param[in]	stream	- connection stream; a negative value means that
 *			  no stream exists yet and one must be opened
 *
 * @par
 *	Nothing is done while mom_net_up is 0.  If no stream was supplied, a
 *	server name/port is chosen with get_servername_failover() and a stream
 *	is opened with tpp_open().  The HELLO (message header plus pbs_mom_port)
 *	is then composed and flushed; on success the stream is remembered in
 *	server_stream, on any failure the error is logged and the stream is
 *	closed.
 *
 * @return	void
 *
 */

void
send_hellosvr(int stream)
{
	extern int mom_net_up;
	unsigned int port = default_server_port;
	char *svr = NULL;
	int rc;

	if (mom_net_up == 0)
		return;

	if (stream < 0) {
		svr = get_servername_failover(&port);
		if (svr == NULL) {
			log_err(errno, msg_daemonname, "get_servername_failover() failed");
			return;
		}

		stream = tpp_open(svr, port);
		if (stream < 0) {
			log_errf(errno, msg_daemonname, "tpp_open(%s, %d) failed", svr, port);
			return;
		}
	}

	/* compose, add our port, flush - stop at the first step that fails */
	rc = is_compose(stream, IS_HELLOSVR);
	if (rc == DIS_SUCCESS)
		rc = diswui(stream, pbs_mom_port);
	if (rc == DIS_SUCCESS)
		rc = dis_flush(stream);

	if (rc != DIS_SUCCESS) {
		if (svr != NULL)
			log_errf(errno, msg_daemonname, "Failed to send HELLO at %s:%d", svr, port);
		else
			log_errf(errno, msg_daemonname, "Failed to send HELLO at stream:%d", stream);
		tpp_close(stream);
		return;
	}

	/* remember the stream for future use */
	server_stream = stream;

	if (svr != NULL)
		sprintf(log_buffer, "HELLO sent to server at %s:%d, stream:%d", svr, port, stream);
	else
		sprintf(log_buffer, "HELLO sent to server at stream:%d", stream);
	log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
		  msg_daemonname, log_buffer);
}

/**
 * @brief
 *	On mom initialization, recover all jobs found in the jobs directory.
 *
 *	Called on initialization
 *	   If the -p option was given (recover = 2), Mom will allow the jobs
 *	   to continue to run.   She depends on detecting when they terminate
 *	   via the slow poll method rather than SIGCHLD.
 *
 *	   If the -r option was given (recover = 1), MOM is recovering on a
 *	   running system and the session id of the jobs should be valid;
 *	   the jobs are killed.
 *
 *	   If -r was not given (recover = 0), it is assumed that the whole
 *	   system, not just MOM, is coming up, the session ids are not valid;
 *	   so no attempt is made to kill the job processes.  But the jobs are
 *	   terminated and requeued.
 *
 * @par
 *	Job files that cannot be recovered are removed together with their
 *	task directory.  A leftover "<checkpoint dir>.old" replaces the regular
 *	checkpoint directory.  Afterwards the spool directory is cleaned of
 *	leftover "rcperr.*" files.
 *
 * @par
 *	The process exits if the jobs directory cannot be opened or read.
 *
 * @param [in]	recover - Specify recovering mode for MoM.
 * @param [out]	multinode_jobs - list head, cleared here and then filled with
 *				 the recovered multinode jobs (recover = 2 only)
 *
 */

void
init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
{
	DIR *dir;
	int i, sisters;
	struct dirent *pdirent;
	job *pj = NULL;
	char *job_suffix = JOB_FILE_SUFFIX;
	int job_suf_len = strlen(job_suffix);
	char *psuffix;
	char path[MAXPATHLEN + 1];
	char oldp[MAXPATHLEN + 1];
	char rcperr[] = "rcperr.";
	struct stat statbuf;
	extern char *path_checkpoint;
	extern char *path_spool;

	CLEAR_HEAD((*multinode_jobs));

	dir = opendir(path_jobs);
	if (dir == NULL) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
			  msg_daemonname, "Jobs directory not found");
		exit(1);
	}
	/* errno is cleared before each readdir() so end-of-dir and error can be told apart */
	while (errno = 0, (pdirent = readdir(dir)) != NULL) {
		/* only names that are longer than, and end in, the job file suffix */
		if ((i = strlen(pdirent->d_name)) <= job_suf_len)
			continue;

		psuffix = pdirent->d_name + i - job_suf_len;
		if (strcmp(psuffix, job_suffix))
			continue;
		pj = job_recov(pdirent->d_name);
		if (pj == NULL) {
			/* unrecoverable: remove the job file and its task directory */
			(void) strcpy(path, path_jobs);
			(void) strcat(path, pdirent->d_name);
			(void) unlink(path);
			psuffix = path + strlen(path) - job_suf_len;
			strcpy(psuffix, JOB_TASKDIR_SUFFIX);
			(void) remtree(path);
			continue;
		}

		/* To get homedir info */
		pj->ji_grpcache = NULL;
		check_pwd(pj);
		if (pbs_idx_insert(jobs_idx, pj->ji_qs.ji_jobid, pj) != PBS_IDX_RET_OK) {
			log_joberr(PBSE_INTERNAL, __func__, "Failed to add job in index during recovery", pj->ji_qs.ji_jobid);
			job_free(pj);
			continue;
		}
		append_link(&svr_alljobs, &pj->ji_alljobs, pj);
		job_nodes(pj);
		task_recov(pj);

		/*
		 ** Check to see if a checkpoint.old dir exists.
		 ** If so, remove the regular checkpoint dir
		 ** and rename the old to the regular name.
		 */
		pbs_strncpy(path, path_checkpoint, sizeof(path));
		if (*pj->ji_qs.ji_fileprefix != '\0')
			strcat(path, pj->ji_qs.ji_fileprefix);
		else
			strcat(path, pj->ji_qs.ji_jobid);
		strcat(path, JOB_CKPT_SUFFIX);
		strcpy(oldp, path);
		strcat(oldp, ".old");

		if (stat(oldp, &statbuf) == 0) {
			(void) remtree(path);
			if (rename(oldp, path) == -1)
				(void) remtree(oldp);
		}

		/*
		 ** Check to see if I am Mother Superior.  The
		 ** JOB_SVFLG_HERE flag is overloaded for MOM
		 ** for this purpose.
		 */
		if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
			/* I am sister, junk the job files */
			if (recover != 2) {
				mom_deljob(pj);
				continue;
			}
		}

		/* one resource-usage slot per sister node */
		sisters = pj->ji_numnodes - 1;
		if (sisters > 0) {
			pj->ji_resources = (noderes *) calloc(sisters,
							      sizeof(noderes));
			if (pj->ji_resources == NULL) {
				log_err(ENOMEM, "init_abort_jobs", "out of memory");
				continue;
			}
			pj->ji_numrescs = sisters;
		}

		/*
		 **	If mom went down during file stage ops,
		 **	the substate should be EXITED.  Set it
		 **	back to OBIT so the server can verify that
		 **	it still has the job or not.
		 */
		if (check_job_substate(pj, JOB_SUBSTATE_EXITED)) {
			/*
			 ** We don't want to change the state if the
			 ** job is checkpointed.
			 */
			if ((pj->ji_qs.ji_svrflags &
			     (JOB_SVFLG_CHKPT |
			      JOB_SVFLG_ChkptMig)) == 0) {
				set_job_substate(pj, JOB_SUBSTATE_OBIT);
				job_save(pj);
			}
		} else if (check_job_substate(pj, JOB_SUBSTATE_TERM)) {
			/*
			 * Mom went down while terminate action script was
			 * running, don't know if it finished or not;  force
			 * Mom to send/resend OBIT and lets end it
			 */
			if (recover)
				(void) kill_job(pj, SIGKILL);
			set_job_substate(pj, JOB_SUBSTATE_OBIT);
			job_save(pj);
		} else if ((recover != 2) &&
			   ((check_job_substate(pj, JOB_SUBSTATE_RUNNING)) ||
			    (check_job_substate(pj, JOB_SUBSTATE_SUSPEND)) ||
			    (check_job_substate(pj, JOB_SUBSTATE_KILLSIS)) ||
			    (check_job_substate(pj, JOB_SUBSTATE_RUNEPILOG)) ||
			    (check_job_substate(pj, JOB_SUBSTATE_EXITING)))) {

			if (recover)
				(void) kill_job(pj, SIGKILL);

			/* set exit status to:
			 *   JOB_EXEC_INITABT - init abort and no chkpnt
			 *   JOB_EXEC_INITRST - init and chkpt, no mig
			 *   JOB_EXEC_INITRMG - init and chkpt, migrate
			 * to indicate recovery abort
			 */
			if (pj->ji_qs.ji_svrflags &
			    (JOB_SVFLG_CHKPT |
			     JOB_SVFLG_ChkptMig)) {
#if PBS_CHKPT_MIGRATE
				pj->ji_qs.ji_un.ji_momt.ji_exitstat =
					JOB_EXEC_INITRMG;
#else
				pj->ji_qs.ji_un.ji_momt.ji_exitstat =
					JOB_EXEC_INITRST;
#endif
			} else {
				pj->ji_qs.ji_un.ji_momt.ji_exitstat =
					JOB_EXEC_INITABT;
			}

			/*
			 ** I am MS, send a DELETE_JOB request to any
			 ** sisters that happen to still be alive.
			 */
			if (sisters > 0) {
				(void) send_sisters(pj, IM_DELETE_JOB, NULL);
			}
			set_job_substate(pj, JOB_SUBSTATE_EXITING);
			job_save(pj);
			exiting_tasks = 1;
		} else if (recover == 2) {
			pbs_task *ptask;

			/* the tasks keep running; flag every one of them as orphan */
			for (ptask = (pbs_task *) GET_NEXT(pj->ji_tasks);
			     ptask != NULL;
			     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
				ptask->ti_flags |= TI_FLAGS_ORPHAN;
			}

			if (check_job_substate(pj, JOB_SUBSTATE_RUNNING)) {
				recover_walltime(pj);
				start_walltime(pj);
			}

			if (mom_do_poll(pj))
				append_link(&mom_polljobs, &pj->ji_jobque, pj);

			if (sisters > 0)
				append_link(multinode_jobs, &pj->ji_multinodejobs, pj);

			if (pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
				/*
				 * I am MS: restore both saved port values, each
				 * from its own saved field (stderr must not be
				 * given the stdout value).
				 */
				pj->ji_stdout = pj->ji_ports[0] = pj->ji_extended.ji_ext.ji_stdout;
				pj->ji_stderr = pj->ji_ports[1] = pj->ji_extended.ji_ext.ji_stderr;
			}
		}
	}
	if (errno != 0 && errno != ENOENT) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
			  msg_daemonname, "Jobs directory cannot be read");
		(void) closedir(dir);
		exit(1);
	}
	(void) closedir(dir);

	/*
	 ** Go through spool dir and remove files that match
	 ** "rcperr.<pid>".  These would be leftover from file
	 ** stage operations that were interrupted.
	 */
	dir = opendir(path_spool);
	if (dir == NULL) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
			  msg_daemonname, "spool directory not found");
		return;
	}

	while (errno = 0, (pdirent = readdir(dir)) != NULL) {
		if (strncmp(pdirent->d_name, rcperr, sizeof(rcperr) - 1) != 0)
			continue;

		(void) strcpy(path, path_spool);
		(void) strcat(path, pdirent->d_name);
		(void) unlink(path);
	}
	if (errno != 0 && errno != ENOENT)
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
			  msg_daemonname, "spool directory cannot be read");
	(void) closedir(dir);
}

/**
 * @brief
 * 	static handler function to be called by deferred child exit work task
 * 	for alps cancel reservation child of mom
 *
 * 	The forked child process cannot send a req_reject or reply_ack since
 * 	transmission of data via tpp is not supported from child processes
 * 	(tpp streams are automatically closed when a process forks).
 * 	Thus this child exit handler is added to send the reply from the
 * 	parent process after reaping the exit status from child
 *
 * @param[in] ptask - Pointer to the task structure
 *
 */
#if MOM_ALPS
static void
post_alps_cancel_resv(struct work_task *ptask)
{
	struct batch_request *preq = ptask->wt_parm1;
	int j;

	if (preq == NULL)
		return;

	j = ptask->wt_aux;
	if (j > 0) {
		/* Tell the server we failed */
		req_reject(PBSE_ALPSRELERR, j, preq);
	} else if (j < 0) {
		/* Fatal error, log message was logged in
		 * alps_cancel_request
		 */
		req_reject(PBSE_ALPSRELERR, j, preq);
	} else {
		/* The job will have been purged in mom_deljob_wait at this point
		 * so just do the reply.
		 */
		reply_ack(preq);
	}
}
#endif

/**
 * @brief
 * 	del_job_hw	delete job/hardware related resources such as ALPS reservations, ...
 *
 *	Called by del_job_resc(); the original note also lists exec_bail()
 *	as a caller.  Most items here are platform dependent: when MOM_ALPS
 *	is not set the function body is empty.
 *
 *	With MOM_ALPS set, the function tries to cancel the job's ALPS
 *	reservation and answers the batch request held in pjob->ji_preq:
 *	  - cancel returned 0:   reply_ack() immediately
 *	  - cancel returned < 0: req_reject(PBSE_ALPSRELERR) immediately
 *	  - cancel returned > 0: fork a child that keeps retrying; the reply
 *	    is sent later by post_alps_cancel_resv() when the child exits
 *	In every parent path pjob->ji_preq is set to NULL before returning.
 *
 * @param pjob  - pointer to job structure
 *
 * @return void
 *
 */
void
del_job_hw(job *pjob)
{
#if MOM_ALPS
	int i; /* retry counter used by the child, starts at 1 */
	int j; /* latest result of alps_cancel_reservation() */
	int sleeptime = 0;
	time_t total_time = 0;
	time_t begin_time = 0;
	time_t end_time = 0;
	long jitter = 0;
	pid_t parent_pid = 0;
	pid_t pid;
	int sconn = -1; /* stays -1 when the job has no pending batch request */
	struct work_task *wtask = NULL;

	/*
	 * Try to cancel the reservation once as 'main MOM'.
	 * If we got an acknowledgment from ALPS that the reservation
	 * is actually gone, then send ACK to server.
	 * Else, fork a child process that will continue to try to cancel
	 * the reservation until the remaining processes count is zero.
	 * Or until the ALPS reservation no longer exists.
	 */
	if ((j = alps_cancel_reservation(pjob)) > 0) {
		/*
		 * alps reservation cancel failed with "temporary" error
		 * This could be due to one or more of the following:
		 * 	- the reservation still has claims on it
		 * 	- ALPS is down
		 * Retry in child until success, or a hard error is returned
		 * Or alps_release_timeout is reached.
		 * Once the ALPS reservation is successfully canceled,
		 * respond to the server's delete job request.
		 * The job will remain in the 'E' state until then.
		 */
		if (pjob->ji_preq != NULL)
			sconn = pjob->ji_preq->rq_conn;

		if ((pid = fork_me(sconn)) == 0) {
			/* We are the child */
			begin_time = time(NULL);
			end_time = begin_time;
			/* seed the jitter generator with the numeric prefix of
			 * the job id plus the current time
			 */
			srandom((unsigned) (atoi(pjob->ji_qs.ji_jobid) + begin_time));
			/* total_time is refreshed by the loop condition on every
			 * pass, so it holds the elapsed seconds when the loop ends
			 */
			for (i = 1; (total_time = end_time - begin_time) < alps_release_timeout; ++i, end_time = time(NULL)) {
				/* calculate time to sleep */
				sleeptime = alps_release_wait_time;
				/* Add a random jitter in [0, alps_release_jitter)
				 * to the sleeptime so we don't overwhelm ALPS
				 * with multiple ALPS release requests when jobs
				 * end at the same time.  The original note
				 * describes this as 0 to 0.12 seconds.
				 * NOTE(review): a zero alps_release_jitter would
				 * make this modulo divide by zero -- confirm the
				 * configuration code never allows that.
				 */
				jitter = random() % alps_release_jitter;
				sleeptime += jitter;
				/* usleep() takes microseconds */
				usleep(sleeptime);
				if ((j = alps_cancel_reservation(pjob)) <= 0)
					break;
			}
			if (j > 0) {
				/* loop ended on the timeout, not on a result */
				sprintf(log_buffer,
					"Timed out after %d attempts over "
					"%ld seconds of attempting "
					"to cancel ALPS reservation %ld",
					i, total_time,
					pjob->ji_extended.ji_ext.ji_reservation);
				log_joberr(-1, __func__, log_buffer,
					   pjob->ji_qs.ji_jobid);
				/* send a HUP to main MOM so she re-reads
				 * the ALPS inventory
				 */
				parent_pid = getppid();
				kill(parent_pid, SIGHUP);

			} else if (j == 0) {
				/* i + 1: the child's tries plus the first
				 * attempt made by main MOM before forking
				 */
				sprintf(log_buffer,
					"Cancelled ALPS reservation %ld after a "
					"total of %d tries",
					pjob->ji_extended.ji_ext.ji_reservation, i + 1);
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB,
					  LOG_DEBUG, pjob->ji_qs.ji_jobid,
					  log_buffer);
			}

			/* exit with the respective error code to the parent process
			 * Parents (moms) post handler will handle this
			 * NOTE(review): j may be negative here while a process
			 * exit status only carries the low 8 bits -- confirm how
			 * the value is recovered into wt_aux.
			 */
			exit(j);

		} else if (pid > 0) {
			/* we are the parent, the reply happens after the child exits */
			if ((wtask = set_task(WORK_Deferred_Child, pid,
					      post_alps_cancel_resv, pjob->ji_preq)) == NULL) {
				/* no task means nobody would ever answer, so reject now */
				log_err(errno, NULL, "Failed to create deferred work task, Out of memory");
				req_reject(PBSE_SYSTEM, 0, pjob->ji_preq);
			}
		} else if (pid < 0) {
			/* fork failed, reply to the server so the job
			 * doesn't stay in the "E" state
			 */
			req_reject(PBSE_ALPSRELERR, j, pjob->ji_preq);
		}
	} else if (j < 0) {
		/* ALPS returned a PERMANENT error */
		req_reject(PBSE_ALPSRELERR, j, pjob->ji_preq);
	} else {
		/* The reservation was canceled, let server know */
		reply_ack(pjob->ji_preq);
	}
	/* the request has either been answered above or handed to the
	 * deferred work task, so the job drops its pointer to it
	 */
	pjob->ji_preq = NULL;
#endif
}

/**
 * @brief
 * 	del_job_resc - delete job related resources, files, etc
 *	Used by mom_deljob() and mom_deljob_wait()
 *
 *	Items which are kept until the very bitter end of the job, just
 *	before the job structure is freed, are released/freed/cleared here:
 *	  - the job's PBS_NODEFILE under <pbs_home>/aux (auxiliary on WIN32)
 *	  - whatever the optional job_clean_extra hook cleans up
 *	  - hardware related items, via del_job_hw()
 *
 *	The node file path is built with a bounded snprintf(); if it would
 *	not fit in the buffer an error is logged and the unlink is skipped
 *	rather than operating on an overflowed or truncated path.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return Void
 *
 */
void
del_job_resc(job *pjob)
{
	/*
	 * WARNING - the following is for QA automated testing to induce
	 * certain failures modes
	 */

	if (QA_testing != 0) {
		if (QA_testing & PBSQA_DELJOB_SLEEP)
			sleep(90); /* 90 second delay */
		else if (QA_testing & PBSQA_DELJOB_SLEEPLONG)
			sleep(900); /* 900 second long delay */
		else if (QA_testing & PBSQA_DELJOB_CRASH)
			exit(99); /* simulate crash */
	}

	/* remove PBS_NODEFILE - Mother Superior shall have one and the sister
	moms too if the mom config gen_nodefile_on_sister_mom is set to 1 */

	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE || gen_nodefile_on_sister_mom) {
		char file[MAXPATHLEN + 1];
		int len;

#ifdef WIN32
		len = snprintf(file, sizeof(file), "%s/auxiliary/%s",
			       pbs_conf.pbs_home_path, pjob->ji_qs.ji_jobid);
#else
		len = snprintf(file, sizeof(file), "%s/aux/%s",
			       pbs_conf.pbs_home_path, pjob->ji_qs.ji_jobid);
#endif
		/* snprintf reports the length it needed; anything that does
		 * not fit means "file" is not the real node file path
		 */
		if (len >= 0 && (size_t) len < sizeof(file))
			(void) unlink(file);
		else
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
				  LOG_ERR, pjob->ji_qs.ji_jobid,
				  "node file path too long, file not removed");
	}

	/* TMPDIR removed in job_purge so files are available for staging */

	if (job_clean_extra != NULL) {
		(void) job_clean_extra(pjob);
	}

	/* delete the hardware related items */

	del_job_hw(pjob);
}

/**
 * @brief
 * 	mom_deljob - remove a job from this MOM without waiting for the
 *	sisters to answer.
 *
 *	The job's resources are released first.  If this MOM is Mother
 *	Superior for the job, IM_DELETE_JOB is sent to the sisters.  The job
 *	is then purged and the user restrictions are re-evaluated.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return Void
 *
 */
void
mom_deljob(job *pjob)
{
	int is_mother_superior;

	/* rm tmpdir, etc. */
	del_job_resc(pjob);

	is_mother_superior = (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) != 0;
	if (is_mother_superior)
		(void) send_sisters(pjob, IM_DELETE_JOB, NULL);

	job_purge_mom(pjob);

	/*
	 ** with the job gone, make sure no rogue user
	 ** procs are still hanging about
	 */
	dorestrict_user();
}

/**
 * @brief
 * 	mom_deljob_wait - deletes most of the job stuff; the job entry itself
 *	is kept until the sisters have replied or are down.
 *
 *	This version DOES wait for the Sisters to reply, see processing of
 *	IM_DELETE_JOB_REPLY in mom_comm.c
 *	It should only be called for a job for which this is Mother Superior;
 *	on any other MOM it releases the job's resources and returns 0.
 *
 *	When the request could not be sent to any sister, the job is killed
 *	and purged right here.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return int
 * @retval the number of sisters to whom the request was sent
 *
 */
int
mom_deljob_wait(job *pjob)
{
	int idx;
	int sent;

	/* rm tmpdir, etc. */
	del_job_resc(pjob);

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
		return 0; /* not MS */

	set_job_substate(pjob, JOB_SUBSTATE_DELJOB);
	pjob->ji_sampletim = time_now;

	/*
	 * The SISTER_KILLDONE flag needs to be reset so
	 * we can talk to the sisterhood and know when they reply.
	 * Our own entry is left untouched.
	 */
	for (idx = 0; idx < pjob->ji_numnodes; idx++) {
		hnodent *host = &pjob->ji_hosts[idx];

		if (host->hn_node != pjob->ji_nodeid &&
		    host->hn_sister == SISTER_KILLDONE)
			host->hn_sister = SISTER_OKAY;
	}

	sent = send_sisters(pjob, IM_DELETE_JOB_REPLY, NULL);
	if (sent != 0) {
		/*
		 * job_purge() and dorestrict_user() are called in
		 * mom_comm when all the sisters have replied.  The reply to
		 * the Server is also done there
		 */
		return sent;
	}

	/* nothing went out: finish the job off locally */
	if (pjob->ji_numnodes > 1) {
		sprintf(log_buffer, "Unable to send delete job request to one or more sisters");
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
			  LOG_ERR, pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* Delete the job end work task for this job */
	if (mock_run)
		delete_task_by_parm1_func(pjob, mock_run_end_job_task, DELETE_ALL);

	/* The job is purged here first and discard job happens later, so
	 * IM_DISCARD_JOB would not find pjob to kill the job process in
	 * case of a mom restart.
	 *
	 * Killing the job here covers that and should not hurt in any
	 * case (since we are purging job anyway)
	 */
	(void) kill_job(pjob, SIGKILL);
	job_purge_mom(pjob);
	dorestrict_user();

	return sent;
}

/**
 *
 * @brief
 *  The wrapper to "mom_deljob_wait()".
 * @par
 *  With MOM_ALPS set this only calls mom_deljob_wait().
 *  Otherwise it also takes care of the job's pending batch request:
 *  the request is put back on the job when sisters still have to answer,
 *  rejected with PBSE_SISCOMM when the job has sisters but none could be
 *  contacted, and acknowledged when the job has no sisters.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return void
 */
void
mom_deljob_wait2(job *pjob)
{
#if MOM_ALPS
	(void) mom_deljob_wait(pjob);

#else
	/*
	 * remember the size of the sisterhood and the request up front,
	 * the job may be deleted in mom_deljob_wait()
	 */
	int node_count = pjob->ji_numnodes;
	struct batch_request *saved_req = pjob->ji_preq;

	pjob->ji_preq = NULL;

	if (mom_deljob_wait(pjob) > 0) {
		/* wait till sisters respond */
		pjob->ji_preq = saved_req;
		return;
	}

	if (node_count > 1) {
		/*
		* no messages sent, but there are sisters
		* must be all down
		*/
		req_reject(PBSE_SISCOMM, 0, saved_req);
	} else {
		/* no sisters, reply now */
		reply_ack(saved_req);
	}
#endif
}

/**
 * @brief
 * send_sisters_deljob_wait	-
 * 	Puts the job in the DELJOB substate and sends IM_DELETE_JOB_REPLY to
 *	the sisters.  The job entry is not deleted until the sisters have
 *	replied or are down.
 *	This version DOES wait for the Sisters to reply, see processing of
 *	IM_DELETE_JOB_REPLY in mom_comm.c
 *	It should only be called for a job for which this is Mother Superior;
 *	on any other MOM it does nothing and returns 0.
 *
 * @param[in] pjob - pointer to job structure
 *
 * @return int
 * @retval the number of sisters to whom the request was sent
 *
 */
int
send_sisters_deljob_wait(job *pjob)
{
	int idx;

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
		return 0; /* not MS */

	set_job_substate(pjob, JOB_SUBSTATE_DELJOB);
	pjob->ji_sampletim = time_now;

	/*
	 * The SISTER_KILLDONE flag needs to be reset so
	 * we can talk to the sisterhood and know when they reply.
	 * Our own entry is left untouched.
	 */
	for (idx = 0; idx < pjob->ji_numnodes; idx++) {
		hnodent *host = &pjob->ji_hosts[idx];

		if (host->hn_node != pjob->ji_nodeid &&
		    host->hn_sister == SISTER_KILLDONE)
			host->hn_sister = SISTER_OKAY;
	}

	return send_sisters(pjob, IM_DELETE_JOB_REPLY, NULL);
}

/**
 * @brief
 * 		Convenience function to call mom_set_use() when all jobs need to be updated
 *
 * @param	void
 * @return	void
 */
void
mom_set_use_all(void)
{
	job *pjob = NULL;

	if (!mock_run) {
		if (mom_get_sample() == PBSE_NONE) {
			pjob = (job *) GET_NEXT(svr_alljobs);
			while (pjob) {
				if ((check_job_state(pjob, JOB_STATE_LTR_EXITING) &&
				     (get_job_substate(pjob) >= JOB_SUBSTATE_OBIT ||
				      get_job_substate(pjob) == JOB_SUBSTATE_EXITED)) ||
				    (check_job_state(pjob, JOB_STATE_LTR_RUNNING) && get_job_substate(pjob) <= JOB_SUBSTATE_PRERUN)) {
					pjob = (job *) GET_NEXT(pjob->ji_alljobs);
					continue;
				}
				mom_set_use(pjob);
				pjob = (job *) GET_NEXT(pjob->ji_alljobs);
			}
		}
	}
}

/**
 * @brief	Purge a job, dispatching to the mock run variant of the purge
 *		when MOM is in mock run mode and to job_purge() otherwise.
 *
 * @param[in]	pjob - the job being purged
 *
 * @return	void
 */
void
job_purge_mom(job *pjob)
{
	if (!mock_run) {
		job_purge(pjob);
		return;
	}

	mock_run_job_purge(pjob);
}
