/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 *
 * @brief
 * 		contains server functions dealing with jobs
 *
 */
#include <pbs_config.h> /* the master config generated by configure */

#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <math.h>
#include <netdb.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "pbs_ifl.h"
#include "libutil.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "libpbs.h"
#include "credential.h"
#include "batch_request.h"
#include "resource.h"
#include "server.h"
#include "work_task.h"
#include "resv_node.h"
#include "queue.h"
#include "job.h"
#include "pbs_sched.h"
#include "reservation.h"
#include "pbs_error.h"
#include "log.h"
#include "acct.h"
#include "pbs_idx.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include "sched_cmds.h"
#include "dis.h"
#include "libsec.h"
#include "pbs_license.h"
#include "pbs_reliable.h"
#include <sys/wait.h>

#define MIN_WALLTIME_LIMIT 0
#define MAX_WALLTIME_LIMIT 1

char statechars[] = "TQHWREXBMF";

/* Private Functions */

static void default_std(job *, int key, char *to);
static void Time4reply(struct work_task *);
static void Time4resv(struct work_task *);
static void Time4resv1(struct work_task *);
static void resvFinishReply(struct work_task *);
int change_enableORstart(resc_resv *, int, char *);
static void handle_qmgr_reply_to_startORenable(struct work_task *);
static void delete_occurrence_jobs(resc_resv *presv);
static void Time4occurrenceFinish(resc_resv *);
static void running_jobs_count(struct work_task *);

/* Global Data Items: */
extern char *msg_noloopbackif;
extern char *msg_mombadmodify;

extern struct server server;
extern int pbs_mom_port;
extern pbs_list_head svr_alljobs;
extern char *msg_badwait; /* error message */
extern char *msg_daemonname;
extern char *msg_also_deleted_job_history;
extern char server_name[];
extern pbs_list_head svr_queues;
extern int comp_resc_lt;
extern int comp_resc_gt;
extern time_t time_now;
extern char *resc_in_err;

extern struct licenses_high_use usedlicenses;

/* For history jobs only */
extern long svr_history_enable;
extern long svr_history_duration;

/* Work Task Handlers */

extern void resv_retry_handler(struct work_task *);
extern long determine_resv_retry(resc_resv *);

/* external functions */
extern void free_job_work_tasks(job *);

/* Private Functions */

#ifndef NDEBUG
static void correct_ct(pbs_queue *);
#endif /* NDEBUG */

/**
 * @brief
 * 		clear the default resource from structures
 *
 * @param[in]	pjob	-	The job to be enqueued.
 */
static void
clear_default_resc(job *pjob)
{
	resource *res;

	/* Nothing to clear unless the job carries a resource list. */
	if (!is_jattr_set(pjob, JOB_ATR_resource))
		return;

	/* Walk the resource list, releasing only values that were
	 * filled in as defaults (queue/server supplied, not user set).
	 */
	for (res = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
	     res != NULL;
	     res = (resource *) GET_NEXT(res->rs_link)) {
		if (res->rs_value.at_flags & ATR_VFLAG_DEFLT)
			res->rs_defin->rs_free(&res->rs_value);
	}
}

/**
 * @brief
 * 		tickle_for_reply ()
 * 		For internally generated requests to the server we would like
 * 		processing of the reply from the particular server subsystem
 * 		to happen as "soon" as the server get back to its main loop -
 * 		see server's main loop and "next_task()" and variable, "waittime".
 * 		By placing a do nothing task on the "timed_task_list" whose time
 * 		is now (or already passed), we can get next_task() to look at the
 * 		"task_list_immed" tasks now rather than wait for a while
 */
void
tickle_for_reply(void)
{
	/*
	 * Queue a do-nothing timed task so that the server main loop's
	 * next_task() wakes and looks at "task_list_immed" promptly.
	 * NOTE(review): the block comment above says the task's time is
	 * "now (or already passed)", but this schedules at time_now + 10;
	 * confirm whether the +10 second offset is intentional.
	 */
	(void) set_task(WORK_Timed, time_now + 10, 0, NULL);
}

/**
 * @brief
 * 		svr_enquejob	-	Enqueue the job into specified queue.
 *
 * @param[in]	pjob	-	The job to be enqueued.
 * @param[in]	selectspec -	select spec of the job.
 *
 * @return	int
 * @retval	0	: on success
 * @retval	PBSE	: specified error number.
 *
 * @par MT-Safe:	no
 *
 * @par Note:
 *		Enqueue the job to the specific queue and update the queue state.
 *		Updated default attributes and resources specific to job type.
 */
int
svr_enquejob(job *pjob, char *selectspec)
{
	job *pjcur;		/* cursor for rank-ordered insertion */
	pbs_queue *pque;
	int rc;
	pbs_sched *psched;
	int state_num;
	char *qtype;
	char hook_msg[HOOK_MSG_SIZE] = {0};

	state_num = get_job_state_num(pjob);

	/* make sure queue is still there, there exists a small window ... */

	pque = find_queuebyname(pjob->ji_qs.ji_queue);
	if (pque == NULL) {
		/*
		 * If it is a history job, then don't return PBSE_UNKQUE
		 * error but link the job to SERVER job list and update
		 * job history timestamp and subjob state table and return
		 * 0 (SUCCESS). INFO: The job is not associated with any
		 * queue as the queue has been already purged.
		 */
		if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
		    (check_job_state(pjob, JOB_STATE_LTR_FINISHED))) {
			if (is_linked(&svr_alljobs, &pjob->ji_alljobs) == 0) {
				if (pbs_idx_insert(jobs_idx, pjob->ji_qs.ji_jobid, pjob) != PBS_IDX_RET_OK) {
					log_joberr(PBSE_INTERNAL, __func__, "Failed add history job in index", pjob->ji_qs.ji_jobid);
					return PBSE_INTERNAL;
				}
				append_link(&svr_alljobs, &pjob->ji_alljobs, pjob);
			}
			server.sv_qs.sv_numjobs++;
			if (state_num != -1)
				server.sv_jobstates[state_num]++;
			return (0);
		} else {
			return (PBSE_UNKQUE);
		}
	}

	/* add job to server's all job list and update server counts */

#ifndef NDEBUG
	(void) sprintf(log_buffer, "enqueuing into %s, state %c hop %ld",
		       pque->qu_qs.qu_name, get_job_state(pjob),
		       get_jattr_long(pjob, JOB_ATR_hopcount));
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG,
		  pjob->ji_qs.ji_jobid, log_buffer);
#endif /* NDEBUG */

	if (pbs_idx_insert(jobs_idx, pjob->ji_qs.ji_jobid, pjob) != PBS_IDX_RET_OK) {
		log_joberr(PBSE_INTERNAL, __func__, "Failed add job in index", pjob->ji_qs.ji_jobid);
		return PBSE_INTERNAL;
	}

	/* link into server's all-jobs list, ordered by queue rank (qrank) */
	pjcur = (job *) GET_PRIOR(svr_alljobs);
	while (pjcur) {
		if (get_jattr_ll(pjob, JOB_ATR_qrank) >= get_jattr_ll(pjcur, JOB_ATR_qrank))
			break;
		pjcur = (job *) GET_PRIOR(pjcur->ji_alljobs);
	}
	if (pjcur == 0) {
		/* link first in server's list */
		insert_link(&svr_alljobs, &pjob->ji_alljobs, pjob,
			    LINK_INSET_AFTER);
	} else {
		/* link after 'current' job in server's list */
		insert_link(&pjcur->ji_alljobs, &pjob->ji_alljobs, pjob,
			    LINK_INSET_AFTER);
	}

	server.sv_qs.sv_numjobs++;
	if (state_num != -1)
		server.sv_jobstates[state_num]++;

	/* place into queue in order of queue rank starting at end */

	pjob->ji_qhdr = pque;

	pjcur = (job *) GET_PRIOR(pque->qu_jobs);
	while (pjcur) {
		if (get_jattr_ll(pjob, JOB_ATR_qrank) >= get_jattr_ll(pjcur, JOB_ATR_qrank))
			break;
		pjcur = (job *) GET_PRIOR(pjcur->ji_jobque);
	}
	if (pjcur == 0) {
		/* link first in list */
		insert_link(&pque->qu_jobs, &pjob->ji_jobque, pjob,
			    LINK_INSET_AFTER);
	} else {
		/* link after 'current' job in list */
		insert_link(&pjcur->ji_jobque, &pjob->ji_jobque, pjob,
			    LINK_INSET_AFTER);
	}

	/* update counts: queue and queue by state */

	pque->qu_numjobs++;
	if (state_num != -1)
		pque->qu_njstate[state_num]++;

	/* history jobs need only the count/link updates above */
	if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) || (check_job_state(pjob, JOB_STATE_LTR_FINISHED))) {
		return (0);
	}

	/* update the current location and type attribute */
	set_jattr_generic(pjob, JOB_ATR_in_queue, pque->qu_qs.qu_name, NULL, SET);

	if ((qtype = get_qattr_str(pque, QA_ATR_QType)) == NULL) {
		log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_QUEUE, LOG_ERR,
			   pjob->ji_qs.ji_jobid, "queue type must be set for queue `%s`",
			   pque->qu_qs.qu_name);
		return PBSE_NEEDQUET;
	}
	set_jattr_c_slim(pjob, JOB_ATR_queuetype, *qtype, SET);

	if (!is_jattr_set(pjob, JOB_ATR_qtime))
		set_jattr_l_slim(pjob, JOB_ATR_qtime, time_now, SET);

	/*
	 * set any "unspecified" resources which have default values,
	 * first with queue defaults, then with server defaults
	 */

	rc = set_resc_deflt((void *) pjob, JOB_OBJECT, NULL);
	if (rc)
		return rc;

	/*
	 * Ensure that all jobs have JOB_ATR_project set.
	 * It could be unset if coming from an overlay upgrade.
	 */
	if (!is_jattr_set(pjob, JOB_ATR_project))
		set_jattr_str_slim(pjob, JOB_ATR_project, PBS_DEFAULT_PROJECT, NULL);

	/* update any entity count and entity resources usage for the queue */

	if (!(pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) ||
	    (get_sattr_long(SVR_ATR_State) == SV_STATE_INIT))
		account_entity_limit_usages(pjob, pque, NULL, INCR, ETLIM_ACC_ALL);

	/*
	 * See if we need to do anything special based on type of queue
	 */

	if (pque->qu_qs.qu_type == QTYPE_Execution) {

		/* set union to "EXEC" and clear mom's address */

		if (pjob->ji_qs.ji_un_type != JOB_UNION_TYPE_EXEC) {
			pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_EXEC;
			pjob->ji_qs.ji_un.ji_exect.ji_momaddr = 0;
			pjob->ji_qs.ji_un.ji_exect.ji_momport = 0;
			pjob->ji_qs.ji_un.ji_exect.ji_exitstat = 0;
		}

		/* check the job checkpoint against the queue's  min */

		eval_chkpnt(pjob, get_qattr(pque, QE_ATR_ChkptMin));

		/*
		 * do anything needed doing regarding job dependencies,
		 * ignore this during Server recovery as the dependency
		 * was registered when the job was first enqueued.
		 */

		if (get_sattr_long(SVR_ATR_State) != SV_STATE_INIT) {
			if (is_jattr_set(pjob, JOB_ATR_depend)) {
				rc = depend_on_que(get_jattr(pjob, JOB_ATR_depend), pjob, ATR_ACTION_NOOP);
				if (rc)
					return rc;
			}
		}

		/* set eligible time */

		if (!is_jattr_set(pjob, JOB_ATR_etime) && check_job_state(pjob, JOB_STATE_LTR_QUEUED)) {
			set_jattr_l_slim(pjob, JOB_ATR_etime, time_now, SET);

			/*
			 * better notify the Scheduler we have a new job.
			 * BUG FIX: this notification block used to appear
			 * twice -- once guarded by (!selectspec) and then
			 * again unconditionally -- causing a double
			 * notification when selectspec was NULL; notify
			 * exactly once.
			 */
			if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
				set_scheduler_flag(SCH_SCHEDULE_NEW, psched);
			else {
				sprintf(log_buffer, "Unable to reach scheduler associated with job %s", pjob->ji_qs.ji_jobid);
				log_err(-1, __func__, log_buffer);
			}
		} else if (get_sattr_long(SVR_ATR_EligibleTimeEnable) && get_sattr_long(SVR_ATR_scheduling) && !selectspec) {

			/* notify the Scheduler we have moved a job here */

			if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
				set_scheduler_flag(SCH_SCHEDULE_MVLOCAL, psched);
			else {
				sprintf(log_buffer, "Unable to reach scheduler associated with job %s", pjob->ji_qs.ji_jobid);
				log_err(-1, __func__, log_buffer);
			}
		}

	} else if (pque->qu_qs.qu_type == QTYPE_RoutePush) {

		/* start attempts to route job */

		pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_ROUTE;
		pjob->ji_qs.ji_un.ji_routet.ji_quetime = time_now;
		pjob->ji_qs.ji_un.ji_routet.ji_rteretry = 0;
	}

	/* start postqueuejob hook; a hook failure is logged but does not
	 * fail the enqueue itself */

	struct batch_request *preq;
	preq = alloc_br(PBS_BATCH_PostQueueJob);
	if (preq == NULL) {
		log_err(PBSE_INTERNAL, __func__, "failed to alloc_br for PBS_BATCH_PostQueueJob");
	} else {
		preq->rq_ind.rq_postqueuejob.rq_pjob = pjob;
		strcpy(preq->rq_ind.rq_postqueuejob.rq_jid, pjob->ji_qs.ji_jobid);
		strncpy(preq->rq_user, pbs_current_user, PBS_MAXUSER);
		strncpy(preq->rq_host, server_host, PBS_MAXHOSTNAME);

		rc = process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
		if (rc == -1) {
			log_eventf(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid,
				   "postqueuejob process_hooks call failed: %s", hook_msg);
		}
		free_br(preq);
	}
	return (0);
}

/**
 * @brief
 * 		svr_dequejob() - remove job from whatever queue its in and reduce counts
 *
 * @param[in]	pjob	-	The job to be enqueued.
 */

void
svr_dequejob(job *pjob)
{
	int bad_ct = 0; /* set when a count would go negative */
	pbs_queue *pque;
	int state_num;

	/* remove job from server's all job list and reduce server counts */

	if (is_linked(&svr_alljobs, &pjob->ji_alljobs)) {
		/*
		 * BUG FIX: a second "int state_num;" declared here shadowed
		 * the function-scope variable above (CERT DCL01-C); the
		 * outer declaration is used throughout instead.
		 */
		delete_link(&pjob->ji_alljobs);
		delete_link(&pjob->ji_unlicjobs);
		if (pbs_idx_delete(jobs_idx, pjob->ji_qs.ji_jobid) != PBS_IDX_RET_OK)
			log_joberr(PBSE_INTERNAL, __func__, "Failed to delete job from index", pjob->ji_qs.ji_jobid);
		if (--server.sv_qs.sv_numjobs < 0)
			bad_ct = 1;

		state_num = get_job_state_num(pjob);
		if (state_num != -1 && --server.sv_jobstates[state_num] < 0)
			bad_ct = 1;
	}

	if ((pque = pjob->ji_qhdr) != NULL) {

		/* update any entity count and entity resources usage at que */

		account_entity_limit_usages(pjob, pque, NULL, DECR,
					    pjob->ji_etlimit_decr_queued ? ETLIM_ACC_ALL_MAX : ETLIM_ACC_ALL);

		if (is_linked(&pque->qu_jobs, &pjob->ji_jobque)) {
			delete_link(&pjob->ji_jobque);
			if (--pque->qu_numjobs < 0)
				bad_ct = 1;

			state_num = get_job_state_num(pjob);
			if (state_num != -1 && --pque->qu_njstate[state_num] < 0)
				bad_ct = 1;
		}
		pjob->ji_qhdr = NULL;
	}

#ifndef NDEBUG
	sprintf(log_buffer, "dequeuing from %s, state %c",
		pque ? pque->qu_qs.qu_name : "", get_job_state(pjob));
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG,
		  pjob->ji_qs.ji_jobid, log_buffer);
	if (bad_ct) /* state counts are all messed up */
		correct_ct(pque);
#endif /* NDEBUG */

	mark_jattr_not_set(pjob, JOB_ATR_qtime);

	/* clear any default resource values */
	clear_default_resc(pjob);
}

/**
 * @brief
 * 		svr_setjobstate - set the job state, update the server/queue state counts,
 *		and save the job
 *
 * @param[in,out]	pjob	-	The job to be operated on.
 * @param[in]	newstate	-	new job state
 * @param[in]	newsubstate	-	new sub state of the job.
 *
 * @return	int
 * @retval	0	: success
 * @retval	!=0	: failure
 */

int
svr_setjobstate(job *pjob, char newstate, int newsubstate)
{
	pbs_queue *pque = pjob->ji_qhdr; /* may be NULL for a queue-less job */
	pbs_sched *psched;

	/*
	 * If the job has already finished, then do not make any new changes
	 * to job state or substate.  Also a no-op if the job is already in
	 * exactly the requested state and substate.
	 */
	if (check_job_state(pjob, JOB_STATE_LTR_FINISHED) ||
	    (check_job_state(pjob, newstate) && (check_job_substate(pjob, newsubstate))))
		return (0);

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid,
		   "Updated job state to %d and substate to %d", newstate, newsubstate);

	/*
	 * if its is a new job, then don't update counts, svr_enquejob() will
	 * take care of that, also req_commit() will see that the job is saved.
	 */

	if (!check_job_substate(pjob, JOB_SUBSTATE_TRANSICM)) {
		char oldstate = get_job_state(pjob);

		/* if the state is changing, also update the state counts */

		if (oldstate != newstate) {
			int oldstatenum;
			int newstatenum;

			/* move the job between per-state counters on the
			 * server and (if present) its queue */
			oldstatenum = state_char2int(oldstate);
			newstatenum = state_char2int(newstate);
			if (oldstatenum != -1)
				server.sv_jobstates[oldstatenum]--;
			if (newstatenum != -1)
				server.sv_jobstates[newstatenum]++;
			if (pque != NULL) {
				if (oldstatenum != -1)
					pque->qu_njstate[oldstatenum]--;
				if (newstatenum != -1)
					pque->qu_njstate[newstatenum]++;

				/*
				 * if execution queue, and eligibility to run
				 * has improved, kick the scheduler.
				 */

				if ((pque->qu_qs.qu_type == QTYPE_Execution) &&
				    (newstate == JOB_STATE_LTR_QUEUED)) {
					if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
						set_scheduler_flag(SCH_SCHEDULE_NEW, psched);
					else {
						sprintf(log_buffer, "Unable to reach scheduler associated with job %s", pjob->ji_qs.ji_jobid);
						log_err(-1, __func__, log_buffer);
					}

					/* newly-queued job gets an eligible time if missing */
					if (!is_jattr_set(pjob, JOB_ATR_etime))
						set_jattr_l_slim(pjob, JOB_ATR_etime, time_now, SET);

					/* clear start time (stime) */
					free_jattr(pjob, JOB_ATR_stime);

				} else if ((newstate == JOB_STATE_LTR_HELD) || (newstate == JOB_STATE_LTR_WAITING)) {
					/* on hold or wait, clear etime */
					free_jattr(pjob, JOB_ATR_etime);
					/* TODO: remove attr etime from database */
				}
			}
			/* if subjob, update parent Array Job */
			if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
				update_sj_parent(pjob->ji_parentaj, pjob, pjob->ji_qs.ji_jobid, oldstate, newstate);
				chk_array_doneness(pjob->ji_parentaj);
			}
		}
	}

	/* set the states accordingly */
	set_job_state(pjob, newstate);
	set_job_substate(pjob, newsubstate);

	/* eligible_time_enable: recompute and apply the accrue type */
	if (get_sattr_long(SVR_ATR_EligibleTimeEnable) == 1) {
		long newaccruetype;

		newaccruetype = determine_accruetype(pjob);
		update_eligible_time(newaccruetype, pjob);
	}

	/* on transition to Running, release the "queued" entity-limit
	 * usage exactly once (tracked by ji_etlimit_decr_queued) */

	if (newstate == JOB_STATE_LTR_RUNNING) {
		if (pjob->ji_etlimit_decr_queued == FALSE) {
			account_entity_limit_usages(pjob, NULL, NULL, DECR, ETLIM_ACC_ALL_QUEUED);
			account_entity_limit_usages(pjob, pjob->ji_qhdr, NULL, DECR, ETLIM_ACC_ALL_QUEUED);
			pjob->ji_etlimit_decr_queued = TRUE;
		}
	}

	if (pjob->newobj) {
		/* object was never saved/loaded before, so new object */
		return 0;
	}

	/* persist the updated job to the database */
	return (job_save_db(pjob));
}

/**
 * @brief	Helper function thats re-evaluates job state and sub state.
 *
 * @param	jobp - pointer to the job
 *
 * @return	void
 */
void
svr_evalsetjobstate(job *jobp)
{
	char state;
	int substate;

	/* Recompute state/substate (forcing re-evaluation out of Transit)
	 * and immediately apply the result. */
	svr_evaljobstate(jobp, &state, &substate, 1);
	svr_setjobstate(jobp, state, substate);
}

/**
 * @brief
 * 		svr_evaljobstate - evaluate and return the job state and substate
 *		according to the the values of the hold, execution time, and
 *		dependency attributes.  This is typically called after the job has been
 *		enqueued or the (hold, execution-time) attributes have been modified.
 * @par
 *		IF the job is a history job i.e. job state is JOB_STATE_MOVED
 *		or JOB_STATE_FINISHED, then just return state/substate without
 *		any change, irrespective of the value of "forceeval".
 *
 * @param[in]	pjob	-	pointer to the job structure
 * @param[out]	newstate	-	recommended new state for job
 * @param[out]	newsub	-	recommended new substate for job
 * @param[in]	forceeval	-	whether to forcefully change the value or not.
 *
 * @return	void
 */
void
svr_evaljobstate(job *pjob, char *newstate, int *newsub, int forceeval)
{
	/*
	 * newstate/newsub may be passed in uninitialized, so they must
	 * always be assigned here; an invalid state would make the
	 * scheduler bail out on subsequent cycles and schedule nothing.
	 * The safest default is to hold the job.
	 */
	*newstate = JOB_STATE_LTR_HELD;
	*newsub = JOB_SUBSTATE_HELD;

	if (check_job_state(pjob, JOB_STATE_LTR_MOVED) ||
	    check_job_state(pjob, JOB_STATE_LTR_FINISHED)) {
		/* History job: report its current state/substate unchanged,
		 * regardless of forceeval. */
		*newstate = get_job_state(pjob);
		*newsub = get_job_substate(pjob);
		return;
	}

	if (forceeval == 0 &&
	    (check_job_state(pjob, JOB_STATE_LTR_TRANSIT) ||
	     check_job_state(pjob, JOB_STATE_LTR_RUNNING))) {
		/* Not forced and the job is in transit or running: leave as is. */
		*newstate = get_job_state(pjob);
		*newsub = get_job_substate(pjob);
		return;
	}

	if (get_jattr_long(pjob, JOB_ATR_hold)) {
		*newstate = JOB_STATE_LTR_HELD;
		/* A hold caused by a dependency keeps its substate. */
		if (check_job_substate(pjob, JOB_SUBSTATE_SYNCHOLD) ||
		    check_job_substate(pjob, JOB_SUBSTATE_DEPNHOLD))
			*newsub = get_job_substate(pjob);
		else
			*newsub = JOB_SUBSTATE_HELD;
		return;
	}

	if (get_jattr_long(pjob, JOB_ATR_exectime) > (long) time_now) {
		/* Execution time lies in the future: job waits. */
		*newstate = JOB_STATE_LTR_WAITING;
		*newsub = JOB_SUBSTATE_WAITING;
		return;
	}

	if (is_jattr_set(pjob, JOB_ATR_stagein)) {
		/* Stage-in requested: queued, substate reflects whether
		 * staging has completed. */
		*newstate = JOB_STATE_LTR_QUEUED;
		*newsub = (pjob->ji_qs.ji_svrflags & JOB_SVFLG_StagedIn)
				  ? JOB_SUBSTATE_STAGECMP
				  : JOB_SUBSTATE_PRESTAGEIN;
		return;
	}

	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) {
		/* Array job: Begun while any subjob has left the queued
		 * state, otherwise Queued; Held if the tracking table is
		 * missing. */
		ajinfo_t *trktbl = pjob->ji_ajinfo;

		if (trktbl == NULL) {
			sprintf(log_buffer, "Array job has no tracking table!");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_ERR,
				  pjob->ji_qs.ji_jobid, log_buffer);
			*newstate = JOB_STATE_LTR_HELD;
			*newsub = JOB_SUBSTATE_HELD;
		} else if (trktbl->tkm_subjsct[JOB_STATE_QUEUED] + trktbl->tkm_dsubjsct < trktbl->tkm_ct) {
			*newstate = JOB_STATE_LTR_BEGUN;
			*newsub = JOB_SUBSTATE_BEGUN;
		} else {
			/* All subjobs are queued. */
			*newstate = JOB_STATE_LTR_QUEUED;
			*newsub = JOB_SUBSTATE_QUEUED;
		}
		return;
	}

	/* Ordinary job with no holds, waits, or staging: plain Queued. */
	*newstate = JOB_STATE_LTR_QUEUED;
	*newsub = JOB_SUBSTATE_QUEUED;
}

/**
 * @brief
 * 		get_variable - get the value associated with a specified environment
 *		variable of a job
 *
 * @param[in]	pjob	-	pointer to the job object
 * @param[in]	variable	-	string variable which needs to be searched in object attribute.
 *
 * @return	pointer to the start of the value
 * @retval	NULL	: if the variable is not found in the variable_list attribute.
 */

char *
get_variable(job *pjob, char *variable)
{
	char *entry;

	/* Find the "NAME=value" entry in the job's variable_list. */
	entry = arst_string(variable, get_jattr(pjob, JOB_ATR_variables));
	if (entry == NULL)
		return NULL;

	/* Return the text following '='; NULL if no '=' is present. */
	entry = strchr(entry, (int) '=');
	return entry ? entry + 1 : NULL;
}

/**
 * @brief
 * 		lookup_variable - lookup the value of a particular environment variable
 *		associated with the object.
 *
 * @param[in]	pobj	-	pointer to the object structure
 * @param[in]	objtype	-	object type
 * @param[in]	variable	-	string variable which needs to be searched in object attribute.
 *
 * @return	a pointer to the beginning of the value string
 * @retval	NULL	: pointer if the variable isn't found in the object's "variable_list"
 */

char *
lookup_variable(void *pobj, int objtype, char *variable)
{
	attribute *varlist;
	char *entry;

	/* Select the variable_list attribute for the object's type. */
	if (objtype == JOB_OBJECT)
		varlist = get_jattr((job *) pobj, JOB_ATR_variables);
	else
		varlist = get_rattr((resc_resv *) pobj, RESV_ATR_variables);

	/* Locate the "NAME=value" entry, then step past the '='. */
	entry = arst_string(variable, varlist);
	if (entry == NULL)
		return NULL;

	entry = strchr(entry, (int) '=');
	return entry ? entry + 1 : NULL;
}

/**
 * @brief
 * 		compare the job resource limit against the system limit
 * 		unless a queue limit exists, it take priority
 *
 * @param[in]	jobatr	-	job attribute
 * @param[in]	queatr	-	resource (value) entry
 * @param[in]	svratr	-	server attribute.
 * @param[in]	qtype	-	type of queue (not used here)
 *
 * @return	number of .gt. and .lt. comparison in comp_resc_gt and comp_resc_lt
 * 			does not make use of comp_resc_eq or comp_resc_nc
 */

static void
chk_svr_resc_limit(attribute *jobatr, attribute *queatr,
		   attribute *svratr, int qtype)
{
	resource *jreq;		/* a resource the job requests */
	resource *limit;	/* the limit to compare against */
	static resource_def *noderesc = NULL;

	/* cache the "nodes" resource definition once */
	if (noderesc == NULL)
		noderesc = &svr_resc_def[RESC_NODES];

	comp_resc_gt = 0;
	comp_resc_lt = 0;

	for (jreq = (resource *) GET_NEXT(jobatr->at_val.at_list);
	     jreq != NULL;
	     jreq = (resource *) GET_NEXT(jreq->rs_link)) {
		resource *qlim;
		resource *slim;
		int cmp;

		if (!is_attr_set(&jreq->rs_value))
			continue;

		/* a set queue limit takes priority over the server's */
		limit = NULL;
		qlim = find_resc_entry(queatr, jreq->rs_defin);
		if (qlim != NULL && is_attr_set(&qlim->rs_value)) {
			limit = qlim;
		} else {
			slim = find_resc_entry(svratr, jreq->rs_defin);
			if (slim != NULL && is_attr_set(&slim->rs_value))
				limit = slim;
		}

		/* no limit found, or the "nodes" resource: not compared here */
		if (limit == NULL || jreq->rs_defin == noderesc)
			continue;

		cmp = jreq->rs_defin->rs_comp(&limit->rs_value, &jreq->rs_value);
		if (cmp > 0)
			comp_resc_gt++;
		else if (cmp < 0)
			comp_resc_lt++;
	}
}
/**
 * @brief
 * 		get_wt_limit - get limit set on walltime from the list of resource limits.
 *
 * @param[in]	plimit_attr	-	list of resource limits
 * @param[out]	wt_attr	-	A pointer to walltime limit
 *
 * @return	int
 * @retval	0	: if walltime limit is set
 * @retval 	1	: no walltime limit is set
 */

int
get_wt_limit(attribute *plimit_attr, attribute *wt_attr)
{
	resource *r;

	if (plimit_attr == NULL || wt_attr == NULL)
		return 1;

	/* Scan the resource-limit list for a set "walltime" entry. */
	for (r = (resource *) GET_NEXT(plimit_attr->at_val.at_list);
	     r != NULL;
	     r = (resource *) GET_NEXT(r->rs_link)) {
		if (strcasecmp(r->rs_defin->rs_name, WALLTIME) == 0 &&
		    is_attr_set(&r->rs_value)) {
			/* shallow copy of the limit's value into wt_attr */
			*wt_attr = r->rs_value;
			return 0;
		}
	}
	return 1; /* no walltime limit present */
}
/**
 * @brief
 * 		comp_wt_limits_STF - check a job's min_walltime OR max_walltime against
 * 		configured "walltime" limits.
 *
 * @param[in]	resc_minmaxwt	-	resource(min_walltime or max_walltime) to be compared
 * @param[in] 	limit_attr	-	attribute containing resource limit
 * @param[in] 	min_or_max	-	check aginst minimum walltime limit if MIN_WALLTIME_LIMIT,
 * 								else check against maximum walltime limit
 * @return	int
 * @retval 	0	: within limits OR if resc_minmaxwt == NULL OR is unset.
 * @retval	PBSE_EXCQRESC	: not within limits
 */
int
comp_wt_limits_STF(resource *resc_minmaxwt, attribute limit_attr, int min_or_max)
{
	int cmp;

	/* An absent or unset resource trivially satisfies the limit. */
	if (resc_minmaxwt == NULL || !is_attr_set(&resc_minmaxwt->rs_value))
		return 0;

	cmp = resc_minmaxwt->rs_defin->rs_comp(&resc_minmaxwt->rs_value, &limit_attr);

	if (min_or_max == MIN_WALLTIME_LIMIT) {
		/* limit_attr is a minimum: the value must not fall below it */
		if (cmp < 0)
			return (PBSE_EXCQRESC);
	} else {
		/* limit_attr is a maximum: the value must not exceed it */
		if (cmp > 0)
			return (PBSE_EXCQRESC);
	}
	return 0;
}

/**
 * @brief
 * 		chk_wt_limits_STF - check a STF job's min and max wlltime against the queue
 * 		and server maximum and mininum "walltime" limits.
 * 		max_walltime will be set to resources_max.walltime(if set), if resc_maxwt == NULL
 *
 * @param[in]	resc_minwt	-	resource_list.min_walltime
 * @param[in]	resc_maxwt	-	resource_list.max_walltime
 * @param[in]	pque	-	queue
 * @param[out]	pattr	-	resource_list, resource_list.max_walltime will be set if NULL
 * @return	int
 * @retval 	0	: within limits OR if resc_minwt == NULL
 * @retval	PBSE_EXCQRESC	: not within limits
 */
int
chk_wt_limits_STF(resource *resc_minwt, resource *resc_maxwt, pbs_queue *pque, attribute *pattr)
{
	attribute wt_min_queue_limit;
	attribute wt_max_queue_limit;
	attribute wt_max_server_limit;
	resource *new_res = NULL;
	resource_def *rscdef = NULL;
	int have_max_queue_limit = 0;
	int have_max_server_limit = 0;

	/* min_walltime is mandatory for an STF job; nothing to check without it */
	if (resc_minwt == NULL)
		return 0;
	/* The following should be true:
	 min_walltime >= resources_min.walltime
	 min_walltime <= resources_max.walltime
	 max_walltime >= resources_min.walltime
	 max_walltime <= resources_max.walltime
	 */
	/* Look up the queue's maximum walltime limit, if any */
	if (pque && get_wt_limit(get_qattr(pque, QA_ATR_ResourceMax), &wt_max_queue_limit) == 0)
		have_max_queue_limit = 1;
	/*
	 * Check server maximum limit only if queue maximum limit is not present.
	 * BUG FIX: this lookup was previously also gated on pque being
	 * non-NULL, which silently skipped the server-wide
	 * resources_max.walltime whenever no queue was supplied; server
	 * limits do not depend on a queue.
	 */
	if (!have_max_queue_limit && get_wt_limit(get_sattr(SVR_ATR_ResourceMax), &wt_max_server_limit) == 0)
		have_max_server_limit = 1;

#ifndef NAS /* localmod 026 */
	/* If res_maxwt is NULL and resources_max.walltime is set on either server/queue,
	 * set resource_list.max_walltime on the server/queue to value of resources_max.walltime.
	 * If resources_max.walltime is set on both server and queue, set resource_list.max_walltime
	 * to queue's resources_max.walltime. */
	if (resc_maxwt == NULL && pattr != NULL && (have_max_queue_limit || have_max_server_limit)) {
		rscdef = &svr_resc_def[RESC_MAX_WALLTIME];
		new_res = add_resource_entry(pattr, rscdef);
		if (have_max_queue_limit)
			new_res->rs_defin->rs_set(&new_res->rs_value, &wt_max_queue_limit, SET);
		else if (have_max_server_limit)
			new_res->rs_defin->rs_set(&new_res->rs_value, &wt_max_server_limit, SET);
		mark_attr_set(&new_res->rs_value);
	}
#endif /* localmod 026 */
	/* Check against queue maximum */
	if (have_max_queue_limit) {
		if (PBSE_EXCQRESC == comp_wt_limits_STF(resc_minwt,
							wt_max_queue_limit, MAX_WALLTIME_LIMIT) ||
		    PBSE_EXCQRESC == comp_wt_limits_STF(resc_maxwt,
							wt_max_queue_limit, MAX_WALLTIME_LIMIT))
			return (PBSE_EXCQRESC);
	}
	/* Queue limit not present, check against server maximum */
	else if (have_max_server_limit) {
		if ((PBSE_EXCQRESC == comp_wt_limits_STF(resc_maxwt,
							 wt_max_server_limit, MAX_WALLTIME_LIMIT) ||
		     PBSE_EXCQRESC == comp_wt_limits_STF(resc_minwt,
							 wt_max_server_limit, MAX_WALLTIME_LIMIT)))
			return (PBSE_EXCQRESC);
	}
	/* Check against queue minimum */
	if (pque && (get_wt_limit(get_qattr(pque, QA_ATR_ResourceMin), &wt_min_queue_limit) == 0)) {
		if (PBSE_EXCQRESC == comp_wt_limits_STF(resc_minwt,
							wt_min_queue_limit, MIN_WALLTIME_LIMIT) ||
		    PBSE_EXCQRESC == comp_wt_limits_STF(resc_maxwt,
							wt_min_queue_limit, MIN_WALLTIME_LIMIT))
			return (PBSE_EXCQRESC);
	}
	return 0;
}
/**
 * @brief
 * 		chk_resc_limits - check job Resource_Limits attribute against the queue
 *		and server maximum and minimum values.
 *
 * @param[in]	pattr	-	attribute list containing resource request of the job
 * @param[in]	pque	-	queue
 *
 * @return	int
 * @retval 	0	: within limits
 * @retval 	PBSE_EXCQRESC	: not within limits
 */
int
chk_resc_limits(attribute *pattr, pbs_queue *pque)
{
	resource *pr;
	resource *maxwt = NULL;
	resource *minwt = NULL;

	/* Scan the job's resource list for min_walltime / max_walltime
	 * (present only for shrink-to-fit jobs); stop as soon as both
	 * have been found. */
	for (pr = (resource *) GET_NEXT(pattr->at_val.at_list);
	     pr != NULL;
	     pr = (resource *) GET_NEXT(pr->rs_link)) {
		if (strcasecmp(pr->rs_defin->rs_name, MIN_WALLTIME) == 0)
			minwt = pr;
		else if (strcasecmp(pr->rs_defin->rs_name, MAX_WALLTIME) == 0)
			maxwt = pr;
		if (minwt && maxwt)
			break;
	}

	/* STF job: validate min/max walltime against walltime limits on
	 * the queue and server */
	if (minwt != NULL && chk_wt_limits_STF(minwt, maxwt, pque, pattr) == PBSE_EXCQRESC)
		return (PBSE_EXCQRESC);

	/* comp_resc() sets the comp_resc_gt global as a side effect */
	if ((comp_resc(get_qattr(pque, QA_ATR_ResourceMin), pattr) == -1) ||
	    comp_resc_gt)
		return (PBSE_EXCQRESC);

	/* now check individual resources against queue or server maximum;
	 * the result is reported through the comp_resc_lt global */
	chk_svr_resc_limit(pattr,
			   get_qattr(pque, QA_ATR_ResourceMax),
			   get_sattr(SVR_ATR_ResourceMax),
			   pque->qu_qs.qu_type);

	if (comp_resc_lt > 0)
		return (PBSE_EXCQRESC);
	return (0);
}

/**
 * @brief
 * 		svr_chkque - check if job can enter a queue
 *
 * @note
 * 		Note: job owner must be set before calling svr_chkque()
 * 		set_objexid() will be called to set a uid/gid/name if not already set
 *
 * @param[in]	pjob	-	job structure
 * @param[in]	pque	-	queue the job is being checked against
 * @param[in]	submithost	-	job's submit machine
 * @param[in]	hostname	-	host machine issued this check
 * @param[in]	mtype	-	MOVE_TYPE_* type;  see server_limits.h
 *
 * @return	int
 * @retval	0	: all ok, job can enter queue
 * @retval	PBSE Number	: error code
 */

int
svr_chkque(job *pjob, pbs_queue *pque, char *submithost, char *hostname, int mtype)
{
	int i;

	/* if not already set, set up a uid/gid/name */

	if (!is_jattr_set(pjob, JOB_ATR_euser) || !is_jattr_set(pjob, JOB_ATR_egroup)) {
		if ((i = set_objexid((void *) pjob, JOB_OBJECT, pjob->ji_wattr)) != 0)
			return (i); /* PBSE_BADUSER or GRP */
	}

	/*
	 * 1. If the queue is an Execution queue ...
	 *    These are checked first because 1b - 1d are more damaging
	 *    (see local_move() in svr_movejob.c)
	 */

	if (pque->qu_qs.qu_type == QTYPE_Execution) {

		/* 1b. Check site restrictions */

		if (site_acl_check(pjob, pque))
			return (PBSE_PERM);

		/* 1c. cannot have an unknown resource */

		if (find_resc_entry(get_jattr(pjob, JOB_ATR_resource),
				    svr_resc_def + svr_resc_unk))
			return (PBSE_UNKRESC);

		/* 1d. cannot have an unknown attribute */

		if (is_jattr_set(pjob, JOB_ATR_UNKN))
			return (PBSE_NOATTR);
	}

	/* checks 2, 2a, and 3 are bypassed for a move by manager or qorder */

	if ((mtype != MOVE_TYPE_MgrMv) && (mtype != MOVE_TYPE_Order)) {

		/* 2. the queue must be enabled and the job limit not exceeded */

		if (get_qattr_long(pque, QA_ATR_Enabled) == 0)
			return (PBSE_QUNOENB);

		if (is_qattr_set(pque, QA_ATR_MaxJobs)) {
			int histjobs = 0;
			if (svr_chk_history_conf()) {
				/* calculate number of finished and moved jobs */
				histjobs = pque->qu_njstate[JOB_STATE_MOVED] +
					   pque->qu_njstate[JOB_STATE_FINISHED] +
					   pque->qu_njstate[JOB_STATE_EXPIRED];
			}
			/*
			 * check number of jobs in queue excluding
			 * finished and moved jobs
			 */
			if ((pque->qu_numjobs - histjobs) >= get_qattr_long(pque, QA_ATR_MaxJobs))
				return (PBSE_MAXQUED);
		}

		/* 2a. if job array, check for queue max_array_size */

		if (is_qattr_set(pque, QA_ATR_maxarraysize)) {
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) &&
			    (pjob->ji_ajinfo != NULL)) {
				/* tkm_ct is the total subjob count of the array */
				if (pjob->ji_ajinfo->tkm_ct > get_qattr_long(pque, QA_ATR_maxarraysize))
					return (PBSE_MaxArraySize);
			}
		}

		/* 3. If "from_route_only" is true, only local route allowed */

		if (is_qattr_set(pque, QA_ATR_FromRouteOnly) && get_qattr_long(pque, QA_ATR_FromRouteOnly) == 1)
			if (mtype == MOVE_TYPE_Move) /* ok if not plain user or scheduler */
				return (PBSE_QACESS);
	}

	/* 4. If enabled, check the queue's host ACL */
	/* both the submitting host and the host issuing this check must
	 * fail the ACL before the job is rejected */

	if (get_qattr_long(pque, QA_ATR_AclHostEnabled))
		if ((acl_check(get_qattr(pque, QA_ATR_AclHost),
			      submithost, ACL_Host) == 0) &&
			(acl_check(get_qattr(pque, QA_ATR_AclHost),
			      hostname, ACL_Host) == 0))
			if (mtype != MOVE_TYPE_MgrMv) /* ok if mgr */
				return (PBSE_BADHOST);

	/* 5a. If enabled, check the queue's user ACL */

	if (get_qattr_long(pque, QA_ATR_AclUserEnabled))
		if (acl_check(get_qattr(pque, QA_ATR_AclUsers),
			      get_jattr_str(pjob, JOB_ATR_job_owner), ACL_User) == 0)
			if (mtype != MOVE_TYPE_MgrMv) /* ok if mgr */
				return (PBSE_PERM);

	/* 5b. If enabled, check the queue's group ACL */

	if (get_qattr_long(pque, QE_ATR_AclGroupEnabled))
		if (acl_check(get_qattr(pque, QE_ATR_AclGroup),
			      get_jattr_str(pjob, JOB_ATR_euser),
			      ACL_Group) == 0)
			if (mtype != MOVE_TYPE_MgrMv) /* ok if mgr */
				return (PBSE_PERM);

	/* 6. If enabled, check the queue's required cred type */

	if (is_qattr_set(pque, QA_ATR_ReqCredEnable) &&
	    get_qattr_long(pque, QA_ATR_ReqCredEnable) &&
	    is_qattr_set(pque, QA_ATR_ReqCred)) {
		char *reqc = get_qattr_str(pque, QA_ATR_ReqCred);
		char *jobc = get_jattr_str(pjob, JOB_ATR_cred);
		/*
		 **	The queue requires a cred, if job has none, or
		 **	it is the wrong one, and if not mgr, reject.
		 */
		if ((!is_jattr_set(pjob, JOB_ATR_cred) || strcmp(reqc, jobc) != 0) && mtype != MOVE_TYPE_MgrMv)
			return PBSE_BADCRED;
	}

	/* checks 7 and 7a are bypassed for a move by manager or qorder */
	if (mtype != MOVE_TYPE_MgrMv) {
		/* 7. resources of the job must be in the limits of the queue */

		/* 7a. Check limit on number of jobs per entity in queue */

		i = check_entity_ct_limit_max(pjob, pque);
		if (i != 0)
			return i;

		i = check_entity_ct_limit_queued(pjob, pque);
		if (i != 0)
			return i;

		/* 7b. Check limit on number of jobs per entity in server only if */
		/*     this is a new job defined by state == JOB_STATE_LTR_TRANSIT    */
		if (check_job_state(pjob, JOB_STATE_LTR_TRANSIT)) {
			i = check_entity_ct_limit_max(pjob, NULL);
			if (i != 0)
				return i;
			i = check_entity_ct_limit_queued(pjob, NULL);
			if (i != 0)
				return i;
		}
	}

	/* Need to unset current default resources and reset them */
	/* from new queue before check if can enter that queue    */

	clear_default_resc(pjob);
	i = set_resc_deflt(pjob, JOB_OBJECT, pque);
	/* from here on "i" carries the first failure from checks 7c-7e;
	 * it is only acted on after defaults are restored below */
	if (i == 0) {

		/* checks 7c and 7d are bypassed for a move by manager or qorder */
		if (mtype != MOVE_TYPE_MgrMv) {
			/* 7c. Check resource limits per entity in queue */
			i = check_entity_resc_limit_max(pjob, pque, NULL);
			if (i == 0)
				i = check_entity_resc_limit_queued(pjob, pque, NULL);

			if (i == 0) {
				/* 7d. Check resource limits per entity in server if this */
				/*     is a new job defined by state == JOB_STATE_TRANSIT */
				i = check_entity_resc_limit_max(pjob, NULL, NULL);
				if (i == 0)
					i = check_entity_resc_limit_queued(pjob, NULL, NULL);

				if (i == 0) {
					/* 7e.  test old gateing limits */
					i = chk_resc_limits(get_jattr(pjob, JOB_ATR_resource), pque);
				}
			}
		}
	}

	/* after check unset defaults & reset based on current queue, if one */
	if (pjob->ji_qhdr) {
		clear_default_resc(pjob);
		(void) set_resc_deflt(pjob, JOB_OBJECT, NULL);
	}

	/* failures from 7c-7e are waived for a manager move */
	if (i != 0)
		if (mtype != MOVE_TYPE_MgrMv) /* ok if mgr */
			return (i);

	return (0); /* all ok, job can enter queue */
}

/**
 * @brief
 *		check_block_wt	-	A worktask to reply to the blocked job client
 *
 * @param[in]	ptask	-	work_task structure
 */
void
check_block_wt(struct work_task *ptask)
{
	struct block_job_reply *blockj = ptask->wt_parm1;
	struct pollfd fds[1];
	int rc;
	pbs_socklen_t len = sizeof(rc);
	int conn = 0;
	int ret = 0;
	int check_error;

	/* fd == -1 means this is the first attempt (or the socket was never
	 * created); resolve the client and start a non-blocking connect */
	if (blockj->fd == -1) {
		int sock_flags;
		struct hostent *hp;
		struct sockaddr_in remote;

		if ((hp = gethostbyname(blockj->client)) == NULL) {
			sprintf(log_buffer, "client host %s not found for block job %s",
				blockj->client, blockj->jobid);
			goto err;
		}

		memset(&remote, 0, sizeof(remote));
		memcpy(&remote.sin_addr, hp->h_addr, hp->h_length);
		remote.sin_port = htons((unsigned short) blockj->port);
		remote.sin_family = hp->h_addrtype;

		if ((blockj->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
			sprintf(log_buffer, "Failed to create socket for job %s", blockj->jobid);
			goto err;
		}

		/* Set socket to Non-blocking */
		sock_flags = fcntl(blockj->fd, F_GETFL, 0);
		if (fcntl(blockj->fd, F_SETFL, sock_flags | O_NONBLOCK) == -1) {
			sprintf(log_buffer, "Failed to set non-blocking flag on socket for job %s",
				blockj->jobid);
			goto err;
		}

		/* non-blocking connect: EINPROGRESS/EWOULDBLOCK means "still
		 * connecting" and is handled by the poll below; any other
		 * failure is retried via a timed work task */
		conn = connect(blockj->fd, (struct sockaddr *) &remote, sizeof(remote));
		if ((conn == -1) && !(errno == EINPROGRESS || errno == EWOULDBLOCK)) {
			goto retry;
		}
	}

	/* poll (zero timeout) for writability; loop only re-issues the poll
	 * when it was interrupted (EINTR) or transiently failed (EAGAIN) */
	while (1) {
		fds[0].fd = blockj->fd;
		fds[0].events = POLLOUT;
		fds[0].revents = 0;

		rc = poll(fds, (nfds_t) 1, 0);
		if (rc == -1) {
			if ((errno != EAGAIN) && (errno != EINTR))
				break;
		} else
			break; /* no error */
	}

	if (rc <= 0)
		goto retry;

	/* connect finished: read the pending socket error (SO_ERROR) to
	 * learn whether the non-blocking connect actually succeeded */
	rc = 0;
	check_error = getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, &rc, &len);
	if ((rc != 0) || (check_error != 0) || (fds[0].revents != POLLOUT))
		goto retry;

	rc = CS_server_auth(blockj->fd);
	if ((rc != CS_SUCCESS) && (rc != CS_AUTH_CHECK_PORT)) {
		sprintf(log_buffer, "Unable to authenticate with %s:%d", blockj->client, blockj->port);
		goto err;
	}

	/*
	**	All ready to talk... now send the info.
	*/

	/* DIS protocol: version, job id, message (empty string if none),
	 * then the exit status */
	DIS_tcp_funcs();
	ret = diswsi(blockj->fd, 1); /* version */
	if (ret != DIS_SUCCESS)
		goto err;
	ret = diswst(blockj->fd, blockj->jobid);
	if (ret != DIS_SUCCESS)
		goto err;
	if (blockj->msg == NULL) {
		ret = diswst(blockj->fd, "");
	} else {
		ret = diswst(blockj->fd, blockj->msg);
	}
	if (ret != DIS_SUCCESS)
		goto err;
	ret = diswsi(blockj->fd, blockj->exitstat);
	if (ret != DIS_SUCCESS)
		goto err;
	(void) dis_flush(blockj->fd);

	sprintf(log_buffer, "%s: Write successful to client %s for job %s ", __func__,
		blockj->client, blockj->jobid);
	log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_NOTICE, blockj->jobid, log_buffer);
	dis_destroy_chan(blockj->fd);
	CS_close_socket(blockj->fd);
	goto end;

retry:
	/* reschedule ourselves every 10s until the reply timeout expires;
	 * blockj (and any open fd) stays alive across retries */
	if ((time(0) - blockj->reply_time) < BLOCK_JOB_REPLY_TIMEOUT) {
		set_task(WORK_Timed, time_now + 10, check_block_wt, blockj);
		return;
	} else {
		sprintf(log_buffer, "Unable to reply to client %s for job %s",
			blockj->client, blockj->jobid);
	}
err:
	/* NOTE(review): on the early error paths (gethostbyname/socket
	 * failure) blockj->fd may still be -1 here — confirm that
	 * dis_destroy_chan() tolerates a -1 descriptor */
	DIS_tcp_funcs();
	dis_destroy_chan(blockj->fd);
	if (ret != DIS_SUCCESS) {
		sprintf(log_buffer, "DIS error while replying to client %s for job %s",
			blockj->client, blockj->jobid);
	}
	log_err(-1, __func__, log_buffer);
end:
	/* final cleanup: close the socket (if any) and release the request */
	if (blockj->fd != -1)
		close(blockj->fd);
	free(blockj->msg);
	free(blockj);
}

/**
 * @brief
 *		check_block	-	See if "block" is set and send reply.
 *
 * @param[in,out]	pjob	-	job structure
 * @param[in]	message	-	message needs to be send to the port.
 */
void
check_block(job *pjob, char *message)
{
	int port;
	char *phost;
	char *jobid = pjob->ji_qs.ji_jobid;
	struct block_job_reply *blockj;

	/* nothing to do unless the job carries a live "block" port */
	if ((is_jattr_set(pjob, JOB_ATR_block)) == 0)
		return;
	if ((get_jattr_long(pjob, JOB_ATR_block)) == -1)
		return;

	port = (int) get_jattr_long(pjob, JOB_ATR_block);
	/*
	 * The blocking attribute of the job needs to be unset . This contains the port number on which the job
	 * submission host is waiting for the exit status of the job . This is done here i.e check_block() as it is the
	 * final function in processing of a blocking job .
	 *
	 * Since for posterity it would be useful to record the fact that a job was a blocking job we set the
	 * port number to an impossible value instead of clearing it so that the database only contains
	 * a reference to the fact that a history job was a blocking job . Port number need not be recorded .
	 */
	set_jattr_l_slim(pjob, JOB_ATR_block, -1, SET);

	phost = get_jattr_str(pjob, JOB_ATR_submit_host);
	if (port == 0 || phost == NULL) {
		sprintf(log_buffer, "%s: cannot reply %s:%d", __func__,
			phost == NULL ? "<no host>" : phost, port);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  jobid, log_buffer);
		return;
	}

	blockj = (struct block_job_reply *) malloc(sizeof(struct block_job_reply));
	if (blockj == NULL) {
		sprintf(log_buffer, "%s: Unable to allocate memory for the job %s", __func__, jobid);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  jobid, log_buffer);
		return;
	}

	/* strdup(NULL) is undefined behavior, so guard against a NULL
	 * message; a NULL msg is a valid state — check_block_wt() sends an
	 * empty string in that case, and free(NULL) is a no-op */
	blockj->msg = (message != NULL) ? strdup(message) : NULL;
	strncpy(blockj->client, phost, PBS_MAXHOSTNAME);
	blockj->client[PBS_MAXHOSTNAME - 1] = '\0';
	blockj->port = port;
	blockj->fd = -1; /* socket is created lazily by check_block_wt() */
	blockj->reply_time = time(NULL);
	blockj->exitstat = pjob->ji_qs.ji_un.ji_exect.ji_exitstat;
	strcpy(blockj->jobid, pjob->ji_qs.ji_jobid);

	/* hand the reply off to a work task so this call never blocks */
	set_task(WORK_Immed, 0, check_block_wt, blockj);
	return;
}

/**
 * @brief
 * 		job_wait_over - The execution wait time of a job has been reached, at
 *		least according to a work_task entry via which we were invoked.
 * @par
 *		IMP: Should not invoke/create any such work task(s) for history
 *	     	jobs (job with state JOB_STATE_MOVED or JOB_STATE_FINISHED).
 * @par
 *		If indeed the case, re-evaluate and set the job state.
 *
 * @param[in]	pwt	-	work task structure
 */

static void
job_wait_over(struct work_task *pwt)
{
	job *pjob = (job *) pwt->wt_parm1;
	char state;
	int substate;

	/* Never act on history jobs (moved or finished). */
	if (check_job_state(pjob, JOB_STATE_LTR_MOVED) ||
	    check_job_state(pjob, JOB_STATE_LTR_FINISHED))
		return;

#ifndef NDEBUG
	{
		time_t now = time(NULL);
		time_t when = get_jattr_long(pjob, JOB_ATR_exectime);

		if (when > now) {
			struct work_task *ptask;

			/* task fired before exectime: log it and re-arm */
			sprintf(log_buffer, msg_badwait, pjob->ji_qs.ji_jobid);
			log_err(-1, "job_wait_over", log_buffer);

			ptask = set_task(WORK_Timed, when, job_wait_over, pjob);
			if (ptask)
				append_link(&pjob->ji_svrtask,
					    &ptask->wt_linkobj, ptask);
			return;
		}
	}
#endif
	pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HASWAIT;

	/* drop exectime and let the server recompute the job's state */
	free_jattr(pjob, JOB_ATR_exectime);
	svr_evaljobstate(pjob, &state, &substate, 0);
	svr_setjobstate(pjob, state, substate);
}

/**
 * @brief
 * 		job_set_wait - set up a work task entry that will trigger at the execution
 *		wait time of the job.
 * @par
 *		IMP: History jobs:
 *	     If the SERVER is configured for history jobs and the job is
 *	     in state JOB_STATE_MOVED or JOB_STATE_FINISHED, then do not
 *	     create/schedule any further work task on this job which may
 *	     modify the HISTORY jobs.
 * @par
 *		This is called as the at_action (see attribute.h) function associated
 *		with the execution-time job attribute.
 * 		parameter pjob is a job * cast to a void *
 * 		parameter mode is unused;  do it for all action modes
 *
 * @param[in]	pattr	-	execution-time job attribute.
 * @param[in]	pjob	-	pjob is a job * cast to a void *
 * @param[in]	mode	-	unused;  the action is performed for all action modes
 */

int
job_set_wait(attribute *pattr, void *pjob, int mode)
{
	struct work_task *ptask;
	long when;

	/* Return 0 if it is history job */
	if (check_job_state((job *) pjob, JOB_STATE_LTR_MOVED) || check_job_state((job *) pjob, JOB_STATE_LTR_FINISHED))
		return (0);

	if (!is_attr_set(pattr))
		return (0);
	when = pattr->at_val.at_long;
	ptask = (struct work_task *) GET_NEXT(((job *) pjob)->ji_svrtask);

	/* Is there already an entry for this job?  Then reuse it */

	if (((job *) pjob)->ji_qs.ji_svrflags & JOB_SVFLG_HASWAIT) {
		while (ptask) {
			/* BUGFIX: match the task kind via wt_type, not wt_event —
			 * wt_event holds the trigger *time* of a WORK_Timed task
			 * (it is what gets updated just below), so comparing it
			 * against the WORK_Timed enum could never match and the
			 * existing task was never reused */
			if ((ptask->wt_type == WORK_Timed) &&
			    (ptask->wt_func == job_wait_over) &&
			    (ptask->wt_parm1 == pjob)) {
				ptask->wt_event = when;
				return (0);
			}
			ptask = (struct work_task *) GET_NEXT(ptask->wt_linkobj);
		}
	}

	/* no reusable task found: create a fresh timed task */
	ptask = set_task(WORK_Timed, when, job_wait_over, pjob);
	if (ptask == NULL)
		return (-1);
	append_link(&((job *) pjob)->ji_svrtask, &ptask->wt_linkobj, ptask);

	/* set JOB_SVFLG_HASWAIT to show job has work task entry */

	((job *) pjob)->ji_qs.ji_svrflags |= JOB_SVFLG_HASWAIT;
	return (0);
}

/**
 * @brief
 * 		default_std - make the default name for standard output or error
 *		"job_name".[e|o]job_sequence_number
 *		or
 *		"job_name".[e|o]job_sequence_number^index^ for an Array Job
 * 		parameter key is 'e' for stderr, 'o' for stdout
 *	 	parameter to points to a buffer into which the name is returned; callers
 *		are responsible for ensuring that the buffer is of sufficient size
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	key	-	the letter before the sequence number
 * @param[out]	to	-	output name
 */

static void
default_std(job *pjob, int key, char *to)
{
	int len;
	char *pd;

	/* base name is the job name with any leading directory path stripped */
	pd = strrchr(get_jattr_str(pjob, JOB_ATR_jobname), '/');
	if (pd)
		++pd;
	else
		pd = get_jattr_str(pjob, JOB_ATR_jobname);
	len = strlen(pd);

	/* NOTE: "to" must be sized by the caller (job name + '.' + key +
	 * PBS_MAXSEQNUM digits + optional array index tag + NUL) */
	(void) strcpy(to, pd);	    /* start with the job name */
	*(to + len++) = '.';	    /* the dot        */
	*(to + len++) = (char) key; /* the letter     */
	pd = pjob->ji_qs.ji_jobid;  /* the seq_number */
	while (isdigit((int) *pd))
		*(to + len++) = *pd++;
	*(to + len) = '\0';
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) {
		/* Array Job - append special substitution string for index */
		strcat(to, ".");
		strcat(to, PBS_FILE_ARRAY_INDEX_TAG);
	}
}

/**
 * @brief
 * 		prefix_std_file - build the absolute pathname for the job's standard
 * 		output or error:
 *		outputhost:$PBS_O_WORKDIR/job_name.[eo]job_sequence_number
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	key	-	integer that is either 'e' or 'o'
 *
 * @return	char *
 * @retval	NULL	-	Failed to construct prefix
 * @retval	!NULL	-	Pointer to the prefix string
 */
char *
prefix_std_file(job *pjob, int key)
{
	char *result = NULL;
	char *host;
	char *workdir;

	/* configured output host takes precedence over the submit host */
	if (pbs_conf.pbs_output_host_name)
		host = pbs_conf.pbs_output_host_name;
	else
		host = get_jattr_str(pjob, JOB_ATR_submit_host);

	workdir = get_variable(pjob, "PBS_O_WORKDIR");

	if (host) {
		int need;

		/* host + jobname + seq digits + array tag + ":", "/", ".",
		 * key, NUL and slack (the +6) */
		need = strlen(host) +
		       strlen(get_jattr_str(pjob, JOB_ATR_jobname)) + PBS_MAXSEQNUM + strlen(PBS_FILE_ARRAY_INDEX_TAG) + 6;
		if (workdir)
			need += strlen(workdir);

		result = malloc(need);
		if (result) {
			strcpy(result, host); /* the qsub host name	*/
			strcat(result, ":");  /* the :		*/
			if (workdir) {
				strcat(result, workdir); /* the qsub cwd	*/
				strcat(result, "/");	 /* the final /		*/
			}
			/* append the default job_name.[eo]seq filename */
			default_std(pjob, key, result + strlen(result));
		}
	}
	return (result);
}

/**
 * @brief
 * 		cat_default_std  - function concatenates the default name for the job's
 * 		stdout/err filename to the string residing in buffer "in".  Space for the
 * 		newly created string is dynamically acquired and returned to the caller
 * 		via the argument "out".  It is the responsibility of some other function
 * 		to free this heap memory when it is no longer needed.
 * @par
 * 		parameter key is 'e' for stderr, 'o' for stdout
 * 		parameter in is the location of the input buffer
 * 		parameter out is the return location of the result string
 *
 * @param[in]	pjob	-	job structure
 * @param[in]	key	-	the letter before the sequence number
 * @param[in]	in	-	the string residing in buffer "in".
 * @param[in]	out	-	Space for the newly created string.
 */
void
cat_default_std(job *pjob, int key, char *in, char **out)
{
	char *buf;
	int need;

	/* room for "in" + jobname + '.' + key + seq digits + array tag + NUL */
	need = strlen(in) +
	       strlen(get_jattr_str(pjob, JOB_ATR_jobname)) +
	       PBS_MAXSEQNUM + 5 + strlen(PBS_FILE_ARRAY_INDEX_TAG) + 1;

	buf = malloc(need);
	if (buf != NULL) {
		strcpy(buf, in);
		/* append the default stdout/err name after the prefix */
		default_std(pjob, key, buf + strlen(buf));
	}
	/* caller owns (and must free) the result; NULL on malloc failure */
	*out = buf;
}

/**
 * @brief
 * 		cvrt_fqn_to_name - copy the name only (no @host suffix) to the "to" buffer.
 *		"to" buffer is (PBS_MAXUSER+1) characters long. Null terminate string.
 *
 * @param[in]	from	-	 basic job owner's name
 * @param[out]	to	-	"to" buffer where name is copied.
 */
void
cvrt_fqn_to_name(char *from, char *to)
{
	int n = 0;

	/* copy at most PBS_MAXUSER characters, stopping at '@' or at the
	 * end of the input; always NUL-terminate the result */
	while (n < PBS_MAXUSER && from[n] != '@' && from[n] != '\0') {
		to[n] = from[n];
		++n;
	}
	to[n] = '\0';
}

/**
 * @brief
 *		get_hostPart - return a pointer to the "host" part of a "name@host"
 *		string.
 *
 * @param[in]	from	-	pointer to a string of the form user@host
 *
 * @return	char *
 * @retval	pointer to host part of the input string
 * @retval 	NULL	: if no '@' in input or host part following the '@' is null
 *
 * @par MT-safe:	yes
 */
char *
get_hostPart(char *from)
{
	char *at = strchr(from, '@');

	/* no '@' at all, or nothing after it, means no host part */
	if (at == NULL || at[1] == '\0')
		return NULL;
	return at + 1;
}

#define CVT_SIZE 1500

/**
 * @brief
 * 		set_select_and_place - create a select and place resource for the job
 *
 * @param[in]	objtype	-	type of the object
 * @param[in]	pobj	-	pointer to void * object, not used here.
 * @param[in]	patr	-	pointer to attribute structure which contains the resources.
 *
 * @return	int
 * @retval	0	: success
 * @retval	PBSE_Error	: Error Code
 *
 * @par MT-safe:	No.
 */
int
set_select_and_place(int objtype, void *pobj, attribute *patr)
{
	pbs_list_head collectresc;
	static char *cvt = NULL;   /* persistent conversion buffer, grown on demand */
	static size_t cvt_len;
	char *ndspec;
	resource *presc;
	resource *prescsl;
	resource *prescpc;
	resource_def *prdefnd;
	resource_def *prdefpc;
	resource_def *prdefsl;
	int rc;
	extern int resc_access_perm;

	/* lazily allocate the static conversion buffer on first use */
	if (cvt == NULL) {
		cvt = malloc(CVT_SIZE);
		if (cvt == NULL)
			return PBSE_SYSTEM;
		else
			cvt_len = CVT_SIZE;
	}

	prdefpc = &svr_resc_def[RESC_PLACE];
	prdefnd = &svr_resc_def[RESC_NODES];
	prdefsl = &svr_resc_def[RESC_SELECT];
	presc = find_resc_entry(patr, prdefnd);

	/* add "select" and "place" resource */

	prescsl = add_resource_entry(patr, prdefsl);
	if (prescsl == NULL)
		return PBSE_SYSTEM;

	prescpc = find_resc_entry(patr, prdefpc);
	if (prescpc == NULL) {
		prescpc = add_resource_entry(patr, prdefpc);
		if (prescpc == NULL)
			return PBSE_SYSTEM;
	}

	if (presc && ((ndspec = presc->rs_value.at_val.at_str) != NULL)) {

		/* Have a nodes spec, use it  to make select and place */

		if ((rc = cvt_nodespec_to_select(ndspec, &cvt, &cvt_len, patr)) != 0)
			return rc;

		if ((rc = prdefsl->rs_decode(&prescsl->rs_value, NULL, "select", cvt)) != 0)
			return rc;
		prescsl->rs_value.at_flags |= ATR_VFLAG_DEFLT;

		/* placement derived from nodes-spec suffix: #excl / #shared /
		 * default scatter */
		if (strstr(ndspec, "#excl") != NULL) {
			prdefpc->rs_decode(&prescpc->rs_value, NULL, "place", "scatter:excl");
		} else if (strstr(ndspec, "#shared") != NULL) {
			prdefpc->rs_decode(&prescpc->rs_value, NULL, "place", "scatter:share");
		} else {
			prdefpc->rs_decode(&prescpc->rs_value, NULL, "place", "scatter");
		}

	} else {
		attribute_def *objatrdef;

		/* No nodes spec, use ncpus/mem/arch/host/software	*/
		/* from the resource_List attribute			*/

		if (objtype == JOB_OBJECT)
			objatrdef = &job_attr_def[(int) JOB_ATR_resource];
		else
			objatrdef = &resv_attr_def[(int) RESV_ATR_resource];

		CLEAR_HEAD(collectresc);
		resc_access_perm = READ_ONLY;
		if (objatrdef->at_encode(patr, &collectresc, objatrdef->at_name, NULL, ATR_ENCODE_CLIENT, NULL) > 0) {
			svrattrl *psvrl;

			/* build a single-chunk select string: "1:res=val:..." */
			*cvt = '1';
			*(cvt + 1) = '\0';
			psvrl = (svrattrl *) GET_NEXT(collectresc);
			while (psvrl) {
				resource_def *prdefcopy;

				/* only resources flagged ATR_DFLAG_CVTSLT may be
				 * folded into the select string */
				prdefcopy = find_resc_def(svr_resc_def, psvrl->al_resc);
				if (prdefcopy && (prdefcopy->rs_flags & ATR_DFLAG_CVTSLT)) {
					size_t cvtneed;

					/* how much space is needed in cvt buffer, 	 */
					/* +5 = one for : = possible quotes and null */
					cvtneed = strlen(psvrl->al_resc) +
						  strlen(psvrl->al_value) + 5;
					if ((strlen(cvt) + cvtneed) > cvt_len) {
						/* double cvt buffer */
						char *tcvt;
						tcvt = realloc(cvt, 2 * cvt_len);
						if (tcvt) {
							cvt = tcvt;
							cvt_len *= 2;
						} else {
							log_event(PBSEVENT_ERROR,
								  PBS_EVENTCLASS_SERVER, LOG_ALERT,
								  msg_daemonname,
								  "unable to malloc space");
							return PBSE_SYSTEM;
						}
					}
					strcat(cvt, ":");
					strcat(cvt, psvrl->al_resc);
					strcat(cvt, "=");
					/* values containing select metacharacters must be
					 * quoted; pick whichever quote char is not in the
					 * value itself */
					if (strpbrk(psvrl->al_value, "\"'+:=()")) {
						char *quotec;
						if (strchr(psvrl->al_value, (int) '"'))
							quotec = "'";
						else
							quotec = "\"";
						strcat(cvt, quotec);
						strcat(cvt, psvrl->al_value);
						strcat(cvt, quotec);
					} else {
						strcat(cvt, psvrl->al_value);
					}
				}
				psvrl = (svrattrl *) GET_NEXT(psvrl->al_link);
			}
			free_attrlist(&collectresc);
		} else {
			/* no encodable resources at all: fall back to 1 cpu */
			strcpy(cvt, "ncpus=1");
		}
		if (prdefsl->rs_decode(&prescsl->rs_value, NULL, "select", cvt) == 0) {

			if (objtype == JOB_OBJECT) { /* set default flg only on jobs */
				prescsl->rs_value.at_flags |= ATR_VFLAG_DEFLT;
				/* default place to "pack" only when place was not
				 * explicitly set by the user */
				if ((prescpc->rs_value.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) != ATR_VFLAG_SET)
					if (prdefpc->rs_decode(&prescpc->rs_value, NULL, "place", "pack") == 0)

						/* NOTE(review): this inner objtype test is redundant —
						 * we are already inside the JOB_OBJECT branch */
						if (objtype == JOB_OBJECT) /* set default flg only on jobs */
							prescpc->rs_value.at_flags |= ATR_VFLAG_DEFLT;
			}
		}
	}
	return 0;
}

/**
 * @brief
 *		set_chunk_sum() - set the sums of consumable resources listed
 *		in the chunks as job limits if not already set.
 *
 * @param[in]	pselectattr	-	attribute structure from which we parse the select directive
 * @param[in]	pattr	-	attribute structure where resource limit is set.
 *
 * @return	int
 * @retval	0	: success
 * @retval	PBSE_Error	: Error Code
 *
 * @par MT-safe:	No.
 */

int
set_chunk_sum(attribute *pselectattr, attribute *pattr)
{
	char *chunk;
	int i;
	int j;
	int nchk;
	int nelem;
	int rc;
	int default_flag;
	int total_chunks = 0;
	struct key_value_pair *pkvp;
	resource *presc;
	resource_def *pdef;
	/* scratch attribute reused for each decoded chunk value;
	 * static, so this function is not reentrant (see MT-safe: No) */
	static attribute tmpatr;

	if ((pselectattr == NULL) || (pattr == NULL))
		return 0;

	/* first clear the summation table used later */

	for (i = 0; svr_resc_sum[i].rs_def; ++i) {
		(void) memset((char *) &svr_resc_sum[i].rs_attr, 0, sizeof(struct attribute));

		svr_resc_sum[i].rs_set = 0;
		svr_resc_sum[i].rs_prs = NULL;
	}

	/* now, look through the resource limits specified for the job        */
	/* if any matches an entry in the table, set the pointer and set flag */

	presc = (resource *) GET_NEXT(pattr->at_val.at_list);
	while (presc) {
		for (i = 0; svr_resc_sum[i].rs_def; ++i) {
			if (strcmp(presc->rs_defin->rs_name, svr_resc_sum[i].rs_def->rs_name) == 0) {
				/* found one, save the resource ptr in sum table */
				svr_resc_sum[i].rs_prs = presc;
			}
		}
		presc = (resource *) GET_NEXT(presc->rs_link);
	}

	/* now, parse the select directive */
	/* each "+"-separated chunk contributes nchk * value to the sum of
	 * every consumable resource named in the chunk */

	chunk = parse_plus_spec(pselectattr->at_val.at_str, &rc);
	if (rc != 0)
		return rc;
	while (chunk) {
		if (parse_chunk(chunk, &nchk, &nelem, &pkvp, NULL) == 0) {
			total_chunks += nchk;
			for (j = 0; j < nelem; ++j) {
				for (i = 0; svr_resc_sum[i].rs_def; ++i) {
					if (strcmp(svr_resc_sum[i].rs_def->rs_name, pkvp[j].kv_keyw) == 0) {
						rc = svr_resc_sum[i].rs_def->rs_decode(&tmpatr, 0,
										       0, pkvp[j].kv_val);
						if (rc != 0)
							return rc;
						else if (!is_attr_set(&tmpatr))
							return PBSE_BADATVAL; /* illegal null value */
						/* scale the per-chunk value by the chunk
						 * multiplier, in the value's native type */
						if (svr_resc_sum[i].rs_def->rs_type == ATR_TYPE_SIZE)
							tmpatr.at_val.at_size.atsv_num *= nchk;
						else if (svr_resc_sum[i].rs_def->rs_type == ATR_TYPE_FLOAT)
							tmpatr.at_val.at_float *= nchk;
						else
							tmpatr.at_val.at_long *= nchk;

						(void) svr_resc_sum[i].rs_def->rs_set(&svr_resc_sum[i].rs_attr, &tmpatr, INCR);
						svr_resc_sum[i].rs_set = 1;
						break;
					}
				}
			}
		} else {
			return (PBSE_BADATVAL);
		}
		chunk = parse_plus_spec(NULL, &rc);
		if (rc != 0)
			return rc;
	}

	/* check that the user asked for at least one chunk total */

	if (total_chunks <= 0)
		return (PBSE_BADATVAL);

	/*
	 * now that we have summed up the chunks, for each one summed (set) ...
	 * set or reset the corresponding job wide limit
	 */
	for (i = 0; svr_resc_sum[i].rs_def; ++i) {
		if (svr_resc_sum[i].rs_set) {

			if (svr_resc_sum[i].rs_prs) {
				/* job already had this limit: preserve whether it
				 * was a default or user-specified */
				presc = svr_resc_sum[i].rs_prs;
				default_flag = presc->rs_value.at_flags & ATR_VFLAG_DEFLT;
			} else {
				/* job did not request it: mark as a default */
				default_flag = ATR_VFLAG_DEFLT;
				presc = add_resource_entry(pattr, svr_resc_sum[i].rs_def);
				if (presc == NULL)
					return PBSE_SYSTEM;
			}
			(void) svr_resc_sum[i].rs_def->rs_set(&presc->rs_value, &svr_resc_sum[i].rs_attr, SET);
			presc->rs_value.at_flags |= default_flag;
		}
	}

	/* set pseudo-resource "nodect" to the number of chunks */

	pdef = &svr_resc_def[RESC_NODECT];
	if (pdef) {
		presc = find_resc_entry(pattr, pdef);
		if (presc == NULL)
			presc = add_resource_entry(pattr, pdef);
		if (presc) {
			presc->rs_value.at_val.at_long = total_chunks;
			presc->rs_value.at_flags |= ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
		}
	}
	return 0;
}

/**
 * @par
 * 		make_schedselect - decode a selection specification,  and produce the
 *		the "schedselect" attribute which contains any default resources
 *		missing from the chunks in the select spec.
 *		Also translates the value of any boolean resource to the "formal"
 *		value of "True" or "False" for the Scheduler who needs to know it
 *		is a boolean and not a string or number.
 *
 *	@param[in]	patrl	-	(not used)
 * 	@param[in]	pselect -	pointer to the select specification
 * 	@param[in]	pque	-	used to obtain queue defaults
 *	@param[in,out]	psched	-	scheduler attribute
 *
 *	@return	int
 *	@retval	0	: success
 *	@retval	PBSE_Error	: Error Code.
 *
 *	@par MT-safe:	No.
 */

extern int resc_access_perm;

int
make_schedselect(attribute *patrl, resource *pselect,
		 pbs_queue *pque, attribute *psched)
{
	int rc;
	char *sched_select_out = NULL;

	if ((pselect == NULL) || (psched == NULL)) {
		return (PBSE_SYSTEM);
	}

	/* expand the user's select spec with server/queue defaults and
	 * normalized boolean values */
	rc = do_schedselect(pselect->rs_value.at_val.at_str, (struct server *) &server, (pbs_queue *) pque, &resc_in_err, &sched_select_out);

	if (rc == 0) {
		/* replace any previous schedselect value */
		free_str(psched);
		/* NOTE(review): sched_select_out appears to be allocated by
		 * do_schedselect; confirm decode_str copies (or takes ownership
		 * of) the string so it is not leaked here */
		(void) decode_str(psched, NULL, NULL, sched_select_out);
		psched->at_flags |= ATR_VFLAG_DEFLT;
	}
	return (rc);
}

/**
 * @brief
 * 		set_deflt_resc - set resource attributes based on a set of defaults provided
 *
 *	@param[in,out]	jb	-	is the job resource list attribute
 *	@param[in]	dflt	-	is the parent object (queue, server, ...) list of defaults
 *	@param[in]	selflg	-	if set means set select/place from the defaults
 */

static void
set_deflt_resc(attribute *jb, attribute *dflt, int selflg)
{
	resource *pdflt;
	resource *pjbrsc;
	resource_def *sel_def = &svr_resc_def[RESC_SELECT];
	resource_def *plc_def = &svr_resc_def[RESC_PLACE];

	/* Copy each set resource in the defaults list 'dflt' into the job's
	 * resource list 'jb' unless the job already has that resource set.
	 * "select" and "place" are only copied when selflg is nonzero.
	 */
	if (!is_attr_set(dflt))
		return;

	for (pdflt = (resource *) GET_NEXT(dflt->at_val.at_list);
	     pdflt != NULL;
	     pdflt = (resource *) GET_NEXT(pdflt->rs_link)) {

		/* skip select/place unless the caller asked for them */
		if (!selflg &&
		    ((pdflt->rs_defin == sel_def) || (pdflt->rs_defin == plc_def)))
			continue;

		if (!is_attr_set(&pdflt->rs_value))
			continue;

		/* only fill in resources the job hasn't already set */
		pjbrsc = find_resc_entry(jb, pdflt->rs_defin);
		if ((pjbrsc != NULL) &&
		    (pjbrsc->rs_value.at_flags & ATR_VFLAG_SET))
			continue;

		if (pjbrsc == NULL)
			pjbrsc = add_resource_entry(jb, pdflt->rs_defin);
		if (pjbrsc == NULL)
			continue;

		/* mark the copied value as a default so it can be told apart
		 * from a user-supplied value later */
		if (pdflt->rs_defin->rs_set(&pjbrsc->rs_value, &pdflt->rs_value, SET) == 0)
			pjbrsc->rs_value.at_flags |= (ATR_VFLAG_SET | ATR_VFLAG_DEFLT);
		jb->at_flags |= ATR_MOD_MCACHE;
	}
}

/**
 * @brief
 * 		set_resc_deflt - sets default resource limit values
 * 		on the object pointed to by input "pobj"
 *
 * @param[in]	pobj	-	job/reservation structure
 * @param[in]	objtype	-	type of object - job or reservation.
 * @param[in,out]	pque	-	Queue structure
 *
 * @return	int
 * @retval	0	: success
 * @retval	PBSE_Error	: Error Code.
 *
 * @par MT-safe:	No.
 */
int
set_resc_deflt(void *pobj, int objtype, pbs_queue *pque)
{
	/* was needlessly declared "static", leaking a stale pointer across
	 * calls; it is purely a per-call local */
	resc_resv *presv;
	job *pjob;
	attribute *pdest = NULL;
	attribute *psched = NULL;
	resource *presc;
	resource_def *prdefsl;
	resource_def *prdefpc;
	int rc;

	switch (objtype) {
		case JOB_OBJECT:
			pjob = (job *) pobj;
			assert(pjob != NULL);
			if (pque == NULL)
				pque = pjob->ji_qhdr;
			assert(pque != NULL);
			pdest = get_jattr(pjob, JOB_ATR_resource);
			psched = get_jattr(pjob, JOB_ATR_SchedSelect);
			break;

		case RESC_RESV_OBJECT:
			presv = (resc_resv *) pobj;
			assert(presv != NULL);
			pque = NULL; /* reservations take no queue defaults */
			pdest = get_rattr(presv, RESV_ATR_resource);
			psched = get_rattr(presv, RESV_ATR_SchedSelect);
			break;

		default:
			/* unknown object type: previously fell through with
			 * NULL pdest/psched and dereferenced them below */
			return (PBSE_SYSTEM);
	}

	/* set defaults based on the Queue's resources_default */
	if (pque) {
		set_deflt_resc(pdest,
			       get_qattr(pque, QA_ATR_ResourceDefault), 1);
	}

	/* set defaults based on the Server' resources_default */
	set_deflt_resc(pdest, get_sattr(SVR_ATR_resource_deflt), 1);

	/* set defaults based on the Queue's resources_max */
	if (pque) {
		set_deflt_resc(pdest,
			       get_qattr(pque, QA_ATR_ResourceMax), 0);
	}

	/* set defaults based on the Server's resources_max */
	set_deflt_resc(pdest, get_sattr(SVR_ATR_ResourceMax), 0);

	/* if needed, set "select" and "place" from the other resources */

	prdefsl = &svr_resc_def[RESC_SELECT];
	presc = find_resc_entry(pdest, prdefsl);
	/* if not set, set select/place */
	if ((presc == NULL) || ((is_attr_set(&presc->rs_value)) == 0))
		if ((rc = set_select_and_place(objtype, pobj, pdest)) != 0)
			return rc;

	prdefpc = &svr_resc_def[RESC_PLACE];
	presc = find_resc_entry(pdest, prdefpc);
	/* if "place" still not set, force to "free" */
	if ((presc == NULL) || ((is_attr_set(&presc->rs_value)) == 0)) {
		presc = add_resource_entry(pdest, prdefpc);
		if (presc == NULL)
			return PBSE_SYSTEM;
		if (prdefpc->rs_decode(&presc->rs_value, NULL, "place", "free") == 0)
			if (objtype == JOB_OBJECT) /* only for jobs, set DEFLT */
				presc->rs_value.at_flags |= ATR_VFLAG_DEFLT;
	}

	/* now set up the Scheduler's version of select JOB_ATR_SchedSelect */
	presc = find_resc_entry(pdest, prdefsl);
	if (presc) {
		if ((rc = make_schedselect(pdest, presc, pque, psched)) == 0)
			rc = set_chunk_sum(psched, pdest);

	} else
		rc = PBSE_SYSTEM;
	return rc;
}

/**
 * @brief
 * 		eval_chkpnt - if the job's checkpoint attribute is "c=nnnn" and
 * 		nnnn is less than the queue' minimum checkpoint time, reset
 *		to the queue min time.
 *
 * @param[in]	pjob	-	job whose checkpoint attribute is evaluated
 * @param[in]	queckp	-	the queue's minimum checkpoint time attribute
 */

void
eval_chkpnt(job *pjob, attribute *queckp)
{
	char *pv;

	if (!is_jattr_set(pjob, JOB_ATR_chkpnt) || !is_attr_set(queckp))
		return; /* need do nothing */

	/* fetch the string only after the is-set check; guard against a
	 * NULL return just in case */
	pv = get_jattr_str(pjob, JOB_ATR_chkpnt);
	if (pv == NULL)
		return;

	/* "c=nnnn" (checkpoint) or "w=nnnn" (wall) interval forms */
	if ((*pv == 'c') || (*pv == 'w')) {
		int jobs;
		char queues[30];
		char ckt;

		ckt = *pv;
		if (*++pv == '=')
			pv++;
		jobs = atoi(pv);
		/* raise the job's interval to the queue's minimum */
		if (jobs < queckp->at_val.at_long) {
			/* snprintf: bounded write, always NUL-terminated */
			snprintf(queues, sizeof(queues), "%c=%ld", ckt, queckp->at_val.at_long);
			set_jattr_generic(pjob, JOB_ATR_chkpnt, queues, NULL, INTERNAL);
		}
	}
}

#ifndef NDEBUG
/**
 * @brief
 * 		correct_ct - This is a work-around for an as yet unfound bug where
 *		the counts of jobs in each state sometimes (rarely) become wrong.
 *		When this happens, the count for a state can become negative.
 *		If this is detected (see above), this routine is called to reset
 *		all of the counts and log a message.
 *
 * @param[in]	pqj	-	pbs queue structure
 */

static void
correct_ct(pbs_queue *pqj)
{
	int st;
	char *cursor;
	job *pj;
	pbs_queue *pq;

	/* Log the (incorrect) server-level counts, zeroing them as we go */
	(void) sprintf(log_buffer, "Job state counts incorrect, server %d: ",
		       server.sv_qs.sv_numjobs);
	server.sv_qs.sv_numjobs = 0;
	for (st = 0; st < PBS_NUMJOBSTATE - 4; ++st) {
		cursor = log_buffer + strlen(log_buffer);
		(void) sprintf(cursor, "%d ", server.sv_jobstates[st]);
		server.sv_jobstates[st] = 0;
	}

	/* Append the counts for the queue that tripped the consistency check */
	if (pqj != NULL) {
		cursor = log_buffer + strlen(log_buffer);
		(void) sprintf(cursor, "; queue %s %d: ", pqj->qu_qs.qu_name,
			       pqj->qu_numjobs);
		for (st = 0; st < PBS_NUMJOBSTATE - 4; ++st) {
			cursor = log_buffer + strlen(log_buffer);
			(void) sprintf(cursor, "%d ", pqj->qu_njstate[st]);
		}
	}
	log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_DEBUG,
		  msg_daemonname, log_buffer);

	/* Reset every queue's counters... */
	for (pq = (pbs_queue *) GET_NEXT(svr_queues); pq != NULL;
	     pq = (pbs_queue *) GET_NEXT(pq->qu_link)) {
		pq->qu_numjobs = 0;
		for (st = 0; st < PBS_NUMJOBSTATE - 4; ++st)
			pq->qu_njstate[st] = 0;
	}

	/* ...then rebuild all counts by walking every job on the server */
	for (pj = (job *) GET_NEXT(svr_alljobs); pj != NULL;
	     pj = (job *) GET_NEXT(pj->ji_alljobs)) {
		int snum = get_job_state_num(pj);

		server.sv_qs.sv_numjobs++;
		if (snum != -1)
			server.sv_jobstates[snum]++;
		if (pj->ji_qhdr != NULL) {
			pj->ji_qhdr->qu_numjobs++;
			if (snum != -1)
				pj->ji_qhdr->qu_njstate[snum]++;
		}
	}
}
#endif /* NDEBUG */

/**
 * @brief
 * 		get_wall - get the value of "walltime" for the job
 *
 * @param[in]	jp	-	jp is a valid job pointer
 *
 * @return	int
 * @retval	-1	: function failed
 * @retval	walltime value	: function succeeded
 *
 * @note
 * 		Assumption: input jp is a valid job pointer
 */
int
get_wall(job *jp)
{
	resource *pres;

	/* &svr_resc_def[RESC_WALLTIME] is the address of a static array
	 * element and can never be NULL; the old check was dead code */
	pres = find_resc_entry(get_jattr(jp, JOB_ATR_resource),
			       &svr_resc_def[RESC_WALLTIME]);
	if ((pres == NULL) || !is_attr_set(&pres->rs_value))
		return (-1);
	return pres->rs_value.at_val.at_long; /* requested walltime, seconds */
}

/**
 * @brief
 * 		get the amount of "walltime" resource USED for the job
 *
 * @param[in]	jp	- Pointer to a job
 *
 * @return	Success/Failure
 * @retval	-1 		- Function failed
 * @retval	walltime value	- Function succeeded
 * @note
 * 		Assumption: input jp is a valid job pointer
 *
 * @return	wall time value
 * @retval	-1	: failure
 *
 */
int
get_used_wall(job *jp)
{
	resource *pres;

	/* &svr_resc_def[RESC_WALLTIME] is the address of a static array
	 * element and can never be NULL; the old check was dead code */
	pres = find_resc_entry(get_jattr(jp, JOB_ATR_resc_used),
			       &svr_resc_def[RESC_WALLTIME]);
	if ((pres == NULL) || !is_attr_set(&pres->rs_value))
		return (-1);
	return pres->rs_value.at_val.at_long; /* walltime used, seconds */
}

/**
 * @brief
 * 		get_softwall - get the value of "soft_walltime" for the job
 *
 * @param[in]	jp	-	jp is a valid job pointer
 *
 * @return	int
 * @retval	-1	: function failed
 * @retval	soft walltime value	: function succeeded
 *
 * @note
 * 		Assumption: input jp is a valid job pointer
 */
int
get_softwall(job *jp)
{
	resource *pres;

	/* &svr_resc_def[RESC_SOFT_WALLTIME] is the address of a static array
	 * element and can never be NULL; the old check was dead code */
	pres = find_resc_entry(get_jattr(jp, JOB_ATR_resource),
			       &svr_resc_def[RESC_SOFT_WALLTIME]);
	if ((pres == NULL) || !is_attr_set(&pres->rs_value))
		return (-1);
	return pres->rs_value.at_val.at_long; /* soft walltime, seconds */
}

/**
 * @brief
 * 		get_cput - get the value of "cput" for the job
 *
 * @param[in]	jp	-	jp is a valid job pointer
 *
 * @return	int
 * @retval	-1	: function failed
 * @retval	cput value	: function succeeded
 *
 * @note
 * 		Assumption: input jp is a valid job pointer
 */
int
get_cput(job *jp)
{
	resource *pres;

	/* &svr_resc_def[RESC_CPUT] is the address of a static array
	 * element and can never be NULL; the old check was dead code */
	pres = find_resc_entry(get_jattr(jp, JOB_ATR_resource),
			       &svr_resc_def[RESC_CPUT]);
	if ((pres == NULL) || !is_attr_set(&pres->rs_value))
		return (-1);
	return pres->rs_value.at_val.at_long; /* requested cput, seconds (old comment wrongly said "wall time") */
}

/**
 * @brief
 * 		get the amount of "cput" resource USED for the job
 *
 * @param[in]	jp	- Pointer to a job
 *
 * @return	Success/Failure
 * @retval	-1 		- Function failed
 * @retval	walltime value	- Function succeeded
 * @note
 * 		Assumption: input jp is a valid job pointer
 *
 * @return	wall time value
 * @retval	-1	: failure
 *
 */
int
get_used_cput(job *jp)
{
	resource *pres;

	/* &svr_resc_def[RESC_CPUT] is the address of a static array
	 * element and can never be NULL; the old check was dead code */
	pres = find_resc_entry(get_jattr(jp, JOB_ATR_resc_used),
			       &svr_resc_def[RESC_CPUT]);
	if ((pres == NULL) || !is_attr_set(&pres->rs_value))
		return (-1);
	return pres->rs_value.at_val.at_long; /* cput used, seconds (old comment wrongly said "wall time") */
}
/*-------------------------------------------------------------------------------
 Functions for establishing reservation related tasks
 --------------------------------------------------------------------------------*/
/**
 * @brief
 * 		Time4reply	-	reply when reservation becomes unconfirmed.
 *
 * @param[in,out]	ptask	-	work task structure which contains reservation structure.
 */
static void
Time4reply(struct work_task *ptask)
{
	resc_resv *presv = ptask->wt_parm1;
	char msg[512] = {0};

	/* Nothing to answer if no batch reply is pending for this resv */
	if (presv->ri_brp == NULL)
		return;

	switch (presv->ri_qs.ri_state) {
		case RESV_UNCONFIRMED:
		case RESV_BEING_ALTERED:
			snprintf(msg, sizeof(msg), "%s UNCONFIRMED", presv->ri_qs.ri_resvID);
			break;
		case RESV_CONFIRMED:
			/* unlikely: the reply for a confirmed reservation
			 * normally happens in req_rescreserve() */
			snprintf(msg, sizeof(msg), "%s CONFIRMED", presv->ri_qs.ri_resvID);
			break;
		default:
			/* any other state: reply with an empty message,
			 * matching the original behavior */
			break;
	}

	(void) reply_text(presv->ri_brp, PBSE_NONE, msg);
	presv->ri_brp = NULL;
}

/**
 * @brief
 * 		Time4resv - function to execute when the "start time" for a
 *		CONFIRMED reservation finally arrives.
 *		At some prior point in time a task that's to be processed
 *		at "start time" is put onto  the "timed-tasks" list.  This
 *		task's function pointer field points at this function -
 *		see "gen_task_Time4resv" regards the task on the "timed_tasks"
 *		list.
 *		A pointer to the resc_resv structure is put in the task's
 *		"wt_parm1" void* field.
 * @note
 *		Note: function "dispatch_task" unlinks the task structure
 *	      from whatever list(s) it's on and it frees the memory
 *	      consumed by the work_task struct
 *
 * @param[in,out]	ptask	-	work task structure which contains reservation structure.
 *
 *	Returns   none
 */
static void
Time4resv(struct work_task *ptask)
{
	resc_resv *presv = ptask->wt_parm1;
	int pbs_ecode;
	int state, sub;

	/* cause to have issued to the qmgr subsystem
	 * a request to start the reservation's queue
	 * note: if presv is for a job reservation no
	 * request gets made
	 */

	pbs_ecode = change_enableORstart(presv, Q_CHNG_START, "True");
	if (!pbs_ecode) {
		/*
		 *this is really the line we want once the scheduler
		 *has the capability to say "begin this reservation"
		 */

		eval_resvState(presv, RESVSTATE_Time4resv, 0, &state, &sub);
		resv_setResvState(presv, state, sub);

		/*ok, time for the reservation to be running so adjust
		 *server's/queue's resource accounting to reflect that
		 *their "resources_assigned" values are now higher by the
		 *amounts requested by the reservation.  Also, set a flag to
		 *indicate that in the future resources have to be returned
		 *and, setup so that the scheduler gets notified
		 */
		if (!presv->resv_from_job)
			set_resc_assigned((void *) presv, 1, INCR);
		presv->ri_giveback = 1;

		resv_exclusive_handler(presv);
		notify_scheds_about_resv(SCH_SCHEDULE_JOBRESV, presv);

		/*notify the relevant persons that the reservation time has arrived*/
		if (presv->ri_qs.ri_tactive == time_now) {
			svr_mailownerResv(presv, MAIL_BEGIN, MAIL_NORMAL, "");
			account_resvstart(presv);

			/* make an artificial request so we can fire process hooks */
			struct batch_request *preq = alloc_br(PBS_BATCH_BeginResv);
			if (preq != NULL) { /* BUG FIX: alloc_br() may fail; was dereferenced unchecked */
				preq->rq_perm |= ATR_DFLAG_MGWR;
				strncpy(preq->rq_user, pbs_current_user, PBS_MAXUSER);
				strncpy(preq->rq_host, server_host, PBS_MAXHOSTNAME);
				strncpy(preq->rq_ind.rq_manager.rq_objname, presv->ri_qs.ri_resvID, PBS_MAXSVRRESVID);
				/* handle truncation warning */
				preq->rq_ind.rq_manager.rq_objname[PBS_MAXSVRJOBID] = '\0';

				char hook_msg[HOOK_MSG_SIZE] = {0};
				switch (process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt)) {
					case 0: /* explicit reject */
					case 1: /* no recreate request as there are only read permissions */
					case 2: /* no hook script executed - go ahead and accept event*/
						break;
					default:
						log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_INFO, __func__,
							  "resv_begin event: accept req by default");
				}
				free_br(preq);
			}
		}

		presv->resv_start_task = NULL;
		if ((ptask = set_task(WORK_Timed, time_now + 60,
				      Time4resv1, presv)) != 0) {

			ptask->wt_aux = 4; /*we will attempt up to 5 times*/

			/* set things so that the reservation going away causes */
			/* any "yet to be processed" work tasks also going away */

			append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
		}
	}

	if (is_rattr_set(presv, RESV_ATR_del_idle_time)) {
		/* Catch the idle case where the reservation never has any jobs in it */
		set_idle_delete_task(presv);
	}
}

/**
 * @brief
 * 		Time4resv1 - function that's executed when a "n-th reminder"
 *		on the timed-tasks list gets dispatched.
 * @par
 *		This function checks if the reservation is still in state
 *		RESV_TIME_TO_RUN and, if it is, it sets into the global
 *		server variable "svr_do_schedule" an appropriate command
 *		for the scheduler, notes how many times it has done this
 *		and, from that, determines whether or not to put itself
 *		back on the task list.
 *
 * @param[in,out]	ptask	-	work task structure which contains reservation structure.
 *
 *	@return   none
 */
static void
Time4resv1(struct work_task *ptask)
{
	struct work_task *pwt;
	resc_resv *presv = ptask->wt_parm1;

	if (get_rattr_long(presv, RESV_ATR_state) != RESV_TIME_TO_RUN)
		return; /*no more reminders needed*/

	/*put on another reminder timed for 60 seconds in the future*/
	if (ptask->wt_aux > 0) {
		if ((pwt = set_task(WORK_Timed, time_now + 60,
				    Time4resv1, presv)) != 0) {

			pwt->wt_aux = ptask->wt_aux - 1;

			/* set things so that the job going away will result in */
			/* any "yet to be processed" work tasks also going away */

			append_link(&presv->ri_svrtask, &pwt->wt_linkobj, pwt);
		}
	}

#if 0
	For general reservation, where duration can be less than end - start
	time, the scheduler can(eventually) choose when to run the reservation
	so going to want something different than what we currently have for
		reservation jobs
#endif

	/* specify the scheduling command for the scheduler */
	set_scheduler_flag(SCH_SCHEDULE_JOBRESV, dflt_scheduler);
}

/**
 * @brief
 * 		Time4resvFinish - function that's to execute when "time_now" exceeds
 *		the ending time of the reservation
 *
 * @param[in,out]	ptask	-	work task structure which contains reservation structure.
 *
 *	Returns   none
 */
void
Time4resvFinish(struct work_task *ptask)
{
	resc_resv *presv = ptask->wt_parm1;
	struct batch_request *preq;

	/* If more than one occurrence then process the occurrence end. The sequence
	 * of events that are needed for the end of a standing reservation are:
	 *
	 * 1) change the queue state from started True to False
	 * 2) Delete all Running Jobs and Keep Queued Jobs
	 * 3) Once all Obits are received (see running_jobs_count):
	 *    3.a) Determine if occurrences were missed
	 *    3.b) Add the next occurrence start and end event on the work task
	 */
	/* clear the end-task handle first; this task is being dispatched now */
	presv->resv_end_task = NULL;
	if (get_rattr_long(presv, RESV_ATR_resv_count) > 1) {
		int ridx = get_rattr_long(presv, RESV_ATR_resv_idx);
		int rcount = get_rattr_long(presv, RESV_ATR_resv_count);

		DBPRT(("reached end of occurrence %d/%d\n", ridx, rcount))
		log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID,
				       "reached end of occurrence %d/%d", ridx, rcount);

		/* When recovering past the last occurrence the standing reservation is purged
		 * in a manner similar to an advance reservation
		 */
		if (ridx < rcount) {
			/*
			 * Invoke the reservation end hook for every occurrence
			 */
			struct batch_request *newreq;
			newreq = alloc_br(PBS_BATCH_ResvOccurEnd);
			if (newreq != NULL) {
				newreq->rq_perm |= ATR_DFLAG_MGWR;
				strcpy(newreq->rq_user, pbs_current_user);
				strcpy(newreq->rq_host, server_host);
				strcpy(newreq->rq_ind.rq_manager.rq_objname, presv->ri_qs.ri_resvID);
				if (issue_Drequest(PBS_LOCAL_CONNECTION, newreq, resvFinishReply, NULL, 0) == -1) {
					free_br(newreq);
				}
				tickle_for_reply();
			}
			/* 1) Change queue state from started True to False and change
			 * state of the reservation queue
			 */
			change_enableORstart(presv, Q_CHNG_START, "FALSE");
			resv_setResvState(presv, RESV_DELETING_JOBS, presv->ri_qs.ri_substate);

			/* 2) Issue delete messages to jobs in running state and keep jobs in
			 * Queued state. Server periodically monitors the reservation queue
			 * to determine if all jobs in run state have been purged.
			 */
			delete_occurrence_jobs(presv);

			/* Done processing the current occurrence. If MOM locks up during cleanup
			 * or stage out for a duration that exceeds the time of the last occurrence
			 * then this handler will be invoked again with an occurrence index (ridx)
			 * equal to the last occurrence (rcount) and be processed as a DeleteReservation
			 * event in the next block.
			 */
			return;
		}
	}
	/* If an advance reservation or last occurrence of a standing reservation
	 * construct a "deleteResv" batch request for the dummy connection
	 * PBS_LOCAL_CONNECTION; Issue that request via "issue_Drequest".
	 * "issue_Drequest" will notice this request is to be handled here
	 * and call upon "dispatch_request", the mechanism for dispatching of
	 * incoming requests.  "issue_Drequest" is passed a function that's
	 * to deal with the reply to the request when it arrives.  A task
	 * having this reply handling function (pointer) is placed on the
	 * global list, "task_list_event".  Request dispatching proceeds
	 * as it normally does and invokes the function "reply_send", which
	 * is to send back a reply to the batch request.  In this instance,
	 * (response recipient local) reply_send  moves the task of dealing
	 * with the "reply to request" on to "task_list_immediate", so it
	 * can get recognized the next time function "next_task" in the
	 * server's main loop gets invoked
	 */
	if ((preq = alloc_br(PBS_BATCH_DeleteResv)) != 0) {
		/*setup field so don't fail a check on perm*/
		preq->rq_perm |= ATR_DFLAG_MGWR;

		strcpy(preq->rq_user, pbs_current_user);
		strcpy(preq->rq_host, server_host);
		strcpy(preq->rq_ind.rq_manager.rq_objname,
		       presv->ri_qs.ri_resvID);

		/*notify relevant parties that the reservation's
		 *ending time has arrived and reservation is being deleted
		 */
		svr_mailownerResv(presv, MAIL_END, MAIL_NORMAL, "");

		set_last_used_time_node(presv, 1);
		(void) issue_Drequest(PBS_LOCAL_CONNECTION, preq,
				      resvFinishReply, NULL, 0);
		tickle_for_reply();
	}
}

/**
 * @brief
 * 		If processing a Standing Reservation
 * 		1) Get the occurrence index and the total number of occurrences,
 *    		if this is the last occurrence, an event to purge reservation is added to
 *    		the work task.
 * 		2) If not last, then set the next occurrence's start and end time and
 *    		appropriate execvnodes and add to server's work task
 * 		3) Update state and save reservation
 * @par
 * 		This function is also entered upon reservation recovery to handle skipped
 * 		occurrences.
 *
 * @param[in]	presv	-	Standing Reservation
 */
static void
Time4occurrenceFinish(resc_resv *presv)
{
	time_t newend;
	time_t newstart;
	int state = 0;
	int sub = 0;
	int rc = 0;
	int rcount_adjusted = 0;
	char *execvnodes_orig = NULL;
	char *execvnodes = NULL;
	char *newxc = NULL;
	char **short_xc = NULL;
	char **tofree = NULL;
	time_t dtstart;
	time_t dtend;
	time_t next;
	time_t now;
	struct work_task *ptask = NULL;
	pbsnode_list_t *pl = NULL;
	char start_time[9] = {0}; /* 9 = sizeof("%H:%M:%S")[=8] + 1('\0') */
	resource_def *rscdef = NULL;
	resource *prsc = NULL;
	attribute atemp = {0};
	int j = 2;
	int occurrence_ended_early = 0;
	int ridx = get_rattr_long(presv, RESV_ATR_resv_idx);
	int rcount = get_rattr_long(presv, RESV_ATR_resv_count);
	char *rrule = get_rattr_str(presv, RESV_ATR_resv_rrule);
	char *tz = get_rattr_str(presv, RESV_ATR_resv_timezone);

	/* the next occurrence returned by get_occurrence is counted from the current
	 * one which is at index 1. */

	/* If the reservation was altered,
	 * use the stored values in RESV_ATR_standing_revert.
	 */
	if (is_rattr_set(presv, RESV_ATR_standing_revert)) {
		resource *resc, *resc2;
		attribute *stnd_revert = get_rattr(presv, RESV_ATR_standing_revert);
		attribute *resc_attr = get_rattr(presv, RESV_ATR_resource);

		resc = find_resc_entry(stnd_revert, &svr_resc_def[RESC_START_TIME]);
		dtstart = resc->rs_value.at_val.at_long;

		resc = find_resc_entry(stnd_revert, &svr_resc_def[RESC_WALLTIME]);
		set_rattr_l_slim(presv, RESV_ATR_duration, resc->rs_value.at_val.at_long, SET);
		presv->ri_qs.ri_duration = resc->rs_value.at_val.at_long;

		/* restore the pre-alter select spec and rebuild schedselect */
		resc = find_resc_entry(resc_attr, &svr_resc_def[RESC_SELECT]);
		resc2 = find_resc_entry(stnd_revert, &svr_resc_def[RESC_SELECT]);
		free(resc->rs_value.at_val.at_str);
		resc->rs_value.at_val.at_str = strdup(resc2->rs_value.at_val.at_str);
		post_attr_set(resc_attr);
		make_schedselect(resc_attr, resc, NULL, get_rattr(presv, RESV_ATR_SchedSelect));
		set_chunk_sum(&resc->rs_value, resc_attr);
	} else
		dtstart = get_rattr_long(presv, RESV_ATR_start);

	dtend = get_rattr_long(presv, RESV_ATR_end);
	next = dtstart;
	now = time(NULL);

	/* Add next occurrence and account for missed occurrences.
	 * There are three ways we can get into this function:
	 * 1) When the server is initializating.  We need to account for all occurrences we have missed.
	 * 2) The end of an occurrence.  We need to move onto the next.
	 * 3) If an occurrence ends early.  We need to move onto the next.
	 */
	if (presv->ri_qs.ri_substate == RESV_RUNNING && next < now)
		occurrence_ended_early = 1;
	while (occurrence_ended_early || dtend <= now) {
		/* We may loop and skip several occurrences for different reasons,
		 * if an occurrence ended early, it can only be the one we are in
		 */
		occurrence_ended_early = 0;
		/* get occurrence that is "j" numbers away from dtstart. */
		next = get_occurrence(rrule, dtstart, tz, j);
		dtend = next + presv->ri_qs.ri_duration;

		/* Index of next occurrence from dtstart */
		j++;

		/* Log information notifying of missed occurrences. An occurrence is
		 * "missed" either if it was interrupted, in which case it never was
		 * instructed to "give back" its allocated resources, or if the server
		 * was down for an extended period of time extending over a number of
		 * occurrences.
		 * The first time around j has the value 2 and is incremented to 3 to
		 * account for the next occurrence. Any increments after that characterize
		 * missed occurrences that are noted in the log file. */
		if (j > 3 || presv->ri_giveback == 0) {
			if (strftime(start_time, sizeof(start_time),
				     "%H:%M:%S", localtime(&dtstart))) {
				sprintf(log_buffer,
					"reservation occurrence %d/%d "
					"scheduled at %s was skipped because "
					"its end time is in the past",
					ridx, rcount, start_time);
			} else {
				sprintf(log_buffer,
					"reservation occurrence %d/%d was "
					"skipped because its end time is in "
					"the past",
					ridx, rcount);
			}
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV,
				  LOG_NOTICE, presv->ri_qs.ri_resvID,
				  log_buffer);
		}

		/* The reservation index is incremented */
		ridx++;

		/* If skipped past the last occurrence then return to the
		 * caller which will handle issuing a reservation delete
		 * message
		 */
		if (ridx > rcount) {
			set_rattr_l_slim(presv, RESV_ATR_resv_idx, rcount, SET);

			if ((ptask = set_task(WORK_Immed, 0, Time4resvFinish, presv)) != 0)
				append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);

			return;
		}

		DBPRT(("stdg_resv: next occurrence start = %s", ctime(&next)))
		DBPRT(("stdg_resv: next occurrence end   = %s", ctime(&dtend)))
	}
	if (is_rattr_set(presv, RESV_ATR_resv_execvnodes))
		execvnodes_orig = get_rattr_str(presv, RESV_ATR_resv_execvnodes);
	if (execvnodes_orig != NULL) {
		DBPRT(("stdg_resv: execvnodes sequence   = %s\n", execvnodes_orig))
		execvnodes = strdup(execvnodes_orig);
	} else {
		DBPRT(("stdg_resv: execvnodes sequence missing"))
		;
	}
	short_xc = (char **) unroll_execvnode_seq(execvnodes, &tofree);

	/* when a reservation is reconfirmed, the 'count' of occurrences may differ
	 * from the original 'count', we need to adjust for the actual remaining
	 * count
	 */
	rcount_adjusted = rcount - get_execvnodes_count(execvnodes);

	/* The reservation index starts at 1 but the short_xc array at 0. Occurrence 1
	 * is therefore given by array element 0.
	 */
	if (ridx - rcount_adjusted >= 1 && short_xc != NULL)
		newxc = strdup(short_xc[ridx - rcount_adjusted - 1]);
	else {
		newxc = NULL;
		/* BUG FIX: message previously read "for for occurence" */
		log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID,
		           "%s: attempt to find vnodes for occurrence %d failed; using empty set",
		           __func__, ridx);
	}

	/* clean up helper variables */
	free(short_xc);
	free(execvnodes);
	free_execvnode_seq(tofree);

	/* Set reservation state to finished. Will re-evaluate
	 * the state for the next occurrence later in the function.
	 */
	resv_setResvState(presv, RESV_FINISHED, RESV_FINISHED);

	/* Decrement resources assigned */
	if (presv->ri_giveback == 1) {
		set_resc_assigned((void *) presv, 1, DECR);
		presv->ri_giveback = 0;
	}

	/* Reservation Nodes are freed and a -possibly- new set assigned */
	free_resvNodes(presv);
	/* set ri_vnodes_down to 0 because the previous occurrences downed nodes might
	 * not exist in the following occurrence.  The new occurrence's ri_vnodes_down
	 * will be set properly in set_nodes()
	 */
	presv->ri_vnodes_down = 0;

	/* Set the new start time, end time, and occurrence index */
	newstart = next;
	newend = (time_t)(newstart + presv->ri_qs.ri_duration);

	set_rattr_l_slim(presv, RESV_ATR_start, newstart, SET);
	presv->ri_qs.ri_stime = newstart;

	set_rattr_l_slim(presv, RESV_ATR_end, newend, SET);
	presv->ri_qs.ri_etime = newend;

	set_rattr_l_slim(presv, RESV_ATR_resv_idx, ridx, SET);
	set_rattr_l_slim(presv, RESV_ATR_duration, presv->ri_qs.ri_duration, SET);

	/* refresh the "walltime" resource to match the occurrence duration */
	rscdef = &svr_resc_def[RESC_WALLTIME];
	prsc = find_resc_entry(get_rattr(presv, RESV_ATR_resource), rscdef);
	atemp.at_flags = ATR_VFLAG_SET;
	atemp.at_type = ATR_TYPE_LONG;
	atemp.at_val.at_long = presv->ri_qs.ri_duration;
	rscdef->rs_set(&prsc->rs_value, &atemp, SET);
	post_attr_set(get_rattr(presv, RESV_ATR_resource));

	/* Assign the allocated resources to the reservation
	 * and the reservation to the associated vnodes
	 */
	rc = assign_resv_resc(presv, newxc, FALSE);
	free(newxc);

	if (rc != PBSE_NONE) {
		sprintf(log_buffer, "problem assigning resource to reservation occurrence (%d)", rc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID, log_buffer);
		resv_setResvState(presv, RESV_DEGRADED, RESV_DEGRADED);
		/* avoid skipping a reconfirmation */
		presv->ri_degraded_time = newstart;
		force_resv_retry(presv, determine_resv_retry(presv));
		resv_save_db(presv);
		return;
	}

	/* place "Time4resv" task on "task_list_timed" */
	if ((rc = gen_task_Time4resv(presv)) != 0) {
		sprintf(log_buffer, "problem generating task Time for occurrence (%d)", rc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID, log_buffer);
		resv_setResvState(presv, RESV_DEGRADED, RESV_DEGRADED);
		/* avoid skipping a reconfirmation */
		presv->ri_degraded_time = newstart;
		force_resv_retry(presv, determine_resv_retry(presv));
		resv_save_db(presv);
		return;
	}
	/* add task to handle the end of the next occurrence */
	if ((rc = gen_task_EndResvWindow(presv)) != 0) {
		(void) resv_purge(presv);
		sprintf(log_buffer, "problem generating reservation end task for occurrence (%d); "
		        "purging reservation", rc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID, log_buffer);
		return;
	}

	/* compute new values for state and substate */
	eval_resvState(presv, RESVSTATE_gen_task_Time4resv, 1, &state, &sub);

	/*
	 * Walk the nodes list associated to this reservation to determine if any
	 * node is unavailable. If so, mark this next occurrence as degraded
	 */
	for (pl = presv->ri_pbsnode_list; pl != NULL; pl = pl->next) {
		if ((pl->vnode->nd_state & (INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM | INUSE_DOWN | INUSE_UNKNOWN)) != 0) {
			DBPRT(("vnode %s unavailable\n", pl->vnode->nd_name))
			state = RESV_DEGRADED;
			sub = RESV_DEGRADED;

			presv->ri_degraded_time = newstart;
			break;
		}
	}

	/* All nodes of this occurrence are up, mark reservation confirmed */
	if (pl == NULL)
		state = RESV_CONFIRMED;

	/* If the reservation already has a retry time set then its substate is
	 * marked degraded.  If all degraded occurrences are in the past, the
	 * scheduler will fix this on the next retry attempt.
	 */
	if (is_rattr_set(presv, RESV_ATR_retry)) {
		sub = RESV_DEGRADED;
		if (get_rattr_long(presv, RESV_ATR_retry) > 0 && get_rattr_long(presv, RESV_ATR_retry) <= time_now)
			set_resv_retry(presv, time_now + 120);
	}

	if (sub == RESV_DEGRADED) {
		DBPRT(("degraded_time of %s is %s", presv->ri_qs.ri_resvID, ctime(&presv->ri_degraded_time)))
	}

	/* Set the reservation state and substate */
	resv_setResvState(presv, state, sub);

	resv_save_db(presv);
}

/**
 * @brief
 * 		Handler to check on number of remaining jobs in RUNNING/EXITING state.
 * 		The jobs asynchronously update their state as they get purged from the system.
 * 		Once all jobs have been purged, the process for adding the next occurrence is
 * 		triggered.
 *
 * @param[in,out]	ptask	-	work task structure which contains reservation
 */
static void
running_jobs_count(struct work_task *ptask)
{
	resc_resv *presv = (resc_resv *) ptask->wt_parm1;
	int remaining;

	/* jobs still RUNNING or EXITING in the reservation's queue */
	remaining = presv->ri_qp->qu_njstate[JOB_STATE_RUNNING] +
		    presv->ri_qp->qu_njstate[JOB_STATE_EXITING];

	if (remaining == 0)
		Time4occurrenceFinish(presv); /* all purged: advance to the next occurrence */
	else
		delete_occurrence_jobs(presv); /* stragglers remain: issue another cleanup pass */
}

/**
 * @brief
 * 		Delete all Running jobs associated to a standing reservation queue.
 * 		The queued jobs will remain queued
 *
 * @param[in,out]	presv	-	The reservation to obtain queue and jobs from
 *
 */
static void
delete_occurrence_jobs(resc_resv *presv)
{
	job *pj;
	job *next;
	struct work_task *wtask;

	for (pj = (job *) GET_NEXT(presv->ri_qp->qu_jobs); pj != NULL; pj = next) {
		/* capture the successor first: job_abt unlinks pj from the queue */
		next = (job *) GET_NEXT(pj->ji_jobque);

		if (check_job_state(pj, JOB_STATE_LTR_RUNNING) &&
		    !check_job_substate(pj, JOB_SUBSTATE_ABORT))
			(void) job_abt(pj, "Deleting running job at end of reservation occurrence");
	}

	/* Re-check in 5 seconds whether all running jobs are gone.  The task is
	 * linked into the reservation's own task list so it is discarded if the
	 * reservation itself is deleted (e.g. via pbs_rdel) while the running
	 * jobs are still being cleaned up. */
	wtask = set_task(WORK_Timed, time_now + 5, running_jobs_count, presv);
	if (wtask != NULL)
		append_link(&presv->ri_svrtask, &wtask->wt_linkobj, wtask);
}

/**
 * @brief
 * 		Time4_term - function that's to execute when server wants to delete
 *		a reservation, e.g. when the server needs to delete the reservation
 *		part of a "reservation job".
 *
 * @param[in]	ptask	-	The reservation to be deleted.
 *
 *	@return	none
 */
void
Time4_term(struct work_task *ptask)
{
	resc_resv *presv = ptask->wt_parm1;
	struct batch_request *preq;

	/* Build a "DeleteResv" batch request on the dummy connection
	 * PBS_LOCAL_CONNECTION and issue it through "issue_Drequest".
	 * Because the request is local, "issue_Drequest" dispatches it
	 * directly via "dispatch_request", the normal mechanism for
	 * incoming requests.  "issue_Drequest" is also given a function
	 * (resvFinishReply) to handle the eventual reply; a work task
	 * holding that handler is placed on the global "task_list_event"
	 * list.  Request processing proceeds as usual and ends in
	 * "reply_send"; since the reply recipient is local, "reply_send"
	 * moves the reply-handling task onto "task_list_immediate", where
	 * the server's main loop picks it up on its next call to
	 * "next_task".
	 */

	if ((preq = alloc_br(PBS_BATCH_DeleteResv)) != 0) {
		/* grant manager-write permission so the request passes the perm check */
		preq->rq_perm |= ATR_DFLAG_MGWR;

		strcpy(preq->rq_user, pbs_current_user);
		strcpy(preq->rq_host, server_host);
		strcpy(preq->rq_ind.rq_manager.rq_objname,
		       presv->ri_qs.ri_resvID);

		(void) issue_Drequest(PBS_LOCAL_CONNECTION, preq,
				      resvFinishReply, NULL, 0);

		/* notify interested parties that the reservation's end time
		 * has arrived and the reservation is being deleted
		 */
		svr_mailownerResv(presv, MAIL_END, MAIL_NORMAL, "");

		tickle_for_reply();
		set_last_used_time_node(presv, 1);
	}
}

/**
 * @brief
 * 		Time4_I_term - function that's to execute when an "interactive"
 *		reservation is submitted with a negative "I" value.  If the state
 *		on the reservation is UNCONFIRMED a delete request is generated.
 *		If the state is not UNCONFIRMED, this task function does nothing.
 *
 * @param[in]	ptask	-	The reservation submitted with a negative "I" value.
 *
 *	@return	none
 */
void
Time4_I_term(struct work_task *ptask)
{
	resc_resv *presv = ptask->wt_parm1;
	struct batch_request *preq;

	/* only an "interactive" reservation still awaiting confirmation
	 * is deleted; any other state means the wait paid off */
	if (presv->ri_qs.ri_state != RESV_UNCONFIRMED)
		return;

	/* Build a "DeleteResv" batch request on the dummy connection
	 * PBS_LOCAL_CONNECTION and issue it through "issue_Drequest".
	 * Because the request is local, "issue_Drequest" dispatches it
	 * directly via "dispatch_request", the normal mechanism for
	 * incoming requests.  "issue_Drequest" is also given a function
	 * (resvFinishReply) to handle the eventual reply; a work task
	 * holding that handler is placed on the global "task_list_event"
	 * list.  Request processing proceeds as usual and ends in
	 * "reply_send"; since the reply recipient is local, "reply_send"
	 * moves the reply-handling task onto "task_list_immediate", where
	 * the server's main loop picks it up on its next call to
	 * "next_task".
	 */

	if ((preq = alloc_br(PBS_BATCH_DeleteResv)) != 0) {
		/* grant manager-write permission so the request passes the perm check */
		preq->rq_perm |= ATR_DFLAG_MGWR;

		strcpy(preq->rq_user, pbs_current_user);
		strcpy(preq->rq_host, server_host);
		strcpy(preq->rq_ind.rq_manager.rq_objname,
		       presv->ri_qs.ri_resvID);

		(void) issue_Drequest(PBS_LOCAL_CONNECTION, preq,
				      resvFinishReply, NULL, 0);

		/* notify interested parties that the reservation is being
		 * deleted because it was never confirmed in time
		 */
		svr_mailownerResv(presv, MAIL_END, MAIL_NORMAL, "");

		tickle_for_reply();
	}
}

/**
 * @brief
 * 		resvFinishReply - function gets executed to dispatch the reply
 *		to an internally generated request to delete a reservation
 *		whose time has passed (it's FINISHED).
 *		Here, we just delete the batch_request structure.  If the
 *		request bombs for some internal reason, mail should go back
 *		to those on the reservation's mail list as well as an error
 *		being entered into the server's logging files.
 * @param[in,out]	ptask	-	wt_param1 holds the address of the batch_request structure,
 *								which needs to be freed
 *
 * @return	none
 */
static void
resvFinishReply(struct work_task *ptask)
{
	/* sanity check: only handle replies from the local pseudo-connection */
	if (ptask->wt_event != PBS_LOCAL_CONNECTION)
		return;

	free_br((struct batch_request *) ptask->wt_parm1);
}

/**
 * @brief
 * 		eval_resvState - does an evaluation to determine
 * 		what should be set for state and substate values on
 * 		the reservation in question.
 * @par
 * 		Evaluation is based on current time, current state
 * 		and substate, pointer to some relevant function and
 * 		possibly it's success or failure return value
 *
 * @param[in]	presv	-	reservation in question.
 * @param[in]	s	-	identifies the caller
 * @param[in]	relVal	-	relVal can have following possible values 0,1,2.
 * @param[out]	pstate	-	internal copy of state
 * @param[out]	psub	-	substate of resv state
 */
void
eval_resvState(resc_resv *presv, enum resvState_discrim s, int relVal, int *pstate, int *psub)
{
	int is_running = 0;

	/* default: keep the reservation's current state and substate */
	*pstate = presv->ri_qs.ri_state;
	*psub = presv->ri_qs.ri_substate;

	/* "running" here means the current time is inside the half-open
	 * reservation window [stime, etime) */
	if (time_now >= presv->ri_qs.ri_stime && time_now < presv->ri_qs.ri_etime)
		is_running = 1;

	if (s == RESVSTATE_gen_task_Time4resv) {
		/* from a successful confirmation */
		if (relVal == 0) {
			if (*psub == RESV_DEGRADED) {
				/* degraded reservation reconfirmed: restore to
				 * RUNNING or CONFIRMED depending on the window */
				if (is_running) {
					*pstate = RESV_RUNNING;
					*psub = RESV_RUNNING;
				} else {
					*pstate = RESV_CONFIRMED;
					*psub = RESV_CONFIRMED;
				}
			} else {
				if (*pstate == RESV_BEING_ALTERED) {
					if (is_running) {
						*pstate = RESV_RUNNING;
						*psub = RESV_RUNNING;

					} else {
						/* Altering a reservation after its start time */
						*pstate = RESV_CONFIRMED;
						*psub = RESV_CONFIRMED;
					}
				} else if (presv->ri_qs.ri_etime > time_now) {
					*pstate = RESV_CONFIRMED;
					*psub = RESV_CONFIRMED;
				}
			}
		} else {
			/* End of standing occurrence */
			if (*psub == RESV_DEGRADED)
				*pstate = RESV_DEGRADED;
			else {
				*pstate = RESV_CONFIRMED;
				*psub = RESV_CONFIRMED;
			}
		}
	} else if (s == RESVSTATE_Time4resv) {
		if (relVal == 0) {
			/* NOTE(review): unlike the is_running test above, this
			 * comparison treats time_now == ri_etime as inside the
			 * window — presumably intentional, worth confirming */
			if (presv->ri_qs.ri_stime <= time_now &&
			    time_now <= presv->ri_qs.ri_etime) {
				if (*pstate == RESV_DEGRADED || *psub == RESV_DEGRADED)
					*psub = RESV_DEGRADED;
				else
					*psub = RESV_RUNNING;
				*pstate = RESV_RUNNING;
				if (presv->ri_qs.ri_tactive < get_rattr_long(presv, RESV_ATR_start))
					/* Record when the reservation became active, to
					 * help fend off duplicate accounting on server restart
					 */
					presv->ri_qs.ri_tactive = time_now;
			}
		}
	} else if (s == RESVSTATE_req_deleteReservation) {
		/* relVal selects the phase of the delete operation */
		if (relVal == 0) {

			*pstate = RESV_BEING_DELETED;
			*psub = RESV_BEING_DELETED;
		} else if (relVal == 1) {

			*pstate = RESV_BEING_DELETED;
			*psub = RESV_DELETING_JOBS;
		} else if (relVal == 2) {

			*pstate = RESV_DELETED;
			*psub = RESV_DELETED;
		}
	} else if (s == RESVSTATE_add_resc_resv_to_job) {
		*pstate = RESV_UNCONFIRMED;
		*psub = RESV_UNCONFIRMED;
	} else if (s == RESVSTATE_is_resv_window_in_future) {
		/* a window entirely in the past means the reservation is done */
		if (presv->ri_qs.ri_etime < time_now) {
			*pstate = RESV_FINISHED;
			*psub = RESV_FINISHED;
		}
	} else if (s == RESVSTATE_req_resvSub) {
		*pstate = RESV_UNCONFIRMED;
		*psub = RESV_UNCONFIRMED;
	} else if (s == RESVSTATE_alter_failed) {
		/* alter failed: restore the pre-alter state when one was
		 * recorded, otherwise derive a sensible state from the
		 * substate, the window, and whether nodes are assigned */
		if (presv->ri_alter.ra_state) {
			*pstate = presv->ri_alter.ra_state;
		} else if (*psub == RESV_IN_CONFLICT || *psub == RESV_DEGRADED) {
			if (is_running) {
				*pstate = RESV_RUNNING;
			} else {
				*pstate = RESV_DEGRADED;
			}
		} else if (is_running) {
			*pstate = RESV_RUNNING;
			*psub = RESV_RUNNING;
		} else if (is_rattr_set(presv, RESV_ATR_resv_nodes)) {
			*pstate = RESV_CONFIRMED;
			*psub = RESV_CONFIRMED;
		} else {
			*pstate = RESV_UNCONFIRMED;
			*psub = RESV_UNCONFIRMED;
		}
	}
}

/**
 * @brief
 * 		resv_setResvState - function modifies the state, substate
 * 		and related fields of the resc_resv object and updates
 * 		the local backing store for the object as appropriate -
 * 		either a full save of the structure or a quick save of the
 * 		structure
 *
 * @param[out]	presv	-	resc_resv object
 * @param[in]	state	-	internal copy of state
 * @param[in]	sub	-	substate of resv state
 */
void
resv_setResvState(resc_resv *presv, int state, int sub)
{
	/* nothing to do when neither value changes */
	if (presv->ri_qs.ri_state == state && presv->ri_qs.ri_substate == sub)
		return;

	DBPRT(("resv_name=%s, o_state=%d, o_sub=%d, state=%d, sub=%d",
	       presv->ri_qs.ri_resvID, presv->ri_qs.ri_state, presv->ri_qs.ri_substate,
	       state, sub))

	/* update the quick-save area and mirror into the attribute array */
	presv->ri_qs.ri_state = state;
	presv->ri_qs.ri_substate = sub;
	set_rattr_l_slim(presv, RESV_ATR_state, state, SET);
	set_rattr_l_slim(presv, RESV_ATR_substate, sub, SET);

	/* persist the change */
	resv_save_db(presv);
}

/**
 * @brief
 * 		Set a scheduler flag to initiate a scheduling cycle when a reservation is
 * 		in degraded mode and needs to have nodes replaced.
 *
 * @param[in]	ptask	-	work task structure which contains reservation.
 * @param[in]	forced 	- 	whether to neuter scheduler call if ri_vnodes_down is 0
 */
void
resv_retry_handler2(struct work_task *ptask, int forced)
{
	resc_resv *presv = ptask->wt_parm1;

	if (!presv)
		return;

	/* If all nodes associated to this reservation are back to available (due to
	 * a change in the system setup or a recovery) then no action is required as
	 * the handler vnode_available takes care of updating the reservation state
	 */
	if (!forced && presv->ri_vnodes_down == 0)
		return;

	/* Notify scheduler that a reservation needs to be reconfirmed */
	notify_scheds_about_resv(SCH_SCHEDULE_RESV_RECONFIRM, presv);
}

/**
 * @brief
 * 		Set a scheduler flag to initiate a scheduling cycle when a reservation is
 * 		in degraded mode and needs to have nodes replaced.
 *		this version will only kick scheduler if ri_vnodes_down > 0
 *
 * @param[in]	ptask	-	work task structure which contains reservation.
 */
void
resv_retry_handler(struct work_task *ptask)
{
	/* non-forced variant: scheduler is kicked only if ri_vnodes_down > 0 */
	resv_retry_handler2(ptask, 0);
}

/**
 * @brief
 * 		Set a scheduler flag to initiate a scheduling cycle when a reservation is
 * 		in degraded mode and needs to have nodes replaced.
 *		this version will also kick scheduler if ri_vnodes_down is 0
 *
 * @param[in]	ptask	-	work task structure which contains reservation.
 */
void
resv_retry_handler_forced(struct work_task *ptask)
{
	/* forced variant: kick the scheduler even if ri_vnodes_down is 0 */
	resv_retry_handler2(ptask, 1);
}

/**
 * @brief
 * 		chk_resvReq_viable - checks if scheduler's request to reserve is viable
 *
 * @param[in]	presv	-	pointer to reservation.
 *
 * @return	int
 * @retval	0	: no problems occur
 * @retval	error code	: if problem detected
 */
int
chk_resvReq_viable(resc_resv *presv)
{
	long state = get_rattr_long(presv, RESV_ATR_state);

	/* a reservation with no state, or one already finished or on its
	 * way out, cannot be (re)confirmed */
	switch (state) {
		case RESV_NONE:
		case RESV_FINISHED:
		case RESV_DELETED:
		case RESV_BEING_DELETED:
			return PBSE_INTERNAL;
		default:
			return 0; /* no problems detected */
	}
}

/**
 * @brief
 * 		gen_task_Time4resv - creates a work_task structure and puts it onto
 * 		the "WORK_Timed" work_task list at the appropriate (time sequential)
 * 		location.
 * @par
 * 		The assumption here is that gen_task_Time4resv () won't be called
 * 		if the "rescreserve" is not viable - see chk_resvReq_viable ()
 *
 * @param[in]	presv	-	pointer to reservation.
 *
 * @return	int
 * @retval	0	: work_task created and put on timed task list
 * @retval	error code	: if problem was detected
 */
int
gen_task_Time4resv(resc_resv *presv)
{
	struct work_task *ptask;
	int rc;
	long start;

	if (get_rattr_long(presv, RESV_ATR_state) == RESV_NONE)
		return PBSE_INTERNAL;

	/* replace any previously scheduled start task */
	if (presv->resv_start_task)
		delete_task(presv->resv_start_task);
	presv->resv_start_task = NULL;

	start = get_rattr_long(presv, RESV_ATR_start);
	ptask = set_task(WORK_Timed, start, Time4resv, presv);
	if (ptask == NULL)
		return PBSE_SYSTEM;

	/* link the task to the reservation so that deleting the reservation
	 * also discards any not-yet-processed work tasks */
	append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);

	/* have the qmgr subsystem enable the reservation's queue */
	rc = change_enableORstart(presv, Q_CHNG_ENABLE, "True");
	presv->resv_start_task = ptask;

	return (rc);
}

/**
 * @brief
 * 		gen_task_EndResvWindow - creates a work_task for deleting a reservation
 * 		whose window has expired and puts it on the "WORK_Timed" work_task list
 * 		at the appropriate (time sequential) location.
 *
 * @param[in]	presv	-	pointer to reservation.
 *
 * @return	int
 * @retval	0	: task was created and put on timed task list
 * @retval	error code	: if a problem was detected
 */
int
gen_task_EndResvWindow(resc_resv *presv)
{
	long delay;

	if (presv == NULL)
		return (PBSE_INTERNAL);

	/* schedule deletion for when the window ends, minus any configured
	 * server-wide post-processing allowance */
	delay = presv->ri_qs.ri_etime - (long) time_now;
	if (is_sattr_set(SVR_ATR_resv_post_processing))
		delay -= get_sattr_long(SVR_ATR_resv_post_processing);

	return gen_future_deleteResv(presv, delay);
}

/**
 * @brief
 * 		gen_deleteResv - creates a work_task for deleting a reservation
 * 		Argument "fromNow" needs to be a non-negative value.  It's the number of
 * 		seconds into the future (measured from the global variable "time_now")
 * 		that this task is to be activated.
 *
 * @param[in,out]	presv	-	pointer to reservation.
 * @param[in]	fromNow	-	It's the number of seconds into the future that this task is to be activated.
 *
 * @return	int
 * @retval	0	: task was created and put on timed task list
 * @retval	error code	: if a problem was detected
 */
int
gen_deleteResv(resc_resv *presv, long fromNow)
{
	struct work_task *ptask;
	long when = (long) time_now + fromNow;

	ptask = set_task(WORK_Timed, when, Time4_term, presv);
	if (ptask == NULL)
		return (PBSE_SYSTEM);

	/* link the task to the reservation so that deleting the reservation
	 * also discards any not-yet-processed work tasks */
	append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
	presv->ri_futuredr = 1;

	return (0);
}

/**
 * @brief
 * 		gen_negI_deleteResv - creates a work_task for deleting a reservation
 * 		if the reservation was submitted with a negative value for "I" attribute -
 * 		meaning: willing to wait "n" seconds, but after that forget it.
 * 		Argument "fromNow" needs to be a non-negative value.  It's the number of
 * 		seconds into the future (measured from the global variable "time_now")
 * 		that this task is to be activated.
 *
 * @param[in,out]	presv	-	pointer to reservation.
 * @param[in]	fromNow	-	It's the number of seconds into the future that this task is to be activated.
 *
 * @return	int
 * @retval	0	: task was created and put on timed task list
 * @retval	error code	: if a problem was detected
 */
int
gen_negI_deleteResv(resc_resv *presv, long fromNow)
{
	struct work_task *ptask;
	long when = (long) time_now + fromNow;

	ptask = set_task(WORK_Timed, when, Time4_I_term, presv);
	if (ptask == NULL)
		return (PBSE_SYSTEM);

	/* link the task to the reservation so that deleting the reservation
	 * also discards any not-yet-processed work tasks */
	append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
	presv->ri_futuredr = 1;

	return (0);
}

/**
 * @brief
 * 		gen_future_deleteResv - creates a work_task for deleting a reservation
 * 		in the future and puts it on the "WORK_Timed" work_task list at the
 * 		appropriate (time sequential) location.  Argument "fromNow" is to be a
 * 		non-negative value.  It's the number of seconds into the future
 * 		(measured from the global variable "time_now") that the task is to become
 * 		active.
 *
 * @param[in,out]	presv	-	pointer to reservation.
 * @param[in]	fromNow	-	It's the number of seconds into the future that this task is to be activated.
 *
 * @return	int
 * @retval	0	: task was created and put on timed task list
 * @retval	error code	: if a problem was detected
 */
int
gen_future_deleteResv(resc_resv *presv, long fromNow)
{
	struct work_task *ptask;
	long when = (long) time_now + fromNow;

	/* discard any previously scheduled end task */
	if (presv->resv_end_task)
		delete_task(presv->resv_end_task);
	presv->resv_end_task = NULL;

	ptask = set_task(WORK_Timed, when, Time4resvFinish, presv);
	if (ptask == NULL)
		return (PBSE_SYSTEM);

	/* link the task to the reservation so that deleting the reservation
	 * also discards any not-yet-processed work tasks */
	append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
	presv->ri_futuredr = 1;
	presv->resv_end_task = ptask;

	return (0);
}

/**
 * @brief
 * 		gen_future_reply - creates a work_task to reply in the future to a
 * 		reservation request submitted now. Place on the "WORK_Timed" work_task
 * 		list at the appropriate (time sequential) location.  Argument "fromNow"
 * 		is to be a non-negative value.  It's the number of seconds into the future
 * 		(measured from the global variable "time_now") that the task is to become
 * 		active.
 *
 * @param[in,out]	presv	-	pointer to reservation.
 * @param[in]	fromNow	-	It's the number of seconds into the future that this task is to be activated.
 *
 * @return	int
 * @retval	0	: task was created and put on timed task list
 * @retval	error code	: if a problem was detected
 */
int
gen_future_reply(resc_resv *presv, long fromNow)
{
	struct work_task *ptask;
	long when = (long) time_now + fromNow;

	ptask = set_task(WORK_Timed, when, Time4reply, presv);
	if (ptask == NULL)
		return (PBSE_SYSTEM);

	/* link the task to the reservation so that deleting the reservation
	 * also discards any not-yet-processed work tasks */
	append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
	presv->ri_futuredr = 1;

	return (0);
}

/**
 * @brief
 * 		change_enableORstart - call this function to build and issue an internally
 *		generated request to the qmgr subsystem to change the value of either
 *		attributes "start" or "enable" for the queue associated with a general
 *		resources reservation.
 *
 * @note
 *		Notes:  the "issue_Drequest" function called in the body of this
 *		code causes the request to be dispatched to qmgr immediately
 *		(since its local) and the reply from qmgr will get handled
 *		by the "reply handling function" passed to issue_Drequest.
 *		The reply handler gets triggered by invocation of "next_task()"
 *		in the server's main loop, since it's put into a work_task on the
 *		"immediate_tasks" work_list.
 *
 * @param[in]	presv	-	pointer to reservation structure
 * @param[in]	which	-	Q_CHNG_START, Q_CHNG_ENABLE
 * @param[in]	value	-	"True", "False"
 *
 * @return	int
 * @retval	0	: if build and issuance successful
 * @retval	!=0	: error code if function fails
 */
int
change_enableORstart(resc_resv *presv, int which, char *value)
{
	extern char *msg_internalReqFail;
	struct batch_request *newreq;
	pbs_list_head *plhed;
	int len;
	svrattrl *psatl;
	struct work_task *pwt = NULL; /* initialized in case issue_Drequest leaves it unset */
	char *at_name;
	int index;

	/* a "start = True" request is pointless until nodes are assigned */
	if (which == Q_CHNG_START && strcmp(value, ATR_TRUE) == 0 && !is_rattr_set(presv, RESV_ATR_resv_nodes))
		return (0);

	newreq = alloc_br(PBS_BATCH_Manager);
	if (newreq == NULL) {
		(void) sprintf(log_buffer, "batch request allocation failed");
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_RESV, LOG_NOTICE,
			  presv->ri_qs.ri_resvID, log_buffer);
		return (PBSE_SYSTEM);
	}

	/* qmgr-style "set queue <resv queue> <attr> = <value>" issued as pbs_server */
	newreq->rq_ind.rq_manager.rq_cmd = MGR_CMD_SET;
	newreq->rq_ind.rq_manager.rq_objtype = MGR_OBJ_QUEUE;
	newreq->rq_perm = ATR_DFLAG_MGWR | ATR_DFLAG_OPWR;
	(void) strcpy(newreq->rq_user, "pbs_server");
	(void) strcpy(newreq->rq_host, pbs_server_name);

	strcpy(newreq->rq_ind.rq_manager.rq_objname, get_rattr_str(presv, RESV_ATR_queue));

	CLEAR_HEAD(newreq->rq_ind.rq_manager.rq_attr);
	plhed = &newreq->rq_ind.rq_manager.rq_attr;

	if (which == Q_CHNG_ENABLE) {
		index = QA_ATR_Enabled;
		at_name = que_attr_def[index].at_name;
	} else if (which == Q_CHNG_START) {
		index = QA_ATR_Started;
		at_name = que_attr_def[index].at_name;
	} else {
		/* bug fix: free the allocated request before bailing out;
		 * the original code leaked newreq on this path */
		free_br(newreq);
		return (PBSE_INTERNAL);
	}

	len = strlen(value) + 1;
	if ((psatl = attrlist_create(at_name, NULL, len)) != NULL) {
		psatl->al_flags = que_attr_def[index].at_flags;
		strcpy(psatl->al_value, value);
		append_link(plhed, &psatl->al_link, psatl);
	} else {
		free_br(newreq);
		return (PBSE_INTERNAL);
	}

	if (issue_Drequest(PBS_LOCAL_CONNECTION, newreq,
			   handle_qmgr_reply_to_startORenable, &pwt, 0) == -1) {
		free_br(newreq);

		(void) sprintf(log_buffer, "%s", msg_internalReqFail);
		log_event(PBSEVENT_RESV, PBS_EVENTCLASS_RESV, LOG_NOTICE,
			  presv->ri_qs.ri_resvID, log_buffer);

		return (PBSE_mgrBatchReq);
	}
	tickle_for_reply();
	if (pwt)
		pwt->wt_parm2 = presv; /*needed to handle qmgr's response*/

	return (0);
}

/**
 * @brief
 * 		handle_qmgr_reply_to_startORenable - this is the function that's to be
 *		called to handle the qmgr's response to the request issued in
 *		"change_enableORstart()".  If not successful log a message.
 * @par
 *		This function should only be called through an INTERNALLY GENERATED
 *		request to another server (including ourself).
 *		It frees the request structure and closes the connection (handle).
 * @par
 *		In the work task entry, wt_event is the connection handle and
 *		wt_parm1 is a pointer to the request structure (that contains the reply).
 *		wt_parm2 should have the address of the reservation structure
 * @par
 *		THIS SHOULD NOT BE USED IF AN EXTERNAL (CLIENT) REQUEST IS "relayed",
 *		because the request/reply structure is still needed to reply back
 *		to the client.
 *
 * @param[in]	pwt	-	work task entry
 */
static void
handle_qmgr_reply_to_startORenable(struct work_task *pwt)
{
	extern char *msg_qEnabStartFail;
	struct batch_request *preq = pwt->wt_parm1;
	resc_resv *presv = pwt->wt_parm2;

	/* a non-zero reply code means qmgr rejected the set request; log it */
	if (preq->rq_reply.brp_code != 0) {
		(void) sprintf(log_buffer, "%s", msg_qEnabStartFail);
		log_event(PBSEVENT_RESV, PBS_EVENTCLASS_RESV, LOG_NOTICE,
			  presv->ri_qs.ri_resvID, log_buffer);
	}

	free_br(preq);
	if (pwt->wt_event != -1)
		svr_disconnect(pwt->wt_event);

	/* Barring a system error, the server should always be able to set
	 * "start" or "enable" on its own queue.  If it does fail, it would
	 * probably be appropriate to delete the reservation and notify the
	 * owner; we pass on that for now.
	 */
}

/**
 * @brief
 * 		remove_deleted_resvs - Walk the server's "svr_allresvs"
 *		list and cause to be removed any reservation whose state
 *		is marked RESV_FINISHED.  Function used in "pbsd_init" code
 *
 *	@return	Nothing
 */
void
remove_deleted_resvs(void)
{
	resc_resv *presv, *nxresv;
	struct work_task *ptask;

	presv = (resc_resv *) GET_NEXT(svr_allresvs);
	while (presv) {
		nxresv = (resc_resv *) GET_NEXT(presv->ri_allresvs);

		if (presv->ri_qs.ri_state == RESV_FINISHED) {
			/*put a task on the server's "task_list_timed" that causes
			 *an internal BATCH_REQUEST_DeleteResv to be generated
			 *and issued against this reservation
			 */

			if ((ptask = set_task(WORK_Timed, time_now + 5,
					      Time4resvFinish, presv)) != 0) {

				/* set things so that the reservation going away results in
				 * any "yet to be processed" work tasks also going away
				 * and set up to notify Scheduler of new reservation-job
				 */

				append_link(&presv->ri_svrtask, &ptask->wt_linkobj, ptask);
			}
		} else if (presv->ri_qs.ri_state == RESV_DELETING_JOBS) {
				/* this will set up the task to finally move it to RESV_FINISHED */
				delete_occurrence_jobs(presv);
		}
		presv = nxresv;
	}
}

/**
 * @brief
 *  	degrade_corrupted_confirmed_resvs - Walk the server's "svr_allresvs"
 *  	list and cause to be degraded any reservation whose state
 *  	is marked RESV_CONFIRMED but is missing resv_nodes or resv_execvnodes.
 *  	Function used in "pbsd_init" code
 *
 * @return Nothing
 */
void
degrade_corrupted_confirmed_resvs(void)
{
	int is_degraded;
	resc_resv *presv, *nxresv;
	long retry_time = 0;
	char *str_time;

	presv = (resc_resv *) GET_NEXT(svr_allresvs);
	while (presv) {
		nxresv = (resc_resv *) GET_NEXT(presv->ri_allresvs);

		/* bug fix: reset the flag for every reservation.  It was
		 * previously set once and never cleared, so every reservation
		 * after the first corrupted one was wrongly degraded too. */
		is_degraded = 0;

		/* if corrupted and already degraded we still need to set a retry time for the scheduler to be prodded again */
		if (presv->ri_qs.ri_state == RESV_CONFIRMED || presv->ri_qs.ri_state == RESV_DEGRADED) {
			/* a standing reservation additionally needs its execvnodes list */
			if (get_rattr_long(presv, RESV_ATR_resv_standing))
				if (!(is_rattr_set(presv, RESV_ATR_resv_execvnodes)) || get_rattr_str(presv, RESV_ATR_resv_execvnodes) == NULL)
					is_degraded = 1;
			if (!(is_rattr_set(presv, RESV_ATR_resv_nodes)) || get_rattr_str(presv, RESV_ATR_resv_nodes) == NULL)
				is_degraded = 1;
		} else if (presv->ri_qs.ri_state == RESV_FINISHED && get_rattr_long(presv, RESV_ATR_resv_standing))
			if (get_rattr_long(presv, RESV_ATR_resv_idx) < get_rattr_long(presv, RESV_ATR_resv_count))
				/* should never keep a standing reservation in RESV_FINISHED state for anything but the last occurrence */
				/* if we don't degrade it then remove_deleted_resvs may create a task to nuke it */
				is_degraded = 1;

		if (is_degraded) {
			resv_setResvState(presv, RESV_DEGRADED, RESV_DEGRADED);
			/* there is no point in trying to reconfirm it immediately at server start,
			 * since the nodes will not have reported as free yet.
			 * One minute is a reasonable time to try, but jobs may already have filled
			 * some nodes by then. Tough luck, but it's the best we can do. It beats
			 * waiting for the default 600 seconds.
			 */
			retry_time = determine_resv_retry(presv);
			if (time_now + 60 < retry_time)
				retry_time = time_now + 60;
			str_time = ctime(&retry_time);
			if (str_time == NULL)
				str_time = "";
			presv->ri_degraded_time = get_rattr_long(presv, RESV_ATR_start);
			/* bogus value, but avoid skipping a reconfirmation */
			log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID,
				   "Reservation with corrupted nodes, degrading with retry time set to %s", str_time);
			force_resv_retry(presv, retry_time);
		}
		presv = nxresv;
	}
}

/**
 * @brief
 *  	add_resv_beginEnd_tasks - for each reservation not in state
 *  	RESV_FINISHED add to "task_list_timed" the "begin" and
 *  	"end" reservation tasks as appropriate.  Function used
 *  	in "pbsd_init" code
 *
 * @return	none
 */
void
add_resv_beginEnd_tasks(void)
{
	resc_resv *presv;
	char txt[PBS_MAXSVRRESVID + 100];
	int rc;

	presv = (resc_resv *) GET_NEXT(svr_allresvs);
	while (presv != NULL) {
		switch (presv->ri_qs.ri_state) {
			case RESV_CONFIRMED:
			case RESV_RUNNING:
				/* both a "begin" and an "end" task go onto "task_list_timed" */
				if ((rc = gen_task_EndResvWindow(presv)) != 0) {
					sprintf(txt, "%s : EndResvWindow task creation failed",
						presv->ri_qs.ri_resvID);
					log_err(rc, "add_resv_beginEnd_tasks", txt);
				}
				if ((rc = gen_task_Time4resv(presv)) != 0) {
					sprintf(txt, "%s : Time4resv task creation failed",
						presv->ri_qs.ri_resvID);
					log_err(rc, "add_resv_beginEnd_tasks", txt);
				}
				break;
			case RESV_UNCONFIRMED:
				/* only an "end" task goes onto "task_list_timed" */
				if ((rc = gen_task_EndResvWindow(presv)) != 0) {
					sprintf(txt, "%s : EndResvWindow task creation failed",
						presv->ri_qs.ri_resvID);
					log_err(rc, "add_resv_beginEnd_tasks", txt);
				}
				break;
			default:
				/* other states (e.g. FINISHED) get no tasks */
				break;
		}

		presv = (resc_resv *) GET_NEXT(presv->ri_allresvs);
	}
}

/**
 * @brief
 * 		uniq_nameANDfile - develop a unique name and file in the directory
 *		pointed to by "pdir".  The root name of the file is initially
 *		given by "pname".  The name pointed to by pname may be modified
 *		in place in the process of arriving at a unique filename for
 *		the file.  The file, if generated, will have zero length.
 *
 * @param[in]	pname	-	The root name of the file is initially given by "pname"
 * @param[in]	psuffix	-	suffix of the name
 * @param[in]	pdir	-	points to the directory in which file contains.
 *
 * @return	int
 * @return	0	: on success
 * @retval	PBSE_*	: code on failure
 */
int
uniq_nameANDfile(char *pname, char *psuffix, char *pdir)
{
	int fds, L1, L2;
	int rc = 0;
	char *pc;
	char namebuf[MAXPATHLEN + 1];

	/* reject missing/empty inputs and names that would overflow namebuf */
	if (!pname || !psuffix || !pdir ||
	    !(L1 = strlen(pname)) ||
	    !(L2 = strlen(pdir)) ||
	    ((L1 + L2 + strlen(psuffix)) >= MAXPATHLEN))
		return (PBSE_INTERNAL);

	do {
		(void) strcpy(namebuf, pdir);
		(void) strcat(namebuf, pname);
		(void) strcat(namebuf, psuffix);

		/* O_CREAT|O_EXCL guarantees atomically-exclusive creation */
		fds = open(namebuf, O_CREAT | O_EXCL | O_WRONLY, 0600);
		if (fds < 0) {
			if (errno != EEXIST) {
				rc = PBSE_SYSTEM;
				break;
			}
			/* name collision: bump the last printable char of pname
			 * in place and retry.  Cast to unsigned char: passing a
			 * negative char to isprint() is undefined behavior. */
			pc = pname + strlen(pname) - 1;
			while (!isprint((unsigned char) *pc)) {
				pc--;
				if (pc <= pname) {
					rc = PBSE_INTERNAL;
					break;
				}
			}
			if (rc != 0)
				break; /* bug fix: the original kept looping (and
					* incremented *pc) after this error, which
					* could spin forever */
			(*pc)++;
		}
	} while (fds < 0);

	/* bug fix: fd 0 is a valid descriptor; test for success, not non-zero */
	if (fds >= 0)
		(void) close(fds);
	return (rc);
}

/**
 * @brief
 *		start_end_dur_wall - This function considers the information specified for
 *		start_time, end_time, duration and walltime.  Using what was
 *		specified, it computes those unspecified values that are
 *		possible to compute. If the initially supplied information
 *		is bogus, or not enough information is specified or results
 *		are inconsistent, the function returns "failure" otherwise,
 *		it returns "success".
 * @par
 * 		reservation attributes dealing with start, end, duration times
 *		can be modified by this function.  In addition, if this is
 *		happens to be a RESC_RESV_OBJECT  its "ri_qs.ri_stime",
 *		"ri_qs.ri_etime", and "ri_qs.ri_duration" fields are subject
 *		to modification.
 *
 * @param[in,out]	presv	-	the "resc_resv" object
 *
 * @return	int
 * @retval	0	: Success
 * @retval	!= 0	: don't have a complete or consistent set of
 * 					information, or possibly some other error
 * 					occurred - e.g. problem adding the "walltime"
 * 					resource entry if it doesn't exist
 */
int
start_end_dur_wall(resc_resv *presv)
{
	resource_def *rscdef = NULL;
	resource *prsc = NULL;
	attribute *pattr = NULL;
	attribute atemp = {0};
	int pstate = 0;
	long stime, etime, duration;

	int swcode = 0; /* "switch code": bitmask of the values on hand:
			 * 1 = start, 2 = end, 4 = duration, 8 = walltime */
	int rc = 0;	/* return code, assume success */
	short check_start = 1;

	if (presv == 0)
		return (-1);

	rscdef = &svr_resc_def[RESC_WALLTIME];
	pstate = get_rattr_long(presv, RESV_ATR_state);
	stime = get_rattr_long(presv, RESV_ATR_start);
	etime = get_rattr_long(presv, RESV_ATR_end);
	duration = get_rattr_long(presv, RESV_ATR_duration);

	pattr = get_rattr(presv, RESV_ATR_resource);
	prsc = find_resc_entry(pattr, rscdef);
	/* a reservation tied to a job skips the "start must be in the future"
	 * checks in the cases below */
	check_start = !is_rattr_set(presv, RESV_ATR_job);

	if (pstate != RESV_BEING_ALTERED) {
		if (is_rattr_set(presv, RESV_ATR_start))
			swcode += 1; /* have start */
		if (is_rattr_set(presv, RESV_ATR_end))
			swcode += 2; /* have end */
		if (is_rattr_set(presv, RESV_ATR_duration))
			swcode += 4; /* have duration */
		if (prsc)
			swcode += 8; /* have walltime */
		else if (!(prsc = add_resource_entry(pattr, rscdef)))
			return (-1);
	} else {
		if (presv->ri_alter.ra_flags & RESV_DURATION_MODIFIED)
			swcode += 4;
		if (presv->ri_alter.ra_flags & RESV_END_TIME_MODIFIED)
			swcode += 2; /* calculate start time */
		if (presv->ri_alter.ra_flags & RESV_START_TIME_MODIFIED)
			swcode += 1; /* calculate end time */
		/* if ONLY start or ONLY end was altered, force case 3 so the
		 * duration is recomputed from the new start/end pair */
		if (presv->ri_alter.ra_flags == RESV_START_TIME_MODIFIED || presv->ri_alter.ra_flags == RESV_END_TIME_MODIFIED) {
			swcode = 3;
		}
	}

	atemp.at_flags = ATR_VFLAG_SET;
	atemp.at_type = ATR_TYPE_LONG;
	switch (swcode) {
		case 3: /* start, end */
			if (((check_start && (stime < time_now)) && (pstate != RESV_BEING_ALTERED)) ||
			    (etime <= stime))
				rc = -1;
			else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_DURATION_MODIFIED;
				}
				atemp.at_val.at_long = etime - stime;
				set_rattr_l_slim(presv, RESV_ATR_duration, atemp.at_val.at_long, SET);
				rscdef->rs_set(&prsc->rs_value, &atemp, SET);
			}
			break;

		case 4:
		case 5: /* start, duration */
			if (((check_start && stime < time_now) && (pstate != RESV_BEING_ALTERED)) ||
			    (duration <= 0))
				rc = -1;
			else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_END_TIME_MODIFIED;
				}
				set_rattr_l_slim(presv, RESV_ATR_end, stime + duration, SET);
				set_attr_l(&atemp, duration, SET);
				rscdef->rs_set(&prsc->rs_value, &atemp, SET);
			}
			break;

		case 7: /* start, end, duration */
			if (((check_start) && (stime < time_now)) ||
			    (etime < stime) ||
			    (duration <= 0) ||
			    ((etime - stime) !=
			     duration))
				rc = -1;
			else {
				atemp.at_val.at_long = duration;
				rscdef->rs_set(&prsc->rs_value, &atemp, SET);
			}
			break;

		case 6:
		case 8: /* end, duration */
			if ((duration <= 0) ||
			    (etime - duration <
			     time_now)) {
				rc = -1;
			} else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_START_TIME_MODIFIED;
				}
				set_rattr_l_slim(presv, RESV_ATR_start, etime - duration, SET);
				atemp.at_val.at_long = duration;
				rscdef->rs_set(&prsc->rs_value, &atemp, SET);
			}
			break;

		case 9: /* start, wall */
			if (((check_start) && (stime < time_now)) ||
			    (prsc->rs_value.at_val.at_long <= 0))
				rc = -1;
			else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_END_TIME_MODIFIED | RESV_DURATION_MODIFIED;
				}
				set_rattr_l_slim(presv, RESV_ATR_end, stime + prsc->rs_value.at_val.at_long, SET);
				set_rattr_l_slim(presv, RESV_ATR_duration, prsc->rs_value.at_val.at_long, SET);
			}
			break;

		case 10: /* end, wall */
			if ((prsc->rs_value.at_val.at_long <= 0) ||
			    (etime - prsc->rs_value.at_val.at_long <
			     time_now)) {
				rc = -1;
			} else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_START_TIME_MODIFIED;
				}
				set_rattr_l_slim(presv, RESV_ATR_start, etime - prsc->rs_value.at_val.at_long, SET);
				set_rattr_l_slim(presv, RESV_ATR_duration, prsc->rs_value.at_val.at_long, SET);
			}
			break;

		case 11: /* start, end, wall */
			if (((check_start) && (stime < time_now)) ||
			    (prsc->rs_value.at_val.at_long <= 0) ||
			    (etime - stime !=
			     prsc->rs_value.at_val.at_long))
				rc = -1;
			else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_DURATION_MODIFIED;
				}
				set_rattr_l_slim(presv, RESV_ATR_duration, prsc->rs_value.at_val.at_long, SET);
			}
			break;

		case 13: /* start, duration & wall */
			if (((check_start) && (stime < time_now)) ||
			    (prsc->rs_value.at_val.at_long != duration) ||
			    (duration <= 0))
				rc = -1;
			else {
				if (pstate == RESV_BEING_ALTERED) {
					presv->ri_alter.ra_flags |= RESV_END_TIME_MODIFIED;
				}
				/* NOTE(review): uses the stored ri_qs.ri_duration rather
				 * than the attribute value 'duration'; the guard above only
				 * proves walltime == duration -- confirm the stored copy is
				 * current at this point before changing it */
				set_rattr_l_slim(presv, RESV_ATR_end, stime + presv->ri_qs.ri_duration, SET);
			}
			break;

		case 15: /* start, end, duration & wall */
			/* bug fix: the first clause used "||", which made every
			 * reservation with check_start set fail regardless of its
			 * start time; sibling cases 7/9/11/13 all use "&&" */
			if (((check_start) && (stime < time_now)) ||
			    (etime < stime) ||
			    (duration <= 0) ||
			    (prsc->rs_value.at_val.at_long != duration) ||
			    ((etime - stime) !=
			     duration))
				rc = -1;
			break;

		default:
			rc = -1;
	}

	if (is_sattr_set(SVR_ATR_resv_post_processing)) {
		/* NOTE(review): these additions only modify the local copies;
		 * the ri_qs fields below are re-read from the attributes, so
		 * the adjusted values appear unused -- confirm intent */
		duration += get_sattr_long(SVR_ATR_resv_post_processing);
		etime += get_sattr_long(SVR_ATR_resv_post_processing);
	}

	/* mirror the (possibly updated) attribute values into the quick-save area */
	presv->ri_qs.ri_stime = get_rattr_long(presv, RESV_ATR_start);
	presv->ri_qs.ri_etime = get_rattr_long(presv, RESV_ATR_end);
	presv->ri_qs.ri_duration = get_rattr_long(presv, RESV_ATR_duration);

	return (rc);
}

/**
 * @brief
 * 		is_resv_window_in_future - Updates the reservation's state
 *		to RESV_FINISHED if the current time is already beyond
 *		the start of the reservation's window otherwise, it
 *		just returns.  It's used in pbsd_init.c to decide if
 *		an existing reservation, that's read back into the system
 *		from the disk at server restart, should continue to remain.
 *
 * @param[in,out]	presv	-	reservation structure
 *
 * @return	Nothing
 */
void
is_resv_window_in_future(resc_resv *presv)
{
	int new_state;
	int new_substate;

	/* Ask the reservation-state evaluator what state/substate this
	 * reservation should be in, then apply the answer. */
	eval_resvState(presv, RESVSTATE_is_resv_window_in_future, 0,
		       &new_state, &new_substate);
	resv_setResvState(presv, new_state, new_substate);
}

/**
 * @brief
 * 		resv_mailAction - Based on what was requested on reservation submission
 * 		and on who is issuing the current *request*, generate (or not) a mail
 * 		message about some aspect of the reservation to the appropriate parties
 * @par
 * 		This function makes use of mail function svr_mailownerResv () but
 * 		it is not intended that this be the only way that svr_mailownerResv
 * 		should be called, for we may not be presented with any related
 * 		batch_request at the point where some message ought to be issued -
 * 		e.g. the case, "reservation start-time has finally arrived."
 *
 * @param[in]	presv	-	reservation structure
 * @param[in]	preq	-	batch_request structure
 */
void
resv_mailAction(resc_resv *presv, struct batch_request *preq)
{
	char text[PBS_MAXUSER + PBS_MAXHOSTNAME + 64];
	int force = MAIL_NORMAL;

	/* mail is only generated for reservation-delete requests */
	if (preq->rq_type != PBS_BATCH_DeleteResv)
		return;

	snprintf(text, sizeof(text), "Requesting party: %s@%s",
		 preq->rq_user, preq->rq_host);
#ifdef NAS /* localmod 028 */
	/*
	 * The extend attribute can contain additional explanation
	 */
	if (preq->rq_extend) {
		size_t used = strlen(text);
		snprintf(text + used, sizeof(text) - used,
			 "\nReason: %s\n", preq->rq_extend);
	}
#endif /* localmod 028 */
	/* a server-originated request forces the mail out */
	if (preq->rq_fromsvr != 0)
		force = MAIL_FORCE;
	svr_mailownerResv(presv, MAIL_ABORT, force, text);
}

/**
 * @brief
 * 		This function converts long to hh:mm:ss format
 *
 * @param[in]	l	-	time passed as a long number
 *
 * @return	pointer to the converted time string, allocated
 *         	by the function. Memory deallocation rests in
 *         	the hands of caller.
 * @retval	NULL	: failure
 */

char *
convert_long_to_time(long l)
{
	unsigned int h;
	int m;
	int s;
	int needed;
	char *str;

	/* split seconds into h/m/s; NOTE(review): a negative input wraps 'h'
	 * through the unsigned conversion exactly as the original code did --
	 * callers are expected to pass non-negative durations */
	h = l / 3600;
	l = l % 3600;
	m = l / 60;
	s = l % 60;

	/* size the buffer exactly (hours may exceed two digits) by asking
	 * snprintf for the formatted length, then allocate length + NUL */
	needed = snprintf(NULL, 0, "%02u:%02d:%02d", h, m, s);
	if (needed < 0)
		return NULL;

	str = malloc((size_t) needed + 1);
	if (str == NULL)
		return NULL;

	(void) snprintf(str, (size_t) needed + 1, "%02u:%02d:%02d", h, m, s);
	return str;
}

/**
 * @brief
 * 		  determine_accruetype
 *        determine accrue_type for new job or after overlay upgrade
 *        or after recovery.
 *        If coming after an overlay upgrade or enabling accrual after a long time,
 *        jobs which entered system when accrual was false or those before the upgrade,
 *        accrual will begin from start time of the scheduling cycle.
 *        if, scheduler cannot determine accrual type, then server determines it and
 *        accrual begins from the time job was created in the server.
 *
 *        precedence of accrual :
 *	        1) run time, exit time 2) ineligible time 3) eligible time
 * @param[in]	pjob	-	Job whose accrue type needs to be determined
 * @return	long
 * @retval	JOB_ELIGIBLE	-	when job is eligible to accrue eligible_time
 * @retval	JOB_INELIGIBLE	-	when job is ineligible to accrue eligible_time
 * @retval	JOB_RUNNING	-	when job is running or provisioning
 * @retval	JOB_EXIT	-	when job is exiting
 * @retval	-1	- when accrue type couldn't be determined
 */
long
determine_accruetype(job *pjob)
{
	struct pbs_queue *pque;
	long temphold;

	/* have to determine accrue type; the checks below are ordered by
	 * precedence: 1) run/exit time, 2) ineligible time, 3) eligible time,
	 * so their order must not be changed */

	/* if job is truely running or provisioning */
	if (check_job_state(pjob, JOB_STATE_LTR_RUNNING) &&
	    (check_job_substate(pjob, JOB_SUBSTATE_RUNNING) ||
	     check_job_substate(pjob, JOB_SUBSTATE_PROVISION)))
		return JOB_RUNNING;

	/* if job exit */
	if (check_job_state(pjob, JOB_STATE_LTR_EXITING))
		return JOB_EXIT;

	/* handling qsub -a, waiting with substate 30 ; accrue ineligible time */
	if (get_jattr_long(pjob, JOB_ATR_exectime))
		return JOB_INELIGIBLE;

	/* 'user' hold applied ; accrue ineligible time */
	if (get_jattr_long(pjob, JOB_ATR_hold) & HOLD_u)
		return JOB_INELIGIBLE;

	/* other than 'user' hold applied */
	/* accrue type is set to JOB_INELIGIBLE incase a job has dependency */
	/* on another job and hold type is set to system hold. */
	/* For all other cases accrue type is set to JOB_ELIGIBLE. */
	temphold = get_jattr_long(pjob, JOB_ATR_hold);
	if (temphold & HOLD_o || temphold & HOLD_bad_password || temphold & HOLD_s) {
		if ((check_job_substate(pjob, JOB_SUBSTATE_DEPNHOLD)) && (temphold & HOLD_s))
			return JOB_INELIGIBLE;

		return JOB_ELIGIBLE;
	}

	/* scheduler suspend job ; accrue eligible time */
	if (check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP))
		return JOB_ELIGIBLE;

	/* qsig suspended job ; accrue eligible time */
	if (check_job_substate(pjob, JOB_SUBSTATE_SUSPEND))
		return JOB_ELIGIBLE;

	/* check for stopped queue: routing and execute ; accrue eligible time */
	pque = find_queuebyname(pjob->ji_qs.ji_queue);
	if (pque != NULL)
		if (get_qattr_long(pque, QA_ATR_Started) == 0)
			return JOB_ELIGIBLE;

	/* The job doesn't have any reason to not accrue eligible time (e.g. on hold), so it should accrue it */
	if (check_job_state(pjob, JOB_STATE_LTR_TRANSIT) &&
	    check_job_substate(pjob, JOB_SUBSTATE_TRANSIN))
		return JOB_ELIGIBLE;

	/* could not classify: update_eligible_time() treats -1 as "do nothing" */
	return -1;
}

/**
 * @brief
 * 		update_eligible_time - this function is responsible for calculating eligible time accrued
 *			  for a job. it also updates the accrue type and sample start time
 *
 * @param[in]	newaccruetype	-	new accrue type to be set
 * @param[in,out]	pjob	-	pointer to job
 *
 * @return	int
 * @retval	0	: success
 * @retval	1	: if updating same accrue type or do nothing
 *
 * @par MT-Safe: No
 */

int
update_eligible_time(long newaccruetype, job *pjob)
{
	/* index must stay aligned with the JOB_* accrue-type constants;
	 * NOTE(review): assumes accrue types are in the range 0..4 -- confirm */
	static char *msg[] = {"initial_time", "ineligible_time", "eligible_time", "run_time", "exiting"};
	char *strtime;
	static char errtime[] = "00:00:00";
	char str[256];
	long accrued_time = 0; /* accrued time */
	long oldaccruetype = get_jattr_long(pjob, JOB_ATR_accrue_type);
	long timestamp = (long) time_now; /* time since accrual begins */

	/* check if updating same accrue type or do nothing */
	if (newaccruetype == oldaccruetype || newaccruetype == -1)
		return 1;

	/* time since accrue type last changed  */
	accrued_time = timestamp - get_jattr_long(pjob, JOB_ATR_sample_starttime);

	/* only time spent while eligible is credited to eligible_time */
	if (oldaccruetype == JOB_ELIGIBLE && accrued_time > 0)
		set_jattr_l_slim(pjob, JOB_ATR_eligible_time, accrued_time, INCR);

	/* change type to new accrue type, update start time to mark change of accrue type */
	set_jattr_l_slim(pjob, JOB_ATR_accrue_type, newaccruetype, SET);
	set_jattr_l_slim(pjob, JOB_ATR_sample_starttime, timestamp, SET);

	/* Prepare and print log message */
	strtime = convert_long_to_time(get_jattr_long(pjob, JOB_ATR_eligible_time));
	if (strtime == NULL)
		strtime = errtime;

	/* bug fix: was an unbounded sprintf into a 256-byte buffer; strtime's
	 * length is caller-data dependent, so bound the write with snprintf */
	snprintf(str, sizeof(str), "Accrue type has changed to %s, previous accrue type was %s for %ld secs, total eligible_time=%s",
		 msg[newaccruetype], msg[oldaccruetype], accrued_time, strtime);
	log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, str);

	/* free only heap strings, never the static fallback */
	if (strtime != errtime)
		free(strtime);

	return 0;
}

/**
 * @brief
 * 		alter_eligibletime 	this is action function for eligible_time.
 *			qalter will alter the value of eligible_time,
 *			hence need to set sample_starttime to now so that
 *			accrual begins with this change. log message is
 *			printed to mark the change. the time accrued while
 *			in present accruetype from last change is printed.
 *			Accrual continues from now.
 *
 * @param[in]	pattr	-	attribute structure
 * @param[in,out]	pobject	-	object which will be later typecasted into job type.
 * @param[in]	actmode	-	action mode
 *
 * @par MT-Safe: No
 */
int
alter_eligibletime(attribute *pattr, void *pobject, int actmode)
{
	static char errtime[] = "00:00:00";
	long timestamp = (long) time_now; /* accrual begins from here */
	job *pjob = (job *) pobject;
	long oldaccruetype = get_jattr_long(pjob, JOB_ATR_accrue_type);
	long newaccruetype = oldaccruetype; /* We are not changing accrue type */

	/* distinguish between genuine qalter and call by action */
	if (actmode == ATR_ACTION_ALTER) {

		/* eligible_time_enable is OFF, then error */
		if (!get_sattr_long(SVR_ATR_EligibleTimeEnable)) {
			return PBSE_ETEERROR;
		} else {
			long accrued_time;
			char *strtime;
			char logstr[256];
			/* index aligned with the JOB_* accrue-type constants */
			static char *msg[] = {
				"initial_time",
				"ineligible_time",
				"eligible_time",
				"run_time",
				"exiting"};

			/* time spent in the current accrue type since last change */
			accrued_time = (long) time_now -
				       get_jattr_long(pjob, JOB_ATR_sample_starttime);

			/* Sample time accrual continues with this time .... */
			set_jattr_l_slim(pjob, JOB_ATR_sample_starttime, timestamp, SET);
			/* eligible_time is set to new value again in modify_job_attr.
			 * this is for log message, we have the new value anyways.
			 */
			strtime = convert_long_to_time(pattr->at_val.at_long);

			/* bug fix: was an unbounded sprintf into a 256-byte buffer;
			 * strtime's length is input dependent, so bound the write */
			snprintf(logstr, sizeof(logstr), "Accrue type is %s, previous accrue type was %s for %ld secs, due to qalter total eligible_time=%s",
				 msg[newaccruetype], msg[oldaccruetype], accrued_time, strtime != NULL ? strtime : errtime);
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				  pjob->ji_qs.ji_jobid, logstr);

			/* free(NULL) is a harmless no-op */
			free(strtime);

			return PBSE_NONE;
		}
	}
	return PBSE_NONE;
}
/**
 * @brief
 *		Check if the history of the  finished job needs to be saved or purged .
 *		If it needs to be saved and history management is ON then call svr_setjob_histinfo() to
 *		store the data in the server's history . Else if the history management is OFF or the
 *		request is to not store the jobs history then call job_purge()
 *
 * @param[in]	pjob	-	Pointer to the job structure
 *
 *
 */
void
svr_saveorpurge_finjobhist(job *pjob)
{
	resc_resv *presv = pjob->ji_myResv;
	int history_on = svr_chk_history_conf();

	if (history_on && !pjob->ji_deletehistory) {
		/* history management is enabled and this job's history is wanted:
		 * convert the job into a history record */
		svr_setjob_histinfo(pjob, T_FIN_JOB);
		if (pjob->ji_ajinfo != NULL)
			pjob->ji_ajinfo->tkm_flags &= ~TKMFLG_CHK_ARRAY;
		if (pjob->ji_terminated &&
		    (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) &&
		    pjob->ji_parentaj != NULL &&
		    pjob->ji_parentaj->ji_ajinfo != NULL)
			pjob->ji_parentaj->ji_ajinfo->tkm_dsubjsct++;
	} else {
		if (history_on && pjob->ji_deletehistory)
			log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB,
				  LOG_INFO, pjob->ji_qs.ji_jobid,
				  msg_also_deleted_job_history);

		/* For an array subjob if exit status is non-zero mark sub state
		 * as JOB_SUBSTATE_FAILED. Otherwise set to JOB_SUBSTATE_FINISHED
		 * when current sub state is JOB_SUBSTATE_EXITED.
		 */
		if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
			if (pjob->ji_terminated) {
				set_job_substate(pjob, JOB_SUBSTATE_TERMINATED);
			} else if (is_jattr_set(pjob, JOB_ATR_exit_status)) {
				if (get_jattr_long(pjob, JOB_ATR_exit_status))
					set_job_substate(pjob, JOB_SUBSTATE_FAILED);
				else if (check_job_substate(pjob, JOB_SUBSTATE_EXITED))
					set_job_substate(pjob, JOB_SUBSTATE_FINISHED);
			}
		}
		job_purge(pjob);
	}

	set_idle_delete_task(presv);
}
/**
 * @brief
 *		Function name: svr_clean_job_history
 * @par Purpose: Periodically checks for the history jobs in the server and
 *		 purge the history jobs whose history duration exceeds the
 *		 configured job_history_duration server attribute.
 * @par Functionality: It is a work_task and reschedule itself after 2 mins if
 *		 and only if job_history_enable is set.
 *		Output: None
 *
 * @param[in]	pwt	-	work_task structure
 */
void
svr_clean_job_history(struct work_task *pwt)
{
	job *pjob;
	job *nxpjob = NULL;
	int walltime_used = 0;

	/*
	 * Keep track of time spent purging jobs, interrupts purge if necessary.
	 * Timed task in nearby future set if purge took too long.
	 * Timed task in far future only set if this task completes.
	 * Autotunes time between job history purges:
	 * - raised if last purge was short
	 * - lowered if this purge needs to be interrupted
	 */

	time_t begin_time;
	time_t end_time;
	/* persists across invocations: the auto-tuned interval between purges */
	static time_t time_between_tasks = SVR_CLEAN_JOBHIST_TM;

	begin_time = time(NULL);
	/* Initialize end_time, in case we do not get into the while loop */
	end_time = begin_time;

	/*
	 * Traverse through the SERVER job list and find the history
	 * jobs (job with state JOB_STATE_LTR_MOVED and JOB_STATE_LTR_FINISHED)
	 * which exceed the configured job_history_duration value and
	 * purge them immediately.
	 */
	pjob = (job *) GET_NEXT(svr_alljobs);

	while (pjob != NULL) {
		/* save the next job (pjob may be purged below) */
		nxpjob = (job *) GET_NEXT(pjob->ji_alljobs);

		if ((check_job_state(pjob, JOB_STATE_LTR_MOVED) && check_job_substate(pjob, JOB_SUBSTATE_FINISHED)) ||
		    (check_job_state(pjob, JOB_STATE_LTR_FINISHED)) ||
		    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED))) {

			/* lazily stamp history_timestamp for jobs that never got one;
			 * for finished jobs it is back-dated to stime + walltime used */
			if (!(is_jattr_set(pjob, JOB_ATR_history_timestamp))) {
				if (check_job_state(pjob, JOB_STATE_LTR_MOVED))
					set_jattr_l_slim(pjob, JOB_ATR_history_timestamp, time_now, SET);
				else {
					if (((walltime_used = get_used_wall(pjob)) == -1) ||
					    !(is_jattr_set(pjob, JOB_ATR_stime))) {
						log_err(-1, "svr_clean_job_history",
							"Finished job missing start-time/walltime used, cannot clean history");
						pjob = nxpjob;
						continue;
					}
					set_jattr_l_slim(pjob, JOB_ATR_history_timestamp,
							 get_jattr_long(pjob, JOB_ATR_stime) + walltime_used, SET);
				}
				job_save_db(pjob);
			}

			/* past the retention window: purge the history record */
			if (time_now >= (get_jattr_long(pjob, JOB_ATR_history_timestamp) + svr_history_duration)) {
				job_purge(pjob);
				pjob = NULL; /* purged; must not be dereferenced again */
			}
		}
		/* restore the saved next in pjob */
		pjob = nxpjob;

		/* check if we spent too long hogging the pbs_server process here */
		end_time = time(NULL);
		if ((end_time - begin_time) > SVR_CLEAN_JOBHIST_SECS) {
			/* Apparently the interval between history purges is too long.
			 * reduce it using factor 0.7
			 */
			time_between_tasks = (floor((double) time_between_tasks * 0.7));

			/* no use reducing to less than 4 * SVR_CLEAN_JOBHIST_SECS
			 * since we'll already schedule a continuation task here
			 */
			if (time_between_tasks < (4 * SVR_CLEAN_JOBHIST_SECS))
				time_between_tasks = 4 * SVR_CLEAN_JOBHIST_SECS;

			/* set up another work task in near future,
			 * but leave as much time as we spent in this routine for other work first
			 */
			if (!set_task(WORK_Timed,
				      (end_time + SVR_CLEAN_JOBHIST_SECS),
				      svr_clean_job_history, NULL)) {
				log_err(errno,
					"svr_clean_job_history",
					"Unable to set task for clean job history");
				/* on error to set task
					 * just continue purging the history
					 */
			} else
				/* but if we managed to set a task in near future, return;
				 * that task will continue where we left off
				 */
				return;
		}
	} /* end of while loop through jobs */

	/* We purged everything necessary in this task if we get here.
	 * set up another work task for next time period.
	 */
	if (pwt && svr_history_enable) {
		if (!set_task(WORK_Timed,
			      (time_now + time_between_tasks),
			      svr_clean_job_history, NULL)) {
			log_err(errno,
				"svr_clean_job_history",
				"Unable to set task for clean job history");
		}
	}

	/* try to move the time between tasks up again
	 * but only if we spent less than 2/3rds of what we were allowed to spend
	 * Note the last purge of a chain of purges will often tend to undo part of the lowering
	 * in the earlier incomplete purges -- that's OK: 0.7*1.1 is still smaller than 1
	 */

	if ((time_between_tasks < SVR_CLEAN_JOBHIST_TM) &&
	    ((end_time - begin_time) < floor((double) SVR_CLEAN_JOBHIST_SECS * 2 / 3))) {
		time_between_tasks = ceil((double) time_between_tasks * 1.1);
		if (time_between_tasks > SVR_CLEAN_JOBHIST_TM)
			time_between_tasks = SVR_CLEAN_JOBHIST_TM;
	}
}

/**
 * @brief
 * 		Function name: svr_histjob_update()
 * 		Description: Update the state/substate of the history job and save
 *		the job structure to the disk.
 * 		Input: 1) pjob 2) newstate 3) newsubstate
 *
 * @param[in,out]	pjob	-	job which needs to be updated.
 * @param[in]	newstate	-	internal copy of state
 * @param[in]	newsubstate	-	job sub-state
 *
 * @return	Nothing
 */
void
svr_histjob_update(job *pjob, char newstate, int newsubstate)
{
	char oldstate = get_job_state(pjob);
	pbs_queue *pque = pjob->ji_qhdr;

	/* update the state count in queue and server */
	if (oldstate != newstate) {
		int oldstatenum;
		int newstatenum;

		oldstatenum = state_char2int(oldstate);
		newstatenum = state_char2int(newstate);
		/* -1 means the state char has no counter slot; skip it */
		if (oldstatenum != -1)
			server.sv_jobstates[oldstatenum]--;
		if (newstatenum != -1)
			server.sv_jobstates[newstatenum]++;
		if (pque != NULL) {
			if (oldstatenum != -1)
				pque->qu_njstate[oldstatenum]--;
			if (newstatenum != -1)
				pque->qu_njstate[newstatenum]++;
		}
	}
	/* set the job state and state char */
	set_job_state(pjob, newstate);
	set_job_substate(pjob, newsubstate);

	/* For subjob update the state in the parent array job's table */
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
		update_sj_parent(pjob->ji_parentaj, pjob, pjob->ji_qs.ji_jobid, oldstate, newstate);
		chk_array_doneness(pjob->ji_parentaj);
	}

	/* set the status of each subjob if it is an array job;
	 * recurses once per subjob (subjobs never carry JOB_SVFLG_ArrayJob,
	 * so the recursion is one level deep) */
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) {
		int i;
		ajinfo_t *ptbl = pjob->ji_ajinfo;
		if (ptbl) {
			for (i = ptbl->tkm_start; i <= ptbl->tkm_end; i += ptbl->tkm_step) {
				int sjsst;
				char sjst;
				job *psubj = get_subjob_and_state(pjob, i, &sjst, &sjsst);
				if (psubj) {
					/* subjobs already in a terminal substate keep it;
					 * the rest take the new substate */
					if (sjsst != JOB_SUBSTATE_TERMINATED &&
					    sjsst != JOB_SUBSTATE_FINISHED &&
					    sjsst != JOB_SUBSTATE_FAILED &&
					    sjsst != JOB_SUBSTATE_MOVED)
						svr_histjob_update(psubj, newstate, newsubstate);
					else
						svr_histjob_update(psubj, newstate, sjsst);
				} else
					/* no job struct for this index; update the parent's
					 * tracking table directly */
					update_sj_parent(pjob, NULL, create_subjob_id(pjob->ji_qs.ji_jobid, i), sjst, newstate);
			}
		}
	}

	/* persist the new state/substate */
	job_save_db(pjob);
}

/**
 * @brief
 * 		 svr_chk_history_conf - Check if server is configured to keep job history info.
 *
 * @return	Boolen value
 * @retval	1	: if the server is configured for job history info
 * @retval	0	: otherwise, i.e. one or both of svr_history_enable and
 * 					 svr_history_duration are zero.
 */
int
svr_chk_history_conf()
{
	/* history is kept only when both knobs are non-zero */
	if (svr_history_enable == 0 || svr_history_duration == 0)
		return 0;
	return 1;
}

/**
 * @brief:
 *        update_job_finish_comment	- Append job comment on exit (finished/ terminated/ failed) of job.
 *
 * @param[in]  *pjob       -	job structure
 * @param[in]  newsubstate -	new substate of the job
 * @param[in]  user        -	username who invoked job termination
 *
 * @return void
 *
 */
void
update_job_finish_comment(job *pjob, int newsubstate, char *user)
{
	char buffer[LOG_BUF_SIZE + 1] = {'\0'};
	const char *suffix = NULL; /* text appended to the existing comment */

	/* nothing to append to if the job carries no comment yet */
	if ((is_jattr_set(pjob, JOB_ATR_Comment)) == 0) {
		return;
	}

	if (newsubstate == JOB_SUBSTATE_FINISHED) {
		suffix = " and finished";
	} else if (newsubstate == JOB_SUBSTATE_FAILED) {
		suffix = " and failed"; /* default when no specific kill reason */
		if (is_jattr_set(pjob, JOB_ATR_exit_status)) {
			/* map resource-limit kill exit codes to their suffixes;
			 * replaces six near-identical snprintf branches */
			switch (get_jattr_long(pjob, JOB_ATR_exit_status)) {
				case JOB_EXEC_KILL_NCPUS_BURST:
					suffix = " and exceeded resource ncpus (burst)";
					break;
				case JOB_EXEC_KILL_NCPUS_SUM:
					suffix = " and exceeded resource ncpus (sum)";
					break;
				case JOB_EXEC_KILL_VMEM:
					suffix = " and exceeded resource vmem";
					break;
				case JOB_EXEC_KILL_MEM:
					suffix = " and exceeded resource mem";
					break;
				case JOB_EXEC_KILL_CPUT:
					suffix = " and exceeded resource cput";
					break;
				case JOB_EXEC_KILL_WALLTIME:
					suffix = " and exceeded resource walltime";
					break;
				default:
					/* keep " and failed" */
					break;
			}
		}
	} else if (newsubstate == JOB_SUBSTATE_TERMINATED) {
		/* Don't overwrite the comment; if already set by req_deletejob2 */
		if (strstr(get_jattr_str(pjob, JOB_ATR_Comment), "terminated") == NULL) {
			if (user != NULL) {
				/* terminated-by-user embeds the user name, so it is
				 * formatted directly rather than via 'suffix' */
				snprintf(buffer, LOG_BUF_SIZE, "%s and terminated by %s",
					 get_jattr_str(pjob, JOB_ATR_Comment),
					 user);
			} else {
				suffix = " and terminated";
			}
		}
	}

	if (suffix != NULL) {
		snprintf(buffer, LOG_BUF_SIZE, "%s%s",
			 get_jattr_str(pjob, JOB_ATR_Comment), suffix);
	}
	if (buffer[0] != '\0') {
		set_jattr_str_slim(pjob, JOB_ATR_Comment, buffer, NULL);
	}
}

/**
 * @brief
 *		Set the history info for the job and keep until cleaned up by the
 *		server after the svr_history_duration period.
 *		Called on the execution completion of jobs i.e. normal job (non-array)
 *		or sub jobs; and when an array job is done (all subjobs complete).
 *		Also called when locally created normal jobs and array jobs are
 *		moved/routed to a new server.
 *
 * @param[in]	*pjob	-	job structure
 * @param[in]	type	-	type of history
 *	  						type = T_FIN_JOB for FINISHED jobs
 *	  						type = T_MOV_JOB for MOVED jobs
 *	  						type = T_MOM_DOWN for non-rerunnable jobs FAILED
 *	  						because MOM went down.
 *
 * @return	void
 */

void
svr_setjob_histinfo(job *pjob, histjob_type type)
{
	char newstate = 'T';	/* placeholder; always overwritten for known types */
	int newsubstate = 0;
	ajinfo_t *ptbl = NULL;

	if (type == T_MOV_JOB) { /* MOVED job */
		char *destination = pjob->ji_qs.ji_destin;
		char *tmpstr = NULL;
		char qname[PBS_MAXROUTEDEST + 1];

		/* nothing to record without a destination */
		if (destination == NULL || *destination == '\0') {
			return;
		}

		/*
		 * If the move_job request comes from the scheduler because of
		 * peer-2-peer scheduling, then destin will have port number
		 * (format: "[<queue>]<server>:<portno>")which is not required,
		 * so strip the string after ':'.
		 */
		tmpstr = strchr(destination, ':');
		if (tmpstr != NULL) {
			*tmpstr = '\0';
		}

		sprintf(log_buffer,
			"Job Moved to destination: \"%s\"", destination);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);

		/* put the accounting log for MOVED job */
		sprintf(log_buffer, "destination=%s", destination);
		account_record(PBS_ACCT_MOVED, pjob, log_buffer);

		/*
		 * parse the queue name from the destination
		 * set the queue attribute to the destination
		 * set the server's ji_queue info to just the queue name
		 */
		snprintf(qname, sizeof(qname), "%s", destination);

		/* strip off the portion that isn't the queue name */
		tmpstr = strchr(qname, '@');
		if (tmpstr != NULL) {
			*tmpstr = '\0';
		}
		snprintf(pjob->ji_qs.ji_queue, sizeof(pjob->ji_qs.ji_queue),
			 "%.*s", PBS_MAXQUEUENAME, qname);

		/* Set the queue attribute to destination */
		set_jattr_generic(pjob, JOB_ATR_in_queue, destination, NULL, SET);

		/* set the job comment attr with destination */
		sprintf(log_buffer, "Job has been moved to \"%s\"", destination);
		set_jattr_generic(pjob, JOB_ATR_Comment, log_buffer, NULL, SET);

		/*
		 * SET the NEW STATE/SUB-STATE for the job (which is moved).
		 * New STATE for the job will be JOB_STATE_LTR_MOVED and new
		 * SUBSTATE will be JOB_SUBSTATE_MOVED.
		 */
		newstate = JOB_STATE_LTR_MOVED;
		newsubstate = JOB_SUBSTATE_MOVED;

	} else if (type == T_FIN_JOB) {
		/*
		 * FINISHED job:
		 * set the OGSA-BES compliant substate for the job.
		 * If it is terminated by deljob batch request, then
		 * set it as TERMINATED. Else, if the job has run and
		 * exited with non-zero exit status, then it is FAILED,
		 * otherwise FINISHED.
		 */
		newstate = (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) ? JOB_STATE_LTR_EXPIRED : JOB_STATE_LTR_FINISHED; /* default X for subjob, F for other jobs */
		newsubstate = JOB_SUBSTATE_FINISHED;									  /* default */

		/* If Array job, handle here */
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) &&
		    (ptbl = pjob->ji_ajinfo)) {
			if (pjob->ji_terminated)
				newsubstate = JOB_SUBSTATE_TERMINATED;
			else {
				/* an array job inherits the first FAILED/TERMINATED
				 * substate found among its subjobs */
				int i;
				for (i = ptbl->tkm_start; i <= ptbl->tkm_end; i += ptbl->tkm_step) {
					int sjsst;
					get_subjob_and_state(pjob, i, NULL, &sjsst);
					if (sjsst == JOB_SUBSTATE_FAILED || sjsst == JOB_SUBSTATE_TERMINATED) {
						newsubstate = sjsst;
						break;
					}
				}
			}
		} else { /* Non-Array job */
			if (pjob->ji_terminated) {
				newsubstate = JOB_SUBSTATE_TERMINATED;
			} else if (is_jattr_set(pjob, JOB_ATR_exit_status)) {
				if (get_jattr_long(pjob, JOB_ATR_exit_status))
					newsubstate = JOB_SUBSTATE_FAILED;
			}
		}
		update_job_finish_comment(pjob, newsubstate, NULL);
	} else if (type == T_MOM_DOWN) {
		newstate = JOB_STATE_LTR_FINISHED;
		newsubstate = JOB_SUBSTATE_FAILED;
	}

	/* if the job is not already in MOVED or FINISHED state, then */
	/* decrement the entity job counts and entity resource sums   */

	if (!check_job_state(pjob, JOB_STATE_LTR_MOVED) &&
	    !check_job_state(pjob, JOB_STATE_LTR_EXPIRED) &&
	    !check_job_state(pjob, JOB_STATE_LTR_FINISHED)) {
		account_entity_limit_usages(pjob, NULL, NULL, DECR,
					    pjob->ji_etlimit_decr_queued ? ETLIM_ACC_ALL_MAX : ETLIM_ACC_ALL);
		account_entity_limit_usages(pjob, pjob->ji_qhdr, NULL, DECR,
					    pjob->ji_etlimit_decr_queued ? ETLIM_ACC_ALL_MAX : ETLIM_ACC_ALL);
	}

	/* set the history timestamp */
	set_jattr_l_slim(pjob, JOB_ATR_history_timestamp, time_now, SET);
	/* update the history job state and substate */
	svr_histjob_update(pjob, newstate, newsubstate);

	/*
	 * Work tasks on history jobs are not required and may change the
	 * history info which is dangerous, so better delete them.
	 */
	free_job_work_tasks(pjob);
}

/**
 * @brief
 * 		svr_chk_histjob - check whether job is a history job: called from
 * 			req_stat_job() if type = 1;
 *
 * @param[in]	pjob	-	job structure to be checked
 *
 * @return	PBSE_*
 * @retval	PBSE_NONE	: if it is not a history job or feature is not enabled.
 * @retval	PBSE_UNKJOBID	: if it is a moved job but active at remote server
 * @retval	PBSE_HISTJOBID	: if it is a Finished job or moved and finished
 *	    		    			at remote server.
 */

int
svr_chk_histjob(job *pjob)
{
	char state;

	/*
	 * If server is not configured for history job information,
	 * no need to check further, return PBSE_NONE.
	 */
	if (svr_history_enable == 0)
		return PBSE_NONE;

	if (pjob == NULL)
		return PBSE_NONE;

	/*
	 * FINISHED, or MOVED with substate FINISHED, is a history job;
	 * MOVED in any other substate is active at the remote server.
	 */
	state = get_job_state(pjob);
	if (state == JOB_STATE_LTR_FINISHED)
		return PBSE_HISTJOBID;
	if (state == JOB_STATE_LTR_MOVED) {
		if (check_job_substate(pjob, JOB_SUBSTATE_FINISHED))
			return PBSE_HISTJOBID;
		return PBSE_UNKJOBID;
	}

	return PBSE_NONE;
}

/**
 * @brief
 * 	Finish the request to mom to update a job's exec_* values.
 *	Both the mom request and the originating client request are acknowledged.
 *
 * @param[in,out]	pwt -	work_task structure, containing info
 *				about the mom request and the client request.
 * @return none
 */
static void
post_send_job_exec_update_req(struct work_task *pwt)
{
	struct batch_request *mom_preq = NULL;
	struct batch_request *cli_preq = NULL;
	int bcode = 0;

	if (pwt == NULL)
		return;

	if (pwt->wt_aux2 != PROT_TPP)
		svr_disconnect(pwt->wt_event); /* close connection to MOM */
	mom_preq = pwt->wt_parm1;
	mom_preq->rq_conn = mom_preq->rq_orgconn; /* restore socket to client */
	bcode = mom_preq->rq_reply.brp_code;

	/* the originating client request, if any; may legitimately be NULL */
	cli_preq = pwt->wt_parm2;

	if (bcode) {
		char err_msg[LOG_BUF_SIZE];

		/* prefer mom's reject text if one was supplied */
		if (mom_preq->rq_reply.brp_choice == BATCH_REPLY_CHOICE_Text) {
			(void) snprintf(err_msg, sizeof(err_msg), "%s", mom_preq->rq_reply.brp_un.brp_txt.brp_str);
		} else {
			(void) snprintf(err_msg, sizeof(err_msg), msg_mombadmodify, bcode);
		}
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, mom_preq->rq_ind.rq_modify.rq_objname, err_msg);
		req_reject(bcode, 0, mom_preq);
		/*
		 * Guard against a missing client request here too; the
		 * success branch below already checks cli_preq for NULL,
		 * and the error branch must not dereference NULL either.
		 */
		if (cli_preq != NULL)
			reply_text(cli_preq, bcode, err_msg);
	} else {
		reply_ack(mom_preq);
		if (cli_preq != NULL) {
			if (cli_preq->rq_extend == NULL) {
				reply_ack(cli_preq);
			} else {
				/* carry any saved extra info back to the client */
				reply_text(cli_preq, PBSE_NONE, cli_preq->rq_extend);
			}
		}
	}
}

/**
 * @brief
 *
 * Communicate to the MS mom pjob's exec_vnode, exec_host,
 * exec_host2, and schedselect attributes.
 *
 * @param[in]	pjob - job structure
 * @param[out]  err_msg - a buffer of size 'err_msg_sz' supplied by the
 *       		  caller and upon a failure will contain an appropriate
 *       		  error message
 * @param[in]	err_msg_sz - size of 'err_msg' buf
 * @param[in]	reply_req - the batch request to reply to if any
 *
 * @return int
 * @retval	0	- success
 * @retval	1	- fail
 */

int
send_job_exec_update_to_mom(job *pjob, char *err_msg, int err_msg_sz,
			    struct batch_request *reply_req)
{
	struct batch_request *newreq;
	char *new_exec_vnode = NULL;
	char *new_exec_host = NULL;
	char *new_exec_host2 = NULL;
	int rc = 1;
	int num_updates = 0; /* attributes queued into the mom request */
	struct work_task *pwt = NULL;

	if (pjob == NULL) {
		log_err(-1, __func__, "bad job parameter");
		return (1);
	}

	/*
	 * Fix: check err_msg for NULL before dereferencing err_msg[0];
	 * every other use in this function already treats err_msg as
	 * optional, and the old code crashed on a NULL err_msg here.
	 */
	if ((err_msg != NULL) && (err_msg[0] != '\0') && (reply_req != NULL)) {
		/*
		 * be sure to save/send this extra info in
		 * 'err_msg' buf
		 */
		reply_req->rq_extend = strdup(err_msg);
		if (reply_req->rq_extend == NULL) {
			log_err(-1, __func__, "strdup failed");
			return (1);
		}
	}

	newreq = alloc_br(PBS_BATCH_ModifyJob);

	if (newreq == NULL) {
		/* fixed log text: the request type is PBS_BATCH_ModifyJob */
		log_err(-1, __func__, "failed to alloc_br for PBS_BATCH_ModifyJob");
		return (1);
	}
	CLEAR_HEAD(newreq->rq_ind.rq_modify.rq_attr);

	(void) strcpy(newreq->rq_ind.rq_modify.rq_objname,
		      pjob->ji_qs.ji_jobid);

	/* queue exec_vnode if the job has one */
	if (is_jattr_set(pjob, JOB_ATR_exec_vnode)) {
		new_exec_vnode =
			get_jattr_str(pjob, JOB_ATR_exec_vnode);

		if (add_to_svrattrl_list(
			    &(newreq->rq_ind.rq_modify.rq_attr),
			    ATTR_execvnode, NULL, new_exec_vnode, 0,
			    NULL) == -1) {
			if ((err_msg != NULL) && (err_msg_sz > 0)) {
				snprintf(err_msg, err_msg_sz, "failed to add_to_svrattrl_list(%s,%s,%s)", ATTR_execvnode, "", new_exec_vnode);
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
			}
			goto send_job_exec_update_exit;
		}
		num_updates++;
	}

	/* queue exec_host if the job has one */
	if (is_jattr_set(pjob, JOB_ATR_exec_host)) {
		new_exec_host =
			get_jattr_str(pjob, JOB_ATR_exec_host);

		if (add_to_svrattrl_list(
			    &(newreq->rq_ind.rq_modify.rq_attr),
			    ATTR_exechost, NULL, new_exec_host, 0, NULL) == -1) {
			if ((err_msg != NULL) && (err_msg_sz > 0)) {
				snprintf(err_msg, err_msg_sz, "failed to add_to_svrattrl_list(%s,%s,%s)", ATTR_exechost, "", new_exec_host);
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
			}
			goto send_job_exec_update_exit;
		}
		num_updates++;
	}

	/* queue exec_host2 if the job has one */
	if (is_jattr_set(pjob, JOB_ATR_exec_host2)) {
		new_exec_host2 =
			get_jattr_str(pjob, JOB_ATR_exec_host2);

		if (add_to_svrattrl_list(
			    &(newreq->rq_ind.rq_modify.rq_attr),
			    ATTR_exechost2, NULL, new_exec_host2, 0, NULL) == -1) {
			if ((err_msg != NULL) && (err_msg_sz > 0)) {
				snprintf(err_msg, err_msg_sz, "failed to add_to_svrattrl_list(%s,%s,%s)", ATTR_exechost2, "", new_exec_host2);
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
			}
			goto send_job_exec_update_exit;
		}
		num_updates++;
	}

	/* queue schedselect if the job has one */
	if (is_jattr_set(pjob, JOB_ATR_SchedSelect)) {
		if (add_to_svrattrl_list(
			    &(newreq->rq_ind.rq_modify.rq_attr),
			    ATTR_SchedSelect, NULL,
			    get_jattr_str(pjob, JOB_ATR_SchedSelect),
			    0, NULL) == -1) {
			if ((err_msg != NULL) && (err_msg_sz > 0)) {
				snprintf(err_msg, err_msg_sz, "failed to add_to_svrattrl_list(%s,%s,%s)", ATTR_SchedSelect, "", get_jattr_str(pjob, JOB_ATR_SchedSelect));
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
			}
			goto send_job_exec_update_exit;
		}
		num_updates++;
	}

	/* queue each Resource_List entry individually, via encode */
	if ((is_jattr_set(pjob, JOB_ATR_resource)) != 0) {
		pbs_list_head collectresc;
		svrattrl *psvrl;
		attribute_def *objatrdef;
		extern int resc_access_perm;

		objatrdef = &job_attr_def[(int) JOB_ATR_resource];
		CLEAR_HEAD(collectresc);
		resc_access_perm = READ_ONLY;
		if (objatrdef->at_encode(get_jattr(pjob, JOB_ATR_resource), &collectresc, objatrdef->at_name, NULL, ATR_ENCODE_CLIENT, NULL) > 0) {

			psvrl = (svrattrl *) GET_NEXT(collectresc);
			while (psvrl) {
				if (add_to_svrattrl_list(
					    &(newreq->rq_ind.rq_modify.rq_attr),
					    objatrdef->at_name, psvrl->al_resc,
					    psvrl->al_value, 0, NULL) == -1) {
					free_attrlist(&collectresc);
					if ((err_msg != NULL) && (err_msg_sz > 0)) {
						snprintf(err_msg, err_msg_sz, "failed to add_to_svrattrl_list(%s,%s,%s)", objatrdef->at_name, psvrl->al_resc, psvrl->al_value);
						log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
					}
					goto send_job_exec_update_exit;
				}
				num_updates++;
				psvrl = (svrattrl *) GET_NEXT(psvrl->al_link);
			}
			free_attrlist(&collectresc);
		}
	}

	/* pass the request on to MOM */

	if (num_updates > 0) {
		rc = relay_to_mom2(pjob, newreq,
				   post_send_job_exec_update_req, &pwt);
		if (rc != 0) {
			log_err(-1, __func__, "failed telling mom of the request");
		} else {
			/* hand the client request to the completion task */
			pwt->wt_parm2 = reply_req;
		}
	} else {
		/* no updates, ok */
		rc = 0;
	}

send_job_exec_update_exit:

	/* on failure, or if nothing was queued, the request was never
	 * handed to relay_to_mom2() and must be freed here */
	if ((rc != 0) || (num_updates == 0)) {
		free_br(newreq);
	}

	return (rc);
}

/**
 *
 * @brief
 *	Extracts mom hostnames from exechostx and adds it to list
 *
 * @param[in/out]	to_head - destination reliable_job_node list
 * @param[in]	exechostx - string in exechost format
 *
 * @return 	void
 */
static void
populate_mom_list(pbs_list_head *to_head, char *exechostx)
{
	char *ehcopy, *tok_last = NULL, *entry;
	int in_parens = 0;

	if ((to_head == NULL) || (exechostx == NULL) || (exechostx[0] == '\0')) {
		log_err(-1, __func__, "bad param passed");
		return;
	}

	/* work on a private copy: both the '+' parser and strtok mutate it */
	ehcopy = strdup(exechostx);
	if (ehcopy == NULL) {
		log_err(errno, __func__, "strdup error");
		return;
	}

	CLEAR_HEAD((*to_head));

	entry = parse_plus_spec_r(ehcopy, &tok_last, &in_parens);
	while (entry != NULL) {
		/* keep only the hostname, dropping any ":port" / "/vnode" suffix */
		if (reliable_job_node_add(to_head, strtok(entry, ":/")) == -1)
			break;
		entry = parse_plus_spec_r(tok_last, &tok_last, &in_parens);
	}

	free(ehcopy);
}

/**
 * @brief
 *	returns a copy of partial select string representing the MS (first) chunk
 *	Note - caller to free the returned string pointer
 *
 * @param[in]		select_str - pointer to complete schedselect string
 *
 * @return char *
 * @retval ptr	pointer to malloc'd string containing ms (first) chunk's select str
 *
 * @note
 * caller to free the returned string pointer
*/
static char *
get_ms_select_chunk(char *select_str)
{
	char *slast, *selbuf, *psubspec, *retval = NULL;
	int hpn;

	if (select_str == NULL) {
		log_err(-1, __func__, "bad param passed");
		return (NULL);
	}

	selbuf = strdup(select_str);
	if (selbuf == NULL) {
		log_err(errno, __func__, "strdup fail");
		return (NULL);
	}

	/* isolate the first ('+'-separated) chunk, which belongs to the MS */
	psubspec = parse_plus_spec_r(selbuf, &slast, &hpn);

	if (psubspec) {
		/*
		 * Skip any leading chunk count (e.g. "3:" in "3:ncpus=2")
		 * up to the first alphabetic character.
		 * Fixes two issues in the old scan:
		 *  - it post-incremented past the match and then decremented
		 *    back; when no alpha char existed (e.g. an empty chunk)
		 *    the decrement stepped before the buffer (UB),
		 *  - it passed a plain char to isalpha() without the
		 *    unsigned char cast (UB for negative chars).
		 */
		while (*psubspec && !isalpha((unsigned char) *psubspec))
			psubspec++;

		if (!(retval = strdup(psubspec)))
			log_err(errno, __func__, "strdup fail");
	}

	free(selbuf);
	return retval;
}
/**
 * @brief
 *	Recreates the pjob's exec_vnode, updating at the same time
 *	its corresponding exec_host and exec_host2 attributes
 *	by taking out the vnodes managed by sister moms.
 *
 * @param[in,out]	pjob - job structure
 * @param[in]		vnodelist - if non-NULL, lists the vnodes to be
 *				freed whose parent mom is a sister mom.
 *				if NULL, releases all the sister
 *				vnodes assigned to 'pjob'
 * @param[in]		keep_select - non-NULL means it's a select string that
 *				describes vnodes to be kept while freeing all other vnodes
 *				assigned to 'pjob' whose parent mom is a sister mom.
 * @param[out]  err_msg - if function returns != 0 (failure), return
 *			  any error message in this buffer.
 * @param[in]	err_msg_sz - size of 'err_msg' buf.
 * @return int
 * @retval 0	for success
 * @retval 1	for error
*/
int
recreate_exec_vnode(job *pjob, char *vnodelist, char *keep_select, char *err_msg,
		    int err_msg_sz)
{
	char *exec_vnode = NULL;
	char *exec_host = NULL;
	char *exec_host2 = NULL;
	char *new_exec_vnode = NULL;
	char *new_exec_host = NULL;
	char *new_exec_host2 = NULL;
	char *new_select = NULL;
	char *schedselect = NULL;
	char *deallocated_execvnode = NULL;
	char *new_deallocated_execvnode = NULL;
	resource_def *prdefsl = NULL;
	resource *presc;
	int rc = 1;
	relnodes_input_t r_input;
	relnodes_input_vnodelist_t r_input_vnlist;
	relnodes_input_select_t r_input_keep_select;
	pbs_list_head succeeded_mom_list;

	if (pjob == NULL) {
		log_err(-1, __func__, "bad job parameter");
		return (1);
	}

	/* releasing nodes only makes sense for a live job */
	if ((!check_job_state(pjob, JOB_STATE_LTR_RUNNING)) &&
	    (!check_job_state(pjob, JOB_STATE_LTR_EXITING))) {
		log_err(-1, __func__, "job not in running or exiting state");
		return (1);
	}

	/* all four exec_*/select attributes must exist to rebuild them */
	if ((is_jattr_set(pjob, JOB_ATR_exec_vnode)) == 0) {
		log_err(-1, __func__, "exec_vnode is not set");
		return (1);
	}

	if ((is_jattr_set(pjob, JOB_ATR_exec_host)) == 0) {
		log_err(-1, __func__, "exec_host is not set");
		return (1);
	}

	if ((is_jattr_set(pjob, JOB_ATR_exec_host2)) == 0) {
		log_err(-1, __func__, "exec_host2 is not set");
		return (1);
	}

	if ((is_jattr_set(pjob, JOB_ATR_SchedSelect)) == 0) {
		log_err(-1, __func__, "schedselect is not set");
		return (1);
	}

	exec_vnode = get_jattr_str(pjob, JOB_ATR_exec_vnode);

	exec_host = get_jattr_str(pjob, JOB_ATR_exec_host);

	exec_host2 = get_jattr_str(pjob, JOB_ATR_exec_host2);

	schedselect = get_jattr_str(pjob, JOB_ATR_SchedSelect);

	if (is_jattr_set(pjob, JOB_ATR_exec_vnode_deallocated))
		deallocated_execvnode = get_jattr_str(pjob, JOB_ATR_exec_vnode_deallocated);

	relnodes_input_init(&r_input);
	r_input.jobid = pjob->ji_qs.ji_jobid;
	r_input.execvnode = exec_vnode;
	r_input.exechost = exec_host;
	r_input.exechost2 = exec_host2;
	r_input.schedselect = schedselect;
	r_input.p_new_exec_vnode = &new_exec_vnode;
	r_input.p_new_exec_host[0] = &new_exec_host;
	r_input.p_new_exec_host[1] = &new_exec_host2;
	r_input.p_new_schedselect = &new_select;

	if (keep_select == NULL) {
		/* release by explicit vnode list (or all sister vnodes if NULL) */
		relnodes_input_vnodelist_init(&r_input_vnlist);
		r_input_vnlist.vnodelist = vnodelist;
		r_input_vnlist.deallocated_nodes_orig = deallocated_execvnode;
		r_input_vnlist.p_new_deallocated_execvnode = &new_deallocated_execvnode;

		rc = pbs_release_nodes_given_nodelist(&r_input, &r_input_vnlist, err_msg, err_msg_sz);
	} else {
		/* release everything not described by 'keep_select',
		 * always keeping the MS (first) chunk */
		int select_str_sz = 0;
		relnodes_input_select_init(&r_input_keep_select);
		r_input_keep_select.select_str = get_ms_select_chunk(schedselect); /* has to be freed later */
		/*
		 * Fix: get_ms_select_chunk() can return NULL (bad input or
		 * strdup failure); the old code passed it straight to
		 * strlen() and crashed.
		 */
		if (r_input_keep_select.select_str == NULL) {
			log_err(-1, __func__, "failed to extract MS select chunk");
			goto recreate_exec_vnode_exit;
		}
		select_str_sz = strlen(r_input_keep_select.select_str) + 1;
		pbs_strcat(&r_input_keep_select.select_str, &select_str_sz, "+");
		pbs_strcat(&r_input_keep_select.select_str, &select_str_sz, keep_select);
		populate_mom_list(&succeeded_mom_list, exec_host2);
		r_input_keep_select.succeeded_mom_list = &succeeded_mom_list;

		rc = pbs_release_nodes_given_select(&r_input, &r_input_keep_select, err_msg, err_msg_sz);
		free(r_input_keep_select.select_str);
		reliable_job_node_free(&succeeded_mom_list);
	}

	if (rc != 0) {
		goto recreate_exec_vnode_exit;
	}

	if (new_exec_vnode && (new_exec_vnode[0] != '\0')) {

		if (strcmp(get_jattr_str(pjob, JOB_ATR_exec_vnode),
			   new_exec_vnode) == 0) {
			/* no change */

			if ((err_msg != NULL) && (err_msg_sz > 0)) {
				snprintf(err_msg, err_msg_sz, "node(s) requested to be released not part of the job: %s", vnodelist ? vnodelist : "");
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, err_msg);
			}
			goto recreate_exec_vnode_exit;
		}
		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode_acct, get_jattr_str(pjob, JOB_ATR_exec_vnode), NULL);

		/* save original value which will be used later in the accounting end record */
		if ((is_jattr_set(pjob, JOB_ATR_exec_vnode_orig)) == 0) {
			set_jattr_str_slim(pjob, JOB_ATR_exec_vnode_orig,
					   get_jattr_str(pjob, JOB_ATR_exec_vnode), NULL);
		}

		if ((is_jattr_set(pjob, JOB_ATR_resource_acct)) != 0) {
			free_jattr(pjob, JOB_ATR_resource_acct);
			mark_jattr_not_set(pjob, JOB_ATR_resource_acct);
		}
		set_attr_with_attr(&job_attr_def[JOB_ATR_resource_acct], get_jattr(pjob, JOB_ATR_resource_acct), get_jattr(pjob, JOB_ATR_resource), INCR);
		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode, new_exec_vnode, NULL);

		(void) update_resources_list(pjob, ATTR_l,
					     JOB_ATR_resource, new_exec_vnode, INCR, 0,
					     JOB_ATR_resource_orig);
	} else {
		log_err(-1, __func__, "new_exec_vnode is null or empty string");
		goto recreate_exec_vnode_exit;
	}

	/* record the vnodes given up, for a later pbs_release_nodes call */
	if (!keep_select && new_deallocated_execvnode && *new_deallocated_execvnode) {
		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode_deallocated, new_deallocated_execvnode, NULL);
	}

	if (new_exec_host && *new_exec_host) {

		set_jattr_str_slim(pjob, JOB_ATR_exec_host_acct, get_jattr_str(pjob, JOB_ATR_exec_host), NULL);

		/* save original value which will be used later in the accounting end record */
		if ((is_jattr_set(pjob, JOB_ATR_exec_host_orig)) == 0) {
			set_jattr_str_slim(pjob, JOB_ATR_exec_host_orig, get_jattr_str(pjob, JOB_ATR_exec_host), NULL);
		}

		set_jattr_str_slim(pjob, JOB_ATR_exec_host, new_exec_host, NULL);
	} else {
		log_err(-1, __func__, "new_exec_host is null or empty string");
		goto recreate_exec_vnode_exit;
	}

	if (new_exec_host2 && *new_exec_host2) {

		set_jattr_str_slim(pjob, JOB_ATR_exec_host2, new_exec_host2, NULL);
	} else {
		log_err(-1, __func__, "new_exec_host2 is null or empty string");
		goto recreate_exec_vnode_exit;
	}

	if (new_select && *new_select) {
		prdefsl = &svr_resc_def[RESC_SELECT];
		/* re-generate "select" resource */
		if (prdefsl != NULL) {
			presc = find_resc_entry(get_jattr(pjob, JOB_ATR_resource), prdefsl);
			if (presc == NULL)
				presc = add_resource_entry(get_jattr(pjob, JOB_ATR_resource), prdefsl);
			if (presc != NULL) {
				(void) prdefsl->rs_decode(
					&presc->rs_value, NULL, "select", new_select);
			}
		}
		/* re-generate "schedselect" attribute */

		if (is_jattr_set(pjob, JOB_ATR_SchedSelect)) {
			/* Save current SchedSelect value if not */
			/* already saved in *_orig */
			if (!is_jattr_set(pjob, JOB_ATR_SchedSelect_orig))
				set_jattr_str_slim(pjob, JOB_ATR_SchedSelect_orig, get_jattr_str(pjob, JOB_ATR_SchedSelect), NULL);
		}
		set_jattr_str_slim(pjob, JOB_ATR_SchedSelect, new_select, NULL);
		/* re-generate nodect */
		set_chunk_sum(get_jattr(pjob, JOB_ATR_SchedSelect), get_jattr(pjob, JOB_ATR_resource));

	} else {
		log_err(-1, __func__, "new_select is null or empty string");
		goto recreate_exec_vnode_exit;
	}
recreate_exec_vnode_exit:
	/* the pbs_release_nodes_* helpers allocate these outputs */
	free(new_exec_vnode);
	free(new_exec_host);
	free(new_exec_host2);
	free(new_select);
	free(new_deallocated_execvnode);

	return (rc);
}

/**
 * @brief
 *  action_max_run_subjobs This is action function for max_run_subjobs attribute.
 *			   It verifies that the attribute is being set only on array jobs.
 *
 * @param[in]	pattr	-	attribute structure
 * @param[in]	pobject	-	job object
 * @param[in]	actmode	-	action mode
 */
int
action_max_run_subjobs(attribute *pattr, void *pobject, int actmode)
{
	job *pjob = (job *) pobject;

	if (pjob == NULL)
		return PBSE_INTERNAL;

	/* the attribute is only meaningful on an array job */
	if (is_job_array(pjob->ji_qs.ji_jobid) == IS_ARRAY_ArrayJob)
		return PBSE_NONE;

	return PBSE_NOTARRAY_ATTR;
}
