/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 *
 * @brief
 *		all the functions related to node management.
 *
 */

#include <pbs_config.h> /* the master config generated by configure */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stddef.h>
#include <time.h>

#include "portability.h"
#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "net_connect.h"
#include "work_task.h"
#include "job.h"
#include "reservation.h"
#include "acct.h"
#include "queue.h"
#include "pbs_nodes.h"
#include "log.h"
#include "tpp.h"
#include "dis.h"
#include "resmon.h"
#include "mom_server.h"
#include "pbs_license.h"
#include "ticket.h"
#include "placementsets.h"
#include "pbs_ifl.h"
#include "grunt.h"
#include "libutil.h"
#include "pbs_db.h"
#include "batch_request.h"
#include "hook_func.h"
#include "sched_cmds.h"
#include "provision.h"
#include "pbs_sched.h"
#include "svrfunc.h"

#if !defined(H_ERRNO_DECLARED)
extern int h_errno;
#endif

int mom_send_vnode_map = 0; /* server must send vnode map to Mom */
int svr_quehasnodes;

static int mtfd_replyhello = -1;
static int mtfd_replyhello_noinv = -1;

static int cvt_overflow(size_t, size_t);
static int cvt_realloc(char **, size_t *, char **, size_t *);

static void set_resv_for_degrade(struct pbsnode *pnode, resc_resv *presv);
extern time_t time_now;
extern int server_init_type;

extern int ctnodes(char *);
extern char *resc_in_err;
extern struct server server;
extern int tpp_network_up; /* from pbsd_main.c - used only in case of TPP */

extern unsigned int pbs_mom_port;

extern char *msg_noloopbackif;
extern char *msg_job_end_stat;
extern char *msg_daemonname;
extern char *msg_new_inventory_mom;
extern pbs_list_head svr_allhooks;

extern void is_vnode_prov_done(char *); /* for provisioning */
extern void free_prov_vnode(struct pbsnode *);
extern void fail_vnode_job(struct prov_vnode_info *, int);
extern struct prov_tracking *get_prov_record_by_vnode(char *);
extern int parse_prov_vnode(char *, exec_vnode_listtype *);

static void check_and_set_multivnode(struct pbsnode *);
int write_single_node_mom_attr(struct pbsnode *np);

static char *hook_privilege = "Not allowed to update vnodes or to request scheduler restart cycle, if run as a non-manager/operator user %s@%s";

extern struct python_interpreter_data svr_interp_data;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
extern void svr_renew_job_cred(struct work_task *pwt);
#endif

extern long node_fail_requeue;

extern void propagate_licenses_to_vnodes(mominfo_t *pmom);

#define SKIP_NONE 0
#define SKIP_EXCLUSIVE 1
#define SKIP_ANYINUSE 2

#define GLOB_SZ 511
#define STR_TIME_SZ 20

#define MAX_NODE_WAIT 600

/*
 * Tree search generalized from Knuth (6.2.2) Algorithm T just like
 * the AT&T man page says.
 *
 * The tree structure is for internal use only, lint doesn't grok it.
 *
 * Written by reading the System V Interface Definition, not the code.
 *
 */
/*LINTLIBRARY*/

/*
 **      Modified by Tom Proett for PBS.
 */

struct tree *ipaddrs = NULL; /* tree of ip addrs */
struct tree *streams = NULL; /* tree of stream numbers */

extern pntPBS_IP_LIST pbs_iplist;

/*
 * Compare the pair (key1, key2) against the keys stored in tree node pt.
 * key1 is the primary key; key2 breaks ties only when key1 matches.
 * Returns 0 on equality, -1 if (key1, key2) sorts before the node's keys,
 * and 1 if it sorts after.
 */
static int
comp_keys(u_long key1, u_long key2, struct tree *pt)
{
	if (key1 < pt->key1)
		return -1;
	if (key1 > pt->key1)
		return 1;

	/* primary keys are equal; order on the secondary key */
	if (key2 < pt->key2)
		return -1;
	if (key2 > pt->key2)
		return 1;
	return 0;
}

/**
 * @brief
 *  	find value in tree, return NULL if not found
 *
 * @param[in]	key1	-	key to be located
 * @param[in]	key2 	-	key to be located
 * @param[in]	rootp 	-	address of tree root
 *
 * @return	mominfo_t *
 * @retval	a pointer to the mominfo_t object located in the tree	- found
 * @retval	NULL	- not found
 *
 * @par MT-safe: No
 */
mominfo_t *
tfind2(const u_long key1, const u_long key2, struct tree **rootp)
{
	struct tree *cur;

	if (rootp == NULL)
		return NULL;

	/* Knuth's T1: walk down the binary search tree */
	for (cur = *rootp; cur != NULL;) {
		int cmp = comp_keys(key1, key2, cur);

		if (cmp == 0)
			return cur->momp; /* we found it! */

		/* T3/T4: descend to the left or right subtree */
		cur = (cmp < 0) ? cur->left : cur->right;
	}
	return NULL;
}
/**
 * @brief
 *  	insert a mom on the tree.
 *
 * @param[in]	key1	-	primary key for the new entry
 * @param[in]	key2	-	secondary key for the new entry
 * @param[in]	momp 	-	pointer to the mominfo_t object to insert
 * @param[in,out]	rootp 	-	address of tree root
 *
 * @return	void
 *
 * @note	Nothing is inserted if the keys are already present in the
 *		tree or if memory allocation for the new node fails.
 *
 * @par MT-safe: No
 */
void
tinsert2(const u_long key1, const u_long key2, mominfo_t *momp, struct tree **rootp)
{
	struct tree *pnew;

	DBPRT(("tinsert2: %lu|%lu %s stream %d\n", key1, key2,
	       momp->mi_host, momp->mi_dmn_info ? momp->mi_dmn_info->dmn_stream : -1))

	if (rootp == NULL)
		return;

	/* Knuth's T1: descend until the keys are found or an empty link appears */
	while (*rootp != NULL) {
		int cmp = comp_keys(key1, key2, *rootp);

		if (cmp == 0)
			return; /* keys already present; nothing to insert */

		/* T3/T4: follow the left or right branch */
		rootp = (cmp < 0) ? &(*rootp)->left : &(*rootp)->right;
	}

	/* T5: key not found; allocate and link a new leaf */
	pnew = malloc(sizeof(struct tree));
	if (pnew == NULL)
		return; /* allocation failed; tree is left unchanged */

	pnew->key1 = key1;
	pnew->key2 = key2;
	pnew->momp = momp;
	pnew->left = NULL;
	pnew->right = NULL;
	*rootp = pnew; /* link the new node to its parent */
}

/**
 * @brief Send the IS_CLUSTER_ADDRS message to Mom so she has the
 *      latest list of IP addresses of the all the Moms in the complex.
 *
 * @param[in] stream - the open stream to the Mom
 * @param[in] combine_msg - combine message in the caller
 *
 * @return int
 * @retval DIS_SUCCESS (0) for success
 * @retval != 0 otherwise.
 */
static int
send_ip_addrs_to_mom(int stream, int combine_msg)
{
	int j;
	int ret;

	DBPRT(("%s: entered\n", __func__))

	if (stream < 0)
		return -1;

	if (!combine_msg)
		if ((ret = is_compose(stream, IS_CLUSTER_ADDRS)) != DIS_SUCCESS)
			return (ret);

	if ((ret = diswui(stream, pbs_iplist->li_nrowsused)) != DIS_SUCCESS)
		return ret;

	for (j = 0; j < pbs_iplist->li_nrowsused; j++) {
#ifdef DEBUG
		unsigned long ipaddr;
		ipaddr = IPLIST_GET_LOW(pbs_iplist, j);
		DBPRT(("%s: ip %d\t%ld.%ld.%ld.%ld\n", __func__, j,
		       (ipaddr & 0xff000000) >> 24,
		       (ipaddr & 0x00ff0000) >> 16,
		       (ipaddr & 0x0000ff00) >> 8,
		       (ipaddr & 0x000000ff)))
#endif /* DEBUG */
		DBPRT(("%s: depth %ld\n", __func__, (long) IPLIST_GET_HIGH(pbs_iplist, j)))
		if ((ret = diswul(stream, IPLIST_GET_LOW(pbs_iplist, j))) != DIS_SUCCESS)
			return (ret);
		if ((ret = diswul(stream, IPLIST_GET_HIGH(pbs_iplist, j))) != DIS_SUCCESS)
			return (ret);
	}
	if (!combine_msg)
		return dis_flush(stream);
	return 0;
}

/**
 * @brief Reply to IS_HELLOSVR
 * Sending all the information mom needs from the server.
 * including need inventory, rpp value and mom ip addresses.
 *
 * @param[in] stream - the open stream to the Mom
 * @param[in] need_inv - whether the server needs inventory of the mom.
 *
 * @return int
 * @retval DIS_SUCCESS (0) for success
 * @retval != 0 otherwise.
 */
static int
reply_hellosvr(int stream, int need_inv)
{
	int rc;

	DBPRT(("%s: entered\n", __func__))

	if (stream < 0)
		return -1;

	rc = is_compose(stream, IS_REPLYHELLO);
	if (rc != DIS_SUCCESS)
		return rc;

	/* tell Mom whether the server wants her inventory */
	rc = diswsi(stream, need_inv);
	if (rc != DIS_SUCCESS)
		return rc;

	/* combine_msg = 1: the IP list rides on this same message */
	rc = send_ip_addrs_to_mom(stream, 1);
	if (rc != DIS_SUCCESS)
		return rc;

	return dis_flush(stream);
}

/**
 * @brief
 *  	delete node with given key
 *
 * @param[in]	key1	-	key to be located
 * @param[in]	key2	-	key to be located
 * @param[in]	rootp 	-	address of tree root
 *
 * @return	pointer to the parent of the deleted node
 * @retval	parent node	- after successful deletion.
 * @retval	NULL	- could not find the key to be deleted.
 */
void *
tdelete2(const u_long key1, const u_long key2, struct tree **rootp)
{
	struct tree *p; /* parent of the node being deleted (returned to caller) */
	struct tree *q; /* subtree that replaces the deleted node */
	struct tree *r; /* right child of the deleted node */
	int i;

	DBPRT(("tdelete2: %lu|%lu\n", key1, key2))
	if (rootp == NULL || (p = *rootp) == NULL)
		return NULL;
	/* descend the tree; on exit *rootp addresses the node to delete
	 * and p is its parent (or the root itself if no descent occurred) */
	while ((i = comp_keys(key1, key2, *rootp)) != 0) {
		p = *rootp;
		rootp = (i < 0) ? &(*rootp)->left : /* left branch */
				&(*rootp)->right;   /* right branch */
		if (*rootp == NULL)
			return NULL; /* key not found */
	}
	/* Knuth 6.2.2 deletion: splice out *rootp, preserving BST order */
	r = (*rootp)->right;		  /* D1: */
	if ((q = (*rootp)->left) == NULL) /* Left */
		q = r;
	else if (r != NULL) {	       /* Right is null? */
		if (r->left == NULL) { /* D2: Find successor */
			r->left = q;
			q = r;
		} else { /* D3: Find NULL link */
			/* walk to the leftmost node of the right subtree; it
			 * becomes the replacement, inheriting both children */
			for (q = r->left; q->left != NULL; q = r->left)
				r = q;
			r->left = q->right;
			q->left = (*rootp)->left;
			q->right = (*rootp)->right;
		}
	}
	free((struct tree *) *rootp); /* D4: Free node */
	*rootp = q;		      /* link parent to new node */
	return (p);
}
/**
 * @brief
 *  	free the entire tree
 *
 * @param[in]	rootp 	-	address of tree root
 *
 * @return	void
 */
void
tfree2(struct tree **rootp)
{
	struct tree *node;

	if (rootp == NULL)
		return;
	node = *rootp;
	if (node == NULL)
		return;

	/* post-order: release both subtrees before the node itself */
	tfree2(&node->left);
	tfree2(&node->right);
	free(node);
	*rootp = NULL; /* leave the caller's link cleared */
}

/**
 * @brief
 * 		get the addr of the host on which a node is defined
 *
 * @param[in]	name	- is in one of the forms:
 *							nodename[:DDDD][:resc=val...]
 *							nodename[:DDDD]/DD[*DD]
 *							where D is a numerical digit;  :DDDD is a port number
 * @param[in]	port	- the port number as commonly used in exec_vnode string or
 * 							exec_host string
 *
 * @return	The IP address and port from the first Mom declared for the node
 *
 * @par MT-safe: No
 */
pbs_net_t
get_addr_of_nodebyname(char *name, unsigned int *port)
{
	struct pbsnode *pnode;
	char *plainname; /* node name with any :port / /DD decoration stripped */

	plainname = parse_servername(name, NULL);
	/* any port embedded in "name" is deliberately ignored */
	pnode = find_nodebyname(plainname);
	if (pnode == NULL || is_nattr_set(pnode, ND_ATR_Mom) == 0)
		return (0);

	/* report address and port of the first Mom declared for the node */
	*port = pnode->nd_moms[0]->mi_port;
	return (get_hostaddr(pnode->nd_moms[0]->mi_host));
}

/* Controls when set_all_state() propagates a state change to a Mom's vnodes */
enum Set_All_State_When {
	Set_ALL_State_All_Down,	  /* set on vnodes when all Moms are down */
	Set_All_State_Regardless, /* set on vnodes regardless */
	Set_All_State_All_Offline /* set on vnodes when all Moms are offline */
};

/**
 * @brief
 * 		set or clear state bits on the mominfo entry and all
 *		virtual nodes under that Mom, and update the node comment;
 *		if txt is null, the comment is cleared (unset).
 *		do_set = 1 means set the bits in "bits", otherwise clear them
 *
 * @param[in]	pmom	- pointer to mom
 * @param[in]	do_set	- do_set = 1 means set the bits, otherwise clear them
 * @param[in]	bits	- the state bits to set or clear
 * @param[in]	txt		- comment text; if null, the comment is cleared
 * @param[in]	setwhen	- of type Set_All_State_When enum, controlling when
 *			  the state change is applied to the vnodes.
 *
 * @return	void
 *
 * @par MT-safe: No
 */
static void
set_all_state(mominfo_t *pmom, int do_set, unsigned long bits, char *txt,
	      enum Set_All_State_When setwhen)
{
	int imom;
	unsigned long mstate;
	mom_svrinfo_t *psvrmom = (mom_svrinfo_t *) (pmom->mi_data);
	dmn_info_t *pdmn_info = pmom->mi_dmn_info;
	struct pbsnode *pvnd;
	attribute *pat;
	int nchild;
	unsigned long inuse_flag = 0;

	if (do_set) { /* STALE is not meaningful in the state of the Mom, don't set it */
		pdmn_info->dmn_state |= (bits & ~INUSE_STALE);
	} else {
		pdmn_info->dmn_state &= ~bits;
	}

	log_eventf(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE, LOG_INFO, pmom->mi_host,
		   "set_all_state;txt=%s mi_modtime=%ld", txt, pmom->mi_modtime);

	/* Set the inuse_flag based off the value of setwhen */
	if (setwhen == Set_ALL_State_All_Down) {
		inuse_flag = INUSE_DOWN;
	} else if (setwhen == Set_All_State_All_Offline) {
		inuse_flag = INUSE_OFFLINE_BY_MOM;
	}

	/* walk every vnode managed by this Mom and apply the state change */
	for (nchild = 0; nchild < psvrmom->msr_numvnds; ++nchild) {
		int do_this_vnode;

		do_this_vnode = 1;

		pvnd = psvrmom->msr_children[nchild];

		/*
		 * If this vnode has more than one Mom and
		 * setwhen is Set_ALL_State_All_Down or
		 * setwhen is Set_All_State_All_Offline, then we only change
		 * state if all Moms are down/offline
		 */
		if ((pvnd->nd_nummoms > 1) &&
		    ((setwhen == Set_ALL_State_All_Down) ||
		     (setwhen == Set_All_State_All_Offline))) {
			for (imom = 0; imom < pvnd->nd_nummoms; ++imom) {
				mstate = pvnd->nd_moms[imom]->mi_dmn_info->dmn_state;
				if ((mstate & inuse_flag) == 0) {
					do_this_vnode = 0;
					break;
				}
			}
		}
		/* Skip resetting state only on cray_compute nodes when state is sleep */
		if ((pvnd->nd_state & INUSE_SLEEP) &&
		    (setwhen == Set_All_State_Regardless) &&
		    (bits & INUSE_SLEEP) &&
		    !(do_set)) {
			resource_def *prd;
			resource *prc;
			pat = &pvnd->nd_attr[(int) ND_ATR_ResourceAvail];
			prd = find_resc_def(svr_resc_def, "vntype");
			if (pat && prd && (prc = find_resc_entry(pat, prd))) {
				if (strcmp(prc->rs_value.at_val.at_arst->as_string[0], CRAY_COMPUTE) == 0)
					do_this_vnode = 0;
			}
		}
		if (do_this_vnode == 0)
			continue; /* skip setting state on this vnode */

		if (do_set) {
			set_vnode_state(pvnd, bits, Nd_State_Or);
		} else {
			set_vnode_state(pvnd, ~bits, Nd_State_And);
			if ((bits & INUSE_OFFLINE_BY_MOM) &&
			    (pvnd->nd_state & INUSE_OFFLINE)) {
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE,
					  LOG_NOTICE, pvnd->nd_name,
					  "clearing offline_by_mom state for "
					  "vnode: still offlined because of "
					  "previous admin offline action");
			}
		}

		post_attr_set(get_nattr(pvnd, ND_ATR_state));
		pat = get_nattr(pvnd, ND_ATR_Comment);

		/*
		 * change the comment only if it is a default comment (set by
		 * the server and not the Manager);  if "txt" is null, just
		 * clear (unset) the comment
		 *
		 * comments set as part of INUSE_OFFLINE_BY_MOM state
		 * action should not be touched.
		 */

		if ((bits & INUSE_OFFLINE_BY_MOM) ||
		    ((is_attr_set(pat)) == 0) ||
		    ((pat->at_flags & ATR_VFLAG_DEFLT) != 0)) {

			/* default comment */
			free_attr(node_attr_def, pat, ND_ATR_Comment);
			if (txt)
				set_attr_generic(pat, &node_attr_def[(int) ND_ATR_Comment], txt, NULL, INTERNAL);

			if (do_set && (bits & INUSE_OFFLINE_BY_MOM)) {
				/* this means not directly set by the server */
				/* This means server did not set comment */
				/* directly but as done per mom */
				pat->at_flags &= ~ATR_VFLAG_DEFLT;
				mark_attr_set(pat);
			} else {
				/* ATR_VFLAG_DEFLT means server set comment */
				/* itself */
				pat->at_flags |= ATR_VFLAG_DEFLT;
			}
		}
	}
}

/**
 * @brief
 * 		requeue/delete job on primary node going down.
 *
 * @par Functionality:
 *		If the primary, Mother Superior, node of a job goes down, it
 *		should be requeued if possible or delete.
 *
 *		Called via a work-task set up in momptr_down()
 * @see
 * 		momptr_down
 *
 * @param[in]	pwt	-	work task structure.
 *
 * @return	void
 */

static void
node_down_requeue(struct work_task *pwt)
{
	char *nname;
	mominfo_t *mp;
	mom_svrinfo_t *svmp;
	job *pj;
	struct pbsnode *np;
	struct pbssubn *psn;
	struct jobinfo *pjinfo;
	struct jobinfo *pjinfo_nxt;
	int nchild;
	int cnt;
	int i;
	char *tmp_acctrec = NULL;
	struct pbsnode *vnode = NULL;
	exec_vnode_listtype prov_vnode_list = NULL;
	struct prov_tracking *ptracking;
	struct prov_vnode_info *prov_vnode_info;

	DBPRT(("node_down_requeue invoked\n"))
	if (!pwt) {
		sprintf(log_buffer, "Illegal value passed to %s", __func__);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ERR,
			  msg_daemonname, log_buffer);
		return;
	}
	mp = (mominfo_t *) pwt->wt_parm1;
	if (!mp) {
		sprintf(log_buffer, "Illegal mominfo value in %s", __func__);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ERR,
			  msg_daemonname, log_buffer);
		return;
	}
	svmp = (mom_svrinfo_t *) (mp->mi_data);
	if (!svmp) {
		sprintf(log_buffer, "Illegal srvinfo value in %s", __func__);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ERR,
			  msg_daemonname, log_buffer);
		return;
	}

	/* clear ptr to this worktask */
	svmp->msr_wktask = 0;

	/* is node still down? If not, leave jobs as is */
	if ((mp->mi_dmn_info->dmn_state & INUSE_DOWN) == 0)
		return;

	DBPRT(("node_down_requeue node still down\n"))

	for (nchild = 0; nchild < svmp->msr_numvnds; ++nchild) {
		np = svmp->msr_children[nchild];
		/* if the vnode is still provisioning, leave its jobs as is */
		if ((np->nd_state & INUSE_PROV) == 0) {
			DBPRT(("node_down_requeue node not provisioning\n"))

			for (psn = np->nd_psn; psn; psn = psn->next) {
				for (pjinfo = psn->jobs; pjinfo; pjinfo = pjinfo_nxt) {
					pj = find_job(pjinfo->jobid);
					pjinfo_nxt = pjinfo->next;
					if (pj == NULL) {
						/* job no longer exists; nothing to requeue */
						continue;
					}
					while (pjinfo_nxt && !strcmp(pjinfo_nxt->jobid, pj->ji_qs.ji_jobid)) {
						/* skip over next occurrence of same job in list*/
						/* if it is deleted in discard_job(), we would	*/
						/* have a pointer to nothingness		*/
						pjinfo_nxt = pjinfo_nxt->next;
					}

					nname = parse_servername(
						get_jattr_str(pj, JOB_ATR_exec_vnode), NULL);
					if (nname && (strcasecmp(np->nd_name, nname) == 0)) {
						/* node is Mother Superior for job */
						set_jattr_l_slim(pj, JOB_ATR_exit_status, JOB_EXEC_RERUN_MS_FAIL, SET);

						sprintf(log_buffer, msg_job_end_stat, JOB_EXEC_RERUN_MS_FAIL);
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pj->ji_qs.ji_jobid, log_buffer);

						/* If Job is  in wait Provision state, then fail_vnode_provisioning should be called.
						 * Since this job is going to get requed and can run on different set of vnodes
						 * hence to make sure provisioning failure on previous set of vnodes doesn't create problem.
						 */
						if (check_job_substate(pj, JOB_SUBSTATE_PROVISION)) {
							cnt = parse_prov_vnode(get_jattr_str(pj, JOB_ATR_prov_vnode),
									       &prov_vnode_list);

							/* Check if any node associated to the provisioned job is still in provisioning state. */
							for (i = 0; i < cnt; i++) {
								if ((vnode = find_nodebyname(prov_vnode_list[i]))) {
									if ((ptracking = get_prov_record_by_vnode(vnode->nd_name))) {
										prov_vnode_info = ptracking->prov_vnode_info;
										if (prov_vnode_info) {
											fail_vnode_job(prov_vnode_info, -1); /* Passing -1 so that fail_vnode_job neither hold nor requeue the job */
											break;
										}
									}
								}
							}
						}
						/* Set for requeuing the job if job is rerunnable */
						if (get_jattr_long(pj, JOB_ATR_rerunable) != 0) {
							set_job_substate(pj, JOB_SUBSTATE_RERUN3);
							/* append the end-of-job message to any pending accounting record */
							if (pj->ji_acctrec != NULL) {
								if (pbs_asprintf(&tmp_acctrec, "%s %s", pj->ji_acctrec, log_buffer) == -1) {
									free(tmp_acctrec); /* free 1 byte malloc'd in pbs_asprintf() */
								} else {
									free(pj->ji_acctrec);
									pj->ji_acctrec = tmp_acctrec;
								}
							} else {
								pj->ji_acctrec = strdup(log_buffer);
							}
						}

						/* When job is non-rerunnable and if job has any dependencies,
						 *register dependency request to delete the dependent jobs.
						 */
						if (get_jattr_long(pj, JOB_ATR_rerunable) == 0 &&
						    (is_jattr_set(pj, JOB_ATR_depend))) {
							/* set job exit status from MOM */
							pj->ji_qs.ji_un.ji_exect.ji_exitstat = JOB_EXEC_RERUN_MS_FAIL;
							(void) depend_on_term(pj);
						}

						/* notify all sisters to discard the job */
						discard_job(pj, "on node down requeue", 0);

						/* Clear "resources_used" only if not waiting on any mom */
						if (!pj->ji_jdcd_waiting && ((pj->ji_qs.ji_svrflags & (JOB_SVFLG_CHKPT | JOB_SVFLG_ChkptMig)) == 0)) {
							free_jattr(pj, JOB_ATR_resc_used);
						}
					}
				}
			}
		}
	}
}

/**
 * @brief
 * 		called when a node is marked down or responds to an
 * 		IS_DISCARD_JOB message.
 *
 * @par Functionality:
 * 		If all Moms have responded or are down, then we can deal with the job
 * 		depending on the substate.
 *
 *		If the second arg (pmom) is null, just check the state; if not null
 *		then mark that Mom's slot as done, then check
 *
 * @see
 * 		discard_job
 *
 * @param[in,out]	pjob	-	pointer to the job
 * @param[in]		pmom	-	if pmom is null, just check the state; if not null then mark that Mom's slot as done
 * @param[in]		newstate-	new state to record in pmom's discard slot.
 *
 * @return	void
 *
 * @par MT-safe: No
 */
static void
post_discard_job(job *pjob, mominfo_t *pmom, int newstate)
{
	char *downmom = NULL;
	char hook_msg[HOOK_MSG_SIZE] = {0};
	struct jbdscrd *pdsc;
	struct batch_request *preq;
	int rc;

	/* ji_discard is a NULL-terminated array of per-Mom discard records */
	if (pjob->ji_discard == NULL) {
		pjob->ji_discarding = 0;
		return;
	}
	/* if a specific Mom was named, record her new state in her slot */
	if (pmom != NULL) {
		for (pdsc = pjob->ji_discard; pdsc->jdcd_mom; ++pdsc) {
			if (pdsc->jdcd_mom == pmom) {
				pdsc->jdcd_state = newstate;
				break;
			}
		}
	}

	/* if any Mom has not yet answered (or gone down), keep waiting */
	for (pdsc = pjob->ji_discard; pdsc->jdcd_mom; ++pdsc) {
		if (pdsc->jdcd_state == JDCD_WAITING)
			return; /* need to wait some more */
	}
	pjob->ji_jdcd_waiting = 0;

	/* not waiting on any Mom to reply to an IS_DISCARD_JOB */
	/* so can now deal with the job                         */

	/* find name of (a) down mom */
	for (pdsc = pjob->ji_discard; pdsc->jdcd_mom; ++pdsc) {
		if (pdsc->jdcd_state == JDCD_DOWN) {
			downmom = pdsc->jdcd_mom->mi_host;
			break;
		}
	}
	if (downmom == NULL)
		downmom = ""; /* didn't find one, null string for msg */

	free(pjob->ji_discard);
	pjob->ji_discard = NULL;

	if (check_job_state(pjob, JOB_STATE_LTR_QUEUED) && (check_job_substate(pjob, JOB_SUBSTATE_QUEUED))) {
		static char nddown[] = "Job never started, execution node %s down";

		/*
		 * The job was rejected by mother superior and has
		 * already been placed back in queued state by a
		 * call to svr_evaljobstate() within post_sendjob().
		 * This is done regardless of whether the job is
		 * rerunnable or not, since it never actually started.
		 * There was no start record for this job, so no need
		 * to call account_jobend().
		 */
		sprintf(log_buffer, nddown, downmom);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		return;
	}

	/* a held job stays held; just log that we left it alone */
	if (check_job_state(pjob, JOB_STATE_LTR_HELD) && (check_job_substate(pjob, JOB_SUBSTATE_HELD))) {
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, "Leaving job in held state");
		return;
	}

	if (check_job_substate(pjob, JOB_SUBSTATE_RERUN3) || pjob->ji_discarding) {

		static char *ndreque;

		if (pjob->ji_discarding)
			ndreque = "Job requeued, discard response received";
		else
			ndreque = "Job requeued, execution node %s down";

		/*
		 * Job to be rerun,   no need to check if job is rerunnable
		 * because to get here the job is either rerunnable or Mom
		 * tried to run the job and it failed before it ever went
		 * into execution and sent the server JOB_EXEC_RETRY
		 */
		sprintf(log_buffer, ndreque, downmom);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		force_reque(pjob);
		if (pjob->ji_acctrec) {
			free(pjob->ji_acctrec); /* logged, so clear it */
			pjob->ji_acctrec = NULL;
		}

		/* free resc_used */
		if ((is_jattr_set(pjob, JOB_ATR_resc_used)) &&
		    ((pjob->ji_qs.ji_svrflags & (JOB_SVFLG_CHKPT | JOB_SVFLG_ChkptMig)) == 0))
			free_jattr(pjob, JOB_ATR_resc_used);

		pjob->ji_discarding = 0;
		return;
	}

	/* at this point the job is to be purged */
	pjob->ji_qs.ji_obittime = time_now;
	set_jattr_l_slim(pjob, JOB_ATR_obittime, pjob->ji_qs.ji_obittime, SET);

	/* Allocate space for the jobobit hook event params */
	preq = alloc_br(PBS_BATCH_JobObit);
	if (preq == NULL) {
		log_err(PBSE_INTERNAL, __func__, "rq_jobobit alloc failed");
	} else {
		/* run any jobobit hooks before the job is torn down */
		preq->rq_ind.rq_obit.rq_pjob = pjob;
		rc = process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
		if (rc == -1) {
			log_err(-1, __func__, "rq_jobobit process_hooks call failed");
		}
		free_br(preq);
	}

	if (pjob->ji_acctrec) {
		/* fairly normal job exit, record accounting info */
		account_job_update(pjob, PBS_ACCT_LAST);
		account_jobend(pjob, pjob->ji_acctrec, PBS_ACCT_END);

		if (get_sattr_long(SVR_ATR_log_events) & PBSEVENT_JOB_USAGE) {
			/* log events set to record usage */
			log_event(PBSEVENT_JOB_USAGE, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, pjob->ji_acctrec);
		} else {
			char *pc;

			/* no usage in log, truncate message */
			if ((pc = strchr(pjob->ji_acctrec, (int) ' ')) != NULL)
				*pc = '\0';
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, pjob->ji_acctrec);
		}

	} else {
		static char ndtext[] = "Job deleted, execution node %s down";

		sprintf(log_buffer, ndtext, downmom);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		account_record(PBS_ACCT_DEL, pjob, log_buffer);
		svr_mailowner(pjob, MAIL_ABORT, MAIL_FORCE, log_buffer);
	}

	rel_resc(pjob); /* free any resc assigned to the job */
	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
		issue_track(pjob);
	/*
	 * If the server is configured to maintain job history, then
	 * keep the job structure which will be cleaned up later by
	 * SERVER, probably after the history duration. History job
	 * type is T_MOM_DOWN(2) for the jobs to be purged because
	 * of MOM failure.
	 */
	if (svr_chk_history_conf())
		svr_setjob_histinfo(pjob, T_MOM_DOWN);
	else
		job_purge(pjob);

	return;
}

/**
 * @brief
 * 		mark mom (by ptr) down and log message
 *
 * @param[in]		pmom	-	mom which is down
 * @param[in]		why		-	the reason why the mom is down
 *
 * @return	void
 */
void
momptr_down(mominfo_t *pmom, char *why)
{
	int i;
	int j;
	int nj;
	int nchild;
	struct pbsnode *np;
	struct jobinfo *pji;
	job **parray;
	struct pbssubn *psn;
	mom_svrinfo_t *psvrmom = (mom_svrinfo_t *) (pmom->mi_data);
	long sec;
	int setwktask = 0;
	int is_provisioning = 0;
	job *pj;

	pmom->mi_dmn_info->dmn_state |= INUSE_DOWN;

	/* log message if node just down or been down for an hour */
	/* mark mom down and vnodes down as well                  */
	if ((psvrmom->msr_timedown + 3600) > time_now)
		return;

	psvrmom->msr_timedown = time_now;

	/* is node provisioning? */
	for (nchild = 0; nchild < psvrmom->msr_numvnds; ++nchild) {
		np = psvrmom->msr_children[nchild];
		if (np->nd_state & INUSE_PROV) {
			is_provisioning = 1;
			break;
		}
	}

#ifndef NAS /* localmod 023 */
	/* do not display 'node down' msg and comment */
	if (is_provisioning) {
		set_all_state(pmom, 1, INUSE_DOWN, NULL,
			      Set_All_State_Regardless);
	} else {
#endif /* localmod 023 */

#ifdef NAS /* localmod 023 */
		if (is_provisioning)
			(void) snprintf(log_buffer, sizeof(log_buffer), "node down for provisioning: %s", why);
		else
#endif /* localmod 023 */
			(void) snprintf(log_buffer, sizeof(log_buffer), "node down: %s", why);
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_ALERT, pmom->mi_host, log_buffer);

		set_all_state(pmom, 1, INUSE_DOWN, log_buffer,
			      Set_ALL_State_All_Down);
#ifndef NAS /* localmod 023 */
	}
#endif /* localmod 023 */

	/* for each vnode of this Mom, notify discard-state for jobs that are
	 * waiting on an IS_DISCARD_JOB response (pj->ji_discard != NULL) */
	for (nchild = 0; nchild < psvrmom->msr_numvnds; ++nchild) {

		np = psvrmom->msr_children[nchild];

		for (psn = np->nd_psn; psn; psn = psn->next) {
			if (psn->jobs) {
				setwktask = 1;
				nj = 0;
				/* find list of jobs on this sub-node */
				/* first, how many are they */
				for (pji = psn->jobs; pji; pji = pji->next) {
					pj = find_job(pji->jobid);
					if (pj && pj->ji_discard)
						++nj;
				}
				/* if any, save pointer to the jobs in an array as the    */
				/* list may be disturbed by the post_discard_job function */
				if (nj != 0) {
					parray = (job **) calloc((size_t) nj, sizeof(job *));
					if (parray) {
						i = 0;
						for (pji = psn->jobs; pji; pji = pji->next) {
							pj = find_job(pji->jobid);
							if (pj && pj->ji_discard) {
								/* we only want one entry per job */
								for (j = 0; j < i; ++j) {
									if (*(parray + j) == pj)
										break;
								}
								if (j == i) {
									*(parray + i) = pj; /* new, add it */
									++i;
								}
							}
						}

						/* calloc zeroed the tail; only filled slots are used */
						for (i = 0; i < nj; ++i)
							if (*(parray + i))
								post_discard_job(*(parray + i), pmom, JDCD_DOWN);

						free(parray);
						parray = NULL;
					}
				}
			}
		}
	}

	/* If this Mom is in a vnode pool and is the inventory Mom for that pool */
	/* remove her from that role and if another Mom in the pool is up make   */
	/* that one the new inventory Mom */

	if (psvrmom->msr_vnode_pool != 0) {
		reset_pool_inventory_mom(pmom);
	}

	if (((sec = node_fail_requeue) != 0) &&
	    (setwktask != 0) && (psvrmom->msr_wktask == NULL)) {

		/* there isn't an outstanding work task to deal with the jobs    */
		/* and node has jobs, set task to deal with the jobs after delay */

		if (sec < 0) /* if less than zero, treat as if one */
			sec = 1;

		psvrmom->msr_wktask = set_task(WORK_Timed, time_now + sec, node_down_requeue, (void *) pmom);
	}

	return;
}

/**
 * @brief
 * 		Given a vnode_state_op, return the string value.
 * 		The enum is found in pbs_nodes.h
 *
 * @param[in]	op - The operation for the state change
 *
 * @return	char *
 */
char *
get_vnode_state_op(enum vnode_state_op op)
{
	/* map each known operation to its printable name */
	if (op == Nd_State_Set)
		return "Nd_State_Set";
	if (op == Nd_State_Or)
		return "Nd_State_Or";
	if (op == Nd_State_And)
		return "Nd_State_And";

	/* fallback for any value outside the enum */
	return "ND_state_unknown";
}

/**
 * @brief
 * 		Create a duplicate of the specified vnode
 *
 * @param[in]	vnode - the vnode to duplicate
 *
 * @note
 *  Creates a shallow duplicate of struct * and char * members.
 *
 *
 * @return  duplicated vnode
 */
static struct pbsnode *
shallow_vnode_dup(struct pbsnode *vnode)
{
	struct pbsnode *dup;
	int idx;

	if (vnode == NULL)
		return NULL;

	/* zero-filled allocation: any member not explicitly copied below
	 * stays cleared */
	dup = calloc(1, sizeof(struct pbsnode));
	if (dup == NULL) {
		log_err(PBSE_INTERNAL, __func__, "vnode_dup alloc failed");
		return NULL;
	}

	/*
	 * Shallow-copy the members of interest; pointer members share the
	 * original's storage. Order follows the "struct pbsnode" definition.
	 */
	dup->nd_name = vnode->nd_name;
	dup->nd_moms = vnode->nd_moms;
	dup->nd_nummoms = vnode->nd_nummoms;
	dup->nd_nummslots = vnode->nd_nummslots;
	dup->nd_index = vnode->nd_index;
	dup->nd_arr_index = vnode->nd_arr_index;
	dup->nd_hostname = vnode->nd_hostname;
	dup->nd_psn = vnode->nd_psn;
	dup->nd_resvp = vnode->nd_resvp;
	dup->nd_nsn = vnode->nd_nsn;
	dup->nd_nsnfree = vnode->nd_nsnfree;
	dup->nd_ncpus = vnode->nd_ncpus;
	dup->nd_state = vnode->nd_state;
	dup->nd_ntype = vnode->nd_ntype;
	dup->nd_pque = vnode->nd_pque;
	dup->nd_svrflags = vnode->nd_svrflags;

	/* attribute array is copied element by element (struct assignment,
	 * still shallow with respect to any pointers inside) */
	for (idx = 0; idx < ND_ATR_LAST; idx++)
		dup->nd_attr[idx] = vnode->nd_attr[idx];

	return dup;
}

/**
 * @brief
 * 		Change the state of a vnode. See pbs_nodes.h for definition of node's
 * 		availability and unavailability.
 *
 * 		This function detects the type of change, either from available to
 * 		unavailable, and invokes the appropriate handler to handle the state
 * 		change.
 *
 * @param[in]	pbsnode	- The vnode
 * @param[in]	state_bits	- the value to set the vnode to
 * @param[in]	type	- The operation on the node
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
set_vnode_state(struct pbsnode *pnode, unsigned long state_bits, enum vnode_state_op type)
{
	/*
	 * Vars used to construct hook event data
	 */
	struct batch_request *preq = NULL;
	struct pbsnode *vnode_o = NULL; /* pre-change snapshot of the vnode */
	char hook_msg[HOOK_MSG_SIZE] = {0};
	int time_int_val;
	int last_time_int;

	/* refresh the server clock; the truncated int copy is what gets
	 * stored into ND_ATR_last_state_change_time below */
	time_now = time(NULL);
	time_int_val = time_now;

	if (pnode == NULL)
		return;

	/*
	 * Allocate space for the modifyvnode hook event params
	 */
	preq = alloc_br(PBS_BATCH_ModifyVnode);
	if (preq == NULL) {
		log_err(PBSE_INTERNAL, __func__, "rq_modifyvnode alloc failed");
		return;
	}

	/*
	 * Create a duplicate of the vnode so the hook can compare the old
	 * state (vnode_o) against the new state (pnode)
	 */
	vnode_o = shallow_vnode_dup(pnode);
	if (vnode_o == NULL) {
		log_err(PBSE_INTERNAL, __func__, "shallow_vnode_dup failed");
		goto fn_free_and_return;
	}

	/*
	 * Apply specified state operation (to the vnode only)
	 */
	switch (type) {
		case Nd_State_Set:
			pnode->nd_state = state_bits;
			break;
		case Nd_State_Or:
			pnode->nd_state |= state_bits;
			break;
		case Nd_State_And:
			pnode->nd_state &= state_bits;
			break;
		default:
			/* unrecognized operator: fall back to an absolute set */
			DBPRT(("%s: operator type unrecognized %d, defaulting to Nd_State_Set",
			       __func__, type))
			type = Nd_State_Set;
			pnode->nd_state = state_bits;
	}

	/* Populate hook param rq_modifyvnode with old and new vnode states */
	preq->rq_ind.rq_modifyvnode.rq_vnode_o = vnode_o;
	preq->rq_ind.rq_modifyvnode.rq_vnode = pnode;

	DBPRT(("%s(%5s): Requested state transition 0x%lx --> 0x%lx\n", __func__,
	       pnode->nd_name, vnode_o->nd_state, pnode->nd_state))

	/* sync state attribute with nd_state */

	if (pnode->nd_state != get_nattr_long(pnode, ND_ATR_state))
		set_nattr_l_slim(pnode, ND_ATR_state, pnode->nd_state, SET);

	if (vnode_o->nd_state != pnode->nd_state) {
		/* the state actually changed: stamp the change time and log it */
		set_nattr_l_slim(pnode, ND_ATR_last_state_change_time, time_int_val, SET);

		/* Write the vnode state change event to server log */
		last_time_int = (int) vnode_o->nd_attr[(int) ND_ATR_last_state_change_time].at_val.at_long;
		log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE, LOG_INFO, pnode->nd_name,
		   	"set_vnode_state;vnode.state=0x%lx vnode_o.state=0x%lx "
		   	"vnode.last_state_change_time=%d vnode_o.last_state_change_time=%d "
		   	"state_bits=0x%lx state_bit_op_type_str=%s state_bit_op_type_enum=%d",
		   	pnode->nd_state, vnode_o->nd_state, time_int_val, last_time_int,
		   	state_bits, get_vnode_state_op(type), type);
	}

	if (pnode->nd_state & INUSE_PROV) {
		if (!(pnode->nd_state & VNODE_UNAVAILABLE) ||
		    (pnode->nd_state == INUSE_PROV)) { /* INUSE_FREE is 0 */

			resource_def *prd;
			resource *prc;

			/* prd is the address of an array element and so is never
			 * NULL; the check below is a harmless belt-and-braces */
			prd = &svr_resc_def[RESC_VNTYPE];
			if (prd && (prc = find_resc_entry(get_nattr(pnode, ND_ATR_ResourceAvail), prd))) {
				if (strncmp(prc->rs_value.at_val.at_arst->as_string[0],
					    "cray_compute", 12) == 0) {

					/*
					 * Unlike other nodes, in compute-node
					 * provisioning the MoM does not restart,
					 * so is_vnode_prov_done would never be
					 * invoked via an IS_HOOK_CHECKSUMS
					 * request; call it here instead.
					 */

					DBPRT(("%s: calling [is_vnode_prov_done] from set_vnode_state, type = %d\n", __func__, type))
					is_vnode_prov_done(pnode->nd_name);
				}
			}
		}

		/* while node is provisioning, we don't want the reservation
		 * to degrade, hence returning.
		 */
		goto fn_fire_event;
	}

	/* bits that differ between the old and the new state */
	unsigned long bits;
	bits = vnode_o->nd_state ^ pnode->nd_state;

	if (bits & (INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM |
		    INUSE_MAINTENANCE | INUSE_SLEEP |
		    INUSE_PROV | INUSE_WAIT_PROV))
		pnode->nd_modified = 1; /* flag the node as modified */

	DBPRT(("%s(%5s): state transition 0x%lx --> 0x%lx\n", __func__, pnode->nd_name,
	       vnode_o->nd_state, pnode->nd_state))

	/* node is marked INUSE_DOWN | INUSE_PROV when provisioning.
	 * need to check transition from INUSE_PROV to UNAVAILABLE
	 */
	if ((!(vnode_o->nd_state & VNODE_UNAVAILABLE) ||
	     (vnode_o->nd_state & INUSE_PROV)) &&
	    (pnode->nd_state & VNODE_UNAVAILABLE)) {
		/* degrade all associated reservations. The '1' instructs the function to
		 * account for the unavailable vnodes in the reservation's counter
		 */
		(void) vnode_unavailable(pnode, 1);
	} else if (((vnode_o->nd_state & VNODE_UNAVAILABLE)) &&
		   ((!(pnode->nd_state & VNODE_UNAVAILABLE)) ||
		    (pnode->nd_state == INUSE_FREE))) {
		/* transition from unavailable to available */
		(void) vnode_available(pnode);
	}

fn_fire_event:
	/* Fire off the vnode state change event */
	process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);

fn_free_and_return:
	/* vnode_o is a shallow dup: free only the struct itself, never the
	 * pointed-to data it shares with pnode */
	free(vnode_o);
	free_br(preq);
}

/**
 *  @brief
 *  	A vnode becomes available when its state transitions towards no bits
 *  	with VNODE_UNAVAILABLE set.
 * 		If the node was associated to a reservation and the reservation was degraded
 * 		then the reservation is adjusted to reflect that one of its associated vnode
 * 		is now back up.
 *
 * 		If all the vnodes associated to the reservation are back up
 * 		then the reservation does not need to be reconfirmed by the scheduler.
 * @see
 * 		set_vnode_state
 *
 * @param[in]	np	- the node that has become available again
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
vnode_available(struct pbsnode *np)
{
	resc_resv *presv;
	struct resvinfo *rinfp;
	struct resvinfo *rinfp_hd = NULL;
	char *execvnodes = NULL;
	int occurrence = -1;

	if (np == NULL)
		return;

	/* the vnode has no associated reservations, no action is required */
	if ((rinfp = find_vnode_in_resvs(np, Skip_Degraded_Time)) == NULL)
		return;

	DBPRT(("%s(%s): entered\n", __func__, np->nd_name))

	/* keep track of the head of the linked list for garbage collection */
	rinfp_hd = rinfp;

	/* Process each reservation that this node is associated to */
	for (presv = rinfp->resvp; rinfp; rinfp = rinfp->next) {
		if ((presv = rinfp->resvp) == NULL) {
			log_err(PBSE_SYSTEM, __func__, "could not access reservation");
			continue;
		}
		/* If none of the vnodes associated to the reservation are down, reset
		 * the states of the reservation to their previous values.
		 *
		 * ri_vnodes_down lives with the reservation information during the
		 * lifecycle of the server process, it is not stored to disk upon server
		 * restart. The second check on number of nodes down != 0 is done to
		 * avoid altering reservation information if the state of a node changes
		 * to UP while no nodes were previously seen as down
		 */
		if (presv->ri_vnodes_down != 0) {
			/* decrement number of nodes down */
			presv->ri_vnodes_down--;

			if (presv->ri_vnodes_down == 0) {
				/* If the reservation is currently running, reset its state to
				 * running
				 */
				if (presv->ri_qs.ri_state == RESV_RUNNING)
					resv_setResvState(presv, RESV_RUNNING, RESV_RUNNING);
				else {
					/* Otherwise revert its state to Confirmed */
					resv_setResvState(presv, RESV_CONFIRMED, RESV_CONFIRMED);
				}
				/* Unset all of the reservation retry attributes and values */
				unset_resv_retry(presv);
			}
		} else {
			/* An inconsistency in recognizing node state transitions caused an
			 * unexpected re-entry into this handler. Since this is not
			 * supposed to happen we only log it for now.
			 */
			/* If a standing reservation we print the execvnodes sequence
			 * string for debugging purposes */
			if (get_rattr_long(presv, RESV_ATR_resv_standing)) {
				if (is_rattr_set(presv, RESV_ATR_resv_execvnodes))
					execvnodes = get_rattr_str(presv, RESV_ATR_resv_execvnodes);
				if (execvnodes == NULL)
					execvnodes = "";
 				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
				           presv->ri_qs.ri_resvID, "execvnodes sequence: %s", execvnodes);
				if (is_rattr_set(presv, RESV_ATR_resv_idx))
					occurrence = get_rattr_long(presv, RESV_ATR_resv_idx);
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
				           presv->ri_qs.ri_resvID, "vnodes in occurrence %d: %d; ",
				           occurrence, presv->ri_vnodect);
			} else {
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
			           presv->ri_qs.ri_resvID, "vnodes in reservation: %d; ",
			           presv->ri_vnodect);
			}
		}
	}

	free_rinf_list(rinfp_hd);
}

/**
 * @brief
 * 		A node is considered unavailable if it is in one of the states:
 * 		OFFLINE, DOWN, DELETED, STALE, or UNKNOWN.
 *
 * 		If a node is in a reservation and the resv is associated to the soonest
 * 		occurrence then flag the reservation as state degraded and substate
 * 		degraded.
 *
 * 		Otherwise, if the reservation is a standing reservation, and the
 * 		node is in a later occurrence, then mark the reservation in substate
 * 		degraded.
 *
 * @param[in]	np	- the unavailable node
 * @param[in]	account_vnode	- register the vnode as down in the reservation's counts.
 *
 * @return	void
 * @par MT-safe: No
 */
void
vnode_unavailable(struct pbsnode *np, int account_vnode)
{
	char *nd_name;
	char *resv_nodes;
	resc_resv *presv;
	struct resvinfo *rinfp;
	struct resvinfo *rinfp_hd = NULL;
	int *presv_state;
	int *presv_substate;
	int in_soonest_occr;
	long degraded_time;
	long resv_start_time;
	long retry_time;
	char *execvnodes = NULL;
	int occurrence = -1;

	if (np == NULL)
		return;

	if (!(nd_name = np->nd_name))
		return;

	/* If the vnode has no associated reservation, i.e., the vnode does not
	 * appear in any advance reservation nor any occurrence of a standing
	 * reservation, then no action is required.
	 */
	if ((rinfp = find_vnode_in_resvs(np, Set_Degraded_Time)) == NULL)
		return;

	DBPRT(("%s(%s): entered\n", __func__, np->nd_name))

	/* keep track of the head of the linked list for garbage collection */
	rinfp_hd = rinfp;

	/* Process each reservation that this node is associated to */
	for (presv = rinfp->resvp; rinfp; rinfp = rinfp->next) {

		if ((presv = rinfp->resvp) == NULL) {
			log_err(PBSE_SYSTEM, __func__, "could not access reservation");
			continue;
		}

		presv_state = &presv->ri_qs.ri_state;
		presv_substate = &presv->ri_qs.ri_substate;
		retry_time = get_rattr_long(presv, RESV_ATR_retry);
		resv_nodes = get_rattr_str(presv, RESV_ATR_resv_nodes);
		resv_start_time = get_rattr_long(presv, RESV_ATR_start);
		/* the start time of the soonest degraded occurrence */
		degraded_time = presv->ri_degraded_time;
		in_soonest_occr = find_vnode_in_execvnode(resv_nodes, np->nd_name);

		if (retry_time == 0)
			set_resv_retry(presv, determine_resv_retry(presv));

		/* If the downed node is part of the soonest reservation then the
		 * reservation is marked degraded. This is recognized by having the
		 * degraded_time be equal to the reservation start time or if the vnode
		 * name is present in the soonest occurrence's resv_nodes attribute.
		 */
		if ((degraded_time == resv_start_time) || (in_soonest_occr == 1)) {
			DBPRT(("vnode_unavailable: changing reservation state to degraded\n"))
			if (*presv_state == RESV_CONFIRMED) {
				(void) resv_setResvState(presv, RESV_DEGRADED, RESV_DEGRADED);
			} else {
				/* If reservation is currently running and a node is down then
				 * set its substate to degraded
				 */
				(void) resv_setResvState(presv, presv->ri_qs.ri_state, RESV_DEGRADED);
			}
		} else if (degraded_time > resv_start_time)
			(void) resv_setResvState(presv, presv->ri_qs.ri_state, RESV_DEGRADED);

		/* reference count the number of vnodes down such that the state of the
		 * reservation can be reset to CONFIRMED once the number of unavailable
		 * nodes reaches 0.
		 */
		if ((*presv_substate == RESV_DEGRADED) && (account_vnode == 1)) {
			/* the number of vnodes down could exceed the number of vnodes in
			 * the reservation only in the case of a standing reservation for
			 * which the vnodes unavailable are associated to later occurrences
			 */
			if (presv->ri_vnodes_down > presv->ri_vnodect) {
				/* If a standing reservation we print the execvnodes sequence
				 * string for debugging purposes */
				if (get_rattr_arst(presv, RESV_ATR_resv_standing)) {
					if (is_rattr_set(presv, RESV_ATR_resv_execvnodes))
						execvnodes = get_rattr_str(presv, RESV_ATR_resv_execvnodes);
					if (execvnodes == NULL)
						execvnodes = "";
					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
						   presv->ri_qs.ri_resvID, "execvnodes sequence: %s",
						   execvnodes);
					if (is_rattr_set(presv, RESV_ATR_resv_idx))
						occurrence = get_rattr_long(presv, RESV_ATR_resv_idx);
 					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
					           presv->ri_qs.ri_resvID,
					           "vnodes in occurrence %d: %d;"
					           " unavailable vnodes in reservation: %d",
					           occurrence, presv->ri_vnodect, presv->ri_vnodes_down);
				} else {
					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
 					   presv->ri_qs.ri_resvID,
					   "vnodes in reservation: %d; unavailable vnodes in reservation: %d",
 					   presv->ri_vnodect, presv->ri_vnodes_down);
				}
			}
			presv->ri_vnodes_down++;
		}

	} /* End of for. Process next reservation associated to the affected node */

	free_rinf_list(rinfp_hd);
}

/**
 * @brief
 * 		Search all reservations for an associated node that matches the one
 * 		passed as argument.
 *
 * @param[in]	np	-	The node to find in the reservations list
 * @param[in]	vnode_degraded_op	-	To indicate whether to set the degraded time on
 * 										the reservation or not.
 *
 * @return	resvinfo *
 * @retval	-	The reservation info structure	- for the matching reservations
 * @retval	NULL	- if none are found.
 *
 * @note
 * 		if none are found. This function allocates memory that has to be freed by
 * 		the caller.
 *
 * @par MT-safe: No
 */
struct resvinfo *
find_vnode_in_resvs(struct pbsnode *np, enum vnode_degraded_op degraded_op)
{
	struct resvinfo *rinfp;	       /* tail of the list under construction */
	struct resvinfo *parent_rinfp; /* head of the list; the return value */
	resc_resv *presv;
	pbsnode_list_t *pl;
	int match = 0; /* set once the first reservation is recorded */
	int is_degraded = 0;
	long retry_time;

	if (np == NULL)
		return NULL;

	/* Walk all reservations and check if the node is associated to an
	 * occurrence of a standing reservation
	 *
	 * While walking the reservation's list, we create a resv info linked list
	 * that contains all reservations on which the node appears
	 */
	rinfp = malloc(sizeof(struct resvinfo));
	if (!rinfp)
		return NULL;

	rinfp->resvp = NULL;
	rinfp->next = NULL;

	parent_rinfp = rinfp;

	for (presv = (resc_resv *) GET_NEXT(svr_allresvs); presv != NULL;
	     presv = (resc_resv *) GET_NEXT(presv->ri_allresvs)) {
		/* When processing an advance reservation, set the degraded time to be
		 * the start time of the reservation and process the next reservation
		 */
		if (get_rattr_long(presv, RESV_ATR_resv_standing) == 0) {
			/* advance reservation: is the node in its vnode list? */
			for (pl = presv->ri_pbsnode_list; pl; pl = pl->next) {
				if (np == pl->vnode)
					break;
			}
			if (!pl)
				continue;

			presv->ri_degraded_time = get_rattr_long(presv, RESV_ATR_start);
			/* append this reservation to the result list; the first
			 * match reuses the pre-allocated head node */
			if (!match) {
				rinfp->resvp = presv;
				rinfp->next = NULL;
				match = 1;
			} else {
				rinfp->next = malloc(sizeof(struct resvinfo));
				if (!rinfp->next) {
					log_err(PBSE_SYSTEM, __func__,
						"could not allocate memory to create a resvinfo list");
					break;
				}
				rinfp = rinfp->next;
				rinfp->resvp = presv;
				rinfp->next = NULL;
			}
		} else { /* Standing Reservation */
			/* If the sequence of execvnodes of the considered standing reservation
			 * isn't set, process the next element. Note that this should never
			 * happen as the reservation should have been confirmed and the nodes
			 * been assigned to it
			 */
			if (!is_rattr_set(presv, RESV_ATR_resv_execvnodes)) {
				is_degraded = 1;
				log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESV, LOG_NOTICE,
				           presv->ri_qs.ri_resvID,
				           "%s: Reservation's execvnodes_seq are corrupted, degrading it",
				           __func__);
				if (presv->ri_qs.ri_substate != RESV_DEGRADED) {
					if (presv->ri_qs.ri_state == RESV_RUNNING
					    || presv->ri_qs.ri_state == RESV_DELETING_JOBS) {
						/*
						** leave it as is, rely on resv_vnodes to run jobs in it;
						** once this occurrence finally finishes
						** we will re-evaluate whether to reconfirm next occurrence.
						** Still set substate to degraded now to alert site admins
						*/
						resv_setResvState(presv, presv->ri_qs.ri_state, RESV_DEGRADED);
					} else {
						resv_setResvState(presv, RESV_DEGRADED, RESV_DEGRADED);
						retry_time = determine_resv_retry(presv);
						/* if server just came up wait 'some time' for nodes to come up */
						if (time_now + ESTIMATED_DELAY_NODES_UP < retry_time)
							retry_time = time_now + ESTIMATED_DELAY_NODES_UP;
						/* bogus value for degraded_time, but avoid skipping a reconfirmation */
						presv->ri_degraded_time = get_rattr_long(presv, RESV_ATR_start);
						log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID,
						           "%s: Reservation with corrupted nodes, setting up reconfirmation",
						           __func__);
						force_resv_retry(presv, retry_time);
					}
				}
				/* the reservation is degraded but we cannot associate it with the node */
 				continue;
			} else {
				is_degraded = find_degraded_occurrence(presv, np, degraded_op);
 			}

			/* If no occurrence is degraded move on to the next reservation.
			 * NOTE(review): find_degraded_occurrence can also return -1 on
			 * internal error; that case falls through and is treated the
			 * same as "found" — confirm this is intentional. */
			if (is_degraded == 0)
				continue;

			/* Add the reservation to the constructed linked list to which this
			 * node is associated
			 */
			if (!match) {
				rinfp->resvp = presv;
				rinfp->next = NULL;
				match = 1;
			} else {
				rinfp->next = malloc(sizeof(struct resvinfo));
				if (!rinfp->next) {
					log_err(PBSE_SYSTEM, __func__,
						"could not allocate memory to create a resvinfo list");
					break;
				}
				rinfp = rinfp->next;
				rinfp->resvp = presv;
				rinfp->next = NULL;
			}
		}
	}

	/* no reservations are associated to this vnode */
	if (!match) {
		free(rinfp);
		rinfp = NULL;
		parent_rinfp = NULL;
	}

	return parent_rinfp;
}

/**
 * @brief
 * 		Walk occurrences of a standing reservation searching for the soonest
 * 		valid degraded occurrence associated to the vnode passed as argument.
 *
 * @param[in]	presv	- The reservation being processed
 * @param[in]	np	- The node affected, either available or unavailable
 * @param[in]	vnode_degraded_op	- determines if a degraded time should be set
 *
 * @return	int
 * @retval	1	- upon success finding the node in the reservation (including its
 * 					occurrences when a standing reservation)
 * @retval 0	- if the node was not found.
 *
 * @par Side-effects: this function will also set the degraded time of the
 * reservation when instructed to by the degraded_op operator.
 *
 * @par MT-safe: No
 */
int
find_degraded_occurrence(resc_resv *presv, struct pbsnode *np,
			 enum vnode_degraded_op degraded_op)
{
	char **execvnodes_seq;
	char *short_execvnodes_seq = NULL;
	char **tofree = NULL;
	char *rrule;
	char *tz;
	char *execvnodes = NULL;
	long dtstart;
	long occr_time;
	long curr_degraded_time;
	int ridx;
	int ridx_adjusted;
	int rcount;
	int rcount_adjusted;
	int i, j;
	int occr_found;

	if (presv == NULL)
		return 0;

	if (np == NULL)
		return 0;

	rrule = get_rattr_str(presv, RESV_ATR_resv_rrule);
	tz = get_rattr_str(presv, RESV_ATR_resv_timezone);
	dtstart = get_rattr_long(presv, RESV_ATR_start);
	if (is_rattr_set(presv, RESV_ATR_resv_execvnodes))
		execvnodes = get_rattr_str(presv, RESV_ATR_resv_execvnodes);
	/* -1 signals an internal error (missing execvnodes or allocation
	 * failure), distinct from the documented 0 = "node not found" */
	if (execvnodes == NULL || (short_execvnodes_seq = strdup(execvnodes)) == NULL)
		return -1;
	execvnodes_seq = unroll_execvnode_seq(short_execvnodes_seq, &tofree);
	/* If an error occurred during unrolling, this reservation is ignored.
	 * Release everything acquired so far: the unrolled array and its
	 * helper storage were previously leaked on this path. */
	if (!(*execvnodes_seq)) {
		free(execvnodes_seq);
		if (tofree != NULL)
			free_execvnode_seq(tofree);
		free(short_execvnodes_seq);
		return -1;
	}

	ridx = get_rattr_long(presv, RESV_ATR_resv_idx);
	rcount = get_rattr_long(presv, RESV_ATR_resv_count);
	/* A reconfirmed degraded reservation reports the number of
	 * reconfirmed occurrences from the time of degradation.
	 */
	rcount_adjusted = get_execvnodes_count(execvnodes);

	ridx_adjusted = ridx - (rcount - rcount_adjusted);
	occr_found = 0;
	curr_degraded_time = 0;

	/* Search for a match for this node in each occurrence's execvnode */
	for (i = ridx_adjusted - 1, j = 1; i < rcount_adjusted; i++, j++) {
		if (i < 0) {
			/* the adjusted index went negative; skip this occurrence */
			log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID,
				   "%s: attempt to find vnodes for occurrence %d failed; skipping",
				   __func__, j);
			continue;
		}
		if (find_vnode_in_execvnode(execvnodes_seq[i], np->nd_name)) {
			occr_found = 1;
			if (degraded_op == Set_Degraded_Time) {
				/* we keep track of the occurrence time to determine the earliest
				 * degraded time
				 */
				occr_time = get_occurrence(rrule, dtstart, tz, j);

				if (presv->ri_degraded_time == 0 &&
				    curr_degraded_time == 0) {
					curr_degraded_time = occr_time;
				}
			} else
				break;
		}
	}
	/* clean up unrolled execvnodes sequence helpers */
	free(execvnodes_seq);
	execvnodes_seq = NULL;
	free(short_execvnodes_seq);
	short_execvnodes_seq = NULL;
	free_execvnode_seq(tofree);
	tofree = NULL;

	/* No matching vnode name was found in any occurrence */
	if (!occr_found)
		return 0;

	/* A matching vnode was found in an occurrence but no degraded time was
	 * set; set it to curr_degraded_time for consistency
	 */
	if (presv->ri_degraded_time == 0 && curr_degraded_time != 0)
		presv->ri_degraded_time = curr_degraded_time;

	return 1;
}

/**
 * @brief
 * 		Garbage collect the dynamically generated reservation list
 * @see
 *		vnode_available and vnode_unavailable
 *
 * @param[in,out]	rinfp	-dynamically generated reservation list
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
free_rinf_list(struct resvinfo *rinfp)
{
	/* walk the list, grabbing each node's successor before freeing it;
	 * a NULL head simply skips the loop */
	while (rinfp != NULL) {
		struct resvinfo *next = rinfp->next;

		free(rinfp);
		rinfp = next;
	}
}

/**
 * @brief
 * 		Unset all reservations retry attributes and variables.
 *
 * @param[in]	presv - The reservation to process
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
unset_resv_retry(resc_resv *presv)
{
	/* nothing to do without a reservation or without a retry attribute set */
	if ((presv == NULL) || !is_rattr_set(presv, RESV_ATR_retry))
		return;

	/* clear the persisted attribute and the in-memory retry bookkeeping */
	set_rattr_l_slim(presv, RESV_ATR_retry, 0, SET);
	presv->ri_resv_retry = 0;
	presv->ri_degraded_time = 0;
}

/**
 * @brief
 * 		Set reservation retry attributes and variables.
 * 		The reservation attribute RESV_ATR_retry is recovered upon a server
 * 		restart. The field ri_resv_retry is not.
 * 		If RESV_ATR_retry is set, we add that already existing time as the
 * 		event time, otherwise we compute the event time
 *
 * @param[in]	presv	-	The reservation to process
 * @param[in]	retry_time	-	The retry time to set
 * @param[in]	forced  - determines which handler we call
 * 
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
set_resv_retry2(resc_resv *presv, long retry_time, int forced)
{
	struct work_task *pwt;
	extern void resv_retry_handler(struct work_task *ptask);
	extern void resv_retry_handler_forced(struct work_task *ptask);
	char *msg;
	char *str_time;
	time_t retry_tt;

	if (presv == NULL)
		return;

	/* ri_resv_retry non-zero means a retry was already pending */
	if (presv->ri_resv_retry)
		msg = "Next attempt to reconfirm reservation will be made on %s";
	else
		msg = "An attempt to reconfirm reservation will be made on %s";

	set_rattr_l_slim(presv, RESV_ATR_retry, retry_time, SET);

	presv->ri_resv_retry = retry_time;

	/* ctime() requires a time_t *; go through a time_t copy instead of
	 * aliasing the long argument, which is undefined behavior on
	 * platforms where time_t and long are distinct types.
	 * NOTE(review): ctime()'s result carries a trailing newline, which
	 * ends up embedded in the log message — confirm that is acceptable. */
	retry_tt = (time_t) retry_time;
	str_time = ctime(&retry_tt);
	if (str_time == NULL)
		str_time = "";
	log_eventf(PBSEVENT_DEBUG2, PBS_EVENTCLASS_RESV, LOG_NOTICE, presv->ri_qs.ri_resvID, msg, str_time);

	/* Set a work task to initiate a scheduling cycle when the time to check
	 * for alternate nodes to assign the reservation comes
	 */
	if ((pwt = set_task(WORK_Timed, retry_time, forced ? resv_retry_handler_forced : resv_retry_handler, presv)) != NULL) {
		/* set things so that the reservation going away will result in
		 * any "yet to be processed" work tasks also going away
		 */
		append_link(&presv->ri_svrtask, &pwt->wt_linkobj, pwt);
	}
}

/**
 * @brief
 * 		Set reservation retry attributes and variables.
 * 		The reservation attribute RESV_ATR_retry is recovered upon a server
 * 		restart. The field ri_resv_retry is not.
 * 		If RESV_ATR_retry is set, we add that already existing time as the
 * 		event time, otherwise we compute the event time
 *
 *      This one will only kick a reconfirmation if ri_vnodes_down is positive
 *
 * @param[in]	presv	-	The reservation to process
 * @param[in]	retry_time	-	The retry time to set
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
set_resv_retry(resc_resv *presv, long retry_time)
{
	/* non-forced variant: delegate to the common handler with forced == 0 */
	set_resv_retry2(presv, retry_time, 0);
}

/**
 * @brief
 * 		Set reservation retry attributes and variables.
 * 		The reservation attribute RESV_ATR_retry is recovered upon a server
 * 		restart. The field ri_resv_retry is not.
 * 		If RESV_ATR_retry is set, we add that already existing time as the
 * 		event time, otherwise we compute the event time
 *
 *      This will always kick a reconfirmation
 *
 * @param[in]	presv	-	The reservation to process
 * @param[in]	retry_time	-	The retry time to set
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
force_resv_retry(resc_resv *presv, long retry_time)
{
	/* forced variant: delegate to the common handler with forced == 1 */
	set_resv_retry2(presv, retry_time, 1);
}

/**
 * @brief
 * 		search string big for exact occurrence of string little. The preceding
 * 		and successor characters of the occurrence must not themselves be legal
 * 		vnode characters. The pattern defined by 'little' consists only of legal
 * 		vnode characters.
 *
 * 		This function is used to find an exact match of a vnode name within an
 * 		execvnode string, for example searching for "node1" in the execvnode
 * 		(node12:ncpus=1)+(node1node1:ncpus=2)+(node1:npcus=3)+(node3:mem=5000:npcus=1)
 * @see
 * 		vnode_unavailable and find_degraded_occurrence
 *
 * @param[in]	big	-	the original string to search
 * @param[in]	little	-	the pattern to find
 *
 * @return	int
 * @retval	1	- if the pattern is found
 * @retval	0	- otherwise
 *
 * @par MT-safe: no
 */
int
find_vnode_in_execvnode(char *big, char *little)
{
	char *hit;
	size_t patt_len;

	if (big == NULL || little == NULL)
		return 0;

	patt_len = strlen(little);

	/*
	 * Scan every occurrence of 'little' inside 'big'. An occurrence is
	 * an exact vnode-name match only when it is not at the very start
	 * of 'big' and neither the character just before nor the character
	 * just after it is a legal vnode character (otherwise the hit is
	 * merely a substring of a longer vnode name, e.g. "node1" inside
	 * "node1node1" or "node12").
	 *
	 * A hit at offset 0 is rejected: in a well-formed execvnode every
	 * vnode name is preceded by at least a '(' so a genuine name can
	 * never start the string; an offset-0 hit indicates a repeated
	 * pattern rather than a standalone name.
	 */
	for (hit = strstr(big, little); hit != NULL; hit = strstr(hit + patt_len, little)) {
		if (hit != big &&
		    !legal_vnode_char(hit[-1], 1) &&
		    !legal_vnode_char(hit[patt_len], 1))
			return 1;
	}

	return 0;
}

/**
 * @brief
 * 		decode_stat_update - decodes body of status update request from MOM
 *		number of jobs should already be decoded by caller
 * @see
 * 		stat_update and recv_job_obit.
 *
 * @param[in]	stream	-	TPP stream open from Mom on which to read the msg
 * @param[out]	prused	-	Job Resource Usage requests
 *
 * @return	int
 * @return	return code
 */

static int
decode_stat_update(int stream, ruu *prused)
{
	int hc;
	int rc;

	/* job id string; on success it is owned by the caller via prused */
	prused->ru_pjobid = disrst(stream, &rc);
	if (rc)
		return rc;

	/* flag: non-zero means a comment string follows on the wire */
	hc = disrsi(stream, &rc);
	if (rc)
		return rc;
	if (hc) {
		/* there is a comment string following */
		prused->ru_comment = disrst(stream, &rc);
		if (rc)
			return rc;
	} else {
		prused->ru_comment = NULL;
	}
	/* status value reported by MoM for the job */
	prused->ru_status = disrsi(stream, &rc);
	if (rc)
		return rc;
	/* hop count; the caller matches this against JOB_ATR_run_version */
	prused->ru_hop = disrsi(stream, &rc);
	if (rc)
		return rc;

	/* trailing list of job attributes sent by MoM */
	CLEAR_HEAD(prused->ru_attr);
	rc = decode_DIS_svrattrl(stream, &prused->ru_attr);
	if (rc) {
		/* decode failed: discard any partially decoded attributes */
		free_attrlist(&prused->ru_attr);
	}
	return rc;
}

/**
 * @brief
 *		Update job resource usage based on information sent from Mom.
 *		All updates include the latest information on resource usage.
 * @par Functionality:
 *		An update from Mom also contains certain attributes which
 *		need to be recorded, the most important of which is the job's
 *		session id.  When the session id is modified, the job's substate is
 *		changed from PRERUN to RUNNING; this also saves the job to the database,
 *		otherwise it is saved explicitly.
 * @see
 * 		is_request
 *
 * @param[in] stream - TPP stream open from Mom on which to read the msg
 *
 * @return	void
 */
static void
stat_update(int stream)
{
	int bad;
	int num;
	int njobs;
	job *pjob;
	int rc;
	ruu rused = {0};
	svrattrl *sattrl;
	mominfo_t *mp;

	njobs = disrui(stream, &rc); /* number of jobs in update */
	if (rc)
		return;

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, __func__, "received updates = %d", njobs);

	rused.ru_next = NULL;
	while (njobs--) {

		rused.ru_pjobid = NULL;
		if (decode_stat_update(stream, &rused) != 0) {

			if ((mp = tfind2((u_long) stream, 0, &streams)) != NULL) {

				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
					  LOG_NOTICE, mp->mi_host, "error in stat_update");
			}
			tpp_eom(stream);
			break;
		}
		DBPRT(("stat_update: update for %s\n", rused.ru_pjobid))

		if (((pjob = find_job(rused.ru_pjobid)) != NULL) &&
		    (check_job_state(pjob, JOB_STATE_LTR_RUNNING) || check_job_state(pjob, JOB_STATE_LTR_EXITING)) &&
		    (get_jattr_long(pjob, JOB_ATR_run_version) == rused.ru_hop)) {

			long old_sid = 0; /* used to save prior sid of job */
			svrattrl *execvnode_entry = NULL;
			svrattrl *schedselect_entry = NULL;
			char *cur_execvnode = NULL;
			char *cur_schedselect = NULL;

			if (is_jattr_set(pjob, JOB_ATR_exec_vnode))
				cur_execvnode = get_jattr_str(pjob, JOB_ATR_exec_vnode);

			if (is_jattr_set(pjob, JOB_ATR_SchedSelect))
				cur_schedselect = get_jattr_str(pjob, JOB_ATR_SchedSelect);

			/* update all the attributes sent from Mom */
			execvnode_entry = find_svrattrl_list_entry(&rused.ru_attr, ATTR_execvnode, NULL);
			schedselect_entry = find_svrattrl_list_entry(&rused.ru_attr, ATTR_SchedSelect, NULL);

			if ((execvnode_entry != NULL) &&
			    (execvnode_entry->al_value != NULL) &&
			    (schedselect_entry != NULL) &&
			    (schedselect_entry->al_value != NULL) &&
			    (cur_execvnode != NULL) &&
			    (strcmp(cur_execvnode, execvnode_entry->al_value) != 0) &&
			    (cur_schedselect != NULL) &&
			    (strcmp(cur_schedselect, schedselect_entry->al_value) != 0)) {

				/* decreements everything found in exec_vnode */
				set_resc_assigned((void *) pjob, 0, DECR);
				free_nodes(pjob);

				if (cur_execvnode != NULL) {
					set_jattr_str_slim(pjob, JOB_ATR_exec_vnode_acct, cur_execvnode, NULL);
				}

				if ((is_jattr_set(pjob, JOB_ATR_resource_acct)) != 0) {
					free_jattr(pjob, JOB_ATR_resource_acct);
					mark_jattr_not_set(pjob, JOB_ATR_resource_acct);
				}
				set_attr_with_attr(&job_attr_def[JOB_ATR_resource_acct], get_jattr(pjob, JOB_ATR_resource_acct), get_jattr(pjob, JOB_ATR_resource), INCR);

				set_jattr_str_slim(pjob, JOB_ATR_exec_host_acct, get_jattr_str(pjob, JOB_ATR_exec_host), NULL);

				if (assign_hosts(pjob, execvnode_entry->al_value, 1) == 0) {
					resource_def *prdefsl;
					resource *presc;
					(void) update_resources_list(pjob, ATTR_l,
								     JOB_ATR_resource,
								     execvnode_entry->al_value,
								     INCR, 0,
								     JOB_ATR_resource_orig);

					if ((is_jattr_set(pjob, JOB_ATR_SchedSelect_orig)) == 0)
						set_jattr_str_slim(pjob, JOB_ATR_SchedSelect_orig, cur_schedselect, NULL);
					set_jattr_str_slim(pjob, JOB_ATR_SchedSelect, schedselect_entry->al_value, NULL);

					/* re-generate nodect */
					set_chunk_sum(get_jattr(pjob, JOB_ATR_SchedSelect), get_jattr(pjob, JOB_ATR_resource));
					set_resc_assigned((void *) pjob, 0, INCR);

					prdefsl = &svr_resc_def[RESC_SELECT];
					/* re-generate "select" resource */
					presc = find_resc_entry(get_jattr(pjob, JOB_ATR_resource), prdefsl);
					if (presc == NULL)
						presc = add_resource_entry(get_jattr(pjob, JOB_ATR_resource), prdefsl);
					if (presc != NULL)
						(void) prdefsl->rs_decode(&presc->rs_value, NULL, "select", schedselect_entry->al_value);
					account_jobstr(pjob, PBS_ACCT_PRUNE);
				} else {
					log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
						  pjob->ji_qs.ji_jobid,
						  "error assigning hosts...requeueing job");
					discard_job(pjob, "Force rerun", 1);
					force_reque(pjob);
				}
			}

			if (execvnode_entry != NULL) {
				delete_link(&execvnode_entry->al_link);
				free(execvnode_entry);
			}
			if (schedselect_entry != NULL) {
				delete_link(&schedselect_entry->al_link);
				free(schedselect_entry);
			}
			if (is_jattr_set(pjob, JOB_ATR_session_id))
				old_sid = get_jattr_long(pjob, JOB_ATR_session_id);
			/* update all the attributes sent from Mom */
			sattrl = (svrattrl *) GET_NEXT(rused.ru_attr);
			if (sattrl != NULL) {
				if (modify_job_attr(pjob, sattrl,
						    ATR_DFLAG_MGWR | ATR_DFLAG_SvWR, &bad) != 0) {
					if ((mp = tfind2((u_long) stream, 0, &streams)) != NULL) {
						for (num = 1; num < bad; num++)
							sattrl = (struct svrattrl *) GET_NEXT(sattrl->al_link);
						sprintf(log_buffer, "unable to update attribute %s.%s in stat_update", sattrl->al_name, sattrl->al_resc);
						log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
							  LOG_NOTICE, mp->mi_host, log_buffer);
					}
				}
			}

			if ((is_jattr_set(pjob, JOB_ATR_session_id)) && (get_jattr_long(pjob, JOB_ATR_session_id) != old_sid)) {
				/* save new or updated session id for the job */
				/* and if needed update substate to running   */
				/*
				 * save the session id and likely update the job
				 * substate, normally it is changed from
				 * PRERUN (or PROVISION) to RUNNING here, but
				 * it may have already been changed to:
				 * - EXITING if the OBIT arrived first.
				 */
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid,
					   "Received session ID for job: %ld", get_jattr_long(pjob, JOB_ATR_session_id));
				if ((check_job_substate(pjob, JOB_SUBSTATE_PRERUN)) ||
				    (check_job_substate(pjob, JOB_SUBSTATE_PROVISION))) {
					/* log acct info and make RUNNING */
					complete_running(pjob);
					/* this causes a save of the job */
					svr_setjobstate(pjob, JOB_STATE_LTR_RUNNING,
							JOB_SUBSTATE_RUNNING);
					/*
					 * If JOB_DEPEND_TYPE_BEFORESTART dependency is set for the current job
					 * then release the after dependency for its childs as the current job
					 * is changing its state from JOB_SUBSTATE_PRERUN to JOB_SUBSTATE_RUNNING
					 */
					if (is_jattr_set(pjob, JOB_ATR_depend)) {
						(void) depend_on_exec(pjob);
					}
				}
			} else if ((is_jattr_set(pjob, JOB_ATR_session_id)) == 0) {
				/* this has been downgraded to DEBUG3  */
				/* level (from DEBUG2)		       */
				/* since a mom hook can actually send  */
				/* job updates, even before a job gets */
				/* a session id */
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB,
					  LOG_DEBUG, pjob->ji_qs.ji_jobid,
					  "update from Mom without session id");
			} else {
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "Received the same SID as before: %ld", get_jattr_long(pjob, JOB_ATR_session_id));
				job_save_db(pjob);
			}
		}
		(void) free(rused.ru_comment);
		rused.ru_comment = NULL;
		(void) free(rused.ru_pjobid);
		rused.ru_pjobid = NULL;
		free_attrlist(&rused.ru_attr);
	}
}

/**
 * @brief
 * 		receive a job_obit IS message from a Mom on TPP stream.
 *
 *		Decode each obit into a resc_used_update structure and call
 *		job_obit() to start the end of job procedures.  After all obits
 *		are processed, send a single IS_OBITREPLY back to Mom listing
 *		the acknowledged and the rejected job ids.
 * @see
 * 		is_request
 *
 * @param[in]	stream	-	the TPP stream connecting to the Mom
 *
 * @return	void
 */

static void
recv_job_obit(int stream)
{
	int njobs = 0;
	int i = 0;
	char **reject_list = NULL;
	char **ack_list = NULL;
	int reject_count = 0;
	int ack_count = 0;
	mominfo_t *mp = NULL;
	ruu rused = {0};

	njobs = disrui(stream, &i); /* number of jobs in update */
	if (i)
		return;

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, __func__, "received obits = %d", njobs);

	/* nothing to do; also avoids a zero-size calloc below whose NULL
	 * return would be mistaken for an allocation failure */
	if (njobs <= 0)
		return;

	/* calloc(nmemb, size) so the multiplication is overflow-checked */
	reject_list = (char **) calloc(njobs, sizeof(char *));
	if (reject_list == NULL)
		goto recv_job_obit_err;

	ack_list = (char **) calloc(njobs, sizeof(char *));
	if (ack_list == NULL)
		goto recv_job_obit_err;

	while (njobs--) {
		int is_reject = 0;

		CLEAR_HEAD(rused.ru_attr);
		rused.ru_comment = NULL;
		rused.ru_next = NULL;
		rused.ru_pjobid = NULL;

		if (decode_stat_update(stream, &rused) != 0)
			goto recv_job_obit_err;

		DBPRT(("recv_job_obit: decoded obit for %s\n", rused.ru_pjobid))
		is_reject = job_obit(&rused, stream);
		if (is_reject == 1) {
			/* ownership of the job id string moves to the list */
			reject_list[reject_count++] = rused.ru_pjobid;
			rused.ru_pjobid = NULL;
		} else if (is_reject != -1) { /* -1 means ignore ruu */
			ack_list[ack_count++] = rused.ru_pjobid;
			rused.ru_pjobid = NULL;
		}
		/* reset pointers after freeing so a later jump to the error
		 * path cannot double-free them */
		free(rused.ru_comment);
		rused.ru_comment = NULL;
		free(rused.ru_pjobid); /* free(NULL) is a no-op */
		rused.ru_pjobid = NULL;
		free_attrlist(&rused.ru_attr);
	}

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, __func__, "processed obits, sending replies acks: %d, rejects: %d", ack_count, reject_count);

	if (ack_count > 0 || reject_count > 0) {
		if (is_compose(stream, IS_OBITREPLY) != DIS_SUCCESS)
			goto recv_job_obit_err;
		if (diswui(stream, ack_count) != DIS_SUCCESS)
			goto recv_job_obit_err;
		for (i = 0; i < ack_count; i++) {
			if (diswst(stream, ack_list[i]) != DIS_SUCCESS)
				goto recv_job_obit_err;
			free(ack_list[i]);
			ack_list[i] = NULL;
		}
		if (diswui(stream, reject_count) != DIS_SUCCESS)
			goto recv_job_obit_err;
		for (i = 0; i < reject_count; i++) {
			if (diswst(stream, reject_list[i]) != DIS_SUCCESS)
				goto recv_job_obit_err;
			free(reject_list[i]);
			reject_list[i] = NULL;
		}
		dis_flush(stream);
	}

	/* free the lists on every successful path, including the case
	 * where there was nothing to reply to (previously leaked) */
	free(ack_list);
	free(reject_list);

	return;

recv_job_obit_err:
	if (rused.ru_pjobid) {
		DBPRT(("recv_job_obit: failed to decode obit for %s\n", rused.ru_pjobid))
		log_joberr(PBSE_INTERNAL, __func__, "Failed to decode obit", rused.ru_pjobid);
		free(rused.ru_pjobid);
	}
	free(rused.ru_comment); /* free(NULL) is a no-op */
	free_attrlist(&rused.ru_attr);

	/* had an error, discard rest of message */
	if ((mp = tfind2((u_long) stream, 0, &streams)) != NULL) {
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, LOG_NOTICE, mp->mi_host, "error in recv_job_obit");
	}
	tpp_eom(stream);

	/* entries already sent were freed and NULLed above; free the rest */
	if (reject_list != NULL) {
		for (i = 0; i < reject_count; i++)
			free(reject_list[i]);
		free(reject_list);
	}
	if (ack_list != NULL) {
		for (i = 0; i < ack_count; i++)
			free(ack_list[i]);
		free(ack_list);
	}
}

/**
 * @brief
 * 	Tell Mom to discard (kill) a running job.
 *
 *	This is done in certain circumstances, such as
 *
 *	1. If Mom was marked down and jobs were requeued on node_down_requeue,
 *		Mom will kill off the job and then send an OBIT which will be rejected
 *		because the run version will not match.
 *
 *	2. Mother Superior or a Sister failed to acknowledge the Delete Job request
 *		at the end of job processing.  This tells all Moms involved to delete
 *		the job and free the resources.
 *
 * @param[in]	stream	-	the TPP stream connecting to the Mom
 * @param[in]	jobid	-	job id to be discarded.
 * @param[in]	runver	-	is the run version (hop) of the jobs which should be deleted. A runver of -1 is delete any.
 * @param[in]	txt	-	the reason why it is getting discarded; may be NULL.
 *
 * @return	void
 */

static void
send_discard_job(int stream, char *jobid, int runver, char *txt)
{
	DBPRT(("discard_job %s\n", jobid))
	if (stream != -1) {
		static char sdjfmt[] = "Discard running job, %s %s";
		int rc;

		/* compose jobid + runver and flush; any DIS failure below
		 * marks the Mom down */
		if ((rc = is_compose(stream, IS_DISCARD_JOB)) == DIS_SUCCESS) {
			if ((rc = diswst(stream, jobid)) == DIS_SUCCESS)
				if ((rc = diswsi(stream, runver)) == DIS_SUCCESS)
					dis_flush(stream);
		}
		if (rc != DIS_SUCCESS) {
			mominfo_t *mp;

			if (txt == NULL)
				txt = "";
			/* use a bounded write like the success path below */
			snprintf(log_buffer, sizeof(log_buffer), sdjfmt, txt, "failed");
			mp = tfind2((u_long) stream, 0, &streams);
			if (mp)
				momptr_down(mp, log_buffer);
		} else if (txt) {
			snprintf(log_buffer, sizeof(log_buffer), sdjfmt, txt, "");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, jobid,
				  log_buffer);
		}
	}
	DBPRT(("send_discard_job for %s, stream %d \n", jobid, stream))
}

/**
 * @brief
 * 		During the execution of a job, one or more Moms involved with
 *		the job apparently went down.
 *
 * @par
 *		To make sure that the resources allocated
 *		to the job by the Moms are released for other jobs, we send a
 *		IS_DISCARD_JOB message to each Mom.
 * @par
 *		A structure (struct jbdscrd) is hung off of the job structure
 *		to track which Moms have acknowledged the IS_DISCARD_JOB message, see
 *		post_discard_job(), and which Moms are down, see momptr_down().
 * @par
 *		The "txt" message is logged one time only to prevent flooding the log
 *		with duplicate messages.
 * @par
 *		If the "noack" flag is true, then we do not wish to wait for the
 *		Mom's acknowledgement because the job is being requeued/deleted
 *		immediately.   In this case we do not set ji_discard and do not
 *		call post_discard_job() for the first check.
 *
 * @param[in,out]	pjob	-	job structure
 * @param[in]	txt		-	The "txt" message is logged one time only to prevent flooding the log with duplicate messages.
 * @param[in]	noack	-	If the "noack" flag is true, then we do not wish to wait for the Mom's acknowledgement
 * 							 because the job is being requeued/deleted immediately.
 *
 * @return	void
 */
void
discard_job(job *pjob, char *txt, int noack)
{
	int i;
	int nmom;
	struct jbdscrd *pdsc = NULL;
	char *pc;
	char *pn;
	struct pbsnode *pnode;
	int rc;
	int rver;

	/* We're about to discard the job, reply to a preemption.
	 * This serves as a catch all just incase the code doesn't reply on its own.
	 */

	if (pjob->ji_pmt_preq != NULL)
		reply_preempt_jobs_request(PBSE_NONE, PREEMPT_METHOD_DELETE, pjob);

	if ((is_jattr_set(pjob, JOB_ATR_exec_vnode)) == 0) {
		/*  no exec_vnode list from which to work */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG,
			  pjob->ji_qs.ji_jobid,
			  "in discard_job and no exec_vnode");
		return;
	}
	if (pjob->ji_discard) {
		/* must be already discarding; drop the old tracking array
		 * and start over for this new request */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG,
			  pjob->ji_qs.ji_jobid,
			  "cancel previous discard_job tracking for new discard_job request");
		free(pjob->ji_discard);
		pjob->ji_discard = NULL;
	}

	/* first count up number of vnodes in exec_vnode to size the	*/
	/* jbdscrd (job discard) array, this may result in more entries	*/
	/* than needed for the number of Moms, but that is ok		*/

	nmom = 1;
	pn = get_jattr_str(pjob, JOB_ATR_exec_vnode);
	while ((pn = strchr(pn, (int) '+')) != NULL) {
		nmom++;
		pn++;
	}
	/* allocate one extra for the null terminator */
	pdsc = calloc(sizeof(struct jbdscrd), (size_t) (nmom + 1));
	if (pdsc == NULL)
		return;

	/* note, calloc has zeroed the space, so the jdcd_mom ptrs are null */

	/* go through the list of hosts and add each parent Mom once */
	nmom = 0;
	pn = parse_plus_spec(get_jattr_str(pjob, JOB_ATR_exec_host), &rc);
	while (pn) {
		/* strip any ":resource" suffix to leave just the host name */
		pc = pn;
		while ((*pc != '\0') && (*pc != ':'))
			++pc;
		*pc = '\0';

		pnode = find_nodebyname(pn);
		/* had better be the "natural" vnode with only the one parent */
		if (pnode != NULL) {
			for (i = 0; i < nmom; ++i) {
				if ((pdsc + i)->jdcd_mom == pnode->nd_moms[0])
					break; /* already have this Mom */
			}
			if (i == nmom) {
				/* new Mom: record it; a down Mom never gets the
				 * message, so mark it JDCD_DOWN immediately */
				(pdsc + nmom)->jdcd_mom = pnode->nd_moms[0];
				if (pnode->nd_moms[0]->mi_dmn_info->dmn_state & INUSE_DOWN)
					(pdsc + nmom)->jdcd_state = JDCD_DOWN;
				else {
					(pdsc + nmom)->jdcd_state = JDCD_WAITING;
					pjob->ji_jdcd_waiting = 1;
				}
				nmom++;
			}
		}
		pn = parse_plus_spec(NULL, &rc);
	}

	/* Get run version of this job */
	rver = get_jattr_long(pjob, JOB_ATR_run_version);

	/* unless "noack", attach discard array to the job */
	if (noack == 0)
		pjob->ji_discard = pdsc;
	else
		pjob->ji_discard = NULL;

	/* Send discard message to each Mom that is up or mark the entry down */
	for (i = 0; i < nmom; i++) {
		int s;

		s = (pdsc + i)->jdcd_mom->mi_dmn_info->dmn_stream;
		if ((s != -1) && ((pdsc + i)->jdcd_state != JDCD_DOWN)) {
			send_discard_job(s, pjob->ji_qs.ji_jobid, rver, txt);
			txt = NULL; /* so one log message only */
		} else
			(pdsc + i)->jdcd_state = JDCD_DOWN;
	}

	/*
	 * at this point unless "noack", we call post_discard_job() to see if
	 * there are any outstanding discard requests and if not to deal with
	 * the job the second arg is NULL to indicate "just checking"
	 */
	if (noack == 0)
		post_discard_job(pjob, NULL, 0);
	else
		free(pdsc); /* not attached to job, free it now */
}

/**
 * @brief
 * 		receive message that a job is suspended/resumed because
 *		the cycle harvesting workstation has gone busy/idle.
 *
 *		Note, the JOB_SVFLG_Actsuspd bit which is set in the job is independent
 *		of the JOB_SVRFLG_Suspend bit which is set by qsig -s suspend.
 *		Both may be set.
 *
 *		Data received:	integer  job state (1 suspended, 0 resumed)
 *			string	 jobid
 *
 * @param[in]	stream	-	the TPP stream connecting to the Mom
 *
 * @reurn	void
 */
static void
recv_wk_job_idle(int stream)
{
	int rc;
	int which;
	char *jobid;
	job *pjob;

	which = disrui(stream, &rc); /* 1 = suspend, 0 = resume */
	if (rc)
		return;

	jobid = disrst(stream, &rc); /* job id */
	if (rc)
		return;

	pjob = find_job(jobid);
	if (pjob) {
		/* suspend or resume job */

		set_job_state(pjob, JOB_STATE_LTR_RUNNING);

		if (which)
			pjob->ji_qs.ji_svrflags |= JOB_SVFLG_Actsuspd;
		else
			pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_Actsuspd;

		job_save_db(pjob);
	}

	free(jobid);
}

/**
 * @brief
 *	Clears job 'pjob' from the pnode's list of jobs.
 *
 * @param[in]	jobid	- job id
 * @param[in]	pnode	- node structure
 *
 * @return int
 * @retval	<val> - # of cpus freed as a result of removing 'pjob'.
 *
 */
static int
deallocate_job_from_node(char *jobid, struct pbsnode *pnode)
{
	int numcpus = 0;    /* for floating licensing */
	int still_has_jobs; /* still jobs on this vnode */
	struct pbssubn *np;
	struct jobinfo *jp, *prev, *next;

	if ((jobid == NULL) || (pnode == NULL)) {
		return (0);
	}

	still_has_jobs = 0;
	for (np = pnode->nd_psn; np; np = np->next) {

		for (prev = NULL, jp = np->jobs; jp; jp = next) {
			next = jp->next;
			if (strcmp(jp->jobid, jobid)) {
				prev = jp;
				still_has_jobs = 1; /* another job still here */
				continue;
			}

			if (prev == NULL)
				np->jobs = next;
			else
				prev->next = next;
			if (jp->has_cpu) {
				pnode->nd_nsnfree++; /* up count of free */
				numcpus++;
				if (pnode->nd_nsnfree > pnode->nd_nsn) {
					log_event(PBSEVENT_SYSTEM,
						  PBS_EVENTCLASS_NODE, LOG_ALERT,
						  pnode->nd_name,
						  "CPU count incremented free more than total");
				}
			}
			free(jp->jobid);
			free(jp);
			jp = NULL;
		}
		if (np->jobs == NULL) {
			np->inuse &= ~(INUSE_JOB | INUSE_JOBEXCL);
		}
	}
	if (still_has_jobs) {
		/* if the vnode still has jobs, then don't clear */
		/* JOBEXCL */
		if (pnode->nd_nsnfree > 0) {
			/* some cpus free, clear "job-busy" state */
			set_vnode_state(pnode, ~INUSE_JOB, Nd_State_And);
		}
	} else {
		/* no jobs at all, clear both JOBEXCL and "job-busy" */
		set_vnode_state(pnode,
				~(INUSE_JOB | INUSE_JOBEXCL),
				Nd_State_And);

		/* call function to check and free the node from the */
		/* prov list and reset wait_prov flag, if set */
		if (check_job_substate(find_job(jobid), JOB_SUBSTATE_PROVISION))
			free_prov_vnode(pnode);
	}

	return (numcpus);
}

/**
 *
 * @brief
 *	Given a string of exec_vnode format, remove the vnode entries
 *	that are found in 'vnodelist'.
 *
 * @param[in]	execvnode 	- the input exec_vnode string
 * @param[in]	vnodelist 	- list of vnodes, plus-separated, that are to be deleted from the
 *			    		'execvnode' entry.
 * @param[in]	err_msg		- if there's any failure, put appropriate message here.
 * @param[in]	err_msg_sz 	- size of the 'err_msg' buffer.
 *
 * @return char *
 * @retval <string>	- a new version of 'execvnode' string with entries
 *			  containing the vnodes in 'vnodelist' taken out.
 * @retval  NULL	- if an error has occurred.
 *
 * @note
 *	returned string is a malloced value that must be freed.
 */
static char *
delete_from_exec_vnode(char *execvnode, char *vnodelist, char *err_msg,
		       int err_msg_sz)
{
	char *exec_vnode = NULL;
	char *new_exec_vnode = NULL;
	char *chunk = NULL;
	char *last = NULL;
	int hasprn = 0;
	int entry = 0;
	int nelem;
	char *noden;
	struct key_value_pair *pkvp;
	char buf[LOG_BUF_SIZE] = {0};
	int j;
	int paren = 0;
	int parend = 0;

	if (execvnode == NULL) {
		snprintf(err_msg, err_msg_sz, "bad parameter");
		return NULL;
	}

	exec_vnode = strdup(execvnode);
	if (exec_vnode == NULL) {
		snprintf(err_msg, err_msg_sz, "execvnode strdup error");
		goto delete_from_exec_vnode_exit;
	}

	/* the output can only shrink, so the input length bounds it */
	new_exec_vnode = (char *) calloc(1, strlen(exec_vnode) + 1);
	if (new_exec_vnode == NULL) {
		snprintf(err_msg, err_msg_sz,
			 "new_exec_vnode calloc error");
		goto delete_from_exec_vnode_exit;
	}

	new_exec_vnode[0] = '\0';
	entry = 0; /* exec_vnode entries */
	paren = 0;
	for (chunk = parse_plus_spec_r(exec_vnode, &last, &hasprn);
	     chunk != NULL;
	     chunk = parse_plus_spec_r(last, &last, &hasprn)) {
		paren += hasprn;
		if (parse_node_resc(chunk, &noden, &nelem, &pkvp) == 0) {
			if ((vnodelist != NULL) &&
			    !in_string_list(noden, '+', vnodelist)) {

				/* there's something put in previously */
				if (entry > 0) {
					strcat(new_exec_vnode, "+");
				}

				/* open a parenthesized chunk group if one is
				 * not already open (a prior redundant second
				 * check of the same condition was removed) */
				if (!parend) {
					strcat(new_exec_vnode, "(");
					parend = 1;
				}
				strcat(new_exec_vnode, noden);
				entry++;

				/* append the ":resource=value" pairs */
				for (j = 0; j < nelem; ++j) {
					snprintf(buf, sizeof(buf), ":%s=%s",
						 pkvp[j].kv_keyw, pkvp[j].kv_val);
					strcat(new_exec_vnode, buf);
				}

				/* have all chunks for current host */
				if (paren == 0) {

					if (parend) {
						strcat(new_exec_vnode, ")");
						parend = 0;
					}
				}
			} else {

				if (hasprn < 0) {
					/* matched ')' in chunk, so need to */
					/* balance the parenthesis */
					if (parend) {
						strcat(new_exec_vnode, ")");
						parend = 0;
					}
				}
			}
		} else {
			snprintf(err_msg, err_msg_sz,
				 "parse_node_resc error");
			goto delete_from_exec_vnode_exit;
		}
	}

	/* drop any trailing '+' left by a removed final entry */
	entry = strlen(new_exec_vnode) - 1;
	if ((entry >= 0) && (new_exec_vnode[entry] == '+'))
		new_exec_vnode[entry] = '\0';

	free(exec_vnode);
	return (new_exec_vnode);

delete_from_exec_vnode_exit:
	free(exec_vnode);
	free(new_exec_vnode);
	return NULL;
}

/**
 * @brief
 *	Determine whether 'pmom' is one of the parent moms of node 'pnode'.
 *
 * @param[in]	pmom - the parent mom
 * @param[in]	pnode - the node to match against.
 *
 * @return int
 * @retval 1	- if true
 * @retval 0	- if  false
 */
static int
is_parent_mom_of_node(mominfo_t *pmom, pbsnode *pnode)
{
	int idx;

	if (pmom == NULL || pnode == NULL || pnode->nd_moms == NULL)
		return (0);

	/* scan the node's parent-mom table for a pointer match */
	for (idx = 0; idx < pnode->nd_nummoms; idx++)
		if (pnode->nd_moms[idx] == pmom)
			return (1);

	return (0);
}

/**
 * @brief
 *	This removes 'pjob' from vnodes managed by parent mom 'pmom'.
 *	Also, if pjob's 'exec_vnode_deallocated' attribute is set,
 *	then remove entries in 'exec_vnode_deallocated' that match
 *	the vnodes where 'pjob' has already been taken out.
 *
 * @param[in]	pmom - the parent mom who sent the request.
 * @param[in]	pjob - job in question
 *
 * @return void
 */
static void
deallocate_job(mominfo_t *pmom, job *pjob)
{
	int i;
	int totcpus = 0;
	int totcpus0 = 0;
	char *freed_vnode_list = NULL; /* '+'-separated names of freed vnodes */
	int freed_sz = 0;
	char *new_exec_vnode = NULL;
	char *jobid;
	pbs_sched *psched;

	if ((pmom == NULL) || (pjob == NULL)) {
		return;
	}

	jobid = pjob->ji_qs.ji_jobid;
	if ((jobid == NULL) || (*jobid == '\0'))
		return;

	for (i = 0; i < svr_totnodes; i++) {
		pbsnode *pnode;

		pnode = pbsndlist[i];

		if ((pnode != NULL) && !(pnode->nd_state & INUSE_DELETED) && is_parent_mom_of_node(pmom, pnode)) {
			totcpus0 = totcpus;
			totcpus += deallocate_job_from_node(pjob->ji_qs.ji_jobid, pnode);
			if (totcpus > totcpus0) {
				snprintf(log_buffer, sizeof(log_buffer),
					 "clearing job %s from node %s", jobid, pnode->nd_name);
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE, LOG_DEBUG,
					  pmom->mi_host, log_buffer);
			}
			/* add a '+' separator only when the list already has an
			 * entry; keying on the node index (i != 0) produced a
			 * leading '+' whenever the first matching node was not
			 * node 0 */
			if (freed_vnode_list != NULL) {
				if (pbs_strcat(&freed_vnode_list, &freed_sz, "+") == NULL) {
					log_err(-1, __func__, "pbs_strcat failed");
					free(freed_vnode_list);
					return;
				}
			}
			if (pbs_strcat(&freed_vnode_list, &freed_sz, pnode->nd_name) == NULL) {
				log_err(-1, __func__, "pbs_strcat failed");
				free(freed_vnode_list);
				return;
			}
		}
	}
	if (totcpus > 0) {
		snprintf(log_buffer, sizeof(log_buffer), "deallocating %d cpu(s) from job %s", totcpus, jobid);
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE, LOG_DEBUG,
			  pmom->mi_host, log_buffer);
	}

	/* prune the freed vnodes out of the job's exec_vnode_deallocated */
	if ((freed_vnode_list != NULL) && (is_jattr_set(pjob, JOB_ATR_exec_vnode_deallocated))) {
		char err_msg[LOG_BUF_SIZE];

		new_exec_vnode = delete_from_exec_vnode(
			get_jattr_str(pjob, JOB_ATR_exec_vnode_deallocated),
			freed_vnode_list, err_msg, LOG_BUF_SIZE);

		if (new_exec_vnode == NULL) {
			log_err(-1, __func__, err_msg);
			free(freed_vnode_list);
			return;
		}

		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode_deallocated, new_exec_vnode, NULL);
		free(new_exec_vnode);
	}
	/* resources were released; kick the associated scheduler */
	if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
		set_scheduler_flag(SCH_SCHEDULE_TERM, psched);
	else {
		log_err(-1, __func__, "Unable to find scheduler associated with partition");
	}
	free(freed_vnode_list);
}
/**
 * @brief
 * 	We got an EOF on a stream: close it and mark the owning Mom down.
 *
 * @param[in]	stream	-	the TPP stream connecting to the Mom
 * @param[in]	ret	-	not used here
 * @param[in]	msg	-	the reason why the mom is down; may be NULL
 *
 * @return	void
 */
void
stream_eof(int stream, int ret, char *msg)
{
	mominfo_t *pmom;

	DBPRT(("entering %s", __func__))

	tpp_close(stream);

	/* find who the stream belongs to and mark down */
	pmom = tfind2((u_long) stream, 0, &streams);
	if (pmom == NULL)
		return;

	DBPRT(("%s: %s down\n", __func__, pmom->mi_host))
	log_errf(-1, __func__, "%s down", pmom->mi_host);

	momptr_down(pmom, (msg != NULL) ? msg : "communication closed");

	/* Down node and all subnodes */
	pmom->mi_dmn_info->dmn_stream = -1;

	/* Since stream is now closed, reset the intermediate
	 * state INUSE_INIT.
	 */
	pmom->mi_dmn_info->dmn_state &= ~INUSE_INIT;

#ifdef NAS /* localmod 005 */
	tdelete2((u_long) stream, 0ul, &streams);
#else
	tdelete2((u_long) stream, 0, &streams);
#endif /* localmod 005 */
}

/**
 * @brief
 * 		Mark moms' nodes as unknown and tear down their streams.
 * @see
 * 		net_down_handler
 *
 * @param[in]	all	- pass 1 to mark every mom unknown; otherwise only
 *			  moms still in the intermediate INUSE_INIT state
 *			  are affected.
 *
 * @return	void
 */
void
mark_nodes_unknown(int all)
{
	int idx;

	DBPRT(("entering %s", __func__))

	for (idx = 0; idx < mominfo_array_size; idx++) {
		mominfo_t *pmom = mominfo_array[idx];
		dmn_info_t *dinfo;
		int strm;

		if (pmom == NULL)
			continue;

		dinfo = pmom->mi_dmn_info;
		if (!(dinfo->dmn_state & INUSE_INIT) && all != 1)
			continue;

		set_all_state(pmom, 1, INUSE_UNKNOWN, NULL, Set_All_State_Regardless);

		/* drop the mom's stream, if any */
		strm = dinfo->dmn_stream;
		if (strm >= 0) {
			tpp_close(strm);
			tdelete2((u_long) strm, 0, &streams);
		}
		dinfo->dmn_stream = -1;

		/* Since stream is being closed, reset the intermediate
		 * state INUSE_INIT.
		 */
		dinfo->dmn_state &= ~INUSE_INIT;
		dinfo->dmn_state |= INUSE_UNKNOWN | INUSE_MARKEDDOWN;
	}
}

/**
 * @brief The TPP multicast version for server -> mom.
 *
 * @param[in] pmom - The mom to add to the mcast channel
 * @param[in,out] mtfd - The TPP channel to add moms for multicasting;
 *		opened here on first use (when *mtfd == -1).
 * @param[in] unique - Ensure only unique values are added.
 *
 *
 * @return int
 * @retval 0: success
 * @retval !0: failure
 *
 */
int
mcast_add(mominfo_t *pmom, int *mtfd, bool unique)
{
	dmn_info_t *pdmninfo;
	int rc = 0;

	if (!pmom)
		return -1;

	pdmninfo = pmom->mi_dmn_info;

	DBPRT(("%s: entered\n", __func__))

	if (pdmninfo->dmn_stream < 0)
		return -1;

	/* open the tpp mcast channel here */
	if (*mtfd == -1 && (*mtfd = tpp_mcast_open()) == -1) {
		log_err(-1, __func__, "Failed to open TPP mcast channel for broadcasting messages");
		return -1;
	}

	rc = tpp_mcast_add_strm(*mtfd, pdmninfo->dmn_stream, unique);

	if (rc == -1) {
		/* log directly via log_errf (consistent with stream_eof)
		 * instead of formatting into log_buffer first */
		log_errf(-1, __func__,
			 "Failed to add service endpoint at %s:%d to mcast", pmom->mi_host, pmom->mi_port);
		/* the mom's stream is unusable; tear it down */
		tpp_close(pdmninfo->dmn_stream);
		tdelete2((u_long) pdmninfo->dmn_stream, 0, &streams);
		pdmninfo->dmn_stream = -1;
	}

	return rc;
}

/**
 * @brief
 * 	Multicast helper: walk the members of a failed mcast stream and
 *	close each underlying mom stream via stream_eof().
 *
 * @param[in]	stm	- multi-cast stream where broadcast was attempted
 * @param[in]	ret	- failure return code (index into dis_emsg)
 *
 * @return	void
 */
void
close_streams(int stm, int ret)
{
	int *strms;
	int count = 0;
	int i;
	mominfo_t *pmom;
	dmn_info_t *pdmninfo;
	struct sockaddr_in *addr;

	if (stm < 0)
		return;

	strms = tpp_mcast_members(stm, &count);
	/* defensive: do not index a NULL member array */
	if (strms == NULL)
		return;

	for (i = 0; i < count; i++) {
		if ((pmom = tfind2((u_long) strms[i], 0, &streams)) != NULL) {
			pdmninfo = pmom->mi_dmn_info;
			/* find the respective mom from the stream */
			addr = tpp_getaddr(pdmninfo->dmn_stream);
			/* log directly (consistent with stream_eof) instead of
			 * staging the message in log_buffer */
			log_errf(-1, __func__, "%s %d to %s(%s)",
				 dis_emsg[ret], errno, pmom->mi_host, netaddr(addr));
			stream_eof(pdmninfo->dmn_stream, ret, "mcast failed!");
		}
	}
}

/**
 * @brief
 * 		Mom multicast functions to broadcast a single command to all the moms.
 *
 * @param[in]	ptask	- work task structure
 *
 * @return	void
 */
void
mcast_msg(struct work_task *ptask)
{
	dmn_info_t *pdmninfo;
	int i;
	int ret;

	if (!ptask)
		return;

	DBPRT(("%s: entered\n cmd: %d", __func__, ptask->wt_aux))

	int cmd = ptask->wt_aux;
	int mtfd = -1;

	switch (cmd) {
		case IS_CLUSTER_ADDRS:
			for (i = 0; i < mominfo_array_size; i++) {

				if (!mominfo_array[i])
					continue;

				pdmninfo = mominfo_array[i]->mi_dmn_info;
				if ((pdmninfo->dmn_state & INUSE_NEED_ADDRS) && pdmninfo->dmn_stream >= 0) {
					mcast_add(mominfo_array[i], &mtfd, FALSE);
					if (pdmninfo->dmn_state & INUSE_MARKEDDOWN)
						pdmninfo->dmn_state &= ~INUSE_MARKEDDOWN;
					set_all_state(mominfo_array[i], 0, INUSE_DOWN | INUSE_NEED_ADDRS,
						      NULL, Set_All_State_Regardless);
				}
			}

			if ((ret = send_ip_addrs_to_mom(mtfd, 0)) != DIS_SUCCESS)
				close_streams(mtfd, ret);

			tpp_mcast_close(mtfd);
			break;

		case IS_REPLYHELLO:
			if (mtfd_replyhello != -1)
				if ((ret = reply_hellosvr(mtfd_replyhello, 1)) != DIS_SUCCESS)
					close_streams(mtfd_replyhello, ret);
			if (mtfd_replyhello_noinv != -1)
				if ((ret = reply_hellosvr(mtfd_replyhello_noinv, 0)) != DIS_SUCCESS)
					close_streams(mtfd_replyhello_noinv, ret);

			tpp_mcast_close(mtfd_replyhello);
			tpp_mcast_close(mtfd_replyhello_noinv);
			mtfd_replyhello = -1;
			mtfd_replyhello_noinv = -1;
			break;

		default:
			break;
	}
}

/**
 * @brief
 * 		Add placement set names to the Server's pnames attribute.
 * @see
 * 		update2_to_vnode and is_request
 *
 * @param[in]	namestr	-	The namestr paramenter is a comma separated set of strings
 * 							Each separate name is added only if it isn't already in pnames
 * @return	int
 * @retval	0	- success
 * @retval	1	- failure
 */

/**
 * @brief
 *	Add placement set names to the Server's pnames attribute.
 *	The input is a comma-separated list; each name is appended only if
 *	it is not already present in pnames, and a resource definition is
 *	created for any name not yet in resourcedef.
 *
 * @param[in]	namestr	- comma-separated list of placement set names
 *
 * @return	int
 * @retval	0	- success
 * @retval	1	- failure (allocation or interpreter restart failed)
 */

static int
setup_pnames(char *namestr)
{
	attribute *pnames_attr;
	struct array_strings *existing;
	char *scratch;
	char *accum;
	char *name_start;
	char *name_end;
	int added_names = 0;
	int defs_added = 0;
	int idx;

	/* nothing to do for an empty list */
	if ((namestr == NULL) || (*namestr == '\0'))
		return 0;

	/* work on a private copy; the accumulator can never exceed the input */
	scratch = strdup(namestr);
	if (scratch == NULL)
		return 1;
	accum = (char *) malloc(strlen(scratch) + 1);
	if (accum == NULL) {
		free(scratch);
		return 1;
	}
	*accum = '\0';

	pnames_attr = get_sattr(SVR_ATR_PNames);
	existing = pnames_attr->at_val.at_arst;

	/* walk each comma-separated name in the list */
	for (name_start = scratch; *name_start; name_start = name_end) {
		int found = 0;

		while (*name_start && isspace((int) *name_start))
			name_start++;
		name_end = name_start;
		while (*name_end && (*name_end != ','))
			name_end++;
		if (*name_end == ',')
			*name_end++ = '\0';

		/* already present in the pnames attribute? */
		if (existing) {
			for (idx = 0; idx < existing->as_usedptr; ++idx) {
				if (strcasecmp(name_start, existing->as_string[idx]) == 0) {
					found = 1;
					break;
				}
			}
		}
		if (!found) {
			/* new name: append it to the accumulator */
			if (added_names++)
				strcat(accum, ",");
			strcat(accum, name_start);
		}

		/* create a resource definition for the name if one is missing */
		if (!find_resc_def(svr_resc_def, name_start)) {
			if (add_resource_def(name_start, ATR_TYPE_ARST, NO_USER_SET) == 0)
				defs_added++;
		}
	}

	if (defs_added > 0) {
		/* resourcedef changed: the embedded Python interpreter must be restarted */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
			  LOG_INFO, "setup_pnames",
			  "Restarting Python interpreter as resourcedef file has changed.");
		pbs_python_ext_shutdown_interpreter(&svr_interp_data);
		if (pbs_python_ext_start_interpreter(&svr_interp_data) != 0) {
			log_err(PBSE_INTERNAL, __func__, "Failed to restart Python interpreter");
			free(scratch);
			free(accum);
			return 1;
		}

		send_rescdef(1);
	}

	if (added_names) {
		int dflt = 0;

		/* preserve the "default" flag when the attribute was unset or default */
		if (is_attr_set(pnames_attr) == 0 || (pnames_attr->at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == (ATR_VFLAG_SET | ATR_VFLAG_DEFLT))
			dflt = ATR_VFLAG_DEFLT;

		set_sattr_generic(SVR_ATR_PNames, accum, NULL, INCR);
		pnames_attr->at_flags |= dflt;
	}
	free(scratch);
	free(accum);

	return 0;
}

/**
 * @brief
 * 		add mom to the vnode list if it is not listed, and if there is no room,
 * 		re-structure and create room for the mom. Add Mom's name to this vnode's Mom attribute
 * 		and set reverse linkage Mom -> node.
 * @see
 * 		update2_to_vnode and create_pbs_node2.
 *
 *
 */
/**
 * @brief
 *	Establish mutual linkage between a vnode and a Mom: add the Mom to
 *	the vnode's parent-Mom array (growing the array when full) and append
 *	the Mom's name to the vnode's Mom attribute; then add the vnode to
 *	the Mom's children array (also grown on demand).
 *
 * @param[in]	pnode	- the vnode to link
 * @param[in]	pmom	- the parent Mom to link
 *
 * @return	int
 * @retval	0 / PBSE_NONE	- success, or nothing to do (NULL argument)
 * @retval	PBSE_SYSTEM	- memory allocation failure
 */
int
cross_link_mom_vnode(struct pbsnode *pnode, mominfo_t *pmom)
{
	int idx;
	int already_linked;
	mom_svrinfo_t *msr;

	if (pnode == NULL || pmom == NULL)
		return (PBSE_NONE);

	/* forward linkage: is this Mom already in the node's array? */
	already_linked = 0;
	for (idx = 0; idx < pnode->nd_nummoms; ++idx) {
		if (pnode->nd_moms[idx] == pmom) {
			already_linked = 1;
			break;
		}
	}

	if (!already_linked) {
		if (pnode->nd_nummoms == pnode->nd_nummslots) {
			/* array full: double capacity (minimum one slot) */
			int newslots = (pnode->nd_nummslots == 0) ? 1 : pnode->nd_nummslots * 2;
			mominfo_t **grown = (mominfo_t **) realloc(pnode->nd_moms,
								   newslots * sizeof(mominfo_t *));
			if (grown == NULL)
				return (PBSE_SYSTEM);
			pnode->nd_moms = grown;
			pnode->nd_nummslots = newslots;
		}
		pnode->nd_moms[pnode->nd_nummoms++] = pmom;

		/* record the Mom's name in the vnode's Mom attribute as well */
		set_nattr_generic(pnode, ND_ATR_Mom, pmom->mi_host, NULL, INCR);
	}

	/* reverse linkage: is this node already among the Mom's children? */
	msr = pmom->mi_data;
	already_linked = 0;
	for (idx = 0; idx < msr->msr_numvnds; ++idx) {
		if (msr->msr_children[idx] == pnode) {
			already_linked = 1;
			break;
		}
	}

	if (!already_linked) {
		if (msr->msr_numvnds == msr->msr_numvslots) {
			/* children array full: double capacity (minimum one slot) */
			int newslots = (msr->msr_numvslots == 0) ? 1 : msr->msr_numvslots * 2;
			struct pbsnode **grown = (struct pbsnode **) realloc(msr->msr_children,
									     newslots * sizeof(struct pbsnode *));
			if (grown == NULL)
				return (PBSE_SYSTEM);
			msr->msr_children = grown;
			msr->msr_numvslots = newslots;
		}
		msr->msr_children[msr->msr_numvnds++] = pnode;
	}
	return 0;
}

#define UPDATE_FROM_HOOK "update_from_hook"
#define UPDATE2 "update2"
#define UPDATE_FROM_HOOK_U "UPDATE_FROM_HOOK"
#define UPDATE2_U "UPDATE2"
#define UPDATE_FROM_MOM_HOOK "update from mom hook"
#define UPDATE "update"
/**
 * @brief
 * 		create/update vnodes from the information sent by Mom in the UPDATE2
 * 		message.
 *
 * @param[in]  pvnal 	- info on one vnode from Mom
 * @param[in]  new   	- true if ok to create new vnode
 * @param[in]  pmom  	- the Mom which sent this update
 * @param[out] madenew 	- set non-zero if any new vnodes were created
 * @param[out] from_hook - set non-zero if request coming from hook
 *			  Normally set to 1 for regular vnoded request;
 *			  2 for qmgr-like (non-vnoded) request.
 *
 * @return int
 * @retval	zero	- ok
 * @retval	PBSE_ number	- error
 *
 * @par MT-safe: No
 */
static int
update2_to_vnode(vnal_t *pvnal, int new, mominfo_t *pmom, int *madenew, int from_hook)
{
	int bad;
	int i;
	int j;
	int localmadenew = 0;
	struct pbsnode *pnode;
	pbs_list_head atrlist;
	svrattrl *pal;
	char buf[200];
	attribute *pattr;
	attribute *pRA;
	vna_t *psrp;
	char *dot;
	char *resc;
	resource *prs;
	resource_def *prdef;
	resource_def *prdefhost;
	resource_def *prdefvnode;
	mom_svrinfo_t *pcursvrm;
	vnpool_mom_t *ppool;
	static char *cannot_def_resc = "error: resource %s for vnode %s cannot be defined";
	char *p;
	char hook_name[HOOK_BUF_SIZE + 1];
	int vn_state_updates = 0;
	int vn_resc_added = 0;

	CLEAR_HEAD(atrlist);

	/*
	 * Can't do static initialization of these because svr_resc_def
	 * may change as new resources are added dynamically.
	 */
	prdefhost = &svr_resc_def[RESC_HOST];
	prdefvnode = &svr_resc_def[RESC_VNODE];

	pnode = find_nodebyname(pvnal->vnal_id);

	if (pnode == NULL) {
		/*
		 * see if this vnode def entry contains the topology info
		 * if so, it is the natural vnode for this Mom or for the
		 * first compute node on a cray which isn't of concern here
		 */

		int have_topology = 0;
		int is_compute_node = 0;
		for (i = 0; i < pvnal->vnal_used; i++) {
			psrp = VNAL_NODENUM(pvnal, i);
			if (strcasecmp(psrp->vna_name, ATTR_NODE_TopologyInfo) == 0)
				have_topology = 1;
			if ((strcasecmp(psrp->vna_name, "resources_available.vntype") == 0) && (strcasecmp(psrp->vna_val, CRAY_COMPUTE) == 0))
				is_compute_node = 1;
		}
		if ((have_topology == 1) && (is_compute_node == 0)) {
			/* this is for the natural vnode, use that for pnode */
			mom_svrinfo_t *prmomsvr = pmom->mi_data;
			pnode = prmomsvr->msr_children[0];
		}
	}

	if ((pnode == NULL) && new) {
		/* create vnode */
		pal = attrlist_create(ATTR_NODE_Mom, 0, strlen(pmom->mi_host) + 1);
		strcpy(pal->al_value, pmom->mi_host);
		append_link(&atrlist, &pal->al_link, pal);
		if (pmom->mi_port != PBS_MOM_SERVICE_PORT) {
			sprintf(buf, "%u", pmom->mi_port);
			pal = attrlist_create(ATTR_NODE_Port, 0, strlen(buf) + 1);
			strcpy(pal->al_value, buf);
			append_link(&atrlist, &pal->al_link, pal);
		}
		pal = GET_NEXT(atrlist);
		bad = create_pbs_node(pvnal->vnal_id, pal, ATR_DFLAG_MGWR,
				      &bad, &pnode, FALSE);
		free_attrlist(&atrlist);
		if (bad != 0) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "could not autocreate vnode \"%s\", error = %d",
				 pvnal->vnal_id, bad);
			log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
				  LOG_NOTICE, pmom->mi_host, log_buffer);
			return bad;
		}
		*madenew = 1;
		localmadenew = 1;
		snprintf(log_buffer, sizeof(log_buffer),
			 "autocreated vnode %s", pvnal->vnal_id);
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_INFO, pmom->mi_host, log_buffer);
	}

	if (pnode == NULL) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "%s reported in %s message from Mom on %s",
			 pvnal->vnal_id,
			 from_hook ? UPDATE_FROM_HOOK_U : UPDATE2_U,
			 pmom->mi_host);
		log_err(PBSE_UNKNODE, from_hook ? UPDATE_FROM_HOOK : UPDATE2, log_buffer);
		return PBSE_UNKNODE;
	}

	/* If the request is coming from a hook, check if the MoM requesting the update
	 * actually owns the vnode. If it does not, do not crosslink and return an error.
	 */
	if (from_hook == 1) {
		int pnode_has_mom = 0;
		/* see if the node already has this Mom listed, if not add her */
		/* BUGFIX: the break must only fire on a match; previously it */
		/* executed unconditionally, so only nd_moms[0] was checked   */
		for (i = 0; i < pnode->nd_nummoms; ++i) {
			if (pnode->nd_moms[i] == pmom) {
				pnode_has_mom = 1;
				break;
			}
		}

		if (!pnode_has_mom) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "Not allowed to update vnode '%s', as it is owned by a different mom", pvnal->vnal_id);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE,
				  LOG_INFO, pmom->mi_host, log_buffer);
			return (PBSE_BADHOST);
		}
	}

	/* if mom has a vnode_pool value */
	pcursvrm = (mom_svrinfo_t *) (pmom->mi_data);
	if ((localmadenew == 1) && (pcursvrm->msr_vnode_pool > 0)) {
		ppool = find_vnode_pool(pmom);
		if (ppool != NULL) {
			/* cross link every Mom in the pool with the new vnode */
			for (j = 0; j < ppool->vnpm_nummoms; ++j) {
				if (ppool->vnpm_moms[j] != NULL) {
					int ret;

					if ((ret = cross_link_mom_vnode(pnode, ppool->vnpm_moms[j])) != 0) {
						/* deal with error */
						return (ret);
					}
				}
			}
		}
	} else if (from_hook != 2) {
		/* not done for UPDATE_FROM_HOOK2 (i.e. from_hook == 2)
		 * as it becomes like a qmgr request. So no need to change
		 * current vnode's parent mom be the incoming node,
		 * which is what cross_link_mom_node() does.
		 */
		if ((i = cross_link_mom_vnode(pnode, pmom)) != 0)
			return (i);
	}

	/*
	 * Attributes and Resources within Resources_Available set by a Mom
	 * via this message (and not coming from the UPDATE_FROM_HOOK),
	 * have the ATR_VFLAG_DEFLT (default) flag set.
	 * If the Mom no longer reports the attribute/resource it should be
	 * unset.  The only way to do this is unset all "default" attribute/
	 * resources first then reset what Mom is now reporting.
	 *
	 * Exceptions to the above:
	 * resources_available.host - must be set,  so it isn't unset
	 *	even if default
	 * sharing - can only be set via this message, so set to the default
	 *	value to insure it is reset based on what Mom now sends or to
	 *	the default setting if Mom no longer sends anything
	 */

	if (!from_hook) {
		for (i = 0; i < ND_ATR_LAST; ++i) {
			/* if this vnode has been updated earlier in this update2 */
			/* then don't free anything but topology */
			/* NOTE(review): the unconditional 'continue' below means only */
			/* ND_ATR_TopologyInfo is ever processed in this loop -- looks */
			/* intentional per the comment above, but confirm upstream */
			if ((i != ND_ATR_TopologyInfo))
				continue; /* seeing vnl update for node just updated, don't clear */

			if (i != ND_ATR_ResourceAvail) {
				if (((get_nattr(pnode, i))->at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == (ATR_VFLAG_SET | ATR_VFLAG_DEFLT))
					free_nattr(pnode, i);
			} else if (is_nattr_set(pnode, i) != 0) {
				prs = (resource *) GET_NEXT(get_nattr_list(pnode, i));
				while (prs) {
					if ((prs->rs_value.at_flags & ATR_VFLAG_DEFLT) &&
					    (prs->rs_defin != prdefhost) &&
					    (prs->rs_defin != prdefvnode)) {
						prs->rs_defin->rs_free(&prs->rs_value);
					}
					prs = (resource *) GET_NEXT(prs->rs_link);
				}
			}
		}

		set_nattr_l_slim(pnode, ND_ATR_Sharing, VNS_DFLT_SHARED, SET);
		(get_nattr(pnode, ND_ATR_Sharing))->at_flags |= ATR_VFLAG_DEFLT;
	}

	/* set attributes/resources if they are default */

	pRA = get_nattr(pnode, ND_ATR_ResourceAvail);

	for (i = 0; i < pvnal->vnal_used; i++) {
		psrp = VNAL_NODENUM(pvnal, i);
		strncpy(buf, psrp->vna_name, sizeof(buf) - 1);
		buf[sizeof(buf) - 1] = '\0';

		/* make sure no trailing white space in the value */
		for (dot = psrp->vna_val + strlen(psrp->vna_val) - 1;
		     dot >= psrp->vna_val;
		     dot--) {
			if (isspace((int) *dot))
				*dot = '\0';
			else
				break;
		}

		if ((dot = strchr(buf, (int) '.')) != NULL) {
			/* found a resource setting, had better be Resources_Available */
			resc = dot + 1;
			*dot = '\0';
			/* NOTE(review): with !from_hook the second clause is false, so */
			/* attribute names other than resources_available slip past this */
			/* check -- verify whether (!from_hook || ...) was intended */
			if ((strcasecmp(buf, ATTR_rescavail) != 0) &&
			    (from_hook && (strcasecmp(buf, ATTR_rescassn) != 0))) {
				snprintf(log_buffer, sizeof(log_buffer),
					 "error: not legal to set resource in attribute %s, in %s for vnode %s",
					 psrp->vna_name, from_hook ? UPDATE_FROM_MOM_HOOK : UPDATE,
					 pnode->nd_name);
				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
					  LOG_ERR, pmom->mi_host, log_buffer);
				continue;
			}

			if (from_hook && (strcasecmp(buf, ATTR_rescassn) == 0))
				pRA = get_nattr(pnode, ND_ATR_ResourceAssn);

			/* Is the resource already defined? */
			prdef = find_resc_def(svr_resc_def, resc);
			if (prdef == NULL) {
				int err;

				/* currently resource is undefined, add it */

				err = add_resource_def(resc, psrp->vna_type, psrp->vna_flag);
				if (err < 0) {
					snprintf(log_buffer, sizeof(log_buffer), cannot_def_resc,
						 resc, pvnal->vnal_id);
					log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE,
						  LOG_ERR, pmom->mi_host, log_buffer);
					continue; /* skip this attribute, go to next */
				} else {

					snprintf(log_buffer, sizeof(log_buffer),
						 "adding resource %s, type %d, in update for vnode %s", resc, psrp->vna_type, pnode->nd_name);
					log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host, log_buffer);
					vn_resc_added++;
				}
				/* now find the new resource definition */
				prdef = find_resc_def(svr_resc_def, resc);
				if (prdef == NULL)
					continue; /* skip this attribute, go to next */
			} else if ((psrp->vna_type != 0) &&
				   (psrp->vna_type != prdef->rs_type)) {
				snprintf(log_buffer, sizeof(log_buffer), cannot_def_resc,
					 resc, pvnal->vnal_id);
				log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE,
					  LOG_ERR, pmom->mi_host, log_buffer);
				continue; /* skip this attribute/resource, go to next */
			}

			/* add resource entry to Resources_Available for the vnode */

			prs = add_resource_entry(pRA, prdef);
			if (prs) {
				bad = 0;
				if (from_hook ||
				    (prs->rs_value.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) != ATR_VFLAG_SET) {
					/* if not from_hook, will only set */
					/* resource values that have the */
					/* ATR_VFLAG_DEFLT flag only, which */
					/* means it wasn't set externally */
					/* (i.e. qmgr). */
					/* if from_hook, we override values */
					/* set externally. */

					/* If indirect resource, decode it as a string */
					if (psrp->vna_val[0] == '@') {
						extern int resc_access_perm;
						int perms = resc_access_perm;
						resc_access_perm |= ATR_PERM_ALLOW_INDIRECT;
						bad = decode_str(&prs->rs_value, psrp->vna_name, resc, psrp->vna_val);
						resc_access_perm = perms;
						if (bad == 0) {
							prs->rs_value.at_flags |= ATR_VFLAG_DEFLT | ATR_VFLAG_INDIRECT;
							bad = fix_indirectness(prs, pnode, 1);
						}
					} else if ((bad = prdef->rs_decode(&prs->rs_value, buf, resc, psrp->vna_val)) == 0) {
						/* This (ATR_FLAG_DEFLT) means set by the */
						/* server and not manager */
						/* mom hook we're treating */
						/* set by manager */
						if (from_hook) {
							/* These flags ensure */
							/* changes survive */
							/* server restart */
							prs->rs_value.at_flags &= ~ATR_VFLAG_DEFLT;
							post_attr_set(&prs->rs_value);
							if (psrp->vna_val[0] != '\0') {
								prs->rs_value.at_flags |= (ATR_VFLAG_SET | ATR_VFLAG_MODIFY);
							} else {
								/* empty value: remove the resource entry */
								prs->rs_defin->rs_free(&prs->rs_value);
								delete_link(&prs->rs_link);
								free(prs);
							}
						} else
							prs->rs_value.at_flags |= ATR_VFLAG_DEFLT;
						if (strcasecmp("ncpus", resc) == 0) {
							/* if ncpus, adjust virtual/subnodes */
							j = prs->rs_value.at_val.at_long;
							mod_node_ncpus(pnode, j, ATR_ACTION_ALTER);
						}
					}
					if (bad != 0) {
						snprintf(log_buffer, sizeof(log_buffer),
							 "Error %d decoding resource %s in update for vnode %s", bad, resc, pnode->nd_name);
						log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
							  LOG_WARNING, pmom->mi_host, log_buffer);
					} else if (from_hook) {
						snprintf(log_buffer,
							 sizeof(log_buffer),
							 "Updated vnode %s's "
							 "resource %s=%s per "
							 "mom hook request",
							 pnode->nd_name,
							 psrp->vna_name,
							 psrp->vna_val);
						log_event(PBSEVENT_DEBUG2,
							  PBS_EVENTCLASS_NODE,
							  LOG_INFO, pmom->mi_host,
							  log_buffer);
					}
				}
			}

		} else if (strcasecmp(psrp->vna_name, VNATTR_PNAMES) == 0) {

			/* special case pnames because it is set at the Server */

			snprintf(log_buffer, sizeof(log_buffer), "pnames %s",
				 psrp->vna_val);
			log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
				  LOG_INFO, pmom->mi_host, log_buffer);

			setup_pnames(psrp->vna_val);

		} else if (strcasecmp(psrp->vna_name, VNATTR_HOOK_REQUESTOR) == 0) {

			if (from_hook) {
				/* decides whether succeeding requests from the same 'pvnal' */
				/* should be allowed; if the name is the null string */
				/* the hook ran as the administrator (root) */

				if ((*psrp->vna_val != '\0') &&
				    ((svr_get_privilege(psrp->vna_val, pmom->mi_host) &
				      (ATR_DFLAG_MGWR | ATR_DFLAG_OPWR)) == 0)) {
					snprintf(log_buffer, sizeof(log_buffer),
						 hook_privilege, psrp->vna_val, pmom->mi_host);
					log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host, log_buffer);
					return (PBSE_PERM);
				}
			}
		} else if (strcasecmp(psrp->vna_name, VNATTR_HOOK_OFFLINE_VNODES) == 0) {

			if (from_hook) {
				/* value is "<0|1>[,<hook name>]" */
				p = strchr(psrp->vna_val, ',');
				hook_name[0] = '\0';
				if (p != NULL) {
					*p = '\0';
					p++;
					strncpy(hook_name, p, HOOK_BUF_SIZE);
				}
				if (strcmp(psrp->vna_val, "1") == 0) {
					char hook_buf[sizeof(hook_name) + 40];
					snprintf(hook_buf, sizeof(hook_buf),
						 "offlined by hook '%s' due to hook error",
						 hook_name);
					mark_node_offline_by_mom(pnode->nd_name, hook_buf);
				} else if (strcmp(psrp->vna_val, "0") == 0) {
					clear_node_offline_by_mom(pnode->nd_name, NULL);
				}
				if (p != NULL)
					*p = ','; /* restore psrp->vna_val */
			}

		} else if (strcasecmp(psrp->vna_name, VNATTR_HOOK_SCHEDULER_RESTART_CYCLE) == 0) {

			if (from_hook) {
				/* value is "<0|1>[,<hook name>]" */
				p = strchr(psrp->vna_val, ',');
				hook_name[0] = '\0';
				if (p != NULL) {
					*p = '\0';
					p++;
					strncpy(hook_name, p, HOOK_BUF_SIZE);
				}
				if (strcmp(psrp->vna_val, "1") == 0) {

					set_scheduler_flag(SCH_SCHEDULE_RESTART_CYCLE, dflt_scheduler);
					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "hook '%s' requested for "
						 "scheduler to restart cycle",
						 hook_name);
					log_event(PBSEVENT_DEBUG2,
						  PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host,
						  log_buffer);
				}
				if (p != NULL)
					*p = ','; /* restore psrp->vna_val */
			}

		} else {

			/* a non-resource attribute to set */

			j = find_attr(node_attr_idx, node_attr_def, psrp->vna_name);
			if (j == -1) {
				snprintf(log_buffer, sizeof(log_buffer),
					 "unknown attribute %s in %s for vnode %s",
					 psrp->vna_name,
					 from_hook ? UPDATE_FROM_MOM_HOOK : UPDATE,
					 pnode->nd_name);
				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
					  LOG_WARNING, pmom->mi_host, log_buffer);
				continue;
			}
			pattr = get_nattr(pnode, j);
			if (from_hook || ((pattr->at_flags &
					   (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) != ATR_VFLAG_SET)) {
				/* if not from_hook, will only set attribute */
				/* values that have the ATR_VFLAG_DEFLT flag */
				/* only, which means it wasn't set externally */
				/* (i.e. qmgr). */
				/* if from_hook, we override values */
				/* set externally. */

				if (from_hook) {
					if (node_attr_def[j].at_action &&
					    (bad = node_attr_def[j].at_action(pattr, pnode, ATR_ACTION_ALTER))) {
						snprintf(log_buffer, sizeof(log_buffer),
						    "Error %d setting attribute %s "
						    "in %s for vnode %s",
						    bad, psrp->vna_name,
						    UPDATE_FROM_MOM_HOOK,
						    pnode->nd_name);
						log_event(PBSEVENT_SYSTEM,
						    PBS_EVENTCLASS_NODE, LOG_WARNING,
						    pmom->mi_host, log_buffer);
						continue;
					}
				}

				bad = set_attr_generic(pattr, &node_attr_def[j], psrp->vna_val, NULL, INTERNAL);
				if (bad != 0) {
					snprintf(log_buffer, sizeof(log_buffer),
						 "Error %d decoding attribute %s "
						 "in %s for vnode %s",
						 bad, psrp->vna_name,
						 from_hook ? UPDATE_FROM_MOM_HOOK : UPDATE,
						 pnode->nd_name);
					log_event(PBSEVENT_SYSTEM,
						  PBS_EVENTCLASS_NODE, LOG_WARNING,
						  pmom->mi_host, log_buffer);
					continue;
				}
				if (from_hook) {
					/* these flag values ensure changes */
					/* are displayed and survive server */
					/* restart */

					pattr->at_flags &= ~ATR_VFLAG_DEFLT;
					pattr->at_flags |= ATR_VFLAG_MODCACHE;
					if (psrp->vna_val[0] != '\0')
						pattr->at_flags |= (ATR_VFLAG_SET | ATR_VFLAG_MODIFY);
					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "Updated vnode %s's "
						 "attribute %s=%s per "
						 "mom hook request",
						 pnode->nd_name,
						 psrp->vna_name,
						 psrp->vna_val);
					log_event(PBSEVENT_DEBUG2,
						  PBS_EVENTCLASS_NODE, LOG_INFO,
						  pmom->mi_host, log_buffer);

				} else
					pattr->at_flags |= ATR_VFLAG_DEFLT;

				if (strcasecmp(psrp->vna_name, ATTR_NODE_VnodePool) == 0) {
					if ((bad = node_attr_def[j].at_action(pattr,
									      pnode, ATR_ACTION_ALTER)) == 0) {
						pattr->at_flags |= ATR_VFLAG_DEFLT;
					} else {
						snprintf(log_buffer, sizeof(log_buffer),
							 "Error %d setting attribute %s "
							 "in update for vnode %s",
							 bad,
							 psrp->vna_name, pnode->nd_name);
						log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
							  LOG_WARNING, pmom->mi_host, log_buffer);
					}
				}
			}
			if ((strcasecmp(psrp->vna_name,
					ATTR_NODE_TopologyInfo) == 0) ||
			    (strcasecmp(psrp->vna_name,
					ATTR_NODE_state) == 0)) {
				bad = node_attr_def[j].at_action(pattr,
								 pnode, ATR_ACTION_ALTER);
				if (bad != 0) {
					snprintf(log_buffer, sizeof(log_buffer),
						 "Error %d setting attribute %s "
						 "in %s for vnode %s",
						 bad,
						 psrp->vna_name,
						 from_hook ? UPDATE_FROM_MOM_HOOK : UPDATE,
						 pnode->nd_name);
					log_event(PBSEVENT_SYSTEM,
						  PBS_EVENTCLASS_NODE,
						  LOG_WARNING,
						  pmom->mi_host,
						  log_buffer);
				}
				if (strcasecmp(psrp->vna_name,
					       ATTR_NODE_state) == 0) {
					vn_state_updates++;
				}
			}
		}
	}

	if (vn_resc_added > 0) {
		/* resourcedef changed: restart the embedded Python interpreter */
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
			  LOG_INFO, "update2_to_vnode",
			  "Restarting Python interpreter as resourcedef file has changed.");
		pbs_python_ext_shutdown_interpreter(&svr_interp_data);
		if (pbs_python_ext_start_interpreter(&svr_interp_data) != 0) {
			log_err(PBSE_INTERNAL, __func__, "Failed to restart Python interpreter");
			return PBSE_INTERNAL;
		}

		send_rescdef(1);
	}

	if (pnode) {
		int states_to_clear = 0;

		check_and_set_multivnode(pnode);

		if (from_hook) {
			/* INUSE_DOWN not part here since it could */
			/* have been set from a hook . */
			states_to_clear = (INUSE_STALE | INUSE_UNKNOWN);
			if (vn_state_updates == 0) {
				states_to_clear |= INUSE_DOWN;
			}
		} else {
			states_to_clear = (INUSE_STALE | INUSE_DOWN | INUSE_UNKNOWN);
		}

		/* clear stale, down, unknown bits in state */
		set_vnode_state(pnode,
				~states_to_clear,
				Nd_State_And);
		pnode->nd_modified = 1;
		return 0;
	} else {
		snprintf(log_buffer, sizeof(log_buffer),
			 "vnode %s declared by %s but it does not exist",
			 pvnal->vnal_id, pmom->mi_host);
		log_err(PBSE_UNKNODE, from_hook ? UPDATE_FROM_HOOK : UPDATE2, log_buffer);
		return PBSE_UNKNODE;
	}
}

/**
 * @brief
 * 		Check if vnode shares the resource "host" with any other vnode, and
 * 		set vnode attribute "in_multivnode_host" accordingly.
 * @see
 * 		update2_to_vnode
 *
 * @param[in] pnode - The node being considered
 *
 * @return void
 *
 * @par MT-Safe: no, depends on globals svr_totnodes and pbsndlist
 *
 * @par Esoteric Side-case:
 * 		In a multivnode host, all vnodes being processed
 * 		that have not been checked by this function are assumed to be in state
 * 		stale; this is needed to handle the case of two vnodes that would swap
 * 		resources_available.host on an update.
 */
static void
check_and_set_multivnode(struct pbsnode *pnode)
{
	int idx;
	resource_def *hostdef;
	resource *hres;
	char *myhost = NULL;

	if (pnode == NULL)
		return;

	hostdef = &svr_resc_def[RESC_HOST];
	if (hostdef == NULL)
		return;

	/* host string for this vnode: resources_available.host if set, */
	/* otherwise fall back to the node's hostname */
	hres = find_resc_entry(get_nattr(pnode, ND_ATR_ResourceAvail), hostdef);
	if (hres != NULL)
		myhost = hres->rs_value.at_val.at_str;
	else if (pnode->nd_hostname != NULL)
		myhost = pnode->nd_hostname;

	/* without a host string no comparison can ever match */
	if (myhost == NULL)
		return;

	for (idx = 0; idx < svr_totnodes; idx++) {
		struct pbsnode *other = pbsndlist[idx];
		resource *ores;
		char *otherhost = NULL;

		if (other == pnode)
			continue;

		/* stale vnodes are skipped (see esoteric side-case above) */
		if (other->nd_state & INUSE_STALE)
			continue;

		ores = find_resc_entry(get_nattr(other, ND_ATR_ResourceAvail), hostdef);
		if (ores != NULL)
			otherhost = ores->rs_value.at_val.at_str;
		else if (other->nd_hostname != NULL)
			otherhost = other->nd_hostname;

		if ((otherhost == NULL) || (strcmp(myhost, otherhost) != 0))
			continue;

		/* hosts match: flag both vnodes as living on a multivnode host; */
		/* DEFLT flag is needed so the setting is reset on the next update */
		set_nattr_l_slim(other, ND_ATR_in_multivnode_host, 1, SET);
		(get_nattr(other, ND_ATR_in_multivnode_host))->at_flags |= ATR_VFLAG_DEFLT;

		set_nattr_l_slim(pnode, ND_ATR_in_multivnode_host, 1, SET);
		(get_nattr(pnode, ND_ATR_in_multivnode_host))->at_flags |= ATR_VFLAG_DEFLT;
		break;
	}
}

/**
 * @brief
 * 		read the list of running jobs sent by Mom in a
 *		HELLO3/4 message and validate them against their state known to the
 *		Server.  Message contains the following:
 * @par
 *		count of number of jobs which follows
 *		for each job
 *		   string - job id
 *		   int    - job substate
 *		   long   - run version (count)
 *		   int    - node id, 0 (for Mother Superior) to N-1 **
 *		   string - exec_vnode string **
 *
 *  		** - these values are not currently used for anything.
 * @see
 * 		is_request
 *
 * @param[in]	stream	- list of running jobs sent by Mom in a HELLO3/4 message
 *
 * @return	void
 */
void
mom_running_jobs(int stream)
{
	char *execvnod = NULL;
	char *jobid = NULL;
	unsigned njobs = 0;
	job *pjob = NULL;
	int rc = 0;
	int substate = 0;
	long runver = 0, runver_server = 0;
	int discarded = 0;	/* set once we've told Mom to drop the job */
	mominfo_t *pmom = NULL;
	char mom_name[PBS_MAXHOSTNAME + 2] = "UNKNOWN";
	char exec_host_name[PBS_MAXHOSTNAME + 2] = "UNKNOWN2";
	char *slash_pos = NULL;
	int exec_host_hostlen = 0;

	njobs = disrui(stream, &rc); /* number of jobs in update */
	if (rc)
		return;

	/* per-job record: jobid, substate, run version, node id, exec_vnode */
	/* the reads below must stay in exactly this order to match the wire format */
	while (njobs--) {
		runver_server = 0;
		discarded = 0;
		strcpy(mom_name, "_UNKNOWN_");
		strcpy(exec_host_name, "_UNKNOWN2_");
		execvnod = NULL;
		jobid = NULL;

		jobid = disrst(stream, &rc);
		if (rc)
			goto err;
		substate = disrsi(stream, &rc);
		if (rc)
			goto err;
		runver = disrsl(stream, &rc);
		if (rc)
			goto err;
		(void) disrsi(stream, &rc); /* sister is not currently used */
		if (rc)
			goto err;
		execvnod = disrst(stream, &rc);
		if (rc)
			goto err;

		DBPRT(("mom_running_jobs: %s substate: %d runver: %ld\n", jobid, substate, runver))
		if ((pjob = find_job(jobid)) == NULL) {
			/* job not found,  tell Mom to discard it */
			send_discard_job(stream, jobid, -1, "not known to Server");
			discarded = 1;
		}

		/* run version the Server has on record, if any */
		if (pjob && !discarded && (is_jattr_set(pjob, JOB_ATR_run_version)))
			runver_server = get_jattr_long(pjob, JOB_ATR_run_version);

		if (pjob && !discarded && (runver_server != runver)) {
			if (runver_server > 0) {
				/* different Version, discard it */
				send_discard_job(stream, jobid, runver, "has been run again");
				discarded = 1;
			} else {
				/* server had no clue about runver -- accept what MOM tells us if exec_host matches stream source */

				/* natural vnode name of the Mom on the other end of this stream */
				if ((pmom = tfind2((u_long) stream, 0, &streams)) != NULL && ((mom_svrinfo_t *) (pmom->mi_data))->msr_numvnds > 0)
					strncpy(mom_name, ((mom_svrinfo_t *) (pmom->mi_data))->msr_children[0]->nd_name, PBS_MAXHOSTNAME);
				/* first host entry of the job's exec_host (text before the first '/') */
				if ((is_jattr_set(pjob, JOB_ATR_exec_host)) &&
				    (slash_pos = strchr(get_jattr_str(pjob, JOB_ATR_exec_host), '/')) != NULL) {
					exec_host_hostlen = slash_pos - get_jattr_str(pjob, JOB_ATR_exec_host);
					strncpy(exec_host_name, get_jattr_str(pjob, JOB_ATR_exec_host), exec_host_hostlen);
					exec_host_name[exec_host_hostlen] = '\0';
				}

				if (!strcmp(exec_host_name, mom_name)) {
					/* natural vnode of MOM at end of stream matches exec_host first entry */

					snprintf(log_buffer, sizeof(log_buffer), "run_version %ld for job recovered from MOM with vnode %s; exec_host %s", runver, mom_name, exec_host_name);
					log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ALERT, pjob->ji_qs.ji_jobid, log_buffer);

					/* adopt the Mom's run version */
					set_jattr_l_slim(pjob, JOB_ATR_run_version, runver, SET);

					if (!(is_jattr_set(pjob, JOB_ATR_runcount)) || (get_jattr_long(pjob, JOB_ATR_runcount) <= 0)) {
						set_jattr_l_slim(pjob, JOB_ATR_runcount, runver, SET);
						/* update for resources used will save this to DB on later message from MOM, if it is indeed valid */
					}
				} else {
					/* wrong MOM, exec_host either empty or non-matching, discard job on MOM (and hope the correct MOM will come along) */

					snprintf(log_buffer, sizeof(log_buffer), "run_version recovery: exec_host %s != MOM name %s, discarding job on that MOM", exec_host_name, mom_name);
					log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ALERT, pjob->ji_qs.ji_jobid, log_buffer);

					send_discard_job(stream, jobid, -1, "MOM fails to match exec_host");
					discarded = 1;
				}
			}
		}

		if (pjob && !discarded && !check_job_substate(pjob, substate)) {

			/* Job substates disagree */

			if ((check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP)) ||
			    (check_job_substate(pjob, JOB_SUBSTATE_SUSPEND))) {

				if (substate == JOB_SUBSTATE_RUNNING) {

					/* tell Mom to suspend job */
					(void) issue_signal(pjob, "SIG_SUSPEND", release_req, 0);
				}
			} else if (check_job_substate(pjob, JOB_SUBSTATE_RUNNING)) {
				if (substate == JOB_SUBSTATE_SUSPEND) {

					/* tell Mom to resume job */
					(void) issue_signal(pjob, "SIG_RESUME", release_req, 0);
				}

			} else if ((!check_job_state(pjob, JOB_STATE_LTR_EXITING)) &&
				   (!check_job_state(pjob, JOB_STATE_LTR_RUNNING))) {

				/* for any other disagreement of state except */
				/* in Exiting or RUNNING, discard job         */
				send_discard_job(stream, jobid, runver, "state mismatch");
				pjob->ji_discarding = 1;
			}

			/*
			 * calls to issue_signal would reset transport from TPP to TCP
			 * revert it back to TPP before continuing
			 */
			DIS_tpp_funcs();
		}

		/* all other cases - job left as is */

		free(jobid);
		jobid = NULL;
		free(execvnod);
		execvnod = NULL;
	}
	return;

err:
	snprintf(log_buffer, sizeof(log_buffer), "%s for %s", dis_emsg[rc],
		 "HELLO3/4");
	log_err(errno, "mom_running_jobs", log_buffer);
	free(jobid);
	free(execvnod);
}

/**
 * @brief
 * 		Input is coming from another server (MOM) over a TPP stream.
 *
 * @par
 *		Read the stream to get a Inter-Server request.
 *		Some error cases call stream_eof instead of tpp_close because
 *		a customer encountered a stream mixup (spid 183257) where a
 *		stream that should not have been found by tfind2 was found.
 *
 * @param[in] stream  - TPP stream on which the request is arriving
 * @param[in] version - Version of protocol, not to be changed lightly as it makes everything incompatible.
 *
 * @return none
 */
void
is_request(int stream, int version)
{
	int check_other_moms_time = 0;
	int command = 0;
	int command_orig = 0;
	int cr_node;
	int ret = DIS_SUCCESS;
	int i, j;
	u_Long l;
	int ivnd;
	char *jid = NULL;
	int made_new_vnodes;
	unsigned long hook_seq;
	char *hook_euser;
	job *pjob;
	unsigned long ipaddr;
	unsigned long port;
	struct sockaddr_in *addr;
	struct pbsnode *np = NULL;
	attribute *pala;
	resource_def *prd;
	resource *prc;
	mominfo_t *pmom;
	mom_svrinfo_t *psvrmom;
	dmn_info_t *pdmninfo;
	int s;
	char *val;
	unsigned long oldstate;
	vnl_t *vnlp; /* vnode list */
	static char node_up[] = "node up";
	pbs_list_head reported_hooks;
	hook *phook;
	char *hname = NULL;
	unsigned long hook_rescdef_checksum;
	unsigned long chksum_rescdef;
	static int reply_send_tm = 0;
	char *badconstr = "unset";

	CLEAR_HEAD(reported_hooks);
	DBPRT(("%s: stream %d version %d\n", __func__, stream, version))
	addr = tpp_getaddr(stream);
	if (version != IS_PROTOCOL_VER) {
		sprintf(log_buffer, "protocol version %d unknown from %s",
			version, netaddr(addr));
		log_err(-1, __func__, log_buffer);
		stream_eof(stream, 0, NULL);
		return;
	}
	if (addr == NULL) {
		sprintf(log_buffer, "Sender unknown");
		log_err(-1, __func__, log_buffer);
		stream_eof(stream, 0, NULL);
		return;
	}

	ipaddr = ntohl(addr->sin_addr.s_addr);

	command = disrsi(stream, &ret);
	if (ret != DIS_SUCCESS) {
		badconstr = "disrsi:command";
		goto badcon;
	}

	if (command == IS_HELLOSVR) {
		port = disrui(stream, &ret);
		if (ret != DIS_SUCCESS) {
			badconstr = "disrui:port";
			goto badcon;
		}

		DBPRT(("%s: IS_HELLOSVR addr: %s, port %lu\n", __func__, netaddr(addr), port))

		if ((pmom = tfind2(ipaddr, port, &ipaddrs)) == NULL) {
			badconstr = "tfind2:pmom";
			goto badcon;
		}

		log_eventf(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			   LOG_NOTICE, pmom->mi_host, "Hello from MoM on port=%lu", port);

		psvrmom = (mom_svrinfo_t *) (pmom->mi_data);
		pdmninfo = pmom->mi_dmn_info;
		pdmninfo->dmn_state |= INUSE_UNKNOWN;
		if (pdmninfo->dmn_stream >= 0 && pdmninfo->dmn_stream != stream) {
			DBPRT(("%s: stream %d from %s:%d already open on %d\n",
			       __func__, stream, pmom->mi_host,
			       ntohs(addr->sin_port), pdmninfo->dmn_stream))
			tpp_close(pdmninfo->dmn_stream);
			tdelete2((u_long) pdmninfo->dmn_stream, 0ul, &streams);
		}

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		if (psvrmom->msr_numjobs > 0)
			pdmninfo->dmn_state |= INUSE_NEED_CREDENTIALS;
#endif

		if (psvrmom->msr_vnode_pool != 0) {
			/*
			 * Mom has a pool, see if the pool has an
			 * inventory Mom already, if not make this Mom the one
			 */
			vnpool_mom_t *ppool;
			ppool = find_vnode_pool(pmom);
			if (ppool != NULL) {
				if (ppool->vnpm_inventory_mom == NULL) {
					ppool->vnpm_inventory_mom = pmom;
					psvrmom->msr_has_inventory = 1;
					sprintf(log_buffer,
						msg_new_inventory_mom,
						ppool->vnpm_vnode_pool,
						pmom->mi_host);
					log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER,
						  LOG_DEBUG, msg_daemonname, log_buffer);
				}
			}
		}

		/* we save this stream for future communications */
		pdmninfo->dmn_stream = stream;
		pdmninfo->dmn_state |= INUSE_INIT;
		pdmninfo->dmn_state &= ~INUSE_NEEDS_HELLOSVR;
		tinsert2((u_long) stream, 0ul, pmom, &streams);
		tpp_eom(stream);

		/* mcast reply togethor */
		if (psvrmom->msr_vnode_pool <= 0 || psvrmom->msr_has_inventory)
			mcast_add(pmom, &mtfd_replyhello, FALSE);
		else
			mcast_add(pmom, &mtfd_replyhello_noinv, FALSE);

		if (reply_send_tm <= time_now) {
			struct work_task *ptask;

			/* time to wait depends on the no of moms server knows */
			reply_send_tm = time_now + (mominfo_array_size > 1024 ? MCAST_WAIT_TM : 0);
			ptask = set_task(WORK_Timed, reply_send_tm, mcast_msg, NULL);
			ptask->wt_aux = IS_REPLYHELLO;
		}
		return;

	} else if ((pmom = tfind2((u_long) stream, 0, &streams)) != NULL)
		goto found;

badcon:
	sprintf(log_buffer, "bad attempt to connect from %s, reason=%s",
		netaddr(addr), badconstr);
	log_err(-1, __func__, log_buffer);
	stream_eof(stream, 0, NULL);
	return;

found:
	psvrmom = (mom_svrinfo_t *) (pmom->mi_data);
	pdmninfo = pmom->mi_dmn_info;
	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_SERVER, LOG_DEBUG, msg_daemonname, "Received request2: %d", command);

	switch (command) {

		case IS_CMD_REPLY:
			DBPRT(("%s: IS_CMD_REPLY\n", __func__))
			process_DreplyTPP(stream);
			break;

		case IS_REGISTERMOM:
			if (psvrmom->msr_wktask) { /* if task requeue jobs, delete it */
				delete_task(psvrmom->msr_wktask);
				psvrmom->msr_wktask = 0;
			}

			set_all_state(pmom, 0,
				      INUSE_UNKNOWN | INUSE_NEED_ADDRS | INUSE_SLEEP, NULL,
				      Set_All_State_Regardless);
			set_all_state(pmom, 1, INUSE_DOWN | INUSE_INIT, NULL,
				      Set_ALL_State_All_Down);
			if ((pdmninfo->dmn_state & INUSE_MARKEDDOWN) == 0)
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE, LOG_INFO,
					  pmom->mi_host, "Setting host to Initialize");

			/* validate jobs Mom reported against what I have */
			mom_running_jobs(stream);
			/*
			 * respond to HELLO from Mom by sending her optional vmap and
			 * all addresses of all Moms
			 */
			command_orig = command;
			if (psvrmom->msr_vnode_pool <= 0 || psvrmom->msr_has_inventory)
				command = IS_UPDATE2;
			else
				command = IS_UPDATE;
			/* fall into IS_UPDATE */

		case IS_UPDATE:
		case IS_UPDATE2:

			if (psvrmom->msr_vnode_pool != 0) {
				sprintf(log_buffer, "POOL: IS_UPDATE%c received",
					(command == IS_UPDATE) ? ' ' : '2');
				log_event(PBSEVENT_DEBUG4, PBS_EVENTCLASS_NODE,
					  LOG_INFO, pmom->mi_host, log_buffer);
			}

			cr_node = 0;
			made_new_vnodes = 0;

			if (command == IS_UPDATE) {
				DBPRT(("%s: IS_UPDATE %s\n", __func__, pmom->mi_host))
			} else {
				DBPRT(("%s: IS_UPDATE2 %s\n", __func__, pmom->mi_host))
			}

			set_all_state(pmom, 0, INUSE_BUSY | INUSE_UNKNOWN, NULL,
				      Set_All_State_Regardless);

			s = disrui(stream, &ret); /* state bits, also used later */
			if (ret != DIS_SUCCESS)
				goto err;

			DBPRT(("state 0x%x ", s))
			if (s & INUSE_DOWN) {
				momptr_down(pmom, "by mom");
			} else if (s & INUSE_BUSY) {
				set_all_state(pmom, 1, INUSE_BUSY, NULL,
					      Set_All_State_Regardless);
			}

			i = disrui(stream, &ret); /* num of phy CPUs on system */
			if (ret != DIS_SUCCESS)
				goto err;

			/* physical cpus, set on the one vnode or the "special" */
			DBPRT(("phy ncpus %d ", i))
			psvrmom->msr_pcpus = i;
			if (psvrmom->msr_numvnds > 0) {
				np = psvrmom->msr_children[0]; /* the "one" */
				np->nd_ncpus = psvrmom->msr_pcpus;
				set_nattr_l_slim(np, ND_ATR_pcpus, psvrmom->msr_pcpus, SET);
			}

			i = disrui(stream, &ret); /* num of avail CPUs on host */
			if (ret != DIS_SUCCESS)
				goto err;

			DBPRT(("avail cpus %d ", i))
			psvrmom->msr_acpus = i;

			l = disrull(stream, &ret); /* memory (KB) on system */
			if (ret != DIS_SUCCESS)
				goto err;

			DBPRT(("mem %llukb ", l))

			psvrmom->msr_pmem = l;

			val = disrst(stream, &ret); /* arch of Mom's host */
			if (ret != DIS_SUCCESS)
				goto err;

			DBPRT(("arch %s ", val))
			free(psvrmom->msr_arch);
			psvrmom->msr_arch = val;

			if ((pdmninfo->dmn_state & INUSE_MARKEDDOWN) == 0) {
				sprintf(log_buffer, "update%c state:%d ncpus:%ld",
					command == IS_UPDATE ? ' ' : '2',
					s, psvrmom->msr_pcpus);
				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
					  LOG_INFO, pmom->mi_host, log_buffer);
			}

			if (command == IS_UPDATE) {
				/* Only one vnode,  set resources_available    */
				/* for multiple vnodes, the info is in UPDATE2 */

				if (psvrmom->msr_numvnds != 0) {
					np = psvrmom->msr_children[0];

					/*
					 * Sharing attribute - three cases for at_flags:
					 * 1. If the sharing attribute was explicitly set via qmgr the flag will be
					 *    ATR_VFLAG_SET.
					 * 2. If set via a prior update2 message from Mom, the flags
					 *    would be ATR_VFLAG_SET | ATR_VFLAG_DEFLT.
					 * 3. If unset, flags would be zero.
					 *
					 * For case 2 and 3, but not for case 1, set or reset the sharing attribute
					 * to the default of "default_shared" on the natural vnode as it may have been
					 * changed via a prior UPDATE2 (multi-vnode) message but the vnodedef file
					 * has now been removed; hence this UPDATE message instead of UPDATE2.
					 */
					if (((get_nattr(np, ND_ATR_Sharing))->at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) != ATR_VFLAG_SET) {
						/* unset or ATR_VFLAG_DEFLT is set */
						set_nattr_l_slim(np, ND_ATR_Sharing, VNS_DFLT_SHARED, SET);
						(get_nattr(np, ND_ATR_Sharing))->at_flags |= ATR_VFLAG_DEFLT;
					}

					/* mark all vnodes under this Mom stale, then because    */
					/* this is non-vnoded update, un-stale the natural vnode */
					/* EXCEPT when the Mom is in a vnode_pool 		 */
					if (psvrmom->msr_vnode_pool <= 0) {
						set_all_state(pmom, 1, INUSE_STALE, NULL,
							      Set_All_State_Regardless);
						set_vnode_state(np, ~INUSE_STALE, Nd_State_And);
					}

					pala = get_nattr(np, ND_ATR_ResourceAvail);

					/* available cpus */
					i = psvrmom->msr_acpus;
					prd = &svr_resc_def[RESC_NCPUS];
					prc = find_resc_entry(pala, prd);
					if (prc == NULL)
						prc = add_resource_entry(pala, prd);
					if (((is_attr_set(&prc->rs_value)) == 0) ||
					    ((prc->rs_value.at_flags & ATR_VFLAG_DEFLT) != 0)) {
						mod_node_ncpus(np, i, ATR_ACTION_ALTER);
						prc->rs_value.at_val.at_long = i;
						prc->rs_value.at_flags |= (ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT);
					}

					/* available memory */
					prd = &svr_resc_def[RESC_MEM];
					prc = find_resc_entry(pala, prd);
					if (prc == NULL)
						prc = add_resource_entry(pala, prd);
					if ((prc->rs_value.at_flags & ATR_VFLAG_DEFLT) ||
					    ((is_attr_set(&prc->rs_value)) == 0)) {
						/* set size in KB */
						prc->rs_value.at_val.at_size.atsv_num =
							psvrmom->msr_pmem;
						prc->rs_value.at_val.at_size.atsv_shift = 10;
						prc->rs_value.at_flags |= (ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT);
					}
				}
			}

			/* UPDATE2 message - multiple vnoded system */
			if (command == IS_UPDATE2) {
				vnlp = vn_decode_DIS(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto err;
				if (vnlp == NULL) {
					sprintf(log_buffer, "vn_decode_DIS vn failed");
					log_err(-1, __func__, log_buffer);
				} else if (vnlp->vnl_modtime >= pmom->mi_modtime) {
					int i, j;

					if (vnlp->vnl_modtime > pmom->mi_modtime)
						cr_node = 1;

					/* set stale bit in state for all non sleeping vnodes, */
					/* it will be cleared for the vnodes     */
					/* listed in the update2 messsage	 */
					set_all_state(pmom, 1, INUSE_STALE, NULL,
						      Set_All_State_Regardless);

					pmom->mi_modtime = vnlp->vnl_modtime;
					sprintf(log_buffer, "Mom reporting %lu vnodes as of %s", vnlp->vnl_used, ctime((time_t *) &vnlp->vnl_modtime));
					*(log_buffer + strlen(log_buffer) - 1) = '\0';

					if ((pdmninfo->dmn_state & INUSE_MARKEDDOWN) == 0)
						log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, LOG_INFO, pmom->mi_host, log_buffer);
					/*
					 * If the vnode will have multiple
					 * parent Moms, set flag to cross check
					 * mod time against all parent Moms
					 */
					if (vnlp->vnl_used > 1)
						check_other_moms_time = 1;

					for (i = 0; i < vnlp->vnl_used; i++) {
						vnal_t *vnrlp;
						vnrlp = VNL_NODENUM(vnlp, i);
						/* create vnode */
						(void) update2_to_vnode(vnrlp, cr_node, pmom, &made_new_vnodes, 0);
						for (j = 0; j < vnrlp->vnal_used; j++) {
							vna_t *psrp;

							psrp = VNAL_NODENUM(vnrlp, j);
							if (strcasecmp(psrp->vna_name,
								       VNATTR_PNAMES) == 0) {
								snprintf(log_buffer,
									 sizeof(log_buffer),
									 "pnames %s", psrp->vna_val);
								log_event(PBSEVENT_SYSTEM,
									  PBS_EVENTCLASS_NODE,
									  LOG_INFO,
									  pmom->mi_host, log_buffer);

								setup_pnames(psrp->vna_val);
							}
						}
					}

					/* if multiple vnodes indicated (above) and
					 * if the vnodes (except the first) have
					 * multiple Moms,  update the map mod
					 * time on those Moms as well
					 */
					if (check_other_moms_time &&
					    (psvrmom->msr_numvnds > 1)) {
						if (psvrmom->msr_children[1]->nd_nummoms > 1) {
							j = psvrmom->msr_children[1]->nd_nummoms;
							for (i = 0; i < j; ++i) {
								psvrmom->msr_children[1]->nd_moms[i]->mi_modtime = vnlp->vnl_modtime;
							}
						}
					}
					if (made_new_vnodes || cr_node) {
						save_nodes_db(1, pmom); /* update the node database */
						propagate_licenses_to_vnodes(pmom);
					}
				}
				vnl_free(vnlp);
				vnlp = NULL;
			}

			/*read mom's pbs_version data if appended*/

			val = disrst(stream, &ret);
			if (ret == DIS_SUCCESS) {
				DBPRT(("mom's pbs_version %s ", val))
				free(psvrmom->msr_pbs_ver);
				psvrmom->msr_pbs_ver = val;
			} else if (ret == DIS_EOD) {
				/*found no appended version data*/
				free(psvrmom->msr_pbs_ver);
				psvrmom->msr_pbs_ver = strdup("unavailable");
			} else
				goto err;

			/* for either UPDATE or UPDATE2...		    */
			/* log which vnodes under that Mom are stale	    */
			/* set default resources for "arch" on all vnodes   */
			/* also set each vnodes' ATR_ResvEnable if need be  */
			/* Set ncpus and mem in resources_available on the  */
			/* natural vnode if they are not already set.       */

			for (ivnd = 0; ivnd < psvrmom->msr_numvnds; ++ivnd) {
				np = psvrmom->msr_children[ivnd];

				if (np->nd_state & INUSE_STALE) {
					/* vnode is stale */
					snprintf(log_buffer, sizeof(log_buffer),
						 "vnode %s is stale", np->nd_name);
					log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host, log_buffer);
				}

				pala = get_nattr(np, ND_ATR_ResourceAvail);

				prd = &svr_resc_def[RESC_ARCH];
				prc = find_resc_entry(pala, prd);
				if (prc == NULL)
					prc = add_resource_entry(pala, prd);
				if (!is_attr_set(&prc->rs_value)) {
					if (is_attr_set(&prc->rs_value))
						free(prc->rs_value.at_val.at_str);
					prc->rs_value.at_val.at_str = strdup(psvrmom->msr_arch);
					prc->rs_value.at_flags |= (ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT);
				}

				/*
				 * make sure resources_available.[ncpus,mem] are set
				 * on the "natural" (first vnode).  Use value from
				 * the Mom.
				 */
				if (ivnd == 0) {
					/* the first = natural vnode */
					prd = &svr_resc_def[RESC_NCPUS];
					prc = find_resc_entry(pala, prd);
					if (prc == NULL)
						prc = add_resource_entry(pala, prd);
					if (prc &&
					    ((is_attr_set(&prc->rs_value)) == 0)) {
						prc->rs_value.at_val.at_long = psvrmom->msr_acpus;
						prc->rs_value.at_flags |= (ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT);
					}
					prd = &svr_resc_def[RESC_MEM];
					prc = find_resc_entry(pala, prd);
					if (prc == NULL)
						prc = add_resource_entry(pala, prd);
					if (prc &&
					    ((is_attr_set(&prc->rs_value)) == 0)) {
						prc->rs_value.at_val.at_size.atsv_num =
							psvrmom->msr_pmem;
						prc->rs_value.at_val.at_size.atsv_shift = 10;
						prc->rs_value.at_flags |= (ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT);
					}
				}

				/*
				 * is resv_enable attribute in manual/automatic mode?
				 *
				 * Automatic mode is implemented by utilizing the
				 * ATR_VFLAG_DEFLT bit
				 * The table which follows enumerates the cases:
				 *
				 * Manual mode if:
				 * (ATR_VFLAG_SET & at_flags)==1  &&
				 * (ATR_VFLAG_DEFLT & at_flags)==0
				 *
				 * Automatic mode if:
				 * (ATR_VFLAG_SET & at_flags)==1  &&
				 * (ATR_VFLAG_DEFLT & at_flags)==1
				 * (ATR_VFLAG_SET & at_flags)==0  &&
				 * (ATR_VFLAG_DEFLT & at_flags)==1
				 * (ATR_VFLAG_SET & at_flags)==0  &&
				 * (ATR_VFLAG_DEFLT & at_flags)==0
				 *
				 * The later two forms of automatic mode transition to
				 * the first form listed.  Doing it this way provides a
				 * means by which the operator can go in to manual mode
				 * but still have a * way to revert back to automatic
				 * mode if needed.
				 */

				if (!((get_nattr(np, ND_ATR_ResvEnable))->at_flags & ATR_VFLAG_SET) ||
				    ((get_nattr(np, ND_ATR_ResvEnable))->at_flags & ATR_VFLAG_DEFLT)) {

					int change = 0;

					/*
					 * attribute resv_enable is in automatic mode
					 * does mom config file show mom configured for
					 * cycle harvesting?
					 */
					if (s & MOM_STATE_CONF_HARVEST) {
						if (get_nattr_long(np, ND_ATR_ResvEnable)) {
							set_nattr_l_slim(np, ND_ATR_ResvEnable, 0, SET);
							change = 1;
						}
					} else {
						if (!get_nattr_long(np, ND_ATR_ResvEnable)) {
							set_nattr_l_slim(np, ND_ATR_ResvEnable, 1, SET);
							change = 1;
						}
					}

					if (change || !((get_nattr(np, ND_ATR_ResvEnable))->at_flags & ATR_VFLAG_SET) || !((get_nattr(np, ND_ATR_ResvEnable))->at_flags & ATR_VFLAG_DEFLT))
						(get_nattr(np, ND_ATR_ResvEnable))->at_flags |= ATR_VFLAG_DEFLT;
				}

				if (psvrmom->msr_pbs_ver != NULL) {

					if (is_nattr_set(np, ND_ATR_version) == 0 || strcmp(psvrmom->msr_pbs_ver, get_nattr_str(np, ND_ATR_version)) != 0) {
						free_nattr(np, ND_ATR_version);
						if (!set_nattr_str_slim(np, ND_ATR_version, psvrmom->msr_pbs_ver, NULL))
							np->nd_modified = 1;
					}
				}
			}

			if (made_new_vnodes || cr_node)
				save_nodes_db(1, pmom); /* update the node database */

			if (command_orig == IS_REGISTERMOM) {
				/* Mom is acknowledging the info sent by the Server */
				/* Mark the Mom and associated vnodes as up */
				oldstate = pdmninfo->dmn_state;
				if (pdmninfo->dmn_state & INUSE_MARKEDDOWN)
					pdmninfo->dmn_state &= ~INUSE_MARKEDDOWN;

				set_all_state(pmom, 0, INUSE_DOWN | INUSE_INIT,
					      NULL, Set_All_State_Regardless);

				/* log a node up message only if it was not marked
			 * as "markeddown" by TPP layer due to broken connection
			 * to pbs_comm router
			 */
				if ((oldstate & INUSE_MARKEDDOWN) == 0) {
					log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
						  LOG_NOTICE, pmom->mi_host, node_up);
				}
				psvrmom->msr_timedown = 0;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
				if (pdmninfo->dmn_state & INUSE_NEED_CREDENTIALS) {
					log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host, "mom needs credentials");

					for (i = 0; i < psvrmom->msr_numjobs; i++) {
						if (psvrmom->msr_jobindx[i])
							set_task(WORK_Immed, 0, svr_renew_job_cred, psvrmom->msr_jobindx[i]->ji_qs.ji_jobid);
					}

					pdmninfo->dmn_state &= ~INUSE_NEED_CREDENTIALS;
				}
#endif
			}
			break;

		case IS_RESCUSED:
		case IS_RESCUSED_FROM_HOOK:

			if (command == IS_RESCUSED) {
				DBPRT(("%s: IS_RESCUSED\n", __func__))
			} else {
				DBPRT(("%s: IS_RESCUSED_FROM_HOOK\n", __func__))
			}

			stat_update(stream);
			break;

		case IS_JOBOBIT:
			DBPRT(("%s: IS_JOBOBIT\n", __func__))
			recv_job_obit(stream);
			break;

		case IS_IDLE:
			DBPRT(("%s: IS_IDLE\n", __func__))
			recv_wk_job_idle(stream);
			break;

		case IS_DISCARD_DONE:
			/* Mom is acknowledging a IS_DISCARD_JOB request    */
			/* Mark her entry in the discard structure complete */

			jid = disrst(stream, &ret); /* job id */
			if (ret != DIS_SUCCESS)
				goto err;
			j = disrsi(stream, &ret); /* run (hop) count */
			if (ret != DIS_SUCCESS)
				goto err;
			sprintf(log_buffer, "Discard done for job %s", jid);
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE, LOG_DEBUG,
				  pmom->mi_host, log_buffer);
			DBPRT(("%s: Mom %s %s (%d)\n", __func__, pmom->mi_host, log_buffer, j))
			pjob = find_job(jid);
			if (pjob &&
			    (get_jattr_long(pjob, JOB_ATR_run_version) == j)) {
				post_discard_job(pjob, pmom, JDCD_REPLIED);
			}
			free(jid);
			jid = NULL;
			break;

		case IS_HOOK_JOB_ACTION: {
			int *replies_seq = NULL;
			int replies_count = 0;
			int acts_count = 0;

			acts_count = i = disrsi(stream, &ret); /* number of actions in request */
			if (ret != DIS_SUCCESS)
				goto err;
			if ((replies_seq = (int *) malloc(sizeof(int) * i)) == NULL)
				goto err;
			while (i--) {
				int runct;
				int hact;
				int hook_seq;

				/* job id */
				jid = disrst(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto hook_act_reply;
				/* hook action sequence number for acknowledgement */
				hook_seq = disrul(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto hook_act_reply;
				/* run count of job to verify that job hasn't changed */
				runct = disrsi(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto hook_act_reply;
				/* action: delete or requeue */
				hact = disrsi(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto hook_act_reply;
				/* user requesting action, not currently used */
				(void) disrui(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto hook_act_reply;

				if (((pjob = find_job(jid)) != NULL) &&
				    (check_job_state(pjob, JOB_STATE_LTR_RUNNING) ||
				     check_job_state(pjob, JOB_STATE_LTR_EXITING)) &&
				    (get_jattr_long(pjob, JOB_ATR_run_version) == runct)) {
					/* set the Exit_status job attribute */
					/* to be later checked in job_obit() */
					if (hact == JOB_ACT_REQ_REQUEUE) {
						set_jattr_l_slim(pjob, JOB_ATR_exit_status, JOB_EXEC_HOOK_RERUN, SET);
						log_eventf(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, LOG_INFO, pmom->mi_host,
							   "hook request rerun %s", jid);
					} else if (hact == JOB_ACT_REQ_DELETE) {
						set_jattr_l_slim(pjob, JOB_ATR_exit_status, JOB_EXEC_HOOK_DELETE, SET);
						log_eventf(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE, LOG_INFO, pmom->mi_host,
							   "hook request delete %s", jid);
					} else if (hact == JOB_ACT_REQ_DEALLOCATE) {

						/* decrement everything found in exec_vnode/exec_vnode_deallocated  */
						if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) == 0) {
							/* don't update resources_assigned if job is suspended */
							set_resc_assigned((void *) pjob, 0, DECR);
						}

						deallocate_job(pmom, pjob);

						/* increment everything found in new exec_vnode/exec_vnode_deallocated  */
						if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_Suspend) == 0) {
							/* don't update resources_assigned if job is suspended */
							set_resc_assigned((void *) pjob, 0, INCR);
						}
					}
				}
				free(jid);
				jid = NULL;
				replies_seq[replies_count++] = hook_seq;
			}
		hook_act_reply:
			if (replies_count > 0) {
				if (is_compose(stream, IS_HOOK_ACTION_ACK) != DIS_SUCCESS)
					goto err;
				if (ret != DIS_SUCCESS)
					goto err;
				ret = diswsi(stream, IS_HOOK_JOB_ACTION);
				if (ret != DIS_SUCCESS)
					goto err;
				ret = diswsi(stream, replies_count);
				if (ret != DIS_SUCCESS)
					goto err;
				for (i = 0; i < replies_count; i++) {
					ret = diswul(stream, replies_seq[i]);
					if (ret != DIS_SUCCESS)
						goto err;
				}
				ret = dis_flush(stream);
				if (ret != DIS_SUCCESS) {
					ret = DIS_NOCOMMIT;
					goto err;
				}
			}

			if (replies_count != acts_count)
				goto err;
		} break;

		case IS_HOOK_SCHEDULER_RESTART_CYCLE:
			hook_euser = disrst(stream, &ret);
			if (ret != DIS_SUCCESS)
				goto err;
			if (*hook_euser != '\0') {
				if ((svr_get_privilege(hook_euser, pmom->mi_host) &
				     (ATR_DFLAG_MGWR | ATR_DFLAG_OPWR)) == 0) {
					snprintf(log_buffer, sizeof(log_buffer),
						 hook_privilege, hook_euser,
						 pmom->mi_host);
					log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE,
						  LOG_INFO, pmom->mi_host, log_buffer);
					free(hook_euser);
					hook_euser = NULL;
					break;
				}
			}
			free(hook_euser);
			hook_euser = NULL;
			set_scheduler_flag(SCH_SCHEDULE_RESTART_CYCLE, dflt_scheduler);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_NODE,
				  LOG_INFO, pmom->mi_host,
				  "requested for scheduler to restart cycle");
			break;

		case IS_UPDATE_FROM_HOOK:
		case IS_UPDATE_FROM_HOOK2:
			hook_seq = disrul(stream, &ret);
			if (ret != DIS_SUCCESS)
				goto err;

			/* hook_euser is not currently used, plan on using it  */
			/* instead of VNATTR_HOOK_REQUESTOR in the future      */
			/* its here to prevent need of changing protocol later */
			hook_euser = disrst(stream, &ret);
			if (ret != DIS_SUCCESS)
				goto err;
			free(hook_euser);
			hook_euser = NULL;

			vnlp = vn_decode_DIS(stream, &ret);
			if (ret != DIS_SUCCESS)
				goto err;

			if (vnlp == NULL) {
				sprintf(log_buffer, "vn_decode_DIS vn failed");
				log_err(-1, __func__, log_buffer);
				goto err;
			}

			cr_node = 0;
			/* is_update2 changes (from vnodedef files) are sent at the same time */
			/* as is_update_from_hook changes, so they'll have the same vnlp timestamp. */
			/* is_update2 also records the received vnlp's vnl_modtime in pmom->mi_modtime. */
			if (vnlp->vnl_modtime >= pmom->mi_modtime)
				cr_node = 1;
			for (i = 0; i < vnlp->vnl_used; i++) {
				vnal_t *vnrlp;
				vnrlp = VNL_NODENUM(vnlp, i);
				/* update vnode */
				made_new_vnodes = 0;
				if (update2_to_vnode(vnrlp, cr_node, pmom, &made_new_vnodes, (command == IS_UPDATE_FROM_HOOK2) ? 2 : 1) == PBSE_PERM) {
					break; /* encountered a bad permission */
				}
			}
			vnl_free(vnlp);
			vnlp = NULL;

			/* tell Mom we got this one, reply with the type of */
			/* action requested and the sequence number         */

			if (is_compose(stream, IS_HOOK_ACTION_ACK) != DIS_SUCCESS)
				goto err;

			if (ret != DIS_SUCCESS)
				goto err;
			ret = diswsi(stream, IS_UPDATE_FROM_HOOK);
			if (ret != DIS_SUCCESS)
				goto err;
			ret = diswsi(stream, 1);
			if (ret != DIS_SUCCESS)
				goto err;
			ret = diswul(stream, hook_seq);
			if (ret != DIS_SUCCESS)
				goto err;
			ret = dis_flush(stream);
			if (ret != DIS_SUCCESS) {
				ret = DIS_NOCOMMIT;
				goto err;
			}
			if (made_new_vnodes || cr_node) {
				save_nodes_db(1, pmom); /* update the node database */
				propagate_licenses_to_vnodes(pmom);
			}
			break;

		case IS_HOOK_CHECKSUMS:
			CLEAR_HEAD(reported_hooks);
			i = disrsi(stream, &ret); /* number of hooks to report */
			if (ret != DIS_SUCCESS)
				goto err;

			while (i--) {
				unsigned long chksum_hk;
				unsigned long chksum_py;
				unsigned long chksum_cf;
				unsigned int haction;

				haction = 0;
				/* hook name */
				hname = disrst(stream, &ret);
				if ((ret != DIS_SUCCESS) || (hname == NULL))
					goto err;

				/* hook control file checksum */
				chksum_hk = disrul(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto err;

				/* hook script checksum */
				chksum_py = disrul(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto err;

				/* hook config file checksum */
				chksum_cf = disrul(stream, &ret);
				if (ret != DIS_SUCCESS)
					goto err;

				phook = find_hook(hname);
				if ((phook == NULL) ||
				    ((phook->event & MOM_EVENTS) == 0)) {
					/* mom has a hook that the server */
					/* does not  know about. tell mom */
					/* to delete that hook */
					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "encountered a mom (%s) hook %s "
						 "that the server does not know "
						 "about! Telling mom to delete",
						 pmom->mi_host, hname);
					log_event(PBSEVENT_DEBUG3,
						  PBS_EVENTCLASS_HOOK,
						  LOG_ERR, hname,
						  log_buffer);
					add_pending_mom_hook_action(pmom,
								    hname, MOM_HOOK_ACTION_DELETE);
					free(hname);
					continue;
				}

				if ((phook->hook_control_checksum > 0) &&
				    (phook->hook_control_checksum != chksum_hk)) {

					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "hook control file "
						 "mismatched checksums: server: "
						 "%lu mom (%s): %lu...resending",
						 phook->hook_control_checksum,
						 pmom->mi_host, chksum_hk);
					log_event(PBSEVENT_DEBUG3,
						  PBS_EVENTCLASS_HOOK,
						  LOG_ERR, phook->hook_name,
						  log_buffer);
					haction |= MOM_HOOK_ACTION_SEND_ATTRS;
				}

				if ((phook->hook_script_checksum > 0) &&
				    (phook->hook_script_checksum != chksum_py)) {

					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "hook script "
						 "mismatched checksums: server: "
						 "%lu mom (%s): %lu...resending",
						 phook->hook_script_checksum,
						 pmom->mi_host, chksum_py);
					log_event(PBSEVENT_DEBUG3,
						  PBS_EVENTCLASS_HOOK,
						  LOG_ERR, phook->hook_name,
						  log_buffer);
					haction |= MOM_HOOK_ACTION_SEND_SCRIPT;
				}

				if ((phook->hook_config_checksum > 0) &&
				    (phook->hook_config_checksum != chksum_cf)) {

					snprintf(log_buffer,
						 sizeof(log_buffer),
						 "hook config file "
						 "mismatched checksums: server: "
						 "%lu mom (%s): %lu...resending",
						 phook->hook_config_checksum,
						 pmom->mi_host, chksum_cf);
					log_event(PBSEVENT_DEBUG3,
						  PBS_EVENTCLASS_HOOK,
						  LOG_ERR, phook->hook_name,
						  log_buffer);
					haction |= MOM_HOOK_ACTION_SEND_CONFIG;
				}

				if (haction != 0) {
					add_pending_mom_hook_action(pmom,
								    hname, haction);
				}

				if (add_to_svrattrl_list(&reported_hooks, hname,
							 NULL, NULL, 0, NULL) == -1) {
					log_event(PBSEVENT_DEBUG3,
						  PBS_EVENTCLASS_HOOK,
						  LOG_INFO, hname,
						  "failed to add to reported "
						  "hooks list");
				}

				free(hname);
			}

			/* hook resourcedef checksum */
			chksum_rescdef = disrul(stream, &ret);
			if (ret != DIS_SUCCESS)
				goto err;

			hook_rescdef_checksum = get_hook_rescdef_checksum();
			if ((hook_rescdef_checksum > 0) &&
			    (hook_rescdef_checksum != chksum_rescdef)) {

				snprintf(log_buffer,
					 sizeof(log_buffer),
					 "hook resourcedef file "
					 "mismatched checksums: server: "
					 "%lu mom %s: %lu...resending",
					 hook_rescdef_checksum, pmom->mi_host,
					 chksum_rescdef);
				log_event(PBSEVENT_DEBUG3,
					  PBS_EVENTCLASS_HOOK,
					  LOG_ERR, PBS_RESCDEF,
					  log_buffer);
				add_pending_mom_hook_action(pmom,
							    PBS_RESCDEF,
							    MOM_HOOK_ACTION_SEND_RESCDEF);
			}

			/* Look for mom hooks known to the server that are */
			/* not known to the mom sending the request. */
			phook = (hook *) GET_NEXT(svr_allhooks);
			while (phook) {
				if (phook->hook_name &&
				    !phook->pending_delete &&
				    (phook->event & MOM_EVENTS) &&
				    (find_svrattrl_list_entry(&reported_hooks,
							      phook->hook_name, NULL) == NULL)) {
					add_pending_mom_hook_action(pmom,
								    phook->hook_name,
								    MOM_HOOK_ACTION_SEND_ATTRS | MOM_HOOK_ACTION_SEND_SCRIPT | MOM_HOOK_ACTION_SEND_CONFIG);
				}
				phook = (hook *) GET_NEXT(phook->hi_allhooks);
			}

			free_attrlist(&reported_hooks);
			np = psvrmom->msr_children[0];
			if (np->nd_state & INUSE_PROV) {
				DBPRT(("%s: calling [is_vnode_prov_done] from is_request\n", __func__))
				is_vnode_prov_done(np->nd_name);
			}

			break;

		case IS_CMD:
			DBPRT(("%s: IS_CMD\n", __func__))
			process_IS_CMD(stream);
			break;
	}

	tpp_eom(stream);
	return;

err:
	/*
	 ** We come here if we got a DIS write error.
	 */
	DBPRT(("\nINTERNAL or DIS i/o error\n"))
	snprintf(log_buffer, sizeof(log_buffer), "%s from %s(%s)",
		 dis_emsg[ret], pmom->mi_host, netaddr(addr));
	log_err(-1, __func__, log_buffer);
	free(jid);
	jid = NULL;
	free(hname);
	hname = NULL;
	free_attrlist(&reported_hooks);

	stream_eof(stream, ret, "write_err");

	return;
}

/**
 * @brief
 * 		free list of prop structures created by proplist()
 *
 * @param[in,out]	prop	- head of the list of prop structures to be freed
 *
 * @return	void
 */

static void
free_prop(struct prop *prop)
{
	/* walk the singly-linked list, releasing each node and its name */
	while (prop != NULL) {
		struct prop *next = prop->next;

		free(prop->name);
		free(prop);
		prop = next;
	}
}

/**
 * @brief
 * 		Parse a number in a spec.
 *
 * @param[in]	ptr	- The string being parsed
 * @param[out]	num	- The number parsed
 * @param[in]	znotok	- (zero not ok) set true means a zero value is an error
 *
 *@return	int
 * @retval	0	- if okay
 * @retval  1	- if no number exists
 * @retval -1	- on error
 */
static int
number(char **ptr, int *num, int znotok)
{
	char *str = *ptr;
	char *end;
	long val;

	/*
	 * The original implementation copied digits into a fixed 80-byte
	 * buffer with no bounds check (stack overflow on a long digit run)
	 * and passed a plain char to isdigit()/isalpha() (undefined for
	 * negative values).  Parse with strtol instead; the contract is
	 * unchanged: 0 = ok, 1 = no number, -1 = error (message left in
	 * log_buffer).
	 */
	if (!isdigit((unsigned char) *str))
		return 1; /* no number present */

	errno = 0;
	val = strtol(str, &end, 10);

	if (isalpha((unsigned char) *end))
		return 1; /* cannot have digit followed by letter */

	/* reject values that do not fit in an int */
	if (errno == ERANGE || (long) (int) val != val) {
		sprintf(log_buffer, "number out of range");
		return -1;
	}

	if ((val == 0) && znotok) {
		sprintf(log_buffer, "zero illegal");
		return -1;
	}

	*ptr = end;
	*num = (int) val;
	return 0;
}

/**
 * @brief
 * 		Check string to see if it is a legal property name.
 *
 * @param[in]	ptr	- The string being parsed
 * @param[out]	prop	- set to static char array containing the property
 *
 * @see
 * 		proplist and ctcpus
 *
 * @return	int
 * @retval	0	- if string is a legal property name
 * @retval	1	- if string is not a legal property name
 *
 * @par MT-safe: No
 */
static int
property(char **ptr, char **prop)
{
	static char name[80];
	char *str = *ptr;
	int i = 0;

	/*
	 * Fixes over the original: the copy into the static name[] buffer
	 * is now bounded (previously a long property overran it), and all
	 * <ctype.h> calls cast to unsigned char (plain char is undefined
	 * behavior for negative values, CERT STR37-C).
	 */
	if (!isalnum((unsigned char) *str)) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "first character of property (%s) not alphanum", str);
		return 1;
	}

	while (isalnum((unsigned char) *str) || *str == '-' || *str == '_' || *str == '.' || *str == '=') {
		if (i >= (int) sizeof(name) - 1) {
			/* property longer than the buffer: not a legal name */
			snprintf(log_buffer, sizeof(log_buffer),
				 "property name too long");
			return 1;
		}
		name[i++] = *str++;
	}

	name[i] = '\0';
	*prop = (i == 0) ? NULL : name;

	/* skip over "/vp_number" */
	if (*str == '/') {
		do {
			str++;
		} while (isdigit((unsigned char) *str));
	}
	*ptr = str;
	return 0;
}

/**
 * @brief
 * 		Create a property list from a string.
 *
 * @param[in,out]	str	- The string being parsed; advanced past the consumed spec
 * @param[out]	plist	- head of the list of prop structures built from the string
 * @param[out]	node_req	- node request values (ppn/cpp/np) parsed from the spec
 *
 * @return	int
 * @retval 0 on success
 * @retval 1 on failure.
 */
static int
proplist(char **str, struct prop **plist, struct node_req *node_req)
{
	struct prop *pp;
	char *pname;
	char *pequal;

	node_req->nr_ppn = 1; /* default to 1 process per node */
	node_req->nr_cpp = 1; /* default to 1 cpu per process */
	node_req->nr_np = 1;  /* default to 1 total cpus */

	for (;;) {
		if (property(str, &pname))
			return 1;
		if (pname == NULL)
			break;

		/* special property */
		if ((pequal = strchr(pname, (int) '=')) != NULL) {

			/* identify the special property and place its value */
			/* into node_req 					 */
			*pequal = '\0';
			if (strcasecmp(pname, "ppn") == 0) {
				/* Processes (tasks) per Node */
				pequal++;
				if ((number(&pequal, &node_req->nr_ppn, 1) != 0) ||
				    (*pequal != '\0'))
					return 1;
				node_req->nr_np = node_req->nr_ppn * node_req->nr_cpp;
			} else if ((strcasecmp(pname, "cpp") == 0) ||
				   (strcasecmp(pname, "ncpus") == 0)) {
				/* CPUs (threads) per Process (task) */
				pequal++;
				if ((number(&pequal, &node_req->nr_cpp, 0) != 0) ||
				    (*pequal != '\0'))
					return 1;
				node_req->nr_np = node_req->nr_ppn * node_req->nr_cpp;
			} else {
				return 1; /* not recognized - error */
			}
		} else {
			pp = (struct prop *) malloc(sizeof(struct prop));
			if (pp == NULL)
				return 1; /* no mem */
			pp->mark = 1;
			if ((pp->name = strdup(pname)) == NULL) {
				free(pp);
				return 1;
			}
			pp->next = *plist;
			*plist = pp;
		}
		if (**str != ':')
			break;
		(*str)++;
	}
	return 0;
}

/**
 * @brief
 * 		Do a quick validation of the nodespec
 * @see
 * 		set_node_ct
 *
 * @param[in]	str	- nodespec string to be parsed
 *
 * @return	int
 * @retval	0	- success
 * @retval	>0	- failure (PBSE_BADNODESPEC)
 */
int
validate_nodespec(char *str)
{
	int i;
	int num = 1;		  /*default: a request for 1 node*/
	struct prop *prop = NULL; /*assume sub-spec calls out no proper */
	struct node_req node_req;
	/* first quickly validate the node spec */

	if (str == NULL)
		return PBSE_BADNODESPEC;

	while (*str) {

		/* discard properties collected for the previous sub-spec */
		free_prop(prop);
		prop = NULL; /* this is a must */

		/*Determine how many nodes this subspec requests*/

		if ((i = number(&str, &num, 1)) == -1)
			return PBSE_BADNODESPEC;

		/*Determine properties the node must have and how many processors*/

		if (i == 0) {		   /* subspec specified a number */
			if (*str == ':') { /* subspec is specifying properties */
				(str)++;
				if (proplist(&str, &prop, &node_req)) {
					free_prop(prop);
					return PBSE_BADNODESPEC;
				}
			}
		} else { /* subspec doesn't specify a number */
			if (proplist(&str, &prop, &node_req)) {
				free_prop(prop);
				return PBSE_BADNODESPEC; /* err in gen of prop list */
			}
		}

		if (*str == '+')
			++str;
		else if (*str == '#')
			break;
		else if (*str != '\0') {
			/* fix: previously leaked prop on this error path */
			free_prop(prop);
			return PBSE_BADNODESPEC;
		}
	}
	free_prop(prop);
	prop = NULL; /* this is a must */
	return 0;
}

#define GLOB_SZ 511
/**
 * @brief
 * 		Add the "global" spec to every sub-spec in "spec".
 *
 * @param[in,out]	spec	- spec to which "global" spec needs to be added
 * @param[in]	global	- which will be copied into every sub-spec in "spec".
 *
 * @return a malloc-ed copy of the newly modified string.
 * @retval	NULL	- error
 *
 * @par MT-safe: No
 */
static char *
mod_spec(char *spec, char *global)
{
	static char *line = NULL;
	static int line_len = 0;
	char *cp;
	int i;
	int glen;
	int len;

	if (line_len == 0) {
		line = (char *) malloc(GLOB_SZ + 1);
		if (line == NULL)
			return NULL;
		line_len = GLOB_SZ;
	}

	/* count number of times the global will be inserted into line */
	i = 1;
	glen = strlen(global);
	cp = spec;
	while ((cp = strchr(cp, (int) '+')) != NULL) {
		i++;
		cp++;
	}
	len = strlen(spec) + (i * (glen + 1)) + 1;
	if (len > line_len) {
		/* need to expand line */
		cp = realloc(line, (size_t) len);
		if (cp == NULL)
			return NULL;
		line = cp;
		line_len = len;
	}

	/* now copy spec into line appending ":global" at the end of */
	/* segment seperated by a "+"				     */

	cp = line;
	while (*spec) {
		if (*spec == '+') {
			*cp++ = ':';
			strcpy(cp, global);
			cp += glen;
		}
		*cp++ = *spec++;
	}
	*cp++ = ':';
	strcpy(cp, global);

	return (strdup(line));
}

/**
 * @brief
 * 		convert an existing nodespec to the "matching" select directive
 *
 * @par
 *		Each "#global" modifier (other than #excl/#shared, which only
 *		affect the "place" directive) is folded into every sub-spec via
 *		mod_spec() before conversion.  The select string is accumulated
 *		into *cvt_bp, which is grown with cvt_realloc() as needed.
 *
 * @param[in]	str	- node string
 * @param[in,out]	cvt_bp	- is a pointer to the current buffer
 * @param[in,out]	cvt_lenp	- is a pointer to the current buffer's length
 * @param[in]	pattr	- a list headed in an attribute that points to the specified resource_def structure
 *
 * @return	int
 * @retval	0	- success
 * @retval	>0	- pbs error
 * @retval	-1	- modifiers does not exist in "nodes specification"
 *
 * @par MT-safe: No
 */
int
cvt_nodespec_to_select(char *str, char **cvt_bp, size_t *cvt_lenp, attribute *pattr)
{
	int hcpp = 0;		  /* set by ctcpus() if spec contained cpp= */
	int hmem = 0;		  /* true if "mem" is explicitly set on the job */
	char *globs;
	int i;
	u_Long memamt = 0;	  /* per-node memory amount, in KB */
	int nt;
	char *nspec;
	int num = 1; /*default: a request for 1 node*/
	struct node_req node_req;
	char *pcvt;		  /* write cursor into the output buffer */
	size_t pcvt_free;	  /* free bytes remaining at pcvt */
	resource *pncpus;
	resource *pmem;
	struct prop *prop = NULL; /*assume sub-spec calls out no proper */
	int ret = -1;		  /*assume error occurs*/
	struct prop *walkprop;
	char sprintf_buf[BUFSIZ];
	resource_def *pncpusdef = NULL;
	resource_def *pmemdef = NULL;

	**cvt_bp = '\0';
	pcvt = *cvt_bp;
	pcvt_free = *cvt_lenp;

	pncpusdef = &svr_resc_def[RESC_NCPUS];
	pmemdef = &svr_resc_def[RESC_MEM];

	/*
	 * check the local copy of the "nodes" specification for any "global"
	 * modifiers.  Re-write the spec copy in expanded form if modifiers
	 * exist.  Ignore #excl and #shared as they are examined when
	 * creating the "place" directive.
	 */

	nspec = strdup(str);
	if (nspec == NULL)
		return (PBSE_SYSTEM);

	if ((globs = strchr(nspec, '#')) != NULL) {
		char *cp;
		char *hold;
		static char *excl = "excl";
		static char *shared = "shared";

		/* split off the "#..." tail and process its '#'-separated
		 * modifiers from right to left */
		*globs++ = '\0';
		globs = strdup(globs);
		if (globs == NULL) {
			free(nspec);
			return (PBSE_SYSTEM);
		}
		while ((cp = strrchr(globs, '#')) != NULL) {
			*cp++ = '\0';
			if ((strcasecmp(cp, excl) != 0) &&
			    (strcasecmp(cp, shared) != 0)) {
				hold = mod_spec(nspec, cp);
				if (hold == NULL) {
					free(globs);
					free(nspec);
					return -1;
				}
				free(nspec);
				nspec = hold;
			}
		}
		/* the first (leftmost) modifier has no '#' prefix left */
		if ((strcasecmp(globs, excl) != 0) &&
		    (strcasecmp(globs, shared) != 0)) {
			hold = mod_spec(nspec, globs);
			if (hold == NULL) {
				free(globs);
				free(nspec);
				return -1;
			}
			free(nspec);
			nspec = hold;
		}
		free(globs);
		globs = NULL;
	}
	str = nspec; /* work on the copy of the string */

	/* find the number of cpus specified in the node string */

	nt = ctcpus(str, &hcpp); /* total number of cpus requested in str */

	/* Is "ncpus" set as a separate resource? */

	if ((pncpus = find_resc_entry(pattr, pncpusdef)) == NULL) {
		if ((pncpus = add_resource_entry(pattr, pncpusdef)) == 0) {
			free(nspec);
			return (PBSE_SYSTEM);
		}
	}

	if ((pncpus->rs_value.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) ==
	    ATR_VFLAG_SET) {

		long nc;

		/* ncpus is already set and not a default */

		/* NOTE(review): assumes ctcpus() returns nt >= 1; nt == 0
		 * would divide by zero below — confirm */
		nc = pncpus->rs_value.at_val.at_long;
		if (hcpp && (nt != pncpus->rs_value.at_val.at_long)) {
			/* if cpp string specificed, this is an error */
			free(nspec);
			return (PBSE_BADATVAL);
		} else if ((nc % nt) != 0) {
			/* ncpus must be multiple of number of tasks */
			free(nspec);
			return (PBSE_BADATVAL);
		} else if ((hcpp == 0) && ((nc / nt) > 1)) {
			/* append ncpus=(C/T) to each chunk */
			nt = nc / nt;
		} else
			nt = 1;

	} else
		nt = 1;

	/* How about "mem", is it set in the Resource_List */

	pmem = find_resc_entry(pattr, pmemdef);
	if (pmem &&
	    (pmem->rs_value.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) ==
		    ATR_VFLAG_SET) {
		/* divide total mem evenly among the requested nodes */
		hmem = 1;
		memamt = get_kilobytes_from_attr(&pmem->rs_value) / ctnodes(str);
	}

	/* main loop: one pass per '+'-separated sub-spec */
	while (*str) {
		size_t needed;

		node_req.nr_ppn = 1;
		node_req.nr_cpp = 1;
		node_req.nr_np = 1;

		free_prop(prop);
		prop = NULL; /* this is a must */

		/*Determine how many nodes this subspec requests*/

		if ((i = number(&str, &num, 1)) == -1) {
			free(nspec);
			free_prop(prop);
			return ret;
		}

		/*Determine properties the node must have and how many processors*/

		if (i == 0) {		   /* subspec specified a number */
			if (*str == ':') { /* subspec is specifying properties */
				str++;
				if (proplist(&str, &prop, &node_req)) {
					free(nspec);
					free_prop(prop);
					return ret;
				}
			}
		} else { /* subspec doesn't specify a number */
			if (proplist(&str, &prop, &node_req)) {
				free(nspec);
				free_prop(prop);
				return ret; /* error in generation of prop list */
			}
		}

		/* start building the select spec */
		/* 1.  the number of chunks       */

		sprintf(sprintf_buf, "%d:", num);
		needed = strlen(sprintf_buf) + 1;
		if (cvt_overflow(pcvt_free, needed) &&
		    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
			free(nspec);
			free_prop(prop);
			return (PBSE_SYSTEM);
		}
		(void) memcpy(pcvt, sprintf_buf, needed);
		pcvt = pcvt + needed - 1; /* advance to NULL byte */
		pcvt_free -= needed;

		/* 2.  the number of cpus */

		sprintf(sprintf_buf, "ncpus=%d", node_req.nr_np * nt);
		needed = strlen(sprintf_buf) + 1;
		if (cvt_overflow(pcvt_free, needed) &&
		    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
			free(nspec);
			free_prop(prop);
			return (PBSE_SYSTEM);
		}
		(void) memcpy(pcvt, sprintf_buf, needed);
		pcvt = pcvt + needed - 1; /* advance to NULL byte */
		pcvt_free -= needed;

		/* 3. the amt of mem, if specified */

		if (hmem) {
			/* NOTE(review): assumes u_Long matches %llu — confirm */
			sprintf(sprintf_buf, ":mem=%lluKB", memamt);
			needed = strlen(sprintf_buf) + 1;
			if (cvt_overflow(pcvt_free, needed) &&
			    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
				free(nspec);
				free_prop(prop);
				return (PBSE_SYSTEM);
			}
			(void) memcpy(pcvt, sprintf_buf, needed);
			pcvt = pcvt + needed - 1; /* advance to NULL byte */
			pcvt_free -= needed;
		}

		/* 4. now need to see if any property matches a node name */

		for (walkprop = prop; walkprop; walkprop = walkprop->next) {
			for (i = 0; i < svr_totnodes; i++) {
				if (pbsndlist[i]->nd_state & INUSE_DELETED)
					continue;
				if (strcasecmp(pbsndlist[i]->nd_name, walkprop->name) == 0) {
					walkprop->mark = 0;
					break;
				}
			}
		}
		/* 5. now turn each property into "property=True" unless */
		/* it was a nodename, then it is  "host=prop"	  */

		for (walkprop = prop; walkprop; walkprop = walkprop->next) {
			if (walkprop->mark)
				snprintf(sprintf_buf, sizeof(sprintf_buf),
					 ":%s=%s", walkprop->name, ATR_TRUE);
			else
				snprintf(sprintf_buf, sizeof(sprintf_buf),
					 ":host=%s", walkprop->name);
			needed = strlen(sprintf_buf) + 1;
			if (cvt_overflow(pcvt_free, needed) &&
			    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
				free(nspec);
				free_prop(prop);
				return (PBSE_SYSTEM);
			}
			(void) memcpy(pcvt, sprintf_buf, needed);
			pcvt = pcvt + needed - 1; /* advance to NULL byte */
			pcvt_free -= needed;
		}

		/* 6. if nr_ppn != 1,  add mpiproces=nr_ppn */
		if (node_req.nr_ppn != 1) {
			sprintf(sprintf_buf, ":mpiprocs=%d", node_req.nr_ppn);
			needed = strlen(sprintf_buf) + 1;
			if (cvt_overflow(pcvt_free, needed) &&
			    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
				free(nspec);
				free_prop(prop);
				return (PBSE_SYSTEM);
			}
			(void) memcpy(pcvt, sprintf_buf, needed);
			pcvt = pcvt + needed - 1; /* advance to NULL byte */
			pcvt_free -= needed;
		}

		/* a '+' joins sub-specs; anything else ends the loop */
		if (*str == '+') {
			++str;
			needed = 2; /* 2 = strlen("+") + 1 */
			if (cvt_overflow(pcvt_free, needed) &&
			    (cvt_realloc(cvt_bp, cvt_lenp, &pcvt, &pcvt_free) == 0)) {
				free(nspec);
				free_prop(prop);
				return (PBSE_SYSTEM);
			}
			(void) memcpy(pcvt, "+", needed);
			pcvt = pcvt + needed - 1;
			pcvt_free -= needed;
		} else
			break;
	}
	free(nspec);
	free_prop(prop);
	return 0;
}

#define CVT_PAD 256 /* if less than this much free space, get more */

/**
 * @brief
 * 		is there room in this buffer or should we allocate more?
 *
 * @param[in] buflen is the current buffer's length
 * @param[in] needed is the amount of data we wish to append
 *
 * @return	int
 * @retval	0	- success
 * @retval	1	- overflow
 */
static int
cvt_overflow(size_t buflen, size_t needed)
{
	/* room only when the request fits AND a CVT_PAD cushion remains */
	if (needed <= buflen && (buflen - needed) >= CVT_PAD)
		return 0;
	return 1;
}

/**
 * @brief
 * 		allocate more room in bufptr (doubles the buffer).
 *
 * @param[in,out] bp is a pointer to the current buffer
 * @param[in,out] bplen is a pointer to the current buffer's length
 * @param[in,out] curbp is the current pointer into the buffer
 * @param[in,out] bpfree is a pointer to the amount of free space in the
 * 					current buffer
 *
 * @return	int
 * @retval	1	- success
 * @retval	0	- failure (all outputs left untouched)
 */
static int
cvt_realloc(char **bp, size_t *bplen, char **curbp, size_t *bpfree)
{
	size_t grow = *bplen; /* grow by the current size, i.e. double */
	ptrdiff_t used = *curbp - *bp;
	char *bigger;

	/* realloc into a temporary so *bp stays valid on failure */
	if ((bigger = realloc(*bp, *bplen + grow)) == NULL)
		return 0;

	*bp = bigger;
	*bplen += grow;
	*bpfree += grow;
	*curbp = bigger + used; /* re-base the cursor into the new block */
	return 1;
}

#define JBINXSZ_GROW 16 /* fix: removed stray trailing ';' from the macro */
/**
 * @brief
 * 		add a job pointer into the index array of a mominfo_t.
 *
 * @par
 *		The index of the entry is used in the exec_host string following the
 *		slash character to be unique for each job running on that Mom
 *
 * @param[in,out]	pnode	- pbsnode structure
 * @param[in]	pjob	- a job pointer
 *
 * @return	int
 * @retval	>=0	- index in which the job got added
 * @retval	-1	- could not realloc memory for adding job index
 */
static int
add_job_index_to_mom(struct pbsnode *pnode, job *pjob)
{
	int i;
	size_t newn;
	size_t oldn;
	job **pnew;
	mom_svrinfo_t *psm;

	/* index array lives on the node's first (natural) Mom */
	psm = (mom_svrinfo_t *) ((pnode->nd_moms[0])->mi_data);

	/* see if there is an empty slot in the array */

	for (i = 0; i < psm->msr_jbinxsz; i++) {
		if (psm->msr_jobindx[i] == NULL) {
			psm->msr_jobindx[i] = pjob;
			return i;
		}
	}

	/* didn't find an empty slot, need to expand array */

	oldn = psm->msr_jbinxsz;
	newn = oldn + JBINXSZ_GROW;

	pnew = realloc(psm->msr_jobindx, sizeof(struct job *) * newn);
	if (pnew == NULL) {
		log_err(PBSE_SYSTEM, "add_job_index_to_mom",
			"could not realloc memory for adding job index");
		return -1;
	}
	/* NULL-fill the newly added slots */
	for (i = oldn; i < newn; i++)
		pnew[i] = NULL;
	psm->msr_jobindx = pnew;
	psm->msr_jbinxsz = newn;
	psm->msr_jobindx[oldn] = pjob; /* first new slot holds this job */
	return oldn;
}

/**
 * @brief
 * 		add a job pointer into the index array of a mominfo_t.
 *
 * @par
 *		using a known, old, slot number.   Used to restore the index for a
 *		running job on server recovery.   If for some reason the correct slot
 *		is inuse by a different job, slot -1 is returned.
 *
 * @param[in,out]	pnode	- pbsnode structure
 * @param[in]	pjob	- a job pointer
 * @param[in]	slot	- slot into which job needs to be inserted.
 *
 * @return	int
 * @retval	>=0	- slot where job is inserted.
 * @retval	-1	- already in use or slot doesn't exist.
 */
static int
set_old_job_index(struct pbsnode *pnode, job *pjob, int slot)
{
	int k;
	job **grown;
	mom_svrinfo_t *minfo;

	/* index array lives on the node's first (natural) Mom */
	minfo = (mom_svrinfo_t *) ((pnode->nd_moms[0])->mi_data);

	if (slot >= minfo->msr_jbinxsz) {
		size_t prevsz;
		size_t newsz;

		/* requested slot is beyond the array; grow it so it exists */

		prevsz = minfo->msr_jbinxsz;
		newsz = slot + JBINXSZ_GROW;

		grown = realloc(minfo->msr_jobindx, sizeof(struct job *) * newsz);
		if (grown == NULL) {
			log_err(PBSE_SYSTEM, "set_old_job_index",
				"could not realloc memory for adding job index");
			return -1;
		}
		/* NULL-fill the newly added slots */
		for (k = prevsz; k < newsz; k++)
			grown[k] = NULL;
		minfo->msr_jobindx = grown;
		minfo->msr_jbinxsz = newsz;
	}

	/* claim the slot unless a different job already holds it */
	if (minfo->msr_jobindx[slot] != NULL && minfo->msr_jobindx[slot] != pjob)
		return -1;

	minfo->msr_jobindx[slot] = pjob;
	return slot;
}

#define OUTBUF_SZ 200
/**
 * @brief
 * 		build an exec_vnode string when the operator only provided a list of
 * 		nodes.
 *
 * @par
 * 		From the select spec, assign each
 *		chunk on a round-robin basis to the nodes given as the destination.
 *
 *		This may very well overload some nodes or end up with chunks on nodes
 *		on which they do not belong,  the operator must be aware.
 *
 * @param[in]	pjob	- a job pointer
 * @param[in]	nds		- list of nodes
 *
 * @return	char *
 * @retval	built string	- success (points to a static buffer; do not free)
 * @retval	NULL	- failure
 *
 * @par MT-safe: No
 */
static char *
build_execvnode(job *pjob, char *nds)
{
	int i;
	int j;
	size_t ns;
	char **ndarray;
	int nnodes = 0;
	long nchunks;
	char *pc;
	char *psl;
	int rc = 0; /* fix: initialize; don't rely on parse_plus_spec setting it */
	attribute *pschedselect;
	char *selspec;
	static size_t outbufsz = 0;
	static char *outbuf = NULL;

	if (!pjob || !nds)
		return NULL;

	/* without a schedselect there is nothing to match chunks against */
	pschedselect = get_jattr(pjob, JOB_ATR_SchedSelect);
	if (!is_attr_set(pschedselect))
		return (nds);

	selspec = pschedselect->at_val.at_str;

	if (outbufsz == 0) {
		outbufsz = OUTBUF_SZ;
		outbuf = malloc(outbufsz);
		if (outbuf == NULL) {
			log_err(ENOMEM, "build_execvnode", "out of  memory");
			return NULL;
		}
	}

	/* break the "plus-ed" list of nodes into an array */

	nnodes = 1;
	pc = nds;
	while ((pc = strchr(pc, (int) '+'))) {
		nnodes++;
		pc++;
	}
	ndarray = (char **) malloc(nnodes * sizeof(char *));
	if (ndarray == NULL)
		return NULL;
	memset(ndarray, 0, nnodes * sizeof(char *));

	i = 0;
	pc = parse_plus_spec(nds, &rc);

	while (pc) {
		if ((*(ndarray + i) = strdup(pc)) == NULL) {
			rc = errno;
			break;
		}
		/* keep only the node name; strip any "/slot" suffix */
		psl = strchr(*(ndarray + i), (int) '/');
		if (psl)
			*psl = '\0';
		++i;
		pc = parse_plus_spec(NULL, &rc);
	}

	*outbuf = '\0';

	/*
	 * if the number of nodes identified for ndarray (nnodes) are not equal
	 * to the number of nodes identified by parse_plus_spec, then
	 * the vnode specification is invalid.
	 */

	if (rc || i != nnodes)
		goto done;

	/* now loop breaking up the select spec into separate chunks */
	/* and determining how many times each chunk is to be used   */

	i = 0;
	pc = parse_plus_spec(selspec, &rc);
	while (pc) {
		nchunks = strtol(pc, &pc, 10);
		if (nchunks <= 0)
			nchunks = 1;

		for (j = 0; j < nchunks; ++j) {

			/* fix: +3 (was +2) — room for ':', '+' and the NUL;
			 * the old estimate allowed a one-byte overflow at the
			 * exact boundary */
			ns = strlen(*(ndarray + i)) + strlen(pc) + 3;
			if ((strlen(outbuf) + ns) > outbufsz) {
				char *tmp;
				size_t newsz;

				if (ns > OUTBUF_SZ)
					newsz = outbufsz + ns;
				else
					newsz = outbufsz + OUTBUF_SZ;
				tmp = realloc(outbuf, newsz);
				if (tmp) {
					outbuf = tmp;
					outbufsz = newsz;
				} else {
					rc = PBSE_SYSTEM;
					break;
				}
			}

			/* append "node:chunk+" (':' skipped if chunk begins with one) */
			strcat(outbuf, *(ndarray + i));
			if (*pc != ':')
				strcat(outbuf, ":");
			strcat(outbuf, pc);
			strcat(outbuf, "+");

			/* round-robin over the destination nodes */
			if (++i >= nnodes)
				i = 0;
		}
		pc = parse_plus_spec(NULL, &rc);
	}
	/* fix: guard against underflow when nothing was appended */
	if (*outbuf != '\0')
		*(outbuf + strlen(outbuf) - 1) = '\0'; /* remove trailing '+' */
done:
	/* it is safe to freeing <ndarray> upto <nnodes> as it was memset'd */
	for (i = 0; i < nnodes; ++i)
		free(*(ndarray + i));
	free(ndarray);
	ndarray = NULL;
	if (rc)
		return NULL;
	else
		return (outbuf);
}

/**
 * @brief
 * 		foreach parent mom return the one that is up and with the fewest jobs
 *
 * @param[in]	pnode	- vnode
 * @param[in]	pcur_mom	- former parent Mom
 *
 * @return	mominfo_t *
 * @retval	the chosen parent Mom on success
 * @retval	NULL	- all parent Moms are down/offline
 */
static mominfo_t *
which_parent_mom(pbsnode *pnode, mominfo_t *pcur_mom)
{
	int i;
	int fewest = 0;
	mominfo_t *best = NULL;
	int badmask = INUSE_DOWN | INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM;

	/* keep using the prior vnode's parent if she also mothers this vnode */
	if (pcur_mom != NULL) {
		for (i = 0; i < pnode->nd_nummoms; ++i) {
			if (pnode->nd_moms[i] == pcur_mom)
				return (pcur_mom);
		}
	}

	/* otherwise pick the "least busy" up parent Mom of this vnode */
	for (i = 0; i < pnode->nd_nummoms; ++i) {
		mominfo_t *cand = pnode->nd_moms[i];
		mom_svrinfo_t *csvr = (mom_svrinfo_t *) cand->mi_data;

		/* skip Moms whose daemon or natural vnode is down/offline */
		if ((cand->mi_dmn_info->dmn_state & badmask) != 0)
			continue;
		if ((csvr->msr_children[0]->nd_state & badmask) != 0)
			continue;

		/* first usable Mom, or one with strictly fewer jobs, wins */
		if ((best == NULL) || (csvr->msr_numjobs < fewest)) {
			fewest = csvr->msr_numjobs;
			best = cand;
		}
	}
	return (best);
}

/**
 * @brief assign jobs on each subnode of a node
 * 
 * subnode is a structure corresponding to each cpu within a node.
 * assign the jobid on them based on hw_ncpus count. subnodes will
 * be created based on jobs if svr_init is TRUE.
 * 
 * @param[in,out] pnode - node where jobs needs to be assigned
 * @param[in] hw_ncpus - number of cpus requested by the job
 * @param[in] jobid - job id going to land on the node
 * @param[in] svr_init - happens during server initialization?
 * @param[in] share_job - job sharing type
 * @return int 
 * @retval 0 : success
 * @retval PBSE_* : for failure
 */
static int
assign_jobs_on_subnode(struct pbsnode *pnode, int hw_ncpus, char *jobid, int svr_init, int share_job)
{
	struct pbssubn *snp;
	struct jobinfo *jp;
	int rc = 0;

	if ((svr_init == FALSE) && (pnode->nd_state & INUSE_JOBEXCL)) {
		/* allocate node only if it is not occupied by other jobs */
		for (snp = pnode->nd_psn; snp; snp = snp->next) {
			for (jp = snp->jobs; jp; jp = jp->next) {
				if (strcmp(jp->jobid, jobid))
					return PBSE_RESCUNAV;
			}
		}
	}

	snp = pnode->nd_psn;
	if (hw_ncpus == 0) {
		/* zero-cpu request: record the job on the first subnode only,
		 * without consuming a cpu */
		/* setup jobinfo struture */
		jp = (struct jobinfo *) malloc(sizeof(struct jobinfo));
		if (jp) {
			jp->next = snp->jobs;
			jp->has_cpu = 0; /* has no cpus allocatted */
			snp->jobs = jp;
			jp->jobid = strdup(jobid);
			if (!jp->jobid)
				rc = PBSE_SYSTEM;
		} else
			rc = PBSE_SYSTEM;

	} else {
		struct pbssubn *lst_sn;
		int ncpus;

		/* claim one free subnode (cpu) per requested cpu */
		lst_sn = NULL;
		for (ncpus = 0; ncpus < hw_ncpus; ncpus++) {

			while (snp->inuse != INUSE_FREE) {
				if (snp->next)
					snp = snp->next;
				else if (svr_init == TRUE) {
					/*
						* Server is in the process of recovering jobs at
						* start up. Haven't contacted the Moms yet, so
						* unsure about the number of cpus.  So add as many
						* subnodes as needed to hold all of the job chunks
						* which were allocated to the node.
						*/
					if ((snp = create_subnode(pnode, lst_sn)) == NULL) {
						return PBSE_SYSTEM;
					}
					break;
				} else
					break; /* if last subnode, use it even if in use */
			}

			if (share_job == VNS_FORCE_EXCL)
				snp->inuse |= INUSE_JOBEXCL;
			else
				snp->inuse |= INUSE_JOB;

			pnode->nd_nsnfree--;
			/*
			* Store the last subnode of parent node list.
			* This removes the need to find the last node of
			* parent node's list, in create_subnode().
			*/
			lst_sn = snp;
			if (pnode->nd_nsnfree < 0) {
				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
					  LOG_ALERT, pnode->nd_name,
					  "free CPU count went negative on node");
			}

			/* setup jobinfo struture */
			jp = (struct jobinfo *) malloc(sizeof(struct jobinfo));
			if (jp) {
				jp->next = snp->jobs;
				jp->has_cpu = 1; /* has a cpu allocatted */
				snp->jobs = jp;
				jp->jobid = strdup(jobid);
				if (!jp->jobid) {
					rc = PBSE_SYSTEM;
					goto end;
				}
			} else {
				rc = PBSE_SYSTEM;
				goto end;
			}

			DBPRT(("set_node: node: %s/%ld to job %s, still free: %ld\n",
			       pnode->nd_name, snp->index, jobid,
			       pnode->nd_nsnfree))
		}
	}

end:
	if (rc == PBSE_SYSTEM)
		log_errf(rc, __func__, "Failed to allocate memory!");
	return rc;
}

/**
 * @brief update node state based on the job sharing type
 * and node sharing type. For instance:
 * node-state is set to exclusive if either of them are exclusive.
 * 
 * @param[in,out] pnode - node for which state is updated
 * @param[in] share_job - job sharing type
 */
static void
update_node_state(struct pbsnode *pnode, int share_job)
{
	int share_node = get_nattr_long(pnode, ND_ATR_Sharing);
	int want_excl;

	/* decide whether the node becomes job-exclusive: the node's own
	 * sharing attribute wins; otherwise fall back to the job's */
	if (share_node == (int) VNS_FORCE_EXCL || share_node == (int) VNS_FORCE_EXCLHOST)
		want_excl = 1;
	else if (share_node == VNS_IGNORE_EXCL)
		want_excl = 0;
	else if (share_node == VNS_DFLT_EXCL || share_node == VNS_DFLT_EXCLHOST)
		want_excl = (share_job != VNS_IGNORE_EXCL);
	else
		want_excl = (share_job == VNS_FORCE_EXCL);

	if (want_excl) {
		set_vnode_state(pnode, INUSE_JOBEXCL, Nd_State_Or);
	} else if (pnode->nd_nsnfree <= 0) {
		/* no free cpus left: mark busy */
		set_vnode_state(pnode, INUSE_JOB, Nd_State_Or);
	} else {
		/* cpus remain: clear the busy/exclusive bits */
		set_vnode_state(pnode, ~(INUSE_JOB | INUSE_JOBEXCL), Nd_State_And);
	}
}

/**
 * @brief Determines job sharing type
 * Job sharing type is determined based on the job placement directive
 * which will be in the form of job's resource.
 * 
 * @param[in] pjob - job struct
 * @return int
 * @retval enum vnode_sharing
 */
int
get_job_share_type(struct job *pjob)
{
	attribute *patresc; /* ptr to job/resv resource_list */
	patresc = get_jattr(pjob, JOB_ATR_resource);
	int share_job = VNS_UNSET;
	resource *pplace;
	resource_def *prsdef;

	prsdef = &svr_resc_def[RESC_PLACE];
	pplace = find_resc_entry(patresc, prsdef);
	if (pplace && pplace->rs_value.at_val.at_str) {
		if ((place_sharing_type(pplace->rs_value.at_val.at_str,
					VNS_FORCE_EXCLHOST) != VNS_UNSET) ||
		    (place_sharing_type(pplace->rs_value.at_val.at_str,
					VNS_FORCE_EXCL) != VNS_UNSET)) {
			share_job = VNS_FORCE_EXCL;
		} else if (place_sharing_type(pplace->rs_value.at_val.at_str,
					      VNS_IGNORE_EXCL) == VNS_IGNORE_EXCL)
			share_job = VNS_IGNORE_EXCL;
		else
			share_job = VNS_DFLT_SHARED;
	}

	return share_job;
}

#define EHBUF_SZ 500
/**
 * @brief
 *	 	set_nodes - take the node plus resource spec from the scheduler or
 *		operator and allocate the named nodes internally.
 *
 * @par Functionality:
 *
 *		Takes the node plus resource spec from the scheduler or operator and
 *		allocate the named nodes internally.  If the operator only provides
 *		a list of nodes,  we attempt to associate the resource chunks from the
 *		select spec with the nodes, see build_execvnode().
 *
 *		"mk_new_host" set true (non-zero) directs that (1) a new exec_host
 *		string should be created and returned and the job should be added to
 *		the job index array on each Mom,  or if false the existing exec_host
 *		string should be used to reset the job_index array on the Moms to
 *		the indices already listed in the existing exec_host.
 *
 *		The job index array is used to provide a "unique" number for each chunk
 *		on a given Mom.  This appears in the exec_host string following the "/"
 *		and was used by Mom on an IBM SP to set the switch interface; it is
 *		currently maintained only for backward compatibility.
 *
 *		On a non error (zero) exit, "execvnod_out" is set to point to either
 *		the original or possibly modified exec_vnode string.
 *
 *		On a non error exit,  "hoststr" is set to point to a new exec_host
 *		string if "mk_new_host" is true or left pointing to the original
 *		exec_host string if "mk_new_host" is false.
 *
 *		execvnod_out and hoststr should NOT be freed as they point
 *		either to the original strings or a string living in a static buffer.
 *
 *		"svr_init" is only set to TRUE when the server is recovering running
 *		jobs on startup. This flag tells the function to ignore certain
 *		errors, such as:
 *	   	- unknown resources
 *			It is possible that a resource definition has been removed,
 *			we still wish to have the job show up on the nodes; so ignore
 *			this error.
 *	   	- unlicensed nodes
 *			On initialization, the nodes have not yet been Licensed, and
 *			since they may use fixed licenses, ignore this step.
 *	   	- Job exclusive allocation
 *			Since the node was assigned to the job, just reassign it
 *			without this check.
 *
 * @param[in]	pobj         -  pointer to an object, either job or reservation
 * @param[in]	objtype      -  set to JOB_OBJECT if pobj points to a job,
 *                              otherwise pobj points to a reservation object
 * @param[in]	execvnod_in  -  original vnode list from scheduler/operator
 * @param[out]	execvnod_out -  original or modified list of vnodes and
 *                              resources, becomes exec_vnode value.
 * @param[in]	hoststr      -  original or modified exec_host string, see
 *                              mk_new_host.
 * @param[in]	hoststr2      - original or modified exec_host2 string
 *
 * @param[in]	mk_new_host  -  if True (non-zero), this function is to create
 *                              a new hoststr including new job indices,
 *                              otherwise return existing exec_host unchanged.
 * @param[in]	svr_init     -  if True, server is recovering jobs.
 *
 * @return	int
 * @retval	PBSE_NONE : success
 * @retval	non-zero  : various PBSE error returns.
 *
 * @par Side Effects: None
 *
 * @par MT-safe: No
 */
int
set_nodes(void *pobj, int objtype, char *execvnod_in, char **execvnod_out, char **hoststr, char **hoststr2, int mk_new_host, int svr_init)
{
	char *chunk;
	int setck;
	char *execvncopy;
	int hasprn; /* set if chunk grouped in parenthesis */
	int hostcpus;
	int i;
	char *last;
	char *execvnod = NULL;
	int ndindex;
	int nelem;
	mominfo_t *parentmom;
	mominfo_t *parentmom_first = NULL;
	char *peh = NULL;
	char *pehnxt = NULL;
	job *pjob = NULL;
	char *pc;
	char *pc2;
	int share_job = VNS_UNSET;
	char *vname;
	resc_resv *presv = NULL;
	int tc; /* num of nodes being allocated  */
	struct pbsnode *pnode;
	struct key_value_pair *pkvp;
	struct howl {
		pbsnode *hw_pnd;   /* ptr to node */
		pbsnode *hw_natvn; /* pointer to "natural" vnode */
		char *hw_mom_host;
		int hw_mom_port;
		mominfo_t *hw_mom;
		int hw_ncpus; /* num of cpus needed from this node */
		int hw_chunk; /* non-zero if start of a chunk      */
		int hw_index; /* index of job on Mom if hw_chunk   */
		int hw_htcpu; /* sum of cpus on this Mom, hw_chunk */
	} * phowl;
	static size_t ehbufsz = 0;
	static size_t ehbufsz2 = 0;
	static char *ehbuf = NULL;
	static char *ehbuf2 = NULL;
	char dummy_eh[1] = {'\0'}; /* writable empty string, see below */
	int rc = 0;

	if (ehbufsz == 0) {
		/* allocate the basic buffer for exec_host string */
		ehbuf = (char *) malloc(EHBUF_SZ);
		if (ehbuf == NULL)
			return (PBSE_SYSTEM);
		ehbufsz = EHBUF_SZ;
	}

	if (ehbufsz2 == 0) {
		/* allocate the basic buffer for exec_host2 string */
		ehbuf2 = (char *) malloc(EHBUF_SZ);
		if (ehbuf2 == NULL)
			return (PBSE_SYSTEM);
		ehbufsz2 = EHBUF_SZ;
	}

	if (objtype == JOB_OBJECT) {

		pjob = (job *) pobj;

		if (execvnod_in == NULL) {
			execvnod_in = *hoststr;
		}
		if (strchr(execvnod_in, (int) ':') == NULL) {
			/* need to take node only list and build a pseudo */
			/* exec_vnode string with the resources included  */
			execvnod = build_execvnode(pjob, execvnod_in);
		} else {
			execvnod = execvnod_in;
		}
		if (execvnod == NULL)
			return PBSE_BADNODESPEC;

		if (!strlen(execvnod)) {
			log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
				   pjob->ji_qs.ji_jobid, "Unknown node received");
			return PBSE_UNKNODE;
		}

		/* are we to allocate the nodes "excl" ? */
		share_job = get_job_share_type(pjob);

	} else if (objtype == RESC_RESV_OBJECT) {
		presv = (resc_resv *) pobj;
		execvnod = execvnod_in;
	}

	/* first count the number of vnodes */

	tc = 1;
	pc = execvnod;

	while ((pc = strchr(pc, (int) '+')) != NULL) {
		++tc;
		pc++;
	}

	/* allocate an howl array to hold info about allocated nodes;      */
	/* calloc so hw_natvn/hw_mom/hw_mom_host start NULL: not every     */
	/* path below fills them in and later loops test them for non-NULL */

	phowl = (struct howl *) calloc(tc, sizeof(struct howl));
	if (phowl == NULL)
		return (PBSE_SYSTEM);

	ndindex = 0;

	/* parse the exec_vnode string into a string of chunks and */
	/* then parse each chunk for the required resources        */

	execvncopy = strdup(execvnod);
	if (execvncopy == NULL) {
		rc = PBSE_SYSTEM;
		goto end;
	}

	if (mk_new_host == 0) {
		if (hoststr && *hoststr)
			peh = *hoststr; /* use old exec_host to redo index arrays */
		else
			peh = dummy_eh; /* writable dummy null string; the parse */
					/* below stores '\0' through this pointer */
	}
	pehnxt = peh;

	setck = 1; /* set flag to indicate likely end of chunk */
	/* therefore next entry is start of new chunk */
	hostcpus = 0; /* number of cpus from all vnodes on host   */
	/* from which chunk was taken		      */

	parentmom = NULL; /* use for multi-mom vnodes		      */

	/* note: hasprn is set based on finding '(' or ')'
	 *	> 0 = found '(' at start of substring
	 *	= 0 = no parens or found both in one substring
	 *	< 0 = found ')' at end of substring
	 */

	for (chunk = parse_plus_spec_r(execvncopy, &last, &hasprn);
	     chunk; chunk = parse_plus_spec_r(last, &last, &hasprn)) {

		if (parse_node_resc(chunk, &vname, &nelem, &pkvp) == 0) {
			if ((pnode = find_nodebyname(vname)) == NULL) {
				if (objtype == JOB_OBJECT) {
					log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
						   pjob->ji_qs.ji_jobid, "Unknown node %s received", vname);
				} else if (objtype == RESC_RESV_OBJECT)
					log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESV, LOG_INFO,
						   presv->ri_qs.ri_resvID, "Unknown node %s received", vname);
				free(execvncopy);
				rc = PBSE_UNKNODE;
				goto end;
			}

			if ((pnode->nd_state & VNODE_UNAVAILABLE) && (svr_init == FALSE))
				if ((objtype == RESC_RESV_OBJECT) && (presv->ri_qs.ri_resvID[0] != PBS_MNTNC_RESV_ID_CHAR) /*&& (presv->ri_qs.ri_state == RESV_UNCONFIRMED)*/)
					set_resv_for_degrade(pnode, presv);

			if (pjob != NULL) { /* only for jobs do we warn if a mom */
				/* hook has not been sent */
				for (i = 0; i < pnode->nd_nummoms; ++i) {

					if ((pnode->nd_moms[i] != NULL) &&
					    (sync_mom_hookfiles_count(pnode->nd_moms[i]) > 0)) {
						snprintf(log_buffer, sizeof(log_buffer),
							 "vnode %s's parent mom %s:%d has a pending copy hook or delete hook request", pnode->nd_name, pnode->nd_moms[i]->mi_host,
							 pnode->nd_moms[i]->mi_port);
						log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_NODE,
							  LOG_WARNING, pjob->ji_qs.ji_jobid, log_buffer);
						break;
					}
				}
			}

			(phowl + ndindex)->hw_pnd = pnode;
			(phowl + ndindex)->hw_ncpus = 0;
			(phowl + ndindex)->hw_chunk = setck;
			(phowl + ndindex)->hw_index = -1; /* will fill in later */
			(phowl + ndindex)->hw_htcpu = 0;
			if (setck == 1) { /* start of new chunk on host */
				if (mk_new_host) {

					/* look up "natural" vnode name for either 'the Mom' */
					/* or 'a Mom' for the real vnode.  This is used in   */
					/* the exec_host string                              */
					if (pnode->nd_nummoms > 1) { /* multi-mom */
						parentmom = which_parent_mom(pnode, parentmom);
						if (parentmom == NULL) {
							/* cannot find a Mom that works */
							free(execvncopy);
							rc = PBSE_SYSTEM;
							goto end;
						}
						/*
						 * save the "first" allocated Mom for incr
						 * the count of jobs on that Mom; used in
						 * load-balancing across multi-Mom vnodes
						 * [i.e. in a Cray]
						 */
						if (parentmom_first == NULL)
							parentmom_first = parentmom;

						/* record "native" vnode for the chosen Mom */
						(phowl + ndindex)->hw_natvn = ((struct mom_svrinfo *) (parentmom->mi_data))->msr_children[0];
						(phowl + ndindex)->hw_mom = parentmom;
					} else if (pnode->nd_nummoms == 1) {
						/* single parent Mom, just use her */
						(phowl + ndindex)->hw_natvn = ((mom_svrinfo_t *) (pnode->nd_moms[0]->mi_data))->msr_children[0];
						(phowl + ndindex)->hw_mom = pnode->nd_moms[0];
						if (parentmom_first == NULL)
							parentmom_first = pnode->nd_moms[0];
						/* if the first chunk goes to a single parent */
						/* set parentmom in case the next chunk can   */
						/* also go there;  otherwise keep the old     */
						/* parentmom value.                           */
						if (parentmom == NULL)
							parentmom = parentmom_first;
					}
				} else if (objtype == JOB_OBJECT) {
					/*
					 * exec_host applies to jobs only ...
					 * Have an existing exec_host string which is being
					 * kept.  Reuse it to obtain the "natural" vnode and
					 * the "index" number which we will use in
					 * set_old_job_index() later
					 */
					while (*pehnxt && (*pehnxt != '/'))
						pehnxt++;
					*pehnxt = '\0';
					(phowl + ndindex)->hw_natvn = find_nodebyname(peh);
					if ((phowl + ndindex)->hw_natvn == NULL) {
						log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
							   pjob->ji_qs.ji_jobid, "Unknown node %s received", peh);
						/* use common cleanup path (frees phowl) */
						free(execvncopy);
						rc = PBSE_UNKNODE;
						goto end;
					}
					if ((phowl + ndindex)->hw_pnd->nd_moms)
						(phowl + ndindex)->hw_mom = (phowl + ndindex)->hw_pnd->nd_moms[0];
					else {
						(phowl + ndindex)->hw_mom_host = (phowl + ndindex)->hw_pnd->nd_attr[ND_ATR_Mom].at_val.at_str;
						(phowl + ndindex)->hw_mom_port = (phowl + ndindex)->hw_pnd->nd_attr[ND_ATR_Port].at_val.at_long;
					}
					*pehnxt = '/';
					(phowl + ndindex)->hw_index = atoi(++pehnxt);
					while (*pehnxt && (*pehnxt != '+'))
						pehnxt++;
					if (*pehnxt == '+')
						peh = ++pehnxt;
					else
						peh = pehnxt;
					if (parentmom_first == NULL)
						parentmom_first = (phowl + ndindex)->hw_natvn->nd_moms[0];
				}
			}

			/* set setck to indicate if next vnode starts a new chunk */
			/* stays the same if hasprn == 0			  */
			if (hasprn > 0)
				setck = 0; /* continuation of multi-vnode chunk  */
			else if (hasprn < 0)
				setck = 1; /* end of multi-vnode chunk,start new */

			for (i = 0; i < nelem; i++) {
				if (strcasecmp("ncpus", (pkvp + i)->kv_keyw) == 0)
					(phowl + ndindex)->hw_ncpus = atoi((pkvp + i)->kv_val);
				else {
					if ((find_resc_def(svr_resc_def, (pkvp + i)->kv_keyw) == NULL) && (svr_init == FALSE)) {
						free(execvncopy);
						resc_in_err = strdup((pkvp + i)->kv_keyw);
						rc = PBSE_UNKRESC;
						goto end;
					}
				}
			}

			hostcpus += (phowl + ndindex)->hw_ncpus;

			if (setck == 1) {
				(phowl + ndindex)->hw_htcpu = hostcpus;
				hostcpus = 0;
			}

		} else {
			/* Error */
			free(execvncopy);
			rc = PBSE_BADATVAL;
			goto end;
		}

		ndindex++;
	}

	free(execvncopy);
	execvncopy = NULL;

	/* now we have an array of the required nodes */

	if (objtype == JOB_OBJECT) {
		size_t ehlen;
		size_t ehlen2;

		/* FOR JOBS ... */

		/* make sure that the buf for the new exec_host str is sufficient */
		/* allow room for each name plus /NNNNNN*MMMMMM+ (16 characters)  */
		ehlen = 0;
		ehlen2 = 0;

		/* start at 0: the first entry is always a chunk and must be */
		/* counted too (was "i = 1", undersizing the buffers)         */
		for (i = 0; i < ndindex; ++i) {
			if ((phowl + i)->hw_chunk) {
				ehlen += strlen((phowl + i)->hw_natvn->nd_name) + 16;
				if ((phowl + i)->hw_mom)
					ehlen2 += strlen((phowl + i)->hw_mom->mi_host);
				else
					ehlen2 += strlen((phowl + i)->hw_mom_host);
				ehlen2 += 6 + 16;
			}
		}

		if (ehlen >= ehbufsz) {
			/* need to grow buffer */
			pc = realloc(ehbuf, ehlen + EHBUF_SZ);
			if (pc) {
				ehbuf = pc;
				ehbufsz = ehlen + EHBUF_SZ;
			} else {
				rc = PBSE_SYSTEM;
				goto end;
			}
		}

		if (ehlen2 >= ehbufsz2) {
			/* need to grow buffer */
			pc2 = realloc(ehbuf2, ehlen2 + EHBUF_SZ);
			if (pc2) {
				ehbuf2 = pc2;
				ehbufsz2 = ehlen2 + EHBUF_SZ;
			} else {
				rc = PBSE_SYSTEM;
				goto end;
			}
		}

		/*
		 * Add a "jobinfo" structure to each subnode of *pnode that
		 * is specified.
		 */

		for (i = 0; i < ndindex; ++i) {

			pnode = (phowl + i)->hw_pnd;

			if ((svr_init == TRUE) &&
			    ((check_job_substate(pjob, JOB_SUBSTATE_SUSPEND) ||
			      check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP))) &&
			    (is_jattr_set(pjob, JOB_ATR_resc_released)))
				/* No need to add suspended job to jobinfo structure and assign CPU slots to it*/
				break;

			rc = assign_jobs_on_subnode(pnode, (phowl + i)->hw_ncpus, pjob->ji_qs.ji_jobid, svr_init, share_job);
			if (rc != PBSE_NONE)
				goto end;

			update_node_state(pnode, share_job);

			/*
			 * now for each new chunk, add the job to the Mom job index
			 * array anew or reusing the indices from the existing
			 * exec_host
			 */

			if ((phowl + i)->hw_chunk && (phowl + i)->hw_mom) {
				if (mk_new_host) {
					/* add new job index to Mom and save it    */
					/* for creating the (new) exec_host string */
					(phowl + i)->hw_index = add_job_index_to_mom((phowl + i)->hw_natvn, pjob);
				} else {
					/* as we are keeping the exec_host from before */
					/* reset the job index to the value saved from */
					/* parsing the old exec_host earlier           */
					(phowl + i)->hw_index =
						set_old_job_index((phowl + i)->hw_natvn,
								  pjob, (phowl + i)->hw_index);
				}
			}
		}

		/* set flag in job that it has nodes associated with it */

		pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HasNodes;

		/*
		 * increment the number of jobs on the job's Mother Superior
		 * this has to be done in association with setting
		 * JOB_SVFLG__HasNodes, see free_nodes()
		 * It is decremented in free_nodes()
		 */
		if (parentmom_first)
			((mom_svrinfo_t *) (parentmom_first->mi_data))->msr_numjobs++;

		if (mk_new_host) {

			/* make the new exec_host string */

			*ehbuf = '\0';
			pc = ehbuf;
			*ehbuf2 = '\0';
			pc2 = ehbuf2;
			for (i = 0; i < ndindex; ++i) {
				if ((phowl + i)->hw_chunk) {
					sprintf(pc, "%s/%d", (phowl + i)->hw_natvn->nd_name,
						(phowl + i)->hw_index);

					if ((phowl + i)->hw_mom)
						sprintf(pc2, "%s:%d/%d", (phowl + i)->hw_mom->mi_host,
							(phowl + i)->hw_mom->mi_port,
							(phowl + i)->hw_index);
					else
						sprintf(pc2, "%s:%d/%d", (phowl + i)->hw_mom_host,
							(phowl + i)->hw_mom_port,
							(phowl + i)->hw_index);

					pc = ehbuf + strlen(ehbuf);
					pc2 = ehbuf2 + strlen(ehbuf2);

					if ((phowl + i)->hw_htcpu != 1) {
						sprintf(pc, "*%d", (phowl + i)->hw_htcpu);
						pc = ehbuf + strlen(ehbuf);

						sprintf(pc2, "*%d", (phowl + i)->hw_htcpu);
						pc2 = ehbuf2 + strlen(ehbuf2);
					}
					*(pc++) = '+';
					*pc = '\0';

					*(pc2++) = '+';
					*pc2 = '\0';
				}
			}
			/* remove last '+'; guard against an empty buffer to */
			/* avoid writing one byte before the allocation      */
			if (*ehbuf)
				*(ehbuf + strlen(ehbuf) - 1) = '\0';
			if (*ehbuf2)
				*(ehbuf2 + strlen(ehbuf2) - 1) = '\0';
		}

	} else {

		/* FOR RESERVATIONS */

		/* now for each node, create a resvinfo structure */
		for (i = 0; i < ndindex; ++i) {

			struct resvinfo *rp;
			/* Create a list of pointers to each vnode associated to the reservation */
			rp = (struct resvinfo *) malloc(sizeof(struct resvinfo));
			if (rp) {
				pbsnode_list_t *tmp_pl;
				rp->next = (phowl + i)->hw_pnd->nd_resvp;
				(phowl + i)->hw_pnd->nd_resvp = rp;
				rp->resvp = presv;

				/* create a backlink from the reservation to the vnode */
				tmp_pl = malloc(sizeof(pbsnode_list_t));
				if (tmp_pl == NULL) {
					/* use common cleanup path (frees phowl) */
					rc = PBSE_SYSTEM;
					goto end;
				}
				tmp_pl->next = presv->ri_pbsnode_list;
				tmp_pl->vnode = (phowl + i)->hw_pnd;
				presv->ri_pbsnode_list = tmp_pl;
				presv->ri_vnodect++;
				DBPRT(("%s: Adding %s to %s\n", __func__,
				       (phowl + i)->hw_pnd->nd_name, presv->ri_qs.ri_resvID))
			}
		}
		presv->ri_qs.ri_svrflags |= RESV_SVFLG_HasNodes;
	}

	*execvnod_out = execvnod;
	if (mk_new_host) {
		*hoststr = ehbuf;
		*hoststr2 = ehbuf2;
	}

end:
	free(phowl);
	return rc;
}

/**
 * @brief
 *		Clear every Mom job-index slot that still references 'pjob'
 *		on each Mom managing vnode 'pnode'.
 *
 * @param[in]	pjob	- job whose index entries are to be removed
 * @param[in]	pnode	- vnode whose parent Moms are scanned; may be NULL
 *
 * @return	void
 */
static void
remove_job_index_from_mom(job *pjob, struct pbsnode *pnode)
{
	int m;
	int slot;
	mom_svrinfo_t *psvrmom;

	if (pnode == NULL)
		return;

	for (m = 0; m < pnode->nd_nummoms; m++) {
		if (pnode->nd_moms[m] == NULL)
			continue;

		psvrmom = (mom_svrinfo_t *) (pnode->nd_moms[m]->mi_data);
		for (slot = 0; slot < psvrmom->msr_jbinxsz; slot++) {
			if (psvrmom->msr_jobindx[slot] == pjob)
				psvrmom->msr_jobindx[slot] = NULL;
		}
	}
}

/**
 * @brief
 * 		free nodes allocated to a job
 *
 * @param[in,out]	pjob	- job structure
 *
 * @return	void
 */
void
free_nodes(job *pjob)
{
	struct pbsnode *pnode;
	mom_svrinfo_t *psvrmom;
	char *execvnod_in = NULL;
	char *execvncopy;
	char *chunk;
	char *last;
	int hasprn;
	char *vname;
	int nelem;
	struct key_value_pair *pkvp;
	char *execvnod = NULL;

	/* decrement number of jobs on the Mom who is the first Mom */
	/* for the job, Mother Superior; incremented in set_nodes() */
	/* and saved in ji_destin in assign_hosts()		    */
	if (((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) != 0) &&
	    (pjob->ji_qs.ji_destin[0] != '\0')) {
		pnode = find_nodebyname(pjob->ji_qs.ji_destin);
		if (pnode) {
			psvrmom = pnode->nd_moms[0]->mi_data;
			if (--psvrmom->msr_numjobs < 0)
				psvrmom->msr_numjobs = 0;
		}
	}

	/* Now loop through the Moms and remove the jobindx entry */
	/* remove this job's jobinfo entry from each vnode        */

	if (is_jattr_set(pjob, JOB_ATR_exec_vnode_orig))
		execvnod_in = get_jattr_str(pjob, JOB_ATR_exec_vnode_orig);
	else if (is_jattr_set(pjob, JOB_ATR_exec_vnode))
		execvnod_in = get_jattr_str(pjob, JOB_ATR_exec_vnode);

	if (execvnod_in == NULL) {
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "in free_nodes and no exec_vnode");
		return;
	}

	if (strchr(execvnod_in, (int) ':') == NULL) {
		/* need to take node only list and build a pseudo */
		/* exec_vnode string with the resources included  */
		execvnod = build_execvnode(pjob, execvnod_in);
	} else {
		execvnod = execvnod_in;
	}
	/* build_execvnode() can fail; strdup(NULL) would be undefined */
	if (execvnod == NULL)
		return;

	execvncopy = strdup(execvnod);
	if (execvncopy == NULL)
		return;

	/* walk the exec_vnode chunk by chunk, releasing each vnode */
	chunk = parse_plus_spec_r(execvncopy, &last, &hasprn);

	while (chunk) {
		if (parse_node_resc(chunk, &vname, &nelem, &pkvp) != 0) {
			/* bad chunk: bail out; the previous loop never
			 * advanced 'chunk' on a parse failure and would
			 * spin forever here */
			break;
		}
		pnode = find_nodebyname(vname);
		remove_job_index_from_mom(pjob, pnode);
		deallocate_job_from_node(pjob->ji_qs.ji_jobid, pnode);
		chunk = parse_plus_spec_r(last, &last, &hasprn);
	}
	free(execvncopy);
	pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HasNodes;
}

/**
 * @brief
 * 		free nodes allocated to a reservation object
 *
 *		This function is the analog of "free_nodes" for job objects
 *
 * @param[in]	presv	- The reservation for which nodes are freed
 *
 * @return void
 *
 * @par Side-effects: This function will unset the resv-exclusive node state if
 * the reservation has a start time in the past. Care must be taken with
 * standing reservations.
 *
 * @par MT-safe: No
 */
void
free_resvNodes(resc_resv *presv)
{
	struct pbsnode *pnode;
	struct resvinfo *rinfp, *prev;
	int i;
	pbsnode_list_t *pnl;
	pbsnode_list_t *pnl_next;

	DBPRT(("%s: entered\n", __func__))
	/* scan every vnode on the server for resvinfo entries that
	 * reference this reservation */
	for (i = 0; i < svr_totnodes; i++) {
		pnode = pbsndlist[i];

		for (prev = NULL, rinfp = pnode->nd_resvp; rinfp;) {

			if (rinfp->resvp != presv) {
				/* belongs to another reservation; keep it */
				prev = rinfp;
				rinfp = rinfp->next;
				continue;
			}

			/* garbage collect the pbsnode_list */
			/* NOTE: pnl_next is primed to pnl so the loop test
			 * passes on entry; each pass saves the successor,
			 * then frees the current element.  After the first
			 * matching resvinfo the list head is NULLed below,
			 * making later passes no-ops. */
			for (pnl = presv->ri_pbsnode_list, pnl_next = pnl; pnl_next; pnl = pnl_next) {
				pnl_next = pnl->next;
				free(pnl);
			}
			presv->ri_pbsnode_list = NULL;

			/* free from provisioning list, if node was in wait_prov */
			free_prov_vnode(pnode);

			/* Unset the resv-exclusive bit if set and
			 * the node was associated to a running reservation
			 * that is either being deleted or just ended.
			 */
			if (pnode->nd_state & INUSE_RESVEXCL &&
			    presv->ri_qs.ri_stime <= time_now)
				set_vnode_state(pnode, ~INUSE_RESVEXCL, Nd_State_And);

			DBPRT(("Freeing resvinfo on node %s from reservation %s\n",
			       pnode->nd_name, presv->ri_qs.ri_resvID))
			/* unlink rinfp from the vnode's resvinfo chain and
			 * advance to its successor */
			if (prev == NULL) {
				pnode->nd_resvp = rinfp->next;
				free(rinfp);
				rinfp = pnode->nd_resvp;
			} else {
				prev->next = rinfp->next;
				free(rinfp);
				rinfp = prev->next;
			}
		}
	}
	/* reservation no longer holds any vnodes */
	presv->ri_vnodect = 0;
	presv->ri_qs.ri_svrflags &= ~RESV_SVFLG_HasNodes;
}

/**
 * @brief
 *	Does a check to make sure a resource value  in 'presc'
 *	has not gone negative, and if so, reset value to 0, and
 *	log a message.
 *
 * @param[in]	prdef	- resource definition of 'presc'
 * @param[in]	presc	- resource in question
 * @param[in]	noden	- non-NULL if resources coming from a vnode
 *
 * @return void
 */
static void
check_for_negative_resource(resource_def *prdef, resource *presc, char *noden)
{
	int went_negative = 0;

	if ((prdef == NULL) || (presc == NULL))
		return;

	/* clamp a negative resources_assigned value back to zero,
	 * remembering whether a clamp actually happened */
	switch (prdef->rs_type) {
		case ATR_TYPE_LONG:
			if (presc->rs_value.at_val.at_long < 0) {
				presc->rs_value.at_val.at_long = 0;
				went_negative = 1;
			}
			break;
		case ATR_TYPE_LL:
			if (presc->rs_value.at_val.at_ll < 0) {
				presc->rs_value.at_val.at_ll = 0;
				went_negative = 1;
			}
			break;
		case ATR_TYPE_SHORT:
			if (presc->rs_value.at_val.at_short < 0) {
				presc->rs_value.at_val.at_short = 0;
				went_negative = 1;
			}
			break;
		case ATR_TYPE_FLOAT:
			if (presc->rs_value.at_val.at_float < 0.0) {
				presc->rs_value.at_val.at_float = 0.0;
				went_negative = 1;
			}
			break;
	}

	if (!went_negative)
		return;

	/* alert: accounting went out of balance somewhere */
	snprintf(log_buffer, sizeof(log_buffer),
		 "resource %s went negative on node",
		 prdef->rs_name);
	if (noden)
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_ALERT, noden, log_buffer);
	else
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER,
			  LOG_ALERT, msg_daemonname, log_buffer);
}

/**
 * @brief
 * 		adjust the resources_assigned on a node.
 *
 * @par
 *		Called with the node name, the node ordinal (0 for first node),
 *		the +/- operator, the resource name, and the resource value.
 *
 * @param[out]	noden	- node name
 * @param[in]	aflag	- node ordinal (0 for first node)
 * @param[in]	batch_op	- operator of type enum batch_op.
 * @param[in]	prdef	- resource structure which stores resource name
 * @param[in]	val	- resource value
 * @param[in]	hop	- always called with 0, this values checks for the level of indirectness.
 *
 * @return	int
 * @retval	0	- success
 * @retval	!=0	- failure code
 */
static int
adj_resc_on_node(char *noden, int aflag, enum batch_op op, resource_def *prdef, char *val, int hop)
{
	pbsnode *pnode;
	resource *presc;
	attribute *pattr;
	attribute tmpattr;
	int rc;

	/* refuse resource->resource->resource indirection chains */
	if (hop > 1) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "multiple level of indirectness for resource %s",
			 prdef->rs_name);
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_ALERT, noden, log_buffer);
		return (PBSE_INDIRECTHOP);
	}

	/* skip resources not accumulated for this node position */
	if ((prdef->rs_flags & aflag) == 0)
		return 0;

	/* locate the target vnode */
	pnode = find_nodebyname(noden);
	if (pnode == NULL)
		return PBSE_UNKNODE;

	/* find (or create) the resources_assigned entry on the vnode */
	pattr = get_nattr(pnode, ND_ATR_ResourceAssn);
	presc = find_resc_entry(pattr, prdef);
	if (presc == NULL) {
		presc = add_resource_entry(pattr, prdef);
		if (presc == NULL)
			return PBSE_INTERNAL;
	}

	if ((presc->rs_value.at_flags & ATR_VFLAG_INDIRECT) &&
	    (*presc->rs_value.at_val.at_str == '@')) {
		/* value names another vnode ("@vnode"); recurse on it */
		return adj_resc_on_node(presc->rs_value.at_val.at_str + 1,
					aflag, op, prdef, val, hop + 1);
	}

	/* decode the resource value and +/- it onto the attribute */
	memset((void *) &tmpattr, 0, sizeof(attribute));
	rc = prdef->rs_decode(&tmpattr, ATTR_rescassn, prdef->rs_name, val);
	if (rc != 0)
		return rc;

	rc = prdef->rs_set(&presc->rs_value, &tmpattr, op);
	if (op == DECR)
		check_for_negative_resource(prdef, presc, noden);

	return rc;
}

/**
 * @brief
 * 		update the resources assigned at the vnode level
 *		for a job.   Resources_assigned.X is incremented or decremented
 *		based on the operator.
 *
 * @par
 *		The resource list is taken from the exec_vnode string of the job.
 *		It is in the form: NodeA:resc=val:resc=val+NodeB:...
 * @par
 *		Each "chunk" (subspec between plus signs) is broken into the vnode
 *		name and a key_value_pair array of resources and values.  For each
 *		resource, the corresponding resource (if present) in the vnodes's
 *		resources_assigned is adjusted.
 *
 * @param[in]	pjob	- job to update
 * @param[in]	pexech	- exec_vnode string
 * @param[in]	op	- operator of type enum batch_op.
 *
 * @return	void
 */
void
update_job_node_rassn(job *pjob, attribute *pexech, enum batch_op op)
{
	/* FNASSN = accumulated on first chunk only; narrowed to ANASSN
	 * after the first chunk at the bottom of the loop */
	int asgn = ATR_DFLAG_ANASSN | ATR_DFLAG_FNASSN;
	char *chunk;
	int j;
	int nelem;
	char *noden;
	int rc;
	resource_def *prdef = NULL;
	struct key_value_pair *pkvp;
	attribute *queru = NULL; /* queue's resources_assigned, if tracked */
	attribute *sysru = NULL; /* server's resources_assigned, if tracked */
	resource *pr = NULL;
	attribute tmpattr;
	int nchunk = 0; /* count of per-host chunks, used for "nodect" */

	/* Parse the exec_vnode string */

	if (!is_attr_set(pexech))
		return;

	/* Server/queue totals are adjusted only when the attribute being
	 * processed is the job's exec_vnode_deallocated list */
	if ((pjob != NULL) &&
	    (pexech == get_jattr(pjob, JOB_ATR_exec_vnode_deallocated))) {
		char *pc;
		sysru = get_sattr(SVR_ATR_resource_assn);
		queru = get_qattr(pjob->ji_qhdr, QE_ATR_ResourceAssn);

		pc = pexech->at_val.at_str;
		while (*pc != '\0') {
			/* given exec_vnode format: (<chunk1>+<chunk2>)+(<chunk3), 	*/
			/* <chunk1> and <chunk2> belong to the same node host,      	*/
			/* while  <chunk3> belongs to another node host. 		*/
			/* The number of node host chunks can be determined by # of     */
			/* left parentheses */
			if (*pc == '(') {
				nchunk++;
			}
			pc++;
		}
	}
	chunk = parse_plus_spec(pexech->at_val.at_str, &rc);
	if (rc != 0)
		return;
	while (chunk) {
		if (parse_node_resc(chunk, &noden, &nelem, &pkvp) == 0) {
			for (j = 0; j < nelem; ++j) {
				prdef = find_resc_def(svr_resc_def, pkvp[j].kv_keyw);
				/* NOTE: unknown resource aborts the whole update
				 * mid-way; earlier chunks remain adjusted */
				if (prdef == NULL)
					return;

				/* skip all non-consumable resources (e.g. aoe) */
				if ((prdef->rs_flags & asgn) == 0) {
					continue;
				}

				/* adjust the vnode-level resources_assigned;
				 * a vanished vnode (PBSE_UNKNODE) is tolerated */
				rc = adj_resc_on_node(noden, asgn, op, prdef, pkvp[j].kv_val, 0);
				if (rc && rc != PBSE_UNKNODE)
					return;

				/* update system attribute of resources assigned */

				/* decode once into tmpattr; reused by both the
				 * server and queue rs_set() calls below */
				if (sysru || queru) {
					if ((rc = prdef->rs_decode(&tmpattr, ATTR_rescassn, pkvp[j].kv_keyw,
								   pkvp[j].kv_val)) != 0)
						return;
				}

				if (sysru) {
					pr = find_resc_entry(sysru, prdef);
					if (pr == NULL) {
						pr = add_resource_entry(sysru, prdef);
						if (pr == NULL)
							return;
					}
					prdef->rs_set(&pr->rs_value, &tmpattr, op);
					if (op == DECR) {
						check_for_negative_resource(prdef, pr, NULL);
					}
					post_attr_set(sysru);
				}

				/* update queue attribute of resources assigned */

				if (queru) {
					pr = find_resc_entry(queru, prdef);
					if (pr == NULL) {
						pr = add_resource_entry(queru, prdef);
						if (pr == NULL)
							return;
					}
					prdef->rs_set(&pr->rs_value, &tmpattr, op);
					if (op == DECR) {
						check_for_negative_resource(prdef, pr, NULL);
					}
					post_attr_set(queru);
				}
			}
		} else {
			return;
		}
		/* after the first chunk, only every-node resources apply */
		asgn = ATR_DFLAG_ANASSN;
		chunk = parse_plus_spec(NULL, &rc);
		if (rc != 0)
			return;
	}

	if (sysru || queru) {
		/* set pseudo-resource "nodect" to the number of chunks */
		prdef = &svr_resc_def[RESC_NODECT];
		/* NOTE(review): address of an array element can never be
		 * NULL, so this check looks unreachable — confirm intent */
		if (prdef == NULL) {
			return;
		}
	}
	if (sysru) {
		pr = find_resc_entry(sysru, prdef);
		if (pr == NULL)
			pr = add_resource_entry(sysru, prdef);
		if (pr) {

			if (op == DECR) {
				pr->rs_value.at_val.at_long -= nchunk;
				check_for_negative_resource(prdef, pr, NULL);
			} else {
				pr->rs_value.at_val.at_long += nchunk;
			}
			pr->rs_value.at_flags |= ATR_SET_MOD_MCACHE | ATR_VFLAG_DEFLT;
		}
	}
	if (queru) {
		pr = find_resc_entry(queru, prdef);
		if (pr == NULL)
			pr = add_resource_entry(queru, prdef);
		if (pr) {
			if (op == DECR) {
				pr->rs_value.at_val.at_long -= nchunk;
				check_for_negative_resource(prdef, pr, NULL);
			} else {
				pr->rs_value.at_val.at_long += nchunk;
			}
			pr->rs_value.at_flags |= ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
		}
	}
	return;
}

/**
 * @brief
 * 		mark node by name down
 *
 * @param[in]	nodename - node being searched then marking as down.
 * @param[in]	why - error message
 *
 * @return void
 */
void
mark_node_down(char *nodename, char *why)
{
	struct pbsnode *pnode;

	/* note - find_nodebyname strips off /VP */
	pnode = find_nodebyname(nodename);
	if (pnode == NULL)
		return;

	/* XXXX fix see momptr_down() XXXX */
	momptr_down(pnode->nd_moms[0], why);
}

/**
 * @brief
 * 		Mark mom (by ptr) down and log message given by 'why'.
 *
 * @param[in]	pmom - a mom entry
 * @param[in]	why - node comment
 *
 * @return void
 */
void
momptr_offline_by_mom(mominfo_t *pmom, char *why)
{
	if (pmom == NULL)
		return;

	/* flag the mom's daemon as offlined by the mom herself */
	pmom->mi_dmn_info->dmn_state |= INUSE_OFFLINE_BY_MOM;

	if (why && *why)
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_ALERT, pmom->mi_host, why);

	/* propagate the offline state onto all of this mom's vnodes */
	set_all_state(pmom, 1, INUSE_OFFLINE_BY_MOM, why, Set_All_State_All_Offline);
}

/**
 * @brief
 * 		offline_by_mom vnodes whose parent mom is 'nodename'.
 *
 * @param[in]	nodename - node to mark offline_by_mom state
 * @param[in]	why - comment to put in the node
 *
 * @return void
 */
void
mark_node_offline_by_mom(char *nodename, char *why)
{
	struct pbsnode *pnode;

	/* note - find_nodebyname strips off /VP */
	pnode = find_nodebyname(nodename);
	if (pnode == NULL)
		return;

	/* XXXX fix see momptr_down() XXXX */
	momptr_offline_by_mom(pnode->nd_moms[0], why);
	/* persist the new node state */
	node_save_db(pnode);
}

/**
 * @brief
 * 		Clear mom (by ptr) offline_by_mom state and log message given by 'why'.
 *
 * @param[in]	pmom - a mom entry
 * @param[in]	why - node comment
 *
 * @return void
 */
void
momptr_clear_offline_by_mom(mominfo_t *pmom, char *why)
{
	if (pmom == NULL)
		return;

	if (why && *why)
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_NODE,
			  LOG_ALERT, pmom->mi_host, why);

	/* '0' second argument means to clear the state bit */
	set_all_state(pmom, 0, INUSE_OFFLINE_BY_MOM, why, Set_All_State_Regardless);
}

/**
 * @brief
 * 		clears offline_by_mom vnodes whose parent mom is 'nodename'.
 *
 * @param[in]	nodename - node to clear offline_by_mom state
 * @param[in]	why - comment to put in the node
 *
 * @return void
 */
void
clear_node_offline_by_mom(char *nodename, char *why)
{
	struct pbsnode *pnode;

	/* note - find_nodebyname strips off /VP */
	pnode = find_nodebyname(nodename);
	if (pnode == NULL)
		return;

	/* XXXX fix see momptr_down() XXXX */
	momptr_clear_offline_by_mom(pnode->nd_moms[0], why);
	/* persist the new node state */
	node_save_db(pnode);
}

/**
 * @brief
 * 		send the Mom on each node a shutdown command.
 *
 * @par
 *		Note, there is no error checking or retry.   If Mom doesn't go down,
 *		so be it.
 */
void
shutdown_nodes(void)
{
	int idx;

	DBPRT(("%s: entered\n", __func__))
	for (idx = 0; idx < mominfo_array_size; idx++) {
		mominfo_t *mp = mominfo_array[idx];
		dmn_info_t *dinfo;

		if (mp == NULL)
			continue;

		dinfo = mp->mi_dmn_info;
		/* no open stream to this mom, nothing to send */
		if (dinfo->dmn_stream < 0)
			continue;

		DBPRT(("%s: down %s\n", __func__, mp->mi_host))

		/* best effort: only flush if the message composed cleanly */
		if (is_compose(dinfo->dmn_stream, IS_SHUTDOWN) == DIS_SUCCESS)
			(void) dis_flush(dinfo->dmn_stream);
	}
}

/**
 * @brief
 * 		count number of processors specified in node string.
 *
 * @param[out] *hascpp	- is set non-zero if :cpp or :ncpus appears in string
 *						indicating that user has specified fixed placement of cpus
 *
 * @return	totalcpu
 */
int
ctcpus(char *buf, int *hascpp)
{
	int i;
	char *pc;
	char *pplus;
	char *str;
	int totalcpu = 0;

	if (!buf)
		return 0;

	str = buf;
	*hascpp = 0;

	/* look for each subnode element: [N[:]][ppn=Y[:]][cpp=Z] */
	while (*str) {
		int cpp;
		int nd;
		int ppn;

		nd = 1;
		cpp = 1;
		ppn = 1;
		if ((pplus = strchr(str, (int) '+')))
			*pplus = '\0';

		if (number(&str, &i, 1) == 0) {
			nd = i; /* leading "N" */
			if (*str)
				str++;
		}

		while (1) {

			if (property(&str, &pc))
				break;

			if (strncasecmp(pc, "ppn=", 4) == 0) {
				i = atoi(pc + 4);
				if (i == 0)
					return 1; /* error */
				ppn = i;
			}
			if ((strncasecmp(pc, "cpp=", 4) == 0) ||
			    (strncasecmp(pc, "ncpus=", 6) == 0)) {
				*hascpp = 1; /* found a cpp/ncpus item */
				pc = strchr(pc, (int) '=');
				i = atoi(pc + 1);
				if (i == 0)
					return 1;
				cpp = i;
			}
			if (*str != ':')
				break;
			str++;
		}

		totalcpu += nd * cpp * ppn;
		if (pplus) {
			*pplus = '+';
			str = pplus + 1;
			/* continue on to next subnode element */
		} else
			break;
	}
	return totalcpu;
}

/**
 * @brief
 * 		should be called from function pbsd_init.
 *
 * @par
 *		Its purpose is to re-establish the resvinfo for any reservation
 *		having state "CONFIRMED", which is still "time-viable" and which
 *		had a set of nodes allocated to it when the server was taken down.
 *
 *	Specifically:
 *	   a) examine reservation attribute RESV_ATR_resv_nodes to
 *	      determine which vnode are in the '+' separated
 *	      string and if the server still knows about these nodes
 *
 *	   b) if (a) succeeds, call assign_resv_resc to assign the resources to
 *        the reservation
 *
 *	   c) if at any point in the process described in steps (a) or (b)
 *	      a failure occurs, update the reservation's state to cause
 *	      subsequent reservation deletion to occur
 *
 *  @param[in]	presv - the reservation to re-establish (may be NULL)
 *
 *  @return  void
 *
 *  @par Side-effects:
 *       If the reservation has yet to be CONFIRMED, or has a state
 *		indicating that it's to be deleted, the function simply
 *		returns without doing anything.
 *
 *  @par MT-safe: No
 */
void
set_old_subUniverse(resc_resv *presv)
{
	char *nodes_spec;
	int rc;

	if (presv == NULL || svr_totnodes == 0)
		return;

	if (!is_rattr_set(presv, RESV_ATR_resv_nodes))
		return;

	/* only confirmed/running (or degraded/in-conflict) reservations apply */
	if (presv->ri_qs.ri_state != RESV_CONFIRMED &&
	    presv->ri_qs.ri_substate != RESV_DEGRADED &&
	    presv->ri_qs.ri_substate != RESV_IN_CONFLICT &&
	    presv->ri_qs.ri_state != RESV_RUNNING)
		return;

	/* duplicate the resv_nodes because assign_resv_resc will first free the
	 * resv_nodes attribute before doing the allocation and setting the nodes
	 */
	nodes_spec = strdup(get_rattr_str(presv, RESV_ATR_resv_nodes));
	if (nodes_spec == NULL) {
		log_err(errno, __func__, "Could not allocate memory");
		return;
	}

	/* for resources that are not specified in the request and for which
	 * default values can be determined, set these values as the values
	 * for those resources
	 */
	rc = set_resc_deflt((void *) presv, RESC_RESV_OBJECT, NULL);
	if (rc != 0) {
		log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV, LOG_NOTICE,
			   presv->ri_qs.ri_resvID, "problem assigning default resource "
						   "to reservation %d",
			   rc);
		goto done;
	}

	/* set the nodes on the reservation */
	rc = assign_resv_resc(presv, nodes_spec, TRUE);
	if (rc != PBSE_NONE) {
		log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_RESV,
			   LOG_NOTICE, presv->ri_qs.ri_resvID,
			   "problem assigning resource to reservation %d", rc);
		goto done;
	}

	if ((presv->ri_qs.ri_state == RESV_RUNNING) ||
	    (presv->ri_qs.ri_state == RESV_TIME_TO_RUN))
		resv_exclusive_handler(presv);

	/* the total number of vnodes associated to the reservation is computed
	 * in set_nodes which is called from assign_resv_resc. We assume that
	 * all vnodes are down until they report up.
	 */
	presv->ri_vnodes_down = presv->ri_vnodect;
	DBPRT(("%s: %s ri_vnodect: %d\n", __func__, presv->ri_qs.ri_resvID,
	       presv->ri_vnodect))

	/* Upon restart, ignore the degraded state of confirmed reservations by
	 * reverting their state back to confirmed. If vnodes don't report back
	 * available the reservation will go through the degradation process.
	 * In other words, we assume the reservation is confirmed again until
	 * proven wrong.
	 */
	if ((presv->ri_qs.ri_substate == RESV_DEGRADED || presv->ri_qs.ri_substate == RESV_IN_CONFLICT) &&
	    presv->ri_qs.ri_state != RESV_RUNNING) {
		(void) resv_setResvState(presv, RESV_CONFIRMED, RESV_CONFIRMED);

		/* unset the reservation retry time attribute */
		unset_resv_retry(presv);
	}

done:
	free(nodes_spec);
}

/**
 * @brief
 * 		Walk all vnodes and invoke vnode_unavailable for all those that were
 *  	set offline, offline_by_mom, or unresolvable.
 *
 * 		We assume that the reservation is in the state prior to it being degraded,
 * 		which would be either CONFIRMED, UNCONFIRMED, or RUNNING.
 *
 * 		If some of the nodes do not come back up, then the process of degrading
 * 		the reservation is followed by detecting a node as unavailable
 *
 * @return void
 *
 *  @par MT-safe: No
 */
void
degrade_offlined_nodes_reservations(void)
{
	int idx;

	DBPRT(("%s: entered\n", __func__))
	for (idx = 0; idx < svr_totnodes; idx++) {
		struct pbsnode *np = pbsndlist[idx];

		/* find all reservations associated to an offlined or unresolvable
		 * vnode and mark them degraded, but do not increment the count of
		 * downed vnodes as these have already been accounted for in
		 * set_old_subuniverse.
		 */
		if (np->nd_state & (INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM | INUSE_UNRESOLVABLE))
			vnode_unavailable(np, 0);
	}

	/* create a task to check for vnodes that don't report back up after MAX_NODE_WAIT */
	(void) set_task(WORK_Timed, time_now + MAX_NODE_WAIT,
			degrade_downed_nodes_reservations, NULL);
}

/**
 * @brief
 * 		Walk all vnodes and invoke vnode_unavailable for all those that have
 *  	remained (unknown | down | stale) since the server restarted.
 *
 * @return	void
 *
 * @par MT-safe: No
 */
void
degrade_downed_nodes_reservations(void)
{
	int idx;

	DBPRT(("%s: entered\n", __func__))
	for (idx = 0; idx < svr_totnodes; idx++) {
		struct pbsnode *np = pbsndlist[idx];

		/* skip offlined vnodes: those were already handled by
		 * degrade_offlined_nodes_reservations.
		 */
		if (np->nd_state & (INUSE_OFFLINE | INUSE_OFFLINE_BY_MOM))
			continue;

		/* vnodes that are down, unknown or stale: find all associated
		 * reservations and mark them degraded, but do not increment the
		 * count of downed vnodes as these have already been accounted
		 * for in set_old_subuniverse.
		 */
		if (np->nd_state & (INUSE_DOWN | INUSE_UNKNOWN | INUSE_STALE))
			vnode_unavailable(np, 0);
	}
}

/**
 * @brief	Set last_used_time for job's exec_vnodes or reservation's resv_nodes.
 *		Finds the vnodes by name and sets ND_ATR_last_used_time to time_now.
 *
 * @par	Each '+'-separated chunk is truncated in place at its first ':' to
 *		isolate the vnode name; consecutive duplicate names are skipped to
 *		avoid redundant database saves.
 *
 * @param[in]	pobj - pointer to job/reservation.
 * @param[in]	type - int, denoting the type of object.
 *                     Value 1 means reservation object.
 *                     Value 0 means job object.
 *
 * @retval	void
 */
void
set_last_used_time_node(void *pobj, int type)
{
	char *pc;
	char *pn;
	char *last_pn = NULL;
	struct pbsnode *pnode;
	int rc;
	int time_int_val;

	time_int_val = time_now;

	if (pobj == NULL)
		return;

	if (type) {
		resc_resv *presv;

		presv = pobj;
		pn = parse_plus_spec(get_rattr_str(presv, RESV_ATR_resv_nodes), &rc);
	} else {
		job *pjob;

		pjob = pobj;
		pn = parse_plus_spec(get_jattr_str(pjob, JOB_ATR_exec_vnode), &rc);
	}

	while (pn) {
		/* truncate the chunk at the first ':' to isolate the vnode name */
		pc = pn;
		while ((*pc != '\0') && (*pc != ':'))
			++pc;
		*pc = '\0';

		/* only act when the name differs from the previous chunk's name;
		 * the old dead 'cmp_ret' temporary has been removed */
		if (last_pn == NULL || strcmp(pn, last_pn) != 0) {
			pnode = find_nodebyname(pn);
			/* had better be the "natural" vnode with only the one parent */
			if (pnode) {
				set_nattr_l_slim(pnode, ND_ATR_last_used_time, time_int_val, SET);
				node_save_db(pnode);
			}
		}
		last_pn = pn;
		pn = parse_plus_spec(NULL, &rc);
	}
}

/**
 * @brief update_resources_rel - This function creates JOB_ATR_resc_released_list job attribute
 *		    and adds RASSN resources reported in ATTR_released attribute to it.
 *
 * @par	After processing the chunks in 'attrib', if the server attribute
 *		restrict_res_to_release_on_suspend is set, server/queue-level
 *		consumable (RASSN) job resources named in that list are also added
 *		to the released list.
 *
 * @param[out] pjob - job structure
 * @param[in] attrib - attribute which contains list of resources to be released
 * @param[in] op - kind of operation to be performed while setting the resource value.
 *
 * @return int
 * @retval 0  - SUCCESS
 * @retval > 0 - FAILURE
 */
int
update_resources_rel(job *pjob, attribute *attrib, enum batch_op op)
{
	char *chunk;
	int j;
	int rc;
	int nelem;
	char *noden;
	struct key_value_pair *pkvp;
	resource_def *prdef;
	resource *presc;
	resource *presc_sq;
	attribute tmpattr;

	if (attrib == NULL || pjob == NULL)
		return 1;

	/* walk the '+'-separated chunks of the released-resource spec */
	chunk = parse_plus_spec(attrib->at_val.at_str, &rc);
	if (rc != 0)
		return 1;
	while (chunk) {
		if (parse_node_resc(chunk, &noden, &nelem, &pkvp) == 0) {
			for (j = 0; j < nelem; j++) {
				/* unknown resource name: treat as failure */
				prdef = find_resc_def(svr_resc_def, pkvp[j].kv_keyw);
				if (prdef == NULL)
					return 1;
				/* only consumable (server/queue or node assigned) resources
				 * are tracked in the released list */
				if (prdef->rs_flags & (ATR_DFLAG_RASSN | ATR_DFLAG_ANASSN | ATR_DFLAG_FNASSN)) {
					presc = add_resource_entry(get_jattr(pjob, JOB_ATR_resc_released_list), prdef);
					if (presc == NULL)
						return 1;
					/* NOTE(review): tmpattr decoded here is never explicitly
					 * freed; presumably rs_set copies the value -- confirm
					 * this does not leak for string-valued resources */
					if ((rc = prdef->rs_decode(&tmpattr, ATTR_rel_list, prdef->rs_name, pkvp[j].kv_val)) != 0)
						return rc;
					prdef->rs_set(&presc->rs_value, &tmpattr, op);
				}
			}
			chunk = parse_plus_spec(NULL, &rc);
			if (rc != 0)
				return 1;
		} else
			return 1;
	}
	/* Now iterate through all of the job resources that are present on at
	 * queue/server level and add them to resource_release_list. Only do this if
	 * restrict_res_to_release_on_suspend is set
	 */
	if (is_sattr_set(SVR_ATR_restrict_res_to_release_on_suspend)) {
		presc_sq = (resource *) GET_NEXT(get_jattr_list(pjob, JOB_ATR_resource));
		for (; presc_sq != NULL; presc_sq = (resource *) GET_NEXT(presc_sq->rs_link)) {
			prdef = presc_sq->rs_defin;
			/* make sure it is a server/queue level consumable resource and not
			* set in resource_released_list already
			*/
			if ((prdef->rs_flags & ATR_DFLAG_RASSN) &&
			    (find_resc_entry(get_jattr(pjob, JOB_ATR_resc_released_list), prdef) == NULL)) {
				struct array_strings *pval = get_sattr_arst(SVR_ATR_restrict_res_to_release_on_suspend);
				/* only resources named in the restriction list are added */
				for (j = 0; pval != NULL && j < pval->as_usedptr; j++) {
					if (strcmp(pval->as_string[j], prdef->rs_name) == 0) {
						presc = add_resource_entry(get_jattr(pjob, JOB_ATR_resc_released_list), prdef);
						if (presc == NULL)
							return 1;
						prdef->rs_set(&presc->rs_value, &presc_sq->rs_value, op);
						break;
					}
				}
			}
		}
	}
	return 0;
}

/**
 * @brief
 *	Free pjob's vnodes whose parent mom is a sister mom.
 *
 * @param[in,out] pjob - Job structure
 * @param[in]	vnodelist - non-NULL means it's the list of vnode names
 *			to free. If NULL, free all the vnodes assigned
 *			to 'pjob' whose parent mom is a sister mom.
 * @param[in]	keep_select - non-NULL means it's a select string that
 *			describes vnodes to be kept while freeing all other vnodes
 *			assigned to 'pjob' whose parent mom is a sister mom.
 * @param[out]  err_msg - if function returns != 0 (failure), return
 *			  any error message in this buffer.
 * @param[int]	err_msg_sz - size of 'err_msg' buf.
 * @param[int]	reply_req - the batch request to reply to if any.
 * @return int
 * @retval 0 - success
 * @retval != 0  - failure error code.
 */
int
free_sister_vnodes(job *pjob, char *vnodelist, char *keep_select, char *err_msg,
		   int err_msg_sz, struct batch_request *reply_req)
{
	pbs_sched *psched;
	int ret;

	if (pjob == NULL) {
		log_err(PBSE_INTERNAL, __func__, "bad pjob parameter");
		return (1);
	}

	/* no exec_vnode set means there is nothing assigned to free up */
	if (!is_jattr_set(pjob, JOB_ATR_exec_vnode))
		return 0;

	if (err_msg_sz > 0)
		err_msg[0] = '\0';

	/* decrements everything found in exec_vnode */
	set_resc_assigned((void *) pjob, 0, DECR);

	/* re-create the job's exec_vnode based on free vnodes specs */
	ret = recreate_exec_vnode(pjob, vnodelist, keep_select, err_msg, err_msg_sz);
	if (ret != 0) {
		/* restore the accounting that was decremented above */
		set_resc_assigned((void *) pjob, 0, INCR);
		return (ret);
	}

	/* increment everything found in new exec_vnode */
	set_resc_assigned((void *) pjob, 0, INCR);

	if (find_assoc_sched_jid(pjob->ji_qs.ji_jobid, &psched))
		set_scheduler_flag(SCH_SCHEDULE_TERM, psched);
	else
		log_err(-1, __func__, "Unable to find scheduler associated with partition");

	ret = send_job_exec_update_to_mom(pjob, err_msg, err_msg_sz, reply_req);
	if (ret == 0) {
		account_job_update(pjob, PBS_ACCT_UPDATE);
		account_jobstr(pjob, PBS_ACCT_NEXT);
	}

	return (ret);
}

/**
 * @brief
 *	Wrapper function to update_job_node_rassn() function: invokes it with
 *	a NULL job pointer, so only 'pexech' and 'op' drive the update.
 *
 * @param[in]	pexech	- exec_vnode string attribute
 * @param[in]	op	- operator of type enum batch_op.
 */
void
update_node_rassn(attribute *pexech, enum batch_op op)
{
	update_job_node_rassn(NULL, pexech, op);
}

/**
 * @brief update the jid on the respective nodes in the execvnode string
 * and update the state based on share_job value
 *
 * @par	Each '+'-separated chunk is parsed for a vnode name and resources;
 *	unknown vnode names are silently skipped, and only the consumable
 *	resource "ncpus" actually drives the assignment/deallocation.
 *
 * @param[in] jid - job id
 * @param[in] exec_vnode - execvnode string
 * @param[in] op - operation INCR/DECR
 * @param[in] share_job - job sharing type
 */
void
update_jobs_on_node(char *jid, char *exec_vnode, int op, int share_job)
{
	char *chunk;
	int j;
	int nelem;
	char *noden;
	int rc;
	resource_def *prdef = NULL;
	struct key_value_pair *pkvp;
	int asgn = ATR_DFLAG_ANASSN | ATR_DFLAG_FNASSN;
	long ncpus;
	struct pbsnode *pnode;

	for (chunk = parse_plus_spec(exec_vnode, &rc);
	     chunk && !rc; chunk = parse_plus_spec(NULL, &rc)) {

		if (parse_node_resc(chunk, &noden, &nelem, &pkvp) == 0) {
			if ((pnode = find_nodebyname(noden)) == NULL)
				continue;
			for (j = 0; j < nelem; ++j) {
				prdef = find_resc_def(svr_resc_def, pkvp[j].kv_keyw);
				if (prdef == NULL)
					return;
				/* skip all non-consumable resources (e.g. aoe) */
				if ((prdef->rs_flags & asgn) == 0)
					continue;

				if (!strcmp(prdef->rs_name, "ncpus")) {
					ncpus = strtol(pkvp[j].kv_val, NULL, 10);
					if (ncpus < 0) {
						char logbuf[512];

						/* log_err() expects (errnum, routine, text); the
						 * previous call passed the format string as the
						 * routine name and never expanded the %s, so
						 * build the message text explicitly here */
						snprintf(logbuf, sizeof(logbuf),
							 "bad value for ncpus: %s", pkvp[j].kv_val);
						log_err(PBSE_SYSTEM, __func__, logbuf);
						ncpus = 0;
					}
					if (op == INCR) {
						assign_jobs_on_subnode(pnode, ncpus, jid, 0, share_job);
						update_node_state(pnode, share_job);
					} else if (op == DECR)
						deallocate_job_from_node(jid, pnode);
				}
			}
		}
	}
}

/**
 * @brief - Degrade a reservation.
 *
 * This function is different from vnode_unavailable, as here we know the
 * reservation that needs to be degraded
 *
 * @param[in] - pnode - pbsnode which has gone down.
 * @param[in] - presv - reservation that needs to be degraded.
 *
 */

static void
set_resv_for_degrade(struct pbsnode *pnode, resc_resv *presv)
{
	long degraded_time;

	/* NOTE(review): when the reservation is NOT standing (resv_standing == 0),
	 * this stores 0 into ri_degraded_time, which makes the retry test below
	 * always false for non-standing reservations.  Presumably the intent was
	 * to use the reservation's start time here -- confirm against the
	 * reservation-degradation design before changing.
	 */
	if ((degraded_time = get_rattr_long(presv, RESV_ATR_resv_standing)) == 0)
		presv->ri_degraded_time = degraded_time;
	else
		find_degraded_occurrence(presv, pnode, Set_Degraded_Time);

	degraded_time = presv->ri_degraded_time;

	/* only schedule a reconfirmation retry if the degraded occurrence is far
	 * enough in the future */
	if (degraded_time > (time_now + resv_retry_time))
		set_resv_retry(presv, (time_now + resv_retry_time));

	(void) resv_setResvState(presv, presv->ri_qs.ri_state, RESV_DEGRADED);

	/* the number of vnodes down could exceed the number of vnodes in
	 * the reservation only in the case of a standing reservation for
	 * which the vnodes unavailable are associated to later occurrences
	 */
	if (presv->ri_vnodes_down > presv->ri_vnodect) {
		/* If a standing reservation we print the execvnodes sequence
		 * string for debugging purposes
		 */
		if (get_rattr_long(presv, RESV_ATR_resv_standing)) {
			char *execvnodes = NULL;
			int occurrence = -1;

			if (is_rattr_set(presv, RESV_ATR_resv_execvnodes))
				execvnodes = get_rattr_str(presv, RESV_ATR_resv_execvnodes);
			if (execvnodes == NULL)
				execvnodes = "";
			log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
			           presv->ri_qs.ri_resvID, "execvnodes sequence: %s",
			           execvnodes);
			if (is_rattr_set(presv, RESV_ATR_resv_idx))
				occurrence = get_rattr_long(presv, RESV_ATR_resv_idx);
			log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
			           presv->ri_qs.ri_resvID, "vnodes in occurrence %d: %d; "
			           "unavailable vnodes in reservation: %d",
			           occurrence, presv->ri_vnodect, presv->ri_vnodes_down);
		} else {
			log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_RESV, LOG_DEBUG,
			           presv->ri_qs.ri_resvID, "vnodes in reservation: %d; "
			           "unavailable vnodes in reservation: %d",
			           presv->ri_vnodect, presv->ri_vnodes_down);
		}
	}
	/* account for this newly-unavailable vnode */
	presv->ri_vnodes_down++;
}

/**
 * 	@brief determine the new retry time for a resv
 *
 * 	@param[in] presv - the reservation
 *
 * 	@return long
 * 	@retval next resv retry time for the resv
 */
long
determine_resv_retry(resc_resv *presv)
{
	long start_time = get_rattr_long(presv, RESV_ATR_start);
	long default_retry = time_now + resv_retry_time;

	/* if the default retry would land beyond a start time that is still in
	 * the future, clamp the retry to the reservation's start instead */
	if (time_now < start_time && default_retry > start_time)
		return start_time;

	return default_retry;
}
