/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 * @file	req_delete.c
 *
 * Functions relating to the Delete Job Batch Requests.
 *
 * Included funtions are:
 *	remove_stagein()
 *	acct_del_write()
 *	check_deletehistoryjob()
 *	issue_delete()
 *	req_deletejob()
 *	req_deletejob2()
 *	req_deleteReservation()
 *	post_deljobfromresv_req()
 *
 */
#include <pbs_config.h> /* the master config generated by configure */

#include <stdio.h>
#include <sys/types.h>
#include <signal.h>
#include "portability.h"
#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "work_task.h"
#include "attribute.h"
#include "server.h"
#include "credential.h"
#include "batch_request.h"
#include "resv_node.h"
#include "queue.h"
#include "hook.h"

#include "job.h"
#include "reservation.h"
#include "pbs_error.h"
#include "acct.h"
#include "log.h"
#include "pbs_nodes.h"
#include "svrfunc.h"

#define QDEL_BREAKER_SECS 5

/* Global Data Items: */

extern char *msg_deletejob;
extern char *msg_delrunjobsig;
extern char *msg_manager;
extern char *msg_noDeljobfromResv;
extern char *msg_deleteresv;
extern char *msg_deleteresvJ;
extern char *msg_job_history_delete;
extern char *msg_job_history_notset;
extern char *msg_also_deleted_job_history;
extern char *msg_err_malloc;
extern struct server server;
extern time_t time_now;

/* External functions */

extern int issue_to_svr(char *, struct batch_request *, void (*func)(struct work_task *));
extern struct batch_request *cpy_stage(struct batch_request *, job *, enum job_atr, int);
extern resc_resv *chk_rescResv_request(char *, struct batch_request *);

/* Private Functions in this file */

static void post_delete_mom1(struct work_task *);
static void post_deljobfromresv_req(struct work_task *);
static int req_deletejob2(struct batch_request *preq, job *pjob);
static void resume_deletion(struct work_task *ptask);

/* Private Data Items */

static char *sigk = "SIGKILL";
static char *sigt = "SIGTERM";
static char *sigtj = SIG_TermJob;
static char *acct_fmt = "requestor=%s@%s";
static bool qdel_mail = true; /* true: sending mail */

/**
 * @brief
 * 	Service to resume the deletion - this is called from a work_interleave task 
 *
 * @param[in/out] preq - pointer to task structure
 *
 * @return void
 */
static void
resume_deletion(struct work_task *ptask)
{
	struct batch_request *preq = (struct batch_request *) ptask->wt_parm1;

	if (preq == NULL)
		return;

	if (preq->rq_type == PBS_BATCH_DeleteJobList)
		preq->rq_ind.rq_deletejoblist.rq_resume = TRUE;

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_SERVER, LOG_DEBUG, __func__,
		   "Resuming deletetion operation");
	req_deletejob(preq);
	return;
}

/**
 * @brief
 * 		remove_stagein() - request that mom delete staged-in files for a job
 *		used when the job is to be purged after files have been staged in
 *
 * @param[in,out]	pjob	- job
 * 
 * @return	int
 * @retval	0	- success
 * @retval	non-zero	- error code
 */

int
remove_stagein(job *pjob)
{
	struct batch_request *preq = 0;
	int rc = 0;

	preq = cpy_stage(preq, pjob, JOB_ATR_stagein, 0);

	if (preq) { /* have files to delete		*/

		/* change the request type from copy to delete  */

		preq->rq_type = PBS_BATCH_DelFiles;
		preq->rq_extra = NULL;
		rc = relay_to_mom(pjob, preq, release_req);
		if (rc == 0) {
			pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_StagedIn;
		} else {
			/* log that we were unable to remove the files */
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_FILE,
				  LOG_NOTICE, pjob->ji_qs.ji_jobid,
				  "unable to remove staged-in files for job");
			free_br(preq);
		}
	}
	return rc;
}

/**
 * @brief
 * 		acct_del_write - write the Job Deleted account record
 *
 * @param[in]	jid	- Job Id.
 * @param[in]	pjob	- Job structure.
 * @param[in]	preq - batch_request
 * @param[in]	nomail	- do not send mail to the job owner if enabled.
 */

static void
acct_del_write(char *jid, job *pjob, struct batch_request *preq, int nomail)
{
	sprintf(log_buffer, acct_fmt, preq->rq_user, preq->rq_host);
	write_account_record(PBS_ACCT_DEL, jid, log_buffer);

	sprintf(log_buffer, msg_manager, msg_deletejob,
		preq->rq_user, preq->rq_host);
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, jid, log_buffer);

	if (pjob != NULL) {
		int jt = is_job_array(jid);

		switch (jt) {
			case IS_ARRAY_NO: /* not a subjob */
			case IS_ARRAY_ArrayJob:

				/* if block set, send word */
				check_block(pjob, log_buffer);
		}

		if (preq->rq_parentbr == NULL && nomail == 0 &&
		    svr_chk_owner(preq, pjob) != 0 &&
		    qdel_mail != 0) {
			svr_mailowner_id(jid, pjob,
					 MAIL_OTHER, MAIL_FORCE, log_buffer);
		}
	}
}

/**
 * @brief check whether all jobs got deleted
 * 
 * @param[in] preply - reply struct
 * @return bool
 * @retval true - All jobs got deleted.
 * @retval false - more jobs pending to be deleted
 */
static bool
all_jobs_deleted(struct batch_reply *preply)
{
	void *idx = preply->brp_un.brp_deletejoblist.undeleted_job_idx;

	if (pbs_idx_is_empty(idx)) {
		pbs_idx_destroy(idx);
		preply->brp_un.brp_deletejoblist.undeleted_job_idx = NULL;
		return TRUE;
	}

	return FALSE;
}

/**
 * @Brief
 * 	- Updates the reply struct with the finished jobid
 *	- Updates the job's error code in reply structure by allocating 
 *		batch_deljob_status struct.
 *	- Check if it is time to respond back.
 *
 * @param[in]	preq - pointer batch request structure
 * @param[in]	jid	- job id.
 * @param[in]	errcode - Job's error code 
 *
 * @return	bool
 * @retval	TRUE	- all jobs deleted
 * @retval	FALSE	- jobs remaining to be deleted
 */
bool
update_deljob_rply(struct batch_request *preq, char *jid, int errcode)
{
	struct batch_deljob_status *pdelstat;
	struct batch_reply *preply = &preq->rq_reply;
	void *idx;
	char **data = NULL;

	if (preq->rq_type != PBS_BATCH_DeleteJobList)
		return FALSE;

	if (errcode != PBSE_NONE) {
		/* allocate reply structure and fill in jobid and status portion */
		pdelstat = (struct batch_deljob_status *) malloc(sizeof(struct batch_deljob_status));
		if (pdelstat == NULL)
			log_err(PBSE_SYSTEM, __func__, "Failed to allocate memory");

		pdelstat->name = strdup(jid);
		pdelstat->code = errcode;
		pdelstat->next = preply->brp_un.brp_deletejoblist.brp_delstatc;
		preply->brp_un.brp_deletejoblist.brp_delstatc = pdelstat;
		preq->rq_reply.brp_count++;
	}

	idx = preply->brp_un.brp_deletejoblist.undeleted_job_idx;
	if (jid)
		if (pbs_idx_find(idx, (void **) &jid, (void **) &data, NULL) == PBS_IDX_RET_OK)
			pbs_idx_delete(idx, jid);
		else
			log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, LOG_DEBUG, __func__,
				   "job %s has already been deleted from delete job list", jid);
	else
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_SERVER, LOG_DEBUG,
			  __func__, "update_deljob_rply invoked with empty job id");

	return all_jobs_deleted(preply);
}

/**
 * @Brief
 *		If the job is a history job then purge its history
 *		If the job is a non-history job then it must be terminated before purging its history. Will be
 *		done by req_deletejob()
 *
 * @param[in]	preq	- Batch request structure.
 *
 * @return	int
 * @retval	TRUE	- Job history  has been purged
 * @retval	FALSE	- Job is not a history job
 */
int
check_deletehistoryjob(struct batch_request *preq, char *jid)
{
	job *histpjob;
	job *pjob;
	int historyjob;
	int histerr;
	int t;

	/*
	 * If the array subjob or range of subjobs are in a history state then
	 * reject the request as we cant delete history of array subjobs
	 */
	t = is_job_array(jid);
	if ((t == IS_ARRAY_Single) || (t == IS_ARRAY_Range)) {
		pjob = find_arrayparent(jid);
		if ((histerr = svr_chk_histjob(pjob))) {
			if (update_deljob_rply(preq, jid, PBSE_NOHISTARRAYSUBJOB))
				req_reject(PBSE_NOHISTARRAYSUBJOB, 0, preq);
			return TRUE;
		} else {
			/*
			 * Job is in a Non Finished state . It must be terminated and then its history
			 *  should be purged .
			 */
			return FALSE;
		}
	}

	histpjob = find_job(jid);

	historyjob = svr_chk_histjob(histpjob);
	if (historyjob == PBSE_HISTJOBID) {
		snprintf(log_buffer, sizeof(log_buffer),
			 msg_job_history_delete, preq->rq_user,
			 preq->rq_host);
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
			  jid,
			  log_buffer);

		/* Issue history job delete request to remote server if job is moved. */
		if (check_job_state(histpjob, JOB_STATE_LTR_MOVED))
			issue_delete(histpjob);

		if (histpjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) {
			if (histpjob->ji_ajinfo) {
				int i;
				for (i = histpjob->ji_ajinfo->tkm_start; i <= histpjob->ji_ajinfo->tkm_end; i += histpjob->ji_ajinfo->tkm_step) {
					job *psjob = get_subjob_and_state(histpjob, i, NULL, NULL);
					if (psjob) {
						log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
							   psjob->ji_qs.ji_jobid,
							   msg_job_history_delete, preq->rq_user,
							   preq->rq_host);
						job_purge(psjob);
					}
				}
			}
		}

		job_purge(histpjob);

		if (update_deljob_rply(preq, jid, PBSE_HISTJOBDELETED))
			reply_send(preq);
		return TRUE;
	} else {
		/*
		 *  Job is in a Non Finished state . It must be terminated and then its history
		 * should be purged .
		 */
		return FALSE;
	}
}

/**
 * @Brief
 *		Issue PBS_BATCH_DeleteJob request to remote server.
 *
 * @param[in]	pjob - Job structure.
 */
void
issue_delete(job *pjob)
{
	struct batch_request *preq;
	char rmt_server[PBS_MAXSERVERNAME + 1] = {'\0'};
	char *at = NULL;

	if (pjob == NULL)
		return;

	if ((at = strchr(get_jattr_str(pjob, JOB_ATR_in_queue), (int) '@')) == NULL)
		return;

	snprintf(rmt_server, sizeof(rmt_server), "%s", at + 1);

	preq = alloc_br(PBS_BATCH_DeleteJob);
	if (preq == NULL)
		return;

	pbs_strncpy(preq->rq_ind.rq_delete.rq_objname, pjob->ji_qs.ji_jobid, sizeof(preq->rq_ind.rq_delete.rq_objname));
	preq->rq_extend = malloc(strlen(DELETEHISTORY) + 1);
	if (preq->rq_extend == NULL) {
		log_err(errno, "issue_delete", msg_err_malloc);
		return;
	}

	strncpy(preq->rq_extend, DELETEHISTORY, strlen(DELETEHISTORY) + 1);

	issue_to_svr(rmt_server, preq, release_req);
}

/**
 * @Brief
 *		decrement entity usage for a single un-instantiated subjob
 *
 * @param[in]	parent - pointer to parent Job structure.
 */
static void
decr_single_subjob_usage(job *parent)
{
	parent->ji_qs.ji_svrflags &= ~JOB_SVFLG_ArrayJob;				 /* small hack to decrement usage for a single un-instantiated subjob */
	account_entity_limit_usages(parent, NULL, NULL, DECR, ETLIM_ACC_ALL);		 /* for server limit */
	account_entity_limit_usages(parent, parent->ji_qhdr, NULL, DECR, ETLIM_ACC_ALL); /* for queue limit */
	parent->ji_qs.ji_svrflags |= JOB_SVFLG_ArrayJob;				 /* setting arrayjob flag back */
}

/**
 * @brief Initialize routine for deljoblist
 * reorder the deljob list so that queued jobs will appear first.
 * do not reorder if already sorted
 * 
 * deljob can wait if jobs are in transit state.
 * This triggers other jobs in deljob list to get triggerred to be run.
 * delete will take longer to delete these running jobs as it has to contact the mom
 * meanwhile other jobs in the list will start to run
 * triggering a series of delete followed by run.
 * This has performance issues and makes debugging slower. 
 * Ordering should prevent that.
 * Complexity: O(N)
 * 
 * @param[in] preq - request structure
 *
 * @return int
 * @retval 0 for success
 * @retval 1 for failure
 */
static int
init_deljoblist(struct batch_request *preq)
{
	int head = -1;
	int tail;
	char **jlist = preq->rq_ind.rq_deletejoblist.rq_jobslist;
	struct batch_reply *preply = &preq->rq_reply;
	job *pjob;
	char *temp;

	if (preq->rq_ind.rq_deletejoblist.rq_resume)
		return 0;

	if (!jlist || !jlist[0])
		return 0;

	preply->brp_un.brp_deletejoblist.undeleted_job_idx = pbs_idx_create(0, 0);

	for (tail = 0; jlist[tail]; tail++) {

		pjob = find_job(jlist[tail]);
		if (!pjob)
			continue;

		pbs_idx_insert(preply->brp_un.brp_deletejoblist.undeleted_job_idx, pjob->ji_qs.ji_jobid, NULL);
		if (strcmp(jlist[tail], pjob->ji_qs.ji_jobid) != 0) {
			free(jlist[tail]);
			jlist[tail] = strdup(pjob->ji_qs.ji_jobid);
			if (jlist[tail] == NULL)
				return 1;
		}

		if (head == -1) {
			if (get_job_state(pjob) != JOB_STATE_LTR_QUEUED) {
				head = tail;
			}
			continue;
		}

		if (get_job_state(pjob) == JOB_STATE_LTR_QUEUED) {
			temp = jlist[head];
			jlist[head++] = jlist[tail];
			jlist[tail] = temp;
		}
	}

	return 0;
}

/**
 * @brief delete any pending array jobs part of delete request
 * 	from the reply index
 * 
 * @param[in] preq - request structure
 * 
 * @return bool
 * @retval TRUE: no more jobs to be deleted
 * @reval  FALSE: more jobs pending to be deleted
 */
bool
delete_pending_arrayjobs(struct batch_request *preq)
{
	void *idx;
	void *idx_ctx = NULL;
	char *jid = NULL;
	struct batch_reply *preply = &preq->rq_reply;
	char **data = NULL;

	idx = preply->brp_un.brp_deletejoblist.undeleted_job_idx;

	while (pbs_idx_find(idx, (void **) &jid, (void **) &data, &idx_ctx) == PBS_IDX_RET_OK) {
		int job_type = is_job_array(jid);
		if (jid && (((job_type == IS_ARRAY_ArrayJob) || (job_type == IS_ARRAY_Range) || (job_type == IS_ARRAY_Single))))
			pbs_idx_delete(idx, jid);
	}

	pbs_idx_free_ctx(idx_ctx);

	return all_jobs_deleted(preply);
}

/**
 * @brief
 * 		req_deletejob - service the Delete Job Request
 *
 *		This request deletes a job.
 *
 * @param[in]	preq	- Job Request
 */
void
req_deletejob(struct batch_request *preq)
{
	int forcedel = 0;
	int i;
	char jid[PBS_MAXSVRJOBID + 1];
	int jt; /* job type */
	char *pc;
	job *pjob;
	job *parent;
	char *range;
	char sjst; /* subjob state */
	int rc = 0;
	int delhist = 0;
	int err = PBSE_NONE;
	char **jobids;
	int count;
	int j;
	struct batch_reply *preply = &preq->rq_reply;
	preply->brp_un.brp_deletejoblist.brp_delstatc = NULL;
	preply->brp_count = 0;
	int start_jobid = 0;
	time_t begin_time = time(NULL);

	if (preq->rq_type == PBS_BATCH_DeleteJobList) {

		if (!preq->rq_ind.rq_deletejoblist.rq_resume) {
			if (init_deljoblist(preq) != 0) {
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_SERVER, LOG_DEBUG, __func__,
						"Error while initializing deljoblist operation (init_deljoblist failed)");
				req_reject(PBSE_INTERNAL, 0, preq);
				return;
			}
		} else
			start_jobid = preq->rq_ind.rq_deletejoblist.jobid_to_resume;

		preply->brp_choice = BATCH_REPLY_CHOICE_Delete;
		jobids = preq->rq_ind.rq_deletejoblist.rq_jobslist;
		count = preq->rq_ind.rq_deletejoblist.rq_count;
	} else {
		jobids = break_comma_list(preq->rq_ind.rq_delete.rq_objname);
		count = 1;
	}

	if (preq->rq_extend && strstr(preq->rq_extend, DELETEHISTORY))
		delhist = 1;
	if (preq->rq_extend && strstr(preq->rq_extend, FORCE))
		forcedel = 1;

	/* with nomail , nomail_force , nomail_deletehist or nomailforce_deletehist options are set
	 *  no mail is sent
	 */
	if (preq->rq_extend && strstr(preq->rq_extend, NOMAIL))
		qdel_mail = false;

	for (j = start_jobid; j < count; j++) {

		if (j != start_jobid) {
			preq->rq_ind.rq_deletejoblist.rq_resume = FALSE;
			preq->rq_ind.rq_deletejoblist.jobid_to_resume = j;
		}

		if ((time(NULL) - begin_time) > QDEL_BREAKER_SECS) {
			log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_SERVER, LOG_DEBUG, __func__,
				   "req_delete has been running for %d seconds, Pausing for other requests",
				   QDEL_BREAKER_SECS);
			set_task(WORK_Interleave, 0, resume_deletion, preq);
			return;
		}

		snprintf(jid, sizeof(jid), "%s", jobids[j]);
		parent = chk_job_request(jid, preq, &jt, &err);
		if (parent == NULL) {
			pjob = find_job(jid);
			if (pjob != NULL && pjob->ji_pmt_preq != NULL)
				reply_preempt_jobs_request(err, PREEMPT_METHOD_DELETE, pjob);
			if (update_deljob_rply(preq, jid, err)) {
				if (preq->rq_type == PBS_BATCH_DeleteJobList)
					req_reject(err, 0, preq); /* note, req_reject is not called for delete job request 2 */
				return;
			} else
				continue;
		}

		if (delhist) {
			rc = check_deletehistoryjob(preq, jid);
			if (rc == TRUE)
				continue;
		}

		if (jt == IS_ARRAY_NO) {

			/* just a regular job, pass it on down the line and be done
			 * If the request is to purge the history of the job then set ji_deletehistory to 1
			 */
			if (delhist)
				parent->ji_deletehistory = 1;
			if (req_deletejob2(preq, parent) == 2)
				return;
			continue;

		} else if (jt == IS_ARRAY_Single) {
			/* single subjob, if running do full delete, */
			/* if not then just set it expired		 */

			pjob = get_subjob_and_state(parent, get_index_from_jid(jid), &sjst, NULL);
			if (sjst == JOB_STATE_LTR_UNKNOWN) {
				if (update_deljob_rply(preq, jid, PBSE_IVALREQ)) {
					req_reject(PBSE_IVALREQ, 0, preq);
					return;
				} else
					continue;
			}

			if ((sjst == JOB_STATE_LTR_EXITING) && (forcedel == 0)) {
				if (parent->ji_pmt_preq != NULL) {
					pjob = find_job(jid);
					reply_preempt_jobs_request(PBSE_BADSTATE, PREEMPT_METHOD_DELETE, pjob);
				}
				if (update_deljob_rply(preq, jid, PBSE_BADSTATE)) {
					req_reject(PBSE_BADSTATE, 0, preq);
					return;
				} else
					continue;
			} else if (sjst == JOB_STATE_LTR_EXPIRED) {
				if (update_deljob_rply(preq, jid, PBSE_NOHISTARRAYSUBJOB)) {
					req_reject(PBSE_NOHISTARRAYSUBJOB, 0, preq);
					return;
				} else
					continue;
			} else if (pjob != NULL) {
				/*
				* If the request is to also purge the history of the sub job then set ji_deletehistory to 1
				*/
				if (delhist)
					pjob->ji_deletehistory = 1;
				if (req_deletejob2(preq, pjob) == 2)
					return;
			} else {
				update_sj_parent(parent, NULL, jid, sjst, JOB_STATE_LTR_EXPIRED);
				acct_del_write(jid, parent, preq, 0);
				parent->ji_ajinfo->tkm_dsubjsct++;
				decr_single_subjob_usage(parent);
				if (update_deljob_rply(preq, jid, PBSE_NONE))
					reply_ack(preq);
			}
			chk_array_doneness(parent);
			continue;

		} else if (jt == IS_ARRAY_ArrayJob) {
			int del_parent = 1;
			int start = parent->ji_ajinfo->tkm_start;

			/*
			 * For array jobs the history is stored at the parent array level and also at the subjob level .
			 * If the request is to delete the history of an array job then set  ji_deletehistory to 1 for
			 * the parent array.The function chk_array_doneness() will take care of eventually
			 *  purging the history .
			 */
			if (delhist)
				parent->ji_deletehistory = 1;
			/* The Array Job itself ... */
			/* for each subjob that is running, delete it via req_deletejob2 */

			if (preq->rq_ind.rq_deletejoblist.rq_resume)
				start = preq->rq_ind.rq_deletejoblist.subjobid_to_resume;
			else {
				/* in case of resume counts are incremented already */
				++preq->rq_refct;
			}

			/* keep the array from being removed while we are looking at it */
			parent->ji_ajinfo->tkm_flags |= TKMFLG_NO_DELETE;
			for (i = start; i <= parent->ji_ajinfo->tkm_end; i += parent->ji_ajinfo->tkm_step) {

				if ((time(NULL) - begin_time) > QDEL_BREAKER_SECS) {
					preq->rq_ind.rq_deletejoblist.subjobid_to_resume = i;
					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_SERVER, LOG_DEBUG, __func__,
						   "req_delete has been running for %d seconds, Pausing for other requests",
						   QDEL_BREAKER_SECS);
					set_task(WORK_Interleave, 0, resume_deletion, preq);
					return;
				}
				pjob = get_subjob_and_state(parent, i, &sjst, NULL);
				if (sjst == JOB_STATE_LTR_UNKNOWN)
					continue;
				if ((sjst == JOB_STATE_LTR_EXITING) && !forcedel)
					continue;
				if (pjob) {
					if (delhist)
						pjob->ji_deletehistory = 1;
					if (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) {
						log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO,
							   pjob->ji_qs.ji_jobid,
							   msg_job_history_delete, preq->rq_user,
							   preq->rq_host);
						job_purge(pjob);
					} else {
						if (dup_br_for_subjob(preq, pjob, req_deletejob2) == 2)
							continue;
						del_parent = 0;
					}
				} else {
					/* Queued, Waiting, Held, just set to expired */
					if (sjst != JOB_STATE_LTR_EXPIRED) {
						update_sj_parent(parent, NULL, create_subjob_id(parent->ji_qs.ji_jobid, i), sjst, JOB_STATE_LTR_EXPIRED);
						decr_single_subjob_usage(parent);
					}
				}
			}
			parent->ji_ajinfo->tkm_flags &= ~TKMFLG_NO_DELETE;

			/* if deleting running subjobs, then just return;            */
			/* parent will be deleted when last running subjob(s) ends   */
			/* and reply will be sent to client when last delete is done */
			/* If not deleteing running subjobs, delete2 to del parent   */

			if (--preq->rq_refct == 0) {
				if ((parent = find_job(jid)) != NULL) {
					if (req_deletejob2(preq, parent) == 2) {
						preq->rq_ind.rq_deletejoblist.subjobid_to_resume = parent->ji_ajinfo->tkm_end;
						++preq->rq_refct;
						return;
					}
					del_parent = 0;
				} else if (update_deljob_rply(preq, jid, PBSE_NONE))
					reply_send(preq);
			} else
				acct_del_write(jid, parent, preq, 0);

			if (del_parent == 1) {
				if ((parent = find_job(jid)) != NULL) {
					if (req_deletejob2(preq, parent) == 2) {
						preq->rq_ind.rq_deletejoblist.subjobid_to_resume = parent->ji_ajinfo->tkm_end;
						++preq->rq_refct;
						return;
					}
				}
			}

			continue;
		}
		/* what's left to handle is a range of subjobs, foreach subjob 	*/
		/* if running, do full delete, else just update state	        */

		range = get_range_from_jid(jid);
		if (range == NULL) {
			if (update_deljob_rply(preq, jid, PBSE_IVALREQ)) {
				req_reject(PBSE_IVALREQ, 0, preq);
				return;
			} else
				continue;
		}

		++preq->rq_refct;
		while (1) {
			int start;
			int end;
			int step;
			int count;

			if ((i = parse_subjob_index(range, &pc, &start, &end, &step, &count)) == -1) {
				if (update_deljob_rply(preq, jid, PBSE_IVALREQ))
					req_reject(PBSE_IVALREQ, 0, preq);
				break;
			} else if (i == 1)
				break;

			/*
			 * Ensure that the range specified in the delete job request does not exceed the
			 * index of the highest numbered array subjob
			 */
			if (start < parent->ji_ajinfo->tkm_start || start > parent->ji_ajinfo->tkm_end) {
				if (update_deljob_rply(preq, jid, PBSE_UNKJOBID))
					req_reject(PBSE_UNKJOBID, 0, preq);
				break;
			}
			for (i = start; i <= end; i += step) {
				pjob = get_subjob_and_state(parent, i, &sjst, NULL);
				if (sjst == JOB_STATE_LTR_UNKNOWN)
					continue;

				if ((sjst == JOB_STATE_LTR_EXITING) && !forcedel)
					continue;

				if (pjob) {
					if (delhist)
						pjob->ji_deletehistory = 1;
					if (check_job_state(pjob, JOB_STATE_LTR_EXPIRED)) {
						log_eventf(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, msg_job_history_delete, preq->rq_user, preq->rq_host);
						job_purge(pjob);
					} else {
						if (dup_br_for_subjob(preq, pjob, req_deletejob2) == 2)
							continue;
					}
				} else {
					/* Queued, Waiting, Held, just set to expired */
					if (sjst != JOB_STATE_LTR_EXPIRED) {
						update_sj_parent(parent, NULL, create_subjob_id(parent->ji_qs.ji_jobid, i), sjst, JOB_STATE_LTR_EXPIRED);
						decr_single_subjob_usage(parent);
					}
				}
			}
			range = pc;
		}
		if (i != -1) {
			sprintf(log_buffer, msg_manager, msg_deletejob,
				preq->rq_user, preq->rq_host);
			if (qdel_mail != 0) {
				svr_mailowner_id(jid, parent, MAIL_OTHER, MAIL_FORCE, log_buffer);
			}
		}

		/* if deleting running subjobs, then just return;            */
		/* parent will be deleted when last running subjob(s) ends   */
		/* and reply will be sent to client when last delete is done */

		if (--preq->rq_refct == 0) {
			if (update_deljob_rply(preq, jid, PBSE_NONE))
				reply_send(preq);
			chk_array_doneness(parent);
		}
	}
}

/**
 * @brief
 * 		req_deletejob2 - service the Delete Job Request
 *
 *		This request deletes a job.
 *
 * @param[in]	preq	- Job Request
 * @param[in,out]	pjob	- Job structure
 *
 * @return int
 * @retval 0 for Success
 * @retval 1 for Error
 * @retval 2 if deljoblist needs to be suspended
 */

static int
req_deletejob2(struct batch_request *preq, job *pjob)
{
	int abortjob = 0;
	char *sig;
	char hook_msg[HOOK_MSG_SIZE] = {0};
	char *rec = "";
	int forcedel = 0;
	struct work_task *pwtold;
	struct work_task *pwtnew;
	struct batch_request *temp_preq = NULL;
	int rc;
	int is_mgr = 0;
	int jt;
	char *job_id = NULL;

	/* + 2 is for the '@' in user@host and for the null termination byte. */
	char by_user[PBS_MAXUSER + PBS_MAXHOSTNAME + 2] = {'\0'};

	/* active job is being deleted by delete job batch request */
	pjob->ji_terminated = 1;
	if ((preq->rq_user != NULL) && (preq->rq_host != NULL))
		sprintf(by_user, "%s@%s", preq->rq_user, preq->rq_host);

	if ((preq->rq_extend && strstr(preq->rq_extend, FORCE)))
		forcedel = 1;

	/* See if the request is coming from a manager */
	if (preq->rq_perm & (ATR_DFLAG_MGRD | ATR_DFLAG_MGWR))
		is_mgr = 1;

	jt = is_job_array(pjob->ji_qs.ji_jobid);

	if (check_job_state(pjob, JOB_STATE_LTR_TRANSIT)) {

		/*
		 * Find pid of router from existing work task entry,
		 * then establish another work task on same child.
		 * Next, signal the router and wait for its completion;
		 */

		pwtold = (struct work_task *) GET_NEXT(pjob->ji_svrtask);
		while (pwtold) {
			if ((pwtold->wt_type == WORK_Deferred_Child) ||
			    (pwtold->wt_type == WORK_Deferred_Cmp)) {
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid,
					   "Job in Transit state, deletion will resume once we hear back!");
				pwtnew = set_task(pwtold->wt_type,
						  pwtold->wt_event, resume_deletion,
						  preq);
				if (pwtnew) {
					/*
					 * reset type in case the SIGCHLD came
					 * in during the set_task;  it makes
					 * sure that next_task() will find the
					 * new entry.
					 */
					pwtnew->wt_type = pwtold->wt_type;
					pwtnew->wt_aux = pwtold->wt_aux;

					kill((pid_t) pwtold->wt_event, SIGTERM);
					set_job_substate(pjob, JOB_SUBSTATE_ABORT);
					if (preq->rq_type == PBS_BATCH_DeleteJobList) {
						/* let the caller know that the deljoblist request needs to be suspended */
						return 2;
					}
					return 0; /* all done for now */

				} else {
					if (pjob->ji_pmt_preq != NULL)
						reply_preempt_jobs_request(PBSE_SYSTEM, PREEMPT_METHOD_DELETE, pjob);

					if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_SYSTEM)) {
						req_reject(PBSE_SYSTEM, 0, preq);
						return 0;
					}
				}
			}
			pwtold = (struct work_task *) GET_NEXT(pwtold->wt_linkobj);
		}
		/* should never get here ...  */
		log_err(-1, "req_delete", "Did not find work task for router");
		if (pjob->ji_pmt_preq != NULL)
			reply_preempt_jobs_request(PBSE_INTERNAL, PREEMPT_METHOD_DELETE, pjob);

		if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_INTERNAL)) {
			req_reject(PBSE_INTERNAL, 0, preq);
			return 0;
		}

	} else if (((jt != IS_ARRAY_Range) && (jt != IS_ARRAY_Single)) &&
		   (check_job_state(pjob, JOB_STATE_LTR_QUEUED) ||
		    check_job_state(pjob, JOB_STATE_LTR_HELD))) {
		depend_runone_remove_dependency(pjob);
	}

	if (is_mgr && forcedel) {
		/*
		 * Set exit status for the job to SIGKILL as we will not be working with any obit.
		 */
		pjob->ji_qs.ji_un.ji_exect.ji_exitstat = SIGKILL + 0x100;
	}

	if (check_job_state(pjob, JOB_STATE_LTR_RUNNING) ||
	    (check_job_substate(pjob, JOB_SUBSTATE_TERM))) {
		if (check_job_substate(pjob, JOB_SUBSTATE_RERUN)) {
			/* rerun just started, clear that substate and */
			/* normal delete will happen when mom replies  */

			set_job_substate(pjob, JOB_SUBSTATE_RUNNING);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, "deleting instead of rerunning");
			acct_del_write(pjob->ji_qs.ji_jobid, pjob, preq, 0);
			if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_NONE))
				reply_ack(preq);
			return 0;
		}

		if (((check_job_substate(pjob, JOB_SUBSTATE_SUSPEND)) ||
		     (check_job_substate(pjob, JOB_SUBSTATE_SCHSUSP))) &&
		    (is_jattr_set(pjob, JOB_ATR_resc_released))) {
			set_resc_assigned(pjob, 0, INCR);
			free_jattr(pjob, JOB_ATR_resc_released);
			mark_jattr_not_set(pjob, JOB_ATR_resc_released);
			if (is_jattr_set(pjob, JOB_ATR_resc_released_list)) {
				free_jattr(pjob, JOB_ATR_resc_released_list);
				mark_jattr_not_set(pjob, JOB_ATR_resc_released_list);
			}
		}

		if (check_job_substate(pjob, JOB_SUBSTATE_PROVISION)) {
			if (forcedel) {
				/*
				 * discard_job not called since job not sent
				 * to MOM
				 */
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
					  LOG_INFO,
					  pjob->ji_qs.ji_jobid, "deleting job");
				acct_del_write(pjob->ji_qs.ji_jobid, pjob,
					       preq, 0);
				if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_NONE))
					reply_ack(preq);
				rel_resc(pjob);
				job_abt(pjob, NULL);
			} else {
				if (pjob->ji_pmt_preq != NULL)
					reply_preempt_jobs_request(PBSE_BADSTATE, PREEMPT_METHOD_DELETE, pjob);
				if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_BADSTATE))
					req_reject(PBSE_BADSTATE, 0, preq);
			}
			return 0;
		}

		/*
		 * Job is in fact running, so we want to terminate it.
		 *
		 * Send signal request to MOM.  The server will automagically
		 * pick up and "finish" off the client request when MOM replies.
		 * If not "force" send special termjob signal,
		 * if "force" send SIGTERM.
		 */
		if (forcedel)
			sig = sigk;
		else
			sig = sigtj;

		if (is_mgr && forcedel)
			temp_preq = NULL;
		else
			temp_preq = preq;

		rc = issue_signal(pjob, sig, post_delete_mom1, temp_preq);

		/*
		 * If forcedel is set and request is from a manager,
		 * job is deleted from server regardless
		 * of issue_signal to MoM was a success or failure.
		 * Eventually, when the mom updates server about the job,
		 * server sends a discard message to mom and job is then
		 * deleted from mom as well.
		 */
		if ((rc || is_mgr) && forcedel) {
			svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITED);
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0)
				issue_track(pjob);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
				  pjob->ji_qs.ji_jobid, "Delete forced");
			acct_del_write(pjob->ji_qs.ji_jobid, pjob, preq, 0);

			/*
			 * If we are waiting for preemption to be complete and someone does a qdel -Wforce
			 * we need to reply back to the scheduler.  We need to reply success so we don't
			 * attempt another preemption method.  This leads to a minor race condition
			 * where the moms might not be finished cleaning up when the high priority job runs.
			 */
			if (pjob->ji_pmt_preq != NULL)
				reply_preempt_jobs_request(PBSE_NONE, PREEMPT_METHOD_DELETE, pjob);

			if (preq->rq_parentbr)
				reply_ack(preq);
			else {
				if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, PBSE_NONE))
					reply_ack(preq);
			}

			pjob->ji_qs.ji_obittime = time_now;
			set_jattr_l_slim(pjob, JOB_ATR_obittime, pjob->ji_qs.ji_obittime, SET);

			/* Allocate space for the jobobit hook event params */
			temp_preq = alloc_br(PBS_BATCH_JobObit);
			if (temp_preq == NULL) {
				log_err(PBSE_INTERNAL, __func__, "rq_jobobit alloc failed");
			} else {
				temp_preq->rq_ind.rq_obit.rq_pjob = pjob;
				rc = process_hooks(temp_preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt);
				if (rc == -1) {
					log_err(-1, __func__, "rq_jobobit process_hooks call failed");
				}
				free_br(temp_preq);
			}
			discard_job(pjob, "Forced Delete", 1);
			rel_resc(pjob);

			account_job_update(pjob, PBS_ACCT_LAST);
			account_jobend(pjob, pjob->ji_acctrec, PBS_ACCT_END);

			if (pjob->ji_acctrec)
				rec = pjob->ji_acctrec;

			if (get_sattr_long(SVR_ATR_log_events) & PBSEVENT_JOB_USAGE) {
				/* log events set to record usage */
				log_event(PBSEVENT_JOB_USAGE | PBSEVENT_JOB_USAGE,
					  PBS_EVENTCLASS_JOB, LOG_INFO,
					  pjob->ji_qs.ji_jobid, rec);
			} else {
				char *pc;

				/* no usage in log, truncate messge */

				if ((pc = strchr(rec, ' ')) != NULL)
					*pc = '\0';
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
					  pjob->ji_qs.ji_jobid, rec);
			}

			if (is_mgr) {
				/*
				 * Set exit status for the job to SIGKILL as we will not be working with any obit.
				 */
				set_jattr_l_slim(pjob, JOB_ATR_exit_status, pjob->ji_qs.ji_un.ji_exect.ji_exitstat, SET);
			}

			/* see if it has any dependencies */
			if (is_jattr_set(pjob, JOB_ATR_depend))
				depend_on_term(pjob);

			/*
			 * Check if the history of the finished job can be saved or it needs to be purged .
			 */
			svr_saveorpurge_finjobhist(pjob);
			return 0;
		}
		if (rc) {
			if (pjob->ji_pmt_preq != NULL)
				reply_preempt_jobs_request(rc, PREEMPT_METHOD_DELETE, pjob);
			if (update_deljob_rply(preq, pjob->ji_qs.ji_jobid, rc))
				req_reject(rc, 0, preq); /* cant send to MOM */
			sprintf(log_buffer, "Delete failed %d", rc);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  pjob->ji_qs.ji_jobid, log_buffer);
			return 0;
		}
		/* normally will ack reply when mom responds */
		update_job_finish_comment(pjob, JOB_SUBSTATE_TERMINATED, by_user);
		sprintf(log_buffer, msg_delrunjobsig, sig);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		return 0;
	} else if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) != 0) {

		/* job has restart file at mom, do end job processing */

		svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_EXITING);
		pjob->ji_momhandle = -1; /* force new connection */
		pjob->ji_mom_prot = PROT_INVALID;
		set_task(WORK_Immed, 0, on_job_exit, (void *) pjob);

	} else if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_StagedIn) != 0) {

		/* job has staged-in file, should remove them */
		remove_stagein(pjob);
		abortjob = 1; /* set flag to abort job after mail sent */

	} else {

		/*
		 * the job is not transiting (though it may have been) and
		 * is not running, so abort it.
		 */
		abortjob = 1; /* set flag to abort job after mail sent */
	}
	/*
	 * Log delete and if requesting client is not job owner, send mail.
	 */

	acct_del_write(pjob->ji_qs.ji_jobid, pjob, preq, 0);
	job_id = strdup(pjob->ji_qs.ji_jobid);

	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ArrayJob) {
		if (forcedel) {
			/* If the array job is being force deleted, then reset the subjob
			   state counts so that chk_array_doneness() doesn't consider the
			   array job to still have active subjobs. */
			pjob->ji_ajinfo->tkm_subjsct[JOB_STATE_QUEUED] = 0;
			pjob->ji_ajinfo->tkm_subjsct[JOB_STATE_RUNNING] = 0;
			pjob->ji_ajinfo->tkm_subjsct[JOB_STATE_HELD] = 0;
			pjob->ji_ajinfo->tkm_subjsct[JOB_STATE_EXITING] = 0;
		}
		chk_array_doneness(pjob);
	}
	else if (abortjob) {
		if (check_job_state(pjob, JOB_STATE_LTR_EXITING))
			discard_job(pjob, "Forced Delete", 1);
		rel_resc(pjob);
		if (pjob->ji_pmt_preq != NULL)
			reply_preempt_jobs_request(PBSE_NONE, PREEMPT_METHOD_DELETE, pjob);
		job_abt(pjob, NULL);
	}

	if (preq->rq_parentbr)
		reply_send(preq);
	else {
		if (update_deljob_rply(preq, job_id, PBSE_NONE))
			reply_send(preq);
	}

	free(job_id);

	return 0;
}

/**
 * @brief
 * 		req_reservationOccurrenceEnd - service the PBS_BATCH_ResvOccurEnd Request
 *
 *		This request runs a hook script at the end of the reservation occurrence
 *
 * @param[in]	preq	- Job Request
 *
 */

void
req_reservationOccurrenceEnd(struct batch_request *preq)
{
	char hook_msg[HOOK_MSG_SIZE] = {0};

	switch (process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt)) {
		case 0: /* explicit reject */
			reply_text(preq, PBSE_HOOKERROR, hook_msg);
			break;
		case 1: /* no recreate request as there are only read permissions */
		case 2: /* no hook script executed - go ahead and accept event*/
			reply_ack(preq);
			break;
		default:
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_INFO, __func__, "resv_end event: accept req by default");
			reply_ack(preq);
	}
	return;
}

/**
 * @brief
 * 		req_deleteReservation - service the PBS_BATCH_DeleteResv Request
 *
 *		This request deletes a resources reservation if the requester
 *		is authorized to do this.
 *
 * @param[in]	preq	- Job Request
 *
 * @par	MT-safe: No
 */

void
req_deleteReservation(struct batch_request *preq)
{
	static int lenF = 6; /*strlen ("False") + 1*/

	resc_resv *presv;
	job *pjob;
	struct batch_request *newreq;
	struct work_task *pwt;

	char buf[PBS_MAXHOSTNAME + PBS_MAXUSER + 2]; /* temp, possibly remove in future */
	char user[PBS_MAXUSER + 1];
	char host[PBS_MAXHOSTNAME + 1];
	int perm;
	int relVal;
	int state, sub;
	long futuredr;

	/*Does resc_resv object exist and requester have enough priviledge?*/

	presv = chk_rescResv_request(preq->rq_ind.rq_manager.rq_objname, preq);

	/*Note: on failure, chk_rescResv_request invokes req_reject
	 *Appropriate reply got sent & batch_request got freed
	 */
	if (presv == NULL)
		return;

	/*Know resc_resv struct exists and requester allowed to remove it*/
	futuredr = presv->ri_futuredr;
	presv->ri_futuredr = 0; /*would be non-zero if getting*/
	/*here from task_list_timed*/
	strcpy(user, preq->rq_user); /*need after request is gone*/
	strcpy(host, preq->rq_host);
	perm = preq->rq_perm;

	/*Generate message(s) to reservation owner (listed users) as appropriate
	 *according to what was requested in the mailpoints attribute and who
	 *the submitter of the request happens to be (user, scheduler, or us)
	 */
	resv_mailAction(presv, preq);
	/*ck_submitClient_needs_reply()*/
	if (presv->ri_brp) {
		if (presv->ri_qs.ri_state == RESV_UNCONFIRMED) {
			if (is_rattr_set(presv, RESV_ATR_interactive) &&
			    get_rattr_long(presv, RESV_ATR_interactive) < 0 &&
			    futuredr != 0) {

				sprintf(buf, "%s delete, wait period expired",
					presv->ri_qs.ri_resvID);
			} else {
				sprintf(buf, "%s DENIED", presv->ri_qs.ri_resvID);
			}

		} else {
			sprintf(buf, "%s BEING DELETED", presv->ri_qs.ri_resvID);
		}

		reply_text(presv->ri_brp, PBSE_NONE, buf);
		presv->ri_brp = NULL;
	}

	sprintf(buf, "%s@%s", preq->rq_user, preq->rq_host);
	sprintf(log_buffer, "requestor=%s", buf);

	if (strcmp(get_rattr_str(presv, RESV_ATR_resv_owner), buf))
		account_recordResv(PBS_ACCT_DRss, presv, log_buffer);
	else
		account_recordResv(PBS_ACCT_DRclient, presv, log_buffer);

	if (presv->ri_qs.ri_state != RESV_UNCONFIRMED) {
		char hook_msg[HOOK_MSG_SIZE] = {0};
		switch (process_hooks(preq, hook_msg, sizeof(hook_msg), pbs_python_set_interrupt)) {
			case 0: /* explicit reject */
			case 1: /* no recreate request as there are only read permissions */
			case 2: /* no hook script executed - go ahead and accept event*/
				break;
			default:
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_INFO, __func__, "resv_end event: accept req by default");
		}
	}

	/*If there are any jobs associated with the reservation, construct and
	 *issue a PBS_BATCH_DeleteJob request for each job.
	 *
	 *General notes on this process:
	 *Use issue_Drequest() to issue a PBS_BATCH_* request - can be to
	 *1) this server, 2) another server, 3) to a pbs_mom.
	 *
	 *In the present situation the server is going to issue the request to
	 *itself (a locally generated request).  The future "event" that's
	 *to occur, and which must be handled, is the reply fom this request.
	 *The handling task is initially placed on the server's "task_list_event"
	 *list as a task of type "WORK_Deferred_Local" and, a call is made to the
	 *general "dispatch" function (dispatch_request) to dispatch the request.
	 *In replying back to itself regarding the request to itself, function
	 *"reply_send" is called.  Since it is passed a pointer to the batch_request
	 *structure for the request that it received, it will note that the request
	 *came from itself (connection set to PBS_LOCAL_CONNECTION).  It finds the
	 *handling task on "task_list_event" by finding the task with task field
	 *"wt_parm1" set equal to the address of the batch request structure in
	 *question.  That task is moved off the "event_task_list" and put on the
	 *"immedite_task_list" where it can be found and invoked the next time that
	 *the server calls function "next_task" from it's main loop.
	 *The work_task function that's going to be invoked will be responding to
	 *the "reply" that comes back from the servers original request to itself.
	 *The handling function, in addition to whatever else it might do, does
	 *have the RESPONSIBILITY of calling free_br() to remove all memory associated
	 *with the batch_request structure.
	 */

	if (presv->ri_qp != NULL && presv->ri_qp->qu_numjobs > 0) {

		/*One or more jobs are attached to this resource reservation
		 *Issue a PBS_BATCH_Manager request to set "enable" to "False"
		 *for the queue (if it is not already so set), set "start" for
		 *the queue to "False" as well- so the scheduler will cease
		 *scheduling the jobs in queue, then issue a PBS_BATCH_DeleteJob
		 *request for each resident job.
		 */

		int deleteProblem = 0;
		job *pnxj;

		if (get_qattr_long(presv->ri_qp, QA_ATR_Enabled)) {

			svrattrl *psatl;
			newreq = alloc_br(PBS_BATCH_Manager);
			if (newreq == NULL) {
				req_reject(PBSE_SYSTEM, 0, preq);
				return;
			}
			CLEAR_HEAD(newreq->rq_ind.rq_manager.rq_attr);

			newreq->rq_ind.rq_manager.rq_cmd = MGR_CMD_SET;
			newreq->rq_ind.rq_manager.rq_objtype = MGR_OBJ_QUEUE;
			strcpy(newreq->rq_ind.rq_manager.rq_objname,
			       presv->ri_qp->qu_qs.qu_name);
			strcpy(newreq->rq_user, user);
			strcpy(newreq->rq_host, host);
			newreq->rq_perm = perm;

			if ((psatl = attrlist_create(ATTR_enable, NULL, lenF)) == NULL) {

				req_reject(PBSE_SYSTEM, 0, preq);
				free_br(newreq);
				return;
			}
			psatl->al_flags = que_attr_def[QA_ATR_Enabled].at_flags;
			strcpy(psatl->al_value, "False");
			append_link(&newreq->rq_ind.rq_manager.rq_attr, &psatl->al_link, psatl);

			if ((psatl = attrlist_create(ATTR_start, NULL, lenF)) == NULL) {

				req_reject(PBSE_SYSTEM, 0, preq);
				free_br(newreq);
				return;
			}
			psatl->al_flags = que_attr_def[QA_ATR_Started].at_flags;
			strcpy(psatl->al_value, "False");
			append_link(&newreq->rq_ind.rq_manager.rq_attr,
				    &psatl->al_link, psatl);

			if (issue_Drequest(PBS_LOCAL_CONNECTION, newreq,
					   release_req, &pwt, 0) == -1) {
				req_reject(PBSE_SYSTEM, 0, preq);
				free_br(newreq);
				return;
			}
			/* set things so that any removal of the reservation
			 * structure also removes any "yet to be processed"
			 * work tasks that are associated with the reservation
			 */
			append_link(&presv->ri_svrtask, &pwt->wt_linkobj, pwt);

			tickle_for_reply();
		}

		/*Ok, input to the queue is stopped, try and delete jobs in queue*/

		relVal = 1;
		eval_resvState(presv, RESVSTATE_req_deleteReservation,
			       relVal, &state, &sub);
		resv_setResvState(presv, state, sub);
		pjob = (job *) GET_NEXT(presv->ri_qp->qu_jobs);
		while (pjob != NULL) {

			pnxj = (job *) GET_NEXT(pjob->ji_jobque);

			/* skip all expired subjobs, expired subjobs are deleted when array parent is
			 * issued delete request
			 */
			for (; pnxj != NULL && (pnxj->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) &&
			       check_job_state(pnxj, JOB_STATE_LTR_EXPIRED);
			     pnxj = (job *) GET_NEXT(pnxj->ji_jobque))
				;
			/*
			 * If a history job (job state is JOB_STATE_LTR_MOVED
			 * or JOB_STATE_LTR_FINISHED, then no need to delete
			 * it again as it is already deleted.
			 */
			if (check_job_state(pjob, JOB_STATE_LTR_MOVED) ||
			    (pjob->ji_qs.ji_svrflags & JOB_SVFLG_SubJob) ||
			    check_job_state(pjob, JOB_STATE_LTR_FINISHED)) {
				pjob = pnxj;
				continue;
			}

			newreq = alloc_br(PBS_BATCH_DeleteJob);
			if (newreq != NULL) {

				/*when owner of job is not same as owner of resv, */
				/*need extra permission; Also extra info for owner*/

				CLEAR_HEAD(newreq->rq_ind.rq_manager.rq_attr);
				newreq->rq_perm = perm | ATR_DFLAG_MGWR;
				newreq->rq_extend = NULL;

				/*reply processing needs resv*/
				newreq->rq_extra = (void *) presv;

				strcpy(newreq->rq_user, user);
				strcpy(newreq->rq_host, host);
				strcpy(newreq->rq_ind.rq_delete.rq_objname,
				       pjob->ji_qs.ji_jobid);

				if (issue_Drequest(PBS_LOCAL_CONNECTION, newreq,
						   release_req, &pwt, 0) == -1) {
					deleteProblem++;
					free_br(newreq);
				}
				/* set things so that any removal of the reservation
				 * structure also removes any "yet to be processed"
				 * work tasks that are associated with the reservation
				 */
				append_link(&presv->ri_svrtask, &pwt->wt_linkobj, pwt);

				tickle_for_reply();
			} else
				deleteProblem++;

			pjob = pnxj;
		}

		if (deleteProblem) {
			/*some problems attempting to delete reservation's jobs
			 *shouldn't end up re-calling req_deleteReservation
			 */
			sprintf(log_buffer, "%s %s\n",
				"problem deleting jobs belonging to",
				presv->ri_qs.ri_resvID);
			reply_text(preq, PBSE_RESVMSG, log_buffer);
		} else {
			/*no problems so far, we are attempting to do it
			 *If all job deletions succeed, resv_purge()
			 *should get triggered
			 */
			reply_ack(preq);

			/*
			 * If all the jobs in the RESV are history jobs, then
			 * better to purge the RESV now only without waiting
			 * for next resv delete iteration.
			 */
			pjob = NULL;
			if (presv && presv->ri_qp)
				pjob = (job *) GET_NEXT(presv->ri_qp->qu_jobs);
			while (pjob != NULL) {
				if ((!check_job_state(pjob, JOB_STATE_LTR_MOVED)) &&
				    (!check_job_state(pjob, JOB_STATE_LTR_FINISHED)) &&
				    (!check_job_state(pjob, JOB_STATE_LTR_EXPIRED)))
					break;
				pjob = (job *) GET_NEXT(pjob->ji_jobque);
			}
			if (pjob == NULL) /* all are history jobs only */
				resv_purge(presv);
			else {
				/* other jobs remain, need to set task to monitor */
				/* when they are dequeued */
				pwt = set_task(WORK_Immed, 0, post_deljobfromresv_req,
					       (void *) presv);
				if (pwt)
					append_link(&presv->ri_svrtask,
						    &pwt->wt_linkobj, pwt);
			}
		}

		/*This is all we can do for now*/
		return;
	}

	/* Ok, we have no jobs attached so can purge reservation
		If reservation has an attached queue, a request to qmgr
		will get made to delete the queue
		*/
	relVal = 2;
	eval_resvState(presv, RESVSTATE_req_deleteReservation,
		       relVal, &state, &sub);
	resv_setResvState(presv, state, sub);
	reply_ack(preq);
	resv_purge(presv);
	return;
}

/**
 * @brief
 * 		post_delete_mom1 - first of 2 work task trigger functions to finish the
 *		deleting of a running job.  This first part is invoked when MOM
 *		responds to the SIGTERM signal request.
 *
 * @param[in]	pwt	- work task
 */

static void
post_delete_mom1(struct work_task *pwt)
{
	int auxcode;
	job *pjob;
	struct batch_request *preq_sig; /* signal request to MOM */
	struct batch_request *preq_clt; /* original client request */
	int rc;
	int tries = 0;

	preq_sig = pwt->wt_parm1;
	rc = preq_sig->rq_reply.brp_code;
	/* Look for PBSE_ error code in wt_aux if brp code reports some other error */
	if (rc && pwt->wt_aux && (rc < PBSE_) && (pwt->wt_aux > PBSE_))
		rc = pwt->wt_aux;
	auxcode = preq_sig->rq_reply.brp_auxcode;
	preq_clt = preq_sig->rq_extra;
	if (preq_clt == NULL) {
		release_req(pwt);
		return;
	}

	pjob = find_job(preq_sig->rq_ind.rq_signal.rq_jid);
	if (pjob == NULL) {
		/* job has gone away */
		if (update_deljob_rply(preq_clt, preq_sig->rq_ind.rq_signal.rq_jid, PBSE_UNKJOBID))
			req_reject(PBSE_UNKJOBID, 0, preq_clt);
		release_req(pwt);
		return;
	}
	release_req(pwt);

resend:
	if (rc) {
		/* mom rejected request */
		sprintf(log_buffer, "MOM rejected signal during delete (%d)", rc);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);

		if (rc == PBSE_UNKSIG) {
			if (tries++) {
				if (update_deljob_rply(preq_clt, pjob->ji_qs.ji_jobid, rc))
					req_reject(rc, 0, preq_clt);
				return;
			}
			/* 2nd try, use SIGTERM */
			rc = issue_signal(pjob, sigt, post_delete_mom1, preq_clt);
			if (rc == 0)
				return; /* will be back when replies */
			goto resend;
		} else if (rc == PBSE_UNKJOBID) {

			if (check_job_substate(pjob, JOB_SUBSTATE_PRERUN)) {
				/* This means the job has trigerred to run again,
				 try deleting again! */
				rc = issue_signal(pjob, sigtj, post_delete_mom1, preq_clt);
				if (rc == 0)
					return; /* will be back when replies */
				goto resend;
			}

			/* MOM claims no knowledge, so just purge it */
			acct_del_write(pjob->ji_qs.ji_jobid, pjob, preq_clt, 0);
			/* removed the resources assigned to job */
			free_nodes(pjob);
			set_resc_assigned(pjob, 0, DECR);
			if (update_deljob_rply(preq_clt, pjob->ji_qs.ji_jobid, PBSE_NONE))
				reply_ack(preq_clt);
			svr_saveorpurge_finjobhist(pjob);
		} else {
			if (pjob->ji_pmt_preq != NULL)
				reply_preempt_jobs_request(rc, PREEMPT_METHOD_DELETE, pjob);
			if (update_deljob_rply(preq_clt, pjob->ji_qs.ji_jobid, rc))
				req_reject(rc, 0, preq_clt);
		}
		return;
	}

	acct_del_write(pjob->ji_qs.ji_jobid, pjob, preq_clt, 0);

	if (preq_clt->rq_parentbr)
		reply_ack(preq_clt);
	else
		if (update_deljob_rply(preq_clt, pjob->ji_qs.ji_jobid, PBSE_NONE))
			reply_ack(preq_clt); /* dont need it, reply now */

	if (auxcode == JOB_SUBSTATE_TERM) {
		/* Mom running a site supplied Terminate Job script   */
		/* Put job into special Exiting state and we are done */

		svr_setjobstate(pjob, JOB_STATE_LTR_EXITING, JOB_SUBSTATE_TERM);
		return;
	}
}

/**
 * @brief
 *		post_deljobfromresv_req
 *
 * @par Functionality:
 *
 *		This work_task function is triggered after all jobs in the queue
 *		associated with a reservation have had delete requests issued.
 *		If all jobs are  indeed found to be no longer present,
 *		the down counter in the reservation structure is
 *		decremented.  When the decremented value becomes less
 *		than or equal to zero, issue a request to delete the
 *		reservation.
 *
 *		If SERVER is configured for history jobs...
 *		If the reservation down counter is positive, check if all
 *		the jobs in the resv are history jobs. If yes, purge the
 *		reservation using resv_purge() without waiting.
 *
 *		If there are still non-history jobs, recall itself after 30 seconds.
 *
 * @param[in]	pwt	- pointer to the work task, the reservation structure
 *			  			pointer is contained in wt_parm1
 *
 * @return	void
 *
 */

static void
	post_deljobfromresv_req(pwt) struct work_task *pwt;
{
	resc_resv *presv;
	job *pjob = NULL;

	presv = (resc_resv *) ((struct batch_request *) pwt->wt_parm1);

	/* return if presv is not valid */
	if (presv == NULL)
		return;

	presv->ri_downcnt = presv->ri_qp->qu_numjobs;
	if (presv->ri_downcnt != 0) {
		if (presv->ri_qp)
			pjob = (job *) GET_NEXT(presv->ri_qp->qu_jobs);
		while (pjob != NULL) {
			if ((!check_job_state(pjob, JOB_STATE_LTR_MOVED)) &&
			    (!check_job_state(pjob, JOB_STATE_LTR_FINISHED)) &&
			    (!check_job_state(pjob, JOB_STATE_LTR_EXPIRED)))
				break;
			pjob = (job *) GET_NEXT(pjob->ji_jobque);
		}
		/*
			* If pjob is NULL, then all are history jobs only,
			* make the ri_downcnt to 0, so that resv_purge()
			* can be called down.
			*/
		if (pjob == NULL)
			presv->ri_downcnt = 0;
	}

	if (presv->ri_downcnt == 0) {
		resv_purge(presv);
	} else if (pjob) {
		/* one or more jobs still not able to be deleted; set me up for
		 * another call for 30 seconds into the future.
		 */
		pwt = set_task(WORK_Timed, time_now + 30, post_deljobfromresv_req,
			       (void *) presv);
		if (pwt)
			append_link(&presv->ri_svrtask, &pwt->wt_linkobj, pwt);
	}
}
