/*
 * Copyright (C) 1994-2021 Altair Engineering, Inc.
 * For more information, contact Altair at www.altair.com.
 *
 * This file is part of both the OpenPBS software ("OpenPBS")
 * and the PBS Professional ("PBS Pro") software.
 *
 * Open Source License Information:
 *
 * OpenPBS is free software. You can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or (at your
 * option) any later version.
 *
 * OpenPBS is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Commercial License Information:
 *
 * PBS Pro is commercially licensed software that shares a common core with
 * the OpenPBS software.  For a copy of the commercial license terms and
 * conditions, go to: (http://www.pbspro.com/agreement.html) or contact the
 * Altair Legal Department.
 *
 * Altair's dual-license business model allows companies, individuals, and
 * organizations to create proprietary derivative works of OpenPBS and
 * distribute them - whether embedded or bundled with other software -
 * under a commercial license agreement.
 *
 * Use of Altair's trademarks, including but not limited to "PBS™",
 * "OpenPBS®", "PBS Professional®", and "PBS Pro™" and Altair's logos is
 * subject to Altair's trademark licensing policies.
 */

#include <pbs_config.h> /* the master config generated by configure */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <dirent.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <pwd.h>
#include <grp.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <signal.h>
#include <termios.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <fcntl.h>

#if defined(__osf__)
#include <stropts.h>
#endif

#include "libpbs.h"
#include "portability.h"
#include "list_link.h"
#include "server_limits.h"
#include "attribute.h"
#include "resource.h"
#include "job.h"
#include "log.h"
#include "tpp.h"
#include "dis.h"
#include "pbs_nodes.h"
#include "mom_mach.h"
#include "pbs_error.h"
#include "net_connect.h"
#include "batch_request.h"
#include "mom_func.h"
#include "pbs_ifl.h"
#include "port_forwarding.h"

#include "credential.h"
#include "ticket.h"
#include "svrfunc.h"
#include "libsec.h"
#include "mom_hook_func.h"
#include "mom_server.h"
#include "placementsets.h"
#include "pbs_internal.h"
#include "pbs_reliable.h"

#include "renew_creds.h"

#include "mock_run.h"

#define PIPE_READ_TIMEOUT 5
#define EXTRA_ENV_PTRS 32

/* Global Variables */

extern char mom_host[];
extern int num_var_env;
extern char **environ;
extern int exiting_tasks;
extern u_long localaddr;
extern int lockfds;
extern pbs_list_head mom_polljobs;
extern int next_sample_time;
extern int min_check_poll;
extern char *path_jobs;
extern char *path_prolog;
extern char *path_spool;
extern unsigned int pbs_rm_port;
extern gid_t pbsgroup;
extern int server_stream;
extern unsigned int pbs_mom_port;
extern time_t time_now;
extern time_t time_resc_updated;
extern char *path_hooks_workdir;
extern long joinjob_alarm_time;
extern long job_launch_delay;
int mom_reader_go; /* see catchinter() & mom_writer() */

extern int x11_reader_go;
extern int enable_exechost2;
extern char *msg_err_malloc;
extern unsigned char pbs_aes_key[][16];
extern unsigned char pbs_aes_iv[][16];

int ptc = -1; /* fd for master pty */
#include <poll.h>
#ifdef RLIM64_INFINITY
extern struct rlimit64 orig_nproc_limit;
extern struct rlimit64 orig_core_limit;
#else
extern struct rlimit orig_nproc_limit;
extern struct rlimit orig_core_limit;
#endif /* RLIM64... */

extern eventent *event_dup(eventent *ep, job *pjob, hnodent *pnode);
extern void send_join_job_restart_mcast(int mtfd, int com, eventent *ep, int nth, job *pjob, pbs_list_head *phead);

/* Local Varibles */

static int script_in;	/* script file, will be stdin	  */
static pid_t writerpid; /* writer side of interactive job */
static pid_t shellpid;	/* shell part of interactive job  */
static size_t cred_len;
static char *cred_buf;

char *variables_else[] = {/* variables to add, value computed */
			  "HOME",
			  "LOGNAME",
			  "PBS_JOBNAME",
			  "PBS_JOBID",
			  "PBS_QUEUE",
			  "SHELL",
			  "USER",
			  "PBS_JOBCOOKIE",
			  "PBS_NODENUM",
			  "PBS_TASKNUM",
			  "PBS_MOMPORT",
			  "PBS_NODEFILE",
			  "OMP_NUM_THREADS",
			  "PBS_ACCOUNT",
			  "PBS_ARRAY_INDEX",
			  "PBS_ARRAY_ID"};

static int num_var_else = sizeof(variables_else) / sizeof(char *);
static void catchinter(int);

extern int is_direct_write(job *, enum job_file, char *, int *);
static int direct_write_possible = 1;

void
starter_return(int upfds, int downfds, int code,
	       struct startjob_rtn *);

#define FDMOVE(fd)                                \
	if (fd < 3) {                             \
		int hold = fcntl(fd, F_DUPFD, 3); \
		(void) close(fd);                 \
		fd = hold;                        \
	}

/**
 * @brief
 * 	Internal error routine.
 *
 * @param[in] string - error related to
 * @param[in] value - error number
 *
 * @return 	int
 * @retval	error number
 *
 */
int
error(char *string, int value)
{
	int i = 0;
	char *message;
	extern char *msg_momsetlim;
	extern struct pbs_err_to_txt pbs_err_to_txt[];

	assert(string != NULL);
	assert(*string != '\0');
	assert(value > PBSE_);		  /* minimum PBS error number */
	assert(value <= PBSE_NOSYNCMSTR); /* maximum PBS error number */
	assert(pbs_err_to_txt[i].err_no != 0);

	do {
		if (pbs_err_to_txt[i].err_no == value)
			break;
	} while (pbs_err_to_txt[++i].err_no != 0);

	assert(pbs_err_to_txt[i].err_txt != NULL);
	message = *pbs_err_to_txt[i].err_txt;
	assert(message != NULL);
	assert(*message != '\0');

	if (value == PBSE_SYSTEM) {
		strcpy(log_buffer, message);
		strcat(log_buffer, strerror(errno));
		message = log_buffer;
	}
	(void) fprintf(stderr, msg_momsetlim, string, message);
	(void) fflush(stderr);

	return value;
}

/**
 * @brief
 * 	no_hang() - interrupt handler for alarm() around attempt to connect
 *	to qsub for interactive jobs.   If qsub hung or suspended or if the
 *	network is fouled up, mom cannot afford to wait forever.
 *
 * @param[in] sig - signal number
 *
 * @return 	Void
 *
 */

static void
no_hang(int sig)
{
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_REQUEST, LOG_DEBUG, " ",
		  "alarm timed-out connect to qsub");
}

/**
 * @brief
 *	validate credentials of user for job.
 *
 * @param[in] pjob - job pointer
 *
 * @return	pointer to structure
 * @retval	structure handle to passwd
 *
 */

struct passwd *
check_pwd(job *pjob)
{
	struct passwd *pwdp;
	struct group *grpp;
	struct stat sb;
	attribute *jb_group;

	pwdp = getpwnam(get_jattr_str(pjob, JOB_ATR_euser));
	if (pwdp == NULL) {
		(void) sprintf(log_buffer, "No Password Entry for User %s",
			       get_jattr_str(pjob, JOB_ATR_euser));
		return NULL;
	}
	/* check that home directory is valid */
	if (*pwdp->pw_dir == '\0') {
		sprintf(log_buffer, "null home directory");
		return NULL;
	}
	if (pjob->ji_grpcache == NULL) {
		pjob->ji_grpcache = malloc(sizeof(struct grpcache) +
					   strlen(pwdp->pw_dir) + 1);
		if (pjob->ji_grpcache == NULL) {
			sprintf(log_buffer, "Malloc failed");
			return NULL;
		}
		if (stat(pwdp->pw_dir, &sb) == -1) {
			sprintf(log_buffer, "%s: home directory: %s",
				pjob->ji_qs.ji_jobid, pwdp->pw_dir);
			log_err(errno, "check_pwd", log_buffer);
		}
		strcpy(pjob->ji_grpcache->gc_homedir, pwdp->pw_dir);
	}

	pjob->ji_grpcache->gc_uid = pwdp->pw_uid;  /* execution uid */
	pjob->ji_grpcache->gc_rgid = pwdp->pw_gid; /* real user gid */

	/* get the group and supplimentary under which the job is to be run */

	jb_group = get_jattr(pjob, JOB_ATR_egroup);
	if ((jb_group->at_flags & (ATR_VFLAG_SET | ATR_VFLAG_DEFLT)) == ATR_VFLAG_SET) {

		/* execution group specified - not defaulting to login group */

		grpp = getgrnam(get_jattr_str(pjob, JOB_ATR_egroup));
		if (grpp == NULL) {
			(void) sprintf(log_buffer, "No Group Entry for Group %s",
				       get_jattr_str(pjob, JOB_ATR_egroup));
			return NULL;
		}
		if (grpp->gr_gid != pwdp->pw_gid) {
			char **pgnam;

			pgnam = grpp->gr_mem;
			while (*pgnam) {
				if (!strcmp(*pgnam, pwdp->pw_name))
					break;
				++pgnam;
			}
			if (*pgnam == 0) {
				(void) sprintf(log_buffer, "user not in group");
				return NULL;
			}
		}
		pjob->ji_grpcache->gc_gid = grpp->gr_gid;
	} else {
		/* default to login group */
		pjob->ji_grpcache->gc_gid = pwdp->pw_gid;
	}

	/* perform site specific check on validatity of account */
	if (site_mom_chkuser(pjob))
		return NULL;

	return pwdp;
}

/**
 * @brief
 *	writepipe() - writes to pipe
 *
 * @param[in] pfd - file descriptor
 * @param[in] vptr - content to be written
 * @param[in] nbytes - length of content
 *
 * @return	ssize_t
 * @retval	-1					error
 * @retval	number of bytes written to pipe		success
 *
 */

ssize_t
writepipe(int pfd, void *vptr, size_t nbytes)
{
	size_t nleft;
	char *ptr = vptr; /* so we can do pointer arithmetic */

	nleft = nbytes;
	while (nleft > 0) {
		ssize_t nwritten;
		nwritten = write(pfd, ptr, nleft);
		if (nwritten == -1) {
			if (errno == EINTR)
				continue;
			else
				return -1;
		}

		nleft -= nwritten;
		ptr += nwritten;
	}
	return nbytes;
}

/**
 * @brief
 *      readpipe() - reads from pipe
 *
 * @param[in] pfd - file descriptor
 * @param[in] vptr - content to be into
 * @param[in] nbytes - length of content
 *
 * @return      ssize_t
 * @retval      -1                                      error
 * @retval      number of bytes read to pipe         success
 *
 */

ssize_t
readpipe(int pfd, void *vptr, size_t nbytes)
{
	size_t nleft;
	char *ptr = vptr; /* so we can do pointer arithmetic */

	nleft = nbytes;
	while (nleft > 0) {
		ssize_t nread;
		nread = read(pfd, ptr, nleft);
		if (nread == -1) {
			if (errno == EINTR)
				continue;
			else
				return -1;
		}
		if (nread == 0)
			break;

		nleft -= nread;
		ptr += nread;
	}
	return (nbytes - nleft);
}

/**
 * @brief
 *	exec_bail - called when the start of a job fails to clean up
 *
 * @par Functionality:
 *	Logs the message if one is passed in.
 *	Sends IM_ABORT_JOB to the sisters.
 *	sets the job's substate to JOB_SUBSTATE_EXITING, sets the job's
 *	exit code and sets exiting_tasks so an obit is sent for the job.
 *	The job's standard out/err are closed and then resources are released.
 *
 * @param[in]	pjob - pointer to job structure
 * @param[in]	code - the error code for the exit value, typically JOB_EXEC_*
 * @param[in]	txt  - a message to log or NULL if none or already logged
 *
 * @return	None
 *
 * @par MT-safe: likely no
 *
 */

void
exec_bail(job *pjob, int code, char *txt)
{
	int nodes;
	mom_hook_input_t hook_input;
	mom_hook_output_t hook_output;
	int hook_errcode = 0;
	hook *last_phook = NULL;
	unsigned int hook_fail_action = 0;
	char hook_msg[HOOK_MSG_SIZE + 1];

	/* log message passed in if one was */
	if (txt != NULL) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, txt);
	}

	mom_hook_input_init(&hook_input);
	hook_input.pjob = pjob;

	mom_hook_output_init(&hook_output);
	hook_output.reject_errcode = &hook_errcode;
	hook_output.last_phook = &last_phook;
	hook_output.fail_action = &hook_fail_action;

	(void) mom_process_hooks(HOOK_EVENT_EXECJOB_ABORT,
				 PBS_MOM_SERVICE_NAME, mom_host,
				 &hook_input, &hook_output, hook_msg,
				 sizeof(hook_msg), 1);

	nodes = send_sisters(pjob, IM_ABORT_JOB, NULL);
	if (nodes != pjob->ji_numnodes - 1) {
		sprintf(log_buffer,
			"sent %d ABORT requests, should be %d",
			nodes, pjob->ji_numnodes - 1);
		log_joberr(-1, __func__, log_buffer, pjob->ji_qs.ji_jobid);
	}
	set_job_substate(pjob, JOB_SUBSTATE_EXITING);
	pjob->ji_qs.ji_un.ji_momt.ji_exitstat = code;
	exiting_tasks = 1;
	if (pjob->ji_stdout > 0)
		(void) close(pjob->ji_stdout);
	if (pjob->ji_stderr > 0)
		(void) close(pjob->ji_stderr);

	if (job_clean_extra != NULL) {
		(void) job_clean_extra(pjob);
	}
}

#define RETRY 3

/**
 * @brief
 *	opens the demux
 *
 * @param[in] addr - ip address
 * @param[in] port - port number
 *
 * @return 	int
 * @retval	-1		Error
 * @retval	socket number	Success
 *
 */

int
open_demux(u_long addr, int port)
{
	int sock;
	int i;
	struct sockaddr_in remote;

	remote.sin_addr.s_addr = addr;
	remote.sin_port = htons((unsigned short) port);
	remote.sin_family = AF_INET;

	if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		sprintf(log_buffer, "%s: socket %s", __func__, netaddr(&remote));
		log_err(errno, __func__, log_buffer);
		return -1;
	}

	for (i = 0; i < RETRY; i++) {
		if (connect(sock, (struct sockaddr *) &remote,
			    sizeof(remote)) == 0)
			return sock;

		switch (errno) {

			case EINTR:
			case EADDRINUSE:
			case ETIMEDOUT:
			case ECONNREFUSED:
				sleep(2);
				continue;

			default:
				break;
		}
		break;
	}
	sprintf(log_buffer, "%s: connect %s", __func__, netaddr(&remote));
	log_err(errno, __func__, log_buffer);
	(void) close(sock);
	return -1;
}

/**
 * @brief
 * 	open_pty - open slave side of master/slave pty
 *
 * @param[in] pjob - job pointer
 *
 * @retval	int
 * @retval 	pty descriptor	Success
 *
 */

static int
open_pty(job *pjob)
{
	char *name;
	int pts;

	/* Open the slave pty as the controlling tty */

	name = get_jattr_str(pjob, JOB_ATR_outpath);

	if ((pts = open(name, O_RDWR, 0600)) < 0) {
		sprintf(log_buffer, "open_pty(%s): cannot open slave", name);
		log_err(errno, "open_pty", log_buffer);
	} else {

		FDMOVE(pts);

		if (fchmod(pts, 0620) == -1) 
			log_errf(-1, __func__, "fchmod failed. ERR : %s",strerror(errno));				
		if (fchown(pts, pjob->ji_qs.ji_un.ji_momt.ji_exuid,
			      pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1) 
			log_errf(-1, __func__, "fchown failed. ERR : %s",strerror(errno));		
#if defined(__osf__)
		(void) ioctl(pts, TIOCSCTTY, 0); /* make controlling */
#endif
	}
	return (pts);
}

/**
 * @brief
 * 	is_joined - determine if stdard out and stardard error are joined together
 *	(-j option) and if so which is first
 *
 * @param[in] pjob - job pointer
 *
 * @return 	int
 * @retval	0	no join, separate files
 * @retval  	+1	joined as stdout
 * @retval  	-1 	joined as stderr
 *
 */
int
is_joined(job *pjob)
{
	char *join;

	if (is_jattr_set(pjob, JOB_ATR_join)) {
		join = get_jattr_str(pjob, JOB_ATR_join);
		if (join[0] != 'n') {
			if (join[0] == 'o' && strchr(join, (int) 'e') != 0)
				return 1;
			else if (join[0] == 'e' && strchr(join, (int) 'e') != 0)
				return -1;
		}
	}
	return 0;
}

/**
 * @brief
 * 	open_std_out_err - open standard out and err to files
 *
 * @param[in] pjob - job pointer
 *
 * @return	int
 * @retval	0	Success
 * @retval	-1	Error
 *
 */

static int
open_std_out_err(job *pjob)
{
	int i;
	int file_out = -2;
	int file_err = -2;
	int filemode = O_CREAT | O_WRONLY | O_APPEND;
	direct_write_possible = 1;

	/* if std out/err joined (set and !="n"),which file is first */

	i = is_joined(pjob);
	if (i == 1) {
		file_out = open_std_file(pjob, StdOut, filemode,
					 pjob->ji_qs.ji_un.ji_momt.ji_exgid);
		file_err = dup(file_out);
	} else if (i == -1) {
		file_err = open_std_file(pjob, StdErr, filemode,
					 pjob->ji_qs.ji_un.ji_momt.ji_exgid);
		file_out = dup(file_err);
	}

	if (file_out == -2)
		file_out = open_std_file(pjob, StdOut, filemode,
					 pjob->ji_qs.ji_un.ji_momt.ji_exgid);
	if (file_err == -2)
		file_err = open_std_file(pjob, StdErr, filemode,
					 pjob->ji_qs.ji_un.ji_momt.ji_exgid);
	if ((file_out < 0 || file_err < 0)) {
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid,
			  "Unable to open standard output/error");
		return -1;
	}

	if (!direct_write_possible && direct_write_requested(pjob)) {
		sprintf(log_buffer,
			"Direct write is requested for job: %s, but the destination is not usecp-able from %s\n",
			pjob->ji_qs.ji_jobid, pjob->ji_hosts[pjob->ji_nodeid].hn_host);
		if (write(file_err, log_buffer, strlen(log_buffer)) == -1) 
			log_errf(-1, __func__, "write failed. ERR : %s",strerror(errno));			
	}

	FDMOVE(file_out); /* make sure descriptor > 2       */
	FDMOVE(file_err); /* so don't clobber stdin/out/err */
	if (file_out != 1) {
		(void) close(1);
		if (dup(file_out) == -1) 
			log_errf(-1, __func__, "dup failed. ERR : %s",strerror(errno));		
		(void) close(file_out);
	}
	if (file_err != 2) {
		(void) close(2);
		if (dup(file_err) == -1) 
			log_errf(-1, __func__, "dup failed. ERR : %s",strerror(errno));		
		(void) close(file_err);
	}
	return 0;
}
#ifdef NAS /* localmod 010 */

/**
 * @brief
 * 	NAS_tmpdirname - build NAS version of temporary directory name
 * 	Modifies pbs_tmpdir to append user name
 *
 * @param[in] pjob - job pointer
 *
 * @return 	string
 * @retval	temp directory name
 *
 */

char *
NAS_tmpdirname(job *pjob)
{
	char *ss;

	ss = strstr(pbs_tmpdir, "//");
	if (ss != NULL)
		strcpy(ss + 2, get_jattr_str(pjob, JOB_ATR_euser));

	return tmpdirname(pjob->ji_qs.ji_jobid);
}
#endif /* localmod 010 */

/**
 * @brief
 * 	tmpdirname - build a temporary directory name
 *
 * @param[in] sequence - directory name
 *
 * @return 	string
 * @retval	directory name
 *
 */

char *
tmpdirname(char *sequence)
{
	static char tmpdir[MAXPATHLEN + 1];

	sprintf(tmpdir, "%s/pbs.%s", pbs_tmpdir, sequence);
	return tmpdir;
}

/**
 * @brief
 *	jobdirname - build the staging and execution directory name
 *	with a random number tagged onto the end
 *
 * @param[in] sequence - directory name
 * @param[in] homedir - home dirctory
 *
 * @return	string
 * @retval	job directory name	success
 *
 */

char *
jobdirname(char *sequence, char *homedir)
{
	static char dir[MAXPATHLEN + 1];

	/* this will be implemented/used in phase II
	 **	static char unique[5+1];
	 **	unsigned int seed = (unsigned int)time(NULL);
	 **
	 **	for (i = 0; i < 5; i++) {
	 **		srandom(seed);
	 **		tempnum = random(NULL);
	 **		seed = tempnum;
	 **		unique[i] = (tempnum%26) + '0';
	 **	}
	 **
	 **	sprintf(dir, "%s/%s.%s", pbs_jobdir_root, sequence, unique);
	 */

	if ((pbs_jobdir_root[0] != '\0') && (strcmp(pbs_jobdir_root, JOBDIR_DEFAULT) != 0)) {
		sprintf(dir, "%s/pbs.%s.%s", pbs_jobdir_root, sequence, FAKE_RANDOM);
	} else if ((homedir != NULL) && (*homedir != '\0')) {
		/*
		 * jobdir_root was not set in mom_priv/config file
		 * so use the given homedir
		 */
		sprintf(dir, "%s/pbs.%s.%s", homedir, sequence, FAKE_RANDOM);
	} else {
		/* last resort, use default tmp dir */
		sprintf(dir, "%s/pbs.%s.%s", pbs_tmpdir, sequence, FAKE_RANDOM);
	}

	return dir;
}

/**
 * @brief
 * 	mktmpdir - make temporary directory(s)
 *	A temporary directory is created and the name is
 *	placed in an environment variable.
 *
 * @param[in] jobid - job id
 * @param[in] uid - user id
 * @param[in] gid - group id
 * @param[in] vtab - pointer to variable table
 *
 * @return	int
 * @retval	0		Success
 * @retval 	JOB_EXEC_FAIL1 	failure to make directory
 *
 */

int
mktmpdir(char *jobid, uid_t uid, gid_t gid, struct var_table *vtab)
{
	char *tmpdir;

	tmpdir = tmpdirname(jobid);
	errno = 0;
	if (mkdir(tmpdir, 0700) == -1) {
		int tmp_errno;

		if ((tmp_errno = errno) != EEXIST) {
			sprintf(log_buffer, "mkdir: %s", tmpdir);
			log_joberr(tmp_errno, __func__, log_buffer, jobid);
			return JOB_EXEC_FAIL1;
		} else if (tmp_errno == EEXIST) {
			struct stat statbuf;
			if (lstat(tmpdir, &statbuf) == -1) {
				sprintf(log_buffer, "%s: lstat : %s", jobid, tmpdir);
				log_joberr(tmp_errno, __func__, log_buffer, jobid);
				return JOB_EXEC_FAIL1;
			}
			if (!S_ISDIR(statbuf.st_mode)) { /* Not a directory */
				sprintf(log_buffer, "mkdir: %s is already available: possible attempted security breach by %d:%d(uid:gid of job tmpdir)",
					tmpdir, statbuf.st_uid, statbuf.st_gid);
				log_joberr(tmp_errno, __func__, log_buffer, jobid);
				return JOB_EXEC_FAIL_SECURITY;
			} else if (!((statbuf.st_uid == uid || statbuf.st_uid == 0) && (statbuf.st_gid == gid || statbuf.st_gid == 0))) {
				sprintf(log_buffer, "mkdir: %s is already available: possible attempted security breach by %d:%d(uid:gid of job tmpdir)",
					tmpdir, statbuf.st_uid, statbuf.st_gid);
				log_joberr(tmp_errno, __func__, log_buffer, jobid);
				return JOB_EXEC_FAIL_SECURITY;
			}
		}
	}
	/* Explicitly call chmod because umask affects mkdir() */
	if (chmod(tmpdir, 0700) == -1) {
		sprintf(log_buffer, "chmod: %s", tmpdir);
		log_joberr(errno, __func__, log_buffer, jobid);
		return JOB_EXEC_FAIL1;
	}
	if (chown(tmpdir, uid, gid) == -1) {
		sprintf(log_buffer, "chown: %s", tmpdir);
		log_joberr(errno, __func__, log_buffer, jobid);
		return JOB_EXEC_FAIL1;
	}
	/* Only set TMPDIR if everything succeeded to this point. */
	if (vtab) {
		bld_env_variables(vtab, "TMPDIR", tmpdir);
	}
	return 0;
}

/**
 * @brief
 *	Make the staging and execution directory with what ever
 *	privileges are currently set,  may be root or may be user.
 *	This function is a helper task for mkjobdir() below.
 *
 * @param[in] jobid - the job id string
 * @param[in] jobdir - the full path to the sandox (working directory to make
 *
 * @return int
 * @retval JOB_EXEC_FAIL1 failure to make directory
 * @retval  0 success
 *
 */
static int
internal_mkjobdir(char *jobid, char *jobdir)
{
	if (mkdir(jobdir, 0700) == -1) {
		if (errno == EEXIST) {
			sprintf(log_buffer, "the staging and execution directory %s already exists", jobdir);
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_INFO, jobid, log_buffer);
		} else {
			sprintf(log_buffer, "mkdir: %s", jobdir);
			log_joberr(errno, __func__, log_buffer, jobid);
			return JOB_EXEC_FAIL1;
		}
	}
	if (chmod(jobdir, 0700) == -1) {
		sprintf(log_buffer, "unable to change permissions on staging and execution directory %s", jobdir);
		log_event(PBSEVENT_DEBUG, PBS_EVENTCLASS_JOB, LOG_INFO, jobid, log_buffer);
	}
	return 0;
}

/**
 * @brief
 * 	Impersonate the user by changing effective uid and gid.
 *
 * @param[in] uid - user id
 * @param[in] gid - group id
 *
 * @return 	int
 * @retval	0	Success
 * @retval	-1	Error
 *
 */

int
impersonate_user(uid_t uid, gid_t gid)
{
#if defined(HAVE_GETPWUID) && defined(HAVE_INITGROUPS)
	struct passwd *pwd = getpwuid(uid);
	if (pwd == NULL)
		return -1;

	if ((geteuid() != uid) &&
	    (initgroups(pwd->pw_name, gid) == -1)) {
		return -1;
	}

#endif
#if defined(HAVE_SETEUID) && defined(HAVE_SETEGID)
	/* most systems */
	if ((setegid(gid) == -1) ||
	    (seteuid(uid) == -1)) {
		if (setegid(pbsgroup) == -1) 
			log_errf(-1, __func__, "setegid to pbs group failed. ERR : %s",strerror(errno));		
		return -1;
	}
#elif defined(HAVE_SETRESUID) && defined(HAVE_SETRESGID)
	if ((setresgid(-1, gid, -1) == -1) ||
	    (setresuid(-1, uid, -1) == -1)) {
		(void) setresgid(-1, pbsgroup, -1);
		return -1;
	}
#else
#error No function to change effective UID or GID
#endif
	return 0;
}

void
revert_from_user(void)
{
#if defined(HAVE_SETEUID)
	/* most systems */
	if (seteuid(0) == -1) 
		log_errf(-1, __func__, "seteuid failed. ERR : %s",strerror(errno));	
#elif defined(HAVE_SETRESUID)
	(void) setresuid(-1, 0, -1);
#else
#error No function to change effective UID
#endif
#if defined(HAVE_INITGROUPS)
	(void) initgroups("root", pbsgroup);
#endif
#if defined(HAVE_SETEGID)
	if (setegid(pbsgroup) == -1) 
		log_errf(-1, __func__, "setegid to pbs group failed. ERR : %s",strerror(errno));		
#elif defined(HAVE_SETRESGID)
	(void) setresgid(-1, pbsgroup, -1);
#else
#error No function to change effective GID
#endif
}

/**
 * @brief
 *	Make the staging and execution directory for the job.
 *
 * @par If the root of the working directory (sandbox) is the User's home
 *	directory, make the directory as the User in case root has no access.
 *	Otherwise, it is being made in a admin specified secure root owned
 *	location, "job_dir_root" and should be made as root and then the
 *	ownship changed.
 * @par The global character array pbs_jobdir_root is set to a non null string
 *	if the base for the sandbox is not to be the User's home directory.
 *
 * @param[in] jobid - the job id string, i.e. "123.server"
 * @param[in] jobdir - the full path to the sandox (working directory to make
 * @param[in] uid    - the user id of the user under which the job will run
 * @param[in] gid    - the group id of the user under which the job will run
 *
 * @return int
 * @retval -1 failure to make directory
 * @retval  0 success
 *
 */

/**
 * @brief
 * 	mkjobdir - make the staging and execution directory
 *	A per-job staging and execution directory is created.
 *	If the parent of the directory is the user's home, it is made while
 *	operating with the user's privilege.  Otherwise, it is made as root
 *	and then changed as it would be in "job_dir_root" which is root owned.
 *
 * @param[in] jobid - job id
 * @param[in] jobdir - job directory
 * @param[in] uid - user id
 * @param[in] gid - group id
 *
 * @return 	int
 * @retval	0	Success
 * @retval	-1	Error
 *
 */

int
mkjobdir(char *jobid, char *jobdir, uid_t uid, gid_t gid)
{
	int rc;

	if ((pbs_jobdir_root[0] != '\0') && (strcmp(pbs_jobdir_root, JOBDIR_DEFAULT) != 0)) {

		/* making the directory as root in a secure root owned dir */

		if ((rc = internal_mkjobdir(jobid, jobdir)) != 0)
			return (rc);

		/* now change ownership to the user */
		if (chown(jobdir, uid, gid) == -1) {
			sprintf(log_buffer, "chown: %s", jobdir);
			log_joberr(errno, __func__, log_buffer, jobid);
			return JOB_EXEC_FAIL1;
		}
	} else {

		/* making the directory in the user's home, do it as user */

		if (impersonate_user(uid, gid) == -1)
			return -1;

		/* make the directory */
		rc = internal_mkjobdir(jobid, jobdir);

		/* go back to being root */

		revert_from_user();

		if (rc != 0)
			return (rc);
	}

	/*
	 * success.  log a message that shows the name of the
	 * staging and execution dir
	 */
	sprintf(log_buffer, "created the job directory %s", jobdir);
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, jobid, log_buffer);
	return 0;
}

/**
 * @brief
 * 	rmtmpdir - remove the temporary directory
 *	This may take awhile so the task is forked and execed to another
 *	process.
 *
 * @param[in] jobid - job id
 *
 * @return	Void
 *
 */

void
rmtmpdir(char *jobid)
{
	static char rmdir[MAXPATHLEN + 1];
	struct stat sb;
	pid_t pid;
	char *rm = "/bin/rm";
	char *rf = "-rf";
	char *tmpdir;
	char *newdir = rmdir;

	/* Hello, is any body there? */
	tmpdir = tmpdirname(jobid);
	if (stat(tmpdir, &sb) == -1) {
		if (errno != ENOENT) {
			sprintf(log_buffer, "stat: %s", tmpdir);
			log_joberr(errno, __func__, log_buffer, jobid);
		}
		return;
	}

	sprintf(rmdir, "%s/pbs_remove.%s", pbs_tmpdir, jobid);
	if (rename(tmpdir, newdir) == -1) {
		char *msgbuf;

		pbs_asprintf(&msgbuf, "%s %s", tmpdir, newdir);
		log_joberr(errno, __func__, msgbuf, jobid);
		free(msgbuf);
		newdir = tmpdir;
	}

	/* fork and exec the cleantmp process */
	pid = fork();
	if (pid < 0) {
		log_err(errno, __func__, "fork");
		return;
	}

	if (pid > 0) /* parent */
		return;

	tpp_terminate();
	execl(rm, "pbs_cleandir", rf, newdir, NULL);
	log_err(errno, __func__, "execl");
	exit(21);
}

/**
 * @brief
 *	returns shell name
 *
 * @param[in] shell - shellname
 *
 * @return 	string
 * @retval	shell name
 *
 */

char *
lastname(char *shell)
{
	char *shellname;

	shellname = strrchr(shell, '/');
	if (shellname)
		shellname++; /* go past last '/' */
	else
		shellname = shell;
	return shellname;
}

/**
 * @brief
 *	Become the user with specified user name, uid, and gids.
 *	Obtains the current supplement group list and if necessary adds
 *	the user's login group to it,  then changes to the specified group,
 *	new group list, and the specified uid.
 *
 * @param[in] eusrname - the execution user name
 * @param[in] euid     - the execution uid
 * @param[in] egid     - the execution gid
 * @param[in] rgid     - the login (or real) gid of the user
 *
 * @return int
 * @retval 0  - success
 * @retval -1 - failure to change
 *
 */
int
becomeuser_args(char *eusrname, uid_t euid, gid_t egid, gid_t rgid)
{
	gid_t *grplist = NULL;
	static int maxgroups = 0;

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
#if defined(HAVE_LIBKAFS) || defined(HAVE_LIBKOPENAFS)
	int32_t pag = 0;
	pag = getpag();
#endif
#endif

	/* obtain the maximum number of groups possible in the list */
	if (maxgroups == 0)
		maxgroups = (int) sysconf(_SC_NGROUPS_MAX);

	if (initgroups(eusrname, egid) != -1) {
		int numsup;
		int i;

		/* allocate an array for the group list */
		grplist = calloc((size_t) maxgroups, sizeof(gid_t));
		if (grplist == NULL)
			return -1;
		/* get the current list of groups */
		numsup = getgroups(maxgroups, grplist);
		for (i = 0; i < numsup; ++i) {
			if (grplist[i] == rgid)
				break;
		}
		if (i == numsup) {
			/* need to add primary group to list */
			if (numsup == maxgroups) {
				/* cannot, list already at max size */
				free(grplist);
				return -1;
			}
			grplist[numsup++] = rgid;
		}

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
#if defined(HAVE_LIBKAFS) || defined(HAVE_LIBKOPENAFS)
		if (pag)
			grplist[numsup++] = pag;
#endif
#endif

		if ((setgroups((size_t) numsup, grplist) != -1) &&
		    (setgid(egid) != -1) &&
		    (setuid(euid) != -1)) {
			free(grplist);
			return 0;
		}
	}
	if (grplist)
		free(grplist);
	return -1;
}

/**
 * @brief
 *	Become the user using information sent with the job and in the cached
 *	password information in the job structure if available.
 *
 *	Picks up the execution user name from the euser attribute, the euid
 *	and egid from the mom subarea of the job structure and the login gid
 *	from the cached password info if that has been set.  Otherwise use
 *	the egid.
 *
 *	The real work is done by passing the above to becomeuser_args().
 * @see	becomeuser_args
 *
 * @param[in] pjob - pointer to the job structure
 * @return int
 * @retval 0  - success
 * @retval -1 - failure to change
 *
 */

int
becomeuser(job *pjob)
{
	gid_t rgid;
	if (pjob->ji_grpcache != NULL)
		rgid = pjob->ji_grpcache->gc_rgid;
	else
		rgid = pjob->ji_qs.ji_un.ji_momt.ji_exgid;
	if (becomeuser_args(get_jattr_str(pjob, JOB_ATR_euser), pjob->ji_qs.ji_un.ji_momt.ji_exuid, pjob->ji_qs.ji_un.ji_momt.ji_exgid, rgid) == -1) {
		fprintf(stderr, "unable to set user privileges, errno = %d\n",
			errno);
		return -1;
	} else
		return 0;
}

/**
 * @brief
 * 	Expects the current process will invoke some external program,
 * 	and this sets the process to have the special credential
 * 	stored in the job, along with 'shell', arguments array (argarray),
 * 	and 'pjob->ji_env' values.
 *
 * @param[in] pjob - job in question
 * @param[out] shell - if not NULL, filled in with shell to use for future
 *				external program invocations.
 * @param[out] argarray - if not NULL, filled in with argument array to be
 *				used for future external program invocations.
 *
 *	Do the right thing for the type of credential the job has.
 *	We are in a child process which will become a task.
 *
 * @return	int
 * @retval	-1	error
 * @retval	0	Success
 *
 */
int
set_credential(job *pjob, char **shell, char ***argarray)
{
	char **argv;
	static char buf[MAXPATHLEN + 1];
	int ret = 0;
	char *prog = NULL; /* possible new shell */
	char *name;
	int i = 0;
	int j;
	int num = 0;
	int fds[2];

	if ((argarray != NULL) && (*argarray != NULL)) {
		while ((*argarray)[num] != NULL) {
			num++;
		}
	}
	cred_buf = NULL;

	switch (pjob->ji_extended.ji_ext.ji_credtype) {

		case PBS_CREDTYPE_NONE:
			argv = (char **) calloc(2 + num, sizeof(char *));
			assert(argv != NULL);

			/* construct argv array */
			if (shell != NULL) {
				prog = *shell;
				name = lastname(*shell);
				argv[i] = malloc(strlen(name) + 2);
				assert(argv[i] != NULL);
				strcpy(argv[i], "-");
				strcat(argv[i++], name);
				/* copy remaining command line args 1..end, skip 0 */
				if (num >= 2) { /* num=# of !NULL argarray entries */
					for (j = 1; (*argarray)[j]; j++)
						argv[i++] = (*argarray)[j];
				}
			}
			ret = becomeuser(pjob);
			break;

		case PBS_CREDTYPE_AES:
			/* there are 3 set argv[] entries below, so need to alloc */
			/* 3+1 initial slots (+1 for the terminating NULL entry) */
			argv = (char **) calloc(4 + num, sizeof(char *));
			assert(argv != NULL);

			if (read_cred(pjob, &cred_buf, &cred_len) != 0)
				break;

			ret = becomeuser(pjob);

			if (pipe(fds) == -1) {
				log_err(errno, __func__, "pipe");
				break;
			}

			name = NULL;
			if (pbs_decrypt_pwd(cred_buf, PBS_CREDTYPE_AES, cred_len, &name, (const unsigned char *) pbs_aes_key, (const unsigned char *) pbs_aes_iv) != 0) {
				log_joberr(-1, __func__, "decrypt_pwd", pjob->ji_qs.ji_jobid);
				close(fds[0]);
			} else if (writepipe(fds[1], name, cred_len) != cred_len) {
				log_err(errno, __func__, "pipe write");
				close(fds[0]);
			} else {
				sprintf(buf, "%d", fds[0]);
				bld_env_variables(&pjob->ji_env, "PBS_PWPIPE", buf);
			}
			if (name != NULL) {
				memset(name, 0, cred_len);
				free(name);
			}
			close(fds[1]);

			/* construct argv array */
			if (shell != NULL) {
				prog = *shell;
				name = lastname(*shell);
				argv[i] = malloc(strlen(name) + 2);
				if (argv[i] == NULL)
					break;
				strcpy(argv[i], "-");
				strcat(argv[i++], name);
			}
			break;

		default:
			log_err(errno, __func__, "unknown credential type");
			return -1;
	}

	if (shell == NULL ||  /* only args OR */
	    prog != *shell) { /* we added a program */
		/* copy remaining command line args */
		if (argarray != NULL) {
			if (*argarray != NULL) {
				argv[i++] = (shell == NULL) ? (*argarray)[0] : *shell;
				if (num >= 2) { /* num=# of !NULL argarray entries */
					for (j = 1; (*argarray)[j]; j++) {
						argv[i++] = (*argarray)[j];
					}
				}
			} else {
				argv[i++] = (shell == NULL) ? NULL : *shell;
			}
		}
		if (shell != NULL)
			*shell = prog;
	}
	argv[i++] = NULL;

	if (argarray != NULL) {
		*argarray = argv;
	} else {
		free_str_array(argv);
	}

	if (cred_buf) {
		free(cred_buf);
		cred_buf = NULL;
	}
	return ret;
}

/** @brief
 * 	get_index_and_parent - from the job if of a subjob, return the parent array
 *	job jobid and the index for this subjob. The two returned strings are
 *	in static buffers and must be copied before this is called again.
 *
 * @param[in] jobid - job id
 * @param[out] pparent - parent array job
 * @param[out] pindex - index for subjob
 *
 * @return	Void
 *
 */

void
get_index_and_parent(char *jobid, char **pparent, char **pindex)
{
	char *pd;
	char *pi;
	char *ps;
	static char parent[PBS_MAXSVRJOBID + 1];
	static char index[20];

	ps = jobid;
	pd = parent;
	pi = index;
	while (*ps != '[') /* copy first part of job id */
		*pd++ = *ps++;
	*pd++ = *ps++;	   /* copy in '[' */
	while (*ps != ']') /* copy index  */
		*pi++ = *ps++;
	*pi = '\0';
	while (*ps)
		*pd++ = *ps++;
	*pd = '\0';
	*pparent = parent;
	*pindex = index;
}

/**
 * @brief
 *	creates set up for job.
 *
 * @param[in] pjob - job pointer
 * @param[in] pwdparm - pointer to passwd structure
 *
 * @return	int
 * @retval	JOB_EXEC_FAILUID(-10)	Error
 * @retval	JOB_EXEC_RETRY(-3)	Error
 * @retval	JOB_EXEC_OK(0)		Success
 *
 */

static int
job_setup(job *pjob, struct passwd **pwdparm)
{
	struct passwd *pwdp;
	char *chkpnt;

	/*
	 * get the password entry for the user under which the job is to be run
	 * we do this now to save a few things in the job structure
	 */
	pwdp = check_pwd(pjob);
	if (pwdparm != NULL)
		*pwdparm = pwdp;

	if (pwdp == NULL) {
		log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
			  LOG_ERR, pjob->ji_qs.ji_jobid, log_buffer);
		pjob->ji_qs.ji_stime = time_now; /* for walltime */
		set_jattr_l_slim(pjob, JOB_ATR_stime, time_now, SET);
		return JOB_EXEC_FAILUID;
	}
	pjob->ji_qs.ji_un.ji_momt.ji_exuid = pjob->ji_grpcache->gc_uid;
	pjob->ji_qs.ji_un.ji_momt.ji_exgid = pjob->ji_grpcache->gc_gid;

	/*
	 ** Call job_setup_final if it is available.
	 ** The stream parameter is not used by mother superior.
	 */
	if (job_setup_final != NULL) {
		if (job_setup_final(pjob, -1) != PBSE_NONE)
			return JOB_EXEC_RETRY;
	}

	/*
	 * if certain resource limits require that the job usage be
	 * polled or it is a multinode job, we link the job to mom_polljobs.
	 *
	 * NOTE: we overload the job field ji_jobque for this as it
	 * is not used otherwise by MOM
	 */

	if (pjob->ji_numnodes > 1 || mom_do_poll(pjob))
		if (is_linked(&mom_polljobs, &pjob->ji_jobque) == 0)
			append_link(&mom_polljobs, &pjob->ji_jobque, pjob);

	/* Is the job to be periodic checkpointed */

	pjob->ji_chkpttype = PBS_CHECKPOINT_NONE;
	if (is_jattr_set(pjob, JOB_ATR_chkpnt)) {
		chkpnt = get_jattr_str(pjob, JOB_ATR_chkpnt);
		if ((*chkpnt == 'c') && (*(chkpnt + 1) == '=')) {
			/* has cpu checkpoint time in minutes, convert to seconds */
			pjob->ji_chkpttype = PBS_CHECKPOINT_CPUT;
			pjob->ji_chkpttime = atoi(chkpnt + 2) * 60;
		} else if ((*chkpnt == 'w') && (*(chkpnt + 1) == '=')) {
			/* has checkpoint walltime in minutes, convert to seconds */
			pjob->ji_chkpttype = PBS_CHECKPOINT_WALLT;
			pjob->ji_chkpttime = atoi(chkpnt + 2) * 60;
		}
		pjob->ji_chkptnext = pjob->ji_chkpttime;
	}
	return JOB_EXEC_OK;
}

/**
 * @brief
 *	record_finish_exec - record the results of finish_exec()
 *	primarily the session id of the started job.
 *
 * @par Functionality:
 *	Find the connection table entry associated with the pipe file
 *	descriptor.  This leads to the task and from there to the job
 *	being started.  The starter return information is read from the pipe.
 *	If the read fails, log the fact and requeue the job.
 *	Otherwise, record that the job is now running:
 *	- the session id and global id (if one)
 *	- set the state/substate to RUNNING
 *	- get a first sample of usage for this job and
 *	  return a status update to the Server so it knows the job is going.
 *
 * @param[in]	sd - file descriptor of the pipe on which the job starter
 *		process has written the session id and other info
 *
 * @return	None
 *
 * @par MT-safe: likely no
 *
 */
static void
record_finish_exec(int sd)
{
	conn_t *conn = NULL;
	int i;
	int j;
	job *pjob = NULL;
	pbs_task *ptask;
	struct startjob_rtn sjr;

	if ((conn = get_conn(sd)) == NULL) {
		log_err(PBSE_INTERNAL, __func__, "unable to find pipe");
		return;
	}

	ptask = (pbs_task *) conn->cn_data;
	if (ptask != NULL)
		pjob = ptask->ti_job;
	else {
		/*
		 * Job has been deleted before recording session id.
		 * Read the session information and kill the process.
		 */
		memset(&sjr, 0, sizeof(sjr));
		i = readpipe(sd, &sjr, sizeof(sjr));
		j = errno;
		(void) close_conn(sd);

		if (i == sizeof(sjr))
			kill_session(sjr.sj_session, SIGKILL, 0);
		else {
			sprintf(log_buffer,
				"read of pipe for session information got %d not %d",
				i, (int) sizeof(sjr));
			log_err(j, __func__, log_buffer);
		}

		return;
	}

	if (pjob == NULL) {
		log_err(PBSE_INTERNAL, __func__,
			"no job task associated with connection");
		return;
	}

	/* now we read the session id or error */
	memset(&sjr, 0, sizeof(sjr));
	i = readpipe(pjob->ji_jsmpipe, &sjr, sizeof(sjr));
	j = errno;

	if (i != sizeof(sjr)) {
		sprintf(log_buffer,
			"read of pipe for pid job %s got %d not %d",
			pjob->ji_qs.ji_jobid,
			i, (int) sizeof(sjr));
		log_err(j, __func__, log_buffer);
		(void) close_conn(pjob->ji_jsmpipe);
		pjob->ji_jsmpipe = -1;
		(void) close(pjob->ji_mjspipe);
		pjob->ji_mjspipe = -1;

		if (pjob->ji_jsmpipe2 != -1) {
			(void) close_conn(pjob->ji_jsmpipe2);
			pjob->ji_jsmpipe2 = -1;
		}

		if (pjob->ji_mjspipe2 != -1) {
			(void) close(pjob->ji_mjspipe2);
			pjob->ji_mjspipe2 = -1;
		}

		if (pjob->ji_child2parent_job_update_pipe != -1) {
			(void) close_conn(pjob->ji_child2parent_job_update_pipe);
			pjob->ji_child2parent_job_update_pipe = -1;
		}

		if (pjob->ji_parent2child_job_update_pipe != -1) {
			(void) close(pjob->ji_parent2child_job_update_pipe);
			pjob->ji_parent2child_job_update_pipe = -1;
		}

		if (pjob->ji_parent2child_job_update_status_pipe != -1) {
			(void) close(pjob->ji_parent2child_job_update_status_pipe);
			pjob->ji_parent2child_job_update_status_pipe = -1;
		}

		if (pjob->ji_parent2child_moms_status_pipe != -1) {
			(void) close(pjob->ji_parent2child_moms_status_pipe);
			pjob->ji_parent2child_moms_status_pipe = -1;
		}
		(void) sprintf(log_buffer, "start failed, improper sid");
		exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
		return;
	}

#if MOM_ALPS
	if (sjr.sj_code == JOB_EXEC_UPDATE_ALPS_RESV_ID) {
		pjob->ji_extended.ji_ext.ji_pagg = sjr.sj_pagg;
		pjob->ji_extended.ji_ext.ji_reservation = sjr.sj_reservation;
		(void) writepipe(pjob->ji_mjspipe, &sjr, sizeof(sjr));
		return;
	}
#endif

	/* send back as an acknowledgement that MOM got it */
	(void) writepipe(pjob->ji_mjspipe, &sjr, sizeof(sjr));
	(void) close_conn(pjob->ji_jsmpipe);
	pjob->ji_jsmpipe = -1;
	(void) close(pjob->ji_mjspipe);
	pjob->ji_mjspipe = -1;

	if (pjob->ji_jsmpipe2 != -1) {
		(void) close_conn(pjob->ji_jsmpipe2);
		pjob->ji_jsmpipe2 = -1;
	}

	if (pjob->ji_mjspipe2 != -1) {
		(void) close(pjob->ji_mjspipe2);
		pjob->ji_mjspipe2 = -1;
	}

	if (pjob->ji_child2parent_job_update_pipe != -1) {
		(void) close_conn(pjob->ji_child2parent_job_update_pipe);
		pjob->ji_child2parent_job_update_pipe = -1;
	}

	if (pjob->ji_parent2child_job_update_pipe != -1) {
		(void) close(pjob->ji_parent2child_job_update_pipe);
		pjob->ji_parent2child_job_update_pipe = -1;
	}

	if (pjob->ji_parent2child_job_update_status_pipe != -1) {
		(void) close(pjob->ji_parent2child_job_update_status_pipe);
		pjob->ji_parent2child_job_update_status_pipe = -1;
	}

	if (pjob->ji_parent2child_moms_status_pipe != -1) {
		(void) close(pjob->ji_parent2child_moms_status_pipe);
		pjob->ji_parent2child_moms_status_pipe = -1;
	}

	DBPRT(("%s: read start return %d %d\n", __func__,
	       sjr.sj_code, sjr.sj_session))

	/* update pjob with values set from a prologue/launch hook
	 * since these are hooks that are executing in a child process
	 * and changes inside the child will not be reflected in main
	 * mom
	 */
	if ((num_eligible_hooks(HOOK_EVENT_EXECJOB_PROLOGUE) > 0) ||
	    (num_eligible_hooks(HOOK_EVENT_EXECJOB_LAUNCH) > 0)) {
		char hook_outfile[MAXPATHLEN + 1];
		struct stat stbuf;
		int reject_rerunjob = 0;
		int reject_deletejob = 0;

		snprintf(hook_outfile, MAXPATHLEN, FMT_HOOK_JOB_OUTFILE,
			 path_hooks_workdir, pjob->ji_qs.ji_jobid);
		if (stat(hook_outfile, &stbuf) == 0) {
			pbs_list_head vnl_changes;

			CLEAR_HEAD(vnl_changes);
			if (sjr.sj_code == JOB_EXEC_HOOKERROR) {

				char hook_buf2[HOOK_BUF_SIZE];
				int fd;
				char *hook_name = NULL;
				int rd_size = stbuf.st_size;

				if (rd_size >= HOOK_BUF_SIZE) {
					rd_size = HOOK_BUF_SIZE - 1;
				}

				fd = open(hook_outfile, O_RDONLY);
				hook_buf2[0] = '\0';
				if (fd != -1) {
					if (read(fd, hook_buf2, rd_size) == rd_size) {
						hook_buf2[rd_size] = '\0';
						if (hook_buf2[rd_size - 1] == '\n') {
							hook_buf2[rd_size - 1] = '\0';
						}

						hook_name = strchr(hook_buf2, '=');
						if (hook_name != NULL)
							hook_name++;
					}

					close(fd);
					unlink(hook_outfile);
				}
				if (hook_name != NULL) {
					send_hook_fail_action(find_hook(hook_name));
				}

			} else if (get_hook_results(hook_outfile, NULL, NULL, NULL, 0,
						    &reject_rerunjob, &reject_deletejob, NULL,
						    NULL, 0, &vnl_changes, pjob,
						    NULL, 0, NULL) != 0) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK, LOG_ERR, __func__, "Failed to get prologue hook results");
				vna_list_free(vnl_changes);
				/* important to unlink this file here */
				/* as this file is usually opened in append */
				/* mode under mom_process_hooks() */
				unlink(hook_outfile);
			} else {
				/* Delete job or reject job actions */
				/* NOTE: Must appear here before vnode changes, */
				/* since this action will be sent whether or not */
				/* hook script executed by PBSADMIN or not. */
				if (reject_deletejob) {
					/* deletejob takes precedence */
					new_job_action_req(pjob, HOOK_PBSADMIN, JOB_ACT_REQ_DELETE);
				} else if (reject_rerunjob) {
					new_job_action_req(pjob, HOOK_PBSADMIN, JOB_ACT_REQ_REQUEUE);
				}

				/* Whether or not we accept or reject, we'll make */
				/* job changes, vnode changes, job actions */
				enqueue_update_for_send(pjob, IS_RESCUSED_FROM_HOOK);

				/* Push vnl hook changes to server */
				hook_requests_to_server(&vnl_changes);

				unlink(hook_outfile);
			}
		}
	}

	/*
	 ** Set the global id before exiting on error so any
	 ** information can be put into the job struct first.
	 */
	set_globid(pjob, &sjr);
	if (sjr.sj_code < 0) {
#if MOM_ALPS
		/* we couldn't get a reservation so refresh the inventory */
		if (sjr.sj_reservation == -1)
			call_hup = HUP_INIT;
#endif
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		AFSLOG_TERM(ptask);
#endif
		(void) sprintf(log_buffer, "job not started, %s %d",
			       (sjr.sj_code == JOB_EXEC_RETRY) ? "Retry" : "Failure", sjr.sj_code);
		exec_bail(pjob, sjr.sj_code, log_buffer);
		return;
	}

	ptask->ti_qs.ti_sid = sjr.sj_session;
	ptask->ti_qs.ti_status = TI_STATE_RUNNING;

	strcpy(ptask->ti_qs.ti_parentjobid, pjob->ji_qs.ji_jobid);
	if (task_save(ptask) == -1) {
		(void) sprintf(log_buffer, "Task save failed");
		exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
		return;
	}

	/*
	 * return from the starter indicated the job is a go ...
	 * record the start time and session/process id
	 */

	start_walltime(pjob);

	set_jattr_l_slim(pjob, JOB_ATR_session_id, sjr.sj_session, SET);

	set_job_state(pjob, JOB_STATE_LTR_RUNNING);
	set_job_substate(pjob, JOB_SUBSTATE_RUNNING);
	job_save(pjob);

	if (mom_get_sample() == PBSE_NONE) {
		time_resc_updated = time_now;
		(void) mom_set_use(pjob);
	}
	/*
	 * these are set so that it will
	 * return them to the Server on the first update below
	 */
	(get_jattr(pjob, JOB_ATR_errpath))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_outpath))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_session_id))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_altid))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_state))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_substate))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_jobdir))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_altid2))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_acct_id))->at_flags |= ATR_VFLAG_MODIFY;

	enqueue_update_for_send(pjob, IS_RESCUSED);
	next_sample_time = min_check_poll;
	log_eventf(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, "Started, pid = %d", sjr.sj_session);

	return;
}

/**
 * @brief
 *	Regenerate the PBS_NODEFILE of a job based on internal
 *	nodes-related data.
 * @param[in]	pjob	- the job whose PBS_NODEFILE is to be generated.
 * @param[out]	nodefile- buffer to hold the path to PBS_NODEFILE
 *			  that got regenerated.
 *			  NOTE: OK for this to be NULL, which means
 *			  don't save nodefile path.
 *
 * @param[in] nodefile_sz - size of the 'nodefile' buffer.
 * @param[out] err_msg	- buffer to hold the error message if this
 *			 functions returns a failure.
 * @param[in]	err_msg_sz - size of the 'err_msg' buffer.
 *
 * @return int
 * @retval  0	success
 * @retval < 0	failure
 *
 */
int
generate_pbs_nodefile(job *pjob, char *nodefile, int nodefile_sz,
		      char *err_msg, int err_msg_sz)
{
	FILE *nhow;
	int j, vnodenum;
	char pbs_nodefile[MAXPATHLEN + 1];

	if (pjob == NULL) {
		snprintf(err_msg, err_msg_sz, "bad pjob param");
		return (-1);
	}

	if ((err_msg != NULL) && (err_msg_sz > 0))
		err_msg[0] = '\0';

	snprintf(pbs_nodefile, sizeof(pbs_nodefile) - 1, "%s/aux/%s",
		 pbs_conf.pbs_home_path, pjob->ji_qs.ji_jobid);

	if ((nhow = fopen(pbs_nodefile, "w")) == NULL) {
		if ((err_msg != NULL) && (err_msg_sz > 0)) {
			snprintf(err_msg, err_msg_sz,
				 "cannot open %s", pbs_nodefile);
		}
		return (-1);
	}
	/*
	 **	The file must be owned by root and readable by
	 **	the user.  We take the easy way out and make
	 **	it readable by anyone.
	 */
	if (fchmod(fileno(nhow), 0644) == -1) {
		if ((err_msg != NULL) && (err_msg_sz > 0)) {
			snprintf(err_msg, err_msg_sz, "cannot chmod %s",
				 pbs_nodefile);
		}
		fclose(nhow);
		(void) unlink(pbs_nodefile);
		return (-1);
	}

	/* write each node name out once per vnod and entry */
	vnodenum = pjob->ji_numvnod;
	for (j = 0; j < vnodenum; j++) {
		if (pjob->ji_vnods[j].vn_hname == NULL) {
			size_t len;
			char *pdot;

			/* we want to write just the short name of the host */
			if ((pdot = strchr(pjob->ji_vnods[j].vn_host->hn_host, '.')) != NULL)
				len = (size_t) (pdot - pjob->ji_vnods[j].vn_host->hn_host);
			else
				len = strlen(pjob->ji_vnods[j].vn_host->hn_host);
			fprintf(nhow, "%.*s\n", (int) len,
				pjob->ji_vnods[j].vn_host->hn_host);
		} else
			fprintf(nhow, "%s\n", pjob->ji_vnods[j].vn_hname);
	}
	fclose(nhow);

	if ((nodefile != NULL) && (nodefile_sz > 0))
		pbs_strncpy(nodefile, pbs_nodefile, nodefile_sz);

	return (0);
}

/**
 * @brief
 *	Read a piece of data from 'downfds' pipe of size 'data_size'.
 *
 * @param[in]	downfds - the pipe descriptor to read from.
 * @param[in]	data_size - the size of data to read.
 * @param[in]	wait_sec - # of seconds to wait for data to arrive.
 *
 * @return void *
 * @retval <opaque_data>	- pointer to some data that is in a fixed
 *				  memory area that must not be freed and can
 *				  get overwritten on a next call to this
 *				  function.
 * @retval NULL			- if no data was found or error encountered.
 * @note
 *	The read time is timed out using the $job_launch_delay mom config
 *	option value.
 */
void *
read_pipe_data(int downfds, int data_size, int wait_sec)
{
	static char *buf = NULL;
	static int buf_size = 0;
	int ret;
	int nread = 0;
	struct pollfd pollfds[1];
	int timeout = (int) (wait_sec * 1000); /* milli seconds */
	pollfds[0].fd = downfds;
	pollfds[0].events = POLLIN;
	pollfds[0].revents = 0;

	ret = poll(pollfds, 1, timeout);

	if (ret == -1) {
		log_err(errno, __func__, "error on monitoring pipe");
		return NULL;
	} else if (ret == 0) {
		/* select or poll timed out */
		return NULL;
	}

	if (data_size > buf_size) {
		char *tpbuf;

		tpbuf = realloc(buf, data_size);
		if (tpbuf == NULL) {
			log_err(-1, __func__, "realloc failure");
			return NULL;
		}
		buf = tpbuf;
		buf_size = data_size;
	}
	memset(buf, 0, buf_size);

	nread = readpipe(downfds, buf, data_size);

	if (data_size != nread) {
		log_err(-1, __func__, "did not receive all data");
		return NULL;
	}
	return (buf);
}

/**
 * @brief
 *	Write a piece of data of size 'data_size' into pipe descriptors
 *	'upfds' (data write)  and 'downfds' (data ack).
 *
 * @param[in]	upfds - pipe descriptor upstream.
 * @param[in]	downfds - pipe descriptor downstream
 * @param[in]	data - the data to write
 * @param[in]	data_size - the size of 'data'
 *
 * @return int
 * @retval  0	- for success
 * @retval  1	- for failure
 */
int
write_pipe_data_ack(int upfds, int downfds, void *data, size_t data_size)
{
	void *data_recv = NULL;
	int nwrite = 0;
	size_t data_size_recv;

	if ((data == NULL) || (data_size == 0)) {
		return (1);
	}

	/* first send out data length */
	nwrite = writepipe(upfds, data, data_size);
	if (nwrite != data_size) {
		log_err(-1, __func__, "failed to write data to pipe");
		return (1);
	}

	/* wait for acknowledgement */
	data_recv = read_pipe_data(downfds, sizeof(size_t), PIPE_READ_TIMEOUT);
	if (data_recv == NULL) {
		log_err(-1, __func__, "failed to get ack from pipe");
		return (1);
	}

	memcpy(&data_size_recv, data_recv, sizeof(size_t));
	if (data_size_recv != data_size) {
		log_err(-1, __func__, "received data not match sent data");
		return (1);
	}
	return (0);
}

/**
 * @brief
 *	Write a piece of data of size 'data_size' into pipe 'upfds'.
 *
 * @param[in]	upfds - pipe descriptor upstream.
 * @param[in]	data - the data to write
 * @param[in]	data_size - the size of 'data'
 *
 * @return int
 * @retval  0	- for success
 * @retval  1	- for failure
 */
int
write_pipe_data(int upfds, void *data, int data_size)
{
	int nwrite = 0;

	if ((data == NULL) || (data_size <= 0)) {
		return (1);
	}

	nwrite = writepipe(upfds, data, data_size);
	if (nwrite != data_size) {
		log_err(-1, __func__, "partial write detected");
		return (1);
	}
	return (0);
}

/**
 * @brief
 *	Write 'r_size' first, and then the actual data 'r_buf' into pipe
 *	descriptors *	'upfds' (data write)  and 'downfds' (data ack).
 *
 * @param[in]	upfds - pipe descriptor upstream.
 * @param[in]	downfds - pipe descriptor downstream
 * @param[in]	r_buf - the data to write
 * @param[in]	r_size - the size of 'r_buf'
 *
 * @return int
 * @retval  0	- for success
 * @retval  1	- for failure
 *
 */
int
send_string_data(int upfds, int downfds, void *r_buf, size_t r_size)
{
	/* send new string size */
	if (write_pipe_data_ack(upfds, downfds, &r_size, sizeof(size_t)) != 0) {
		return (1);
	}
	/* now send string data actual data */
	if (write_pipe_data_ack(upfds, downfds, r_buf, r_size) != 0) {
		return (1);
	}

	return (0);
}

/**
 * @brief
 *	Read some string of data from 'downfds' pipe descriptor and using
 *	'upfds' for acknowledgement.
 *
 * @param[in]	downfds - the pipe descriptor to read from.
 * @param[in]	upfds - the pipe descriptor to use for acks.
 *
 * @return char *
 * @retval <string_of_data>	- pointer to some string data that is in a
 *				  fixed memory area that must not be freed and
 *				  can get overwritten on a next call to this
 *				  function.
 * @retval NULL			- if no data was found or error encountered.
 * @note
 *	The read time is timed out using the $job_launch_delay mom config
 *	option value.
 */
char *
receive_string_data(int downfds, int upfds)
{
	char *r_buf;
	size_t r_size;
	size_t ack_size;

	/* get size of buffer to receive */
	r_buf = read_pipe_data(downfds, sizeof(size_t), PIPE_READ_TIMEOUT);
	if (r_buf == NULL) {
		return (NULL);
	}
	memcpy(&r_size, r_buf, sizeof(size_t));
	/* ack that we got the r_size */
	ack_size = sizeof(size_t);
	if (write_pipe_data(upfds, &ack_size, sizeof(size_t)) != 0) {
		snprintf(log_buffer, sizeof(log_buffer), "write of length %lu bytes to pipe failed", (unsigned long) ack_size);
		log_err(errno, __func__, log_buffer);
		return (NULL);
	}

	/* now get the actual string data */
	r_buf = read_pipe_data(downfds, r_size, PIPE_READ_TIMEOUT);
	if (r_buf == NULL) {
		snprintf(log_buffer, sizeof(log_buffer), "read of pipe of size %lu bytes for failed", (unsigned long) r_size);
		log_err(errno, __func__, log_buffer);
		return (NULL);
	}
	/* send back as an acknowledgement that MOM got it */
	ack_size = r_size;
	if (write_pipe_data(upfds, &ack_size, sizeof(size_t)) != 0) {
		snprintf(log_buffer, sizeof(log_buffer), "write of length %lu bytes to pipe failed", (unsigned long) r_size);
		log_err(errno, __func__, log_buffer);
		return (NULL);
	}
	return (r_buf);
}

/**
 * @brief
 *	Send a command 'cmd' request using the pipes given.
 *
 * @param[in]	upfds - upstream pipe
 * @param[in]	downstream - downstream pipe
 * @param[in]	cmd - command request to send (e.g. IM_EXEC_PROLOGUE)
 *
 * @return int
 * @retval 0	- success
 * @retval 1	- fail
 */
int
send_pipe_request(int upfds, int downfds, int cmd)
{
	char *r_buf;
	int cmd_read;

	if (write_pipe_data(upfds, &cmd, sizeof(int)) != 0) {
		log_err(-1, __func__, "bad write to pipe");
		return (1);
	}

	/* wait for acknowledgement */
	r_buf = read_pipe_data(downfds, sizeof(int), PIPE_READ_TIMEOUT);
	if (r_buf == NULL) {
		log_err(-1, __func__, "bad read from pipe");
		return (1);
	}

	memcpy(&cmd_read, r_buf, sizeof(int));
	if (cmd != cmd_read) {
		snprintf(log_buffer, sizeof(log_buffer), "wrote %d got %d", cmd, cmd_read);
		log_err(-1, __func__, log_buffer);
		return (1);
	}
	return (0);
}

/**
 * @brief
 *	Returns 1 (true) if sister moms have all replied IM_ALL_OKAY status in
 *	regards to execution of remote prologue hooks.
 *
 * @param[in,out]	pjob	 - job being operated on.
 * @param[in]		pipefd	-  pipe to mother superior to get status info.
 * @return int
 * @retval 1	- for true
 * @retval 0	- for false
 */
int
prologue_hook_all_okay_from_sisters_moms(job *pjob, int pipefd)
{
	int cmd_ack = 0;
	char *r_buf = NULL;

	if (pipefd == -1)
		return (0);

	/* get cmd_ack from parent that it received IM_ALL_OKAY status from
	 * all sister moms regarding execution of remote prologue hooks.
	 */
	r_buf = read_pipe_data(pipefd, sizeof(int), 0);
	if (r_buf != NULL)
		memcpy(&cmd_ack, r_buf, sizeof(int));

	if ((r_buf == NULL) || (cmd_ack != IM_ALL_OKAY))
		return (0);
	return (1);
}

/**
 * @brief
 *	Wait/read from 'pipfd' pipe for node names of unhealthy moms, and
 *	update accordingly the job 'pjob''s ji_node_list and ji_failed_node_list.
 *	Return in 'vnl_fails' those entries in job's exec_vnode where the
 *	vnodes are managed by parent moms appearing in pjob->ji_failed_node_list.
 *
 * @param[in/out]	pjob	 - job being operated on.
 * @param[in]		pipefd	-  pipe to mother superior to get data.
 * @param[in]		prolo_pipefd	-  pipe to mother superior to get info
 *				   about remote prologue hook execution
 * @param[out]		vnl_fails - fill in with the list of vnodes and their
 *				   resources with non-healthy  parent moms.
 * @param[out]		vnl_good - fill in with the list of vnodes and their
 *				   resources with functional parent moms.
 * @param[in]		timeout	- # of seconds to wait waiting for list of failed
 *				   mom hosts.
 * @return int
 * @retval 0	- for success
 * @retval 1	- for failure
 */
int
get_failed_moms_and_vnodes(job *pjob, int pipefd, int prolo_pipefd, vnl_t **vnl_fails, vnl_t **vnl_good, unsigned int timeout)
{
	size_t r_size = 0;
	char *r_buf = NULL;
	int timer;
	char err_msg[LOG_BUF_SIZE];
	int prolo_okay = 0;

	if (pjob == NULL)
		return (1);

	/* Get failed mom hosts, and update the job's node_list and failed_node_list */
	timer = timeout;
	do {
		/* get size of buffer to receive */
		r_buf = read_pipe_data(pipefd, sizeof(size_t), 1);
		if (r_buf != NULL) {
			memcpy(&r_size, r_buf, sizeof(size_t));
			/* now get the actual string data */
			r_buf = read_pipe_data(pipefd, r_size, 0);
			if (r_buf != NULL) {
				snprintf(log_buffer, sizeof(log_buffer), "received from parent mom that node's host %s is not healthy", r_buf);
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
				reliable_job_node_add(&pjob->ji_failed_node_list, r_buf);
				reliable_job_node_delete(&pjob->ji_node_list, r_buf);
			}
		}
		if (prolo_pipefd != -1)
			prolo_okay = prologue_hook_all_okay_from_sisters_moms(pjob, prolo_pipefd);
		timer--;
	} while ((timer >= 0) && !prolo_okay);

	if ((prolo_pipefd != -1) && !prolo_okay)
		log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "not all prologue hooks to sister moms completed, but job will proceed to execute");

	/* now prune_exec_vnode taking away vnodes managed by moms
	 * in job's node_fail_list, and also satisfy the original
	 * job schedselect
	 */
	if (prune_exec_vnode(pjob, NULL, vnl_fails, vnl_good, err_msg, LOG_BUF_SIZE) != 0) {
		return (1);
	}

	return (0);
}

/**
 * @brief
 *	This is called by a job child process telling parent mom of job attribute
 *	updates, using the communication pipes:'pipefd_write', 'pipefd_ack',
 *	'pipefd_status'.
 *
 * @param[in,out]	pjob	 - job being operated on.
 * @param[in]		pipefd_write - for sending the job update request.
 * @param[in]		pipefd_ack - for receiving the ack from parent mom that it
 *				     has received the job update request.
 * @param[in]		pipefd_status - for child to get the result from parent mom
 *					on the job update.
 * @retval 0	- for success
 * @retval 1	- for failure
 */
int
send_update_job(job *pjob, int pipefd_write, int pipefd_ack, int pipefd_status)
{
	int exec_vnode_hookset;
	int schedselect_hookset;
	int exec_host_hookset;
	int exec_host2_hookset;
	size_t r_size = 0;
	char *r_buf = NULL;
	int cmd_ack = IM_ALL_OKAY;

	if (pjob == NULL)
		return (1);

	exec_vnode_hookset = (get_jattr(pjob, JOB_ATR_exec_vnode))->at_flags & ATR_VFLAG_HOOK;
	schedselect_hookset = (get_jattr(pjob, JOB_ATR_SchedSelect))->at_flags & ATR_VFLAG_HOOK;
	exec_host_hookset = (get_jattr(pjob, JOB_ATR_exec_host))->at_flags & ATR_VFLAG_HOOK;
	exec_host2_hookset = (get_jattr(pjob, JOB_ATR_exec_host2))->at_flags & ATR_VFLAG_HOOK;
	if (!exec_vnode_hookset || !schedselect_hookset ||
	    (!exec_host_hookset && !exec_host2_hookset)) {
		return (1);
	}

	/* now that we pruned exec_vnode, need to send the
	 * update to the parent mom
	 */
	if (send_pipe_request(pipefd_write, pipefd_ack, IM_UPDATE_JOB) != 0) {
		log_err(-1, __func__, "send of IM_UPDATE_JOB to parent mom failed");
		return (1);
	}

	/* add delay */
	r_buf = get_jattr_str(pjob, JOB_ATR_exec_vnode);
	r_size = strlen(r_buf) + 1;

	if (send_string_data(pipefd_write, pipefd_ack, r_buf, r_size) != 0) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "failed to send_string_data %s to parent mom", r_buf);
		log_err(-1, __func__, log_buffer);
		return (1);
	}

	/* now send new exec_host or exec_host2 */
	if (is_jattr_set(pjob, JOB_ATR_exec_host2))
		r_buf = get_jattr_str(pjob, JOB_ATR_exec_host2);
	else if (is_jattr_set(pjob, JOB_ATR_exec_host)) /* send new exec_host size */
		r_buf = get_jattr_str(pjob, JOB_ATR_exec_host);
	else {
		snprintf(log_buffer, sizeof(log_buffer),
			 "job %s has unset exec_host and exec_host2", pjob->ji_qs.ji_jobid);
		log_err(-1, __func__, log_buffer);
		return (1);
	}
	r_size = strlen(r_buf) + 1;

	if (send_string_data(pipefd_write, pipefd_ack, r_buf, r_size) != 0) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "failed to send_string_data %s to parent mom", r_buf);
		log_err(-1, __func__, log_buffer);
		return (1);
	}

	/* now send schedselect */
	r_buf = get_jattr_str(pjob, JOB_ATR_SchedSelect);

	r_size = strlen(r_buf) + 1;
	if (send_string_data(pipefd_write, pipefd_ack, r_buf, r_size) != 0) {
		snprintf(log_buffer, sizeof(log_buffer),
			 "failed to send_string_data %s to parent mom", r_buf);
		log_err(-1, __func__, log_buffer);
		return (1);
	}

	/* clear the hook set flag since we've sent the update */
	(get_jattr(pjob, JOB_ATR_exec_vnode))->at_flags &= ~ATR_VFLAG_HOOK;
	if (exec_host2_hookset)
		(get_jattr(pjob, JOB_ATR_exec_host2))->at_flags &= ~ATR_VFLAG_HOOK;
	else
		(get_jattr(pjob, JOB_ATR_exec_host))->at_flags &= ~ATR_VFLAG_HOOK;
	(get_jattr(pjob, JOB_ATR_SchedSelect))->at_flags &= ~ATR_VFLAG_HOOK;

	if (pjob->ji_numnodes > 1) {
		/* get cmd_ack from parent that it received
		 * and acted upon the job updates from sis moms
		 */
		snprintf(log_buffer, sizeof(log_buffer), "waiting up to %d secs for job update acks from sister moms", PIPE_READ_TIMEOUT);
		log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
		r_buf = read_pipe_data(pipefd_status, sizeof(int), PIPE_READ_TIMEOUT);
		if (r_buf != NULL)
			memcpy(&cmd_ack, r_buf, sizeof(int));
		if ((r_buf == NULL) || (cmd_ack != IM_ALL_OKAY)) {
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "not all job updates to sister moms completed");
		}
	}
	return (0);
}

/**
 * @brief
 *	Get/set job pjob's exec_vnode, exec_host, schedselect from the job's
 *	3rd pipe, and communicatng to the server of the new exec_vnode,
 *	schedselect values (no need to send exec_host as server will just
 *	recreate it on its end). The sister moms whose vnodes have been
 *	out in the new exec_vnode would get an IM_DELETE_JOB2 request.
 * @param[in]	pjob - job whose exec_vnode/exec_host/schedselect  is being
 *			obtained.
 * @param[in]	msg - fill in with error message received if this function
 *			encounters a failure.
 * @param[in]	msg_size - size of 'msg'.
 *
 * @return int
 * @retval 0	- for success
 * @retval 1	- for non-success due to pipes failure
 * @retval 2	- for no data found
 * @retval -1	- for non-success due to internal error.
 */
int
get_new_exec_vnode_host_schedselect(job *pjob, char *msg, size_t msg_size)
{
	char *new_exec_vnode = NULL;
	char *new_exec_host = NULL;
	char *new_schedselect = NULL;
	char *r_buf;
	int rc = 0;

	/* get exec_vnode don't close pipes */
	r_buf = receive_string_data(pjob->ji_child2parent_job_update_pipe,
				    pjob->ji_parent2child_job_update_pipe);
	if (r_buf == NULL) {
		(void) snprintf(msg, msg_size, "failed to obtain new exec_vnode");
		return (1);
	}
	new_exec_vnode = strdup(r_buf);
	if (new_exec_vnode == NULL) {
		(void) snprintf(msg, msg_size, "%s: new exec_vnode strdup error", __func__);
		return (1);
	}

	/* get exec_host */
	r_buf = receive_string_data(pjob->ji_child2parent_job_update_pipe, pjob->ji_parent2child_job_update_pipe);
	if (r_buf == NULL) {
		(void) snprintf(msg, msg_size, "failed to obtain new exec_host size");
		free(new_exec_vnode);
		return (1);
	}

	new_exec_host = strdup(r_buf);
	if (new_exec_host == NULL) {
		(void) snprintf(msg, msg_size, "failed to strdup new exec_host");
		free(new_exec_vnode);
		return (1);
	}
	/* get schedselect */
	r_buf = receive_string_data(pjob->ji_child2parent_job_update_pipe, pjob->ji_parent2child_job_update_pipe);
	if (r_buf == NULL) {
		(void) snprintf(msg, msg_size, "failed to obtain new schedselect size");
		free(new_exec_vnode);
		free(new_exec_host);
		return (1);
	}

	new_schedselect = strdup(r_buf);
	if (new_schedselect == NULL) {
		(void) snprintf(msg, msg_size, "failed to strdup new schedselect");
		free(new_exec_vnode);
		free(new_exec_host);
		return (1);
	}

	/* set job's exec_vnode */
	snprintf(log_buffer, sizeof(log_buffer), "pruned from exec_vnode=%s",
		 get_jattr_str(pjob, JOB_ATR_exec_vnode));
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, log_buffer);
	snprintf(log_buffer, sizeof(log_buffer),
		 "pruned to exec_vnode=%s", new_exec_vnode);
	log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
		  pjob->ji_qs.ji_jobid, log_buffer);

	set_jattr_str_slim(pjob, JOB_ATR_exec_vnode, new_exec_vnode, NULL);

	(void) update_resources_list(pjob, ATTR_l,
				     JOB_ATR_resource, new_exec_vnode, INCR, 0,
				     JOB_ATR_resource_orig);

	if (is_jattr_set(pjob, JOB_ATR_exec_host2))
		set_jattr_str_slim(pjob, JOB_ATR_exec_host2, new_exec_host, NULL);
	else if (is_jattr_set(pjob, JOB_ATR_exec_host))
		set_jattr_str_slim(pjob, JOB_ATR_exec_host, new_exec_host, NULL);

	/* Send DELETE_JOB2 request to the sister moms not in
	 * 'new_peh', to kill the job on that sister and
	 * report resources_used info.
	 */
	(void) send_sisters_inner(pjob, IM_DELETE_JOB2, NULL, new_exec_host);

	set_jattr_str_slim(pjob, JOB_ATR_SchedSelect, new_schedselect, NULL);

	free(new_exec_vnode);
	free(new_exec_host);
	free(new_schedselect);

	if ((rc = job_nodes(pjob)) != 0) {
		snprintf(msg, msg_size, "failed updating internal nodes data (rc=%d)", rc);
		return (-1);
	}
	if (generate_pbs_nodefile(pjob, NULL, 0, msg, msg_size) != 0) {
		return (-1);
	}

	job_save(pjob);
	/* set modify flag on the job attributes that will be sent to the server */
	(get_jattr(pjob, JOB_ATR_exec_vnode))->at_flags |= ATR_VFLAG_MODIFY;
	(get_jattr(pjob, JOB_ATR_SchedSelect))->at_flags |= ATR_VFLAG_MODIFY;
	enqueue_update_for_send(pjob, IS_RESCUSED);

	return (0);
}

/**
 * @brief
 *	A task that will report failed node hosts due to
 *	unsuccessful execjob_prologue hook execution.
 *
 * @param[in] 	work_task -  task to process.
 *
 * @return none
 *
 */
static void
report_failed_node_hosts_task(struct work_task *ptask)
{
	job *pjob = (job *) ptask->wt_parm1;
	reliable_job_node *rjn, *rjn_next;

	if (pjob == NULL) {
		log_err(-1, __func__, "task structure contains reference to NULL job");
		return;
	}
	if (pjob->ji_report_task)
		pjob->ji_report_task = NULL;

	if (!check_job_state(pjob, JOB_STATE_LTR_RUNNING) ||
	    !check_job_substate(pjob, JOB_SUBSTATE_PRERUN))
		return; /* job not longer waiting for healthy moms */

	for (rjn = (reliable_job_node *) GET_NEXT(pjob->ji_node_list); rjn != NULL; rjn = rjn_next) {
		rjn_next = (reliable_job_node *) GET_NEXT(rjn->rjn_link);
		if (strcmp(rjn->rjn_host, mom_host) == 0)
			continue;

		if (!rjn->prologue_hook_success) {
			reliable_job_node_add(&pjob->ji_failed_node_list, rjn->rjn_host);
			if (pjob->ji_parent2child_moms_status_pipe != -1) {
				size_t r_size;
				r_size = strlen(rjn->rjn_host) + 1;
				if (write_pipe_data(pjob->ji_parent2child_moms_status_pipe, &r_size, sizeof(size_t)) == 0)
					(void) write_pipe_data(pjob->ji_parent2child_moms_status_pipe, rjn->rjn_host, r_size);
				else
					log_err(errno, __func__, "failed to write");
			}
			delete_link(&rjn->rjn_link);
			free(rjn);
		}
	}
}

/**
 * @brief
 * 	Receive a special request from the pipe represented by descriptor
 *	'sd'.
 * @param[in]	sd - connection descriptor
 *
 * @return none
 *
 */
static void
receive_pipe_request(int sd)
{
	conn_t *conn;
	int i;
	job *pjob = NULL;
	pbs_task *ptask;
	int cmd;
	char msg[LOG_BUF_SIZE];

	if ((conn = get_conn(sd)) == NULL) {
		log_err(PBSE_INTERNAL, __func__, "unable to find pipe");
		return;
	}

	ptask = (pbs_task *) conn->cn_data;
	if (ptask == NULL)
		return;

	pjob = ptask->ti_job;

	if (pjob == NULL) {
		log_err(PBSE_INTERNAL, __func__, "no job task associated with connection");
		return;
	}

	/* now we read the cmd or error */
	i = readpipe(pjob->ji_jsmpipe2, &cmd, sizeof(int));

	if (i != sizeof(int)) {
		return;
	}

	/* send back as an acknowledgement that MOM got it */
	(void) writepipe(pjob->ji_mjspipe2, &cmd, sizeof(int));

	if (cmd == IM_EXEC_PROLOGUE) {

		if (send_sisters(pjob, IM_EXEC_PROLOGUE, NULL) != pjob->ji_numnodes - 1) {
			snprintf(log_buffer, sizeof(log_buffer),
				 "warning: %s: IM_EXEC_PROLOGUE requests "
				 "could not reach some sister moms",
				 pjob->ji_qs.ji_jobid);
			log_err(-1, __func__, log_buffer);
		}

		if (do_tolerate_node_failures(pjob)) {
			long delay_value;
			/* execute report task 'delay_value' seconds from
			 * now, where the value is 95% of the job_launch_delay
			 * value. This allows the waiting child mom to
			 * capture the failed node host values before it times
			 * out on job_launch_delay.
			 */
			delay_value = 0.95 * job_launch_delay;
			pjob->ji_report_task = set_task(WORK_Timed, time_now + delay_value, report_failed_node_hosts_task, pjob);
		}
	} else {
		snprintf(msg, sizeof(msg), "ignoring unknown cmd %d", cmd);
		log_err(-1, __func__, msg);
	}
}

/**
 * @brief
 *	Close various pipes touched by a job update.
 *
 * @param[in] pjob - structure handle to job
 *
 * @return Void
 *
 */
void
close_update_pipes(job *pjob)
{
	if (pjob == NULL)
		return;

	(void) close_conn(pjob->ji_child2parent_job_update_pipe);
	pjob->ji_child2parent_job_update_pipe = -1;
	(void) close(pjob->ji_parent2child_job_update_pipe);
	pjob->ji_parent2child_job_update_pipe = -1;

	if (pjob->ji_jsmpipe2 != -1) {
		(void) close(pjob->ji_jsmpipe2);
		pjob->ji_jsmpipe2 = -1;
	}

	if (pjob->ji_mjspipe2 != -1) {
		(void) close(pjob->ji_mjspipe2);
		pjob->ji_mjspipe2 = -1;
	}

	(void) close_conn(pjob->ji_jsmpipe);
	pjob->ji_jsmpipe = -1;
	(void) close(pjob->ji_mjspipe);
	pjob->ji_mjspipe = -1;
}

/**
 * @brief
 * 	Receive a special request from the pipe represented by descriptor
 *	'sd'.
 * @param[in]	sd - connection descriptor
 *
 * @return none
 *
 */
static void
receive_job_update_request(int sd)
{
	conn_t *conn = NULL;
	int i;
	job *pjob = NULL;
	pbs_task *ptask;
	int cmd;
	char msg[LOG_BUF_SIZE];

	if ((conn = get_conn(sd)) == NULL) {
		log_err(PBSE_INTERNAL, __func__, "unable to find pipe");
		return;
	}

	ptask = (pbs_task *) conn->cn_data;

	if (ptask == NULL)
		return;

	pjob = ptask->ti_job;

	if (pjob == NULL) {
		log_err(PBSE_INTERNAL, __func__, "no job task associated with connection");
		return;
	}

	/* now we read the cmd or error */
	i = readpipe(pjob->ji_child2parent_job_update_pipe, &cmd, sizeof(int));

	if (i != sizeof(int)) {
		snprintf(msg, sizeof(msg),
			 "read of pipe for pid job %s got %d not %d: errno %s",
			 pjob->ji_qs.ji_jobid, i, (int) sizeof(int), strerror(errno));

		close_update_pipes(pjob);
		exec_bail(pjob, JOB_EXEC_RETRY, msg);
		return;
	}

	/* send back as an acknowledgement that MOM got it */
	(void) writepipe(pjob->ji_parent2child_job_update_pipe, &cmd, sizeof(int));

	if (cmd == IM_UPDATE_JOB) {
		mom_hook_input_t hook_input;
		mom_hook_output_t hook_output;
		char hook_msg[HOOK_MSG_SIZE + 1];
		int hook_errcode = 0;
		hook *last_phook;
		unsigned int hook_fail_action = 0;

		if (get_new_exec_vnode_host_schedselect(pjob, msg, LOG_BUF_SIZE) != 0) {
			close_update_pipes(pjob);
			exec_bail(pjob, JOB_EXEC_RETRY, msg);
			return;
		}

		mom_hook_input_init(&hook_input);
		hook_input.pjob = pjob;

		mom_hook_output_init(&hook_output);
		hook_output.reject_errcode = &hook_errcode;
		hook_output.last_phook = &last_phook;
		hook_output.fail_action = &hook_fail_action;
		if (mom_process_hooks(HOOK_EVENT_EXECJOB_RESIZE,
				      PBS_MOM_SERVICE_NAME, mom_host, &hook_input,
				      &hook_output,
				      hook_msg, sizeof(hook_msg), 1) == 0) {
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE, pjob->ji_qs.ji_jobid, "execjob_resize hook rejected request");
			close_update_pipes(pjob);
			exec_bail(pjob, JOB_EXEC_RETRY, hook_msg);
			return;
		}
		(void) send_sisters_job_update(pjob);
		pjob->ji_updated = 1;
	} else {
		snprintf(msg, sizeof(msg), "ignoring unknown cmd %d", cmd);
		log_err(-1, __func__, msg);
	}
}

/**
 *
 * @brief
 * 	Used by MOM superior to start the shell process for 'pjob'
 *
 * @param[in]	pjob - pointer to the job whose initial shell is
 *		being spawned.
 *
 * @return	Void
 *
 */
void
finish_exec(job *pjob)
{
	char **argv = NULL;
	char buf[(2 * MAXPATHLEN) + 5];
	pid_t cpid;
	struct passwd *pwdp; /* for uid, shell, home dir */
	int i, j, k;
	pbs_socklen_t len;
	int is_interactive = 0;
	int numthreads;
#if SHELL_INVOKE == 1
	int pipe_script[] = {-1, -1};
#endif
	char *pts_name; /* name of slave pty */
	char *shell;
	int jsmpipe[] = {-1, -1};		       /* job starter to MOM for sid */
	int jsmpipe2[] = {-1, -1};		       /* job starter to MOM */
	int child2parent_job_update_pipe[] = {-1, -1}; /* job starter to MOM */
	int child2parent_job_update_pipe_w = -1;
	int upfds = -1;				       /* init to invalid fd */
	int upfds2 = -1;			       /* init to invalid fd */
	int mjspipe[] = {-1, -1};		       /* MOM to job starter for ack */
	int mjspipe2[] = {-1, -1};		       /* MOM to job starter */
	int parent2child_job_update_pipe[] = {-1, -1}; /* MOM to job starter */
	int parent2child_job_update_pipe_r = -1;
	int parent2child_job_update_status_pipe[] = {-1, -1}; /* MOM to job starter */
	int parent2child_job_update_status_pipe_r = -1;	      /* init to invalid fd */
	int downfds = -1;				      /* init to invalid fd */
	int downfds2 = -1;				      /* init to invalid fd */
	int parent2child_moms_status_pipe[] = {-1, -1};	      /* MOM to job starter */
	int parent2child_moms_status_pipe_r = -1;	      /* init to invalid fd */
	int port_out, port_err;
	struct startjob_rtn sjr;
#if MOM_ALPS
	struct startjob_rtn ack;
#endif
	pbs_task *ptask;
	struct array_strings *vstrs;
	struct sockaddr_in saddr;
	int nodemux = 0;
	char *pbs_jobdir; /* staging and execution directory of this job */
	int sandbox_private = 0;
	int display_number = 0, n = 0;
	struct pfwdsock *socks = NULL;
#ifdef NAS /* localmod 020 */
	char *schedselect;
#endif /* localmod 020 */
	char hook_msg[HOOK_MSG_SIZE + 1];
	int hook_rc;
	int prolo_hooks = 0; /*# of runnable prologue hooks*/
	char *progname = NULL;
	pbs_list_head argv_list;
	char *the_progname;
	char **the_argv;
	char **the_env;
	char **res_env;
	hook *last_phook = NULL;
	unsigned int hook_fail_action = 0;
	int hook_errcode = 0;
	mom_hook_input_t hook_input;
	mom_hook_output_t hook_output;
	int job_has_executable;
	FILE *temp_stderr = stderr;
	vnl_t *vnl_fails = NULL;
	vnl_t *vnl_good = NULL;

	ptc = -1; /* No current master pty */

	memset(&sjr, 0, sizeof(sjr));
	if (is_jattr_set(pjob, JOB_ATR_nodemux))
		nodemux = get_jattr_long(pjob, JOB_ATR_nodemux);

	if ((i = job_setup(pjob, &pwdp)) != JOB_EXEC_OK) {
		exec_bail(pjob, i, NULL);
		return;
	}

	/* wait until after job_setup to call jobdirname(), we need the user's home info */
	pbs_jobdir = jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir);

	if ((is_jattr_set(pjob, JOB_ATR_sandbox)) &&
	    (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
		/* set local variable sandbox_private */
		sandbox_private = 1;
	}

	/* If job has been checkpointed, restart from the checkpoint image */

	if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) ||
	    (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ChkptMig)) {
		if ((i = local_restart(pjob, NULL)) != 0) {
			post_restart(pjob, i);
			exec_bail(pjob, (i == PBSE_CKPBSY) ? JOB_EXEC_RETRY : JOB_EXEC_FAIL2, NULL);
		}
		return;
	}

	if (pjob->ji_numnodes == 1 || nodemux) {
		port_out = -1;
		port_err = -1;
	} else {
		/*
		 ** Get port numbers from file decriptors in job struct.  The
		 ** sockets are stored there so they can be closed later as
		 ** Main MOM will not need them after the job is going.
		 */
		len = sizeof(saddr);
		if (getsockname(pjob->ji_stdout,
				(struct sockaddr *) &saddr, &len) == -1) {
			(void) sprintf(log_buffer, "getsockname on stdout");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}
		port_out = (int) ntohs(saddr.sin_port);

		len = sizeof(saddr);
		if (getsockname(pjob->ji_stderr,
				(struct sockaddr *) &saddr, &len) == -1) {
			(void) sprintf(log_buffer, "getsockname on stderr");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}
		port_err = (int) ntohs(saddr.sin_port);
	}

	if (is_jattr_set(pjob, JOB_ATR_interactive) && get_jattr_long(pjob, JOB_ATR_interactive) != 0) {

		is_interactive = 1;

		/*
		 * open a master pty, need to do it here before we fork,
		 * to save the slave name in the master's job structure
		 */

		if ((ptc = open_master(&pts_name)) < 0) {
			log_err(errno, __func__, "cannot open master pty");
			exec_bail(pjob, JOB_EXEC_RETRY, NULL);
			return;
		}
		FDMOVE(ptc)

		/* save pty name in job output/error file name */
		set_jattr_str_slim(pjob, JOB_ATR_outpath, pts_name, NULL);
		set_jattr_str_slim(pjob, JOB_ATR_errpath, pts_name, NULL);

#if SHELL_INVOKE == 1
	} else {
		/* need a pipe on which to write the shell script 	*/
		/* file name to the input of the shell			*/

		if (pipe(pipe_script) == -1) {
			(void) sprintf(log_buffer,
				       "Failed to create shell name pipe");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}
#endif /* SHELL_INVOKE */
	}

	/* create pipes between MOM and the job starter    */
	/* fork the job starter which will become the job */

	if ((pipe(mjspipe) == -1) || (pipe(jsmpipe) == -1)) {
		i = -1;

	} else {

		i = 0;

		/* make sure pipe file descriptors are above 2 */

		if (jsmpipe[1] < 3) {
			upfds = fcntl(jsmpipe[1], F_DUPFD, 3);
			(void) close(jsmpipe[1]);
			jsmpipe[1] = -1;
		} else {
			upfds = jsmpipe[1];
		}
		if (mjspipe[0] < 3) {
			downfds = fcntl(mjspipe[0], F_DUPFD, 3);
			(void) close(mjspipe[0]);
			mjspipe[0] = -1;
		} else {
			downfds = mjspipe[0];
		}
	}
	if ((i == -1) || (upfds < 3) || (downfds < 3)) {
		if (upfds != -1)
			(void) close(upfds);
		if (downfds != -1)
			(void) close(downfds);
		if (jsmpipe[0] != -1)
			(void) close(jsmpipe[0]);
		if (mjspipe[1] != -1)
			(void) close(mjspipe[1]);
		(void) sprintf(log_buffer, "Failed to create communication pipe");
		exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
		return;
	}
	if ((ptask = momtask_create(pjob)) == NULL) {
		if (upfds != -1)
			(void) close(upfds);
		if (downfds != -1)
			(void) close(downfds);
		if (jsmpipe[0] != -1)
			(void) close(jsmpipe[0]);
		if (mjspipe[1] != -1)
			(void) close(mjspipe[1]);
		(void) sprintf(log_buffer, "Task creation failed");
		exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
		return;
	}

	prolo_hooks = num_eligible_hooks(HOOK_EVENT_EXECJOB_PROLOGUE);

	/* create 2nd set of pipes between MOM and the job starter */
	/* if there are prologue hooks */
	if (prolo_hooks > 0) {
		if ((pipe(mjspipe2) == -1) || (pipe(jsmpipe2) == -1)) {
			i = -1;

		} else {

			i = 0;

			/* make sure pipe file descriptors are above 2 */

			if (jsmpipe2[1] < 3) {
				upfds2 = fcntl(jsmpipe2[1], F_DUPFD, 3);
				(void) close(jsmpipe2[1]);
				jsmpipe2[1] = -1;
			} else {
				upfds2 = jsmpipe2[1];
			}

			if (mjspipe2[0] < 3) {
				downfds2 = fcntl(mjspipe2[0], F_DUPFD, 3);
				(void) close(mjspipe2[0]);
				mjspipe2[0] = -1;
			} else {
				downfds2 = mjspipe2[0];
			}
		}
		if ((i == -1) || (upfds2 < 3) || (downfds2 < 3)) {
			if (upfds2 != -1)
				(void) close(upfds2);
			if (downfds2 != -1)
				(void) close(downfds2);
			if (jsmpipe2[0] != -1)
				(void) close(jsmpipe2[0]);
			if (mjspipe2[1] != -1)
				(void) close(mjspipe2[1]);
			(void) snprintf(log_buffer, sizeof(log_buffer),
					"Failed to create communication pipe");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}
	}

	if (do_tolerate_node_failures(pjob)) {
		/* create 3rd set of pipes between MOM and the job starter
		 * fork the job starter which will become the job
		 */

		if ((pipe(parent2child_job_update_pipe) == -1) || (pipe(child2parent_job_update_pipe) == -1)) {
			i = -1;
		} else {
			i = 0;
			/* make sure pipe file descriptors are above 2 */
			if (child2parent_job_update_pipe[1] < 3) {
				child2parent_job_update_pipe_w = fcntl(child2parent_job_update_pipe[1], F_DUPFD, 3);
				(void) close(child2parent_job_update_pipe[1]);
				child2parent_job_update_pipe[1] = -1;
			} else {
				child2parent_job_update_pipe_w = child2parent_job_update_pipe[1];
			}
			if (parent2child_job_update_pipe[0] < 3) {
				parent2child_job_update_pipe_r = fcntl(parent2child_job_update_pipe[0], F_DUPFD, 3);
				(void) close(parent2child_job_update_pipe[0]);
				parent2child_job_update_pipe[0] = -1;
			} else {
				parent2child_job_update_pipe_r = parent2child_job_update_pipe[0];
			}
		}
		if ((i == -1) || (child2parent_job_update_pipe_w < 3) || (parent2child_job_update_pipe_r < 3)) {
			if (child2parent_job_update_pipe_w != -1)
				(void) close(child2parent_job_update_pipe_w);
			if (parent2child_job_update_pipe_r != -1)
				(void) close(parent2child_job_update_pipe_r);
			if (child2parent_job_update_pipe[0] != -1)
				(void) close(child2parent_job_update_pipe[0]);
			if (parent2child_job_update_pipe[1] != -1)
				(void) close(parent2child_job_update_pipe[1]);
			(void) sprintf(log_buffer,
				       "Failed to create communication pipe");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}

		/* create 4th set of pipes between MOM and the job starter
		 * fork the job starter which will become the job
		 */

		if (pipe(parent2child_job_update_status_pipe) == -1) {
			i = -1;
		} else {
			i = 0;
			/* make sure pipe file descriptors are above 2 */
			if (parent2child_job_update_status_pipe[0] < 3) {
				parent2child_job_update_status_pipe_r = fcntl(parent2child_job_update_status_pipe[0], F_DUPFD, 3);
				(void) close(parent2child_job_update_status_pipe[0]);
				parent2child_job_update_status_pipe[0] = -1;
			} else {
				parent2child_job_update_status_pipe_r = parent2child_job_update_status_pipe[0];
			}
		}
		if ((i == -1) || (parent2child_job_update_status_pipe_r < 3)) {
			if (parent2child_job_update_status_pipe_r != -1)
				(void) close(parent2child_job_update_status_pipe_r);
			if (parent2child_job_update_status_pipe[1] != -1)
				(void) close(parent2child_job_update_status_pipe[1]);
			(void) sprintf(log_buffer,
				       "Failed to create communication pipe");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}

		if (pipe(parent2child_moms_status_pipe) == -1) {
			i = -1;

		} else {

			i = 0;

			/* make sure pipe file descriptors are above 2 */
			if (parent2child_moms_status_pipe[0] < 3) {
				parent2child_moms_status_pipe_r = fcntl(parent2child_moms_status_pipe[0], F_DUPFD, 3);
				(void) close(parent2child_moms_status_pipe[0]);
				parent2child_moms_status_pipe[0] = -1;
			} else {
				parent2child_moms_status_pipe_r = parent2child_moms_status_pipe[0];
			}
		}

		if ((i == -1) || (parent2child_moms_status_pipe_r < 3)) {
			if (parent2child_moms_status_pipe_r != -1)
				(void) close(parent2child_moms_status_pipe_r);
			if (parent2child_moms_status_pipe[1] != -1)
				(void) close(parent2child_moms_status_pipe[1]);
			(void) sprintf(log_buffer,
				       "Failed to create communication pipe");
			exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
			return;
		}
	}

	pjob->ji_qs.ji_stime = time_now;
	set_jattr_l_slim(pjob, JOB_ATR_stime, time_now, SET);
	pjob->ji_sampletim = time_now;

	/*
	 * Fork the child process that will become the job.
	 */
	cpid = fork_me(-1);
	if (cpid > 0) {
		conn_t *conn = NULL;

		/* the parent side, still the main man, uhh that is MOM */

		(void) close(upfds);
		(void) close(downfds);

		(void) close(upfds2);
		(void) close(downfds2);

		(void) close(child2parent_job_update_pipe_w);
		(void) close(parent2child_job_update_pipe_r);

		(void) close(parent2child_job_update_status_pipe_r);
		(void) close(parent2child_moms_status_pipe_r);

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		DIS_tcp_funcs();
#endif

		/* add the pipe to the connection table so we can poll it */

		if ((conn = add_conn(jsmpipe[0], ChildPipe, (pbs_net_t) 0,
				     (unsigned int) 0, NULL, record_finish_exec)) == NULL) {
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR,
				  pjob->ji_qs.ji_jobid,
				  "Unable to start job, communication connection table is full");
			(void) close(jsmpipe[0]);
			(void) close(mjspipe[1]);
			(void) close(jsmpipe2[0]);
			(void) close(mjspipe2[1]);

			(void) close(child2parent_job_update_pipe[0]);
			(void) close(parent2child_job_update_pipe[1]);

			(void) close(parent2child_job_update_status_pipe[1]);
			(void) close(parent2child_moms_status_pipe[1]);
#if SHELL_INVOKE == 1
			if (pipe_script[0] != -1)
				(void) close(pipe_script[0]);
			if (pipe_script[1] != -1)
				(void) close(pipe_script[1]);
#endif
			exec_bail(pjob, JOB_EXEC_RETRY, NULL);
			return;
		}
		conn->cn_data = ptask;
		pjob->ji_jsmpipe = jsmpipe[0];
		pjob->ji_mjspipe = mjspipe[1];
		pjob->ji_jsmpipe2 = jsmpipe2[0];
		pjob->ji_mjspipe2 = mjspipe2[1];

		/*
		 * at this point, parent mom writes to
		 * pjob->ji_mjspipe2, and parent reads from
		 * pjob->ji_jsmpipe2
		 */

		/*
		 * if there are prologue hooks to run
		 * add the pipe to the connection table so we can poll it
		 */
		if (prolo_hooks > 0) {
			if ((conn = add_conn(jsmpipe2[0], ChildPipe,
					     (pbs_net_t) 0, (unsigned int) 0, NULL,
					     receive_pipe_request)) == NULL) {
				log_event(PBSEVENT_ERROR,
					  PBS_EVENTCLASS_JOB, LOG_ERR,
					  pjob->ji_qs.ji_jobid,
					  "Unable t0 start job... communication "
					  "connection table is full");
				(void) close(jsmpipe2[0]);
				(void) close(mjspipe2[1]);

				(void) close(jsmpipe[0]);
				(void) close(mjspipe[1]);

				if (pipe_script[0] != -1)
					(void) close(pipe_script[0]);
				if (pipe_script[1] != -1)
					(void) close(pipe_script[1]);
				exec_bail(pjob, JOB_EXEC_RETRY, NULL);
				return;
			}
			conn->cn_data = ptask;
		}

		/*
		 * if there are prologue hooks to run
		 * add the pipe to the connection table so we can poll it
		 */
		if (do_tolerate_node_failures(pjob)) {

			if ((conn = add_conn(child2parent_job_update_pipe[0], ChildPipe,
					     (pbs_net_t) 0, (unsigned int) 0, NULL,
					     receive_job_update_request)) == NULL) {
				log_event(PBSEVENT_ERROR,
					  PBS_EVENTCLASS_JOB, LOG_ERR,
					  pjob->ji_qs.ji_jobid,
					  "Unable to start job, communication connection table is full");
				(void) close(child2parent_job_update_pipe[0]);
				(void) close(parent2child_job_update_pipe[1]);

				(void) close(jsmpipe2[0]);
				(void) close(mjspipe2[1]);

				(void) close(jsmpipe[0]);
				(void) close(mjspipe[1]);

				if (pipe_script[0] != -1)
					(void) close(pipe_script[0]);
				if (pipe_script[1] != -1)
					(void) close(pipe_script[1]);
				exec_bail(pjob, JOB_EXEC_RETRY, NULL);
				return;
			}
			conn->cn_data = ptask;

			pjob->ji_child2parent_job_update_pipe = child2parent_job_update_pipe[0];
			pjob->ji_parent2child_job_update_pipe = parent2child_job_update_pipe[1];

			pjob->ji_parent2child_job_update_status_pipe = parent2child_job_update_status_pipe[1];
			pjob->ji_parent2child_moms_status_pipe = parent2child_moms_status_pipe[1];
		}

		if (ptc >= 0) {
			(void) close(ptc);
			ptc = -1;
		}

#if SHELL_INVOKE == 1
		if (is_interactive == 0) {
			char *s;
			char *d;
			char holdbuf[(2 * MAXPATHLEN) + 5];
			int k;

			if (*pjob->ji_qs.ji_fileprefix != '\0')
				sprintf(buf, "%s%s%s", path_jobs,
					pjob->ji_qs.ji_fileprefix, JOB_SCRIPT_SUFFIX);
			else
				sprintf(buf, "%s%s%s", path_jobs,
					pjob->ji_qs.ji_jobid, JOB_SCRIPT_SUFFIX);

			if (chown(buf, pjob->ji_qs.ji_un.ji_momt.ji_exuid,
					pjob->ji_qs.ji_un.ji_momt.ji_exgid) == -1)
					log_errf(-1, __func__, "chown failed. ERR : %s",strerror(errno));

			/* add escape in front of brackets */
			for (s = buf, d = holdbuf; *s && ((d - holdbuf) < sizeof(holdbuf)); s++, d++) {
				if (*s == '[' || *s == ']')
					*d++ = '\\';
				*d = *s;
			}
			*d = '\0';
			snprintf(buf, sizeof(buf), "%s", holdbuf);
			DBPRT(("shell: %s\n", buf))

			/* pass name of shell script on pipe	*/
			/* will be stdin of shell 		*/

			(void) close(pipe_script[0]);

			/* if in "sandbox=PRIVATE" mode, prepend the script name on the pipe */
			/* with "cd $PBS_JOBDIR;" command */
			if (sandbox_private) {
				snprintf(buf, sizeof(buf), "cd %s;%.*s", pbs_jobdir,
					 (int) (sizeof(buf) - strlen(pbs_jobdir) - 5), holdbuf);
			}

			(void) strcat(buf, "\n"); /* setup above */
			i = strlen(buf);
			j = 0;
			while (j < i) {
				if ((k = write(pipe_script[1], buf + j, i - j)) < 0) {
					if (errno == EINTR)
						continue;
					break;
				}
				j += k;
			}
			(void) close(pipe_script[1]);
		}

		if (pjob->ji_numnodes > 1 && !nodemux) {
			/*
			 * Put port numbers into job struct and close sockets.
			 * The job uses them to talk to demux, but main MOM
			 * doesn't need them.   The port numbers are stored
			 * here for use in start_process(), to connect to
			 * pbs_demux.
			 */
			(void) close(pjob->ji_stdout);
			pjob->ji_stdout = port_out;
			(void) close(pjob->ji_stderr);
			pjob->ji_stderr = port_err;
		}

		/* record job working directory in jobdir attribute */
		set_jattr_str_slim(pjob, JOB_ATR_jobdir, sandbox_private ? pbs_jobdir : pwdp->pw_dir, NULL);
#endif /* SHELL_INVOKE */

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
		if (is_jattr_set(pjob, JOB_ATR_cred_id))
			send_cred_sisters(pjob);
#endif

		return;

	} else if (cpid < 0) {
#if SHELL_INVOKE == 1
		if (pipe_script[0] != -1)
			(void) close(pipe_script[0]);
		if (pipe_script[1] != -1)
			(void) close(pipe_script[1]);
#endif /* SHELL_INVOKE */
		if (upfds != -1)
			(void) close(upfds);
		if (downfds != -1)
			(void) close(downfds);
		if (jsmpipe[0] != -1)
			(void) close(jsmpipe[0]);
		if (mjspipe[1] != -1)
			(void) close(mjspipe[1]);
		if (upfds2 != -1)
			(void) close(upfds2);
		if (downfds2 != -1)
			(void) close(downfds2);
		if (child2parent_job_update_pipe_w != -1)
			(void) close(child2parent_job_update_pipe_w);
		if (parent2child_job_update_pipe_r != -1)
			(void) close(parent2child_job_update_pipe_r);
		if (parent2child_job_update_status_pipe_r != -1)
			(void) close(parent2child_job_update_status_pipe_r);
		if (parent2child_moms_status_pipe_r != -1)
			(void) close(parent2child_moms_status_pipe_r);

		if (jsmpipe2[0] != -1)
			(void) close(jsmpipe2[0]);
		if (mjspipe2[1] != -1)
			(void) close(mjspipe2[1]);
		if (child2parent_job_update_pipe[0] != -1)
			(void) close(child2parent_job_update_pipe[0]);
		if (parent2child_job_update_pipe[1] != -1)
			(void) close(parent2child_job_update_pipe[1]);
		if (parent2child_job_update_status_pipe[1] != -1)
			(void) close(parent2child_job_update_status_pipe[1]);
		if (parent2child_moms_status_pipe[1] != -1)
			(void) close(parent2child_moms_status_pipe[1]);

		(void) sprintf(log_buffer, "Fork failed in %s: %d",
			       __func__, errno);
		exec_bail(pjob, JOB_EXEC_RETRY, log_buffer);
		return;
	}
	/************************************************/
	/*						*/
	/* The child process - will become THE JOB	*/
	/*						*/
	/************************************************/

	if (jsmpipe[0] != -1)
		(void) close(jsmpipe[0]);

	if (mjspipe[1] != -1)
		(void) close(mjspipe[1]);

	if (jsmpipe2[0] != -1)
		(void) close(jsmpipe2[0]);

	if (mjspipe2[1] != -1)
		(void) close(mjspipe2[1]);

	if (child2parent_job_update_pipe[0] != -1)
		(void) close(child2parent_job_update_pipe[0]);

	if (parent2child_job_update_pipe[1] != -1)
		(void) close(parent2child_job_update_pipe[1]);

	if (parent2child_job_update_status_pipe[1] != -1)
		(void) close(parent2child_job_update_status_pipe[1]);

	if (parent2child_moms_status_pipe[1] != -1)
		(void) close(parent2child_moms_status_pipe[1]);

	CLR_SJR(sjr) /* clear structure used to return info to parent */

	/* unprotect the job from the vagaries of the kernel */
	daemon_protect(0, PBS_DAEMON_PROTECT_OFF);

	/* set system core limit */
#if defined(RLIM64_INFINITY)
	(void) setrlimit64(RLIMIT_CORE, &orig_core_limit);
#else  /* set rlimit 32 bit */
	(void) setrlimit(RLIMIT_CORE, &orig_core_limit);
#endif /* RLIM64_INFINITY */

	/*
	 * find which shell to use, one specified or the login shell
	 */
	shell = set_shell(pjob, pwdp); /* in the machine dependent section */

	prolo_hooks = num_eligible_hooks(HOOK_EVENT_EXECJOB_PROLOGUE);

	/*
	 * set up the Environmental Variables to be given to the job
	 */
	vstrs = get_jattr_arst(pjob, JOB_ATR_variables);
	pjob->ji_env.v_ensize = vstrs->as_usedptr + num_var_else + num_var_env +
				EXTRA_ENV_PTRS;
	pjob->ji_env.v_used = 0;
	pjob->ji_env.v_envp = (char **) calloc(pjob->ji_env.v_ensize, sizeof(char *));
	if (pjob->ji_env.v_envp == NULL) {
		log_err(ENOMEM, __func__, "out of memory");
		starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
	}

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	if (cred_by_job(ptask->ti_job, CRED_RENEWAL) != PBS_KRB5_OK) {
		starter_return(upfds, downfds, JOB_EXEC_FAIL_KRB5, &sjr);
	}

#if defined(HAVE_LIBKAFS) || defined(HAVE_LIBKOPENAFS)
	if (start_afslog(ptask, NULL, pipe_script[0], pipe_script[1]) != PBS_KRB5_OK) {
		sprintf(log_buffer, "afslog for task %8.8X not started",
			ptask->ti_qs.ti_task);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, log_buffer);
	}
#endif
#endif

	/*  First variables from the local environment */

	for (j = 0; j < num_var_env; ++j)
		bld_env_variables(&(pjob->ji_env), environ[j], NULL);

	/* Second, the variables passed with the job.  They may */
	/* be overwritten with new correct values for this job	*/

	for (j = 0; j < vstrs->as_usedptr; ++j) {
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
			/* never set KRB5CCNAME; it would rewrite the correct value */
			if (strncmp(vstrs->as_string[j], "KRB5CCNAME", strlen("KRB5CCNAME")) == 0)
				continue;
#endif
		bld_env_variables(&(pjob->ji_env), vstrs->as_string[j], NULL);
	}

	/* .. Next the critical variables: home, path, logname, ... */
	/* these may replace some passed in with the job	    */

	/* HOME */
	bld_env_variables(&(pjob->ji_env), variables_else[0], pwdp->pw_dir); /* HOME */

	/* LOGNAME */
	bld_env_variables(&(pjob->ji_env), variables_else[1], pwdp->pw_name);

	/* PBS_JOBNAME */
	bld_env_variables(&(pjob->ji_env), variables_else[2], get_jattr_str(pjob, JOB_ATR_jobname));

	/* PBS_JOBID */
	bld_env_variables(&(pjob->ji_env), variables_else[3], pjob->ji_qs.ji_jobid);

	/* PBS_QUEUE */
	bld_env_variables(&(pjob->ji_env), variables_else[4], get_jattr_str(pjob, JOB_ATR_in_queue));

	/* SHELL */
	bld_env_variables(&(pjob->ji_env), variables_else[5], shell);

	/* USER, for compatability */
	bld_env_variables(&(pjob->ji_env), variables_else[6], pwdp->pw_name);

	/* PBS_JOBCOOKIE */
	bld_env_variables(&(pjob->ji_env), variables_else[7], get_jattr_str(pjob, JOB_ATR_Cookie));

	/* PBS_NODENUM */
	sprintf(buf, "%d", pjob->ji_nodeid);
	bld_env_variables(&(pjob->ji_env), variables_else[8], buf);

	/* PBS_TASKNUM */
	sprintf(buf, "%u", ptask->ti_qs.ti_task);
	bld_env_variables(&(pjob->ji_env), variables_else[9], buf);

	/* PBS_MOMPORT */
	sprintf(buf, "%u", pbs_rm_port);
	bld_env_variables(&(pjob->ji_env), variables_else[10], buf);

	/* OMP_NUM_THREADS and NCPUS eq to number of cpus */

	numthreads = pjob->ji_vnods[0].vn_threads;
	sprintf(buf, "%d", numthreads);
#ifdef NAS /* localmod 020 */
	/*
	 * If ompthreads specified, use it to set OMP_NUM_THREADS, else
	 * set OMP_NUM_THREADS=1
	 * (Cannot just leave it unset because then the MKL sparse solvers
	 * use every CPU in the system.)
	 */
	schedselect = get_jattr_str(pjob, JOB_ATR_SchedSelect);
	if (schedselect && strstr(schedselect, OMPTHREADS) != NULL)
		bld_env_variables(&(pjob->ji_env), variables_else[12], buf);
	else
		bld_env_variables(&(pjob->ji_env), variables_else[12], "1");
#else
	bld_env_variables(&(pjob->ji_env), variables_else[12], buf);
#endif /* localmod 020 */
	bld_env_variables(&(pjob->ji_env), "NCPUS", buf);

	/* PBS_NODEFILE */

	if (generate_pbs_nodefile(pjob, buf, sizeof(buf) - 1, log_buffer, LOG_BUF_SIZE - 1) == 0)
		bld_env_variables(&(pjob->ji_env), variables_else[11], buf);
	else {
		log_err(errno, __func__, log_buffer);
		starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
	}

	/* PBS_ACCOUNT */
	if (is_jattr_set(pjob, JOB_ATR_account))
		bld_env_variables(&(pjob->ji_env), variables_else[13], get_jattr_str(pjob, JOB_ATR_account));

	/* If an Sub job of an Array job, put in the index */

	if (strchr(pjob->ji_qs.ji_jobid, (int) '[') != NULL) {
		char *pparent;
		char *pindex;

		get_index_and_parent(pjob->ji_qs.ji_jobid, &pparent, &pindex);
		bld_env_variables(&(pjob->ji_env), variables_else[14], pindex);
		bld_env_variables(&(pjob->ji_env), variables_else[15], pparent);
	}

	/* if user specified umask for job, set it */
	if (is_jattr_set(pjob, JOB_ATR_umask)) {
		sprintf(buf, "%ld", get_jattr_long(pjob, JOB_ATR_umask));
		sscanf(buf, "%o", &j);
		umask(j);
	} else
		umask(077);

		/* Add TMPDIR to environment */
#ifdef NAS /* localmod 010 */
	(void) NAS_tmpdirname(pjob);
#endif /* localmod 010 */
	j = mktmpdir(pjob->ji_qs.ji_jobid,
		     pjob->ji_qs.ji_un.ji_momt.ji_exuid,
		     pjob->ji_qs.ji_un.ji_momt.ji_exgid,
		     &(pjob->ji_env));
	if (j != 0)
		starter_return(upfds, downfds, j, &sjr);

	/* set PBS_JOBDIR */
	if (sandbox_private) {
		/* Add PBS_JOBDIR if it doesn't already exist */
		j = mkjobdir(pjob->ji_qs.ji_jobid,
			     pbs_jobdir,
			     pjob->ji_qs.ji_un.ji_momt.ji_exuid,
			     pjob->ji_qs.ji_un.ji_momt.ji_exgid);
		if (j != 0) {
			sprintf(log_buffer, "unable to create the job directory %s",
				pbs_jobdir);
			log_joberr(errno, __func__, log_buffer, pjob->ji_qs.ji_jobid);
			starter_return(upfds, downfds, j, &sjr); /* exits */
		}
		bld_env_variables(&(pjob->ji_env), "PBS_JOBDIR", pbs_jobdir);
	} else {
		bld_env_variables(&(pjob->ji_env), "PBS_JOBDIR", pwdp->pw_dir);
	}

	mom_unnice();

	if (is_interactive) {
		struct sigaction act;
		char *termtype;
		char *phost;
		int qsub_sock;
		int old_qsub_sock;
		int pts; /* fd for slave pty */

		/*************************************************************************/
		/*		We have an "interactive" job, connect the standard	 */
		/*		streams to a socket connected to qsub.			 */
		/*************************************************************************/

		sigemptyset(&act.sa_mask);
#ifdef SA_INTERRUPT
		act.sa_flags = SA_INTERRUPT;
#else
		act.sa_flags = 0;
#endif /* SA_INTERRUPT */
		act.sa_handler = no_hang;
		(void) sigaction(SIGALRM, &act, NULL);
		alarm(30);

		/* Set environment to reflect interactive */

		bld_env_variables(&(pjob->ji_env), "PBS_ENVIRONMENT", "PBS_INTERACTIVE");

		/* get host where qsub resides */

		phost = arst_string("PBS_O_HOST", get_jattr(pjob, JOB_ATR_variables));
		if ((phost == NULL) ||
		    ((phost = strchr(phost, (int) '=')) == NULL)) {
			log_joberr(-1, __func__, "PBS_O_HOST not set",
				   pjob->ji_qs.ji_jobid);
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
		}

		qsub_sock = conn_qsub(phost + 1, get_jattr_long(pjob, JOB_ATR_interactive));
		if (qsub_sock < 0) {
			sprintf(log_buffer, "cannot open qsub sock for %s",
				pjob->ji_qs.ji_jobid);
			log_err(errno, __func__, log_buffer);
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
		}

		old_qsub_sock = qsub_sock;
		FDMOVE(qsub_sock);

		if (get_jattr_str(pjob, JOB_ATR_X11_cookie)) {
			char display[X_DISPLAY_LEN];

			if ((socks = calloc(sizeof(struct pfwdsock), NUM_SOCKS)) == NULL) {
				/* FAILURE - cannot alloc memory */
				log_err(errno, __func__, "ERROR: could not calloc!\n");
				starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
			}
			display_number = init_x11_display(socks, 1, /* use localhost only */
							  display, pjob->ji_grpcache->gc_homedir,
							  get_jattr_str(pjob, JOB_ATR_X11_cookie));

			if (display_number >= 0) {
				bld_env_variables(&(pjob->ji_env), "DISPLAY", display);
			} else {
				log_err(errno, __func__, "PBS: X11 forwarding init failed\n");
				starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
			}
		}

		if (qsub_sock != old_qsub_sock) {

			if (CS_remap_ctx(old_qsub_sock, qsub_sock) != CS_SUCCESS) {

				(void) CS_close_socket(old_qsub_sock);
				starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
			}
		}

		/* send job id as validation to qsub */

		if (CS_write(qsub_sock, pjob->ji_qs.ji_jobid, PBS_MAXSVRJOBID + 1) !=
		    PBS_MAXSVRJOBID + 1) {
			log_err(errno, __func__, "cannot write jobid");
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
		}

		/* receive terminal type and window size */

		if ((termtype = rcvttype(qsub_sock)) == NULL)
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);

		bld_env_variables(&(pjob->ji_env), termtype, NULL);

		if (rcvwinsize(qsub_sock) == -1)
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);

		/* turn off alarm set around qsub connect activities */

		alarm(0);
		act.sa_handler = SIG_DFL;
		act.sa_flags = 0;
		(void) sigaction(SIGALRM, &act, NULL);

		/* set up the Job session */

		j = set_job(pjob, &sjr);
		if (j < 0) {
			if (j == -1) {
				/* set_job didn't leave message in log_buffer */
				(void) strcpy(log_buffer, "Unable to set session");
			}
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
				  LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
		}
#if MOM_ALPS
		sjr.sj_code = JOB_EXEC_UPDATE_ALPS_RESV_ID;
		(void) writepipe(upfds, &sjr, sizeof(sjr));

		/* wait for acknowledgement */
		(void) readpipe(downfds, &ack, sizeof(ack));
#endif

		/* Open the slave pty as the controlling tty */

		if ((pts = open_pty(pjob)) < 0) {
			log_err(errno, __func__, "cannot open slave");
			starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
		}

		act.sa_handler = SIG_IGN; /* setup to ignore SIGTERM */

		writerpid = fork();
		if (writerpid == 0) {
			/* child is "writer" process */

			(void) sigaction(SIGTERM, &act, NULL);

			(void) close(upfds);
			(void) close(downfds);
			(void) close(upfds2);
			(void) close(downfds2);
			(void) close(child2parent_job_update_pipe_w);
			(void) close(parent2child_job_update_pipe_r);
			(void) close(parent2child_job_update_status_pipe_r);
			(void) close(parent2child_moms_status_pipe_r);
			(void) close(pts);
			/*Closing the inherited post forwarded listening socket  */
			if (get_jattr_str(pjob, JOB_ATR_X11_cookie)) {
				for (n = 0; n < NUM_SOCKS; n++) {
					if (socks[n].active)
						close(socks[n].sock);
				}
			}

			int res = mom_writer(qsub_sock, ptc);
			/* Inside mom_writer, if read is successful and write fails then it is an error and hence logging here as error for -1 */
			if (res == -1)
				log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR, pjob->ji_qs.ji_jobid, "CS_write failed with errno %d", errno);
			else if (res == -2)
				log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "read failed with errno %d", errno);

			shutdown(qsub_sock, 2);
			exit(0);

		} else if (writerpid > 0) {
			/*
			 ** parent -- it first runs the prolog then forks
			 ** again.  the child becomes the job while the
			 ** parent becomes the reader.
			 */

			(void) close(1);
			(void) close(2);
			(void) dup2(pts, 1);
			(void) dup2(pts, 2);
			fflush(stdout);
			fflush(stderr);
			set_termcc(pts);	/* set terminal control char */
			(void) setwinsize(pts); /* set window size to qsub's */
			if (do_tolerate_node_failures(pjob) && (get_failed_moms_and_vnodes(pjob, parent2child_moms_status_pipe_r, -1, &vnl_fails, &vnl_good, 1) != 0)) {
				FREE_VNLS(vnl_fails, vnl_good);
				starter_return(upfds, downfds,
					       JOB_EXEC_RETRY, &sjr);
			}

			/* run prolog */
			if (prolo_hooks > 0) {

				mom_hook_input_init(&hook_input);
				hook_input.pjob = pjob;
				if (do_tolerate_node_failures(pjob)) {
					hook_input.vnl_fail = (vnl_t *) vnl_fails;
					hook_input.failed_mom_list = &pjob->ji_failed_node_list;
					hook_input.succeeded_mom_list = &pjob->ji_node_list;
				}

				mom_hook_output_init(&hook_output);
				hook_output.reject_errcode = &hook_errcode;
				hook_output.last_phook = &last_phook;
				hook_output.fail_action = &hook_fail_action;

				hook_rc =
					mom_process_hooks(HOOK_EVENT_EXECJOB_PROLOGUE,
							  PBS_MOM_SERVICE_NAME,
							  mom_host, &hook_input, &hook_output,
							  hook_msg, sizeof(hook_msg), 0);
			} else { /* no runnable hooks */
				/* don't execute any prologue hook */
				/* as no prologue hooks are runnable */
				hook_rc = 2;
			}

			switch (hook_rc) {

				case 0: /* explicit reject */
					if (hook_errcode == PBSE_HOOK_REJECT_DELETEJOB) {
						starter_return(upfds, downfds,
							       JOB_EXEC_FAILHOOK_DELETE, &sjr);
					} else if (hook_errcode == PBSE_HOOKERROR) {
						starter_return(upfds, downfds,
							       JOB_EXEC_HOOKERROR, &sjr);
					} else {
						/* rerun is the default in prologue */
						starter_return(upfds, downfds,
							       JOB_EXEC_FAILHOOK_RERUN, &sjr);
					}
					return;
				case 1: /* explicit accept */
					if (send_pipe_request(upfds2, downfds2, IM_EXEC_PROLOGUE) != 0) {
						log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
							  LOG_INFO, pjob->ji_qs.ji_jobid,
							  "warning: send of IM_EXEC_PROLOGUE to parent mom failed");
					}
					if (do_tolerate_node_failures(pjob))
						send_update_job(pjob, child2parent_job_update_pipe_w, parent2child_job_update_pipe_r, parent2child_job_update_status_pipe_r);
					break;
				case 2:
					/* no hook script executed - execute old-style prologue */
					if (run_pelog(PE_PROLOGUE,
						      path_prolog, pjob,
						      PE_IO_TYPE_ASIS) != 0) {
						(void) fprintf(stderr,
							       "Could not run prolog: %s\n",
							       log_buffer);
						starter_return(upfds, downfds,
							       JOB_EXEC_FAIL2, &sjr);
					}
					break;
				default:
					log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
						  LOG_INFO, "",
						  "prologue hook event: accept req by default");
					if (send_pipe_request(upfds2, downfds2, IM_EXEC_PROLOGUE) != 0) {
						log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
							  LOG_INFO, pjob->ji_qs.ji_jobid,
							  "warning: send of IM_EXEC_PROLOGUE to parent mom failed");
					}
					if (do_tolerate_node_failures(pjob))
						send_update_job(pjob, child2parent_job_update_pipe_w, parent2child_job_update_pipe_r, parent2child_job_update_status_pipe_r);
			}

			shellpid = fork();
			if (shellpid == 0) {

				/*********************************************/
				/* child - this will be the interactive job  */
				/* i/o is to slave tty			     */
				/*********************************************/

				(void) close(0);
				(void) dup2(pts, 0);
				fflush(stdin);

				(void) close(ptc); /* close master side */
				ptc = -1;
				(void) close(pts); /* dup'ed above */
				(void) close(qsub_sock);

				/* continue setting up and exec-ing shell */

			} else {
				if (shellpid > 0) {
					/* fork, parent is "reader" process  */
					(void) sigaction(SIGTERM, &act, NULL);

					if (pts != -1)
						(void) close(pts);
					if (upfds != -1)
						(void) close(upfds);
					if (downfds != -1)
						(void) close(downfds);
					if (upfds2 != -1)
						(void) close(upfds2);
					if (downfds2 != -1)
						(void) close(downfds2);
					if (child2parent_job_update_pipe_w != -1)
						(void) close(child2parent_job_update_pipe_w);
					if (parent2child_job_update_pipe_r != -1)
						(void) close(parent2child_job_update_pipe_r);
					if (parent2child_job_update_status_pipe_r != -1)
						(void) close(parent2child_job_update_status_pipe_r);
					if (parent2child_moms_status_pipe_r != -1)
						(void) close(parent2child_moms_status_pipe_r);
					(void) close(1);
					(void) close(2);

					sigemptyset(&act.sa_mask);
					act.sa_flags = SA_NOCLDSTOP;
					act.sa_handler = catchinter;
					(void) sigaction(SIGCHLD, &act,
							 NULL);

					mom_reader_go = 1;
					/* prepare shell command "cd $PBS_JOBDIR" if in sandbox=PRIVATE mode */
					if (sandbox_private) {
						sprintf(buf, "cd %s\n", pbs_jobdir);
					} else {
						buf[0] = '\0';
					}
					if ((is_interactive == TRUE) &&
					    get_jattr_str(pjob, JOB_ATR_X11_cookie)) {
						if (sandbox_private) {
							/* Change to $PBS_JOBDIR before
							 blocking waiting for data */
							if (setcurrentworkdir(buf)) {
								log_err(errno, __func__,
									"Setting Private Sandbox directory Failed");
								starter_return(upfds, downfds,
									       JOB_EXEC_FAIL2, &sjr);
							}
						}
						port_forwarder(socks, conn_qsub, phost + 1,
							       get_jattr_long(pjob, JOB_ATR_X11_port),
							       qsub_sock, mom_reader_Xjob,
							       log_mom_portfw_msg);
					} else {
						int res = mom_reader(qsub_sock, ptc, buf);
						/* Inside mom_reader, if read is successful and write fails then it is an error and hence logging here as error for -1 */
						if (res == -1)
							log_eventf(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR, pjob->ji_qs.ji_jobid, "Write failed with errno %d", errno);
						else if (res == -2)
							log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "CS_read failed with errno %d", errno);
					}
				} else {
					log_err(errno, __func__,
						"cant fork reader");
				}

				/* make sure qsub gets EOF */

				shutdown(qsub_sock, 2);

				/* change pty back to available after */
				/* job is done */
				if (chmod(pts_name, 0666) == -1) 
					log_errf(-1, __func__, "chmod failed. ERR : %s",strerror(errno));			
				if (chown(pts_name, 0, 0) == -1) 
					log_errf(-1, __func__, "chown failed. ERR : %s",strerror(errno));		
				exit(0);
			}
		} else { /* error */
			log_err(errno, __func__, "cannot fork nanny");

			/* change pty back to available */
			if (chmod(pts_name, 0666) == -1) 
				log_errf(-1, __func__, "chmod failed. ERR : %s",strerror(errno));		
			if (chown(pts_name, 0, 0) == -1) 
				log_errf(-1, __func__, "chown failed. ERR : %s",strerror(errno));			

			starter_return(upfds, downfds, JOB_EXEC_RETRY, &sjr);
		}

	} else {

		/*************************************************************************/
		/*		We have a "normal" batch job, connect the standard	 */
		/*		streams to files					 */
		/*************************************************************************/

		/* set Environment to reflect batch */

		bld_env_variables(&(pjob->ji_env), "PBS_ENVIRONMENT", "PBS_BATCH");
		bld_env_variables(&(pjob->ji_env), "ENVIRONMENT", "BATCH");

#if SHELL_INVOKE == 1
		/* if passing script file name as input to shell */

		(void) close(pipe_script[1]);
		script_in = pipe_script[0];
#else  /* SHELL_INVOKE == 0 */
		/* if passing script itself as input to shell */

		(void) strcpy(buf, path_jobs);
		if (*pjob->ji_qs.ji_fileprefix != '\0')
			(void) strcat(buf, pjob->ji_qs.ji_fileprefix);
		else
			(void) strcat(buf, pjob->ji_qs.ji_jobid);
		(void) strcat(buf, JOB_SCRIPT_SUFFIX);
		if ((script_in = open(buf, O_RDONLY, 0)) < 0) {
			if (errno == ENOENT)
				script_in = open("/dev/null", O_RDONLY, 0);
		}
#endif /* SHELL_INVOKE */
		if (!is_jattr_set(pjob, JOB_ATR_executable)) {
			/*
			 * user has passed executable and argument list as
			 * as command-line options to qsub (i.e after -- flag
			 * so, no need to check for script file)
			 */
			if (script_in < 0) {
				log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR,
					  pjob->ji_qs.ji_jobid,
					  "Unable to open script");
				starter_return(upfds, downfds, JOB_EXEC_FAIL1, &sjr);
			}
			FDMOVE(script_in); /* make sure descriptor > 2       */
			if (script_in != 0) {
				close(0);
				if (dup(script_in) == -1) 
					log_errf(-1, __func__, "dup failed. ERR : %s",strerror(errno));			
				close(script_in);
			}
		}

		if (open_std_out_err(pjob) == -1) {
			starter_return(upfds, downfds, JOB_EXEC_RETRY, &sjr);
		}

		/* After the error is redirected, stderr does not have a valid FILE* */
		temp_stderr = fdopen(STDERR_FILENO, "w");
		/* If we could not get the valid FILE*, let temp_stderr point to stderr to avoid
		 * a possible crash in subsequent calls to output functions like printf/fprintf */
		if (!temp_stderr)
			temp_stderr = stderr;
		/* set up the Job session */

		j = set_job(pjob, &sjr);
		if (j < 0) {
			if (j == -1) {
				/* set_job didn't leave message in log_buffer */
				(void) strcpy(log_buffer, "Unable to set session");
			}
			/* set_job leaves message in log_buffer */
			(void) fprintf(temp_stderr, "%s\n", log_buffer);

			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
				  pjob->ji_qs.ji_jobid, log_buffer);

			if (j == -3)
				j = JOB_EXEC_FAIL2;
			else
				j = JOB_EXEC_RETRY;
			starter_return(upfds, downfds, j, &sjr);
		}
		if (do_tolerate_node_failures(pjob) &&
		    (get_failed_moms_and_vnodes(pjob, downfds2, -1, &vnl_fails, &vnl_good, 1) != 0)) {
			FREE_VNLS(vnl_fails, vnl_good);
			starter_return(upfds, downfds, JOB_EXEC_RETRY, &sjr);
		}
		/* run prologue hooks */

		if (prolo_hooks > 0) {
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;
			if (do_tolerate_node_failures(pjob)) {
				hook_input.vnl_fail = (vnl_t *) vnl_fails;
				hook_input.failed_mom_list = &pjob->ji_failed_node_list;
				hook_input.succeeded_mom_list = &pjob->ji_node_list;
			}

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			hook_rc =
				mom_process_hooks(HOOK_EVENT_EXECJOB_PROLOGUE,
						  PBS_MOM_SERVICE_NAME,
						  mom_host, &hook_input, &hook_output,
						  hook_msg, sizeof(hook_msg), 0);
		} else { /* no runnable hooks */
			/* don't execute any prologue hook */
			/* as no prologue hooks are runnable */
			hook_rc = 2;
		}

		switch (hook_rc) {

			case 0: /* explicit reject */
				if (hook_errcode == PBSE_HOOK_REJECT_DELETEJOB) {
					starter_return(upfds, downfds,
						       JOB_EXEC_FAILHOOK_DELETE, &sjr);
				} else if (hook_errcode == PBSE_HOOKERROR) {
					starter_return(upfds, downfds,
						       JOB_EXEC_HOOKERROR, &sjr);
				} else { /* rerun is the default */
					starter_return(upfds, downfds,
						       JOB_EXEC_FAILHOOK_RERUN, &sjr);
				}
			case 1: /* explicit accept */
				if (send_pipe_request(upfds2, downfds2, IM_EXEC_PROLOGUE) != 0) {
					log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
						  LOG_INFO, pjob->ji_qs.ji_jobid,
						  "warning: send of IM_EXEC_PROLOGUE to parent mom failed");
				}
				if (do_tolerate_node_failures(pjob))
					send_update_job(pjob, child2parent_job_update_pipe_w, parent2child_job_update_pipe_r, parent2child_job_update_status_pipe_r);
				break;
			case 2:
				/* no hook script executed - execute old-style prologue */
				if ((j = run_pelog(PE_PROLOGUE,
						   path_prolog, pjob, PE_IO_TYPE_ASIS)) == 1) {
					/* abort job */
					(void) fprintf(temp_stderr,
						       "Could not run prolog: %s\n", log_buffer);
					starter_return(upfds, downfds, JOB_EXEC_FAIL2,
						       &sjr);
				} else if (j != 0) {
					/* requeue job */
					starter_return(upfds, downfds, JOB_EXEC_RETRY,
						       &sjr);
				}
				break;
			default:
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "prologue hook event: accept req by default");
				if (send_pipe_request(upfds2, downfds2, IM_EXEC_PROLOGUE) != 0) {
					log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_JOB,
						  LOG_INFO, pjob->ji_qs.ji_jobid,
						  "warning: send of IM_EXEC_PROLOGUE to parent mom failed");
				}
				if (do_tolerate_node_failures(pjob))
					send_update_job(pjob, child2parent_job_update_pipe_w, parent2child_job_update_pipe_r, parent2child_job_update_status_pipe_r);
		}
	}

	/*************************************************************************/
	/*	Set resource limits				 		 */
	/*	Both normal batch and interactive job come through here 	 */
	/*************************************************************************/

	set_jattr_l_slim(pjob, JOB_ATR_session_id, sjr.sj_session, SET);
	if (site_job_setup(pjob) != 0) {
		starter_return(upfds, downfds,
			       JOB_EXEC_FAIL2, &sjr); /* exits */
	}

	i = 0;

	/* if RLIMIT_NPROC is definded,  the value set when Mom was */
	/* invoked was saved,  reset that limit for the job	    */
#ifdef RLIMIT_NPROC
#ifdef RLIM64_INFINITY
	if ((i = setrlimit64(RLIMIT_NPROC, &orig_nproc_limit)) == -1) {
		(void) sprintf(log_buffer,
			       "Unable to restore NPROC limits, err=%d", errno);
	}
#else  /* RLIM64... */
	if ((i = setrlimit(RLIMIT_NPROC, &orig_nproc_limit)) == -1) {
		(void) sprintf(log_buffer,
			       "Unable to restore NPROC limits, err=%d", errno);
	}
#endif /* RLIM64... */
#endif /* RLIMIT_NPROC */
	if (i == 0) {
		/* now set all other kernel enforced limits on the job */
		if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) {
			(void) sprintf(log_buffer, "Unable to set limits, err=%d", i);
		}
	}
	if (i != 0) {
		/* if we had a setlimit error, fail the job */
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, log_buffer);
		if (i == PBSE_RESCUNAV) { /* resource temp unavailable */
			if (is_interactive)
				j = JOB_EXEC_FAIL2;
			else
				j = JOB_EXEC_RETRY;
		} else
			j = JOB_EXEC_FAIL2;
		starter_return(upfds, downfds, j, &sjr); /* exits */
	}
	endpwent();

	job_has_executable = 0;
	if (is_jattr_set(pjob, JOB_ATR_executable)) {
		/*
		 * Call decode_xml_arg_list to decode XML string
		 * and store executable in shell and argument list in argv.
		 */
		if (decode_xml_arg_list(get_jattr_str(pjob, JOB_ATR_executable),
					get_jattr_str(pjob, JOB_ATR_Arglist), &shell, &argv) != 0) {
			starter_return(upfds, downfds, JOB_EXEC_FAIL2, &sjr);
		}
		job_has_executable = 1;
	}

	if (do_tolerate_node_failures(pjob) && (prolo_hooks > 0)) {

		/* free up from previous execjob_prologue hook */
		FREE_VNLS(vnl_fails, vnl_good);

		if (pjob->ji_numnodes > 1) {
			snprintf(log_buffer, sizeof(log_buffer), "waiting up to %ld secs ($job_launch_delay) for mom hosts status and prologue hooks ack", job_launch_delay);
			log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
			/* a filled-in log_buffer could be mistaken for an error message */
			log_buffer[0] = '\0';

			if (get_failed_moms_and_vnodes(pjob, parent2child_moms_status_pipe_r, (prolo_hooks > 0) ? downfds2 : -1, &vnl_fails, &vnl_good, job_launch_delay) != 0) {
				FREE_VNLS(vnl_fails, vnl_good);
				starter_return(upfds, downfds, JOB_EXEC_RETRY, &sjr);
			}
		}
	}

	the_progname = shell;
	the_argv = argv;

	/* NULL terminate the envp array */
	*((pjob->ji_env).v_envp + (pjob->ji_env).v_used) = NULL;
	the_env = (pjob->ji_env).v_envp;

	mom_hook_input_init(&hook_input);
	hook_input.pjob = pjob;
	hook_input.progname = the_progname;
	hook_input.argv = the_argv;
	hook_input.env = the_env;

	if (do_tolerate_node_failures(pjob)) {
		hook_input.vnl_fail = (vnl_t *) vnl_fails;
		hook_input.failed_mom_list = &pjob->ji_failed_node_list;
		hook_input.succeeded_mom_list = &pjob->ji_node_list;
	}

	mom_hook_output_init(&hook_output);
	hook_output.reject_errcode = &hook_errcode;
	hook_output.last_phook = &last_phook;
	hook_output.fail_action = &hook_fail_action;
	hook_output.progname = &progname;
	CLEAR_HEAD(argv_list);
	hook_output.argv = &argv_list;

	switch (mom_process_hooks(HOOK_EVENT_EXECJOB_LAUNCH,
				  PBS_MOM_SERVICE_NAME,
				  mom_host, &hook_input, &hook_output,
				  hook_msg, sizeof(hook_msg), 0)) {

		case 0: /* explicit reject */
			free(progname);
			free_attrlist(&argv_list);
			free_str_array(hook_output.env);
			if (do_tolerate_node_failures(pjob))
				FREE_VNLS(vnl_fails, vnl_good);

			if (hook_errcode == PBSE_HOOK_REJECT_RERUNJOB) {
				starter_return(upfds, downfds,
					       JOB_EXEC_FAILHOOK_RERUN, &sjr);
			} else {
				starter_return(upfds, downfds,
					       JOB_EXEC_FAILHOOK_DELETE, &sjr);
			}
		case 1: /* explicit accept */
			if (progname != NULL)
				the_progname = progname;

			the_argv = svrattrl_to_str_array(&argv_list);
			if (the_argv == NULL) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "execjob_launch hook returned NULL argv!");
				free(progname);
				free_attrlist(&argv_list);
				free_str_array(hook_output.env);
				if (do_tolerate_node_failures(pjob))
					FREE_VNLS(vnl_fails, vnl_good);

				starter_return(upfds, downfds,
					       JOB_EXEC_FAILHOOK_DELETE, &sjr);
			}
			res_env = hook_output.env;

			if (res_env == NULL) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "execjob_launch hook NULL env!");
				free(progname);
				free_attrlist(&argv_list);
				free_str_array(the_argv);
				if (do_tolerate_node_failures(pjob))
					FREE_VNLS(vnl_fails, vnl_good);

				starter_return(upfds, downfds,
					       JOB_EXEC_FAILHOOK_DELETE, &sjr);
			}

			/* clear the env array */
			(pjob->ji_env).v_used = 0;
			(pjob->ji_env).v_envp[0] = NULL;

			/* need to also set vtable as that would */
			/* get appended to later in the code */
			/* vtable holds the environmnent variables */
			/* and their values that are going to be */
			/* part of the job. */
			k = 0;
			while (res_env[k]) {
				char *n, *v, *p;
				if ((p = strchr(res_env[k], '=')) != NULL) {
					*p = '\0';
					n = res_env[k];
					v = p + 1;
					bld_env_variables(&(pjob->ji_env),
							  n, v);
					*p = '=';
				}
				k++;
			}
			the_env = pjob->ji_env.v_envp;
			if (do_tolerate_node_failures(pjob))
				send_update_job(pjob, child2parent_job_update_pipe_w, parent2child_job_update_pipe_r, parent2child_job_update_status_pipe_r);

			break;
		case 2: /* no hook script executed - go ahead and accept event */
			break;
		default:
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
				  LOG_INFO, "",
				  "execjob_launch hook event: accept req by default");
	}

	if (do_tolerate_node_failures(pjob))
		FREE_VNLS(vnl_fails, vnl_good);

	/* if job has executable (submitted as qsub -- <progname> <argv>), then */
	/* <progname> and <argv> take precedence so they must not be passed to */
	/* set_credential(), which would modify them. */
	if (set_credential(pjob, job_has_executable ? NULL : &the_progname,
			   job_has_executable ? NULL : &the_argv) == -1) {
		starter_return(upfds, downfds,
			       JOB_EXEC_FAIL2, &sjr); /* exits */
	}

	/* include any new env settings added by set_credential. */
	the_env = pjob->ji_env.v_envp;
	*(pjob->ji_env.v_envp + pjob->ji_env.v_used) = NULL;

	/*
	 * If JOB_ATR_executable is set, and job is in "sandbox=PRIVATE" mode,
	 * change working directory to PBS_JOBDIR and run the executable.
	 * If JOB_ATR_executable attribute is unset,
	 * change working directory to User's Home.
	 * If in "sandbox=PRIVATE" mode, it is preferable to start in User's HOME
	 * in order to process user's "dot" files in the login shell,
	 * but if user's Home does not exist, start in PBS_JOBDIR.
	 *
	 * Note that even while job process is started in user's Home,
	 * when "sandbox" is "PRIVATE", "cd $PBS_JOBDIR" is prepended to the job script name,
	 * so job script is executed in $PBS_JOBDIR after "dot" files from user's Home are processed.
	 * See the code for the forked parent (about 700 lines above), look for the comment:
	 * "the parent side, still the main man, uhh that is MOM"
	 */
	if (is_jattr_set(pjob, JOB_ATR_executable) && sandbox_private) {
		if (!pbs_jobdir || chdir(pbs_jobdir) == -1) {
			log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
				  LOG_ERR, pjob->ji_qs.ji_jobid,
				  "sandbox=PRIVATE mode: Could not chdir to job directory\n");
			starter_return(upfds, downfds, JOB_EXEC_FAIL2, &sjr);
			return;
		}
	} else if (chdir(pwdp->pw_dir) == -1) {
		log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
			  LOG_ERR, pjob->ji_qs.ji_jobid,
			  "Could not chdir to Home directory");
		(void) fprintf(temp_stderr, "Could not chdir to home directory\n");
		/* check if "qsub -k[oe]" was specified */
		if (((is_jattr_set(pjob, JOB_ATR_keep)) &&
		     ((strchr(get_jattr_str(pjob, JOB_ATR_keep), 'o')) ||
		      (strchr(get_jattr_str(pjob, JOB_ATR_keep), 'e')))) &&
		    !sandbox_private) {
			/* user Home is required for job output if "qsub -k[oe]" was specified
			 * and not in sandbox=private mode, so error out.
			 */
			starter_return(upfds, downfds, JOB_EXEC_FAIL2, &sjr);
			return;
		} else if (sandbox_private) {
			/* "sandbox=PRIVATE" mode is active, so job can be started in PBS_JOBDIR instead of user Home */
			if ((!pbs_jobdir) || (chdir(pbs_jobdir) == -1)) {
				log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
					  LOG_ERR, pjob->ji_qs.ji_jobid,
					  "sandbox=PRIVATE mode: Could not chdir to job directory\n");
				starter_return(upfds, downfds, JOB_EXEC_FAIL2, &sjr);
			}
			/* an else case for O_WORKDIR should be added here */
		} else {
			/* nothing special specified, so job must be started in user Home  */
			starter_return(upfds, downfds, JOB_EXEC_FAIL2, &sjr);
			return;
		}
	}

	/* tell mom we are going */
	starter_return(upfds, downfds, JOB_EXEC_OK, &sjr);
	log_close(0);

	if ((pjob->ji_numnodes == 1) || nodemux || ((cpid = fork()) > 0)) {
		/* parent does the shell */
		FILE *f;

		/* close sockets that child uses */
		(void) close(pjob->ji_stdout);
		(void) close(pjob->ji_stderr);
		if ((is_interactive == TRUE) &&
		    get_jattr_str(pjob, JOB_ATR_X11_cookie)) {
			char auth_display[X_DISPLAY_LEN];
			char cmd[X_DISPLAY_LEN];
			char format[X_DISPLAY_LEN];
			char x11proto[X_DISPLAY_LEN];
			char x11data[X_DISPLAY_LEN];
			char x11authstr[X_DISPLAY_LEN];
			unsigned int x11screen;
			int ret;

			x11proto[0] = x11data[0] = '\0';
			format[0] = '\0';

			sprintf(format, " %%%d[^:]: %%%d[^:]: %%u",
				X_DISPLAY_LEN - 1, X_DISPLAY_LEN - 1);

			/*getting the cookie data from the job attributes*/
			strcpy(x11authstr,
			       get_jattr_str(pjob, JOB_ATR_X11_cookie));

			/**
			 * parsing cookie to get X11 protocol,
			 * hex data and screen number
			 */
			if ((n = sscanf(x11authstr, format,
					x11proto,
					x11data,
					&x11screen)) != 3) {
				sprintf(log_buffer, "sscanf(%s)=%d failed: %s\n",
					x11authstr,
					n,
					strerror(errno));
				log_err(errno, __func__, log_buffer);
				log_close(0);
				return;
			}
			ret = snprintf(auth_display, sizeof(auth_display),
				       "unix:%d.%u",
				       display_number,
				       x11screen);
			if (ret >= sizeof(auth_display)) {
				log_err(-1, __func__, " auth_display overflow");
				log_close(0);
				return;
			}
			if (!sandbox_private) {
				/*Fetching XAUTHORITY from job environment if present*/
				int xauth_index;
				if ((xauth_index = find_env_slot(&(pjob->ji_env), "XAUTHORITY=")) != -1) {
					char *xauth_file = strchr(pjob->ji_env.v_envp[xauth_index], (int) '=') + 1;
					ret = snprintf(cmd, sizeof(cmd), "%s -f %s -q -", XAUTH_BINARY, xauth_file);
				} else {
					ret = snprintf(cmd, sizeof(cmd), "%s -q -",
						       XAUTH_BINARY);
				}
				if (ret >= sizeof(cmd)) {
					log_err(-1, __func__, " cmd overflow ");
					log_close(0);
					return;
				}
			} else {
				char var[MAXPATHLEN + 1];
				sprintf(var, "%s/.Xauthority", pbs_jobdir);
				ret = snprintf(cmd, sizeof(cmd),
					       "%s -f %s/.Xauthority -q -",
					       XAUTH_BINARY, pbs_jobdir);
				if (ret >= sizeof(cmd)) {
					log_err(-1, __func__, " cmd overflow ");
					log_close(0);
					return;
				}
				bld_env_variables(&(pjob->ji_env), "XAUTHORITY", var);
			}
			f = popen(cmd, "w");
			if (f != NULL) {
				/**
				 *  executing commands to add new display
				 *  in Xauthority file
				 */
				fprintf(f, "remove %s\n ", auth_display);
				fprintf(f, "add %s %s %s\n", auth_display,
					x11proto,
					x11data);
				pclose(f);
			} else {
				sprintf(log_buffer, "could not run %s\n", cmd);
				log_err(errno, __func__, log_buffer);
				log_close(0);
				return;
			}
		}

		/* include any new env settings added. */
		the_env = pjob->ji_env.v_envp;
		*(pjob->ji_env.v_envp + pjob->ji_env.v_used) = NULL;

		execve(the_progname, the_argv, the_env);
		free(progname);
		free_attrlist(&argv_list);
		free_str_array(the_argv);
		the_argv = NULL;
		free_str_array(hook_output.env);
		free_str_array(the_env);
		the_env = NULL;
	} else if (cpid == 0) { /* child does demux */
		char *arg[2];
		char *shellname;

		/* setup descriptors 3 and 4 */
		(void) dup2(pjob->ji_stdout, 3);
		if (pjob->ji_stdout > 3)
			close(pjob->ji_stdout);
		(void) dup2(pjob->ji_stderr, 4);
		if (pjob->ji_stderr > 4)
			close(pjob->ji_stderr);

		/* construct argv array */
		shell = pbs_conf.pbs_demux_path;
		shellname = strrchr(shell, '/');
		if (shellname)
			++shellname; /* go past last '/' */
		else
			shellname = shell;
		arg[0] = shellname;
		arg[1] = NULL;

		/* we're purposely not calling log_close() here */
		/* for this causes a side-effect. log_close() would */
		/* do an fclose(<logfile>), but its file position */
		/* is still shared with the parent mom, which */
		/* could be writing to the <logfile>. */
		execve(shell, arg, pjob->ji_env.v_envp);
	}
	fprintf(temp_stderr, "pbs_mom, exec of %s failed with error: %s\n",
		shell, strerror(errno));
	exit(254); /* should never, ever get here */
}

/**
 * @brief
 * 	Start a process for a spawn request.  This will be different from
 * 	a job's initial shell task in that the environment will be specified
 * 	and no interactive code need be included.
 *
 * @param[in] ptask - pointer to task structure
 * @param[in] argv - argument list
 * @param[in] envp - pointer to environment variable list
 * @param[in] nodemux - false if the task process needs demux, true otherwise

 *
 * @return	int
 * @retval	PBSE_NONE (0) if success
 * @retval	PBSE_* on error.
 *
 */
int
start_process(task *ptask, char **argv, char **envp, bool nodemux)
{
	job *pjob = ptask->ti_job;
	int ebsize;
	char buf[MAXPATHLEN + 2];
	pid_t pid;
	int pipes[2], kid_read, kid_write, parent_read, parent_write;
	int pts;
	int i, j, k;
	int fd;
	u_long ipaddr;
	struct array_strings *vstrs;
	struct startjob_rtn sjr;
	char *pbs_jobdir; /* staging and execution directory of this job */
	int hook_errcode = 0;
	char hook_msg[HOOK_MSG_SIZE + 1];
	char *progname = NULL;
	pbs_list_head argv_list;
	mom_hook_input_t hook_input;
	mom_hook_output_t hook_output;
	char *the_progname;
	char **the_argv;
	char **the_env;
	char **res_env;
	hook *last_phook = NULL;
	unsigned int hook_fail_action = 0;
	FILE *temp_stderr = stderr;
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	int cred_action;
#endif

	pbs_jobdir = jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir);
	memset(&sjr, 0, sizeof(sjr));
	if (pipe(pipes) == -1)
		return PBSE_SYSTEM;
	if (pipes[1] < 3) {
		kid_write = fcntl(pipes[1], F_DUPFD, 3);
		(void) close(pipes[1]);
	} else
		kid_write = pipes[1];
	parent_read = pipes[0];

	if (pipe(pipes) == -1) {
		close(kid_write);
		close(parent_read);
		return PBSE_SYSTEM;
	}
	if (pipes[0] < 3) {
		kid_read = fcntl(pipes[0], F_DUPFD, 3);
		(void) close(pipes[0]);
	} else
		kid_read = pipes[0];
	parent_write = pipes[1];

	/*
	 ** Get ipaddr to Mother Superior.
	 */
	if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE) /* I'm MS */
		ipaddr = htonl(localaddr);
	else {
		struct sockaddr_in *ap;

		/*
		 ** We always have a stream open to MS at node 0.
		 */
		i = pjob->ji_hosts[0].hn_stream;
		if ((ap = tpp_getaddr(i)) == NULL) {
			log_joberr(-1, __func__, "no stream to MS",
				   pjob->ji_qs.ji_jobid);
			return PBSE_SYSTEM;
		}
		ipaddr = ap->sin_addr.s_addr;
	}

	/*
	 ** Begin a new process for the fledgling task.
	 */
	if ((pid = fork_me(-1)) == -1)
		return PBSE_SYSTEM;
	else if (pid != 0) { /* parent */
		(void) close(kid_read);
		(void) close(kid_write);

		/* read sid */
		i = readpipe(parent_read, &sjr, sizeof(sjr));
		j = errno;
		(void) close(parent_read);
		if (i != sizeof(sjr)) {
			sprintf(log_buffer,
				"read of pipe for pid job %s got %d not %d",
				pjob->ji_qs.ji_jobid, i, (int) sizeof(sjr));
			log_err(j, __func__, log_buffer);
			(void) close(parent_write);
			return PBSE_SYSTEM;
		}
		(void) writepipe(parent_write, &sjr, sizeof(sjr));
		(void) close(parent_write);
		DBPRT(("%s: read start return %d %d\n", __func__,
		       sjr.sj_code, sjr.sj_session))

		/*
		 ** Set the global id before exiting on error so any
		 ** information can be put into the job struct first.
		 */
		set_globid(pjob, &sjr);
		if (sjr.sj_code < 0) {
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
			AFSLOG_TERM(ptask);
#endif
			(void) sprintf(log_buffer, "task not started, %s %s %d",
				       (sjr.sj_code == JOB_EXEC_RETRY) ? "Retry" : "Failure",
				       argv[0],
				       sjr.sj_code);
			log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB,
				  LOG_NOTICE, pjob->ji_qs.ji_jobid, log_buffer);
			return PBSE_SYSTEM;
		}

		ptask->ti_qs.ti_sid = sjr.sj_session;
		ptask->ti_qs.ti_status = TI_STATE_RUNNING;

		(void) task_save(ptask);
		if (!check_job_substate(pjob, JOB_SUBSTATE_RUNNING)) {
			set_job_state(pjob, JOB_STATE_LTR_RUNNING);
			set_job_substate(pjob, JOB_SUBSTATE_RUNNING);
			job_save(pjob);
		}
		(void) sprintf(log_buffer, "task %8.8X started, %s",
			       ptask->ti_qs.ti_task, argv[0]);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_INFO,
			  pjob->ji_qs.ji_jobid, log_buffer);

		return PBSE_NONE;
	}

	/************************************************/
	/* The child process - will become the TASK	*/
	/************************************************/
	(void) close(parent_read);
	(void) close(parent_write);

	/* unprotect the job from the vagaries of the kernel */
	daemon_protect(0, PBS_DAEMON_PROTECT_OFF);

	/*
	 * set up the Environmental Variables to be given to the job
	 */

	for (j = 0, ebsize = 0; envp[j]; j++)
		ebsize += strlen(envp[j]);
	vstrs = get_jattr_arst(pjob, JOB_ATR_variables);
	pjob->ji_env.v_ensize = vstrs->as_usedptr + num_var_else + num_var_env +
				j + EXTRA_ENV_PTRS;
	pjob->ji_env.v_used = 0;
	pjob->ji_env.v_envp = (char **) malloc(pjob->ji_env.v_ensize * sizeof(char *));
	if (pjob->ji_env.v_envp == NULL) {
		return PBSE_SYSTEM;
	}

#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
	if (pjob->ji_tasks.ll_prior == pjob->ji_tasks.ll_next) { /* create only on first task */
		cred_action = CRED_RENEWAL;
	} else {
		cred_action = CRED_SETENV;
	}

	if (cred_by_job(pjob, cred_action) != PBS_KRB5_OK) {
		log_eventf(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_ERR, pjob->ji_qs.ji_jobid,
			   "failed to set credentials for task %8.8X",
			   ptask->ti_qs.ti_task);
	}

#if defined(HAVE_LIBKAFS) || defined(HAVE_LIBKOPENAFS)
	if (start_afslog(ptask, NULL, kid_write, kid_read) != PBS_KRB5_OK) {
		sprintf(log_buffer, "afslog for task %8.8X not started",
			ptask->ti_qs.ti_task);
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_ERR,
			  pjob->ji_qs.ji_jobid, log_buffer);
	}
#endif
#endif

	/* First variables from the local environment */
	for (j = 0; j < num_var_env; ++j)
		bld_env_variables(&(pjob->ji_env), environ[j], NULL);

	/* Next, the variables passed with the job.  They may   */
	/* be overwritten with new correct values for this job	*/

	for (j = 0; j < vstrs->as_usedptr; ++j) {
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
			/* never set KRB5CCNAME; it would rewrite the correct value */
			if (strncmp(vstrs->as_string[j], "KRB5CCNAME", strlen("KRB5CCNAME")) == 0)
				continue;
#endif
		bld_env_variables(&(pjob->ji_env), vstrs->as_string[j], NULL);
	}

	/* HOME */
	bld_env_variables(&(pjob->ji_env), variables_else[0],
			  pjob->ji_grpcache->gc_homedir);

	/* PBS_JOBNAME */
	bld_env_variables(&(pjob->ji_env), variables_else[2],
			  get_jattr_str(pjob, JOB_ATR_jobname));

	/* PBS_JOBID */
	bld_env_variables(&(pjob->ji_env), variables_else[3], pjob->ji_qs.ji_jobid);

	/* PBS_QUEUE */
	bld_env_variables(&(pjob->ji_env), variables_else[4],
			  get_jattr_str(pjob, JOB_ATR_in_queue));

	/* PBS_JOBCOOKIE */
	bld_env_variables(&(pjob->ji_env), variables_else[7],
			  get_jattr_str(pjob, JOB_ATR_Cookie));

	/* PBS_NODENUM */
	sprintf(buf, "%d", pjob->ji_nodeid);
	bld_env_variables(&(pjob->ji_env), variables_else[8], buf);

	/* PBS_TASKNUM */
	sprintf(buf, "%8.8X", ptask->ti_qs.ti_task);
	bld_env_variables(&(pjob->ji_env), variables_else[9], buf);

	/* PBS_MOMPORT */
	sprintf(buf, "%d", pbs_rm_port);
	bld_env_variables(&(pjob->ji_env), variables_else[10], buf);

	/* OMP_NUM_THREADS and NCPUS eq to number of cpus */
	sprintf(buf, "%d", pjob->ji_vnods[ptask->ti_qs.ti_myvnode].vn_threads);
#ifdef NAS /* localmod 020 */
	/* Force OMP_NUM_THREADS=1 on Columbia.
	 * If you've ever seen a 256 process MPI program try to start 256
	 * threads for each process, you'd know why.
	 */
	bld_env_variables(&(pjob->ji_env), variables_else[12], "1");
#else
	bld_env_variables(&(pjob->ji_env), variables_else[12], buf);
#endif /* localmod 020 */
	bld_env_variables(&(pjob->ji_env), "NCPUS", buf);

	/* PBS_ACCOUNT */
	if (is_jattr_set(pjob, JOB_ATR_account))
		bld_env_variables(&(pjob->ji_env), variables_else[13],
				  get_jattr_str(pjob, JOB_ATR_account));

	if (is_jattr_set(pjob, JOB_ATR_umask)) {
		sprintf(buf, "%ld", get_jattr_long(pjob, JOB_ATR_umask));
		sscanf(buf, "%o", &j);
		umask(j);
	} else {
		umask(077);
	}

	mom_unnice();

	/* set Environment to reflect batch */
	bld_env_variables(&(pjob->ji_env), "PBS_ENVIRONMENT", "PBS_BATCH");
	bld_env_variables(&(pjob->ji_env), "ENVIRONMENT", "BATCH");

	for (i = 0; envp[i]; i++)
		bld_env_variables(&(pjob->ji_env), envp[i], NULL);

		/* Add TMPDIR to environment */
#ifdef NAS /* localmod 010 */
	(void) NAS_tmpdirname(pjob);
#endif /* localmod 010 */
	j = mktmpdir(pjob->ji_qs.ji_jobid,
		     pjob->ji_qs.ji_un.ji_momt.ji_exuid,
		     pjob->ji_qs.ji_un.ji_momt.ji_exgid,
		     &(pjob->ji_env));
	if (j != 0) {
		starter_return(kid_write, kid_read, j, &sjr);
	}

	/* set PBS_JOBDIR */
	if ((is_jattr_set(pjob, JOB_ATR_sandbox)) &&
	    (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
		bld_env_variables(&(pjob->ji_env), "PBS_JOBDIR", pbs_jobdir);
	} else {
		bld_env_variables(&(pjob->ji_env), "PBS_JOBDIR", pjob->ji_grpcache->gc_homedir);
	}

	j = set_job(pjob, &sjr);
	if (j < 0) {
		if (j == -1) {
			/* set_job didn't leave message in log_buffer */
			(void) strcpy(log_buffer, "Unable to set task session");
		}
		log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid, log_buffer);
		if (j == -3)
			j = JOB_EXEC_FAIL2;
		else
			j = JOB_EXEC_RETRY;
		starter_return(kid_write, kid_read, j, &sjr);
	}
	ptask->ti_qs.ti_sid = sjr.sj_session;
	if ((i = mom_set_limits(pjob, SET_LIMIT_SET)) != PBSE_NONE) {
		(void) sprintf(log_buffer, "Unable to set limits, err=%d", i);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_WARNING,
			  pjob->ji_qs.ji_jobid, log_buffer);
		if (i == PBSE_RESCUNAV) /* resource temp unavailable */
			j = JOB_EXEC_RETRY;
		else
			j = JOB_EXEC_FAIL2;
		starter_return(kid_write, kid_read, j, &sjr);
	}

	the_progname = argv[0];
	the_argv = argv;

	*(pjob->ji_env.v_envp + pjob->ji_env.v_used) = NULL;
	the_env = pjob->ji_env.v_envp;

	mom_hook_input_init(&hook_input);
	hook_input.pjob = pjob;
	hook_input.progname = the_progname;
	hook_input.argv = the_argv;
	hook_input.env = the_env;

	mom_hook_output_init(&hook_output);
	hook_output.reject_errcode = &hook_errcode;
	hook_output.last_phook = &last_phook;
	hook_output.fail_action = &hook_fail_action;
	hook_output.progname = &progname;
	CLEAR_HEAD(argv_list);
	hook_output.argv = &argv_list;

	switch (mom_process_hooks(HOOK_EVENT_EXECJOB_LAUNCH,
				  PBS_MOM_SERVICE_NAME,
				  mom_host, &hook_input, &hook_output,
				  hook_msg, sizeof(hook_msg), 0)) {

		case 0: /* explicit reject */
			free(progname);
			free_attrlist(&argv_list);
			free_str_array(hook_output.env);
			starter_return(kid_write, kid_read,
				       JOB_EXEC_FAILHOOK_DELETE, &sjr);
		case 1: /* explicit accept */
			if (progname != NULL)
				the_progname = progname;

			the_argv = svrattrl_to_str_array(&argv_list);
			if (the_argv == NULL) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "execjob_launch hook returned NULL argv!");
				free(progname);
				free_attrlist(&argv_list);
				free_str_array(hook_output.env);
				starter_return(kid_write, kid_read,
					       JOB_EXEC_FAILHOOK_DELETE, &sjr);
			}
			res_env = hook_output.env;
			if (res_env == NULL) {
				log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
					  LOG_INFO, "",
					  "execjob_launch hook NULL env!");
				free(progname);
				free_attrlist(&argv_list);
				free_str_array(the_argv);
				starter_return(kid_write, kid_read,
					       JOB_EXEC_FAILHOOK_DELETE, &sjr);
			}

			/* clear the env array */
			pjob->ji_env.v_used = 0;
			pjob->ji_env.v_envp[0] = NULL;

			/* need to also set vtable as that would */
			/* get appended to later in the code */
			/* vtable holds the environmnent variables */
			/* and their values that are going to be */
			/* part of the job. */
			k = 0;
			while (res_env[k]) {
				char *n, *v, *p;
				if ((p = strchr(res_env[k], '=')) != NULL) {
					*p = '\0';
					n = res_env[k];
					v = p + 1;
					bld_env_variables(&(pjob->ji_env),
							  n, v);
					*p = '=';
				}
				k++;
			}
			the_env = pjob->ji_env.v_envp;

			break;
		case 2: /* no hook script executed - go ahead and accept event */
			break;
		default:
			log_event(PBSEVENT_DEBUG2, PBS_EVENTCLASS_HOOK,
				  LOG_INFO, "",
				  "execjob_launch hook event: accept req by default");
	}
	if (set_credential(pjob, NULL, &the_argv) == -1) {
		starter_return(kid_write, kid_read,
			       JOB_EXEC_FAIL2, &sjr); /* exits */
	}

	/* Pick up any env settings added by set_credential(), and NULL */
	/* terminate the envp array. */
	*(pjob->ji_env.v_envp + pjob->ji_env.v_used) = NULL;
	the_env = pjob->ji_env.v_envp;

	/* change working directory to PBS_JOBDIR or to User's Home */
	if ((is_jattr_set(pjob, JOB_ATR_sandbox)) &&
	    (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
		if ((!pbs_jobdir) || (chdir(pbs_jobdir) == -1)) {
			log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
				  LOG_ERR, pjob->ji_qs.ji_jobid,
				  "Could not chdir to PBS_JOBDIR directory");
			(void) fprintf(stderr, "sandbox=PRIVATE mode: could not chdir to job directory\n");
			starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr);
		}
	} else {
		if (chdir(pjob->ji_grpcache->gc_homedir) == -1) {
			log_event(PBSEVENT_JOB | PBSEVENT_SECURITY, PBS_EVENTCLASS_JOB,
				  LOG_ERR, pjob->ji_qs.ji_jobid,
				  "Could not chdir to Home directory");
			(void) fprintf(stderr, "Could not chdir to home directory\n");
			starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr);
		}
	}

	/*
	 ** Set up stdin.
	 */
	if ((fd = open("/dev/null", O_RDONLY)) == -1) {
		log_err(errno, __func__, "could not open devnull");
		(void) close(0);
	} else {
		(void) dup2(fd, 0);
		if (fd > 0)
			(void) close(fd);
	}

	/* If nodemux is not already set by the caller, check job's JOB_ATR_nodemux attribute. */
	if (!nodemux && (is_jattr_set(pjob, JOB_ATR_nodemux)))
		nodemux = get_jattr_long(pjob, JOB_ATR_nodemux);

	if (pjob->ji_numnodes > 1) {
		if (nodemux) {
			/*
			 ** Open /dev/null for stdout and stderr.
			 */
			if ((fd = open("/dev/null", O_RDONLY)) == -1) {
				log_err(errno, __func__, "could not open devnull");
				(void) close(1);
				(void) close(2);
			} else {
				if (fd != 1)
					(void) dup2(fd, 1);
				if (fd != 2)
					(void) dup2(fd, 2);
				if (fd > 2)
					(void) close(fd);
			}
		} else {
			/*
			 ** Open sockets to demux proc for stdout and stderr.
			 */
			if ((fd = open_demux(ipaddr, pjob->ji_stdout)) == -1)
				starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr);
			(void) dup2(fd, 1);
			if (fd > 1)
				(void) close(fd);
			if ((fd = open_demux(ipaddr, pjob->ji_stderr)) == -1)
				starter_return(kid_write, kid_read, JOB_EXEC_FAIL2, &sjr);
			(void) dup2(fd, 2);
			if (fd > 2)
				(void) close(fd);

			if (write(1, get_jattr_str(pjob, JOB_ATR_Cookie),
			      strlen(get_jattr_str(pjob, JOB_ATR_Cookie))) == -1) 
				log_errf(-1, __func__, "write failed. ERR : %s",strerror(errno));			
			if ( write(2, get_jattr_str(pjob, JOB_ATR_Cookie),
			      strlen(get_jattr_str(pjob, JOB_ATR_Cookie))) == -1) 
				log_errf(-1, __func__, "write failed. ERR : %s",strerror(errno));			
		}
	} else if (is_jattr_set(pjob, JOB_ATR_interactive) && get_jattr_long(pjob, JOB_ATR_interactive) > 0) {
		/* interactive job, single node, write to pty */
		if ((pts = open_pty(pjob)) < 0) {
			log_err(errno, __func__, "cannot open slave");
			starter_return(kid_write, kid_read, JOB_EXEC_FAIL1, &sjr);
		}
		(void) dup2(pts, 1);
		(void) dup2(pts, 2);

	} else {
		/* normal batch job, single node, write straight to files */
		if (open_std_out_err(pjob) == -1) {
			starter_return(kid_write, kid_read,
				       JOB_EXEC_RETRY, &sjr);
		} else {
			/* After the error is redirected, stderr does not have a valid FILE* */
			temp_stderr = fdopen(STDERR_FILENO, "w");

			/* If we could not get the valid FILE*, let temp_stderr point to stderr to avoid
		 	* a possible crash in subsequent calls to output functions like printf/fprintf */
			if (!temp_stderr)
				temp_stderr = stderr;
		}
	}

	log_close(0);
	starter_return(kid_write, kid_read, JOB_EXEC_OK, &sjr);

	environ = the_env;
	execvp(the_progname, the_argv);
	free(progname);
	free_attrlist(&argv_list);
	free_str_array(the_argv);
	free_str_array(hook_output.env);
	free_str_array(the_env);
#if 0
	/*
	 ** This is for a shell to run the command.
	 */
	if (argv[0][0] == '/')		/* full path exe */
		execve(argv[0], argv, pjob->ji_env.v_envp);
	else {
		struct	passwd	*pwent;
		char	*shell = "/bin/sh";
		char	*shname;
		char	*args[4];

		pwent = getpwuid(pjob->ji_qs.ji_un.ji_momt.ji_exuid);
		if (pwent != NULL && pwent->pw_shell[0] == '/')
			shell = pwent->pw_shell;
		shname = strrchr(shell, '/') + 1;	/* one past slash */
		args[0] = strdup("-");
		strcat(args[0], shname);

		args[1] = "-c";

		args[2] = strdup(argv[0]);
		for (i=1; argv[i] != NULL; i++) {
			strcat(args[2], " ");
			strcat(args[2], argv[i]);
		}

		args[3] = NULL;

		printf("%s %s %s\n", args[0], args[1], args[2]);
		execve(shell, args, pjob->ji_env.v_envp);
	}
#endif
	fprintf(temp_stderr, "%s: %s\n", argv[0], strerror(errno));
	exit(254);
	return PBSE_SYSTEM; /* not reached */
}

/**
 * @brief
 *	Free the ji_hosts and ji_vnods arrays for a job.  If any events are
 *	attached to an array element, free them as well.
 *
 * @param[in] pj - job pointer
 *
 * @return Void
 *
 */

void
nodes_free(job *pj)
{
	int i;
	vmpiprocs *vp;

	if (pj->ji_vnods) {
		vp = pj->ji_vnods;
		for (i = 0; i < pj->ji_numvnod; i++, vp++) {
			if (vp->vn_hname)
				free(vp->vn_hname);
			if (vp->vn_vname)
				free(vp->vn_vname);
		}
		(void) free(pj->ji_vnods);
		pj->ji_vnods = NULL;
	}

	if (pj->ji_assn_vnodes) {
		vp = pj->ji_assn_vnodes;
		for (i = 0; i < pj->ji_num_assn_vnodes; i++, vp++) {
			if (vp->vn_hname)
				free(vp->vn_hname);
			if (vp->vn_vname)
				free(vp->vn_vname);
		}
		(void) free(pj->ji_assn_vnodes);
		pj->ji_assn_vnodes = NULL;
		pj->ji_num_assn_vnodes = 0;
	}

	if (pj->ji_hosts) {
		hnodent *np;

		np = pj->ji_hosts;
		for (i = 0; i < pj->ji_numnodes; i++, np++) {
			eventent *ep = (eventent *) GET_NEXT(np->hn_events);

			if (np->hn_host)
				free(np->hn_host);
			if (np->hn_vlist)
				free(np->hn_vlist);

			/* don't close stream incase another job uses it */
			while (ep) {

				if (ep->ee_argv)
					arrayfree(ep->ee_argv);
				if (ep->ee_envp)
					arrayfree(ep->ee_envp);
				delete_link(&ep->ee_next);
				free(ep);
				ep = (eventent *) GET_NEXT(np->hn_events);
			}
			/*
			 ** Here we free any dependent structure(s) from hn_setup.
			 */
			if (job_free_node != NULL)
				job_free_node(pj, np);
		}
		free(pj->ji_hosts);
		pj->ji_hosts = NULL;
	}
}

/**
 * @brief
 *	Add a mom to a job, if the mom is not already present
 *
 * @param[in] pjob - job pointer
 * @param[in] mname - mom name to add
 * @param[in] port - mom port
 * @param[in/out] mi - The last used index in the ji_hosts array 
 * @param[out] mynp - Return pointer to a match with this host
 *
 * @return hnodent
 * @retval - The hnodent structure matching the mname, port
 * @retval - NULL - failure to add (get_fullhostname failed)
 *
 */
hnodent *
add_mom_to_job(job *pjob, char *mname, int port, int *mi, hnodent **mynp)
{
	int j;
	int momindex = *mi;
	hnodent *hp = NULL;

	/*
	* for the natural vnode in a set that satisfies a chunk,
	* see if we have a hnodent entry for the parent Mom,
	* if not add an entry
	*/

	/* see if we already have this mom */
	for (j = 0; j < momindex; ++j) {
		if ((strcmp(mname, pjob->ji_hosts[j].hn_host) == 0) && (port == pjob->ji_hosts[j].hn_port))
			break;
	}
	hp = &pjob->ji_hosts[j];
	if ((hp != NULL) && (j == momindex)) {
		log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "Adding mom %s:%d to job", mname, port);
		/* need to add entry */
		hp->hn_node = momindex++;
		hp->hn_host = strdup(mname);
		if (hp->hn_host == NULL)
			return (NULL);
		hp->hn_port = port;
		hp->hn_stream = -1;
		hp->hn_eof_ts = 0; /* reset eof timestamp */
		hp->hn_sister = SISTER_OKAY;
		hp->hn_nprocs = 0;
		hp->hn_vlnum = 0;
		hp->hn_vlist = (host_vlist_t *) 0;
		hp->hn_vlist = NULL;
		memset(&hp->hn_nrlimit, 0, sizeof(resc_limit_t));
		CLEAR_HEAD(hp->hn_events);
		/* mark next slot as the (current) end */
		pjob->ji_hosts[momindex].hn_node = TM_ERROR_NODE;

		if (hp->hn_port == pbs_rm_port) {
			int hostmatch = 0;
			static char node_name[PBS_MAXHOSTNAME + 1] = {'\0'};
			static char canonical_name[PBS_MAXHOSTNAME + 1] = {'\0'};

			/*
			* The following block prevents us from having to employ
			* yet another global variable to represent the hostname
			* of the local node.
			*/
			if (pbs_conf.pbs_leaf_name) {
				if (strcmp(pbs_conf.pbs_leaf_name, node_name) != 0) {
					/* PBS_LEAF_NAME has changed or node_name is uninitialized */
					strncpy(node_name, pbs_conf.pbs_leaf_name, PBS_MAXHOSTNAME);
					node_name[PBS_MAXHOSTNAME] = '\0';
					/* Need to canonicalize PBS_LEAF_NAME */
					if (get_fullhostname(node_name, canonical_name, (sizeof(canonical_name) - 1)) != 0) {
						log_errf(errno, __func__, "Failed to get fullhostname from %s for job %s", node_name, pjob->ji_qs.ji_jobid);
						node_name[0] = '\0';
						canonical_name[0] = '\0';
						return (NULL);
					}
				}
			} else {
				if (strcmp(mom_host, node_name) != 0) {
					/* mom_host has changed or node_name is uninitialized */
					strncpy(node_name, mom_host, PBS_MAXHOSTNAME);
					node_name[PBS_MAXHOSTNAME] = '\0';
					/* mom_host contains the canonical name */
					strncpy(canonical_name, mom_host, PBS_MAXHOSTNAME);
					canonical_name[PBS_MAXHOSTNAME] = '\0';
				}
			}

			if (strcmp(hp->hn_host, node_name) == 0)
				hostmatch = 1;
			else {
				char namebuf[PBS_MAXHOSTNAME + 1];
				if (get_fullhostname(hp->hn_host, namebuf, (sizeof(namebuf) - 1)) != 0) {
					log_errf(errno, __func__, "Failed to get fullhostname from %s for job %s", hp->hn_host, pjob->ji_qs.ji_jobid);
					return (NULL);
				}
				if (strcmp(namebuf, canonical_name) == 0)
					hostmatch = 1;
			}

			if (hostmatch) {
				pjob->ji_nodeid = hp->hn_node;
				if (mynp)
					*mynp = hp;
			}
		}
	}
	*mi = momindex;
	return hp;
}

/**
 * @brief
 *	Get the next "chunk" from the exechost(2) string
 *
 * @param[in] enable_exechost2 - is exec_host2 available?
 * @param[in/out] ppeh - pointer to the current location in exechost(2) string
 * @param[out] pport - pointer to the integer port variable to be returned
 *
 * @return char *
 * @retval - the mom name 
 * @retval - NULL - failure
 *
 */
static char *
get_next_exechost2(int enable_exechost2, char **ppeh, int *pport)
{
	static char *mname;
	int port;
	char *peh = *ppeh;
	int n = 0;
	static char natvnodename[PBS_MAXNODENAME + 1];
	static char momname[PBS_MAXNODENAME + 1];
	static char momport[10] = {0};
	momvmap_t *pnat = NULL;

	if (enable_exechost2 == 0) {
		while ((*peh != '/') && (*peh != '\0') &&
		       (n < PBS_MAXNODENAME)) {
			natvnodename[n++] = *peh++;
		}
		natvnodename[n] = '\0';
	} else {
		momport[0] = '\0';
		while ((*peh != ':') && (*peh != '/') && (*peh != '\0') &&
		       (n < PBS_MAXNODENAME)) {
			momname[n++] = *peh++;
		}
		momname[n] = '\0';
		/* check if peh is colon, if so parse out port */
		n = 0;
		if (*peh == ':') {
			peh++; /* skip first ':' character to get port number */
			while ((*peh != '/') && (*peh != '\0') && (n < sizeof(momport)))
				momport[n++] = *peh++;
		}
		momport[n] = '\0';
	}

	/* advance past the "+" to the next host */
	while (*peh != '\0') {
		if (*peh++ == '+')
			break;
	}

	if (enable_exechost2 == 0) {
		pnat = find_vmap_entry(natvnodename);
		if (pnat != NULL) {
			/* found a map entry */
			mname = pnat->mvm_mom->mi_host;
			port = pnat->mvm_mom->mi_port + 1; /* RM port */
		} else {
			/* no map entry, assume same vnode name is */
			/* the host name and the port is standard  */
			mname = natvnodename;
			port = pbs_mom_port + 1; /* RM port */
		}
	} else {
		mname = momname;
		if (strlen(momport) > 0) {
			port = atol(momport) + 1;
		} else {
			port = pbs_mom_port + 1; /* RM port */
		}
	}

	*pport = port;
	*ppeh = peh;
	return mname;
}

/**
 * @brief
 *	job_nodes - process schedselect and exec_vnode to build mapping between
 *	chunks and allocated nodes/resources.
 *
 * @par Functionality:
 *	Loops through schedselect attribute and concurrently exec_vnode and
 *	exec_host attributes creating two arrays of structures:
 *	    hnodent - one per Mom regardless of the number of vnodes
 *		allocated from that Mom.  For the local Mom's entry, indexed
 *		by pjob->ji_nodeid, the hnodent will also contain an sub-array
 *		of host_vlist_t with one entry per vnode allocated on this host.
 *		This sub-array's length is given by hn_vlnum.
 *	    vmiprocs - one pre task/mpi process to be created;  there is one
 *		line per entry written into PBS_NODEFILE by Mom
 *	Both of the hnodent and vmpiprocs arrays are terminated by an entry
 *	where the id (hn_node or vn_node) is set to TM_ERROR_NODE.
 *	Additionally this function also determines the ji_nodeid of the job
 *	by matching the mom's name and port with the exechost list.
 *
 * @param[in]	pjob - pointer to job structure for job to be run
 * @param[out]	mynp - pointer to hnodent structure to be filled with the
 *                     hnodent for the node matching the current mom:port
 *
 * @return	int
 * @retval	PBSE_NONE (0) if success
 * @retval	PBSE_* on error.
 *
 * @par Side Effects:
 *	pjob->ji_vnods, pjob->ji_assn_vnodes, and pjob->ji_hosts are set,
 *	arrays in the heap
 *
 * @par MT-safe: likely no
 *
 */
int
job_nodes_inner(struct job *pjob, hnodent **mynp)
{
	char *execvnode;
	char *schedselect;
	int i, j, k;
	hnodent *hp = NULL;
	int hpn;
	int momindex;
	char *mname;
	int nmoms;
	vmpiprocs *vmp;
	momvmap_t *pmm = NULL;
	mominfo_t *pmom;

	char *peh;
	int port;
	int nprocs;
	int n_chunks;
	int procindex;
	int rc;
	long long sz;
	char *tpc;
	resc_limit_t have;
	resc_limit_t need;
	int naccels = 0;	  /* naccelerators count */
	int need_accel = 0;	  /* accelerator needed in subchunk? */
	long long accel_mem = 0;  /* accel mem per exec_vnode key-value pair */
	char *accel_model = NULL; /* accelerator model if set */

	/* variables used in parsing the "exec_vnode" string */
	int stop_on_paren;
	char *pndspec;
	char *elast;
	int enelma;
	char *nodep;
	static int ebuf_len = 0;
	static char *ebuf = NULL;
	static int enelmt = 0;
	static key_value_pair *enkv = NULL;

	/* variables used in parsing the "schedselect" string */
	char *psubspec;
	char *slast;
	int snc;
	int snelma;
	static int sbuf_len = 0;
	static char *sbuf = NULL;
	static int snelmt = 0;
	static key_value_pair *skv = NULL;
	char *save_ptr; /* posn for strtok_r() */
	int n_assn_vnodes;
	int assn_index;
	char *tmp_str;
	char *evnode;

	if (pjob == NULL)
		return (PBSE_INTERNAL);
	if (!(is_jattr_set(pjob, JOB_ATR_exec_vnode)))
		return (PBSE_INTERNAL);
	if (!(is_jattr_set(pjob, JOB_ATR_SchedSelect)))
		return (PBSE_INTERNAL);

	/* free what might have been done before if job is restarted */
	nodes_free(pjob);

	execvnode = get_jattr_str(pjob, JOB_ATR_exec_vnode);
	if (execvnode == NULL)
		return (PBSE_INTERNAL);

	schedselect = get_jattr_str(pjob, JOB_ATR_SchedSelect);
	if (schedselect == NULL)
		return (PBSE_INTERNAL);

	if (get_jattr_str(pjob, JOB_ATR_exec_host2) != NULL) {
		/* Mom got information from new server */
		enable_exechost2 = 1;
		peh = get_jattr_str(pjob, JOB_ATR_exec_host2);
	} else {
		peh = get_jattr_str(pjob, JOB_ATR_exec_host);
	}
	if (peh == NULL)
		return (PBSE_INTERNAL);

	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "execvnode=%s", execvnode);
	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "schedselect=%s", schedselect);
	log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "%s=%s", enable_exechost2 ? "exechost2" : "exechost", peh);

	/* make sure parsing buffers are long enought */
	if ((i = strlen(execvnode)) >= ebuf_len) {
		tpc = (char *) realloc(ebuf, i + 100);
		if (tpc == NULL)
			return (PBSE_SYSTEM);
		ebuf = tpc;
		ebuf_len = i + 100;
	}
	if ((i = strlen(schedselect)) >= sbuf_len) {
		tpc = (char *) realloc(sbuf, i + 100);
		if (tpc == NULL)
			return (PBSE_SYSTEM);
		sbuf = tpc;
		sbuf_len = i + 100;
	}

	strcpy(sbuf, schedselect);

	/* First, go parse schedselect and count up number of chunks and */
	/* total number of mpiprocs;   assuming one Mom per chunk and    */
	/* one mpiproc structure per mpiproc, this is used to obtain a   */
	/* maxmimun number of each for allocating the array              */

	nmoms = 0;    /* num of mom (struct hnodent) entries needed    */
	nprocs = 0;   /* num of vmpiproc entries needed                */
	n_chunks = 0; /* number of chunks */

	psubspec = parse_plus_spec_r(sbuf, &slast, &hpn);
	/* hpn set to 1 if open paren found, -1 if close paren found, or */
	/* 0 if neither or both found					 */

	while (psubspec) {
		DBPRT(("\tsubspec: %s\n", psubspec))
		rc = parse_chunk_r(psubspec, &snc, &snelma, &snelmt, &skv, NULL);
		/* snc is the number (repeat factor) of chunks */
		if (rc != 0)
			return (rc);

		nmoms += snc; /* num of Moms, one per chunk */
		k = 1;	      /* default number of mpiprocs */
		for (j = 0; j < snelma; ++j) {
			if (strcmp(skv[j].kv_keyw, "mpiprocs") == 0) {
				k = atol(skv[j].kv_val);
			}
		}
#ifdef NAS /* localmod 020 */
		/*
		 * At NAS, if specify only ncpus and not mpiprocs or
		 * ompthreads, assume mpiprocs = ncpus.
		 */
		{
			int ncpusidx = -1;
			for (j = 0; j < snelma; ++j) {
				if (strcmp(skv[j].kv_keyw, "ncpus") == 0) {
					ncpusidx = j;
					continue;
				}
				if (strcmp(skv[j].kv_keyw, MPIPROCS) == 0) {
					break;
				}
				if (strcmp(skv[j].kv_keyw, OMPTHREADS) == 0) {
					break;
				}
			}
			if (ncpusidx >= 0 && j >= snelma) {
				k = atol(skv[ncpusidx].kv_val);
			}
		}
#endif				   /* localmod 020 */
		nprocs += snc * k; /* mpiproces * num of chunks */
		n_chunks += snc;
		psubspec = parse_plus_spec_r(slast, &slast, &hpn);
	}

	DBPRT(("- allocating %d hosts and %d procs\n", nmoms, nprocs))
	pjob->ji_hosts = (hnodent *) calloc(nmoms + 1, sizeof(hnodent));
	pjob->ji_vnods = (vmpiprocs *) calloc(nprocs + 1, sizeof(vmpiprocs));

	n_assn_vnodes = 0;
	evnode = strdup(execvnode);
	if (evnode == NULL) {
		log_err(errno, __func__, "strdup failed");
		return (PBSE_SYSTEM);
	}
	for (tmp_str = strtok_r(evnode, "+", &save_ptr); tmp_str != NULL; tmp_str = strtok_r(NULL, "+", &save_ptr)) {
		n_assn_vnodes++;
	}

	if (n_assn_vnodes == 0)
		n_assn_vnodes = 1;

	free(evnode);

	pjob->ji_assn_vnodes = (vmpiprocs *) calloc(n_assn_vnodes + 1, sizeof(vmpiprocs));

	if ((pjob->ji_hosts == NULL) || (pjob->ji_vnods == NULL) ||
	    (pjob->ji_assn_vnodes == NULL)) {
		log_err(errno, "job_nodes", "calloc failed");
		return (PBSE_SYSTEM);
	}

	for (i = 0; i <= nmoms; ++i) {
		pjob->ji_hosts[i].hn_node = TM_ERROR_NODE;
		CLEAR_HEAD(pjob->ji_hosts[i].hn_events);
	}
	for (i = 0; i <= nprocs; ++i)
		pjob->ji_vnods[i].vn_node = TM_ERROR_NODE;

	for (i = 0; i <= n_assn_vnodes; ++i)
		pjob->ji_assn_vnodes[i].vn_node = TM_ERROR_NODE;

	/* Now parse schedselect and exec_vnode at same time to map mpiprocs */
	/* onto the corresponding Mom and sum up the resources allocated     */
	/* from each Mom						     */

	strcpy(ebuf, execvnode);
	strcpy(sbuf, schedselect);

	momindex = 0;
	procindex = 0;
	assn_index = 0;

	elast = ebuf;

	/*
	 * Next we parse the select spec to look at the next chunk that was
	 * requested by the user.  For each chunk we
	 * 1. parse the subspecs from the exec_vnode that were allocated for
	 *    that chunk.  Then
	 *    a. for the first vnode, get the Mom/host and setup the hnodent
	 *    b. for my hnodent, for each vnode, add a host_vlist entry to
	 *       the hnodent entry
	 * 2. setup the number of "mpiprocs" (from the chunk) vmpiprocs
	 */

	/* (1) parse chunk from select spec */

	psubspec = parse_plus_spec_r(sbuf, &slast, &hpn);
	while (psubspec) {
		int nthreads;
		int numprocs;

		DBPRT(("\tsubspec: %s\n", psubspec))
		nthreads = -1;
		numprocs = -1;
		rc = parse_chunk_r(psubspec, &snc, &snelma, &snelmt, &skv, NULL);
		/* snc = number of chunks */
		if (rc != 0) {
			return (rc);
		}

		for (i = 0; i < snc; ++i) { /* for each chunk in schedselect.. */
			need_accel = 0;
			accel_model = NULL;

			/* clear "need" counts */
			memset(&need, 0, sizeof(need));

			/* clear "have" counts */
			memset(&have, 0, sizeof(have));

			/* figure out what is "need"ed */
			for (j = 0; j < snelma; ++j) {
				if (strcmp(skv[j].kv_keyw, "ncpus") == 0)
					need.rl_ncpus = atol(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "mem") == 0)
					need.rl_mem = to_kbsize(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "vmem") == 0)
					need.rl_vmem = to_kbsize(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "mpiprocs") == 0)
					numprocs = atol(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "ompthreads") == 0)
					nthreads = atol(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "accelerator") == 0) {
					if (strcmp(skv[j].kv_val, "True") == 0)
						need_accel = 1;
					else
						need_accel = 0;
				} else if (strcmp(skv[j].kv_keyw, "naccelerators") == 0)
					need.rl_naccels = atol(skv[j].kv_val);
				else if (strcmp(skv[j].kv_keyw, "accelerator_model") == 0) {
					accel_model = skv[j].kv_val;
					need_accel = 1;
				}
			}
#ifdef NAS /* localmod 020 */
			if (nthreads == -1 && numprocs == -1 && need.rl_ncpus != 0) {
				numprocs = need.rl_ncpus;
			}
#endif /* localmod 020 */
			if (nthreads == -1)
#ifdef NAS_NCPUS1 /* localmod 020 */
				nthreads = 1;
#else
				nthreads = need.rl_ncpus;
#endif /* localmod 020 */

			if (numprocs == -1) {
				if (need.rl_ncpus == 0) {
					numprocs = 0;
				} else {
					numprocs = 1;
				}
			}

			DBPRT(("\tchunk: %d, need %d ncpus and %lu mem\n", i,
			       need.rl_ncpus, (unsigned long) need.rl_mem))

			/*
			* The "natural" vnode for the Mom who is managing
			* this chunk of resources can be determined by the
			* corresponding entry in exec_host.  We have to know which
			* Mom in case of multiple-Moms for the allocated vnodes
			*/
			mname = get_next_exechost2(enable_exechost2, &peh, &port);
			hp = add_mom_to_job(pjob, mname, port, &momindex, mynp);
			if (hp == NULL) {
				log_err(errno, __func__, "Failed to add mom to job");
				return (PBSE_SYSTEM);
			}

			/* now parse exec_vnode to match up alloc-ed with needed */
			stop_on_paren = 0;

			while ((pndspec = parse_plus_spec_r(elast, &elast, &hpn)) != NULL) {
				int vnncpus = 0;
				long long ndmem = 0;

				if (hpn > 0) /* found open paren '(' */
					stop_on_paren = 1;

				rc = parse_node_resc_r(pndspec, &nodep, &enelma, &enelmt, &enkv);

				/* if no resources specified, skip it */
				if (enelma == 0) {
					stop_on_paren = 0;
					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "Ignoring vnode %s without resources", nodep);
					mname = get_next_exechost2(enable_exechost2, &peh, &port);
					hp = add_mom_to_job(pjob, mname, port, &momindex, mynp);
					if (hp == NULL) {
						log_err(errno, __func__, "Failed to add mom to job");
						return (PBSE_SYSTEM);
					}
					continue; /* check next piece */
				}

				/* nodep = vnode name */
				if (rc != 0) {
					return (rc);
				}
				DBPRT(("\t\tusing vnode %s\n", nodep))

				/* find the Mom who manages the vnode */
				pmm = (momvmap_t *) find_vmap_entry(nodep);
				if (pmm == NULL) {
					/* Did not find a vmap entry for this vnode */
					/* assume it is host and add it w/ std port */

					if (enable_exechost2) {
						/* In case mom connected with newer server  */
						pmom = create_mom_entry(mname, port - 1);
					} else {
						pmom = create_mom_entry(nodep, pbs_mom_port);
					}
					if (pmom == NULL)
						return PBSE_SYSTEM;
#ifdef NAS /* localmod 123 */
					/* call create_mommap_entry() in a */
					/* way that populates the job      */
					/* nodefile with short names       */
					/* (e.g. r169i0n0)                 */
					if (0)
#else
					if (enable_exechost2)
#endif /* localmod 123 */
					{
						log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, "Creating entry for vnode=%s, mom=%s", nodep, mname);
						pmm = create_mommap_entry(nodep, mname, pmom, 0);
					} else {
						pmm = create_mommap_entry(nodep, NULL, pmom, 0);
					}

					if (pmm == NULL) {
						delete_mom_entry(pmom);
						return PBSE_SYSTEM;
					}
					log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_NODE, LOG_DEBUG, nodep, "implicitly added host to vmap");
				} else {
					log_eventf(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid,
						   "Found! vnodemap entry for vnode=%s: mom-name=%s, hostname=%s", nodep, pmm->mvm_name, pmm->mvm_hostn);
				}

				/* for the allocated resc, add to hnodent resc_limit */
				/* which is used for limit enforcement while running */

				for (j = 0; j < enelma; ++j) {
					if (strcmp(enkv[j].kv_keyw, "ncpus") == 0) {
						vnncpus = atoi(enkv[j].kv_val);
						have.rl_ncpus += vnncpus;
						hp->hn_nrlimit.rl_ncpus += vnncpus;
					} else if (strcmp(enkv[j].kv_keyw, "mem") == 0) {
						ndmem = to_kbsize(enkv[j].kv_val);
						have.rl_mem += ndmem;
						hp->hn_nrlimit.rl_mem += ndmem;
					} else if (strcmp(enkv[j].kv_keyw, "vmem") == 0) {
						sz = to_kbsize(enkv[j].kv_val);
						have.rl_vmem += sz;
						hp->hn_nrlimit.rl_vmem += sz;
					} else if (strcmp(enkv[j].kv_keyw, "ssinodes") == 0) {
						hp->hn_nrlimit.rl_ssi += atoi(enkv[j].kv_val);
					} else if (strcmp(enkv[j].kv_keyw, "naccelerators") == 0) {
						naccels = atoi(enkv[j].kv_val);
						have.rl_naccels += naccels;
						hp->hn_nrlimit.rl_naccels += naccels;
					} else if (strcmp(enkv[j].kv_keyw, "accelerator_memory") == 0) {
						accel_mem = to_kbsize(enkv[j].kv_val);
						have.rl_accel_mem += accel_mem;
						hp->hn_nrlimit.rl_accel_mem += accel_mem;
						need_accel = 1;
					}
				}

				/* (1b)if this mom is me, add vnode to  host_vlist */
				if (hp->hn_node == pjob->ji_nodeid) {
					host_vlist_t *phv;
					phv = (host_vlist_t *) realloc(hp->hn_vlist,
								       (hp->hn_vlnum + 1) * sizeof(host_vlist_t));
					if (phv == NULL) {
						return (PBSE_INTERNAL);
					}
					hp->hn_vlist = phv;
					pbs_strncpy(phv[hp->hn_vlnum].hv_vname, nodep,
						    sizeof(phv[hp->hn_vlnum].hv_vname));
					phv[hp->hn_vlnum].hv_ncpus = vnncpus;
					phv[hp->hn_vlnum].hv_mem = ndmem;
					hp->hn_vlnum++;
				}

				vmp = &pjob->ji_assn_vnodes[assn_index];
				vmp->vn_node = assn_index++;
				if (hp != NULL)
					vmp->vn_host = hp;
				if (pmm != NULL)
					vmp->vn_vname = strdup(pmm->mvm_name);
				if (vmp->vn_vname == NULL) {
					if (vmp->vn_hname != NULL) {
						free(vmp->vn_hname);
						vmp->vn_hname = NULL;
					}
					return (PBSE_SYSTEM);
				}
				vmp->vn_cpus = vnncpus;
				vmp->vn_mem = ndmem;
				/* mark next entry as the (current) end */
				pjob->ji_assn_vnodes[assn_index].vn_node = TM_ERROR_NODE;

				if (stop_on_paren == 0)
					break;
				else if (hpn < 0)
					break;
			}
			/* validate the pointers before using  - SAFER */
			assert((hp != NULL) && (pmm != NULL));
			hp->hn_nprocs += numprocs;

			/* (2) setup the number of vmpiprocs entries based */
			/* on the number of procs, numprocs, in this chunk */

			for (k = 0; k < numprocs; ++k) {
				vmp = &pjob->ji_vnods[procindex];
				vmp->vn_node = procindex++;
				vmp->vn_host = hp;
				if (pmm->mvm_hostn) {
					/* copy the true host name */
					vmp->vn_hname = strdup(pmm->mvm_hostn);
					if (vmp->vn_hname == NULL)
						return (PBSE_SYSTEM);
				} else {
					/* set null and we will use the Mom name */
					vmp->vn_hname = NULL;
				}
				vmp->vn_vname = strdup(pmm->mvm_name);
				if (vmp->vn_vname == NULL) {
					if (vmp->vn_hname != NULL) {
						free(vmp->vn_hname);
						vmp->vn_hname = NULL;
					}
					return (PBSE_SYSTEM);
				}
				vmp->vn_cpus = have.rl_ncpus;
				vmp->vn_mem = have.rl_mem;
				vmp->vn_vmem = have.rl_vmem;
				vmp->vn_mpiprocs = numprocs;
				vmp->vn_threads = nthreads;
				vmp->vn_naccels = have.rl_naccels;
				vmp->vn_need_accel = need_accel;
				if (vmp->vn_need_accel || (vmp->vn_naccels > 0)) {
					if (accel_model) {
						vmp->vn_accel_model = strdup(accel_model);
					}
					vmp->vn_accel_mem = have.rl_accel_mem;
				}

				/* mark next entry as the (current) end */
				pjob->ji_vnods[procindex].vn_node = TM_ERROR_NODE;
			}
		}

		/* do next section of schedselect */
		psubspec = parse_plus_spec_r(slast, &slast, &hpn);
	}

	pjob->ji_numnodes = momindex;
	pjob->ji_numvnod = procindex;
	pjob->ji_num_assn_vnodes = assn_index;

	return (0);
}

/**
 * @brief
 * 	wrapper function that calls job_nodes_inner with a NULL parameter
 * 	for the "mynodeid" parameter
 *
 * @param[in] pjob - job pointer
 *
 * @return 	int
 * @retval	PBSE_NONE (0) Success
 * @retval      PBSE_* on error.
 *
 */

int
job_nodes(struct job *pjob)
{
	return job_nodes_inner(pjob, NULL);
}

/**
 * @brief
 * 	start_exec() - start execution of a job
 *
 * @param[in] pjob - job pointer
 *
 * @return	Void
 *
 */
void
start_exec(job *pjob)
{
	eventent *ep = NULL;
	int i, nodenum;
	pbs_socklen_t len;
	int socks[2];
	struct sockaddr_in saddr;
	hnodent *np;
	pbs_list_head phead;
	mom_hook_input_t hook_input;
	mom_hook_output_t hook_output;
	int hook_errcode = 0;
	int hook_rc = 0;
	char hook_msg[HOOK_MSG_SIZE];
	hook *last_phook = NULL;
	unsigned int hook_fail_action = 0;

	/* make sure we have an open tpp stream back to the server */
	if (server_stream == -1)
		send_hellosvr(server_stream);

#if MOM_ALPS
	/* set ALPS reservation id to -1 to indicate there isn't one yet */
	pjob->ji_extended.ji_ext.ji_reservation = -1;
#endif

	if (pjob->ji_mompost) { /* fail until activity is done */
		log_joberr(-1, __func__, "waiting for worktask completion",
			   pjob->ji_qs.ji_jobid);
		exec_bail(pjob, JOB_EXEC_RETRY, NULL);
		return;
	}

	/*
	 * Ensure we have a cookie for the job. The cookie consists of a
	 * string of 32 hex characters plus a null terminator. The machine
	 * architecture needs to be considered when populating the string
	 * because random() and lrand48() return a long int.
	 */

	if (!(is_jattr_set(pjob, JOB_ATR_Cookie))) {
		char tt[33];
		int i;

		for (i = 0; i < 33; i += sizeof(long)) {
			snprintf(&tt[i], 33 - i, "%.*lX", (int) sizeof(long), (unsigned long) random());
		}
		set_jattr_str_slim(pjob, JOB_ATR_Cookie, tt, NULL);
		DBPRT(("===== COOKIE %s\n", tt))
	}

	if ((i = job_nodes(pjob)) != 0) {
		sprintf(log_buffer, "job_nodes failed with error %d", i);
		log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_NOTICE,
			  pjob->ji_qs.ji_jobid, log_buffer);
		nodes_free(pjob);
		exec_bail(pjob, JOB_EXEC_RETRY, NULL);
		return;
	}
	pjob->ji_nodeid = 0; /* I'm MS */
	nodenum = pjob->ji_numnodes;

	if (do_tolerate_node_failures(pjob))
		reliable_job_node_add(&pjob->ji_node_list, mom_host);

	if (mock_run) {
		pjob->ji_ports[0] = -1;
		pjob->ji_ports[1] = -1;
		pjob->ji_stdout = -1;
		pjob->ji_stderr = -1;

		mock_run_finish_exec(pjob);
		return;
	}

	if (nodenum > 1) {
		int nodemux = 0;
		int mtfd = -1;
		int com;

		pjob->ji_resources = (noderes *) calloc(nodenum - 1,
							sizeof(noderes));
		assert(pjob->ji_resources != NULL);
		pjob->ji_numrescs = nodenum - 1;

		/* pjob->ji_numrescs is the number of entries in pjob->ji_resources array,
		 * which houses the resources obtained from the SISTER moms attached to the
		 * job. So pjob->ji_resources[0] is actually the resources from sister mom #1,
		 * pjob->ji_resources[1] is the resources from sister mom #2, and so on.
		 * Correlating this to the pjob->ji_hosts array,
		 * pjob->ji_hosts[0] refers to the MS entry which won't have an entry in the
		 * pjob->ji_resources array since that is for sisters only.
		 * pjob->ji_hosts[1] is sister #1 whose resources obtained for the job
		 * is in pjob->ji_resources[0],
		 * pjob->ji_hosts[2] is sister #2 whose resources obtained for the job is in
		 * pjob->ji_resources[1], and so on.
		 * This is why pjob->ji_numnodes = pjob->numrescs + 1.
		 */
		CLEAR_HEAD(phead);
		for (i = 0; i < (int) JOB_ATR_LAST; i++) {
			(void) (job_attr_def + i)->at_encode(get_jattr(pjob, i), &phead, (job_attr_def + i)->at_name, NULL, ATR_ENCODE_MOM, NULL);
		}
		attrl_fixlink(&phead);
		/*
		 **		Open streams to the sisterhood.
		 */
		if (pbs_conf.pbs_use_mcast == 1) {
			/* open the tpp mcast channel here */
			if ((mtfd = tpp_mcast_open()) == -1) {
				sprintf(log_buffer, "mcast open failed");
				log_err(errno, __func__, log_buffer);
				exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
				return;
			}
		}

		for (i = 1; i < nodenum; i++) {
			np = &pjob->ji_hosts[i];

			np->hn_stream = tpp_open(np->hn_host, np->hn_port);
			if (np->hn_stream < 0) {
				sprintf(log_buffer, "tpp_open failed on %s:%d",
					np->hn_host, np->hn_port);
				log_err(errno, __func__, log_buffer);
				exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
				return;
			}
			if (pbs_conf.pbs_use_mcast == 1) {
				/* add each of the tpp streams to the tpp mcast channel */
				if (tpp_mcast_add_strm(mtfd, np->hn_stream, FALSE) == -1) {
					tpp_close(np->hn_stream);
					np->hn_stream = -1;
					tpp_mcast_close(mtfd);
					sprintf(log_buffer, "mcast add failed");
					log_err(errno, __func__, log_buffer);
					exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
					return;
				}
			}
		}

		if (is_jattr_set(pjob, JOB_ATR_nodemux))
			nodemux = get_jattr_long(pjob, JOB_ATR_nodemux);

		/*
		 **		Send out a JOIN_JOB/RESTART message to all the MOM's in
		 **		the sisterhood.
		 */
		if ((pjob->ji_qs.ji_svrflags & JOB_SVFLG_CHKPT) ||
		    (pjob->ji_qs.ji_svrflags & JOB_SVFLG_ChkptMig)) {

			/*
			 * NULL value passed to hook_input.vnl means to assign
			 * vnode list using pjob->ji_host[].
			 */
			mom_hook_input_init(&hook_input);
			hook_input.pjob = pjob;

			mom_hook_output_init(&hook_output);
			hook_output.reject_errcode = &hook_errcode;
			hook_output.last_phook = &last_phook;
			hook_output.fail_action = &hook_fail_action;

			switch ((hook_rc = mom_process_hooks(HOOK_EVENT_EXECJOB_BEGIN,
							     PBS_MOM_SERVICE_NAME, mom_host,
							     &hook_input, &hook_output,
							     hook_msg, sizeof(hook_msg), 1))) {
				case 1: /* explicit accept */
					break;
				case 2: /* no hook script executed - go ahead and accept event*/
					break;
				default:
					/* a value of '0' means explicit reject encountered. */
					if (hook_rc != 0) {
						/*
						 * we've hit an internal error (malloc error, full disk, etc...), so
						 * treat this now like a  hook error so hook fail_action will be
						 * consulted. Before, behavior of an internal error was to ignore it!
						 */
						hook_errcode = PBSE_HOOKERROR;
						send_hook_fail_action(last_phook);
					}
					exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
					return;
			}

			if ((i = job_setup(pjob, NULL)) != JOB_EXEC_OK) {
				exec_bail(pjob, i, NULL);
				return;
			}

			/* new tasks can't talk to demux anymore */
			nodemux = 0;

			com = IM_RESTART;
			pjob->ji_mompost = post_restart;

			if ((i = local_restart(pjob, NULL)) != 0) {
				post_restart(pjob, i);
				exec_bail(pjob, (i == PBSE_CKPBSY) ? JOB_EXEC_RETRY : JOB_EXEC_FAIL2, NULL);
				return;
			}
		} else
			com = IM_JOIN_JOB;

		if (nodemux) {
			pjob->ji_ports[0] = -1;
			pjob->ji_ports[1] = -1;
			pjob->ji_stdout = -1;
			pjob->ji_stderr = -1;
		} else {
			/*
			 * Open two sockets for use by demux program later.
			 */
			for (i = 0; i < 2; i++)
				socks[i] = -1;
			for (i = 0; i < 2; i++) {
				if ((socks[i] = socket(AF_INET,
						       SOCK_STREAM, 0)) == -1)
					break;

				memset(&saddr, '\0', sizeof(saddr));
				saddr.sin_addr.s_addr = INADDR_ANY;
				saddr.sin_family = AF_INET;
				if (bind(socks[i], (struct sockaddr *) &saddr,
					 sizeof(saddr)) == -1)
					break;

				len = sizeof(saddr);
				if (getsockname(socks[i],
						(struct sockaddr *) &saddr,
						&len) == -1)
					break;
				pjob->ji_ports[i] = (int) ntohs(saddr.sin_port);
			}
			if (i < 2) {
				log_err(errno, __func__, "stdout/err socket");
				for (i = 0; i < 2; i++) {
					if (socks[i] != -1)
						close(socks[i]);
				}
				exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
				return;
			}
			pjob->ji_stdout = socks[0];
			pjob->ji_stderr = socks[1];
			pjob->ji_extended.ji_ext.ji_stdout = pjob->ji_ports[0];
			pjob->ji_extended.ji_ext.ji_stderr = pjob->ji_ports[1];
		}

		for (i = 1; i < nodenum; i++) {
			np = &pjob->ji_hosts[i];

			if (i == 1)
				ep = event_alloc(pjob, com, -1, np,
						 TM_NULL_EVENT, TM_NULL_TASK);
			else
				ep = event_dup(ep, pjob, np);

			if (ep == NULL) {
				exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
				return;
			}
			if (pbs_conf.pbs_use_mcast == 0)
				send_join_job_restart(com, ep, i, pjob, &phead);
		}
		if (pbs_conf.pbs_use_mcast == 1) {
			send_join_job_restart_mcast(mtfd, com, ep, i, pjob, &phead);
			tpp_mcast_close(mtfd);
		}

		free_attrlist(&phead);
		if (do_tolerate_node_failures(pjob)) {
			if (!check_job_substate(pjob, JOB_SUBSTATE_WAITING_JOIN_JOB)) {
				set_job_substate(pjob, JOB_SUBSTATE_WAITING_JOIN_JOB);
				pjob->ji_joinalarm = time_now + joinjob_alarm_time;
				sprintf(log_buffer, "job waiting up to %ld secs ($sister_join_job_alarm) for all sister moms to join", joinjob_alarm_time);
				log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_INFO, pjob->ji_qs.ji_jobid, log_buffer);
				log_buffer[0] = '\0';
			}
		}
	} else { /* no sisters */
		pjob->ji_ports[0] = -1;
		pjob->ji_ports[1] = -1;
		pjob->ji_stdout = -1;
		pjob->ji_stderr = -1;

		/*
		 * All the JOIN messages have come in.
		 */
		switch (pre_finish_exec(pjob, 0)) {
			case PRE_FINISH_SUCCESS:
				finish_exec(pjob);
				break;
			default:
				exec_bail(pjob, JOB_EXEC_RETRY, NULL);
				return;
		}
	}
}

/**
 * @brief
 *	Forks a child process, with the parent process returning the child
 *	process id, while the child closes shuts down tpp, and closes
 *	network descriptors, and turns off alarm.
 *
 * @param[in]	conn	- connection file descriptor to NOT close in the child.
 * @note
 * 	If 'conn' is the impossible -1, then ALL connection descriptors will
 *	be closed.
 *
 * @return 	pid_t
 * @retval	child process id	Success
 *
 */

pid_t
fork_me(int conn)
{
	struct sigaction act;
	pid_t pid;

	fflush(stdout);
	fflush(stderr);

	pid = fork();
	if (pid == 0) {
		/* now the child */

		/* Turn off alarm if it should happen to be on */
		alarm(0);
		tpp_terminate();
		(void) close(lockfds);

		/* Reset signal actions for most to SIG_DFL */
		sigemptyset(&act.sa_mask);
		act.sa_flags = 0;
		act.sa_handler = SIG_DFL;
		(void) sigaction(SIGCHLD, &act, NULL);
		(void) sigaction(SIGINT, &act, NULL);
		(void) sigaction(SIGTERM, &act, NULL);
		act.sa_handler = SIG_IGN;
		(void) sigaction(SIGHUP, &act, NULL);

		/* Reset signal mask */
		(void) sigprocmask(SIG_SETMASK, &act.sa_mask, NULL);

		(void) mom_close_poll();
		net_close(conn); /* close all but for the current */
	} else if (pid < 0)
		log_err(errno, __func__, "fork failed");

	return (pid);
}

/**
 * @brief
 * 	starter_return - return starter value,
 *	exit if negative
 *
 */

void
starter_return(int upfds, int downfds, int code, struct startjob_rtn *sjrtn)
{
	struct startjob_rtn ack;

	sjrtn->sj_code = code;
	(void) writepipe(upfds, sjrtn, sizeof(*sjrtn));

	/* wait for acknowledgement */
	(void) readpipe(downfds, &ack, sizeof(ack));
	(void) close(upfds);
	(void) close(downfds);
	if (code < 0) {
		exit(254);
	}
}

/**
 * @brief	Open a file as the user.
 *
 * @par	Purpose
 *	This is done to prevent a security problem where in "root" opens a file
 *	in a public directory and then changes ownership.   If a symlink
 *	already existed, we could have a problem.
 *
 * @param[in] path  - full path of file to open/create
 * @param[in] oflag - access mode, see oflag of open(2)
 * @param[in] mode  - file creation mode (permissions), see mode of open(2)
 * @param[in] exuid - effective uid of user as which to open/create file
 * @param[in] exgid - effective gid of user as which to open/create file
 *
 * @return int
 * @retval -1  - error on open/create or impersonating the user
 * @retval >=0 - the opened file descriptor
 *
 */

int
open_file_as_user(char *path, int oflag, mode_t mode, uid_t exuid, gid_t exgid)
{
	int fds;
	int open_errno = 0;
	extern gid_t pbsgroup;

	/* must open or create file as the user */

	if (impersonate_user(exuid, exgid) == -1)
		return -1;

	if ((fds = open(path, oflag, mode)) == -1)
		open_errno = errno;

	revert_from_user();

	if (open_errno)
		errno = open_errno;

	return (fds);
}

/**
 * @brief
 *	create_file_securely - create job's output/error file in a secure
 *	manner, but as the user.
 *
 * @par Functionality:
 *	This function is used when we need to create the job's output/error.
 *	In the spool directory, anyone can create a file there, so we need to
 *	open in a manner that a hacker cannot replace the file with a link
 *	and not open it to create it as a link may be there and we don't wish
 *	to create the link's target.
 *
 *	Uses mktemp() to create a file with a name that does not
 *	currently exist, and then rename this file to the correct name for
 *	the job.  Then set correct permissions based on user's umask for the
 *	job and set the O_APPEND flag so that qmsg can write without having
 *	its text over written by the job.
 *
 * @param[in]	path  - path to the file to be created
 * @param[in]	exuid - uid for file owner
 * @param[in]	exgid - gid for file owner
 *
 * @return	int
 * @retval	file descriptor (>=0) if success
 * @retval	-1 on error.
 *
 * @par MT-safe: likely no
 *
 */
static int
create_file_securely(char *path, uid_t exuid, gid_t exgid)
{
	char buf[MAXPATHLEN + 1];
	char *pc;
	mode_t cur_mask;
	int fds;

	/* create a uniquely named file using mkstemp() */
	/* for that we need to setup the template       */

	pbs_strncpy(buf, path, sizeof(buf));
	pc = strrchr(buf, '/'); /* last slash in path */
	if (pc == NULL)
		return (-1);
	++pc;
	if ((pc - buf) > (MAXPATHLEN - 6))
		return (-1);  /* path too long, unlikely to happen */
	strcpy(pc, "XXXXXX"); /* mkstemp template */

	/* The user's umask has already been set.  Get it for later */

	cur_mask = umask(022);
	(void) umask(cur_mask); /* reset it back */

	/*
	 * become the user, i.e. set effective privileges
	 * IMPORTANT - once we have changed to the user privilege,
	 * DO NOT return until have changed back to root
	 */

	if (impersonate_user(exuid, exgid) == -1)
		return -1;

	fds = mkstemp(buf); /* create file */
	if (fds != -1) {

		/* change permissions based on user's umask */
		/* because mkstemp() ignores umask; then    */
		/* rename to what we want for the job file  */

		if ((fchmod(fds, 0666 & ~cur_mask) == -1) ||
		    (rename(buf, path) == -1)) {
			(void) close(fds);
			(void) unlink(buf);
			fds = -1;
		} else {
			int acc;

			/* add O_APPEND to the file descriptor so that lines */
			/* written by qmsg are not overwritten by the job    */

			acc = fcntl(fds, F_GETFL);
			if (acc == -1) {
				(void) close(fds);
				(void) unlink(path);
				fds = -1;
			}
			acc = (acc & ~O_ACCMODE) | O_APPEND;
			if (fcntl(fds, F_SETFL, acc) == -1) {
				(void) close(fds);
				(void) unlink(path);
				fds = -1;
			}
		}
	}

	/* return to having full administrative (root) privileges */
	revert_from_user();

	/* now we can return */
	return (fds);
}

/**
 * @brief
 *	Generate the fully qualified path name for a job's standard stream
 *
 * @par Functionality:
 *	Creates a fully qualified path name for the output/error file for the following cases:
 *	1.  Interactive PBS job, just return the output attribute value,  it won't be used.
 *	2.  qsub -k option was specified for the file, retain in User's Home directory unless
 *	    sandbox=PRIVATE, in which case it goes there.  The file name is the default of
 *	    job_name.a|eo<sequence number>
 *	3.  If sandbox=PRIVATE, the file is placed there.
 *	4.  If direct_write is specified and the final destination of the file is mapped to a local directory by $usecp,
 *	    create the file in its final destination directory and set the "keeping" flag so it will not be staged.
 *	5.  Else, the file path is created to put the file in PBS_HOME/spool
 * @param[in]  pjob - pointer to job structure
 * @param[in]  which - identifies which file: StdOut, StdErr, or Chkpt.
 * @param[out] keeping - set true if file to reside in User's Home or sandbox, false if in spool.
 *
 * @return char * - pointer to path which is in a static array.
 *
 * @par @par MT-safe: No
 */

char *
std_file_name(job *pjob, enum job_file which, int *keeping)
{
	static char path[MAXPATHLEN + 1];
	char key = '\001'; /* should never be found */
	int len;
	char *pd;
	char *suffix = NULL;

	if (is_jattr_set(pjob, JOB_ATR_interactive) && (get_jattr_long(pjob, JOB_ATR_interactive) > 0)) {

		/* interactive job, name of pty is in outpath */

		*keeping = 0;
		return (get_jattr_str(pjob, JOB_ATR_outpath));
	}

	switch (which) {
		case StdOut:
			key = 'o';
			suffix = JOB_STDOUT_SUFFIX;
			break;

		case StdErr:
			key = 'e';
			suffix = JOB_STDERR_SUFFIX;
			break;

		case Chkpt:
			suffix = JOB_CKPT_SUFFIX;
			break;

		default:
			break;
	}

	if (pjob->ji_grpcache == NULL)
		return (""); /* needs to be non-NULL for figuring out homedir path; otherwise, mom will crash! */

	/* check if file is to be directly written to its final destination */
	if (is_direct_write(pjob, which, path, &direct_write_possible)) {
		*keeping = 1; /* inhibit staging */
		return (path);
	}

	/* Is file to be kept?, if so use default name in Home directory */
	else if ((is_jattr_set(pjob, JOB_ATR_keep)) &&
		 strchr(get_jattr_str(pjob, JOB_ATR_keep), key) && !strchr(get_jattr_str(pjob, JOB_ATR_keep), 'd')) {
		/* sandbox=private mode set the path to be the path to the */
		/* staging and execution directory			   */
		if ((is_jattr_set(pjob, JOB_ATR_sandbox)) && (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
			strcpy(path, jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir));
			*keeping = 1;
		} else
			strcpy(path, pjob->ji_grpcache->gc_homedir);

		pd = strrchr(get_jattr_str(pjob, JOB_ATR_jobname), '/');
		if (pd == NULL) {
			pd = get_jattr_str(pjob, JOB_ATR_jobname);
			strcat(path, "/");
		}

		strcat(path, pd); /* start with the job name */
		len = strlen(path);
		*(path + len++) = '.';	   /* the dot        */
		*(path + len++) = key;	   /* the letter     */
		pd = pjob->ji_qs.ji_jobid; /* the seq_number */
		while (isdigit((int) *pd))
			*(path + len++) = *pd++;
		*(path + len) = '\0';
		if (is_jattr_set(pjob, JOB_ATR_array_index)) {
			/* this is a sub job of an Array Job, append .index */
			strcat(path, ".");
			strcat(path, get_jattr_str(pjob, JOB_ATR_array_index));
		}
		*keeping = 1;
	} else {
		/* put into spool directory unless NO_SPOOL_OUTPUT is defined */

#ifdef NO_SPOOL_OUTPUT
		/* sandbox=PRIVATE mode puts output in job staging and execution directory */
		if (is_jattr_set(pjob, JOB_ATR_sandbox) &&
		    (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
			strcpy(path, jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir));
			strcat(path, "/");
		} else /* force all output to user's HOME */
			strcpy(path, pjob->ji_grpcache->gc_homedir);

		strcat(path, "/");
		*keeping = 1;
#else  /* NO_SPOOL_OUTPUT */
		/* sandbox=PRIVATE mode puts output in job staging and execution directory */
		if (is_jattr_set(pjob, JOB_ATR_sandbox) &&
		    (strcasecmp(get_jattr_str(pjob, JOB_ATR_sandbox), "PRIVATE") == 0)) {
			strcpy(path, jobdirname(pjob->ji_qs.ji_jobid, pjob->ji_grpcache->gc_homedir));
			strcat(path, "/");
			*keeping = 1;
		} else {
			strcpy(path, path_spool);
			*keeping = 0;
		}
#endif /* NO_SPOOL_OUTPUT */
		if (*pjob->ji_qs.ji_fileprefix != '\0')
			(void) strcat(path, pjob->ji_qs.ji_fileprefix);
		else
			(void) strcat(path, pjob->ji_qs.ji_jobid);
		if (suffix)
			(void) strcat(path, suffix);
	}
	return (path);
}

/**
 * @brief
 *	Open (create) either standard output or standard error for the job.
 * @par
 *	Open, likely creating, the job file in a secure manner.
 *	If the job is interactive, connecting to a pseudo terminal, or the file is being opened
 *	in the User's Home or sandbox,  it is opened as the user.
 *	In spool, it is a bit more complex; must make sure an attacker cannot try and slip in a
 *	symbolic link which would cause the file to overwrite something else.
 *
 * @param[in] pjob  - pointer to job structure
 * @param[in] which - which file to create, StdOut, StdErr, or Chkpt
 * @param[in] mode  - file open oflag (O_CREAT, O_WRONLY, ...)
 * @param[in] exgid - User's gid
 *
 * @return 	int
 * @retval	fd	On success
 * @retval	-1	on failure
 *
 */

int
open_std_file(job *pjob, enum job_file which, int mode, gid_t exgid)
{
	uid_t exuid;
	int fds;
	int keeping = 0;
	char *path;
	struct stat sb;

	if (!pjob)
		return (-1);
	if (!pjob->ji_grpcache)
		return (-1);
	exuid = pjob->ji_grpcache->gc_uid;
	path = std_file_name(pjob, which, &keeping);

	/* must open or create file as the user */

	/*
	 * If the job is interactive, the tty device file is "safe" in a
	 * protected directory,  otherwise check others for security.
	 */

	if (is_jattr_set(pjob, JOB_ATR_interactive) != 0 && get_jattr_long(pjob, JOB_ATR_interactive) > 0)
		fds = open_file_as_user(path, mode, 0644, exuid, exgid);
	else if (keeping) {
		/* The user is keeping the file in his Home directory or sandbox, */
		/* both are safe and the file can be opened directly.             */
		fds = open_file_as_user(path, mode, 0644, exuid, exgid);
	} else {

		/* File going into the spool area... */

		int lrc;

		/*
		 * If the file does not already exist (the typical case for a
		 * job running the first time) or exists, but is a regular file
		 * owned by the right user (if job has run before), then we
		 * will open it.  BUT, we need to recheck because there is a
		 * (very) small window in which it could be changed.
		 *
		 * If the file exists, but isn't a regular file owned by the
		 * right user, there is a problem.  In this case, (see "else")
		 * we don't want to open it as this could create a file as the
		 * target of a link,  we make it using a more secure manner
		 * using mkstemp().
		 */
		if (((lrc = lstat(path, &sb)) != -1) && ((sb.st_mode & S_IFMT) == S_IFREG) && (sb.st_nlink == 1) && (sb.st_uid == exuid) && (sb.st_gid == exgid)) {

			/* at this point all is ok, go open it */
			fds = open_file_as_user(path, mode, 0644, exuid, exgid);
			if (fds == -1)
				return (-1);

			/* Recheck what is opened, it might have  */
			/* changed between the check and the open */

			if ((fstat(fds, &sb) == -1) || ((sb.st_mode & S_IFMT) != S_IFREG) || (sb.st_nlink != 1) || (sb.st_uid != exuid) || (sb.st_gid != exgid)) {

				/* Its bad now,  log it, leaving */
				/* it in place as evidence       */
				if (fds != -1)
					close(fds);

				log_suspect_file(__func__, "bad type or owner", path, &sb);
				return (-1);
			}
		} else {

			if ((lrc != -1) || (errno != ENOENT)) {
				/* file exists but is suspect */
				log_suspect_file(__func__,
						 "bad type or owner, attempting to remove file", path, &sb);
				(void) unlink(path);
			}

			/* file does not exist or is not correct */
			/* create the file in a secure manner    */

			if ((fds = create_file_securely(path, exuid, exgid)) == -1) {
				sprintf(log_buffer,
					"secure create of file failed for job %s for user %u",
					pjob->ji_qs.ji_jobid, exuid);
				if (stat(path, &sb) != -1) {
					strcat(log_buffer, ", file exists");
					log_suspect_file(__func__, log_buffer, path, &sb);
				} else {
					log_record(PBSEVENT_SECURITY, PBS_EVENTCLASS_FILE, LOG_CRIT,
						   path, log_buffer);
				}
				return (-1);
			}
		}
	}

	return (fds);
}

/**
 * @brief
 *	catchinter = catch death of writer child of interactive job
 *	and kill off the shell child.
 *
 * @param[in] sig - signal number
 *
 * @return	Void
 *
 */

static void
catchinter(int sig)
{
	int status;
	pid_t pid;

	pid = waitpid(-1, &status, WNOHANG);
	if (pid == 0)
		return;
	if (pid == writerpid) {
		kill(shellpid, SIGKILL);
#if defined(HAVE_LIBKAFS) || defined(HAVE_LIBKOPENAFS)
		waitpid(shellpid, &status, WNOHANG);
#else
		(void) wait(&status);
#endif
		mom_reader_go = 0;
		x11_reader_go = 0;
	}
}
/**
 * @brief
 *	log_mom_portfw_msg - used to log a port forwarding error message to
 *	MOM logs
 *
 * @param[in]	msg -  pointer to error message
 *
 * @return	None
 *
 */
void
log_mom_portfw_msg(char *msg)
{
	strcpy(log_buffer, msg);
	log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_JOB, LOG_ERR, __func__, log_buffer);
}
