/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 *
 * @brief
 * 		miscellaneous server functions
 *
 */
#include <pbs_config.h> /* the master config generated by configure */

#ifdef PYTHON
#include "pbs_python_private.h"
#endif

#include "portability.h"
#include <assert.h>
#include <sys/types.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include "server_limits.h"
#include "list_link.h"
#include "log.h"
#include "attribute.h"
#include "resource.h"
#include "job.h"
#include "reservation.h"
#include "queue.h"
#include "server.h"
#include "pbs_error.h"
#include "sched_cmds.h"
#include "ticket.h"
#include "pbs_nodes.h"
#include "tpp.h"
#include "pbs_license.h"
#include "pbs_share.h"
#include "pbs_entlim.h"
#include "work_task.h"
#include "acct.h"
#include "provision.h"
#include "hook.h"
#include "net_connect.h"
#include "libpbs.h"
#include "batch_request.h"
#include "svrfunc.h"
#include "pbs_db.h"
#include "libutil.h"
#include "pbs_ecl.h"
#include "pbs_sched.h"
#include "liblicense.h"

extern struct python_interpreter_data svr_interp_data;
extern pbs_list_head svr_runjob_hooks;

extern time_t time_now;
extern char *resc_in_err;
extern char *msg_daemonname;
extern char server_name[];

extern pbs_list_head svr_allconns;

#define ERR_MSG_SIZE 256
#define MAXNLINE 2048
#define SERVER_ID "1"

/*
 * application provisioning returns success status as 1
 */
#define APP_PROV_SUCCESS 1

extern char *path_hooks_workdir;
extern char *path_priv;

char *path_prov_track;
int max_concurrent_prov = PBS_MAX_CONCURRENT_PROV;
int provision_timeout;

/*
 * the top level list of all vnodes queued for provisioning
 */
pbs_list_head prov_allvnodes;

static int is_runnable(job *, struct prov_vnode_info *);
extern void set_srv_prov_attributes();
static void del_prov_vnode_entry(job *);
extern int resize_prov_table(int);
static void prov_startjob(struct work_task *ptask);
extern enum failover_state are_we_primary(void);

/*
 * Job history support.
 * svr_history_enable is the server-wide switch; in this file it is assigned
 * by set_job_history_enable() and cleared by unset_job_history_enable().
 */
extern void svr_clean_job_history(struct work_task *);
long svr_history_enable = 0;			 /* disable by default */
long svr_history_duration = SVR_JOBHIST_DEFAULT; /* default 2 weeks */
/* Upper bound for job sequence ids (added for the "trillion job id" support) */
long long svr_max_job_sequence_id = SVR_MAX_JOB_SEQ_NUM_DEFAULT; /* default max job id 9999999 */

/*
 * Cached value of the node_fail_requeue server attribute; maintained by
 * set_node_fail_requeue() and unset_node_fail_requeue().
 */
long node_fail_requeue = PBS_NODE_FAIL_REQUEUE_DEFAULT; /* default value for node_fail_requeue 310 */

/*
 * Added for jobscript_max_size
 */
struct attribute attr_jobscript_max_size; /* to store default size value for jobscript_max_size */

extern int do_sync_mom_hookfiles;
extern int sync_mom_hookfiles_replies_pending;

/*
 * Added for licensing
 */
extern struct work_task *init_licensing_task;
extern struct work_task *get_more_licenses_task;
extern struct work_task *licenses_linger_time_task;
extern void get_more_licenses(struct work_task *ptask);
extern void return_lingering_licenses(struct work_task *ptask);

/*
 * Miscellaneous server functions
 */
extern void db_to_svr_svr(struct server *ps, pbs_db_svr_info_t *pdbsvr);
#ifdef NAS /* localmod 005 */
extern int write_single_node_state(struct pbsnode *np);
#endif /* localmod 005 */

char primary_host[PBS_MAXHOSTNAME + 1]; /* host_name of primary */

/*
 * Display names used by encode_svrstate() when encoding the server state.
 * svr_state_names is indexed by the numeric SV_STATE_* value (the DOWN and
 * INIT slots are empty strings).  svr_idle and svr_sched replace the
 * SV_STATE_RUN name when scheduling is turned off or when the default
 * scheduler has a cycle in progress, respectively.
 */
static char *svr_idle = "Idle";
static char *svr_sched = "Scheduling";
static char *svr_state_names[] = {
	"",		     /* SV_STATE_DOWN */
	"",		     /* SV_STATE_INIT */
	"Hot_Start",	     /* SV_STATE_HOT  */
	"Active",	     /* SV_STATE_RUN  */
	"Terminating_Delay", /* SV_STATE_SHUTDEL */
	"Terminating",	     /* SV_STATE_SHUTIMM */
	"Terminating"	     /* SV_STATE_SHUTSIG */
};

/**
 * @brief
 * 		encode_svrstate - translate the server's numeric state into its
 *		display name and append the result to an attribute list.
 * @par
 *		While the state is SV_STATE_RUN the reported name is "Idle" when the
 *		server's scheduling attribute is 0, "Scheduling" when the default
 *		scheduler has a cycle in progress, and "Active" otherwise.
 *
 * @param[in]	pattr	-	ptr to attribute holding the state in at_val.at_long
 * @param[in,out]	phead	-	head of attrlist list the new entry is appended to
 * @param[in]	atname	-	attribute name
 * @param[in]	rsname	-	resource name
 * @param[in]	mode	-	encode mode
 * @param[out]	rtnl	-	RETURN: ptr to svrattrl (not written when NULL)
 *
 * @return	int
 * @retval	-1	: pattr is NULL or the svrattrl entry could not be created
 * @retval	0	: don't bother to encode it
 * @retval	1	: encoded.
 */

int
encode_svrstate(const attribute *pattr, pbs_list_head *phead, char *atname, char *rsname, int mode, svrattrl **rtnl)
{
	long state;
	char *name;
	svrattrl *entry;

	if (pattr == NULL)
		return (-1);

	state = pattr->at_val.at_long;

	/* nothing is encoded when saving, or for states without a name */
	if (mode == ATR_ENCODE_SAVE)
		return (0);
	if (state <= SV_STATE_DOWN || state > SV_STATE_SHUTSIG)
		return (0);

	/* a running server is reported as Idle/Scheduling/Active */
	if (state != SV_STATE_RUN)
		name = svr_state_names[state];
	else if (get_sattr_long(SVR_ATR_scheduling) == 0)
		name = svr_idle;
	else if (dflt_scheduler && dflt_scheduler->sc_cycle_started == 1)
		name = svr_sched;
	else
		name = svr_state_names[state];

	entry = attrlist_create(atname, rsname, strlen(name) + 1);
	if (entry == NULL)
		return (-1);

	(void) strcpy(entry->al_value, name);
	entry->al_flags = pattr->at_flags;
	append_link(phead, &entry->al_link, entry);

	if (rtnl != NULL)
		*rtnl = entry;
	return (1);
}

/**
 * @brief
 * 		set_resc_assigned - updates server and/or queue resources_assigned
 *		attribute depending on to what kind of object the first argument
 *		points and possibly on what value of "state" the object has
 * @par
 *		For a job (objtype 0) nothing is done unless the job sits in an
 *		execution queue.  The JOB_SVFLG_RescAssn flag makes sure the job's
 *		resources are added at most once and are removed only when they are
 *		currently included.  After the server/queue lists are adjusted, the
 *		node level resources_assigned values are updated as well.
 *
 * @param[in,out]	pobj	-	pointer to a job or a reservation, as selected by objtype
 * @param[in]	objtype	-	0=job, 1=reservation
 * @param[in]	op	-	operation to be performed; for a job only INCR and DECR are acted upon
 *
 * @return	void
 */
void
set_resc_assigned(void *pobj, int objtype, enum batch_op op)
{
	resc_resv *presv = NULL;
	resource_def *rscdef;
	job *pjob = NULL;
	resource *pr = NULL;
	resource *rescp = NULL;
	attribute *queru = NULL;
	attribute *sysru = NULL;

	/*
	 * The first part of this lengthy function figures out which
	 * "resources_assigned" lists need to get updated.  Most of
	 * the time two lists get updated, but it can be only one
	 * (or even none, if the resources have already been accounted
	 * for earlier), for example when a job belonging to a
	 * reservation is told to run or exits.
	 */

	if (!objtype) {
		pjob = (job *) pobj;

		/* only jobs residing in an execution queue are accounted for */
		if ((pjob->ji_qhdr == 0) ||
		    (pjob->ji_qhdr->qu_qs.qu_type != QTYPE_Execution))
			return;

		if (op == INCR) {
			if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_RescAssn)
				return; /* already added in */
			pjob->ji_qs.ji_svrflags |= JOB_SVFLG_RescAssn;
		} else if (op == DECR) {
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_RescAssn) == 0)
				return; /* not currently included */
			pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_RescAssn;
		} else {
			return; /* invalid op */
		}

		/* NOTE(review): this initial value is reassigned on every path of the if/else below */
		rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
		if ((check_job_substate(pjob, JOB_SUBSTATE_SUSPEND)) ||
		    (check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP))) {
			/* If the resources_released attribute is not set for this suspended job
			 * then use all resources assigned to the job */
			if ((is_jattr_set(pjob, JOB_ATR_resc_released)) == 0)
				rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
			else {
				/* Use resource_released_list for updating queue/server resources.
				 * If resource_released_list is not present then create it from
				 * the resources_released attribute; if that fails fall back to
				 * the job's full resource list.
				 */
				if (is_jattr_set(pjob, JOB_ATR_resc_released_list))
					rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resc_released_list));
				else {
					if (update_resources_rel(pjob, get_jattr(pjob, JOB_ATR_resc_released), INCR) != 0)
						rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
					else
						rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resc_released_list));
				}
			}
		} else {
			/* Job is not suspended: use all resources assigned to the job,
			 * unless a resource_released_list is set, which then takes precedence */
			rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
			if (is_jattr_set(pjob, JOB_ATR_resc_released_list))
				rescp = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resc_released_list));
		}
		sysru = get_sattr(SVR_ATR_resource_assn);
		queru = get_qattr(pjob->ji_qhdr, QE_ATR_ResourceAssn);

		if (pjob->ji_myResv &&
		    (pjob->ji_myResv->ri_qs.ri_state == RESV_RUNNING ||
		     pjob->ji_myResv->ri_qs.ri_state == RESV_DELETED ||
		     pjob->ji_myResv->ri_qs.ri_state == RESV_BEING_DELETED ||
		     pjob->ji_myResv->ri_qs.ri_state == RESV_FINISHED)) {

			/*
			 * For jobs running under a reservation, the server's
			 * "resources_assigned" is updated when the reservation
			 * itself begins running or is terminated.  So don't touch
			 * the server's resources_assigned.
			 */
			sysru = NULL;
		}
	} else if (objtype == 1) {

		presv = (resc_resv *) pobj;
		queru = NULL;
		sysru = NULL;
		rescp = (resource *) GET_NEXT(get_rattr_list(presv, RESV_ATR_resource));
		if (presv->ri_parent != NULL &&
		    (presv->ri_parent->ri_qs.ri_state == RESV_RUNNING ||
		     presv->ri_parent->ri_qs.ri_state == RESV_DELETED ||
		     presv->ri_parent->ri_qs.ri_state == RESV_BEING_DELETED ||
		     presv->ri_parent->ri_qs.ri_state == RESV_FINISHED)) {
			/*
			 * If the reservation has a parent (as reservation jobs can),
			 * the "resources_assigned" list of the parent's queue is the
			 * relevant list to modify; it is tracked through "sysru" here.
			 * Remark: The -server's- "resources_assigned" updates when the
			 * parent starts running or is terminated.
			 */
			sysru = get_qattr(presv->ri_parent->ri_qp, QE_ATR_ResourceAssn);
		} else if (presv->ri_parent == NULL &&
			   (presv->ri_qs.ri_state == RESV_RUNNING ||
			    presv->ri_qs.ri_state == RESV_DELETED ||
			    presv->ri_qs.ri_state == RESV_BEING_DELETED ||
			    presv->ri_qs.ri_state == RESV_FINISHED)) {
			/*
			 * When the reservation object has no parent reservation, the
			 * server's "resources_assigned" list is the relevant one.
			 * "queru" stays NULL for reservations.
			 */

			sysru = get_sattr(SVR_ATR_resource_assn);
		}
	}

	/*
	 * For each resource in the job's (or reservation's) list, check in
	 * the definition for that resource whether the "RASSN" flag is turned
	 * on.  If the flag is set, modify the appropriate "resources_assigned"
	 * lists ("queue", "sys" or both) to account for the amount of the
	 * resource being consumed or relinquished by the object.
	 *
	 * Note: if we aren't supposed to be updating the server's or the queue's
	 *	"resources_assigned" the pointers "sysru"/"queru" are NULL.
	 */
	while (rescp) {
		rscdef = rescp->rs_defin;

		/* if resource usage is to be tracked */
		if ((rscdef->rs_flags & ATR_DFLAG_RASSN) &&
		    (is_attr_set(&rescp->rs_value))) {

			/* update system attribute of resources assigned */

			if (sysru) {
				pr = find_resc_entry(sysru, rscdef);
				if (pr == NULL) {
					pr = add_resource_entry(sysru, rscdef);
					/* could not create the entry: give up on the whole update */
					if (pr == NULL)
						return;
				}
				rscdef->rs_set(&pr->rs_value, &rescp->rs_value, op);
				sysru->at_flags |= ATR_MOD_MCACHE;
			}

			/* update queue attribute of resources assigned */

			if (queru) {
				pr = find_resc_entry(queru, rscdef);
				if (pr == NULL) {
					pr = add_resource_entry(queru, rscdef);
					/* could not create the entry: give up on the whole update */
					if (pr == NULL)
						return;
				}
				rscdef->rs_set(&pr->rs_value, &rescp->rs_value, op);
				queru->at_flags |= ATR_MOD_MCACHE;
			}
		}
		rescp = (resource *) GET_NEXT(rescp->rs_link);
	}

	/*
	 * Update resources_assigned at the node level: a reservation uses its
	 * resv_nodes attribute; a job is handled only when it does not belong
	 * to a reservation.
	 */
	if (objtype == 1)
		update_node_rassn(get_rattr(presv, RESV_ATR_resv_nodes), op);
	else if ((objtype == 0) && (pjob->ji_myResv == NULL)) {
		if (is_jattr_set(pjob, JOB_ATR_resc_released))
			/* resources_released is set: update the nodes from that attribute */
			update_job_node_rassn(pjob, get_jattr(pjob, JOB_ATR_resc_released), op);
		else
			/* updating all resources from exec vnode attribute */
			update_job_node_rassn(pjob, get_jattr(pjob, JOB_ATR_exec_vnode), op);
		if (is_jattr_set(pjob, JOB_ATR_exec_vnode_deallocated)) {
			update_job_node_rassn(pjob, get_jattr(pjob, JOB_ATR_exec_vnode_deallocated), op);
		}
	}
}

/**
 * @brief
 * 		ck_chkpnt - check validity of job checkpoint attribute value
 * @par
 *		Accepted forms are the single letters "n", "s" and "u", and the
 *		letters "c" and "w" either alone or followed by "=<number>" where
 *		atoi() of the number is greater than zero.  A NULL value is accepted.
 *
 * @param[in]	pattr	-	checkpoint attribute
 * @param[in]	pobject	-	job object
 * @param[in]	mode	-	action mode
 *
 * @return	int
 * @retval	0	: success
 * @retval	!0	: PBS Error Code
 */
int
ck_chkpnt(attribute *pattr, void *pobject, int mode)
{
	const char *spec = pattr->at_val.at_str;

	if (spec == NULL)
		return (0);

	switch (spec[0]) {
		case 'n':
		case 's':
		case 'u':
			/* these letters must stand alone */
			if (spec[1] != '\0')
				return (PBSE_BADATVAL);
			break;

		case 'c':
		case 'w':
			/* bare letter, or letter followed by "=<positive number>" */
			if (spec[1] != '\0') {
				if (spec[1] != '=')
					return (PBSE_BADATVAL);
				if (atoi(spec + 2) <= 0)
					return (PBSE_BADATVAL);
			}
			break;

		default:
			return (PBSE_BADATVAL);
	}

	/* If the checkpoint attribute is being altered, then check    */
	/* against the queue's Checkpoint_min attribute as when queued */
	if (mode == ATR_ACTION_ALTER) {
		job *pjob = (job *) pobject;

		eval_chkpnt(pjob, get_qattr(pjob->ji_qhdr, QE_ATR_ChkptMin));
	}
	return (0);
}

/**
 * @brief
 *      keepfiles_action - check validity of job keepfiles attribute value
 * @par
 *      Only acts for ATR_ACTION_ALTER and ATR_ACTION_NEW.  The value may not
 *      be changed while the job is running; otherwise it is validated by
 *      verify_keepfiles_common().
 *
 * @param[in]   pattr   -   keepfiles attribute
 * @param[in]   pobject -   job object (may be NULL)
 * @param[in]   mode    -   action mode
 *
 * @return  int
 * @retval  0   : success
 * @retval  !0  : PBS Error Code
 */
int
keepfiles_action(attribute *pattr, void *pobject, int mode)
{
	job *pjob = (job *) pobject;
	int applies = (mode == ATR_ACTION_ALTER) || (mode == ATR_ACTION_NEW);

	if (!applies)
		return PBSE_NONE;

	/* a running job's attribute may not be modified */
	if (pjob != NULL && check_job_state(pjob, JOB_STATE_LTR_RUNNING))
		return PBSE_MODATRRUN;

	return verify_keepfiles_common(pattr->at_val.at_str);
}

/**
 * @brief
 *      removefiles_action - check validity of job removefiles attribute value
 * @par
 *      Only acts for ATR_ACTION_ALTER and ATR_ACTION_NEW.  The value may not
 *      be changed while the job is running; otherwise it is validated by
 *      verify_removefiles_common().
 *
 * @param[in]   pattr   -   removefiles attribute
 * @param[in]   pobject -   job object (may be NULL)
 * @param[in]   mode    -   action mode
 *
 * @return  int
 * @retval  0   : success
 * @retval  !0  : PBS Error Code
 */
int
removefiles_action(attribute *pattr, void *pobject, int mode)
{
	job *pjob = (job *) pobject;
	int applies = (mode == ATR_ACTION_ALTER) || (mode == ATR_ACTION_NEW);

	if (!applies)
		return PBSE_NONE;

	/* a running job's attribute may not be modified */
	if (pjob != NULL && check_job_state(pjob, JOB_STATE_LTR_RUNNING))
		return PBSE_MODATRRUN;

	return verify_removefiles_common(pattr->at_val.at_str);
}

/**
 * @brief
 * 		cred_name_okay - action routine for the "required_cred" attribute.
 *		Check to make sure the cred name is okay.
 * @par
 *		The check is only performed for ATR_ACTION_ALTER; every other action
 *		mode is accepted without looking at the value.  The value must match
 *		one of the names in the internal list of supported credentials.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobj	-	not used
 * @param[in]	actmode	-	action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_BADATVAL	: value is missing or is not a known credential name
 */

int
cred_name_okay(attribute *pattr, void *pobj, int actmode)
{
	static const char *cred_list[] = {
		PBS_CREDNAME_AES,
		NULL /* must be last */
	};
	const char *val;
	int i;

	if (actmode != ATR_ACTION_ALTER)
		return PBSE_NONE;

	val = pattr->at_val.at_str;

	/* strcmp() must not be handed a NULL pointer; no value means no match */
	if (val == NULL)
		return PBSE_BADATVAL;

	for (i = 0; cred_list[i] != NULL; i++) {
		if (strcmp(cred_list[i], val) == 0)
			return PBSE_NONE;
	}
	return PBSE_BADATVAL;
}

/**
 * @brief
 * 		action_reserve_retry_time - action routine for the server's
 * 		"reserve_retry_time" attribute.
 * @par
 *		On ATR_ACTION_ALTER and ATR_ACTION_RECOV a positive value is copied
 *		into resv_retry_time and the server's resv_retry_init attribute is
 *		unset.  Other action modes are ignored.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobj	-	not used
 * @param[in]	actmode	-	action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_BADATVAL	: value is zero or negative
 */
int
action_reserve_retry_time(attribute *pattr, void *pobj, int actmode)
{
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	if (pattr->at_val.at_long <= 0)
		return PBSE_BADATVAL;

	ATR_UNSET(get_sattr(SVR_ATR_resv_retry_init));
	resv_retry_time = pattr->at_val.at_long;

	return PBSE_NONE;
}

/**
 * @brief
 * 		action_reserve_retry_init - action routine for the server's
 * 		"reserve_retry_init" attribute.
 * @par
 *		On ATR_ACTION_ALTER and ATR_ACTION_RECOV a positive value is stored
 *		into the server's resv_retry_time attribute and into the global
 *		resv_retry_time.  Other action modes are ignored.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobj	-	not used
 * @param[in]	actmode	-	action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_BADATVAL	: value is zero or negative
 */
int
action_reserve_retry_init(attribute *pattr, void *pobj, int actmode)
{
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	if (pattr->at_val.at_long <= 0)
		return PBSE_BADATVAL;

	/* mirror the value into the retry_time attribute and its cached global */
	set_sattr_l_slim(SVR_ATR_resv_retry_time, pattr->at_val.at_long, SET);
	resv_retry_time = pattr->at_val.at_long;

	return PBSE_NONE;
}

/**
 * @brief
 * 		set_rpp_retry - placeholder action function for the deprecated
 *		rpp_retry attribute; it only logs a deprecation message.
 *
 * @param[in]	pattr	-	pointer to attribute structure (not used)
 * @param[in]	pobj	-	not used
 * @param[in]	actmode	-	action mode (not used)
 *
 * @return	int
 * @retval	PBSE_NONE	: always
 */
int
set_rpp_retry(attribute *pattr, void *pobj, int actmode)
{
	static const char deprecation_msg[] =
		"rpp_retry is deprecated. This functionality is now automatic without needing this attribute";

	log_err(-1, __func__, deprecation_msg);
	return PBSE_NONE;
}

/**
 * @brief
 * 		set_rpp_highwater - placeholder action function for the deprecated
 *		rpp_highwater attribute; it only logs a deprecation message.
 *
 * @param[in]	pattr	-	pointer to attribute structure (not used)
 * @param[in]	pobj	-	not used
 * @param[in]	actmode	-	action mode (not used)
 *
 * @return	int
 * @retval	PBSE_NONE	: always
 */
int
set_rpp_highwater(attribute *pattr, void *pobj, int actmode)
{
	static const char deprecation_msg[] =
		"rpp_highwater is deprecated. This functionality is now automatic without needing this attribute";

	log_err(-1, __func__, deprecation_msg);
	return PBSE_NONE;
}

/**
 * @brief
 *		is_valid_resource - action function to make sure every string in the
 *			        attribute value names a known resource whose type is
 *			        string or array of strings
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject	-	not used
 * @param[in]	actmode	-	action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success, or nothing to check (free action / attribute not set)
 * @retval	PBSE_UNKRESC	: a listed name is not a defined resource
 * @retval	PBSE_RESCNOTSTR	: a listed resource is not of string type
 */
int
is_valid_resource(attribute *pattr, void *pobject, int actmode)
{
	int idx;

	if (actmode == ATR_ACTION_FREE || is_attr_set(pattr) == 0)
		return (PBSE_NONE);

	for (idx = 0; idx < pattr->at_val.at_arst->as_usedptr; idx++) {
		struct resource_def *def;

		def = find_resc_def(svr_resc_def, pattr->at_val.at_arst->as_string[idx]);
		if (def == NULL)
			return PBSE_UNKRESC;

		/* only string-valued resources are acceptable */
		if (def->rs_type == ATR_TYPE_STR || def->rs_type == ATR_TYPE_ARST)
			continue;
		return PBSE_RESCNOTSTR;
	}

	return PBSE_NONE;
}

/**
 * @brief
 * 		action_svr_iteration - the "action" routine for the server
 *		scheduler_iteration attribute
 * @par
 *		When a default scheduler exists and the action is NEW, ALTER or
 *		RECOV, the value is copied to the default scheduler's
 *		schediteration attribute and the scheduler is saved to the database.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobj	-	pointer to some parent object (not used here)
 * @param[in]	mode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: always
 */
int
action_svr_iteration(attribute *pattr, void *pobj, int mode)
{
	int applies = (mode == ATR_ACTION_NEW) ||
		      (mode == ATR_ACTION_ALTER) ||
		      (mode == ATR_ACTION_RECOV);

	if (dflt_scheduler == NULL || !applies)
		return PBSE_NONE;

	/* set this attribute on main scheduler */
	set_sched_attr_l_slim(dflt_scheduler, SCHED_ATR_schediteration, pattr->at_val.at_long, SET);
	sched_save_db(dflt_scheduler);

	return PBSE_NONE;
}

/**
 * @brief
 * 		deflt_chunk_action - the "action" routine for the queue and server
 *		default_chunk attribute
 * @par
 *		Builds an array of key_value_pair structures for the defaults.
 *		Any previously built array of the object is freed first, so after a
 *		failure (or a free action, or an unset attribute) the object holds
 *		no defaults.  The array and the strings in it are heap allocated
 *		and owned by the server/queue object.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobj	-	pointer to the parent object: &server, otherwise treated as a pbs_queue
 * @param[in]	mode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	0	: success
 * @retval	PBSE_INVALJOBRESC	: a resource without ATR_DFLAG_CVTSLT is present; its name is placed in resc_in_err
 * @retval	PBSE_SYSTEM	: encoding or memory allocation failed
 */
int
deflt_chunk_action(attribute *pattr, void *pobj, int mode)
{
	int i;
	int j;
	int nelem;
	int *nkv;
	int old_perm;
	struct key_value_pair **pkvp;
	struct key_value_pair *kvp;
	resource *presc;
	pbs_list_head head;
	svrattrl *psvratrl;
	int rc;
	extern int resc_access_perm;

	CLEAR_HEAD(head);

	if (pobj == (void *) &server) {
		pkvp = &server.sv_seldft;
		nkv = &server.sv_nseldft;
	} else {
		pkvp = &((pbs_queue *) pobj)->qu_seldft;
		nkv = &((pbs_queue *) pobj)->qu_nseldft;
	}

	/* free any existing key_value_pair structure */
	if (*pkvp) {
		for (i = 0; i < *nkv; ++i) {
			free(((*pkvp) + i)->kv_keyw);
			free(((*pkvp) + i)->kv_val);
		}
		free(*pkvp);
		*pkvp = NULL;
	}
	*nkv = 0;

	if (((is_attr_set(pattr)) == 0) ||
	    (mode == ATR_ACTION_FREE))
		return 0;

	/* validate and count the number of pairs in the default attribute */
	nelem = 0;

	presc = GET_NEXT(pattr->at_val.at_list);
	while (presc) {
		if ((presc->rs_defin->rs_flags & ATR_DFLAG_CVTSLT) == 0) {
			if ((resc_in_err = strdup(presc->rs_defin->rs_name)) == NULL)
				return PBSE_SYSTEM;
			return PBSE_INVALJOBRESC;
		}
		nelem++;
		presc = GET_NEXT(presc->rs_link);
	}

	/* encode the default resources so we can get the values */
	/* need to save & restore the current value incase we are recovering */
	old_perm = resc_access_perm;
	resc_access_perm = ATR_DFLAG_RDACC;
	rc = encode_resc(pattr, &head, ATTR_DefaultChunk, NULL, ATR_ENCODE_CLIENT, NULL);
	resc_access_perm = old_perm;
	if (rc < 0) {
		/* release whatever the encode may have linked in before failing */
		free_attrlist(&head);
		return PBSE_SYSTEM;
	}

	/*
	 * Zero-initialised so that every slot can be handed to free() during
	 * cleanup, whether or not it has been filled in yet.
	 */
	kvp = calloc((size_t) nelem + 1, sizeof(*kvp));
	if (kvp == NULL) {
		free_attrlist(&head);
		return PBSE_SYSTEM;
	}

	/* now set the name and value words */
	i = 0;
	for (psvratrl = GET_NEXT(head); psvratrl && i < nelem; psvratrl = GET_NEXT(psvratrl->al_link)) {
		kvp[i].kv_keyw = strdup(psvratrl->al_resc);
		if (kvp[i].kv_keyw != NULL)
			kvp[i].kv_val = strdup(psvratrl->al_value);

		if (kvp[i].kv_keyw == NULL || kvp[i].kv_val == NULL) {
			/* include slot i: its keyword may already have been duplicated */
			for (j = 0; j <= i; ++j) {
				free(kvp[j].kv_keyw);
				free(kvp[j].kv_val);
			}
			free(kvp);
			free_attrlist(&head);
			return PBSE_SYSTEM;
		}
		++i;
	}
	free_attrlist(&head); /* free svrattrl list created by the encode */

	/* publish the finished array only once it is complete */
	*pkvp = kvp;
	*nkv = i;

	return 0;
}

/**
 * @brief
 *	set_license_location - action function for the pbs_license_info
 * 				server attribute.
 * @par
 *	On ATR_ACTION_ALTER and ATR_ACTION_RECOV the global
 *	pbs_licensing_location is replaced by a copy of the attribute value
 *	(an empty string when the value is NULL) and init_licensing is
 *	scheduled as a timed task: immediately on recovery, 5 seconds from
 *	time_now otherwise.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_SYSTEM	: the location string could not be duplicated
 */
int
set_license_location(attribute *pattr, void *pobject, int actmode)
{
	const char *location;
	int delay;

	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return (PBSE_NONE);

	/* drop the previous location; free(NULL) is harmless */
	free(pbs_licensing_location);

	location = pattr->at_val.at_str;
	if (location == NULL)
		location = "";

	pbs_licensing_location = strdup(location);
	if (pbs_licensing_location == NULL) {
		log_err(errno, __func__, "warning: strdup failed!");
		return PBSE_SYSTEM;
	}

	delay = (actmode == ATR_ACTION_RECOV) ? 0 : 5;
	init_licensing_task = set_task(WORK_Timed, time_now + delay, init_licensing, NULL);

	return (PBSE_NONE);
}

/**
 * @brief
 *		unset_license_location - discard the stored licensing location.
 * @par
 *		Does nothing when no location is stored.  For a non-empty location
 *		the license connection is closed, nodes are unlicensed and
 *		license_counts is zeroed; for an empty location only the license
 *		counters are reset.  The stored string is then freed.
 */
void
unset_license_location(void)
{
	if (pbs_licensing_location == NULL)
		return;

	if (pbs_licensing_location[0] == '\0') {
		reset_license_counters(&license_counts);
	} else {
		lic_close();
		unlicense_nodes();
		memset(&license_counts, 0, sizeof(license_counts));
	}

	free(pbs_licensing_location);
	pbs_licensing_location = NULL;
}

/**
 * @brief
 *	set_node_fail_requeue - action function for the node_fail_requeue
 *	server attribute.
 *
 * @par Functionality:
 *	On ATR_ACTION_ALTER and ATR_ACTION_RECOV the attribute value is copied
 *	into the global node_fail_requeue and the change is logged.
 *	Since node_fail_requeue can be a negative value no check
 *	for < 0 is performed.
 *
 * @param[in]	pattr	-	ptr to attribute
 * @param[in]	pobject	-	pointer to some parent object.(required but unused here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: always
 *
 */
int
set_node_fail_requeue(attribute *pattr, void *pobject, int actmode)
{
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return (PBSE_NONE);

	node_fail_requeue = pattr->at_val.at_long;

	sprintf(log_buffer, "node_fail_requeue value changed to %ld", node_fail_requeue);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, log_buffer);

	return (PBSE_NONE);
}

/**
 * @brief
 *	unset_node_fail_requeue - restore the default node_fail_requeue value.
 *
 * @par Functionality:
 *	Resets the global node_fail_requeue to PBS_NODE_FAIL_REQUEUE_DEFAULT
 *	and logs the new value.
 *
 * @return	void
 *
 */
void
unset_node_fail_requeue(void)
{
	node_fail_requeue = PBS_NODE_FAIL_REQUEUE_DEFAULT;

	sprintf(log_buffer, "node_fail_requeue reverting back to default val %ld", node_fail_requeue);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, log_buffer);
}

/**
 * @brief
 *		set_license_min - action function for the pbs_license_min server
 *			  attribute.
 * @par
 *		On ATR_ACTION_ALTER and ATR_ACTION_RECOV the value is validated
 *		against the range [0, licenses_max] and stored in licensing_control.
 *		If the new minimum exceeds the number of licenses checked out and no
 *		get_more_licenses task is pending, one is scheduled 2 seconds ahead.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_LICENSE_MIN_BADVAL	: wrong value for pbs_license_min attribute
 */
int
set_license_min(attribute *pattr, void *pobject, int actmode)
{
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return (PBSE_NONE);

	if (pattr->at_val.at_long < 0)
		return (PBSE_LICENSE_MIN_BADVAL);
	if (pattr->at_val.at_long > licensing_control.licenses_max)
		return (PBSE_LICENSE_MIN_BADVAL);

	licensing_control.licenses_min = pattr->at_val.at_long;

	/* need more licenses than currently held: ask for them unless already asked */
	if (licensing_control.licenses_min > licensing_control.licenses_checked_out &&
	    get_more_licenses_task == NULL) {
		get_more_licenses_task = set_task(WORK_Timed, time(NULL) + 2, get_more_licenses, NULL);
	}

	return (PBSE_NONE);
}

/**
 * @brief
 *		unset_license_min - restore licensing_control.licenses_min to
 *			  PBS_MIN_LICENSING_LICENSES and log the new value.
 */
void
unset_license_min(void)
{
	licensing_control.licenses_min = PBS_MIN_LICENSING_LICENSES;

	sprintf(log_buffer, "pbs_license_min reverting back to default val %ld", licensing_control.licenses_min);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, log_buffer);
}

/**
 * @brief
 *		set_license_max - action function for the pbs_license_max server
 *			  attribute.
 * @par
 *		On ATR_ACTION_ALTER and ATR_ACTION_RECOV the value must be
 *		non-negative and not below licenses_min; it is then stored in
 *		licensing_control.  A get_more_licenses task is scheduled 2 seconds
 *		ahead (unless one is already pending) when the new maximum is below
 *		the number checked out, or when fewer licenses are checked out than
 *		both the total needed and the new maximum.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_LICENSE_MAX_BADVAL	: wrong value for pbs_license_max attribute
 */
int
set_license_max(attribute *pattr, void *pobject, int actmode)
{
	int over_limit;
	int room_to_grow;

	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return (PBSE_NONE);

	if (pattr->at_val.at_long < 0)
		return (PBSE_LICENSE_MAX_BADVAL);
	if (pattr->at_val.at_long < licensing_control.licenses_min)
		return (PBSE_LICENSE_MAX_BADVAL);

	licensing_control.licenses_max = pattr->at_val.at_long;

	/* holding more than the new maximum allows */
	over_limit = licensing_control.licenses_max < licensing_control.licenses_checked_out;

	/* still short of what is needed, and the new maximum permits more */
	room_to_grow = (licensing_control.licenses_checked_out < licensing_control.licenses_total_needed) &&
		       (licensing_control.licenses_checked_out < licensing_control.licenses_max);

	if ((over_limit || room_to_grow) && get_more_licenses_task == NULL)
		get_more_licenses_task = set_task(WORK_Timed, time(NULL) + 2, get_more_licenses, NULL);

	return (PBSE_NONE);
}

/**
 * @brief
 *		unset_license_max - restore licensing_control.licenses_max to
 *			  PBS_MAX_LICENSING_LICENSES and log the new value.
 */
void
unset_license_max(void)
{
	licensing_control.licenses_max = PBS_MAX_LICENSING_LICENSES;

	sprintf(log_buffer, "pbs_license_max reverting back to default val %ld", licensing_control.licenses_max);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, log_buffer);
}

/**
 * @brief
 *		set_license_linger - action function for the pbs_license_linger server
 *			  attribute.
 * @par
 *		On ATR_ACTION_ALTER and ATR_ACTION_RECOV a positive value is stored
 *		as the linger time, any pending linger task is deleted, and a new
 *		return_lingering_licenses task is scheduled for
 *		licenses_checkout_time + licenses_linger_time.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_LICENSE_LINGER_BADVAL	: wrong value for pbs_license_linger attribute
 */
int
set_license_linger(attribute *pattr, void *pobject, int actmode)
{
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return (PBSE_NONE);

	if (pattr->at_val.at_long <= 0)
		return (PBSE_LICENSE_LINGER_BADVAL);

	licensing_control.licenses_linger_time = pattr->at_val.at_long;

	/* replace a pending linger task with one based on the new time */
	if (licenses_linger_time_task != NULL)
		delete_task(licenses_linger_time_task);

	licenses_linger_time_task = set_task(WORK_Timed,
					     licensing_control.licenses_checkout_time + licensing_control.licenses_linger_time,
					     return_lingering_licenses, NULL);

	return (PBSE_NONE);
}

/**
 * @brief
 *		unset_license_linger - restore licensing_control.licenses_linger_time
 *			  to PBS_LIC_LINGER_TIME and log the new value.
 */
void
unset_license_linger(void)
{
	licensing_control.licenses_linger_time = PBS_LIC_LINGER_TIME;

	sprintf(log_buffer, "pbs_license_linger_time reverting back to default val %ld", licensing_control.licenses_linger_time);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, log_buffer);
}

/**
 * @brief
 *		Function name: unset_job_history_enable
 * @par
 *		Description: It is called when the job_history_enable attr will be
 *		     unset through "qmgr".
 * @par
 *		Purpose: If the job_history_enable server attribute is unset, then
 *		 set the global svr_history_enable to '0' and purge all the
 *		 the history jobs available in the server immediately. Also
 *		 will be called if job_history_enable set to 0.
 * @par
 *		Input : None
 *		Output: None
 */
void
unset_job_history_enable(void)
{
	job *pjob = NULL;
	job *nxpjob = NULL;

	sprintf(log_buffer, "job_history_enable has been unset.");
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
		  LOG_NOTICE, msg_daemonname, log_buffer);

	/*
	 * Reset the SERVER level global switch for job history
	 * feature i.e. svr_history_enable. It will not keep the
	 * job history information anymore.
	 */
	svr_history_enable = 0;

	/*
	 * Find all the history jobs (jobs with state JOB_STATE_LTR_MOVED
	 * and JOB_STATE_LTR_FINISHED) in the server and purge them right
	 * now as job_history_enable has been UNSET OR SET to FALSE.
	 */
	pjob = (job *) GET_NEXT(svr_alljobs);
	while (pjob != NULL) {
		/* save the next */
		nxpjob = (job *) GET_NEXT(pjob->ji_alljobs);

		if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
		    (check_job_state(pjob, JOB_STATE_LTR_FINISHED)) ||
		    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED))) {
			job_purge(pjob);
			pjob = NULL;
		}
		/* restore the next and continue */
		pjob = nxpjob;
	}
}

/**
 * @brief
 *		set_job_history_enable - at_action routine of the
 *			  job_history_enable server attribute.
 *
 *		On alter/recovery the attribute value is copied into
 *		svr_history_enable.  A non-zero value schedules
 *		svr_clean_job_history as a timed task; zero hands over to
 *		unset_job_history_enable().
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 */
int
set_job_history_enable(attribute *pattr, void *pobject, int actmode)
{
	/* nothing to do for modes other than alter and recovery */
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	svr_history_enable = pattr->at_val.at_long;

	if (!svr_history_enable) {
		/* switched off: same path as an explicit unset */
		unset_job_history_enable();
		return PBSE_NONE;
	}

	/* switched on: arrange for the history cleanup task to run */
	(void) set_task(WORK_Timed, (long) (time_now + SVR_CLEAN_JOBHIST_TM),
			svr_clean_job_history, 0);
	return PBSE_NONE;
}

/**
 * @brief
 *		set_log_events - at_action routine of the log_events server
 *			  attribute; forwards the new mask to the tpp layer.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 */
int
set_log_events(attribute *pattr, void *pobject, int actmode)
{
	switch (actmode) {
		case ATR_ACTION_ALTER:
		case ATR_ACTION_RECOV:
			/* keep the tpp log mask in step with the attribute */
			tpp_set_logmask(pattr->at_val.at_long);
			break;
		default:
			break;
	}
	return PBSE_NONE;
}

/**
 * @brief
 *		set_job_history_duration - at_action routine of the
 *			  job_history_duration server attribute.
 *
 *		On alter/recovery a non-negative value is stored in
 *		svr_history_duration and the change is logged.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_BADATVAL	: negative attribute value
 */
int
set_job_history_duration(attribute *pattr, void *pobject, int actmode)
{
	/* other action modes are accepted without doing anything */
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	/* a negative duration makes no sense */
	if (pattr->at_val.at_long < 0)
		return PBSE_BADATVAL;

	svr_history_duration = pattr->at_val.at_long;

	sprintf(log_buffer, "svr_history_duration set to val %ld", svr_history_duration);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
		  msg_daemonname, log_buffer);

	return PBSE_NONE;
}

/**
 * @brief
 *		unset_job_history_duration - restore svr_history_duration to
 *			  SVR_JOBHIST_DEFAULT and log the new value.
 */
void
unset_job_history_duration(void)
{
	/* back to the default retention period */
	svr_history_duration = SVR_JOBHIST_DEFAULT;

	/* leave a trace of the reversion in the server log */
	sprintf(log_buffer, "svr_history_duration reverting back to default val %ld",
		svr_history_duration);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
		  msg_daemonname, log_buffer);
}

/**
 * @brief
 *	set_max_job_sequence_id - at_action routine of the max_job_sequence_id
 *				  server attribute.
 *
 *	On alter/recovery the value must lie within
 *	[SVR_MAX_JOB_SEQ_NUM_DEFAULT, PBS_SEQNUMTOP].  It is stored in
 *	svr_max_job_sequence_id; if the server's current job id number is
 *	already above the new maximum the sequence window is reset.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_INVALID_MAX_JOB_SEQUENCE_ID	: value outside the allowed range
 */
int
set_max_job_sequence_id(attribute *pattr, void *pobject, int actmode)
{
	/* only alter and recovery carry a value to apply */
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	/* range check before anything is modified */
	if (pattr->at_val.at_ll < SVR_MAX_JOB_SEQ_NUM_DEFAULT)
		return PBSE_INVALID_MAX_JOB_SEQUENCE_ID;
	if (pattr->at_val.at_ll > PBS_SEQNUMTOP)
		return PBSE_INVALID_MAX_JOB_SEQUENCE_ID;

	svr_max_job_sequence_id = pattr->at_val.at_ll;

	if (server.sv_qs.sv_jobidnumber > svr_max_job_sequence_id) {
		/* current job id is beyond the new maximum: wrap to 0 */
		(void) reset_svr_sequence_window();
		sprintf(log_buffer, "svr_max_job_sequence_id wrapped to 0");
	} else {
		sprintf(log_buffer, "svr_max_job_sequence_id set to val %lld",
			svr_max_job_sequence_id);
	}
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
		  msg_daemonname, log_buffer);

	return PBSE_NONE;
}

/**
 * @brief
 *	unset_max_job_sequence_id - set server attribute "max_job_sequence_id" to
 *				    default value.
 *
 *	svr_max_job_sequence_id is reset to SVR_MAX_JOB_SEQ_NUM_DEFAULT.  If the
 *	server's current job id number is above that default, the sequence
 *	window is reset (wrapped to 0).  Both events are logged.
 */
void
unset_max_job_sequence_id(void)
{
	svr_max_job_sequence_id = SVR_MAX_JOB_SEQ_NUM_DEFAULT;
	/*
	 * Wrap only when the current job id is strictly greater than the
	 * maximum, exactly as set_max_job_sequence_id() does: an id equal
	 * to the maximum is still inside the allowed range and must not
	 * trigger a wrap.
	 */
	if (server.sv_qs.sv_jobidnumber > svr_max_job_sequence_id) {
		(void) reset_svr_sequence_window(); /* wrap it*/
		sprintf(log_buffer, "svr_max_job_sequence_id wrapped to 0");
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
			  LOG_NOTICE, msg_daemonname, log_buffer);
	}
	sprintf(log_buffer,
		"svr_max_job_sequence_id reverting back to default val %lld",
		svr_max_job_sequence_id);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
		  LOG_NOTICE, msg_daemonname, log_buffer);
}

/**
 * @brief
 *		eligibletime_action - at_action routine run when the
 *			      eligible-time switch changes.
 *
 *		When the attribute value is 1, the accrue type of every job
 *		on svr_alljobs is re-determined and its eligible time updated,
 *		because the accrue type recorded before the switch is not
 *		reliable.  If scheduling is on, a scheduling cycle is
 *		requested as well.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)(not used here)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 */
int
eligibletime_action(attribute *pattr, void *pobject, int actmode)
{
	job *pjob;
	long newtype;

	/* anything but "switched on" needs no work */
	if (pattr->at_val.at_long != 1)
		return PBSE_NONE;

	for (pjob = (job *) GET_NEXT(svr_alljobs);
	     pjob != NULL;
	     pjob = (job *) GET_NEXT(pjob->ji_alljobs)) {
		newtype = determine_accruetype(pjob);
		update_eligible_time(newtype, pjob);
	}

	/* with scheduling enabled, kick a cycle so accrue types get settled */
	if (get_sattr_long(SVR_ATR_scheduling))
		set_scheduler_flag(SCH_SCHEDULE_ETE_ON, NULL);

	return PBSE_NONE;
}

/**
 * @brief
 *		decode_formula - decode the job sort formula from a secure file
 * @par
 *		While the server is in SV_STATE_INIT the value passed in is
 *		decoded directly (it comes from the server's database).
 *		Otherwise one line is read from
 *		<pbs_home_path>/FORMULA_ATTR_PATH, the file is removed, and
 *		that line (without its trailing newline, if any) is decoded.
 *
 * @param[in]	patr	-	pointer to attribute structure
 * @param[in]	name	-	attribute name
 * @param[in]	rescn	-	resource name - unused here
 * @param[in]	val	-	attribute value (only used during SV_STATE_INIT)
 *
 * @return	int
 * @retval	zero	: success
 * @retval	PBSE_PERM	: the formula file could not be opened
 * @retval	PBSE_INTERNAL	: path too long, out of memory, or nothing could be read
 * @retval	nonzero	: any other PBSE Error Code returned by decode_str
 */
int
decode_formula(attribute *patr, char *name, char *rescn, char *val)
{
	FILE *fp;
	char pathbuf[MAXPATHLEN];
	char *formula_buf;
	int formula_buf_len = 1024;
	int plen;
	size_t len;
	int rc;

	/* when we are coming up, we need to read from the server's database */
	if (get_sattr_long(SVR_ATR_State) == SV_STATE_INIT)
		return decode_str(patr, name, rescn, val);

	/* bounded formatting: never write past pathbuf, refuse a truncated path */
	plen = snprintf(pathbuf, sizeof(pathbuf), "%s/%s",
			pbs_conf.pbs_home_path, FORMULA_ATTR_PATH);
	if (plen < 0 || (size_t) plen >= sizeof(pathbuf))
		return PBSE_INTERNAL;

	if ((fp = fopen(pathbuf, "r")) == NULL) {
		return PBSE_PERM;
	}

	formula_buf = malloc(formula_buf_len);
	if (formula_buf == NULL) {
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
			  LOG_ALERT, msg_daemonname,
			  "unable to decode formula, no memory");
		fclose(fp);
		remove(pathbuf);
		return PBSE_INTERNAL;
	}
	memset(formula_buf, 0, formula_buf_len);

	/* pbs_fgets takes the buffer and its length by address;
	 * presumably it may grow the buffer - verify against its definition */
	if (pbs_fgets(&formula_buf, &formula_buf_len, fp) == NULL) {
		fclose(fp);
		remove(pathbuf);
		free(formula_buf);
		return PBSE_INTERNAL;
	}

	fclose(fp);

	/* now that we have the data, the file may be removed */
	remove(pathbuf);

	/*
	 * Strip the trailing newline only when there is one.  Blindly
	 * chopping the last byte would lose the final character of a
	 * formula whose line is not newline terminated, and would write
	 * before the buffer for an empty string.
	 */
	len = strlen(formula_buf);
	if (len > 0 && formula_buf[len - 1] == '\n')
		formula_buf[len - 1] = '\0';

	rc = decode_str(patr, name, rescn, formula_buf);
	free(formula_buf);
	return rc;
}

/*
 * The following datum and functions are used to enforce the rule that the
 * entity-limit (entlims) attributes cannot be used if the old style
 * user/group/run limits are in use, and vice versa.
 *
 * The datum entlim_type_in_use is set to:
 *	 0 - when neither type has been set (yet)
 *	+1 - when newer "entlims" have been set
 *	-1 - when older style limits have been set.
 *
 * If the datum is 0, the first limit of either style is allowed and sets
 * the datum accordingly.  If set to +1, then any additional new style
 * entlim is allowed without additional checks.  If set to -1, then
 * additional old style limits are allowed to be set.
 *
 * If the wrong type is being set, then an exhaustive search of the server
 * attributes and of the queue attributes of all queues is required to see
 * if the other style is really in use.  This is needed because the datum is
 * not reset to zero when the last limit of a style is unset; the mechanism
 * to do so is not in place.
 *
 * There are two pairs of lists, one pair for the server and one for queues,
 * of the attributes which must be checked.
 */

/* which limit style is in use: 0 none yet, +1 new (entlim), -1 old */
static int entlim_type_in_use = 0;

/* server attribute indices of the old style limits, terminated by -1 */
static int svr_oldstyle[] = {
	(int) SVR_ATR_max_running,
	(int) SVR_ATR_MaxUserRun,
	(int) SVR_ATR_MaxGrpRun,
	(int) SVR_ATR_MaxUserRes,
	(int) SVR_ATR_MaxGroupRes,
	(int) SVR_ATR_MaxUserRunSoft,
	(int) SVR_ATR_MaxGrpRunSoft,
	(int) SVR_ATR_MaxUserResSoft,
	(int) SVR_ATR_MaxGroupResSoft,
	-1};
/* server attribute indices of the new style (entlim) limits, terminated by -1 */
static int svr_newstyle[] = {
	(int) SVR_ATR_max_run,
	(int) SVR_ATR_max_run_res,
	(int) SVR_ATR_max_run_soft,
	(int) SVR_ATR_max_run_res_soft,
	-1};
/* queue attribute indices of the old style limits, terminated by -1 */
static int que_oldstyle[] = {
	(int) QA_ATR_MaxJobs,
	(int) QA_ATR_MaxRun,
	(int) QE_ATR_MaxUserRun,
	(int) QE_ATR_MaxGrpRun,
	(int) QE_ATR_MaxUserRes,
	(int) QE_ATR_MaxGroupRes,
	(int) QE_ATR_MaxUserRunSoft,
	(int) QE_ATR_MaxGrpRunSoft,
	(int) QE_ATR_MaxUserResSoft,
	(int) QE_ATR_MaxGroupResSoft,
	-1};
/* queue attribute indices of the new style (entlim) limits, terminated by -1 */
static int que_newstyle[] = {
	(int) QA_ATR_max_queued,
	(int) QA_ATR_queued_jobs_threshold,
	(int) QE_ATR_max_run,
	(int) QE_ATR_max_run_res,
	(int) QE_ATR_max_run_soft,
	(int) QE_ATR_max_run_res_soft,
	-1};

/* list of queues, walked when a style conflict has to be double checked */
extern pbs_list_head svr_queues;
/**
 * @brief
 * 		is_attrs_in_list_set - report whether any attribute named in a
 * 		-1 terminated index list has ATR_VFLAG_SET on in the given
 * 		attribute array.
 *
 * @param[in]	wlist	-	-1 terminated list of attribute indices to test
 * @param[in]	attrs	-	attribute array of the parent object, indexed by
 * 				the values found in wlist
 *
 *	Returns >=0 index of the first attribute found to be set,
 *		 -1 if none set
 */
static int
is_attrs_in_list_set(int *wlist, attribute *attrs)
{
	int *pidx;

	/* stop at the -1 sentinel; the first hit wins */
	for (pidx = wlist; *pidx != -1; pidx++) {
		if ((attrs[*pidx].at_flags & ATR_VFLAG_SET) != 0)
			return *pidx;
	}
	return -1;
}

/**
 * @brief
 * 		log_mixed_limit_controls - write a LOG_ALERT entry when the
 *		administrator tries to mix old and new style queue/run limits.
 *
 * @param[in]	pq	-	queue holding the conflicting attribute, or NULL
 *				when the conflict is on the server
 * @param[in]	index	-	index of the conflicting attribute in
 *				que_attr_def (pq set) or svr_attr_def (pq NULL)
 * @param[in]	type	-	style of the attribute already set ("old"/"new")
 */
static void
log_mixed_limit_controls(pbs_queue *pq, int index, char *type)
{
	attribute_def *conflict;
	char *owner;
	char *kind;

	/* pick definition table, object name and prefix by parent type */
	if (pq != NULL) {
		conflict = &que_attr_def[index];
		owner = pq->qu_qs.qu_name;
		kind = "queue";
	} else {
		conflict = &svr_attr_def[index];
		owner = "Server";
		kind = "";
	}

	snprintf(log_buffer, LOG_BUF_SIZE - 1,
		 "%s style attribute \"%s\" already set in %s %s, cannot mix types",
		 type, conflict->at_name, kind, owner);
	log_buffer[LOG_BUF_SIZE - 1] = '\0';

	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_ALERT,
		  msg_daemonname, log_buffer);
}

/**
 * @brief
 * 		action_entlim_chk - at_action check for new style (entlim)
 *		attributes; refuses them while old style limits are set.
 *
 *		With entlim_type_in_use at +1 or 0 the request is accepted
 *		(0 is promoted to +1).  At -1 the server and every queue are
 *		searched for an old style limit that is really set; only if
 *		none is found is the new style accepted.
 *
 * @param[in]	pattr	-	pointer to attribute structure(not used here)
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)(not used here)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_MIXENTLIMS	: mixing old and new limit enforcement
 */
int
action_entlim_chk(attribute *pattr, void *pobject, int actmode)
{
	int hit;
	pbs_queue *pq;

	switch (entlim_type_in_use) {
		case +1:
			/* new style already established */
			return PBSE_NONE;
		case 0:
			/* first limit of any kind: claim new style */
			entlim_type_in_use = +1;
			return PBSE_NONE;
		default:
			break;
	}

	/* the flag claims old style is in use; verify on the server ... */
	hit = is_attrs_in_list_set(svr_oldstyle, server.sv_attr);
	if (hit != -1) {
		log_mixed_limit_controls(NULL, hit, "old");
		return PBSE_MIXENTLIMS;
	}

	/* ... and on every queue */
	for (pq = (pbs_queue *) GET_NEXT(svr_queues); pq; pq = (pbs_queue *) GET_NEXT(pq->qu_link)) {
		hit = is_attrs_in_list_set(que_oldstyle, pq->qu_attr);
		if (hit != -1) {
			log_mixed_limit_controls(pq, hit, "old");
			return PBSE_MIXENTLIMS;
		}
	}

	/* no old style limit left anywhere: switch over to new style */
	entlim_type_in_use = +1;
	return PBSE_NONE;
}

/**
 * @brief
 * 		entlim_resum - Re-totals the entity usage, either count or resources
 *		for a specific entity limit attribute
 *
 * @param[in]	pwt	-	pointer to work task structure
 */

static void
entlim_resum(struct work_task *pwt)
{
	void *ctx;
	int is_resc;
	attribute *pattr;
	attribute *pattr2;
	char *key = NULL;
	svr_entlim_leaf_t *plf;
	job *pj;
	void *pobject;
	pbs_queue *pque;
	extern pbs_list_head svr_alljobs;

	pobject = pwt->wt_parm1; /* pointer to parent object */
	is_resc = pwt->wt_aux;	 /* 1=resource, 0-count */

	/* now determine if the parent object is a queue or is the Server */
	/* this tells us which list of jobs we need to walk.		  */
	if ((struct server *) pobject == &server) {
		/* server is the parent */
		pque = NULL;
		if (is_resc) {
			pattr = get_sattr(SVR_ATR_max_queued_res);
			pattr2 = get_sattr(SVR_ATR_queued_jobs_threshold_res);
		} else {
			pattr = get_sattr(SVR_ATR_max_queued);
			pattr2 = get_sattr(SVR_ATR_queued_jobs_threshold);
		}
		pj = (job *) GET_NEXT(svr_alljobs);
	} else {
		/* a queue is the parent */
		pque = (pbs_queue *) pobject;
		if (is_resc) {
			pattr = get_qattr(pque, QA_ATR_max_queued_res);
			pattr2 = get_qattr(pque, QA_ATR_queued_jobs_threshold_res);
		} else {
			pattr = get_qattr(pque, QA_ATR_max_queued);
			pattr2 = get_qattr(pque, QA_ATR_queued_jobs_threshold);
		}
		pj = (job *) GET_NEXT(pque->qu_jobs);
	}

	/* Next, walk the limit tree and clear all current values */

	ctx = pattr->at_val.at_enty.ae_tree;
	while ((plf = entlim_get_next(ctx, (void **) &key)) != NULL) {
		if (is_attr_set(&plf->slf_sum)) {
			plf->slf_rescd->rs_free(&plf->slf_sum);
			DBPRT(("clearing %s\n", key))
		}
	}

	ctx = pattr2->at_val.at_enty.ae_tree;
	key = NULL;
	while ((plf = entlim_get_next(ctx, (void **) &key)) != NULL) {
		if (is_attr_set(&plf->slf_sum)) {
			plf->slf_rescd->rs_free(&plf->slf_sum);
			DBPRT(("clearing %s\n", key))
		}
	}

	/* then for each job in the parent object, sum up its count/resource */

	while (pj) {
		if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) == 0) {
			if (is_resc) {
				account_entity_limit_usages(pj, pque, NULL, INCR, ETLIM_ACC_ALL_RES);
			} else {
				account_entity_limit_usages(pj, pque, NULL, INCR, ETLIM_ACC_ALL_CT);
			}
		}

		if (pque)
			pj = (job *) GET_NEXT(pj->ji_jobque);
		else
			pj = (job *) GET_NEXT(pj->ji_alljobs);
	}
}

/**
 * @brief
 * 		action_entlim_ct - at_action routine of the entity job count
 *		limit attributes.  Runs action_entlim_chk() and, on alter,
 *		queues an immediate entlim_resum task flagged (wt_aux = 0)
 *		to re-total job counts.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	nonzero	: PBSE error code
 */

int
action_entlim_ct(attribute *pattr, void *pobject, int actmode)
{
	struct work_task *resum;
	int rc = action_entlim_chk(pattr, pobject, actmode);

	if (rc != PBSE_NONE)
		return rc;
	if (actmode != ATR_ACTION_ALTER)
		return PBSE_NONE;

	/*
	 * The re-total is deferred to a work task: right now the real
	 * attribute still holds the old information (it may even be
	 * unset), the new value is only in place after this action
	 * routine has returned.
	 */
	resum = set_task(WORK_Immed, 0, entlim_resum, pobject);
	if (resum != NULL)
		resum->wt_aux = 0; /* resum count of jobs */

	return PBSE_NONE;
}

/**
 * @brief
 * 		action_entlim_res - at_action routine of the entity resource
 *		limit attributes.  Runs action_entlim_chk() and, on alter,
 *		queues an immediate entlim_resum task flagged (wt_aux = 1)
 *		to re-total resource usage.
 *
 * @param[in]	pattr	-	pointer to attribute structure
 * @param[in]	pobject -	pointer to some parent object.
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	nonzero	: PBSE error code
 */

int
action_entlim_res(attribute *pattr, void *pobject, int actmode)
{
	struct work_task *resum;
	int rc = action_entlim_chk(pattr, pobject, actmode);

	if (rc != PBSE_NONE)
		return rc;
	if (actmode != ATR_ACTION_ALTER)
		return PBSE_NONE;

	/*
	 * The re-total is deferred to a work task: right now the real
	 * attribute still holds the old information (it may even be
	 * unset), the new value is only in place after this action
	 * routine has returned.
	 */
	resum = set_task(WORK_Immed, 0, entlim_resum, pobject);
	if (resum != NULL)
		resum->wt_aux = 1; /* resum resources */

	return PBSE_NONE;
}

/**
 * @brief
 * 		check_no_entlim - checks for conflicting attributes which restrict what can
 *		run or be enqueued.  If an old style is being set, the newer "entlim"
 *		types cannot be set.
 *
 *		With entlim_type_in_use at -1 or 0 the request is accepted
 *		(0 is changed to -1).  At +1 the server and every queue are
 *		searched for a new style limit that is really set; only if
 *		none is found is the old style accepted.
 *
 * @param[in]	pattr	-	pointer to attribute structure(not used here)
 * @param[in]	pobject -	pointer to some parent object.(not used here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER)(not used here)
 *
 * @return	int
 * @retval	PBSE_NONE	: no new entlim type currently set
 * @retval	PBSE_MIXENTLIMS	: there is a new style entlim limit set
 */
int
check_no_entlim(attribute *pattr, void *pobject, int actmode)
{
	int i;
	pbs_queue *pq;

	if (entlim_type_in_use == -1)
		return PBSE_NONE;
	else if (entlim_type_in_use == 0) {
		entlim_type_in_use = -1; /* show old style in use */
		return PBSE_NONE;
	}

	/* flags says wrong (new) style in use, but need to double check */
	if ((i = is_attrs_in_list_set(svr_newstyle, server.sv_attr)) != -1) {
		log_mixed_limit_controls(NULL, i, "new");
		return PBSE_MIXENTLIMS;
	}
	pq = (pbs_queue *) GET_NEXT(svr_queues);
	while (pq) {
		if ((i = is_attrs_in_list_set(que_newstyle, pq->qu_attr)) != -1) {
			log_mixed_limit_controls(pq, i, "new");
			return PBSE_MIXENTLIMS;
		}
		pq = (pbs_queue *) GET_NEXT(pq->qu_link);
	}

	entlim_type_in_use = -1; /* show old style in use */
	return PBSE_NONE;
}

/*
 * Return values of check_single_entity_ct() and check_single_entity_res().
 * Callers treat the two negative values as "reject the job".
 */
#define Exceeds_Generic -2
#define Exceeds_Limit -1
#define No_Limit 0
#define Within_Limit 1

/*
 * ET_LIM_DBG - trace helper for the entity limit checks.
 *
 * When PBSEVENT_DEBUG4 logging is on, formats "ET_LIM_DBG: <first arg>: "
 * followed by the caller's format into log_buffer and logs it against the
 * id of the job.  Requirements on the call site:
 *   - a variable named pjob (job *) must be in scope;
 *   - the first variadic argument feeds the leading "%s" (callers in this
 *     file pass __func__), so at least one argument is always required.
 *
 * NOTE: the macro expands to a bare "if (...) { ... }" statement and the
 * call sites in this file use it without a trailing semicolon, so it must
 * not be turned into a do { } while (0) form without changing every caller.
 */
#define ET_LIM_DBG(format, ...)                                                                              \
	if (will_log_event(PBSEVENT_DEBUG4)) {                                                               \
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "ET_LIM_DBG: %s: " format, __VA_ARGS__);              \
		log_event(PBSEVENT_DEBUG4, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer); \
	}

/* declared for use in this file; defined elsewhere */
extern char statechars[];

/**
 * @brief
 * 		check_single_entity_ct	-	check a job count against the limit
 *		of a single entity.
 *
 *		The count tested is the usage recorded for the entity (if
 *		any) plus subjobs.  A limit set specifically for the entity
 *		takes precedence; otherwise, for key types other than
 *		LIM_OVERALL, the generic (PBS_GENERIC_ENTITY) limit of the
 *		same key type is consulted.
 *
 * @param[in]	kt	-	Key type- user/group/project or overall.
 * @param[in]	ename	-	entity name.
 * @param[in]	patr	-	pointer to attribute structure holding the limit tree
 * @param[in]	subjobs	-	number of jobs being added (subjobs if any).
 * @param[in]	pjob	-	pointer to job (used for debug logging)
 *
 * @return	int
 * @retval	Exceeds_Generic	: count exceeds generic limit
 * @retval	Exceeds_Limit	: count exceeds slf_limit, or the entity key
 *				  could not be built
 * @retval	No_Limit	: There is no limit
 * @retval	Within_Limit	: count is within the limit
 */
static int
check_single_entity_ct(enum lim_keytypes kt, char *ename, attribute *patr, int subjobs, job *pjob)
{
	char *kstr;
	void *ctx;
	svr_entlim_leaf_t *plf;
	int count = subjobs;

	kstr = entlim_mk_runkey(kt, ename);
	if (kstr == NULL) {
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
			  LOG_ALERT, msg_daemonname,
			  "rejecting job,  unable to make entity limit key, no memory");
		/*
		 * Report a value from this function's own result set so the
		 * callers really reject the job, as the log message states.
		 * (LIM_OVERALL, a key type, was returned here before and
		 * matched none of the values the callers test for.)
		 */
		ET_LIM_DBG("exiting, ret %d [kstr is NULL]", __func__, Exceeds_Limit)
		return Exceeds_Limit;
	}
	ET_LIM_DBG("kstr %s, %d", __func__, kstr, subjobs)
	ctx = patr->at_val.at_enty.ae_tree;
	plf = (svr_entlim_leaf_t *) entlim_get(kstr, ctx);

	if (plf) {
		/* add what is already accounted to this entity */
		count += plf->slf_sum.at_val.at_long;
		ET_LIM_DBG("ct usage for %s is %ld", __func__, kstr, plf->slf_sum.at_val.at_long)
		ET_LIM_DBG("ct specific limit for %s is %ld", __func__, kstr, plf->slf_limit.at_val.at_long)
	}
	free(kstr);

	ET_LIM_DBG("count is %d", __func__, count)
	if (plf && (is_attr_set(&plf->slf_limit))) {
		/* a limit set for this very entity decides alone */
		if (count > plf->slf_limit.at_val.at_long) {
			ET_LIM_DBG("exiting, ret Exceeds_Limit [specific limit]", __func__)
			return Exceeds_Limit;
		} else {
			ET_LIM_DBG("exiting, ret Within_Limit [specific limit]", __func__)
			return Within_Limit;
		}
	} else if (kt != LIM_OVERALL) {
		/* compare against generic limit if one */
		kstr = entlim_mk_runkey(kt, PBS_GENERIC_ENTITY);
		if (kstr == NULL) {
			ET_LIM_DBG("exiting, ret No_Limit [generic limit]", __func__)
			return No_Limit;
		}
		plf = (svr_entlim_leaf_t *) entlim_get(kstr, ctx);
		if (plf && (is_attr_set(&plf->slf_limit))) {
			ET_LIM_DBG("ct generic limit for %s is %ld", __func__, kstr, plf->slf_limit.at_val.at_long)
			free(kstr);
			if (count > plf->slf_limit.at_val.at_long) {
				ET_LIM_DBG("exiting, ret Exceeds_Generic [generic limit]", __func__)
				return Exceeds_Generic;
			} else {
				ET_LIM_DBG("exiting, ret Within_Limit [generic limit]", __func__)
				return Within_Limit;
			}
		}
		free(kstr);
	}
	ET_LIM_DBG("exiting, ret No_Limit [all ok]", __func__)
	return No_Limit;
}
/**
 * @brief
 * 		check_single_entity_res	-	check a resource amount against the
 *		limit of a single entity.
 *
 *		The amount tested starts from the usage recorded for the
 *		entity; for each of the subjobs the old amount (if oldr is
 *		given) is taken off and the new amount added.  A limit set
 *		specifically for the entity takes precedence; otherwise, for
 *		key types other than LIM_OVERALL, the generic
 *		(PBS_GENERIC_ENTITY) limit of the same key type is consulted.
 *
 * @param[in]	kt	-	Key type- user/group/project or overall.
 * @param[in]	ename	-	entity name.
 * @param[in]	patr	-	pointer to attribute structure holding the limit tree
 * @param[in]	newr	-	new resource
 * @param[in]	oldr	-	old resource, may be NULL
 * @param[in]	subjobs -	number of subjobs if any.
 * @param[in]	pjob	-	pointer to job (used for debug logging)
 *
 * @return	int
 * @retval	Exceeds_Generic	: amount exceeds generic limit
 * @retval	Exceeds_Limit	: amount exceeds slf_limit, or the entity key
 *				  could not be built
 * @retval	No_Limit	: There is no limit
 * @retval	Within_Limit	: amount is within the limit
 */
static int
check_single_entity_res(enum lim_keytypes kt, char *ename,
			attribute *patr,
			resource *newr,
			resource *oldr,
			int subjobs,
			job *pjob)
{
	char *kstr;
	void *ctx;
	svr_entlim_leaf_t *plf;
	int rc;
	int i;
	attribute tmpval = {0};

	kstr = entlim_mk_reskey(kt, ename, newr->rs_defin->rs_name);
	if (kstr == NULL) {
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
			  LOG_ALERT, msg_daemonname,
			  "rejecting job,  unable to make entity limit key, no memory");
		/*
		 * Report a value from this function's own result set so the
		 * callers really reject the job, as the log message states.
		 * (LIM_OVERALL, a key type, was returned here before.)
		 */
		ET_LIM_DBG("exiting, ret %d [kstr is NULL]", __func__, Exceeds_Limit)
		return Exceeds_Limit;
	}
	ET_LIM_DBG("kstr %s, %d, oldr %p", __func__, kstr, subjobs, oldr)
	ctx = patr->at_val.at_enty.ae_tree;
	plf = (svr_entlim_leaf_t *) entlim_get(kstr, ctx);

	if (plf) {
		/* work on a copy of the recorded usage */
		tmpval = plf->slf_sum;
		for (i = 0; i < subjobs; i++) {
			if (oldr)
				plf->slf_rescd->rs_set(&tmpval, &oldr->rs_value, DECR);
			/* add in requested amount */
			plf->slf_rescd->rs_set(&tmpval, &newr->rs_value, INCR);
		}
		if (will_log_event(PBSEVENT_DEBUG4)) {
			svrattrl *sum = NULL, *limit = NULL;
			char *sum_val = "(not_set)";
			char *limit_val = "(not_set)";
			if (is_attr_set(&plf->slf_sum)) {
				plf->slf_rescd->rs_encode(&plf->slf_sum, NULL, "sumval", NULL, ATR_ENCODE_CLIENT, &sum);
				/* guard: the encoder may not have produced an entry */
				sum_val = sum ? sum->al_value : "(unknown)";
			}
			if (is_attr_set(&plf->slf_limit)) {
				plf->slf_rescd->rs_encode(&plf->slf_limit, NULL, "limval", NULL, ATR_ENCODE_CLIENT, &limit);
				/* guard: the encoder may not have produced an entry */
				limit_val = limit ? limit->al_value : "(unknown)";
			}
			ET_LIM_DBG("res usage for %s is %s", __func__, kstr, sum_val)
			ET_LIM_DBG("res specific limit for %s is %s", __func__, kstr, limit_val)
			free(sum);
			free(limit);
		}
	}
	free(kstr);
	if (plf && (is_attr_set(&plf->slf_limit))) {
		/* check the specific user's limit */
		rc = plf->slf_rescd->rs_comp(&tmpval, &plf->slf_limit);
		if (rc > 0) {
			ET_LIM_DBG("exiting, ret Exceeds_Limit, rc=%d [specific limit]", __func__, rc)
			return Exceeds_Limit;
		}
		ET_LIM_DBG("exiting, ret Within_Limit, rc=%d [specific limit]", __func__, rc)
		return Within_Limit;
	} else if (kt != LIM_OVERALL) {
		/* check against the generic limit if one */
		kstr = entlim_mk_reskey(kt, PBS_GENERIC_ENTITY, newr->rs_defin->rs_name);
		if (kstr == NULL) {
			ET_LIM_DBG("exiting, ret No_Limit [generic limit]", __func__)
			return No_Limit;
		}
		plf = (svr_entlim_leaf_t *) entlim_get(kstr, ctx);
		if (plf && (is_attr_set(&plf->slf_limit))) {
			if (!(is_attr_set(&tmpval))) { /* for no recorded usage for entity */
				plf->slf_rescd->rs_set(&tmpval, &newr->rs_value, SET);
				for (i = 0; i < (subjobs - 1); i++) {
					plf->slf_rescd->rs_set(&tmpval, &newr->rs_value, INCR);
				}
				if (will_log_event(PBSEVENT_DEBUG4) && (is_attr_set(&tmpval))) {
					/* start from NULL so a failed encode is neither read nor freed as garbage */
					svrattrl *count = NULL;
					plf->slf_rescd->rs_encode(&tmpval, NULL, "tmpval", NULL, ATR_ENCODE_CLIENT, &count);
					ET_LIM_DBG("res generic limit for %s is %s", __func__, kstr, count ? count->al_value : "(unknown)")
					free(count);
				} else
					ET_LIM_DBG("res generic limit for %s is (not_set)", __func__, kstr)
			}
			rc = plf->slf_rescd->rs_comp(&tmpval, &plf->slf_limit);
			free(kstr);
			if (rc > 0) {
				ET_LIM_DBG("exiting, ret Exceeds_Generic, rc=%d [generic limit]", __func__, rc)
				return Exceeds_Generic;
			}
			ET_LIM_DBG("exiting, ret Within_Limit, rc=%d [generic limit]", __func__, rc)
			return Within_Limit;
		}
		free(kstr);
	}
	ET_LIM_DBG("exiting, ret No_Limit [all ok]", __func__)
	return No_Limit;
}

/**
 * @brief
 * 		check_entity_ct_limit_queued() - called to see if a job can be enqueued
 *		without exceeding the queued_jobs_threshold job count limits.
 *		1. Called when new job is arriving against server attributes:
 *	   		- pque will be null
 *		2. Called to check against queue attributes on any enqueue
 *	   	(submit, move or route):
 *	   	- pque will point to queue struct, i.e. not null
 *
 *		The checks are made in this order, stopping at the first
 *		limit exceeded: overall (o:PBS_ALL), user, group, project.
 *		For user, group and project both the entity specific and the
 *		generic limit are considered.
 *
 * @param[in]	pjob	-	new job
 * @param[in]	pque	-	queue to check against, NULL to check the server
 *
 * @return	within the limit or not
 * @retval	zero	: within defined limit, or no limit set
 * @retval	PBSE_ENTLIMCT	: a job count limit would be exceeded
 * @retval	PBSE_INTERNAL	: the count of queued subjobs could not be obtained
 * @retval	PBSE_SYSTEM	: no memory to attach the error message to the job
 * @note
 *		Any message previously held in pjob->ji_clterrmsg is freed on
 *		entry.  When a limit is exceeded, a formatted message is
 *		attached to the job in ji_clterrmsg.
 */
int
check_entity_ct_limit_queued(job *pjob, pbs_queue *pque)
{
	char *egroup;
	char *project;
	char *euser;
	attribute *pqueued_jobs_threshold;
	int rc;
	int subjobs;
	char ebuff[COMMENT_BUF_SIZE + 1];
	/* message formats, defined elsewhere */
	extern char *msg_et_qct_q;
	extern char *msg_et_sct_q;
	extern char *msg_et_ggq_q;
	extern char *msg_et_ggs_q;
	extern char *msg_et_gpq_q;
	extern char *msg_et_gps_q;
	extern char *msg_et_guq_q;
	extern char *msg_et_gus_q;
	extern char *msg_et_sgq_q;
	extern char *msg_et_sgs_q;
	extern char *msg_et_spq_q;
	extern char *msg_et_sps_q;
	extern char *msg_et_suq_q;
	extern char *msg_et_sus_q;

	ET_LIM_DBG("entered for %s", __func__, pque ? pque->qu_qs.qu_name : "server")
	euser = get_jattr_str(pjob, JOB_ATR_euser);
	egroup = get_jattr_str(pjob, JOB_ATR_egroup);
	project = get_jattr_str(pjob, JOB_ATR_project);
	/* drop any message left over from an earlier check */
	if (pjob->ji_clterrmsg) {
		free(pjob->ji_clterrmsg);
		pjob->ji_clterrmsg = NULL;
	}
	/* the limit attribute comes from the queue if given, else from the server */
	if (pque)
		pqueued_jobs_threshold = get_qattr(pque, QA_ATR_queued_jobs_threshold);
	else
		pqueued_jobs_threshold = get_sattr(SVR_ATR_queued_jobs_threshold);

	if (!is_attr_set(pqueued_jobs_threshold)) {
		ET_LIM_DBG("exiting, ret 0 [queued_jobs_threshold limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return PBSE_NONE; /* no limits set */
	}

	/* number of jobs this job adds to the count; negative means failure */
	if ((subjobs = get_queued_subjobs_ct(pjob)) < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, subjobs)
		return PBSE_INTERNAL;
	}

	/* I.  For jobs count limits */

	/* 1. Check against Overall limit, [o:PBS_ALL] */
	rc = check_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, pqueued_jobs_threshold, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_qct_q,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_sct_q);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		/* the job owns the copy of the message from here on */
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(o:" PBS_ALL_ENTITY ",%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 2. Check against specific user limit, [u:user] */
	rc = check_single_entity_ct(LIM_USER, euser, pqueued_jobs_threshold, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_suq_q,
				 euser, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sus_q, euser);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(u:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, euser, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		/*
		 * NOTE(review): unlike the group and project generic cases
		 * below, the literal "generic" is passed as an extra
		 * argument here; whether that matches the msg_et_guq_q /
		 * msg_et_gus_q formats cannot be told from this file -
		 * confirm against their definitions.
		 */
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_guq_q,
				 "generic", pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_gus_q, "generic");
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(u:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, euser, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 3. Check against specific group limit, [g:group] */
	rc = check_single_entity_ct(LIM_GROUP, egroup, pqueued_jobs_threshold, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sgq_q,
				 egroup, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sgs_q, egroup);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(g:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, egroup, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_ggq_q,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_ggs_q);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(g:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, egroup, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 4. Check against specific project limit, [p:project] */
	rc = check_single_entity_ct(LIM_PROJECT, project, pqueued_jobs_threshold, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_spq_q,
				 project, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sps_q, project);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(p:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, project, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_gpq_q,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_gps_q);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(p:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, project, subjobs)
		return PBSE_ENTLIMCT;
	}

	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	return 0; /* within all count limits */
}

/**
 * @brief
 * 		check_entity_ct_limit_max() - called to see if a job can be enqueued
 *		without exceeding the max_queued job count limits
 *		1. Called when new job is arriving against server attributes:
 *	   	- pque will be null
 *		2. Called to check against queue attributes on any enqueue
 *	   	(submit, move or route):
 *	   	- pque will point to queue struct, i.e. not null
 *
 *		The limits are checked in this order and the first one reported
 *		as exceeded ends the check: overall (o:PBS_ALL), user, group,
 *		project.
 *
 * @param[in,out]	pjob	-	job being enqueued; its euser, egroup and
 *					project attributes name the entities that
 *					are checked.  Any message already held in
 *					ji_clterrmsg is freed on entry.
 * @param[in]	pque	-	queue whose max_queued attribute is checked, or
 *				NULL to check the server's max_queued attribute
 *
 * @return	within the limit or not
 * @retval	zero / PBSE_NONE	: max_queued is not set or job is within all limits
 * @retval	PBSE_ENTLIMCT	: a count limit would be exceeded
 * @retval	PBSE_INTERNAL	: get_queued_subjobs_ct() returned a negative count
 * @retval	PBSE_SYSTEM	: the error message could not be duplicated
 * @note
 *		When PBSE_ENTLIMCT is returned, a formatted message (allocated
 *		with strdup()) is attached to the job in ji_clterrmsg.
 */
int
check_entity_ct_limit_max(job *pjob, pbs_queue *pque)
{
	char *egroup;
	char *project;
	char *euser;
	attribute *pmax_queued;
	int rc;
	int subjobs;
	char ebuff[COMMENT_BUF_SIZE + 1];
	extern char *msg_et_qct;
	extern char *msg_et_sct;
	extern char *msg_et_ggq;
	extern char *msg_et_ggs;
	extern char *msg_et_gpq;
	extern char *msg_et_gps;
	extern char *msg_et_guq;
	extern char *msg_et_gus;
	extern char *msg_et_sgq;
	extern char *msg_et_sgs;
	extern char *msg_et_spq;
	extern char *msg_et_sps;
	extern char *msg_et_suq;
	extern char *msg_et_sus;

	ET_LIM_DBG("entered for %s", __func__, pque ? pque->qu_qs.qu_name : "server")
	euser = get_jattr_str(pjob, JOB_ATR_euser);
	egroup = get_jattr_str(pjob, JOB_ATR_egroup);
	project = get_jattr_str(pjob, JOB_ATR_project);

	/* drop any message left by an earlier check; free(NULL) is a no-op */
	free(pjob->ji_clterrmsg);
	pjob->ji_clterrmsg = NULL;

	/* the limit attribute comes from the queue if given, else the server */
	if (pque)
		pmax_queued = get_qattr(pque, QA_ATR_max_queued);
	else
		pmax_queued = get_sattr(SVR_ATR_max_queued);

	if (!is_attr_set(pmax_queued)) {
		ET_LIM_DBG("exiting, ret 0 [max_queued limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return PBSE_NONE; /* no limits set */
	}

	if ((subjobs = get_queued_subjobs_ct(pjob)) < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, subjobs)
		return PBSE_INTERNAL;
	}

	/* I.  For jobs count limits */

	/* 1. Check against Overall limit, [o:PBS_ALL] */
	rc = check_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, pmax_queued, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_qct,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_sct);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(o:" PBS_ALL_ENTITY ",%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 2. Check against specific user limit, [u:user] */
	rc = check_single_entity_ct(LIM_USER, euser, pmax_queued, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_suq,
				 euser, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sus, euser);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(u:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, euser, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		/*
		 * The generic user message is formatted the same way as the
		 * generic group and project messages below: the queue form is
		 * given only the queue name and the server form is emitted
		 * through "%s" rather than used as a format itself.
		 */
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_guq,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_gus);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(u:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, euser, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 3. Check against specific group limit, [g:group] */
	rc = check_single_entity_ct(LIM_GROUP, egroup, pmax_queued, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sgq,
				 egroup, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sgs, egroup);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(g:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, egroup, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_ggq,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_ggs);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(g:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, egroup, subjobs)
		return PBSE_ENTLIMCT;
	}

	/* 4. Check against specific project limit, [p:project] */
	rc = check_single_entity_ct(LIM_PROJECT, project, pmax_queued, subjobs, pjob);
	if (rc == Exceeds_Limit) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_spq,
				 project, pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_sps, project);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(p:%s,%d) returned Exceeds_Limit]", __func__,
			   PBSE_ENTLIMCT, project, subjobs)
		return PBSE_ENTLIMCT;

	} else if (rc == Exceeds_Generic) {
		if (pque) {
			snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_gpq,
				 pque->qu_qs.qu_name);
		} else {
			snprintf(ebuff, COMMENT_BUF_SIZE, "%s", msg_et_gps);
		}
		ebuff[COMMENT_BUF_SIZE] = '\0';
		if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
			return PBSE_SYSTEM;
		ET_LIM_DBG("exiting, ret %d [check_single_entity_ct(p:%s,%d) returned Exceeds_Generic]", __func__,
			   PBSE_ENTLIMCT, project, subjobs)
		return PBSE_ENTLIMCT;
	}

	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	return 0; /* within all count limits */
}

/**
 * @brief
 * 		check_entity_resc_limit_queued() - called to see if a job can be
 *		enqueued based on requested (or altered) job wide resources,
 *		checked against the queued_jobs_threshold_res attribute
 *		1. Called when new job is arriving against server attributes:
 *	   	- pque will be null
 *		2. Called to check against queue attributes on any enqueue
 *	   	(submit, move or route):
 *	   	- pque will point to queue struct, i.e. not null
 *
 *		Only resources whose value is set and whose definition has
 *		rs_entlimflg equal to PBS_ENTLIM_LIMITSET are checked.  For each
 *		of them the limits are checked in this order and the first one
 *		reported as exceeded ends the check: overall (o:PBS_ALL), user,
 *		group, project.
 *
 * @param[in,out]	pjob	-	job being checked; its euser, egroup and
 *					project attributes name the entities that
 *					are checked.  Any message already held in
 *					ji_clterrmsg is freed on entry.
 * @param[in]	pque	-	queue whose attribute is checked, or NULL to
 *				check the server's attribute
 * @param[in]	altered_resc	-	altered job wide resources; when not NULL
 *					these are the new values and the job's
 *					current JOB_ATR_resource entries are the
 *					old values.  When NULL the job's
 *					JOB_ATR_resource entries are the new
 *					values and there are no old values.
 *
 * @return	within the limit or not
 * @retval	zero	: attribute is not set or job is within all limits
 * @retval	PBSE_ENTLIMRESC	: a resource limit would be exceeded
 * @retval	PBSE_INTERNAL	: get_queued_subjobs_ct() returned a negative count
 * @retval	PBSE_SYSTEM	: the error message could not be duplicated
 * @note
 *		When PBSE_ENTLIMRESC is returned, a formatted message (allocated
 *		with strdup()) is attached to the job in ji_clterrmsg.
 */
int
check_entity_resc_limit_queued(job *pjob, pbs_queue *pque, attribute *altered_resc)
{
	char *egroup;
	char *project;
	char *euser;
	int rc;
	int subjobs;
	attribute *pmaxqresc;
	attribute *pattr_new;
	attribute *pattr_old;
	resource *presc_new;
	resource *presc_old;
	char ebuff[COMMENT_BUF_SIZE + 1];

	/* only the per-resource message formats are needed in this function */
	extern char *msg_et_raq_q;
	extern char *msg_et_ras_q;
	extern char *msg_et_rggq_q;
	extern char *msg_et_rggs_q;
	extern char *msg_et_rgpq_q;
	extern char *msg_et_rgps_q;
	extern char *msg_et_rguq_q;
	extern char *msg_et_rgus_q;
	extern char *msg_et_rsgq_q;
	extern char *msg_et_rsgs_q;
	extern char *msg_et_rspq_q;
	extern char *msg_et_rsps_q;
	extern char *msg_et_rsuq_q;
	extern char *msg_et_rsus_q;

	ET_LIM_DBG("entered for %s, alt_res %p", __func__, pque ? pque->qu_qs.qu_name : "server", altered_resc)
	euser = get_jattr_str(pjob, JOB_ATR_euser);
	egroup = get_jattr_str(pjob, JOB_ATR_egroup);
	project = get_jattr_str(pjob, JOB_ATR_project);

	/* drop any message left by an earlier check; free(NULL) is a no-op */
	free(pjob->ji_clterrmsg);
	pjob->ji_clterrmsg = NULL;

	/* the limit attribute comes from the queue if given, else the server */
	if (pque)
		pmaxqresc = get_qattr(pque, QA_ATR_queued_jobs_threshold_res);
	else
		pmaxqresc = get_sattr(SVR_ATR_queued_jobs_threshold_res);

	if (!is_attr_set(pmaxqresc)) {
		ET_LIM_DBG("exiting, ret 0 [queued_jobs_threshold_res limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return 0; /* no limits set */
	}

	if (altered_resc) {
		/* altered values are the new ones, the job still holds the old */
		pattr_new = altered_resc;
		pattr_old = get_jattr(pjob, JOB_ATR_resource);
	} else {
		pattr_new = get_jattr(pjob, JOB_ATR_resource);
		pattr_old = NULL; /* no prior values */
	}

	if ((subjobs = get_queued_subjobs_ct(pjob)) < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, subjobs)
		return PBSE_INTERNAL;
	}

	for (presc_new = (resource *) GET_NEXT(pattr_new->at_val.at_list);
	     presc_new != NULL;
	     presc_new = (resource *) GET_NEXT(presc_new->rs_link)) {
		char *rescn = presc_new->rs_defin->rs_name;

		/* is there an entity limit set for this resource */
		if (!(is_attr_set(&presc_new->rs_value)) || (presc_new->rs_defin->rs_entlimflg != PBS_ENTLIM_LIMITSET))
			continue; /* no limit set */

		/* If this is from qalter where pattr_old is set, see if the */
		/* corresponding resource is in pattr_old, had a prior value */

		if (pattr_old)
			presc_old = find_resc_entry(pattr_old, presc_new->rs_defin);
		else
			presc_old = NULL;

		ET_LIM_DBG("checking for resc %s", __func__, rescn)
		/* 1. check against overall limit o:PBS_ALL */
		rc = check_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_raq_q,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_ras_q,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(o:" PBS_ALL_ENTITY ";%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 2. check against user/generic-user limit */
		rc = check_single_entity_res(LIM_USER, euser,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsuq_q,
					 euser, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsus_q,
					 euser, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(u:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, euser, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rguq_q,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgus_q,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(u:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, euser, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 3. check against specific/generic group limit */
		rc = check_single_entity_res(LIM_GROUP, egroup,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsgq_q,
					 egroup, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsgs_q,
					 egroup, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(g:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, egroup, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rggq_q,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rggs_q,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(g:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, egroup, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 4. check against specific/generic project limit */
		rc = check_single_entity_res(LIM_PROJECT, project,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rspq_q,
					 project, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsps_q,
					 project, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(p:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, project, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgpq_q,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgps_q,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(p:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, project, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}
	}

	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	/* At this point the job is good to go (into the queue/server) */
	return 0;
}

/**
 * @brief
 * 		check_entity_resc_limit_max() - called to see if a job can be
 *		enqueued based on requested (or altered) job wide resources,
 *		checked against the max_queued_res attribute
 *		1. Called when new job is arriving against server attributes:
 *	   	- pque will be null
 *		2. Called to check against queue attributes on any enqueue
 *	   	(submit, move or route):
 *	   	- pque will point to queue struct, i.e. not null
 *
 *		Only resources whose value is set and whose definition has
 *		rs_entlimflg equal to PBS_ENTLIM_LIMITSET are checked.  For each
 *		of them the limits are checked in this order and the first one
 *		reported as exceeded ends the check: overall (o:PBS_ALL), user,
 *		group, project.
 *
 * @param[in,out]	pjob	-	job being checked; its euser, egroup and
 *					project attributes name the entities that
 *					are checked.  Any message already held in
 *					ji_clterrmsg is freed on entry.
 * @param[in]	pque	-	queue whose attribute is checked, or NULL to
 *				check the server's attribute
 * @param[in]	altered_resc	-	altered job wide resources; when not NULL
 *					these are the new values and the job's
 *					current JOB_ATR_resource entries are the
 *					old values.  When NULL the job's
 *					JOB_ATR_resource entries are the new
 *					values and there are no old values.
 *
 * @return	within the limit or not
 * @retval	zero	: attribute is not set or job is within all limits
 * @retval	PBSE_ENTLIMRESC	: a resource limit would be exceeded
 * @retval	PBSE_INTERNAL	: get_queued_subjobs_ct() returned a negative count
 * @retval	PBSE_SYSTEM	: the error message could not be duplicated
 * @note
 * 		When PBSE_ENTLIMRESC is returned, a formatted message (allocated
 *		with strdup()) is attached to the job in ji_clterrmsg.
 */
int
check_entity_resc_limit_max(job *pjob, pbs_queue *pque, attribute *altered_resc)
{
	char *egroup;
	char *project;
	char *euser;
	int rc;
	int subjobs;
	attribute *pmaxqresc;
	attribute *pattr_new;
	attribute *pattr_old;
	resource *presc_new;
	resource *presc_old;
	char ebuff[COMMENT_BUF_SIZE + 1];

	/* only the per-resource message formats are needed in this function */
	extern char *msg_et_raq;
	extern char *msg_et_ras;
	extern char *msg_et_rggq;
	extern char *msg_et_rggs;
	extern char *msg_et_rgpq;
	extern char *msg_et_rgps;
	extern char *msg_et_rguq;
	extern char *msg_et_rgus;
	extern char *msg_et_rsgq;
	extern char *msg_et_rsgs;
	extern char *msg_et_rspq;
	extern char *msg_et_rsps;
	extern char *msg_et_rsuq;
	extern char *msg_et_rsus;

	ET_LIM_DBG("entered for %s, alt_res %p", __func__, pque ? pque->qu_qs.qu_name : "server", altered_resc)
	euser = get_jattr_str(pjob, JOB_ATR_euser);
	egroup = get_jattr_str(pjob, JOB_ATR_egroup);
	project = get_jattr_str(pjob, JOB_ATR_project);

	/* drop any message left by an earlier check; free(NULL) is a no-op */
	free(pjob->ji_clterrmsg);
	pjob->ji_clterrmsg = NULL;

	/* the limit attribute comes from the queue if given, else the server */
	if (pque)
		pmaxqresc = get_qattr(pque, QA_ATR_max_queued_res);
	else
		pmaxqresc = get_sattr(SVR_ATR_max_queued_res);

	if (!is_attr_set(pmaxqresc)) {
		ET_LIM_DBG("exiting, ret 0 [max_queued_res limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return 0; /* no limits set */
	}

	if (altered_resc) {
		/* altered values are the new ones, the job still holds the old */
		pattr_new = altered_resc;
		pattr_old = get_jattr(pjob, JOB_ATR_resource);
	} else {
		pattr_new = get_jattr(pjob, JOB_ATR_resource);
		pattr_old = NULL; /* no prior values */
	}

	if ((subjobs = get_queued_subjobs_ct(pjob)) < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, subjobs)
		return PBSE_INTERNAL;
	}

	for (presc_new = (resource *) GET_NEXT(pattr_new->at_val.at_list);
	     presc_new != NULL;
	     presc_new = (resource *) GET_NEXT(presc_new->rs_link)) {
		char *rescn = presc_new->rs_defin->rs_name;

		/* is there an entity limit set for this resource */
		if (!(is_attr_set(&presc_new->rs_value)) || (presc_new->rs_defin->rs_entlimflg != PBS_ENTLIM_LIMITSET))
			continue; /* no limit set */

		/* If this is from qalter where pattr_old is set, see if the */
		/* corresponding resource is in pattr_old, had a prior value */

		if (pattr_old)
			presc_old = find_resc_entry(pattr_old, presc_new->rs_defin);
		else
			presc_old = NULL;

		ET_LIM_DBG("checking for resc %s", __func__, rescn)
		/* 1. check against overall limit o:PBS_ALL */
		rc = check_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_raq,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_ras,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(o:" PBS_ALL_ENTITY ";%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 2. check against user/generic-user limit */
		rc = check_single_entity_res(LIM_USER, euser,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsuq,
					 euser, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsus,
					 euser, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(u:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, euser, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rguq,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgus,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(u:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, euser, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 3. check against specific/generic group limit */
		rc = check_single_entity_res(LIM_GROUP, egroup,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsgq,
					 egroup, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsgs,
					 egroup, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(g:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, egroup, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rggq,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rggs,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(g:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, egroup, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}

		/* 4. check against specific/generic project limit */
		rc = check_single_entity_res(LIM_PROJECT, project,
					     pmaxqresc,
					     presc_new, presc_old, subjobs, pjob);
		if (rc == Exceeds_Limit) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rspq,
					 project, rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rsps,
					 project, rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(p:%s;%s,%d) returned Exceeds_Limit]", __func__,
				   PBSE_ENTLIMRESC, project, rescn, subjobs)
			return PBSE_ENTLIMRESC;

		} else if (rc == Exceeds_Generic) {
			if (pque) {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgpq,
					 rescn, pque->qu_qs.qu_name);
			} else {
				snprintf(ebuff, COMMENT_BUF_SIZE, msg_et_rgps,
					 rescn);
			}
			ebuff[COMMENT_BUF_SIZE] = '\0';
			if ((pjob->ji_clterrmsg = strdup(ebuff)) == NULL)
				return PBSE_SYSTEM;
			ET_LIM_DBG("exiting, ret %d [check_single_entity_res(p:%s;%s,%d) returned Exceeds_Generic]", __func__,
				   PBSE_ENTLIMRESC, project, rescn, subjobs)
			return PBSE_ENTLIMRESC;
		}
	}

	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	/* At this point the job is good to go (into the queue/server) */
	return 0;
}

/**
 * @brief
 * 		set_single_entity_ct - add to or subtract from the count kept for a
 *		single entity (user/group/all) in an attribute owned by queue or
 *		server
 * @see
 *		set_entity_ct_sum()
 *
 * @param[in]	kt    - key type
 * @param[in]	ename - entity name
 * @param[in]	patr  - pointer to attribute holding the entity tree
 * @param[in]	pjob  - pointer to job (not referenced by this function)
 * @param[in]	subjobs   - amount by which the count is adjusted
 * @param[in]	op    - INCR adds subjobs, any other value subtracts it
 *
 * @return	result of the adjustment
 * @retval	PBSE_NONE	: entity count adjusted
 * @retval	PBSE_SYSTEM	: key could not be built or new leaf could not be added
 * @retval	PBSE_INTERNAL	: subtraction requested but entity has no leaf
 * @retval	other	: error returned by alloc_svrleaf()
 *
 */

static int
set_single_entity_ct(enum lim_keytypes kt, char *ename, attribute *patr, job *pjob, int subjobs, enum batch_op op)
{
	svr_entlim_leaf_t *leaf;
	void *tree;
	char *key = entlim_mk_runkey(kt, ename);

	if (key == NULL) {
		ET_LIM_DBG("exiting, ret %d [kstr is NULL]", __func__, PBSE_SYSTEM)
		return (PBSE_SYSTEM);
	}
	ET_LIM_DBG("kstr %s, %d, %s", __func__, key, subjobs, (op == INCR) ? "INCR" : "DECR")

	/* look up the leaf currently recorded for this entity, if any */
	tree = patr->at_val.at_enty.ae_tree;
	leaf = (svr_entlim_leaf_t *) entlim_get(key, tree);

	if (op != INCR) {
		/* subtracting: there must already be something to subtract from */
		if (leaf == NULL) {
			free(key);
			/* Do not decrement what isn't there */
			ET_LIM_DBG("exiting, ret %d [plf is NULL]", __func__, PBSE_INTERNAL)
			return (PBSE_INTERNAL);
		}
		leaf->slf_sum.at_val.at_long -= subjobs;
		mark_attr_set(&leaf->slf_sum);
		ET_LIM_DBG("usage DECR to %ld, by %d", __func__, leaf->slf_sum.at_val.at_long, subjobs)

		/* never leave a negative usage behind; clamp to zero and log it */
		if (leaf->slf_sum.at_val.at_long < 0L) {
			ET_LIM_DBG("zeroing usage, was %ld, by %d", __func__, leaf->slf_sum.at_val.at_long, subjobs)
			leaf->slf_sum.at_val.at_long = 0L;
			snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_single_entity_ct zeroing negative usage for %s", key);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_WARNING, msg_daemonname, log_buffer);
		}

		free(key);
		ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
		return PBSE_NONE;
	}

	/* adding: create and register a leaf the first time the entity is seen */
	if (leaf == NULL) {
		int rc = alloc_svrleaf(NULL, &leaf);

		if (rc != PBSE_NONE) {
			free(key);
			ET_LIM_DBG("exiting, ret %d [alloc_svrleaf failed]", __func__, rc)
			return (rc);
		}
		if (entlim_add(key, leaf, tree) == -1) {
			free(key);
			free(leaf);
			ET_LIM_DBG("exiting, ret %d [entlim_add failed]", __func__, PBSE_SYSTEM)
			return (PBSE_SYSTEM);
		}
	}
	leaf->slf_sum.at_val.at_long += subjobs;
	mark_attr_set(&leaf->slf_sum);
	ET_LIM_DBG("usage INCR to %ld, by %d", __func__, leaf->slf_sum.at_val.at_long, subjobs)

	free(key);
	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	return PBSE_NONE;
}

/*
 * set_single_entity_res - incr/decr a single resource for a single
 *	entity (user/group/all) for an attribute owned by queue or server,
 *	see set_entity_res_sum()
 *
 *	kt     - key type
 *	ename  - entity name
 *	patr   - pointer to attribute owned by queue or server to update
 *	newval - ptr to resource new value
 *	oldval - ptr to resource old value, null except for qalter case
 *		 where value is being changed, not just set
 *	pjob   - pointer to job
 *	subjobs    - count
 *	op     - increment or decrement
 *
 * @return	result of the update
 * @retval	zero	: adjusted Entity count
 * @retval	PBS_Enumber	: something went wrong!
 */
static int
set_single_entity_res(enum lim_keytypes kt, char *ename,
		      attribute *patr, resource *newval,
		      resource *oldval, job *pjob, int subjobs, enum batch_op op)
{
	char *rescn = newval->rs_defin->rs_name;
	char *kstr;
	void *ctx;
	svr_entlim_leaf_t *plf;
	int rc;
	int i;
	attribute tmpval = newval->rs_value;

	kstr = entlim_mk_reskey(kt, ename, rescn);
	if (kstr == NULL) {
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "Error in entlim_mk_reskey for rescn %s", rescn);
		log_err(-1, __func__, log_buffer);
		ET_LIM_DBG("exiting, ret %d [kstr is NULL]", __func__, PBSE_SYSTEM)
		return (PBSE_SYSTEM);
	}
	ET_LIM_DBG("kstr %s, %d, %s, res %s, %p", __func__, kstr,
		   subjobs, (op == INCR) ? "INCR" : "DECR", rescn, oldval)
	ctx = patr->at_val.at_enty.ae_tree;
	plf = (svr_entlim_leaf_t *) entlim_get(kstr, ctx);

	if (oldval && plf) {
		if (!(plf->slf_rescd->rs_comp(&tmpval, &oldval->rs_value))) {
			free(kstr);
			ET_LIM_DBG("exiting, ret 0 [newval == oldval]", __func__)
			return PBSE_NONE;
		}
		plf->slf_rescd->rs_set(&tmpval, &oldval->rs_value, DECR); /* subtract prior value (qalter case) */
		if (will_log_event(PBSEVENT_DEBUG4)) {
			svrattrl *new, *old, *diff;
			char *new_val, *old_val, *diff_val;
			new = old = diff = NULL;
			if (is_attr_set(&newval->rs_value)) {
				plf->slf_rescd->rs_encode(&newval->rs_value, NULL, "newval", NULL, ATR_ENCODE_CLIENT, &new);
				new_val = new->al_value;
			} else
				new_val = "(not_set)";
			if (is_attr_set(&oldval->rs_value)) {
				plf->slf_rescd->rs_encode(&oldval->rs_value, NULL, "oldval", NULL, ATR_ENCODE_CLIENT, &old);
				old_val = old->al_value;
			} else
				old_val = "(not_set)";
			if (is_attr_set(&tmpval)) {
				plf->slf_rescd->rs_encode(&tmpval, NULL, "diffval", NULL, ATR_ENCODE_CLIENT, &diff);
				diff_val = diff->al_value;
			} else
				diff_val = "(not_set)";
			ET_LIM_DBG("DECR old from new, %s - %s = %s", __func__, new_val, old_val, diff_val)
			free(new);
			free(old);
			free(diff);
		}
	}

	if (op == INCR) {

		/* increment resource by newval, subtracting oldval if there */
		if (plf == NULL) {
			/* add leaf for this entity-limit */
			if ((rc = alloc_svrleaf(rescn, &plf)) != PBSE_NONE) {
				free(kstr);
				ET_LIM_DBG("exiting, ret %d [alloc_svrleaf failed]", __func__, rc)
				return (rc);
			}
			if (entlim_add(kstr, plf, ctx) == -1) {
				snprintf(log_buffer, LOG_BUF_SIZE - 1, "Error in entlim_add for reskey %s", kstr);
				log_err(-1, __func__, log_buffer);
				free(kstr);
				free(plf);
				ET_LIM_DBG("exiting, ret %d [entlim_add failed]", __func__, PBSE_SYSTEM)
				return (PBSE_SYSTEM);
			}
		}

		for (i = 0; i < subjobs; i++) {
			/* add in requested amount */
			(void) plf->slf_rescd->rs_set(&plf->slf_sum,
						      &tmpval, INCR);
		}
		if (will_log_event(PBSEVENT_DEBUG4) && (is_attr_set(&plf->slf_sum))) {
			svrattrl *sum;
			plf->slf_rescd->rs_encode(&plf->slf_sum, NULL, "sumval", NULL, ATR_ENCODE_CLIENT, &sum);
			ET_LIM_DBG("usage INCR to %s", __func__, sum->al_value)
			free(sum);
		} else
			ET_LIM_DBG("usage INCR to (not_set)", __func__)

	} else { /* DECR */

		/* decrement resource by newval, adding oldval if there */
		if (plf == NULL) {
			/* Do not decrement what isn't there */
			snprintf(log_buffer, LOG_BUF_SIZE - 1, "decrementing resource for reskey %s: isn't found in attribute tree", kstr);
			log_err(-1, __func__, log_buffer);
			free(kstr);
			ET_LIM_DBG("exiting, ret %d [plf is NULL]", __func__, PBSE_INTERNAL)
			return (PBSE_INTERNAL);
		}

		for (i = 0; i < subjobs; i++) {
			(void) plf->slf_rescd->rs_set(&plf->slf_sum, &tmpval, DECR);
		}

		if (will_log_event(PBSEVENT_DEBUG4) && (is_attr_set(&plf->slf_sum))) {
			svrattrl *sum;
			plf->slf_rescd->rs_encode(&plf->slf_sum, NULL, "sumval", NULL, ATR_ENCODE_CLIENT, &sum);
			ET_LIM_DBG("usage DECR to %s", __func__, sum->al_value)
			free(sum);
		} else
			ET_LIM_DBG("usage DECR to (not_set)", __func__)

		tmpval = plf->slf_sum;
		plf->slf_rescd->rs_decode(&tmpval, NULL, NULL, "0");
		if (plf->slf_rescd->rs_comp(&plf->slf_sum, &tmpval) < 0) {
			ET_LIM_DBG("zeroing res usage", __func__)
			plf->slf_sum = tmpval;
			snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_single_entity_res zeroing negative usage for %s-%s", plf->slf_rescd->rs_name, kstr);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_WARNING, msg_daemonname, log_buffer);
		}
	}

	free(kstr);
	ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
	return PBSE_NONE;
}

/**
 * @brief
 *		set_entity_ct_sum_queued() - increment or decrement the entity job
 *		counts kept for the queued_jobs_threshold limit.
 *		1. Against the server attribute (pque is NULL):
 *			a. Called when new job is arriving (INCR)
 *			b. Called when job is purged (DECR)
 *		2. Against the queue attribute (pque points to the queue):
 *			a. on any enqueue (INCR) or
 *			b. any dequeue (DECR)
 *
 * @par Functionality:
 *		Nothing is done (and zero is returned) when the job is in the MOVED,
 *		FINISHED or EXPIRED state, when it is RUNNING and op is INCR, or when
 *		the queued_jobs_threshold attribute is not set.
 *		Otherwise the count is adjusted for, in order, the overall entity,
 *		the job's user, group and project.  On the first failure the steps
 *		already applied are undone, most recent first, with the reverse
 *		operation.
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	pque	-	queue whose attribute is updated, or NULL for the server
 * @param[in]	op	-	operator example- INCR, DECR
 *
 * @return	int
 * @retval	zero	: all went ok
 * @retval	PBS_Enumber	: if error, typically a system or internal error
 */
int
set_entity_ct_sum_queued(job *pjob, pbs_queue *pque, enum batch_op op)
{
	attribute *threshold;
	char *user_name;
	char *group_name;
	char *project_name;
	/* reverse op in case we have to back up */
	enum batch_op undo_op = (op == INCR) ? DECR : INCR;
	int failed_step = 0; /* 0 = none, 1 = overall, 2 = user, 3 = group, 4 = project */
	int nsubjobs;
	int rc;

	/* the job's usage is no longer (or not yet) part of the queued sums in these states */
	if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_FINISHED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) ||
	    ((check_job_state(pjob, JOB_STATE_LTR_RUNNING)) && (op == INCR))) {
		ET_LIM_DBG("exiting, ret 0 [job in %c state]", __func__, get_job_state(pjob))
		return 0;
	}

	threshold = pque ? get_qattr(pque, QA_ATR_queued_jobs_threshold)
			 : get_sattr(SVR_ATR_queued_jobs_threshold);

	if (!is_attr_set(threshold)) {
		ET_LIM_DBG("exiting, ret 0 [queued_jobs_threshold limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return PBSE_NONE; /* no limits set */
	}

	user_name = get_jattr_str(pjob, JOB_ATR_euser);
	group_name = get_jattr_str(pjob, JOB_ATR_egroup);
	project_name = get_jattr_str(pjob, JOB_ATR_project);

	nsubjobs = get_queued_subjobs_ct(pjob);
	if (nsubjobs < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, nsubjobs)
		return PBSE_INTERNAL;
	}

	/* apply: 1. [o:PBS_ALL]  2. [u:user]  3. [g:group]  4. [p:project]; stop at first failure */
	if ((rc = set_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, threshold, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 1;
	else if ((rc = set_single_entity_ct(LIM_USER, user_name, threshold, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 2;
	else if ((rc = set_single_entity_ct(LIM_GROUP, group_name, threshold, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 3;
	else if ((rc = set_single_entity_ct(LIM_PROJECT, project_name, threshold, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 4;

	if (failed_step == 0) {
		ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
		return 0;
	}

	/* undo what was done before the failing step, most recent first */
	if (failed_step >= 4)
		(void) set_single_entity_ct(LIM_GROUP, group_name, threshold, pjob, nsubjobs, undo_op);
	if (failed_step >= 3)
		(void) set_single_entity_ct(LIM_USER, user_name, threshold, pjob, nsubjobs, undo_op);
	if (failed_step >= 2)
		(void) set_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, threshold, pjob, nsubjobs, undo_op);

	if (failed_step == 1) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(o:" PBS_ALL_ENTITY ",%d,%s) failed]", __func__,
			   rc, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else if (failed_step == 2) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(u:%s,%d,%s) failed]", __func__,
			   rc, user_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else if (failed_step == 3) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(g:%s,%d,%s) failed]", __func__,
			   rc, group_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(p:%s,%d,%s) failed]", __func__,
			   rc, project_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	}
	return rc;
}
/**
 * @brief
 *		set_entity_ct_sum_max() - increment or decrement the entity job
 *		counts kept for the max_queued limit.
 *		1. Against the server attribute (pque is NULL):
 *			a. Called when new job is arriving (INCR)
 *			b. Called when job is purged (DECR)
 *		2. Against the queue attribute (pque points to the queue):
 *			a. on any enqueue (INCR) or
 *			b. any dequeue (DECR)
 *
 * @par Functionality:
 *		Nothing is done (and zero is returned) when the job is in the MOVED,
 *		EXPIRED or FINISHED state, or when the max_queued attribute is not
 *		set.  Otherwise the count is adjusted for, in order, the overall
 *		entity, the job's user, group and project.  On the first failure the
 *		steps already applied are undone, most recent first, with the reverse
 *		operation.
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	pque	-	queue whose attribute is updated, or NULL for the server
 * @param[in]	op	-	operator example- INCR, DECR
 *
 * @return	int
 * @retval	zero	: all went ok
 * @retval	PBS_Enumber	: if error, typically a system or internal error
 */
int
set_entity_ct_sum_max(job *pjob, pbs_queue *pque, enum batch_op op)
{
	attribute *max_queued;
	char *user_name;
	char *group_name;
	char *project_name;
	/* reverse op in case we have to back up */
	enum batch_op undo_op = (op == INCR) ? DECR : INCR;
	int failed_step = 0; /* 0 = none, 1 = overall, 2 = user, 3 = group, 4 = project */
	int nsubjobs;
	int rc;

	/* the job's usage was already removed from the sums in these states */
	if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_FINISHED))) {
		ET_LIM_DBG("exiting, ret 0 [job in %c state]", __func__, get_job_state(pjob))
		return 0;
	}

	max_queued = pque ? get_qattr(pque, QA_ATR_max_queued)
			  : get_sattr(SVR_ATR_max_queued);

	if (!is_attr_set(max_queued)) {
		ET_LIM_DBG("exiting, ret 0 [max_queued limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return PBSE_NONE;
	}

	user_name = get_jattr_str(pjob, JOB_ATR_euser);
	group_name = get_jattr_str(pjob, JOB_ATR_egroup);
	project_name = get_jattr_str(pjob, JOB_ATR_project);

	nsubjobs = get_queued_subjobs_ct(pjob);
	if (nsubjobs < 0) {
		ET_LIM_DBG("exiting, ret %d [get_queued_subjobs_ct() returned %d]", __func__,
			   PBSE_INTERNAL, nsubjobs)
		return PBSE_INTERNAL;
	}

	/* apply: 1. [o:PBS_ALL]  2. [u:user]  3. [g:group]  4. [p:project]; stop at first failure */
	if ((rc = set_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, max_queued, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 1;
	else if ((rc = set_single_entity_ct(LIM_USER, user_name, max_queued, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 2;
	else if ((rc = set_single_entity_ct(LIM_GROUP, group_name, max_queued, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 3;
	else if ((rc = set_single_entity_ct(LIM_PROJECT, project_name, max_queued, pjob, nsubjobs, op)) != PBSE_NONE)
		failed_step = 4;

	if (failed_step == 0) {
		ET_LIM_DBG("exiting, ret 0 [all ok]", __func__)
		return 0; /* within all count limits */
	}

	/* undo what was done before the failing step, most recent first */
	if (failed_step >= 4)
		(void) set_single_entity_ct(LIM_GROUP, group_name, max_queued, pjob, nsubjobs, undo_op);
	if (failed_step >= 3)
		(void) set_single_entity_ct(LIM_USER, user_name, max_queued, pjob, nsubjobs, undo_op);
	if (failed_step >= 2)
		(void) set_single_entity_ct(LIM_OVERALL, PBS_ALL_ENTITY, max_queued, pjob, nsubjobs, undo_op);

	if (failed_step == 1) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(o:" PBS_ALL_ENTITY ",%d,%s) failed]", __func__,
			   rc, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else if (failed_step == 2) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(u:%s,%d,%s) failed]", __func__,
			   rc, user_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else if (failed_step == 3) {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(g:%s,%d,%s) failed]", __func__,
			   rc, group_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	} else {
		ET_LIM_DBG("exiting, ret %d [set_single_entity_ct(p:%s,%d,%s) failed]", __func__,
			   rc, project_name, nsubjobs, (op == INCR) ? "INCR" : "DECR")
	}
	return rc;
}

/**
 * @brief
 *		revert_entity_resources - undo entity resource accounting already
 *		applied for the resources that precede the one that failed.
 *
 * @par Functionality:
 *		Walks the resource list backwards, starting at the entry prior to
 *		presc_new and stopping after presc_first has been handled.  For each
 *		entry that is set and has PBS_ENTLIM_LIMITSET in its definition flags,
 *		op is applied for the overall, user, group and project entities.
 *		Return codes of the individual adjustments are ignored.
 *
 * @param[in]  pmaxqresc    -   pointer to the entity-limit attribute to adjust
 * @param[in]  pattr_old    -   pointer to job attribute holding the old values, may be NULL
 * @param[in]  presc_new    -   pointer to current processing resource (not itself reverted)
 * @param[in]  presc_old    -   not read; the old entry is looked up per resource
 * @param[in]  presc_first  -   pointer to first resource, marks where the walk ends
 * @param[in]  pjob         -   pointer to job
 * @param[in]  subjobs      -   number of subjobs, if any.
 * @param[in]  op           -   operator to apply, example- INCR, DECR
 *
 * @return      int
 * @retval      zero        -   all went ok
 * @retval      -1          -   error in input parameters
 */

static int
revert_entity_resources(attribute *pmaxqresc, attribute *pattr_old,
			resource *presc_new, resource *presc_old, resource *presc_first,
			job *pjob, int subjobs, enum batch_op op)
{
	char *user_name = get_jattr_str(pjob, JOB_ATR_euser);
	char *group_name = get_jattr_str(pjob, JOB_ATR_egroup);
	char *project_name = get_jattr_str(pjob, JOB_ATR_project);
	resource *cur;
	resource *prior_val;
	int reached_first = 0;

	if (!pmaxqresc || !presc_new || !presc_first || !user_name || !group_name || !project_name)
		return (-1);

	cur = (resource *) GET_PRIOR(presc_new->rs_link);
	while ((cur != NULL) && !reached_first) {

		/* the first resource is still processed, then the walk ends */
		if (cur == presc_first)
			reached_first = 1;

		if ((is_attr_set(&cur->rs_value)) && ((cur->rs_defin->rs_entlimflg & PBS_ENTLIM_LIMITSET) != 0)) {

			/* qalter case: find the matching resource holding the prior value, if any */
			prior_val = pattr_old ? find_resc_entry(pattr_old, cur->rs_defin) : NULL;

			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY, pmaxqresc, cur, prior_val, pjob, subjobs, op);
			(void) set_single_entity_res(LIM_USER, user_name, pmaxqresc, cur, prior_val, pjob, subjobs, op);
			(void) set_single_entity_res(LIM_GROUP, group_name, pmaxqresc, cur, prior_val, pjob, subjobs, op);
			(void) set_single_entity_res(LIM_PROJECT, project_name, pmaxqresc, cur, prior_val, pjob, subjobs, op);
		}

		cur = (resource *) GET_PRIOR(cur->rs_link);
	}

	return (0);
}

/**
 * @brief
 *		set_entity_resc_sum_queued() - set entity resource usage for the
 *		queued_jobs_threshold_res limit, based on requested (or altered)
 *		job wide resources
 *		1. Called against server attributes (pque will be null):
 *			a. When new job arrives (INCR)
 *			b. when job is purged (DECR)
 *		2. Called against queue attributes (pque will point to queue struct):
 *			a. on any enqueue (INCR), or
 *			b. on any dequeue (DECR)
 *
 * @par Functionality:
 *		Nothing is done (and zero is returned) when the job is in the MOVED,
 *		FINISHED or EXPIRED state, when it is RUNNING and op is INCR, or when
 *		the queued_jobs_threshold_res attribute is not set.
 *		For every resource that is set and has PBS_ENTLIM_LIMITSET in its
 *		definition, usage is adjusted for the overall, user, group and
 *		project entities, in that order.  If one of those steps fails, the
 *		steps already applied for that resource are reversed.  On INCR the
 *		resources processed earlier are reverted as well and the error is
 *		returned at once; on DECR processing continues with the next resource
 *		and the first error seen is returned at the end.
 *
 * @param[in,out]	pjob	-	pointer to job structure
 * @param[in]	pque	-	queue whose attribute is updated, or NULL for the server
 * @param[in]	altered_resc	-	altered resources (qalter); when set, the job's
 *					current resources are used as the old values
 * @param[in]	op	-	operator example- INCR, DECR
 *
 * @return	int
 * @retval	zero	: all went ok
 * @retval	PBS_Enumber	: if error, typically a system or internal error
 */
int
set_entity_resc_sum_queued(job *pjob, pbs_queue *pque, attribute *altered_resc,
			   enum batch_op op)
{
	char *egroup = NULL;
	char *project = NULL;
	char *euser = NULL;
	int rc = PBSE_NONE;
	int rc_final;
	int subjobs;
	attribute *pmaxqresc = NULL;
	attribute *pattr_new = NULL;
	attribute *pattr_old = NULL;
	resource *presc_new = NULL;
	resource *presc_old = NULL;
	resource *presc_first = NULL;
	enum batch_op rev_op;

	ET_LIM_DBG("entered [alt_res %p]", __func__, altered_resc)
	/* if the job is in states JOB_STATE_LTR_MOVED, JOB_STATE_LTR_FINISHED  */
	/* or JOB_STATE_LTR_EXPIRED then just return, the job's resources were   */
	/* removed from the entity sums when it went into that state;           */
	/* also return if a running job is asked to be counted as queued again. */

	if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_FINISHED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) ||
	    ((check_job_state(pjob, JOB_STATE_LTR_RUNNING)) && (op == INCR))) {
		ET_LIM_DBG("exiting, ret 0 [job in %c state]", __func__, get_job_state(pjob))
		return 0;
	}

	/* set reverse op in case we have to back up */
	if (op == INCR)
		rev_op = DECR;
	else
		rev_op = INCR;

	if (pque)
		pmaxqresc = get_qattr(pque, QA_ATR_queued_jobs_threshold_res);
	else
		pmaxqresc = get_sattr(SVR_ATR_queued_jobs_threshold_res);

	if (!is_attr_set(pmaxqresc)) {
		ET_LIM_DBG("exiting, ret 0 [queued_jobs_threshold_res limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return 0; /* no limits set */
	}

	if (altered_resc) {
		/* qalter: new values come from the request, old ones from the job */
		pattr_new = altered_resc;
		pattr_old = get_jattr(pjob, JOB_ATR_resource);
	} else {
		pattr_new = get_jattr(pjob, JOB_ATR_resource);
		pattr_old = NULL; /* null */
	}

	euser = get_jattr_str(pjob, JOB_ATR_euser);
	egroup = get_jattr_str(pjob, JOB_ATR_egroup);
	project = get_jattr_str(pjob, JOB_ATR_project);

	if ((subjobs = get_queued_subjobs_ct(pjob)) < 0)
		rc = PBSE_INTERNAL;

	if (!euser) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY USER");
		rc = PBSE_INTERNAL;
	}
	if (!egroup) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY GROUP");
		rc = PBSE_INTERNAL;
	}
	if (!project) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY PROJECT");
		rc = PBSE_INTERNAL;
	}

	if (rc == PBSE_INTERNAL) {
		ET_LIM_DBG("exiting, ret %d [something not right, subjobs %d, %p, %p, %p]", __func__,
			   PBSE_INTERNAL, subjobs, euser, egroup, project)
		return PBSE_INTERNAL;
	}

	rc_final = 0;

	for (presc_new = (resource *) GET_NEXT(pattr_new->at_val.at_list), presc_first = presc_new;
	     presc_new != NULL;
	     presc_new = (resource *) GET_NEXT(presc_new->rs_link)) {

		char *rescn;

		/* only resources that are set and have an entity limit defined are accounted */
		if (!(is_attr_set(&presc_new->rs_value)) || ((presc_new->rs_defin->rs_entlimflg & PBS_ENTLIM_LIMITSET) == 0))
			continue;

		/* If this is from qalter where pattr_old is set, see if the   */
		/* corresponding resource is in pattr_old, i.e. had a prior value */

		if (pattr_old)
			presc_old = find_resc_entry(pattr_old, presc_new->rs_defin);
		else
			presc_old = NULL;

		rescn = presc_new->rs_defin->rs_name;
		if (rescn == NULL) {
			if (presc_new != presc_first)
				if (revert_entity_resources(pmaxqresc, pattr_old, presc_new, presc_old, presc_first, pjob, subjobs, rev_op) != 0)
					log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
			log_err(PBSE_INTERNAL, __func__, "EMPTY RESOURCE");
			ET_LIM_DBG("exiting, ret %d [rescn is NULL]", __func__, PBSE_INTERNAL)
			return PBSE_INTERNAL;
		}

		ET_LIM_DBG("setting usage for res %s", __func__, rescn)
		/* 1. set overall limit o:PBS_ALL */
		rc = set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
					   pmaxqresc, presc_new, presc_old, pjob, subjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(o:" PBS_ALL_ENTITY ";%s,%d,%s) failed with rc %d", __func__,
				   rescn, subjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in LIM_OVERALL for resource %s", rescn);
			log_err(rc, __func__, log_buffer);
			if (op == INCR) {
				/* back out the resources accounted before this one */
				if (presc_new != presc_first)
					if (revert_entity_resources(pmaxqresc, pattr_old, presc_new, presc_old, presc_first, pjob, subjobs, rev_op) != 0)
						log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
				return rc;
			} else {
				/* on DECR keep going, remember the first error */
				if (!rc_final)
					rc_final = rc;
				continue;
			}
		}

		/* 2. sets user limit */
		rc = set_single_entity_res(LIM_USER, euser,
					   pmaxqresc, presc_new, presc_old, pjob, subjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(u:%s;%s,%d,%s) failed with rc %d", __func__,
				   euser, rescn, subjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in LIM_USER for euser %s for resource %s", euser, rescn);
			log_err(rc, __func__, log_buffer);
			/* reverse change made above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY, pmaxqresc,
						     presc_new, presc_old, pjob, subjobs, rev_op);
			if (op == INCR) {
				if (presc_new != presc_first)
					if (revert_entity_resources(pmaxqresc, pattr_old, presc_new, presc_old, presc_first, pjob, subjobs, rev_op) != 0)
						log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
				return rc;
			} else {
				if (!rc_final)
					rc_final = rc;
				continue;
			}
		}

		/* 3. set specific group limit */
		rc = set_single_entity_res(LIM_GROUP, egroup,
					   pmaxqresc, presc_new, presc_old, pjob, subjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(g:%s;%s,%d,%s) failed with rc %d", __func__,
				   egroup, rescn, subjobs, (op == INCR) ? "INCR" : "DECR", rc)

			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in LIM_GROUP for egroup %s for resource %s", egroup, rescn);
			log_err(rc, __func__, log_buffer);

			/* reverse changes made above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
						     pmaxqresc, presc_new, presc_old, pjob, subjobs, rev_op);
			(void) set_single_entity_res(LIM_USER, euser,
						     pmaxqresc, presc_new, presc_old, pjob, subjobs, rev_op);
			if (op == INCR) {
				if (presc_new != presc_first)
					if (revert_entity_resources(pmaxqresc, pattr_old, presc_new, presc_old, presc_first, pjob, subjobs, rev_op) != 0)
						log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
				return rc;
			} else {
				if (!rc_final)
					rc_final = rc;
				continue;
			}
		}

		/* 4. set specific project limit */
		rc = set_single_entity_res(LIM_PROJECT, project,
					   pmaxqresc, presc_new, presc_old, pjob, subjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(p:%s;%s,%d,%s) failed with rc %d", __func__,
				   project, rescn, subjobs, (op == INCR) ? "INCR" : "DECR", rc)
			/* this is the project step: name the right limit in the log */
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in LIM_PROJECT for project %s for resource %s", project, rescn);
			log_err(rc, __func__, log_buffer);

			/* reverse changes made above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
						     pmaxqresc, presc_new, presc_old, pjob, subjobs, rev_op);
			(void) set_single_entity_res(LIM_USER, euser,
						     pmaxqresc, presc_new, presc_old, pjob, subjobs, rev_op);
			(void) set_single_entity_res(LIM_GROUP, egroup,
						     pmaxqresc, presc_new, presc_old, pjob, subjobs, rev_op);
			if (op == INCR) {
				if (presc_new != presc_first)
					if (revert_entity_resources(pmaxqresc, pattr_old, presc_new, presc_old, presc_first, pjob, subjobs, rev_op) != 0)
						log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
				return rc;
			} else {
				if (!rc_final)
					rc_final = rc;
				continue;
			}
		}
	}

	ET_LIM_DBG("exiting, ret %d", __func__, rc_final)
	return rc_final;
}

/**
 * @brief
 *		set_entity_resc_sum_max() - set entity resource usage for the
 *		max_queued_res limit, based on requested (or altered) job wide
 *		resources
 *		1. Called against server attributes (pque will be null):
 *			a. When new job arrives (INCR)
 *			b. when job is purged (DECR)
 *		2. Called against queue attributes (pque will point to queue struct):
 *			a. on any enqueue (INCR), or
 *			b. on any dequeue (DECR)
 *
 * @par Functionality:
 *		Nothing is done (and zero is returned) when the job is in the MOVED,
 *		EXPIRED or FINISHED state, or when the max_queued_res attribute is
 *		not set.  For every resource that is set and has PBS_ENTLIM_LIMITSET
 *		in its definition, usage is adjusted for the overall, user, group and
 *		project entities, in that order.  If one of those steps fails, the
 *		steps already applied for that resource are reversed.  On INCR the
 *		resources processed earlier are reverted as well and the error is
 *		returned at once; on DECR processing continues with the next resource
 *		and the first error seen is returned at the end.
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	pque	-	queue whose attribute is updated, or NULL for the server
 * @param[in]	altered_resc	-	altered resources (qalter); when set, the job's
 *					current resources are used as the old values
 * @param[in]	op	-	operator example- INCR, DECR
 *
 * @return	int
 * @retval	zero	: all went ok
 * @retval	PBS_Enumber	: if error, typically a system or internal error
 */
int
set_entity_resc_sum_max(job *pjob, pbs_queue *pque, attribute *altered_resc,
			enum batch_op op)
{
	attribute *limit_attr;
	attribute *job_resc;
	attribute *new_resc_attr;
	attribute *old_resc_attr;
	resource *cur;
	resource *first;
	resource *prior_val;
	char *user_name;
	char *group_name;
	char *project_name;
	/* reverse op in case we have to back up */
	enum batch_op undo_op = (op == INCR) ? DECR : INCR;
	int bad_input = 0;
	int first_error = 0;
	int nsubjobs;
	int rc;

	ET_LIM_DBG("entered [alt_res %p]", __func__, altered_resc)

	/* the job's resources were already removed from the sums in these states */
	if ((check_job_state(pjob, JOB_STATE_LTR_MOVED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) ||
	    (check_job_state(pjob, JOB_STATE_LTR_FINISHED))) {
		ET_LIM_DBG("exiting, ret 0 [job in %c state]", __func__, get_job_state(pjob))
		return 0;
	}

	limit_attr = pque ? get_qattr(pque, QA_ATR_max_queued_res)
			  : get_sattr(SVR_ATR_max_queued_res);

	if (!is_attr_set(limit_attr)) {
		ET_LIM_DBG("exiting, ret 0 [max_queued_res limit not set for %s]", __func__, pque ? pque->qu_qs.qu_name : "server")
		return 0; /* no limits set */
	}

	/* qalter: new values come from the request and the job holds the old ones */
	job_resc = get_jattr(pjob, JOB_ATR_resource);
	new_resc_attr = altered_resc ? altered_resc : job_resc;
	old_resc_attr = altered_resc ? job_resc : NULL;

	user_name = get_jattr_str(pjob, JOB_ATR_euser);
	group_name = get_jattr_str(pjob, JOB_ATR_egroup);
	project_name = get_jattr_str(pjob, JOB_ATR_project);

	nsubjobs = get_queued_subjobs_ct(pjob);
	if (nsubjobs < 0)
		bad_input = 1;

	if (!user_name) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY USER");
		bad_input = 1;
	}
	if (!group_name) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY GROUP");
		bad_input = 1;
	}
	if (!project_name) {
		log_err(PBSE_INTERNAL, __func__, "EMPTY PROJECT");
		bad_input = 1;
	}

	if (bad_input) {
		ET_LIM_DBG("exiting, ret %d [something not right, subjobs %d, %p, %p, %p]", __func__,
			   PBSE_INTERNAL, nsubjobs, user_name, group_name, project_name)
		return PBSE_INTERNAL;
	}

	first = (resource *) GET_NEXT(new_resc_attr->at_val.at_list);
	for (cur = first; cur != NULL; cur = (resource *) GET_NEXT(cur->rs_link)) {
		char *rescn;

		/* only resources that are set and have an entity limit defined are accounted */
		if (!(is_attr_set(&cur->rs_value)) || ((cur->rs_defin->rs_entlimflg & PBS_ENTLIM_LIMITSET) == 0))
			continue;

		/* qalter case: find the matching resource holding the prior value, if any */
		prior_val = old_resc_attr ? find_resc_entry(old_resc_attr, cur->rs_defin) : NULL;

		rescn = cur->rs_defin->rs_name;
		if (rescn == NULL) {
			if ((cur != first) &&
			    (revert_entity_resources(limit_attr, old_resc_attr, cur, prior_val, first, pjob, nsubjobs, undo_op) != 0))
				log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
			log_err(PBSE_INTERNAL, __func__, "EMPTY RESOURCE");
			ET_LIM_DBG("exiting, ret %d [rescn is NULL]", __func__, PBSE_INTERNAL)
			return PBSE_INTERNAL;
		}

		ET_LIM_DBG("setting usage for res %s", __func__, rescn)

		/* 1. overall limit o:PBS_ALL */
		rc = set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
					   limit_attr, cur, prior_val, pjob, nsubjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(o:" PBS_ALL_ENTITY ";%s,%d,%s) failed with rc %d", __func__,
				   rescn, nsubjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in  LIM_OVERALL for resource %s", rescn);
			log_err(rc, __func__, log_buffer);
			goto step_failed;
		}

		/* 2. user limit */
		rc = set_single_entity_res(LIM_USER, user_name,
					   limit_attr, cur, prior_val, pjob, nsubjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(u:%s;%s,%d,%s) failed with rc %d", __func__,
				   user_name, rescn, nsubjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in  LIM_USER for euser %s for resource %s", user_name, rescn);
			log_err(rc, __func__, log_buffer);
			/* reverse the overall step applied above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			goto step_failed;
		}

		/* 3. group limit */
		rc = set_single_entity_res(LIM_GROUP, group_name,
					   limit_attr, cur, prior_val, pjob, nsubjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(g:%s;%s,%d,%s) failed with rc %d", __func__,
				   group_name, rescn, nsubjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in  LIM_GROUP for egroup %s for resource %s", group_name, rescn);
			log_err(rc, __func__, log_buffer);
			/* reverse the overall and user steps applied above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			(void) set_single_entity_res(LIM_USER, user_name,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			goto step_failed;
		}

		/* 4. project limit */
		rc = set_single_entity_res(LIM_PROJECT, project_name,
					   limit_attr, cur, prior_val, pjob, nsubjobs, op);
		if (rc) {
			ET_LIM_DBG("set_single_entity_res(p:%s;%s,%d,%s) failed with rc %d", __func__,
				   project_name, rescn, nsubjobs, (op == INCR) ? "INCR" : "DECR", rc)
			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "Error in LIM_PROJECT for project %s for resource %s", project_name, rescn);
			log_err(rc, __func__, log_buffer);
			/* reverse the overall, user and group steps applied above */
			(void) set_single_entity_res(LIM_OVERALL, PBS_ALL_ENTITY,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			(void) set_single_entity_res(LIM_USER, user_name,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			(void) set_single_entity_res(LIM_GROUP, group_name,
						     limit_attr, cur, prior_val, pjob, nsubjobs, undo_op);
			goto step_failed;
		}

		/* all four entities updated for this resource */
		continue;

	step_failed:
		/* common tail for a failed step; the steps of this resource are already reversed */
		if (op == INCR) {
			/* back out the resources accounted before this one and give up */
			if ((cur != first) &&
			    (revert_entity_resources(limit_attr, old_resc_attr, cur, prior_val, first, pjob, nsubjobs, undo_op) != 0))
				log_err(PBSE_INTERNAL, __func__, "Error in revert_entity_resources");
			return rc;
		}
		/* on DECR keep going with the next resource, remember the first error */
		if (!first_error)
			first_error = rc;
	}

	ET_LIM_DBG("exiting, ret %d", __func__, first_error)
	return first_error;
}
/**
 * @brief
 *		account_entity_limit_usages() - set entity usage
 *		for the selected combinations of entity limits res/ct and max/queued
 *		1. Called against server attributes (pque will be null):
 *			a. When new job arrives (INCR)
 *			b. when job is purged (DECR)
 *		2. Called against queue attributes (pque will point to queue struct):
 *			a. on any enqueue (INCR), or
 *			b. on any dequeue (DECR)
 *
 * @par Functionality:
 *		For each ETLIM_ACC_* selection fully present in op_flag the matching
 *		set_entity_*_sum_*() function is called, in the order count/max,
 *		count/queued, resource/max, resource/queued.  A failure is logged and
 *		does not stop the remaining calls; the code of the last failing call
 *		is returned.
 *
 * @param[in]	pjob	-	pointer to job structure
 * @param[in]	pque	-	queue whose attributes are updated, or NULL for the server
 * @param[in]	altered_resc	-	altered resources.
 * @param[in]	op	-	operator example- INCR, DECR
 * @param[in]	op_flag	-	operation flag for selecting combinations of set_entity_*_sum_*()
 * 				use ETLIM_ACC_* flag macros defined in pbs_entlim.h, ex: ETLIM_ACC_ALL
 *
 * @return	int
 * @retval	zero	: all went ok
 * @retval	PBS_Enumber	: if error, typically a system or internal error
 */
int
account_entity_limit_usages(job *pjob, pbs_queue *pque, attribute *altered_resc,
			    enum batch_op op, int op_flag)
{
	/* pieces shared by the debug and error messages below */
	char *op_name = (op == INCR) ? "INCR" : "DECR";
	char *owner_kind = pque ? "queue" : "server";
	char *owner_name = pque ? pque->qu_qs.qu_name : server_name;
	int status;
	int last_error = PBSE_NONE;

	/* not doing NULL checks of parameters as this function is currently invoked from sane locations */

	ET_LIM_DBG("entered, %s on %s %s, op_flag %x, alt_res_ptr %p", __func__,
		   op_name, owner_kind, owner_name, op_flag, altered_resc)

	/* job count against max_queued */
	if (((op_flag & ETLIM_ACC_CT_MAX) == ETLIM_ACC_CT_MAX) &&
	    ((status = set_entity_ct_sum_max(pjob, pque, op)) != 0)) {
		last_error = status;
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_entity_ct_sum_max %s on %s %s failed with %d",
			 op_name, owner_kind, owner_name, status);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* job count against queued_jobs_threshold */
	if (((op_flag & ETLIM_ACC_CT_QUEUED) == ETLIM_ACC_CT_QUEUED) &&
	    ((status = set_entity_ct_sum_queued(pjob, pque, op)) != 0)) {
		last_error = status;
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_entity_ct_sum_queued %s on %s %s failed with %d",
			 op_name, owner_kind, owner_name, status);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* resource usage against max_queued_res */
	if (((op_flag & ETLIM_ACC_RES_MAX) == ETLIM_ACC_RES_MAX) &&
	    ((status = set_entity_resc_sum_max(pjob, pque, altered_resc, op)) != 0)) {
		last_error = status;
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_entity_resc_sum_max %s on %s %s failed with %d, (altered_resc %p)",
			 op_name, owner_kind, owner_name, status, altered_resc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* resource usage against queued_jobs_threshold_res */
	if (((op_flag & ETLIM_ACC_RES_QUEUED) == ETLIM_ACC_RES_QUEUED) &&
	    ((status = set_entity_resc_sum_queued(pjob, pque, altered_resc, op)) != 0)) {
		last_error = status;
		snprintf(log_buffer, LOG_BUF_SIZE - 1, "set_entity_resc_sum_queued %s on %s %s failed with %d, (altered_resc %p)",
			 op_name, owner_kind, owner_name, status, altered_resc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
	}

	ET_LIM_DBG("exiting, ret_error %d", __func__, last_error)

	return last_error;
}

/**
 * @brief
 *		Adds a record for a provisioning vnode.
 *
 * @par Functionality:
 *		This function loops through the 'sv_prov_track' table looking for an
 *		empty record (one whose pvtk_mtime is zero) and stores the input
 *		arguments in it.  The vnode name and the requested aoe are duplicated;
 *		the prov_vnode_info pointer itself is stored as given.
 *		Both strings are duplicated before the record is touched, so that a
 *		failed allocation leaves the record empty instead of marked in use
 *		with a NULL or already freed vnode name.
 *
 * @see
 *		start_vnode_provisioning
 *
 * @param[in]   pid		-	provision process id
 * @param[in]   prov_vnode_info	-	prov_vnode_info structure
 *
 * @return	int
 * @retval	0	: On successful addition of record
 * @retval	-1	: could not add provision record (table full or out of memory)
 *
 * @par Side Effects:
 *		On success server.sv_cur_prov_records is incremented and
 *		server.sv_provtrackmodifed is set.
 *
 * @par MT-safe: No
 *
 */

static int
add_prov_record(prov_pid pid,
		struct prov_vnode_info *prov_vnode_info)
{
	int i;
	char *vnode_copy;
	char *aoe_copy;

	for (i = 0; i < server.sv_provtracksize; i++) {
		if (server.sv_prov_track[i].pvtk_mtime == 0) {
			/* found an empty record */
			break;
		}
	}
	if (i == server.sv_provtracksize) {
		DBPRT(("%s: Could not add records: current records = %d\n",
		       __func__, server.sv_cur_prov_records))
		return -1;
	}

	/* allocate everything first; the record is only filled in once nothing can fail */
	if ((vnode_copy = strdup(prov_vnode_info->pvnfo_vnode)) == NULL) {
		DBPRT(("%s: Unable to allocate Memory!\n", __func__));
		return -1;
	}
	if ((aoe_copy = strdup(prov_vnode_info->pvnfo_aoe_req)) == NULL) {
		free(vnode_copy);
		DBPRT(("%s: Unable to allocate Memory!\n", __func__));
		return -1;
	}

	/* a non-zero pvtk_mtime marks the record as in use */
	server.sv_prov_track[i].pvtk_mtime = time_now;
	server.sv_prov_track[i].pvtk_vnode = vnode_copy;
	server.sv_prov_track[i].pvtk_aoe_req = aoe_copy;
	server.sv_prov_track[i].prov_vnode_info = prov_vnode_info;
	server.sv_prov_track[i].pvtk_pid = pid;
	server.sv_cur_prov_records++;
	server.sv_provtrackmodifed = 1;
	DBPRT(("%s: Added a record: current records = %d\n",
	       __func__, server.sv_cur_prov_records))
	return 0;
}

/**
 * @brief
 *		remove_prov_record
 *
 * @par Functionality:
 *      This function loops through 'sv_prov_track' table and resets the record
 *		for a vnode. It is called when vnode finishes provisioning or fails one.
 *
 * @see
 *
 * @param[in]   vnode	-	vnode name
 *
 * @return      void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static void
remove_prov_record(char *vnode)
{
	int i;

	for (i = 0; i < server.sv_provtracksize; i++) {
		if (server.sv_prov_track[i].pvtk_mtime != 0 &&
		    (strcmp(vnode, server.sv_prov_track[i].pvtk_vnode) == 0)) {
			if (server.sv_prov_track[i].pvtk_aoe_req)
				free(server.sv_prov_track[i].pvtk_aoe_req);
			if (server.sv_prov_track[i].pvtk_vnode)
				free(server.sv_prov_track[i].pvtk_vnode);
			memset(&server.sv_prov_track[i], 0,
			       sizeof(struct prov_tracking));
			server.sv_prov_track[i].pvtk_mtime = 0;
			server.sv_provtrackmodifed = 1;
			server.sv_cur_prov_records--;
			break;
		}
	}
}

/**
 * @brief
 *		Save the provisioning records to a file.
 *
 * @par Functionality:
 *      This function is invoked periodically by a timed work task. It saves
 *		vnode name, aoe name and time stamp to prov_tracking file.
 *
 * @see
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:	No
 *
 */

void
prov_track_save()
{
	FILE *fd;
	int i;

	/* set task for next round trip */
	if (server.sv_provtrackmodifed == 0)
		return; /* nothing to do this time */

	fd = fopen(path_prov_track, "w");
	if (fd == NULL) {
		DBPRT(("%s: unable to open tracking file\n", __func__))
		return;
	}

	/* we write only mtime , vnode name and AOE name to file */
	for (i = 0; i < server.sv_provtracksize; i++) {
		/* write in the file (size of each record may vary) */
		fprintf(fd, "%ld|", server.sv_prov_track[i].pvtk_mtime);

		if (server.sv_prov_track[i].pvtk_vnode)
			fprintf(fd, "%s|", server.sv_prov_track[i].pvtk_vnode);
		else
			fprintf(fd, "0|"); /* dont want to write (null) */

		if (server.sv_prov_track[i].pvtk_aoe_req)
			fprintf(fd, "%s", server.sv_prov_track[i].pvtk_aoe_req);
		else
			fprintf(fd, "0"); /* dont want to write (null) */

		if (i < server.sv_provtracksize - 1)
			fprintf(fd, "|");
	}

	(void) fclose(fd);
	server.sv_provtrackmodifed = 0;
}

/**
 * @brief
 *		Looks up a provisioning vnode record by a vnode name.
 *
 * @par Functionality:
 *      This function gets the index of the provisioning table given the
 *		vnode name. It returns NULL if match not found.
 *
 * @see
 *		#prov_tracking in provision.h
 *
 * @param[in]	vnode	-	vnode name
 *
 * @return	pointer to prov_tracking
 * @retval	pointer to prov_tracking	: if prov_tracking record is found
 * @retval	NULL	: if prov_tracking record is not found
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

struct prov_tracking *
get_prov_record_by_vnode(char *vnode)
{
	int i;

	for (i = 0; i < server.sv_provtracksize; i++) {
		if ((server.sv_prov_track[i].pvtk_mtime != 0) &&
		    strcmp(vnode,
			   server.sv_prov_track[i].pvtk_vnode) == 0) {
			return &(server.sv_prov_track[i]);
		}
	}
	return NULL;
}

/**
 * @brief
 *		Looks up a provisioning vnode record by a given pid.
 *
 *
 * @par Functionality:
 *      This function takes 'pid' as an input and returns address
 * 		of the provision record. If it doesn't find any entry then
 *		it returns NULL.
 *
 * @see
 *		#prov_tracking in provision.h
 *
 * @param[in]	pid	-	provision process id
 *
 * @return	pointer to prov_tracking
 * @retval	pointer to prov_tracking	: if prov_tracking record is found
 * @retval	NULL	: if prov_tracking record is not found
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static struct prov_tracking *
get_prov_record_by_pid(prov_pid pid)
{
	int idx = 0;

	/* linear scan; the first slot whose pid matches wins */
	while (idx < server.sv_provtracksize) {
		struct prov_tracking *rec = &(server.sv_prov_track[idx]);

		if (rec->pvtk_pid == pid)
			return rec;
		idx++;
	}

	return NULL;
}

/**
 * @brief
 *		Deletes a single prov_vnode_info record.
 *
 *
 * @par Functionality:
 *      This function deletes an entry of prov_vnode_info type
 *
 * @see
 *
 * @param[in]	pvinfo	-	provision vnode info structure.
 *
 * @return      void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static void
free_pvnfo(struct prov_vnode_info *pvnfo)
{
	if (pvnfo != NULL) {
		/* free(NULL) is a no-op, so the members need no individual guard */
		free(pvnfo->pvnfo_vnode);
		free(pvnfo->pvnfo_aoe_req);
		free(pvnfo);
		/*
		 * pvnfo is passed by value: the caller's pointer is not reset
		 * here and must not be used after this call.
		 */
	}
}

/**
 * @brief
 *		Checks if aoe is available on a vnode.
 *
 *
 * @par Functionality:
 *      This function checks if aoe is available in node's
 *		resources_available.aoe
 *
 * @see
 *
 * @param[in]	pnode	-	pointer to pbsnode struct
 * @param[in]   aoe_req	-	aoe requested
 *
 * @return	int
 * @retval	0	: aoe is available on the vnode
 * @retval	-1	: aoe is unavailable on the vnode
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

int
check_req_aoe_available(struct pbsnode *pnode, char *aoe_req)
{
	resource_def *aoe_def;
	resource *avail;
	struct array_strings *names;
	int idx;

	if (pnode == NULL || aoe_req == NULL)
		return -1;

	aoe_def = &svr_resc_def[RESC_AOE];
	if (aoe_def == NULL)
		return -1;

	/* look up the aoe entry under the node's resources_available */
	avail = find_resc_entry(get_nattr(pnode, ND_ATR_ResourceAvail), aoe_def);
	if (avail == NULL)
		return -1;

	names = avail->rs_value.at_val.at_arst;
	if (names == NULL)
		return -1;

	/* the requested aoe must match one of the listed strings exactly */
	for (idx = 0; idx < names->as_usedptr; idx++) {
		if (strcmp(aoe_req, names->as_string[idx]) == 0)
			return 0;
	}

	return -1;
}

/**
 * @brief
 *		This function disables provisioning functionality.
 *
 * @par Functionality:
 *		This function disables provision_enable (internal)attribute on server.
 *
 * @see
 *		mgr_hook_delete
 *		mgr_hook_set
 *		mgr_hook_unset
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

void
disable_svr_prov()
{
	/* attribute not set: leave it untouched */
	if (!is_sattr_set(SVR_ATR_ProvisionEnable))
		return;

	set_sattr_l_slim(SVR_ATR_ProvisionEnable, 0, SET);
}

/**
 * @brief
 *		Parses prov_vnode attribute of job
 *
 * @par Functionality:
 *      This function parses the 'prov_vnode' string of a job, which is a
 *		'+' separated list of chunks, and copies the leading vnode name of
 *		each chunk (the text before the first ':') into a newly allocated
 *		list. It returns the number of list entries or -1 for failure.
 *
 *		Each list entry is PBS_MAXHOSTNAME + 1 bytes wide; a longer name
 *		is truncated to fit rather than written past the entry.
 *
 * @see
 *		is_runnable
 *		fail_vnode_job
 *
 * @param[in]	prov_vnode	-	prov_vnode string to parse
 * @param[out]  prov_vnodes	-	receives the allocated list of vnode
 *					names; the caller frees it with free().
 *					Left untouched when parsing fails
 *					before the list is allocated.
 *
 * @return      int
 * @retval     >=1	: number of entries allocated in the list (count of
 *			  '+' separators plus one); entries that received no
 *			  name are empty strings
 * @retval      -1	: invalid parameters or memory allocation failure.
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

int
parse_prov_vnode(char *prov_vnode, exec_vnode_listtype *prov_vnodes)
{
	/* Variables used in parsing the "prov_vnode" string */
	char *psubspec;
	char *slast;
	char *sbuf = NULL;
	int hpn;
	int i = 0, k;
	int num_of_prov_vnodes = 1;
	char *p = NULL;

	if (prov_vnode == NULL || prov_vnodes == NULL) {
		DBPRT(("%s: invalid params\n", __func__))
		return (-1);
	}

	/* Find number of nodes required to run the job */
	for (p = prov_vnode; *p; p++) {
		if (*p == '+')
			num_of_prov_vnodes++;
	}
	/* Allocate temporary memory to hold a copy of prov_vnode to parse */
	sbuf = strdup(prov_vnode);
	if (sbuf == NULL)
		return -1;

	/* Allocate memory to hold vnodenames */
	*prov_vnodes = calloc(num_of_prov_vnodes, PBS_MAXHOSTNAME + 1);
	if (*prov_vnodes == NULL) {
		free(sbuf);
		return -1;
	}

	psubspec = parse_plus_spec_r(sbuf, &slast, &hpn);
	/* never write past the rows allocated above */
	while (psubspec && i < num_of_prov_vnodes) {
		/* Read vnodename, leaving room for the terminating NUL */
		k = 0;
		for (p = psubspec; *p && *p != ':' && k < PBS_MAXHOSTNAME; p++, k++) {
			(*prov_vnodes)[i][k] = *p;
		}
		(*prov_vnodes)[i][k] = '\0';
		DBPRT(("%s: %s\n", __func__, (*prov_vnodes)[i]))
		++i;
		psubspec = parse_plus_spec_r(slast, &slast, &hpn);
	}
	free(sbuf);

	return num_of_prov_vnodes;
}

/**
 * @brief
 *		Checks if node needs provisioning.
 *
 * @par Functionality:
 *		Checks if node needs provisioning by matching the requested aoe and
 *		current aoe on the node. It also checks if requested aoe is available
 *		on the node. Node need not provision if requested aoe is current aoe
 *		on the node. If requested aoe is not available on node or available
 *		list on node is empty then job cannot run.
 *
 * @see
 *		find_prov_vnode_list
 *
 * @param[in]	pnode	-	vnode
 * @param[in]	aoe_name	-	aoe requested
 *
 * @return	int
 * @retval	-1	: node cannot be provisioned (NULL argument,
 *			  resources_available.aoe missing or empty, or the
 *			  requested aoe is not listed)
 * @retval	0	: node need not provision
 * @retval	1	: node can be provisioned
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:  Yes
 *
 */

static int
node_need_prov(struct pbsnode *pnode, char *aoe_name)
{
	resource *presc;
	resource_def *prdef;
	char *aoe; /* hold current_aoe of pnode */
	int i;
	struct array_strings *pas = NULL;

	if (pnode == NULL || aoe_name == NULL)
		return -1;

	prdef = &svr_resc_def[RESC_AOE];
	presc = find_resc_entry(get_nattr(pnode, ND_ATR_ResourceAvail), prdef);

	/* if resources_available.aoe not set */
	if (presc == NULL)
		return -1;

	/*
	 * NOTE(review): the flag tested here is ATR_VFLAG_MODIFY; presumably
	 * it stands for "the resource carries a value" - confirm.
	 */
	if (!(presc->rs_value.at_flags & ATR_VFLAG_MODIFY))
		return -1;

	/*
	 * The string array may be absent even when the entry exists
	 * (check_req_aoe_available guards the same pointer); treat that as
	 * an empty available list instead of dereferencing NULL.
	 */
	pas = presc->rs_value.at_val.at_arst;
	if (pas == NULL)
		return -1;

	for (i = 0; i < pas->as_usedptr; i++) {
		if (strcmp(pas->as_string[i], aoe_name) != 0)
			continue;

		/* aoe is available; no provisioning if it is already instantiated */
		aoe = get_nattr_str(pnode, ND_ATR_current_aoe);
		if (aoe != NULL && strcmp(aoe_name, aoe) == 0)
			return 0;
		return 1;
	}

	/* requested aoe is not in resources_available.aoe */
	return -1;
}

/**
 * @brief
 *		Parses exec_vnode string sent by scheduler and sets prov_vnode of job
 *
 * @par Functionality:
 *      This function takes 'exec_vnode' attribute sent by scheduler
 *		and on successful parsing returns number of nodes with aoe in
 *		their chunk that actually need provisioning. A vnode that appears
 *		in several chunks is reported once.
 *		aoe_name contains the name of aoe to be provisioned with.
 *		prov_vnode attribute of job is also set, in the form
 *		"vnodeA:aoe=NAME+vnodeB:aoe=NAME".
 *
 * @see
 *		check_and_enqueue_provisioning
 *
 * @param[in]	pjob		-	pointer to job
 * @param[out]  prov_vnodes	-	receives an allocated list of vnode
 *					names. Once allocated it is left in
 *					place on every return path, including
 *					-1, and the caller frees it.
 * @param[in,out] aoe_name	-	may be NULL. Otherwise, if *aoe_name is
 *					NULL on entry it receives an allocated
 *					copy of the requested aoe (caller
 *					frees); if it is non-NULL on entry each
 *					chunk's aoe is compared against it.
 *					When chunks request different aoes,
 *					*aoe_name is freed and reset to NULL.
 *
 * @return	int
 * @retval	>=0	: number of nodes to be provisioned
 * @retval	-1	: parsing failure, allocation failure, a vnode that
 *			  cannot be provisioned with the requested aoe, or
 *			  different aoes requested across chunks.
 *
 * @par Side Effects:
 *      Sets JOB_ATR_prov_vnode on the job when parsing succeeds.
 *
 * @par MT-safe: No
 *
 */

int
find_prov_vnode_list(job *pjob, exec_vnode_listtype *prov_vnodes, char **aoe_name)
{
	/* Variables used in parsing the "exec_vnode" string */
	char *psubspec;
	char *slast;
	char *sbuf = NULL;
	int hpn;
	int i = 0, k, j;
	int num_of_exec_vnodes = 1;
	char *p = NULL;
	char *aoe = NULL; /* aoe shared by every chunk to be provisioned */
	int own_aoe;	  /* set when aoe is a private copy freed before returning */
	char *vname;
	int nelem;
	struct key_value_pair *pkvp;
	int no_add = 0;
	struct pbsnode *pnode;
	int ret; /* return code of node_need_prov() */
	char *pbuf = NULL;
	char *execvnod = NULL;
	int rc = -1;

	if (is_jattr_set(pjob, JOB_ATR_exec_vnode))
		execvnod = get_jattr_str(pjob, JOB_ATR_exec_vnode);

	if (execvnod == NULL) {
		DBPRT(("%s: invalid params\n", __func__))
		return (-1);
	}

	/*
	 * Without an aoe_name to hand the name back through, keep a private
	 * copy so that the prov_vnode string can still be built below.
	 */
	own_aoe = (aoe_name == NULL);
	if (!own_aoe)
		aoe = *aoe_name;

	/* Find number of nodes required to run the job */
	for (p = execvnod; *p; p++) {
		if (*p == '+')
			num_of_exec_vnodes++;
	}
	/* Allocate temporary memory to hold execvnod attribute */
	sbuf = strdup(execvnod);
	if (sbuf == NULL)
		return -1;

	/*
	 * Allocate temp memory to hold prov_vnode attribute. Every
	 * "vnode:aoe=name" entry is no longer than the chunk it came from,
	 * so the exec_vnode length is a sufficient bound.
	 */
	pbuf = calloc(1, strlen(execvnod) + 1);
	if (pbuf == NULL) {
		free(sbuf);
		return -1;
	}

	/* Allocate memory to hold vnodenames and their aoe's */
	*prov_vnodes = calloc(num_of_exec_vnodes, PBS_MAXHOSTNAME + 1);
	if (*prov_vnodes == NULL) {
		free(sbuf);
		free(pbuf);
		return -1;
	}

	psubspec = parse_plus_spec_r(sbuf, &slast, &hpn);
	while (psubspec) {
		if (parse_node_resc(psubspec, &vname, &nelem, &pkvp) == 0) {
			for (k = 0; k < nelem; k++) {
				/* Only an aoe request makes the vnode a candidate */
				if (strcasecmp("aoe", (pkvp + k)->kv_keyw) != 0)
					continue;

				/*
				 * check if same vnode is requested again: compare
				 * with every name recorded so far (slots 0..i-1)
				 */
				no_add = 0;
				for (j = 0; j < i; j++) {
					if (strcmp(vname, (*prov_vnodes)[j]) == 0) {
						no_add = 1;
						break;
					}
				}
				if (no_add)
					break;

				DBPRT(("%s: Look up node %s\n", __func__, vname))
				pnode = find_nodebyname(vname);
				/* check if node really needs provisioning, if not, continue.
				 * This is to stop qrun -H from provisioning a node (including
				 * head node) that does not have aoe_req in its available list.
				 */
				ret = node_need_prov(pnode, (pkvp + k)->kv_val);
				if (ret == -1)
					goto done;
				if (ret == 0)
					break;

				/* bounded copy: each row is PBS_MAXHOSTNAME + 1 bytes */
				snprintf((*prov_vnodes)[i], PBS_MAXHOSTNAME + 1, "%s", vname);
				DBPRT(("%s: %s\n", __func__, (*prov_vnodes)[i]))
				++i;

				if (aoe != NULL) {
					if (strcmp(aoe, (pkvp + k)->kv_val) != 0) {
						/* Aoe name can not be different across chunks, it's an error */
						if (!own_aoe) {
							free(*aoe_name);
							/* do not leave the caller with a dangling pointer */
							*aoe_name = NULL;
							aoe = NULL;
						}
						goto done;
					}
				} else {
					aoe = strdup((pkvp + k)->kv_val);
					if (aoe == NULL)
						goto done;
					if (!own_aoe)
						(*aoe_name) = aoe;
				}
				DBPRT(("%s: %s\n", __func__, aoe))
				break;
			}
		}
		psubspec = parse_plus_spec_r(slast, &slast, &hpn);
	}

	/* prepare prov_vnode and assign to job. We do this because prov_vnode
	 * is to be parsed again later to find vnodes that were provisioned.
	 * exec_vnode cannot be parsed again since vnodes would have their
	 * current_aoe set right.
	 *
	 * aoe is non-NULL whenever i > 0: every increment of i above is
	 * followed by either a successful compare or a successful strdup.
	 */
	for (j = 0; j < i; j++) {
		if (j == 0) {
			strcpy(pbuf, (*prov_vnodes)[j]);
		} else {
			strcat(pbuf, "+");
			strcat(pbuf, (*prov_vnodes)[j]);
		}
		strcat(pbuf, ":aoe=");
		strcat(pbuf, aoe);
	}
	set_jattr_str_slim(pjob, JOB_ATR_prov_vnode, pbuf, NULL);

	DBPRT(("%s: prov_vnode: %s\n", __func__, pbuf))

	rc = i;

done:
	/* the private aoe copy never leaves this function */
	if (own_aoe)
		free(aoe);
	free(pbuf);
	free(sbuf);
	return rc;
}

/**
 * @brief
 *		Finds a vnode's entry in prov_vnode_info.
 *
 * @par Functionality:
 *      This function loops through list of provision vnodes
 *		and returns prov_vnode_info. Returns NULL, if not able to find.
 *
 * @see
 *		free_prov_vnode
 *		#prov_vnode_info in provision.h
 *
 * @param[in]	pnode	-	pointer to pbsnode
 *
 * @return	pointer to prov_vnode_info
 * @retval	pointer to prov_vnode_info	: if entry in prov_vnode_info is found
 * @retval	NULL	: if entry is not found
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static struct prov_vnode_info *
find_prov_vnode(struct pbsnode *pnode)
{
	struct prov_vnode_info *entry;

	/* walk the prov_allvnodes list until an entry names this node */
	for (entry = GET_NEXT(prov_allvnodes);
	     entry != NULL;
	     entry = GET_NEXT(entry->al_link)) {
		if (strcmp(entry->pvnfo_vnode, pnode->nd_name) == 0)
			return entry;
	}

	return NULL;
}

/**
 * @brief
 *		Removes vnode's entry from prov_vnode_info.
 *
 * @par Functionality:
 *      This function removes the vnode from prov_vnode_info and
 *		unsets vnode's INUSE_WAIT_PROV state.
 *
 * @see
 *		free_nodes
 *
 * @param[in]	pnode	-	pointer to pbsnode
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:	No
 *
 */

void
free_prov_vnode(struct pbsnode *pnode)
{
	struct prov_vnode_info *entry;

	/* only nodes in the wait-provisioning state are handled here */
	if (!(pnode->nd_state & INUSE_WAIT_PROV))
		return;

	/* unlink and release the node's entry, if one is still in the list */
	entry = find_prov_vnode(pnode);
	if (entry != NULL) {
		delete_link(&entry->al_link);
		free_pvnfo(entry);
	}

	/* clear the wait-provisioning bit whether or not an entry was found */
	set_vnode_state(pnode, ~INUSE_WAIT_PROV, Nd_State_And);
}

/**
 * @brief
 *		Determines if job can be run on account of a vnode finishing
 *		provisioning.
 *
 * @par Functionality:
 *      This function checks for a job, if all its vnodes have finished
 *		provisioning. If at least one vnode is offline or in wait-provisioning
 *		state or has finished provisioning but has another aoe set then job
 *		cannot be run.
 *		It also checks the case where a multi-vnode job has one
 *		vnode failing provisioning while others are still provisioning.
 *
 * @see
 *		check_and_run_jobs
 *
 * @param[in]   ptr	-	pointer to job struct
 * @param[in]   pvinfo	-	pointer to prov_vnode_info struct
 *
 * @return	int
 * @retval	0	: job is eligible to run
 * @retval	-1	: job is not eligible to run since other vnodes
 *					still provisioning or error
 * @retval	-2	:	nodes are done provisioning, but some are down
 * @retval	-3	:	nodes are all done prov, but curr_aoe, does not
 *					match req_aoe
 * @retval	-4	:	left over provisioning just returned
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static int
is_runnable(job *ptr, struct prov_vnode_info *pvnfo)
{
	exec_vnode_listtype vnode_names = NULL;
	struct pbsnode *node = NULL;
	char *wanted_aoe = NULL;
	char *node_aoe;
	int nvnodes = 1;
	int status = 0;
	int idx;

	if (ptr == NULL) {
		DBPRT(("%s: ptr is NULL\n", __func__))
		return -1;
	}

	DBPRT(("%s: Entered jobid=%s\n", __func__, ptr->ji_qs.ji_jobid))

	wanted_aoe = pvnfo->pvnfo_aoe_req;

	nvnodes = parse_prov_vnode(get_jattr_str(ptr, JOB_ATR_prov_vnode), &vnode_names);
	if (nvnodes == -1) {
		free(vnode_names);
		return -1;
	}

	if (!check_job_substate(ptr, JOB_SUBSTATE_PROVISION)) {
		/*
		 * Another vnode of this job may have failed provisioning
		 * already, in which case the job has been held or requeued
		 * and is no longer in the provisioning substate. A vnode that
		 * reports back afterwards is stray; stop processing here.
		 */
		DBPRT(("%s: stray provisioning for job %s\n", __func__,
		       ptr->ji_qs.ji_jobid))
		status = -4;
	} else {
		/* the first vnode that is not ready decides the result */
		for (idx = 0; idx < nvnodes && status == 0; idx++) {
			node = find_nodebyname(vnode_names[idx]);

			if (node == NULL) {
				DBPRT(("%s: node %s is null\n",
				       __func__, vnode_names[idx]))
				status = -2;
			} else if (node->nd_state & (INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM)) {
				/* vnode offline, it could have failed prov */
				DBPRT(("%s: vnode %s is offline (failed prov)\n",
				       __func__, node->nd_name))
				status = -2;
			} else if (node->nd_state & (INUSE_PROV | INUSE_WAIT_PROV)) {
				/* this vnode is still provisioning or waiting to */
				status = -1;
				DBPRT(("%s: Some nodes still provisioning\n", __func__))
			} else {
				/* vnode is done: it must now carry the requested aoe */
				node_aoe = NULL;
				if (is_nattr_set(node, ND_ATR_current_aoe))
					node_aoe = get_nattr_str(node, ND_ATR_current_aoe);

				if (node_aoe == NULL || strcmp(node_aoe, wanted_aoe) != 0) {
					status = -3;
					DBPRT(("%s: req_aoe mismatch on %s\n",
					       __func__, vnode_names[idx]))
				}
			}
		}
	}

	if (nvnodes > 0)
		free(vnode_names);

	return status;
}

/**
 * @brief
 *		Requeue/Hold job on provisioning failure.
 *
 * @par Functionality:
 * 		This function requeues/holds the job for which provisioning failed.
 *	 	- writes accounting log message for failure
 *	 	- frees prov_vnode of job
 *	 	- removes all pending provisioning requests
 *	 	- releases resources held by job
 *	 	- applies server hold or requeues the job
 *
 * @see
 *		fail_vnode
 *		check_and_run_jobs
 *		do_provisioning
 *
 * @param[in]   prov_vnod_info	-	pointer to prov_vnode_info struct
 * @param[in]   hold_or_que	-	indicates if job is to be held or queued
 *								hold_or_que = 0 if job is to be held
 *								hold_or_que = 1 if job is to be queued
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

void
fail_vnode_job(struct prov_vnode_info *prov_vnode_info, int hold_or_que)
{
	exec_vnode_listtype vnode_names = NULL;
	struct prov_tracking *record = NULL;
	struct pbsnode *node;
	job *pjob;
	int nvnodes; /* no. of prov vnodes */
	int idx;

	if (prov_vnode_info == NULL) {
		DBPRT(("%s: prov_vnode_info is NULL\n", __func__))
		return;
	}

	/*
	 * An empty job id marks a prov_vnode_info whose job has already
	 * been requeued/held by an earlier call (see the loop at the end
	 * of this function); pending work tasks for that job land here
	 * and must not act again.
	 */
	if (prov_vnode_info->pvnfo_jobid[0] == '\0')
		return;

	pjob = (job *) find_job(prov_vnode_info->pvnfo_jobid);
	if (pjob == NULL)
		return;

	/* add accounting log for provision failure for job */
	set_job_ProvAcctRcd(pjob, time_now, PROVISIONING_FAILURE);

	/* log job prov failed message; only 0 (hold) and 1 (queue) are logged */
	switch (hold_or_que) {
		case 0:
			sprintf(log_buffer,
				"Provisioning for job %s failed, job held",
				pjob->ji_qs.ji_jobid);
			log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, log_buffer);
			break;
		case 1:
			sprintf(log_buffer,
				"Provisioning for job %s failed, job queued",
				pjob->ji_qs.ji_jobid);
			log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, log_buffer);
			break;
		default:
			break;
	}

	/* remove from table other vnodes that might provision.*/
	/* vnodes that start provisioning are not within control. */
	/* These have not yet entered tracking table. */
	del_prov_vnode_entry(pjob);

	switch (hold_or_que) {
		case 0:
			/* release resource, put system hold and move to held state */
			rel_resc(pjob);
			clear_exec_on_run_fail(pjob);
			set_jattr_b_slim(pjob, JOB_ATR_hold, HOLD_s, INCR);
			set_jattr_str_slim(pjob, JOB_ATR_Comment, "job held, provisioning failed to start", NULL);
			svr_setjobstate(pjob, JOB_STATE_LTR_HELD, JOB_SUBSTATE_HELD);
			break;
		case 1:
			/* don't purge job, instead requeue */
			(void) force_reque(pjob);
			break;
		default:
			break;
	}

	/*
	 * Blank the job id in the prov_vnode_info of every vnode of this
	 * job that has a tracking record, so that later calls to
	 * fail_vnode_job from pending work tasks of the same job return
	 * early without performing any action.
	 */
	nvnodes = parse_prov_vnode(get_jattr_str(pjob, JOB_ATR_prov_vnode), &vnode_names);
	for (idx = 0; idx < nvnodes; idx++) {
		node = find_nodebyname(vnode_names[idx]);
		if (node == NULL)
			continue;

		record = get_prov_record_by_vnode(node->nd_name);
		if (record == NULL)
			continue;

		if (record->prov_vnode_info != NULL)
			record->prov_vnode_info->pvnfo_jobid[0] = '\0';
	}
	free(vnode_names);

	/* remove the prov_node attribute from the job here */
	if (is_jattr_set(pjob, JOB_ATR_prov_vnode))
		free_jattr(pjob, JOB_ATR_prov_vnode);
}

/**
 * @brief
 *		Marks the vnode offline.
 *
 * @par Functionality:
 *      This function marks a given vnode as offline and may log a message
 *      with it why vnode marked offline.
 *
 * @see
 *		fail_vnode
 *		offline_all_provisioning_vnodes
 *
 * @param[in]   pnode		-	pointer to pbsnode
 * @param[in]   comment		-	comment to be set on vnode and logged
 *
 * @return      void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static void
mark_prov_vnode_offline(pbsnode *pnode, char *comment)
{
	if (pnode == NULL) {
		DBPRT(("%s: pnode is NULL\n", __func__))
		return;
	}

	/* the recorded current aoe can no longer be trusted, so drop it */
	free_nattr(pnode, ND_ATR_current_aoe);

	DBPRT(("%s: node=%s set to offline, resetting current_aoe\n",
	       __func__, pnode->nd_name))

	/* flag the node offline and clear its provisioning bit */
	set_vnode_state(pnode, INUSE_OFFLINE, Nd_State_Or);
	set_vnode_state(pnode, ~INUSE_PROV, Nd_State_And);

	/* write the node state and current_aoe */
	node_save_db(pnode);

	/* the comment is optional: without one nothing is logged or set */
	if (comment == NULL)
		return;

	log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_NODE, LOG_NOTICE, msg_daemonname, "Vnode %s: %s", pnode->nd_name, comment);
	set_nattr_str_slim(pnode, ND_ATR_Comment, comment, NULL);
}

/**
 * @brief
 *		On provisioning failure, marks vnode offline and fails the job.
 *
 * @par Functionality:
 *      This function marks a vnode offline and requeue/holds all jobs on it.
 *
 * @see
 *		prov_request_deferred
 *		prov_request_timed
 *
 * @param[in]   prov_vnode_info	-	pointer to struct prov_vnode_info
 * @param[in]   hold_or_que	-	indicates if job is to be held or queued
 *								hold_or_que = 0 if job is to be held
 *								hold_or_que = 1 if job is to be queued
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:	No
 *
 */

static void
fail_vnode(struct prov_vnode_info *prov_vnode_info, int hold_or_que)
{
	char comment[MAXNLINE] = "Vnode offlined since it failed provisioning";
	struct pbsnode *pnode;

	if (prov_vnode_info == NULL) {
		DBPRT(("%s: prov_vnode_info is NULL\n", __func__))
		return;
	}

	pnode = find_nodebyname(prov_vnode_info->pvnfo_vnode);

	DBPRT(("%s: node=%s entered\n", __func__, prov_vnode_info->pvnfo_vnode))

	/* unknown vnode: nothing to offline, and the job is left alone */
	if (pnode == NULL)
		return;

	/* offline the vnode first, then hold or requeue its job */
	mark_prov_vnode_offline(pnode, comment);
	fail_vnode_job(prov_vnode_info, hold_or_que);
}

/**
 * @brief
 *		Marks vnodes in prov_tracking table offline during startup.
 *
 * @par Functionality:
 *      This function marks all vnodes present in prov_tracking table as offline
 *		Called from pbsd_init, when server recovers from a crash or server is
 *		started. Since, status of vnodes undergoing provisioning is not known,
 *		it marks them offline.
 *
 * @see
 *		#prov_tracking in provision.h
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:	No
 *
 */

void
offline_all_provisioning_vnodes()
{
	int i;
	int count = 0;
	struct pbsnode *pnode;
	char comment[MAXNLINE];
	char *vnode;

	strcpy(comment,
	       "Vnode offlined since server went down during provisioning");

	for (i = 0; i < server.sv_provtracksize; i++) {
		if (server.sv_prov_track[i].pvtk_mtime != 0) {
			/* found an empty record */
			vnode = server.sv_prov_track[i].pvtk_vnode;
			pnode = find_nodebyname(vnode);

			if (pnode) {
				mark_prov_vnode_offline(pnode, comment);
				/*
				 * reservations will take care of
				 * themselves in pbsd_init
				 */
				count++;
			}
		}
		if (server.sv_prov_track[i].pvtk_vnode)
			free(server.sv_prov_track[i].pvtk_vnode);
		if (server.sv_prov_track[i].pvtk_aoe_req)
			free(server.sv_prov_track[i].pvtk_aoe_req);
		memset(&(server.sv_prov_track[i]), 0,
		       sizeof(struct prov_tracking));
		server.sv_prov_track[i].pvtk_mtime = 0; /* mark slot empty */
	}

	server.sv_cur_prov_records = 0;
	server.sv_provtrackmodifed = 1;

	DBPRT(("%s: Marked %d nodes offline (from prov recovery)\n",
	       __func__, count))

	/* save the provisioning table to disk */
	prov_track_save();
}

/**
 * @brief
 *		Runs a job if it can when a vnode finished provisioning.
 *
 * @par Functionality:
 *      This function checks if job is runnable by calling is_runnable().
 *		A vnode finished provisioning, so check if job can be run. If it can be
 *		run, writes accounting log and server log and sends job to mom.
 *		If job cannot run because of provisioning failure, it calls fail_vnode_job
 *		to requeue job and mark vnode offline.
 *
 * @see
 *		is_runnable
 *		is_vnode_prov_done
 *		fail_vnode_job
 *
 * @param[in]	prov_vnode_info	-	pointer to struct prov_vnode_info
 *
 * @return	void
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe:	No
 *
 */

static void
check_and_run_jobs(struct prov_vnode_info *prov_vnode_info)
{
	struct work_task task;
	job *pjob;
	int rc;

	if (prov_vnode_info == NULL) {
		DBPRT(("%s: prov_vnode_info is NULL\n", __func__))
		return;
	}

	/*
	 * An empty job id means the info is stale: it belongs to a pending
	 * work task of a job that already failed provisioning and was
	 * requeued/held. Nothing more to do for it.
	 */
	if (prov_vnode_info->pvnfo_jobid[0] == '\0')
		return;

	DBPRT(("%s: Entered, node=%s, jobid=%s\n", __func__,
	       prov_vnode_info->pvnfo_vnode, prov_vnode_info->pvnfo_jobid))

	pjob = (job *) find_job(prov_vnode_info->pvnfo_jobid);
	if (pjob == NULL)
		return;

	rc = is_runnable(pjob, prov_vnode_info);

	if (rc == 0) {
		/* only wt_parm1 of the stack task is filled in before the call */
		task.wt_parm1 = (void *) pjob;
		prov_startjob(&task);
		return;
	}

	/* any other code than -2/-3 leaves the job alone */
	if (rc != -2 && rc != -3)
		return;

	/*
	 * prov over on all nodes,
	 * but some nodes offline or curr_aoe bad
	 */
	DBPRT(("%s: Jobid: %s isjob_eligible returned %d\n", __func__,
	       pjob->ji_qs.ji_jobid, rc))

	if (rc == -3) {
		/* wrong aoe on a vnode: hold the job */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, "provisioning error: AOE mis-match");
		fail_vnode_job(prov_vnode_info, 0);
	} else {
		/* a vnode is offline or unknown: requeue the job */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, "provisioning error: vnode offline");
		fail_vnode_job(prov_vnode_info, 1);
	}
}

/**
 * @brief
 *		Checks if vnode is up after provisioning.
 *
 * @par Functionality:
 *      This function checks whether the concerned vnode is up after
 *		provisioning. If vnode is up:
 *			- cancels the timeout work task
 *			- updates the nodes file to reflect the new state
 *			- calls check_and_run_job to run jobs
 *			- frees the prov_vnode_info structure allocated
 *		  		by do_provisioning
 *		If vnode is not yet up:
 *			- it returns. It will get called again by set_vnode_state.
 *
 * @see
 *		set_vnode_state
 *		prov_request_deferred
 *
 * @param[in]	vnode	-	pointer to string containing vnode name
 *
 * @return	void
 *
 * @par Side Effects:
 *     starts a new work task to do more provisioning
 *
 * @par MT-safe:	No
 *
 */

void
is_vnode_prov_done(char *vnode)
{
	struct prov_tracking *record;
	struct prov_vnode_info *info;
	struct work_task *timeout_task;
	struct pbsnode *pnode = NULL;

	record = get_prov_record_by_vnode(vnode);
	if (record == NULL) {
		/* prov tracking record not created */
		return;
	}

	/* a pid above -1 is taken to mean the script is still running */
	if (record->pvtk_pid > -1) {
		DBPRT(("%s: Provisioning script not yet done\n", __func__))
		return;
	}

	info = record->prov_vnode_info;

	pnode = (struct pbsnode *) find_nodebyname(info->pvnfo_vnode);
	assert(pnode != NULL);

	timeout_task = info->ptask_timed;

	DBPRT(("%s: Entered for node:%s\n", __func__, info->pvnfo_vnode))

	/*
	 * Node not up yet: return now, this function is called again
	 * (from set_vnode_state) once the vnode comes up.
	 */
	if ((pnode->nd_state & VNODE_UNAVAILABLE) ||
	    (pnode->nd_state & INUSE_INIT)) {
		DBPRT(("%s: node:%s not yet up\n",
		       __func__, info->pvnfo_vnode))
		return;
	}

	DBPRT(("%s: node:%s is up - cancelling timeout task\n",
	       __func__, info->pvnfo_vnode))
	/* delete the timeout task */
	delete_task(timeout_task);

	/* unset the provisioning flag on this node */
	if (pnode->nd_state & INUSE_PROV) {
		DBPRT(("%s: node:%s is up - removing prov\n",
		       __func__, info->pvnfo_vnode))
		set_vnode_state(pnode, ~INUSE_PROV, Nd_State_And);
	}

	/* persist the node's new state */
	node_save_db(pnode);

	/* log msg about prov of node success */
	sprintf(log_buffer, "Provisioning of Vnode %s successful",
		info->pvnfo_vnode);
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE,
		  LOG_NOTICE, msg_daemonname, log_buffer);

	check_and_run_jobs(info);

	/* drop the tracking record and save the modified table */
	remove_prov_record(pnode->nd_name);
	prov_track_save();

	/* the record is gone, so the info it pointed at can be released */
	free_pvnfo(info);

	/*
	 * One provisioning has finished, so there is room for another:
	 * schedule a task to look at the other nodes in the
	 * provisioning queue.
	 */
	set_task(WORK_Immed, 0, do_provisioning, NULL);
}

/**
 * @brief
 *		Determines if any of the provisionable vnodes assigned to the job
 *		has a pending mom hook-related file copy action.
 *
 * @param[in]   pjob	-	pointer to job struct
 *
 * @return	int
 * @retval	1	: job has a pending hook-related copy action on at least
 *			  one of its provisioning vnodes.
 * @retval	0	: either no pending hook-related action detected, or
 *			  an error has occurred.
 *
 * @par Side Effects:
 *      Unknown
 *
 * @par MT-safe: No
 *
 */

static int
prov_vnode_pending_hook_copy(job *pjob)
{
	struct pbsnode *np = NULL;
	int i;
	exec_vnode_listtype prov_vnode_list = NULL;
	int num_of_prov_vnodes = 1;
	int rcode = 0;

	if (pjob == NULL) {
		DBPRT(("%s: job is NULL\n", __func__))
		return 0;
	}

	DBPRT(("%s: Entered jobid=%s\n", __func__, pjob->ji_qs.ji_jobid))

	num_of_prov_vnodes = parse_prov_vnode(get_jattr_str(pjob, JOB_ATR_prov_vnode), &prov_vnode_list);

	if (num_of_prov_vnodes == -1) {
		if (prov_vnode_list)
			free(prov_vnode_list);
		return 0;
	}

	for (i = 0; i < num_of_prov_vnodes; i++) {
		int j;

		np = find_nodebyname(prov_vnode_list[i]);
		if (np == NULL) {
			DBPRT(("%s: node %s is null\n",
			       __func__, prov_vnode_list[i]))
			goto prov_vnode_label;
		}
		/* hook has not been sent */
		for (j = 0; j < np->nd_nummoms; j++) {
			if ((np->nd_moms[j] != NULL) && (sync_mom_hookfiles_count(np->nd_moms[j]) > 0)) {
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE, LOG_WARNING, pjob->ji_qs.ji_jobid, "prov vnode %s's parent mom %s:%d has a pending copy hook or delete hook request", np->nd_name, np->nd_moms[j]->mi_host, np->nd_moms[j]->mi_port);
				rcode = 1;
				break;
			}
		}
	}
prov_vnode_label:

	if (num_of_prov_vnodes > 0)
		free(prov_vnode_list);

	return rcode;
}

/**
 * @brief
 * 	This function ensures that the hooks are synced with the
 * 	provisioned node before starting the job on it.
 *
 * @param[in,out]
 * 	ptask - work task structure contains prov_vnode_info
 *
 * @return	void
 *
 */

static void
prov_startjob(struct work_task *ptask)
{
	job *pjob;
	int rc;

	assert(ptask->wt_parm1 != NULL);
	pjob = (job *) ptask->wt_parm1;
	if (pjob == NULL) {
		DBPRT(("%s: pjob is NULL\n", __func__))
		return;
	}
	/* task being serviced here */
	pjob->ji_prov_startjob_task = NULL;
	if ((do_sync_mom_hookfiles || sync_mom_hookfiles_replies_pending) &&
	    (prov_vnode_pending_hook_copy(pjob))) {

		/**
		 * If mom hook files sync is in process then create
		 * a time task where you perform this check again,
		 * and start the job once it is done
		 */

		DBPRT(("%s: setting the time task as sync mom"
		       "hookfiles is not completed\n",
		       __func__))

		/* set a work task to run after 5 sec from now */
		pjob->ji_prov_startjob_task = set_task(WORK_Timed, time_now + 5,
						       prov_startjob, pjob);
		if (pjob->ji_prov_startjob_task == NULL) {
			log_err(errno, __func__, "Unable to set task for prov_startjob; requeuing the job");
			(void) force_reque(pjob);
		}
		return;
	}

	/*  accounting log about prov for job over */
	set_job_ProvAcctRcd(pjob, time_now,
			    PROVISIONING_SUCCESS);

	/* log msg about prov for job over */
	sprintf(log_buffer,
		"Provisioning for Job %s succeeded, running job",
		pjob->ji_qs.ji_jobid);
	log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB,
		  LOG_INFO, pjob->ji_qs.ji_jobid,
		  log_buffer);

	DBPRT(("%s: Jobid: %s about to run after prov success\n",
	       __func__, pjob->ji_qs.ji_jobid))

	/* now prov_vnode is stale, remove it */
	if (is_jattr_set(pjob, JOB_ATR_prov_vnode))
		free_jattr(pjob, JOB_ATR_prov_vnode);

	DBPRT(("%s: calling [svr_startjob] from prov_startjob\n", __func__))
	/* Move the job to MOM */
	if ((rc = svr_startjob(pjob, 0)) != 0) {
		DBPRT(("%s: Jobid: %s - startjob failed - rc:%d\n",
		       __func__, pjob->ji_qs.ji_jobid, rc))
		free_nodes(pjob);
	}
	DBPRT(("%s: Jobid: %s, startjob returned: %d\n",
	       __func__, pjob->ji_qs.ji_jobid, rc))
}

/**
 * @brief
 *		Performs provisioning cleanup when provisioning script returns.
 *
 * @par Functionality:
 *      This function is called when deferred child task, set by
 *		start_vnode_provisioning, returns (i.e. provisioning script finishes,
 *		either success or failure). This can get triggered before/after
 *		provision_timeout occurs.
 *		It first marks the tracking record of the child (looked up by
 *		pid) as exited, then acts on the child's exit status:
 *		if provisioning script exited with 0 or APP_PROV_SUCCESS
 *			- if the vnode no longer exists, cancels the timeout
 *			  task, frees prov_vnode_info and returns
 *			- updates the vnode's current_aoe attribute to the
 *			  requested aoe and saves the node
 *			- for APP_PROV_SUCCESS, clears INUSE_DOWN on the vnode
 *			- calls is_vnode_prov_done() for the vnode
 *		for any other exit status (or a child that did not exit normally)
 *			- logs the failure
 *			- cancels the timeout work task
 *			- removes the provisioning record from the
 * 			 prov table and saves to disk
 *			- calls fail_vnode and frees prov_vnode_info
 *			- schedules do_provisioning to start further work
 *
 * @see
 *		start_vnode_provisioning
 *
 * @param[in]	wtask	-	pointer to work_task
 *							wtask->wt_parm1	: pointer to prov_vnode_info
 *						  			structure (its ptask_timed member
 *									is the timeout task)
 *							wtask->wt_event	: pid of the provisioning child
 *							wtask->wt_aux	: wait status of the child
 *
 * @return	void
 *
 * @par MT-safe:	No
 *
 */

static void
prov_request_deferred(struct work_task *wtask)
{
	struct work_task *timeout_task;
	int stat;
	struct pbsnode *pnode = NULL;
	struct prov_vnode_info *prov_vnode_info;
	prov_pid this_pid;
	int exit_status = -1;
	struct prov_tracking *prov_tracking;

	assert(wtask->wt_parm1 != NULL);

	prov_vnode_info = (struct prov_vnode_info *) wtask->wt_parm1;
	pnode = (struct pbsnode *) find_nodebyname(prov_vnode_info->pvnfo_vnode);
	this_pid = (pid_t) wtask->wt_event;
	DBPRT(("%s: pid = %ld\n", __func__, (long) this_pid))
	timeout_task = (struct work_task *) prov_vnode_info->ptask_timed;

	/* Now, figure out exitvalue of the child process */
	stat = wtask->wt_aux;

	/*
	 * update the fact that the process is gone in the prov table;
	 * tolerate a missing record instead of dereferencing NULL
	 */
	prov_tracking = get_prov_record_by_pid(this_pid);
	if (prov_tracking != NULL) {
		prov_tracking->pvtk_pid = -1; /* indicating the process has exited */
	}

	/* exit_status stays -1 when the child did not exit normally */
	if (WIFEXITED(stat))
		exit_status = WEXITSTATUS(stat);

	DBPRT(("%s: stat=%d, exit_status=%d\n", __func__, stat, exit_status))

	/* success or application prov over */
	if (exit_status == 0 || exit_status == APP_PROV_SUCCESS) {

		if (pnode == NULL) {
			delete_task(timeout_task);
			free_pvnfo(prov_vnode_info);
			return;
		}

		/* Update Current aoe */
		set_nattr_str_slim(pnode, ND_ATR_current_aoe, prov_vnode_info->pvnfo_aoe_req, NULL);

		DBPRT(("%s: node:%s current_aoe set: %s\n",
		       __func__, pnode->nd_name, prov_vnode_info->pvnfo_aoe_req))

		/* write the node current_aoe */
		node_save_db(pnode);

		/* if exit_status says app_prov returned success, reset down
		 * that we set. after setting the state, is_vnode_prov_done()
		 * is called which would delete the timed work task.
		 */
		if (exit_status == APP_PROV_SUCCESS &&
		    (pnode->nd_state & INUSE_DOWN))
			set_vnode_state(pnode, ~INUSE_DOWN, Nd_State_And);

		is_vnode_prov_done(pnode->nd_name);

		return;
	}

	/* log msg about prov of node failure */
	sprintf(log_buffer,
		"Provisioning of %s with %s for %s failed, provisioning exit status=%d",
		prov_vnode_info->pvnfo_vnode, prov_vnode_info->pvnfo_aoe_req,
		prov_vnode_info->pvnfo_jobid, exit_status);
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_SERVER,
		  LOG_NOTICE, msg_daemonname, log_buffer);

	/* kill the timed task since we dont need it any more */
	delete_task(timeout_task);

	/*
	 * Remove record from prov tracking table. The vnode name is taken
	 * from prov_vnode_info rather than from pnode, because pnode is NULL
	 * when the vnode was deleted while the script was running.
	 */
	remove_prov_record(prov_vnode_info->pvnfo_vnode);
	prov_track_save(); /* save tracking table since its modified now */

	/* Any other exit code */
	/* Failure, move all jobs to be run_err
	 * on this node to failed state
	 */
	fail_vnode(prov_vnode_info, 1);
	free_pvnfo(prov_vnode_info);

	/*
	 * since one provisioning was failed, we have space to
	 * do more prov so start a task for looking at other
	 * nodes in the provisioning queue
	 */
	set_task(WORK_Immed, 0, do_provisioning, NULL);
}

/**
 * @brief
 *		Performs provisioning cleanup if provisioning timed out.
 *
 * @par Functionality:
 *      This function performs provisioning cleanup if timed out.
 *      It is triggered after "provision_timeout" seconds have elapsed.
 *      This can get triggered before/after the deferred task finishes.
 *		1) If the tracking record for the vnode still carries a pid
 *		   (> -1), sends SIGKILL to that process group and cancels the
 *		   deferred child work task.
 *      2) Removes the provisioning record and saves the tracking table.
 *      3) Calls fail_vnode for the concerned vnode.
 *      4) Frees the prov_vnode_info structure.
 *      5) Schedules do_provisioning to start further work.
 *
 * @see
 *		start_vnode_provisioning
 *
 * @param[in]	wtask	-	pointer to work_task
 *							wtask->wt_parm1	: pointer to prov_vnode_info
 *					  		structure (its ptask_defer member is
 *							the deferred child task)
 *
 * @return	void
 *
 * @par MT-safe:	No
 *
 */

static void
prov_request_timed(struct work_task *wtask)
{
	struct work_task *ptask_defer;
	struct prov_vnode_info *prov_vnode_info;
	prov_pid this_pid;
	struct prov_tracking *ptracking;

	assert(wtask->wt_parm1 != NULL);

	prov_vnode_info = (struct prov_vnode_info *) wtask->wt_parm1;
	ptask_defer = (struct work_task *) prov_vnode_info->ptask_defer;

	sprintf(log_buffer,
		"Provisioning of %s with %s for %s timed out",
		prov_vnode_info->pvnfo_vnode, prov_vnode_info->pvnfo_aoe_req,
		prov_vnode_info->pvnfo_jobid);
	log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
		  msg_daemonname, log_buffer);

	DBPRT(("%s: Entered node:%s Timed timeout work task\n",
	       __func__, prov_vnode_info->pvnfo_vnode))

	/*
	 * A missing tracking record is treated like "child already gone":
	 * there is no pid to signal, and the deferred task is left alone
	 * because nothing here proves it has not run yet.
	 */
	ptracking = get_prov_record_by_vnode(prov_vnode_info->pvnfo_vnode);
	if (ptracking != NULL && ptracking->pvtk_pid > -1) {
		/* pid is part of the deferred task event */
		this_pid = ptracking->pvtk_pid;
		DBPRT(("%s: pid = %d\n", __func__, this_pid))

		/* Kill all process belonging to this process group */
		if (kill(((-1) * this_pid), SIGKILL) == -1) {
			DBPRT(("%s: couldn't kill prov process pgid = %d\n",
			       __func__, this_pid))
		} else {
			DBPRT(("%s: killed provisioning process tree for pgid = %d\n",
			       __func__, this_pid))
		}

		/*
		 * script was running, it means that prov_request_deferred did
		 * not occur. so safe to delete task.
		 */
		delete_task(ptask_defer);
	}

	/* remove prov record */
	remove_prov_record(prov_vnode_info->pvnfo_vnode);
	prov_track_save();

	/* Move jobs on this node to the failed state */
	fail_vnode(prov_vnode_info, 1);
	free_pvnfo(prov_vnode_info);

	/*
	 * since one provisioning was failed, we have space
	 * to do more prov so start a task for looking at other nodes
	 * in the provisioning queue
	 */
	set_task(WORK_Immed, 0, do_provisioning, NULL);
}

/**
 * @brief
 *		Mirrors the state of the provisioning hook into the server's
 *		provisioning attributes.
 *
 * @par Functionality:
 *		With Python support compiled in, the hook registered for
 *		HOOK_EVENT_PROVISION is looked up. When it exists, has a script
 *		and is enabled, provision_timeout is taken from the hook's alarm
 *		and SVR_ATR_provision_timeout / SVR_ATR_ProvisionEnable are set on
 *		the server. In every other case, including builds without Python,
 *		disable_svr_prov() is called instead.
 *
 * @see
 *		mgr_hook_import
 *		mgr_hook_set
 *		mgr_hook_unset
 *
 * @return	void
 *
 * @par MT-safe:	No
 *
 */

void
set_srv_prov_attributes(void)
{
#ifdef PYTHON
	hook *prov_hook;

	DBPRT(("Entered %s\n", __func__))

	prov_hook = find_hookbyevent(HOOK_EVENT_PROVISION);
	if (prov_hook && prov_hook->script && prov_hook->enabled) {
		/* usable hook: publish its alarm as the timeout and enable provisioning */
		provision_timeout = prov_hook->alarm;
		set_sattr_l_slim(SVR_ATR_provision_timeout, provision_timeout, SET);
		set_sattr_l_slim(SVR_ATR_ProvisionEnable, 1, SET);
		return;
	}

	disable_svr_prov();
	DBPRT(("%s: script/enabled not set\n", __func__))
#else
	disable_svr_prov();
	DBPRT(("%s: Python not enabled\n", __func__))
#endif
}

/**
 * @brief
 *		Executes provisioning hook script for a vnode.
 *
 * @par Functionality:
 *      With Python support compiled in, this function sets up the Python
 *		event object for HOOK_EVENT_PROVISION (event, hook_name, hook_type),
 *		switches to the hooks work directory, runs the hook script through
 *		pbs_python_run_code_in_namespace(), switches back to the server's
 *		private directory and translates the outcome into a return value.
 *		Without Python support no script is run and 255 is returned.
 *
 * @see
 *		start_vnode_provisioning
 *
 * @param[in]	phook	-	pointer to provisioning hook
 * @param[in]   prov_vnode_info	-	pointer to prov_vnode_info
 *
 * @return	int
 * @retval	exit_code : when the script ran (run rc 0), the exit code
 *			  reported by pbs_python_run_code_in_namespace();
 *			  stays at its initial 255 if that call does not set it.
 *			  NOTE(review): the earlier header listed 0 for OS
 *			  provisioning success and 1 for application provisioning
 *			  success - confirm against the hook contract.
 * @retval	255	: phook or prov_vnode_info is NULL, the hook user is not
 *			  HOOK_PBSADMIN, or Python support is not compiled in
 * @retval	-1	: the Python event or one of its attributes could not be
 *			  set, or the run reported an internal error
 * @retval	-2	: the run reported an unhandled exception
 * @retval	other	: any other run rc is returned unchanged
 *
 * @par MT-safe: No
 *
 */

int
execute_python_prov_script(hook *phook,
			   struct prov_vnode_info *prov_vnode_info)
{
	int rc = 255;
	int exit_code = 255;
#ifdef PYTHON
	unsigned int hook_event;
	char *emsg = NULL;
	hook_input_param_t req_ptr;
	char perf_label[MAXBUFLEN];

	if (!phook || !prov_vnode_info)
		return rc;

	hook_event = HOOK_EVENT_PROVISION;

	/* only hooks owned by HOOK_PBSADMIN are run here */
	if (phook->user != HOOK_PBSADMIN)
		return rc;

	/* label used for the hook_perf_stat_* calls below; includes this process's pid */
	snprintf(perf_label, sizeof(perf_label), "hook_%s_%s_%d", HOOKSTR_PROVISION, phook->hook_name, getpid());
	req_ptr.rq_prov = (struct prov_vnode_info *) prov_vnode_info;
	rc = pbs_python_event_set(hook_event, "root",
				  "server", &req_ptr, perf_label);
	if (rc == -1) { /* internal server code failure */
		log_event(PBSEVENT_DEBUG2,
			  PBS_EVENTCLASS_HOOK, LOG_ERR, __func__,
			  "Failed to set event; request accepted by default");
		return (-1);
	}

	/* hook_name changes for each hook */
	/* This sets Python event object's hook_name value */
	rc = pbs_python_event_set_attrval(PY_EVENT_HOOK_NAME,
					  phook->hook_name);

	if (rc == -1) {
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
			  LOG_ERR, phook->hook_name,
			  "Failed to set event 'hook_name'.");
		return (-1);
	}

	/* hook_type needed for internal processing; */
	/* hook_type changes for each hook.	     */
	/* This sets Python event object's hook_type value */
	rc = pbs_python_event_set_attrval(PY_EVENT_HOOK_TYPE,
					  hook_type_as_string(phook->type));

	if (rc == -1) {
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
			  LOG_ERR, phook->hook_name,
			  "Failed to set event 'hook_type'.");
		return (-1);
	}

	log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_HOOK,
		  LOG_INFO, phook->hook_name, "started");

	pbs_python_set_mode(PY_MODE); /* hook script mode */

	/* hook script may create files, and we don't want it to */
	/* be littering server's private directory. */
	/* NOTE: path_hooks_workdir is periodically cleaned up */
	/* a failed chdir is only logged; the script is still run */
	if (chdir(path_hooks_workdir) != 0) {
		log_event(PBSEVENT_DEBUG2,
			  PBS_EVENTCLASS_HOOK, LOG_WARNING, phook->hook_name,
			  "unable to go to hooks tmp directory");
	}

	/* let rc pass through */
	hook_perf_stat_start(perf_label, HOOK_PERF_RUN_CODE, 0);
	rc = pbs_python_run_code_in_namespace(&svr_interp_data,
					      phook->script,
					      &exit_code);
	hook_perf_stat_stop(perf_label, HOOK_PERF_RUN_CODE, 0);

	/* go back to server's private directory */
	if (chdir(path_priv) != 0) {
		log_event(PBSEVENT_DEBUG2,
			  PBS_EVENTCLASS_HOOK, LOG_WARNING, phook->hook_name,
			  "unable to go back server private directory");
	}

	pbs_python_set_mode(C_MODE); /* PBS C mode - flexible */
	log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_HOOK,
		  LOG_INFO, phook->hook_name, "finished");

	/* rc values not listed below fall out of the switch and are returned as-is */
	switch (rc) {
		case 0:
			/* reject if at least one hook script rejects */
			/* a rejection is only logged; exit_code is returned either way */
			if (pbs_python_event_get_accept_flag() == FALSE) { /* a reject occurred */
				snprintf(log_buffer, LOG_BUF_SIZE - 1,
					 "%s request rejected by '%s'",
					 hook_event_as_string(hook_event),
					 phook->hook_name);
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_HOOK,
					  LOG_ERR, phook->hook_name, log_buffer);
				if ((emsg = pbs_python_event_get_reject_msg()) != NULL) {
					snprintf(log_buffer, LOG_BUF_SIZE - 1, "%s", emsg);
					/* log also the custom reject message */
					log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_HOOK,
						  LOG_ERR, phook->hook_name, log_buffer);
				}
			}
			return (exit_code);

		case -1: /* internal error */
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
				  LOG_ERR, phook->hook_name,
				  "Internal server error encountered. Skipping hook.");
			return (rc); /* should not happen */

		case -2: /* unhandled exception */
			pbs_python_event_reject(NULL);
			pbs_python_event_param_mod_disallow();

			snprintf(log_buffer, LOG_BUF_SIZE - 1,
				 "%s hook '%s' encountered an exception, "
				 "request rejected",
				 hook_event_as_string(hook_event), phook->hook_name);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
				  LOG_ERR, phook->hook_name, log_buffer);
			return (rc);
	}
#endif
	return rc;
}

/**
 * @brief
 *		Performs basic checks and then kicks off provisioning of a vnode.
 *
 * @par Functionality:
 *      This function starts provisioning of a vnode with the requested aoe
 *		by running the provisioning hook in a forked child process. In the
 *		parent, the vnode's current_aoe is unset and saved, a deferred
 *		child work task (prov_request_deferred) and a timed work task
 *		(prov_request_timed, provision_timeout seconds ahead) are created
 *		and stored in prov_vnode_info, and a provisioning record is added
 *		in server. Finally the wait-provisioning state flag is cleared and
 *		the vnode is marked provisioning and down.
 *
 * @see
 *		do_provisioning
 *
 * @param[in]	prov_vnode_info	-	pointer to prov_vnode_info entry in server
 *
 * @return	int
 * @retval	PBSE_NONE	: success if provisioning started for a vnode
 * @retval	PBSE_SYSTEM	: vnode not found, or fork() failed
 * @retval	PBSE_INTERNAL	: a work task or the provisioning record could
 *				  not be created
 * @retval	-1		: provisioning hook not found
 * @retval	other non-zero	: value returned by
 *				  pbs_python_check_and_compile_script()
 *
 * @par MT-safe:	No
 *
 */

static int
start_vnode_provisioning(struct prov_vnode_info *prov_vnode_info)
{
	prov_pid pid;
	struct work_task *ptask_defer;
	struct work_task *ptask_timed;
	struct pbsnode *pnode;
	job *pjob;
	int rc = -1;
	struct sigaction act;
	hook *phook;

	DBPRT(("%s: Provisioning vnode: %s with aoe: %s\n", __func__,
	       prov_vnode_info->pvnfo_vnode, prov_vnode_info->pvnfo_aoe_req))

	pnode = find_nodebyname(prov_vnode_info->pvnfo_vnode);
	if (!pnode) {
		DBPRT(("%s: Could not find vnode %s\n", __func__,
		       prov_vnode_info->pvnfo_vnode))
		return (PBSE_SYSTEM);
	}

	phook = find_hookbyevent(HOOK_EVENT_PROVISION);
	if (!phook) {
		DBPRT(("%s: Provisioning hook not found\n", __func__))
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, LOG_INFO,
			  msg_daemonname, "Provisioning hook not found");
		/* rc still holds its initial value of -1 here */
		return rc;
	}

	if ((rc = pbs_python_check_and_compile_script(&svr_interp_data,
						      phook->script)) != 0) {
		DBPRT(("%s: Recompilation failed\n", __func__))
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_INFO,
			  msg_daemonname, "Provisioning script recompilation failed");
		return rc;
	}

	/* Create child process to run TOP-LEVEL provisioning script */
	pid = fork();
	if (pid == -1) { /* fork failed */
		DBPRT(("%s: fork() failed\n", __func__))
		return (PBSE_SYSTEM);
	} else if (pid == 0) { /* child process */
		alarm(0);
		/* standard tpp closure and net close */
		net_close(-1);
		tpp_terminate();

		/* Reset signal actions for most to SIG_DFL */
		sigemptyset(&act.sa_mask);
		act.sa_flags = 0;
		act.sa_handler = SIG_DFL;
		(void) sigaction(SIGCHLD, &act, NULL);
		(void) sigaction(SIGHUP, &act, NULL);
		(void) sigaction(SIGINT, &act, NULL);
		(void) sigaction(SIGTERM, &act, NULL);

		/* Reset signal mask (act.sa_mask is the empty set built above) */
		(void) sigprocmask(SIG_SETMASK, &act.sa_mask, NULL);

		/*
		 * set process as session leader; prov_request_timed relies
		 * on this when it signals the negated pid as a process group
		 */
		if (setsid() < 0)
			exit(13);

		/* Redirect stdin to /dev/null; a failure is only logged */
		if (freopen("/dev/null", "r", stdin) == NULL) 
			log_errf(-1, __func__, "freopen of null device failed. ERR : %s",strerror(errno));


		/* Unprotect child from being killed by system */
		daemon_protect(0, PBS_DAEMON_PROTECT_OFF);

		/* exit with the return code from the script */
		rc = execute_python_prov_script(phook, prov_vnode_info);

		/* if python did sys.exit we wont be here */
		exit(rc);
	}

	/* parent process */
	/* set node state to provisioning */
	/*
	 * set_vnode_state(pnode, INUSE_PROV, Nd_State_Or);
	 * This is now done earlier
	 */
	/* unset the current_aoe for the node here provisioning */
	free_nattr(pnode, ND_ATR_current_aoe);

	/* write the node current_aoe */
	node_save_db(pnode);

	/*
	 * Parent process creates two work tasks
	 * i.e deferred child work task and timed work task.Deferred child
	 * task is to capture the exit code of the provisioning script.
	 * The Timed task is to implement the timeout feature.
	 */

	/*
	 * wt_parm1 is passed the address of the prov_vnode_info
	 * structure allocated earlier
	 */
	/*
	 * NOTE(review): every PBSE_INTERNAL return below happens after the
	 * child has been forked and current_aoe has been unset; nothing in
	 * this function stops the child or restores current_aoe on those
	 * paths - confirm whether the caller is expected to handle that.
	 */
	ptask_defer = set_task(WORK_Deferred_Child, pid,
			       prov_request_deferred,
			       (void *) prov_vnode_info);
	if (!ptask_defer)
		return (PBSE_INTERNAL);

	ptask_timed = set_task(WORK_Timed, time_now + provision_timeout,
			       prov_request_timed,
			       (void *) prov_vnode_info);
	if (!ptask_timed) {
		/* cancel deferred child work task */
		delete_task(ptask_defer);
		return (PBSE_INTERNAL);
	}

	/* store the addresses in prov_vnode_info */
	prov_vnode_info->ptask_defer = ptask_defer;
	prov_vnode_info->ptask_timed = ptask_timed;

	/*
	 * add a provisioning record to the prov_record table,
	 * used for server crash recovery
	 */
	if (add_prov_record(pid, prov_vnode_info) == -1) {
		/* this actually should not fail, since we checked before */
		delete_task(ptask_defer);
		delete_task(ptask_timed);
		return (PBSE_INTERNAL);
	}

	/* the start message is only logged when the requesting job still exists */
	pjob = find_job(prov_vnode_info->pvnfo_jobid);
	if (pjob) {
		/* log job prov success message */
		sprintf(log_buffer, "Provisioning vnode %s with AOE %s "
				    "started successfully",
			prov_vnode_info->pvnfo_vnode,
			prov_vnode_info->pvnfo_aoe_req);
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
			  LOG_INFO, pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* remove the INUSE_WAIT_PROV flag as it is prov now */
	set_vnode_state(pnode, ~INUSE_WAIT_PROV, Nd_State_And);

	/* set prov and down states */
	set_vnode_state(pnode, INUSE_PROV | INUSE_DOWN, Nd_State_Or);

	return (PBSE_NONE);
}

/**
 * @brief
 *		Checks if provisioning is required for a job and, if so, enqueues
 *		one provisioning request per vnode.
 *
 * @par Functionality:
 *      This function asks find_prov_vnode_list() for the vnodes the job
 *		needs provisioned and the aoe requested. When that list is empty,
 *		need_prov stays 0. Otherwise a prov_vnode_info entry is appended
 *		to prov_allvnodes for every vnode, the vnode (if found) is marked
 *		INUSE_WAIT_PROV, an immediate do_provisioning work task is created,
 *		the provisioning table is resized and need_prov is set to 1.
 *
 *		The vnode list and aoe string handed back by
 *		find_prov_vnode_list() are released on every exit path.
 *
 *		NOTE(review): entries appended before a mid-loop allocation
 *		failure, or before a failed set_task(), stay on prov_allvnodes
 *		even though an error is returned.
 *
 * @see
 *		check_and_provision_job
 * @param[in]   pjob	-	pointer to job
 * @param[out]  need_prov	-	set to 1 when provisioning was enqueued,
 *					0 otherwise
 *
 * @return	int
 * @retval	PBSE_NONE	: success, whether or not provisioning is needed
 * @retval	PBSE_IVALREQ	: need_prov is NULL, or find_prov_vnode_list()
 *				  reported an error
 * @retval	PBSE_INTERNAL	: entry allocation failed, no aoe was returned
 *				  for a non-empty vnode list, or the work task
 *				  could not be created
 * @retval	PBSE_SYSTEM	: a string copy for an entry could not be
 *				  allocated
 *
 * @par MT-safe:	No
 *
 */

int
check_and_enqueue_provisioning(job *pjob, int *need_prov)
{
	exec_vnode_listtype prov_vnode_list = NULL;
	int num_of_prov_vnodes = -1;
	int i;
	int rc = PBSE_NONE;
	struct prov_vnode_info *prov_vnode_info;
	struct pbsnode *pnode;
	char *aoe_req = NULL; /* to point to aoe */

	DBPRT(("%s: Entered\n", __func__))

	if (need_prov == NULL) {
		DBPRT(("%s: bad params\n", __func__))
		return (PBSE_IVALREQ);
	}

	*need_prov = 0;

	/* prov_vnode_list is of type exec_vnode_listtype.
	 * This is an array of "pointers to arrays[PBS_MAXCLTJOBID]"
	 */
	num_of_prov_vnodes = find_prov_vnode_list(pjob, &prov_vnode_list, &aoe_req);
	if (num_of_prov_vnodes == -1) {
		rc = PBSE_IVALREQ;
		goto done;
	}

	DBPRT(("%s: aoe_req: %s\n", __func__, (aoe_req ? aoe_req : "NULL")))

	if (num_of_prov_vnodes == 0) {
		DBPRT(("%s: Provisioning will not be done, "
		       "since no aoe requested or scheduler did not give provision vnode\n",
		       __func__))
		goto done; /* rc is PBSE_NONE, *need_prov is 0 */
	}

	/* every entry needs a copy of the aoe; strdup(NULL) is undefined */
	if (aoe_req == NULL) {
		rc = PBSE_INTERNAL;
		goto done;
	}

	/* enque the provisioning request */
	for (i = 0; i < num_of_prov_vnodes; i++) {
		prov_vnode_info = calloc(1, sizeof(struct prov_vnode_info));
		if (!prov_vnode_info) {
			rc = PBSE_INTERNAL;
			goto done;
		}

		/*
		 * prov_vnode_info carries only the id's of the
		 * job/resv and not pointers this is because, this
		 * structure would be used by work tasks later and
		 * at that point of time, job / resv pointers may not
		 * be valid as its possible that they could be
		 * deleted by the server
		 */
		if ((prov_vnode_info->pvnfo_vnode = strdup(prov_vnode_list[i])) == NULL ||
		    (prov_vnode_info->pvnfo_aoe_req = strdup(aoe_req)) == NULL) {
			free_pvnfo(prov_vnode_info);
			rc = PBSE_SYSTEM;
			goto done;
		}
		strcpy(prov_vnode_info->pvnfo_jobid, pjob->ji_qs.ji_jobid);

		CLEAR_LINK(prov_vnode_info->al_link);
		append_link(&prov_allvnodes, &prov_vnode_info->al_link,
			    prov_vnode_info);

		/*
		 * The entry stays queued even if the vnode is unknown right
		 * now; do_provisioning() discards entries whose vnode cannot
		 * be found. Only the state change needs the node.
		 */
		pnode = find_nodebyname(prov_vnode_list[i]);
		if (pnode != NULL)
			set_vnode_state(pnode, INUSE_WAIT_PROV, Nd_State_Or);
	}

	/*
	 * then start a immediate work task to start provisioning
	 * based on max allowed provisioings
	 */
	if (set_task(WORK_Immed, 0, do_provisioning, NULL) == NULL) {
		rc = PBSE_INTERNAL;
		goto done;
	}

	DBPRT(("%s: Provisioning will be done\n", __func__))

	/* could be a good time to resize the prov table */
	resize_prov_table(max_concurrent_prov);

	*need_prov = 1;

done:
	/* single release point; free(NULL) is a no-op */
	free(prov_vnode_list);
	free(aoe_req);
	return rc;
}

/**
 * @brief
 *		Starts as many queued provisioning requests as the server's
 *		concurrency limit allows.
 *
 * @par Functionality:
 *      Runs as a work task. While prov_allvnodes is non-empty and
 *		server.sv_cur_prov_records is below max_concurrent_prov, the head
 *		entry is unlinked and handed to start_vnode_provisioning().
 *		Entries whose vnode no longer exists are simply freed. When the
 *		start fails, fail_vnode_job() is called for the entry, the
 *		INUSE_PROV and INUSE_WAIT_PROV flags are cleared on the vnode (the
 *		vnode itself is not failed) and the entry is freed. The tracking
 *		table is saved before returning.
 *
 * @see
 *		start_vnode_provisioning
 *
 * @param[in]	wtask	-	pointer to work_task (not used)
 *
 * @return	void
 *
 * @par MT-safe:	No
 *
 */

void
do_provisioning(struct work_task *wtask)
{
	struct prov_vnode_info *entry;
	struct pbsnode *vnode;

	/*
	 * Each pass unlinks the head of the list, so the "next" entry is
	 * always the new head. An entry that starts successfully is kept
	 * alive; it is released later by the handlers that finish or abort
	 * the provisioning.
	 */
	for (entry = GET_NEXT(prov_allvnodes);
	     entry && (server.sv_cur_prov_records < max_concurrent_prov);
	     entry = GET_NEXT(prov_allvnodes)) {

		delete_link(&entry->al_link);

		vnode = find_nodebyname(entry->pvnfo_vnode);
		if (vnode == NULL) {
			DBPRT(("%s: node %s was deleted\n", __func__,
			       entry->pvnfo_vnode))
			free_pvnfo(entry);
			continue;
		}

		if (start_vnode_provisioning(entry) == 0)
			continue;

		/* start failed: fail the waiting jobs/resv, but not the vnode */
		fail_vnode_job(entry, 0);

		/* the vnode will not provision after all, drop the flags */
		vnode = find_nodebyname(entry->pvnfo_vnode);
		if (vnode) {
			DBPRT(("%s: \n", __func__))
			set_vnode_state(vnode, ~(INUSE_PROV | INUSE_WAIT_PROV),
					Nd_State_And);
		}
		free_pvnfo(entry);
	}

	/* Save provisioning records to file */
	prov_track_save();
}

/**
 * @brief
 *		Drops every queued provisioning request that belongs to a job.
 *
 * @par Functionality:
 *      Walks prov_allvnodes and, for each entry whose job id equals the
 *		job's id, unlinks the entry, clears INUSE_PROV and INUSE_WAIT_PROV
 *		on the entry's vnode (when the vnode can be found) and frees the
 *		entry.
 *
 * @see
 *		fail_vnode_job
 *
 * @param[in]	pjob	-	pointer to job
 *
 * @return	void
 *
 * @par MT-safe:	No
 *
 */
static void
del_prov_vnode_entry(job *pjob)
{
	struct prov_vnode_info *entry;
	struct prov_vnode_info *following;
	struct pbsnode *vnode;

	for (entry = GET_NEXT(prov_allvnodes); entry; entry = following) {
		/* fetch the successor first: entry may be freed below */
		following = GET_NEXT(entry->al_link);

		if (strcmp(entry->pvnfo_jobid, pjob->ji_qs.ji_jobid) != 0)
			continue;

		delete_link(&entry->al_link);
		DBPRT(("%s: vnode %s\n", __func__, entry->pvnfo_vnode))

		/* this vnode is no longer going to provision */
		vnode = find_nodebyname(entry->pvnfo_vnode);
		if (vnode) {
			set_vnode_state(vnode,
					~(INUSE_PROV | INUSE_WAIT_PROV),
					Nd_State_And);
		}
		free_pvnfo(entry);
	}
}

/**
 * @brief
 *	Reflects the enabled flag of the PBS_POWER hook into the server's
 *	SVR_ATR_PowerProvisioning attribute.
 *
 * @par Functionality:
 *	Does nothing when the PBS_POWER hook cannot be found. Otherwise the
 *	attribute is set to "1" when the hook is enabled and "0" when it is
 *	not, and a pending mom hook action is queued: always
 *	MOM_HOOK_ACTION_SEND_ATTRS, plus MOM_HOOK_ACTION_SEND_SCRIPT when the
 *	hook is enabled.
 *
 * @return	None.
 */
void
set_srv_pwr_prov_attribute()
{
	char hook_name[] = PBS_POWER;
	char flag_text[2] = {0};
	hook *power_hook;
	int is_enabled;
	unsigned int action;

	power_hook = find_hook(hook_name);
	if (power_hook == NULL)
		return;

	is_enabled = (power_hook->enabled == TRUE) ? 1 : 0;

	snprintf(flag_text, sizeof(flag_text), "%d", is_enabled);
	set_sattr_str_slim(SVR_ATR_PowerProvisioning, flag_text, NULL);

	/*
	 * The enabled attribute changed, so the attributes always go out;
	 * the script is sent along only when the hook is enabled.
	 */
	action = MOM_HOOK_ACTION_SEND_ATTRS;
	if (is_enabled)
		action |= MOM_HOOK_ACTION_SEND_SCRIPT;

	add_pending_mom_hook_action(NULL, hook_name, action);
}

/**
 * @brief
 *		action_backfill_depth - action function for backfill_depth;
 *		rejects negative values.
 *
 *		NOTE(review): zero is accepted by this check - confirm whether
 *		the intended minimum is 0 or 1.
 *
 * @param[in]	pattr	-	attribute carrying the new value (at_val.at_long)
 * @param[in]	pobj	-	object being considered (not used)
 * @param[in]	actmode	-	action mode; the value is only checked for
 *				ATR_ACTION_ALTER and ATR_ACTION_RECOV
 *
 * @return	int
 * @retval	PBSE_NONE	: pattr is NULL, the mode is not checked, or the
 *				  value is acceptable
 * @retval	PBSE_BADATVAL	: the value is negative
 */
int
action_backfill_depth(attribute *pattr, void *pobj, int actmode)
{
	if (pattr == NULL)
		return PBSE_NONE;

	/* other action modes are accepted without looking at the value */
	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_RECOV)
		return PBSE_NONE;

	return (pattr->at_val.at_long < 0) ? PBSE_BADATVAL : PBSE_NONE;
}

/**
 * @brief
 *	action_jobscript_max_size - action function for jobscript_max_size.
 *	Rejects sizes above 2gb and otherwise records the new size in
 *	attr_jobscript_max_size.
 *
 * @param[in] pattr - server attribute (jobscript_max_size) with the new size
 * @param[in] pobj  - object being considered (not used)
 * @param[in] actmode - action mode; the upper bound is only enforced for
 *			ATR_ACTION_ALTER and ATR_ACTION_RECOV
 *
 * @return int
 * @retval PBSE_NONE when pattr is NULL or the size was accepted
 * @retval PBSE_BADJOBSCRIPTMAXSIZE when size is set to more than 2GB
 *
 */

int
action_jobscript_max_size(attribute *pattr, void *pobj, int actmode)
{
	attribute upper_limit;
	int enforce_limit;

	if (pattr == NULL)
		return PBSE_NONE;

	/* build a size attribute holding the largest permitted value */
	set_attr_generic(&upper_limit, &svr_attr_def[SVR_ATR_jobscript_max_size], "2gb", NULL, INTERNAL);

	enforce_limit = (actmode == ATR_ACTION_ALTER) || (actmode == ATR_ACTION_RECOV);
	if (enforce_limit && comp_size(pattr, &upper_limit) > 0)
		return PBSE_BADJOBSCRIPTMAXSIZE;

	/* accepted in every mode: copy the value into attr_jobscript_max_size */
	set_size(&attr_jobscript_max_size, pattr, SET);
	return PBSE_NONE;
}

/**
 * @brief
 *	action_check_res_to_release - action function for restrict_res_to_release_on_suspend
 *	it validates that input is a list of legitimate resource names
 *
 * @param[in] pattr - server attribute; its value is read as an array of
 *		      strings (at_val.at_arst)
 * @param[in] pobj  - object being considered (not used)
 * @param[in] actmode - action mode; names are only checked for
 *			ATR_ACTION_ALTER and ATR_ACTION_NEW
 *
 * @return int
 * @retval PBSE_NONE when no errors are encountered, including when pattr is
 *		     NULL or the attribute holds no string array
 * @retval PBSE_UNKRESC when any of the resource is not known
 *
 */

int
action_check_res_to_release(attribute *pattr, void *pobj, int actmode)
{
	int i;

	if (pattr == NULL)
		return PBSE_NONE;

	if (actmode != ATR_ACTION_ALTER && actmode != ATR_ACTION_NEW)
		return PBSE_NONE;

	/* an attribute without a value has no names to validate */
	if (pattr->at_val.at_arst == NULL)
		return PBSE_NONE;

	for (i = 0; i < pattr->at_val.at_arst->as_usedptr; i++) {
		if (find_resc_def(svr_resc_def, pattr->at_val.at_arst->as_string[i]) == NULL)
			return PBSE_UNKRESC;
	}
	return PBSE_NONE;
}

/**
 * @brief
 *	Unset the jobscript_max_size server attribute.
 *
 * @par Functionality:
 *	Logs the revert and then resets attr_jobscript_max_size to
 *	DFLT_JOBSCRIPT_MAX_SIZE.
 *
 * @return	void
 *
 */
void
unset_jobscript_max_size(void)
{
	/* announce the revert before applying it */
	log_eventf(PBSEVENT_ADMIN,
		   PBS_EVENTCLASS_SERVER,
		   LOG_NOTICE,
		   msg_daemonname,
		   "unsetting jobscript_max_size - reverting back to default val %s",
		   DFLT_JOBSCRIPT_MAX_SIZE);

	set_attr_generic(&attr_jobscript_max_size,
			 &svr_attr_def[SVR_ATR_jobscript_max_size],
			 DFLT_JOBSCRIPT_MAX_SIZE,
			 NULL,
			 INTERNAL);
}

/**
 * @brief
 *		Create a copy of the job script from database to a temporary file
 *		This filename is then passed onto the sendjob process to send the
 *		jobfile to the target mom/server
 *
 * @param[in]	pj	-	Job pointer
 * @param[out]	script_name	-	Name of the temporary filename to which
 *								the job script was copied to
 *
 * @return	Error code
 * @retval	0	: Success
 * @retval	-1	: Failure
 *
 */
/* Strings defined in other translation units and used by the job script helpers below. */
extern char *msg_script_open;  /* logged by svr_create_tmp_jobscript() when the temporary script file cannot be opened */
extern char *msg_script_write; /* presumably the message for a failed script write - TODO confirm against its users */
extern char *path_spool;       /* presumably the server's spool directory path - TODO confirm against its users */

/**
 * @brief
 *  	Loads the job script for a job from the database into pj->ji_script.
 *
 * @par Functionality:
 *  	Any script already attached to the job is freed first. The script is
 *  	looked up under the job's own id, or under the parent array job's id
 *  	when the job carries the JOB_SVFLG_SubJob flag. Failures are logged
 *  	and leave pj->ji_script NULL.
 *
 * @param[in, out] pj - Job pointer. pj->ji_script has the script loaded into it.
 *
 * @return Text buffer containing the job script (the same pointer that is
 *	   stored in pj->ji_script)
 * @retval NULL  - Failed to load job script
 * @retval !NULL - Job script
 *
 */
char *
svr_load_jobscript(job *pj)
{
	pbs_db_jobscr_info_t script_rec;
	pbs_db_obj_info_t db_obj;
	char *lookup_id;

	/* drop a previously loaded copy, if any */
	if (pj->ji_script) {
		free(pj->ji_script);
		pj->ji_script = NULL;
	}

	/* subjobs are looked up under the id of their parent array job */
	lookup_id = (pj->ji_qs.ji_svrflags & JOB_SVFLG_SubJob)
			    ? pj->ji_parentaj->ji_qs.ji_jobid
			    : pj->ji_qs.ji_jobid;
	strcpy(script_rec.ji_jobid, lookup_id);
	script_rec.script = NULL;

	db_obj.pbs_db_obj_type = PBS_DB_JOBSCR;
	db_obj.pbs_db_un.pbs_db_jobscr = &script_rec;

	if (pbs_db_load_obj((void *) svr_db_conn, &db_obj) != 0) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "Failed to load job script for job %s from PBS datastore",
			 pj->ji_qs.ji_jobid);
		log_err(-1, __func__, log_buffer);
		return NULL;
	}

	/* the load reported success but produced no script text */
	if (script_rec.script == NULL) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "Out of memory loading script for job %s from PBS datastore",
			 pj->ji_qs.ji_jobid);
		log_err(-1, __func__, log_buffer);
		return NULL;
	}

	pj->ji_script = script_rec.script;
	return pj->ji_script;
}

/**
 * @brief
 *		Write the job script held in the job structure into a temporary
 *		file. The path is built as
 *		<pbs_conf.pbs_tmpdir>/<file prefix><job id><JOB_SCRIPT_SUFFIX>
 *		and the file is created with mode 0600. An already existing file
 *		of that name is truncated so that no stale content survives
 *		behind the newly written script.
 *
 * @param[in]	pj	-	Job pointer; pj->ji_script must already be loaded
 * @param[out]	script_name	-	Receives the path of the temporary file.
 *					NOTE(review): the path is assembled with
 *					unbounded copies; the required buffer size
 *					is not visible here - confirm at the callers.
 *
 * @return	Error code
 * @retval	-1	: Failure (no script loaded, open failed or write failed)
 * @retval	 0	: Success
 */
int
svr_create_tmp_jobscript(job *pj, char *script_name)
{
	int fds;
	int filemode = 0600;
	size_t remaining;
	const char *cursor;
	ssize_t written;

	if (pj->ji_script == NULL) {
		(void) snprintf(log_buffer, sizeof(log_buffer), "Job has no script loaded!! Can't write temp job script");
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_INFO, pj->ji_qs.ji_jobid, log_buffer);
		return -1;
	}

	(void) strcpy(script_name, pbs_conf.pbs_tmpdir);
	(void) strcat(script_name, "/");

	if (*pj->ji_qs.ji_fileprefix != '\0')
		(void) strcat(script_name, pj->ji_qs.ji_fileprefix);

	(void) strcat(script_name, pj->ji_qs.ji_jobid);
	(void) strcat(script_name, JOB_SCRIPT_SUFFIX);

	/* O_TRUNC: a longer leftover file must not leak its tail into the script */
	fds = open(script_name, O_WRONLY | O_CREAT | O_TRUNC, filemode);
	if (fds < 0) {
		log_err(errno, __func__, msg_script_open);
		return -1;
	}

	/* write() may be interrupted or transfer fewer bytes than asked for */
	cursor = pj->ji_script;
	remaining = strlen(cursor);
	while (remaining > 0) {
		written = write(fds, cursor, remaining);
		if (written < 0 && errno == EINTR)
			continue;
		if (written <= 0) {
			log_err(errno, __func__, msg_script_write);
			(void) close(fds);
			return -1;
		}
		cursor += written;
		remaining -= (size_t) written;
	}

	(void) close(fds);
	return 0;
}

/**
 * @brief
 *		Report whether a place directive requests a given kind of sharing.
 *		The requested kind is mapped to a place keyword (PLACE_Excl,
 *		PLACE_ExclHost or PLACE_Shared) which is then looked up in the
 *		place string with place_sharing_check().
 *
 * @param[in]	place_str	: The string representation of the place directive
 * @param[in]	by	: The sharing type to look for; one of VNS_FORCE_EXCL,
 *			  VNS_FORCE_EXCLHOST or VNS_IGNORE_EXCL
 *
 * @return	The place sharing type
 * @retval	by		: the matching keyword is present in place_str
 * @retval	VNS_UNSET	: place_str is NULL, the keyword is absent, or
 *				  "by" is not one of the three handled values
 *
 * @par MT-Safe: No
 */
enum vnode_sharing
place_sharing_type(char *place_str, enum vnode_sharing by)
{
	if (place_str == NULL)
		return VNS_UNSET;

	switch (by) {
		case VNS_FORCE_EXCL:
			if (place_sharing_check(place_str, PLACE_Excl))
				return VNS_FORCE_EXCL;
			break;

		case VNS_FORCE_EXCLHOST:
			if (place_sharing_check(place_str, PLACE_ExclHost))
				return VNS_FORCE_EXCLHOST;
			break;

		case VNS_IGNORE_EXCL:
			if (place_sharing_check(place_str, PLACE_Shared))
				return VNS_IGNORE_EXCL;
			break;

		default:
			/* any other sharing type is never reported */
			break;
	}

	return VNS_UNSET;
}
/**
 * @brief
 *		Action routine that validates a queue name held in an attribute.
 *		When the attribute is being altered and has a value, that value
 *		must name a queue that find_queuebyname() can locate. No check is
 *		made for any other action mode.
 *
 * @param[in]	pattr	-	attribute whose string value is the queue name
 * @param[in]	pobj	-	object being considered (unused)
 * @param[in]	actmode	-	action mode
 *
 * @return	Whether function completed successfully or not
 * @retval	PBSE_NONE	: when no errors are encountered
 * @retval	PBSE_UNKQUE	: Unknown queue name
 * @retval	PBSE_INTERNAL	: pattr is NULL
 *
 */
int
default_queue_chk(attribute *pattr, void *pobj, int actmode)
{
	if (pattr == NULL)
		return PBSE_INTERNAL;

	/* only an alter request carrying a value needs the lookup */
	if (actmode != ATR_ACTION_ALTER || !is_attr_set(pattr))
		return PBSE_NONE;

	if (find_queuebyname(pattr->at_val.at_str) == NULL)
		return PBSE_UNKQUE;

	return PBSE_NONE;
}

/**
 *
 * @brief
 *		Marks a connection flag that tells a qsub daemon that something has
 *		changed in the server, and its req_queuejob request needs to be redone.
 *
 */
void
force_qsub_daemons_update(void)
{
	conn_t *cp = NULL;
	if (svr_allconns.ll_next == NULL)
		return;
	for (cp = (conn_t *) GET_NEXT(svr_allconns); cp; cp = GET_NEXT(cp->cn_link)) {
		if (cp->cn_authen & PBS_NET_CONN_FROM_QSUB_DAEMON)
			cp->cn_authen |= PBS_NET_CONN_FORCE_QSUB_UPDATE;
	}
}

/**
 * @brief
 *		The action function for the "default_qsub_arguments" server
 *		attribute. It calls force_qsub_daemons_update() so that qsub
 *		daemons redo their req_queuejob operation after the attribute
 *		has changed.
 *
 * @param[in]	pattr	-	target "default_qsub_arguments" attribute value
 * @param[in]	pobj	-	pointer to some parent object (required but unused here)
 * @param[in]	actmode	-	the action to take (e.g. ATR_ACTION_ALTER); not
 *				examined, the update is forced for every mode
 *
 * @return	Whether or not okay to set to new value.
 * @retval	PBSE_NONE	: Action is okay.
 * @retval	PBSE_INTERNAL	: pattr is NULL.
 */
int
force_qsub_daemons_update_action(attribute *pattr, void *pobj, int actmode)
{
	if (pattr == NULL)
		return PBSE_INTERNAL;

	force_qsub_daemons_update();
	return PBSE_NONE;
}

/**
 * @brief
 *		are_we_primary - determines the failover role of this Server by
 *		comparing server_host with the fully qualified names of the
 *		configured primary and secondary hosts.
 *
 * @par Side Effects:
 *		The resolved name of the primary is stored in primary_host.
 *
 * @return  int			- failover server role
 * @retval  FAILOVER_NONE		- failover not configured
 * @retval  FAILOVER_PRIMARY		- Primary Server
 * @retval  FAILOVER_SECONDARY		- Secondary Server
 * @retval  FAILOVER_CONFIG_ERROR	- only one of primary/secondary is set,
 *					  a host name could not be resolved, or
 *					  this host is neither of the two
 */
enum failover_state
are_we_primary(void)
{
	char secondary_host[PBS_MAXHOSTNAME + 1];
	const int have_primary = (pbs_conf.pbs_primary != NULL);
	const int have_secondary = (pbs_conf.pbs_secondary != NULL);

	/* failover needs both names; neither one means it is simply not in use */
	if (!have_primary && !have_secondary)
		return FAILOVER_NONE;
	if (!have_primary || !have_secondary)
		return FAILOVER_CONFIG_ERROR;

	if (get_fullhostname(pbs_conf.pbs_primary, primary_host, (sizeof(primary_host) - 1)) == -1) {
		log_err(-1, "pbsd_main", "Unable to get full host name of primary");
		return FAILOVER_CONFIG_ERROR;
	}
	if (strcmp(primary_host, server_host) == 0)
		return FAILOVER_PRIMARY; /* this host is the listed primary */

	if (get_fullhostname(pbs_conf.pbs_secondary, secondary_host, (sizeof(secondary_host) - 1)) == -1) {
		log_err(-1, "pbsd_main", "Unable to get full host name of secondary");
		return FAILOVER_CONFIG_ERROR;
	}
	if (strcmp(secondary_host, server_host) == 0)
		return FAILOVER_SECONDARY; /* this host is the listed secondary */

	/* failover is configured but this host is not part of it */
	return FAILOVER_CONFIG_ERROR;
}

/**
 * @brief
 * 		Logs the current program break (sbrk(0)) and, when built with
 * 		HAVE_MALLOC_INFO, the text returned by get_mem_info(), at the
 * 		PBSEVENT_DEBUG4 level.
 *
 * @par Functionality:
 * 		When called with a non-NULL task the function first schedules
 * 		itself to run again 600 seconds after time_now. Nothing is logged
 * 		unless PBSEVENT_DEBUG4 events are enabled.
 *
 * @param[in]	ptask	-	pointer to the work task; NULL suppresses the
 * 				rescheduling
 *
 * @return	void
 *
 * @par Side Effects:
 * 		Overwrites the shared log_buffer and may add a timed work task.
 *
 */
void
memory_debug_log(struct work_task *ptask)
{
	if (ptask != NULL)
		(void) set_task(WORK_Timed, time_now + 600, memory_debug_log, NULL);

	if (will_log_event(PBSEVENT_DEBUG4)) {
		snprintf(log_buffer, LOG_BUF_SIZE, "MEM_DEBUG: sbrk: %zu", (size_t) sbrk(0));
		log_event(PBSEVENT_DEBUG4, PBS_EVENTCLASS_SERVER, LOG_DEBUG, msg_daemonname, log_buffer);
#ifdef HAVE_MALLOC_INFO
		{
			char *mem_report = get_mem_info();

			if (mem_report) {
				log_event(PBSEVENT_DEBUG4, PBS_EVENTCLASS_SERVER, LOG_DEBUG, msg_daemonname, mem_report);
				free(mem_report);
			}
		}
#endif /* malloc_info */
	}
}
