/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 *
 * @brief
 * contains functions to initialize several pbs data structures.
 *
 */
#include <pbs_config.h> /* the master config generated by configure */

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <memory.h>
#include <signal.h>
#include <time.h>
#include <sys/stat.h>
#include <libutil.h>

#include <dirent.h>
#include <grp.h>
#include <netdb.h>
#include <pwd.h>
#include <unistd.h>
#include <sys/param.h>
#include <sys/resource.h>
#include <sys/time.h>

#include "libpbs.h"
#include "pbs_ifl.h"
#include "net_connect.h"
#include "log.h"
#include "list_link.h"
#include "attribute.h"
#include "server_limits.h"
#include "server.h"
#include "credential.h"
#include "ticket.h"
#include "batch_request.h"
#include "work_task.h"
#include "resv_node.h"
#include "job.h"
#include "queue.h"
#include "reservation.h"
#include "pbs_db.h"
#include "pbs_nodes.h"
#include "tracking.h"
#include "provision.h"
#include "pbs_idx.h"
#include "svrfunc.h"
#include "acct.h"
#include "pbs_version.h"
#include "tpp.h"
#include "pbs_license.h"
#include "resource.h"
#include "pbs_python.h"
#include "hook.h"
#include "hook_func.h"
#include "pbs_share.h"
#include "liblicense.h"

#ifndef SIGKILL
/* there is some weird stuff in gcc include files signal.h & sys/params.h */
#include <signal.h>
#endif

/* global Data Items */

/*
 * Message text pointers defined in another translation unit.  The ones
 * referenced in pbsd_init() are handed to the logging routines, either
 * directly or as a format for log_buffer.
 */
extern char *msg_startup3;
extern char *msg_daemonname;
extern char *msg_init_abt;
extern char *msg_init_queued;
extern char *msg_init_substate;
extern char *msg_err_noqueue;
extern char *msg_err_noqueue1;
extern char *msg_init_resvNOq;
extern char *msg_init_recovque;
extern char *msg_init_recovresv;
extern char *msg_init_expctq;
extern char *msg_init_nojobs;
extern char *msg_init_exptjobs;
extern char *msg_init_unkstate;
extern char *msg_init_baddb;
extern char *msg_init_chdir;
extern char *msg_unkresc;
extern char *msg_corelimit;

/* File names, directory paths and other server-wide state defined elsewhere */
extern char *acct_file;
extern int ext_license_server;
extern char *log_file;
extern char *path_acct;
extern char *path_usedlicenses;
extern char path_log[];
extern char *path_priv;
extern char *path_jobs;
extern char *path_users;
extern char *path_spool;
extern char *path_track;
extern char *path_prov_track;
extern long new_log_event_mask;
extern char server_name[];
extern pbs_list_head svr_newjobs;
extern pbs_list_head svr_allresvs;
extern time_t time_now;

/* The server object itself plus hook/provisioning/database globals */
extern struct server server;
extern struct attribute attr_jobscript_max_size;
extern char *path_hooks;
extern char *path_hooks_workdir;
extern pbs_list_head prov_allvnodes;
extern int max_concurrent_prov;
extern void *svr_db_conn;

extern pbs_list_head svr_allhooks;

/* External Functions Called */

extern void on_job_exit(struct work_task *);
extern void on_job_rerun(struct work_task *);
extern int resize_prov_table(int newsize);
extern void offline_all_provisioning_vnodes(void);
extern void stop_db();
extern job *job_recov_db_spl(pbs_db_job_info_t *dbjob, job *pjob);
extern pbs_sched *sched_alloc(char *sched_name);
/* recov_*_cb are passed to pbs_db_search() as the per-row callback in pbsd_init() */
extern job *recov_job_cb(pbs_db_obj_info_t *, int *);
extern resc_resv *recov_resv_cb(pbs_db_obj_info_t *, int *);
extern pbs_queue *recov_queue_cb(pbs_db_obj_info_t *, int *);
extern pbs_sched *recov_sched_cb(pbs_db_obj_info_t *, int *);
extern void revert_alter_reservation(resc_resv *presv);
extern void log_licenses(pbs_licenses_high_use *pu);
/* Private functions in this file */

/* catch_child, change_logs and stop_me are installed as signal handlers in pbsd_init() */
static void catch_child(int);
static void init_abt_job(job *);
static void change_logs(int);
/* NOTE(review): listed with the private functions but not declared static */
int chk_save_file(char *filename);
static void need_y_response(int, char *);
static int pbsd_init_reque(job *job, int change_state);
static void resume_net_move(struct work_task *);
static void stop_me(int);
static int Rmv_if_resv_not_possible(job *);
static int attach_queue_to_reservation(resc_resv *);
/* scheduled by pbsd_init() as a timed work task when local licenses are present */
static void call_log_license(struct work_task *);
/* private data */

/* presumably the values for pbsd_init_reque()'s change_state argument -- TODO confirm */
#define CHANGE_STATE 1
#define KEEP_STATE 0
/* printed and logged by pbsd_init() when no license location and no local licenses exist */
static char badlicense[] = "One or more PBS license keys are invalid, jobs may not run";
/* non-static: visible to other translation units; NULL until set elsewhere */
char *pbs_licensing_location = NULL;
/**
 * @brief
 *		Initializes the server attribute array with default values which are
 * 		necessary for recovery and action routines to work properly.
 *
 * @return	void
 */
void
init_server_attrs()
{
	resource_def *prdef = NULL;
	resource *presc = NULL;
	int i = 0;

	for (i = 0; i < SVR_ATR_LAST; i++)
		clear_attr(get_sattr(i), &svr_attr_def[i]);

	set_sattr_str_slim(SVR_ATR_scheduler_iteration, TOSTR(PBS_SCHEDULE_CYCLE), NULL);

	server.newobj = 1;

	set_sattr_l_slim(SVR_ATR_State, SV_STATE_INIT, SET);

	set_sattr_l_slim(SVR_ATR_ResvEnable, 1, SET);
	(get_sattr(SVR_ATR_ResvEnable))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_str_slim(SVR_ATR_SvrHost, server_host, NULL);
	(get_sattr(SVR_ATR_SvrHost))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_l_slim(SVR_ATR_NodeFailReq, PBS_NODE_FAIL_REQUEUE_DEFAULT, SET);
	(get_sattr(SVR_ATR_NodeFailReq))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_l_slim(SVR_ATR_maxarraysize, PBS_MAX_ARRAY_JOB_DFL, SET);
	(get_sattr(SVR_ATR_maxarraysize))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_l_slim(SVR_ATR_license_min, PBS_MIN_LICENSING_LICENSES, SET);
	(get_sattr(SVR_ATR_license_min))->at_flags |= ATR_VFLAG_DEFLT;
	licensing_control.licenses_min = PBS_MIN_LICENSING_LICENSES;

	set_sattr_l_slim(SVR_ATR_license_max, PBS_MAX_LICENSING_LICENSES, SET);
	(get_sattr(SVR_ATR_license_max))->at_flags |= ATR_VFLAG_DEFLT;
	licensing_control.licenses_max = PBS_MAX_LICENSING_LICENSES;

	set_sattr_l_slim(SVR_ATR_license_linger, PBS_LIC_LINGER_TIME, SET);
	(get_sattr(SVR_ATR_license_linger))->at_flags |= ATR_VFLAG_DEFLT;
	licensing_control.licenses_linger_time = PBS_LIC_LINGER_TIME;

	set_sattr_l_slim(SVR_ATR_EligibleTimeEnable, 0, SET);
	(get_sattr(SVR_ATR_EligibleTimeEnable))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_l_slim(SVR_ATR_max_concurrent_prov, PBS_MAX_CONCURRENT_PROV, SET);
	(get_sattr(SVR_ATR_max_concurrent_prov))->at_flags |= ATR_VFLAG_DEFLT;

	set_sattr_str_slim(SVR_ATR_max_job_sequence_id, TOSTR(SVR_MAX_JOB_SEQ_NUM_DEFAULT), NULL);
	(get_sattr(SVR_ATR_max_job_sequence_id))->at_flags |= ATR_VFLAG_DEFLT;

	set_attr_generic(&attr_jobscript_max_size, &svr_attr_def[SVR_ATR_jobscript_max_size], DFLT_JOBSCRIPT_MAX_SIZE, NULL, INTERNAL);
	attr_jobscript_max_size.at_type = ATR_TYPE_SIZE; /* needed by get_bytes_from_attr */

	set_sattr_l_slim(SVR_ATR_has_runjob_hook, 0, SET);
	set_sattr_l_slim(SVR_ATR_log_events, SVR_LOG_DFLT, SET);
	*log_event_mask = get_sattr_long(SVR_ATR_log_events);
	set_sattr_str_slim(SVR_ATR_mailer, SENDMAIL_CMD, NULL);
	set_sattr_str_slim(SVR_ATR_mailfrom, PBS_DEFAULT_MAIL, NULL);
	set_sattr_l_slim(SVR_ATR_query_others, 1, SET);
	set_sattr_l_slim(SVR_ATR_scheduling, 1, SET);

	prdef = &svr_resc_def[RESC_NCPUS];
	if (prdef) {
		attribute *pattr = get_sattr(SVR_ATR_DefaultChunk);
		presc = add_resource_entry(pattr, prdef);
		if (presc) {
			presc->rs_value.at_val.at_long = 1;
			presc->rs_value.at_flags = ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
			pattr->at_flags = ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
			(void) deflt_chunk_action(pattr, (void *) &server, ATR_ACTION_NEW);
		}
		pattr = get_sattr(SVR_ATR_resource_deflt);
		presc = add_resource_entry(pattr, prdef);
		if (presc) {
			presc->rs_value.at_val.at_long = 1;
			presc->rs_value.at_flags = ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
			pattr->at_flags = ATR_VFLAG_DEFLT | ATR_SET_MOD_MCACHE;
		}
	}
}

/**
 * @brief
 *		Initialize the PBS Batch Server.  Called once when the server is
 *		brought up.
 *
 * @par
 *		In order, the routine: builds the attribute and resource definition
 *		search indexes; restricts supplementary groups and raises resource
 *		limits; installs signal handlers; runs chk_file_sec() on the server
 *		directories; installs the default server attributes; recovers (or,
 *		for RECOV_CREATE, writes) the server and scheduler database records;
 *		reads the license usage file; opens the accounting file; recovers
 *		queues, nodes, reservations and jobs from the database; recovers the
 *		hooks found in path_hooks; and loads the job tracking and
 *		provisioning tracking files.
 *
 * @param[in]	type	- The type of initialization
 *							RECOV_CREATE - reinitializes all serverdb data
 *
 * @return	Error code
 * @retval	0	- Success
 * @retval	2	- a signal handler could not be installed
 * @retval	3	- a chk_file_sec() check failed, or chdir to path_priv failed
 * @retval	-1	- any other failure
 *
 */
int
pbsd_init(int type)
{
	int a_opt = -1;
	int baselen;
	struct dirent *pdirent;
	DIR *dir;
	int fd;
	int i = 0;
	char zone_dir[MAXPATHLEN];
	char *hook_suffix = HOOK_FILE_SUFFIX;
	int hook_suf_len = strlen(hook_suffix);
	hook *phook, *phook_current;
	char *psuffix;
	int rc;
	struct stat statbuf;
	char hook_msg[HOOK_MSG_SIZE];
	char *conn_db_err = NULL;
	struct sigaction act;
	struct sigaction oact;

	struct tm *ptm;
	pbs_db_job_info_t dbjob = {{0}};
	pbs_db_resv_info_t dbresv = {{0}};
	pbs_db_que_info_t dbque = {{0}};
	pbs_db_sched_info_t dbsched = {{0}};
	pbs_db_obj_info_t obj = {0};
	void *conn = (void *) svr_db_conn;
	char *buf = NULL;
	int buf_len = 0;

#ifdef RLIMIT_CORE
	int char_in_cname = 0;
#endif /* RLIMIT_CORE */

	if ((job_attr_idx = cr_attrdef_idx(job_attr_def, JOB_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating job attribute search index");
		return (-1);
	}
	if ((node_attr_idx = cr_attrdef_idx(node_attr_def, ND_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating node attribute search index");
		return (-1);
	}
	if ((que_attr_idx = cr_attrdef_idx(que_attr_def, QA_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating queue attribute search index");
		return (-1);
	}
	if ((svr_attr_idx = cr_attrdef_idx(svr_attr_def, SVR_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating server attribute search index");
		return (-1);
	}
	if ((sched_attr_idx = cr_attrdef_idx(sched_attr_def, SCHED_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating sched attribute search index");
		return (-1);
	}
	if ((resv_attr_idx = cr_attrdef_idx(resv_attr_def, RESV_ATR_LAST)) == NULL) {
		log_err(errno, __func__, "Failed creating resv attribute search index");
		return (-1);
	}
	if (cr_rescdef_idx(svr_resc_def, svr_resc_size) != 0) {
		log_err(errno, __func__, "Failed creating resc definition search index");
		return (-1);
	}

	/* initialize the pointers in the resource_def array */

	for (i = 0; i < (svr_resc_size - 1); ++i)
		svr_resc_def[i].rs_next = &svr_resc_def[i + 1];
	/* last entry is left with null pointer */

	/* The following is code to reduce security risks                */

	log_supported_auth_methods(pbs_conf.supported_auth_methods);

	i = getgid();
	(void) setgroups(1, (gid_t *) &i); /* secure suppl. groups */

#ifdef RLIMIT_CORE
	if (pbs_conf.pbs_core_limit) {
		char *pc = pbs_conf.pbs_core_limit;
		while (*pc != '\0') {
			/* cast: passing a negative char to isdigit() is undefined */
			if (!isdigit((unsigned char) *pc)) {
				/* there is a character in core limit */
				char_in_cname = 1;
				break;
			}
			pc++;
		}
	}
#endif /* RLIMIT_CORE */

	{
		struct rlimit rlimit;

		rlimit.rlim_cur = RLIM_INFINITY;
		rlimit.rlim_max = RLIM_INFINITY;

		(void) setrlimit(RLIMIT_CPU, &rlimit);
		(void) setrlimit(RLIMIT_FSIZE, &rlimit);
		(void) setrlimit(RLIMIT_DATA, &rlimit);
		(void) setrlimit(RLIMIT_STACK, &rlimit);
#ifdef RLIMIT_RSS
		(void) setrlimit(RLIMIT_RSS, &rlimit);
#endif /* RLIMIT_RSS */
#ifdef RLIMIT_VMEM
		(void) setrlimit(RLIMIT_VMEM, &rlimit);
#endif /* RLIMIT_VMEM */
#ifdef RLIMIT_CORE
		if (pbs_conf.pbs_core_limit) {
			struct rlimit corelimit;
			corelimit.rlim_max = RLIM_INFINITY;
			if (strcmp("unlimited", pbs_conf.pbs_core_limit) == 0)
				corelimit.rlim_cur = RLIM_INFINITY;
			else if (char_in_cname == 1) {
				/* not a number and not "unlimited": warn and use no limit */
				log_record(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_WARNING,
					   __func__, msg_corelimit);
				corelimit.rlim_cur = RLIM_INFINITY;
			} else
				corelimit.rlim_cur =
					(rlim_t) atol(pbs_conf.pbs_core_limit);
			(void) setrlimit(RLIMIT_CORE, &corelimit);
		}
#endif /* RLIMIT_CORE */
	}

	/* 1. set up to catch or ignore various signals */
	sigemptyset(&act.sa_mask);
	act.sa_flags = 0;
	act.sa_handler = change_logs;
	if (sigaction(SIGHUP, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for HUP");
		return (2);
	}
	act.sa_handler = stop_me;
	if (sigaction(SIGINT, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for INT");
		return (2);
	}
	if (sigaction(SIGTERM, &act, &oact) != 0) {
		log_err(errno, __func__, "sigactin for TERM");
		return (2);
	}
#ifdef NDEBUG
	if (sigaction(SIGQUIT, &act, &oact) != 0) {
		log_err(errno, __func__, "sigactin for QUIT");
		return (2);
	}
#endif /* NDEBUG */
#ifdef SIGSHUTDN
	if (sigaction(SIGSHUTDN, &act, &oact) != 0) {
		log_err(errno, __func__, "sigactin for SHUTDN");
		return (2);
	}
#endif /* SIGSHUTDN */

	act.sa_handler = catch_child;
	if (sigaction(SIGCHLD, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for CHLD");
		return (2);
	}

	act.sa_handler = SIG_IGN;
	if (sigaction(SIGPIPE, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for PIPE");
		return (2);
	}
	if (sigaction(SIGUSR1, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for USR1");
		return (2);
	}
	if (sigaction(SIGUSR2, &act, &oact) != 0) {
		log_err(errno, __func__, "sigaction for USR2");
		return (2);
	}

	/* 2. check security and set up various global variables we need */

#if !defined(DEBUG) && !defined(NO_SECURITY_CHECK)
	rc = chk_file_sec(path_jobs, 1, 0, S_IWGRP | S_IWOTH, 1);
	if (stat(path_users, &statbuf) != 0)
		(void) mkdir(path_users, 0750);
	rc |= chk_file_sec(path_users, 1, 0, S_IWGRP | S_IWOTH, 1);
	rc |= chk_file_sec(path_hooks, 1, 0, S_IWGRP | S_IWOTH, 0);
	rc |= chk_file_sec(path_hooks_workdir, 1, 0, S_IWGRP | S_IWOTH, 0);
	rc |= chk_file_sec(path_spool, 1, 1, 0, 0);
	rc |= chk_file_sec(path_acct, 1, 1, S_IWGRP | S_IWOTH, 0);
	rc |= chk_file_sec(pbs_conf.pbs_environment, 0, 0, S_IWGRP | S_IWOTH, 1);

	if (rc) {
		log_err(-1, __func__, "chk_file_sec has a failure");
		return (3);
	}
#endif /* not DEBUG and not NO_SECURITY_CHECK */

	time_now = time(NULL);

	rc = setup_resc(1);
	if (rc != 0) {
		/* log_buffer set in setup_resc */
		log_err(-1, __func__, log_buffer);
		/* return value of -1 means a fatal error, -2 means errors
		 * were "auto-corrected" */
		if (rc == -1)
			return (-1);
	}

	/* 3. Set default server attributes values */
	memset(&server, 0, sizeof(server));
	server.sv_started = time(&time_now); /* time server started */
	if (is_sattr_set(SVR_ATR_scheduling))
		a_opt = get_sattr_long(SVR_ATR_scheduling);

	init_server_attrs();

	/* 5. If not a "create" initialization, recover server db */
	/*    and sched db					  */
	rc = svr_recov_db();
	if ((rc != 0) && (type != RECOV_CREATE)) {
		pbs_db_get_errmsg(PBS_DB_ERR, &conn_db_err);
		if (conn_db_err != NULL) {
			log_errf(-1, __func__, "%s", conn_db_err);
			free(conn_db_err);
		}
		need_y_response(type, "no server database exists");
		type = RECOV_CREATE;
	}
	if (type != RECOV_CREATE) {
		/* Server read successful? */

		if (rc != 0) {
			/* never hand a non-literal string to a printf-style logger as the format */
			log_errf(rc, __func__, "%s", msg_init_baddb);
			return (-1);
		}

		if (is_sattr_set(SVR_ATR_resource_assn))
			free_sattr(SVR_ATR_resource_assn);

		if (new_log_event_mask) {
			/* set to what was given on command line -e option */
			set_sattr_l_slim(SVR_ATR_log_events, new_log_event_mask, SET);
		}
		*log_event_mask = get_sattr_long(SVR_ATR_log_events);

		/* if server comment is a default, clear it */
		/* it will be reset as needed               */
		if (((get_sattr(SVR_ATR_Comment))->at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == (ATR_VFLAG_SET | ATR_VFLAG_DEFLT))
			free_sattr(SVR_ATR_Comment);

		/* now do sched db */

		obj.pbs_db_obj_type = PBS_DB_SCHED;
		obj.pbs_db_un.pbs_db_sched = &dbsched;

		rc = pbs_db_search(conn, &obj, NULL, (query_cb_t) &recov_sched_cb);
		if (rc == -1) {
			pbs_db_get_errmsg(PBS_DB_ERR, &conn_db_err);
			if (conn_db_err != NULL) {
				log_errf(-1, __func__, "%s", conn_db_err);
				free(conn_db_err);
			}
			return (-1);
		}

		/* no default scheduler came back from the db: create and save one */
		if (!dflt_scheduler) {
			dflt_scheduler = sched_alloc(PBS_DFLT_SCHED_NAME);
			set_sched_default(dflt_scheduler, 0);
			sched_save_db(dflt_scheduler);
		}

		if (get_sattr_long(SVR_ATR_scheduling))
			set_scheduler_flag(SCH_SCHEDULE_ETE_ON, NULL);
	} else {	     /* init type is "create" */
		if (rc == 0) /* server was loaded */
			need_y_response(type, "server database exists");

		svr_save_db(&server);

		dflt_scheduler = sched_alloc(PBS_DFLT_SCHED_NAME);
		set_sched_default(dflt_scheduler, 0);
		sched_save_db(dflt_scheduler);
	}

	/* 4. Check License information */

	reset_license_counters(&license_counts);

	fd = open(path_usedlicenses, O_RDONLY, 0400);

	/* file missing or short: start the high-use record from scratch */
	if ((fd == -1) ||
	    (read(fd, &(license_counts.licenses_high_use), sizeof(pbs_licenses_high_use)) !=
	     sizeof(pbs_licenses_high_use))) {
		license_counts.licenses_high_use.lu_max_hr = 0;
		license_counts.licenses_high_use.lu_max_day = 0;
		license_counts.licenses_high_use.lu_max_month = 0;
		license_counts.licenses_high_use.lu_max_forever = 0;
		ptm = localtime(&time_now);
		license_counts.licenses_high_use.lu_day = ptm->tm_mday;
		license_counts.licenses_high_use.lu_month = ptm->tm_mon;
	}
	if (fd != -1)
		close(fd);

	set_sattr_str_slim(SVR_ATR_version, PBS_VERSION, NULL);

	if ((pbs_licensing_location == NULL) && (license_counts.licenses_local == 0)) {
		printf("%s\n", badlicense);
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_ALERT,
			  msg_daemonname, badlicense);
	}

	if (pbs_licensing_location) {
		sprintf(log_buffer, "Using license server at %s",
			PBS_LICENSE_LOCATION);
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
			  msg_daemonname, log_buffer);
		printf("%s\n", log_buffer);
	}
	if (license_counts.licenses_local > 0) {
		sprintf(log_buffer,
			"Licenses valid for %ld Floating hosts",
			license_counts.licenses_local);
		log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
			  msg_daemonname, log_buffer);
		printf("%s\n", log_buffer);
	}

	/* start a timed-event every hour to log the number of floating used */
	if ((license_counts.licenses_local > 0))
		(void) set_task(WORK_Timed, (long) (((time_now + 3600) / 3600) * 3600),
				call_log_license, 0);

	/* 6. open accounting file */

	if (acct_open(acct_file) != 0) {
		log_errf(-1, __func__, "Could not open accounting file");
		return (-1);
	}

	/* 7. Set up other server and global variables */

	if (a_opt != -1) {
		/* a_option was set, overrides saved value of scheduling attr */
		set_sattr_l_slim(SVR_ATR_scheduling, a_opt, SET);
	}

	/*
	 * 8A. If not a "create" initialization, recover queues.
	 *    If a create, remove any queues that might be there.
	 */
	if ((queues_idx = pbs_idx_create(0, 0)) == NULL) {
		log_err(-1, __func__, "Creating queue index failed!");
		return (-1);
	}

	server.sv_qs.sv_numque = 0;

	/* get queues from DB */
	obj.pbs_db_obj_type = PBS_DB_QUEUE;
	obj.pbs_db_un.pbs_db_que = &dbque;

	rc = pbs_db_search(conn, &obj, NULL, (query_cb_t) &recov_queue_cb);
	if (rc == -1) {
		pbs_db_get_errmsg(PBS_DB_ERR, &conn_db_err);
		if (conn_db_err != NULL) {
			log_errf(-1, __func__, "%s", conn_db_err);
			free(conn_db_err);
		}
		return (-1);
	}

	/* Open and read in node list if one exists */
	if ((rc = setup_nodes()) == -1) {
		/* log_buffer set in setup_nodes; log it as data, not as a format */
		log_errf(-1, __func__, "%s", log_buffer);
		return (-1);
	}
	mark_which_queues_have_nodes();

	/* at this point, we know all the resource types have been defined,        */
	/* build the resource summation table for validating the Select directives */
	update_resc_sum();

	/*
	 * 8B. If not a "create" initialization, recover reservations.
	 */
	/* set the zoneinfo directory to $PBS_EXEC/zoneinfo.
	 * This is used for standing reservations user of libical */
	sprintf(zone_dir, "%s%s", pbs_conf.pbs_exec_path, ICAL_ZONEINFO_DIR);
	set_ical_zoneinfo(zone_dir);

	/* load reservations */
	if ((resvs_idx = pbs_idx_create(0, 0)) == NULL) {
		log_err(-1, __func__, "Creating reservations index failed!");
		return (-1);
	}
	obj.pbs_db_obj_type = PBS_DB_RESV;
	obj.pbs_db_un.pbs_db_resv = &dbresv;

	rc = pbs_db_search(conn, &obj, NULL, (query_cb_t) &recov_resv_cb);
	if (rc == -1) {
		pbs_db_get_errmsg(PBS_DB_ERR, &conn_db_err);
		if (conn_db_err != NULL) {
			log_errf(-1, __func__, "%s", conn_db_err);
			free(conn_db_err);
		}
		return (-1);
	}

	/*
	 * 9. If not "create" or "clean" recovery, recover the jobs.
	 *    If a create or clean recovery, delete any jobs.
	 *    Before job creation/recovery, create the jobs index.
	 */
	if ((jobs_idx = pbs_idx_create(0, 0)) == NULL) {
		log_err(-1, __func__, "Creating jobs index failed!");
		return (-1);
	}

	server.sv_qs.sv_numjobs = 0;

	/* get jobs from DB */
	obj.pbs_db_obj_type = PBS_DB_JOB;
	obj.pbs_db_un.pbs_db_job = &dbjob;
	rc = pbs_db_search(conn, &obj, NULL, (query_cb_t) &recov_job_cb);
	if (rc == -1) {
		pbs_db_get_errmsg(PBS_DB_ERR, &conn_db_err);
		if (conn_db_err != NULL) {
			log_errf(-1, __func__, "%s", conn_db_err);
			free(conn_db_err);
		}
		return (-1);
	} else if (rc == 1) {
		if ((type != RECOV_CREATE) && (type != RECOV_COLD))
			log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER,
				  LOG_DEBUG, msg_daemonname, msg_init_nojobs);
	}

	log_eventf(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, LOG_NOTICE, msg_daemonname, msg_init_exptjobs, server.sv_qs.sv_numjobs);

	/* Now, cause any reservations marked RESV_FINISHED to be
	 * removed and place "begin" and "end" tasks onto the
	 * "work_task_timed" list, as appropriate, for those that
	 * remain
	 */

	remove_deleted_resvs();
	degrade_corrupted_confirmed_resvs();
	add_resv_beginEnd_tasks();

	resv_timer_init();

	/* Put us back in the Server's Private directory */

	if (chdir(path_priv) != 0) {
		(void) sprintf(log_buffer, msg_init_chdir, path_priv);
		log_err(-1, __func__, log_buffer);
		return (3);
	}

	/*
	 * 10. Recover the hooks.
	 *
	 */

	if (chdir(path_hooks) != 0) {
		(void) sprintf(log_buffer, msg_init_chdir, path_hooks);
		log_err(errno, __func__, log_buffer);
		return (-1);
	}

	dir = opendir(".");
	if (dir == NULL) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER,
			  LOG_DEBUG, msg_daemonname,
			  "Could not open hooks dir");
	} else {
		/* Now, for each hook found ... */

		/* errno is cleared before each readdir() so the check after the
		 * loop can tell end-of-directory from a read failure */
		while (errno = 0,
		       (pdirent = readdir(dir)) != NULL) {

			if (chk_save_file(pdirent->d_name) != 0) {
				continue;
			}

			/* recover the hooks */

			baselen = (int) strlen(pdirent->d_name) - hook_suf_len;
			if (baselen < 0) {
				/* name is shorter than the suffix: cannot be a hook
				 * file, and the suffix pointer would fall before
				 * the start of d_name */
				continue;
			}
			psuffix = pdirent->d_name + baselen;
			if (strcmp(psuffix, hook_suffix)) {
				continue;
			}

			if ((phook =
				     hook_recov(pdirent->d_name, NULL, hook_msg,
						sizeof(hook_msg),
						pbs_python_ext_alloc_python_script,
						pbs_python_ext_free_python_script)) == NULL) {
				sprintf(log_buffer,
					"hook_recov(%s): can't recover - %s",
					pdirent->d_name, hook_msg);
				log_event(PBSEVENT_SYSTEM,
					  PBS_EVENTCLASS_SERVER, LOG_NOTICE,
					  msg_daemonname, log_buffer);
			} else {
				sprintf(log_buffer, "Found hook %s type=%s",
					phook->hook_name,
					((phook->type == HOOK_SITE) ? "site" : "pbs"));
				log_event(PBSEVENT_SYSTEM | PBSEVENT_ADMIN |
						  PBSEVENT_DEBUG,
					  PBS_EVENTCLASS_SERVER,
					  LOG_INFO, msg_daemonname, log_buffer);
				if (phook->event & MOM_EVENTS)
					mark_mom_hooks_seen();
			}
		}

		if (errno != 0 && errno != ENOENT)
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER,
				  LOG_DEBUG, msg_daemonname,
				  "Could not read hooks dir");
		(void) closedir(dir);
	}
	print_hooks(0);
	print_hooks(HOOK_EVENT_QUEUEJOB);
	print_hooks(HOOK_EVENT_POSTQUEUEJOB);
	print_hooks(HOOK_EVENT_MODIFYJOB);
	print_hooks(HOOK_EVENT_RESVSUB);
	print_hooks(HOOK_EVENT_MODIFYRESV);
	print_hooks(HOOK_EVENT_MOVEJOB);
	print_hooks(HOOK_EVENT_RUNJOB);
	print_hooks(HOOK_EVENT_JOBOBIT);
	print_hooks(HOOK_EVENT_MANAGEMENT);
	print_hooks(HOOK_EVENT_MODIFYVNODE);
	print_hooks(HOOK_EVENT_PROVISION);
	print_hooks(HOOK_EVENT_PERIODIC);
	print_hooks(HOOK_EVENT_RESV_CONFIRM);
	print_hooks(HOOK_EVENT_RESV_BEGIN);
	print_hooks(HOOK_EVENT_RESV_END);

	/*
	 * cleanup  the hooks work directory
	 */

	cleanup_hooks_workdir(0);

	/* Put us back in the Server's Private directory */

	if (chdir(path_priv) != 0) {
		(void) sprintf(log_buffer, msg_init_chdir, path_priv);
		log_err(-1, __func__, log_buffer);
		return (3);
	}

	/* 11. Open and read in tracking records */

	fd = open(path_track, O_RDONLY | O_CREAT, 0600);
	if (fd < 0) {
		log_err(errno, __func__, "unable to open tracking file");
		return (-1);
	}
#if !defined(DEBUG) && !defined(NO_SECURITY_CHECK)
	if (chk_file_sec(path_track, 0, 0, S_IWGRP | S_IWOTH, 0) != 0) {
		(void) close(fd);
		return (-1);
	}
#endif /* not DEBUG and not NO_SECURITY_CHECK */

	if (fstat(fd, &statbuf) < 0) {
		log_err(errno, "pbs_init", "unable to stat tracking file");
		(void) close(fd);
		return (-1);
	} else {

		size_t amt;
		ssize_t rd; /* signed: read() returns -1 on error */
		char *w;

		/* validate the size of the file, it should be a multiple */
		/* of the tracking structure size                         */

		i = statbuf.st_size / sizeof(struct tracking);
		amt = i * sizeof(struct tracking);

		if (amt != statbuf.st_size) {
			log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER,
				  LOG_ALERT, msg_daemonname,
				  "tracking file has invalid length");
		}
		if (i < PBS_TRACK_MINSIZE)
			server.sv_tracksize = PBS_TRACK_MINSIZE;
		else
			server.sv_tracksize = i;
		server.sv_track = (struct tracking *) calloc(server.sv_tracksize,
							     sizeof(struct tracking));
		if (server.sv_track == NULL) {
			log_err(errno, "init", "out of memory");
			(void) close(fd);
			return -1;
		}
		for (i = 0; i < server.sv_tracksize; i++)
			(server.sv_track + i)->tk_mtime = 0;

		w = (char *) server.sv_track;

		/* read in the file (a multiple of the struct size) */

		while (amt > 0) {
			rd = read(fd, w, amt);
			if ((rd == -1) && (errno == EINTR)) {
				continue;
			} else if (rd <= 0) {
				/* EOF or hard error: keep what was read so far */
				break;
			}
			amt -= rd;
			w += rd;
		}
		(void) close(fd);
		server.sv_trackmodifed = 0;
	}

	/* set work task to periodically save the tracking records */

	(void) set_task(WORK_Timed, (long) (time_now + PBS_SAVE_TRACK_TM),
			track_save, 0);

	fd = open(path_prov_track, O_RDONLY | O_CREAT, 0600);
	if (fd < 0) {
		log_err(errno, __func__, "unable to open prov_tracking file");
		return (-1);
	}
#if !defined(DEBUG) && !defined(NO_SECURITY_CHECK)
	if (chk_file_sec(path_prov_track, 0, 0, S_IWGRP | S_IWOTH, 0) != 0) {
		(void) close(fd);
		return (-1);
	}
#endif /* not DEBUG and not NO_SECURITY_CHECK */

	if (fstat(fd, &statbuf) < 0) {
		log_err(errno, "pbs_init", "unable to stat prov_tracking file");
		(void) close(fd);
		return (-1);
	} else {
		size_t amt;
		ssize_t rd;	   /* signed: read() returns -1 on error */
		char *p, *buffer;  /* to hold entire file data */
		int ctrl_flag = 0; /* we always write pvtk_mtime first */
		char *token;
		long mtime;
		i = 0;

		/* whats the size of data in file */
		amt = statbuf.st_size;

		server.sv_provtracksize = get_sattr_long(SVR_ATR_max_concurrent_prov);
		DBPRT(("%s: server.sv_provtracksize=%d amt=%ld\n", __func__, server.sv_provtracksize, (long) amt))

		p = malloc(amt + 1);
		if (p == NULL) {
			log_err(errno, "pbs_init", "unable to malloc");
			close(fd);
			return (-1);
		}
		buffer = p;

		/* read entire file into buffer */
		while (amt > 0) {
			rd = read(fd, p, amt);
			if ((rd == -1) && (errno == EINTR)) {
				continue;
			} else if (rd <= 0) {
				/* EOF or hard error: parse what was read so far */
				break;
			}
			amt -= rd;
			p += rd;
		}
		(void) close(fd);
		/* terminate at the end of what was actually read; on a short
		 * read this stops strtok() from scanning uninitialized bytes */
		*p = '\0';
		buffer[statbuf.st_size] = '\0';

		server.sv_prov_track = (struct prov_tracking *) calloc(server.sv_provtracksize,
								       sizeof(struct prov_tracking));
		if (server.sv_prov_track == NULL) {
			free(buffer);
			log_err(errno, "pbs_init", "unable to calloc");
			return (-1);
		}

		for (i = 0; i < server.sv_provtracksize; i++) {
			server.sv_prov_track[i].pvtk_mtime = 0;
			server.sv_prov_track[i].pvtk_pid = -1;
			server.sv_prov_track[i].pvtk_vnode = NULL;
			server.sv_prov_track[i].pvtk_aoe_req = NULL;
			server.sv_prov_track[i].prov_vnode_info = NULL;
		}

		/*
		 * start tokenizing by '|'; each record is three tokens in the
		 * order mtime, vnode, aoe_req, and ctrl_flag says which of the
		 * three the current token is
		 */
		i = 0;
		token = strtok(buffer, "|");
		while (token != NULL && i < server.sv_provtracksize) {
			switch (ctrl_flag) {
				case 0:
					errno = 0;
					mtime = strtol(token, NULL, 10);
					if (errno) {
						free(buffer);
						free(server.sv_prov_track);
						server.sv_prov_track = NULL;
						log_err(errno, "pbs_init",
							"bad data in prov_tracking");
						return (-1);
					}
					server.sv_prov_track[i].pvtk_mtime = mtime;
					++ctrl_flag;
					break;
				case 1:
					/* after first save, 0 is written if */
					/* value is null. If reading 0, then */
					/* pvtk_vnode should be null else it */
					/* becomes "0" */
					if (strcmp(token, "0") != 0) {
						server.sv_prov_track[i].pvtk_vnode =
							(char *) malloc(strlen(token) + 1);
						if (server.sv_prov_track[i].pvtk_vnode == NULL) {
							free(buffer);
							free(server.sv_prov_track);
							server.sv_prov_track = NULL;
							log_err(errno, "pbs_init",
								"unable to malloc");
							return (-1);
						}
						strcpy(server.sv_prov_track[i].pvtk_vnode,
						       token);
					}
					++ctrl_flag;
					break;
				case 2:
					if (strcmp(token, "0") != 0) {
						server.sv_prov_track[i].pvtk_aoe_req =
							(char *) malloc(strlen(token) + 1);
						/* check the pointer that was just allocated */
						if (server.sv_prov_track[i].pvtk_aoe_req == NULL) {
							free(buffer);
							free(server.sv_prov_track);
							server.sv_prov_track = NULL;
							log_err(errno, "pbs_init",
								"unable to malloc");
							return (-1);
						}
						strcpy(server.sv_prov_track[i].pvtk_aoe_req,
						       token);
					}
					ctrl_flag = 0;
					++i;
					break;
			}
			token = strtok(NULL, "|");
		}
		server.sv_provtrackmodifed = 0;
		free(buffer);
		/* less data recovered than expected */
		if ((i != server.sv_provtracksize) && (statbuf.st_size != 0)) {
			sprintf(log_buffer, "Recovered prov_tracking, "
					    "Expected %d, recovered %d records",
				server.sv_provtracksize, i);
			log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, LOG_WARNING,
				  msg_daemonname, log_buffer);
		}
	}

	/* mark all nodes that are in the prov tracking table as offline,
	 * also do away with all jobs that were waiting on such nodes
	 */
	offline_all_provisioning_vnodes();
	server.sv_cur_prov_records = 0;

	(void) resize_prov_table(max_concurrent_prov);
	CLEAR_HEAD(prov_allvnodes);

	/* trigger degraded reservations on offlined nodes */
	degrade_offlined_nodes_reservations();

	hook_track_recov();

	/* Check to see that jobs in the maintenance_jobs attribute on a node still exist
	 * If they don't exist any more, remove them from a node's maintenance_jobs attribute
	 */
	buf = NULL;
	buf_len = 0;
	for (i = 0; i < svr_totnodes; i++) {
		struct array_strings *arst;
		if (is_nattr_set(pbsndlist[i], ND_ATR_MaintJobs) && (arst = get_nattr_arst(pbsndlist[i], ND_ATR_MaintJobs))->as_usedptr > 0) {
			int j;
			int len = 0;
			int cur_len = 0;
			attribute new;

			/* worst case: every job id has to be removed */
			for (j = 0; j < arst->as_usedptr; j++)
				len += strlen(arst->as_string[j]) + 1; /* 1 for the comma*/

			/* buf is reused across nodes and only ever grows */
			if (len > buf_len) {
				char *tmp_buf;
				tmp_buf = realloc(buf, len + 1);
				if (tmp_buf == NULL) {
					free(buf);
					return (-1);
				} else {
					buf = tmp_buf;
					buf_len = len;
				}
			}
			buf[0] = '\0';
			/* collect, comma separated, the ids of jobs that no longer exist */
			for (j = 0; j < arst->as_usedptr; j++) {
				if (find_job(arst->as_string[j]) == NULL) {
					strncat(buf, arst->as_string[j], len);
					strncat(buf, ",", len);
					buf[len] = '\0';
				}
			}
			/* Did we find a string we need to remove*/
			cur_len = strlen(buf);
			if (cur_len > 0) {
				buf[cur_len - 1] = '\0'; /* remove trailing comma */
				clear_attr(&new, &node_attr_def[(int) ND_ATR_MaintJobs]);
				decode_arst(&new, ATTR_NODE_MaintJobs, NULL, buf);
				set_arst(get_nattr(pbsndlist[i], ND_ATR_MaintJobs), &new, DECR);
			}

			/* jobs remain in the list: flag the node INUSE_MAINTENANCE */
			if (arst->as_usedptr > 0)
				set_vnode_state(pbsndlist[i], INUSE_MAINTENANCE, Nd_State_Or);
		}
	}
	free(buf);

	/* purge deleted hooks */
	phook = (hook *) GET_NEXT(svr_allhooks);
	while (phook) {
		/* fetch the successor first: hook_purge() may release the current entry */
		phook_current = phook;
		phook = (hook *) GET_NEXT(phook->hi_allhooks);

		if (phook_current->pending_delete && !has_pending_mom_action_delete(phook_current->hook_name))
			hook_purge(phook_current, pbs_python_ext_free_python_script);
	}
	send_rescdef(0);
	hook_track_save(NULL, -1); /* refresh path_hooks_tracking file */

	(void) set_task(WORK_Immed, time_now, memory_debug_log, NULL);

	return (0);
}

/**
 * @brief
 * 		reassign_resc - hand a recovered job back the nodes and resources
 *		recorded in its exec_host / exec_vnode attributes.
 *
 *		Does nothing when the job has no exec_host value.  A failure of the
 *		first set_nodes() call is logged; the resource counters are still
 *		adjusted through set_resc_assigned() afterwards.
 *
 * @param[in,out]	pjob	- the recovered job.
 *
 * @return	void
 */
static void
reassign_resc(job *pjob)
{
	char *exec_host = get_jattr_str(pjob, JOB_ATR_exec_host);
	char *exec_host2 = get_jattr_str(pjob, JOB_ATR_exec_host2);
	char *vn_request;
	char *vn_granted;
	int rebuild_exec_vnode;
	int released_hidden = 0;
	int rc;

	/* no exec_host means no hosts were assigned: nothing to give back */
	if (exec_host == NULL)
		return;

	/*
	 * A job without exec_vnode is taken to be a pre-8.0 job.  For such
	 * a job exec_vnode is rebuilt from the existing exec_host (together
	 * with the select spec generated when the job was requeued), the
	 * same way a "qrun -H vn+vn+... jobid" is handled.  Otherwise the
	 * stored exec_vnode is used as the request.
	 */
	rebuild_exec_vnode = (is_jattr_set(pjob, JOB_ATR_exec_vnode) == 0);
	if (rebuild_exec_vnode)
		vn_request = exec_host;
	else
		vn_request = get_jattr_str(pjob, JOB_ATR_exec_vnode);

	rc = set_nodes((void *) pjob, JOB_OBJECT, vn_request, &vn_granted,
		       &exec_host, &exec_host2, rebuild_exec_vnode, TRUE);

	if (rc != 0) {
		sprintf(log_buffer, "Unable to reallocate resources from nodes for job, error %d", rc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid, log_buffer);
	} else if (rebuild_exec_vnode) {
		/* replace exec_host/exec_vnode with the values set_nodes() produced */
		free_jattr(pjob, JOB_ATR_exec_host);
		free_jattr(pjob, JOB_ATR_exec_vnode);
		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode, vn_granted, NULL);
		set_jattr_str_slim(pjob, JOB_ATR_exec_host, exec_host, NULL);
	}

	/* vnodes already deallocated from the job are run through set_nodes() too */
	if (rc == 0) {
		if (is_jattr_set(pjob, JOB_ATR_exec_vnode_deallocated)) {
			char *dealloc_spec;
			char *dealloc_vnodes = NULL;
			char *dealloc_host = NULL;
			char *dealloc_host2 = NULL;

			dealloc_spec = get_jattr_str(pjob, JOB_ATR_exec_vnode_deallocated);

			rc = set_nodes((void *) pjob, JOB_OBJECT, dealloc_spec,
				       &dealloc_vnodes, &dealloc_host, &dealloc_host2, 1, TRUE);
			if (rc != 0)
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_WARNING,
					  pjob->ji_qs.ji_jobid, "warning: Failed to make some nodes aware of a deleted job");
		}
	}

	/*
	 * A suspended job holds only part of its resources, and
	 * set_resc_assigned() acts on only some resources while
	 * JOB_ATR_resc_released is set.  Because this job was just read
	 * from disk, the full request has to be charged first: hide the
	 * resc_released attribute, charge everything, then put the
	 * attribute back and subtract what was released at suspend time.
	 */
	if (check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP) || check_job_substate(pjob, JOB_SUBSTATE_SUSPEND)) {
		if (is_jattr_set(pjob, JOB_ATR_resc_released)) {
			mark_jattr_not_set(pjob, JOB_ATR_resc_released);
			released_hidden = 1;
		}
	}

	set_resc_assigned((void *) pjob, 0, INCR);

	if (released_hidden) {
		mark_jattr_set(pjob, JOB_ATR_resc_released);
		set_resc_assigned((void *) pjob, 0, DECR);
	}
}

/**
 * @brief
 * 		pbsd_init_job - decide what to do with the recovered job structure
 *
 *		The action depends on the type of initialization:
 *		- if Rmv_if_resv_not_possible() reports the job should not be
 *		  requeued, the job is recorded as aborted, its owner mailed, and
 *		  the job purged;
 *		- for RECOV_COLD / RECOV_CREATE the operator is asked for
 *		  confirmation and the job is aborted via init_abt_job();
 *		- otherwise the job is re-enqueued (and, for some substates,
 *		  given its resources back or scheduled for exit/rerun
 *		  processing) according to its saved substate.
 *
 * @param[in,out]	pjob	- the job.
 * @param[in]	type		- type of initialization.
 *
 * @return	int
 * @retval	0	- success; note 0 is also returned when the job was purged
 *			  because of its reservation or a cold/create start.
 * @retval	-1	- error: array parent not found, pbsd_init_reque() failed,
 *			  or the substate was not recognized.  On these paths the
 *			  job has already been passed to init_abt_job() or job_abt().
 */
int
pbsd_init_job(job *pjob, int type)
{
	char newstate;
	int newsubstate;

	/*
	 * Rmv_if_resv_not_possible() returns non-zero when the job sits in a
	 * reservation queue whose reservation can no longer be honored.
	 * Such a job is aborted and purged here instead of being requeued.
	 */
	if (Rmv_if_resv_not_possible(pjob)) {
		account_record(PBS_ACCT_ABT, pjob, "");
		svr_mailowner(pjob, MAIL_ABORT, MAIL_NORMAL, msg_init_abt);
		check_block(pjob, msg_init_abt);
		job_purge(pjob);
		return 0;
	}

	/* no Mom connection is associated with the job right after recovery */
	pjob->ji_momhandle = -1;
	pjob->ji_mom_prot = PROT_INVALID;

	/* update at_server attribute in case name changed */

	free_jattr(pjob, JOB_ATR_at_server);
	set_jattr_generic(pjob, JOB_ATR_at_server, server_name, NULL, SET);

	/* now based on the initialization type */

	if ((type == RECOV_COLD) || (type == RECOV_CREATE)) {
		/* cold/create start discards jobs: confirm with the operator, then abort */
		need_y_response(type, "jobs exists");
		init_abt_job(pjob);
	} else {

		/* the hot-start flag is only kept when this is a hot start */
		if (type != RECOV_HOT)
			pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HOTSTART;

		/*
		 * Make sure JOB_SVFLG_RescAssn is cleared; resources are
		 * reassigned below if needed, based on the job's substate.
		 * If the job had resources when the server exited, the flag
		 * is set again when reassign_resc() reassigns them.
		 */
		pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_RescAssn;

		/* Update run_version if it is not set but run_count is,   */
		/* Likely means recovering a job from a older version      */
		if ((is_jattr_set(pjob, JOB_ATR_run_version) == 0) && is_jattr_set(pjob, JOB_ATR_runcount) != 0) {
			set_jattr_l_slim(pjob, JOB_ATR_run_version, get_jattr_long(pjob, JOB_ATR_runcount), SET);
		}

		/* a subjob must be linked to its array parent; without a parent it is aborted */
		if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) {
			if ((pjob->ji_parentaj = find_arrayparent(pjob->ji_qs.ji_jobid)) == NULL) {
				/* parent job object not found */
				init_abt_job(pjob);
				return -1;
			}

			update_sj_parent(pjob->ji_parentaj, pjob, pjob->ji_qs.ji_jobid, JOB_STATE_LTR_EXPIRED, get_job_state(pjob));
		}

		/* dispatch on the substate the job had when it was saved */
		switch (get_job_substate(pjob)) {

			case JOB_SUBSTATE_TRANSICM:
				if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {

					/*
					 * This server created the job, so client
					 * was qsub (a transient client), it won't be
					 * around to recommit, so auto-commit now
					 */

					set_job_state(pjob, JOB_STATE_LTR_QUEUED);
					set_job_substate(pjob, JOB_SUBSTATE_QUEUED);

					if (pbsd_init_reque(pjob, CHANGE_STATE) == -1)
						return -1;
				} else {
					/*
					 * another server is sending, append to new job
					 * list and wait for commit; need to clear
					 * receiving sock number though
					 */
					pjob->ji_qs.ji_un.ji_newt.ji_fromsock = -1;
					append_link(&svr_newjobs, &pjob->ji_alljobs, pjob);
				}
				break;

			case JOB_SUBSTATE_TRNOUT:
				set_job_state(pjob, JOB_STATE_LTR_QUEUED);
				set_job_substate(pjob, JOB_SUBSTATE_QUEUED);
				/* requeue as queued */
				if (pbsd_init_reque(pjob, CHANGE_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_TRNOUTCM:

				if (check_job_state(pjob, JOB_STATE_LTR_RUNNING)) {
					/* was sending to Mom, requeue for now */

					svr_evaljobstate(pjob, &newstate, &newsubstate, 1);
					svr_setjobstate(pjob, newstate, newsubstate);
				} else {
					/* requeue as is - rdy to cmt */

					/* resend rtc: resume_net_move() runs as an immediate work task */
					set_task(WORK_Immed, 0, resume_net_move, (void *) pjob);
				}
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				break;

			/* queued-like substates: requeue and let the state be re-evaluated */
			case JOB_SUBSTATE_QUEUED:
			case JOB_SUBSTATE_PRESTAGEIN:
			case JOB_SUBSTATE_STAGEIN:
			case JOB_SUBSTATE_STAGECMP:
			case JOB_SUBSTATE_STAGEFAIL:
			case JOB_SUBSTATE_STAGEGO:
			case JOB_SUBSTATE_HELD:
			case JOB_SUBSTATE_SYNCHOLD:
			case JOB_SUBSTATE_DEPNHOLD:
			case JOB_SUBSTATE_WAITING:
				if (pbsd_init_reque(pjob, CHANGE_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_PRERUN:
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_PROVISION:
				if (is_jattr_set(pjob, JOB_ATR_prov_vnode)) /* If JOB_ATR_prov_vnode is set, free it */
					free_jattr(pjob, JOB_ATR_prov_vnode);
				if (pbsd_init_reque(pjob, CHANGE_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_RUNNING:
			case JOB_SUBSTATE_SUSPEND:
			case JOB_SUBSTATE_SCHSUSP:
			case JOB_SUBSTATE_BEGUN:
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				/*
				 * Resources are given back only to a running job, or to
				 * a suspended job that has resc_released set; a BEGUN
				 * job is requeued without reassign_resc() here.
				 */
				if (check_job_substate(pjob, JOB_SUBSTATE_RUNNING) ||
				    ((is_jattr_set(pjob, JOB_ATR_resc_released)) &&
				     (check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP) ||
				      check_job_substate(pjob, JOB_SUBSTATE_SUSPEND)))) {

					reassign_resc(pjob);
					if (type == RECOV_HOT)
						pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HOTSTART;
				}
				break;

			case JOB_SUBSTATE_SYNCRES:

				/*
				 * Requeue with a re-evaluated state.
				 * NOTE(review): the earlier comment here read "clear all
				 * dependent job ready flags"; no such clearing is visible
				 * in this function - confirm whether it happens elsewhere.
				 */

				if (pbsd_init_reque(pjob, CHANGE_STATE) == -1)
					return -1;
				break;

			/* job was ending: schedule on_job_exit(), requeue as is, restore resources */
			case JOB_SUBSTATE_TERM:
			case JOB_SUBSTATE_EXITING:
			case JOB_SUBSTATE_STAGEOUT:
			case JOB_SUBSTATE_STAGEDEL:
			case JOB_SUBSTATE_EXITED:
				set_task(WORK_Immed, 0, on_job_exit, (void *) pjob);
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				reassign_resc(pjob);
				break;

			case JOB_SUBSTATE_ABORT:
				/* requeue job and if no nodes assigned, that's all */
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HasNodes) != 0) {
					/* has nodes so reassign */
					set_task(WORK_Immed, 0, on_job_exit, (void *) pjob);
					reassign_resc(pjob);
				}
				break;

			/* terminal substates: only put the job back in its queue, state untouched */
			case JOB_SUBSTATE_MOVED:
			case JOB_SUBSTATE_FAILED:
			case JOB_SUBSTATE_FINISHED:
			case JOB_SUBSTATE_TERMINATED:
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_RERUN:
				/* rerun processing is only scheduled when the job state is EXITING */
				if (check_job_state(pjob, JOB_STATE_LTR_EXITING))
					set_task(WORK_Immed, 0, on_job_rerun, (void *) pjob);
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				break;

			case JOB_SUBSTATE_RERUN1:
			case JOB_SUBSTATE_RERUN2:
			case JOB_SUBSTATE_RERUN3:
				set_task(WORK_Immed, 0, on_job_rerun, (void *) pjob);
				if (pbsd_init_reque(pjob, KEEP_STATE) == -1)
					return -1;
				break;

			default:
				/* unrecognized substate: log it and abort the job */
				(void) sprintf(log_buffer,
					       msg_init_unkstate, get_job_substate(pjob));
				log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
					  LOG_NOTICE,
					  pjob->ji_qs.ji_jobid, log_buffer);
				job_abt(pjob, log_buffer);
				return -1;
		}

		/* update entity limit sums for this job */
		(void) account_entity_limit_usages(pjob, NULL, NULL, INCR, ETLIM_ACC_ALL);

		/* if job has exec host of Mom, set addr and port based on hostname */

		if (pjob->ji_qs.ji_un_type == JOB_UNION_TYPE_EXEC) {
			/* start from zero; the values stay zero if the lookup below fails */
			pjob->ji_qs.ji_un.ji_exect.ji_momaddr = 0;
			pjob->ji_qs.ji_un.ji_exect.ji_momport = 0;

			if (is_jattr_set(pjob, JOB_ATR_exec_host)) {
				pbs_net_t new_momaddr;
				unsigned int new_momport;

				new_momaddr =
					get_addr_of_nodebyname(
						get_jattr_str(pjob, JOB_ATR_exec_host), &new_momport);

				if (new_momaddr != 0) {
					pjob->ji_qs.ji_un.ji_exect.ji_momaddr = new_momaddr;
					pjob->ji_qs.ji_un.ji_exect.ji_momport = new_momport;
				} else {
					log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB,
						  LOG_INFO, pjob->ji_qs.ji_jobid,
						  "Failed to update mom address. Mom address not changed.");
				}
			}
		}
	}
	return 0;
}

/**
 * @brief
 * 		pbsd_init_resv - put a recovered reservation back into service.
 *
 *		Runs the recovery fix-ups (revert_alter_reservation(),
 *		is_resv_window_in_future(), set_old_subUniverse()), links the
 *		reservation onto svr_allresvs, and tries to attach its queue.
 *		The outcome of the queue attachment is logged either way.
 *
 * @param[in,out]	presv	- the reservation.
 * @param[in]		type	- type of initialization (read-only=0 , or ownership=1);
 * 					not referenced by this function at present.
 *
 */
void
pbsd_init_resv(resc_resv *presv, int type)
{
	const int ev_mask = PBSEVENT_SYSTEM | PBSEVENT_ADMIN | PBSEVENT_DEBUG;

	revert_alter_reservation(presv);
	is_resv_window_in_future(presv);
	set_old_subUniverse(presv);

	/* make the reservation visible on the server-wide list */
	append_link(&svr_allresvs, &presv->ri_allresvs, presv);

	if (attach_queue_to_reservation(presv) == 0) {
		log_eventf(ev_mask, PBS_EVENTCLASS_SERVER, LOG_INFO,
			   msg_daemonname, msg_init_recovresv, presv->ri_qs.ri_resvID);
	} else {
		/* the reservation names a queue that could not be found */
		log_eventf(ev_mask, PBS_EVENTCLASS_RESV, LOG_NOTICE,
			   msg_daemonname, msg_init_resvNOq, presv->ri_qs.ri_queue, presv->ri_qs.ri_resvID);
	}
}

/**
 * @brief
 * 		pbsd_init_node - build the in-memory node for a node record
 *		recovered from the database.
 *
 *		The node is created with create_pbs_node2() from the recovered
 *		name and attribute list.  On success the saved Mom modification
 *		time is restored on the node's first Mom and, when the node has a
 *		positive vnode_pool value and is the first child of that Mom,
 *		the Mom is added to its pool.  On failure an error is logged.
 *
 * @param[in,out]	dbnode	- the node recovered.
 * @param[in]		type	- type of initialization (read-only=0 , or ownership=1);
 * 					not referenced by this function at present.
 *
 * @return	ptr to pbsnode
 * @retval	Node structure	- success
 * @retval	NULL	- error.
 */
struct pbsnode *
pbsd_init_node(pbs_db_node_info_t *dbnode, int type)
{
	time_t mom_modtime = dbnode->mom_modtime;
	struct pbsnode *np = NULL;
	mominfo_t *pmom;
	svrattrl *pal;
	int bad;
	int rc;
	int perm = ATR_DFLAG_ACCESS | ATR_PERM_ALLOW_INDIRECT;

	pal = GET_NEXT(dbnode->db_attr_list.attrs);

	/* now create node and subnodes */
	rc = create_pbs_node2(dbnode->nd_name, pal, perm, &bad, &np, FALSE, TRUE); /* allow unknown resources */
	if (rc)
		np = NULL;

	if (np == NULL) {
		/*
		 * The node name comes from stored data, so it must only ever be
		 * passed as an argument, never as part of the format string.
		 */
		if (rc == PBSE_NODEEXIST)
			log_errf(-1, __func__, "duplicate node \"%s\"", dbnode->nd_name);
		else
			log_errf(-1, __func__, "could not create node \"%s\", error = %d", dbnode->nd_name, rc);
		return NULL;
	}

	/* first Mom of the node; guarded because it is checked for NULL below too */
	pmom = (np->nd_moms != NULL) ? np->nd_moms[0] : NULL;

	if (mom_modtime && pmom != NULL)
		pmom->mi_modtime = mom_modtime;

	if (is_nattr_set(np, ND_ATR_vnode_pool) && get_nattr_long(np, ND_ATR_vnode_pool) > 0) {
		if (pmom && (np == ((mom_svrinfo_t *) (pmom->mi_data))->msr_children[0])) {
			/* natural vnode being recovered, add to pool */
			add_mom_to_pool(pmom);
		}
	}

	return np;
}

/**
 * @brief
 * 		pbsd_init_reque - re-enqueue the job into the queue it was in
 *
 *		Optionally re-evaluates the job state first (typically to some
 *		form of QUEUED), makes sure the substate attribute matches the
 *		actual value, and then calls svr_enquejob().  If the enqueue
 *		fails the reason is logged and the job is aborted with job_abt().
 *
 * @param[in,out]	pjob	- the job.
 * @param[in]	change_state- possible  values,
 * 								CHANGE_STATE - 1
 * 								KEEP_STATE	 - 0
 *
 * @return	int
 * @retval	0	- success
 * @retval	-1	- error; the job has been passed to job_abt().
 */
static int
pbsd_init_reque(job *pjob, int change_state)
{
	char logbuf[384];
	char newstate;
	int newsubstate;
	int rc;
	int used;

	if (change_state) {
		/* update the state, typically to some form of QUEUED */
		unset_extra_attributes(pjob);
		svr_evaljobstate(pjob, &newstate, &newsubstate, 1);
		/* a subjob's state change is reported to its array parent first */
		if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob)
			update_sj_parent(pjob->ji_parentaj, pjob, pjob->ji_qs.ji_jobid, get_job_state(pjob), newstate);
		set_job_state(pjob, newstate);
		set_job_substate(pjob, newsubstate);
	}

	/* make sure substate attributes match actual value */
	post_attr_set(get_jattr(pjob, JOB_ATR_substate));

	rc = svr_enquejob(pjob, NULL);
	if (rc == 0) {
		/*
		 * Build "<substate msg><queued msg><queue name>".  Every write is
		 * bounded by sizeof(logbuf); an over-long message is truncated
		 * rather than overrunning the stack buffer.
		 */
		used = snprintf(logbuf, sizeof(logbuf), msg_init_substate, get_job_substate(pjob));
		if (used < 0) {
			logbuf[0] = '\0';
			used = 0;
		}
		if ((size_t) used < sizeof(logbuf))
			(void) snprintf(logbuf + used, sizeof(logbuf) - (size_t) used, "%s%s",
					msg_init_queued, pjob->ji_qs.ji_queue);
		log_event(PBSEVENT_SYSTEM | PBSEVENT_ADMIN | PBSEVENT_DEBUG,
			  PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, logbuf);
		return (0);
	}

	if (rc == PBSE_UNKQUE) {

		/* Oops, this should never happen */

		(void) snprintf(logbuf, sizeof(logbuf), "%s %s; job %s queue %s",
				msg_err_noqueue, msg_err_noqueue1,
				pjob->ji_qs.ji_jobid, pjob->ji_qs.ji_queue);
	} else if (rc == PBSE_UNKRESC) {
		(void) snprintf(logbuf, sizeof(logbuf), "%s %s; job %s",
				msg_err_noqueue, msg_unkresc,
				pjob->ji_qs.ji_jobid);
	} else {
		(void) snprintf(logbuf, sizeof(logbuf), "%s; job %s queue %s error %d",
				msg_err_noqueue,
				pjob->ji_qs.ji_jobid, pjob->ji_qs.ji_queue, rc);
	}
	log_err(-1, "pbsd_init", logbuf);
	(void) job_abt(pjob, logbuf);
	return (-1);
}

/**
 * @brief
 * 		catch_child() - SIGCHLD handler.
 *		Only raises reap_child_flag; per the flag's name and this file's
 *		earlier description, the main loop is expected to notice it and
 *		reap the child process.
 *
 * @param[in]	sig	- signal number; not used.
 *
 * @return	void
 */
static void
catch_child(int sig)
{
	extern int reap_child_flag;

	(void) sig;
	reap_child_flag = 1;
}

/**
 * @brief
 * 		change_logs - SIGHUP handler.
 *		Closes the accounting file and the log file and opens them again,
 *		so that the previous files can be renamed.
 *
 *		NOTE(review): the close/open calls are made directly from this
 *		function; whether they are safe to run in signal context is not
 *		visible here - confirm.
 *
 * @param[in]	sig	- signal number; not used.
 *
 * @return	void
 */
static void
change_logs(int sig)
{
	(void) sig;

	/* close both files first ... */
	acct_close();
	log_close(1);

	/* ... then reopen them, log file before accounting file */
	(void) log_open(log_file, path_log);
	(void) acct_open(acct_file);
}

/**
 * @brief
 * 		stop_me - handler for the caught signals that terminate the server.
 *
 *		Does nothing but set the server state attribute to
 *		SV_STATE_SHUTSIG; no logging is done from inside the handler.
 *
 * @param[in]	sig	- signal number; not used.
 *
 * @return	void
 */
/*ARGSUSED*/
static void
stop_me(int sig)
{
	(void) sig;
	set_sattr_l_slim(SVR_ATR_State, SV_STATE_SHUTSIG, SET);
}
/**
 * @brief
 * 		chk_save_file - check that a path exists and is a regular file.
 *
 *		The path is examined with stat(); no permission test is made
 *		beyond whatever stat() itself requires.
 *
 * @param[in]	filename	- file which needs to be checked.
 *
 * @return	error code
 * @retval	0	- the path is a regular file
 * @retval	-1	- the path exists but is not a regular file
 * @retval	errno	- stat() failed; the errno value it set is returned
 */
int
chk_save_file(char *filename)
{
	struct stat info;
	int result = -1;

	if (stat(filename, &info) == -1)
		result = errno;
	else if (S_ISREG(info.st_mode))
		result = 0;

	return (result);
}

/**
 * @brief
 * 		resume_net_move - call net_move() to complete the routing of a job
 *		This is invoked via a work task created on recovery of a job
 *		in JOB_SUBSTATE_TRNOUTCM state.
 *
 * @param[in]	ptask	- work task created on recovery of a job
 *
 * @return	void
 */
static void
resume_net_move(struct work_task *ptask)
{
	net_move((job *) ptask->wt_parm1, 0);
}

/**
 * @brief
 * 		need_y_response - before a create/clean initialization deletes
 *		information, ask the operator on stdin to approve it.
 *
 *		A 'y'/'Y' answer returns to the caller and is remembered, so later
 *		calls return at once without prompting.  An 'n'/'N' answer, an
 *		empty line or end of input stops the database and exits the
 *		process with status 0.  Any other answer repeats the question.
 *		An out-of-range type stops the database and exits with status 1.
 *
 * @param[in]	type	- server initialization mode
 * @param[in]	txt	- text field in msg_startup3 string
 *
 * @return	void
 *
 * @par MT-safe: No
 */
static void
need_y_response(int type, char *txt)
{
	static int answ = -2; /* last answer read; positive once one was given */
	static const char *const mode_names[] = {"Hot",
						 "Warm",
						 "Cold",
						 "Create"};
	const char *mode;
	int ch;

	if (answ > 0)
		return; /* already gotten a response */

	/* NOTE(review): fflush() on an input stream is not defined by ISO C; kept as is */
	fflush(stdin);
	if ((type < RECOV_HOT) || (type > RECOV_CREATE)) {
		stop_db();
		exit(1);
	}

	mode = mode_names[type];

	printf(msg_startup3, msg_daemonname, server_name, mode, txt);
	for (;;) {
		/* the first character is the answer; the rest of the line is discarded */
		answ = getchar();
		for (ch = answ; (ch != '\n') && (ch != EOF); ch = getchar())
			;

		if ((answ == 'y') || (answ == 'Y'))
			return;

		if ((answ == EOF) || (answ == '\n') || (answ == 'n') || (answ == 'N')) {
			printf("PBS server %s initialization aborted\n", server_name);
			stop_db();
			exit(0);
		}

		printf("y(es) or n(o) please:\n");
	}
}

/**
 * @brief
 * 		init_abt_job() - abort a job during initialization: log the abort
 *		message, mail it to the job owner, pass it to check_block(), and
 *		purge the job.
 *
 * @param[in]	pjob	- job; it has been handed to job_purge() on return.
 *
 * @return	void
 */
static void
init_abt_job(job *pjob)
{
	const int ev_mask = PBSEVENT_SYSTEM | PBSEVENT_ADMIN | PBSEVENT_DEBUG;

	log_event(ev_mask, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, msg_init_abt);

	/* tell the owner (and anything checked by check_block()) before the job goes away */
	svr_mailowner(pjob, MAIL_ABORT, MAIL_NORMAL, msg_init_abt);
	check_block(pjob, msg_init_abt);

	job_purge(pjob);
}

/**
 * @brief
 * 		Rmv_if_resv_not_possible - report whether a recovered job belongs
 *		to a reservation whose end time has already passed, in which case
 *		it should not be requeued.
 *
 * 		When the job's queue has a reservation, the job is linked to it
 * 		(ji_myResv).  A reservation with a resv_count above 1 (a standing
 * 		reservation) is never reported as expired here; that case is left
 * 		to the end-of-occurrence handling.
 *
 * @param[in,out]	pjob	- reservation job
 *
 * @return	return code
 * @retval	0	- OK to requeue
 * @retval	1	- should not be requeued
 */
static int
Rmv_if_resv_not_possible(job *pjob)
{
	pbs_queue *job_queue;
	resc_resv *resv;

	job_queue = find_queuebyname(pjob->ji_qs.ji_queue);
	if (job_queue == NULL)
		return (0);

	resv = job_queue->qu_resvp;
	if (resv == NULL)
		return (0); /* ordinary queue, no reservation involved */

	/* the job lives in a reservation queue: remember which reservation */
	pjob->ji_myResv = resv;

	/*
	 * A standing reservation differs from an advance one in that only
	 * running jobs are deleted at the end of an occurrence (missed or
	 * not), so the end-time test is skipped for it.
	 */
	if (get_rattr_long(resv, RESV_ATR_resv_count) > 1)
		return (0);

	if (resv->ri_qs.ri_etime < time_now)
		return (1);

	return (0);
}

/**
 * @brief
 *  	attach_queue_to_reservation - look up the queue named in the
 *		reservation and cross-link the two: the reservation points at the
 *		queue and the queue points back at the reservation.
 *
 * @param[in,out]	presv	- reservation; a NULL pointer is accepted.
 *
 * @return	int
 * @retval	0	- queue found and linked, or presv was NULL
 * @retval	-1	- no queue with that name; presv->ri_qp is left NULL
 */
static int
attach_queue_to_reservation(resc_resv *presv)
{
	if (presv == NULL)
		return (0);

	presv->ri_qp = find_queuebyname(presv->ri_qs.ri_queue);
	if (presv->ri_qp == NULL)
		return (-1);

	/* resv points to queue and queue points back */
	presv->ri_qp->qu_resvp = presv;
	return (0);
}

/**
 * @brief
 * 		call_log_license - work-task callback that logs the floating
 *		license high-use figures, resets the counters for periods that
 *		have ended, saves the figures to path_usedlicenses, and schedules
 *		itself again.
 *
 *		The hourly maximum is always reset.  The daily and monthly maxima
 *		are reset when the day of month / month of the task's event time
 *		differs from the one stored with the counters.
 *
 * @param[in]	ptask	- work task structure; wt_event is used as "now".
 *
 * @return	void
 */
static void
call_log_license(struct work_task *ptask)
{
	int fd;
	long ntime;
	time_t event_time;
	struct tm *tms;

	/* log the floating license info */

	log_licenses(&license_counts.licenses_high_use);

	/* reset values for time periods that have passed */

	license_counts.licenses_high_use.lu_max_hr = 0;
	ntime = ptask->wt_event;

	/*
	 * Convert through a real time_t object: pointing localtime() at a
	 * long is only correct where long and time_t have the same size.
	 */
	event_time = (time_t) ntime;
	tms = localtime(&event_time);
	if (tms != NULL) {
		if (tms->tm_mday != license_counts.licenses_high_use.lu_day) {
			license_counts.licenses_high_use.lu_max_day = 0;
			license_counts.licenses_high_use.lu_day = tms->tm_mday;
		}
		if (tms->tm_mon != license_counts.licenses_high_use.lu_month) {
			license_counts.licenses_high_use.lu_max_month = 0;
			license_counts.licenses_high_use.lu_month = tms->tm_mon;
		}
	} else {
		/* without a broken-down time the day/month roll-over cannot be decided */
		log_errf(-1, __func__, "localtime failed. ERR : %s", strerror(errno));
	}

	/* write current info to file */
	fd = open(path_usedlicenses, O_WRONLY | O_CREAT | O_TRUNC, 0600);
	if (fd != -1) {
		if (write(fd, &license_counts.licenses_high_use, sizeof(license_counts.licenses_high_use)) == -1)
			log_errf(-1, __func__, "write failed. ERR : %s",strerror(errno));
		close(fd);
	} else {
		log_errf(-1, __func__, "open failed. ERR : %s", strerror(errno));
	}

	/* call myself again at the top of the next hour */
	ntime = ((ntime + 3601) / 3600) * 3600;
	(void) set_task(WORK_Timed, ntime, call_log_license, 0);
}
