/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 *
 * @brief
 *	Functions relating to the Manager Batch Request (qmgr)
 *
 */
#include <pbs_config.h> /* the master config generated by configure */

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>

#include <arpa/inet.h>
#include <ctype.h>
#include <memory.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netdb.h>
#include <errno.h>

#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "work_task.h"
#include "attribute.h"
#include "resource.h"
#include "pbs_license.h"
#include "server.h"
#include "job.h"
#include "reservation.h"
#include "queue.h"
#include "credential.h"
#include "batch_request.h"
#include "net_connect.h"
#include "pbs_error.h"
#include "log.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include "pbs_ifl.h"
#include "batch_request.h"
#include "hook.h"
#include "hook_func.h"
#include "pbs_entlim.h"
#include "provision.h"
#include "pbs_db.h"
#include "assert.h"
#include "pbs_idx.h"
#include "sched_cmds.h"
#include "pbs_sched.h"
#include "pbs_share.h"

#define PERM_MANAGER (ATR_DFLAG_MGWR | ATR_DFLAG_MGRD)
#define PERM_OPorMGR (ATR_DFLAG_MGWR | ATR_DFLAG_MGRD | ATR_DFLAG_OPRD | ATR_DFLAG_OPWR)

pntPBS_IP_LIST pbs_iplist = NULL;

void *node_idx = NULL;
static void *hostaddr_idx = NULL;

/* Global Data Items: */

extern void unset_license_location(void);
extern void unset_license_min(void);
extern void unset_license_max(void);
extern void unset_license_linger(void);
extern void unset_job_history_enable(void);
extern void unset_job_history_duration(void);
extern void unset_max_job_sequence_id(void);
extern void force_qsub_daemons_update(void);
extern void unset_node_fail_requeue(void);
extern pbs_sched *sched_alloc(char *sched_name);
extern void sched_free(pbs_sched *psched);
extern int sched_delete(pbs_sched *psched);

extern struct server server;
extern pbs_list_head svr_queues;
extern attribute_def que_attr_def[];
extern attribute_def svr_attr_def[];
extern char *msg_attrtype;
extern char *msg_daemonname;
extern char *msg_manager;
extern char *msg_man_cre;
extern char *msg_man_del;
extern char *msg_man_set;
extern char *msg_man_uns;
extern char *msg_noattr;
extern unsigned int pbs_mom_port;
extern char *path_hooks;
extern int max_concurrent_prov;
extern char *msg_cannot_set_route_que;
extern int check_req_aoe_available(struct pbsnode *, char *);
int resize_prov_table(int);

/* private data */

static char *all_quename = "_All_";
static char *all_nodes = "_All_";
/*
 * Passed to mgr_unset_attr() as 'rflag' to select what happens when a
 * resource being unset carries ATR_VFLAG_TARGET (i.e. it is the target of
 * an indirect resource).
 */
enum res_op_flag {
	INDIRECT_RES_UNLINK, /* clear the target flag and go on with the unset */
	INDIRECT_RES_CHECK,  /* refuse the unset; mgr_unset_attr() returns PBSE_OBJBUSY */
};

extern time_t time_now;
extern void *svr_db_conn;
struct work_task *rescdef_wt_g = NULL;

/*
 * This structure is used as part of the index tree
 * to do a faster lookup of hostnames.
 * It is stored against pname in make_host_addresses_list().
 */
struct pul_store {
	u_long *pul; /* list of IP addresses */
	int len;     /* length of the pul list (unit not evident here - TODO confirm entries vs. bytes) */
};

/**
 * @brief
 *	Tell whether <user>@<host> corresponds to the local root (or
 *	Admin-privilege account) on the local host.
 *
 *	The user must be PBS_DEFAULT_ADMIN and the host must name this machine.
 *	The host is first compared textually (case-insensitively) against
 *	server_host, the localhost aliases and the result of gethostname();
 *	only if none of those match is is_same_host() consulted.
 *
 *	When built with KRB5 security and a privileged auth user is configured,
 *	"<user>@<host>" is additionally accepted if it is exactly that
 *	configured principal and GSS is among the supported auth methods.
 *
 * @param[in]	user	- user name (must not be NULL)
 * @param[in]	host	- host name (must not be NULL)
 *
 * @return	Boolean
 * @retval	TRUE	- User has Admin-privilege
 * @retval	FALSE	- No Admin-privilege
 */
int
is_local_root(char *user, char *host)
{
	/* Similar method used in svr_get_privilege() */
	if (strcmp(user, PBS_DEFAULT_ADMIN) == 0) {
		char myhostname[PBS_MAXHOSTNAME + 1];

		/* First try without DNS lookup. */
		if (strcasecmp(host, server_host) == 0 ||
		    strcasecmp(host, LOCALHOST_SHORTNAME) == 0 ||
		    strcasecmp(host, LOCALHOST_FULLNAME) == 0)
			return (TRUE);

		if (gethostname(myhostname, (sizeof(myhostname) - 1)) == -1)
			myhostname[0] = '\0';
		/*
		 * gethostname() is not required to NUL-terminate a truncated
		 * name; the last byte is never written by the call above, so
		 * terminating there is always safe.
		 */
		myhostname[sizeof(myhostname) - 1] = '\0';

		if (strcasecmp(host, myhostname) == 0)
			return (TRUE);

		/* Now try with DNS lookup. */
		if (is_same_host(host, server_host) ||
		    is_same_host(host, myhostname))
			return (TRUE);
	}
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	{
		const char *privil_auth_user = pbs_conf.pbs_privileged_auth_user;

		if (privil_auth_user != NULL &&
		    is_string_in_arr(pbs_conf.supported_auth_methods, AUTH_GSS_NAME)) {
			size_t ulen = strlen(user);

			/*
			 * Compare "<user>@<host>" against the configured
			 * principal piecewise instead of assembling it in a
			 * fixed-size buffer, so over-long names cannot
			 * overflow anything.
			 */
			if (strncmp(privil_auth_user, user, ulen) == 0 &&
			    privil_auth_user[ulen] == '@' &&
			    strcmp(privil_auth_user + ulen + 1, host) == 0)
				return (TRUE);
		}
	}
#endif
	return (FALSE);
}

/**
 * @brief
 * 		warnings_update - adds node pointer to the warnings array if
 * 		client needs a warning about this node.
 *
 * 		The first argument, a "warning code", determines the processing that
 * 		occurs.  Nothing is done unless the code is one of the WARN_ngrp*
 * 		codes and the server's node_group_key attribute is set.
 *
 * 		WARN_ngrp_init	- look up and cache the resource definition named
 * 				  by node_group_key (cache is cleared if the key
 * 				  string is NULL or empty).
 * 		WARN_ngrp_ck	- remember whether the node currently has an entry
 * 				  for the grouping resource in resources_available.
 * 		WARN_ngrp	- if the node's grouping resource was modified, or
 * 				  it has no entry now but had one at the last
 * 				  WARN_ngrp_ck call, and the node has a reservation
 * 				  or a subnode with jobs, append the node to wnodes.
 *
 * 		A node is appended at most once per call, so wnodes never needs more
 * 		slots than the number of nodes examined.
 *
 * @param[in]	wcode	- "warning code", determines the processing that occurs.
 * @param[out]	wnodes	- warnings array
 * @param[in,out]	widx	- index inside warnings array where the node is getting added
 * @param[in]	np	- node pointer which gets added into warnings array;
 * 			  not used for WARN_ngrp_init.
 *
 * @return	None
 *
 * @par MT-safe: No (state is kept in function-local statics)
 */
void
warnings_update(int wcode, pbsnode **wnodes, int *widx, pbsnode *np)
{
	static int ngrp_had = 0;
	static resource_def *rscdef = NULL;

	resource *resc;
	struct pbssubn *psub;
	char *rname;
	int busy = 0;

	/* every code handled here concerns the node grouping key */
	if (wcode != WARN_ngrp_init && wcode != WARN_ngrp && wcode != WARN_ngrp_ck)
		return;
	if (!is_sattr_set(SVR_ATR_NodeGroupKey))
		return;

	if (wcode == WARN_ngrp_init) {
		/* (re)initialize the cached definition; never keep a stale one */
		rscdef = NULL;
		rname = get_sattr_str(SVR_ATR_NodeGroupKey);
		if ((rname != NULL) && (*rname != '\0'))
			rscdef = find_resc_def(svr_resc_def, rname);
		return;
	}

	if (rscdef == NULL)
		return;

	if (wcode == WARN_ngrp_ck) {
		resc = find_resc_entry(get_nattr(np, ND_ATR_ResourceAvail), rscdef);
		ngrp_had = (resc != NULL);
		return;
	}

	/* wcode == WARN_ngrp */
	if (wnodes == NULL || widx == NULL)
		return;

	resc = find_resc_entry(get_nattr(np, ND_ATR_ResourceAvail), rscdef);
	if (resc != NULL) {
		/* entry exists: only interesting if this request modified it */
		if ((resc->rs_value.at_flags & ATR_VFLAG_MODIFY) == 0)
			return;
	} else if (!ngrp_had) {
		/* no entry now and none before: nothing changed */
		return;
	}

	/* warn only for nodes that are in use by a reservation or a job */
	if (np->nd_resvp)
		busy = 1;
	for (psub = np->nd_psn; !busy && psub != NULL; psub = psub->next) {
		if (psub->jobs)
			busy = 1;
	}

	if (busy) {
		wnodes[*widx] = np;
		*widx += 1;
	}
}

/**
 * @brief
 * 		warn_msg_build - assembles the appropriate warning message
 *
 * 		The message is an optional header (present only for WARN_ngrp)
 * 		followed by the names of the first widx nodes in wnodes, separated
 * 		by ", ".
 *
 * @param[in]	wcode	- "warning code", selects the message header.
 * @param[in]	wnodes	- warnings array (nodes to be named in the message)
 * @param[in]	widx	- number of entries in the warnings array
 *
 * @return	pointer to assembled message
 * @retval	NULL	- nothing to report (widx <= 0 or wnodes is NULL), or
 * 			  memory for the message could not be allocated
 *
 * @note
 * Calling party RESPONSIBLE for freeing dynamic memory that
 * is acquired by this function.
 */

char *
warn_msg_build(int wcode, pbsnode **wnodes, int widx)
{
	static const char whead[] = "WARNING: modified grouping resource on node(s) with reservation(s)/job(s) - ";
	static const char sep[] = ", ";
	const size_t seplen = sizeof(sep) - 1;
	const char *phead;
	char *wmsg;
	char *pos;
	size_t len;
	size_t n;
	int i;

	/* a non-positive count would otherwise index before the buffer start */
	if (wnodes == NULL || widx <= 0)
		return NULL;

	phead = (wcode == WARN_ngrp) ? whead : "";

	len = strlen(phead);
	for (i = 0; i < widx; ++i)
		len += strlen(wnodes[i]->nd_name) + seplen;

	if ((wmsg = malloc(len + 1)) == NULL)
		return NULL;

	/* append at a running position rather than rescanning with strcat() */
	pos = wmsg;
	n = strlen(phead);
	memcpy(pos, phead, n);
	pos += n;
	for (i = 0; i < widx; ++i) {
		if (i > 0) {
			memcpy(pos, sep, seplen);
			pos += seplen;
		}
		n = strlen(wnodes[i]->nd_name);
		memcpy(pos, wnodes[i]->nd_name, n);
		pos += n;
	}
	*pos = '\0';
	return (wmsg);
}

/**
 * @brief
 * 		check_que_attr - verify that every attribute set on a queue is
 *		compatible with the queue's type.  Called when creating a queue or
 *		setting its attributes.
 *
 *		If the queue type is still unset, the first type-specific attribute
 *		found determines the type that the remaining attributes are checked
 *		against (the queue itself is not modified).
 *
 * @param[in]	pque	- current queue
 *
 * @return	char *
 * @retval	NULL	- all set attributes are consistent
 * @retval	!NULL	- name of the first attribute that conflicts with the
 *			  queue type
 */

static char *
check_que_attr(pbs_queue *pque)
{
	int qtype = pque->qu_qs.qu_type; /* current type of queue */
	int needed;
	int idx;

	for (idx = 0; idx < (int) QA_ATR_LAST; ++idx) {
		attribute_def *def = &que_attr_def[idx];

		if (!is_qattr_set(pque, idx))
			continue;

		/* attributes valid for every queue type need no check */
		if (def->at_parent == PARENT_TYPE_QUE_ALL)
			continue;

		if (def->at_parent == PARENT_TYPE_QUE_EXC)
			needed = QTYPE_Execution;
		else if (def->at_parent == PARENT_TYPE_QUE_RTE)
			needed = QTYPE_RoutePush;
		else
			continue;

		if (qtype == QTYPE_Unset)
			qtype = needed;
		else if (qtype != needed)
			return (def->at_name);
	}

	return NULL; /* all attributes are ok */
}
/**
 * @brief
 * 		Set resource limits, rejecting resources on which a limit may not
 * 		be placed.  For a SET operation, the presence of min_walltime or
 * 		max_walltime in the new list is an error; every other request is
 * 		passed on to set_resc().
 *
 * @param[in,out]	old	- existing attribute (resource list) to update
 * @param[in]	new	- attribute holding the new resource values
 * @param[in]	op	- operation e.g. "SET"
 *
 * @return	success/failure
 * @retval	=0	- OK
 * @retval  >0	- error (PBSE_NOLIMIT_RESOURCE, or whatever set_resc() returns)
 *
 */

int
set_resources_min_max(attribute *old, attribute *new, enum batch_op op)
{
	const int no_limit_resc[2] = {RESC_MIN_WALLTIME, RESC_MAX_WALLTIME};
	int k;

	if (op == SET) {
		for (k = 0; k < 2; k++) {
			if (find_resc_entry(new, &svr_resc_def[no_limit_resc[k]]) != NULL)
				return PBSE_NOLIMIT_RESOURCE;
		}
	}
	return (set_resc(old, new, op));
}
/**
 * @brief
 * 		check_que_enable - reject an attempt to enable a queue that is not
 *		completely defined.
 *		This is the at_action() routine for QA_ATR_Enabled
 *
 *		A queue may be enabled only if its type is set and, for a route
 *		queue, its route destination list is set and non-empty.  Disabling a
 *		queue is always allowed.
 *
 * @param[in]	pattr	- attribute structure (the new "enabled" value)
 * @param[in]	pque	- queue
 * @param[in]	mode	- not used here
 *
 * @return	error code
 * @retval	0	- success
 * @retval	PBSE_QUENOEN	- queue is incompletely defined
 */

int
check_que_enable(attribute *pattr, void *pque, int mode)
{
	pbs_queue *queue = (pbs_queue *) pque;

	/* turning the queue off needs no validation */
	if (pattr->at_val.at_long == 0)
		return (0);

	if (queue->qu_qs.qu_type == QTYPE_Unset)
		return (PBSE_QUENOEN);

	if (queue->qu_qs.qu_type != QTYPE_RoutePush)
		return (0);

	/* a route queue needs at least one destination */
	if (!is_qattr_set(queue, QR_ATR_RouteDestin))
		return (PBSE_QUENOEN);
	if ((get_qattr_arst(queue, QR_ATR_RouteDestin))->as_usedptr == 0)
		return (PBSE_QUENOEN);

	return (0); /* ok to enable queue */
}

/**
 * @brief
 * 		Check the requested value of the queue type attribute
 *		and set qu_type accordingly if there are no conflicts with
 *		route-only or execution-only attributes.
 *		This is the at_action() routine for QA_ATR_QType
 *
 *		The requested string is matched case-insensitively as a prefix of
 *		"Execution" or "Route" (so abbreviations such as "e" or "r" are
 *		accepted); on success the attribute value is replaced by the full
 *		canonical name.
 *
 *		Neither the queue nor the attribute is modified unless the whole
 *		operation succeeds.
 *
 * @param[in,out]	pattr - pointer to the attribute being set
 * @param[in,out]	pque  - pointer to parent object (queue)
 * @param[in]	mode  - mode of operation: set, recovery, ... unused here
 *
 * @return	error code
 * @retval	0	- success
 * @retval	PBSE_BADATVAL	- value unset, empty, or not a known queue type
 * @retval	PBSE_ATTRTYPE	- non-execution type requested for a queue that
 *				  has nodes
 * @retval	PBSE_CANNOT_SET_ROUTE_QUE - route type requested for a queue
 *				  with a partition set
 * @retval	PBSE_SYSTEM	- out of memory
 */

int
set_queue_type(attribute *pattr, void *pque, int mode)
{
	static const struct {
		int type;
		const char *name;
	} qt[] = {
		{QTYPE_Execution, "Execution"},
		{QTYPE_RoutePush, "Route"}};
	pbs_queue *queue = (pbs_queue *) pque;
	const char *req;
	char *canon;
	size_t nbytes;
	size_t i;
	size_t k;

	if (!is_attr_set(pattr))
		/* better be set or we shouldn't be here */
		return (PBSE_BADATVAL);

	req = pattr->at_val.at_str;
	if (req == NULL || *req == '\0')
		return (PBSE_BADATVAL);

	/* does the requested value match a legal value? */

	for (i = 0; i < sizeof(qt) / sizeof(qt[0]); i++) {
		/*
		 * Prefix compare.  The walk cannot run past the end of the
		 * canonical name: its terminating NUL never equals a non-NUL
		 * request character, which ends the loop.  <ctype.h> functions
		 * require an unsigned char value.
		 */
		for (k = 0; req[k] != '\0'; k++) {
			if (toupper((unsigned char) req[k]) !=
			    toupper((unsigned char) qt[i].name[k]))
				break;
		}
		if (req[k] != '\0')
			continue; /* no match, try the next type */

		/* If not an Execution queue, cannot */
		/* have nodes allocated to it        */
		if ((qt[i].type != QTYPE_Execution) &&
		    is_qattr_set(queue, QE_ATR_HasNodes) &&
		    get_qattr_long(queue, QE_ATR_HasNodes) != 0)
			return (PBSE_ATTRTYPE);

		if ((qt[i].type == QTYPE_RoutePush) &&
		    is_qattr_set(queue, QA_ATR_partition))
			return PBSE_CANNOT_SET_ROUTE_QUE;

		/* allocate first so a failure leaves queue and attribute intact */
		nbytes = strlen(qt[i].name) + 1;
		canon = malloc(nbytes);
		if (canon == NULL)
			return (PBSE_SYSTEM);
		memcpy(canon, qt[i].name, nbytes);

		free(pattr->at_val.at_str);
		pattr->at_val.at_str = canon;
		queue->qu_qs.qu_type = qt[i].type;
		pattr->at_flags |= ATR_MOD_MCACHE;
		return (0);
	}
	return (PBSE_BADATVAL);
}

/**
 * @brief
 * 		mgr_log_attr - log the change of an attribute
 *
 * 		One PBSEVENT_ADMIN record is written per list entry, formatted as
 * 		"<msg><name>[.<resource>] <op> <value>[ by <hookname>]" where <op>
 * 		is '+', '-' or '=' according to the entry's operation.  A record
 * 		longer than the log buffer is truncated.
 *
 * @param[in]	msg	- log message (prefix of each record)
 * @param[in]	plist	- svrattrl list header
 * @param[in]	logclass	- see log.h
 * @param[in]	objname	- object being modified
 * @param[in]	hookname	- for adding 'by <hookname>' msg; may be NULL
 */

void
mgr_log_attr(char *msg, struct svrattrl *plist, int logclass, char *objname, char *hookname)
{
	const char *opstr;
	int has_resc;

	for (; plist != NULL; plist = (struct svrattrl *) GET_NEXT(plist->al_link)) {
		if (plist->al_op == INCR)
			opstr = " + ";
		else if (plist->al_op == DECR)
			opstr = " - ";
		else
			opstr = " = ";

		has_resc = (plist->al_rescln != 0);

		/*
		 * Build the whole record with one bounded call; snprintf()
		 * truncates and always NUL-terminates, so no component
		 * (including msg and the attribute name) can overflow
		 * log_buffer.
		 */
		(void) snprintf(log_buffer, LOG_BUF_SIZE, "%s%s%s%s%s%s%s%s",
				msg,
				plist->al_name,
				has_resc ? "." : "",
				has_resc ? plist->al_resc : "",
				opstr,
				plist->al_valln ? plist->al_value : "",
				(hookname != NULL) ? " by " : "",
				(hookname != NULL) ? hookname : "");

		log_event(PBSEVENT_ADMIN, logclass, LOG_INFO, objname, log_buffer);
	}
}

/**
 * @brief
 * 		unset_indirect - unset the indirect target for a node resource
 *
 * 		Does nothing unless the parent object is a node.  The indirect
 * 		linkage of the given resource is undone and its string value freed.
 * 		If the attribute being unset is "resources_available", the matching
 * 		entry in "resources_assigned" is treated the same way when it is
 * 		flagged indirect.
 *
 * @param[in]	presc	- pointer to resource structure
 * @param[in]	pidx    - search index of the attribute array
 * @param[in]	pdef	- attribute definition array searched through pidx
 * @param[in]	name	- name of the attribute being unset; compared against
 * 			  "resources_available"
 * @param[in]	pobj	- Vnode structure
 * @param[in]	objtype	- parent object type; only PARENT_TYPE_NODE is acted on
 */
static void
unset_indirect(resource *presc, void *pidx, attribute_def *pdef, char *name, void *pobj, int objtype)
{
	struct pbsnode *vnode;
	resource *assn;
	int assn_idx;

	if (objtype != PARENT_TYPE_NODE)
		return;

	vnode = (struct pbsnode *) pobj;
	(void) fix_indirect_resc_targets(vnode, presc, ND_ATR_ResourceAvail, 0);
	free_str(&presc->rs_value);

	/* Only "resources_available" has a "resources_assigned" twin */
	/* whose indirectness must be found, unset and cleared too    */

	if (strcasecmp(name, ATTR_rescavail) != 0)
		return;

	assn_idx = find_attr(pidx, pdef, ATTR_rescassn);
	if (assn_idx < 0)
		return;

	assn = find_resc_entry(get_nattr(vnode, assn_idx), presc->rs_defin);
	if (assn != NULL && (assn->rs_value.at_flags & ATR_VFLAG_INDIRECT)) {
		(void) fix_indirect_resc_targets(vnode, assn, ND_ATR_ResourceAssn, 0);
		free_str(&assn->rs_value);
	}
}

/**
 * @brief
 * 		Move every modified value from a working attribute array into the
 * 		parent object's attribute array.
 *
 * 		For each index whose source attribute carries ATR_VFLAG_MODIFY, the
 * 		old value is freed and replaced.  List values are moved, resource
 * 		values are merged with set_resc() (fixing up ATR_VFLAG_DEFLT on the
 * 		individual resources), and all other values are copied by
 * 		assignment with the source cleared so shared pointers are not freed
 * 		twice.
 *
 * @param[in,out]	pattr	- the parent object's attribute array
 * @param[in,out]	src	- working array holding the new values
 * @param[in]	pdef	- attribute definition array
 * @param[in]	limit	- number of attributes in the arrays
 */
static void
mgr_apply_modified_attrs(attribute *pattr, attribute *src, attribute_def *pdef, int limit)
{
	int index;
	attribute *pnew;
	attribute *pold;
	resource *presc;
	resource *oldpresc;

	for (index = 0; index < limit; index++) {
		pnew = src + index;
		pold = pattr + index;
		if ((pnew->at_flags & ATR_VFLAG_MODIFY) == 0)
			continue;

		/* replace the old value with the modified new value */

		(pdef + index)->at_free(pold);
		pold->at_flags = pnew->at_flags; /* includes MODIFY */

		if (pold->at_type == ATR_TYPE_LIST) {
			list_move(&pnew->at_val.at_list, &pold->at_val.at_list);
		} else if (pold->at_type == ATR_TYPE_RESC) {
			set_resc(pold, pnew, INCR);
			/* clear ATR_VFLAG_DEFLT on modified values */
			for (presc = GET_NEXT(pold->at_val.at_list);
			     presc;
			     presc = GET_NEXT(presc->rs_link)) {
				if (presc->rs_value.at_flags & ATR_VFLAG_MODIFY) {
					presc->rs_value.at_flags &= ~ATR_VFLAG_DEFLT;
				}
			}
			/* carry ATR_VFLAG_DEFLT over for unmodified default values */
			for (presc = GET_NEXT(pnew->at_val.at_list);
			     presc;
			     presc = GET_NEXT(presc->rs_link)) {
				if ((presc->rs_value.at_flags & ATR_VFLAG_MODIFY) == 0) {
					if (presc->rs_value.at_flags & ATR_VFLAG_DEFLT) {
						oldpresc = find_resc_entry(pold,
									   presc->rs_defin);
						if (oldpresc) {
							oldpresc->rs_value.at_flags |= ATR_VFLAG_DEFLT;
						}
					}
				}
			}
			(pdef + index)->at_free(pnew);
		} else {
			/*
			 * copy value from new into old including pointers to
			 * strings and array of strings, clear the
			 * "new" attribute so those "pointers" are not freed
			 * when "new" is freed later
			 */
			*pold = *pnew;
			clear_attr(pnew, pdef + index);
		}
	}
}

/**
 * @brief
 * 		Set attributes for manager function.
 *
 * @param[in,out]	pattr	- Address of the parent objects attribute array
 * @param[in]	pidx	- Search index for the attribute def array
 * @param[in]	pdef	- Address of attribute definition array
 * @param[in]	limit	- Last attribute in the list
 * @param[in]	plist	- List of attributes to set
 * @param[in]	privil	- Permission list
 * @param[out]	bad 	- A bad attributes index is returned in this param
 *		       				This actually returns the bad index + 1.
 * @param[in]   parent	- Pointer to the parent object
 * @param[in]   mode 	- operation mode.
 * @param[in]   allow_unkresc	- set to TRUE to allow unknown resource values;
 * 									otherwise, FALSE.
 *
 * @return	Error code
 * @retval	PBSE_NONE  - Success (also when plist is NULL)
 * @retval	! PBSE_NONE - Failure
 *
 * @note
 *		The set operation is performed as an atomic operation: all specified
 *		attributes must be successfully set, or none are modified.  Entity
 *		limits are therefore validated before the object is touched, and a
 *		failing action function causes the saved values to be copied back.
 */

static int
mgr_set_attr2(attribute *pattr, void *pidx, attribute_def *pdef, int limit, svrattrl *plist, int privil, int *bad, void *parent, int mode, int allow_unkresc)
{
	int index;
	attribute *new;
	attribute *pre_copy;
	attribute *attr_save;
	int rc;

	if (plist == NULL)
		return (PBSE_NONE);

	/*
	 * We have multiple attribute lists in play here.  pre_copy is used
	 * to copy the attributes into pattr prior to calling the action functions.
	 * This means the object passed to the action function is fully up to date.
	 * new is attribute list which the action functions are called with.  new
	 * may be modified by the action functions, so we need to copy these to pattr
	 * a second time.  Lastly we have attr_save.  It is a copy of the object's
	 * attributes prior to any change.  If any action function fails, we copy
	 * attr_save back to the real attributes before leaving the function.
	 *
	 * NOTE(review): the error paths rely on attr_atomic_kill() releasing the
	 * array itself as well as the values it holds - confirm against its
	 * definition.
	 */

	new = (attribute *) calloc((unsigned int) limit, sizeof(attribute));
	if (new == NULL)
		return (PBSE_SYSTEM);

	/* Below says if 'allow_unkresc' is TRUE, then set 'unkn' param
	 * of attr_atomic_set() to '1'; otherwise, set it to '-1' meaning
	 * not to allow unknown resource
	 */
	if ((rc = attr_atomic_set(plist, pattr, new, pidx, pdef, limit, (allow_unkresc ? 1 : -1), privil, bad)) != 0) {
		attr_atomic_kill(new, pdef, limit);
		return (rc);
	}

	pre_copy = (attribute *) calloc((unsigned int) limit, sizeof(attribute));
	if (pre_copy == NULL) {
		attr_atomic_kill(new, pdef, limit);
		return (PBSE_SYSTEM);
	}

	attr_atomic_copy(pre_copy, new, pdef, limit);

	attr_save = calloc((unsigned int) limit, sizeof(attribute));
	if (attr_save == NULL) {
		attr_atomic_kill(new, pdef, limit);
		attr_atomic_kill(pre_copy, pdef, limit);
		return (PBSE_SYSTEM);
	}

	attr_atomic_copy(attr_save, pattr, pdef, limit);

	/*
	 * Special test, aka kludge, for entity-limits, make sure
	 * not accepting an entity without an actual limit; i.e.
	 * [u:user] instead of [u:user=limit].  The [u:user] form
	 * is allowed in the "unset" attribute function in which
	 * case the entry isn't present when it gets here.
	 *
	 * This runs as a separate pass BEFORE anything is moved into pattr,
	 * so a rejected request leaves the object completely unmodified.
	 */
	for (index = 0; index < limit; index++) {
		svr_entlim_leaf_t *pleaf;
		void *unused = NULL;

		if (((pre_copy + index)->at_flags & ATR_VFLAG_MODIFY) == 0)
			continue;
		if ((pdef + index)->at_type != ATR_TYPE_ENTITY)
			continue;

		while ((pleaf = entlim_get_next((new + index)->at_val.at_enty.ae_tree, &unused)) != NULL) {

			/* entry that is Modified, and not Set meant it had a null value - illegal */
			if ((pleaf->slf_limit.at_flags & (ATR_VFLAG_SET | ATR_VFLAG_MODIFY)) == ATR_VFLAG_MODIFY) {
				*bad = index + 1;
				attr_atomic_kill(new, pdef, limit);
				attr_atomic_kill(pre_copy, pdef, limit);
				attr_atomic_kill(attr_save, pdef, limit);
				return (PBSE_BADATVAL);
			}
		}
	}

	/* now replace the old values with any modified new values */
	mgr_apply_modified_attrs(pattr, pre_copy, pdef, limit);

	for (index = 0; index < limit; index++) {
		/*
		 * for each attribute which is to be modified, call the
		 * at_action routine for the attribute, if one exists, with the
		 * new value.  If the action fails, undo everything.
		 */
		if ((new + index)->at_flags & ATR_VFLAG_MODIFY) {
			if ((pdef + index)->at_action) {
				rc = (pdef + index)->at_action((new + index), parent, mode);
				if (rc) {
					*bad = index + 1;
					attr_atomic_kill(new, pdef, limit);
					attr_atomic_kill(pre_copy, pdef, limit);
					attr_atomic_copy(pattr, attr_save, pdef, limit);
					attr_atomic_kill(attr_save, pdef, limit);
					return (rc);
				}
			}
		}
	}

	/* The action functions might have modified new.  Need to set pattr again */
	mgr_apply_modified_attrs(pattr, new, pdef, limit);

	/*
	 * we have moved all the "external" values to the old array, thus
	 * we just free the new array, NOT call at_free on each.
	 */
	free(new);
	free(pre_copy);
	attr_atomic_kill(attr_save, pdef, limit);
	if (pdef->at_parent == PARENT_TYPE_NODE)
		((pbsnode *)parent)->nd_modified = 1;
	return (PBSE_NONE);
}

/**
 * @brief
 * 		Convenience form of 'mgr_set_attr2()' that never accepts unknown
 * 		resources: it forwards all of its arguments unchanged and passes
 * 		FALSE for 'allow_unkresc'.
 *
 * @param[in]	pattr	- Address of the parent objects attribute array
 * @param[in]	pidx	- Search index for the attribute def array
 * @param[in]	pdef	- Address of attribute definition array
 * @param[in]	limit	- Last attribute in the list
 * @param[in]	plist	- List of attributes to set
 * @param[in]	privil	- Permission list
 * @param[out]	bad 	- A bad attributes index is returned in this param
 *		       				This actually returns the bad index + 1.
 * @param[in]   parent	- Pointer to the parent object
 * @param[in]   mode 	- operation mode.
 *
 * @return	Error code
 * @retval	PBSE_NONE  - Success
 * @retval	! PBSE_NONE - Failure
 **/
int
mgr_set_attr(attribute *pattr, void *pidx, attribute_def *pdef, int limit, svrattrl *plist, int privil, int *bad, void *parent, int mode)
{
	const int allow_unkresc = FALSE; /* unknown resources are rejected */
	int rc;

	rc = mgr_set_attr2(pattr, pidx, pdef, limit, plist, privil, bad,
			   parent, mode, allow_unkresc);
	return (rc);
}

/**
 * @brief
 *		Unset (clear) attributes for manager function
 *
 *		Operation depends on type of attribute and if a resource is specified.
 *		For a attribute of type ATR_TYPE_RESC:
 *	  	If a resource name is specified, unset only that resource entry.
 *	  	If a resource name is not specified, unset the whole attribute.
 *		For a attribute of type ATR_TYPE_ENTITY:
 *	  	If a resource name is specified, unset only entries with that resc
 *	  	If a resource name is not specified, unset the whole attribute.
 *		For "normal" attributes, unset the entire attribute.
 *
 *		The work is done in two passes over plist: the first pass only
 *		validates (existence, privilege, indirect-target and provisioning
 *		checks) and returns on the first bad entry; the second pass clears
 *		the in-memory values and collects them for deletion from the database.
 *
 * @param[in]	pattr	- Address of the parent objects attribute array
 * @param[in]	pidx	- Search index of the attribute array
 * @param[in]	pdef	- Address of attribute definition array
 * @param[in]	limit 	- Last attribute in the list (not referenced by this function)
 * @param[in]	plist 	- List of attributes to unset.
 * @param[in]	privil	- Permission list.  A value of -1 is an override that
 * 			 				bypasses the check for permissions, used internally by
 * 			 				the server
 * @param[out]	bad  	- 1-based position in plist of the entry that failed
 * 			 				validation; only written when validation fails
 * @param[in]	pobj 	- Pointer to the parent object
 * @param[in] 	ptype 	- Type of the parent object (PARENT_TYPE_*)
 * @param[in] 	rflag 	- INDIRECT_RES_UNLINK will unlink indirect resources
 * 		     				INDIRECT_RES_CHECK will return an error if an indirect
 * 		    			 	resource is set
 *
 * @return	Error code
 * @retval	0	- Success
 * @retval	PBSE_NOATTR - unknown attribute name, or encoding for the database failed
 * @retval	PBSE_ATTRRO - caller lacks write access to the attribute
 * @retval	PBSE_UNKRESC - unknown resource name
 * @retval	PBSE_PERM - caller lacks write access to the resource
 * @retval	PBSE_OBJBUSY - resource is an indirect target and rflag is not INDIRECT_RES_UNLINK
 * @retval	PBSE_NODEPROV_NOACTION - node is provisioning and aoe/current_aoe was named
 * @retval	-1  - deleting the attributes from the database failed
 */
static int
mgr_unset_attr(attribute *pattr, void *pidx, attribute_def *pdef, int limit, svrattrl *plist, int privil, int *bad, void *pobj, int ptype, enum res_op_flag rflag)
{
	void *parent_id = NULL;
	pbs_db_attr_list_t db_attr_list;
	int do_indirect_check = 0;
	int index;
	int ord;
	int rc;
	int node_in_prov;
	svrattrl *pl;
	resource_def *prsdef;
	resource *presc;
	/* pobj viewed as a node; only valid to dereference when ptype == PARENT_TYPE_NODE */
	struct pbsnode *pnode = pobj;
	void *conn = (void *) svr_db_conn;
	pbs_db_obj_info_t obj;
	obj.pbs_db_un.pbs_db_job = NULL;

	/*
	 * The provisioning checks below apply to nodes only.  pobj may equally be
	 * a server, scheduler or queue, in which case reading nd_state through
	 * pnode would interpret unrelated memory as node state.
	 */
	node_in_prov = (ptype == PARENT_TYPE_NODE) && ((pnode->nd_state & INUSE_PROV) != 0);

	/* first check the attribute exists and we have privilege to set */
	ord = 0;
	pl = plist;
	while (pl) {
		ord++;
		index = find_attr(pidx, pdef, pl->al_name);
		if (index < 0) {
			*bad = ord;
			return (PBSE_NOATTR);
		}

		/* have we privilege to unset the attribute ? */

		if ((privil != -1) && ((pdef + index)->at_flags & privil & ATR_DFLAG_WRACC) == 0) {
			*bad = ord;
			return (PBSE_ATTRRO);
		}
		if (((pdef + index)->at_type == ATR_TYPE_RESC) &&
		    (pl->al_resc != NULL)) {

			/* check the individual resource */

			prsdef = find_resc_def(svr_resc_def, pl->al_resc);
			if (prsdef == NULL) {
				*bad = ord;
				return (PBSE_UNKRESC);
			}
			if ((privil != -1) && ((prsdef->rs_flags & privil & ATR_DFLAG_WRACC) == 0)) {
				*bad = ord;
				return (PBSE_PERM);
			}
			presc = find_resc_entry(pattr + index, prsdef);
			if (presc &&
			    (presc->rs_value.at_flags & ATR_VFLAG_TARGET)) {
				/* resource is the target of an indirect reference */
				if (rflag == INDIRECT_RES_UNLINK) {
					presc->rs_value.at_flags &= ~ATR_VFLAG_TARGET;
				} else {
					*bad = ord;
					return (PBSE_OBJBUSY);
				}
			}
			/* the "aoe" resource may not be unset while the node is provisioning */
			if (node_in_prov && !strcmp(prsdef->rs_name, "aoe")) {
				*bad = ord;
				return (PBSE_NODEPROV_NOACTION);
			}
		}
		/* likewise for the current_aoe attribute itself */
		if (node_in_prov &&
		    !strcmp((pdef + index)->at_name, ATTR_NODE_current_aoe)) {
			*bad = ord;
			return (PBSE_NODEPROV_NOACTION);
		}

		pl = (svrattrl *) GET_NEXT(pl->al_link);
	}

	/* ok, now clear them */
	db_attr_list.attr_count = 0;
	CLEAR_HEAD(db_attr_list.attrs);

	while (plist) {
		/* index and prsdef lookups were validated by the first pass */
		index = find_attr(pidx, pdef, plist->al_name);
		if (encode_single_attr_db((pdef + index), (pattr + index), &db_attr_list) != 0) {
			/* release whatever was collected so far before bailing out */
			free_db_attr_list(&db_attr_list);
			return (PBSE_NOATTR);
		}

		if (((pdef + index)->at_type == ATR_TYPE_RESC) &&
		    (plist->al_resc != NULL)) {

			/* attribute of type resource and specified resource */
			/* free resource member, not the attribute */

			prsdef = find_resc_def(svr_resc_def, plist->al_resc);
			presc = find_resc_entry(pattr + index, prsdef);
			if (presc) {
				if ((ptype != PARENT_TYPE_SERVER) ||
				    (index != (int) SVR_ATR_resource_cost)) {

					/* NOTE(review): pnode is passed here for every non-server parent type; confirm unset_signature tolerates non-node objects */
					unset_signature(pnode, prsdef->rs_name);
					if ((ptype == PARENT_TYPE_NODE) && (presc->rs_value.at_flags & ATR_VFLAG_INDIRECT)) {
						unset_indirect(presc, pidx, pdef, plist->al_name, pobj, ptype);
						do_indirect_check = 1;
					}
					prsdef->rs_free(&presc->rs_value);
				}
				delete_link(&presc->rs_link);
				free(presc);
				presc = NULL;
			}
			/* If the last resource has been delinked from  */
			/* the attribute,  "unset" the attribute itself */
			presc = (resource *) GET_NEXT((pattr + index)->at_val.at_list);
			if (presc == NULL)
				mark_attr_not_set(pattr + index);
			(pattr + index)->at_flags |= ATR_MOD_MCACHE;

		} else if (((pdef + index)->at_type == ATR_TYPE_ENTITY) &&
			   (plist->al_resc != NULL)) {

			/* attribute of type ENTITY and specified resource */
			/* unset the entity limit on that resource for    */
			/* all entities */

			unset_entlim_resc(pattr + index, plist->al_resc);

		} else {

			/* either the attribute is not of type ENTITY or RESC */
			/* or there is no specific resource specified         */
			if ((pdef + index)->at_type == ATR_TYPE_RESC) {

				/* if a resource type, check each for being indirect */
				presc = (resource *) GET_NEXT((pattr + index)->at_val.at_list);
				while (presc) {
					if (presc->rs_value.at_flags & ATR_VFLAG_INDIRECT) {
						unset_indirect(presc, pidx, pdef, plist->al_name, pobj, ptype);
						do_indirect_check = 1;
					}
					presc = (resource *) GET_NEXT(presc->rs_link);
				}
			}

			/* now free the whole attribute */

			(pdef + index)->at_free(pattr + index);
			(pattr + index)->at_flags |= ATR_VFLAG_MODIFY;
		}
		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	/* now delete the collected list from the database */
	switch (ptype) {
		case PARENT_TYPE_SERVER:
			obj.pbs_db_obj_type = PBS_DB_SVR;
			parent_id = 0;
			break;

		case PARENT_TYPE_SCHED:
			obj.pbs_db_obj_type = PBS_DB_SCHED;
			parent_id = ((pbs_sched *) pobj)->sc_name;
			break;

		case PARENT_TYPE_NODE:
			obj.pbs_db_obj_type = PBS_DB_NODE;
			parent_id = pnode->nd_name;
			break;

		case PARENT_TYPE_QUE_ALL:
			obj.pbs_db_obj_type = PBS_DB_QUEUE;
			parent_id = ((pbs_queue *) pobj)->qu_qs.qu_name;
			break;

		case PARENT_TYPE_JOB:
			obj.pbs_db_obj_type = PBS_DB_JOB;
			parent_id = ((job *) pobj)->ji_qs.ji_jobid;
			break;

		case PARENT_TYPE_RESV:
			obj.pbs_db_obj_type = PBS_DB_RESV;
			parent_id = ((resc_resv *) pobj)->ri_qs.ri_resvID;
			break;
	}

	rc = pbs_db_delete_attr_obj(conn, &obj, parent_id, &db_attr_list);
	free_db_attr_list(&db_attr_list);

	if (rc != 0)
		return -1;

	if (do_indirect_check)
		indirect_target_check(0);
	return (0);
}

/**
 * @brief
 *		Process request to create a queue
 *
 *		Creates queue and calls mgr_set_attr to set queue attributes.
 *
 * @param[in]	preq	- Pointer to a batch request structure
 */

void
	mgr_queue_create(preq) struct batch_request *preq;
{
	int bad;
	char *badattr;
	svrattrl *plist;
	pbs_queue *pque;
	int rc;

	rc = strlen(preq->rq_ind.rq_manager.rq_objname);

	if ((rc > PBS_MAXQUEUENAME) || (rc == 0)) {
		req_reject(PBSE_QUENBIG, 0, preq);
		return;
	}
	if (find_queuebyname(preq->rq_ind.rq_manager.rq_objname)) {
		req_reject(PBSE_QUEEXIST, 0, preq);
		return;
	}

	pque = que_alloc(preq->rq_ind.rq_manager.rq_objname);

	/* set the queue attributes */

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	rc = mgr_set_attr(pque->qu_attr, que_attr_idx, que_attr_def, QA_ATR_LAST, plist, preq->rq_perm, &bad, (void *) pque, ATR_ACTION_NEW);
	if (rc != 0) {
		reply_badattr(rc, bad, plist, preq);
		que_free(pque);
		pque = NULL;
	} else {
		que_save_db(pque);

		log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_QUEUE, LOG_INFO, pque->qu_qs.qu_name, msg_manager, msg_man_cre, preq->rq_user, preq->rq_host);
		mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_QUEUE, preq->rq_ind.rq_manager.rq_objname, NULL);

		/* check the appropriateness of the attributes vs. queue type */

		if ((badattr = check_que_attr(pque)) != NULL) {
			/* miss match, issue warning */
			(void) sprintf(log_buffer, msg_attrtype, pque->qu_qs.qu_name, badattr);
			(void) reply_text(preq, PBSE_ATTRTYPE, log_buffer);
		} else {
			reply_ack(preq);
		}
	}
}

/**
 * @brief
 *		Delete a queue
 *
 * @par
 * 		The queue must be empty of jobs.  An empty object name, or one
 * 		starting with '@', selects every queue; in that mode the queues that
 * 		could not be deleted are collected and reported together.
 * @param[in,out]	preq	- Pointer to a batch request structure
 *     							 The request can be rejected with the following error codes:
 *       						PBSE_OBJBUSY returned if it is attached to a reservation/node or if the queue fails to get deleted
 *       						PBSE_UNKQUE returned if the queue is unknown
 *       						PBSE_SYSTEM returned for system issues like failure to allocate memory
 *
 * @par
 * 		Acknowledgement reply sent to the batch request(preq) upon successful deletion of the queue
 */

void
mgr_queue_delete(struct batch_request *preq)
{
	int i;
	int j;
	int total_queues;
	int len;
	char *problem_names;
	int problem_cnt = 0;
	char *name;
	pbs_queue *pque = NULL;
	pbs_queue *next_queue = NULL;
	int rc;
	int type = 0; /* 0: single named queue, 1: all queues */
	struct pbs_queue **problem_queues = NULL;

	name = preq->rq_ind.rq_manager.rq_objname;

	if ((*name == '\0') || (*name == '@')) {
		type = 1;
	}

	/* get the queue to be deleted */
	if (type == 0) {
		pque = find_queuebyname(name);
	} else {
		/* one slot per existing queue is enough to record every failure */
		problem_queues = (struct pbs_queue **) malloc(server.sv_qs.sv_numque * sizeof(struct pbs_queue *));
		if (problem_queues == NULL) {
			log_err(ENOMEM, __func__, "out of memory");
			req_reject(PBSE_SYSTEM, 0, preq);
			return;
		}
		problem_cnt = 0;
		pque = (pbs_queue *) GET_NEXT(svr_queues);
	}

	/* if the queue is unknown, reject the request */
	if (pque == NULL) {
		free(problem_queues);
		req_reject(PBSE_UNKQUE, 0, preq);
		return;
	}

	/* snapshot the count: it bounds the loop and therefore problem_cnt */
	total_queues = server.sv_qs.sv_numque;

	for (j = 0; (pque != NULL) && (j < total_queues); j++) {
		rc = 0;
		/* Do not allow deletion of a queue associated to a reservation, unless
		 * it is coming from the server itself */
		if ((pque->qu_resvp != NULL) && (preq->rq_conn != PBS_LOCAL_CONNECTION)) {
			rc = PBSE_OBJBUSY;
		}

		/* are there nodes associated with the queue */

		for (i = 0; i < svr_totnodes; i++) {
			if (pbsndlist[i]->nd_pque == pque) {
				rc = PBSE_OBJBUSY;
				break;
			}
		}

		/* when walking all queues, fetch the successor now: pque may be purged below */

		if (type == 1) {
			next_queue = (pbs_queue *) GET_NEXT(pque->qu_link);
		}

		if (rc == 0) {
			char queue_name[PBS_MAXQUEUENAME + 1];
			/* Save the queue name before we purge it so it will appear in the log.  */
			pbs_strncpy(queue_name, pque->qu_qs.qu_name, sizeof(queue_name));
			if ((rc = que_purge(pque)) != 0) {
				rc = PBSE_OBJBUSY;
			} else
				log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_QUEUE, LOG_INFO, queue_name, msg_manager, msg_man_del, preq->rq_user, preq->rq_host);
		}
		if (rc != 0) {
			if (type == 1) {
				if (problem_queues) /*we have an array in which to save*/
					problem_queues[problem_cnt] = pque;
				++problem_cnt;
			} else {
				req_reject(rc, 0, preq);
				return;
			}
		}

		/* Get the next queue if all queues need to be deleted
		 * from the default server */
		if (type == 1) {
			pque = next_queue;
		} else {
			break;
		}
	}

	if (type == 1) { /*modification was for all queues  */

		if (problem_cnt) { /*one or more problems encountered*/

			/*
			 * Size the message buffer: error text, then for each queue its
			 * name plus up to three separator bytes (", " and " "), plus
			 * the terminating NUL.  The lengths must be accumulated; sizing
			 * for the last name only would let the strcat calls below write
			 * past the end of the buffer.
			 */
			len = strlen(pbse_to_txt(PBSE_OBJBUSY)) + 1;
			for (i = 0; i < problem_cnt; i++)
				len += strlen(problem_queues[i]->qu_qs.qu_name) + 3;

			if ((problem_names = malloc(len)) != NULL) {

				strcpy(problem_names, pbse_to_txt(PBSE_OBJBUSY));
				for (i = 0; i < problem_cnt; i++) {
					if (i)
						strcat(problem_names, ", ");
					strcat(problem_names, " ");
					strcat(problem_names, problem_queues[i]->qu_qs.qu_name);
				}

				(void) reply_text(preq, PBSE_OBJBUSY, problem_names);
				free(problem_names);
				problem_names = NULL;
			} else {
				(void) reply_text(preq, PBSE_SYSTEM, pbse_to_txt(PBSE_SYSTEM));
			}
		}

		free(problem_queues);
		problem_queues = NULL;

		if (problem_cnt) { /*reply has already been sent  */
			return;
		}
	}

	reply_ack(preq);
}

/**
 * @brief
 *		Set Server Attribute Values
 *
 *		Sets the requested attributes and returns a reply.  Entries of the
 *		request whose value is NULL or empty are treated as "unset" and are
 *		handed to mgr_unset_attr; the remaining entries go to mgr_set_attr.
 *		The working copies of the request entries are released on every
 *		exit path.
 *
 * @param[in]	preq	- Pointer to a batch request structure
 * @param[in]	conn	- Pointer to a connection structure assosiated with preq
 *							(not referenced by this function)
 */

void
mgr_server_set(struct batch_request *preq, conn_t *conn)
{
	int bad_attr = 0;
	svrattrl *plist, *psvrat;
	pbs_list_head setlist;
	pbs_list_head unsetlist;
	int rc;
	int has_log_events = 0;

	CLEAR_HEAD(setlist);
	CLEAR_HEAD(unsetlist);

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		if (strcmp(plist->al_name, ATTR_logevents) == 0)
			has_log_events = 1;
		/*Only root at server host can set server attribute "acl_roots".*/
		if ((strcasecmp(plist->al_name, ATTR_aclroot) == 0) &&
		    !is_local_root(preq->rq_user, preq->rq_host)) {
			reply_badattr(PBSE_ATTRRO, bad_attr, plist, preq);
			/* entries duplicated by earlier iterations must not be leaked */
			goto cleanup;
		}
		/*
		 * We do not overwrite/update the entire record in the database. Therefore, to
		 * unset attributes, we will need to find out the ones with a 0 or NULL value set.
		 * We create a separate list for removal from the list of attributes provided, and
		 * pass it to mgr_unset_attr, below
		 */
		psvrat = dup_svrattrl(plist);
		if (psvrat == NULL) {
			req_reject(PBSE_SYSTEM, 0, preq);
			goto cleanup;
		}
		if (psvrat->al_atopl.value == NULL || psvrat->al_atopl.value[0] == '\0')
			append_link(&unsetlist, &psvrat->al_link, psvrat);
		else
			append_link(&setlist, &psvrat->al_link, psvrat);

		plist = (struct svrattrl *) GET_NEXT(plist->al_link);
	}

	/* if the unsetlist has attributes, call mgr_unset_attr to remove them separately */
	plist = (svrattrl *) GET_NEXT(unsetlist);
	if (plist) {
		rc = mgr_unset_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, plist,
				    preq->rq_perm, &bad_attr, (void *) &server, PARENT_TYPE_SERVER, INDIRECT_RES_CHECK);
		if (rc != 0) {
			reply_badattr(rc, bad_attr, plist, preq);
			goto cleanup;
		}
	}

	plist = (svrattrl *) GET_NEXT(setlist);
	if (plist) {
		rc = mgr_set_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, plist,
				  preq->rq_perm, &bad_attr, (void *) &server,
				  ATR_ACTION_ALTER);
		if (rc != 0) {
			/* plist still points into setlist, so reply before freeing */
			reply_badattr(rc, bad_attr, plist, preq);
			goto cleanup;
		}
		svr_save_db(&server);
	}

	/* success: refresh the cached log mask if it was touched, log, and acknowledge */
	if (has_log_events)
		*log_event_mask = get_sattr_long(SVR_ATR_log_events);
	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_INFO, msg_daemonname, msg_manager, msg_man_set, preq->rq_user, preq->rq_host);
	mgr_log_attr(msg_man_set, GET_NEXT(preq->rq_ind.rq_manager.rq_attr), PBS_EVENTCLASS_SERVER, msg_daemonname, NULL);
	reply_ack(preq);

cleanup:
	free_attrlist(&setlist);
	free_attrlist(&unsetlist);
}

/**
 * @brief
 *		Unset (clear) Server Attribute Values
 *
 *		Clears the requested attributes and returns a reply.  Before the
 *		attributes are cleared, a first walk over the request runs the
 *		per-attribute side effects (license settings, history settings,
 *		scheduler forwarding, ...).  After a successful clear, a second walk
 *		re-applies default values to the attributes that have one.
 *
 * @param[in]	preq	- Pointer to a batch request structure
 * @param[in]	conn	- Pointer to a connection structure assosiated with preq
 *							(not referenced by this function)
 */

void
mgr_server_unset(struct batch_request *preq, conn_t *conn)
{
	int bad_attr = 0;
	svrattrl *plist;
	int rc;

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);

	/* First walk: privilege check for acl_roots and per-attribute side effects */
	while (plist) {
		if (strcasecmp(plist->al_name, ATTR_aclroot) == 0) {
			/*Only root at server host can unset server attribute "acl_roots".*/
			if (!is_local_root(preq->rq_user, preq->rq_host)) {
				reply_badattr(PBSE_ATTRRO, bad_attr, plist, preq);
				return;
			}
		} else if (strcasecmp(plist->al_name,
				      ATTR_pbs_license_info) == 0) {
			unset_license_location();
		} else if (strcasecmp(plist->al_name,
				      ATTR_license_min) == 0) {
			unset_license_min();
		} else if (strcasecmp(plist->al_name,
				      ATTR_license_max) == 0) {
			unset_license_max();
		} else if (strcasecmp(plist->al_name,
				      ATTR_license_linger) == 0) {
			unset_license_linger();
		} else if (strcasecmp(plist->al_name, ATTR_resv_retry_init) == 0 ||
			   strcasecmp(plist->al_name, ATTR_resv_retry_time) == 0) {
			resv_retry_time = RESV_RETRY_TIME_DEFAULT;
		} else if (strcasecmp(plist->al_name,
				      ATTR_JobHistoryEnable) == 0) {
			unset_job_history_enable();
		} else if (strcasecmp(plist->al_name,
				      ATTR_JobHistoryDuration) == 0) {
			unset_job_history_duration();
		} else if (strcasecmp(plist->al_name,
				      ATTR_max_job_sequence_id) == 0) {
			unset_max_job_sequence_id();
		} else if (strcasecmp(plist->al_name,
				      ATTR_max_concurrent_prov) == 0) {
			max_concurrent_prov = PBS_MAX_CONCURRENT_PROV;
			resize_prov_table(max_concurrent_prov);
		} else if (strcasecmp(plist->al_name,
				      ATTR_dfltqsubargs) == 0) {
			force_qsub_daemons_update();
		} else if (strcasecmp(plist->al_name,
				      ATTR_nodefailrq) == 0) {
			unset_node_fail_requeue();
		} else if (strcasecmp(plist->al_name,
				      ATTR_jobscript_max_size) == 0) {
			unset_jobscript_max_size();
		} else if (strcasecmp(plist->al_name,
				      ATTR_scheduling) == 0) {
			if (dflt_scheduler) {
				set_sched_attr_l_slim(dflt_scheduler, SCHED_ATR_scheduling, 0, SET);
				sched_save_db(dflt_scheduler);
			}
		} else if (strcasecmp(plist->al_name, ATTR_schediteration) == 0) {
			if (dflt_scheduler) {
				svrattrl *tm_list;
				/* value is 600 so it is of size 4 including terminating character */
				tm_list = attrlist_create(plist->al_name, NULL, 8);
				if (tm_list == NULL) {
					reply_badattr(-1, bad_attr, plist, preq);
					/* a reply has been sent; continuing would dereference NULL */
					return;
				}
				tm_list->al_link.ll_next->ll_struct = NULL;
				/* when unset, set scheduler_iteration to 600 seconds */
				sprintf(tm_list->al_value, "%d", PBS_SCHEDULE_CYCLE);
				rc = mgr_set_attr(dflt_scheduler->sch_attr, sched_attr_idx, sched_attr_def, SCHED_ATR_LAST, tm_list,
						  MGR_ONLY_SET, &bad_attr, (void *) dflt_scheduler, ATR_ACTION_ALTER);
				if (rc != 0) {
					free_svrattrl(tm_list);
					reply_badattr(rc, bad_attr, plist, preq);
					return;
				}
				sched_save_db(dflt_scheduler);
				free_svrattrl(tm_list);
			}
		}
		plist = (struct svrattrl *) GET_NEXT(plist->al_link);
	}
	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);

	rc = mgr_unset_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, plist,
			    preq->rq_perm, &bad_attr, (void *) &server, PARENT_TYPE_SERVER, INDIRECT_RES_CHECK);
	if (rc != 0)
		reply_badattr(rc, bad_attr, plist, preq);
	else {
		attribute *pattr = get_sattr(SVR_ATR_DefaultChunk);
		/* run the default_chunk action only if the unset touched that attribute */
		if (pattr->at_flags & ATR_VFLAG_MODIFY) {
			(void) deflt_chunk_action(pattr, (void *) &server, ATR_ACTION_ALTER);
		}
		/* Now set the default values on some of the unset attributes */
		for (plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
		     plist != NULL; plist = (struct svrattrl *) GET_NEXT(plist->al_link)) {
			if (strcasecmp(plist->al_name, ATTR_logevents) == 0) {
				set_sattr_l_slim(SVR_ATR_log_events, SVR_LOG_DFLT, SET);
				*log_event_mask = get_sattr_long(SVR_ATR_log_events);
			} else if (strcasecmp(plist->al_name, ATTR_mailer) == 0)
				set_sattr_str_slim(SVR_ATR_mailer, SENDMAIL_CMD, NULL);
			else if (strcasecmp(plist->al_name, ATTR_mailfrom) == 0)
				set_sattr_str_slim(SVR_ATR_mailfrom, PBS_DEFAULT_MAIL, NULL);
			else if (strcasecmp(plist->al_name, ATTR_queryother) == 0)
				set_sattr_l_slim(SVR_ATR_query_others, 1, SET);
			else if (strcasecmp(plist->al_name, ATTR_schediteration) == 0)
				set_sattr_str_slim(SVR_ATR_scheduler_iteration, TOSTR(PBS_SCHEDULE_CYCLE), NULL);
			else if (strcasecmp(plist->al_name, ATTR_ResvEnable) == 0)
				set_sattr_l_slim(SVR_ATR_ResvEnable, 1, SET);
			else if (strcasecmp(plist->al_name, ATTR_maxarraysize) == 0)
				set_sattr_str_slim(SVR_ATR_maxarraysize, TOSTR(PBS_MAX_ARRAY_JOB_DFL), NULL);
			else if (strcasecmp(plist->al_name, ATTR_max_concurrent_prov) == 0)
				set_sattr_str_slim(SVR_ATR_max_concurrent_prov, TOSTR(PBS_MAX_CONCURRENT_PROV), NULL);
			else if (strcasecmp(plist->al_name, ATTR_EligibleTimeEnable) == 0)
				set_sattr_l_slim(SVR_ATR_EligibleTimeEnable, 0, SET);
			else if (strcasecmp(plist->al_name, ATTR_license_linger) == 0) {
				set_sattr_l_slim(SVR_ATR_license_linger, PBS_LIC_LINGER_TIME, SET);
				licensing_control.licenses_linger_time = PBS_LIC_LINGER_TIME;
			} else if (strcasecmp(plist->al_name, ATTR_license_max) == 0) {
				set_sattr_l_slim(SVR_ATR_license_max, PBS_MAX_LICENSING_LICENSES, SET);
				licensing_control.licenses_max = PBS_MAX_LICENSING_LICENSES;
			} else if (strcasecmp(plist->al_name, ATTR_license_min) == 0) {
				set_sattr_l_slim(SVR_ATR_license_min, PBS_MIN_LICENSING_LICENSES, SET);
				licensing_control.licenses_min = PBS_MIN_LICENSING_LICENSES;
			} else if (strcasecmp(plist->al_name, ATTR_rescdflt) == 0) {
				/* resources_default.ncpus falls back to 1 rather than staying unset */
				if (plist->al_resc != NULL && strcasecmp(plist->al_resc, "ncpus") == 0) {
					svrattrl *tm_list;
					tm_list = attrlist_create(plist->al_name, "ncpus", 8);
					if (tm_list == NULL) {
						reply_badattr(-1, bad_attr, plist, preq);
						return;
					}
					tm_list->al_link.ll_next->ll_struct = NULL;
					sprintf(tm_list->al_value, "%d", 1);
					rc = mgr_set_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, tm_list,
							  NO_USER_SET, &bad_attr, (void *) &server, ATR_ACTION_ALTER);
					if (rc != 0) {
						free_svrattrl(tm_list);
						reply_badattr(rc, bad_attr, plist, preq);
						return;
					}
					free_svrattrl(tm_list);
				}
			} else if (strcasecmp(plist->al_name, ATTR_scheduling) == 0)
				set_sattr_l_slim(SVR_ATR_scheduling, 1, SET);
		}
		svr_save_db(&server);
		log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_INFO,
			   msg_daemonname, msg_manager, msg_man_uns,
			   preq->rq_user, preq->rq_host);
		mgr_log_attr(msg_man_uns, GET_NEXT(preq->rq_ind.rq_manager.rq_attr), PBS_EVENTCLASS_SERVER, msg_daemonname, NULL);
		reply_ack(preq);
	}
}

/**
 * @brief
 *		Set Scheduler Attribute Values
 *
 *		Looks up the scheduler named in the request, splits the request's
 *		attribute entries into "set" and "unset" groups (an entry with a
 *		NULL or empty value is an unset), applies both groups and replies.
 *
 * @param[in] preq - Pointer to a batch request structure
 */

void
mgr_sched_set(struct batch_request *preq)
{
	int bad_attr = 0;
	int rc;
	int only_scheduling = 1; /* stays 1 while every entry names ATTR_scheduling */
	svrattrl *entry;
	svrattrl *copy;
	pbs_list_head setlist;
	pbs_list_head unsetlist;
	pbs_sched *psched = find_sched(preq->rq_ind.rq_manager.rq_objname);

	if (psched == NULL) {
		req_reject(PBSE_UNKSCHED, 0, preq);
		return;
	}

	CLEAR_HEAD(setlist);
	CLEAR_HEAD(unsetlist);

	/*
	 * The database record is not rewritten as a whole, so entries that carry
	 * no value must be removed explicitly.  Duplicate each request entry and
	 * file the copy under the list matching its value.
	 */
	for (entry = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	     entry != NULL;
	     entry = (struct svrattrl *) GET_NEXT(entry->al_link)) {
		if (strcmp(entry->al_atopl.name, ATTR_scheduling) != 0)
			only_scheduling = 0;

		copy = dup_svrattrl(entry);
		if (copy == NULL) {
			req_reject(PBSE_SYSTEM, 0, preq);
			free_attrlist(&setlist);
			free_attrlist(&unsetlist);
			return;
		}

		if (copy->al_atopl.value != NULL && copy->al_atopl.value[0] != '\0')
			append_link(&setlist, &copy->al_link, copy);
		else
			append_link(&unsetlist, &copy->al_link, copy);
	}

	/* value-less entries first: remove them through mgr_unset_attr */
	entry = (svrattrl *) GET_NEXT(unsetlist);
	if (entry != NULL) {
		rc = mgr_unset_attr(psched->sch_attr, sched_attr_idx, sched_attr_def, SCHED_ATR_LAST, entry,
				    preq->rq_perm, &bad_attr, (void *) psched, PARENT_TYPE_SCHED, INDIRECT_RES_CHECK);
		if (rc != 0) {
			reply_badattr(rc, bad_attr, entry, preq);
			free_attrlist(&setlist);
			free_attrlist(&unsetlist);
			return;
		}
	}

	/* then the entries that carry a value, if there are any */
	entry = (svrattrl *) GET_NEXT(setlist);
	if (entry != NULL) {
		rc = mgr_set_attr(psched->sch_attr, sched_attr_idx, sched_attr_def,
				  SCHED_ATR_LAST, entry, preq->rq_perm, &bad_attr, (void *) psched, ATR_ACTION_ALTER);
		if (rc != 0) {
			reply_badattr(rc, bad_attr, entry, preq);
			free_attrlist(&setlist);
			free_attrlist(&unsetlist);
			return;
		}

		/* flag a reconfigure unless "scheduling" was the only attribute named */
		if (only_scheduling != 1)
			set_scheduler_flag(SCH_CONFIGURE, psched);

		sched_save_db(psched);
	}

	free_attrlist(&setlist);
	free_attrlist(&unsetlist);

	sprintf(log_buffer, msg_manager, msg_man_set, preq->rq_user, preq->rq_host);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SCHED, LOG_INFO, msg_daemonname, log_buffer);
	mgr_log_attr(msg_man_set, GET_NEXT(preq->rq_ind.rq_manager.rq_attr), PBS_EVENTCLASS_SCHED, msg_daemonname, NULL);
	reply_ack(preq);
}

/**
 * @brief
 *		Unset (clear) Sched Attribute Values
 *
 *		Clears the requested attributes and returns a reply.  When the
 *		request names the scheduler iteration attribute and a default
 *		scheduler exists, the attribute of the same name is also unset on
 *		the server object (with the permission check bypassed).  After the
 *		scheduler attributes are cleared, set_sched_default is applied.
 *
 * @param[in]	preq	- Pointer to a batch request structure
 */

void
mgr_sched_unset(struct batch_request *preq)
{
	int bad_attr = 0;
	svrattrl *plist, *tmp_plist;
	int rc;

	pbs_sched *psched = find_sched(preq->rq_ind.rq_manager.rq_objname);
	if (!psched) {
		req_reject(PBSE_UNKSCHED, 0, preq);
		return;
	}

	for (tmp_plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr); tmp_plist; tmp_plist = (struct svrattrl *) GET_NEXT(tmp_plist->al_link)) {
		if (strcasecmp(tmp_plist->al_name, ATTR_schediteration) == 0) {
			if (dflt_scheduler) {
				svrattrl *t_list;

				/* single-entry list naming the same attribute, for the server-side unset */
				t_list = attrlist_create(tmp_plist->al_name, NULL, 0);
				if (t_list == NULL) {
					reply_badattr(-1, bad_attr, tmp_plist, preq);
					/* a reply has been sent; continuing would dereference NULL */
					return;
				}

				t_list->al_link.ll_next->ll_struct = NULL;
				/* privil of -1 bypasses the permission check inside mgr_unset_attr */
				rc = mgr_unset_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, t_list,
						    -1, &bad_attr, (void *) &server, PARENT_TYPE_SERVER, INDIRECT_RES_CHECK);
				if (rc != 0) {
					free_svrattrl(t_list);
					reply_badattr(rc, bad_attr, tmp_plist, preq);
					return;
				}
				svr_save_db(&server);
				free_svrattrl(t_list);
			}
		}
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	rc = mgr_unset_attr(psched->sch_attr, sched_attr_idx, sched_attr_def, SCHED_ATR_LAST, plist,
			    preq->rq_perm, &bad_attr, (void *) psched, PARENT_TYPE_SCHED, INDIRECT_RES_CHECK);
	if (rc != 0) {
		reply_badattr(rc, bad_attr, plist, preq);
		return;
	}

	set_sched_default(psched, 0);

	sched_save_db(psched);
	sprintf(log_buffer, msg_manager, msg_man_uns, preq->rq_user, preq->rq_host);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SCHED, LOG_INFO, msg_daemonname, log_buffer);
	mgr_log_attr(msg_man_uns, plist, PBS_EVENTCLASS_SCHED, msg_daemonname, NULL);
	reply_ack(preq);
}

/**
 * @brief
 *		Set Queue Attribute Values
 *
 *		Resolves the target (one named queue, or every queue when the
 *		object name is empty or starts with '@'), applies the requested
 *		attribute changes to each target and returns a reply.  Request
 *		entries with a NULL or empty value are applied as unsets.
 *
 * @param[in]	preq	- Pointer to a batch request structure
 */

void
mgr_queue_set(struct batch_request *preq)
{
	int allques;
	int bad = 0;
	int rc;
	char *badattr;
	char *qname;
	svrattrl *entry;
	svrattrl *copy;
	pbs_list_head setlist;
	pbs_list_head unsetlist;
	pbs_queue *pque;

	if ((*preq->rq_ind.rq_manager.rq_objname != '\0') &&
	    (*preq->rq_ind.rq_manager.rq_objname != '@')) {
		/* a specific queue was named */
		allques = 0;
		qname = preq->rq_ind.rq_manager.rq_objname;
		pque = find_queuebyname(qname);
	} else {
		/* operate on every queue, starting from the head of the list */
		allques = 1;
		qname = all_quename;
		pque = (pbs_queue *) GET_NEXT(svr_queues);
	}
	if (pque == NULL) {
		req_reject(PBSE_UNKQUE, 0, preq);
		return;
	}

	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_QUEUE, LOG_INFO, qname, msg_manager, msg_man_set, preq->rq_user, preq->rq_host);

	CLEAR_HEAD(setlist);
	CLEAR_HEAD(unsetlist);

	/*
	 * The database record is not rewritten as a whole, so entries that carry
	 * no value must be removed explicitly.  Duplicate each request entry and
	 * file the copy under the list matching its value.
	 */
	for (entry = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	     entry != NULL;
	     entry = (struct svrattrl *) GET_NEXT(entry->al_link)) {
		copy = dup_svrattrl(entry);
		if (copy == NULL) {
			req_reject(PBSE_SYSTEM, 0, preq);
			goto release_lists;
		}
		if (copy->al_atopl.value != NULL && copy->al_atopl.value[0] != '\0')
			append_link(&setlist, &copy->al_link, copy);
		else
			append_link(&unsetlist, &copy->al_link, copy);
	}

	if (allques)
		pque = (pbs_queue *) GET_NEXT(svr_queues);

	/* apply the unsets, then the sets, to each target queue */
	while (pque != NULL) {
		entry = (svrattrl *) GET_NEXT(unsetlist);
		if (entry != NULL) {
			rc = mgr_unset_attr(pque->qu_attr, que_attr_idx, que_attr_def, QA_ATR_LAST, entry,
					    preq->rq_perm, &bad, (void *) pque, PARENT_TYPE_QUE_ALL, INDIRECT_RES_CHECK);
			if (rc != 0) {
				reply_badattr(rc, bad, entry, preq);
				goto release_lists;
			}
		}

		entry = (svrattrl *) GET_NEXT(setlist);
		if (entry != NULL) {
			rc = mgr_set_attr(pque->qu_attr, que_attr_idx, que_attr_def, QA_ATR_LAST,
					  entry, preq->rq_perm, &bad, pque, ATR_ACTION_ALTER);
			if (rc != 0) {
				reply_badattr(rc, bad, entry, preq);
				goto release_lists;
			}
			que_save_db(pque);
			mgr_log_attr(msg_man_set, GET_NEXT(preq->rq_ind.rq_manager.rq_attr), PBS_EVENTCLASS_QUEUE, pque->qu_qs.qu_name, NULL);
		}

		/* single-queue mode stops here and keeps pque for the check below */
		if (!allques)
			break;
		pque = (pbs_queue *) GET_NEXT(pque->qu_link);
	}
	free_attrlist(&setlist);
	free_attrlist(&unsetlist);

	/* verify the resulting attributes suit each queue's type */

	if (allques)
		pque = (pbs_queue *) GET_NEXT(svr_queues);
	while (pque != NULL) {
		badattr = check_que_attr(pque);
		if (badattr != NULL) {
			/* first mismatch found is reported as the reply */
			(void) sprintf(log_buffer, msg_attrtype, pque->qu_qs.qu_name, badattr);
			(void) reply_text(preq, PBSE_ATTRTYPE, log_buffer);
			return;
		}
		if (!allques)
			break;
		pque = (pbs_queue *) GET_NEXT(pque->qu_link);
	}

	reply_ack(preq);
	return;

release_lists:
	/* error exit: a reply has already been sent, only the copies remain to free */
	free_attrlist(&setlist);
	free_attrlist(&unsetlist);
}

/**
 * @brief
 *		Unset (clear)  Queue Attribute Values
 *
 *		Finds the queue, clears the requested attributes and returns a reply
 *
 * @param[in] preq - Pointer to a batch request structure
 */

void
mgr_queue_unset(struct batch_request *preq)
{
	int allques;
	int bad_attr = 0;
	svrattrl *plist;
	pbs_queue *pque;
	char *qname;
	int rc;

	if ((*preq->rq_ind.rq_manager.rq_objname == '\0') ||
	    (*preq->rq_ind.rq_manager.rq_objname == '@')) {
		qname = all_quename;
		allques = 1;
		pque = (pbs_queue *) GET_NEXT(svr_queues);
	} else {
		allques = 0;
		qname = preq->rq_ind.rq_manager.rq_objname;
		pque = find_queuebyname(qname);
	}
	if (pque == NULL) {
		req_reject(PBSE_UNKQUE, 0, preq);
		return;
	}
	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_QUEUE, LOG_INFO,
		   qname, msg_manager, msg_man_uns, preq->rq_user, preq->rq_host);

	for (plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	     plist != NULL;
	     plist = (svrattrl *) GET_NEXT(plist->al_link)) {
		if (strcmp(plist->al_name, ATTR_qtype) == 0) {
			req_reject(PBSE_NEEDQUET, 0, preq);
			return;
		}
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);

	while (pque) {
		rc = mgr_unset_attr(pque->qu_attr, que_attr_idx, que_attr_def, QA_ATR_LAST,
				    plist, preq->rq_perm, &bad_attr,
				    (void *) pque, PARENT_TYPE_QUE_ALL, INDIRECT_RES_CHECK);
		if (rc != 0) {
			reply_badattr(rc, bad_attr, plist, preq);
			return;
		} else {
			attribute *attr;
			if ((attr = get_qattr(pque, QE_ATR_DefaultChunk))->at_flags & ATR_VFLAG_MODIFY)
				(void) deflt_chunk_action(attr, (void *) pque, ATR_ACTION_ALTER);
			que_save_db(pque);
			mgr_log_attr(msg_man_uns, plist, PBS_EVENTCLASS_QUEUE, pque->qu_qs.qu_name, NULL);
			if (is_qattr_set(pque, QA_ATR_QType) == 0)
				pque->qu_qs.qu_type = QTYPE_Unset;
		}
		if (allques)
			pque = GET_NEXT(pque->qu_link);
		else
			break;
	}
	reply_ack(preq);
}

/**
 * @brief
 *		Set vnode attributes
 *
 * 		Finds the set of vnodes, either one specified, all for a host or all.
 * 		The request attributes are split into an "unset" list (attributes
 * 		with an empty value) and a "set" list; both lists are applied to
 * 		every selected vnode that is not marked INUSE_DELETED.
 * 		Returns a reply to the sender of the batch_request.
 *
 * 		When a single vnode is targeted the first failure is reported
 * 		immediately.  When several vnodes are targeted the failing vnodes
 * 		are collected and reported together in one PBSE_GMODERR reply.
 *
 * 		Note the use of the ':' to indicate a port number as part of a host name
 * 		is purely for internal testing and is not documented externally.
 *
 * @param[in] preq - Pointer to a batch request structure
 *
 * @par MT-safe: No
 */

static void
mgr_node_set(struct batch_request *preq)
{
	extern char *msg_queue_not_in_partition;
	extern char *msg_partition_not_in_queue;
	int bad = 0;
	char hostname[PBS_MAXHOSTNAME + 1];
	int numnodes = 1; /* number of vnodes to be operated on */
	svrattrl *plist, *psvrat;
	pbs_list_head setlist;
	pbs_list_head unsetlist;
	char *nodename;
	mominfo_t *pmom = NULL;
	mom_svrinfo_t *psvrmom = NULL;
	struct pbsnode *pnode;
	int rc;
	int i, j, len;
	int problem_cnt = 0;
	int momidx = 0;
	char *problem_names = NULL;
	struct pbsnode **problem_nodes = NULL;
	static char *warnmsg = NULL;
	struct pbsnode **warn_nodes = NULL;
	int warn_idx = 0;
	int replied = 0; /* boolean */

	nodename = preq->rq_ind.rq_manager.rq_objname;

	if (((*preq->rq_ind.rq_manager.rq_objname == '\0') ||
	     (*preq->rq_ind.rq_manager.rq_objname == '@')) &&
	    (preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_HOST)) {

		/*
		 * In this instance the set node req is to apply to all
		 * nodes at the local ('\0')  or specified ('@') server
		 */

		if ((pbsndlist != NULL) && svr_totnodes) {
			nodename = all_nodes;
			pnode = pbsndlist[0];
			numnodes = svr_totnodes;
		} else { /* specified server has no nodes in its node table */
			pnode = NULL;
		}

	} else if (preq->rq_ind.rq_manager.rq_objtype == MGR_OBJ_HOST) {
		/* Operating on all vnodes on a named host
		 * if it is the last/only host
		 * find the mom and get the first vnode in her list
		 */

		char *pc;
		unsigned int port = pbs_mom_port;

		pc = strchr(preq->rq_ind.rq_manager.rq_objname, (int) ':');
		if (pc) {
			port = atol(pc + 1);
		}
		if (get_fullhostname(preq->rq_ind.rq_manager.rq_objname,
				     hostname, (sizeof(hostname) - 1)) != 0) {
			req_reject(PBSE_UNKNODE, 0, preq);
			return;
		}
		pmom = find_mom_entry(hostname, port);
		if (pmom) {
			psvrmom = (mom_svrinfo_t *) pmom->mi_data;
			numnodes = psvrmom->msr_numvnds;
			momidx = 0;
			pnode = psvrmom->msr_children[momidx];
		} else {
			/* no such Mom */
			req_reject(PBSE_UNKNODE, 0, preq);
			return;
		}
	} else /* Else one and only one vnode */
		pnode = find_nodebyname(nodename);

	if (pnode == NULL) {
		req_reject(PBSE_UNKNODE, 0, preq);
		return;
	}

	CLEAR_HEAD(setlist);
	CLEAR_HEAD(unsetlist);

	/* split the request: an empty value means "unset", anything else "set" */
	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		psvrat = dup_svrattrl(plist);
		if (psvrat == NULL) {
			req_reject(PBSE_SYSTEM, 0, preq);
			free_attrlist(&setlist);
			free_attrlist(&unsetlist);
			return;
		}
		if (psvrat->al_atopl.value == NULL || psvrat->al_atopl.value[0] == '\0')
			append_link(&unsetlist, &psvrat->al_link, psvrat);
		else
			append_link(&setlist, &psvrat->al_link, psvrat);

		plist = (struct svrattrl *) GET_NEXT(plist->al_link);
	}

	/* set writtable attributes of node (nodes if numnodes > 1) */
	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE, LOG_INFO, nodename, msg_manager, msg_man_set, preq->rq_user, preq->rq_host);

	if (numnodes > 1) {
		problem_nodes = (struct pbsnode **) malloc(numnodes * sizeof(struct pbsnode *));
		if (problem_nodes == NULL) {
			log_err(ENOMEM, __func__, "out of memory");
			/* answer the request and release the duplicated lists */
			req_reject(PBSE_SYSTEM, 0, preq);
			free_attrlist(&setlist);
			free_attrlist(&unsetlist);
			return;
		}
		problem_cnt = 0;
	}

	warn_idx = 0;
	warn_nodes = (struct pbsnode **) malloc(numnodes * sizeof(struct pbsnode *));
	if (warn_nodes == NULL) {
		log_err(ENOMEM, __func__, "out of memory");
		/* answer the request and release everything allocated so far */
		req_reject(PBSE_SYSTEM, 0, preq);
		free(problem_nodes);
		free_attrlist(&setlist);
		free_attrlist(&unsetlist);
		return;
	}
	warnings_update(WARN_ngrp_init, warn_nodes, &warn_idx, pnode);

	i = 0;
	while (pnode) {
		if ((pnode->nd_state & INUSE_DELETED) == 0) {
			/* pass 0 applies the unset list, pass 1 the set list */
			for (j = 0; j < 2; j++) {
				rc = 0;
				if (j == 0) {
					plist = (svrattrl *) GET_NEXT(unsetlist);
					if (plist)
						rc = mgr_unset_attr(pnode->nd_attr, node_attr_idx, node_attr_def, ND_ATR_LAST, plist, preq->rq_perm, &bad, (void *) pnode, PARENT_TYPE_NODE, INDIRECT_RES_CHECK);
				} else {
					plist = (svrattrl *) GET_NEXT(setlist);
					if (plist)
						rc = mgr_set_attr(pnode->nd_attr, node_attr_idx, node_attr_def, ND_ATR_LAST, plist, preq->rq_perm | ATR_PERM_ALLOW_INDIRECT, &bad, (void *) pnode, ATR_ACTION_ALTER);
				}
				if (rc != 0) {
					if (numnodes > 1) {
						if (problem_nodes) {
							/*we have an array in which to save*/
							/*
							 * Only look at the previous entry when there is
							 * one; with problem_cnt == 0 the index would be -1.
							 */
							if ((problem_cnt == 0) ||
							    (problem_nodes[problem_cnt - 1] != pnode)) {
								/* and this node was not saved already */
								problem_nodes[problem_cnt] = pnode;
								++problem_cnt;
							}
						}
					} else { /*In the specific node case, reply w/ error and return*/
						switch (rc) {
							case PBSE_INTERNAL:
							case PBSE_SYSTEM:
								req_reject(rc, bad, preq);
								break;

							case PBSE_NOATTR:
							case PBSE_ATTRRO:
							case PBSE_MUTUALEX:
							case PBSE_BADNDATVAL:
							case PBSE_UNKRESC:
								reply_badattr(rc, bad, plist, preq);
								break;
							case PBSE_QUE_NOT_IN_PARTITION:
								(void) snprintf(log_buffer, LOG_BUF_SIZE, msg_queue_not_in_partition,
										get_nattr_str(pnode, ND_ATR_Queue));
								log_err(-1, __func__, log_buffer);
								reply_text(preq, PBSE_QUE_NOT_IN_PARTITION, log_buffer);
								break;
							case PBSE_PARTITION_NOT_IN_QUE:
								(void) snprintf(log_buffer, LOG_BUF_SIZE, msg_partition_not_in_queue,
										get_nattr_str(pnode, ND_ATR_partition));
								log_err(-1, __func__, log_buffer);
								reply_text(preq, PBSE_PARTITION_NOT_IN_QUE, log_buffer);
								break;

							default:
								req_reject(rc, 0, preq);
						}
						free(warn_nodes);
						free_attrlist(&unsetlist);
						free_attrlist(&setlist);
						return;
					}
				} else if (plist) { /*modifications succeed for this node*/
					warnings_update(WARN_ngrp, warn_nodes, &warn_idx, pnode);

					if ((pnode->nd_nsnfree == 0) && (pnode->nd_state == 0))
						set_vnode_state(pnode, INUSE_JOB, Nd_State_Or);

					mgr_log_attr(msg_man_set, GET_NEXT(preq->rq_ind.rq_manager.rq_attr), PBS_EVENTCLASS_NODE, pnode->nd_name, NULL);
					pnode->nd_modified = 1;
				}
			}
		}
		if (numnodes == 1)
			break; /* just the one vnode */
		else if (preq->rq_ind.rq_manager.rq_objtype == MGR_OBJ_HOST) {
			int update_mom_only = 0;

			/* next vnode under the Mom */
			if (++momidx >= psvrmom->msr_numvnds)
				break; /* all down */
			pnode = psvrmom->msr_children[momidx];
			/*
			 * plist is NULL when the set list is empty or when no
			 * non-deleted vnode has been processed yet, so test it
			 * before looking at the attribute name.
			 */
			if ((plist != NULL) &&
			    (strcmp(plist->al_name, ATTR_NODE_state) == 0) && (plist->al_op == INCR)) {
				/* Marking nodes offline.  We should only mark the children vnodes
				 * as offline if no other mom that reports the vnodes are up.
				 */
				if (pnode->nd_nummoms > 1) {
					int imom;
					for (imom = 0; imom < pnode->nd_nummoms; ++imom) {
						unsigned long mstate;
						mstate = pnode->nd_moms[imom]->mi_dmn_info->dmn_state;
						if ((mstate & (INUSE_DOWN | INUSE_OFFLINE)) == 0) {
							/* If another mom is up (i.e. not down or offline)
							 * then do not set the vnode state of the children
							 */
							update_mom_only = 1;
							break; /* found at least one mom that's up, we can stop now */
						}
					}
				}
			}
			if (update_mom_only)
				break; /* all done */
		} else {
			if (++i == svr_totnodes)
				break;	      /* all done */
			pnode = pbsndlist[i]; /* next vnode in array */
		}
	} /*bottom of the while()*/

	free_attrlist(&setlist);
	free_attrlist(&unsetlist);

	warnmsg = warn_msg_build(WARN_ngrp, warn_nodes, warn_idx);

	save_nodes_db(0, NULL);

	if (numnodes > 1) { /*modification was for multiple vnodes  */

		if (problem_cnt) { /*one or more problems encountered*/

			/* "+ 3" per name leaves room for the ", " separator and the final NUL */
			for (len = 0, i = 0; i < problem_cnt; i++)
				len += strlen(problem_nodes[i]->nd_name) + 3;

			if (warnmsg == NULL)
				len += strlen(pbse_to_txt(PBSE_GMODERR));
			else
				len += strlen(pbse_to_txt(PBSE_GMODERR)) + strlen(warnmsg);

			if ((problem_names = malloc(len)) != NULL) {

				strcpy(problem_names, pbse_to_txt(PBSE_GMODERR));
				for (i = 0; i < problem_cnt; i++) {
					if (i)
						strcat(problem_names, ", ");
					strcat(problem_names, problem_nodes[i]->nd_name);
				}
				if (warnmsg != NULL)
					strcat(problem_names, warnmsg);

				(void) reply_text(preq, PBSE_GMODERR, problem_names);
				free(problem_names);
				problem_names = NULL;
				replied = 1;
			} else {
				(void) reply_text(preq, PBSE_GMODERR, pbse_to_txt(PBSE_GMODERR));
				replied = 1;
			}
		}
	}

	if (replied == 0) {

		if (warnmsg)
			(void) reply_text(preq, PBSE_NONE, warnmsg);
		else
			reply_ack(preq);
	}

	/* the warning text is released on every path, not only the "no problem" one */
	free(warnmsg);
	warnmsg = NULL;

	free(problem_nodes);
	free(warn_nodes);
}

/**
 * @brief
 *		Unset node attributes
 *
 * 		Finds the node (a single vnode, all vnodes of a named host, or all
 * 		vnodes of the server), unsets the attributes and
 * 		returns a reply to the sender of the batch_request
 *
 * 		After a successful unset, resources_available.ncpus and the Mom
 * 		attribute are restored to default values if they ended up unset.
 *
 * @note
 * 		Note, the attributes "state", "ntype", the vnode pool attribute and
 * 		"resources_available.host" (or the whole resources_available
 * 		attribute) cannot be unset; such a request is rejected with
 * 		PBSE_BADNDATVAL.
 *
 *  @param[in]	preq	- Pointer to a batch request structure
 *
 *  @par MT-safe: No
 */

void
mgr_node_unset(struct batch_request *preq)

{
	int bad = 0;
	char hostname[PBS_MAXHOSTNAME + 1];
	int numnodes = 1; /* number of vnode to operate */
	svrattrl *plist;
	char *nodename;
	mominfo_t *pmom = NULL;
	mom_svrinfo_t *psvrmom = NULL;
	struct pbsnode *pnode;
	int rc;
	int unset_que = 0;
	int i, len;
	int problem_cnt = 0;
	int momidx = 0;
	char *problem_names;
	struct pbsnode **problem_nodes = NULL;
	static char *warnmsg = NULL;
	struct pbsnode **warn_nodes = NULL;
	int warn_idx = 0;
	int replied = 0; /* boolean */
	attribute *patr;
	resource_def *prd;
	resource *prc;
	static char *astate = ATTR_NODE_state;
	static char *antype = ATTR_NODE_ntype;
	static char *ra = ATTR_rescavail;

	nodename = preq->rq_ind.rq_manager.rq_objname;

	if (preq->rq_ind.rq_manager.rq_objtype == MGR_OBJ_HOST) {
		/* Operating on all vnodes on a named host          */
		/* find the mom and get the first vnode in her list */
		char *pc;
		unsigned int port = pbs_mom_port;

		pc = strchr(preq->rq_ind.rq_manager.rq_objname, (int) ':');
		if (pc) {
			port = atol(pc + 1);
		}
		if (get_fullhostname(preq->rq_ind.rq_manager.rq_objname,
				     hostname, (sizeof(hostname) - 1)) != 0) {
			req_reject(PBSE_UNKNODE, 0, preq);
			return;
		}
		pmom = find_mom_entry(hostname, port);
		if (pmom) {
			/* found mom, set number of and first vnode */
			psvrmom = (mom_svrinfo_t *) pmom->mi_data;
			numnodes = psvrmom->msr_numvnds;
			momidx = 0;
			pnode = psvrmom->msr_children[momidx];
		} else {
			/* no such Mom */
			req_reject(PBSE_UNKNODE, 0, preq);
			return;
		}

	} else if ((*preq->rq_ind.rq_manager.rq_objname == '\0') ||
		   (*preq->rq_ind.rq_manager.rq_objname == '@')) {

		/*In this instance the unset node req is to apply to all */
		/*nodes at the local ('\0')  or specified ('@') server   */

		if ((pbsndlist != NULL) && svr_totnodes) {
			nodename = all_nodes;
			pnode = pbsndlist[0];
			numnodes = svr_totnodes;
		} else { /* specified server has no nodes in its node table */
			pnode = NULL;
		}

	} else {
		pnode = find_nodebyname(nodename);
	}

	if (pnode == NULL) {
		req_reject(PBSE_UNKNODE, 0, preq);
		return;
	}

	/* check attributes being unset */

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		bad++; /* 1-based position of the attribute, reported on rejection */

		/* check that state, vnode_pool, ntype and	*/
		/* resources_available.host are not being unset	*/
		if ((strcasecmp(plist->al_name, astate) == 0) ||
		    (strcasecmp(plist->al_name, antype) == 0) ||
		    (strcasecmp(plist->al_name, ATTR_NODE_VnodePool) == 0) ||
		    ((strcasecmp(plist->al_name, ra) == 0) &&
		     ((plist->al_resc == NULL) ||
		      (strcasecmp(plist->al_resc, "host") == 0)))) {
			reply_badattr(PBSE_BADNDATVAL, bad, plist, preq);
			return;
		}

		/* is "queue" being unset */
		if (strcasecmp(plist->al_name, "queue") == 0)
			unset_que = 1;

		plist = (struct svrattrl *) GET_NEXT(plist->al_link);
	}
	bad = 0;

	/* unset writtable attributes of node (nodes if numnodes > 1) */

	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE, LOG_INFO,
		   nodename, msg_manager, msg_man_uns,
		   preq->rq_user, preq->rq_host);

	if (numnodes > 1) {
		problem_nodes = (struct pbsnode **) malloc(numnodes * sizeof(struct pbsnode *));
		if (problem_nodes == NULL) {
			log_err(ENOMEM, __func__, "out of memory");
			/* do not leave the request unanswered */
			req_reject(PBSE_SYSTEM, 0, preq);
			return;
		}
		problem_cnt = 0;
	}

	warn_idx = 0;
	warn_nodes = (struct pbsnode **) malloc(numnodes * sizeof(struct pbsnode *));
	if (warn_nodes == NULL) {
		log_err(ENOMEM, __func__, "out of memory");
		/* do not leave the request unanswered */
		req_reject(PBSE_SYSTEM, 0, preq);
		free(problem_nodes);
		return;
	}
	warnings_update(WARN_ngrp_init, warn_nodes, &warn_idx, pnode);

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	i = 0;
	while (pnode) {
		if ((pnode->nd_state & INUSE_DELETED) == 0) {
			/*
			 * The unset operation requires us to note before hand
			 * whether we have the grouping resource on this node
			 * as we loose that information if unset succeeds.
			 */

			warnings_update(WARN_ngrp_ck, warn_nodes, &warn_idx, pnode);

			rc = mgr_unset_attr(pnode->nd_attr, node_attr_idx, node_attr_def, ND_ATR_LAST,
					    plist, preq->rq_perm, &bad, (void *) pnode,
					    PARENT_TYPE_NODE, INDIRECT_RES_CHECK);
			if (rc != 0) {

				if (numnodes > 1) {
					if (problem_nodes) {
						/*we have an array in which to save*/
						problem_nodes[problem_cnt] = pnode;
						++problem_cnt;
					}

				} else { /*In the specific node case, reply w/ error and return*/
					switch (rc) {
						case PBSE_INTERNAL:
						case PBSE_SYSTEM:
							req_reject(rc, bad, preq);
							break;

						case PBSE_NOATTR:
						case PBSE_ATTRRO:
						case PBSE_MUTUALEX:
						case PBSE_BADNDATVAL:
						case PBSE_UNKRESC:
							reply_badattr(rc, bad, plist, preq);
							break;

						default:
							req_reject(rc, 0, preq);
					}
					free(warn_nodes);
					return;
				}

			} else { /*modifications succeed for this node*/

				warnings_update(WARN_ngrp, warn_nodes, &warn_idx, pnode);

				/* if queue unset, clear pointer to queue struct */
				if (unset_que) {
					pnode->nd_pque = NULL;
					mark_which_queues_have_nodes();
				}

				/* if resources_avail.ncpus unset, reset to default */
				patr = get_nattr(pnode, ND_ATR_ResourceAvail);
				prd = &svr_resc_def[RESC_NCPUS];
				prc = find_resc_entry(patr, prd);
				if (prc == NULL)
					prc = add_resource_entry(patr, prd);
				/* add_resource_entry() may fail; skip the default in that case */
				if ((prc != NULL) && !is_attr_set(&prc->rs_value)) {
					prc->rs_value.at_val.at_long = pnode->nd_ncpus;
					prc->rs_value.at_flags |= ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
				}

				/* If the Mom attribute is unset, reset to default */
				if (is_nattr_set(pnode, ND_ATR_Mom) == 0) {
					attribute tmp;
					if (get_fullhostname(pnode->nd_name, hostname,
							     (sizeof(hostname) - 1)) != 0) {
						/* fall back to the vnode name, always NUL-terminated */
						snprintf(hostname, sizeof(hostname), "%s", pnode->nd_name);
					}
					clear_attr(&tmp, &node_attr_def[(int) ND_ATR_Mom]);
					rc = decode_arst(&tmp, ATTR_NODE_Mom, NULL, hostname);
					if (rc == 0) {
						set_arst(get_nattr(pnode, ND_ATR_Mom), &tmp, INCR);
						(get_nattr(pnode, ND_ATR_Mom))->at_flags |= ATR_VFLAG_DEFLT;
						free_arst(&tmp);
					}
				}

				pnode->nd_modified = 1;
				/* this is an unset, so log it with the unset message */
				mgr_log_attr(msg_man_uns, plist, PBS_EVENTCLASS_NODE, pnode->nd_name, NULL);
			}
		}
		if (numnodes == 1)
			break;
		if (preq->rq_ind.rq_manager.rq_objtype == MGR_OBJ_HOST) {
			if (++momidx >= psvrmom->msr_numvnds)
				break;
			pnode = psvrmom->msr_children[momidx];
		} else {
			if (++i == svr_totnodes)
				break;
			pnode = pbsndlist[i];
		}
	} /* bottom of the while() */

	warnmsg = warn_msg_build(WARN_ngrp, warn_nodes, warn_idx);

	save_nodes_db(0, NULL);

	if (numnodes > 1) { /*modification was for all nodes  */

		if (problem_cnt) { /*one or more problems encountered*/

			/* "+ 3" per name leaves room for the ", " separator and the final NUL */
			for (len = 0, i = 0; i < problem_cnt; i++)
				len += strlen(problem_nodes[i]->nd_name) + 3;

			if (warnmsg == NULL)
				len += strlen(pbse_to_txt(PBSE_GMODERR));
			else
				len += strlen(pbse_to_txt(PBSE_GMODERR)) + strlen(warnmsg);

			if ((problem_names = malloc(len)) != NULL) {

				strcpy(problem_names, pbse_to_txt(PBSE_GMODERR));
				for (i = 0; i < problem_cnt; i++) {
					if (i)
						strcat(problem_names, ", ");
					strcat(problem_names, problem_nodes[i]->nd_name);
				}
				if (warnmsg != NULL)
					strcat(problem_names, warnmsg);

				(void) reply_text(preq, PBSE_GMODERR, problem_names);
				free(problem_names);
				problem_names = NULL;
				replied = 1;
			} else {
				(void) reply_text(preq, PBSE_GMODERR, pbse_to_txt(PBSE_GMODERR));
				replied = 1;
			}
		}
	}

	if (replied == 0) {

		if (warnmsg)
			(void) reply_text(preq, PBSE_NONE, warnmsg);
		else
			reply_ack(preq);
	}

	/* the warning text is released on every path and never left dangling */
	free(warnmsg);
	warnmsg = NULL;

	free(problem_nodes);
	free(warn_nodes);
}

/**
 * @brief
 * 		make_host_addresses_list - return a null terminated list of all of the
 *		IPv4 addresses of the named host (phost)
 *
 *		The host address index (hostaddr_idx) is consulted first; on a hit a
 *		copy of the cached list is returned.  Otherwise the name is resolved
 *		with getaddrinfo(), the IPv4 results are stored in host byte order,
 *		and a second copy of the list is inserted into the index.
 *
 * @param[in]	phost	- named host
 * @param[out]	pul	- on success *pul is a malloc'd, zero terminated address
 *			  list that the caller must release; on every failure
 *			  *pul is NULL so the caller has nothing to free
 *
 * @return	error code
 * @retval	0	- no error
 * @retval	PBS error	- error (a message may have been left in log_buffer)
 */

int
make_host_addresses_list(char *phost, u_long **pul)
{
	int i;
	int err;
	struct pul_store *tpul = NULL;
	int len;
	struct addrinfo *aip, *pai;
	struct addrinfo hints;
	struct sockaddr_in *inp;

	if (pul == NULL)
		return (PBSE_SYSTEM);

	/* never hand back a stale or already released pointer on failure */
	*pul = NULL;

	if ((phost == 0) || (*phost == '\0'))
		return (PBSE_SYSTEM);

	/* search for the address list in the address list index
	 * so that we do not hit NS for everything
	 */
	if (hostaddr_idx != NULL) {
		if (pbs_idx_find(hostaddr_idx, (void **) &phost, (void **) &tpul, NULL) == PBS_IDX_RET_OK) {
			*pul = (u_long *) malloc(tpul->len);
			if (!*pul) {
				snprintf(log_buffer, sizeof(log_buffer), "out of memory ");
				return (PBSE_SYSTEM);
			}
			memmove(*pul, tpul->pul, tpul->len);
			return 0;
		}
	}

	memset(&hints, 0, sizeof(struct addrinfo));
	/*
	 *      Why do we use AF_UNSPEC rather than AF_INET?  Some
	 *      implementations of getaddrinfo() will take an IPv6
	 *      address and map it to an IPv4 one if we ask for AF_INET
	 *      only.  We don't want that - we want only the addresses
	 *      that are genuinely, natively, IPv4 so we start with
	 *      AF_UNSPEC and filter ai_family below.
	 */
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = IPPROTO_TCP;
	if ((err = getaddrinfo(phost, NULL, &hints, &pai)) != 0) {
		/* bounded: phost is caller supplied and log_buffer is shared */
		snprintf(log_buffer, sizeof(log_buffer),
			 "addr not found for %s h_errno=%d errno=%d",
			 phost, err, errno);
		return (PBSE_UNKNODE);
	}

	/* first pass: count the IPv4 entries */
	i = 0;
	for (aip = pai; aip != NULL; aip = aip->ai_next) {
		/* skip non-IPv4 addresses */
		if (aip->ai_family == AF_INET)
			i++;
	}

	/* null end it */
	len = sizeof(u_long) * (i + 1);
	*pul = (u_long *) malloc(len);
	if (*pul == NULL) {
		snprintf(log_buffer, sizeof(log_buffer), "Out of memory ");
		freeaddrinfo(pai); /* the resolver result is ours to release */
		return (PBSE_SYSTEM);
	}

	/* second pass: copy the IPv4 addresses in host byte order */
	i = 0;
	for (aip = pai; aip != NULL; aip = aip->ai_next) {
		if (aip->ai_family == AF_INET) {
			u_long ipaddr;

			inp = (struct sockaddr_in *) aip->ai_addr;
			ipaddr = ntohl(inp->sin_addr.s_addr);
			(*pul)[i] = ipaddr;
			i++;
		}
	}
	(*pul)[i] = 0; /* null term array ip adrs */

	freeaddrinfo(pai);

	/* build the copy that is kept in the host address index */
	tpul = malloc(sizeof(struct pul_store));
	if (!tpul) {
		snprintf(log_buffer, sizeof(log_buffer), "out of  memory");
		free(*pul);
		*pul = NULL;
		return (PBSE_SYSTEM);
	}
	tpul->len = len;
	tpul->pul = (u_long *) malloc(tpul->len);
	if (!tpul->pul) {
		free(tpul);
		free(*pul);
		*pul = NULL;
		snprintf(log_buffer, sizeof(log_buffer), "out of  memory");
		return (PBSE_SYSTEM);
	}
	memmove(tpul->pul, *pul, tpul->len);

	if (hostaddr_idx == NULL) {
		if ((hostaddr_idx = pbs_idx_create(0, 0)) == NULL) {
			free(tpul->pul);
			free(tpul);
			free(*pul);
			*pul = NULL;
			snprintf(log_buffer, sizeof(log_buffer), "out of  memory");
			return (PBSE_SYSTEM);
		}
	}
	if (pbs_idx_insert(hostaddr_idx, phost, tpul) != PBS_IDX_RET_OK) {
		free(tpul->pul);
		free(tpul);
		free(*pul);
		*pul = NULL;
		return (PBSE_SYSTEM);
	}
	return 0;
}

/**
 * @brief
 * 		Drop the cached IP addresses of a Mom: every address stored for the
 * 		Mom's host is removed from the ipaddrs tree (keyed with the Mom's
 * 		port), then the host entry itself is removed from the host address
 * 		index and its storage released.
 *
 * 		Nothing is done when the host address index does not exist or holds
 * 		no entry for the Mom's host.
 *
 * @param[in]	pmom - valid ptr to the mom info
 *
 * @return	error code
 * @retval	0		- no error
 * @retval	PBSE_SYSTEM	- the host entry could not be deleted from the index
 */
int
remove_mom_ipaddresses_list(mominfo_t *pmom)
{
	struct pul_store *cached = NULL;
	void *hostkey;
	u_long *addr;

	/* no cache at all, nothing to remove */
	if (hostaddr_idx == NULL)
		return 0;

	hostkey = &pmom->mi_host;
	if (pbs_idx_find(hostaddr_idx, &hostkey, (void **) &cached, NULL) != PBS_IDX_RET_OK)
		return 0;

	/* the cached list is zero terminated */
	addr = cached->pul;
	while (*addr) {
		tdelete2(*addr, pmom->mi_port, &ipaddrs);
		addr++;
	}

	/* the entry is released only once it is really out of the index */
	if (pbs_idx_delete(hostaddr_idx, pmom->mi_host) != PBS_IDX_RET_OK)
		return (PBSE_SYSTEM);

	free(cached->pul);
	free(cached);
	return 0;
}

/**
 * @brief
 *		create pbs node structure, i.e. add a node
 *
 *		If no vnode with the given name exists, a new pbsnode is allocated,
 *		appended to pbsndlist and added to the node index.  The Mom and Port
 *		attributes are defaulted when not supplied, the supplied attributes
 *		are applied, the "host" and "vnode" entries of resources_available
 *		are filled in, and a Mom entry is created (or reused) and
 *		cross-linked for every host listed in the Mom attribute.
 *
 * @param[in]	objname	- Name of node
 * @param[in]	plist 	- list of attributes to set on node; a "Host"
 *			  attribute name is rewritten in place to "Mom" and the
 *			  operator of a Mom attribute is changed to INCR
 * @param[in]	perms	- permissions passed on to mgr_set_attr2()
 * @param[out]	bad 	- Return the index of a bad attribute
 * @param[out]	rtnpnode	- pointer to created node structure
 * @param[in]	nodup 	- TRUE  - means duplicated name not allowed (qmgr create)
 *			 				FALSE - dups are allowed (same vnode under multi Moms)
 * @param[in]	allow_unkresc	- TRUE - allow node to have unknown resources
 * 									FALSE - do not allow unknown resources.
 *
 * @return Error code
 * @retval - 0 - Success
 * @retval - PBSE_UNKNODE - node created during server initialization although
 *			    a Mom host name could not be resolved
 * @retval - pbs_errno - Failure code
 *
 */

int
create_pbs_node2(char *objname, svrattrl *plist, int perms, int *bad, struct pbsnode **rtnpnode, int nodup, int allow_unkresc)
{
	struct pbsnode *pnode;
	struct pbsnode **tmpndlist;
	int ntype; /* node type, always PBS */
	char *pc;
	char *phost;	    /* trial host name */
	char *pname;	    /* node name w/o any :ts       */
	u_long *pul = NULL; /* 0 terminated host adrs array*/
	int rc;
	int j;
	int iht;
	attribute *pattr;
	svrattrl *plx;
	mominfo_t *pmom;
	mom_svrinfo_t *smp;
	resource_def *prdef;
	resource *presc;
	char realfirsthost[PBS_MAXHOSTNAME + 1];
	int ret;

	if (rtnpnode != NULL)
		*rtnpnode = NULL;

	ret = PBSE_NONE;

	/* change "Host" attrribute into Mom */
	/* a carry over from the past	     */
	plx = plist;
	while (plx) {
		if (strcasecmp(plx->al_name, "Host") == 0) {
			/* this only works becase strlen(Mom) < strlen(Host) */
			strcpy(plx->al_name, "Mom");
			break;
		}
		plx = (svrattrl *) GET_NEXT(plx->al_link);
	}

	rc = process_host_name_part(objname, plist, &pname, &ntype);
	if (rc)
		return (rc);

	if ((pnode = find_nodebyname(pname)) == NULL) {

		/* need to create the pbs_node entry */

		pnode = (struct pbsnode *) malloc(sizeof(struct pbsnode));
		if (pnode == NULL) {
			free(pname);
			return (PBSE_SYSTEM);
		}

		/* expand pbsndlist array exactly svr_totnodes long*/
		tmpndlist = (struct pbsnode **) realloc(pbsndlist,
							sizeof(struct pbsnode *) * (svr_totnodes + 1));

		if (tmpndlist != NULL) {
			/*add in the new entry etc*/
			pbsndlist = tmpndlist;
			/* nd_index = 0 (regular node, single parent mom), nd_index = 1 (multiple parent moms, usually Cray) */
			pnode->nd_index = 0;
			pnode->nd_arr_index = svr_totnodes; /* this is only in mem, not from db */
			pbsndlist[svr_totnodes++] = pnode;
		} else {
			free(pname);
			/*
			 * pnode has not been through initialize_pbsnode() yet, so its
			 * members are indeterminate; release only the raw allocation.
			 * NOTE(review): assumes free_pnode() inspects node members - confirm.
			 */
			free(pnode);
			return (PBSE_SYSTEM);
		}
		if (initialize_pbsnode(pnode, pname, ntype) != PBSE_NONE) {
			svr_totnodes--;
			free_pnode(pnode);
			return (PBSE_SYSTEM);
		}

		/* create and initialize the first subnode to go with */
		/* the parent node */
		if (create_subnode(pnode, NULL) == NULL) {
			svr_totnodes--;
			free_pnode(pnode);
			return (PBSE_SYSTEM);
		}

		/* create node index if not already done */
		if (node_idx == NULL) {
			if ((node_idx = pbs_idx_create(0, 0)) == NULL) {
				svr_totnodes--;
				free_pnode(pnode);
				return (PBSE_SYSTEM);
			}
		}

		/* add to node to index */
		if (pbs_idx_insert(node_idx, pname, pnode) != PBS_IDX_RET_OK) {
			svr_totnodes--;
			free_pnode(pnode);
			return (PBSE_SYSTEM);
		}
	} else if (nodup == TRUE) {
		/* duplicating/modifying vnode by qmgr is not allowed */
		/* as what qmgr creates is the natural vnode          */
		free(pname);
		return (PBSE_NODEEXIST);
	}

	/*
	 * Make sure Mom attribute is or will be  set.
	 * Action functions in mgr_set_attr expect it to be set
	 *
	 * If it is specified in the provided attrl list,  turn the
	 * operation into a INCR to add any unique host names to those
	 * already there.
	 *
	 * If it isn't specified in the attrl, use the node name.
	 */
	plx = plist;
	while (plx) {
		if (strcasecmp(plx->al_name, ATTR_NODE_Mom) == 0) {
			break;
		}
		plx = (svrattrl *) GET_NEXT(plx->al_link);
	}

	if (plx) {
		plx->al_op = INCR;
	} else if (is_nattr_set(pnode, ND_ATR_Mom) == 0) {
		rc = set_nattr_str_slim(pnode, ND_ATR_Mom, pname, NULL);
		if (rc != PBSE_NONE) {
			effective_node_delete(pnode);
			return (rc);
		}
	}

	/* Make sure Port is or will be set */
	plx = plist;
	while (plx) {
		if (strcasecmp(plx->al_name, ATTR_NODE_Port) == 0) {
			break;
		}
		plx = (svrattrl *) GET_NEXT(plx->al_link);
	}
	if ((plx == NULL) && (is_nattr_set(pnode, ND_ATR_Port) == 0))
		set_nattr_l_slim(pnode, ND_ATR_Port, pbs_mom_port, SET);

	/* OK, set the attributes specified */

	rc = mgr_set_attr2(pnode->nd_attr, node_attr_idx, node_attr_def, ND_ATR_LAST,
			   plist, perms | ATR_PERM_ALLOW_INDIRECT, bad,
			   (void *) pnode, ATR_ACTION_NEW, allow_unkresc);

	if (rc != 0) {
		/*
		 * If an attribute could not be resolved, do not delete node
		 * from database, for other errors go ahead and delete node.
		 */
		if (rc != PBSE_UNKRESC)
			effective_node_delete(pnode);

		return (rc);
	}

	/*
	 * Ensure "resources_available.host=HOSTNAME" is in the resource list.
	 * Use the first hostname provided, and determine whether it is a
	 * hostname or IP address. If it is truly a host name, use the short
	 * form by truncating it at the first '.' character. If it is an IP
	 * address, use the entire string. Note that RFC 1123 allows
	 * hostnames to start with a number, so do not simply key off of the
	 * first character.
	 */

	pattr = get_nattr(pnode, ND_ATR_ResourceAvail);

	prdef = &svr_resc_def[RESC_HOST];
	presc = find_resc_entry(pattr, prdef);
	if (presc == NULL) {
		/* add the entry */
		presc = add_resource_entry(pattr, prdef);
		if (presc) {
			struct sockaddr_in sa4;
			struct sockaddr_in6 sa6;

			strncpy(realfirsthost, (get_nattr_arst(pnode, ND_ATR_Mom))->as_string[0], (sizeof(realfirsthost) - 1));
			realfirsthost[PBS_MAXHOSTNAME] = '\0';

			if ((inet_pton(AF_INET, realfirsthost, &(sa4.sin_addr)) != 1) &&
			    (inet_pton(AF_INET6, realfirsthost, &(sa6.sin6_addr)) != 1)) {
				/* Not an IPv4 or IPv6 address, truncate it. */
				pc = strchr(realfirsthost, '.');
				if (pc)
					*pc = '\0';
			}
			rc = prdef->rs_decode(&presc->rs_value, "", "host", realfirsthost);
			presc->rs_value.at_flags |= ATR_VFLAG_DEFLT; /* so not written to nodes file */
		} else {
			rc = PBSE_SYSTEM;
		}
	}

	if (rc != 0) {
		effective_node_delete(pnode);
		return (rc);
	}

	pnode->nd_hostname = strdup(presc->rs_value.at_val.at_str);
	if (pnode->nd_hostname == NULL) {
		effective_node_delete(pnode);
		return (PBSE_SYSTEM);
	}
	prdef = &svr_resc_def[RESC_VNODE];
	presc = find_resc_entry(pattr, prdef);
	if (presc == NULL)
		presc = add_resource_entry(pattr, prdef); /* add the entry */

	if (presc) {
		rc = prdef->rs_decode(&presc->rs_value, NULL, NULL, objname);
		presc->rs_value.at_flags |= ATR_VFLAG_DEFLT; /* so not written to nodes file */

	} else {
		rc = PBSE_SYSTEM;
	}

	if (rc != 0) {
		effective_node_delete(pnode);
		return (rc);
	}

	/*
	 * Now we need to create the Mom structure for each Mom who is a
	 * parent of this (v)node.
	 * The Mom structure may already exist
	 */

	pattr = get_nattr(pnode, ND_ATR_Mom);
	for (iht = 0; iht < pattr->at_val.at_arst->as_usedptr; ++iht) {
		unsigned int nport;

		phost = pattr->at_val.at_arst->as_string[iht];

		/*
		 * The list built in a previous iteration was handed over to
		 * create_svrmom_entry(); forget it so it is never freed here.
		 */
		pul = NULL;

		if ((rc = make_host_addresses_list(phost, &pul))) {
			log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE, LOG_INFO, pnode->nd_name, log_buffer);

			/*
			 * On failure make_host_addresses_list() has either not
			 * allocated *pul or has already released it, so there is
			 * nothing for us to free; just drop the pointer.
			 */
			pul = NULL;

			/* special case for unresolved nodes in case of server startup */
			if (rc == PBSE_UNKNODE && get_sattr_long(SVR_ATR_State) == SV_STATE_INIT) {
				/*
				 * mark node as INUSE_UNRESOLVABLE, pbsnodes will show unresolvable state
				 */
				set_vnode_state(pnode, INUSE_UNRESOLVABLE | INUSE_DOWN, Nd_State_Set);

				/*
				 * make_host_addresses_list failed, so pul was not allocated
				 * Since we are going ahead nevertheless, we need to allocate
				 * an "empty" pul list
				 */
				pul = malloc(sizeof(u_long) * (1));
				if (pul == NULL) {
					effective_node_delete(pnode);
					return (PBSE_SYSTEM);
				}
				pul[0] = 0;
				ret = PBSE_UNKNODE; /* set return of function to this, so that error is logged */
			} else {
				effective_node_delete(pnode);
				return (rc); /* return the error code from make_host_addresses_list */
			}
		}

		/*
		 * Note, once create_svrmom_entry() is called, it has the
		 * responsibility for "pul" including freeing it if need be.
		 */

		nport = get_nattr_long(pnode, ND_ATR_Port);

		if ((pmom = create_svrmom_entry(phost, nport, pul)) == NULL) {
			free(pul);
			effective_node_delete(pnode);
			return (PBSE_SYSTEM);
		}

		if (!pbs_iplist) {
			pbs_iplist = create_pbs_iplist();
			if (!pbs_iplist) {
				return (PBSE_SYSTEM); /* No Memory */
			}
		}
		smp = (mom_svrinfo_t *) (pmom->mi_data);
		for (j = 0; pmom->mi_dmn_info->dmn_addrs[j]; j++) {
			u_long ipaddr = pmom->mi_dmn_info->dmn_addrs[j];
			if (insert_iplist_element(pbs_iplist, ipaddr)) {
				delete_pbs_iplist(pbs_iplist);
				/*
				 * Do not keep a pointer to the deleted list; a NULL
				 * makes the next call build a fresh one.
				 * NOTE(review): assumes delete_pbs_iplist() releases the list - confirm.
				 */
				pbs_iplist = NULL;
				return (PBSE_SYSTEM); /* No Memory */
			}
		}

		/* cross link the vnode (pnode) and its Mom (pmom) */

		if ((rc = cross_link_mom_vnode(pnode, pmom)) != 0)
			return (rc);

		/* If this is the "natural vnode" (i.e. 0th entry) */
		if (pnode->nd_nummoms == 1) {
			if (is_nattr_set(pnode, ND_ATR_vnode_pool) && get_nattr_long(pnode, ND_ATR_vnode_pool) > 0)
				smp->msr_vnode_pool = get_nattr_long(pnode, ND_ATR_vnode_pool);
		}
	}

	pnode->nd_modified = 1;
	if (rtnpnode != NULL)
		*rtnpnode = pnode;
	return (ret); /*create completely successful*/
}

/**
 * @brief
 * 		Convenience wrapper around create_pbs_node2() for callers that do
 * 		not need the 'allow_unkresc' parameter; FALSE is always passed for it.
 * 		All other arguments and the return value are passed through unchanged.
 */
int
create_pbs_node(char *objname, svrattrl *plist, int perms, int *bad, struct pbsnode **rtnpnode, int nodup)
{
	int rc;

	rc = create_pbs_node2(objname, plist, perms, bad, rtnpnode, nodup, FALSE);
	return (rc);
}

/**
 * @brief
 * 		check_sister_vnodes_for_delete	- check for sister vnodes which are eligible for delete.
 * 		Looks at every child vnode of the Mom except the first one
 * 		(index 0) and reports whether at least one of them has
 * 		nd_nummoms equal to one.
 *
 * @param[in]	psvrmom	- mom_svrinfo structure of the mom which needs to be checked
 *
 * @return	return code
 * @retval	0	- no node to delete.
 * @retval	1	- there are nodes to be deleted
 */
static int
check_sister_vnodes_for_delete(mom_svrinfo_t *psvrmom)
{
	int idx;

	/* with one child or none the loop body never runs and 0 is returned */
	for (idx = 1; idx < psvrmom->msr_numvnds; idx++) {
		if (psvrmom->msr_children[idx]->nd_nummoms == 1)
			return 1; /* one match is enough */
	}
	return 0;
}

/**
 *  @brief delete a scheduler object
 *
 *  @param[in] preq - Pointer to a batch request structure
 *
 */
static void
mgr_sched_delete(struct batch_request *preq)
{
	pbs_sched *psched;
	pbs_sched *tmpsched;

	if ((*preq->rq_ind.rq_manager.rq_objname == '\0') || (*preq->rq_ind.rq_manager.rq_objname == '@')) {
		/* Delete all except default */
		psched = (pbs_sched *) GET_NEXT(svr_allscheds);
		while (psched != NULL) {
			tmpsched = psched;
			psched = (pbs_sched *) GET_NEXT(psched->sc_link);
			if (tmpsched != dflt_scheduler) {
				if (sched_delete(tmpsched) == PBSE_OBJBUSY) {
					snprintf(log_buffer, sizeof(log_buffer),
						 "Scheduler %s is busy", tmpsched->sc_name);
					log_err(PBSE_OBJBUSY, __func__, log_buffer);
				}
			}
		}
	} else {
		psched = find_sched(preq->rq_ind.rq_manager.rq_objname);
		if (!psched) {
			req_reject(PBSE_UNKSCHED, 0, preq);
			return;
		} else if (psched == dflt_scheduler) {
			req_reject(PBSE_SCHED_NO_DEL, 0, preq);
			return;
		}

		if (sched_delete(psched) == PBSE_OBJBUSY) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "Scheduler %s is busy", psched->sc_name);
			log_err(PBSE_OBJBUSY, __func__, log_buffer);
			(void) reply_text(preq, PBSE_OBJBUSY, preq->rq_ind.rq_manager.rq_objname);
			return;
		}
	}
	reply_ack(preq); /*request completely successful*/
	return;
}

/*
 * mgr_node_delete - mark a node (or all nodes) in the server's node list
 *                   as being "deleted".  It (they) will no longer get
 *                   assigned to a job, will no longer be pinged, and
 *                   any current job tasks will continue until they end,
 *                   abort, or are killed.
 *
 *  @param[in] preq - Pointer to a batch request structure
 *
 */

static void
	mgr_node_delete(preq) struct batch_request *preq;
{
	int numnodes = 1;
	struct pbsnode *pnode;
	struct pbssubn *psub;
	char *nodename;
	int rc;

	int i, len;
	int n;
	int problem_cnt = 0;
	char *problem_names;
	struct pbsnode **problem_nodes = NULL;
	svrattrl *plist;
	mom_svrinfo_t *psvrmom;

	nodename = preq->rq_ind.rq_manager.rq_objname;

	if ((*preq->rq_ind.rq_manager.rq_objname == '\0') ||
	    (*preq->rq_ind.rq_manager.rq_objname == '@')) {

		/*In this instance the delete node req is to apply to all */
		/*nodes at the local ('\0')  or specified ('@') server */

		if ((pbsndlist != NULL) && svr_totnodes) {
			nodename = all_nodes;
			pnode = *pbsndlist;
			numnodes = svr_totnodes;
		} else { /* specified server has no nodes in its node table */
			pnode = NULL;
		}

	} else {
		pnode = find_nodebyname(nodename);
	}

	if (pnode == NULL) {
		req_reject(PBSE_UNKNODE, 0, preq);
		return;
	}

	/* If node being deleted is linked to any queue, clear "has node" flag for that queue */
	if (pnode->nd_pque != NULL) {
		set_qattr_l_slim(pnode->nd_pque, QE_ATR_HasNodes, 0, SET);
		ATR_UNSET(get_qattr(pnode->nd_pque, QE_ATR_HasNodes));
	}

	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_NODE, LOG_INFO,
		   nodename, msg_manager, msg_man_del,
		   preq->rq_user, preq->rq_host);

	/*if doing many and problem arises with some, record them for report*/
	/*the array of "problem nodes" sees no use now and may never see use*/
	if (numnodes > 1) {
		pnode = pbsndlist[0];
		problem_nodes = (struct pbsnode **) malloc(svr_totnodes * sizeof(struct pbsnode *));
		if (problem_nodes == NULL) {
			log_err(ENOMEM, __func__, "out of memory");
			return;
		}
		problem_cnt = 0;
	}

	/*set "deleted" bit in node's (nodes, numnodes == 1) "inuse" field*/
	/*remove entire prop list, including the node name, from the node */
	/*remove the IP address array hanging from the node               */

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	for (i = 0; i < svr_totnodes; i++) {
		if (numnodes > 1)
			pnode = pbsndlist[i];

		rc = 0;

		if (pnode->nd_state & INUSE_DELETED)
			continue; /* already deleted */
		if (pnode->nd_state & INUSE_PROV)
			rc = PBSE_NODEPROV_NODEL;
		else {
			if (find_vnode_in_resvs(pnode, Skip_Degraded_Time) != NULL)
				rc = PBSE_OBJBUSY;
			for (psub = pnode->nd_psn; psub != 0; psub = psub->next) {
				if (psub->jobs)
					rc = PBSE_OBJBUSY;
			}
		}

		if (pnode->nd_nummoms == 1) {
			psvrmom = pnode->nd_moms[0]->mi_data;
			if ((numnodes == 1) &&
			    (psvrmom->msr_numvnds > 1) &&
			    (psvrmom->msr_children[0] == pnode)) {
				/*
				 * this is the "natural" vnode for a Mom with
				 * multiple vnodes.  If there running jobs, or the
				 * other vnodes under her do not have another
				 * Mom, we cannot delete this vnode (and the Mom)
				 * unless we are deleteing all vnodes.
				 */
				if (psvrmom->msr_numjobs > 0) {
					rc = PBSE_OBJBUSY;
				} else {
					n = check_sister_vnodes_for_delete(psvrmom);
					if (n > 0)
						rc = PBSE_OBJBUSY;
				}
			}
		}

		if (rc != 0) {

			if (numnodes > 1) {
				if (problem_nodes) /*we have an array in which to save*/
					problem_nodes[problem_cnt] = pnode;

				++problem_cnt;

			} else { /*In the specific node case, reply w/ error and return*/
				switch (rc) {

					default:
						req_reject(rc, 0, preq);
				}
				return;
			}

		} else { /*modifications succeed for this node*/
			nodename = strdup(pnode->nd_name);
			effective_node_delete(pnode);
			pnode = NULL; /* pnode has been freed, set it to NULL */
			i--;	      /* the array has been coalesced, so reset i to the earlier position */

			if (nodename) {
				mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_NODE, nodename, NULL);
				free(nodename);
				nodename = NULL;
			}
		}
		if (numnodes == 1)
			break;
	} /*bottom of the for()*/

	save_nodes_db(1, NULL);

	if (numnodes > 1) { /*modification was for all nodes  */

		if (problem_cnt) { /*one or more problems encountered*/

			for (len = 0, i = 0; i < problem_cnt; i++)
				len += strlen(problem_nodes[i]->nd_name) + 3;

			len += strlen(pbse_to_txt(PBSE_GMODERR));

			if ((problem_names = malloc(len)) != NULL) {

				strcpy(problem_names, pbse_to_txt(PBSE_GMODERR));
				for (i = 0; i < problem_cnt; i++) {
					if (i)
						strcat(problem_names, ", ");
					strcat(problem_names, problem_nodes[i]->nd_name);
				}

				(void) reply_text(preq, PBSE_GMODERR, problem_names);
				free(problem_names);
				problem_names = NULL;
			} else {
				(void) reply_text(preq, PBSE_GMODERR, pbse_to_txt(PBSE_GMODERR));
			}
		}

		free(problem_nodes); /* maybe problem malloc failed */
		problem_nodes = NULL;

		if (problem_cnt) /* reply has already been sent */
			return;
	}

	reply_ack(preq);
}

/**
 * @brief
 *		mgr_sched_create - process request to create a sched
 *
 * @par
 *		Rejects the request when the name is longer than PBS_MAXSCHEDNAME,
 *		when a scheduler with that name already exists, or when the
 *		scheduler object cannot be allocated.  Otherwise the attributes
 *		carried in the request are applied to the new scheduler, which is
 *		then saved and the request acknowledged.  If setting the
 *		attributes fails, the new scheduler is freed again.
 *
 *  @param[in]	preq	- Pointer to a batch request structure
 */
static void
mgr_sched_create(struct batch_request *preq)
{
	int bad;
	svrattrl *plist;
	pbs_sched *psched;
	int rc;

	if (strlen(preq->rq_ind.rq_manager.rq_objname) > PBS_MAXSCHEDNAME) {
		req_reject(PBSE_SCHED_NAME_BIG, 0, preq);
		return;
	}
	if (find_sched(preq->rq_ind.rq_manager.rq_objname)) {
		req_reject(PBSE_SCHEDEXIST, 0, preq);
		return;
	}

	psched = sched_alloc(preq->rq_ind.rq_manager.rq_objname);
	if (!psched) {
		req_reject(PBSE_SYSTEM, 0, preq);
		/* stop here: psched is NULL and the request has been answered */
		return;
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	rc = mgr_set_attr(psched->sch_attr, sched_attr_idx, sched_attr_def, SCHED_ATR_LAST, plist,
			  preq->rq_perm, &bad, (void *) psched, ATR_ACTION_NEW);
	if (rc != 0) {
		reply_badattr(rc, bad, plist, preq);
		sched_free(psched);
		return;
	}

	set_sched_default(psched, 0);

	sched_save_db(psched);
	snprintf(log_buffer, LOG_BUF_SIZE, msg_manager, msg_man_set, preq->rq_user, preq->rq_host);
	log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SCHED, LOG_INFO, msg_daemonname, log_buffer);
	mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_SCHED, msg_daemonname, NULL);
	reply_ack(preq);
}

/**
 * @brief
 *		mgr_node_create - process request to create a node
 *
 *		Creates pbsnode and calls mgr_set_attr to set
 *      any associated node attributes also specified in the
 *      request.
 *
 *  @param[in]	preq	- Pointer to a batch request structure
 */

void
mgr_node_create(struct batch_request *preq)
{
	int bad;
	svrattrl *plist;
	int rc;
	char *vnp; /* Temp storage to validate (v)node name */
	long vn_pool;
	struct pbsnode *pnode;
	mominfo_t *mymom;
	struct sockaddr_in check_ip;
	int is_node_ip;
	char *nodename;

	nodename = preq->rq_ind.rq_manager.rq_objname;

	/*
	 * Before creating the (v)node, validate the (v)node name using
	 * legal_vnode_char() to check if it contains any invalid character.
	 * If any invalid char found, then reject the batch req with
	 * PBSE_BADNDATVAL.
	 */
	if ((vnp = nodename) != NULL) {
		for (; *vnp && legal_vnode_char(*vnp, 1); vnp++)
			;
		if (*vnp) {
			req_reject(PBSE_BADNDATVAL, 0, preq);
			return;
		}
		/* Condition to make sure that node name should not exceed
		 * PBS_MAXHOSTNAME i.e. 64 characters. This is because the
		 * corresponding column nd_name in the database table pbs.node
		 * is defined as string of length 64.
		 */
		if (strlen(nodename) > PBS_MAXHOSTNAME) {
			req_reject(PBSE_NODENBIG, 0, preq);
			return;
		}
	}
	is_node_ip = inet_pton(AF_INET, nodename, &(check_ip.sin_addr));
	if (is_node_ip > 0) {
		log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_NODE, LOG_INFO, nodename,
			   "Node added using IPv4 address\nVerify that PBS_MOM_NODE_NAME is configured on the respective host");
	}
	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	rc = create_pbs_node(nodename, plist, preq->rq_perm, &bad, &pnode, TRUE);

	if (rc != 0) {

		switch (rc) {
			case PBSE_INTERNAL:
			case PBSE_SYSTEM:
				req_reject(rc, bad, preq);
				break;

			case PBSE_NOATTR:
			case PBSE_ATTRRO:
			case PBSE_MUTUALEX:
			case PBSE_BADNDATVAL:
				reply_badattr(rc, bad, plist, preq);
				break;

			default:
				req_reject(rc, 0, preq);
		}
		return;
	}

	mymom = pnode->nd_moms[0];
	if (is_nattr_set(pnode, ND_ATR_vnode_pool) && ((vn_pool = get_nattr_long(pnode, ND_ATR_vnode_pool)) > 0)) {
		if (add_mom_to_pool(mymom) == PBSE_NONE) {
			/* cross link any vnodes of an existing Mom in pool */
			int i;
			mom_svrinfo_t *ppoolm = NULL;
			vnpool_mom_t *ppool = vnode_pool_mom_list;
			while (ppool) {
				if (ppool->vnpm_vnode_pool == vn_pool) {
					if (ppool->vnpm_inventory_mom) {
						ppoolm = (mom_svrinfo_t *) (ppool->vnpm_inventory_mom->mi_data);
					}
					break;
				}
				ppool = ppool->vnpm_next;
			}
			if (ppoolm) {
				log_eventf(PBSEVENT_DEBUG4, PBS_EVENTCLASS_NODE, LOG_INFO,
					   mymom->mi_host, "POOL: cross linking %d vnodes from %s",
					   ppoolm->msr_numvnds - 1, ppool->vnpm_inventory_mom->mi_host);
				for (i = 1; i < ppoolm->msr_numvnds; ++i) {
					cross_link_mom_vnode(ppoolm->msr_children[i], mymom);
				}
			}
		}
	}
	mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_NODE, nodename, NULL);

	setup_notification(); /*set mechanism for notifying */
	/*other nodes of new member   */

	save_nodes_db(1, NULL);

	reply_ack(preq); /*create completely successful*/
}

/**
 * @brief
 * 		Helper function that looks up a resource entry inside a
 * 		resource-list attribute by its definition.
 *
 * @par
 * 		The caller must ensure that the parameters are valid and not NULL
 *
 * @param[in]	pattr	- attribute structure which needs to be checked
 * @param[in]	prdef	- resource definition to look for
 *
 * @return	a pointer to the resource
 * @retval	NULL	- if the attribute is unset, is not of type
 * 			  ATR_TYPE_RESC, or holds no entry for prdef
 */
static resource *
get_resource(attribute *pattr, resource_def *prdef)
{
	resource *cur;

	/* only a set attribute of resource-list type can hold the entry */
	if (!is_attr_set(pattr) || (pattr->at_type != ATR_TYPE_RESC))
		return NULL;

	for (cur = (resource *) GET_NEXT(pattr->at_val.at_list);
	     cur != NULL;
	     cur = (resource *) GET_NEXT(cur->rs_link)) {
		if (cur->rs_defin == prdef)
			return cur;
	}

	return NULL;
}
/**
 * @brief
 * 		is_entity_resource_set	- checks whether any key in the entity
 * 					  limit tree of an attribute refers to
 * 					  the given resource name
 *
 * @param[in]	pattr	- attribute structure which contains the resource tree root
 * @param[in]	resc_name	- resource name
 *
 * @return	int
 * @retval	1	- a key for the resource was found
 * @retval	0	- attribute not set, or no key refers to the resource
 */
int
is_entity_resource_set(attribute *pattr, char *resc_name)
{
	char name_buf[PBS_MAX_RESC_NAME + 1];
	char *cur_key = NULL; /* NULL on the first call to entlim_get_next() */
	void *tree;

	if (!is_attr_set(pattr))
		return 0;

	tree = pattr->at_val.at_enty.ae_tree;
	while (entlim_get_next(tree, (void **) &cur_key) != NULL) {
		/* keys from which no resource name can be extracted are skipped */
		if (entlim_resc_from_key(cur_key, name_buf, PBS_MAX_RESC_NAME) != 0)
			continue;
		if (strcmp(name_buf, resc_name) == 0)
			return 1;
	}

	return 0;
}

/**
 * @brief
 * 		Helper function that tells whether a select specification string
 * 		requests a resource by name.
 *
 * @par
 * 		A request is recognized as the resource name immediately followed
 * 		by '=' and either located at the very start of the string or
 * 		immediately preceded by ':'.  Every occurrence of the name is
 * 		examined, so a name that is also the tail of a longer resource
 * 		name (e.g. "mem" in "pmem") does not hide a later real match.
 *
 * @param[in]	spec	- select specification string, may be NULL
 * @param[in]	rname	- resource name to look for, may be NULL
 *
 * @return	BOOLEAN
 * @retval	1	- the resource is requested in spec
 * @retval	0	- otherwise
 */
static int
select_spec_uses_resc(const char *spec, const char *rname)
{
	const char *hit;
	size_t rlen;

	if ((spec == NULL) || (rname == NULL))
		return 0;

	rlen = strlen(rname);
	if (rlen == 0)
		return 0;

	for (hit = strstr(spec, rname); hit != NULL; hit = strstr(hit + 1, rname)) {
		/* never look at hit[-1] when the match sits at the start of the string */
		if ((hit[rlen] == '=') && ((hit == spec) || (hit[-1] == ':')))
			return 1;
	}
	return 0;
}

/**
 * @brief
 * 		Helper function to check if a resource is set on jobs or reservations.
 * @par
 * 		If a resource is busy on an object, this function replies to the
 * 		client's request with PBSE_RESCBUSY; the caller must not reply
 * 		again when 1 is returned.
 *
 * @param[in]	preq	- The client's batch request
 * @param[in]	prdef	- The resource definition to check on
 * @param[in]	mod	- Set to 1 if the resource type or flag are being modified;
 * 			  with any other value nothing is ever reported as busy
 *
 * @return	BOOLEAN
 * @retval	1	- If resource is busy on jobs or reservations
 * @retval	0	- Otherwise
 */
static int
check_resource_set_on_jobs_or_resvs(struct batch_request *preq, resource_def *prdef, int mod)
{
	job *pj;
	resc_resv *pr;
	resource *presc;
	resource *presc_used;
	resource *presc_list;

	/* Reject if resource is on a job and the type or flag are being modified */

	for (pj = (job *) GET_NEXT(svr_alljobs); pj != NULL; pj = (job *) GET_NEXT(pj->ji_alljobs)) {
		presc_used = get_resource(get_jattr(pj, JOB_ATR_resc_used), prdef);
		presc_list = get_resource(get_jattr(pj, JOB_ATR_resource), prdef);
		if (((presc_used != NULL) || (presc_list != NULL)) && (mod == 1)) {
			reply_text(preq, PBSE_RESCBUSY, "Resource busy on job");
			return 1;
		}
		if (is_jattr_set(pj, JOB_ATR_SchedSelect)) {
			if ((mod == 1) &&
			    select_spec_uses_resc(get_jattr_str(pj, JOB_ATR_SchedSelect), prdef->rs_name)) {
				reply_text(preq, PBSE_RESCBUSY, "Resource busy on job");
				return 1;
			}
		}
	}

	/* Reject if resource is on a reservation and the type or flag are being modified */
	for (pr = (resc_resv *) GET_NEXT(svr_allresvs); pr != NULL; pr = (resc_resv *) GET_NEXT(pr->ri_allresvs)) {
		presc = get_resource(get_rattr(pr, RESV_ATR_resource), prdef);
		if ((presc != NULL) && (mod == 1)) {
			reply_text(preq, PBSE_RESCBUSY, "Resource busy on reservation");
			return 1;
		}
		if (is_rattr_set(pr, RESV_ATR_SchedSelect)) {
			/* same matching rule as for jobs, including a match at the start of the string */
			if ((mod == 1) &&
			    select_spec_uses_resc(get_rattr_str(pr, RESV_ATR_SchedSelect), prdef->rs_name)) {
				reply_text(preq, PBSE_RESCBUSY, "Resource busy on reservation");
				return 1;
			}
		}
	}
	return 0;
}

/**
 * @brief
 * 		helper function to send/update resourcedef file.
 *
 * @par
 * 		Registered by deferred_send_rescdef() as the function of the timed
 * 		work task stored in rescdef_wt_g.  It forces the send and then
 * 		clears rescdef_wt_g.
 */
static void
timed_send_rescdef()
{
	send_rescdef(1); /* forcing with 1 to avoid failures due to intermittent file stamp race issues */
	/* presumably the task has fired by now, so stop deferred_send_rescdef() from deleting it - TODO confirm */
	rescdef_wt_g = NULL;
}

/**
 * @brief
 * 		schedule a deferred send/update of the resourcedef file
 *
 * @par
 * 		A burst of resource updates would otherwise queue one resourcedef
 * 		update request per change.  Instead, any update task that is still
 * 		pending is deleted and a new timed task is set for one second from
 * 		now, so the update goes out one second after the last request.
 */
static void
deferred_send_rescdef()
{
	long fire_at;

	/* a newer request replaces the task that has not run yet */
	if (rescdef_wt_g != NULL)
		delete_task(rescdef_wt_g);

	fire_at = (long) time(0) + 1;
	rescdef_wt_g = set_task(WORK_Timed, fire_at, timed_send_rescdef, NULL);
}

/**
 * @brief
 * 		Create a resource
 *
 * @param[in]	preq	- The request containing information about the resource to
 * 		     				create
 *
 * @par
 * 		Requests to create a resource with a name that may already exist are
 * 		rejected.
 *
 * @return	void
 */
static void
mgr_resource_create(struct batch_request *preq)
{
	char *resc;
	char buf[LOG_BUF_SIZE];
	int rc;
	svrattrl *plist;
	int type = ATR_TYPE_STR;
	int flags = READ_WRITE;
	int flag_ir = 0;
	resource_def *prdef;

	if ((resc = preq->rq_ind.rq_manager.rq_objname) == NULL) {
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	rc = verify_resc_name(resc);
	if (rc != 0) {
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	prdef = find_resc_def(svr_resc_def, resc);
	if (prdef != NULL) {
		req_reject(PBSE_DUPLIST, 0, preq);
		return;
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		if (strcmp(plist->al_atopl.name, ATTR_RESC_TYPE) == 0) {
			if (parse_resc_type(plist->al_atopl.value, &type) == -1) {
				req_reject(PBSE_BADATVAL, 0, preq);
				return;
			}
		} else if (strcmp(plist->al_atopl.name, ATTR_RESC_FLAG) == 0) {
			if (parse_resc_flags(plist->al_atopl.value, &flag_ir, &flags) == -1) {
				req_reject(PBSE_BADATVAL, 0, preq);
				return;
			}
		} else {
			req_reject(PBSE_BADATVAL, 0, preq);
			return;
		}
		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	if (verify_resc_type_and_flags(type, &flag_ir, &flags, resc, buf, sizeof(buf), 0) != 0) {
		reply_text(preq, PBSE_BADATVAL, buf);
		return;
	}

	rc = add_resource_def(resc, type, flags);
	if (rc < 0) {
		log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_ERR, msg_daemonname, "resource %s can not be defined", resc);
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_RESC, LOG_INFO, resc, msg_manager, msg_man_cre, preq->rq_user, preq->rq_host);
	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_RESC, preq->rq_ind.rq_manager.rq_objname, NULL);

	if (flags & (ATR_DFLAG_RASSN | ATR_DFLAG_FNASSN | ATR_DFLAG_ANASSN)) {
		update_resc_sum();
	}
	reply_ack(preq);

	restart_python_interpreter(__func__);
	deferred_send_rescdef();

	return;
}

/**
 * @brief
 * 		Delete a resource
 *
 * @param[in]	preq	- The request containing information about the resource to
 * 		     				delete
 *
 * @par
 * 		Requests to delete resources that are set on a job or reservation
 * 		are rejected. If the resource is set on any other object, i.e., server,
 * 		queue, or node, the resource is unset from the associated object.
 *
 * @par
 * 		Only custom resources can be deleted, and a resource listed in the
 * 		server's restrict_res_to_release_on_suspend attribute is reported
 * 		as busy.  Once every object has been cleaned, the definition is
 * 		removed from svr_resc_sum, from the svr_resc_def list and from the
 * 		resource index, and then freed.
 *
 * @par
 * 		NOTE(review): the resource definition file is updated before the
 * 		objects are cleaned; an error while unsetting returns early with
 * 		the in-memory definition still present - confirm this is acceptable.
 *
 * @return void
 */
static void
mgr_resource_delete(struct batch_request *preq)
{
	char *resc;
	int rc;
	int i, j;
	pbs_queue *pq;
	resource_def *prdef;
	attribute *pattr;
	resource_def *svr_rd;
	resource_def *svr_rd_prev;
	svrattrl *plist;
	int bad;
	int updatedb; /* set to 1 if a db update is needed */

	if ((resc = preq->rq_ind.rq_manager.rq_objname) == NULL) {
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	if ((prdef = find_resc_def(svr_resc_def, resc)) == NULL) {
		req_reject(PBSE_UNKRESC, 0, preq);
		return;
	}

	if (!prdef->rs_custom) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	/* the check replies to the client itself when the resource is busy */
	rc = check_resource_set_on_jobs_or_resvs(preq, prdef, 1);
	if (rc == 1)
		return;

	/* check if resource is set in restrict_res_to_release_on_suspend */
	if (is_sattr_set(SVR_ATR_restrict_res_to_release_on_suspend)) {
		struct array_strings *pval = get_sattr_arst(SVR_ATR_restrict_res_to_release_on_suspend);
		for (i = 0; pval != NULL && i < pval->as_usedptr; i++) {
			if (strcmp(pval->as_string[i], prdef->rs_name) == 0) {
				reply_text(preq, PBSE_RESCBUSY, "Resource busy on server");
				return;
			}
		}
	}

	rc = update_resource_def_file(resc, RESDEF_DELETE, prdef->rs_type, prdef->rs_flags);
	if (rc != 0) {
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_ERR, msg_daemonname, "Error updating resource definitions");
		req_reject(PBSE_SYSTEM, 0, preq);
		return;
	}

	/* Is the resource set on queues? If so unset */
	pq = (pbs_queue *) GET_NEXT(svr_queues);
	while (pq != NULL) {
		updatedb = 0;
		for (i = 0; i < QA_ATR_LAST; i++) {
			pattr = get_qattr(pq, i);
			if (is_attr_set(pattr) && (pattr->at_type == ATR_TYPE_RESC || pattr->at_type == ATR_TYPE_ENTITY)) {
				plist = attrlist_create(que_attr_def[i].at_name, prdef->rs_name, 0);
				if (plist == NULL) {
					log_err(ENOMEM, __func__, "out of memory");
					req_reject(PBSE_SYSTEM, 0, preq);
					return;
				}
				/* presumably marks this entry as the only one in its list - TODO confirm */
				plist->al_link.ll_next->ll_struct = NULL;
				rc = mgr_unset_attr(pq->qu_attr, que_attr_idx, que_attr_def, QA_ATR_LAST, plist, -1, &bad, (void *) pq, PARENT_TYPE_QUE_ALL, INDIRECT_RES_CHECK);
				if (rc != 0) {
					log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_DEBUG, resc, "error unsetting resource %s.%s", que_attr_def[i].at_name, prdef->rs_name);
					reply_badattr(rc, bad, plist, preq);
					free_svrattrl(plist);
					return;
				} else {
					/* default_chunk requires special handling because
					 * the server keeps track of defaults to add to
					 * schedselect @see qu_seldft
					 */
					if (i == QE_ATR_DefaultChunk && (get_qattr(pq, QE_ATR_DefaultChunk))->at_flags & ATR_VFLAG_MODIFY)
						(void) deflt_chunk_action(get_qattr(pq, QE_ATR_DefaultChunk), (void *) pq, ATR_ACTION_ALTER);
				}
				updatedb = 1;
				free_svrattrl(plist);
			}
		}
		if (updatedb) {
			que_save_db(pq);
		}
		pq = (pbs_queue *) GET_NEXT(pq->qu_link);
	}

	updatedb = 0;
	/* Is the resource set on the server? If so unset */
	for (i = 0; i < SVR_ATR_LAST; i++) {
		pattr = get_sattr(i);
		if (is_attr_set(pattr) && (pattr->at_type == ATR_TYPE_RESC || pattr->at_type == ATR_TYPE_ENTITY)) {
			plist = attrlist_create(svr_attr_def[i].at_name, prdef->rs_name, 0);
			if (plist == NULL) {
				log_err(ENOMEM, __func__, "out of memory");
				req_reject(PBSE_SYSTEM, 0, preq);
				return;
			}
			plist->al_link.ll_next->ll_struct = NULL;
			rc = mgr_unset_attr(server.sv_attr, svr_attr_idx, svr_attr_def, SVR_ATR_LAST, plist, -1, &bad, (void *) &server, PARENT_TYPE_SERVER, INDIRECT_RES_CHECK);
			if (rc != 0) {
				log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_DEBUG, resc, "error unsetting resource %s.%s", svr_attr_def[i].at_name, prdef->rs_name);
				reply_badattr(rc, bad, plist, preq);
				free_svrattrl(plist);
				return;
			} else {
				/* default_chunk requires special handling because
				 * the server keeps track of defaults to add to
				 * schedselect @see sv_seldft
				 */
				attribute *dfltchk_attr;
				if ((i == SVR_ATR_DefaultChunk) && ((dfltchk_attr = get_sattr(SVR_ATR_DefaultChunk))->at_flags & ATR_VFLAG_MODIFY)) {
					(void) deflt_chunk_action(dfltchk_attr, (void *) &server, ATR_ACTION_ALTER);
				}
			}
			free_svrattrl(plist);
			updatedb = 1;
		}
	}
	if (updatedb) {
		svr_save_db(&server);
	}

	/* Is the resource set on nodes? If so unset */
	for (i = 0; i < svr_totnodes; i++) {
		updatedb = 0;
		for (j = 0; j < ND_ATR_LAST; j++) {
			pattr = get_nattr(pbsndlist[i], j);
			if (is_attr_set(pattr) && (pattr->at_type == ATR_TYPE_RESC || pattr->at_type == ATR_TYPE_ENTITY)) {
				plist = attrlist_create(node_attr_def[j].at_name, prdef->rs_name, 0);
				if (plist == NULL) {
					log_err(ENOMEM, __func__, "out of memory");
					req_reject(PBSE_SYSTEM, 0, preq);
					return;
				}
				plist->al_link.ll_next->ll_struct = NULL;
				rc = mgr_unset_attr(pbsndlist[i]->nd_attr, node_attr_idx, node_attr_def, ND_ATR_LAST, plist, -1, &bad, (void *) pbsndlist[i], PARENT_TYPE_NODE, INDIRECT_RES_UNLINK);
				if (rc != 0) {
					/* j is the attribute index; i is the node index and must not index node_attr_def */
					log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_DEBUG, resc, "error unsetting resource %s.%s", node_attr_def[j].at_name, prdef->rs_name);
					reply_badattr(rc, bad, plist, preq);
					free_svrattrl(plist);
					return;
				}
				free_svrattrl(plist);
				updatedb = 1;
			}
		}
		if (updatedb) {
			node_save_db(pbsndlist[i]);
		}
	}

	/* close the gap in svr_resc_sum; the copy of the terminating entry ends the inner loop */
	for (i = 0; svr_resc_sum[i].rs_def != NULL; i++) {
		if (svr_resc_sum[i].rs_def == prdef) {
			for (j = i; svr_resc_sum[j].rs_def != NULL; j++) {
				svr_resc_sum[j].rs_def = svr_resc_sum[j + 1].rs_def;
				svr_resc_sum[j].rs_prs = svr_resc_sum[j + 1].rs_prs;
				svr_resc_sum[j].rs_attr = svr_resc_sum[j + 1].rs_attr;
				svr_resc_sum[j].rs_set = svr_resc_sum[j + 1].rs_set;
			}
			break;
		}
	}

	/* unlink the definition from the svr_resc_def list, drop it from the index and free it */
	for (svr_rd_prev = NULL, svr_rd = svr_resc_def; svr_rd != NULL; svr_rd_prev = svr_rd, svr_rd = svr_rd->rs_next) {
		if (svr_rd == prdef) {
			if (svr_rd_prev != NULL) {
				svr_rd_prev->rs_next = svr_rd->rs_next;
			} else {
				svr_resc_def = svr_rd->rs_next;
			}
			if (pbs_idx_delete(resc_attrdef_idx, prdef->rs_name) != PBS_IDX_RET_OK)
				log_errf(-1, __func__, "Could not remove %s from server resource index", prdef->rs_name);
			free(prdef->rs_name);
			free(prdef);
			prdef = NULL;
			svr_resc_size--;
			break;
		}
	}

	/* resc points into the request, not into the freed definition */
	log_eventf(PBSEVENT_ADMIN, PBS_EVENTCLASS_RESC, LOG_INFO, resc, msg_manager, msg_man_del, preq->rq_user, preq->rq_host);

	reply_ack(preq);

	restart_python_interpreter(__func__);
	deferred_send_rescdef();
	set_scheduler_flag(SCH_CONFIGURE, NULL);

	return;
}

/**
 * @brief
 * 		Set a resource type and/or flag
 *
 * @param[in]	preq	- The request containing information about the resource to
 * 		     				set
 *
 * @par
 * 		Requests to set a resource type is only possible if the resource
 * 		is not set on any object, i.e. not on any job, reservation, node, queue, or
 * 		server.
 *
 * @par
 * 		Requests to set a resource flag is only possible if the resource
 * 		is not set on any job or reservation. Setting/resetting a resource's
 * 		visibility flags i or r is however always honored regardless of whether the
 * 		resource is defined on objects or not.
 *
 * @return void
 */
static void
mgr_resource_set(struct batch_request *preq)
{
	char *resc;
	char buf[LOG_BUF_SIZE];
	resource_def *prdef;
	svrattrl *plist;
	int type;
	int flags;
	int o_flags;
	int flag_ir = 0;
	int i, j;
	attribute *pattr;
	resource *presc;
	pbs_queue *pq;
	int mod_type = 0;
	int mod_flag = 0;
	int busy;
	int rc;
	struct resc_type_map *p_resc_type_map = NULL;

	if ((preq->rq_perm & PERM_MANAGER) == 0) {
		req_reject(PBSE_PERM, 0, preq);
		return;
	}

	if ((resc = preq->rq_ind.rq_manager.rq_objname) == NULL) {
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	prdef = find_resc_def(svr_resc_def, resc);
	if (prdef == NULL) {
		req_reject(PBSE_UNKRESC, 0, preq);
		return;
	}

	if (!prdef->rs_custom) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	if (plist == NULL) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "missing type and/or permissions for '%s'", resc);
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}
	while (plist != NULL) {
		if (strcmp(plist->al_atopl.name, ATTR_RESC_TYPE) == 0) {
			/* we will reject any request that modifies type if the
			 * resource is set on any object
			 */
			mod_type = 1;
		} else if (strcmp(plist->al_atopl.name, ATTR_RESC_FLAG) == 0) {
			const char *f = "fhnq";
			char *new = plist->al_atopl.value;
			char *old = find_resc_flag_map(prdef->rs_flags);

			/* If any of "fhnq" flags are being added, then reject
			 * the request if the resource is set on
			 * jobs/reservations.
			 *
			 * Adding flags r or i is allowed
			 */
			for (i = 0; (i < strlen(f)) && (mod_flag == 0); i++) {
				if ((strchr(old, f[i]) == NULL) &&
				    (strchr(new, f[i]) != NULL)) {
					mod_flag = 1;
				} else if ((strchr(old, f[i]) != NULL) &&
					   (strchr(new, f[i]) == NULL)) {
					mod_flag = 1;
				}
			}
			free(old);
		}
		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	rc = check_resource_set_on_jobs_or_resvs(preq, prdef, (mod_type || mod_flag));
	if (rc == 1) {
		/* check function replies to client's preq */
		return;
	}
	/* Reject if resource is on a queue and the type is being modified */
	pq = (pbs_queue *) GET_NEXT(svr_queues);
	while (pq != NULL) {
		busy = 0;
		for (i = 0; (i < QA_ATR_LAST) && (busy == 0); i++) {
			pattr = get_qattr(pq, i);
			if (pattr->at_type == ATR_TYPE_RESC) {
				presc = get_resource(pattr, prdef);
				if ((mod_type == 1) && (presc != NULL)) {
					busy = 1;
				}
			} else if (pattr->at_type == ATR_TYPE_ENTITY) {
				if ((mod_type == 1) && is_entity_resource_set(pattr, prdef->rs_name)) {
					busy = 1;
				}
			}
		}
		if (busy) {
			reply_text(preq, PBSE_RESCBUSY, "Resource busy on queue");
			return;
		}
		pq = (pbs_queue *) GET_NEXT(pq->qu_link);
	}

	/* Reject if resource is on the server and the type is being modified */
	for (i = 0, busy = 0; (i < SVR_ATR_LAST) && (busy == 0); i++) {
		pattr = get_sattr(i);
		if (pattr->at_type == ATR_TYPE_RESC) {
			presc = get_resource(pattr, prdef);
			if ((presc != NULL) && (mod_type == 1)) {
				busy = 1;
			}
		} else if (pattr->at_type == ATR_TYPE_ENTITY) {
			if ((mod_type == 1) && is_entity_resource_set(pattr, prdef->rs_name)) {
				busy = 1;
			}
		}
	}
	if (busy) {
		reply_text(preq, PBSE_RESCBUSY, "Resource busy on server");
		return;
	}

	/* Reject if resource is on a node and the type is being modified */
	for (i = 0; i < svr_totnodes; i++) {
		if (pbsndlist[i]->nd_state & INUSE_DELETED)
			continue;

		pattr = get_nattr(pbsndlist[i], ND_ATR_ResourceAvail);
		for (j = 0; j < ND_ATR_LAST; j++) {
			pattr = get_nattr(pbsndlist[i], j);
			presc = get_resource(pattr, prdef);
			if ((presc != NULL) && (mod_type == 1)) {
				reply_text(preq, PBSE_RESCBUSY, "Resource busy on node");
				return;
			}
		}
	}

	type = prdef->rs_type;
	o_flags = flags = prdef->rs_flags;

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		if (strcmp(plist->al_atopl.name, ATTR_RESC_TYPE) == 0) {
			p_resc_type_map = find_resc_type_map_by_typest(plist->al_atopl.value);
			if (p_resc_type_map == NULL) {
				req_reject(PBSE_BADATVAL, 0, preq);
				return;
			}
			type = p_resc_type_map->rtm_type;
		} else if (strcmp(plist->al_atopl.name, ATTR_RESC_FLAG) == 0) {
			if (parse_resc_flags(plist->al_atopl.value, &flag_ir, &flags) == -1) {
				req_reject(PBSE_BADATVAL, 0, preq);
				return;
			}
			mod_flag = 1;
		} else {
			req_reject(PBSE_BADATVAL, 0, preq);
			return;
		}

		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	if (verify_resc_type_and_flags(type, &flag_ir, &flags, resc, buf, sizeof(buf), 0) != 0) {
		reply_text(preq, PBSE_BADATVAL, buf);
		return;
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	mgr_log_attr(msg_man_set, plist, PBS_EVENTCLASS_RESC, resc, NULL);

	rc = update_resource_def_file(resc, RESDEF_UPDATE, type, flags);
	if (rc != 0) {
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_ERR, msg_daemonname, "Error updating resource definitions");
		req_reject(PBSE_SYSTEM, 0, preq);
		return;
	}

	if (mod_type && (p_resc_type_map != NULL)) {
		prdef->rs_decode = p_resc_type_map->rtm_decode;
		prdef->rs_encode = p_resc_type_map->rtm_encode;
		prdef->rs_set = p_resc_type_map->rtm_set;
		prdef->rs_comp = p_resc_type_map->rtm_comp;
		prdef->rs_free = p_resc_type_map->rtm_free;
		prdef->rs_type = p_resc_type_map->rtm_type;
	}
	if (mod_flag) {
		prdef->rs_flags = flags;
	}

	if ((o_flags & (ATR_DFLAG_RASSN | ATR_DFLAG_FNASSN | ATR_DFLAG_ANASSN)) ||
	    (flags & (ATR_DFLAG_RASSN | ATR_DFLAG_FNASSN | ATR_DFLAG_ANASSN))) {
		update_resc_sum();
	}

	reply_ack(preq);

	restart_python_interpreter(__func__);
	deferred_send_rescdef();
	set_scheduler_flag(SCH_CONFIGURE, NULL);

	return;
}

/**
 * @brief
 * 		Unset a resource flag
 *
 * @param[in]	preq	- The request containing information about the resource to
 * 		     				unset
 *
 * @par
 * 		Only the flag of a custom resource can be unset; it reverts to
 * 		READ_WRITE. A request that names the type, or any other attribute,
 * 		is rejected.
 *
 * @par
 * 		check_resource_set_on_jobs_or_resvs() is consulted first; when it
 * 		returns 1 this function returns without sending a reply of its own
 * 		(presumably the helper already replied - confirm against its
 * 		definition). The request is then rejected with PBSE_RESCBUSY if the
 * 		resource is still referenced by a queue, node or server attribute.
 * 		The only exception is the resources_assigned attribute of queues and
 * 		of the server, whose entry for this resource is deleted instead.
 *
 * @return void
 */
static void
mgr_resource_unset(struct batch_request *preq)
{
	resource_def *prdef;
	svrattrl *plist;
	char *resc;
	int i, j;
	attribute *pattr;
	resource *presc;
	pbs_queue *pq;
	int mod = 0;
	int busy;
	int rc;
	int o_type;
	int o_flags;
	pbs_queue **pq_list = NULL;
	int pq_list_size = 0;
	attribute *q_attr = NULL;
	int q_count = 0;

	if ((preq->rq_perm & PERM_MANAGER) == 0) {
		req_reject(PBSE_PERM, 0, preq);
		return;
	}

	if ((resc = preq->rq_ind.rq_manager.rq_objname) == NULL) {
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	prdef = find_resc_def(svr_resc_def, resc);
	if (prdef == NULL) {
		req_reject(PBSE_UNKRESC, 0, preq);
		return;
	}

	/* only custom resources may be altered */
	if (!prdef->rs_custom) {
		req_reject(PBSE_IVALREQ, 0, preq);
		return;
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	if (plist == NULL) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "missing type and/or permissions for '%s'", resc);
		req_reject(PBSE_BADATVAL, 0, preq);
		return;
	}

	/* unsetting the type is never allowed; note whether the flag is named */
	while (plist) {
		if ((strcmp(plist->al_atopl.name, ATTR_RESC_TYPE) == 0)) {
			req_reject(PBSE_IVALREQ, 0, preq);
			return;
		}
		if (strcmp(plist->al_atopl.name, ATTR_RESC_FLAG) == 0) {
			mod = 1;
			break;
		}
		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	rc = check_resource_set_on_jobs_or_resvs(preq, prdef, 1);
	if (rc == 1)
		return;

	/*
	 * Reject if resource is on a queue.  Queues that carry the resource
	 * only in resources_assigned are remembered in pq_list; their entry is
	 * deleted later, once every busy check has passed.
	 */
	pq = (pbs_queue *) GET_NEXT(svr_queues);
	while (pq != NULL) {
		for (i = 0, busy = 0; (i < QA_ATR_LAST) && (busy == 0); i++) {
			pattr = get_qattr(pq, i);
			if (pattr->at_type == ATR_TYPE_RESC) {
				presc = get_resource(pattr, prdef);
				if ((presc != NULL) && (mod == 1)) {
					if (i == QE_ATR_ResourceAssn) {
						pbs_queue **temp_q = NULL;
						temp_q = (pbs_queue **) realloc(pq_list, (pq_list_size + 1) * sizeof(pbs_queue *));
						if (temp_q == NULL) {
							/* realloc leaves the old list intact on failure; release it */
							free(pq_list);
							reply_text(preq, PBSE_SYSTEM, "malloc failure");
							return;
						}
						pq_list = temp_q;
						pq_list[pq_list_size] = pq;
						pq_list_size++;
					} else
						busy = 1;
				}
			} else if (pattr->at_type == ATR_TYPE_ENTITY) {
				if ((mod == 1) && is_entity_resource_set(pattr, prdef->rs_name)) {
					busy = 1;
				}
			}
		}
		if (busy) {
			reply_text(preq, PBSE_RESCBUSY, "Resource busy on queue");
			free(pq_list);
			return;
		}
		pq = (pbs_queue *) GET_NEXT(pq->qu_link);
	}

	/* Reject if resource is on a node */
	for (i = 0; i < svr_totnodes; i++) {
		if (pbsndlist[i]->nd_state & INUSE_DELETED)
			continue;

		for (j = 0; j < ND_ATR_LAST; j++) {
			pattr = get_nattr(pbsndlist[i], j);
			/*
			 * Only resource-list attributes hold resource entries; as in
			 * the queue and server scans, skip every other attribute type
			 * rather than reading its value as a list.
			 */
			if (pattr->at_type != ATR_TYPE_RESC)
				continue;
			presc = get_resource(pattr, prdef);
			if ((presc != NULL) && (mod == 1)) {
				reply_text(preq, PBSE_RESCBUSY, "Resource busy on node");
				free(pq_list);
				return;
			}
		}
	}

	/* Reject if resource is on the server */
	for (i = 0, busy = 0; (i < SVR_ATR_LAST) && (busy == 0); i++) {
		pattr = get_sattr(i);
		if (pattr->at_type == ATR_TYPE_RESC) {
			presc = get_resource(pattr, prdef);
			if ((presc != NULL) && (mod == 1)) {
				/* Since the resource is not present in resources_available attribute
				 * just delete the resource entry from resources_assigned attribute.
				 */
				if (i == SVR_ATR_resource_assn) {
					if (pattr->at_flags & ATR_VFLAG_SET) {
						presc->rs_defin->rs_free(&presc->rs_value);
						delete_link(&presc->rs_link);
						free(presc);
						/* mark the attribute unset once its list is empty */
						presc = (resource *) GET_NEXT(get_attr_list(pattr));
						if (presc == NULL)
							pattr->at_flags &= ~ATR_VFLAG_SET;
						pattr->at_flags |= ATR_MOD_MCACHE;
					}
				} else
					busy = 1;
			}
		} else if (is_attr_set(pattr) && (pattr->at_type == ATR_TYPE_ENTITY)) {
			if ((mod == 1) && is_entity_resource_set(pattr, prdef->rs_name)) {
				busy = 1;
			}
		}
	}
	if (busy) {
		reply_text(preq, PBSE_RESCBUSY, "Resource busy on server");
		free(pq_list);
		return;
	}

	/* all busy checks passed: drop the remembered queue resources_assigned entries */
	if (pq_list != NULL) {
		for (q_count = 0; q_count < pq_list_size; q_count++) {
			q_attr = get_qattr(pq_list[q_count], QE_ATR_ResourceAssn);
			presc = get_resource(q_attr, prdef);
			if (presc == NULL)
				continue; /* entry already gone; nothing to delete */
			presc->rs_defin->rs_free(&presc->rs_value);
			delete_link(&presc->rs_link);
			free(presc);
			presc = (resource *) GET_NEXT(q_attr->at_val.at_list);
			if (presc == NULL)
				mark_attr_not_set(q_attr);
			q_attr->at_flags |= ATR_MOD_MCACHE;
		}
		free(pq_list);
		pq_list = NULL;
	}

	/* remember the current definition so that it can be restored on failure */
	o_type = prdef->rs_type;
	o_flags = prdef->rs_flags;

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	while (plist) {
		/* Only consider flag since unsetting type is disallowed */
		if (strcmp(plist->al_atopl.name, ATTR_RESC_FLAG) == 0) {
			prdef->rs_flags = READ_WRITE;
		} else {
			/* an earlier list entry may already have reset the flags; undo it */
			prdef->rs_flags = o_flags;
			req_reject(PBSE_BADATVAL, 0, preq);
			return;
		}
		plist = (svrattrl *) GET_NEXT(plist->al_link);
	}

	plist = (svrattrl *) GET_NEXT(preq->rq_ind.rq_manager.rq_attr);
	mgr_log_attr(msg_man_uns, plist, PBS_EVENTCLASS_RESC, resc, NULL);

	rc = update_resource_def_file(resc, RESDEF_UPDATE, prdef->rs_type, prdef->rs_flags);
	if (rc != 0) {
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_RESC, LOG_ERR, msg_daemonname, "Error updating resource definitions");
		/* rollback mods */
		prdef->rs_type = o_type;
		prdef->rs_flags = o_flags;
		req_reject(PBSE_SYSTEM, 0, preq);
		return;
	}

	/* the previous flags carried one of the "assigned" bits: refresh the sums */
	if (o_flags & (ATR_DFLAG_RASSN | ATR_DFLAG_FNASSN | ATR_DFLAG_ANASSN)) {
		update_resc_sum();
	}

	reply_ack(preq);

	restart_python_interpreter(__func__);
	deferred_send_rescdef();
	set_scheduler_flag(SCH_CONFIGURE, NULL);

	return;
}

/**
 * @brief
 * 		req_manager - the dispatch routine for a series of functions which
 *		implement the Manager (qmgr) Batch Request
 *
 *		The privilege of the requester is checked against the type of
 *		the object and the operation to be performed on it.  Then the
 *		appropriate function is called to perform the operation.
 *
 * @param[in]	preq	- The request containing information about the resource to perform the operation.
 *
 * @return void
 *
 */
void
req_manager(struct batch_request *preq)
{
	int obj_name_len;
	conn_t *conn = NULL;

	++preq->rq_refct;

	if (preq->prot == PROT_TCP) {
		if (preq->rq_conn != PBS_LOCAL_CONNECTION) {
			conn = get_conn(preq->rq_conn);
			if (!conn) {
				log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_REQUEST, LOG_ERR, __func__, "did not find socket in connection table");
				req_reject(PBSE_SYSTEM, 0, preq);
				goto req_manager_exit;
			}
		}
	}

	obj_name_len = strlen(preq->rq_ind.rq_manager.rq_objname);

	switch (preq->rq_ind.rq_manager.rq_cmd) {

		case MGR_CMD_CREATE:
		case MGR_CMD_DELETE:

			/* MGR_OBJ_SITE_HOOK permission checking is different */
			if (preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_SITE_HOOK) {
				if ((preq->rq_perm & PERM_MANAGER) == 0) {
					req_reject(PBSE_PERM, 0, preq);
					goto req_manager_exit;
				}
			}

			switch (preq->rq_ind.rq_manager.rq_objtype) {
				case MGR_OBJ_QUEUE:
					if (preq->rq_ind.rq_manager.rq_cmd == MGR_CMD_CREATE)
						mgr_queue_create(preq);
					else
						mgr_queue_delete(preq);
					break;

				case MGR_OBJ_NODE:
					if (preq->rq_ind.rq_manager.rq_cmd == MGR_CMD_CREATE)
						mgr_node_create(preq);
					else
						mgr_node_delete(preq);
					break;

				case MGR_OBJ_SITE_HOOK:
					if (!is_local_root(preq->rq_user, preq->rq_host)) {
						sprintf(log_buffer,
							"%s@%s is unauthorized to access hooks data from server %s",
							preq->rq_user, preq->rq_host, server_host);

						reply_text(preq, PBSE_HOOKERROR, log_buffer);
						goto req_manager_exit;
					}
					if (preq->rq_ind.rq_manager.rq_cmd == MGR_CMD_CREATE)
						mgr_hook_create(preq);
					else
						mgr_hook_delete(preq);
					break;

				case MGR_OBJ_RSC:
					if (preq->rq_ind.rq_manager.rq_cmd == MGR_CMD_CREATE)
						mgr_resource_create(preq);
					else
						mgr_resource_delete(preq);
					break;

				case MGR_OBJ_SCHED:
					if (preq->rq_ind.rq_manager.rq_cmd == MGR_CMD_CREATE) {
						if (obj_name_len == 0) {
							strncpy(preq->rq_ind.rq_manager.rq_objname,
								PBS_DFLT_SCHED_NAME, PBS_MAXSVRJOBID);
							preq->rq_ind.rq_manager.rq_objname[PBS_MAXSVRJOBID] = '\0';
						}
						mgr_sched_create(preq);
					} else
						mgr_sched_delete(preq);
					break;

				default:
					req_reject(PBSE_IVALREQ, 0, preq);
					goto req_manager_exit;
			}
			break;

		case MGR_CMD_SET:

			/* MGR_OBJ_SITE_HOOK permission checking is different */
			if ((preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_SITE_HOOK) &&
			    (preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_PBS_HOOK)) {
				if ((preq->rq_perm & PERM_OPorMGR) == 0) {
					req_reject(PBSE_PERM, 0, preq);
					goto req_manager_exit;
				}
			}

			switch (preq->rq_ind.rq_manager.rq_objtype) {
				case MGR_OBJ_SERVER:
					mgr_server_set(preq, conn);
					break;
				case MGR_OBJ_SCHED:
					if (obj_name_len == 0) {
						strncpy(preq->rq_ind.rq_manager.rq_objname,
							PBS_DFLT_SCHED_NAME, PBS_MAXSVRJOBID);
						preq->rq_ind.rq_manager.rq_objname[PBS_MAXSVRJOBID] = '\0';
					}
					mgr_sched_set(preq);
					break;
				case MGR_OBJ_QUEUE:
					mgr_queue_set(preq);
					break;
				case MGR_OBJ_NODE:
				case MGR_OBJ_HOST:
					mgr_node_set(preq);
					break;
				case MGR_OBJ_SITE_HOOK:
				case MGR_OBJ_PBS_HOOK:
					if (!is_local_root(preq->rq_user, preq->rq_host)) {
						sprintf(log_buffer,
							"%s@%s is unauthorized to access hooks data from server %s",
							preq->rq_user, preq->rq_host, server_host);

						reply_text(preq, PBSE_HOOKERROR, log_buffer);
						goto req_manager_exit;
					}
					mgr_hook_set(preq);
					break;
				case MGR_OBJ_RSC:
					mgr_resource_set(preq);
					break;
				default:
					req_reject(PBSE_IVALREQ, 0, preq);
					goto req_manager_exit;
			}
			break;

		case MGR_CMD_UNSET:
			/* MGR_OBJ_SITE_HOOK permission checking is different */
			if ((preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_SITE_HOOK) &&
			    (preq->rq_ind.rq_manager.rq_objtype != MGR_OBJ_PBS_HOOK)) {
				if ((preq->rq_perm & PERM_OPorMGR) == 0) {
					req_reject(PBSE_PERM, 0, preq);
					goto req_manager_exit;
				}
			}

			switch (preq->rq_ind.rq_manager.rq_objtype) {
				case MGR_OBJ_SERVER:
					mgr_server_unset(preq, conn);
					break;
				case MGR_OBJ_QUEUE:
					mgr_queue_unset(preq);
					break;
				case MGR_OBJ_NODE:
					mgr_node_unset(preq);
					break;
				case MGR_OBJ_SITE_HOOK:
				case MGR_OBJ_PBS_HOOK:
					if (!is_local_root(preq->rq_user, preq->rq_host)) {
						sprintf(log_buffer,
							"%s@%s is unauthorized to access hooks data from server %s",
							preq->rq_user, preq->rq_host,
							server_host);

						reply_text(preq, PBSE_HOOKERROR, log_buffer);
						goto req_manager_exit;
					}
					mgr_hook_unset(preq);
					break;
				case MGR_OBJ_SCHED:
					if (obj_name_len == 0) {
						strncpy(preq->rq_ind.rq_manager.rq_objname,
							PBS_DFLT_SCHED_NAME, PBS_MAXSVRJOBID);
						preq->rq_ind.rq_manager.rq_objname[PBS_MAXSVRJOBID] = '\0';
					}
					mgr_sched_unset(preq);
					break;
				case MGR_OBJ_RSC:
					mgr_resource_unset(preq);
					break;
				default:
					req_reject(PBSE_IVALREQ, 0, preq);
					goto req_manager_exit;
			}
			break;

		case MGR_CMD_IMPORT:

			/* If this expands to operate on other objects like       */
			/* MGR_OBJ_SERVER, MGR_OBJ_QUEUE, MGR_OBJ_NODE, then you  */
			/* might need to put back in here the permission checking */
			/* "if( (preq->rq_perm & PERM_OPorMGR) == 0)..."          */

			switch (preq->rq_ind.rq_manager.rq_objtype) {
				case MGR_OBJ_SITE_HOOK:
				case MGR_OBJ_PBS_HOOK:
					if (!is_local_root(preq->rq_user, preq->rq_host)) {
						sprintf(log_buffer,
							"%s@%s is unauthorized to access hooks data from server %s",
							preq->rq_user, preq->rq_host, server_host);

						reply_text(preq, PBSE_HOOKERROR, log_buffer);
						goto req_manager_exit;
					}
					mgr_hook_import(preq);
					break;
				default:
					req_reject(PBSE_IVALREQ, 0, preq);
					goto req_manager_exit;
			}
			break;

		case MGR_CMD_EXPORT:

			/* If this expands to operate on other objects like       */
			/* MGR_OBJ_SERVER, MGR_OBJ_QUEUE, MGR_OBJ_NODE, then you  */
			/* might need to put back in here the permission checking */
			/* "if( (preq->rq_perm & PERM_OPorMGR) == 0)..."          */

			switch (preq->rq_ind.rq_manager.rq_objtype) {
				case MGR_OBJ_SITE_HOOK:
				case MGR_OBJ_PBS_HOOK:
					if (!is_local_root(preq->rq_user, preq->rq_host)) {
						sprintf(log_buffer,
							"%s@%s is unauthorized to access hooks data from server %s",
							preq->rq_user, preq->rq_host, server_host);

						reply_text(preq, PBSE_HOOKERROR, log_buffer);
						goto req_manager_exit;
					}
					mgr_hook_export(preq);
					break;
				default:
					req_reject(PBSE_IVALREQ, 0, preq);
					goto req_manager_exit;
			}
			break;

		default: /*batch_request specified an invalid command*/
			req_reject(PBSE_IVALREQ, 0, preq);
			goto req_manager_exit;
	}
req_manager_exit : {
	char hook_msg[HOOK_MSG_SIZE];
	process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
}
	if (--preq->rq_refct == 0) {
		reply_send(preq);
	}
}

/**
 * @brief
 * 		manager_oper_chk - check the @host part of a manager or operator acl
 *		entry to insure it is fully qualified.  This is to prevent
 *		input errors when setting the list.
 *		This is the at_action() routine for the server attributes
 *		"managers" and "operators"
 *
 * @par
 *		An entry without an '@' stops the scan with PBSE_BADHOST.  A host
 *		part starting with '*' is accepted unchecked.  Any other host part
 *		must resolve through get_fullhostname() to itself (compared without
 *		regard to case); if it does not, the entry is only logged when
 *		actmode is ATR_ACTION_RECOV and reported as PBSE_BADHOST otherwise.
 *		When built with PBS_SECURITY == KRB5 no entry is checked.
 *
 * @param[in]	pattr	-     pointer to new attribute value
 * @param[in]	pobject	-     pointer to node
 * @param[in]	actmode	-     action mode
 *
 * @return	error code
 * @retval	0	- success
 * @retval	PBSE_BADHOST	- failure
 */

int
manager_oper_chk(attribute *pattr, void *pobject, int actmode)
{
	char *entry;
	int err = 0;
	char hostname[PBS_MAXHOSTNAME + 1];
	int i;
	struct array_strings *pstr;

	if (actmode == ATR_ACTION_FREE)
		return (0); /* no checking on free */

	if ((pstr = pattr->at_val.at_arst) == NULL)
		return (0);

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	/* with kerberos, we do not check */
	return 0;
#endif

	for (i = 0; i < pstr->as_usedptr; ++i) {
		entry = strchr(pstr->as_string[i], (int) '@');
		if (entry == NULL) {
			err = PBSE_BADHOST;
			break;
		}
		entry++;	     /* point after the '@' */
		if (*entry != '*') { /* if == * cannot check it any more */
			/* if not wild card, must be fully qualified host */
			if (get_fullhostname(entry, hostname, (sizeof(hostname) - 1)) ||
			    strncasecmp(entry, hostname, (sizeof(hostname) - 1))) {
				if (actmode == ATR_ACTION_RECOV) {
					/*
					 * The entry has no length limit of its own, so bound
					 * the write to the fixed-size log_buffer.
					 */
					(void) snprintf(log_buffer, sizeof(log_buffer),
							"bad entry in acl: %s",
							pstr->as_string[i]);
					log_err(PBSE_BADHOST, __func__, log_buffer);
				} else {
					err = PBSE_BADHOST;
				}
			}
		}
	}
	return (err);
}

/**
 * @brief
 * 		node_comment - action routine for the comment attribute of a node.
 *		The routine currently performs no work: none of its arguments is
 *		examined and success is always reported.
 *
 * @param[in]	pattr	-     pointer to new attribute value (unused)
 * @param[in]	pobj    -     pointer to node (unused)
 * @param[in]	act     -     action mode (unused)
 *
 * @return	error code
 * @retval	0	- success
 */
int
node_comment(attribute *pattr, void *pobj, int act)
{
	/* deliberately a no-op; the parameters only satisfy the action-routine signature */
	(void) pattr;
	(void) pobj;
	(void) act;

	return 0;
}

/**
 * @brief
 *		Decide whether provisioning may be enabled on a vnode.
 *
 * @par Functionality:
 *		Action routine for the provision_enable attribute.  When the new
 *		value is set and equal to 1, the vnode's Mom attribute (if set) is
 *		compared with server_host using compare_short_hostname(); a match
 *		means the vnode is the head node and the change is refused.
 *
 * @param[in]	new     -     pointer to new attribute value
 * @param[in]	pobj    -     pointer to node
 * @param[in]	act     -     action mode (not consulted)
 *
 * @return	int
 * @retval	 PBSE_NONE	- success
 * @retval	 PBSE_PROV_HEADERROR	- failure if setting on head node
 *
 * @par Side Effects: None
 *
 * @par MT-safe: No
 *
 */
int
node_prov_enable_action(attribute *new, void *pobj, int act)
{
	struct pbsnode *vnode = (struct pbsnode *) pobj;

	/* only a request that switches provisioning on needs vetting */
	if (!is_attr_set(new) || new->at_val.at_long != 1)
		return PBSE_NONE;

	/* without a Mom attribute there is no host name to compare */
	if (!is_nattr_set(vnode, ND_ATR_Mom))
		return PBSE_NONE;

	if (compare_short_hostname(get_nattr_str(vnode, ND_ATR_Mom), server_host) != 0)
		return PBSE_NONE;

	/* the Mom host matches the server host: this is the head node */
	return PBSE_PROV_HEADERROR;
}

/**
 * @brief
 *		Applies a new max_concurrent_provision value.
 *
 * @par Functionality:
 *		Action routine for the max_concurrent_provision attribute of the
 *		server.  A set value that is zero or negative is refused.  The new
 *		value is stored in max_concurrent_prov; except during recovery the
 *		prov_tracking table is then resized through resize_prov_table().
 *		If the table is about to grow, an immediate work task running
 *		do_provisioning is scheduled before the resize.
 *
 * @see
 *		#prov_tracking in provision.h
 *
 * @param[in]	new     -     pointer to new attribute value
 * @param[in]	pobj    -     pointer to node (not consulted)
 * @param[in]	act     -     action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_BADATVAL : if the value provided <= 0
 * @retval	PBSE_SYSTEM : error returned if resize_prov_table function fails
 *
 * @par Side Effects:
 *		Unknown
 *
 * @par MT-safe: No
 *
 */
int
svr_max_conc_prov_action(attribute *new, void *pobj, int act)
{
	if (is_attr_set(new) && ((int) new->at_val.at_long <= 0))
		return PBSE_BADATVAL;

	max_concurrent_prov = new->at_val.at_long;

	if (act != ATR_ACTION_RECOV) {
		/* pick a few if size is increased */
		if (server.sv_provtracksize < max_concurrent_prov)
			set_task(WORK_Immed, 0, do_provisioning, NULL);

		return resize_prov_table(max_concurrent_prov);
	}

	/* dont resize prov table yet, since we might be recovering */
	return PBSE_NONE;
}

/**
 * @brief
 *		Resizes and reinitializes prov_tracking table.
 *
 * @par Functionality:
 *		This function resizes the provision tracking table to newsize and
 *		zero-fills any slots added by the resize.  If provisioning records
 *		are currently active and newsize is less than the current size then
 *		nothing is changed.  The recorded table size is updated only after
 *		the reallocation has succeeded, so a failure leaves the table and
 *		its recorded size consistent.  On success the table is saved with
 *		prov_track_save(), the max_concurrent_prov server attribute is set
 *		to newsize and the server is saved with svr_save_db().
 *
 * @param[in]	newsize	-	new size of provision table
 *
 * @return	int
 * @retval	PBSE_NONE	: success or no action taken
 * @retval	PBSE_SYSTEM	: failure (negative size or reallocation failure)
 *
 * @par Side Effects:
 *		Unknown
 *
 * @par MT-safe: No
 *
 */
int
resize_prov_table(int newsize)
{
	struct prov_tracking *tmp;
	int i;
	int oldsize = server.sv_provtracksize; /* save  previous size */

	DBPRT(("resize_prov_table: oldsize = %d, newsize=%d\n", oldsize, newsize))

	if (newsize == oldsize)
		return PBSE_NONE;

	/* never shrink while provisioning records are in use */
	if ((server.sv_cur_prov_records != 0) && (newsize < oldsize))
		return PBSE_NONE;

	/* a negative count cannot be turned into an allocation size */
	if (newsize < 0)
		return PBSE_SYSTEM;

	/* though the table is scattered, by the time we resize it, its guaranteed
	 that the table will have all empty slots, so resizing to smaller is also fine */

	/*
	 * Reallocate first and commit afterwards: when realloc fails the old
	 * table is still valid, so the recorded size must stay untouched.
	 */
	tmp = (struct prov_tracking *) realloc(server.sv_prov_track,
					       (size_t) newsize * sizeof(struct prov_tracking));
	if (tmp == NULL)
		return PBSE_SYSTEM;

	server.sv_prov_track = tmp;
	server.sv_provtracksize = newsize;

	/* clear the slots gained by growing (this also zeroes pvtk_mtime) */
	for (i = oldsize; i < newsize; i++)
		memset(&(server.sv_prov_track[i]), 0, sizeof(struct prov_tracking));

	server.sv_provtrackmodifed = 1;
	prov_track_save();
	set_sattr_l_slim(SVR_ATR_max_concurrent_prov, newsize, SET);
	svr_save_db(&server);
	return PBSE_NONE;
}

/**
 * @brief
 *		Allows or disallows setting current_aoe on a vnode.
 *
 * @par Functionality:
 *		Action routine for the current_aoe attribute of a vnode.  Nothing
 *		is checked during recovery.  Otherwise the change is refused when
 *		the vnode's Mom host matches server_host (head node), or when the
 *		vnode state has INUSE_PROV or INUSE_WAIT_PROV set.  Finally the
 *		requested aoe is validated with check_req_aoe_available(); in a
 *		NAS build a failed validation is only logged and the change is
 *		still accepted.
 *
 * @param[in]	new	-	pointer to new attribute value
 * @param[in]	pobj	-	pointer to node
 * @param[in]	act	-	action mode
 *
 * @return	int
 * @retval	PBSE_NONE	: success
 * @retval	PBSE_PROV_HEADERROR	: failure if node is a head node
 * @retval	PBSE_NODEPROV_NOACTION	: failure if node is provisioning/wait-provisioning
 * @retval	PBSE_NODE_BAD_CURRENT_AOE	: failure if aoe is not available on node
 *
 * @par Side Effects:
 *		Unknown
 *
 * @par MT-safe: No
 *
 */
int
node_current_aoe_action(attribute *new, void *pobj, int act)
{
	struct pbsnode *vnode = (struct pbsnode *) pobj;
	int on_head_node;

	if (act == ATR_ACTION_RECOV)
		return PBSE_NONE;

	/* Check user tries to set on Head node */
	on_head_node = is_nattr_set(vnode, ND_ATR_Mom) &&
		       (compare_short_hostname(get_nattr_str(vnode, ND_ATR_Mom), server_host) == 0);
	if (on_head_node)
		return PBSE_PROV_HEADERROR;

	/* Don't set/unset while provisioning */
	if (vnode->nd_state & (INUSE_PROV | INUSE_WAIT_PROV))
		return PBSE_NODEPROV_NOACTION;

	/* check if value being set is available in resources_available.aoe */
#ifdef NAS /* localmod 148 */
	if (check_req_aoe_available(vnode, new->at_val.at_str) != 0) {
		if (vnode->nd_name == NULL) {
			sprintf(log_buffer, "unknown node would have received PBSE_NODE_BAD_CURRENT_AOE");
		} else {
			sprintf(log_buffer, "node \"%s\" would have received PBSE_NODE_BAD_CURRENT_AOE, but localmod 148 avoided this", vnode->nd_name);
		}

		log_err(-1, "node_current_aoe_action", log_buffer);
	}
#else
	if (check_req_aoe_available(vnode, new->at_val.at_str) != 0)
		return PBSE_NODE_BAD_CURRENT_AOE;
#endif /* localmod 148 */

	return PBSE_NONE;
}
