/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 * functions dealing with a Job Obituary Request (Notice)
 * and the associated post execution job clean up
 */

#include <pbs_config.h> /* the master config generated by configure */

#include <sys/types.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "job.h"
#include "credential.h"
#include "ticket.h"
#include "batch_request.h"
#include "work_task.h"
#include "pbs_error.h"
#include "log.h"
#include "acct.h"
#include "net_connect.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include "sched_cmds.h"
#include "mom_server.h"
#include "dis.h"
#include "tpp.h"
#include "libutil.h"
#include "pbs_sched.h"

/* External Global Data Items */

extern unsigned int pbs_mom_port;
extern char *path_spool;
extern int server_init_type;
extern pbs_net_t pbs_server_addr;
extern char *msg_init_abt;
extern char *msg_job_end;
extern char *msg_job_end_sig;
extern char *msg_job_end_stat;
extern char *msg_momnoexec1;
extern char *msg_momnoexec2;
extern char *msg_baduser;
extern char *msg_job_globfail1;
extern char *msg_obitnojob;
extern char *msg_obitnocpy;
extern char *msg_obitnodel;
extern char *msg_bad_password;
extern char *msg_hook_reject_deletejob;
extern char *msg_hook_reject_rerunjob;
extern char *msg_momkillncpusburst;
extern char *msg_momkillncpussum;
extern char *msg_momkillvmem;
extern char *msg_momkillmem;
extern char *msg_momkillcput;
extern char *msg_momkillwalltime;
extern time_t time_now;

/* External Functions called */

extern void set_resc_assigned(void *, int, enum batch_op);
extern long get_walltime(const job *, int);

/* Local public functions  */
void on_job_rerun(struct work_task *ptask);
void on_job_exit(struct work_task *ptask);
extern void set_admin_suspend(job *pjob, int set_remove_nstate);

static char *msg_obitnotrun = "job not running, may have been requeued on node failure";

/**
 * @brief
 * 		mom_comm - return a connection handle to the MOM under which the
 *		job was running, opening one if the job does not hold one yet.
 *
 *		When the job has no handle (ji_momhandle < 0) the MOM address is
 *		looked up from the job's exec_vnode attribute if it is not already
 *		recorded, and svr_connect() is called.  If that connect fails, a
 *		timed work task is queued so that "func" is called again later; the
 *		delay grows with the square of the number of previous retries.
 *
 * @param[in,out]	pjob	- job structure; ji_momhandle, ji_mom_prot, ji_retryok
 *							and the recorded MOM address/port may be updated
 * @param[in]	func	- work task function to re-dispatch on connect failure,
 *							on_job_exit or on_job_rerun
 *
 * @return	open connection handle to MOM
 * @retval	-1	- failure
 */

int
mom_comm(job *pjob, void (*func)(struct work_task *))
{
	unsigned int node_port;
	long attempt;
	long delay;
	long when;
	const char *what;
	struct work_task *retry_task;

	/* already have a handle, nothing to set up */
	if (pjob->ji_momhandle >= 0)
		return (pjob->ji_momhandle);

	/* need to make connection, called from pbsd_init() */

	if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0) {
		char *vnodes = get_jattr_str(pjob, JOB_ATR_exec_vnode);

		if (!is_jattr_set(pjob, JOB_ATR_exec_vnode) || vnodes == NULL)
			return -1;

		pjob->ji_qs.ji_un.ji_exect.ji_momaddr = get_addr_of_nodebyname(vnodes, &node_port);
		if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == 0)
			return -1;

		pjob->ji_qs.ji_un.ji_exect.ji_momport = node_port;
	}

	pjob->ji_momhandle = svr_connect(pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
					 pjob->ji_qs.ji_un.ji_exect.ji_momport,
					 process_Dreply,
					 ToServerDIS,
					 PROT_TPP);
	pjob->ji_mom_prot = PROT_TPP;

	if (pjob->ji_momhandle >= 0)
		return (pjob->ji_momhandle);

	/* connect failed: back off quadratically and try again later */
	attempt = pjob->ji_retryok++;
	delay = PBS_NET_RETRY_TIME + attempt * attempt;

	what = (func == on_job_exit) ? "exit" : (func == on_job_rerun) ? "rerun" : "UNKNOWN";

	snprintf(log_buffer, sizeof(log_buffer),
		 "cannot connect to MOM, reschedule job %s "
		 "in %ld seconds",
		 what, delay);
	log_err(-1, pjob->ji_qs.ji_jobid, log_buffer);

	when = delay + time_now;
	retry_task = set_task(WORK_Timed, when, func, (void *) pjob);
	append_link(&pjob->ji_svrtask, &retry_task->wt_linkobj, retry_task);
	return (-1);
}

/**
 * @brief
 * 		rel_resc - release resources assigned to the job
 *
 *		Frees the job's nodes, decrements the assigned-resource counts,
 *		acknowledges a pending rerun request (if any), clears an admin
 *		suspend, and flags the job's scheduler for a scheduling cycle.
 *
 * @param[in,out]	pjob	- job structure; ji_rerun_preq is cleared once acked
 */
void
rel_resc(job *pjob)
{
	conn_t *conn = NULL;
	pbs_sched *psched;

	free_nodes(pjob);

	/* removed the resources used by the job from the used svr/que attr  */

	set_resc_assigned((void *) pjob, 0, DECR);

	/* is there a rerun request waiting for acknowledgement that        */
	/* resources (including licenses) are indeed released? Then ack it. */
	if (pjob->ji_rerun_preq != NULL) { /* set only in req_rerun() */
		if (pjob->ji_rerun_preq->rq_conn != PBS_LOCAL_CONNECTION)
			conn = get_conn(pjob->ji_rerun_preq->rq_conn);

		reply_ack(pjob->ji_rerun_preq);

		/* clear no-timeout flag on connection to prevent stale connections */
		if (conn)
			conn->cn_authen &= ~PBS_NET_CONN_NOTIMEOUT;

		pjob->ji_rerun_preq = NULL;
	}
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_AdmSuspd)
		set_admin_suspend(pjob, 0);

	/* Mark that scheduler should be called */

	if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
		set_scheduler_flag(SCH_SCHEDULE_TERM, psched);
	else {
		const char *partition = NULL;
		pbs_queue *pq = find_queuebyname(pjob->ji_qs.ji_queue);

		/*
		 * Only query the partition when the queue lookup succeeded, and
		 * never hand a NULL pointer to %s.  The "(null)" placeholder keeps
		 * the text glibc's printf already produced for a NULL argument.
		 */
		if (pq != NULL)
			partition = get_qattr_str(pq, QA_ATR_partition);

		snprintf(log_buffer, sizeof(log_buffer),
			 "Unable to reach scheduler associated with partition %s",
			 (partition != NULL) ? partition : "(null)");
		log_err(-1, __func__, log_buffer);
	}
}
/**
 * @brief
 * 		on_exitrerun_msg	- format a failure message into log_buffer and
 * 								log it as a job event; used by on_job_exit(),
 * 								on_job_rerun() and conn_to_mom_failed()
 *
 *		The formatted text is left in log_buffer; callers in this file
 *		append to it and mail it to the job owner.
 *
 * @param[in]	pjob	- job which has failed
 * @param[in]	fmt	- printf-style failure message taking two "%s" arguments,
 *						the job id and the host name, in that order
 */
static void
on_exitrerun_msg(job *pjob, char *fmt)
{
	char *hostname = " ? ";

	if (pjob->ji_qs.ji_destin[0] != '\0')
		hostname = pjob->ji_qs.ji_destin;

	/* bounded write: log_buffer is shared and fmt is supplied by the caller */
	snprintf(log_buffer, sizeof(log_buffer), fmt, pjob->ji_qs.ji_jobid, hostname);
	log_event(PBSEVENT_ERROR | PBSEVENT_ADMIN | PBSEVENT_JOB,
		  PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, log_buffer);
}

/**
 * @brief
 * 		conn_to_mom_failed - handle a broken connection to Mom during end of
 *		job processing.  The failure is logged, the current handle is closed
 *		and invalidated, and an immediate work task is queued so that "func"
 *		runs again and can open a fresh connection.
 *
 * @param[in,out]	pjob	- job structure; ji_momhandle is reset to -1
 * @param[in]	func	- work task function to re-dispatch,
 * 							on_job_exit or on_job_rerun
 */

static void
conn_to_mom_failed(job *pjob, void (*func)(struct work_task *))
{
	struct work_task *again;

	on_exitrerun_msg(pjob, "end of job processing for %s, connection to Mom on host %s was broken");

	if (pjob->ji_mom_prot != PROT_TCP) {
		/* non-TCP handle: close the stream and drop it from the streams tree */
		tpp_close(pjob->ji_momhandle);
		tdelete2((u_long) pjob->ji_momhandle, 0, &streams);
	} else {
		svr_disconnect(pjob->ji_momhandle);
	}
	pjob->ji_momhandle = -1;

	/* go around again right away */
	again = set_task(WORK_Immed, 0, func, pjob);
	append_link(&pjob->ji_svrtask, &again->wt_linkobj, again);
}

/**
 * @brief
 * 		end_job - final server-side processing for a job that has ended:
 *		record the obit time, run the job obit hooks, release resources,
 *		write the accounting end record, log the end of job, and then save
 *		or purge the job history.
 *
 * @param[in,out]	pjob	- job that has ended
 * @param[in]	isexpress	- non-zero when called without going through the
 *							stage-out/delete steps; dependencies and the
 *							nodes' last-used time are then handled here
 */
static void
end_job(job *pjob, int isexpress)
{
	char hook_msg[HOOK_MSG_SIZE] = {0};
	struct batch_request *obit_req;
	char *acct_text;
	int hook_rc;

	if (isexpress) {
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "express end of job");

		/* see if have any dependencies */
		if (is_jattr_set(pjob, JOB_ATR_depend))
			(void) depend_on_term(pjob);

		/* Set job's exec_vnodes with current time for last_used_time. */
		set_last_used_time_node(pjob, 0);
	}

	pjob->ji_qs.ji_obittime = time_now;
	set_jattr_l_slim(pjob, JOB_ATR_obittime, pjob->ji_qs.ji_obittime, SET);

	/* Allocate space for the jobobit hook event params */
	obit_req = alloc_br(PBS_BATCH_JobObit);
	if (obit_req != NULL) {
		obit_req->rq_ind.rq_obit.rq_pjob = pjob;
		hook_rc = process_hooks(obit_req, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
		if (hook_rc == -1)
			log_err(-1, __func__, "rq_jobobit process_hooks call failed");
		free_br(obit_req);
	} else {
		log_err(PBSE_INTERNAL, __func__, "rq_jobobit alloc failed");
	}

	if (pjob->ji_mom_prot == PROT_TCP && pjob->ji_momhandle != -1)
		svr_disconnect(pjob->ji_momhandle);
	rel_resc(pjob); /* free any resc assigned to the job */

	account_job_update(pjob, PBS_ACCT_LAST);
	account_jobend(pjob, pjob->ji_acctrec, PBS_ACCT_END);

	/* log an empty message when there is no accounting record */
	acct_text = pjob->ji_acctrec ? pjob->ji_acctrec : "";

	if ((get_sattr_long(SVR_ATR_log_events) & PBSEVENT_JOB_USAGE) == 0) {
		/* no usage in log, truncate message at the first blank */
		char *blank = strchr(acct_text, ' ');

		if (blank != NULL)
			*blank = '\0';
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, acct_text);
	} else {
		/* log events set to record usage */
		log_event(PBSEVENT_JOB_USAGE,
			  PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, acct_text);
	}

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
		issue_track(pjob);

	if (pjob->ji_pmt_preq != NULL)
		reply_preempt_jobs_request(PBSE_NONE, PREEMPT_METHOD_DELETE, pjob);

	/*
	 * Check if the history of the finished job can be saved or it needs to be purged.
	 */
	svr_saveorpurge_finjobhist(pjob);
}

/**
 * @brief
 * 		continue post-execution processing of a job that terminated.
 *
 *		This function is called by pbsd_init() on recovery, by job_obit()
 *		on job termination and by itself (via a work task).  The clue to where
 *		we are is the job substate and the type of the work task entry it is
 *		called with.  If the work task entry type is not WORK_Deferred_Reply,
 *		then this is the first time in for the job substate.  Otherwise it is
 *		called with the reply given by MOM.
 *
 *		The substates are walked in order, each falling into the next:
 *		EXITING/ABORT -> STAGEOUT (copy files) -> STAGEDEL (delete staged-in
 *		files) -> EXITED (have MOM delete the job, then end_job()).
 *
 *		NOTE:
 *		On the initial work task (WORK_Immed), the wt_parm1 is a job pointer.
 *		On a call-back work task (WORK_Deferred_Reply) generated by
 *		send_request(), the wt_parm1 is pointing to the request; and the
 *		rq_extra field in the request points to the job.
 *
 * @param[in,out]	ptask	- work task; its wt_type is overwritten with
 *							WORK_Immed when moving on to the next substate
 */
void
on_job_exit(struct work_task *ptask)
{
	int handle;
	job *pjob;
	struct batch_request *preq;
	struct work_task *pt;
	int rc;
	int stageout_status = 1; /* success */
	long t;
	pbs_list_head *mom_tasklist_ptr = NULL;
	mominfo_t *pmom = 0;
	int release_nodes_on_stageout = 0;

	if (ptask->wt_type != WORK_Deferred_Reply) {
		preq = NULL;
		pjob = (job *) ptask->wt_parm1;
	} else {
		preq = (struct batch_request *) ptask->wt_parm1;
		pjob = (job *) preq->rq_extra;
	}

	/* minor check on validity of pjob */
	/* (cast to unsigned char: isdigit() is undefined for negative values) */
	if (isdigit((unsigned char) pjob->ji_qs.ji_jobid[0]) == 0)
		return; /* not pointing to currently valid job */

	if (check_job_substate(pjob, JOB_SUBSTATE_EXITING)) {
		/*
		 * If jobs doesn't have any files to stage/delete and there is no execjob_end
		 * hook to run, end job immediately
		 *
		 * If no stage files but has execjob_end hook to run, put job directly in
		 * exited sub state
		 */
		int hs = has_stage(pjob);
		rc = num_eligible_hooks(HOOK_EVENT_EXECJOB_END);
		if (!rc && !hs) {
			end_job(pjob, 1);
			return;
		} else if (rc > 0 && !hs)
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITED);
	}

	if (is_jattr_set(pjob, JOB_ATR_relnodes_on_stageout) &&
	    (get_jattr_long(pjob, JOB_ATR_relnodes_on_stageout) != 0))
		release_nodes_on_stageout = 1;

	/* on failure mom_comm() may have queued a retry of this function */
	if ((handle = mom_comm(pjob, on_job_exit)) < 0)
		return;

	if (pjob->ji_mom_prot == PROT_TPP) {
		pmom = tfind2((unsigned long) pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
			      pjob->ji_qs.ji_un.ji_exect.ji_momport,
			      &ipaddrs);
		if (!pmom || (pmom->mi_dmn_info->dmn_state & INUSE_DOWN))
			return;
		mom_tasklist_ptr = &pmom->mi_dmn_info->dmn_deferred_cmds;
	}

	switch (get_job_substate(pjob)) {

		case JOB_SUBSTATE_EXITING:
		case JOB_SUBSTATE_ABORT:

			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_STAGEOUT);
			ptask->wt_type = WORK_Immed;

			/* Initialize retryok */
			pjob->ji_retryok = 0;

			/* NO BREAK, fall into stage out processing */

		case JOB_SUBSTATE_STAGEOUT:

			if (ptask->wt_type != WORK_Deferred_Reply) {

				/* this is the very first call, have mom copy files */
				/* first check the standard files: output & error   */

				preq = cpy_stdfile(preq, pjob, JOB_ATR_outpath);
				preq = cpy_stdfile(preq, pjob, JOB_ATR_errpath);

				/* are there any stage-out files ?		 	*/

				preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT);

				if (preq) { /* have files to copy 		*/
					if (release_nodes_on_stageout) {
						if (free_sister_vnodes(pjob, NULL, NULL, log_buffer, LOG_BUF_SIZE, NULL) != 0) {
							log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_WARNING, pjob->ji_qs.ji_jobid, log_buffer);
						}
					}
					preq->rq_extra = (void *) pjob;
					rc = issue_Drequest(handle, preq, on_job_exit, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;								     /* come back when mom replies */
					} else {
						/* set up as if mom returned error */

						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
						preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;
						/* we will "fall" into the post reply side */
					}

				} else { /* no files to copy, any to delete? */
					svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_STAGEDEL);
					ptask = set_task(WORK_Immed, 0, on_job_exit, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
					return;
				}
			}

			/* here we have a reply (maybe faked) from MOM about the copy */

			if (preq->rq_reply.brp_code != 0) { /* error from MOM */

				if ((preq->rq_reply.brp_code == DIS_EOF) ||
				    (preq->rq_reply.brp_code == DIS_EOD)) {
					/* connection to Mom broken */
					conn_to_mom_failed(pjob, on_job_exit);
					free_br(preq);
					preq = NULL;
					return;
				}

				if (preq->rq_reply.brp_code == PBSE_NOCOPYFILE)
					stageout_status = 0;

				on_exitrerun_msg(pjob, msg_obitnocpy);
				if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) {
					int len = strlen(log_buffer);

					/*
					 * Append MOM's text on a new line.  Require room for the
					 * newline, at least one character and the terminator, and
					 * terminate explicitly since strncpy() does not do so
					 * when the source is truncated.
					 */
					if (len < LOG_BUF_SIZE - 2) {
						log_buffer[len++] = '\n';
						strncpy(&log_buffer[len],
							preq->rq_reply.brp_un.brp_txt.brp_str,
							LOG_BUF_SIZE - len - 1);
						log_buffer[LOG_BUF_SIZE - 1] = '\0';
					}
				}
				svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer);
			}

			set_jattr_l_slim(pjob, JOB_ATR_stageout_status, stageout_status, SET);

			/*
			 * files (generally) copied ok, move on to the next phase by
			 * "faking" the immediate work task.
			 */

			free_br(preq);
			preq = NULL;
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_STAGEDEL);
			ptask->wt_type = WORK_Immed;

			/* NO BREAK - FALL INTO THE NEXT CASE */

		case JOB_SUBSTATE_STAGEDEL:

			if (ptask->wt_type != WORK_Deferred_Reply) { /* first time in */

				/* Build list of files which were staged-in so they
				 * can be deleted.
				 */

				preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0);

				if (preq) { /* have files to delete		*/

					/* change the request type from copy to delete  */

					if (preq->rq_type == PBS_BATCH_CopyFiles_Cred)
						preq->rq_type = PBS_BATCH_DelFiles_Cred;
					else
						preq->rq_type = PBS_BATCH_DelFiles;
					preq->rq_extra = (void *) pjob;

					rc = issue_Drequest(handle, preq, on_job_exit, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;								     /* come back when mom replies */
					} else {
						/* set up as if mom returned error */

						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;

						/* we will "fall" into the post reply side */
					}

				} else { /* preq == 0, no files to delete   */
					svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITED);
					ptask = set_task(WORK_Immed, 0, on_job_exit, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
					return;
				}
			}

			/* After MOM replied (maybe faked) to Delete Files request */

			if (preq->rq_reply.brp_code != 0) { /* an error occurred */

				if ((preq->rq_reply.brp_code == DIS_EOF) ||
				    (preq->rq_reply.brp_code == DIS_EOD)) {
					/* tcp connection to Mom broken */
					conn_to_mom_failed(pjob, on_job_exit);
					free_br(preq);
					preq = NULL;
					return;
				}
				if (preq->rq_reply.brp_code == PBSE_TRYAGAIN) {
					/* Mom hasn't finished her post processing yet,
					 * send the delete request again later.
					 */
					t = pjob->ji_retryok++;
					t = time_now + (t * t);
					ptask = set_task(WORK_Timed, t, on_job_exit, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

					free_br(preq);
					preq = NULL;
					return;
				}

				on_exitrerun_msg(pjob, msg_obitnodel);
				if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) {
					int len = strlen(log_buffer);

					/* same bounded append as in the stage-out case */
					if (len < LOG_BUF_SIZE - 2) {
						log_buffer[len++] = '\n';
						strncpy(&log_buffer[len],
							preq->rq_reply.brp_un.brp_txt.brp_str,
							LOG_BUF_SIZE - len - 1);
						log_buffer[LOG_BUF_SIZE - 1] = '\0';
					}
				}
				svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer);
			}
			free_br(preq);
			preq = NULL;
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITED);

			ptask->wt_type = WORK_Immed;

			/* NO BREAK, FALL INTO NEXT CASE */

		case JOB_SUBSTATE_EXITED:

			if (ptask->wt_type != WORK_Deferred_Reply) { /* first time in */

				/* see if have any dependencies */

				if (is_jattr_set(pjob, JOB_ATR_depend))
					(void) depend_on_term(pjob);

				/* tell mom to delete the job */

				preq = alloc_br(PBS_BATCH_DeleteJob);
				if (preq) {
					strcpy(preq->rq_ind.rq_delete.rq_objname,
					       pjob->ji_qs.ji_jobid);
					preq->rq_extra = (void *) pjob;
					rc = issue_Drequest(handle, preq, on_job_exit, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;								     /* come back when mom replies */
					} else {
						/* set up as if mom returned error */

						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;

						/* we will "fall" into the post reply side */
					}
				} else {
					log_err(-1, pjob->ji_qs.ji_jobid,
						"Unable to malloc memory for deletejob");
					return;
				}
			}

			/* Set job's exec_vnodes with current time for last_used_time. */
			set_last_used_time_node(pjob, 0);

			/* here we have a reply from MOM about the delete */
			/* if delete ok, send final track and purge the job */

			if (preq->rq_reply.brp_code == PBSE_SISCOMM) {

				/* some sister Mom apparently failed to delete the job and
				 * free resources, keep job until discard_job() does its job
				 */
				free_br(preq);
				preq = NULL;
				if (handle != -1 && pjob->ji_mom_prot == PROT_TCP)
					svr_disconnect(handle);

				discard_job(pjob, "A sister Mom failed to delete job", 0);
				return;
			} else if ((preq->rq_reply.brp_code == DIS_EOF) ||
				   (preq->rq_reply.brp_code == DIS_EOD)) {
				/* tcp connection to Mom broken */
				conn_to_mom_failed(pjob, on_job_exit);
				free_br(preq);
				preq = NULL;
				return;
			} else if (preq->rq_reply.brp_code == PBSE_TRYAGAIN) {
				/* Mom hasn't finished her post processing yet,
				 * send the delete request again later.
				 */
				t = pjob->ji_retryok++;
				t = time_now + (t * t);
				ptask = set_task(WORK_Timed, t, on_job_exit, pjob);
				append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

				free_br(preq);
				preq = NULL;
				return;
			} else {
				/* all went ok with the delete by Mom(s) */
				free_br(preq);
				preq = NULL;
				end_job(pjob, 0);
			}
			break;
		case JOB_SUBSTATE_TERMINATED:
			set_last_used_time_node(pjob, 0);
			break;
	}
}

/**
 * @brief
 *	Free a single job attribute and flag it as not set, but only when
 *	the attribute is currently set.
 *
 * @param[in,out]	pjob - job owning the attribute
 * @param[in]	attr_idx - index of the job attribute to clear
 *
 * @return void
 *
 */
static void
clear_jattr_if_set(job *pjob, int attr_idx)
{
	if (!is_jattr_set(pjob, attr_idx))
		return;

	free_jattr(pjob, attr_idx);
	mark_jattr_not_set(pjob, attr_idx);
}

/**
 * @brief
 *	Unset values of various attributes of 'pjob'
 *	specifically for node ramp down feature.
 *
 *	Where a saved "_orig" copy exists for the resource list or the
 *	scheduler select spec, the live attribute is restored from that copy
 *	before the copy itself is cleared.
 *
 * @param[in,out]	pjob - job in question; NULL is accepted and ignored
 *
 * @return void
 *
 */
void
unset_extra_attributes(job *pjob)
{
	if (pjob == NULL)
		return;

	/* put the original resource list back in place of the current one */
	if (is_jattr_set(pjob, JOB_ATR_resource_orig)) {
		free_jattr(pjob, JOB_ATR_resource);
		mark_jattr_not_set(pjob, JOB_ATR_resource);
		set_attr_with_attr(&job_attr_def[(int) JOB_ATR_resource],
				   get_jattr(pjob, JOB_ATR_resource),
				   get_jattr(pjob, JOB_ATR_resource_orig),
				   INCR);

		free_jattr(pjob, JOB_ATR_resource_orig);
		mark_jattr_not_set(pjob, JOB_ATR_resource_orig);
	}

	clear_jattr_if_set(pjob, JOB_ATR_resc_used_update);
	clear_jattr_if_set(pjob, JOB_ATR_exec_vnode_acct);
	clear_jattr_if_set(pjob, JOB_ATR_exec_vnode_orig);
	clear_jattr_if_set(pjob, JOB_ATR_exec_host_acct);
	clear_jattr_if_set(pjob, JOB_ATR_exec_host_orig);

	/* put the original select spec back before dropping the saved copy */
	if (is_jattr_set(pjob, JOB_ATR_SchedSelect_orig)) {
		set_jattr_str_slim(pjob, JOB_ATR_SchedSelect, get_jattr_str(pjob, JOB_ATR_SchedSelect_orig), NULL);

		free_jattr(pjob, JOB_ATR_SchedSelect_orig);
		mark_jattr_not_set(pjob, JOB_ATR_SchedSelect_orig);
	}

	clear_jattr_if_set(pjob, JOB_ATR_exec_vnode_deallocated);
}

/**
 * @brief
 * 		on_job_rerun - Handle the clean up of jobs being rerun.  This gets
 *		messy if the job is being executed on another host.  Then the
 *		"standard" files must be copied to the server for safe keeping.
 *
 *		The basic flow is very much like that of on_job_exit().
 *		The substate will already be set to JOB_SUBSTATE_RERUN and the
 *		JOB_SVFLG_HASRUN bit set in ji_svrflags.
 *
 *		The substates are walked in order, each falling into the next:
 *		RERUN (have MOM return the job files) -> RERUN1 (stage out files) ->
 *		RERUN2 (delete staged-in files) -> RERUN3 (have MOM delete the job,
 *		then release resources and re-evaluate the job state).
 *
 *		As in on_job_exit(), a work task that is not WORK_Deferred_Reply
 *		carries the job in wt_parm1; a WORK_Deferred_Reply carries the
 *		request, whose rq_extra points to the job.
 *
 * @param[in,out]	ptask	- work task structure; its wt_type is overwritten
 *							with WORK_Immed when moving on to the next substate
 */
void
on_job_rerun(struct work_task *ptask)
{
	int handle;
	char newstate;
	char hook_msg[HOOK_MSG_SIZE] = {0};
	int newsubst;
	job *pjob;
	struct batch_request *preq;
	struct work_task *pt;
	int rc;
	pbs_list_head *mom_tasklist_ptr = NULL;
	mominfo_t *pmom = 0;

	if (ptask->wt_type != WORK_Deferred_Reply) {
		preq = NULL;
		pjob = (job *) ptask->wt_parm1;
	} else {
		preq = (struct batch_request *) ptask->wt_parm1;
		pjob = (job *) preq->rq_extra;
	}

	/* minor check on validity of pjob */
	/* (cast to unsigned char: isdigit() is undefined for negative values) */

	if (isdigit((unsigned char) pjob->ji_qs.ji_jobid[0]) == 0)
		return; /* not pointing to currently valid job */

	/* on failure mom_comm() may have queued a retry of this function */
	if ((handle = mom_comm(pjob, on_job_rerun)) < 0)
		return;

	if (pjob->ji_mom_prot == PROT_TPP) {
		pmom = tfind2((unsigned long) pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
			      pjob->ji_qs.ji_un.ji_exect.ji_momport,
			      &ipaddrs);
		if (!pmom || (pmom->mi_dmn_info->dmn_state & INUSE_DOWN))
			return;
		mom_tasklist_ptr = &pmom->mi_dmn_info->dmn_deferred_cmds;
	}

	switch (get_job_substate(pjob)) {

		case JOB_SUBSTATE_RERUN:

			if (ptask->wt_type != WORK_Deferred_Reply) {
				if (pjob->ji_qs.ji_un.ji_exect.ji_momaddr == pbs_server_addr) {

					/* files don't need to be moved, go to next step */

					svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN1);
					ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
					return;
				}

				/* here is where we have to save the files	*/
				/* ask mom to send them back to the server	*/
				/* mom deletes her copy if returned ok	*/

				preq = alloc_br(PBS_BATCH_Rerun);
				if (preq == NULL) {
					/* report the failure the same way the RERUN3 step does */
					log_err(-1, pjob->ji_qs.ji_jobid,
						"Unable to malloc memory for rerun");
					return;
				}
				(void) strcpy(preq->rq_ind.rq_rerun, pjob->ji_qs.ji_jobid);
				preq->rq_extra = (void *) pjob;

				rc = issue_Drequest(handle, preq, on_job_rerun, &pt, pjob->ji_mom_prot);
				if (rc == 0) {
					/* request ok, will come back when its done */
					append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
					if (pjob->ji_mom_prot == PROT_TPP)
						if (mom_tasklist_ptr)
							append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
					return;
				} else {
					/* set up as if mom returned error */

					preq->rq_reply.brp_code = rc;
					preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
					/* we will "fall" into the post reply side */
				}
			}

			/* We get here if MOM replied (may be faked above)  */
			/* to the rerun (return files) request issued above */

			if (preq->rq_reply.brp_code != 0) { /* error */
				/* for now, just log it */
				if ((preq->rq_reply.brp_code == DIS_EOF) ||
				    (preq->rq_reply.brp_code == DIS_EOD)) {
					/* tcp connection to Mom broken */
					conn_to_mom_failed(pjob, on_job_rerun);
					free_br(preq);
					preq = NULL;
					return;
				}
				on_exitrerun_msg(pjob, msg_obitnocpy);
			}
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN1);
			ptask->wt_type = WORK_Immed;
			free_br(preq);
			preq = NULL;

			/* NO BREAK, FALL THROUGH TO NEXT CASE, including the request */

		case JOB_SUBSTATE_RERUN1:

			if (ptask->wt_type != WORK_Deferred_Reply) {

				/* this is the very first call, have mom copy files */
				/* are there any stage-out files to process? 	*/

				preq = cpy_stage(preq, pjob, JOB_ATR_stageout, STAGE_DIR_OUT);

				if (preq) { /* have files to copy 		*/
					preq->rq_extra = (void *) pjob;
					rc = issue_Drequest(handle, preq, on_job_rerun, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;								     /* come back when mom replies */
					} else {
						/* set up as if mom returned error */

						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
						preq->rq_reply.brp_un.brp_txt.brp_txtlen = 0;
						/* we will "fall" into the post reply side */
					}

				} else { /* no files to copy, any to delete? */
					svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN2);
					ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
					return;
				}
			}

			/* here we have a reply (maybe faked) from MOM about the copy */

			if (preq->rq_reply.brp_code != 0) { /* error from MOM */

				if ((preq->rq_reply.brp_code == DIS_EOF) ||
				    (preq->rq_reply.brp_code == DIS_EOD)) {
					/* tcp connection to Mom broken */
					conn_to_mom_failed(pjob, on_job_rerun);
					free_br(preq);
					preq = NULL;
					return;
				}
				on_exitrerun_msg(pjob, msg_obitnocpy);
				if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) {
					int len = strlen(log_buffer);

					/*
					 * Append MOM's text on a new line.  Require room for the
					 * newline, at least one character and the terminator, and
					 * terminate explicitly since strncpy() does not do so
					 * when the source is truncated.
					 */
					if (len < LOG_BUF_SIZE - 2) {
						log_buffer[len++] = '\n';
						strncpy(&log_buffer[len],
							preq->rq_reply.brp_un.brp_txt.brp_str,
							LOG_BUF_SIZE - len - 1);
						log_buffer[LOG_BUF_SIZE - 1] = '\0';
					}
				}
				svr_mailowner(pjob, MAIL_OTHER, MAIL_FORCE, log_buffer);
			}

			/*
			 * files (generally) copied ok, move on to the next phase by
			 * "faking" the immediate work task.
			 */

			free_br(preq);
			preq = NULL;
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN2);
			ptask->wt_type = WORK_Immed;

			/* NO BREAK - FALL INTO THE NEXT CASE */

		case JOB_SUBSTATE_RERUN2:

			if (ptask->wt_type != WORK_Deferred_Reply) {

				/* here is where we delete  any stage-in files	   */

				preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0);
				if (preq) {
					preq->rq_type = PBS_BATCH_DelFiles;
					preq->rq_extra = (void *) pjob;
					rc = issue_Drequest(handle, preq, on_job_rerun, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;
					} else { /* error on sending request */
						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
						/* we will "fall" into the post reply side */
					}
				} else {
					svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN3);
					ptask = set_task(WORK_Immed, 0, on_job_rerun, pjob);
					append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
					return;
				}
			}

			/* post reply side for delete file request to MOM */
			if (preq->rq_reply.brp_code != 0) { /* error */
				if ((preq->rq_reply.brp_code == DIS_EOF) ||
				    (preq->rq_reply.brp_code == DIS_EOD)) {
					/* tcp connection to Mom broken */
					conn_to_mom_failed(pjob, on_job_rerun);
					free_br(preq);
					preq = NULL;
					return;
				}
				/*
				 * for other errors, just log it; this was a delete
				 * request, so use the delete-failure message as
				 * on_job_exit() does for the same step
				 */
				on_exitrerun_msg(pjob, msg_obitnodel);
			}
			free_br(preq);
			preq = NULL;
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_RERUN3);
			ptask->wt_type = WORK_Immed;

			/* NO BREAK, FALL THROUGH TO NEXT CASE */

		case JOB_SUBSTATE_RERUN3:

			if (ptask->wt_type != WORK_Deferred_Reply) {
				/* need to have MOM delete her copy of the job */
				preq = alloc_br(PBS_BATCH_DeleteJob);
				if (preq) {
					strcpy(preq->rq_ind.rq_delete.rq_objname,
					       pjob->ji_qs.ji_jobid);
					preq->rq_extra = (void *) pjob;
					rc = issue_Drequest(handle, preq, on_job_rerun, &pt, pjob->ji_mom_prot);
					if (rc == 0) {
						append_link(&pjob->ji_svrtask, &pt->wt_linkobj, pt);
						if (pjob->ji_mom_prot == PROT_TPP)
							if (mom_tasklist_ptr)
								append_link(mom_tasklist_ptr, &pt->wt_linkobj2, pt); /* if tpp, link to mom list as well */
						return;								     /* come back when Mom replies */
					} else {
						/* set up as if mom returned error */
						preq->rq_reply.brp_code = rc;
						preq->rq_reply.brp_choice = BATCH_REPLY_CHOICE_NULL;
						/* fall into next section */
					}
				} else {
					log_err(-1, pjob->ji_qs.ji_jobid,
						"Unable to malloc memory for rerun");
					return;
				}
			}

			/* here we have a reply from MOM about the delete */
			/* if delete ok, send final track and purge the job */

			if (preq->rq_reply.brp_code == PBSE_SISCOMM) {

				/*
				 * some sister Mom apparently failed to delete the job and
				 * free resources, keep job until discard_job() does its job
				 */
				free_br(preq);
				preq = NULL;
				if (handle != -1 && pjob->ji_mom_prot == PROT_TCP)
					svr_disconnect(handle);

				if (pjob->ji_pmt_preq != NULL)
					reply_preempt_jobs_request(PBSE_SISCOMM, PREEMPT_METHOD_DELETE, pjob);

				discard_job(pjob, "A sister Mom failed to delete job", 0);
				return;
			} else if ((preq->rq_reply.brp_code == DIS_EOF) ||
				   (preq->rq_reply.brp_code == DIS_EOD)) {
				/* tcp connection to Mom broken */
				conn_to_mom_failed(pjob, on_job_rerun);
				free_br(preq);
				preq = NULL;
				return;
			} else {
				/* all went ok with the delete by Mom(s) */
				free_br(preq);
				preq = NULL;

				pjob->ji_qs.ji_obittime = time_now;
				set_jattr_l_slim(pjob, JOB_ATR_obittime, pjob->ji_qs.ji_obittime, SET);

				/* Allocate space for the jobobit hook event params */
				preq = alloc_br(PBS_BATCH_JobObit);
				if (preq == NULL) {
					log_err(PBSE_INTERNAL, __func__, "rq_jobobit alloc failed");
				} else {
					preq->rq_ind.rq_obit.rq_pjob = pjob;

					rc = process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
					if (rc == -1) {
						log_err(-1, __func__, "rq_jobobit process_hooks call failed");
					}
					free_br(preq);
				}

				if (handle != -1 && pjob->ji_mom_prot == PROT_TCP)
					svr_disconnect(handle);

				account_jobend(pjob, pjob->ji_acctrec, PBS_ACCT_RERUN);
				if (pjob->ji_acctrec) {
					free(pjob->ji_acctrec); /* logged, so clear it */
					pjob->ji_acctrec = NULL;
				}
				if ((is_jattr_set(pjob, JOB_ATR_resc_released))) {
					/* If JOB_ATR_resc_released attribute is set and we are trying
					 * to rerun a job then we need to reassign resources first because
					 * when we suspend a job we don't decrement all of the resources.
					 * So we need to set partially released resources
					 * back again to release all other resources
					 */
					set_resc_assigned(pjob, 0, INCR);
					free_jattr(pjob, JOB_ATR_resc_released);
					mark_jattr_not_set(pjob, JOB_ATR_resc_released);
					if (is_jattr_set(pjob, JOB_ATR_resc_released_list)) {
						free_jattr(pjob, JOB_ATR_resc_released_list);
						mark_jattr_not_set(pjob, JOB_ATR_resc_released_list);
					}
				}
				rel_resc(pjob); /* free resc assigned to job */

				/* Respond to pending preemption request from the scheduler, if any */
				if (pjob->ji_pmt_preq != NULL)
					reply_preempt_jobs_request(PBSE_NONE, PREEMPT_METHOD_REQUEUE, pjob);

				unset_extra_attributes(pjob);

				if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART) == 0) {
					/* in case of server shutdown, don't clear exec_vnode */
					/* will use it on hotstart when next comes up	      */
					free_jattr(pjob, JOB_ATR_exec_vnode);
					free_jattr(pjob, JOB_ATR_exec_host);
					free_jattr(pjob, JOB_ATR_exec_host2);
				}
				pjob->ji_momhandle = -1;
				pjob->ji_mom_prot = PROT_INVALID;
				/* job dir has no meaning for re-queued jobs, so unset it */
				free_jattr(pjob, JOB_ATR_jobdir);

				pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn;
				svr_evaljobstate(pjob, &newstate, &newsubst, 0);
				svr_setjobstate(pjob, newstate, newsubst);
			}
			break;
	}
}
/**
 * @brief
 * 		setrerun	- decide whether a job is to be run again: either it
 * 		exited with JOB_EXEC_RETRY or its rerunable attribute is non-zero.
 * 		If so the substate becomes JOB_SUBSTATE_RERUN; otherwise the owner
 * 		is mailed an abort notice and the substate is left untouched.
 *
 * @param[in,out]	pjob	- job which may need to be set for rerun.
 *
 * @return	exit code
 * @retval	0	- substate set to rerun.
 * @retval	1	- substate left as is
 */
static int
setrerun(job *pjob)
{
	int run_again;

	/* the rerunable attribute is consulted only when the exit status is not a retry */
	run_again = (pjob->ji_qs.ji_un.ji_exect.ji_exitstat == JOB_EXEC_RETRY) ||
		    (get_jattr_long(pjob, JOB_ATR_rerunable) != 0);

	if (!run_again) {
		svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_init_abt);
		return 1;
	}

	set_job_substate(pjob, JOB_SUBSTATE_RERUN);
	return 0;
}

/**
 * @brief
 *		Concatenate the resources used to the buffer provided.
 * @par
 *		The text appended has the form "<delim><name>[.<resource>]=<value>".
 *		Nothing is appended when the entry has no value (NULL or empty).
 * @par
 *		When pjob is supplied and the entry's resource is WALLTIME, the value
 *		written is the difference between get_walltime() for
 *		JOB_ATR_resc_used and for JOB_ATR_resc_used_acct, formatted by
 *		convert_duration_to_str().  If that difference cannot be formed
 *		(the accounted value is negative or larger than the used value),
 *		the entry's own value string is written instead.
 *
 * @param[in,out]buffer - pointer to buffer to add info to.  May grow/change due to pbs_strcat() (realloc)
 * @param[in,out]buffer_size - size of buffer - may increase through pbs_strcat()
 * @param[in]		patlist - the attribute entry (name, optional resource, value) to append
 * @param[in]		delim - a pointer to the delimiter to use
 * @param[in]		pjob - job structure for additional info, may be NULL
 *
 * @return	int
 * @retval	0	- entry appended, or skipped because it has no value
 * @retval	1	- a required argument was NULL or pbs_strcat() failed;
 *			  the buffer may hold a partially written entry
 */
int
concat_rescused_to_buffer(char **buffer, int *buffer_size, svrattrl *patlist, char *delim, const job *pjob)
{
	if (buffer == NULL || buffer_size == NULL || patlist == NULL || delim == NULL)
		return 1;

	/* log to accounting_logs only if there's a value */
	if (patlist->al_value == NULL || patlist->al_value[0] == '\0')
		return 0;

	if (pbs_strcat(buffer, buffer_size, delim) == NULL)
		goto nomem;
	if (pbs_strcat(buffer, buffer_size, patlist->al_name) == NULL)
		goto nomem;
	if (patlist->al_resc) {
		if (pbs_strcat(buffer, buffer_size, ".") == NULL)
			goto nomem;
		if (pbs_strcat(buffer, buffer_size, patlist->al_resc) == NULL)
			goto nomem;
	}
	if (pbs_strcat(buffer, buffer_size, "=") == NULL)
		goto nomem;

	if ((pjob != NULL) &&
	    patlist->al_resc && (strcmp(patlist->al_resc, WALLTIME) == 0)) {
		long k = get_walltime(pjob, JOB_ATR_resc_used_acct);
		long j = get_walltime(pjob, JOB_ATR_resc_used);

		if ((k >= 0) && (j >= k)) {
			char timebuf[TIMEBUF_SIZE] = {0};

			/* report only the walltime not covered by resc_used_acct */
			convert_duration_to_str(j - k, timebuf, TIMEBUF_SIZE);
			if (pbs_strcat(buffer, buffer_size, timebuf) == NULL)
				goto nomem;
			return 0;
		}
		/* otherwise fall back to the value string supplied in the entry */
	}

	if (pbs_strcat(buffer, buffer_size, patlist->al_value) == NULL)
		goto nomem;
	return 0;

nomem:
	/* single exit for every pbs_strcat() failure; errno is still that of the failed call */
	log_err(errno, __func__, "Failed to allocate memory.");
	return 1;
}

/**
 * @brief
 *		Process the Job Obituary Notice (request) from MOM for a job which has
 *		terminated.  The Obit contains the exit status and final resource
 *		usage for the job.
 * @par
 *		If the job cannot be found, the Server tells Mom to discard her copy.
 *		This may be the case if the job was forcefully deleted while Mom was
 *		down or the Server was restarted cold/clean discarding the jobs.
 *
 *		Depending on the state of the job:
 *		- Not RUNNING and not EXITING - tell Mom to discard the job.
 *		- Also not in substate _TERM - Mom wishes to restart the end of job
 *	  	 processing; likely because she hasn't heard from the Server.
 *		- If the "run count" in the obit does not match the Server's, Mom has
 *	 	 an old copy and she is told to discard it.
 * @par
 *		Normally, the Obit is received when the job is in substate RUNNING.
 *		The job is moved into that substate when Mom sends the session id of
 *		the job,  see stat_update().  However, it is possible that  the Obit
 *		is received before that and the job is in substate _PRERUN (or very
 *		unlikely _PROVISION).  If this is the case, call complete_running()
 *		to update the job to _RUNNING and write the "S" accounting record before
 *		we write the "E" record.
 * @par
 *		There are special job exit values (negative numbers which cannot be
 *		actual exits status of the job).  These are typically because Mom
 *		could not complete starting the job or Mom is being restarted without
 *		the "-p" option.
 *		- JOB_EXEC_FAIL1: Mom could not start job, the standard out/err files
 *	  	were not created.
 *		- JOB_EXEC_FAIL2: Mom could not start the job, but had created the
 *	  	files so there is useful info in them.
 *		- JOB_EXEC_INITABT: Mom aborted the running job on her initialization.
 *		- JOB_EXEC_FAILUID: Mom aborted the job because of an invalid uid/gid.
 *		- JOB_EXEC_FAIL_PASSWORD: Mom aborted the job because she needed the
 *	  	user's password (Windows) and the password didn't work.
 *		- JOB_EXEC_RETRY: Mom couldn't start the job, but it might work later,
 *	  	so requeue it.
 *		- JOB_EXEC_BADRESRT: The job could not be started from the checkpoint
 *	  	restart file.
 *		- JOB_EXEC_INITRST: Mom aborted a checkpointed job which should be
 *	  	requeued for a later "restart".
 *		- JOB_EXEC_QUERST: The Epilogue told Mom to requeue the job which can
 *	  	be restarted from a checkpoint.
 *		- JOB_EXEC_RERUN: or JOB_EXEC_RERUN_SIS_FAIL: requeue the job if it is
 *	  	rerunable (not submitted with "-r n").
 *		- JOB_EXEC_FAILHOOK_RERUN: returned by a job rejected by a mom hook
 *	  	and the next action is to requeue/rerun the job.
 *		- JOB_EXEC_FAILHOOK_DELETE: returned by a job rejected by a mom hook
 *	  	and the next action is to just delete the job.
 *		- JOB_EXEC_HOOK_RERUN - returned by a job that ran a mom hook that
 *	  	instructed the server to requeue the job once reaching the end.
 *		- JOB_EXEC_HOOK_DELETE - returned by a job that ran a mom hook that
 *	  	instructed the server to delete the job  once reaching the end.
 *		- JOB_EXEC_HOOKERROR: returned by a job rejected by a mom hook
 *		due to an exception, or hook alarm was raised,
 *		and the next action is to requeue/rerun the job.
 * @par
 *		Otherwise record the accounting information to be recorded later in the
 *		processing.  Now, the job is moved into "exiting" processing or "rerun"
 *		processing (qrerun) via a work-task entry invoking either on_job_exit()
 *		or on_job_rerun().
 * @par
 *		The accounting text built here is handed over to pjob->ji_acctrec
 *		(any previous record is freed); the mail text is freed before return.
 *
 * @param[in] - pruu   - the structure containing the resource usage info
 * @param[in] - stream - the TPP stream connecting to the Mom
 *                       The Server will send back either a rejection or an acceptance
 *                       of  the Obit.
 * @return int
 *
 * @retval 0  - accept obit
 * @retval 1  - reject obit
 * @retval -1 - ignore obit
 *
 */
int
job_obit(ruu *pruu, int stream)
{
	int alreadymailed = 0;
	char *acctbuf = NULL;
	int acctbuf_size = 0;
	int dummy;
	int num;
	int exitstatus;
	int local_exitstatus = 0;
	char *mailbuf = NULL;
	int mailbuf_size = 0;
	char newstate;
	int newsubst;
	job *pjob;
	svrattrl *patlist;
	struct work_task *ptask;
	void (*eojproc)();
	char *mailmsg = NULL;
	char *msg = NULL;

	time_now = time(0);

	DBPRT(("%s: Obit received for job %s status=%d hop=%d\n", __func__, pruu->ru_pjobid, pruu->ru_status, pruu->ru_hop))
	pjob = find_job(pruu->ru_pjobid);
	if (pjob == NULL) { /* not found */
		DBPRT(("%s: job %s not found!\n", __func__, pruu->ru_pjobid))
		if (server_init_type == RECOV_COLD || server_init_type == RECOV_CREATE)
			sprintf(log_buffer, msg_obitnojob, PBSE_CLEANEDOUT);
		else if (is_job_array(pruu->ru_pjobid) == IS_ARRAY_Single)
			sprintf(log_buffer, "%s", msg_obitnotrun);
		else
			sprintf(log_buffer, msg_obitnojob, PBSE_UNKJOBID);
		log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE, pruu->ru_pjobid, log_buffer);

		/* tell MOM the job was blown away */
		return 1;
	}

	log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO, pruu->ru_pjobid,
		   "Obit received momhop:%d serverhop:%ld state:%c substate:%d",
		   pruu->ru_hop, get_jattr_long(pjob, JOB_ATR_run_version), get_job_state(pjob), get_job_substate(pjob));

	if (!check_job_state(pjob, JOB_STATE_LTR_RUNNING)) {
		DBPRT(("%s: job %s not in running state!\n",
		       __func__, pruu->ru_pjobid))
		if (!check_job_state(pjob, JOB_STATE_LTR_EXITING)) {

			/* not running and not exiting - bad news   */
			/* may be from old Mom and job was requeued */
			/* tell mom to trash job		    */
			DBPRT(("%s: job %s not in exiting state!\n",
			       __func__, pruu->ru_pjobid))
			pjob->ji_discarding = 0;

			log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pruu->ru_pjobid, msg_obitnotrun);
			return 1;
		} else if (!check_job_substate(pjob, JOB_SUBSTATE_TERM)) {
			/*
			 * not in special site script substate, Mom must have
			 * had a problem and wants to have the post job
			 * processing restarted.
			 *
			 * If there is an open connection to Mom for this job,
			 * find the associate work task, remove and free it and
			 * any outstanding batch_request to Mom.  Then close
			 * the connection so we start fresh and stay in sync.
			 */
			if (pjob->ji_momhandle != -1) {
				struct batch_request *prequest;
				extern pbs_list_head task_list_event;

				ptask = (struct work_task *) GET_NEXT(task_list_event);
				while (ptask) {
					if (ptask->wt_type == WORK_Deferred_Reply && ptask->wt_event == pjob->ji_momhandle)
						break;
					ptask = (struct work_task *) GET_NEXT(ptask->wt_linkevent);
				}
				if (ptask) {
					if ((prequest = ptask->wt_parm1) != NULL)
						free_br(prequest);
					delete_task(ptask);
				}
				if (pjob->ji_mom_prot == PROT_TCP)
					svr_force_disconnect(pjob->ji_momhandle);

				pjob->ji_momhandle = -1;
				pjob->ji_mom_prot = PROT_INVALID;
			}
			/* restart whichever end-of-job path the job was already on */
			if (get_job_substate(pjob) < JOB_SUBSTATE_RERUN)
				eojproc = on_job_exit;
			else
				eojproc = on_job_rerun;
			ptask = set_task(WORK_Immed, 0, eojproc, (void *) pjob);
			append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);
			return -1;
		}
		/*
		 * State EXITING and substate TERM, this is the real obit
		 * so fall through and start real end of job processing
		 */
	}

	if (pruu->ru_hop < get_jattr_long(pjob, JOB_ATR_run_version)) {
		/*
		 * Obit is for an older run version,  likely a Mom coming back
		 * alive after being down awhile and job was requeue and run
		 * somewhere else.   Just tell Mom to junk job
		 */
		DBPRT(("%s: job %s run count too low\n", __func__, pruu->ru_pjobid))
		return 1;
	} else if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {

		/*
		 * Won't have a valid hop count in the job structure
		 * look at where job is running and who is sending the obit
		 */

		int ivndx;
		mominfo_t *psendmom;
		struct pbsnode *sendvnp;
		char *runningnode;
		extern struct tree *streams;

		psendmom = tfind2(stream, 0, &streams);
		runningnode = parse_servername(get_jattr_str(pjob, JOB_ATR_exec_vnode), NULL);
		if (psendmom && runningnode) {
			for (ivndx = 0; ivndx < ((mom_svrinfo_t *) (psendmom->mi_data))->msr_numvnds; ++ivndx) {
				sendvnp = ((mom_svrinfo_t *) (psendmom->mi_data))->msr_children[ivndx];
				if (strcasecmp(runningnode, sendvnp->nd_name) == 0) {
					break;
				}
			}
			if (ivndx == ((mom_svrinfo_t *) (psendmom->mi_data))->msr_numvnds) {
				/* not the same node, reject the obit */
				return 1;
			}
		}
	}

	/*
	 * have hit a race condition where the send_job child's process
	 * may not yet have been reaped.  Update accounting for job start
	 */

	if (check_job_substate(pjob, JOB_SUBSTATE_PRERUN) || check_job_substate(pjob, JOB_SUBSTATE_PROVISION)) {
		DBPRT(("%s: job %s in prerun state.\n", __func__, pruu->ru_pjobid))
		complete_running(pjob);
	}
	if (pjob->ji_prunreq) {
		reply_ack(pjob->ji_prunreq);
		pjob->ji_prunreq = NULL;
	}

	/* save exit state, update the resources used */
	exitstatus = pruu->ru_status;
	pjob->ji_qs.ji_un.ji_exect.ji_exitstat = exitstatus;

	/* set the Exit_status job attribute */
	if (is_jattr_set(pjob, JOB_ATR_exit_status))
		local_exitstatus = get_jattr_long(pjob, JOB_ATR_exit_status);

	/*
	 * An Exit_status already set to a hook rerun/delete request takes
	 * precedence over Mom's status, unless Mom reports a hook rejection.
	 */
	if ((local_exitstatus == JOB_EXEC_HOOK_RERUN || local_exitstatus == JOB_EXEC_HOOK_DELETE) &&
	    exitstatus != JOB_EXEC_FAILHOOK_RERUN && exitstatus != JOB_EXEC_FAILHOOK_DELETE)
		exitstatus = local_exitstatus;
	else
		set_jattr_l_slim(pjob, JOB_ATR_exit_status, exitstatus, SET);

	patlist = (svrattrl *) GET_NEXT(pruu->ru_attr);

	/* record usage attribute to job for history */
	dummy = 0;
	if (modify_job_attr(pjob, patlist, ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &dummy) != 0) {
		/*
		 * Locate the offending entry with a separate cursor: patlist must
		 * keep pointing at the head of the list, because the resources_used
		 * loop below walks the list starting from patlist.
		 */
		svrattrl *pbad = patlist;

		for (num = 1; num < dummy && pbad != NULL; num++)
			pbad = (svrattrl *) GET_NEXT(pbad->al_link);
		if (pbad != NULL)
			log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid,
				   "unable to update attribute %s.%s in job_obit", pbad->al_name,
				   pbad->al_resc ? pbad->al_resc : "");
		else
			log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid,
				   "unable to update attribute number %d in job_obit", dummy);
	}

	/* Allocate initial space for acctbuf/mailbuf.  Future space will be allocated by pbs_strcat(). */
	acctbuf = malloc(RESC_USED_BUF_SIZE);
	mailbuf = malloc(RESC_USED_BUF_SIZE);

	if (acctbuf == NULL || mailbuf == NULL) {
		log_err(errno, __func__, "Failed to allocate memory");
		/* Just in case one of the buffers got allocated */
		free(acctbuf);
		acctbuf = NULL;
		free(mailbuf);
		mailbuf = NULL;
	} else {
		acctbuf_size = RESC_USED_BUF_SIZE;
		mailbuf_size = RESC_USED_BUF_SIZE;

		snprintf(acctbuf, acctbuf_size, msg_job_end_stat, pjob->ji_qs.ji_un.ji_exect.ji_exitstat);
		if (exitstatus < 10000)
			strncpy(mailbuf, acctbuf, mailbuf_size);
		else
			snprintf(mailbuf, mailbuf_size, msg_job_end_sig, exitstatus - 10000);
		/*
		 * NOTE:
		 * Following code for constructing resources used information is same as account_jobend()
		 * with minor difference that to traverse patlist in this code
		 * we have to use GET_NEXT(patlist->al_link) since it is part of batch request
		 * and in account_jobend() we are using patlist->al_sister which is encoded
		 * information in job struct.
		 * This collects all resources_used information returned from the mom.
		 */
		for (; patlist; patlist = (svrattrl *) GET_NEXT(patlist->al_link)) {
			resource_def *tmpdef;

			if (strcmp(patlist->al_name, ATTR_used) != 0)
				continue;
			tmpdef = find_resc_def(svr_resc_def, patlist->al_resc);
			if (tmpdef == NULL)
				continue;
			/*
			 * Copy all resources to the accounting buffer.
			 * Copy all but invisible resources into the mail buffer.
			 * The ATR_DFLAG_USRD flag will not be set on invisible resources.
			 */
			if (concat_rescused_to_buffer(&acctbuf, &acctbuf_size, patlist, " ", pjob) != 0)
				break;
			if (tmpdef->rs_flags & ATR_DFLAG_USRD) {
				if (concat_rescused_to_buffer(&mailbuf, &mailbuf_size, patlist, "\n", pjob) != 0)
					break;
			}
		}
	}

	/* make sure ji_momhandle is -1 to force new connection to mom */
	pjob->ji_momhandle = -1;
	pjob->ji_mom_prot = PROT_INVALID;
	pjob->ji_retryok = 0; /* for retry if Mom down */

	/* clear suspended flag if it was set, also clear suspended-workstation busy flag if set */

	pjob->ji_qs.ji_svrflags &= ~(JOB_SVFLG_Suspend | JOB_SVFLG_Actsuspd);

	/* Was there a special exit status from MOM ? */
	if (exitstatus < 0 && exitstatus != JOB_EXEC_CHKP) {
		/* negative exit status is special */
		switch (exitstatus) {
			case JOB_EXEC_FAILHOOK_DELETE:
				/* this is a reject */
				log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, msg_hook_reject_deletejob);
				DBPRT(("%s: MOM rejected job %s due to a hook.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_hook_reject_deletejob);
				alreadymailed = 1;
				break;

			case JOB_EXEC_HOOK_DELETE:
				/* more likely an accept with a hook delete option */
				log_event(PBSEVENT_ADMIN | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, "a hook requested for job to be deleted");
				DBPRT(("%s: a hook requested for job %s to be deleted.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, "a hook requested for job to be deleted");
				alreadymailed = 1;
				break;

			case JOB_EXEC_FAIL1:
			default:
				/* MOM rejected job with fatal error, abort job */
				DBPRT(("%s: MOM rejected job %s with fatal error.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec1);
				alreadymailed = 1;
				break;

			case JOB_EXEC_FAIL2:
				/* MOM reject job after files setup, abort job */
				DBPRT(("%s: MOM rejected job %s after setup.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momnoexec2);
				alreadymailed = 1;
				break;

			case JOB_EXEC_INITABT:
				/* MOM aborted job on her initialization */
				DBPRT(("%s: MOM aborted job %s on init, no requeue.\n", __func__, pruu->ru_pjobid))
				alreadymailed = setrerun(pjob);
				pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;
				break;

			case JOB_EXEC_FAILUID:
				/* MOM abort job because uid or gid was invalid */
				DBPRT(("%s: MOM rejected job %s with invaild uid/gid.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_baduser);
				alreadymailed = 1;
				/* go to the retry case */
				goto RetryJob;

			case JOB_EXEC_FAIL_PASSWORD:

				/* put job on password hold */
				set_jattr_b_slim(pjob, JOB_ATR_hold, HOLD_bad_password, INCR);

				set_job_substate(pjob, JOB_SUBSTATE_HELD);
				svr_evaljobstate(pjob, &newstate, &newsubst, 0);
				svr_setjobstate(pjob, newstate, newsubst);

				/* mail/comment text is "<Mom's comment>:<bad password message>" */
				msg = pruu->ru_comment ? pruu->ru_comment : "";
				mailmsg = (char *) malloc(strlen(msg) + 1 + strlen(msg_bad_password) + 1);
				if (mailmsg) {
					sprintf(mailmsg, "%s:%s", msg, msg_bad_password);
					svr_mailowner(pjob, MAIL_BEGIN, MAIL_FORCE, mailmsg);
					set_jattr_str_slim(pjob, JOB_ATR_Comment, mailmsg, NULL);

					log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
						  pjob->ji_qs.ji_jobid, mailmsg);
					free(mailmsg);
				} else {
					svr_mailowner(pjob, MAIL_BEGIN, MAIL_FORCE, msg_bad_password);
					set_jattr_str_slim(pjob, JOB_ATR_Comment, msg_bad_password, NULL);
				}
				/*
				 * FALLTHROUGH - there is no break here, so control continues
				 * into the retry handling below.
				 * NOTE(review): presumably intentional so the held job is
				 * still requeued - confirm.
				 */

			case JOB_EXEC_RETRY:
			case JOB_EXEC_FAILHOOK_RERUN:
			case JOB_EXEC_HOOK_RERUN:
			case JOB_EXEC_HOOKERROR:
			case JOB_EXEC_JOINJOB:
				if (exitstatus == JOB_EXEC_FAILHOOK_RERUN || exitstatus == JOB_EXEC_HOOKERROR) {
					log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, msg_hook_reject_rerunjob);
					DBPRT(("%s: MOM rejected job %s due to a hook.\n", __func__, pruu->ru_pjobid))
				} else if (exitstatus == JOB_EXEC_JOINJOB) {
					log_event(PBSEVENT_ERROR | PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
						  pjob->ji_qs.ji_jobid, "Mom rejected job due to join job error");
					/* from here on a join-job failure is handled as a plain retry */
					exitstatus = JOB_EXEC_RETRY;
				}
			RetryJob:
				/* MOM rejected job, but said retry it */
				DBPRT(("%s: MOM rejected job %s but will retry.\n", __func__, pruu->ru_pjobid))
				if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HASRUN) /* has run before, treat this as another rerun */
					alreadymailed = setrerun(pjob);
				else /* have mom remove job files, not saving them, and requeue job */
					set_job_substate(pjob, JOB_SUBSTATE_RERUN2);

				check_failed_attempts(pjob);
				break;

			case JOB_EXEC_BADRESRT:
				/* MOM could not restart job, setup for rerun */
				DBPRT(("%s: MOM could not restart job %s, will rerun.\n", __func__, pruu->ru_pjobid))
				alreadymailed = setrerun(pjob);
				pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_CHKPT;
				break;

			case JOB_EXEC_INITRST:
				/*
				 * Mom aborted job on Mom being restarted, job has been
				 * checkpointed and can be "restarted" rather than rerun
				 */
			case JOB_EXEC_QUERST:
				/*
				 * Epilogue requested requeue of a checkpointed job
				 * it can be restarted later from restart file
				 *
				 * In both cases, job has checkpoint/restart file,
				 * requeue job and leave all information on execution
				 * host for a later restart
				 */
				DBPRT(("%s: MOM request requeue of job for restart.\n", __func__))
				if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob)
					goto RetryJob;

				rel_resc(pjob);
				pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_CHKPT;

				svr_evaljobstate(pjob, &newstate, &newsubst, 1);
				svr_setjobstate(pjob, newstate, newsubst);
				if (pjob->ji_mom_prot == PROT_TCP)
					svr_disconnect(pjob->ji_momhandle);

				pjob->ji_momhandle = -1;
				pjob->ji_mom_prot = PROT_INVALID;

				/* returning early: neither buffer is used or handed to the job */
				free(mailbuf);
				free(acctbuf);
				return 0;

			case JOB_EXEC_INITRMG:
				/*
				 * MOM abort job on init, job has migratable checkpoint
				 * Must recover output and checkpoint file, do eoj
				 */

				DBPRT(("%s: MOM aborted migratable job %s on init, will requeue.\n", __func__, pruu->ru_pjobid))

				if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob)
					goto RetryJob;

				alreadymailed = setrerun(pjob);
				pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN | JOB_SVFLG_ChkptMig;
				break;

			case JOB_EXEC_RERUN:
			case JOB_EXEC_RERUN_SIS_FAIL:
				if (get_jattr_long(pjob, JOB_ATR_rerunable))
					set_job_substate(pjob, JOB_SUBSTATE_RERUN);
				else {
					set_job_substate(pjob, JOB_SUBSTATE_EXITING);
					svr_mailowner(pjob, MAIL_ABORT, MAIL_NORMAL,
						      "Non-rerunable job deleted on requeue");
				}
				break;
			case JOB_EXEC_FAIL_SECURITY:
				/* MOM rejected job with security breach fatal error, abort job */
				DBPRT(("%s: MOM rejected job %s with security breach fatal error.\n", __func__, pruu->ru_pjobid))
				set_jattr_b_slim(pjob, JOB_ATR_hold, HOLD_s, INCR);
				set_jattr_str_slim(pjob, JOB_ATR_Comment,
						   "job held due to possible security breach of job tmpdir, failed to start", NULL);
				rel_resc(pjob);
				svr_setjobstate(pjob, JOB_STATE_LTR_HELD, JOB_SUBSTATE_HELD);
				/* returning early: neither buffer is used or handed to the job */
				free(mailbuf);
				free(acctbuf);
				return 0;
			case JOB_EXEC_KILL_NCPUS_BURST:
				/* MOM killed job due to exceeding ncpus (burst), abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding ncpus (burst).\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillncpusburst);
				alreadymailed = 1;
				break;
			case JOB_EXEC_KILL_NCPUS_SUM:
				/* MOM killed job due to exceeding ncpus (sum), abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding ncpus (sum).\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillncpussum);
				alreadymailed = 1;
				break;
			case JOB_EXEC_KILL_VMEM:
				/* MOM killed job due to exceeding vmem, abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding vmem.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillvmem);
				alreadymailed = 1;
				break;
			case JOB_EXEC_KILL_MEM:
				/* MOM killed job due to exceeding mem, abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding mem.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillmem);
				alreadymailed = 1;
				break;
			case JOB_EXEC_KILL_CPUT:
				/* MOM killed job due to exceeding cput, abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding cput.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillcput);
				alreadymailed = 1;
				break;
			case JOB_EXEC_KILL_WALLTIME:
				/* MOM killed job due to exceeding walltime, abort job */
				DBPRT(("%s: MOM killed job %s due to exceeding walltime.\n", __func__, pruu->ru_pjobid))
				svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, msg_momkillwalltime);
				alreadymailed = 1;
				break;
		}
	}

	/* Send email if exiting (not rerun) */

	if ((exitstatus == JOB_EXEC_FAILHOOK_DELETE) || (exitstatus == JOB_EXEC_HOOK_DELETE) ||
	    (!check_job_substate(pjob, JOB_SUBSTATE_RERUN) && !check_job_substate(pjob, JOB_SUBSTATE_RERUN2))) {
		DBPRT(("%s: Job %s is terminating and not rerun.\n", __func__, pjob->ji_qs.ji_jobid))

		svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITING);
		/* mailbuf is NULL when the earlier allocation failed */
		if (alreadymailed == 0 && mailbuf != NULL)
			svr_mailowner(pjob, MAIL_END, MAIL_NORMAL, mailbuf);
	}

	/* can free this now since no need to use it */
	free(mailbuf);

	/* save record accounting for later; the job now owns acctbuf */
	free(pjob->ji_acctrec);
	pjob->ji_acctrec = acctbuf;

	/* Now, what do we do with the job... */
	if (exitstatus == JOB_EXEC_FAILHOOK_DELETE || exitstatus == JOB_EXEC_HOOK_DELETE ||
	    (!check_job_substate(pjob, JOB_SUBSTATE_RERUN) && !check_job_substate(pjob, JOB_SUBSTATE_RERUN2))) {
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) == 0) && (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HASHOLD)) {

			/* non-migratable checkpoint, leave there
			 * and just requeue the job.
			 */

			rel_resc(pjob);
			pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HASRUN;
			pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HASHOLD;
			svr_evaljobstate(pjob, &newstate, &newsubst, 1);
			svr_setjobstate(pjob, newstate, newsubst);
			if (pjob->ji_mom_prot == PROT_TCP)
				svr_disconnect(pjob->ji_momhandle);
			pjob->ji_momhandle = -1;
			pjob->ji_mom_prot = PROT_INVALID;
			return 0;
		}

		check_block(pjob, ""); /* if block set, send word */
		ptask = set_task(WORK_Immed, 0, on_job_exit, (void *) pjob);
		append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

		/* "on_job_exit()" will be dispatched out of the main loop */

	} else {
		/*
		 * Rerunning job ...
		 * If not checkpointed, clear "resources_used"
		 * Requeue job
		 */
		DBPRT(("%s: Rerunning job %s\n", __func__, pjob->ji_qs.ji_jobid))
		if ((pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHKPT | JOB_SVFLG_ChkptMig)) == 0)
			free_jattr(pjob, JOB_ATR_resc_used);

		/* state becomes EXITING while the RERUN/RERUN2 substate is kept */
		svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, get_job_substate(pjob));
		ptask = set_task(WORK_Immed, 0, on_job_rerun, (void *) pjob);
		append_link(&pjob->ji_svrtask, &ptask->wt_linkobj, ptask);

		/* "on_job_rerun()" will be dispatched out of the main loop */
	}

	DBPRT(("%s: Returning from end of function.\n", __func__))

	return 0;
}
