/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <thrift/Thrift.h>
#include <memory>
#include <thrift/server/TServer.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerTransport.h>
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
#include <string>
#include <cstdlib>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <event.h>
#include <event2/event_compat.h>
#include <event2/event_struct.h>

namespace apache {
namespace thrift {
namespace server {

using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TNonblockingServerTransport;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Guard;

// Derive the individual libevent version components. The shifts below treat
// LIBEVENT_VERSION_NUMBER as (major << 24) | (minor << 16) | (release << 8).
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
// (the headers did not define LIBEVENT_VERSION_NUMBER, so synthesize it from
// the hard-coded 1.14.13 components using the same layout as above)
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER                                                                    \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

// When building against a libevent older than 2.0, declare evutil_socket_t
// ourselves as an alias of THRIFT_SOCKET.
#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef THRIFT_SOCKET evutil_socket_t;
#endif

// Pointee type used when handing option values to the socket-option calls:
// char on Windows, void everywhere else. A definition supplied earlier wins.
#ifndef SOCKOPT_CAST_T
#ifdef _WIN32
#define SOCKOPT_CAST_T char
#else
#define SOCKOPT_CAST_T void
#endif // _WIN32
#endif // SOCKOPT_CAST_T

/**
 * Reinterpret a pointer to a read-only option value as a
 * `const SOCKOPT_CAST_T*`. The address is unchanged.
 */
template <typename T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  const SOCKOPT_CAST_T* asOption = reinterpret_cast<const SOCKOPT_CAST_T*>(v);
  return asOption;
}

/**
 * Reinterpret a pointer to a writable option value as a `SOCKOPT_CAST_T*`.
 * The address is unchanged.
 */
template <typename T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  SOCKOPT_CAST_T* asOption = reinterpret_cast<SOCKOPT_CAST_T*>(v);
  return asOption;
}

/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 */

/// Overload condition actions.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION,       ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
};

class TNonblockingIOThread;

class TNonblockingServer : public TServer {
private:
  class TConnection;

  friend class TNonblockingIOThread;

private:
  /// Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  /// Default limit on size of idle connection pool
  static const size_t CONNECTION_STACK_LIMIT = 1024;

  /// Default limit on frame size
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// Default limit on total number of connected sockets
  static const int MAX_CONNECTIONS = INT_MAX;

  /// Default limit on connections in handler/task processing
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

  /// Default size of write buffer
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_READ_BUFFER_LIMIT = 1024;

  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;

  /// # of calls before resizing oversized buffers (0 = check only on close)
  static const int RESIZE_BUFFER_EVERY_N = 512;

  /// # of IO threads to use by default
  static const int DEFAULT_IO_THREADS = 1;

  /// # of IO threads this server will use
  size_t numIOThreads_;

  /// Whether to set high scheduling priority for IO threads
  bool useHighPriorityIOThreads_;

  /// Server socket file descriptor
  THRIFT_SOCKET serverSocket_;

  /// The optional user-provided event-base (for single-thread servers)
  event_base* userEventBase_;

  /// For processing via thread pool, may be nullptr
  std::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  // Factory to create the IO threads
  std::shared_ptr<ThreadFactory> ioThreadFactory_;

  // Vector of IOThread objects that will handle our IO
  std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;

  // Index of next IO Thread to be used (for round-robin)
  uint32_t nextIOThread_;

  // Synchronizes access to connection stack and similar data
  Mutex connMutex_;

  /// Number of TConnection objects we've created
  size_t numTConnections_;

  /// Number of Connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Limit for how many TConnection objects to cache
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /**
   * Hysteresis for overload state.  This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared;
   * must be <= 1.0.
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit.  0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is <= to this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
   * idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unclosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
  std::vector<TConnection*> activeConnections_;

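  /// Non-blocking server transport supplied at construction; getListenPort()
  /// queries it for the listening port.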
  std::shared_ptr<TNonblockingServerTransport> serverTransport_;

  /**
   * Called when server socket had something happen.  We accept all waiting
   * client connections on listen socket fd and assign TConnection objects
   * to handle those requests.
   *
   * @param fd the descriptor the event occurred on.
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(THRIFT_SOCKET fd, short which);

  /**
   * Assign the built-in default to every configuration and bookkeeping
   * field. Each constructor calls this before running any setter.
   */
  void init() {
    // Listening socket, event base and IO-thread setup.
    serverSocket_ = THRIFT_INVALID_SOCKET;
    userEventBase_ = nullptr;
    numIOThreads_ = DEFAULT_IO_THREADS;
    useHighPriorityIOThreads_ = false;
    nextIOThread_ = 0;

    // Task processing.
    threadPoolProcessing_ = false;
    taskExpireTime_ = 0;

    // Limits.
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxConnections_ = MAX_CONNECTIONS;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxFrameSize_ = MAX_FRAME_SIZE;

    // Buffer sizing policy.
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;

    // Overload handling.
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    overloadHysteresis_ = 0.8;
    overloaded_ = false;

    // Counters.
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
  }

public:
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processor), serverTransport_(serverTransport) {
    init();
  }


  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer() override;

  void setThreadManager(std::shared_ptr<ThreadManager> threadManager);

  int getListenPort() { return serverTransport_->getListenPort(); }

  std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }

  /**
   * Choose how many IO threads this server runs. Must be called before
   * serve(); calling it afterwards has no effect.
   *
   * When assertions are enabled, requesting more than one IO thread while a
   * user-provided event base is registered trips an assertion.
   *
   * @param numThreads the number of IO threads to use.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
    // A user-provided event base doesn't work for multi-threaded servers.
    assert(!(numIOThreads_ > 1 && userEventBase_));
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const { return numIOThreads_; }

  /**
   * Get the maximum number of unused TConnection we will hold in reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const { return connectionStackLimit_; }

  /**
   * Set the maximum number of unused TConnection we will hold in reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }

  bool isThreadPoolProcessing() const { return threadPoolProcessing_; }

  /**
   * Hand a task to the thread manager via ThreadManager::add().
   *
   * threadManager_ is dereferenced unconditionally, so a thread manager must
   * have been set before this is called.
   *
   * @param task the runnable to add.
   */
  void addTask(std::shared_ptr<Runnable> task) {
    threadManager_->add(task,
                        0LL,              // second add() argument is fixed at zero
                        taskExpireTime_); // configured task expiration, in ms
  }

  /**
   * Return the count of sockets currently connected to.
   *
   * @return count of connected sockets.
   */
  size_t getNumConnections() const { return numTConnections_; }

  /**
   * Return the count of connections currently in use, computed as the total
   * connection count minus the number of idle connection objects.
   *
   * @return count of active (non-idle) connections.
   */
  size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const { return connectionStack_.size(); }

  /**
   * Return count of number of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const { return numActiveProcessors_; }

  /// Record that one more connection has entered processing; the update is
  /// made while holding connMutex_.
  void incrementActiveProcessors() {
    Guard lock(connMutex_);
    numActiveProcessors_ += 1;
  }

  /// Record that a connection has left processing; the update is made while
  /// holding connMutex_ and the count is never taken below zero.
  void decrementActiveProcessors() {
    Guard lock(connMutex_);
    if (numActiveProcessors_ == 0) {
      return; // already at zero: nothing to undo
    }
    numActiveProcessors_ -= 1;
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const { return maxConnections_; }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processes.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maximum frame size, in bytes.
   */
  size_t getMaxFrameSize() const { return maxFrameSize_; }

  /**
   * Set the maximum allowed frame size.
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const { return overloadHysteresis_; }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * Only values in the range (0.0, 1.0] are accepted; anything else
   * (including NaN) leaves the current setting untouched.
   *
   * @param hysteresisFraction fraction > 0.0 and <= 1.0.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    const bool inRange = hysteresisFraction > 0.0 && hysteresisFraction <= 1.0;
    if (!inRange) {
      return; // silently ignore out-of-range requests
    }
    overloadHysteresis_ = hysteresisFraction;
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const { return overloadAction_; }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const { return taskExpireTime_; }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded.  The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /** Pop and discard next task on threadpool wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }

  /**
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }

  /**
   * Set the maximum size write buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }

  /**
   * Get # of calls made between buffer size checks.  0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }

  /**
   * Check buffer sizes every "count" calls.  This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve() override;

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop() override;

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Register the optional user-provided event-base (for single-thread servers)
   *
   * This method should be used when the server is running in a single-thread
   * mode, and the event base is provided by the user (i.e., the caller).
   *
   * @param user_event_base the user-provided event-base. The user is
   * responsible for freeing the event base memory.
   */
  void registerEvents(event_base* user_event_base);

  /**
   * Returns the optional user-provided event-base (for single-thread servers).
   */
  event_base* getUserEventBase() const { return userEventBase_; }

  /** Some transports, like THeaderTransport, require passing through
   * the framing size instead of stripping it.
   */
  bool getHeaderTransport();

private:
  /**
   * Callback function that the threadmanager calls when a task reaches
   * its expiration time.  It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(std::shared_ptr<Runnable> task);

  /**
   * Return an initialized connection object.  Creates or recovers from
   * pool a TConnection and initializes it with the provided socket.
   *
   * @param socket the TSocket associated with this connection.
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(std::shared_ptr<TSocket> socket);

  /**
   * Returns a connection to pool or deletion.  If the connection pool
   * (a stack) isn't full, place the connection object on it, otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};

class TNonblockingIOThread : public Runnable {
public:
  // Creates an IO thread and sets up the event base.  The listenSocket should
  // be a valid FD on which listen() has already been called.  If the
  // listenSocket is < 0, accepting will not be done.
  TNonblockingIOThread(TNonblockingServer* server,
                       int number,
                       THRIFT_SOCKET listenSocket,
                       bool useHighPriority);

  ~TNonblockingIOThread() override;

  // Returns the event-base for this thread.
  event_base* getEventBase() const { return eventBase_; }

  // Returns the server for this thread.
  TNonblockingServer* getServer() const { return server_; }

  // Returns the number of this IO thread.
  int getThreadNumber() const { return number_; }

  // Returns the thread id associated with this object.  This should
  // only be called after the thread has been started.
  Thread::id_t getThreadId() const { return threadId_; }

  // Returns the send-fd for task complete notifications.
  evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }

  // Returns the read-fd for task complete notifications.
  evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }

  // Returns the actual thread object associated with this IO thread.
  std::shared_ptr<Thread> getThread() const { return thread_; }

  // Sets the actual thread object associated with this IO thread.
  void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }

  // Used by TConnection objects to indicate processing has finished.
  bool notify(TNonblockingServer::TConnection* conn);

  // Enters the event loop and does not return until a call to stop().
  void run() override;

  // Exits the event loop as soon as possible.
  void stop();

  // Ensures that the event-loop thread is fully finished and shut down.
  void join();

  /// Registers the events for the notification & listen sockets
  void registerEvents();

private:
  /**
   * C-callable event handler for signaling task completion.  Provides a
   * callback that libevent can understand that will read a connection
   * object's address from a pipe and call connection->transition() for
   * that object.
   *
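   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback argument passed through by libevent.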
   */
  static void notifyHandler(evutil_socket_t fd, short which, void* v);

  /**
   * C-callable event handler for listener events.  Provides a callback
   * that libevent can understand which invokes server->handleEvent().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TNonblockingServer's "this".
   */
  static void listenHandler(evutil_socket_t fd, short which, void* v) {
    // Recover the server from the opaque callback argument with a named cast:
    // static_cast is the exact conversion for void* -> object pointer, and
    // unlike a C-style cast it cannot silently turn into a reinterpret or
    // const-dropping cast if the types ever change.
    static_cast<TNonblockingServer*>(v)->handleEvent(fd, which);
  }

  /// Exits the loop ASAP in case of shutdown or error.
  void breakLoop(bool error);

  /// Create the pipe used to notify I/O process of task completion.
  void createNotificationPipe();

  /// Unregisters our events for notification and listen sockets.
  void cleanupEvents();

  /// Sets (or clears) high priority scheduling status for the current thread.
  void setCurrentThreadHighPriority(bool value);

private:
  /// associated server
  TNonblockingServer* server_;

  /// thread number (for debugging).
  const int number_;

  /// The actual physical thread id.
  Thread::id_t threadId_;

  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
  THRIFT_SOCKET listenSocket_;

  /// Sets a high scheduling priority when running
  bool useHighPriority_;

  /// pointer to eventbase to be used for looping
  event_base* eventBase_;

  /// Set to true if this class is responsible for freeing the event base
  /// memory.
  bool ownEventBase_;

  /// Used with eventBase_ for connection events (only in listener thread)
  struct event serverEvent_;

  /// Used with eventBase_ for task completion notification
  struct event notificationEvent_;

  /// File descriptors for pipe used for task completion notification.
  evutil_socket_t notificationPipeFDs_[2];

  /// Actual IO Thread
  std::shared_ptr<Thread> thread_;
};
}
}
} // apache::thrift::server

#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
