/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 * @file	req_runjob.c
 *
 * @brief
 * 		req_runjob.c - functions dealing with a Run Job Request
 *
 * Included functions are:
 *	check_and_provision_job()
 *	clear_from_defr()
 *	req_runjob()
 *	req_runjob2()
 *	clear_exec_on_run_fail()
 *	req_stagein()
 *	post_stagein()
 *	svr_stagein()
 *	svr_startjob()
 *	svr_strtjob2()
 *	complete_running()
 *	parse_hook_rejectmsg()
 *	post_sendmom()
 *	chk_job_torun()
 *	where_to_runjob()
 *	assign_hosts()
 *	req_defschedreply()
 *	check_failed_attempts()
 *
 */

#include <pbs_config.h> /* the master config generated by configure */

#include <ctype.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>

#include <strings.h>
#include <sys/wait.h>

#include <signal.h>
#include <stdlib.h>
#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "credential.h"
#include "batch_request.h"
#include "job.h"
#include "reservation.h"
#include "queue.h"
#include "work_task.h"
#include "pbs_error.h"
#include "log.h"
#include "acct.h"
#include "net_connect.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include <libutil.h>
#include "sched_cmds.h"
#include "pbs_license.h"
#include "hook.h"
#include "provision.h"
#include "pbs_share.h"
#include "pbs_sched.h"

/* External Functions Called: */

extern struct batch_request *cpy_stage(struct batch_request *, job *,
				       enum job_atr, int);
extern struct batch_request *cpy_stage(struct batch_request *, job *, enum job_atr, int);

/* Public Functions in this file */

int svr_startjob(job *, struct batch_request *);
extern char *msg_daemonname;
extern char *path_hooks_workdir;
extern char *msg_hook_reject_deletejob;

/* Private Function local to this file */

void post_sendmom(struct work_task *);
static int svr_stagein(job *, struct batch_request *, char, int);
static int svr_strtjob2(job *, struct batch_request *);
static job *chk_job_torun(struct batch_request *preq, job *);
static int req_runjob2(struct batch_request *preq, job *pjob);
static job *where_to_runjob(struct batch_request *preq, job *);
static void convert_job_to_resv(job *pjob);
/* Global Data Items: */

extern pbs_net_t pbs_mom_addr;
extern int pbs_mom_port;
extern struct server server;
extern char *msg_badexit;
extern char *msg_jobrun;
extern char *msg_job_end_sig;
extern char *msg_init_substate;
extern char *msg_manager;
extern char *msg_stageinfail;
extern char *msg_job_abort;
extern pbs_list_head svr_deferred_req;
extern time_t time_now;
extern int svr_totnodes; /* non-zero if using nodes */
extern job *chk_job_request(char *, struct batch_request *, int *, int *);
extern int send_cred(job *pjob);

/* private data */

/**
 * @brief
 *		Takes a batch_request and job pointer as arguments and enqueues
 *		provisioning by calling check_and_enqueue_provisioning().
 *		On a successful enqueue with provisioning needed, the job is moved
 *		to state RUNNING / substate PROVISION.  On enqueue failure the job
 *		is placed on system hold and an error is returned; the caller
 *		sends a req_reject to the scheduler.
 *
 * @see
 *		req_runjob2
 *
 * @param[in]	preq	-	batch_request
 * @param[in,out]	pjob	-	job pointer
 * @param[out]	need_prov	-	set by check_and_enqueue_provisioning():
 *					non-zero if the job will provision,
 *					0 if no provisioning is required
 *
 * @return	int
 * @retval	PBSE_NONE (0)	: success; inspect *need_prov to tell whether
 *				  provisioning was enqueued or not needed
 * @retval	>0	: PBS error code (bad arguments, or the enqueue failed
 *			  and the job was system-held)
 *
 * @par Side Effects:
 *	On enqueue failure: job gets a system hold, moves to HELD and its
 *	comment is replaced.  On successful enqueue: job moves to substate
 *	PROVISION and a provisioning-start accounting record is written.
 *
 * @par MT-safe: No
 *
 */
static int
check_and_provision_job(struct batch_request *preq, job *pjob, int *need_prov)
{
	int rc = 0;

	/* prov node is part of exec_vnodes, */
	/* cut and update exec_vnode and prov_vnode */
	if (!preq || !pjob || !need_prov)
		return (PBSE_IVALREQ);

	rc = check_and_enqueue_provisioning(pjob, need_prov);
	if (rc) {
		/* log message about failure to start provisioning for a job */
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid,
			  "Job failed to start provisioning");

		/* put system hold and move to held state */
		set_jattr_b_slim(pjob, JOB_ATR_hold, HOLD_s, INCR);
		svr_setjobstate(pjob, JOB_STATE_LTR_HELD, JOB_SUBSTATE_HELD);
		set_jattr_str_slim(pjob, JOB_ATR_Comment, "job held, provisioning failed to start", NULL);

		/* not offlining vnodes, since its not a vnode's fault. vnode */
		/* is good to run other jobs, so why waste resource. */
		return rc;
	}

	if (*need_prov == 0)
		return PBSE_NONE;

	/* provisioning was needed and enqueued */

	svr_setjobstate(pjob, JOB_STATE_LTR_RUNNING, JOB_SUBSTATE_PROVISION);
	DBPRT(("%s: Sucessfully enqueued provisioning for job %s\n", __func__, pjob->ji_qs.ji_jobid))

	/* log accounting line for start of prov for a job */
	set_job_ProvAcctRcd(pjob, time_now, PROVISIONING_STARTED);

	return PBSE_NONE;
}

/**
 * @brief
 *		Scan the deferred-request list for an entry whose originating
 *		client request arrived on the socket that was just closed.
 *		The original batch request was freed when the connection
 *		closed, so the entry must no longer point at it.
 *
 *		If the entry has already been forwarded to the scheduler we
 *		must wait for the scheduler's reply before discarding it, so
 *		only the request pointer is nulled; otherwise the entry is
 *		unlinked and freed outright.
 *
 * @param[in]	sd	-	socket which was just closed
 *
 * @return	none
 *
 * @par MT-safe: not really
 */
static void
clear_from_defr(int sd)
{
	struct deferred_request *pdefr;

	pdefr = (struct deferred_request *) GET_NEXT(svr_deferred_req);
	while (pdefr != NULL) {
		if ((pdefr->dr_preq != NULL) && (pdefr->dr_preq->rq_conn == sd)) {
			/* found the deferred run job request whose client
			 * connection has closed */
			if (pdefr->dr_sent != 0) {
				/* already sent to the scheduler: keep the
				 * entry until the scheduler responds, just
				 * drop the stale qrun request pointer */
				pdefr->dr_preq = NULL;
			} else {
				/* never sent: unlink and free it now */
				delete_link(&pdefr->dr_link);
				free(pdefr);
			}
			return;
		}
		pdefr = (struct deferred_request *) GET_NEXT(pdefr->dr_link);
	}
}

/**
 * @brief	Wrapper around process_hooks() for the runjob event.
 *
 *		Runs the hook machinery for the given request.  When no hook
 *		applies (process_hooks() returns -1) a debug message is logged
 *		noting that the request is accepted by default.
 *
 * @param[in]	preq		- the batch request being processed
 * @param[out]	hook_msg	- buffer to receive any hook reject message
 * @param[in]	msg_len		- size of hook_msg in bytes
 * @param[in]	pyinter_func	- interrupt-setup callback passed through
 *				  to process_hooks()
 *
 * @return	int	- the process_hooks() return code
 *
 */
int
call_to_process_hooks(struct batch_request *preq, char *hook_msg, size_t msg_len,
		      void(*pyinter_func))
{
	int hook_rc;

	hook_rc = process_hooks(preq, hook_msg, msg_len, pyinter_func);
	if (hook_rc == -1)
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
			  LOG_INFO, "", "runjob event: accept req by default");
	return hook_rc;
}

/**
 * @brief
 * 	req_runjob - service the Run Job and Async Run Job Requests
 *
 * @par
 *	This request forces a job into execution. Client must be privileged to run job.
 *	If the request carries no destination (exec_vnode spec), the request is
 *	deferred and handed to the scheduler; otherwise vnodes are assigned here
 *	and the job (or each selected subjob of an array) is started via
 *	req_runjob2().
 *
 * @param[in] preq - pointer to batch request structure
 *
 * @return void
 *
 */
void
req_runjob(struct batch_request *preq)
{
	int anygood;
	int i;
	int j;
	char *jid;
	int jt; /* job type */
	char *pc;
	job *pjob = NULL;
	job *pjobsub = NULL;
	job *parent = NULL;
	char *range;
	int start;
	int end;
	int step;
	int count;
	struct deferred_request *pdefr;
	char hook_msg[HOOK_MSG_SIZE];
	pbs_sched *psched;
	char sjst; /* subjob state letter */

	/* only a Manager or Operator may force a job to run */
	if ((preq->rq_perm & (ATR_DFLAG_MGWR | ATR_DFLAG_OPWR)) == 0) {
		req_reject(PBSE_PERM, 0, preq);
		return;
	}

	jid = preq->rq_ind.rq_run.rq_jid;
	parent = chk_job_request(jid, preq, &jt, NULL);
	if (parent == NULL)
		return; /* note, req_reject already called */

	/* the job must be in an execution queue */
	if (parent->ji_qhdr->qu_qs.qu_type != QTYPE_Execution) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	if (!find_assoc_sched_jid(jid, &psched)) {
		sprintf(log_buffer, "Unable to reach scheduler associated with job %s", jid);
		log_err(-1, __func__, log_buffer);
		req_reject(PBSE_NOSCHEDULER, 0, preq);
		return;
	}

	if ((psched->sc_cycle_started != -1) && was_job_alteredmoved(parent)) {
		/* Reject run request for altered/moved jobs if job_run_wait is set to "execjob_hook" */
		if (!is_sched_attr_set(psched, SCHED_ATR_job_run_wait) ||
		    (!strcmp(get_sched_attr_str(psched, SCHED_ATR_job_run_wait), RUN_WAIT_EXECJOB_HOOK))) {
			req_reject(PBSE_NORUNALTEREDJOB, 0, preq);
			set_scheduler_flag(SCH_SCHEDULE_NEW, psched);
			return;
		}
	}

	/* validate the target: regular job, single subjob, whole array
	 * (invalid), or a range of subjobs */
	if (jt == IS_ARRAY_NO) {
		/* just a regular job, pass it on down the line and be done */
		pjob = chk_job_torun(preq, parent);
		if (pjob == NULL)
			return;
		if (pjob->ji_discarding) {
			req_reject(PBSE_BADSTATE, 0, preq);
			return;
		}
	} else if (jt == IS_ARRAY_Single) {
		/* single subjob, if running can signal */
		pjob = get_subjob_and_state(parent, get_index_from_jid(jid), &sjst, NULL);
		if (sjst == JOB_STATE_LTR_UNKNOWN) {
			req_reject(PBSE_IVALREQ, 0, preq);
			return;
		} else if (sjst != JOB_STATE_LTR_QUEUED || (pjob && pjob->ji_discarding)) {
			/* job already running or discarding  */
			req_reject(PBSE_BADSTATE, 0, preq);
			return;
		}
	} else if (jt == IS_ARRAY_ArrayJob) {
		/* invalid to run the array itself */
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	} else {
		/*
		 * what's left to handle is a range of subjobs,
		 * validate that given range has atleast one subjob
		 * in queue state
		 */
		anygood = 0;
		range = get_range_from_jid(jid);
		if (range == NULL) {
			req_reject(PBSE_IVALREQ, 0, preq);
			return;
		}

		while (1) {
			if ((i = parse_subjob_index(range, &pc, &start, &end, &step, &count)) == -1) {
				req_reject(PBSE_IVALREQ, 0, preq);
				return;
			} else if (i == 1)
				break; /* no more in the range */
			for (i = start; i <= end; i += step) {
				pjob = get_subjob_and_state(parent, i, &sjst, NULL);
				if (sjst == JOB_STATE_LTR_QUEUED) {
					anygood = 1;
					break;
				}
			}
			range = pc;
		}
		if (anygood == 0) {
			req_reject(PBSE_BADSTATE, 0, preq);
			return;
		}
	}

	/*
	 * At this point, we know the basic request to run the job
	 * or jobs is valid, so we can proceed farther.
	 * If there is a specified list of execution vnodes and
	 * resources, then process the run request, else it has
	 * to go to the scheduler
	 */
	if ((preq->rq_ind.rq_run.rq_destin == NULL) ||
	    (*preq->rq_ind.rq_run.rq_destin == '\0')) {
		char fixjid[PBS_MAXSVRJOBID + 1];

		/* if runjob request is from the Scheduler, it must have a destination specified */
		if (preq->rq_conn == psched->sc_primary_conn) {
			log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, LOG_INFO, jid, "runjob request from scheduler with null destination");
			req_reject(PBSE_IVALREQ, 0, preq);
			return;
		}
		/* queue a deferred request and kick the scheduler; the reply
		 * to the client is sent when the scheduler acts on it */
		pdefr = (struct deferred_request *) malloc(sizeof(struct deferred_request));
		if (pdefr == NULL) {
			req_reject(PBSE_SYSTEM, 0, preq);
			return;
		}
		CLEAR_LINK(pdefr->dr_link);

		/*
		 * fix the job id so the suffix matches the real jobid's
		 * suffix;  in case qrun 1.short vs 1.short.domain.com
		 */
		snprintf(fixjid, sizeof(fixjid), "%s", jid);
		pc = strchr(fixjid, (int) '.');
		if (pc)
			*pc = '\0';
		pc = strchr(parent->ji_qs.ji_jobid, (int) '.');
		if (pc)
			strcat(fixjid, pc);

		pbs_strncpy(pdefr->dr_id, fixjid, PBS_MAXSVRJOBID + 1);
		pdefr->dr_preq = preq;
		pdefr->dr_sent = 0;
		append_link(&svr_deferred_req, &pdefr->dr_link, pdefr);
		/* ensure that request is removed if client connect is closed */
		net_add_close_func(preq->rq_conn, clear_from_defr);

		if (schedule_jobs(psched) == -1) {
			/* unable to contact the Scheduler, reject */
			req_reject(PBSE_NOSCHEDULER, 0, preq);
			/* unlink and free the deferred request entry */
			delete_link(&pdefr->dr_link);
			free(pdefr);
		}
		return;
	}

	DBPRT(("req_runjob: received command to run job on destin %s\n", preq->rq_ind.rq_run.rq_destin))

	/*
	 * OK - go back over the run job request, assign the vhosts
	 * and finally run the job by calling req_runjob2()
	 */
	if (jt == IS_ARRAY_NO) {

		/* just a regular job, pass it on down the line and be done */
		if (call_to_process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt) == 0) {
			reply_text(preq, PBSE_HOOKERROR, hook_msg);
			return;
		}
		pjob = where_to_runjob(preq, parent);
		if (pjob) {
			/* free prov_vnode before use */
			free_jattr(pjob, JOB_ATR_prov_vnode);
			req_runjob2(preq, parent);
		}
		return;

	} else if (jt == IS_ARRAY_Single) {
		/* a single subjob: preserve its runcount/run_version/resource
		 * attributes across the purge + re-create of the subjob */
		attribute sub_runcount;
		attribute sub_run_version;
		attribute sub_prev_res;

		clear_attr(&sub_runcount, &job_attr_def[JOB_ATR_runcount]);
		clear_attr(&sub_run_version, &job_attr_def[JOB_ATR_run_version]);
		clear_attr(&sub_prev_res, &job_attr_def[JOB_ATR_resource]);

		pjobsub = get_subjob_and_state(parent, get_index_from_jid(jid), NULL, NULL);
		if (pjobsub != NULL) {
			if (is_jattr_set(pjobsub, JOB_ATR_runcount))
				set_attr_with_attr(&job_attr_def[JOB_ATR_runcount], &sub_runcount, get_jattr(pjobsub, JOB_ATR_runcount), SET);
			if (is_jattr_set(pjobsub, JOB_ATR_run_version))
				set_attr_with_attr(&job_attr_def[JOB_ATR_run_version], &sub_run_version, get_jattr(pjobsub, JOB_ATR_run_version), SET);
			if (is_jattr_set(pjobsub, JOB_ATR_resource))
				set_attr_with_attr(&job_attr_def[JOB_ATR_resource], &sub_prev_res, get_jattr(pjobsub, JOB_ATR_resource), SET);
			job_purge(pjobsub);
		}

		if ((pjobsub = create_subjob(parent, jid, &j)) == NULL) {
			if (is_attr_set(&sub_runcount))
				free_attr(job_attr_def, &sub_runcount, JOB_ATR_runcount);
			if (is_attr_set(&sub_run_version))
				free_attr(job_attr_def, &sub_run_version, JOB_ATR_run_version);
			if (is_attr_set(&sub_prev_res))
				free_attr(job_attr_def, &sub_prev_res, JOB_ATR_resource);
			req_reject(j, 0, preq);
			return;
		}

		/* restore the saved attributes onto the fresh subjob */
		if (is_attr_set(&sub_runcount)) {
			free_jattr(pjobsub, JOB_ATR_runcount);
			set_attr_with_attr(&job_attr_def[JOB_ATR_runcount], get_jattr(pjobsub, JOB_ATR_runcount), &sub_runcount, SET);
			free_attr(job_attr_def, &sub_runcount, JOB_ATR_runcount);
		}

		if (is_attr_set(&sub_run_version)) {
			free_jattr(pjobsub, JOB_ATR_run_version);
			set_attr_with_attr(&job_attr_def[JOB_ATR_run_version], get_jattr(pjobsub, JOB_ATR_run_version), &sub_run_version, SET);
			free_attr(job_attr_def, &sub_run_version, JOB_ATR_run_version);
		}

		if (is_attr_set(&sub_prev_res)) {
			free_jattr(pjobsub, JOB_ATR_resource);
			set_attr_with_attr(&job_attr_def[JOB_ATR_resource], get_jattr(pjobsub, JOB_ATR_resource), &sub_prev_res, SET);
			free_attr(job_attr_def, &sub_prev_res, JOB_ATR_resource);
		}

		if (call_to_process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt) == 0) {
			/* subjob reject from hook*/
			reply_text(preq, PBSE_HOOKERROR, hook_msg);
			return;
		}
		pjob = where_to_runjob(preq, pjobsub);
		if (pjob) {
			/* free prov_vnode before use */
			free_jattr(pjob, JOB_ATR_prov_vnode);
			req_runjob2(preq, pjob);
		}
		return;
	}

	/*
	 * what's left to handle is a range of subjobs,
	 * foreach subjob, if queued, run it
	 */
	range = get_range_from_jid(jid);
	if (range == NULL) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	/* hold the request open while subjobs are dispatched; matching
	 * decrement is at the bottom of this function */
	++preq->rq_refct;

	while (1) {
		if ((i = parse_subjob_index(range, &pc, &start, &end, &step, &count)) == -1) {
			req_reject(PBSE_IVALREQ, 0, preq);
			break;
		} else if (i == 1)
			break;
		for (i = start; i <= end; i += step) {
			attribute sub_runcount = {0};
			attribute sub_run_version = {0};
			attribute sub_prev_res = {0};

			clear_attr(&sub_runcount, &job_attr_def[JOB_ATR_runcount]);
			clear_attr(&sub_run_version, &job_attr_def[JOB_ATR_run_version]);
			clear_attr(&sub_prev_res, &job_attr_def[JOB_ATR_resource]);

			pjobsub = get_subjob_and_state(parent, i, &sjst, NULL);
			if (sjst != JOB_STATE_LTR_QUEUED)
				continue;

			if (pjobsub != NULL) {
				if (is_jattr_set(pjobsub, JOB_ATR_runcount))
					set_attr_with_attr(&job_attr_def[JOB_ATR_runcount], &sub_runcount, get_jattr(pjobsub, JOB_ATR_runcount), SET);
				if (is_jattr_set(pjobsub, JOB_ATR_run_version))
					set_attr_with_attr(&job_attr_def[JOB_ATR_run_version], &sub_run_version, get_jattr(pjobsub, JOB_ATR_run_version), SET);
				if (is_jattr_set(pjobsub, JOB_ATR_resource))
					set_attr_with_attr(&job_attr_def[JOB_ATR_resource], &sub_prev_res, get_jattr(pjobsub, JOB_ATR_resource), SET);
				job_purge(pjobsub);
			}

			/* NOTE(review): unlike the IS_ARRAY_Single path above,
			 * this failure path frees only sub_prev_res and not
			 * sub_runcount/sub_run_version — verify intentional */
			if ((pjobsub = create_subjob(parent, create_subjob_id(parent->ji_qs.ji_jobid, i), &j)) == NULL) {
				if (is_attr_set(&sub_prev_res))
					free_attr(job_attr_def, &sub_prev_res, JOB_ATR_resource);
				req_reject(j, 0, preq);
				continue;
			}

			if (is_attr_set(&sub_run_version))
				set_jattr_l_slim(pjobsub, JOB_ATR_run_version, get_attr_l(&sub_run_version), SET);

			if (is_attr_set(&sub_runcount))
				set_jattr_l_slim(pjobsub, JOB_ATR_runcount, get_attr_l(&sub_runcount), SET);

			if (is_attr_set(&sub_prev_res)) {
				free_jattr(pjobsub, JOB_ATR_resource);
				set_attr_with_attr(&job_attr_def[JOB_ATR_resource], get_jattr(pjobsub, JOB_ATR_resource), &sub_prev_res, SET);
				free_attr(job_attr_def, &sub_prev_res, JOB_ATR_resource);
			}

			/* NOTE(review): this return leaves rq_refct incremented;
			 * confirm reply_text/hook-reject path accounts for it */
			if (call_to_process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt) == 0) {
				/* subjob reject from hook*/
				reply_text(preq, PBSE_HOOKERROR, hook_msg);
				return;
			}

			if ((pjob = where_to_runjob(preq, pjobsub)) == NULL)
				continue;

			/* run each subjob under its own duplicated request */
			dup_br_for_subjob(preq, pjob, req_runjob2);
		}
		range = pc;
	}

	/*
	 * if not waiting on any running subjobs, can reply; else
	 * it is taken care of when last running subjob responds
	 */
	if (--preq->rq_refct == 0)
		reply_send(preq);
	return;
}
/**
 * @brief
 * 		req_runjob2 - second stage of servicing a Run Job request:
 *		handle provisioning, persist a new subjob, resolve the
 *		execution destination and ship the job to MOM via
 *		svr_startjob().
 *
 * @param[in,out]	preq	-	Run Job Request
 * @param[in,out]	pjob	-	job pointer
 *
 * @return	int
 * @retval	0	: job was started or provisioning was enqueued
 * @retval	1	: request was rejected (caller need not reply)
 */
static int
req_runjob2(struct batch_request *preq, job *pjob)
{
	int rc;
	int prov_rc = 0;
	int need_prov;
	char *dest;
	int rq_type = 0;

	/* Check if prov is required, if so, reply_ack and let prov finish */
	/* else follow normal flow */
	prov_rc = check_and_provision_job(preq, pjob, &need_prov);

	/* In case of subjob, save it to the database now because
	 * not saved to the database so far.
	 */
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
		if (job_save_db(pjob)) {
			free_nodes(pjob);
			req_reject(PBSE_SAVE_ERR, 0, preq);
			return 1;
		}
	}

	if (prov_rc) { /* problem with the request */
		free_nodes(pjob);
		req_reject(prov_rc, 0, preq);
		return 1;
	} else if (need_prov == 1) { /* prov required and request is fine */
		/* allocate resources right away */
		set_resc_assigned((void *) pjob, 0, INCR);

		/* provisioning was needed and was enqueued successfully */
		/* Always send ack for prov jobs, even if not async run */
		reply_ack(preq);
		return 0;
	}

	/* if need_prov ==0 then no prov required, so continue normal flow */
	/* fall back to the job's own exec_vnode when the request gives no
	 * destination (or the "-" placeholder) */
	dest = preq->rq_ind.rq_run.rq_destin;
	if ((dest == NULL) || (*dest == '\0') || ((*dest == '-') && (*(dest + 1) == '\0'))) {
		if (is_jattr_set(pjob, JOB_ATR_exec_vnode)) {
			dest = get_jattr_str(pjob, JOB_ATR_exec_vnode);
		} else {
			dest = NULL;
		}
	}
	if ((dest == NULL) || (*dest == '\0')) {
		/* Neither the run request nor the job specified an execvnode. */
		free_nodes(pjob);
		req_reject(PBSE_IVALREQ, 0, preq);
		return 1;
	}
	/* log "<user>@<host> ran job ... on exec_vnode <dest>" (truncated to fit) */
	sprintf(log_buffer, msg_manager, msg_jobrun, preq->rq_user, preq->rq_host);
	strcat(log_buffer, " on exec_vnode ");
	rc = LOG_BUF_SIZE - strlen(log_buffer) - 1;
	strncat(log_buffer, dest, rc);
	*(log_buffer + LOG_BUF_SIZE - 1) = '\0';
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, log_buffer);

	/* If async run, reply now; otherwise reply is handled in */
	/* post_sendmom or post_stagein				  */
	rq_type = preq->rq_type;
	if (preq && (rq_type == PBS_BATCH_AsyrunJob_ack)) {
		reply_ack(preq);
		preq = 0; /* cleared so we don't try to reuse */
	}

	if (((rc = svr_startjob(pjob, preq)) != 0) &&
	    ((rq_type == PBS_BATCH_AsyrunJob_ack) || preq)) {
		free_nodes(pjob);
		if (preq)
			req_reject(rc, 0, preq);
		return 1;
	}

	return 0;
}

/**
 * @brief
 *		clear_exec_on_run_fail - reset execution placement info after a
 *		failed run attempt so the job may be rescheduled anywhere.
 *
 * @par Functionality:
 *		A checkpointed job must restart where it ran before, so its
 *		exec_* attributes are left untouched.  Any other job has its
 *		exec_host, exec_host2 and exec_vnode attributes freed and its
 *		destination string cleared.
 *
 * @param[in]	jobp	-	pointer to job whose run failed
 *
 * @return	none
 *
 * @par MT-safe: yes
 */
void
clear_exec_on_run_fail(job *jobp)
{
	/* checkpointed jobs are pinned to their previous vnodes */
	if (jobp->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT)
		return;

	free_jattr(jobp, JOB_ATR_exec_host);
	free_jattr(jobp, JOB_ATR_exec_host2);
	free_jattr(jobp, JOB_ATR_exec_vnode);
	jobp->ji_qs.ji_destin[0] = '\0';
}

/**
 * @brief
 * 		req_stagein	-	service the Stage In Files for a Job Request
 *
 *		This request causes MOM to start staging in files.
 *		Client must be privileged.
 *
 *		Currently unsupported: the request is always rejected
 *		with PBSE_NOSUP.
 *
 * @param[in]	preq	-	Job Request
 */

void
req_stagein(struct batch_request *preq)
{
	req_reject(PBSE_NOSUP, 0, preq);
}

/**
 * @brief
 * 		post_stagein - process reply from MOM to stage-in request
 *
 *		On failure, the job's resources and exec_* attributes are
 *		released and the job (or, for a subjob, its parent array) is
 *		placed in the WAITING/STAGEFAIL state with a retry time of
 *		time_now + PBS_STAGEFAIL_WAIT.  On success, the job either
 *		continues on to execution (substate STAGEGO) or is re-evaluated
 *		to its natural state.
 *
 * @param[in]	pwt	-	pointer to work task structure; wt_parm1 holds the
 *				original mom request whose rq_extra is the job id
 *				(allocated by svr_stagein, freed here)
 */

static void
post_stagein(struct work_task *pwt)
{
	int code;
	char newstate;
	int newsub;
	job *paltjob;
	job *pjob;
	struct batch_request *preq;
	attribute *pwait;

	preq = pwt->wt_parm1;
	code = preq->rq_reply.brp_code;
	pjob = find_job(preq->rq_extra);
	free(preq->rq_extra);

	if (pjob != NULL) {

		if (code != 0) {

			/* stage in failed - "wait" job */

			/* give back the resources and placement so the job
			 * can be scheduled elsewhere after the wait */
			set_resc_assigned((void *) pjob, 0, DECR);
			free_nodes(pjob);
			free_jattr(pjob, JOB_ATR_exec_host);
			free_jattr(pjob, JOB_ATR_exec_host2);
			free_jattr(pjob, JOB_ATR_exec_vnode);

			if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
				/* for subjob, "wait" the parent array */
				paltjob = pjob->ji_parentaj;
			} else {
				/* for regular job, "wait" that job */
				paltjob = pjob;
			}
			/* only set a retry time if one isn't already pending */
			pwait = get_jattr(paltjob, JOB_ATR_exectime);
			if (!is_jattr_set(paltjob, JOB_ATR_exectime)) {
				set_jattr_l_slim(paltjob, JOB_ATR_exectime, time_now + PBS_STAGEFAIL_WAIT, SET);
				job_set_wait(pwait, paltjob, 0);
			}
			svr_setjobstate(paltjob, JOB_STATE_LTR_WAITING, JOB_SUBSTATE_STAGEFAIL);

			/* mail MOM's error text to the job owner, if any */
			if (preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text)
				svr_mailowner(pjob, MAIL_STAGEIN, MAIL_FORCE,
					      preq->rq_reply.brp_un.brp_txt.brp_str);
		} else {
			/* stage in was successful */
			pjob->ji_qs.ji_svrflags |= JOB_SVFLG_StagedIn;
			if (check_job_substate(pjob, JOB_SUBSTATE_STAGEGO)) {
				/* continue to start job running */
				svr_strtjob2(pjob, NULL);
			} else {
				svr_evaljobstate(pjob, &newstate, &newsub, 0);
				svr_setjobstate(pjob, newstate, newsub);
			}
		}
	}
	release_req(pwt); /* close connection and release request */
}

/**
 * @brief
 * 		svr_stagein - direct MOM to stage in the requested files for a job
 *
 *		If the job has files to stage in, a stage-in request is relayed
 *		to MOM with post_stagein() as the reply handler and the client
 *		is acked immediately (the copy may take a while).  If there is
 *		nothing to stage in, the job is sent straight to MOM via
 *		svr_strtjob2().
 *
 * @param[in,out]	pjob	-	job structure
 * @param[in,out]	preq	-	request structure (may be NULL; acked here
 *					on successful stage-in start)
 * @param[in]	state	-	job state to set once the relay succeeds
 * @param[in,out]	substate	-	job substate to set once the relay succeeds
 *
 * @return	int
 * @retval	0	- success
 * @retval	non-zero	- error code
 */

static int
svr_stagein(job *pjob, struct batch_request *preq, char state, int substate)
{
	struct batch_request *momreq = 0;
	int rc;

	momreq = cpy_stage(momreq, pjob, JOB_ATR_stagein, STAGE_DIR_IN);
	if (momreq) { /* have files to stage in */

		/* save job id for post_stagein */

		/* NOTE(review): momreq is not released on this failure
		 * path — verify whether it should be freed here */
		momreq->rq_extra = malloc(PBS_MAXSVRJOBID + 1);
		if (momreq->rq_extra == 0)
			return (PBSE_SYSTEM);
		strcpy(momreq->rq_extra, pjob->ji_qs.ji_jobid);
		rc = relay_to_mom(pjob, momreq, post_stagein);
		if (rc == 0) {

			svr_setjobstate(pjob, state, substate);
			/*
			 * show resources allocated as stage-in may take
			 * take sufficient time to run into another
			 * scheduling cycle
			 */
			set_resc_assigned((void *) pjob, 0, INCR);
			/*
			 * stage-in started ok - reply to client as copy may
			 * take too long to wait.
			 */

			if (preq)
				reply_ack(preq);
		} else {
			/* relay failed; post_stagein will not run, so the
			 * job id buffer must be freed here */
			free(momreq->rq_extra);
		}
		return (rc);

	} else {

		/* no files to stage-in, go direct to sending job to mom */

		return (svr_strtjob2(pjob, preq));
	}
}

/**
 * @brief
 * 		form_attr_comment - build an attribute comment string from the
 * 		given printf-style template by substituting the current time,
 * 		optionally followed by " on <execvnode>".
 *
 *		If the execvnode text would overflow COMMENT_BUF_SIZE it is
 *		truncated and "..." is appended.
 *
 * @param[in]	template	-	printf-style template with one %s for the time
 * @param[in]	execvnode	-	execution vnode string, or NULL to omit
 *
 * @return	string
 * @retval	pointer to log_buffer holding the formatted comment.
 *
 * @note
 * 		Do not copy the output of this function into log_buffer. It is used internally.
 */
char *
form_attr_comment(const char *template, const char *execvnode)
{
	char timebuf[128];
	size_t used;

	strftime(timebuf, sizeof(timebuf), "%a %b %d at %H:%M", localtime(&time_now));
	sprintf(log_buffer, template, timebuf);
	if (execvnode == NULL)
		return log_buffer;

	strcat(log_buffer, " on ");
	used = strlen(log_buffer);
	if (strlen(execvnode) > COMMENT_BUF_SIZE - used - 1) {
		/* leave room for the "..." truncation marker */
		strncat(log_buffer, execvnode, COMMENT_BUF_SIZE - used - 1 - 3);
		strcat(log_buffer, "...");
		log_buffer[COMMENT_BUF_SIZE - 1] = '\0';
	} else {
		strcat(log_buffer, execvnode);
	}
	return log_buffer;
}

/**
 * @brief
 * 		svr_startjob - place a job into running state by shipping it to MOM
 *
 *		Assigns execution hosts (from the prior exec_vnode for
 *		hot-start/checkpoint restarts, otherwise from the request's
 *		destination), optionally converts the job to a reservation,
 *		pushes the kill-delay setting to MOM, sends credentials when
 *		built with KRB5 support, and finally either stages in files
 *		(svr_stagein) or starts the job directly (svr_strtjob2).
 *
 * @param[in,out]	pjob	-	job to run
 * @param[in,out]	preq	-	 NULL or Run Job batch request
 *
 * @return	int
 * @retval	0	- success
 * @retval	non-zero	- error code
 */
int
svr_startjob(job *pjob, struct batch_request *preq)
{
	int f;
	int rc;
	char *nspec;
	pbs_queue *pque = pjob->ji_qhdr;
	long delay = 10; /* Default value for kill_delay */

	/* if not already setup, transfer the control/script file basename */
	/* into an attribute accessible to MOM				   */

	if (!(is_jattr_set(pjob, JOB_ATR_hashname)))
		if (set_jattr_str_slim(pjob, JOB_ATR_hashname, pjob->ji_qs.ji_jobid, NULL))
			return (PBSE_SYSTEM);

	/* clear Exit_status which may have been set in a hook and requeued */
	clear_attr(get_jattr(pjob, JOB_ATR_exit_status), &job_attr_def[(int) JOB_ATR_exit_status]);

	/* if exec_vnode already set and either (hotstart or checkpoint) */
	/* then reuse the host(s) listed in the current exec_vnode	 */

	rc = 0;
	f = is_jattr_set(pjob, JOB_ATR_exec_vnode);
	if (f && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HOTSTART) || (pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT)) && ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) == 0)) {

		nspec = get_jattr_str(pjob, JOB_ATR_exec_vnode);
		if (nspec == NULL)
			return (PBSE_SYSTEM);
		rc = assign_hosts(pjob, nspec, 0);

	} else if (f == 0) {
		/* exec_vnode not already set, use hosts from request   */
		if (preq == NULL)
			return (PBSE_INTERNAL);
		nspec = preq->rq_ind.rq_run.rq_destin;
		if (nspec == NULL)
			return (PBSE_IVALREQ);

		rc = assign_hosts(pjob, nspec, 1);
	}
	if (rc != 0)
		return rc;

	/* job requested "create_resv_from_job": turn it into a reservation */
	if (is_jattr_set(pjob, JOB_ATR_create_resv_from_job) &&
	    get_jattr_long(pjob, JOB_ATR_create_resv_from_job))
		convert_job_to_resv(pjob);

	/* Move job_kill_delay attribute from Server to MOM */
	if (is_qattr_set(pque, QE_ATR_KillDelay))
		delay = get_qattr_long(pque, QE_ATR_KillDelay);
	set_jattr_l_slim(pjob, JOB_ATR_job_kill_delay, delay, SET);

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	if (is_jattr_set(pjob, JOB_ATR_cred_id)) {
		rc = send_cred(pjob);
		if (rc != 0) {
			return rc; /* do not start job without credentials */
		}
	}
#endif

	/* Next, are there files to be staged-in? */

	if ((is_jattr_set(pjob, JOB_ATR_stagein)) &&
	    (!check_job_substate(pjob, JOB_SUBSTATE_STAGECMP))) {

		/* yes, we do that first; then start the job */

		rc = svr_stagein(pjob, preq, JOB_STATE_LTR_RUNNING, JOB_SUBSTATE_STAGEGO);

		/* note, the positive acknowledgment to the run job request */
		/* is done by svr_stagein if the stage-in is successful     */

		if (rc != 0) {
			/* If the stage-in failed and we aren't          */
			/* checkpointed, clear the exec_host/exec_vnode; */
			/* job can be run  elsewhere			 */
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) == 0) {
				/* clear StagedIn flag for good measure */
				pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn;
				free_jattr(pjob, JOB_ATR_exec_host);
				free_jattr(pjob, JOB_ATR_exec_host2);
				free_jattr(pjob, JOB_ATR_exec_vnode);
			}
		}

	} else {

		/* No stage-in or already done, start job executing */

		rc = svr_strtjob2(pjob, preq);
	}
	return (rc);
}

/**
 * @brief
 * 		Continue the process of running a job by sending it to Mother Superior,
 *		and making sure it is in JOB_SUBSTATE_PRERUN.
 * @par
 *		Bumps the run count (unless restarting from a checkpoint), records a
 *		"sent for execution" comment, moves the job to RUNNING/PRERUN and hands
 *		it to send_job().  On a deferred send, resources are provisionally
 *		accounted here because another scheduling cycle may run before Mom
 *		acknowledges the commit.  On failure, the job's resources, exec_*
 *		attributes and state are all rolled back.
 *
 * @param[in]	pjob - pointer to job to run
 * @param[in]	preq - the run job request from the scheduler or client; may be
 *		NULL, which is treated like an asynchronous run for the resource
 *		accounting below
 *
 * @return	int
 * @retval	0	:  success, job is being sent to Mom
 * @retval	!0	:  error in trying to send to Mom (pbs_errno from send_job)
 */
static int
svr_strtjob2(job *pjob, struct batch_request *preq)
{
	char old_state;
	int old_subst;

	/* remember the pre-run state so it can be restored if the send fails */
	old_state = get_job_state(pjob);
	old_subst = get_job_substate(pjob);
	pjob->ji_qs.ji_stime = 0; /* updated in complete_running() */

	/* if not restarting a checkpointed job, increment the run/hop count */

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) == 0) {
		set_jattr_l_slim(pjob, JOB_ATR_run_version, 1, INCR);
		set_jattr_l_slim(pjob, JOB_ATR_runcount, 1, INCR);
	}

	/* send the job to MOM */
	set_jattr_generic(pjob, JOB_ATR_Comment,
			  form_attr_comment("Job was sent for execution at %s", get_jattr_str(pjob, JOB_ATR_exec_vnode)),
			  NULL, SET);

	/* a provisioning job is already accounted for; leave its substate alone */
	if (old_subst != JOB_SUBSTATE_PROVISION)
		svr_setjobstate(pjob, JOB_STATE_LTR_RUNNING,
				JOB_SUBSTATE_PRERUN);

	/* NOTE(review): a return of 2 from send_job() is treated as "send in
	 * progress, post_sendmom() will complete/reject later" — confirm against
	 * send_job()'s documented return values. */
	if (send_job(pjob, pjob->ji_qs.ji_un.ji_exect.ji_momaddr,
		     pjob->ji_qs.ji_un.ji_exect.ji_momport, MOVE_TYPE_Exec,
		     post_sendmom, (void *) preq) == 2) {
		pjob->ji_prunreq = preq;
		/* Clear the suspend server flag. */
		pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Suspend;

		/* in case of async ack runjob, we need to assign resources
		 * since another scheduling cycle can happen before the
		 * mom responds to the req_commit message. This is the
		 * same logic that is done for jobs with files to stage
		 * in
		 */
		if (preq == NULL || (preq->rq_type == PBS_BATCH_AsyrunJob_ack) || (preq->rq_type == PBS_BATCH_AsyrunJob)) {
			job *base_job = NULL;
			if (check_job_substate(pjob, JOB_SUBSTATE_PRERUN)) {
				set_resc_assigned((void *) pjob, 0, INCR);
				/* Just update dependencies for the first subjob that runs */
				if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) &&
				    !check_job_state(pjob->ji_parentaj, JOB_STATE_LTR_BEGUN))
					base_job = pjob->ji_parentaj;
				else
					base_job = pjob;
			}
			/* honor a runone dependency: hold all sibling jobs */
			if (base_job != NULL &&
			    is_jattr_set(base_job, JOB_ATR_depend)) {
				struct depend *pdep;
				pdep = find_depend(JOB_DEPEND_TYPE_RUNONE, get_jattr(base_job, JOB_ATR_depend));
				if (pdep != NULL)
					depend_runone_hold_all(base_job);
			}
		}
		return (0);
	} else {
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid,
			  "Unable to Run Job, send to Mom failed");

		/* roll back resource assignment made for this run attempt */
		if (check_job_substate(pjob, JOB_SUBSTATE_PROVISION) ||
		    check_job_substate(pjob, JOB_SUBSTATE_PRERUN))
			rel_resc(pjob);
		else
			free_nodes(pjob);

		clear_exec_on_run_fail(pjob);
		/* recompute a sane state/substate; old_state/old_subst are
		 * overwritten here with the evaluated values */
		svr_evaljobstate(pjob, &old_state, &old_subst, 1);
		svr_setjobstate(pjob, old_state, old_subst);
		return (pbs_errno);
	}
}

/**
 * @brief
 *		Complete the process of placing a job into execution state
 * @par
 *		Records a bunch of information for accouting and resource management,
 *		and sets substate to PRERUN if it isn't already.
 *		The sub moves to SUBSTATE_RUNNING when the session id is received
 *		from Mom, meaning it is in fact running; see stat_update().
 * @par
 *		Note, if a job is in substate PROVISION,  the resources have already
 *		been allocated.
 * @par
 *		Idempotent per run attempt: a non-zero ji_stime means this has
 *		already been done for the current incarnation and the call is a no-op.
 *
 * @param[in]	jobp	-	pointer to job which is just starting to run.
 */

void
complete_running(job *jobp)
{
	job *parent;

	if (jobp->ji_qs.ji_stime != 0)
		return; /* already called for this incarnation */

	jobp->ji_terminated = 0; /* reset terminated flag */
	/*
	 *	For a subjob, insure the parent array's state is set to 'B'
	 *	and deal with any dependency on the parent.
	 */
	if (jobp->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
		/* if this is first subjob to run, mark */
		/* parent Array as state "Begun"	*/
		parent = jobp->ji_parentaj;
		/* parent with state BEGUN but stime 0 also needs the Begun
		 * bookkeeping (e.g. after a server restart/requeue) */
		if (check_job_state(parent, JOB_STATE_LTR_QUEUED) ||
		    (check_job_state(parent, JOB_STATE_LTR_BEGUN) && parent->ji_qs.ji_stime == 0)) {
			svr_setjobstate(parent, JOB_STATE_LTR_BEGUN, JOB_SUBSTATE_BEGUN);

			/* Also set the parent job's stime */
			parent->ji_qs.ji_stime = time_now;
			set_jattr_l_slim(parent, JOB_ATR_stime, time_now, SET);

			account_jobstr(parent, PBS_ACCT_RUN);
			set_jattr_str_slim(parent, JOB_ATR_Comment, form_attr_comment("Job Array Began at %s", NULL), NULL);

			/* if any dependencies, see if action required */
			if (is_jattr_set(parent, JOB_ATR_depend))
				depend_on_exec(parent);

			svr_mailowner(parent, MAIL_BEGIN, MAIL_NORMAL, NULL);
		}
	}
	/* Job started ATR_Comment is set in server since scheduler cannot read	*/
	/* the reply in case of error in asynchronous communication.	*/
	set_jattr_str_slim(jobp, JOB_ATR_Comment, form_attr_comment("Job run at %s", get_jattr_str(jobp, JOB_ATR_exec_vnode)), NULL);

	jobp->ji_qs.ji_svrflags &= ~JOB_SVFLG_HOTSTART;

	/* record start time for accounting and for the Scheduler */
	/* setting ji_stime is also an indicator that we have done all this */

	jobp->ji_qs.ji_stime = time_now;
	set_jattr_l_slim(jobp, JOB_ATR_stime, time_now, SET);

	/*
	 * if job is in substate PROVISION, set to PRERUN.
	 * It is possible that the job is in substate:
	 * - RUNNING if Mom sent the status update first before we get to
	 *   process the send_job SIGCHLD, see stat_update()
	 * - EXITING if the Obit was received before send_job's exit status.
	 */
	if (check_job_substate(jobp, JOB_SUBSTATE_PROVISION)) {
		svr_setjobstate(jobp, JOB_STATE_LTR_RUNNING, JOB_SUBSTATE_PRERUN);
		/* above saves job structure */
	}

	/* update resource usage attributes */
	/* may have already been done for provisioning, but    */
	/* that will be detected inside of set_resc_assigned() */
	set_resc_assigned((void *) jobp, 0, INCR);
	/* These attributes need to be cleared/freed now that the job has been resumed */
	if (is_jattr_set(jobp, JOB_ATR_resc_released)) {
		free_jattr(jobp, JOB_ATR_resc_released);
		mark_jattr_not_set(jobp, JOB_ATR_resc_released);
	}

	if (is_jattr_set(jobp, JOB_ATR_resc_released_list)) {
		free_jattr(jobp, JOB_ATR_resc_released_list);
		mark_jattr_not_set(jobp, JOB_ATR_resc_released_list);
	}

	/* accounting log for start or restart */
	if (jobp->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT)
		account_record(PBS_ACCT_RESTRT, jobp, NULL);
	else
		account_jobstr(jobp, PBS_ACCT_RUN);

	/* if any dependencies, see if action required */

	if (is_jattr_set(jobp, JOB_ATR_depend))
		depend_on_exec(jobp);

	svr_mailowner(jobp, MAIL_BEGIN, MAIL_NORMAL, NULL);
	/*
	 * it is unfortunate, but while the job has gone into execution,
	 * there is no way of obtaining the session id except by making
	 * a status request of MOM.  (Even if the session id was passed
	 * back to the sending child, it couldn't get up to the parent.)
	 */
}

/**
 * @brief
 * 		Helper function to parse the hookname and hook_msg out of a hook rejection message
 * @par
 *		The rejection text has the form "<hook_name>,<message>".  The comma in
 *		reject_msg is overwritten with '\0' (the input is modified in place);
 *		the hook name is copied into the caller's buffer and a pointer to the
 *		message portion (inside reject_msg) is returned.
 *
 * @param[in,out]	reject_msg	-	The hooks rejection message; modified in place
 * @param[out]	hook_name	-	pointer to buffer to fill parsed hook_name;
 *					always NUL-terminated on success (truncated
 *					to hook_name_size - 1 characters if needed)
 * @param[in]	hook_name_size	- The length of the hook name output buffer
 *
 * @return	The hook message
 * @retval	NULL	: Failed to parse out hook_name and hook_msg
 * @retval	!NULL	: The hook message (points into reject_msg)
 */
static char *
parse_hook_rejectmsg(char *reject_msg, char *hook_name, int hook_name_size)
{
	char *p;

	/* guard against a NULL message or an unusable output buffer */
	if (reject_msg == NULL || hook_name == NULL || hook_name_size <= 0)
		return NULL;

	p = strchr(reject_msg, ',');
	if (p == NULL)
		return NULL;

	*p = '\0';
	p++;
	/* snprintf (unlike the strncpy it replaces) guarantees the copy is
	 * NUL-terminated even when the hook name fills the buffer */
	snprintf(hook_name, hook_name_size, "%s", reject_msg);
	return p;
}

/**
 * @brief
 *		Check and put a hold on a job if it has already been run
 *		too many times.
 * @par
 *		If the job's runcount exceeds the allowed maximum, a system hold
 *		is applied and the comment updated; for a subjob, the parent job
 *		array is also held with an explanatory comment.
 *
 * @param[in,out]	jobp	-	job pointer
 *
 * @return	void
 */
void
check_failed_attempts(job *jobp)
{
	if (get_jattr_long(jobp, JOB_ATR_runcount) >
#ifdef NAS /* localmod 083 */
	    PBS_MAX_HOPCOUNT
#else
	    PBS_MAX_HOPCOUNT + PBS_MAX_HOPCOUNT
#endif /* localmod 083 */
	) {
		set_jattr_b_slim(jobp, JOB_ATR_hold, HOLD_s, INCR);
		set_jattr_str_slim(jobp, JOB_ATR_Comment, "job held, too many failed attempts to run", NULL);

		/* a failing subjob also holds its parent array */
		if (jobp->ji_parentaj) {
			char comment_buf[100 + PBS_MAXSVRJOBID];
			svr_setjobstate(jobp->ji_parentaj, JOB_STATE_LTR_HELD, JOB_SUBSTATE_HELD);
			set_jattr_b_slim(jobp->ji_parentaj, JOB_ATR_hold, HOLD_s, INCR);
			sprintf(comment_buf, "Job Array Held, too many failed attempts to run subjob %s", jobp->ji_qs.ji_jobid);
			set_jattr_str_slim(jobp->ji_parentaj, JOB_ATR_Comment, comment_buf, NULL);
		}
	}
}

/**
 * @brief
 * 		post_sendmom - clean up action for child started in send_job
 *		which was sending a job "home" to MOM
 * @par
 * 		If send was successfull, mark job as executing.
 * 		See comments in complete_running() above about the possible substate changes.
 *
 * 		The job's session id will be updated with Mom first responds with
 * 		the resources_used.
 *
 * 		If send didn't work, requeue the job.
 *
 * 		If the work_task has a non-null wt_parm2, it is the address of a batch
 * 		request to which a reply must be sent.
 * @par
 * 		If the ji_prunreq (pointer to the run request) is null,  the run request
 * 		has already been replied to.  This might happen if the job's Obit is
 * 		received prior to reaping the send_job child.  In that case, we skip all
 * 		this because the job has already "run" and is now in Exiting state.
 * @par
 *		The outcome code r is derived either from the child's wait status
 *		(TCP path) or from the error code stashed in wt_aux (TPP path).
 *		For hook errors/rejections, the hook name and message are parsed
 *		out of a per-job reject file (TCP) or the batch reply text (TPP).
 *
 * @param[in,out]	pwt	-	work_task structure
 *
 * Returns: none.
 */
void
post_sendmom(struct work_task *pwt)
{
	char newstate;
	int newsub;
	int r;
	char *reject_msg = NULL;
	int wstat = pwt->wt_aux;
	job *jobp = (job *) pwt->wt_parm2;
	struct batch_request *preq = (struct batch_request *) pwt->wt_parm1;
	int prot = pwt->wt_aux2;
	struct batch_reply *reply = (struct batch_reply *) pwt->wt_parm3;
	char dest_host[PBS_MAXROUTEDEST + 1];
	char hook_name[PBS_HOOK_NAME_SIZE + 1] = {'\0'};
	char *hook_msg = NULL;

	if (jobp == NULL) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_INFO, "", "post_sendmom failed, jobp NULL");
		if (preq)
			req_reject(PBSE_SYSTEM, 0, preq);
		return;
	}

	DBPRT(("post_sendmom: %s substate is %ld", jobp->ji_qs.ji_jobid, get_job_substate(jobp)))

	if (jobp->ji_prunreq)
		jobp->ji_prunreq = NULL; /* set in svr_strtjob2() */

	if (prot == PROT_TCP) {
		/* decode the send_job child's wait status */
		if (WIFEXITED(wstat)) {
			r = WEXITSTATUS(wstat);
		} else if (WIFSIGNALED(wstat)) {
			/* Check if send_job child process has been signaled or not */
			r = SEND_JOB_SIGNAL;
			snprintf(log_buffer, LOG_BUF_SIZE, msg_job_end_sig, WTERMSIG(wstat));
			log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, LOG_INFO,
				  jobp->ji_qs.ji_jobid, log_buffer);
		} else {
			r = SEND_JOB_RETRY;
			sprintf(log_buffer, msg_badexit, wstat);
			strcat(log_buffer, __func__);
			log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, LOG_INFO,
				  jobp->ji_qs.ji_jobid, log_buffer);
		}

		/* if the return code says it was hook error read the hook error message
		 * from the file and parse out hookname and hook_msg
		 */
		if ((r == SEND_JOB_HOOKERR) ||
		    (r == SEND_JOB_HOOK_REJECT) ||
		    (r == SEND_JOB_HOOK_REJECT_RERUNJOB) ||
		    (r == SEND_JOB_HOOK_REJECT_DELETEJOB)) {

			char name_buf[MAXPATHLEN + 1];
			int fd;
			struct stat sbuf;

			snprintf(name_buf, sizeof(name_buf), "%s%s%s", path_hooks_workdir, jobp->ji_qs.ji_jobid,
				 HOOK_REJECT_SUFFIX);

			if ((stat(name_buf, &sbuf) != -1) && (sbuf.st_size > 0)) {

				if ((fd = open(name_buf, O_RDONLY)) != -1) {

					/* +1 for a NUL terminator: the reject file holds raw
					 * message text, and reject_msg is later handed to
					 * string functions (strchr in parse_hook_rejectmsg),
					 * so it must be explicitly terminated here to avoid
					 * reading past the end of the buffer.
					 */
					reject_msg = malloc(sbuf.st_size + 1);
					if (reject_msg != NULL) {
						if (read(fd, reject_msg, sbuf.st_size) != sbuf.st_size) {
							sprintf(log_buffer, "read %s is incomplete", name_buf);
							log_err(errno, __func__, log_buffer);
							reject_msg[0] = '\0';
						} else {
							reject_msg[sbuf.st_size] = '\0';
						}
					}
					close(fd);
					unlink(name_buf);
				}
			}
			hook_msg = parse_hook_rejectmsg(reject_msg, hook_name, PBS_HOOK_NAME_SIZE);
		}

	} else {
		/* in case of tpp, the pbs_errno is set in wstat, based
		 * on which we determine value of r
		 */
		switch (wstat) {
			case PBSE_NONE:
				r = SEND_JOB_OK;
				break;
			case PBSE_NORELYMOM:
				r = SEND_JOB_NODEDW;
				break;
			case PBSE_HOOKERROR:
				r = SEND_JOB_HOOKERR;
				break;
			case PBSE_HOOK_REJECT:
				r = SEND_JOB_HOOK_REJECT;
				break;
			case PBSE_HOOK_REJECT_RERUNJOB:
				r = SEND_JOB_HOOK_REJECT_RERUNJOB;
				break;
			case PBSE_HOOK_REJECT_DELETEJOB:
				r = SEND_JOB_HOOK_REJECT_DELETEJOB;
				break;
			default:
				r = SEND_JOB_FATAL;
				break;
		}

		/* also take note of the reject msg if any */
		if (reply && reply->brp_choice == BATCH_REPLY_CHOICE_Text)
			reject_msg = reply->brp_un.brp_txt.brp_str;

		/*
		 * the above reject_msg should never be freed within this function
		 * since it will be freed by the caller process_DreplyTPP() in the
		 * case of a TPP based job send
		 */

		if (r != SEND_JOB_OK) {
			if (reject_msg)
				sprintf(log_buffer,
					"send of job to %s failed error = %d reject_msg=%s",
					jobp->ji_qs.ji_destin, pbs_errno, reject_msg);
			else
				sprintf(log_buffer,
					"send of job to %s failed error = %d",
					jobp->ji_qs.ji_destin, pbs_errno);

			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, jobp->ji_qs.ji_jobid, log_buffer);
			snprintf(log_buffer, LOG_BUF_SIZE,
				 "Not Running: PBS Error: %s", pbse_to_txt(PBSE_MOMREJECT));

			if (jobp->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
				/*
				 * if the job is a subjob, set the comment of parent job array
				 * only if the job array is in state Queued. Once the job
				 * array starts its comment is set to a begun message and
				 * should not change after that
				 */
				if (check_job_state(jobp->ji_parentaj, JOB_STATE_LTR_QUEUED)) {
					set_jattr_str_slim(jobp->ji_parentaj, JOB_ATR_Comment, log_buffer, NULL);
				}
			}

			/* if the job is a normal job or a subjob */
			set_jattr_generic(jobp, JOB_ATR_Comment, log_buffer, NULL, SET);

			if (pbs_errno == PBSE_MOM_REJECT_ROOT_SCRIPTS)
				check_failed_attempts(jobp);
		}

		/* in the case of hook error we parse the hook_name and hook msg */
		if ((r == SEND_JOB_HOOKERR) ||
		    (r == SEND_JOB_HOOK_REJECT) ||
		    (r == SEND_JOB_HOOK_REJECT_RERUNJOB) ||
		    (r == SEND_JOB_HOOK_REJECT_DELETEJOB)) {

			hook_msg = parse_hook_rejectmsg(reject_msg, hook_name, PBS_HOOK_NAME_SIZE);
		}
	}

	/* an unexpected substate here is logged but still processed below */
	if (!(check_job_substate(jobp, JOB_SUBSTATE_PRERUN) ||
	      check_job_substate(jobp, JOB_SUBSTATE_RUNNING) ||
	      check_job_substate(jobp, JOB_SUBSTATE_PROVISION))) {
		sprintf(log_buffer, "send_job returned with exit status = %d and job substate = %ld",
			r, get_job_substate(jobp));

		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_INFO,
			  jobp->ji_qs.ji_jobid, log_buffer);
	}

	switch (r) {

		case SEND_JOB_OK: /* send to MOM went ok */

			if (preq)
				reply_ack(preq);
			if ((check_job_substate(jobp, JOB_SUBSTATE_PRERUN)) ||
			    (check_job_substate(jobp, JOB_SUBSTATE_PROVISION)))
				complete_running(jobp);
			break;

		case SEND_JOB_SIGNAL:

			/* send_job child process has been signaled
			 * therefore kill the job if it is already
			 * running on the MOM and force requeue the job
			 */
			if (preq)
				req_reject(PBSE_SYSTEM, 0, preq);

			/* need to record log message before aborting and
			 * requeuing job both in server and accounting logs
			 */
			snprintf(log_buffer, LOG_BUF_SIZE, "%s", msg_job_abort);
			log_event(PBSEVENT_SYSTEM | PBSEVENT_JOB | PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO, jobp->ji_qs.ji_jobid, log_buffer);

			/* abort job irrespective of its presence
			 * (may or may not be running) in a MOM
			 */
			job_abt(jobp, log_buffer);

			snprintf(log_buffer, LOG_BUF_SIZE, msg_init_substate, get_job_substate(jobp));
			log_event(PBSEVENT_SYSTEM | PBSEVENT_JOB | PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO, jobp->ji_qs.ji_jobid, log_buffer);

			/* Force requeue the job since the job has been aborted by the server */
			force_reque(jobp);
			break;

		case SEND_JOB_NODEDW: /* node (mother superior) is down? */
			mark_node_down(jobp->ji_qs.ji_destin, "could not send job to mom");

			/* fall through to requeue job */

		default: /* send failed, requeue the job */
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  jobp->ji_qs.ji_jobid,
				  "Unable to Run Job, MOM rejected");

			/* release resources */
			if (check_job_substate(jobp, JOB_SUBSTATE_PROVISION) ||
			    check_job_substate(jobp, JOB_SUBSTATE_PRERUN))
				rel_resc(jobp);
			else
				free_nodes(jobp);

			/* delete stagein files if flag is set */
			if (jobp->ji_qs.ji_svrflags & JOB_SVFLG_StagedIn)
				if (remove_stagein(jobp) != 0) {
					/* if remove stagein is failed then */
					/* we will remove stagedin flag from job */
					jobp->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn;
				}
			/* keep a copy: ji_destin is cleared by clear_exec_on_run_fail() */
			snprintf(dest_host, sizeof(dest_host), "%s", jobp->ji_qs.ji_destin);
			clear_exec_on_run_fail(jobp);

			if (!check_job_substate(jobp, JOB_SUBSTATE_ABORT)) {
				if (preq) {
					if ((r == SEND_JOB_HOOKERR) ||
					    (r == SEND_JOB_HOOK_REJECT) ||
					    (r == SEND_JOB_HOOK_REJECT_RERUNJOB) ||
					    (r == SEND_JOB_HOOK_REJECT_DELETEJOB)) {
						int err;

						/* map the send_job code back to a PBSE error */
						if (r == SEND_JOB_HOOK_REJECT)
							err = PBSE_HOOK_REJECT;
						else if (r == SEND_JOB_HOOK_REJECT_RERUNJOB)
							err = PBSE_HOOK_REJECT_RERUNJOB;
						else if (r == SEND_JOB_HOOK_REJECT_DELETEJOB)
							err = PBSE_HOOK_REJECT_DELETEJOB;
						else
							err = PBSE_HOOKERROR;

						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
							  LOG_NOTICE, jobp->ji_qs.ji_jobid, pbse_to_txt(err));

						reply_text(preq, err, hook_msg ? hook_msg : "");
					} else {
						req_reject(PBSE_MOMREJECT, 0, preq);
					}
				}

				if (r == SEND_JOB_HOOK_REJECT_DELETEJOB) {
					log_event(PBSEVENT_DEBUG,
						  PBS_EVENTCLASS_JOB, LOG_INFO,
						  jobp->ji_qs.ji_jobid,
						  "Job aborted per a hook rejection");

					/* Need to force queued state so */
					/* job_abt() call does not try   */
					/* to issue a kill job signal to mom */
					set_job_state(jobp, JOB_STATE_LTR_QUEUED);
					set_job_substate(jobp, JOB_SUBSTATE_QUEUED);
					job_abt(jobp, msg_hook_reject_deletejob);
					break;
				} else if ((r == SEND_JOB_HOOKERR) ||
					   (r == SEND_JOB_HOOK_REJECT) ||
					   (r == SEND_JOB_HOOK_REJECT_RERUNJOB)) {
					check_failed_attempts(jobp);
					if (r == SEND_JOB_HOOKERR) {
						hook *phook;
						phook = find_hook(hook_name);
						if (phook != NULL) {
							if ((phook->fail_action & HOOK_FAIL_ACTION_OFFLINE_VNODES) != 0) {
								/*
								 * hook_buf must be large enough
								 * to hold the hook_name and a
								 * small amount of text.
								 */
								char hook_buf[PBS_HOOK_NAME_SIZE + 64];

								snprintf(hook_buf, sizeof(hook_buf),
									 "offlined by hook '%s' due to hook error",
									 hook_name);
								mark_node_offline_by_mom(dest_host, hook_buf);
							}
							if ((phook->fail_action & HOOK_FAIL_ACTION_SCHEDULER_RESTART_CYCLE) != 0) {

								set_scheduler_flag(SCH_SCHEDULE_RESTART_CYCLE, dflt_scheduler);
								log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_INFO, phook->hook_name, "requested for scheduler to restart cycle");
							}
						}
					}
				}

				svr_evaljobstate(jobp, &newstate, &newsub, 1);
				svr_setjobstate(jobp, newstate, newsub);
			} else {
				if (preq)
					req_reject(PBSE_BADSTATE, 0, preq);
			}

			break;
	}

	if (prot == PROT_TCP && reject_msg != NULL)
		free(reject_msg); /* free this only in case of non-tpp since it was locally allocated */

	return;
}

/**
 * @brief
 * 		chk_job_torun - validate that a job may be run or have its files
 *		staged in.
 * @par
 *		A job already in transit, exiting, staging, or (pre)running cannot
 *		be run again; a StageIn request additionally rejects a job whose
 *		stage-in is already under way.  On rejection the batch request is
 *		answered with PBSE_BADSTATE.
 *
 * @param[in,out]	preq	-	Pointer to batch request
 * @param[in]	pjob	-	existing job structure
 *
 * @return	Pointer to job
 * @retval	NULL	: job is NULL or in a state that disallows the request
 */

static job *
chk_job_torun(struct batch_request *preq, job *pjob)
{
	int rejectable;

	if (pjob == NULL)
		return NULL;

	/* states/substates in which a (re)run or stage-in makes no sense */
	rejectable = check_job_state(pjob, JOB_STATE_LTR_TRANSIT) ||
		     check_job_state(pjob, JOB_STATE_LTR_EXITING) ||
		     check_job_substate(pjob, JOB_SUBSTATE_STAGEGO) ||
		     check_job_substate(pjob, JOB_SUBSTATE_PRERUN) ||
		     check_job_substate(pjob, JOB_SUBSTATE_RUNNING);

	/* a stage-in request is also invalid while stage-in is in progress */
	if (!rejectable &&
	    (preq->rq_type == PBS_BATCH_StageIn) &&
	    check_job_substate(pjob, JOB_SUBSTATE_STAGEIN))
		rejectable = 1;

	if (rejectable) {
		req_reject(PBSE_BADSTATE, 0, preq);
		return NULL;
	}

	return pjob;
}
/**
 * @brief
 * 		where to execute the job
 * @par
 *		Decides which vnodes the job runs on.  If the job has staged-in
 *		files, a checkpoint image, or "qrun -H -" was given, the previously
 *		assigned exec_vnode is reused; otherwise the destination supplied in
 *		the run request is used to make a fresh assignment.  For a manual
 *		(non-scheduler) qrun, the job comment is updated to say so.
 *
 * @param[in,out]	preq	-	Pointer to batch request
 * @param[in,out]	pjob	-	existing job structure
 *
 * @return	Pointer to job
 * @retval	null	: fail (preq has been rejected)
 */
static job *
where_to_runjob(struct batch_request *preq, job *pjob)
{
	char *nspec;
	struct rq_runjob *prun = &preq->rq_ind.rq_run;
	int rc;

	if ((pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHKPT | JOB_SVFLG_StagedIn)) ||
	    ((prun->rq_destin != NULL) && (*prun->rq_destin == '-') && (*(prun->rq_destin + 1) == '\0'))) {
		/* Job has files staged, a checkpoint image, or "qrun -H -" was specified.	*/
		/* Reuse assigned resources.							*/
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) == 0) {
			/* re-reserve nodes and leave exec_vnode as is */
			/* convert exec_vnode string into form like user spec */
			nspec = get_jattr_str(pjob, JOB_ATR_exec_vnode);
			if (nspec == NULL) {
				/* something's wrong, before we reject the */
				/* job let us clear the flags so the job can */
				/* run the next time around  */
				pjob->ji_qs.ji_svrflags &= ~(JOB_SVFLG_CHKPT |
							     JOB_SVFLG_StagedIn);
				req_reject(PBSE_IVALREQ, 0, preq);
				return NULL;
			}
			if ((rc = assign_hosts(pjob, nspec, 0)) != 0) {
				/* NOTE(review): nspec comes from get_jattr_str();
				 * freeing it here assumes the returned string is
				 * caller-owned, yet the same call's result is not
				 * freed elsewhere in this function — confirm
				 * ownership to rule out a double free. */
				free(nspec);
				req_reject(rc, 0, preq);
				return NULL;
			}
		}
	} else {

		/* job has not run before or need not run there again	*/
		/* reserve nodes and set exec_vnode anew		*/

		if ((prun->rq_destin == NULL) ||
		    (*prun->rq_destin == '\0')) {
			req_reject(PBSE_IVALREQ, 0, preq);
			return NULL;
		}

		if ((is_jattr_set(pjob, JOB_ATR_exec_vnode)) != 0) {
			/*
			 * Instruct MoM to discard the existing job before we assign new
			 * resources. This ensures previously assigned resources are cleaned
			 * up properly and prevents orphaned processes. If the job is not
			 * discarded, files and directories created for the job will linger.
			 */
			discard_job(pjob, "Force qrun", 1);
		}

		rc = assign_hosts(pjob, prun->rq_destin, 1);

		if (rc != 0) {
			req_reject(rc, 0, preq);
			return NULL;
		}
	}

	/* If the request did not come from the scheduler, update the comment. */
	if (find_sched_from_sock(preq->rq_conn, CONN_SCHED_PRIMARY) == NULL) {
		char comment[MAXCOMMENTLEN];
		nspec = get_jattr_str(pjob, JOB_ATR_exec_vnode);
		if ((nspec != NULL) && (*nspec != '\0')) {
			snprintf(comment, MAXCOMMENTLEN, "Job manually qrun on %s", nspec);
		} else {
			snprintf(comment, MAXCOMMENTLEN, "Job manually qrun.");
		}
		set_jattr_str_slim(pjob, JOB_ATR_Comment, comment, NULL);
	}

	return (pjob);
}

/**
 * @brief
 * 		assign_hosts - assign hosts (vnodes) to job which are specified (given) by:
 *		1. the scheduler when it runs a job,
 *		2. the operator as the -H option to qrun
 *		3. from exec_vnode when required by checkpoint-restart or file stage-in
 *
 * @param[in,out]	pjob	-	pointer to a job object
 * @param[in]	given	-	original vnode list from scheduler/operator
 * @param[in]	set_exec_vnode	-	if True (non-zero), this function is to create
 *                              	a new hoststr including new job indicies,
 *                              	otherwise return existing exec_host unchanged.
 *
 * @return	int
 * @retval	0	: success
 * @retval	!0	: error code (PBSE_NONODES, PBSE_IVALREQ, PBSE_BADHOST,
 *			  or the code from set_nodes())
 */

int
assign_hosts(job *pjob, char *given, int set_exec_vnode)
{
	char *hoststr;
	char *hoststr2;
	char *vnodestoalloc;
	pbs_net_t momaddr = 0;
	unsigned int port;
	int rc = 0;

	if (svr_totnodes == 0) /* Must have nodes file */
		return (PBSE_NONODES);

	if (given == NULL)
		return (PBSE_IVALREQ);

	/* allocate the execution nodes and resources */

	/* when reusing resources, seed set_nodes() with the existing host lists */
	if ((set_exec_vnode == 0) &&
	    (is_jattr_set(pjob, JOB_ATR_exec_host))) {
		hoststr = get_jattr_str(pjob, JOB_ATR_exec_host);
		hoststr2 = get_jattr_str(pjob, JOB_ATR_exec_host2);
	} else {
		hoststr = NULL;
		hoststr2 = NULL;
	}

	rc = set_nodes((void *) pjob, JOB_OBJECT, given, &vnodestoalloc, &hoststr, &hoststr2,
		       set_exec_vnode, FALSE);

	if (rc == 0) {
		if (set_exec_vnode) {
			/* replace the exec_* attributes with the fresh assignment */
			free_jattr(pjob, JOB_ATR_exec_host);
			free_jattr(pjob, JOB_ATR_exec_host2);
			free_jattr(pjob, JOB_ATR_exec_vnode);
			set_jattr_str_slim(pjob, JOB_ATR_exec_vnode, vnodestoalloc, NULL);
			set_jattr_str_slim(pjob, JOB_ATR_exec_host, hoststr, NULL);
			set_jattr_str_slim(pjob, JOB_ATR_exec_host2, hoststr2, NULL);
		} else {
			/* leave exec_vnode alone and reuse old IP address */
			momaddr = pjob->ji_qs.ji_un.ji_exect.ji_momaddr;
			port = pjob->ji_qs.ji_un.ji_exect.ji_momport;
		}
		/* NOTE(review): strncpy may leave ji_destin unterminated if the
		 * first hostname is >= PBS_MAXROUTEDEST chars, and the string
		 * returned by parse_servername() is never freed here — confirm
		 * whether it returns static or allocated storage. */
		strncpy(pjob->ji_qs.ji_destin,
			parse_servername(hoststr, NULL),
			PBS_MAXROUTEDEST);
		if (momaddr == 0) {
			momaddr = get_addr_of_nodebyname(pjob->ji_qs.ji_destin,
							 &port);
			if (momaddr == 0) {
				/* destination Mom is unresolvable: undo assignment */
				free_nodes(pjob);
				free_jattr(pjob, JOB_ATR_exec_host);
				free_jattr(pjob, JOB_ATR_exec_host2);
				free_jattr(pjob, JOB_ATR_exec_vnode);
				return (PBSE_BADHOST);
			}
		}
		pjob->ji_qs.ji_un.ji_exect.ji_momaddr = momaddr;
		pjob->ji_qs.ji_un.ji_exect.ji_momport = port;
	}
	return (rc);
}

/**
 * @brief
 * 		req_defschedreply - handle the deferred scheduler reply call
 * @par
 *		The scheduler sends this to answer a qrun request that was held
 *		pending.  The matching deferred request is looked up by id; its
 *		original requester is answered (with text, an ack, or an error),
 *		then the deferred entry is discarded and this request acknowledged.
 *
 * @param[in,out]	preq	-	Pointer to batch request
 */

void
req_defschedreply(struct batch_request *preq)
{
	struct deferred_request *match = NULL;
	struct deferred_request *entry;

	/* only a per-job scheduling command may carry a deferred reply */
	if (preq->rq_ind.rq_defrpy.rq_cmd != SCH_SCHEDULE_AJOB) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	/* locate the deferred request whose id matches this reply */
	entry = (struct deferred_request *) GET_NEXT(svr_deferred_req);
	while (entry != NULL) {
		if (strcmp(preq->rq_ind.rq_defrpy.rq_id, entry->dr_id) == 0) {
			match = entry;
			break;
		}
		entry = (struct deferred_request *) GET_NEXT(entry->dr_link);
	}

	if (match == NULL) {
		req_reject(PBSE_UNKJOBID, 0, preq);
		return;
	}

	/* reply to the original (deferred) request; the pointer is nulled
	 * when the originating connection (qrun) has been closed */
	if (match->dr_preq != NULL) {
		int sched_err = preq->rq_ind.rq_defrpy.rq_err;
		char *sched_txt = preq->rq_ind.rq_defrpy.rq_txt;

		/* "preq" is the scheduler's reply; "match" holds the original
		 * qrun request, which is freed when its reply is sent */
		if (sched_txt != NULL)
			reply_text(match->dr_preq, sched_err, sched_txt);
		else if (sched_err == 0)
			reply_send(match->dr_preq);
		else
			req_reject(sched_err, 0, match->dr_preq);
	}

	/* unlink and free the deferred request entry */
	delete_link(&match->dr_link);
	free(match);

	reply_send(preq);
}

/**
 * @brief
 *	convert_job_to_resv - create a reservation out of the job
 * 			      and move the job to the newly created
 * 			      reservation.
 * @par
 *	Builds a PBS_BATCH_SubmitResv request carrying the job id as the
 *	ATTR_resv_job attribute and issues it locally; release_req() cleans
 *	up the request when the reply arrives.  Errors are logged and the
 *	job is left unchanged.
 *
 * @param[in]	pjob - pointer to the job object
 *
 * @return	void
 */

void
convert_job_to_resv(job *pjob)
{
	svrattrl *psatl;
	unsigned int len;
	pbs_list_head *plhed;
	struct work_task *pwt;
	struct batch_request *newreq;

	newreq = alloc_br(PBS_BATCH_SubmitResv);
	if (newreq == NULL) {
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, "batch request allocation failed, could not create reservation from the job");
		return;
	}
	newreq->rq_type = PBS_BATCH_SubmitResv;

	/* the reservation is submitted on behalf of the job's owner */
	get_jobowner(get_jattr_str(pjob, JOB_ATR_job_owner), newreq->rq_user);

	/* NOTE(review): strncpy may leave rq_host unterminated for a
	 * maximum-length hostname, and get_jattr_str() could return NULL if
	 * submit_host is unset — confirm both are guaranteed by this point. */
	strncpy(newreq->rq_host, get_jattr_str(pjob, JOB_ATR_submit_host), PBS_MAXHOSTNAME);
	newreq->rq_perm = READ_WRITE | ATR_DFLAG_ALTRUN;

	newreq->rq_ind.rq_queuejob.rq_jid[0] = '\0';
	newreq->rq_ind.rq_queuejob.rq_destin[0] = '\0';

	/* attach the job id as the ATTR_resv_job attribute of the new resv */
	len = strlen(pjob->ji_qs.ji_jobid) + 1;
	plhed = &newreq->rq_ind.rq_queuejob.rq_attr;
	CLEAR_HEAD(newreq->rq_ind.rq_queuejob.rq_attr);
	if ((psatl = attrlist_create(ATTR_resv_job, NULL, len)) != NULL) {
		psatl->al_flags = resv_attr_def[RESV_ATR_job].at_flags;
		strcpy(psatl->al_value, pjob->ji_qs.ji_jobid);
		append_link(plhed, &psatl->al_link, psatl);
	}

	if (issue_Drequest(PBS_LOCAL_CONNECTION, newreq, release_req, &pwt, 0) == -1) {
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, "Could not create reservation from the job");
		free_br(newreq);
	}
}
