/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

/**
 * @file	mom_comm.c
 */
#include <pbs_config.h> /* the master config generated by configure */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>

#include <unistd.h>
#include <dirent.h>
#include <pwd.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/param.h>
#include <sys/times.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "libpbs.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server_limits.h"
#include "job.h"
#include "pbs_error.h"
#include "log.h"
#include "net_connect.h"
#include "tpp.h"
#include "dis.h"
#include "mom_func.h"
#include "mom_server.h"
#include "credential.h"
#include "ticket.h"
#include "pbs_nodes.h"
#include "svrfunc.h"
#include "batch_request.h"
#include "hook.h"
#include "mom_hook_func.h"
#include "pbs_internal.h"
#include "placementsets.h"
#include "pbs_reliable.h"
#include "renew_creds.h"
#ifdef PMIX
#include "mom_pmix.h"
#endif

/* Global Data Items */

/*
 * Everything declared "extern" here is defined in another translation
 * unit; only task_fmt is defined in this file.
 */
extern int exiting_tasks;
extern char mom_host[];
extern char *path_jobs; /* prefix of the per-job paths built by task_save()/task_recov() */
extern int pbs_errno;
extern pbs_list_head mom_deadjobs; /* for deferred purging of job */
extern pbs_list_head mom_polljobs; /* must have resource limits polled */
extern pbs_list_head svr_alljobs;  /* all jobs under MOM's control */
extern time_t time_now; /* compared against mom_net_up_time by is_comm_up() */
extern int server_stream;
extern char mom_short_name[];
extern unsigned int pbs_mom_port; /* port in_exechost() assumes when a host has no vnode map entry */
extern unsigned int pbs_rm_port;
extern int gen_nodefile_on_sister_mom;

extern int mom_net_up; /* is_comm_up() treats the value 1 as "communication is up" */
extern time_t mom_net_up_time; /* reference time from which is_comm_up() measures the link's age */
extern int max_poll_downtime_val;
extern char *msg_err_malloc;
extern int
write_pipe_data(int upfds, void *data, int data_size);
/*
 * printf-style format that turns a task id into the "/XXXXXXXX"
 * (8 upper-case hex digits) file name component used by task_save().
 */
char task_fmt[] = "/%8.8X";
extern void resume_multinode(job *pjob);

/* Function pointers
 **
 ** These are functions to provide extra interaction between mother
 ** superior and the sisters for any special job setup that needs
 ** to take place. If no extra setup needs to happen, the function
 ** pointers are all NULL and standard MOM interaction takes place.
 ** The sequence of actions which happen for extra setup is as follows
 ** (showing one sister):
 **
 **    MS                          sister
 ** Sends JOIN_JOB
 **                                Gets JOIN_JOB, calls job_join_extra.
 **                                Calls job_join_ack to append to reply.
 ** Calls job_join_read to read
 ** extra values included with
 ** JOIN_JOB reply.
 **
 ** Calls job_join_extra to get
 ** her own extra info.
 **
 ** Calls send_sisters to send a
 ** SETUP_JOB message and uses
 ** job_setup_send to append setup
 ** information to the message.
 **                                Gets SETUP_JOB, calls job_setup_final.
 ** Gets reply to SETUP_JOB, calls
 ** job_setup_final.
 **
 ** At this point, all the extra setup for the job is done and it
 ** can be started by calling finish_exec.  The clean up to undo or
 ** deallocate whatever resources were claimed in job_setup_final
 ** is done in job_clean_extra.
 */

/*
 **	Gather any extra information needed at job start.  Called by
 **	a sister when she gets JOIN_JOB.  Called by MS after she gets
 **	good JOIN_JOB replies from all the sisters.
 **	Bound to pbs_pmix_job_join_extra when built with PMIX defined,
 **	otherwise NULL.
 */
#ifdef PMIX
pbs_jobnode_t job_join_extra = &pbs_pmix_job_join_extra;
#else
pbs_jobnode_t job_join_extra = NULL;
#endif

/*
 **	Used by a sister node to write extra information back to MS
 **	with the reply to JOIN_JOB.  NULL here: no extra data is written
 **	unless something else assigns this pointer.
 */
pbs_jobndstm_t job_join_ack = NULL;

/*
 **	Used by MS to read extra information sent by a sister reply
 **	to JOIN_JOB.  Counterpart of job_join_ack.
 */
pbs_jobndstm_t job_join_read = NULL;

/*
 **	Called by MS from send_sisters to form a SETUP_JOB message.
 */
pbs_jobndstm_t job_setup_send = NULL;

/*
 **	Called by a sister to read a SETUP_JOB message and do any
 **	special setup required at job start.
 */
pbs_jobstream_t job_setup_final = NULL;

/*
 **	Does any special processing needed before the epilogue runs.
 */
pbs_jobvoid_t job_end_final = NULL;

/*
 **	Called at job end to undo the setup done in job_setup_final
 **	or job_join_extra.
 **	Bound to pbs_pmix_job_clean_extra when built with PMIX defined,
 **	otherwise NULL.
 */
#ifdef PMIX
pbs_jobfunc_t job_clean_extra = &pbs_pmix_job_clean_extra;
#else
pbs_jobfunc_t job_clean_extra = NULL;
#endif

/*
 **	Free memory allocated in ji_setup and hn_setup for all nodes.
 */
pbs_jobvoid_t job_free_extra = NULL;

/*
 **	Free memory allocated in hn_setup for a given node.
 */
pbs_jobnodevoid_t job_free_node = NULL;

/*
 * Map a vnode index to the node id (hn_node) of the host that owns it.
 * The following depends on tm_node_id being 0 to n-1.
 * NOTE: the expansion refers to a variable named "pjob", so one must be
 * in scope wherever the macro is used; the "vnode" argument is not
 * parenthesized, so pass a simple expression.
 */
#define TO_PHYNODE(vnode) pjob->ji_vnods[vnode].vn_host->hn_node

/* forward declaration; the definition follows later in this file */
eventent *event_dup(eventent *ep, job *pjob, hnodent *pnode);

/**
 * @brief
 *	Save the critical information associated with a task to disk.
 *
 *	The file name is built as
 *	<path_jobs><job file prefix, or job id when no prefix is set>
 *	<JOB_TASKDIR_SUFFIX><task id formatted with task_fmt>
 *	and only the fixed-size ti_qs sub-structure is written to it.
 *
 * @param[in]   ptask - structure handle holding task info to be saved
 *
 * @return   Error code
 * @retval   0 Success
 * @retval  -1 Failure (path does not fit, or open/lseek/write failed)
 *
 */
int
task_save(pbs_task *ptask)
{
	job *pjob = ptask->ti_job;
	int fds;
	int i;
	int flen;
	int plen;
	int open_errno;
	char namebuf[MAXPATHLEN + 1];
	char filnam[MAXPATHLEN + 1];
	int openflags;

	/*
	 * Build the path with bounded formatting so that an unexpectedly
	 * long job directory can never write past the end of namebuf.
	 */
	flen = snprintf(filnam, sizeof(filnam), task_fmt, ptask->ti_qs.ti_task);
	plen = snprintf(namebuf, sizeof(namebuf), "%s%s%s%s",
			path_jobs, /* job directory path */
			(*pjob->ji_qs.ji_fileprefix != '\0') ? pjob->ji_qs.ji_fileprefix : pjob->ji_qs.ji_jobid,
			JOB_TASKDIR_SUFFIX,
			filnam);
	if ((flen < 0) || ((size_t) flen >= sizeof(filnam)) ||
	    (plen < 0) || ((size_t) plen >= sizeof(namebuf))) {
		log_err(-1, __func__, "task file path too long");
		return (-1);
	}

	openflags = O_WRONLY | O_CREAT;
	fds = open(namebuf, openflags, 0600);
	if (fds < 0) {
		/* keep the errno of open(); formatting must not disturb it */
		open_errno = errno;
		snprintf(log_buffer, sizeof(log_buffer), "error on open %s", namebuf);
		log_err(open_errno, __func__, log_buffer);
		return (-1);
	}

#ifdef WIN32
	secure_file(namebuf, "Administrators",
		    READS_MASK | WRITES_MASK | STANDARD_RIGHTS_REQUIRED);
#endif

	/* just write the "critical" base structure to the file */

	while ((i = write(fds, (char *) &ptask->ti_qs, sizeof(ptask->ti_qs))) !=
	       sizeof(ptask->ti_qs)) {
		if ((i < 0) && (errno == EINTR)) { /* retry the write */
			/* restart from the beginning of the file */
			if (lseek(fds, (off_t) 0, SEEK_SET) < 0) {
				log_err(errno, __func__, "lseek");
				(void) close(fds);
				return (-1);
			}
			continue;
		} else {
			/* any other error, or a short write, is fatal */
			log_err(errno, __func__, "quickwrite");
			(void) close(fds);
			return (-1);
		}
	}
	(void) close(fds);
	return (0);
}

/**
 * @brief
 *	Duplicate an event and link it to the given nodeent entry.
 *
 *	The copy is a plain structure copy of *ep (pointer members such as
 *	ee_argv/ee_envp are copied, not duplicated) with a fresh list link.
 *	If the node has no open stream (hn_stream == -1) one is opened
 *	with tpp_open().
 *
 * @param[in] ep - eventent pointer to event to be linked
 * @param[in] pjob - job pointer to job (not used by this function)
 * @param[in] pnode - hnode pointer to node to link event
 *
 * @return structure
 * @retval event linked
 * @retval NULL if memory for the copy could not be allocated
 *
 */
eventent *
event_dup(eventent *ep, job *pjob, hnodent *pnode)
{
	eventent *nep;

	/*
	 * Check the allocation explicitly: an assert() disappears when
	 * NDEBUG is defined and would leave a NULL dereference below.
	 */
	nep = malloc(sizeof(*nep));
	if (nep == NULL) {
		log_err(errno, __func__, msg_err_malloc);
		return NULL;
	}

	memmove(nep, ep, sizeof(*ep));
	CLEAR_LINK(nep->ee_next);

	append_link(&pnode->hn_events, &nep->ee_next, nep);

	if (pnode->hn_stream == -1)
		pnode->hn_stream = tpp_open(pnode->hn_host, pnode->hn_port);

	return nep;
}

/**
 * @brief
 *	Allocate an event and link it to the given nodeent entry.
 *
 *	Event numbers come from a counter that is static to this function.
 *	Once the counter has wrapped back to TM_NULL_EVENT, every new
 *	number is checked against the events already queued on all hosts
 *	of the job so that no duplicate is handed out.
 *	If the node has no open stream (hn_stream == -1) one is opened
 *	with tpp_open().
 *
 * @param[in] pjob - pointer to job structure
 * @param[in] command - command event is for
 * @param[in] fd - TM stream
 * @param[in] pnode - pointer to structure to keep track of events for node
 * @param[in] event - MOM event number
 * @param[in] taskid - which task id
 *
 * @return structure handle
 * @retval eventent * - the new event, linked on pnode->hn_events
 * @retval NULL - memory for the event could not be allocated
 *
 */
eventent *
event_alloc(job *pjob, int command, int fd, hnodent *pnode,
	    tm_event_t event, tm_task_id taskid)
{
	static tm_event_t eventnum = TM_NULL_EVENT + 1;
	static int rollover = 0;
	eventent *ep;

	/*
	 * Check the allocation explicitly: an assert() disappears when
	 * NDEBUG is defined and would leave a NULL dereference below.
	 */
	ep = malloc(sizeof(*ep));
	if (ep == NULL) {
		log_err(errno, __func__, msg_err_malloc);
		return NULL;
	}
	ep->ee_command = command;
	ep->ee_retry = 0;
	ep->ee_fd = fd;
	ep->ee_client = event;
	ep->ee_taskid = taskid;
	ep->ee_argv = NULL;
	ep->ee_envp = NULL;
	CLEAR_LINK(ep->ee_next);

	if ((ep->ee_event = eventnum++) == TM_NULL_EVENT) {
		/*
		 ** Set the eventnum counter back to initial condition.
		 ** The first legal event number is TM_NULL_EVENT+1.
		 */
		DBPRT(("%s: EVENT ROLLOVER\n", __func__))
		eventnum = TM_NULL_EVENT + 1;
		ep->ee_event = eventnum++;
		rollover = 1;
	}

	if (rollover) {
		int clash;

		/*
		 ** Check for events to be sure there are no dups.
		 ** Every time a clash is found a new number is taken and
		 ** the scan restarts from the first host.
		 */
		do {
			int i;

			clash = 0;
			for (i = 0; (i < pjob->ji_numnodes) && !clash; i++) {
				hnodent *np = &pjob->ji_hosts[i];
				eventent *sp;

				for (sp = (eventent *) GET_NEXT(np->hn_events);
				     sp != NULL;
				     sp = (eventent *) GET_NEXT(sp->ee_next)) {
					if (sp->ee_event == ep->ee_event) {
						DBPRT(("%s: DUP host event\n", __func__))
						ep->ee_event = eventnum++;
						clash = 1;
						break;
					}
				}
			}
		} while (clash);
		/*
		 ** We don't need to search the obit events because
		 ** any local client (not MOM) will have generated
		 ** the event number that is saved for the obit.
		 */
	}

	append_link(&pnode->hn_events, &ep->ee_next, ep);

	if (pnode->hn_stream == -1)
		pnode->hn_stream = tpp_open(pnode->hn_host, pnode->hn_port);

	return ep;
}

/**
 * @brief
 *	Count the bits needed to represent a number, i.e. the position
 *	of its highest set bit counted from 1.
 *
 * @param[in] x - unsigned number
 *
 * @return int
 * @retval 0 when x is 0, otherwise the number of significant bits
 *
 */
int
numbits(uint x)
{
	int width = 0;

	/* drop one low-order bit per round until nothing is left */
	while (x != 0) {
		x >>= 1;
		width++;
	}
	return width;
}

/**
 * @brief
 *	Create a new task for job
 *
 *	The task id is made of the job's running task counter
 *	(ji_taskid, post-incremented here) in the low bits and this
 *	MOM's node id (ji_nodeid) in the top numbits(ji_numvnod) bits.
 *	The new task is zero-filled, appended to pjob->ji_tasks and put
 *	in state TI_STATE_EMBRYO.
 *
 * @param[in] pjob - structure handle to job
 *
 * @return structure
 * @retval structure handle to pbs_task
 * @retval NULL if the task counter no longer fits next to the node id
 *	   bits, or if memory could not be allocated
 *
 */
pbs_task *
momtask_create(job *pjob)
{
	pbs_task *ptask;
	tm_task_id taskid;
	uint nodeid = pjob->ji_numvnod; /* largest nodeid */
	uint myvnodeid = pjob->ji_nodeid;
	int nodebits = numbits(nodeid);
	/* bits left for the per-job task counter */
	int taskbits = (int) (8 * sizeof(taskid)) - nodebits;

	taskid = pjob->ji_taskid++;

	/* check for overflow */
	if (numbits(taskid) > taskbits)
		return NULL;

	/*
	 * Place the node id above the counter bits.  With no node bits at
	 * all the shift count would equal the width of the type, which is
	 * undefined behavior in C, so the node id is left out in that case.
	 */
	if (nodebits > 0) {
		myvnodeid <<= taskbits;
		taskid |= myvnodeid;
	}

	/*
	 * Check the allocation explicitly: an assert() disappears when
	 * NDEBUG is defined and would leave a NULL dereference below.
	 */
	ptask = calloc(1, sizeof(*ptask));
	if (ptask == NULL) {
		log_err(errno, __func__, msg_err_malloc);
		return NULL;
	}
	ptask->ti_job = pjob;
	CLEAR_LINK(ptask->ti_jobtask);
	append_link(&pjob->ji_tasks, &ptask->ti_jobtask, ptask);
	ptask->ti_tmfd = NULL;
	ptask->ti_protover = -1;
	ptask->ti_flags = 0;
	ptask->ti_cput = 0;
#ifdef WIN32
	ptask->ti_hProc = NULL;
#endif
	ptask->ti_register = TM_NULL_EVENT;
	CLEAR_HEAD(ptask->ti_obits);
	CLEAR_HEAD(ptask->ti_info);

	ptask->ti_qs.ti_parentnode = TM_ERROR_NODE;
	ptask->ti_qs.ti_parenttask = 0;
	ptask->ti_qs.ti_task = taskid;

	ptask->ti_qs.ti_myvnode = 0;
	ptask->ti_qs.ti_status = TI_STATE_EMBRYO;
	ptask->ti_qs.ti_sid = 0;
	ptask->ti_qs.ti_exitstat = 0;
	memset(ptask->ti_qs.ti_u.ti_hold, 0, sizeof(ptask->ti_qs.ti_u.ti_hold));

	return ptask;
}

/**
 * @brief
 *	Look up a task of a job by its task id.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] taskid - task id
 *
 * @retval structure handle to pbs_task whose ti_qs.ti_task equals taskid
 * @retval NULL when the job has no such task
 *
 */
pbs_task *
task_find(job *pjob, tm_task_id taskid)
{
	pbs_task *cur = (pbs_task *) GET_NEXT(pjob->ji_tasks);

	/* walk the job's task list until the id matches or the list ends */
	while (cur != NULL) {
		if (cur->ti_qs.ti_task == taskid)
			return cur;
		cur = (pbs_task *) GET_NEXT(cur->ti_jobtask);
	}
	return NULL;
}

/**
 * @brief
 *	find session  for task
 *
 * @param[in] sid - session id
 *
 * @return structure handle to pbs_task
 *
 */
pbs_task *
find_session(pid_t sid)
{
	job *pjob;
	pbs_task *ptask;

	for (pjob = (job *) GET_NEXT(svr_alljobs);
	     pjob != NULL;
	     pjob = (job *) GET_NEXT(pjob->ji_alljobs)) {
		for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
		     ptask;
		     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
			if (ptask->ti_qs.ti_sid == sid)
				return ptask;
		}
	}
	return NULL;
}

/**
 * @brief
 *	Validate that a task exists in the job and that the given TM
 *	stream is one of the descriptors recorded for that task.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] fd   - TM stream
 * @param[in] taskid - task's taskid
 *
 * @return structure handle to pbs_task
 * @retval task handle when the task exists and fd is listed in ti_tmfd
 * @retval NULL when the task is unknown (logged as a job error) or fd
 *	   is negative / not listed for the task (logged at DEBUG3)
 *
 */
pbs_task *
task_check(job *pjob, int fd, tm_task_id taskid)
{
	pbs_task *found = task_find(pjob, taskid);
	int slot = 0;

	if (found == NULL) {
		sprintf(log_buffer, "requesting task %8.8X not found",
			taskid);
		log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
		return NULL;
	}

	/* advance to the slot holding fd, or to ti_tmnum if there is none */
	while ((slot < found->ti_tmnum) && (found->ti_tmfd[slot] != fd))
		slot++;

	if ((fd < 0) || (slot == found->ti_tmnum)) {
		sprintf(log_buffer, "cannot tm_reply to task %8.8X", taskid);
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG,
			  pjob->ji_qs.ji_jobid, log_buffer);
		return NULL;
	}
	return found;
}

/**
 * @brief
 *      Recover (read in) the tasks from their save files for a job.
 *      This function is only needed upon MOM start up.
 *
 *      Every entry of the job's task directory whose name does not start
 *      with '.' is read as a saved ti_qs structure and turned into a task
 *      of the job.  A file that cannot be opened or read completely, or
 *      for which no task can be created, is removed and skipped.  A file
 *      whose full path would not fit the path buffer is logged and
 *      skipped without being removed.
 *
 * @param [in]	pjob - pointer to struct job.
 *
 * @return	int
 * @retval	0	Success
 * @retval	-1	Task directory path too long, open of the directory
 *			failed, or (non-Windows) reading the directory failed
 *
 */
int
task_recov(job *pjob)
{
	int fds;
	int len;
	int n;
	pbs_task *pt;
	char dirname[MAXPATHLEN + 1];
	char namebuf[MAXPATHLEN + 1];
#ifdef WIN32
	HANDLE hDir;
	WIN32_FIND_DATA finfo;
#else
	DIR *dir;
	struct dirent *pdirent;
#endif
	struct taskfix saved;

	/*
	 * Job directory path, built with bounded formatting.  Two bytes
	 * are held back for the separator (and wildcard on Windows) that
	 * is appended below.
	 */
	len = snprintf(dirname, sizeof(dirname) - 2, "%s%s%s",
		       path_jobs,
		       (*pjob->ji_qs.ji_fileprefix != '\0') ? pjob->ji_qs.ji_fileprefix : pjob->ji_qs.ji_jobid,
		       JOB_TASKDIR_SUFFIX);
	if ((len < 0) || ((size_t) len >= sizeof(dirname) - 2)) {
		log_err(-1, __func__, "task directory path too long");
		return -1;
	}

#ifdef WIN32
	(void) strcat(dirname, "\\*");

	if ((hDir = FindFirstFile(dirname, &finfo)) == INVALID_HANDLE_VALUE)
		return -1;

	dirname[len + 1] = '\0'; /* trim wildcard, keep the backslash */
	do {
		if (finfo.cFileName[0] == '.')
			continue;

		n = snprintf(namebuf, sizeof(namebuf), "%s%s",
			     dirname, finfo.cFileName);
		if ((n < 0) || ((size_t) n >= sizeof(namebuf))) {
			/* name is truncated: do not open or unlink it */
			log_err(-1, __func__, "task file path too long");
			continue;
		}

		fds = open(namebuf, O_RDONLY, 0);
		if (fds < 0) {
			log_err(errno, __func__, "open of task file");
			unlink(namebuf);
			continue;
		}

		/* read in task quick save sub-structure */
		if (read(fds, (char *) &saved, sizeof(saved)) !=
		    sizeof(saved)) {
			log_err(errno, __func__, "read");
			unlink(namebuf);
			(void) close(fds);
			continue;
		}
		if ((pt = momtask_create(pjob)) == NULL) {
			unlink(namebuf);
			(void) close(fds);
			continue;
		}
		/* the saved state replaces the freshly generated one */
		pt->ti_qs = saved;
		(void) close(fds);

		if (saved.ti_sid > 0) {
			pt->ti_hProc = OpenProcess(PROCESS_ALL_ACCESS,
						   FALSE, pt->ti_qs.ti_sid);
		}
	} while (FindNextFile(hDir, &finfo));
	(void) FindClose(hDir);
#else
	if ((dir = opendir(dirname)) == NULL)
		return -1;

	(void) strcat(dirname, "/");
	/* errno is cleared before each readdir() to tell end-of-directory from error */
	while (errno = 0, (pdirent = readdir(dir)) != NULL) {
		if (pdirent->d_name[0] == '.')
			continue;

		n = snprintf(namebuf, sizeof(namebuf), "%s%s",
			     dirname, pdirent->d_name);
		if ((n < 0) || ((size_t) n >= sizeof(namebuf))) {
			/* name is truncated: do not open or unlink it */
			log_err(-1, __func__, "task file path too long");
			continue;
		}

		fds = open(namebuf, O_RDONLY, 0);
		if (fds < 0) {
			log_err(errno, __func__, "open of task file");
			unlink(namebuf);
			continue;
		}

		/* read in task quick save sub-structure */
		if (read(fds, (char *) &saved, sizeof(saved)) !=
		    sizeof(saved)) {
			log_err(errno, __func__, "read");
			unlink(namebuf);
			(void) close(fds);
			continue;
		}
		if ((pt = momtask_create(pjob)) == NULL) {
			unlink(namebuf);
			(void) close(fds);
			continue;
		}
		/* the saved state replaces the freshly generated one */
		pt->ti_qs = saved;
		(void) close(fds);
	}
	if (errno != 0 && errno != ENOENT) {
		log_err(errno, __func__, "readdir");
		(void) closedir(dir);
		return -1;
	}
	(void) closedir(dir);
#endif /* WIN32 */

	return 0;
}

/**
 * @brief
 *	Write the header of a reply message to a user proc over a TCP
 *	stream: TM_PROTOCOL, version, command and event, in that order.
 *	Writing stops at the first field that fails.
 *
 * @param[in] stream - file descriptor to tasks
 * @param[in] version - protocol version
 * @param[in] com - command event
 * @param[in] event - event number
 *
 * @return int
 * @retval (DIS_SUCCESS) 0  No error
 * @retval otherwise the DIS error code of the field that failed
 *
 */
int
tm_reply(int stream, int version, int com, tm_event_t event)
{
	int rc;

	DBPRT(("tm_reply: stream %d version %d com %d event %d\n",
	       stream, version, com, event))
	DIS_tcp_funcs();

	/* each field is only attempted if the previous one went out */
	if (((rc = diswsi(stream, TM_PROTOCOL)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, version)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, com)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, event)) == DIS_SUCCESS)) {
		return DIS_SUCCESS;
	}

	DBPRT(("tm_reply: send error %s\n", dis_emsg[rc]))
	return rc;
}

/**
 * @brief
 *	Start a standard inter-MOM message by writing its header:
 *	IM_PROTOCOL, version, job id, cookie, command, event and task id,
 *	in that order.  Writing stops at the first field that fails.
 *
 * @param[in] stream - file descriptor
 * @param[in] jobid  - character pointer holding jobid
 * @param[in] cookie - job cookie string written after the job id
 * @param[in] command - command for task
 * @param[in] event   - event number
 * @param[in] taskid  - task id
 * @param[in] version - protocol version
 *
 * @return int
 * @retval (DIS_SUCCESS) 0  No error
 * @retval DIS_EOF when stream is negative (nothing is written)
 * @retval otherwise the DIS error code of the field that failed
 *
 */
int
im_compose(int stream, char *jobid, char *cookie, int command,
	   tm_event_t event, tm_task_id taskid, int version)
{
	int rc;

	if (stream < 0)
		return DIS_EOF;
	DIS_tpp_funcs();

	/* each field is only attempted if the previous one went out */
	if (((rc = diswsi(stream, IM_PROTOCOL)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, version)) == DIS_SUCCESS) &&
	    ((rc = diswst(stream, jobid)) == DIS_SUCCESS) &&
	    ((rc = diswst(stream, cookie)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, command)) == DIS_SUCCESS) &&
	    ((rc = diswsi(stream, event)) == DIS_SUCCESS) &&
	    ((rc = diswui(stream, taskid)) == DIS_SUCCESS)) {
		return DIS_SUCCESS;
	}

	DBPRT(("im_compose: send error %s\n", dis_emsg[rc]))
	return rc;
}

/**
 * @brief
 * 	Close the streams of a job's hosts that were used with the mcast
 * 	channel: every entry of ji_hosts with an open stream
 * 	(hn_stream != -1) is closed and marked as closed.
 *
 * @param[in] pjob - structure handle to job
 *
 * @return Void
 *
 */
void
close_sisters_mcast(job *pjob)
{
	int idx = 0;

	while (idx < pjob->ji_numnodes) {
		hnodent *sister = &pjob->ji_hosts[idx++];

		if (sister->hn_stream == -1)
			continue; /* nothing open for this host */

		tpp_close(sister->hn_stream);
		sister->hn_stream = -1;
	}
}

/**
 * @brief
 *	simple helper function that checks whether pbs_comm has been
 *	up for longer than a specified duration of time (in seconds)
 *
 * @param[in] maturity_time - minimum age of the connection, counted
 *			      from its establishment time, in seconds
 *
 * @return - communication up or down code
 * @retval 0 - Communications down or not older than "maturity_time"
 * @retval 1 - Communications up and older than "maturity_time"
 *
 */
int
is_comm_up(int maturity_time)
{
	/* the logical AND already yields exactly 0 or 1 */
	return (mom_net_up == 1) &&
	       ((time_now - mom_net_up_time) > maturity_time);
}

/**
 * @brief
 *	Remove a single trailing '+' separator from a string, in place.
 *	An empty string is left untouched.
 *
 * @param[in,out] str - NUL-terminated string to trim (must not be NULL)
 */
static void
strip_trailing_plus(char *str)
{
	size_t len = strlen(str);

	if ((len > 0) && (str[len - 1] == '+'))
		str[len - 1] = '\0';
}

/**
 * @brief
 *	Modify job 'pjob''s exec_vnode, exec_host, exec_host2
 *	values so that only the nodes/vnodes
 *	belonging to the parent 'momlist' (pjob->ji_momlist)
 *	are retained, and that it satisfies only the given 'select_str'.
 *
 * @param[in,out] pjob 		- job whose exec_vnode/exec_host/exec_host2
 *				 is being pruned.
 * @param[in]	select_str 	- the "schedselect"-like string containing the
 *				specifications that will filter the job's
 *				exec_vnode value.
 *				If this is NULL, then this function
 *				does not prune job's
 *				exec_vnode/exec_host/exec_host2, but rather
 *				just return in 'failed_vnodes' the list of
 *				vnodes assigned to the job that
 *				have non-functioning parent moms.
 *
 * @param[out]  failed_vnodes - returns in here the vnodes and their resources
 *				that have been taken out from the list of
 *				vnodes assigned with non-functioning parent
 *				moms.
 * @param[out]  good_vnodes - handed to pbs_release_nodes_given_select()
 *				next to 'failed_vnodes'; presumably receives
 *				the vnodes that are kept (confirm against
 *				that function).
 * @param[out]  err_msg	- buffer handed to
 *				pbs_release_nodes_given_select() for its
 *				error message.
 * @param[in]   err_msg_sz - size of the 'err_msg' buffer in bytes.
 *
 * @return  int
 * @retval	0	- for success
 * @retval	1	- if any error occurred.
 *
 * @note
 *	The first chunk in job's original exec_vnode value is always retained.
 *	It is the one assigned by the mother superior mom.
*/
int
prune_exec_vnode(job *pjob, char *select_str, vnl_t **failed_vnodes, vnl_t **good_vnodes, char *err_msg, int err_msg_sz)
{
	char *execvnode = NULL;
	char *exechost = NULL;
	char *exechost2 = NULL;
	char *schedselect = NULL;
	int rc = 1;
	char *new_exec_vnode = NULL;
	char *new_exec_host = NULL;
	char *new_exec_host2 = NULL;
	char *new_schedselect = NULL;
	relnodes_input_t r_input;
	relnodes_input_select_t r_input_select;

	if (pjob == NULL) {
		log_err(-1, __func__, "job parameter is NULL");
		return (1);
	}

	if (((is_jattr_set(pjob, JOB_ATR_exec_vnode)) == 0) ||
	    ((execvnode = get_jattr_str(pjob, JOB_ATR_exec_vnode)) == NULL)) {
		log_err(-1, __func__, "no execvnode");
		return (1);
	}

	if ((is_jattr_set(pjob, JOB_ATR_exec_host)) != 0)
		exechost = get_jattr_str(pjob, JOB_ATR_exec_host);

	if ((is_jattr_set(pjob, JOB_ATR_exec_host2)) != 0)
		exechost2 = get_jattr_str(pjob, JOB_ATR_exec_host2);

	if ((is_jattr_set(pjob, JOB_ATR_SchedSelect)) != 0)
		schedselect = get_jattr_str(pjob, JOB_ATR_SchedSelect);

	if ((exechost == NULL) && (exechost2 == NULL)) {
		log_err(-1, __func__, "no exechost nor exechost2");
		goto prune_exec_vnode_exit;
	}

	/* when only one of the two host strings exists, use it for both */
	if (exechost == NULL)
		exechost = exechost2;

	if (exechost2 == NULL)
		exechost2 = exechost;

	relnodes_input_init(&r_input);
	r_input.jobid = pjob->ji_qs.ji_jobid;
	r_input.execvnode = execvnode;
	r_input.exechost = exechost;
	r_input.exechost2 = exechost2;
	r_input.schedselect = schedselect;
	r_input.p_new_exec_vnode = &new_exec_vnode;
	r_input.p_new_exec_host[0] = &new_exec_host;
	r_input.p_new_exec_host[1] = &new_exec_host2;
	r_input.p_new_schedselect = &new_schedselect;

	relnodes_input_select_init(&r_input_select);
	r_input_select.select_str = select_str;
	r_input_select.failed_mom_list = &pjob->ji_failed_node_list;
	r_input_select.succeeded_mom_list = &pjob->ji_node_list;
	r_input_select.failed_vnodes = failed_vnodes;
	r_input_select.good_vnodes = good_vnodes;
	/* hand over the caller's real buffer size, not an assumed one */
	rc = pbs_release_nodes_given_select(&r_input, &r_input_select, err_msg, err_msg_sz);

	/* every "%s" argument is guarded: passing NULL to "%s" is undefined */
	snprintf(log_buffer, sizeof(log_buffer), "MOM: release_nodes_given_select: AFT rc=%d keep_select=%s execvnode=%s exechost=%s exechost2=%s new_exec_vnode=%s new_exec_host=%s new_exec_host2=%s new_schedselect=%s", rc, "NULL", execvnode,
		 exechost ? exechost : "null", exechost2 ? exechost2 : "null",
		 new_exec_vnode ? new_exec_vnode : "null",
		 new_exec_host ? new_exec_host : "null",
		 new_exec_host2 ? new_exec_host2 : "null",
		 new_schedselect ? new_schedselect : "null");
	log_event(PBSEVENT_DEBUG4, PBS_EVENTCLASS_SERVER, LOG_ERR, __func__, log_buffer);

	if ((rc != 0) || (select_str == NULL)) {
		/* a NULL select_str means to just return in
		 * 'failed_vnodes' those vnodes that are assigned to
		 * job that have been seen as down.
		 */
		goto prune_exec_vnode_exit;
	}

	if (new_exec_vnode != NULL) {

		if (strcmp(execvnode, new_exec_vnode) == 0) {
			/* there was no change */
			rc = 0;
			goto prune_exec_vnode_exit;
		}

		strip_trailing_plus(new_exec_vnode);

		set_jattr_str_slim(pjob, JOB_ATR_exec_vnode, new_exec_vnode, NULL);

		(void) update_resources_list(pjob, ATTR_l, JOB_ATR_resource, new_exec_vnode, INCR, 0, JOB_ATR_resource_orig);
	}

	if (new_exec_host != NULL) {
		strip_trailing_plus(new_exec_host);
		set_jattr_str_slim(pjob, JOB_ATR_exec_host, new_exec_host, NULL);
	}

	if (new_exec_host2 != NULL) {
		strip_trailing_plus(new_exec_host2);
		set_jattr_str_slim(pjob, JOB_ATR_exec_host2, new_exec_host2, NULL);
	}

	if (new_schedselect != NULL) {
		set_jattr_str_slim(pjob, JOB_ATR_SchedSelect, new_schedselect, NULL);
	}

	rc = 0;
prune_exec_vnode_exit:
	/* the new_* strings are released here on every path past setup */
	free(new_exec_vnode);
	free(new_exec_host);
	free(new_exec_host2);
	free(new_schedselect);

	return (rc);
}

/**
 * @brief
 *	Send to sister nodes updates to exec_vnode, exec_host2,
 *	and schedselect job attributes.
 *
 *	The three attributes are encoded once and sent with an
 *	IM_UPDATE_JOB request to every host of the job except the first
 *	entry of ji_hosts and except hosts found on the job's failed
 *	node list.  With pbs_conf.pbs_use_mcast set, the request goes out
 *	once over a tpp mcast channel holding all sister streams;
 *	otherwise it is written to each sister stream in turn, and a
 *	sister whose header cannot be composed is logged and skipped.
 *
 * @param[in]	pjob - job to update
 *
 * @return int
 * @retval <num>	- # of successfully sent requests to sis moms.
 * @retval -1		- for failure.
*/
int
send_sisters_job_update(job *pjob)
{
	pbs_list_head phead;
	int mtfd = -1;
	int com = IM_UPDATE_JOB;
	svrattrl *psatl;
	char *cookie;
	int num = 0;
	hnodent *np;
	eventent *ep = NULL;
	eventent *nep = NULL;
	int i;
	int ret;

	if (pjob == NULL) {
		log_err(-1, __func__, "bad pjob parameter");
		return (-1);
	}
	if (pjob->ji_numnodes <= 1) {
		return (0);
	}
	if (!is_jattr_set(pjob, JOB_ATR_Cookie)) {
		log_err(-1, __func__, "job cookie not set");
		return (-1);
	}

	cookie = get_jattr_str(pjob, JOB_ATR_Cookie);

	CLEAR_HEAD(phead);

	(void) job_attr_def[(int) JOB_ATR_exec_vnode].at_encode(
		get_jattr(pjob, JOB_ATR_exec_vnode),
		&phead,
		ATTR_execvnode,
		NULL,
		ATR_ENCODE_MOM,
		NULL);

	(void) job_attr_def[(int) JOB_ATR_exec_host2].at_encode(
		get_jattr(pjob, JOB_ATR_exec_host2),
		&phead,
		ATTR_exechost2,
		NULL,
		ATR_ENCODE_MOM,
		NULL);

	(void) job_attr_def[(int) JOB_ATR_SchedSelect].at_encode(
		get_jattr(pjob, JOB_ATR_SchedSelect),
		&phead,
		ATTR_SchedSelect,
		NULL,
		ATR_ENCODE_MOM,
		NULL);

	attrl_fixlink(&phead);
	/* Open streams to the sisterhood.  */
	if (pbs_conf.pbs_use_mcast == 1) {
		/* open the tpp mcast channel here */
		if ((mtfd = tpp_mcast_open()) == -1) {
			log_err(errno, __func__, "mcast open failed");
			/* the encoded attributes must still be released */
			goto update_failed;
		}
	}

	psatl = (svrattrl *) GET_NEXT(phead);
	for (i = 1; i < pjob->ji_numnodes; i++) {
		np = &pjob->ji_hosts[i];

		if (reliable_job_node_find(&pjob->ji_failed_node_list, np->hn_host) != NULL) {
			/* ensure current node (which is managed by a failed mom
			 * host) is not flagged as a problem
			 */
			if (pjob->ji_nodekill == np->hn_node)
				pjob->ji_nodekill = TM_ERROR_NODE;
			snprintf(log_buffer, sizeof(log_buffer),
				 "not sending request IM_UPDATE_JOB to failed mom %s",
				 np->hn_host ? np->hn_host : "UNDEFINED");
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
			continue;
		}
		if (np->hn_stream == -1)
			np->hn_stream = tpp_open(np->hn_host, np->hn_port);
		if (np->hn_stream < 0) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "tpp_open failed on %s:%d",
				 np->hn_host ? np->hn_host : "node", np->hn_port);
			log_err(errno, __func__, log_buffer);
			goto update_failed;
		}

		/*
		 * The first sister gets a freshly numbered event; all
		 * others get a copy of it so that they share the same
		 * event number.
		 */
		if (nep == NULL) {
			nep = event_alloc(pjob, com, -1, np,
					  TM_NULL_EVENT, TM_NULL_TASK);
			ep = nep;
		} else {
			ep = event_dup(nep, pjob, np);
		}

		if (ep == NULL) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "failed to create event for %s",
				 np->hn_host ? np->hn_host : "node");
			log_err(errno, __func__, log_buffer);
			tpp_close(np->hn_stream);
			np->hn_stream = -1;
			goto update_failed;
		}
		if (pbs_conf.pbs_use_mcast == 1) {
			/* add each of the tpp streams to the tpp mcast channel */
			if (tpp_mcast_add_strm(mtfd, np->hn_stream, FALSE) == -1) {
				snprintf(log_buffer,
					 sizeof(log_buffer),
					 "mcast add to %s failed",
					 np->hn_host ? np->hn_host : "node");
				log_err(errno, __func__, log_buffer);
				tpp_close(np->hn_stream);
				np->hn_stream = -1;
				goto update_failed;
			}
		} else {
			/* send message header */
			ret = im_compose(np->hn_stream,
					 pjob->ji_qs.ji_jobid, cookie,
					 com, ep->ee_event, TM_NULL_TASK,
					 IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS) {
				snprintf(log_buffer, sizeof(log_buffer),
					 "failed to send job update to %s",
					 np->hn_host ? np->hn_host : "node");
				log_err(errno, __func__, log_buffer);
				/*
				 * Skip this sister only.  The attribute
				 * list must stay alive: psatl still points
				 * into it and the remaining sisters need it.
				 */
				continue;
			}
			(void) encode_DIS_svrattrl(np->hn_stream,
						   psatl);
			(void) dis_flush(np->hn_stream);
		}
		num++;
	}

	if (pbs_conf.pbs_use_mcast == 1) {
		if (num > 0) {
			/* num > 0 guarantees ep refers to a valid event */
			ret = im_compose(mtfd, pjob->ji_qs.ji_jobid,
					 cookie, com, ep->ee_event, TM_NULL_TASK,
					 IM_OLD_PROTOCOL_VER);

			if (ret != DIS_SUCCESS) {
				log_err(errno, __func__, "compose mcast header failed");
				goto update_failed;
			}
			(void) encode_DIS_svrattrl(mtfd, psatl);

			ret = dis_flush(mtfd);
			if (ret != DIS_SUCCESS) {
				log_err(errno, __func__, "flush mcast stream failed");
				goto update_failed;
			}
		}
		tpp_mcast_close(mtfd);
	}

	free_attrlist(&phead);
	return (num);

update_failed:
	/* common failure exit: drop the mcast channel (if open) and the list */
	if (mtfd != -1)
		tpp_mcast_close(mtfd);
	free_attrlist(&phead);
	return (-1);
}
/**
 *
 * @brief
 *	Receive job updates to exec_vnode, exec_host2, and
 *	schedselect from mother superior via 'stream'.
 *	This would cause job_nodes() to get called to
 *	re-populate nodes info (i.e. ji_vnods)
 *
 *	Attributes other than those three are logged and ignored.  Only
 *	when all three were received are the node tables rebuilt, the
 *	execjob_resize hooks run and the job saved.
 *
 * @param[in]	stream - stream the attribute list is decoded from
 * @param[in,out] pjob - job whose attributes and node info are updated
 *
 * @return int
 * @retval 0	- success
 * @retval -1	- failure
 */
int
receive_job_update(int stream, job *pjob)
{
	enum {
		GOT_EXECVNODE = 0x1,
		GOT_SCHEDSELECT = 0x2,
		GOT_EXECHOST = 0x4,
		GOT_ALL = 0x7
	};
	pbs_list_head attrs;
	svrattrl *cur;
	unsigned int got = 0;
	int attr_idx;
	int set_rc;
	int nodes_rc;
	int n;
	mom_hook_input_t hook_input;
	mom_hook_output_t hook_output;
	char hook_msg[HOOK_MSG_SIZE + 1];
	int hook_errcode = 0;
	hook *last_phook;
	unsigned int hook_fail_action = 0;

	CLEAR_HEAD(attrs);
	if (decode_DIS_svrattrl(stream, &attrs) != DIS_SUCCESS) {
		log_err(-1, __func__, "decode_DIS_svrattrl failed");
		return (-1);
	}

	for (cur = (svrattrl *) GET_NEXT(attrs);
	     cur != NULL;
	     cur = (svrattrl *) GET_NEXT(cur->al_link)) {

		/* identify the attribute by name */
		attr_idx = find_attr(job_attr_idx, job_attr_def, cur->al_name);
		if (attr_idx < 0) { /* didn't recognize the name */
			snprintf(log_buffer, sizeof(log_buffer),
				 "did not recognize attribute name %s", cur->al_name);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  pjob->ji_qs.ji_jobid, log_buffer);
			free_attrlist(&attrs);
			return (-1);
		}

		/* remember which of the three expected attributes arrived */
		if (strcmp(cur->al_name, ATTR_execvnode) == 0) {
			got |= GOT_EXECVNODE;
		} else if (strcmp(cur->al_name, ATTR_SchedSelect) == 0) {
			got |= GOT_SCHEDSELECT;
		} else if (strcmp(cur->al_name, ATTR_exechost2) == 0) {
			got |= GOT_EXECHOST;
		} else {
			snprintf(log_buffer, sizeof(log_buffer),
				 "warning: ignoring attribute name %s", cur->al_name);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  pjob->ji_qs.ji_jobid, log_buffer);
			continue;
		}

		set_rc = set_jattr_generic(pjob, attr_idx, cur->al_value, cur->al_resc, INTERNAL);
		/* Unknown resources still get decoded */
		/* under "unknown" resource def */
		if ((set_rc != 0) && (set_rc != PBSE_UNKRESC)) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "failed to decode attribute name %s", cur->al_name);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  pjob->ji_qs.ji_jobid, log_buffer);
			free_attrlist(&attrs);
			return (-1);
		}

		if (cur->al_op == DFLT)
			(get_jattr(pjob, attr_idx))->at_flags |= ATR_VFLAG_DEFLT;
	}
	free_attrlist(&attrs);

	/* log the vnode table as it is before any rebuild */
	for (n = 0; n < pjob->ji_numvnod; n++) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "before: ji_vnods[%d].vn_node=%d phy node %d host=%s",
			 n, pjob->ji_vnods[n].vn_node,
			 pjob->ji_vnods[n].vn_host->hn_node,
			 pjob->ji_vnods[n].vn_host->hn_host ? pjob->ji_vnods[n].vn_host->hn_host : "");
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG,
			  pjob->ji_qs.ji_jobid, log_buffer);
	}

	/* an incomplete update leaves the node info untouched */
	if (got != GOT_ALL)
		return (0);

	if ((nodes_rc = job_nodes(pjob)) != 0) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "failed updating internal nodes data (rc=%d)", nodes_rc);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid, log_buffer);
		return (-1);
	}

	mom_hook_input_init(&hook_input);
	hook_input.pjob = pjob;

	mom_hook_output_init(&hook_output);
	hook_output.reject_errcode = &hook_errcode;
	hook_output.last_phook = &last_phook;
	hook_output.fail_action = &hook_fail_action;
	/* a return of 0 from the hook run is handled as a rejection */
	if (mom_process_hooks(HOOK_EVENT_EXECJOB_RESIZE,
			      PBS_MOM_SERVICE_NAME, mom_host,
			      &hook_input, &hook_output,
			      hook_msg, sizeof(hook_msg), 1) == 0) {
		snprintf(log_buffer, sizeof(log_buffer), "execjob_resize hook rejected request: %s", hook_msg);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
		return (-1);
	}

	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, "updated nodes info");

	pjob->ji_updated = 1;
	(void) job_save(pjob);

	/* log the rebuilt vnode table */
	for (n = 0; n < pjob->ji_numvnod; n++) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "after: ji_vnods[%d].vn_node=%d phy node %d "
			 "host=%s",
			 n,
			 pjob->ji_vnods[n].vn_node,
			 pjob->ji_vnods[n].vn_host->hn_node,
			 pjob->ji_vnods[n].vn_host->hn_host ? pjob->ji_vnods[n].vn_host->hn_host : "");
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB,
			  LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
	}
	return (0);
}

/**
 * @brief
 *	Returns 1 if mom entry ('mname', 'port')  is listed
 *	as one of the entries in '+' separated 'exechost' string.
 *
 *	For an entry without a port (exec_host format) the port is taken
 *	from the entry's vnode map record; if there is none, the standard
 *	MOM port is assumed and only the short host names (the part before
 *	the first '.') are compared.
 *	Neither 'exechost' nor 'mname' is modified; parsing is done on a
 *	private copy of 'exechost'.
 *
 * @param[in]	exechost - a string of the form:
 *		  exec_host2: <host1>:<port1>/...+<host2>:<port2>/...
 *		  - or -
 *		  exec_host: <host1>/...+<host2>/...
 * @param[in]	mname - mom  hostname to match.
 * @param[in]	port - mom  port number to match.
 *
 * @return int
 * @retval 1	- for a match.
 * @retval 0	- for a non-match or error.
 */
static int
in_exechost(char *exechost, char *mname, int port)
{
	char *ehost = NULL;
	char *str = NULL;
	char *save_ptr; /* posn for strtok_r() */
	int found = 0;

	if ((exechost == NULL) || (mname == NULL)) {
		log_err(PBSE_INTERNAL, __func__, "bad input parameter");
		return 0;
	}

	/* strtok_r() writes into its input, so work on a copy */
	ehost = strdup(exechost);
	if (ehost == NULL) {
		log_err(errno, __func__, "strdup failed");
		return 0;
	}

	for (str = strtok_r(ehost, "+", &save_ptr);
	     str != NULL;
	     str = strtok_r(NULL, "+", &save_ptr)) {
		char *hname = str;
		char *pc;
		char *pc2;
		int hport;
		int match_short = 0;
		momvmap_t *pnat;

		pc = strchr(str, ':');
		if (pc != NULL) {
			/* <host>:<port>/... */
			*pc = '\0';
			pc++;
			pc2 = strchr(pc, '/');
			if (pc2 != NULL)
				*pc2 = '\0';
			hport = atoi(pc);
		} else { /* no port info...not exechost2 format*/
			pc2 = strchr(hname, '/');
			if (pc2 != NULL)
				*pc2 = '\0';
			pnat = find_vmap_entry(hname);
			if (pnat != NULL) {
				/* found a map entry */
				hport = pnat->mvm_mom->mi_port;
			} else {
				/* no map entry, use standard port */
				/* and match up to short names */
				hport = pbs_mom_port;
				match_short = 1;
			}
		}

		if (hport != port)
			continue;

		if (match_short) {
			/*
			 * Compare only the parts in front of the first '.'
			 * by length, so that the caller's mname never has
			 * to be cut (and restored) in place.
			 */
			size_t hlen = strcspn(hname, ".");
			size_t mlen = strcspn(mname, ".");

			if ((hlen == mlen) && (strncmp(hname, mname, hlen) == 0)) {
				found = 1;
				break;
			}
		} else if (strcmp(hname, mname) == 0) {
			found = 1;
			break;
		}
	}

	free(ehost);
	return found;
}
/**
 * @brief
 *	Send a message (command = com) to all the other MOMs in
 *	'pjob' over one TPP multicast channel.  Set ji_nodekill if there
 *	is a problem with a node.  Call the function command_func if it is
 *	not NULL.  It can be used to send extra information.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] com  - command for task
 * @param[in] command_func - function called once, with a NULL node and
 *			the multicast channel, after the standard header
 *			has been composed
 * @param[in] exclude_exec_host - if sister host match one of these,
 *			then ignore sending mcast message to that host.
 *
 * @return int
 * @retval num - number of nodes without problem
 * @retval 0   - Failure (no cookie set, multicast channel could not be
 *		 opened, or composing/flushing the message failed)
 *
 */
int
send_sisters_mcast_inner(job *pjob, int com, pbs_jobndstm_t command_func,
			 char *exclude_exec_host)
{
	int i, num, ret;
	eventent *ep, *nep = NULL;
	tm_event_t event = TM_NULL_EVENT;
	char *cookie;
	int mtfd;

	DBPRT(("send_sisters_mcast: command %d\n", com))
	if (!(is_jattr_set(pjob, JOB_ATR_Cookie)))
		return 0;
	cookie = get_jattr_str(pjob, JOB_ATR_Cookie);
	num = 0;

	/* open the tpp mcast channel here */
	if ((mtfd = tpp_mcast_open()) == -1)
		return 0;

	for (i = 0; i < pjob->ji_numnodes; i++) {
		hnodent *np = &pjob->ji_hosts[i];

		if (np->hn_node == pjob->ji_nodeid) /* this is me */
			continue;

		/* tentatively blame this node; cleared again below on success */
		if (pjob->ji_nodekill == TM_ERROR_NODE)
			pjob->ji_nodekill = np->hn_node;

		if (np->hn_sister != SISTER_OKAY) /* sis is gone? */
			continue;

		/*
		 ** 'np' holds the RM port number in np->hn_port
		 ** while exclude_exec_host stores the MOM port
		 ** number. So we need to compare against
		 ** np->hn_port-1, for PBS mom expects RM port =
		 ** MOM port + 1.
		 */
		if ((exclude_exec_host != NULL) &&
		    in_exechost(exclude_exec_host, np->hn_host,
				np->hn_port - 1)) {
			/*
			 ** ensure current node (which is managed by an
			 ** excluded mom host) is not flagged as a problem
			 */
			if (pjob->ji_nodekill == np->hn_node)
				pjob->ji_nodekill = TM_ERROR_NODE;
			continue;
		}

		if (reliable_job_node_find(&pjob->ji_failed_node_list, np->hn_host) != NULL) {
			/* ensure current node (which is managed by a failed mom
			 * host) is not flagged as a problem
			 */
			if (pjob->ji_nodekill == np->hn_node)
				pjob->ji_nodekill = TM_ERROR_NODE;
			snprintf(log_buffer, sizeof(log_buffer),
				 "not sending request %d to failed mom %s",
				 com, np->hn_host ? np->hn_host : "UNDEFINED");
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
			continue;
		}

		if (np->hn_stream == -1)
			np->hn_stream = tpp_open(np->hn_host, np->hn_port);
		/* assume the worst until the sister is fully set up below */
		np->hn_sister = SISTER_EOF;

		if (np->hn_stream == -1)
			continue;

		/* add each of the tpp streams to the tpp mcast channel */
		if (tpp_mcast_add_strm(mtfd, np->hn_stream, FALSE) == -1) {
			tpp_close(np->hn_stream);
			np->hn_stream = -1;
			continue;
		}

		if (com == IM_DELETE_JOB)
			event = TM_NULL_EVENT;
		else {
			if (nep == NULL) {
				nep = event_alloc(pjob, com, -1, np,
						  TM_NULL_EVENT, TM_NULL_TASK);
				ep = nep;
			} else {
				ep = event_dup(nep, pjob, np);
			}
			/*
			 * No event could be recorded for this sister: do not
			 * dereference the NULL pointer and do not count her as
			 * reached (same handling as in send_sisters_inner()).
			 * She stays marked SISTER_EOF and flagged in
			 * ji_nodekill.  Note that her stream has already been
			 * added to the mcast channel above.
			 */
			if (ep == NULL)
				continue;
			event = ep->ee_event;
		}

		if (pjob->ji_nodekill == np->hn_node)
			pjob->ji_nodekill = TM_ERROR_NODE;
		np->hn_sister = SISTER_OKAY;
		num++;
	}

	if (num > 0) {
		ret = im_compose(mtfd, pjob->ji_qs.ji_jobid,
				 cookie, com, event, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);
		if (ret != DIS_SUCCESS) {
			close_sisters_mcast(pjob);
			tpp_mcast_close(mtfd);
			return 0;
		}

		/*
		 ** Here we send any extra information that needs
		 ** to follow the standard set.
		 ** There was a np being passed, have to think what to do about that
		 */
		if (command_func != NULL) {
			ret = command_func(pjob, NULL, mtfd);
			if (ret != DIS_SUCCESS) {
				close_sisters_mcast(pjob);
				tpp_mcast_close(mtfd);
				return 0;
			}
		}
		ret = dis_flush(mtfd);
		if (ret != DIS_SUCCESS) {
			close_sisters_mcast(pjob);
			tpp_mcast_close(mtfd);
			return 0;
		}
	}

	tpp_mcast_close(mtfd);
	return num;
}

/**
 * @brief
 *	Deliver command 'com' to every other MOM of 'pjob' that is not
 *	named in 'exclude_exec_host', one stream at a time.  When
 *	multicast is enabled in pbs_conf the work is handed over to
 *	send_sisters_mcast_inner() instead.
 *	If 'command_func' is not NULL it is called after the standard
 *	header has been composed so that extra data can be appended.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] com  - command for task
 * @param[in] command_func - optional function adding extra information
 * @param[in] exclude_exec_host - if not NULL, do not
 *				send command 'com' to MOM hostnames
 *				appearing in this list, which has the
 *				form:
 *				 <host1>:<port1>/...+<host2>:<port2>/...
 *				 - or -
 *				<host1>/...+<host2>/...
 *
 * @return int
 * @retval num - number of command requests sent out.
 * @retval 0   - Failure
 *
 * @note
 *	Set pjob->ji_nodekill if there is a problem with a node.
 *
 */
int
send_sisters_inner(job *pjob, int com, pbs_jobndstm_t command_func,
		   char *exclude_exec_host)
{
	int idx;
	int sent;
	int rc;
	eventent *first_ev = NULL;
	eventent *cur_ev;
	tm_event_t event;
	char *cookie;

	if (pbs_conf.pbs_use_mcast == 1)
		return send_sisters_mcast_inner(pjob, com, command_func,
						exclude_exec_host);

	DBPRT(("send_sisters: command %d\n", com))
	if (!(is_jattr_set(pjob, JOB_ATR_Cookie)))
		return 0;

	cookie = get_jattr_str(pjob, JOB_ATR_Cookie);
	sent = 0;
	for (idx = 0; idx < pjob->ji_numnodes; idx++) {
		hnodent *sis = &pjob->ji_hosts[idx];

		/* never talk to ourselves */
		if (sis->hn_node == pjob->ji_nodeid)
			continue;

		/* tentatively blame this node; cleared once it checks out */
		if (pjob->ji_nodekill == TM_ERROR_NODE)
			pjob->ji_nodekill = sis->hn_node;

		/* sister already gone */
		if (sis->hn_sister != SISTER_OKAY)
			continue;

		/*
		 * sis->hn_port is the RM port whereas exclude_exec_host
		 * carries MOM ports, hence the comparison against
		 * sis->hn_port - 1.
		 */
		if ((exclude_exec_host != NULL) &&
		    in_exechost(exclude_exec_host, sis->hn_host,
				sis->hn_port - 1)) {
			/* a node of an excluded mom must not be reported as a problem */
			if (pjob->ji_nodekill == sis->hn_node)
				pjob->ji_nodekill = TM_ERROR_NODE;
			continue;
		}

		if (reliable_job_node_find(&pjob->ji_failed_node_list, sis->hn_host) != NULL) {
			/* a node of an already failed mom must not be reported as a problem */
			if (pjob->ji_nodekill == sis->hn_node)
				pjob->ji_nodekill = TM_ERROR_NODE;
			snprintf(log_buffer, sizeof(log_buffer) - 1,
				 "not sending request %d to failed mom %s",
				 com, sis->hn_host ? sis->hn_host : "UNDEFINED");
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
			continue;
		}

		/* open a stream on demand; give up on this sister if that fails */
		if (sis->hn_stream == -1) {
			sis->hn_stream = tpp_open(sis->hn_host, sis->hn_port);
			if (sis->hn_stream == -1)
				continue;
		}

		/* pessimistic until the request has been flushed */
		sis->hn_sister = SISTER_EOF;

		if (com != IM_DELETE_JOB) {
			/* first sister gets a fresh event, the others a copy of it */
			if (first_ev != NULL) {
				cur_ev = event_dup(first_ev, pjob, sis);
			} else {
				first_ev = event_alloc(pjob, com, -1, sis,
						       TM_NULL_EVENT, TM_NULL_TASK);
				cur_ev = first_ev;
			}
			if (cur_ev == NULL)
				continue;
			event = cur_ev->ee_event;
		} else {
			event = TM_NULL_EVENT;
		}

		if ((rc = im_compose(sis->hn_stream, pjob->ji_qs.ji_jobid,
				     cookie, com, event, TM_NULL_TASK,
				     IM_OLD_PROTOCOL_VER)) != DIS_SUCCESS)
			continue;

		/* optional extra payload following the standard header */
		if (command_func != NULL) {
			if ((rc = command_func(pjob, sis, sis->hn_stream)) != DIS_SUCCESS)
				continue;
		}

		if ((rc = dis_flush(sis->hn_stream)) == -1)
			continue;

		/* request is on its way: sister is fine and no longer suspect */
		if (pjob->ji_nodekill == sis->hn_node)
			pjob->ji_nodekill = TM_ERROR_NODE;
		sis->hn_sister = SISTER_OKAY;
		sent++;
	}
	return sent;
}

/**
 * @brief
 *	Convenience front end to send_sisters_inner() that passes no
 *	exclusion list (exclude_exec_host == NULL).
 *
 * @param[in] pjob - structure handle to job
 * @param[in] com  - command for task
 * @param[in] command_func - optional function adding extra information
 *
 * @return int - whatever send_sisters_inner() returns
 */
int
send_sisters(job *pjob, int com, pbs_jobndstm_t command_func)
{
	char *no_exclusions = NULL;

	return send_sisters_inner(pjob, com, command_func, no_exclusions);
}

/*
 * SEND_ERR(err) - when 'reply' is non-zero, compose an IM_ERROR message on
 * 'stream' and append the integer 'err' to it.  The return values of both
 * calls are discarded.
 * Apart from 'err', the names reply, stream, jobid, cookie, event and
 * fromtask are taken from the scope in which the macro is expanded.
 * The expansion is a plain if-block (not wrapped in do { } while (0)), so
 * be careful when placing it directly in front of an 'else'.
 */
#define SEND_ERR(err)                                                                                     \
	if (reply) {                                                                                      \
		(void) im_compose(stream, jobid, cookie, IM_ERROR, event, fromtask, IM_OLD_PROTOCOL_VER); \
		(void) diswsi(stream, err);                                                               \
	}

/*
 * SEND_ERR2(err, errmsg) - when 'reply' is non-zero, compose an IM_ERROR2
 * message on 'stream' and append the integer 'err' followed by the string
 * 'errmsg'.  The return values of all three calls are discarded.
 * Apart from the two arguments, the names reply, stream, jobid, cookie,
 * event and fromtask are taken from the scope in which the macro is
 * expanded.
 * The expansion is a plain if-block (not wrapped in do { } while (0)), so
 * be careful when placing it directly in front of an 'else'.
 */
#define SEND_ERR2(err, errmsg)                                                                             \
	if (reply) {                                                                                       \
		(void) im_compose(stream, jobid, cookie, IM_ERROR2, event, fromtask, IM_OLD_PROTOCOL_VER); \
		(void) diswsi(stream, err);                                                                \
		(void) diswst(stream, errmsg);                                                             \
	}

/**
 * @brief
 *	Map a virtual node id of a job to the host entry serving it and
 *	validate that 'stream' plausibly comes from that host.
 *
 *	The stream recorded in the host entry is filled in when it is
 *	missing: a new one is opened if the caller has no stream and the
 *	host is not this node, or the caller's stream is adopted.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] stream - stream the request arrived on
 * @param[in] vnodeid - node id
 *
 * @return structure handle to hnodent - SUCCESS
 * @retval                     NULL    - FAILURE (unknown node id, or the
 *					 stream's address differs from the
 *					 address recorded for the host)
 *
 */
hnodent *
find_node(job *pjob, int stream, tm_node_id vnodeid)
{
	int idx = 0;
	vmpiprocs *vnode = pjob->ji_vnods;
	hnodent *host;
	struct sockaddr_in *known_addr;
	struct sockaddr_in *caller_addr;

	/* walk the vnode table until the id matches or the table ends */
	while ((idx < pjob->ji_numvnod) && (vnode->vn_node != vnodeid)) {
		idx++;
		vnode++;
	}
	if (idx == pjob->ji_numvnod) {
		sprintf(log_buffer, "node %d not found", vnodeid);
		log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
		return NULL;
	}

	host = vnode->vn_host; /* host serving the virtual node */
	known_addr = tpp_getaddr(host->hn_stream);
	caller_addr = tpp_getaddr(stream);

	if (caller_addr == NULL) {
		/*
		 * The caller came without a usable stream.  Open one to the
		 * host unless the host is this node or already has one.
		 */
		if ((known_addr == NULL) && (pjob->ji_nodeid != host->hn_node))
			host->hn_stream = tpp_open(host->hn_host, host->hn_port);
		return host;
	}

	if (known_addr == NULL) {
		/* nothing recorded for the host yet: adopt the caller's stream */
		host->hn_stream = stream;
	} else if ((host->hn_stream != stream) &&
		   (memcmp(&caller_addr->sin_addr, &known_addr->sin_addr,
			   sizeof(known_addr->sin_addr)) != 0)) {
		/*
		 * Two different, valid streams.  That is acceptable only if
		 * both lead to the same IP address (the port may differ);
		 * here they do not, so something got mixed up.
		 *
		 * TODO: check possible multiple IP addresses for
		 * a single host.
		 */
		sprintf(log_buffer,
			"stream id %d does not match %d to node %d",
			stream, host->hn_stream, vnodeid);
		log_err(-1, __func__, log_buffer);

		sprintf(log_buffer, "%s: stream addr %s", __func__,
			netaddr(caller_addr));
		log_err(-1, __func__, log_buffer);

		sprintf(log_buffer, "%s: node addr %s", __func__,
			netaddr(known_addr));
		log_err(-1, __func__, log_buffer);
		return NULL;
	}

	/* the host answered on an acceptable stream: forget any earlier EOF */
	host->hn_eof_ts = 0;
	return host;
}

/**
 *
 * @brief
 *	Look up the hostname that belongs to the internet address
 *	stored in the socket address 'ap'.
 *
 * @param[in]	ap	- a socket address.
 *
 * @return  char *
 *
 * @retval <string>		- the mapped hostname.
 * @retval "" (empty string)	- if none found or error encountered.
 *
 * @note
 *	The returned hostname points to a fixed memory area that must not
 *	be freed, and gets overwritten on the next call to
 *	addr_to_hostname().
 *
 */
char *
addr_to_hostname(struct sockaddr_in *ap)
{
	static char *cached_name = NULL; /* buffer handed back to callers */
	static int cached_sz = 0;	 /* bytes allocated for cached_name */
	struct hostent *hent;

	if (ap == NULL)
		return ("");

	hent = gethostbyaddr((void *) &ap->sin_addr, sizeof(struct in_addr), AF_INET);
	if (hent == NULL) {
		snprintf(log_buffer, sizeof(log_buffer), "%s: h_errno=%d",
			 inet_ntoa(ap->sin_addr), h_errno);
		log_err(-1, __func__, log_buffer);
		return ("");
	}
	if (hent->h_name == NULL)
		return ("");

	/* refresh the cached copy only when the name actually changed */
	if ((cached_name == NULL) || (strcmp(cached_name, hent->h_name) != 0)) {
		int needed = strlen(hent->h_name) + 1;

		if (needed > cached_sz) {
			char *grown = realloc(cached_name, needed);

			if (grown == NULL) {
				log_err(errno, __func__, "error on realloc");
				return ("");
			}
			cached_name = grown;
			cached_sz = needed;
		}
		pbs_strncpy(cached_name, hent->h_name, cached_sz);
	}
	return (cached_name);
}
/**
 * @brief
 *	React to an error met while starting a job.
 *	The error is logged.  For a job that tolerates node failures the
 *	node is moved from the job's node list to its failed node list
 *	and the error is otherwise ignored; for any other job that is
 *	not yet exiting, exec_bail() is called with an exec code chosen
 *	from 'code'.
 *
 * @param pjob		job encountering error
 * @param code		error code
 * @param nodename	name of host that returned the error
 * @param cmd		string giving a verb for what failed
 *
 * @return Void
 *
 */
void
job_start_error(job *pjob, int code, char *nodename, char *cmd)
{
	void exec_bail(job * pjob, int code, char *txt);
	int bail_code;

	if (!pjob || !nodename || !cmd)
		return;

	snprintf(log_buffer, sizeof(log_buffer),
		 "%s %d from node %s could not %s successfully",
		 __func__, code, nodename, cmd);
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, log_buffer);

	if (do_tolerate_node_failures(pjob)) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "ignoring error from %s as job is tolerant of node failures", nodename);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		/* a filled-in log_buffer could be mistaken for an error message */
		log_buffer[0] = '\0';

		/* remember the node as failed and drop it from the good list */
		reliable_job_node_add(&pjob->ji_failed_node_list, nodename);
		reliable_job_node_delete(&pjob->ji_node_list, nodename);

#ifndef WIN32
		if (pjob->ji_parent2child_moms_status_pipe != -1) {
			/* pass the name down the pipe: length first, then the bytes */
			size_t r_size = strlen(nodename) + 1;

			if (write_pipe_data(pjob->ji_parent2child_moms_status_pipe, &r_size, sizeof(size_t)) != 0)
				log_err(errno, __func__, "failed to write");
			else
				(void) write_pipe_data(pjob->ji_parent2child_moms_status_pipe, nodename, r_size);
		}
#endif
		return;
	}

	/* nothing left to bail out of once the job is exiting */
	if (get_job_substate(pjob) >= JOB_SUBSTATE_EXITING)
		return;

	/* translate the error into the matching exec failure code */
	if (code == PBSE_HOOK_REJECT_DELETEJOB)
		bail_code = JOB_EXEC_FAILHOOK_DELETE;
	else if (code == PBSE_HOOK_REJECT_RERUNJOB)
		bail_code = JOB_EXEC_FAILHOOK_RERUN;
	else if (code == PBSE_SISCOMM)
		bail_code = JOB_EXEC_JOINJOB;
	else
		bail_code = JOB_EXEC_RETRY;

	exec_bail(pjob, bail_code, NULL);
}

/**
 * @brief
 * 	chk_del_job - find out whether every sister has either answered
 *	the IM_DELETE_JOB_REPLY request or is dead.  Once none is still
 *	pending, answer the server's request kept in pjob->ji_preq and
 *	queue the job on mom_deadjobs.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] errcode - error code; non-zero makes the server reply a
 *			rejection
 *
 * @return Void
 *
 */
static void
chk_del_job(job *pjob, int errcode)
{
	int failures = 0;
	int n;

	DBPRT(("%s for job %s\n", __func__, pjob->ji_qs.ji_jobid))

	/* sisters start at index 1 */
	for (n = 1; n < pjob->ji_numnodes; n++) {
		hnodent *sis = &pjob->ji_hosts[n];

		if (reliable_job_node_find(&pjob->ji_failed_node_list, sis->hn_host) != NULL) {
			DBPRT(("%s: %d IGNORED for node %s\n", __func__,
			       n, sis->hn_host))
			continue;
		}
		if (sis->hn_sister == SISTER_OKAY) {
			/* her answer is still outstanding, try again later */
			DBPRT(("%s: %d OK  for node %s\n", __func__,
			       n, sis->hn_host))
			return;
		}
		if (sis->hn_sister == SISTER_KILLDONE) {
			DBPRT(("%s: %d DONE node %s\n", __func__,
			       n, sis->hn_host))
		} else {
			/* either dead or replied with error */
			++failures;
			DBPRT(("%s: %d EOF for node %s\n", __func__,
			       n, sis->hn_host))
		}
	}

	if (n != pjob->ji_numnodes)
		return;

	/*
	 * Every sister is dead or has replied, so the Server's delete job
	 * request can be answered now.
	 */
	if (pjob->ji_preq) {
		if ((failures == 0) && (errcode == 0)) {
			reply_ack(pjob->ji_preq);
			pjob->ji_preq = NULL;
		} else if (pjob->ji_hook_running_bg_on != BG_NONE) {
			pjob->ji_hook_running_bg_on = BG_PBSE_SISCOMM;
		} else {
			req_reject(PBSE_SISCOMM, 0, pjob->ji_preq);
			pjob->ji_preq = NULL;
		}
	}
	DBPRT(("%s: all sisters done for job %s\n",
	       __func__, pjob->ji_qs.ji_jobid))

	/*
	 * The job "could" be purged right here, but that may disturb loops
	 * running further up the call chain:
	 * im_eof() -> node_bailout() -> here
	 *
	 * Therefore the job is only moved to the list of jobs that Mom's
	 * main loop purges.  The ji_jobque link is used for this, not
	 * ji_alljobs.  A job that already sits on that list is left alone.
	 */
	if (is_linked(&mom_deadjobs, &pjob->ji_jobque)) {
		DBPRT(("%s: job %s ALREADY LINKED to deadjobs\n",
		       __func__, pjob->ji_qs.ji_jobid))
		return;
	}

	if (is_linked(&mom_polljobs, &pjob->ji_jobque))
		delete_link(&pjob->ji_jobque);
	if (pjob->ji_hook_running_bg_on == BG_NONE)
		append_link(&mom_deadjobs, &pjob->ji_jobque, pjob);
}

/**
 * @brief
 *	Deal with events hooked to a node where a stream has gone
 *	south or we are going away.
 *
 *	Every event queued on np->hn_events is visited once.  What is done
 *	depends on the command stored in the event (ee_command).  After
 *	that the event is unlinked and freed, except for a JOIN_JOB
 *	request that has just been re-sent.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] np   - structure handle to hnodent whose events are processed
 *
 * @return Void
 *
 */
void
node_bailout(job *pjob, hnodent *np)
{
	pbs_task *ptask;
	eventent *ep;
	eventent *nxtep;
	int i;
	int keep_event = 0; /* set by a case to keep the current event queued */
	char *name;
	pbs_list_head phead;

	ep = (eventent *) GET_NEXT(np->hn_events);
	while (ep) {
		switch (ep->ee_command) {

			case IM_JOIN_JOB:
				/*
				 ** I'm MS and a node has failed to respond to the
				 ** call.  Maybe in the future the use can specify
				 ** the job can start with a range of nodes so
				 ** one (or more) missing can be tolerated.  Not
				 ** for now.
				 */

				DBPRT(("%s: JOIN_JOB %s jjretry %d old stream %d\n", __func__, pjob->ji_qs.ji_jobid, ep->ee_retry, np->hn_stream))
				if (ep->ee_retry == 0) {
					/* first failure, try to reopen and resend */
					np->hn_stream = tpp_open(np->hn_host,
								 np->hn_port);
					if (np->hn_stream < 0) {
						/* reopen failed - fatal */
						job_start_error(pjob, PBSE_SISCOMM,
								np->hn_host,
								"JOIN_JOB retry");
						break;
					}
					/* clear error indicator set in im_eof */
					np->hn_sister = SISTER_OKAY;
					/* encode job attributes to send to sister */
					CLEAR_HEAD(phead);
					for (i = 0; i < (int) JOB_ATR_LAST; i++) {
						(void) (job_attr_def + i)->at_encode(get_jattr(pjob, i), &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, NULL);
					}

					++ep->ee_retry; /* retry count */

					/* resend JOIN_JOB to this sister */
					/* i becomes the index of this host within pjob->ji_hosts */
					i = np - pjob->ji_hosts;
					DBPRT(("%s: JOIN_JOB %s host %s port %d jjretry %d i %d new stream %d\n", __func__, pjob->ji_qs.ji_jobid, np->hn_host, np->hn_port, ep->ee_retry, i, np->hn_stream))
					send_join_job_restart(IM_JOIN_JOB, ep, i,
							      pjob, &phead);

					free_attrlist(&phead);
					/*
					 * note that this event is to be retained in
					 * in the list since the associated request
					 * is being retried
					 */
					keep_event = 1;
				} else {
					/* failed on a retry - fatal */
					job_start_error(pjob, PBSE_SISCOMM,
							np->hn_host, "JOIN_JOB");
				}
				break;

			case IM_SETUP_JOB:
				/*
				 ** I'm MS and a node has failed during setup.
				 */
				DBPRT(("%s: SETUP_JOB %s\n", __func__, pjob->ji_qs.ji_jobid))
				job_start_error(pjob, PBSE_SISCOMM, np->hn_host,
						"SETUP_JOB");
				break;

			case IM_SUSPEND:
			case IM_RESUME:
				/*
				 ** A MOM has failed to suspend or resume a job.
				 ** I'm mother superior.
				 */
				sprintf(log_buffer, "%s returned EOF",
					(ep->ee_command == IM_SUSPEND) ? "SUSPEND" : "RESUME");
				log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
				/* hand the failure to the job's post-processing callback, if one is set */
				if (pjob->ji_mompost != NULL)
					pjob->ji_mompost(pjob, PBSE_SISCOMM);
				break;

			case IM_RESTART:
			case IM_CHECKPOINT:
			case IM_CHECKPOINT_ABORT:
				/*
				 ** A MOM has failed to do a checkpoint.
				 ** I'm mother superior.
				 */
				name = (ep->ee_command == IM_RESTART) ? "RESTART" : (ep->ee_command == IM_CHECKPOINT) ? "CHECKPOINT"
														      : "CHECKPOINT_ABORT";
				sprintf(log_buffer, "%s returned EOF", name);
				log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
				/* hand the failure to the job's post-processing callback, if one is set */
				if (pjob->ji_mompost != NULL)
					pjob->ji_mompost(pjob, PBSE_SISCOMM);
				break;

			case IM_ABORT_JOB:
			case IM_KILL_JOB:
				/*
				 ** The job is already in the process of being killed
				 ** but somebody has dropped off the face of the
				 ** earth.  Just check to see if everybody has
				 ** been heard from in some form or another and
				 ** set JOB_SUBSTATE_EXITING if so.
				 */
				DBPRT(("%s: KILL/ABORT JOB %s\n",
				       __func__, pjob->ji_qs.ji_jobid))
				/*
				 * look, from index 1 on, for a sister that is still
				 * SISTER_OKAY and not on the failed node list
				 */
				for (i = 1; i < pjob->ji_numnodes; i++) {
					if ((pjob->ji_hosts[i].hn_sister == SISTER_OKAY) && (reliable_job_node_find(&pjob->ji_failed_node_list, pjob->ji_hosts[i].hn_host) == NULL))
						break;
				}
				if (i == pjob->ji_numnodes) { /* all dead */
					/* only a job in substate KILLSIS is moved on to EXITING */
					if (check_job_substate(pjob, JOB_SUBSTATE_KILLSIS)) {
						set_job_state(pjob, JOB_STATE_LTR_EXITING);
						set_job_substate(pjob, JOB_SUBSTATE_EXITING);
						exiting_tasks = 1;
					}
				}
				break;

			case IM_DELETE_JOB_REPLY:
				/*
				 ** The job is being deleted and a sister just went bye.
				 ** See if everyone else has replied or died.
				 */
				DBPRT(("%s: DELETE_REPLY JOB eof %s\n",
				       __func__, pjob->ji_qs.ji_jobid))
				chk_del_job(pjob, 0);
				break;

			case IM_SPAWN_TASK:
			case IM_GET_TASKS:
			case IM_SIGNAL_TASK:
			case IM_OBIT_TASK:
			case IM_GET_INFO:
			case IM_GET_RESC:
			case IM_CRED:
				/*
				 ** A user attempt failed, inform process.
				 */
				DBPRT(("%s: REQUEST %d %s\n", __func__,
				       ep->ee_command, pjob->ji_qs.ji_jobid))

				/* no reply is sent when task_check() does not return a task */
				ptask = task_check(pjob, ep->ee_fd, ep->ee_taskid);
				if (ptask == NULL)
					break;
				/* answer on the event's fd: TM_ERROR carrying TM_ESYSTEM */
				(void) tm_reply(ep->ee_fd, ptask->ti_protover,
						TM_ERROR, ep->ee_client);
				(void) diswsi(ep->ee_fd, TM_ESYSTEM);
				(void) dis_flush(ep->ee_fd);
				break;

			case IM_POLL_JOB:
				/*
				 ** I must be Mother Superior for the job and
				 ** this is an error reply to a poll request.
				 */
				if (do_tolerate_node_failures(pjob)) {

					snprintf(log_buffer, sizeof(log_buffer),
						 "ignoring POLL error from failed mom %s as job is tolerant of node failures",
						 np->hn_host ? np->hn_host : "");
					log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
					break;
				}
				sprintf(log_buffer,
					"POLL failed from node %d", np->hn_node);
				log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
				/* record this node as the one at fault */
				pjob->ji_nodekill = np->hn_node;
				break;

#ifdef PMIX
			case IM_PMIX:
				/* I am MS and a node has failed a PMIX request. */
				DBPRT(("%s: IM_PMIX %s\n", __func__, pjob->ji_qs.ji_jobid))
				snprintf(log_buffer, sizeof(log_buffer),
					 "sister node %s failed PMIx operation",
					 np->hn_host ? np->hn_host : "");
				exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
				break;
#endif /* PMIX */

			case IM_UPDATE_JOB:
				/*
				 ** I'm MS and a node has failed during job update.
				 */
				DBPRT(("%s: UPDATE_JOB %s\n", __func__, pjob->ji_qs.ji_jobid))
				snprintf(log_buffer, sizeof(log_buffer),
					 "sister node %s failed to update job",
					 np->hn_host ? np->hn_host : "");

#ifndef WIN32
				close_update_pipes(pjob);
#endif
				exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
				break;

			default:
				/* a command this function has no handling for: just log it */
				sprintf(log_buffer,
					"unknown command %d saved in event %d",
					ep->ee_command, ep->ee_event);
				/* NOTE(review): pjob has already been dereferenced above, so this test looks redundant - confirm */
				if (pjob && pjob->ji_qs.ji_jobid) {
					log_joberr(-1, __func__, log_buffer,
						   pjob->ji_qs.ji_jobid);
				} else
					log_err(-1, __func__, log_buffer);
				break;
		}

		/* get the next event to check; done here as we may keep */
		/* this event in the list (keep_event==1) or delete it   */
		nxtep = (eventent *) GET_NEXT(ep->ee_next);
		if (keep_event == 0) {
			delete_link(&ep->ee_next);
			free(ep);
		} else {
			keep_event = 0; /* reset for next event */
		}
		ep = nxtep; /* go on to examine the next event */
	}
}

/**
 * @brief
 *	Wrap up a job that is going away: mark every host stream of the
 *	job as gone, run node_bailout() for each host, then release
 *	extra and hardware related resources.  The job should already be
 *	terminated before getting here.
 *
 * @param[in] pjob - structure handle to job
 *
 * @see job_clean_extra
 * @see del_job_hw
 *
 * @return Void
 *
 */
void
term_job(job *pjob)
{
	hnodent *host = pjob->ji_hosts;
	int idx = 0;

	while (idx < pjob->ji_numnodes) {
		/* a host that still has a stream is marked as having reached EOF */
		if (host->hn_stream >= 0) {
			host->hn_sister = SISTER_EOF;
			host->hn_stream = -1;
		}
		/* flush whatever events are still queued for this host */
		node_bailout(pjob, host);

		idx++;
		host++;
	}

	/* optional site/platform specific cleanup hook */
	if (job_clean_extra != NULL)
		(void) job_clean_extra(pjob);

	/* release special hardware related resources */
	del_job_hw(pjob);
}

/**
 * @brief
 *	Handle a stream that needs to be closed.
 *	May be either from another Mom, or the server.
 *
 *	The stream is closed first.  Then every job is searched for host
 *	entries that recorded this stream; for each one found the stream is
 *	forgotten and, unless the job is still considered running and one
 *	of the grace conditions below applies, the events pending for that
 *	host are bailed out.  If the host is entry 0 of the job's host
 *	table, the job is killed.
 *
 * @param[in] stream - stream to close
 * @param[in] ret    - index into dis_emsg[] selecting the error text
 *			that is logged
 *
 * @return Void
 *
 */
void
im_eof(int stream, int ret)
{
	int num;
	job *pjob;
	hnodent *np;
	struct sockaddr_in *addr;

	/* log who we lost before the stream is closed */
	addr = tpp_getaddr(stream);
	sprintf(log_buffer, "%s from addr %s on stream %d",
		dis_emsg[ret], netaddr(addr), stream);
	log_err(-1, __func__, log_buffer);
	tpp_close(stream);

	if (stream == server_stream) {
		sprintf(log_buffer, "Server closed connection.");
		log_event(PBSEVENT_SYSTEM, PBS_EVENTCLASS_SERVER, LOG_NOTICE,
			  __func__, log_buffer);
		server_stream = -1;
	}

	/*
	 ** Search through all the jobs looking for this stream.
	 ** We want to find if any events are being waited for
	 ** from the "dead" stream and do something with them.
	 */
	for (pjob = (job *) GET_NEXT(svr_alljobs);
	     pjob != NULL;
	     pjob = (job *) GET_NEXT(pjob->ji_alljobs)) {
		for (num = 0, np = pjob->ji_hosts;
		     num < pjob->ji_numnodes;
		     num++, np++) {
			if (np->hn_stream != stream)
				continue;

			np->hn_stream = -1;
			/* remember when EOF was first seen; an already set timestamp is kept */
			if (np->hn_eof_ts == 0)
				np->hn_eof_ts = time(0);
			pjob->ji_msconnected = 0;

			/*
			 ** In case connection to pbs_comm is down/recently established, do not kill a job that is actually running.
			 ** If this is the MS, we check job substate == JOB_SUBSTATE_RUNNING to see if job is running.
			 ** If this is a sister, we check is substate is JOB_SUBSTATE_PRERUN or JOB_SUBSTATE_RUNNING
			 ** We include PRERUN in case of jobs at sisters since at sister moms job substate stays at PRERUN
			 ** till a tm task is initiated on it by the MS
			 ** We also check for substate JOB_SUBSTATE_SUSPEND to retain suspended jobs.
			 **
			 */
			if ((((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) && (check_job_substate(pjob, JOB_SUBSTATE_PRERUN))) ||
			    (check_job_substate(pjob, JOB_SUBSTATE_RUNNING)) || (check_job_substate(pjob, JOB_SUBSTATE_SUSPEND))) {
				if (do_tolerate_node_failures(pjob)) {
					/* tolerant job: move the host to the failed node list and carry on */
					sprintf(log_buffer, "ignoring lost communication with %s as job is tolerant of node failures", np->hn_host);
					log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
					reliable_job_node_add(&pjob->ji_failed_node_list, np->hn_host);
					reliable_job_node_delete(&pjob->ji_node_list, np->hn_host);
#ifndef WIN32
					if (pjob->ji_parent2child_moms_status_pipe != -1) {
						/* write the host name to the pipe: length first, then the bytes */
						size_t r_size;
						r_size = strlen(np->hn_host) + 1;
						if (write_pipe_data(pjob->ji_parent2child_moms_status_pipe, &r_size, sizeof(size_t)) == 0)
							(void) write_pipe_data(pjob->ji_parent2child_moms_status_pipe, np->hn_host, r_size);
						else
							log_err(errno, __func__, "failed to write");
					}
#endif
					continue;
				} else if ((time_now - np->hn_eof_ts) <= max_poll_downtime_val) {
					/* still within the allowed downtime: leave the job alone */
					sprintf(log_buffer, "lost communication with %s, not killing job yet", np->hn_host);
					log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
					continue;
				}

				if (!is_comm_up(COMM_MATURITY_TIME)) {
					sprintf(log_buffer, "lost connection to %s due to pbs_comm down/recently established, not killing job", np->hn_host);
					log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
					continue;
				}

				sprintf(log_buffer, "lost communication with %s for > %d secs, killing job now", np->hn_host, max_poll_downtime_val);
				log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
			}

			/* mark the sister as gone and deal with the events waiting on her */
			np->hn_sister = SISTER_EOF;
			node_bailout(pjob, np);

			/*
			 ** If dead stream is num = 0, I'm a regular node
			 ** and my connection to Mother Superior is gone...
			 ** kill job.
			 */
			if (num != 0)
				continue;

			sprintf(log_buffer,
				"lost connection to MS on %s", np->hn_host);
			log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
			kill_job(pjob, SIGKILL);
			set_job_substate(pjob, JOB_SUBSTATE_EXITING);
			exiting_tasks = 1;
		}
	}
}

/**
 * @brief
 *	Verify that a request arriving on 'stream' can be from Mother
 *	Superior, i.e. that this mom is not itself Mother Superior for
 *	the job (talking to herself).
 *	On success the stream is recorded in the job's host entry 0,
 *	which is the MS entry, and the job is marked as connected to MS.
 *
 * @param[in] stream - stream the request arrived on
 * @param[in] pjob   - structure handle to job
 *
 * @return error code
 * @retval TRUE  error
 * @retval FALSE if okay (also returned when pjob is NULL)
 *
 */
int
check_ms(int stream, job *pjob)
{
	struct sockaddr_in *peer = tpp_getaddr(stream);
	hnodent *ms;

	if (pjob == NULL)
		return FALSE;

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
		/*
		 * This should be Mother Superior calling; her stream is
		 * always kept in host entry 0.
		 */
		ms = &pjob->ji_hosts[0];
		if (ms->hn_stream != stream) {
			/* log only when an existing stream is being replaced */
			if (ms->hn_stream != -1) {
				sprintf(log_buffer,
					"MS reset from %d to %d (%s)",
					ms->hn_stream, stream, netaddr(peer));
				log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
			}
			ms->hn_stream = stream;
		}
		ms->hn_eof_ts = 0;
		pjob->ji_msconnected = 1;
		return FALSE;
	}

	/* the job is flagged as being here: MS must not receive this from herself */
	log_joberr(-1, __func__, "Mother Superior talking to herself",
		   pjob->ji_qs.ji_jobid);
	tpp_eom(stream);
	return TRUE;
}

/**
 * @brief
 *	Return the amount of a named resource used by a job.
 *
 * @param[in] pjob - structure handle to job
 * @param[in] name - name of the resource to look up
 * @param[in] func - function that extracts the value from the
 *		     resource entry found
 *
 * @return u_long
 * @retval value returned by 'func' for the resource entry
 * @retval 0 - resources_used is not set, the resource is unknown,
 *	       or the job has no entry for it
 *
 */
u_long
resc_used(job *pjob, char *name, u_long (*func)(resource *))
{
	u_long amount = 0L;
	resource_def *def;
	resource *entry;

	if (is_jattr_set(pjob, JOB_ATR_resc_used)) {
		def = find_resc_def(svr_resc_def, name);
		if (def != NULL) {
			entry = find_resc_entry(get_jattr(pjob, JOB_ATR_resc_used), def);
			if (entry != NULL) {
				amount = func(entry);
				DBPRT(("resc_used: %s %lu\n", name, amount))
			}
		}
	}
	return amount;
}

/**
 * @brief
 *	Search the info list of a task for an entry with the given name.
 *
 * @param[in] ptask - structure handle to pbs_task
 * @param[in] name  - name of the info entry to look for
 *
 * @return structure handle to infoent
 * @retval NULL - no entry with that name
 *
 */
infoent *
task_findinfo(pbs_task *ptask, char *name)
{
	infoent *cur = (infoent *) GET_NEXT(ptask->ti_info);

	while (cur != NULL) {
		if (strcmp(cur->ie_name, name) == 0)
			return cur;
		cur = (infoent *) GET_NEXT(cur->ie_next);
	}
	return NULL;
}

/**
 * @brief
 *	Store a named piece of info with a task.  An existing entry of
 *	the same name has its old info freed and replaced; otherwise a
 *	new entry is appended to the task's info list.
 *
 * @param[in] ptask - structure handle to pbs_task
 * @param[in] name  - name of the info; the pointer itself is stored in
 *		      a newly created entry
 * @param[in] info  - the data; the pointer itself is stored and later
 *		      passed to free() when the entry is replaced
 * @param[in] len   - length of the data
 *
 * @return Void
 *
 */
void
task_saveinfo(pbs_task *ptask, char *name, void *info, int len)
{
	infoent *ip = task_findinfo(ptask, name);

	if (ip != NULL) {
		/* known name: drop the old data, the entry itself is reused */
		/* NOTE(review): 'name' is neither stored nor freed on this path - confirm caller ownership */
		free(ip->ie_info);
	} else {
		/* new name: create an entry and hook it onto the task */
		ip = (infoent *) malloc(sizeof(infoent));
		assert(ip);
		CLEAR_LINK(ip->ie_next);
		append_link(&ptask->ti_info, &ip->ie_next, ip);
		ip->ie_name = name;
	}

	ip->ie_len = len;
	ip->ie_info = info;
}

/**
 * @brief
 *	Generate a resource string for a job.
 *
 *	The string starts with the text returned by getuname().  When the
 *	job's resource attribute is of type ATR_TYPE_RESC, a ':' and a
 *	comma separated list of <resource>=<value> pairs follow; the
 *	character after the last pair (or the ':' itself when nothing was
 *	encoded) is replaced by the terminating NUL.  For any other
 *	attribute type the string is the getuname() text followed by ':'.
 *
 * @param pjob - structure handle to job
 *
 * @return string
 * @retval res_string - malloc'ed string which the caller must free
 * @retval NULL	      - getuname() returned NULL or memory allocation
 *			failed
 *
 */
char *
resc_string(job *pjob)
{
	attribute *at;
	attribute_def *ad;
	svrattrl *pal;
	pbs_list_head lhead;
	int len, used, tot;
	char *res_str, *ch;
	char *getuname();
	extern int resc_access_perm;

	ch = getuname();
	if (ch == NULL)
		return NULL;
	len = strlen(ch);
	/*
	 * Start with twice the name length, but never less than what the
	 * name, the ':' and the terminating NUL need.  Without this floor a
	 * name shorter than 2 characters overflows the buffer, and a zero
	 * sized buffer could never grow in the doubling loop below.
	 */
	tot = len * 2;
	if (tot < len + 2)
		tot = len + 2;
	used = 0;
	res_str = (char *) malloc(tot);
	if (res_str == NULL)
		return NULL;
	strcpy(res_str, ch);
	used += len;
	res_str[used++] = ':';

	at = get_jattr(pjob, JOB_ATR_resource);
	if (at->at_type != ATR_TYPE_RESC) {
		res_str[used] = '\0';
		return res_str;
	}
	ad = &job_attr_def[(int) JOB_ATR_resource];
	resc_access_perm = ATR_DFLAG_USRD;
	CLEAR_HEAD(lhead);
	(void) ad->at_encode(at,
			     &lhead, ad->at_name,
			     NULL, ATR_ENCODE_CLIENT, NULL);
	attrl_fixlink(&lhead);

	for (pal = (svrattrl *) GET_NEXT(lhead);
	     pal;
	     pal = (svrattrl *) GET_NEXT(pal->al_link)) {
		/* grow until "<resc>=<value>," fits behind what is already there */
		while (used + pal->al_rescln + pal->al_valln > tot) {
			char *tmp_res_str;

			tot *= 2;
			tmp_res_str = realloc(res_str, tot);
			if (tmp_res_str == NULL) {
				free(res_str);
				free_attrlist(&lhead);
				return NULL;
			}
			res_str = tmp_res_str;
		}
		/*
		 * The "- 1" steps back onto the NUL written by strcpy(), so
		 * the '=' and ',' written next overwrite it.
		 */
		strcpy(&res_str[used], pal->al_resc);
		used += (pal->al_rescln - 1);
		res_str[used++] = '=';
		strcpy(&res_str[used], pal->al_value);
		used += (pal->al_valln - 1);
		res_str[used++] = ',';
	}
	free_attrlist(&lhead);
	/* replace the trailing separator with the terminator */
	res_str[--used] = '\0';
	return res_str;
}

/**
 * @brief
 *	Process a cred received in a JOIN_JOB.
 *
 *	When the job's credential type is PBS_CREDTYPE_NONE there is nothing
 *	to do; for every other type the credential data is handed to
 *	write_cred().
 *
 * @param[in] pjob - structure handle to job
 * @param[in] info - credential data
 * @param[in] len  - length of the credential data
 * @param[in] tcp  - indication whether tcp or not (not used by this function)
 * @param[in] con  - inter mom stream (not used by this function)
 *
 * @return  error code
 * @retval  0     credential type is PBS_CREDTYPE_NONE
 * @retval  other value returned by write_cred()
 *
 */
int
mom_create_cred(job *pjob, char *info, size_t len, int tcp, int con)
{
	const int credtype = pjob->ji_extended.ji_ext.ji_credtype;

	DBPRT(("%s: entered\n", __func__))

	/* a job without a credential needs no further processing */
	if (credtype == PBS_CREDTYPE_NONE)
		return 0;

	return write_cred(pjob, info, len);
}

/* Format of the message BAIL() leaves in log_buffer; %s is the field name. */
static char bail_format[] = "dis read failed: %s";

/*
 * BAIL(message) - check the result of the preceding DIS call.
 *
 * Tests the variable "ret" of the enclosing function; if it is not
 * DIS_SUCCESS, formats bail_format with "message" into log_buffer and
 * jumps to the label "err".  The enclosing function must therefore have
 * "ret" in scope and define an "err:" label.
 *
 * NOTE(review): the expansion is a bare if-statement (not wrapped in
 * do { } while (0)) and call sites in this file invoke it without a
 * trailing semicolon, so be careful next to an unbraced if/else.  The
 * message text always says "read", although the macro is also used after
 * DIS write calls.
 */
#define BAIL(message)                                      \
	if (ret != DIS_SUCCESS) {                          \
		sprintf(log_buffer, bail_format, message); \
		goto err;                                  \
	}

/**
 * @brief
 *	Send resources_used values to the MS via
 *	'stream' descriptor.
 *
 *	The job's JOB_ATR_resc_used attribute is encoded, and only entries
 *	flagged ATR_VFLAG_HOOK whose resource is not "cput", "mem" or
 *	"cpupercent" are forwarded.
 *
 * @param[in] stream - descriptor pathway to MS.
 * @param[in] pjob - pointer to owning job structure
 *
 * @return  error code
 * @retval -1     bad arguments, attribute is not of type ATR_TYPE_RESC,
 *		  building the list failed, there was nothing to send,
 *		  or encode_DIS_svrattrl() did not return DIS_SUCCESS
 * @retval  0     Success
 *
 * @note
 *	The global resc_access_perm is set to ATR_DFLAG_MGRD and is not
 *	restored.
 *
 */
int
send_resc_used_to_ms(int stream, job *pjob)
{
	extern int resc_access_perm;
	attribute *used_attr;
	attribute_def *used_def;
	svrattrl *cur;
	svrattrl *next;
	pbs_list_head encoded;
	pbs_list_head outgoing;
	int rc = -1;

	if (stream == -1 || pjob == NULL)
		return (-1);

	used_attr = get_jattr(pjob, JOB_ATR_resc_used);
	if (used_attr->at_type != ATR_TYPE_RESC)
		return (-1);
	used_def = &job_attr_def[(int) JOB_ATR_resc_used];
	resc_access_perm = ATR_DFLAG_MGRD;

	memset(&encoded, 0, sizeof(encoded));
	CLEAR_HEAD(encoded);
	memset(&outgoing, 0, sizeof(outgoing));
	CLEAR_HEAD(outgoing);

	(void) used_def->at_encode(used_attr, &encoded, used_def->at_name, NULL, ATR_ENCODE_CLIENT, NULL);

	for (cur = (svrattrl *) GET_NEXT(encoded); cur != NULL; cur = next) {
		next = (svrattrl *) GET_NEXT(cur->al_link);

		/* only resources that were set in a mom hook are tracked */
		if ((cur->al_flags & ATR_VFLAG_HOOK) == 0)
			continue;

		/* these are sent to MS automatically, so skip them here */
		if (strcmp(cur->al_resc, "cput") == 0 ||
		    strcmp(cur->al_resc, "mem") == 0 ||
		    strcmp(cur->al_resc, "cpupercent") == 0)
			continue;

		if (add_to_svrattrl_list(&outgoing, cur->al_name, cur->al_resc,
					 cur->al_value, cur->al_op, NULL) == -1) {
			free_attrlist(&outgoing);
			free_attrlist(&encoded);
			return (-1);
		}
	}
	free_attrlist(&encoded);

	/* an empty outgoing list is reported as -1, same as a send failure */
	cur = (svrattrl *) GET_NEXT(outgoing);
	if (cur != NULL && encode_DIS_svrattrl(stream, cur) == DIS_SUCCESS)
		rc = 0;

	free_attrlist(&outgoing);
	return rc;
}

/**
 * @brief
 *	Received resources_used values for job 'jobid'
 *	from descriptor 'stream', with values to be saved in
 *	internal nodes resources table indexed by 'nodeidx'.
 *
 *	Any value already held in pjob->ji_resources[nodeidx].nr_used is
 *	freed and the attribute is cleared before the received entries are
 *	applied.  On a failure part-way through the list, entries applied
 *	before the failure remain set.
 *
 * @param[in] stream - descriptor pathway
 * @param[in] pjob - pointer to owning job structure
 * @param[in] nodeidx - node index to the job's internal resources table
 *			where received values will be saved; only checked
 *			for being non-negative here
 *
 * @return  error code
 * @retval -1     bad arguments, decode_DIS_svrattrl() failed, an entry has
 *		  no name or resource, an entry's name is not ATTR_used, or
 *		  set_attr_generic() failed with something other than
 *		  PBSE_UNKRESC
 * @retval  0     Success
 *
 * @note
 *	The global resc_access_perm is set to READ_WRITE and is not restored.
 *
 */
int
recv_resc_used_from_sister(int stream, job *pjob, int nodeidx)
{
	extern int resc_access_perm;
	attribute_def *pdef;
	pbs_list_head lhead;
	svrattrl *psatl;
	int errcode;
	int rc = -1;

	if (pjob == NULL || stream == -1 || nodeidx < 0)
		return (-1);

	pdef = &job_attr_def[(int) JOB_ATR_resc_used];

	CLEAR_HEAD(lhead);
	if (decode_DIS_svrattrl(stream, &lhead) != DIS_SUCCESS) {
		sprintf(log_buffer, "decode_DIS_svrattrl failed");
		/*
		 * The decode may have linked some entries before failing;
		 * release them instead of returning with the list populated.
		 */
		goto done;
	}
	if (is_attr_set(&pjob->ji_resources[nodeidx].nr_used) != 0)
		pdef->at_free(&pjob->ji_resources[nodeidx].nr_used);
	/* decode attributes from request into job structure */
	clear_attr(&pjob->ji_resources[nodeidx].nr_used, &job_attr_def[JOB_ATR_resc_used]);

	resc_access_perm = READ_WRITE;
	for (psatl = (svrattrl *) GET_NEXT(lhead);
	     psatl;
	     psatl = (svrattrl *) GET_NEXT(psatl->al_link)) {

		/* every entry must carry both an attribute and a resource name */
		if ((psatl->al_name == NULL) || (psatl->al_resc == NULL))
			goto done;

		/* entries for any attribute other than ATTR_used are rejected */
		if (strcmp(psatl->al_name, ATTR_used) != 0)
			goto done;

		errcode = set_attr_generic(&pjob->ji_resources[nodeidx].nr_used, pdef, psatl->al_value, psatl->al_resc, INTERNAL);
		/* Unknown resources still get decoded */
		/* under "unknown" resource def */
		if ((errcode != 0) && (errcode != PBSE_UNKRESC))
			goto done;

		if (psatl->al_op == DFLT)
			pjob->ji_resources[nodeidx].nr_used.at_flags |= ATR_VFLAG_DEFLT;
	}
	rc = 0;

done:
	/* single exit so the decoded list is released on every path */
	free_attrlist(&lhead);
	return rc;
}

/**
 * @brief
 *	General purpose function for executing actions that are done
 *	before calling finish_exec() on a job.
 *
 *	Two optional steps are run in order: job_join_extra(), if that
 *	function pointer is set, and then - when requested and
 *	job_setup_send is set - an IM_SETUP_JOB message sent through
 *	send_sisters().
 *
 * @param[in]       pjob		job being operated on
 * @param[in]       do_job_setup_send	non-zero if job_setup_send() should be done
 *
 * @return enum pre_finish_results_t
 * @retval PRE_FINISH_SUCCESS			all requested actions succeeded and no
 *						job_setup_send() was performed
 * @retval PRE_FINISH_FAIL			pjob is NULL
 * @retval PRE_FINISH_SUCCESS_JOB_SETUP_SEND	send_sisters() returned
 *						pjob->ji_numnodes - 1
 * @retval PRE_FINISH_FAIL_JOB_SETUP_SEND	send_sisters() returned any other value
 * @retval PRE_FINISH_FAIL_JOIN_EXTRA		job_join_extra() returned non-zero
 *
 */
pre_finish_results_t
pre_finish_exec(job *pjob, int do_job_setup_send)
{
	if (pjob == NULL)
		return PRE_FINISH_FAIL;

	/*
	 * If job_join_extra exists, call it with the job's first host
	 * entry.  A non-zero return aborts the remaining actions.
	 */
	if (job_join_extra != NULL && job_join_extra(pjob, &pjob->ji_hosts[0]) != 0)
		return PRE_FINISH_FAIL_JOIN_EXTRA;

	/* no SETUP_JOB requested, or no function to build it with */
	if (!do_job_setup_send || job_setup_send == NULL)
		return PRE_FINISH_SUCCESS;

	/*
	 * Send a SETUP_JOB message to the other nodes.  The call to
	 * finish_exec will happen when we get a reply from all the nodes.
	 */
	if (send_sisters(pjob, IM_SETUP_JOB, job_setup_send) == pjob->ji_numnodes - 1)
		return PRE_FINISH_SUCCESS_JOB_SETUP_SEND;

	return PRE_FINISH_FAIL_JOB_SETUP_SEND;
}

// clang-format off

/**
 * @brief
 *	Input is coming from another MOM over a DIS on tpp stream.
 *	Read the stream to get a Inter-MOM request.
 *
 *	request (
 *		jobid	string
 *		cookie	string
 *		command	int
 *		event	int
 *		task	int
 *	)
 *
 *	Format the reply and write it back.
 *
 *
 * @param[in] stream	inter-MOM TPP stream
 * @param[in] version	inter-MOM protocol version; only IM_PROTOCOL_VER is currently supported
 * @return void
 *
 */
void
im_request(int stream, int version)
{
	int			command = 0;
	int			event_com = -1, ret;
	char			*jobid = NULL;
	char			*cookie = NULL;
	char			*oreo;
	char			basename[MAXPATHLEN + 1] = {0};
	char			namebuf[MAXPATHLEN+1];
	job			*pjob;
	pbs_task		*ptask;
	hnodent			*np;
	eventent		*ep = NULL;
	infoent			*ip;
	struct	sockaddr_in	*addr;
	u_long			ipaddr;
	int			i, errcode;
	int			nodeidx =0;
	int			resc_idx = 0;
	int			reply;
	int			exitval;
	tm_node_id		pvnodeid;
	tm_node_id		tvnodeid;
	tm_task_id		fromtask, event_task = 0, taskid;
	int			hnodenum, index;
	int			num;
	int			sig;
	char			**argv, **envp, *cp;
	char			*name;
	void			*info = NULL;
	size_t			len;
	tm_event_t		event, event_client = 0;
	int			efd = -1;
	pbs_list_head		lhead;
	svrattrl		*psatl;
	extern  unsigned long	 QA_testing;
	extern	int		resc_access_perm;
	int			local_supres(job *pjob, int which,
		struct batch_request *preq);
	char			*errmsg = NULL;
	char			hook_msg[HOOK_MSG_SIZE+1];
	int			argc = 0;
	mom_hook_input_t	hook_input;
	mom_hook_output_t	hook_output;
	mom_hook_input_t	*hook_input_ptr;
	mom_hook_output_t	*hook_output_ptr;
	int			hook_errcode = 0;
	int			hook_rc = 0;
	hook			*last_phook = NULL;
	unsigned int		hook_fail_action = 0;
	char			*nodehost = NULL;
	char			timebuf[TIMEBUF_SIZE] = {0};
  	char			*delete_job_msg = NULL;

	DBPRT(("%s: stream %d version %d\n", __func__, stream, version))
	if ((version != IM_PROTOCOL_VER) && (version != IM_OLD_PROTOCOL_VER)) {
		sprintf(log_buffer, "protocol version %d unknown", version);
		log_err(-1, __func__, log_buffer);
		tpp_close(stream);
		return;
	}

	/* check that machine is known */
	addr = tpp_getaddr(stream);
	if (addr == NULL) {
		sprintf(log_buffer, "Sender unknown");
		log_err(-1, __func__, log_buffer);
		tpp_close(stream);
		return;
	}

	ipaddr = ntohl(addr->sin_addr.s_addr);
	DBPRT(("connect from %s\n", netaddr(addr)))
	if (!addrfind(ipaddr)) {
		sprintf(log_buffer, "bad connect from %s",
			netaddr(addr));
		log_err(-1, __func__, log_buffer);
		im_eof(stream, 0);
		return;
	}

	jobid = disrst(stream, &ret);
	BAIL("jobid")
	cookie = disrst(stream, &ret);
	BAIL("cookie")
	command = disrsi(stream, &ret);
	BAIL("command")
	event = disrsi(stream, &ret);
	BAIL("event")
	fromtask = disrui(stream, &ret);
	BAIL("fromtask")
	switch (command) {

		case IM_JOIN_RECOV_JOB:
			reply = 1;

			hnodenum = disrsi(stream, &ret);
			BAIL("JOINJOB nodenum")

			np = NULL;
			/* job should already exist */
			pjob = find_job(jobid);
			if( pjob == NULL ) {
				SEND_ERR(PBSE_SYSTEM)
				goto done;
			}
			pjob->ji_stdout = disrsi(stream, &ret);
			BAIL("JOINJOB stdout")
			pjob->ji_stderr = disrsi(stream, &ret);
			BAIL("JOINJOB stderr")
			pjob->ji_qs.ji_un.ji_momt.ji_exuid = pjob->ji_grpcache->gc_uid;
			pjob->ji_qs.ji_un.ji_momt.ji_exgid = pjob->ji_grpcache->gc_gid;
			pjob->ji_msconnected = 1;
			goto done;
		case IM_JOIN_JOB:
			/*
			 ** Sender is mom superior sending a job structure to me.
			 ** I am going to become a member of a job.
			 **
			 ** auxiliary info (
			 **	local host id	int;
			 **	number of nodes	int;
			 **	stdout port	int;
			 **	stderr port	int;
			 **	cred type	int;
			 **	credential	string; <if cred type != 0>
			 **	jobattrs	attrl;
			 ** )
			 */
			reply = 1;
			if (check_ms(stream, NULL))
				goto fini;

			hnodenum = disrsi(stream, &ret);
			BAIL("JOINJOB nodenum")

			np = NULL;
			/* does job already exist? */
			pjob = find_job(jobid);
			if (pjob) {	/* job is here */
				kill_job(pjob, SIGKILL);
				mom_deljob(pjob);
			}
			if ((pjob = job_alloc()) == NULL) {
				SEND_ERR(PBSE_SYSTEM)
				goto done;
			}

			pjob->ji_stdout = disrsi(stream, &ret);
			BAIL("JOINJOB stdout")
			pjob->ji_stderr = disrsi(stream, &ret);
			BAIL("JOINJOB stderr")
			pjob->ji_extended.ji_ext.ji_credtype = disrsi(stream, &ret);
			BAIL("JOINJOB credtype")
			if (pjob->ji_extended.ji_ext.ji_credtype != PBS_CREDTYPE_NONE) {
				info = disrcs(stream, &len, &ret);
				BAIL("JOINJOB credential")
			}
			pjob->ji_msconnected = 1;

			pjob->ji_numnodes = hnodenum;
			CLEAR_HEAD(lhead);
			if (decode_DIS_svrattrl(stream, &lhead) != DIS_SUCCESS) {
				sprintf(log_buffer, "decode_DIS_svrattrl failed");
				goto err;
			}
			/*
			 ** Get the hashname from the attribute.
			 */
			psatl = (svrattrl *)GET_NEXT(lhead);
			while (psatl) {
				if (!strcmp(psatl->al_name, ATTR_hashname)) {
					pbs_strncpy(basename, psatl->al_value, sizeof(basename));
					break;
				}
				psatl = (svrattrl *)GET_NEXT(psatl->al_link);
			}
			pbs_strncpy(pjob->ji_qs.ji_jobid, jobid, sizeof(pjob->ji_qs.ji_jobid));
			if (strlen(basename) <= PBS_JOBBASE)
				strcpy(pjob->ji_qs.ji_fileprefix, basename);
			else
				*pjob->ji_qs.ji_fileprefix = '\0';
			pjob->ji_nodeid = -1;
			pjob->ji_qs.ji_svrflags = 0;
			pjob->ji_qs.ji_un_type = JOB_UNION_TYPE_MOM;

			/* decode attributes from request into job structure */
			errcode = 0;
			resc_access_perm = READ_WRITE;
			for (psatl = (svrattrl *)GET_NEXT(lhead);
				psatl;
				psatl = (svrattrl *)GET_NEXT(psatl->al_link)) {

				/* identify the attribute by name */
				index = find_attr(job_attr_idx, job_attr_def, psatl->al_name);
				if (index < 0) {	/* didn`t recognize the name */
					errcode = PBSE_NOATTR;
					break;
				}

				errcode = set_jattr_generic(pjob, index, psatl->al_value, psatl->al_resc, INTERNAL);
				/* Unknown resources still get decoded */
				/* under "unknown" resource def */
				if ((errcode != 0) && (errcode != PBSE_UNKRESC))
					break;

				if (psatl->al_op == DFLT)
					(get_jattr(pjob, index))->at_flags |= ATR_VFLAG_DEFLT;
			}
			free_attrlist(&lhead);
			if (errcode != 0) {
				(void)job_purge_mom(pjob);
				SEND_ERR(errcode)
				goto done;
			}

			pjob->ji_nodeid = TM_ERROR_NODE;
			if ((errcode = job_nodes_inner(pjob, &np)) != 0) {
				sprintf(log_buffer,
					"job_nodes_inner failed with error %d", errcode);
				log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
					LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
				nodes_free(pjob);
				SEND_ERR(errcode)
				goto done;
			}

			pjob->ji_hosts[0].hn_stream = stream;

			if (gen_nodefile_on_sister_mom) {
				char varlist[(2 * MAXPATHLEN) + 1] = "PBS_NODEFILE=";
				char buf[MAXPATHLEN + 1];
				if (generate_pbs_nodefile(pjob, buf, sizeof(buf) - 1, log_buffer, LOG_BUF_SIZE - 1) == 0) {
					strcat(varlist, buf);
					set_jattr_generic(pjob, JOB_ATR_variables, varlist, NULL, INCR);
				}
			}

			/*
			 ** Check to make sure we found ourself.
			 */
			if (pjob->ji_nodeid == TM_ERROR_NODE) {
				char *eh2;

				snprintf(log_buffer, sizeof(log_buffer),
					"no match for my hostname '%s' was found in exec_host2, "
					"possible network misconfiguration", mom_host);

				log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_CRIT, pjob->ji_qs.ji_jobid, log_buffer);

				if ((eh2 = get_jattr_str(pjob,  JOB_ATR_exec_host2)) != NULL) {
					snprintf(log_buffer, sizeof(log_buffer), "exec_host2 = %s", eh2);
					log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
					LOG_CRIT, pjob->ji_qs.ji_jobid, log_buffer);
				}

				nodes_free(pjob);
				SEND_ERR(PBSE_INTERNAL);
				goto done;
			}

			/* set remaining job structure elements */
			set_job_state(pjob, JOB_STATE_LTR_RUNNING);
			set_job_substate(pjob, JOB_SUBSTATE_PRERUN);
			set_jattr_l_slim(pjob, JOB_ATR_mtime, time_now, SET);
			pjob->ji_qs.ji_stime = time_now;
			pjob->ji_polltime = time_now;

			/* np is set from job_nodes_inner */


			/*
			 * NULL value passed to hook_input.vnl
			 * means to assign vnode list using pjob->ji_host[].
			 */
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			switch ((hook_rc=mom_process_hooks(HOOK_EVENT_EXECJOB_BEGIN,
					PBS_MOM_SERVICE_NAME, mom_host,
					&hook_input, &hook_output,
					hook_msg, sizeof(hook_msg), 1))) {
				case 1:   	/* explicit accept */
					break;
				case 2:	/* no hook script executed - go ahead and accept event*/
					break;
				default:
					/* a value of '0' means explicit reject encountered. */
					if (hook_rc != 0) {
						/*
						 * we've hit an internal error (malloc error, full disk, etc...), so
						 * treat this now like a  hook error so hook fail_action will be
						 * consulted. Before, behavior of an internal error was to ignore it!
						 */
						hook_errcode = PBSE_HOOKERROR;
					}
					SEND_ERR2(hook_errcode, (char *)hook_msg);
					if ((hook_errcode == PBSE_HOOKERROR) &&
					    (last_phook != NULL) &&
					    ((last_phook->fail_action & HOOK_FAIL_ACTION_OFFLINE_VNODES) != 0)) {
						vnl_t *tvnl = NULL;
						char	hook_buf[HOOK_BUF_SIZE];
						int	vret = 0;

						snprintf(hook_buf,
							HOOK_BUF_SIZE,
							"1,%s",
							last_phook->hook_name);
						if (vnl_alloc(&tvnl) != NULL) {
							vret = vn_addvnr(tvnl,
								mom_short_name,
								VNATTR_HOOK_OFFLINE_VNODES,
								hook_buf, 0, 0, NULL);
						} else {
							vret = 1;
						}

						if (vret != 0) {
							snprintf(log_buffer,
								sizeof(log_buffer),
								"Failed to add to "
								"vnlp: %s=%s",
								VNATTR_HOOK_OFFLINE_VNODES,
								hook_buf);
							log_event(PBSEVENT_DEBUG2,
								PBS_EVENTCLASS_HOOK,
								LOG_INFO,
								last_phook->hook_name,
								log_buffer);
						} else {
							/* this saves 'tvnl' */
							/* in svr_vnl_action, */
							/* and later freed upon */
							/* server acking action */
							(void)send_hook_vnl(tvnl);
							tvnl = NULL;
						}
						if (tvnl != NULL)
							vnl_free(tvnl);
					}

					mom_deljob(pjob);
					goto done;
			}

			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			DBPRT(("%s: JOIN_JOB %s node %d\n", __func__, jobid, pjob->ji_nodeid))

			/*
			 ** Call job_join_extra to do setup.
			 */
			if (job_join_extra != NULL) {
				errcode = job_join_extra(pjob, np);
				if (errcode != 0) {
					(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
					mom_deljob(pjob);
					SEND_ERR(errcode)
					goto done;
				}
			}

			(void)job_save(pjob);
			(void)strcpy(namebuf, path_jobs);	/* job directory path */
			if (*pjob->ji_qs.ji_fileprefix != '\0')
				(void)strcat(namebuf, pjob->ji_qs.ji_fileprefix);
			else
				(void)strcat(namebuf, pjob->ji_qs.ji_jobid);
			(void)strcat(namebuf, JOB_TASKDIR_SUFFIX);

			if (mkdir(namebuf, 0700) == -1) {
				(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
				mom_deljob(pjob);
				SEND_ERR(PBSE_SYSTEM)
				goto done;
			}
#ifdef WIN32
			/* the following must appear before check_pwd() since the */
			/* latter tries to read cred info */
			if (mom_create_cred(pjob, info, len, FALSE, stream) == -1) {
				(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
				mom_deljob(pjob);
				SEND_ERR(PBSE_SYSTEM)
				goto done;
			}
#endif
			if (check_pwd(pjob) == NULL) {
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE,
					pjob->ji_qs.ji_jobid, log_buffer);
				(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
				mom_deljob(pjob);
				SEND_ERR(PBSE_BADUSER)
				goto done;
			}
			pjob->ji_qs.ji_un.ji_momt.ji_exuid = pjob->ji_grpcache->gc_uid;
			pjob->ji_qs.ji_un.ji_momt.ji_exgid = pjob->ji_grpcache->gc_gid;

#ifndef WIN32
			if (mom_create_cred(pjob, info, len, FALSE, stream) == -1) {
				(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
				mom_deljob(pjob);
				SEND_ERR(PBSE_SYSTEM)
				goto done;
			}
#endif

			/* create staging and execution dir if sandbox=PRIVATE mode is enabled */
			/* this code should appear after check_pwd() since */
			/* mkjobdir() depends on job uid and gid being set correctly */
			if ((is_jattr_set(pjob, JOB_ATR_sandbox)) &&
				(strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
#ifdef WIN32
				if (mkjobdir(pjob->ji_qs.ji_jobid,
					jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir),
					(pjob->ji_user != NULL) ? pjob->ji_user->pw_name : NULL,
					(pjob->ji_user != NULL) ? pjob->ji_user->pw_userlogin : INVALID_HANDLE_VALUE)) {
					sprintf(log_buffer, "unable to create the job directory %s",
						jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir));
					log_err(errno, __func__, log_buffer);
					(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
					mom_deljob(pjob);
					SEND_ERR(PBSE_SYSTEM)
					goto done;
				}

#else	/* Unix/Linux */

				mode_t myumask = 0;
				char   maskbuf[22];
				mode_t j;
				int    e;

				if (is_jattr_set(pjob, JOB_ATR_umask)) {
					sprintf(maskbuf, "%ld", get_jattr_long(pjob, JOB_ATR_umask));
					sscanf(maskbuf, "%o", &j);
					myumask = umask(j);
				} else {
					myumask = umask(077);
				}

				e = mkjobdir(pjob->ji_qs.ji_jobid,
					jobdirname(pjob->ji_qs.ji_jobid,
					pjob->ji_grpcache->gc_homedir),
					pjob->ji_qs.ji_un.ji_momt.ji_exuid,
					pjob->ji_qs.ji_un.ji_momt.ji_exgid);
				if (myumask != 0)
					(void)umask(myumask);

				if (e != 0) {
					sprintf(log_buffer, "unable to create the job directory %s", jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir));
					log_err(errno, __func__, log_buffer);
					(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
					mom_deljob(pjob);
					SEND_ERR(PBSE_SYSTEM)
					goto done;
				}
#endif
			}

			sprintf(log_buffer, "JOIN_JOB as node %d", pjob->ji_nodeid);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			/*
			 ** if certain resource limits require that the job usage be
			 ** polled, we link the job to mom_polljobs.
			 **
			 ** NOTE: we overload the job field ji_jobque for this as it
			 ** is not used otherwise by MOM
			 */
			if (mom_do_poll(pjob))
				append_link(&mom_polljobs, &pjob->ji_jobque, pjob);
			if (pbs_idx_insert(jobs_idx, pjob->ji_qs.ji_jobid, pjob) != PBS_IDX_RET_OK) {
				log_joberr(PBSE_INTERNAL, __func__, "Failed to add job in index during join job", pjob->ji_qs.ji_jobid);
				goto join_err;
			}
			append_link(&svr_alljobs, &pjob->ji_alljobs, pjob);

			/*
			 ** At this point, we have done all the job setup.
			 ** Any error from now on is a problem sending the
			 ** reply to MS.  We don't need to call SEND_ERR.
			 */
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				goto join_err;
			/*
			 ** Here we need to call job_join_ack to send any extra
			 ** information with the reply to the JOIN request.
			 ** The format of the data sent by job_join_ack will
			 ** always have a version number as the first item
			 ** sent as an int.  The rest depends on job_join_ack
			 ** and will be defined in the function it points to.
			 */
			if (job_join_ack != NULL) {
				ret = job_join_ack(pjob, np, stream);
				if (ret != DIS_SUCCESS)
					goto join_err;
			}

			if (tpp_eom(stream) == -1)
				goto join_err;

			if (dis_flush(stream) == -1)
				goto join_err;

			goto fini;

join_err:
			log_err(errno, __func__, "tpp flush");
			(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
			tpp_close(stream);
			mom_deljob(pjob);
			goto fini;

		case IM_ALL_OKAY:
		case IM_ERROR:
		case IM_ERROR2:
			reply = 0;
			break;

		default:
			reply = 1;
			break;
	}

	np = NULL;
	/*
	 ** Check if job already exists.
	 */
	if ((pjob = find_job(jobid)) == NULL) {
		SEND_ERR(PBSE_JOBEXIST)
		goto done;
	}

	/* check cookie */
	if (!(is_jattr_set(pjob, JOB_ATR_Cookie))) {
		DBPRT(("%s: job %s has no cookie\n", __func__, jobid))
		SEND_ERR(PBSE_BADSTATE)
		goto done;
	}
	oreo = get_jattr_str(pjob, JOB_ATR_Cookie);
	if (strcmp(oreo, cookie) != 0) {
		DBPRT(("%s: job %s cookie %s message %s\n", __func__, jobid, oreo, cookie))
		SEND_ERR(PBSE_BADSTATE)
		goto done;
	}
	/*
	 ** This is some processing needed that is common between
	 ** both kinds of reply.
	 ** reply == 0 means that this message is a reply not a request
	 ** reply == 1 means that this is a request to which a reply may happen
	 */
	if (reply == 0) {
		for (nodeidx = 0; nodeidx < pjob->ji_numnodes; nodeidx++) {
			np = &pjob->ji_hosts[nodeidx];

			if (np->hn_stream == stream) {
				np->hn_eof_ts = 0; /* reset down timestamp */
				break;
			}
		}
		if (nodeidx == pjob->ji_numnodes) {
			if (pjob->ji_updated)  {
				/* since some of job's nodes have been released early,
				 * this looks like a stream from one of the
				 * released nodes.
				 */
				goto done;
			} else {
				sprintf(log_buffer,
					"stream %d not found to job nodes", stream);
				goto err;
			}
		}
		ep = (eventent *)GET_NEXT(np->hn_events);

		while (ep) {
			if (ep->ee_event == event &&
				ep->ee_taskid == fromtask)
				break;
			ep = (eventent *)GET_NEXT(ep->ee_next);
		}
		if (ep == NULL) {
			if (pjob->ji_updated)  {
				/* some of job's vnodes have been released early
				 * along with their associated tm_spawn events
				 */
				goto done;
			} else {
				sprintf(log_buffer, "event %d taskid %8.8X not found",
					event, fromtask);
				goto err;
			}
		}

		efd = ep->ee_fd;
		event_com = ep->ee_command;
		event_task = ep->ee_taskid;
		event_client = ep->ee_client;
		argv = ep->ee_argv;
		envp = ep->ee_envp;
		delete_link(&ep->ee_next);
		free(ep);
	}

	switch (command) {

		case	IM_KILL_JOB:
			/*
			 ** Sender is (must be) mom superior commanding me to kill a
			 ** job which I should be a part of.
			 ** Send a signal and set the jobstate to begin the
			 ** kill.  We wait for all tasks to exit before sending
			 ** an obit to mother superior.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;

			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;
			if (mom_process_hooks(HOOK_EVENT_EXECJOB_PRETERM,
				PBS_MOM_SERVICE_NAME, mom_host, &hook_input,
				&hook_output,
				hook_msg, sizeof(hook_msg), 1) == 0) {

				SEND_ERR2(hook_errcode, (char *)hook_msg);
				goto done;	/* explicit reject - don't cancel */
			}


			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "KILL_JOB received");
			/*
			 ** Send the jobs a signal but we have to wait to
			 ** do a reply to mother superior until the procs
			 ** die and are reaped.
			 */
			DBPRT(("%s: KILL_JOB %s\n", __func__, jobid))
			reply = 0;	/* reply will be deferred */
			kill_job(pjob, SIGKILL);
			set_job_substate(pjob, JOB_SUBSTATE_EXITING);
			set_job_state(pjob, JOB_STATE_LTR_EXITING);
			pjob->ji_obit = event;
			exiting_tasks = 1;

			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			(void)mom_process_hooks(HOOK_EVENT_EXECJOB_EPILOGUE,
				PBS_MOM_SERVICE_NAME, mom_host, &hook_input,
				&hook_output, hook_msg, sizeof(hook_msg), 1);
			break;

		case	IM_DELETE_JOB:
		case	IM_DELETE_JOB2:
		case	IM_DELETE_JOB_REPLY:
			/*
			 ** Sender is (must be) mom superior commanding me to delete a
			 ** job which I should be a part of.  There is no reply for
			 ** IM_DELETE_JOB but is for IM_DELETE_JOB_REPLY.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			DBPRT(("%s: %s for %s\n", __func__, command==IM_DELETE_JOB?"DELETE_JOB":"DELETE_JOB_REPLY", pjob->ji_qs.ji_jobid));

			if (check_ms(stream, pjob))
				goto fini;

 			if ((command == IM_DELETE_JOB) || (command == IM_DELETE_JOB_REPLY))
				/* For IM_DELETE_JOB_REPLY, it should be
				 * 'DELETE_JOB_REPLY received'
				 * but there are QA tests out there that
				 * already depend on the current message.
				 */
 				delete_job_msg = "DELETE_JOB received";
			else if (command == IM_DELETE_JOB2)
				delete_job_msg = "DELETE_JOB2 received";

			if (delete_job_msg != NULL)
				log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, delete_job_msg);

			if (pjob->ji_hook_running_bg_on)
				goto fini;

			kill_job(pjob, SIGKILL);	/* just in case */

			/* NULL value passed to hook_input.vnl 				*/
			/* means to assign vnode list using pjob->ji_host[].	    	*/

			if ((hook_input_ptr = (mom_hook_input_t *)malloc(
				sizeof(mom_hook_input_t))) == NULL) {
					log_err(errno, __func__, MALLOC_ERR_MSG);
					goto err;
			}
			mom_hook_input_init(hook_input_ptr);
			hook_input_ptr->pjob = pjob;

			if ((hook_output_ptr = (mom_hook_output_t *)malloc(
				sizeof(mom_hook_output_t))) == NULL) {
					log_err(errno, __func__, MALLOC_ERR_MSG);
					goto err;
			}
			mom_hook_output_init(hook_output_ptr);

			if ((hook_output_ptr->reject_errcode =
				(int *)malloc(sizeof(int))) == NULL) {
					log_err(errno, __func__, MALLOC_ERR_MSG);
					goto err;
			}
			memset(hook_output_ptr->reject_errcode, 0, sizeof(int));

			pjob->ji_postevent = event;
			pjob->ji_taskid = fromtask;

			if (mom_process_hooks(
				(command == IM_DELETE_JOB2)?
					HOOK_EVENT_EXECJOB_EPILOGUE:
					HOOK_EVENT_EXECJOB_END,
				PBS_MOM_SERVICE_NAME, mom_host, hook_input_ptr,
				hook_output_ptr, NULL, 0, 1) ==
						HOOK_RUNNING_IN_BACKGROUND) {
					pjob->ji_hook_running_bg_on = (command == IM_DELETE_JOB)? BG_IM_DELETE_JOB: BG_IM_DELETE_JOB_REPLY;
					break;
				}

			if (command == IM_DELETE_JOB_REPLY) {
				mom_deljob(pjob);
				ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
					event, fromtask, IM_OLD_PROTOCOL_VER);
				reply = 1;
			} else if (command == IM_DELETE_JOB2) {
				job *pjob2 = NULL;
				long runver;

				ret = im_compose(stream, jobid, cookie, IM_SEND_RESC,
					event, fromtask, IM_OLD_PROTOCOL_VER);

				/* Send the information tallied for the job. */
				ret = diswst(stream, mom_host);
				BAIL("mom_host")
				ret = diswul(stream, resc_used(pjob, "cput",
								gettime));
				BAIL("resources_used.cput")
				ret = diswul(stream, resc_used(pjob, "mem",
								getsize));
				BAIL("resources_used.mem")
				ret = diswul(stream, resc_used(pjob,
						"cpupercent", gettime));
				BAIL("resources_used.cpupercent")
				if (is_jattr_set(pjob, JOB_ATR_run_version))
					runver = get_jattr_long(pjob, JOB_ATR_run_version);
				else
					runver = get_jattr_long(pjob, JOB_ATR_runcount);

				/* Call the execjob_end hook now */
				if (mom_process_hooks(HOOK_EVENT_EXECJOB_END, PBS_MOM_SERVICE_NAME, mom_host, hook_input_ptr,
						hook_output_ptr, NULL, 0, 1) == HOOK_RUNNING_IN_BACKGROUND) {
						pjob->ji_hook_running_bg_on = BG_IM_DELETE_JOB2;
						break;
				}
				mom_deljob(pjob);

				/* Needed to create a lightweight copy of the job to
				 * contain only the jobid info, so I can just call
				 * new_job_action() to create a JOB_ACT_REQ_DEALLOCATE
				 * request. Can't use the original 'pjob' structure as
				 * before creating the request, the real job should have
				 * been deleted already.
				 */
				if ((pjob2 = job_alloc()) != NULL) {
					pbs_strncpy(pjob2->ji_qs.ji_jobid, jobid, sizeof(pjob2->ji_qs.ji_jobid));
					set_jattr_l_slim(pjob2, JOB_ATR_run_version, runver, SET);
					/* JOB_ACT_REQ_DEALLOCATE request will tell the
					 * server that this mom has completely deleted the
					 * job and now the server can officially free up the
					 * job from the nodes managed by this mom, allowing
					 * other jobs to run.
					 */
					new_job_action_req(pjob2, HOOK_PBSADMIN, JOB_ACT_REQ_DEALLOCATE);
					job_free(pjob2);
				}

				reply = 1;
			} else {
				mom_deljob(pjob);
				reply = 0;
			}
			free(hook_input_ptr);
			if (hook_output_ptr) {
				free(hook_output_ptr->reject_errcode);
				free(hook_output_ptr);
			}
			break;

		case	IM_EXEC_PROLOGUE:
			/*
			 ** Sender is (must be) mom superior commanding me to execute
			 ** a prologue hook.
			 */
			DBPRT(("%s: %s for %s\n", __func__, "IM_EXEC_PROLOGUE", pjob->ji_qs.ji_jobid));
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			switch (hook_rc = mom_process_hooks(HOOK_EVENT_EXECJOB_PROLOGUE,
						PBS_MOM_SERVICE_NAME,
						mom_host, &hook_input, &hook_output,
						hook_msg, sizeof(hook_msg), 1)) {

				case 1: /* explicit accept */
				case 2:	/* no hook script executed - go ahead and accept event*/
					ret = im_compose(stream, jobid, cookie,
							IM_ALL_OKAY,
							event, fromtask, IM_OLD_PROTOCOL_VER);
					if (ret != DIS_SUCCESS)
						goto err;
					break;
				default:
					/* a value of '0' means explicit reject encountered. */
					if (hook_rc != 0) {
						/* we've hit an internal error (malloc error, full disk, etc...), so */
						/* treat this now like a  hook error so hook fail_action  */
						/* will be consulted.  */
						/* Before, behavior of an internal error was to ignore it! */
						hook_errcode = PBSE_HOOKERROR;
					}
					SEND_ERR2(hook_errcode, (char *)hook_msg);
					if (hook_errcode == PBSE_HOOKERROR)
					    send_hook_fail_action(last_phook);
			}
			break;

		case	IM_SPAWN_TASK:
			/*
			 ** Sender is a MOM in a job that wants to start a task.
			 ** I am MOM on the node that is to run the task.
			 **
			 ** auxiliary info (
			 **	parent vnode	tm_node_id
			 **	target vnode	tm_node_id
			 **	task id		tm_task_id (not used)
			 **	argv 0		string
			 **	...
			 **	argv n		string
			 **	null
			 **	envp 0		string
			 **	...
			 **	envp m		string
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("SPAWN_TASK pvnodeid")

			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}
			tvnodeid = disrsi(stream, &ret);
			BAIL("SPAWN_TASK tvnodeid")
			taskid = disrui(stream, &ret);
			BAIL("SPAWN_TASK taskid")
			DBPRT(("%s: SPAWN_TASK %s parent %d target %d taskid %u\n",
				__func__, jobid, pvnodeid, tvnodeid, taskid))

			/*
			 **	The target node must be here.
			 */
			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) {
				SEND_ERR(PBSE_INTERNAL)
				break;
			}

			if( version == IM_OLD_PROTOCOL_VER) {
				/*
				 * The arg list is ended by an empty (zero length)
				 * string.
				 */
				num = 4;
				argv = (char **)calloc(sizeof(char *), num);
				assert(argv);
				for (i=0;; i++) {
					if ((cp = disrst(stream, &ret)) == NULL)
						break;
					if (ret != DIS_SUCCESS)
						break;
					if (*cp == '\0') {
						/* got an empty string, end of args list */
						free(cp);
						break;
					}
					if (i == num-1) {
						num *= 2;
						argv = (char **)realloc(argv,
						num*sizeof(char *));
						assert(argv);
					}
					argv[i] = cp;
				}
			} else {
			  	argc = disrui(stream, &ret);
				if (ret != DIS_SUCCESS) {
					sprintf(log_buffer, "SPAWN_TASK read of argc");
					goto err;
				}
				argv = (char **)calloc(argc+1, sizeof(char *));
				assert(argv);
				for (i=0; i<argc; i++) {
					argv[i] = disrst(stream, &ret);
					if (ret != DIS_SUCCESS)
						break;
				}
			}
			argv[i] = NULL;
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				sprintf(log_buffer, "SPAWN_TASK read of argv array");
				goto err;
			}

			num = 8;
			envp = (char **)calloc(sizeof(char *), num);
			assert(envp);
			for (i=0;; i++) {
				if ((cp = disrst(stream, &ret)) == NULL)
					break;
				if (ret != DIS_SUCCESS)
					break;
				if (*cp == '\0') {
					free(cp);
					break;
				}
				if (i == num-1) {
					num *= 2;
					envp = (char **)realloc(envp,
						num*sizeof(char *));
					assert(envp);
				}
				envp[i] = cp;
			}
			envp[i] = NULL;
			if (ret != DIS_EOD) {
				arrayfree(argv);
				arrayfree(envp);
				sprintf(log_buffer, "SPAWN_TASK read of envp array");
				goto err;
			}
#ifdef PMIX
			pbs_pmix_register_client(pjob, tvnodeid, &envp);
#endif
			ret = DIS_SUCCESS;
			if ((ptask = momtask_create(pjob)) == NULL) {
				SEND_ERR(PBSE_SYSTEM);
				arrayfree(argv);
				arrayfree(envp);
				break;
			}
			strcpy(ptask->ti_qs.ti_parentjobid, jobid);
			ptask->ti_qs.ti_parentnode = pvnodeid;
			ptask->ti_qs.ti_myvnode    = tvnodeid;
			ptask->ti_qs.ti_parenttask = fromtask;
			if (task_save(ptask) == -1) {
				SEND_ERR(PBSE_SYSTEM)
				arrayfree(argv);
				arrayfree(envp);
				break;
			}
			errcode = start_process(ptask, argv, envp, false);
			if (errcode != PBSE_NONE) {
				SEND_ERR(errcode)
			}
			else {
				ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
					event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					break;
				ret = diswui(stream, ptask->ti_qs.ti_task);
			}

			arrayfree(argv);
			arrayfree(envp);
			break;

		case	IM_GET_TASKS:
			/*
			 ** Sender is MOM which controls a task that wants to get
			 ** the list of tasks running here.
			 **
			 ** auxiliary info (
			 **	sending node	tm_node_id;
			 **	target node	tm_node_id;
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("GET_TASKS pvnodeid")
			tvnodeid = disrsi(stream, &ret);
			BAIL("GET_TASKS tvnodeid")
			DBPRT(("%s: GET_TASKS %s from node %d to node %d\n",
				__func__, jobid, pvnodeid, tvnodeid))
			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}

			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "GET_TASKS received");
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				break;
			for (ptask=(pbs_task *)GET_NEXT(pjob->ji_tasks);
				ptask;
				ptask=(pbs_task *)GET_NEXT(ptask->ti_jobtask)) {
				if (ptask->ti_qs.ti_myvnode == tvnodeid) {
					ret = diswui(stream, ptask->ti_qs.ti_task);
					if (ret != DIS_SUCCESS)
						break;
				}
			}
			break;

		case	IM_SIGNAL_TASK:
			/*
			 ** Sender is MOM sending a task and signal to
			 ** deliver.
			 **
			 ** auxiliary info (
			 **	sending node	tm_node_id;
			 **	taskid		tm_task_id;
			 **	signal		int;
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("SIGNAL_TASK pvnodeid")
			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}
			taskid = disrui(stream, &ret);
			BAIL("SIGNAL_TASK taskit")
			sig = disrsi(stream, &ret);
			BAIL("SIGNAL_TASK signum")
			DBPRT(("%s: SIGNAL_TASK %s fromnode %d task %8.8X sig %d\n",
				__func__, jobid, pvnodeid, taskid, sig))
			ptask = task_find(pjob, taskid);
			if (ptask == NULL) {
				SEND_ERR(PBSE_JOBEXIST)
				break;
			}
			sprintf(log_buffer, "SIGNAL_TASK %8.8X sig %d", taskid, sig);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			kill_task(ptask, sig, 0);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			break;

		case	IM_OBIT_TASK:
			/*
			 ** Sender is MOM sending a request to monitor a
			 ** task for exit.
			 **
			 ** auxiliary info (
			 **	sending node	tm_node_id;
			 **	taskid		tm_task_id;
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("OBIT_TASK pvnodeid")
			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}
			taskid = disrui(stream, &ret);
			BAIL("OBIT_TASK taskid")
			ptask = task_find(pjob, taskid);
			if (ptask == NULL) {
				SEND_ERR(PBSE_JOBEXIST)
				break;
			}
			DBPRT(("%s: OBIT_TASK %s from node %d task %8.8X\n", __func__,
				jobid, pvnodeid, taskid))
			if (ptask->ti_qs.ti_status >= TI_STATE_EXITED) {
				ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
					event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					break;
				ret = diswsi(stream, ptask->ti_qs.ti_exitstat);
			}
			else {	/* save obit request with task */
				obitent	*op = (obitent *)malloc(sizeof(obitent));
				assert(op);
				CLEAR_LINK(op->oe_next);
				append_link(&ptask->ti_obits, &op->oe_next, op);
				op->oe_type = OBIT_TYPE_TMEVENT;
				op->oe_u.oe_tm.oe_fd = -1;
				op->oe_u.oe_tm.oe_node = pvnodeid;
				op->oe_u.oe_tm.oe_event = event;
				op->oe_u.oe_tm.oe_taskid = fromtask;
				task_save(ptask);
				reply = 0;
			}
			break;

		case	IM_GET_INFO:
			/*
			 ** Sender is MOM sending a task and name to lookup
			 ** for info to report back.
			 **
			 ** auxiliary info (
			 **	sending node	tm_node_id;
			 **	taskid		tm_task_id;
			 **	name		string;
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("GET_INFO pvnodeid")
			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}
			taskid = disrui(stream, &ret);
			BAIL("GET_INFO taskid")
			ptask = task_find(pjob, taskid);
			if (ptask == NULL) {
				SEND_ERR(PBSE_JOBEXIST)
				break;
			}
			name = disrst(stream, &ret);
			BAIL("GET_INFO name")
			DBPRT(("%s: GET_INFO %s from node %d task %8.8X name %s\n",
				__func__, jobid, pvnodeid, taskid, name))
			if ((ip = task_findinfo(ptask, name)) == NULL) {
				SEND_ERR(PBSE_JOBEXIST)
				break;
			}
			sprintf(log_buffer, "GET_INFO task %8.8X name %s",
				taskid, name);
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				break;
			ret = diswcs(stream, ip->ie_info, ip->ie_len);
			break;

		case	IM_GET_RESC:
			/*
			 ** Sender is MOM requesting resource info to
			 ** report back its client.
			 **
			 ** auxiliary info (
			 **	sending node	tm_node_id;
			 ** )
			 */
			pvnodeid = disrsi(stream, &ret);
			BAIL("GET_RESC pvnodeid")
			if ((np = find_node(pjob, stream, pvnodeid)) == NULL) {
				SEND_ERR(PBSE_BADHOST)
				break;
			}
			DBPRT(("%s: GET_RESC %s from node %d\n", __func__, jobid, pvnodeid))
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "GET_RESC received");
			info = resc_string(pjob);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				break;
			ret = diswst(stream, info);
			break;

		case	IM_POLL_JOB:
			/*
			 ** Sender is (must be) mom superior commanding me to send
			 ** information for a job which I should be a part of.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */

			if (QA_testing != 0) {		/* for QA Testing only */
				if (QA_testing & PBSQA_POLLJOB_CRASH)
					exit(98);
				else if (QA_testing & PBSQA_POLLJOB_SLEEP)
					sleep(90);
			}

			if (check_ms(stream, pjob))
				goto fini;
			pjob->ji_polltime = time_now;
			DBPRT(("%s: POLL_JOB %s\n", __func__, jobid))
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				break;
			/*
			 ** Now comes a recommendation for killing the job.
			 */
			exitval = (pjob->ji_qs.ji_svrflags &
				(JOB_SVFLG_OVERLMT1|JOB_SVFLG_OVERLMT2)) ? 1 : 0;
			ret = diswsi(stream, exitval);
			if (ret != DIS_SUCCESS)
				break;
			/*
			 ** Send the information tallied for the job.
			 */
			ret = diswul(stream, resc_used(pjob, "cput", gettime));
			if (ret != DIS_SUCCESS)
				break;
			ret = diswul(stream, resc_used(pjob, "mem", getsize));
			if (ret != DIS_SUCCESS)
				break;
			ret = diswul(stream, resc_used(pjob, "cpupercent", gettime));

			send_resc_used_to_ms(stream, pjob);
			break;

#ifdef PMIX
		case	IM_PMIX:
			/*
			 * Sender is MOM requesting a PMIX operation
			 * be carried out.
			 *
			 * auxiliary info (
			 *	sending node	tm_node_id;
			 *	taskid		tm_task_id;
			 *	operation	string;
			 * )
			 */
			sprintf(log_buffer, "IM_PMIX request received");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			/* TODO: Read aux info, processes request, and send response */
			sprintf(log_buffer, "Handle IM_PMIX request");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			sprintf(log_buffer, "IM_PMIX replying IM_ALL_OK");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				break;
			break;
#endif

		case	IM_SUSPEND:
		case	IM_RESUME:
			/*
			 ** Sender is (must be) mom superior commanding me to do
			 ** a suspend or resume of all local tasks for a job which
			 ** I should be a part of.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;
			DBPRT(("%s: %s %s\n", __func__, (command == IM_SUSPEND) ?
				"SUSPEND" : "RESUME", jobid))
			sprintf(log_buffer, "%s received", (command == IM_SUSPEND) ?
				"SUSPEND" : "RESUME");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);
			if ((errcode = local_supres(pjob,
				(command == IM_SUSPEND) ? 1 : 0, NULL))
				!= PBSE_NONE) {
				SEND_ERR(errcode);
				break;
			}
			pjob->ji_mompost = (command == IM_SUSPEND) ?
				post_suspend : post_resume;
			/*
			 ** If a child was started to handle the operation,
			 ** wait to reply until the kid returns.
			 */
			if (pjob->ji_momsubt) {
				reply = 0;
				pjob->ji_postevent = event;
				pjob->ji_taskid = fromtask;
				break;
			}

			pjob->ji_mompost(pjob, PBSE_NONE);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			break;

		case	IM_RESTART:
			/*
			 ** Sender is (must be) mom superior commanding me to do
			 ** a restart of all local tasks for a job which
			 ** I should be a part of.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;
			DBPRT(("%s: RESTART %s\n", __func__, jobid))
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "RESTART received");

			/*
			 * NULL value passed to hook_input.vnl means to assign
			 * vnode list using pjob->ji_host[].
			 */
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;
			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			hook_rc=mom_process_hooks(HOOK_EVENT_EXECJOB_BEGIN,
					PBS_MOM_SERVICE_NAME, mom_host,
					&hook_input, &hook_output,
					hook_msg, sizeof(hook_msg), 1);
			if (hook_rc <= 0) {
				/* a value of '0' means explicit reject encountered. */
				if (hook_rc != 0) {
					/*
					 * we've hit an internal error (malloc error, full disk, etc...), so
					 * treat this now like a  hook error so hook fail_action will be consulted.
					 * Before, behavior of an internal error was to ignore it!
					 */
					hook_errcode = PBSE_HOOKERROR;
					send_hook_fail_action(last_phook);
				}
				SEND_ERR2(hook_errcode, (char *)hook_msg);
				mom_deljob(pjob);
				break;
			}

			errcode = local_restart(pjob, NULL);

			if (errcode != PBSE_NONE) {	/* error, send reply */
				SEND_ERR(errcode);
				break;
			}

			/*
			 ** If a child was started to handle the operation,
			 ** wait to reply until the kid returns.
			 */
			if (pjob->ji_momsubt) {
				reply = 0;
				pjob->ji_postevent = event;
				pjob->ji_taskid = fromtask;
				break;
			}

			post_restart(pjob, PBSE_NONE);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			break;


		case	IM_CHECKPOINT:
		case	IM_CHECKPOINT_ABORT:
			/*
			 ** Sender is (must be) mom superior commanding me to do
			 ** a checkpoint of all local tasks for a job which
			 ** I should be a part of.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;
			DBPRT(("%s: %s %s\n", __func__,
				(command == IM_CHECKPOINT) ? "CHECKPOINT" :
				"CHECKPOINT_ABORT", jobid))
			sprintf(log_buffer, "%s received",
				(command == IM_CHECKPOINT) ?
				"CHECKPOINT" : "CHECKPOINT_ABORT");
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, log_buffer);

			errcode = local_checkpoint(pjob,
				(command == IM_CHECKPOINT) ? 0 : 1, NULL);

			if (errcode != PBSE_NONE) {	/* error, send reply */
				SEND_ERR(errcode);
				break;
			}

			/*
			 ** If a child was started to handle the operation,
			 ** wait to reply until the kid returns.
			 */
			if (pjob->ji_momsubt) {
				reply = 0;
				pjob->ji_postevent = event;
				pjob->ji_taskid = fromtask;
				break;
			}

			post_chkpt(pjob, PBSE_NONE);
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			break;

		case	IM_ABORT_JOB:
			/*
			 ** Sender is (must be) mom superior commanding me to
			 ** abort a JOIN_JOB or RESTART request.
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;
			DBPRT(("%s: ABORT_JOB %s\n", __func__, jobid))
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "ABORT_JOB received");
			reply = 0;
			if (pjob->ji_qs.ji_svrflags &
				(JOB_SVFLG_CHKPT|JOB_SVFLG_ChkptMig)) {
				kill_job(pjob, SIGKILL);	/* is this right? */
			} else {
				mom_hook_input_init(&hook_input);
				hook_input.pjob = pjob;

				mom_hook_output_init(&hook_output);
				hook_output.reject_errcode = &hook_errcode;
				hook_output.last_phook = &last_phook;
				hook_output.fail_action = &hook_fail_action;
				(void)mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT, PBS_MOM_SERVICE_NAME, mom_host, &hook_input, &hook_output, hook_msg, sizeof(hook_msg), 1);
				mom_deljob(pjob);
			}
			break;

		case	IM_REQUEUE:
			/*
			 ** Sender is another MOM telling me that she has gone
			 ** keyboard busy and that a job needs to be requeued
			 **
			 ** auxiliary info (
			 **	none;
			 ** )
			 */
			DBPRT(("%s: IM_REQUEUE job %s\n", __func__, jobid))
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "REQUEUE received");
			reply = 0;
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) != 0) {
				pjob->ji_qs.ji_un.ji_momt.ji_exitstat = JOB_EXEC_RERUN;
				(void)kill_job(pjob, SIGKILL);
				/* Server will decide if job is rerunnable or not */
			}
			break;

		case	IM_SETUP_JOB:
			/*
			 ** Sender is (must be) mom superior sending me setup
			 ** information to complete a JOIN_JOB or RESTART request.
			 **
			 ** auxiliary info (
			 **	identity	int
			 **	... dependent
			 ** )
			 */
			if (check_ms(stream, pjob))
				goto fini;
			DBPRT(("%s: SETUP_JOB %s\n", __func__, jobid))
			log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
				jobid, "SETUP_JOB received");

			/*
			 ** If there is a job_setup_final, call it,
			 ** otherwise, send the "not supported" error.
			 */
			if (job_setup_final == NULL) {
				SEND_ERR(PBSE_NOSUP);
				break;
			}

			errcode = job_setup_final(pjob, stream);
			if (errcode != PBSE_NONE) {	/* error, send reply */
				SEND_ERR(errcode);
				break;
			}
			ret = im_compose(stream, jobid, cookie, IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			break;

		case	IM_ALL_OKAY:		/* this is a REPLY */
			/*
			 ** Sender is another MOM telling me that a request has
			 ** completed just dandy.
			 */
			switch (event_com) {

				case	IM_JOIN_JOB:
					/*
					 ** Sender is one of the sisterhood saying she
					 ** got the job structure sent and she accepts it.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	optional;
					 ** )
					 */
					if ((nodeidx > 0) &&
					    (nodeidx < pjob->ji_numnodes) &&
					    ((nodeidx-1) < pjob->ji_numrescs) &&
  					    (pjob->ji_resources[nodeidx-1].nodehost == NULL))
						pjob->ji_resources[nodeidx-1].nodehost = strdup(pjob->ji_hosts[nodeidx].hn_host);

					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"got JOIN_JOB OKAY and I'm not MS");
						goto err;
					}
					DBPRT(("%s: JOIN_JOB %s OKAY\n", __func__, jobid))

					/*
					 ** If job_join_read exists, call it to read
					 ** any extra info included with the JOIN reply.
					 ** This function can return an error if there
					 ** is no extra information or deal with it
					 ** more gracefully and return SUCCESS.
					 */
					if (job_join_read != NULL) {
						/* on error, log_message set */
						ret = job_join_read(pjob, np, stream);
						if (ret != DIS_SUCCESS)
							goto err;
					}

					for (i=0; i<pjob->ji_numnodes; i++) {
						hnodent *xp = &pjob->ji_hosts[i];
						if ((ep = (eventent *)GET_NEXT(xp->hn_events))
							!= NULL)
							break;
					}

					if (do_tolerate_node_failures(pjob) &&
					    (nodeidx > 0) && (nodeidx < pjob->ji_numnodes)) {
						reliable_job_node_add(&pjob->ji_node_list, pjob->ji_hosts[nodeidx].hn_host);
					}

					if (ep == NULL) {	/* no events */
						int rcode;
						int do_break = 0;
						/*
						 * All the JOIN messages have come in.
						 * Call job_join_extra for local MS setup.
						 */
						rcode = pre_finish_exec(pjob, 1);
						switch (rcode) {
						  case PRE_FINISH_SUCCESS_JOB_SETUP_SEND:
							do_break = 1;
							break;
						  case PRE_FINISH_FAIL_JOIN_EXTRA:
							goto done;
						  case PRE_FINISH_FAIL_JOB_SETUP_SEND:
							sprintf(log_buffer, "could not send setup");
							goto err;
						  case PRE_FINISH_FAIL:
							goto err;
						}
						if (do_break)
							break;
						/*
						 ** At this point, we are ready to call
						 ** finish_exec and launch the job.
						 */
 						if (!do_tolerate_node_failures(pjob) || (check_job_substate(pjob, JOB_SUBSTATE_WAITING_JOIN_JOB))) {
							if (check_job_substate(pjob, JOB_SUBSTATE_WAITING_JOIN_JOB)) {
								set_job_substate(pjob, JOB_SUBSTATE_PRERUN);
								job_save(pjob);
							}
							finish_exec(pjob);
							log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
						}
					}
					break;

				case	IM_SETUP_JOB:
					/*
					 ** Sender is one of the sisterhood saying she
					 ** did the job setup step.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	none;
					 ** )
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"got JOIN_JOB OKAY and I'm not MS");
						goto err;
					}
					DBPRT(("%s: SETUP_JOB %s from %s OKAY\n", __func__,
						jobid, np->hn_host))
					for (i=0; i<pjob->ji_numnodes; i++) {
						np = &pjob->ji_hosts[i];
						if ((ep = (eventent *)GET_NEXT(np->hn_events))
							!= NULL)
							break;
					}

					if (ep == NULL) {	/* all SETUPs done */
						/*
						 ** Call finish_exec.  The MS call to
						 ** job_setup_final is done in job_setup.
						 */
						finish_exec(pjob);
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
							LOG_DEBUG,
							pjob->ji_qs.ji_jobid, log_buffer);
					}
					break;

				case	IM_SUSPEND:
				case	IM_RESUME:
					/*
					 ** Sender is one of the sisterhood saying she
					 ** did a suspend or resume.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	none;
					 ** )
					 */
					name = (event_com == IM_SUSPEND) ?
						"SUSPEND" : "RESUME";
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"got %s OKAY and I'm not MS", name);
						goto err;
					}
					DBPRT(("%s: %s %s OKAY\n", __func__, jobid, name))

					if (pjob->ji_mompost != NULL)
						pjob->ji_mompost(pjob, PBSE_NONE);
					break;

				case	IM_RESTART:
				case	IM_CHECKPOINT:
				case	IM_CHECKPOINT_ABORT:
					/*
					 ** Sender is one of the sisterhood saying she
					 ** did a checkpoint or restart.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	none;
					 ** )
					 */
					name = (event_com == IM_RESTART) ? "RESTART" :
						(event_com == IM_CHECKPOINT) ?
						"CHECKPOINT" : "CHECKPOINT_ABORT";
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"got %s OKAY and I'm not MS", name);
						goto err;
					}

					DBPRT(("%s: %s %s OKAY\n", __func__, jobid, name))
					if (pjob->ji_mompost != NULL)
						pjob->ji_mompost(pjob, PBSE_NONE);
					break;

				case	IM_KILL_JOB:
					/*
					 ** Sender is sending a response that a job
					 ** which needs to die has been given the ax.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	cput	int;
					 **	mem	int;
					 **	cpupercent int;
					 ** )
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer, "got KILL_JOB OKAY and I'm not MS");
						goto err;
					}
					DBPRT(("%s: KILL_JOB %s OKAY\n", __func__, jobid))

					pjob->ji_resources[nodeidx - 1].nr_cput = disrul(stream, &ret);
					BAIL("OK-KILL_JOB cput")
					pjob->ji_resources[nodeidx - 1].nr_mem = disrul(stream, &ret);
					BAIL("OK-KILL_JOB mem")
					pjob->ji_resources[nodeidx - 1].nr_cpupercent = disrul(stream, &ret);
					BAIL("OK-KILL_JOB cpupercent")

					DBPRT(("%s: %s FINAL from %d cpu %lu sec mem %lu kb\n",
					       __func__, jobid, nodeidx,
					       pjob->ji_resources[nodeidx - 1].nr_cput,
					       pjob->ji_resources[nodeidx - 1].nr_mem))
					recv_resc_used_from_sister(stream, pjob, nodeidx - 1);

					/* don't close stream in case other jobs use it */
					np->hn_sister = SISTER_KILLDONE;
					for (i = 1; i < pjob->ji_numnodes; i++) {
						if (reliable_job_node_find(&pjob->ji_failed_node_list, pjob->ji_hosts[i].hn_host) == NULL &&
						    pjob->ji_hosts[i].hn_sister == SISTER_OKAY)
							break;
					}
					if (i == pjob->ji_numnodes) { /* all dead */
						DBPRT(("%s: ALL DONE, set EXITING job %s\n", __func__, jobid))
						if (check_job_substate(pjob, JOB_SUBSTATE_KILLSIS)) {
							set_job_state(pjob, JOB_STATE_LTR_EXITING);
							set_job_substate(pjob, JOB_SUBSTATE_EXITING);
							exiting_tasks = 1;
						}
					}
					break;

				case	IM_DELETE_JOB_REPLY:
					/*
					 ** Sender is MOM responding to a "delete job and reply"
					 ** request.
					 **
					 ** auxiliary info - none
					 */
					DBPRT(("%s: reply for DELETE_JOB_REPLY %s received from %s\n", __func__, pjob->ji_qs.ji_jobid, np->hn_host))
					np->hn_sister = SISTER_KILLDONE;
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"got DELETE_JOB_REPLY OKAY and I'm not MS");
						goto err;
					}
					DBPRT(("%s: DELETE_JOB_REPLY %s OKAY\n", __func__, jobid))
					chk_del_job(pjob, 0);
					break;

				case	IM_SPAWN_TASK:
					/*
					 ** Sender is MOM responding to a "spawn_task"
					 ** request.
					 **
					 ** auxiliary info (
					 **	task id		tm_task_id;
					 ** )
					 */
					taskid = disrui(stream, &ret);
					BAIL("OK-SPAWN taskid")
					DBPRT(("%s: SPAWN_TASK %s OKAY task %8.8X\n",
						__func__, jobid, taskid))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;
					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					(void)diswui(efd, taskid);
					(void)dis_flush(efd);
					break;

				case	IM_GET_TASKS:
					/*
					 ** Sender is MOM giving a list of tasks which she
					 ** has started for this job.
					 **
					 ** auxiliary info (
					 **	task id		tm_task_id;
					 **	...
					 **	task id		tm_task_id;
					 ** )
					 */
					DBPRT(("%s: GET_TASKS %s OKAY\n", __func__, jobid))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;
					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					for (;;) {
						DIS_tpp_funcs();
						taskid = disrui(stream, &ret);
						if (ret != DIS_SUCCESS) {
							if (ret == DIS_EOD)
								break;
							else {
								sprintf(log_buffer,
									bail_format,
									"OK-GET_TASK idlist");
								goto err;
							}
						}
						DIS_tcp_funcs();
						(void)diswui(efd, taskid);
					}
					DIS_tcp_funcs();
					(void)diswui(efd, TM_NULL_TASK);
					(void)dis_flush(efd);
					break;

				case	IM_SIGNAL_TASK:
					/*
					 ** Sender is MOM with a good signal to report.
					 **
					 ** auxiliary info (
					 **	none;
					 ** )
					 */
					DBPRT(("%s: %s SIGNAL_TASK %8.8X OKAY\n",
						__func__, jobid, event_task))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;
					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					(void)dis_flush(efd);
					break;

				case	IM_OBIT_TASK:
					/*
					 ** Sender is MOM with a death report.
					 **
					 ** auxiliary info (
					 **	exit value	int;
					 ** )
					 */
					exitval = disrsi(stream, &ret);
					BAIL("OK-OBIT_TASK exitval")
					DBPRT(("%s: %s OBIT_TASK %8.8X OKAY exit val %d\n",
						__func__, jobid, event_task, exitval))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;
					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					(void)diswsi(efd, exitval);
					(void)dis_flush(efd);
					break;

				case	IM_GET_INFO:
					/*
					 ** Sender is MOM with a named info to report.
					 **
					 ** auxiliary info (
					 **	info		counted string;
					 ** )
					 */
					info = disrcs(stream, &len, &ret);
					BAIL("OK-GET_INFO info")
					DBPRT(("%s: %s GET_INFO %8.8X OKAY\n",
						__func__, jobid, event_task))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;

					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					(void)diswcs(efd, info, len);
					(void)dis_flush(efd);
					break;

				case	IM_GET_RESC:
					/*
					 ** Sender is MOM with a resource info to report.
					 **
					 ** auxiliary info (
					 **	info		counted string;
					 ** )
					 */
					info = disrst(stream, &ret);
					BAIL("OK-GET_RESC info")
					DBPRT(("%s: %s GET_RESC %8.8X OKAY\n",
						__func__, jobid, event_task))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;

					(void)tm_reply(efd, ptask->ti_protover,
						TM_OKAY, event_client);
					(void)diswst(efd, info);
					(void)dis_flush(efd);
					break;

				case	IM_POLL_JOB:
					/*
					 ** I must be Mother Superior for the job and
					 ** this is a reply with job resources to
					 ** tally up.
					 **
					 ** auxiliary info (
					 **	recommendation	int;
					 **	cput		u_long;
					 **	mem		u_long;
					 **	cpupercent	u_long;
					 ** )
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer, "got POLL_JOB and I'm not MS");
						goto err;
					}
					exitval = disrsi(stream, &ret);
					BAIL("OK-POLL_JOB exitval")
					pjob->ji_resources[nodeidx - 1].nr_cput = disrul(stream, &ret);
					BAIL("OK-POLL_JOB cput")
					pjob->ji_resources[nodeidx - 1].nr_mem = disrul(stream, &ret);
					BAIL("OK-POLL_JOB mem")
					pjob->ji_resources[nodeidx - 1].nr_cpupercent = disrul(stream, &ret);
					BAIL("OK-POLL_JOB cpupercent")
					recv_resc_used_from_sister(stream, pjob, nodeidx - 1);
					DBPRT(("%s: POLL_JOB %s OKAY kill %d cpu %lu mem %lu\n",
					       __func__, jobid, exitval,
					       pjob->ji_resources[nodeidx - 1].nr_cput,
					       pjob->ji_resources[nodeidx - 1].nr_mem))

					if (exitval)
						pjob->ji_nodekill = np->hn_node;
					break;

#ifdef PMIX
				case	IM_PMIX:
					/*
					 * I must be mother superior for the job and
					 * this is a reply for a PMIX operation.
					 *
					 * auxiliary info (
					 *	operation	int;
					 * )
					 */
					sprintf(log_buffer, "IM_PMIX reply received");
					log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
						jobid, log_buffer);
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"Received IM_PMIX reply "
							"and this is not MS");
						goto err;
					}
					/* TODO: Handle IM_PMIX reply */
					sprintf(log_buffer, "Handle IM_PMIX reply here");
					log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
						jobid, log_buffer);
					break;
#endif /* PMIX */

				case	IM_UPDATE_JOB:
					for (i = 0; i < pjob->ji_numnodes; i++) {
						hnodent *xp = &pjob->ji_hosts[i];
						ep = (eventent *)GET_NEXT(xp->hn_events);
  						if (ep != NULL)
							break;
					}

					if ((nodeidx > 0) && (nodeidx < pjob->ji_numnodes)) {
						char *hn;

						hn  = pjob->ji_hosts[nodeidx].hn_host;
						snprintf(log_buffer, sizeof(log_buffer),
						"received IM_ALL_OK job update from host %s", hn?hn:"");
						log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
					}

					if (ep == NULL) {
						/* no events left */
#ifndef WIN32
						if (do_tolerate_node_failures(pjob) && (pjob->ji_parent2child_job_update_status_pipe != -1)) {
							int cmd = IM_ALL_OKAY;
							log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "all job updates from sisters done");
						    	write_pipe_data(pjob->ji_parent2child_job_update_status_pipe, (int *)&cmd, sizeof(int));
						}
#endif

					}
					break;

				case	IM_EXEC_PROLOGUE:
					for (i = 0; i < pjob->ji_numnodes; i++) {
						hnodent *xp = &pjob->ji_hosts[i];
						ep = (eventent *)GET_NEXT(xp->hn_events);
  						if (ep != NULL)
							break;
					}

					if ((nodeidx > 0) && (nodeidx < pjob->ji_numnodes)) {
						char *hn;
						reliable_job_node *rjn = NULL;

						hn  = pjob->ji_hosts[nodeidx].hn_host;
						snprintf(log_buffer, sizeof(log_buffer),
						"received IM_ALL_OK prologue hook from host %s", hn?hn:"");
						log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
						/* note that do_tolerate_node_failures() could return 0 if
						 * tolerate_node_failures=job_start but job already moved past
						 * the starting up phase. The second if clause will catch
						 * previously failed node host due to not getting ack for
						 * execjob_prologue hook execution, but we got the
						 * ack now, just delayed.
						 */
						rjn = reliable_job_node_find(&pjob->ji_failed_node_list, hn);
						if (do_tolerate_node_failures(pjob) || (rjn != NULL)) {
							(void)reliable_job_node_set_prologue_hook_success(&pjob->ji_node_list, hn);
							if (rjn != NULL) {
								delete_link(&rjn->rjn_link);
								free(rjn);
							}
						}
					}

#ifndef WIN32
					if (ep == NULL) {
						/* no events left */
						if (do_tolerate_node_failures(pjob) && (pjob->ji_mjspipe2 != -1)) {
							int cmd = IM_ALL_OKAY;
							log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "all job prologue hook from sisters done");
						    	write_pipe_data(pjob->ji_mjspipe2, (int *)&cmd, sizeof(int));
						}

					}
#endif
					break;

				default:
					sprintf(log_buffer, "unknown request type %d saved",
						event_com);
					log_err(-1, __func__, log_buffer);
					break;
			}
			break;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		case	IM_CRED:
			ret = im_cred_read(pjob, np, stream);
			if (ret != DIS_SUCCESS)
				goto err;
			break;
#endif

		case	IM_ERROR:		/* this is a REPLY */
		case	IM_ERROR2:		/* this is a REPLY */
			/*
			 ** Sender is responding to a request with an error code.
			 **
			 ** auxiliary info (
			 **	error value	int;
			 ** )
			 */
			errcode = disrsi(stream, &ret);
			BAIL("ERROR errcode")

			if (command == IM_ERROR2) {
				errmsg = disrst(stream, &ret);
			}

			switch (event_com) {

				case	IM_JOIN_JOB:
					/*
					 * A MOM has rejected a request to join a job.
					 * We need to send ABORT_JOB to all the sisterhood
					 * and fail the job start to server.
					 * I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"JOIN_JOB ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: JOIN_JOB %s returned ERROR %d\n",
						__func__, jobid, errcode))
					job_start_error(pjob, errcode, (do_tolerate_node_failures(pjob) ? addr_to_hostname(addr) : netaddr(addr)), "JOIN_JOB");
					if (errmsg != NULL) {
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
							LOG_INFO, jobid, errmsg);
					}
					if (!do_tolerate_node_failures(pjob))
						break;

					for (i = 0; i < pjob->ji_numnodes; i++) {
						hnodent *xp = &pjob->ji_hosts[i];
						if ((ep = (eventent *)GET_NEXT(xp->hn_events)) != NULL)
							break;
					}
					if (ep == NULL) {	/* no events */
						int rcode;
						int do_break = 0;

						/* All the JOIN messages have come in. */
						rcode = pre_finish_exec(pjob, 1);
						switch (rcode) {
						  case PRE_FINISH_SUCCESS_JOB_SETUP_SEND:
							do_break = 1;
							break;
						  case PRE_FINISH_FAIL_JOIN_EXTRA:
							goto done;
						  case PRE_FINISH_FAIL_JOB_SETUP_SEND:
							sprintf(log_buffer, "could not send setup");
							goto err;
						  case PRE_FINISH_FAIL:
							goto err;
						}
						if (do_break)
							break;
						/*
						 ** At this point, we are ready to call
						 ** finish_exec and launch the job.
						 */
						if (check_job_substate(pjob, JOB_SUBSTATE_WAITING_JOIN_JOB)) {
							set_job_substate(pjob, JOB_SUBSTATE_PRERUN);
							job_save(pjob);
						}
						finish_exec(pjob);
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
					}
					break;

				case	IM_EXEC_PROLOGUE:
					/*
					 * A MOM prologue hook execution has been rejected
					 * for the job.  We need to send ABORT_JOB to all
					 * the sisterhood and fail the job start to server.
					 * I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer, "IM_EXEC_PROLOGUE ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: IM_EXEC_PROLOGUE %s returned ERROR %d\n", __func__, jobid, errcode))

					job_start_error(pjob, errcode,(do_tolerate_node_failures(pjob)?addr_to_hostname(addr):netaddr(addr)), "IM_EXEC_PROLOGUE");
					if (errmsg != NULL) {
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
							  LOG_INFO, jobid, errmsg);
					}
					if (!do_tolerate_node_failures(pjob))
						break;

					for (i = 0; i < pjob->ji_numnodes; i++) {
						hnodent *xp = &pjob->ji_hosts[i];
						if ((ep = (eventent *)GET_NEXT(xp->hn_events))
							!= NULL)
							break;
					}

#ifndef WIN32
					if (ep == NULL) {
						/* no events left */
						if (pjob->ji_mjspipe2 != -1) {
							int cmd = IM_ALL_OKAY;
							log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "all job prologue hooks from sisters executed");
						    	write_pipe_data(pjob->ji_mjspipe2, (int *)&cmd, sizeof(int));
						}
					}
#endif
					break;

				case	IM_SETUP_JOB:
					/*
					 ** A MOM has rejected a request to setup a job.
					 ** If the error is PBSE_NOSUP, the job might be
					 ** able to continue.  Otherwise, we need to send
					 ** ABORT_JOB to all the sisterhood and fail the
					 ** job start to server.  The determination of
					 ** if the job cannot run in the case of PBSE_NOSUP
					 ** is done when MS runs job_setup_final.  Then, the
					 ** lack of information from this node will be
					 ** noted and if it cannot be tolerated, the
					 ** job will be aborted.
					 ** I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"SETUP_JOB ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: SETUP_JOB %s returned ERROR %d\n",
						__func__, jobid, errcode))
					if (errcode != PBSE_NOSUP) {
						job_start_error(pjob, errcode, (do_tolerate_node_failures(pjob)?addr_to_hostname(addr):netaddr(addr)), "SETUP_JOB");
					}
					break;

				case	IM_SUSPEND:
				case	IM_RESUME:
					/*
					 ** A MOM has failed to suspend or resume a job.
					 ** I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"%s ERROR and I'm not MS",
							(event_com == IM_SUSPEND) ?
							"SUSPEND" : "RESUME");
						goto err;
					}
					sprintf(log_buffer, "%s returned ERROR %d",
						(event_com == IM_SUSPEND) ?
						"SUSPEND" : "RESUME", errcode);
					log_joberr(-1, __func__, log_buffer, jobid);

					if (pjob->ji_mompost != NULL)
						pjob->ji_mompost(pjob, errcode);
					break;

				case	IM_RESTART:
				case	IM_CHECKPOINT:
				case	IM_CHECKPOINT_ABORT:
					/*
					 ** A MOM has failed to do a checkpoint.
					 ** I'm mother superior.
					 **
					 ** auxiliary info (
					 **	none;
					 ** )
					 */
					name = (event_com == IM_RESTART) ? "RESTART" :
						(event_com == IM_CHECKPOINT) ?
						"CHECKPOINT" : "CHECKPOINT_ABORT";

					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"%s ERROR and I'm not MS", name);
						goto err;
					}
					sprintf(log_buffer, "%s returned ERROR %d",
						name, errcode);
					log_joberr(-1, __func__, log_buffer, jobid);

					if (pjob->ji_mompost != NULL)
						pjob->ji_mompost(pjob, errcode);
					break;

				case	IM_ABORT_JOB:
				case	IM_KILL_JOB:
					/*
					 ** Job cleanup failed on a sister.
					 ** Wait for everybody to respond then finishup.
					 ** I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"KILL/ABORT ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: KILL/ABORT JOB %s returned ERROR %d\n",
						__func__, jobid, errcode))

					if (errcode == PBSE_HOOKERROR) {
						if (errmsg != NULL) {
							log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
								LOG_INFO,
								pjob->ji_qs.ji_jobid, errmsg);
						}
					}
					np->hn_sister = errcode ? errcode : SISTER_KILLDONE;
					for (i=1; i<pjob->ji_numnodes; i++) {
						if ((reliable_job_node_find(&pjob->ji_failed_node_list, pjob->ji_hosts[i].hn_host) == NULL) && (pjob->ji_hosts[i].hn_sister == SISTER_OKAY))
							break;
					}
					if (i == pjob->ji_numnodes) {	/* all dead */
						if (check_job_substate(pjob, JOB_SUBSTATE_KILLSIS)) {
							set_job_substate(pjob, JOB_SUBSTATE_EXITING);
							exiting_tasks = 1;
						}
					}
					break;

				case	IM_DELETE_JOB_REPLY:
					/*
					 ** Job delete failed on a sister.
					 ** Wait for everybody to respond then finishup.
					 ** I'm mother superior.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"DEL_JOB_REPLY ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: DEL_JOB_REPLY job %s returned ERROR %d\n",
						__func__, jobid, errcode))
					if ((errcode == 0) || (errcode == PBSE_JOBEXIST))
						np->hn_sister = SISTER_KILLDONE;
					else
						np->hn_sister = errcode;
					chk_del_job(pjob, errcode);

					if (errmsg != NULL) {
						log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB,
							LOG_INFO, pjob->ji_qs.ji_jobid, errmsg);
					}
					break;

				case	IM_SPAWN_TASK:
				case	IM_GET_TASKS:
				case	IM_SIGNAL_TASK:
				case	IM_OBIT_TASK:
				case	IM_GET_INFO:
				case	IM_GET_RESC:
					/*
					 ** A user attempt failed, inform process.
					 */
					DBPRT(("%s: REQUEST %d %s returned ERROR %d\n",
						__func__, event_com, jobid, errcode))
					ptask = task_check(pjob, efd, event_task);
					if (ptask == NULL)
						break;
					(void)tm_reply(efd, ptask->ti_protover,
						TM_ERROR, event_client);
					(void)diswsi(efd, errcode);
					(void)dis_flush(efd);
					break;

				case	IM_POLL_JOB:
					/*
					 ** I must be Mother Superior for the job and
					 ** this is an error reply to a poll request.
					 */
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"POLL_JOB ERROR and I'm not MS");
						goto err;
					}

					if (do_tolerate_node_failures(pjob)) {
						snprintf(log_buffer, sizeof(log_buffer), "ignoring POLL_JOB error from failed mom %s as job is tolerant of node failures", np->hn_host?np->hn_host:"");
						log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
						break;
					}

					DBPRT(("%s: POLL_JOB %s returned ERROR %d\n",
						__func__, jobid, errcode))
					sprintf(log_buffer, "POLL_JOB returned ERROR %d",
						errcode);
					log_joberr(-1, __func__, log_buffer, jobid);

					np->hn_sister = errcode ? errcode : SISTER_BADPOLL;
					pjob->ji_nodekill = np->hn_node;
					break;

#ifdef PMIX
				case	IM_PMIX:
					/*
					 * I must be mother superior for the job and
					 * this is an error response to a PMIX request.
					 */
					sprintf(log_buffer, "IM_PMIX error encountered");
					log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
						jobid, log_buffer);
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						sprintf(log_buffer,
							"IM_PMIX error and this is not MS");
						goto err;
					}
					/* TODO: Handle IM_PMIX error */
					sprintf(log_buffer, "Handle IM_PMIX error here");
					log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_DEBUG,
						jobid, log_buffer);
					break;
#endif /* PMIX */

				case	IM_UPDATE_JOB:
					if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
						snprintf(log_buffer, sizeof(log_buffer), "IM_UPDATE_JOB ERROR and I'm not MS");
						goto err;
					}
					DBPRT(("%s: IM_UPDATE_JOB %s returned ERROR %d\n", __func__, jobid, errcode))
					if (errmsg != NULL) {
						log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB,
							  LOG_INFO, jobid, errmsg);
					}
					break;
				default:
					sprintf(log_buffer, "unknown command %d error",
						event_com);
					goto err;
			}
			break;

		case	IM_SEND_RESC:
			/*
			 ** I must be Mother Superior for the job and
			 ** this is a reply with job resources to
			 ** tally up.
			 **
			 */
			if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
				sprintf(log_buffer,
					"got IM_SEND_RESC and I'm not MS");
				goto err;
			}

			nodehost = disrst(stream, &ret);
			BAIL("nodehost")
			resc_idx = -1;
			for (i=0; i < pjob->ji_numrescs; i++) {
				if ((pjob->ji_resources[i].nodehost != NULL) &&
				    (compare_short_hostname(pjob->ji_resources[i].nodehost, nodehost) == 0)) {
					resc_idx = i;
					break;
				}
			}
			if (resc_idx == -1) {
				noderes *tmparr = NULL;
				/* add an entry to pjob->ji_resources */
				/* for this incoming resource report */

				tmparr = (noderes *)realloc(
						pjob->ji_resources,
				 (pjob->ji_numrescs+1)*sizeof(noderes));

				if (tmparr == NULL) {
					snprintf(log_buffer,
						sizeof(log_buffer),
				 	 	"realloc failure extending"
					 	"  pjob->ji_resources");
					goto err;
				}
				pjob->ji_resources = tmparr;
				resc_idx = pjob->ji_numrescs;
				pjob->ji_resources[resc_idx].nodehost =
					strdup(nodehost);
				if (pjob->ji_resources[resc_idx].nodehost == NULL) {
					snprintf(log_buffer, sizeof(log_buffer),
				 	 	"strdup failure setting nodehost");
					goto err;
				}
				clear_attr(&pjob->ji_resources[resc_idx].nr_used,
						&job_attr_def[JOB_ATR_resc_used]);
				pjob->ji_numrescs++;

			}
			pjob->ji_resources[resc_idx].nr_cput =
				disrul(stream, &ret);
			BAIL("resources_used.cput")
			convert_duration_to_str(pjob->ji_resources[resc_idx].nr_cput, timebuf, TIMEBUF_SIZE);

			pjob->ji_resources[resc_idx].nr_mem =
				disrul(stream, &ret);
			BAIL("resources_used.mem")
			pjob->ji_resources[resc_idx].nr_cpupercent =
				disrul(stream, &ret);
			BAIL("resources_used.cpupercent")
			DBPRT(("%s: SEND_RESC %s OKAY nodeidx %d cpu %lu mem %lu\n",
				__func__, jobid, resc_idx,
				pjob->ji_resources[nodeidx-1].nr_cput,
				pjob->ji_resources[nodeidx-1].nr_mem))

			pjob->ji_resources[resc_idx].nr_status = PBS_NODERES_DELETE;

			sprintf(log_buffer,
				"%s cput=%s mem=%lukb", nodehost, timebuf,
				pjob->ji_resources[resc_idx].nr_mem);
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
				LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);

			free(nodehost);
			nodehost = NULL;
			enqueue_update_for_send(pjob, IS_RESCUSED);
			break;

		case	IM_UPDATE_JOB:
			if (check_ms(stream, NULL))
				goto fini;
			if (receive_job_update(stream, pjob) != 0) {
				snprintf(log_buffer,
					sizeof(log_buffer),
					"receive_job_update failed");
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
					LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
				goto err;
			}
			ret = im_compose(stream, jobid, cookie,
				IM_ALL_OKAY,
				event, fromtask, IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS)
				goto err;
			break;
		case IM_RECONNECT_TO_MS:
			if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE)
				resume_multinode(pjob);
			break;

		default:
			sprintf(log_buffer, "unknown command %d sent", command);
			goto err;
	}

done:
	tpp_eom(stream);
	if (reply) {	/* check if write worked */
		if (ret != DIS_SUCCESS ||
			dis_flush(stream) == -1) {
			if (errno != 0)
				log_err(errno, __func__, "dis_flush");
			tpp_close(stream);
			if (np != NULL && np->hn_stream == stream)
				np->hn_stream = -1;
		}
	}
	goto fini;

err:
	/*
	 ** We come here if we got a DIS read error or a protocol
	 ** element is missing, or possibly because we failed to
	 ** create a CPU set.  The likely case is the remote
	 ** host has gone down.
	 */
	if (jobid == NULL)
		log_err(-1, __func__, log_buffer);
	else
		log_joberr(-1, __func__, log_buffer, jobid);
	im_eof(stream, ret);

fini:
	free(jobid);
	free(cookie);
	free(info);
	free(errmsg);
	free(nodehost);
}

// clang-format on

/**
 * @brief
 *      Handle a stream that needs to be closed.
 *      May be either from another Mom, or the server.
 *
 * @param[in] fd - file descriptor
 *
 * @return Void
 *
 */
void
tm_eof(int fd)
{
	job *pjob;
	pbs_task *ptask;
	int i;
	int events;
	tm_task_id fromtask;

	/*
	 ** Search though all the jobs looking for this fd.
	 */
	for (pjob = (job *) GET_NEXT(svr_alljobs);
	     pjob != NULL;
	     pjob = (job *) GET_NEXT(pjob->ji_alljobs)) {
		for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
		     ptask;
		     ptask = (pbs_task *)
			     GET_NEXT(ptask->ti_jobtask)) {

			if (ptask->ti_tmfd == NULL)
				continue;

			for (i = 0; i < ptask->ti_tmnum; i++) {
				if (ptask->ti_tmfd[i] == fd) {
					fromtask = ptask->ti_qs.ti_task;
					ptask->ti_tmfd[i] = -1;
					goto cleanup;
				}
			}
		}
	}
	log_err(-1, __func__, "no matching task found");
	return;

cleanup:

	events = 0;
	/*
	 ** Check for events waiting to be sent to the dead client.
	 */
	for (i = 0; i < pjob->ji_numnodes; i++) {
		eventent *ep;
		hnodent *np = &pjob->ji_hosts[i];

		ep = (eventent *) GET_NEXT(np->hn_events);
		while (ep) {
			if (ep->ee_fd == fd) {
				DBPRT(("%s: fd %d drop command %d "
				       "client %d event %d task %8.8X\n",
				       __func__, ep->ee_fd, ep->ee_command,
				       ep->ee_client, ep->ee_event,
				       ep->ee_taskid))
				ep->ee_fd = -1;
				events++;
			}

			ep = (eventent *) GET_NEXT(ep->ee_next);
		}
	}

	/*
	 ** Throw away any obits the dead client was waiting for.
	 */
	for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
	     ptask;
	     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
		obitent *pobit;

		pobit = (obitent *) GET_NEXT(ptask->ti_obits);
		while (pobit) {
			obitent *next = GET_NEXT(pobit->oe_next);

			if (pobit->oe_type == OBIT_TYPE_TMEVENT &&
			    pobit->oe_u.oe_tm.oe_fd == fd) {
				DBPRT(("%s: fd %d drop obit event %d "
				       "node %d task %8.8X\n",
				       __func__,
				       pobit->oe_u.oe_tm.oe_fd,
				       pobit->oe_u.oe_tm.oe_event,
				       pobit->oe_u.oe_tm.oe_node,
				       pobit->oe_u.oe_tm.oe_taskid))
				delete_link(&pobit->oe_next);
				free(pobit);
				events++;
			}
			pobit = next;
		}
	}

	if (events > 0) {
		sprintf(log_buffer,
			"%d events dropped for TM client in task %8.8X",
			events, fromtask);
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG,
			  pjob->ji_qs.ji_jobid, log_buffer);
	}
	return;
}

/*
 * NOTE(review): not referenced in the code visible here; the name suggests
 * it sizes (or is the growth step for) a task's TM descriptor table
 * (ti_tmfd) -- confirm against its use sites.
 */
#define TASK_FDMAX 10

/**
 *
 * @brief
 *	Input is coming from a process running on this host which
 *	should be part of one of the jobs I am part of.  The i/o
 *	will take place using DIS over a tcp fd.
 *
 * @param[in]	fd - the stream to read input from.
 * @param[in]	version - protocol version
 *
 * @note
 *	Read the stream to get a task manager request.  Format the reply
 *	and write it back.
 *
 *	read (
 *		jobid			string
 *		cookie			string
 *		command			int
 *		event			int
 *		from taskid		uint
 *	)
 *
 * @return int
 * @retval 0		for success
 * @retval non-zero 	for failure
 *
 */
int
tm_request(int fd, int version)
{
	extern int reqnum;
	int command;
	int reply = TRUE;
	int ret = DIS_SUCCESS;
	char *jobid = NULL;
	char *cookie = NULL;
	char *oreo;
	job *pjob = NULL;
	eventent *ep;
	pbs_task *ptask = NULL;
	vmpiprocs *pnode;
	hnodent *phost;
	int i, event, numele;
	size_t len;
	long ipadd;
	char **argv, **envp;
	char *name, *info;
	infoent *ip;
	int signum;
	int vnodenum;
	int prev_error = 0;
	tm_node_id tvnodeid;
	tm_node_id myvnodeid;
	tm_task_id taskid, fromtask;
	extern u_long localaddr;
	char hook_msg[HOOK_MSG_SIZE + 1];
	int argc = 0;
	int found_empty_string = 0;
	mom_hook_input_t hook_input;

	conn_t *conn = get_conn(fd);
	if (!conn) {
		sprintf(log_buffer, "not found fd=%d in connection table", fd);
		closesocket(fd);
		if (cookie)
			free(cookie);
		return -1;
	}

	if (conn->cn_addr != localaddr) {
		sprintf(log_buffer, "non-local connect");
		goto err;
	}
	if (version != TM_PROTOCOL_VER &&
	    version != TM_PROTOCOL_OLD) {
		sprintf(log_buffer, "bad protocol version %d", version);
		goto err;
	}

	jobid = disrst(fd, &ret);
	BAIL("jobid")
	cookie = disrst(fd, &ret);
	BAIL("cookie")
	command = disrsi(fd, &ret);
	BAIL("command")
	event = disrsi(fd, &ret);
	BAIL("event")
	fromtask = disrui(fd, &ret);
	BAIL("fromtask")

	DBPRT(("%s: job %s cookie %s task %8.8X com %d event %d\n", __func__,
	       jobid, cookie, fromtask, command, event))

	/*
	 **	Check to see if we are doing a TM_ATTACH.  If so,
	 **	it is a special case since there will be no existing
	 **	task to look up.
	 */
	if (command == TM_ATTACH) {
		static char id[] = "tm_attach";
		pid_t pid;
		pid_t sid;
		extern int attach_allow;
		uid_t proc_uid;
		uid_t jobowner;
#ifdef WIN32
		char proc_uname[UNLEN + 1] = {'\0'};
		char comm[MAX_PATH] = {'\0'};
		HANDLE hProcess = INVALID_HANDLE_VALUE;
		char *user_name = disrst(fd, &ret);
		BAIL("uid")
#else
		char comm[32] = {'\0'};
		uid_t uid;
		uid = disrui(fd, &ret);
		BAIL("uid")
#endif
		pid = disrui(fd, &ret);
		BAIL("pid")

		/*
		 ** See if we are allowed to attach.
		 */
		if (!attach_allow) {
			sprintf(log_buffer, "%s: not allowed", id);
			i = TM_ENOTIMPLEMENTED;
			goto aterr;
		}

		/*
		 ** The cookie must be NULL.
		 */
		if (*cookie != '\0') {
			sprintf(log_buffer, "%s: job cookie is not NULL", id);
			goto err;
		}

		if (*jobid == '\0') { /* search for job */
			job *pj;

			i = 0;
			for (pj = (job *) GET_NEXT(svr_alljobs);
			     pj != NULL;
			     pj = (job *) GET_NEXT(pj->ji_alljobs)) {
#ifdef WIN32
				if (user_name == NULL || pj->ji_user->pw_name == NULL || (strcasecmp(user_name, pj->ji_user->pw_name) != 0))
#else
				if (uid != pj->ji_qs.ji_un.ji_momt.ji_exuid)
#endif
					continue;

				if (!check_job_substate(pj, JOB_SUBSTATE_RUNNING) && !check_job_substate(pj, JOB_SUBSTATE_PRERUN))
					continue;
				i++;
				pjob = pj;
			}
			/*
			 ** If one and only one match is found, pjob is good.
			 */
			if (i != 1) {
				sprintf(log_buffer,
					"%s: job could not be determined", id);
				i = TM_ENOTFOUND;
				goto aterr;
			}
			jobowner = pjob->ji_qs.ji_un.ji_momt.ji_exuid;
		} else {
			/* verify the jobid is known */
			if ((pjob = find_job(jobid)) == NULL) {
				sprintf(log_buffer, "job not found");
				i = TM_ENOTFOUND;
				goto aterr;
			}
			if (!check_job_substate(pjob, JOB_SUBSTATE_RUNNING) && !check_job_substate(pjob, JOB_SUBSTATE_PRERUN)) {
				sprintf(log_buffer, "job not running");
				i = TM_ENOTFOUND;
				goto aterr;
			}
			/*
			 ** The uid must match the job.
			 */
			jobowner = pjob->ji_qs.ji_un.ji_momt.ji_exuid;
#ifdef WIN32
			if (user_name == NULL || pjob->ji_user->pw_name == NULL || (strcasecmp(user_name, pjob->ji_user->pw_name) != 0)) {
				sprintf(log_buffer,
					"%s: uid mismatch %s to job %s",
					id, user_name, pjob->ji_user->pw_name);
				i = TM_EUSER;
				goto aterr;
			}
#else
			if (uid != jobowner) {
				sprintf(log_buffer,
					"%s: uid mismatch %d to job %d",
					id, uid, jobowner);
				i = TM_EUSER;
				goto aterr;
			}
#endif
		}

		mom_hook_input_init(&hook_input);
		hook_input.pjob = pjob;
		hook_input.pid = pid;

		switch (mom_process_hooks(HOOK_EVENT_EXECJOB_ATTACH,
					  PBS_MOM_SERVICE_NAME, mom_host,
					  &hook_input, NULL,
					  hook_msg, sizeof(hook_msg), 1)) {
			case 0: /* explicit reject */
				/* maybe a new TM error? */
				i = TM_EHOOK;
				/* in aterr, log_buffer gets printed */
				snprintf(log_buffer, sizeof(log_buffer),
					 "execjob_attach hook rejected request");
				goto aterr;
			case 1: /* explicit accept */
				break;
			case 2: /* no hook script executed - go ahead and accept event*/
				break;
			default:
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "execjob_attach event: accept req by default");
		}

		/*
		 ** Get the session, uid and command name for the pid.
		 ** I need to bump reqnum so dep_procinfo will get a
		 ** fresh copy of the process table.
		 */
		reqnum++;
#ifdef WIN32
		i = dep_procinfo(pid, &sid, &proc_uid, proc_uname, sizeof(proc_uname), comm, sizeof(comm));
#else
		i = dep_procinfo(pid, &sid, &proc_uid, comm, sizeof(comm));
#endif
		if (i != TM_OKAY) {
#ifdef linux
			char procid[MAXPATHLEN + 1];
			struct stat sbuf;

			snprintf(procid, sizeof(procid), "/proc/%d", pid);
			if (stat(procid, &sbuf) == -1)
				goto aterr;

			sid = getsid(pid);
			if (sid == -1)
				goto aterr;

			proc_uid = sbuf.st_uid;
#else
			goto aterr;
#endif
		}
		if (sid <= 1) {
			i = TM_ENOPROC;
			goto aterr;
		}

		/*
		 ** Search all the tasks to make sure the session has
		 ** not already been attached.
		 */
		ptask = find_session(sid);
		if (ptask != NULL) {
			sprintf(log_buffer, "%s: sid %d already attached",
				id, sid);
			i = TM_ESESSION;
			goto aterr;
		}

		/*
		 ** The process must be owned by
		 ** the job owner.
		 */
#ifdef WIN32
		if (proc_uname == NULL || pjob->ji_user->pw_name == NULL || (strcasecmp(proc_uname, pjob->ji_user->pw_name) != 0)) {
			sprintf(log_buffer,
				"%s: uid mismatch proc %s to job %s",
				id, proc_uname, pjob->ji_user->pw_name);
			i = TM_EOWNER;
			goto aterr;
		}
#else
		if (proc_uid != jobowner) {
			sprintf(log_buffer,
				"%s: uid mismatch proc %d to job %d",
				id, proc_uid, jobowner);
			i = TM_EOWNER;
			goto aterr;
		}
#endif
		/*
		 **	Create a new task for the session.
		 */
#ifdef WIN32
		if ((hProcess = OpenProcess(PROCESS_ALL_ACCESS, TRUE, (DWORD) sid)) == NULL) {
			sprintf(log_buffer, "%s: OpenProcess Failed for pid %d with error %d", id, sid, GetLastError());
			i = TM_ENOPROC;
			goto aterr;
		}
#endif

		ptask = momtask_create(pjob);
		if (ptask == NULL) {
			sprintf(log_buffer, "%s: task create failed", id);
			i = TM_ESYSTEM;
			goto aterr;
		}

		strcpy(ptask->ti_qs.ti_parentjobid, jobid);
		/*
		 **	The parent self virtual nodes are not known.
		 */
		ptask->ti_qs.ti_parentnode = TM_ERROR_NODE;
		ptask->ti_qs.ti_myvnode = TM_ERROR_NODE;
		ptask->ti_qs.ti_parenttask = TM_INIT_TASK;
		ptask->ti_qs.ti_sid = sid;
#ifdef WIN32
		ptask->ti_hProc = hProcess;
		if (pjob->ji_hJob == NULL) {
			pjob->ji_hJob = CreateJobObject(NULL, pjob->ji_qs.ji_jobid);
			if (pjob->ji_hJob != NULL) {
				/*
				 * When a process is attached using -p option of pbs_attach
				 * and the processe is not running under session 0,
				 * or when a pbs_attach is run outside the job in a session != 0,
				 * it may fail to assign to the Windows Job object
				 * but its resource accounting and resource limit enforcement
				 * will still be applicable via polling.
				 * Any processes created by pbs_attach are automatically
				 * assigned to the job object as long as pbs_attach
				 * gets run inside the job
				 */
				(void) AssignProcessToJobObject(pjob->ji_hJob, hProcess);
			}
		} else {
			/*
			 * When a process is attached using -p option of pbs_attach
			 * and the process is not running under session 0,
			 * or when a pbs_attach is run outside the job in a session != 0,
			 * it may fail to assign to the Windows Job object
			 * but its resource accounting and resource limit enforcement
			 * will still be applicable via polling.
			 * Any processes created by pbs_attach are automatically
			 * assigned to the job object as long as pbs_attach
			 * gets run inside the job
			 */
			(void) AssignProcessToJobObject(pjob->ji_hJob, hProcess);
		}
#endif
		ptask->ti_qs.ti_status = TI_STATE_RUNNING;
		ptask->ti_flags |= TI_FLAGS_ORPHAN;
		(void) task_save(ptask);

		if (!check_job_substate(pjob, JOB_SUBSTATE_RUNNING)) {
			set_job_state(pjob, JOB_STATE_LTR_RUNNING);
			set_job_substate(pjob, JOB_SUBSTATE_RUNNING);
			job_save(pjob);
		}

		/*
		 ** Add to list of polled jobs if it isn't
		 ** already there.
		 */
		if (is_linked(&mom_polljobs,
			      &pjob->ji_jobque) == 0) {
			append_link(&mom_polljobs,
				    &pjob->ji_jobque, pjob);
		}

		/*
		 ** Do any dependent attach operation.
		 */
		i = dep_attach(ptask);
		if (i != TM_OKAY) {
			goto aterr;
		}

		sprintf(log_buffer,
			"pid %d sid %d cmd %s attached as task %8.8X",
			pid, sid, comm, ptask->ti_qs.ti_task);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);
		/*
		 * Do any dependent attach operation.
		 */
	aterr:
		if (i != TM_OKAY) {
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
				  LOG_NOTICE,
				  (pjob == NULL) ? "N/A" : pjob->ji_qs.ji_jobid,
				  log_buffer);
		}
		ret = tm_reply(fd, version,
			       (i == TM_OKAY) ? i : TM_ERROR, event);
		if (ret != DIS_SUCCESS)
			goto done;
		ret = diswui(fd, ((i == TM_OKAY) ? ptask->ti_qs.ti_task : i));
		goto done;
	}

	/* Continue normal processing for all other commands. */
	/* verify the jobid is known */
	if ((pjob = find_job(jobid)) == NULL) {
		sprintf(log_buffer, "job not found");
		goto err;
	}

	/* see if the cookie matches */
	if (!(is_jattr_set(pjob, JOB_ATR_Cookie))) {
		sprintf(log_buffer, "job has no cookie");
		goto err;
	}
	oreo = get_jattr_str(pjob, JOB_ATR_Cookie);
	if (strcmp(oreo, cookie) != 0) {
		DBPRT(("job cookie %s message %s", oreo, cookie))
		sprintf(log_buffer, "bad cookie");
		goto err;
	}

	/* verify this taskid is my baby */
	ptask = task_find(pjob, fromtask);
	if (ptask == NULL) { /* not found */
		sprintf(log_buffer, "task %8.8X not found", fromtask);
		log_joberr(-1, __func__, log_buffer, jobid);
		ret = tm_reply(fd, version, TM_ERROR, event);
		if (ret != DIS_SUCCESS)
			goto done;
		ret = diswsi(fd, TM_ENOTFOUND);
		goto done;
	}
	myvnodeid = ptask->ti_qs.ti_myvnode;
	conn->cn_oncl = tm_eof;

	if (ptask->ti_protover != -1 && ptask->ti_protover != version) {
		/* the protocol version should not change */
		sprintf(log_buffer,
			"inconsistent TM version %d from task %8.8X",
			version, fromtask);
		goto err;
	}
	ptask->ti_protover = version;

	if (ptask->ti_tmfd == NULL) {
		ptask->ti_tmfd = (int *) calloc(TASK_FDMAX, sizeof(int));
		assert(ptask->ti_tmfd != NULL);
		ptask->ti_tmnum = 0;
		ptask->ti_tmmax = TASK_FDMAX;
	}
	for (i = 0; i < ptask->ti_tmnum; i++) {
		if (ptask->ti_tmfd[i] == fd)
			break;
	}
	if (i == ptask->ti_tmnum) { /* didn't find existing fd */
		for (i = 0; i < ptask->ti_tmnum; i++) {
			if (ptask->ti_tmfd[i] == -1)
				break;
		}
		if (i == ptask->ti_tmnum) { /* no free slot */
			if (ptask->ti_tmnum == ptask->ti_tmmax) {
				/* no more space */
				ptask->ti_tmmax *= 2;
				ptask->ti_tmfd = (int *) realloc(ptask->ti_tmfd,
								 ptask->ti_tmmax * sizeof(int));
				assert(ptask->ti_tmfd != NULL);
			}
			i = ptask->ti_tmnum++;
		}
		ptask->ti_tmfd[i] = fd;
	}

	/* set no timeout so connection is not closed for being idle */
	conn->cn_authen |= PBS_NET_CONN_NOTIMEOUT;

	switch (command) {

		case TM_INIT:
			/*
			 ** A request to initialize.
			 **
			 **	send (
			 **		number of nodes int;
			 **		nodeid[0]       int;
			 **		...
			 **		nodeid[n-1]     int;
			 **		parent jobid    string;
			 **		parent nodeid   int;
			 **		parent taskid   int;
			 **	)
			 */
			DBPRT(("%s: INIT %s\n", __func__, jobid))
			if (prev_error)
				goto done;

			ret = tm_reply(fd, version, TM_OKAY, event);
			if (ret != DIS_SUCCESS)
				goto done;
			vnodenum = pjob->ji_numvnod;
			ret = diswui(fd, vnodenum); /* num nodes */
			if (ret != DIS_SUCCESS)
				goto done;

			pnode = pjob->ji_vnods;
			for (i = 0; i < vnodenum; i++) {
				ret = diswsi(fd, pnode[i].vn_node);
				if (ret != DIS_SUCCESS)
					goto done;
			}
			ret = diswst(fd, ptask->ti_qs.ti_parentjobid); /* dad job */
			if (ret != DIS_SUCCESS)
				goto done;
			ret = diswsi(fd, ptask->ti_qs.ti_parentnode); /* dad node */
			if (ret != DIS_SUCCESS)
				goto done;
			ret = diswui(fd, ptask->ti_qs.ti_parenttask); /* dad task */
			if (ret != DIS_SUCCESS)
				goto done;

			ptask->ti_flags |= TI_FLAGS_INIT;
			goto done;

		case TM_POSTINFO:
			/*
			 ** Post named info for a task.
			 **
			 **	read (
			 **		name		string;
			 **		info		counted string;
			 **	)
			 */
			name = disrst(fd, &ret);
			BAIL("POSTINFO name")
			info = disrcs(fd, &len, &ret);
			if (ret != DIS_SUCCESS) {
				free(name);
				sprintf(log_buffer, bail_format, "POSTINFO info");
				goto err;
			}
			DBPRT(("%s: POSTINFO %s task %8.8X sent info %s:%s(%d)\n", __func__,
			       jobid, fromtask, name, info, (int) len))
			if (prev_error) {
				free(name);
				free(info);
				goto done;
			}

			task_saveinfo(ptask, name, info, (int) len);
			ret = tm_reply(fd, version, TM_OKAY, event);
			goto done;

		case TM_REGISTER:
			sprintf(log_buffer, "REGISTER received - NOT IMPLEMENTED");
			(void) tm_reply(fd, version, TM_ERROR, event);
			(void) diswsi(fd, TM_ENOTIMPLEMENTED);
			(void) dis_flush(fd);
			goto err;

		default:
			break;
	}

	/*
	 ** All requests beside TM_INIT and TM_POSTINFO
	 ** require a node number where the action will take place.
	 ** Read that and check that it is legal.
	 **
	 **	read (
	 **		node number		int
	 **	)
	 */
	tvnodeid = disrui(fd, &ret);
	BAIL("tvnodeid")

	pnode = pjob->ji_vnods;
	for (i = 0; i < pjob->ji_numvnod; i++, pnode++) {
		if (pnode->vn_node == tvnodeid)
			break;
	}
	if (i == pjob->ji_numvnod) {
		sprintf(log_buffer, "node %d not found", tvnodeid);
		log_joberr(-1, __func__, log_buffer, jobid);
		ret = tm_reply(fd, version, TM_ERROR, event);
		if (ret != DIS_SUCCESS)
			goto done;
		ret = diswsi(fd, TM_ENOTFOUND);
		if (ret != DIS_SUCCESS)
			goto done;
		prev_error = 1;
	}
	phost = pnode->vn_host;

	switch (command) {

		case TM_TASKS:
			/*
			 ** A request to read the list of tasks that a
			 ** particular node has charge of.
			 */
			DBPRT(("%s: TASKS %s on node %d\n",
			       __func__, jobid, tvnodeid))
			if (prev_error)
				goto done;

			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) { /* not me */
				ep = event_alloc(pjob, IM_GET_TASKS, fd, phost,
						 event, fromtask);
				ret = im_compose(phost->hn_stream, jobid, cookie,
						 IM_GET_TASKS, ep->ee_event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, myvnodeid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, tvnodeid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
				if (ret != DIS_SUCCESS)
					goto done;
				reply = FALSE;
				goto done;
			}
			ret = tm_reply(fd, version, TM_OKAY, event);
			if (ret != DIS_SUCCESS)
				goto done;
			for (ptask = (pbs_task *) GET_NEXT(pjob->ji_tasks);
			     ptask;
			     ptask = (pbs_task *) GET_NEXT(ptask->ti_jobtask)) {
				ret = diswui(fd, ptask->ti_qs.ti_task);
				if (ret != DIS_SUCCESS)
					goto done;
			}
			ret = diswui(fd, TM_NULL_TASK);
			break;

		case TM_SPAWN:
			/*
			 ** Spawn a task on the requested node.
			 **
			 **	read (
			 **		argc		int;
			 **		arg 0		string;
			 **		...
			 **		arg argc-1	string;
			 **		env 0		string;
			 **		...
			 **		env m		string;
			 **	)
			 */
			DBPRT(("%s: SPAWN %s on node %d\n",
			       __func__, jobid, tvnodeid))
			argc = disrui(fd, &ret);
			if (ret != DIS_SUCCESS)
				goto done;
			argv = (char **) calloc(argc + 1, sizeof(char *));
			assert(argv);
			for (i = 0; i < argc; i++) {
				argv[i] = disrst(fd, &ret);
				if (ret != DIS_SUCCESS) {
					argv[i] = NULL;
					arrayfree(argv);
					goto done;
				}
				if (strlen(argv[i]) == 0)
					found_empty_string = 1; /* arguments contains empty string, Used if spawn on another MOM*/
			}
			argv[i] = NULL;

			numele = 3;
			envp = (char **) calloc(numele, sizeof(char *));
			assert(envp);
			for (i = 0;; i++) {
				char *env;

				env = disrst(fd, &ret);
				if (ret != DIS_SUCCESS && ret != DIS_EOD) {
					arrayfree(argv);
					envp[i] = NULL;
					arrayfree(envp);
					goto done;
				}
				if (env == NULL)
					break;
				if (*env == '\0') {
					free(env);
					break;
				}
				/*
				 **	Need to remember extra slot for NULL
				 **	at the end.  Thanks to Pete Wyckoff
				 **	for finding this.
				 */
				if (i == numele - 1) {
					numele *= 2;
					envp = (char **) realloc(envp,
								 numele * sizeof(char *));
					assert(envp);
				}
				envp[i] = env;
			}
			envp[i] = NULL;
			ret = DIS_SUCCESS;

			if (prev_error) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}

			/*
			 ** If the spawn happens on me, just do it.
			 */
			if (pjob->ji_nodeid == TO_PHYNODE(tvnodeid)) {
#ifdef PMIX
				pbs_pmix_register_client(pjob, tvnodeid, &envp);
#endif
				i = TM_ERROR;
				ptask = momtask_create(pjob);
				if (ptask != NULL) {
					strcpy(ptask->ti_qs.ti_parentjobid, jobid);
					ptask->ti_qs.ti_parentnode = myvnodeid;
					ptask->ti_qs.ti_myvnode = tvnodeid;
					ptask->ti_qs.ti_parenttask = fromtask;
					if (task_save(ptask) != -1) {
						ret = start_process(ptask, argv, envp, false);
						if (ret == PBSE_NONE) {
							i = TM_OKAY;
						} else if (ret == PBSE_SYSTEM) {
							i = TM_ESYSTEM;
							ptask->ti_qs.ti_status = TI_STATE_EXITED;
						}
					}
				}
				arrayfree(argv);
				arrayfree(envp);
				ret = tm_reply(fd, version, i, event);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(fd, ((i == TM_ERROR) ? TM_ESYSTEM : ptask->ti_qs.ti_task));
				goto done;
			}
			/*
			 ** Sending to another MOM.
			 */
			ep = event_alloc(pjob, IM_SPAWN_TASK, fd, phost,
					 event, fromtask);
			ret = im_compose(phost->hn_stream, jobid, cookie,
					 IM_SPAWN_TASK, ep->ee_event, fromtask,
					 found_empty_string ? IM_PROTOCOL_VER : IM_OLD_PROTOCOL_VER);
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}
			ret = diswui(phost->hn_stream, myvnodeid);
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}
			ret = diswui(phost->hn_stream, tvnodeid);
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}
			ret = diswui(phost->hn_stream, TM_NULL_TASK);
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}
			if (found_empty_string) {
				ret = diswui(phost->hn_stream, argc);
				if (ret != DIS_SUCCESS) {
					arrayfree(argv);
					arrayfree(envp);
					goto done;
				}
				for (i = 0; i < argc; i++) {
					ret = diswst(phost->hn_stream, argv[i]);
					if (ret != DIS_SUCCESS) {
						arrayfree(argv);
						arrayfree(envp);
						goto done;
					}
				}
			} else {
				for (i = 0; argv[i]; i++) {
					ret = diswst(phost->hn_stream, argv[i]);
					if (ret != DIS_SUCCESS) {
						arrayfree(argv);
						arrayfree(envp);
						goto done;
					}
				}
				ret = diswst(phost->hn_stream, "");
				if (ret != DIS_SUCCESS) {
					arrayfree(argv);
					arrayfree(envp);
					goto done;
				}
			}
			for (i = 0; envp[i]; i++) {
				ret = diswst(phost->hn_stream, envp[i]);
				if (ret != DIS_SUCCESS) {
					arrayfree(argv);
					arrayfree(envp);
					goto done;
				}
			}
			ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
			if (ret != DIS_SUCCESS) {
				arrayfree(argv);
				arrayfree(envp);
				goto done;
			}
			reply = FALSE;
			arrayfree(argv);
			arrayfree(envp);

			break;

		case TM_SIGNAL:
			/*
			 ** Send a signal to the specified task.
			 **
			 **	read (
			 **		to task			int
			 **		signal			int
			 **	)
			 */
			taskid = disrui(fd, &ret);
			BAIL("SIGNAL taskid")
			signum = disrui(fd, &ret);
			BAIL("SIGNAL signum")
			DBPRT(("%s: SIGNAL %s on node %d task %8.8X sig %d\n",
			       __func__, jobid, tvnodeid, taskid, signum))
			if (prev_error)
				goto done;

			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) { /* not me */
				ep = event_alloc(pjob, IM_SIGNAL_TASK, fd, phost,
						 event, fromtask);
				ret = im_compose(phost->hn_stream, jobid, cookie,
						 IM_SIGNAL_TASK, ep->ee_event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, myvnodeid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, taskid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswsi(phost->hn_stream, signum);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
				if (ret != DIS_SUCCESS)
					goto done;
				reply = FALSE;
				goto done;
			}

			/*
			 ** Task should be here... look for it.
			 */
			if ((ptask = task_find(pjob, taskid)) == NULL) {
				ret = tm_reply(fd, version, TM_ERROR, event);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswsi(fd, TM_ENOTFOUND);
				break;
			}
			kill_task(ptask, signum, 0);
			ret = tm_reply(fd, version, TM_OKAY, event);
			break;

		case TM_OBIT:
			/*
			 ** Register an obit request for the specified task.
			 **
			 **	read (
			 **		task to watch		int
			 **	)
			 */
			taskid = disrui(fd, &ret);
			BAIL("OBIT taskid")
			DBPRT(("%s: fd %d OBIT %s on node %d task %8.8X\n",
			       __func__, fd, jobid, tvnodeid, taskid))
			if (prev_error)
				goto done;

			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) { /* not me */
				ep = event_alloc(pjob, IM_OBIT_TASK, fd, phost,
						 event, fromtask);
				ret = im_compose(phost->hn_stream, jobid, cookie,
						 IM_OBIT_TASK, ep->ee_event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, myvnodeid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, taskid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
				if (ret != DIS_SUCCESS)
					goto done;
				reply = FALSE;
				goto done;
			}
			/*
			 ** Task should be here... look for it.
			 */
			if ((ptask = task_find(pjob, taskid)) == NULL) {
				ret = tm_reply(fd, version, TM_ERROR, event);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswsi(fd, TM_ENOTFOUND);
				break;
			}
			if (ptask->ti_qs.ti_status >= TI_STATE_EXITED) {
				ret = tm_reply(fd, version, TM_OKAY, event);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswsi(fd, ptask->ti_qs.ti_exitstat);
			} else {
				obitent *op = (obitent *) malloc(sizeof(obitent));
				assert(op);
				CLEAR_LINK(op->oe_next);
				append_link(&ptask->ti_obits, &op->oe_next, op);
				op->oe_type = OBIT_TYPE_TMEVENT;
				op->oe_u.oe_tm.oe_fd = fd;
				op->oe_u.oe_tm.oe_node = tvnodeid;
				op->oe_u.oe_tm.oe_event = event;
				op->oe_u.oe_tm.oe_taskid = fromtask;
				reply = 0;
			}
			break;

		case TM_GETINFO:
			/*
			 ** Get named info for a specified task.
			 **
			 **	read (
			 **		task			int
			 **		name			string
			 **	)
			 */
			taskid = disrui(fd, &ret);
			BAIL("GETINFO taskid")
			name = disrst(fd, &ret);
			BAIL("GETINFO name")
			DBPRT(("%s: GETINFO %s from node %d task %8.8X name %s\n",
			       __func__, jobid, tvnodeid, taskid, name))
			if (prev_error)
				goto done;

			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) { /* not me */
				ep = event_alloc(pjob, IM_GET_INFO, fd, phost,
						 event, fromtask);
				ret = im_compose(phost->hn_stream, jobid, cookie,
						 IM_GET_INFO, ep->ee_event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret == DIS_SUCCESS) {
					ret = diswui(phost->hn_stream, myvnodeid);
					if (ret == DIS_SUCCESS) {
						ret = diswui(phost->hn_stream, taskid);
						if (ret == DIS_SUCCESS) {
							ret = diswst(phost->hn_stream,
								     name);
						}
					}
				}
				free(name);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
				if (ret != DIS_SUCCESS)
					goto done;
				reply = FALSE;
				goto done;
			}

			/*
			 ** Task should be here... look for it.
			 */
			if ((ptask = task_find(pjob, taskid)) != NULL) {
				if ((ip = task_findinfo(ptask, name)) != NULL) {
					ret = tm_reply(fd, version, TM_OKAY, event);
					if (ret != DIS_SUCCESS)
						goto done;
					ret = diswcs(fd, ip->ie_info, ip->ie_len);
					break;
				}
			}
			ret = tm_reply(fd, version, TM_ERROR, event);
			if (ret != DIS_SUCCESS)
				goto done;
			ret = diswsi(fd, TM_ENOTFOUND);
			break;

		case TM_RESOURCES:
			/*
			 ** Get resource string for a node.
			 */
			DBPRT(("%s: RESOURCES %s for node %d\n", __func__, jobid, tvnodeid))
			if (prev_error)
				goto done;

			if (pjob->ji_nodeid != TO_PHYNODE(tvnodeid)) { /* not me */
				ep = event_alloc(pjob, IM_GET_RESC, fd, phost,
						 event, fromtask);
				ret = im_compose(phost->hn_stream, jobid, cookie,
						 IM_GET_RESC, ep->ee_event, fromtask, IM_OLD_PROTOCOL_VER);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = diswui(phost->hn_stream, myvnodeid);
				if (ret != DIS_SUCCESS)
					goto done;
				ret = (dis_flush(phost->hn_stream) == -1) ? DIS_NOCOMMIT : DIS_SUCCESS;
				if (ret != DIS_SUCCESS)
					goto done;
				reply = FALSE;
				goto done;
			}

			info = resc_string(pjob);
			ret = tm_reply(fd, version, TM_OKAY, event);
			if (ret != DIS_SUCCESS)
				goto done;
			ret = diswst(fd, info);
			free(info);
			break;

		default:
			sprintf(log_buffer, "%s: unknown command %d", jobid, command);
			(void) tm_reply(fd, version, TM_ERROR, event);
			(void) diswsi(fd, TM_EUNKNOWNCMD);
			(void) dis_flush(fd);
			goto err;
	}

done:
	if (reply) {
		DBPRT(("%s: REPLY %s\n", __func__, dis_emsg[ret]))
		if (ret != DIS_SUCCESS || dis_flush(fd) == -1) {
			sprintf(log_buffer, "comm failed %s", dis_emsg[ret]);
			log_err(errno, __func__, log_buffer);
			close_conn(fd);
		}
	}

	free(jobid);
	free(cookie);
	return 0;

err:
	if (jobid != NULL) {
		log_joberr(-1, __func__, log_buffer, jobid);
		free(jobid);
	} else
		log_err(-1, __func__, log_buffer);

	ipadd = conn->cn_addr;
	sprintf(log_buffer,
		"message refused from port %d addr %ld.%ld.%ld.%ld",
		conn->cn_port,
		(ipadd & 0xff000000) >> 24,
		(ipadd & 0x00ff0000) >> 16,
		(ipadd & 0x0000ff00) >> 8,
		(ipadd & 0x000000ff));
	close_conn(fd);
	if (cookie)
		free(cookie);
	return -1;
}

/**
 * @brief
 *	send_join_job_restart - send an IM_JOIN_JOB or IM_RESTART message
 *	from a Mother Superior to one Sister over that Sister's own stream.
 *
 * @par Functionality:
 *	Nothing is sent if the job has no host array (ji_hosts is NULL).
 *	Otherwise the message header is composed on the stream of the "nth"
 *	host entry using the job id and the job's cookie attribute.
 *	For IM_JOIN_JOB only, the body is then written in this order:
 *	    number of nodes	int
 *	    stdout port		int
 *	    stderr port		int
 *	    cred type		int
 *		<if cred len > 0>
 *		credential	counted string
 *	    jobattrs		attrl
 *	The stream is flushed last, for either message type.
 *
 * @note
 *	Return values of the individual compose/write/flush calls are not
 *	examined; the function reports nothing back to the caller.
 *
 * @param[in]	com    - IM message type: IM_JOIN_JOB or IM_RESTART
 * @param[in]	ep     - pointer to associated event (supplies the event number)
 * @param[in]	nth    - index of host entry (job's node number for this sister)
 * @param[in]	pjob   - pointer to job structure for job to be run
 * @param[in]	phead  - pointer to pbs_list_head of job's encoded attributes
 */

void
send_join_job_restart(int com, eventent *ep, int nth, job *pjob, pbs_list_head *phead)
{
	hnodent *sister;
	int strm;
	char *cred = NULL;
	size_t cred_len = 0;
	svrattrl *first_attr;

	/* without a host array there is no sister to address */
	if (pjob->ji_hosts == NULL)
		return;

	/* the "nth" host entry of the job owns the stream we write to */
	sister = pjob->ji_hosts + nth;
	strm = sister->hn_stream;

	/* message header, common to JOIN_JOB and RESTART */
	im_compose(strm, pjob->ji_qs.ji_jobid,
		   get_jattr_str(pjob, JOB_ATR_Cookie),
		   com, ep->ee_event, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);

	/* anything other than JOIN_JOB consists of the header alone */
	if (com != IM_JOIN_JOB) {
		dis_flush(strm);
		return;
	}

	/* JOIN_JOB body: fetch the credential for this host first */
	(void) get_credential(sister->hn_host, pjob,
			      PBS_GC_EXEC, &cred, &cred_len);

	(void) diswsi(strm, pjob->ji_numnodes);
	(void) diswsi(strm, pjob->ji_ports[0]);
	(void) diswsi(strm, pjob->ji_ports[1]);
	(void) diswsi(strm, pjob->ji_extended.ji_ext.ji_credtype);

	/* the credential is written, and released, only when non-empty */
	if (cred_len > 0) {
		(void) diswcs(strm, cred, cred_len);
		free(cred);
	}

	/* encoded job attributes, starting from the first list entry */
	first_attr = (svrattrl *) GET_NEXT(*phead);
	(void) encode_DIS_svrattrl(strm, first_attr);

	dis_flush(strm);
}

/**
 * @brief
 *	send_join_job_restart_mcast - send an IM_JOIN_JOB or IM_RESTART
 *	message from a Mother Superior on a caller-supplied multicast
 *	stream instead of an individual Sister's stream.
 *
 * @par Functionality:
 *	Nothing is sent if the job has no host array (ji_hosts is NULL).
 *	Otherwise the message header is composed on mtfd using the job id
 *	and the job's cookie attribute.
 *	For IM_JOIN_JOB only, the body is then written in this order:
 *	    number of nodes	int
 *	    stdout port		int
 *	    stderr port		int
 *	    cred type		int
 *		<if cred len > 0>
 *		credential	counted string
 *	    jobattrs		attrl
 *	The credential is requested for the host named by the "nth" host
 *	entry, even though the data itself goes out on mtfd.
 *	The stream is flushed last, for either message type.
 *
 * @note
 *	Return values of the individual compose/write/flush calls are not
 *	examined; the function reports nothing back to the caller.
 *
 * @param[in]   mtfd   - The TPP multicast stream descriptor
 * @param[in]	com    - IM message type: IM_JOIN_JOB or IM_RESTART
 * @param[in]	ep     - pointer to associated event (supplies the event number)
 * @param[in]	nth    - index of host entry used when fetching the credential
 * @param[in]	pjob   - pointer to job structure for job to be run
 * @param[in]	phead  - pointer to list_head of job's encoded attributes
 */

void
send_join_job_restart_mcast(int mtfd, int com, eventent *ep, int nth, job *pjob, pbs_list_head *phead)
{
	hnodent *sister;
	char *cred = NULL;
	size_t cred_len = 0;
	svrattrl *first_attr;

	/* without a host array there is nothing to describe to the sisters */
	if (pjob->ji_hosts == NULL)
		return;

	/* message header, common to JOIN_JOB and RESTART */
	im_compose(mtfd, pjob->ji_qs.ji_jobid,
		   get_jattr_str(pjob, JOB_ATR_Cookie),
		   com, ep->ee_event, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);

	/* anything other than JOIN_JOB consists of the header alone */
	if (com != IM_JOIN_JOB) {
		dis_flush(mtfd);
		return;
	}

	/* the "nth" host entry only supplies the host name for the credential */
	sister = pjob->ji_hosts + nth;
	(void) get_credential(sister->hn_host, pjob,
			      PBS_GC_EXEC, &cred, &cred_len);

	(void) diswsi(mtfd, pjob->ji_numnodes);
	(void) diswsi(mtfd, pjob->ji_ports[0]);
	(void) diswsi(mtfd, pjob->ji_ports[1]);
	(void) diswsi(mtfd, pjob->ji_extended.ji_ext.ji_credtype);

	/* the credential is written, and released, only when non-empty */
	if (cred_len > 0) {
		(void) diswcs(mtfd, cred, cred_len);
		free(cred);
	}

	/* encoded job attributes, starting from the first list entry */
	first_attr = (svrattrl *) GET_NEXT(*phead);
	(void) encode_DIS_svrattrl(mtfd, first_attr);

	dis_flush(mtfd);
}
