# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Provides data ingestion logic backed by local event processing."""

import os
import re
import threading
import time


from tensorboard.backend.event_processing import data_provider
from tensorboard.backend.event_processing import plugin_event_multiplexer
from tensorboard.backend.event_processing import tag_types
from tensorboard.data import ingester
from tensorboard.plugins.audio import metadata as audio_metadata
from tensorboard.plugins.histogram import metadata as histogram_metadata
from tensorboard.plugins.image import metadata as image_metadata
from tensorboard.plugins.pr_curve import metadata as pr_curve_metadata
from tensorboard.plugins.scalar import metadata as scalar_metadata
from tensorboard.util import tb_logging


DEFAULT_SIZE_GUIDANCE = {
    tag_types.TENSORS: 10,
}

# TODO(@wchargin): Replace with something that works for third-party plugins.
DEFAULT_TENSOR_SIZE_GUIDANCE = {
    scalar_metadata.PLUGIN_NAME: 1000,
    image_metadata.PLUGIN_NAME: 10,
    audio_metadata.PLUGIN_NAME: 10,
    histogram_metadata.PLUGIN_NAME: 500,
    pr_curve_metadata.PLUGIN_NAME: 100,
}

logger = tb_logging.get_logger()


class LocalDataIngester(ingester.DataIngester):
    """Data ingestion implementation to use when running locally."""

    def __init__(self, flags):
        """Initializes a `LocalDataIngester` from `flags`.

        Args:
          flags: An argparse.Namespace containing TensorBoard CLI flags.

        Returns:
          The new `LocalDataIngester`.
        """
        tensor_size_guidance = dict(DEFAULT_TENSOR_SIZE_GUIDANCE)
        tensor_size_guidance.update(flags.samples_per_plugin)
        self._multiplexer = plugin_event_multiplexer.EventMultiplexer(
            size_guidance=DEFAULT_SIZE_GUIDANCE,
            tensor_size_guidance=tensor_size_guidance,
            purge_orphaned_data=flags.purge_orphaned_data,
            max_reload_threads=flags.max_reload_threads,
            event_file_active_filter=_get_event_file_active_filter(flags),
        )
        self._data_provider = data_provider.MultiplexerDataProvider(
            self._multiplexer, flags.logdir or flags.logdir_spec
        )
        self._reload_interval = flags.reload_interval
        self._reload_task = flags.reload_task
        if flags.logdir:
            self._path_to_run = {os.path.expanduser(flags.logdir): None}
        else:
            self._path_to_run = _parse_event_files_spec(flags.logdir_spec)

    @property
    def data_provider(self):
        return self._data_provider

    @property
    def deprecated_multiplexer(self):
        return self._multiplexer

    def start(self):
        """Starts ingesting data based on the ingester flag configuration."""

        def _reload():
            while True:
                start = time.time()
                logger.info("TensorBoard reload process beginning")
                for path, name in self._path_to_run.items():
                    self._multiplexer.AddRunsFromDirectory(path, name)
                logger.info(
                    "TensorBoard reload process: Reload the whole Multiplexer"
                )
                self._multiplexer.Reload()
                duration = time.time() - start
                logger.info(
                    "TensorBoard done reloading. Load took %0.3f secs", duration
                )
                if self._reload_interval == 0:
                    # Only load the multiplexer once. Do not continuously reload.
                    break
                time.sleep(self._reload_interval)

        if self._reload_task == "process":
            logger.info("Launching reload in a child process")
            import multiprocessing

            process = multiprocessing.Process(target=_reload, name="Reloader")
            # Best-effort cleanup; on exit, the main TB parent process will attempt to
            # kill all its daemonic children.
            process.daemon = True
            process.start()
        elif self._reload_task in ("thread", "auto"):
            logger.info("Launching reload in a daemon thread")
            thread = threading.Thread(target=_reload, name="Reloader")
            # Make this a daemon thread, which won't block TB from exiting.
            thread.daemon = True
            thread.start()
        elif self._reload_task == "blocking":
            if self._reload_interval != 0:
                raise ValueError(
                    "blocking reload only allowed with load_interval=0"
                )
            _reload()
        else:
            raise ValueError("unrecognized reload_task: %s" % self._reload_task)


def _get_event_file_active_filter(flags):
    """Returns a predicate for whether an event file load timestamp is active.

    Returns:
      A predicate function accepting a single UNIX timestamp float argument, or
      None if multi-file loading is not enabled.
    """
    if not flags.reload_multifile:
        return None
    inactive_secs = flags.reload_multifile_inactive_secs
    if inactive_secs == 0:
        return None
    if inactive_secs < 0:
        return lambda timestamp: True
    return lambda timestamp: timestamp + inactive_secs >= time.time()


def _parse_event_files_spec(logdir_spec):
    """Parses `logdir_spec` into a map from paths to run group names.

    The `--logdir_spec` flag format is a comma-separated list of path
    specifications. A path spec looks like 'group_name:/path/to/directory' or
    '/path/to/directory'; in the latter case, the group is unnamed. Group names
    cannot start with a forward slash: /foo:bar/baz will be interpreted as a spec
    with no name and path '/foo:bar/baz'.

    Globs are not supported.

    Args:
      logdir: A comma-separated list of run specifications.
    Returns:
      A dict mapping directory paths to names like {'/path/to/directory': 'name'}.
      Groups without an explicit name are named after their path. If logdir is
      None, returns an empty dict, which is helpful for testing things that don't
      require any valid runs.
    """
    files = {}
    if logdir_spec is None:
        return files
    # Make sure keeping consistent with ParseURI in core/lib/io/path.cc
    uri_pattern = re.compile("[a-zA-Z][0-9a-zA-Z.]*://.*")
    for specification in logdir_spec.split(","):
        # Check if the spec contains group. A spec start with xyz:// is regarded as
        # URI path spec instead of group spec. If the spec looks like /foo:bar/baz,
        # then we assume it's a path with a colon. If the spec looks like
        # [a-zA-z]:\foo then we assume its a Windows path and not a single letter
        # group
        if (
            uri_pattern.match(specification) is None
            and ":" in specification
            and specification[0] != "/"
            and not os.path.splitdrive(specification)[0]
        ):
            # We split at most once so run_name:/path:with/a/colon will work.
            run_name, _, path = specification.partition(":")
        else:
            run_name = None
            path = specification
        if uri_pattern.match(path) is None:
            path = os.path.realpath(os.path.expanduser(path))
        files[path] = run_name
    return files
