# -*- coding: utf-8 -*-
#
# Copyright © Spyder Project Contributors
# Licensed under the terms of the MIT License
# (see spyder/__init__.py for details)

"""
Widget that handles communications between a console in debugging
mode and Spyder
"""

# Standard library imports
import atexit
import pdb
import re

# Third-party imports
from IPython.core.history import HistoryManager
from IPython.core.inputtransformer2 import TransformerManager
from IPython.lib.lexers import IPythonLexer, IPython3Lexer
from pygments.lexer import bygroups
from pygments.token import Keyword, Operator, Text
from pygments.util import ClassNotFound
from qtconsole.rich_jupyter_widget import RichJupyterWidget
from qtpy.QtCore import QEvent
from qtpy.QtGui import QTextCursor

# Local imports
from spyder.api.config.mixins import SpyderConfigurationAccessor
from spyder.config.base import get_conf_path


class SpyderIPy3Lexer(IPython3Lexer):
    """IPython 3 lexer that also highlights ``!cmd`` lines."""

    # Build a token table of our own instead of editing the parent's one in
    # place: ``IPython3Lexer.tokens`` is a class attribute, so inserting into
    # its 'root' list would also modify IPython3Lexer itself.
    tokens = dict(IPython3Lexer.tokens)
    # Detect !cmd command and highlight them
    tokens['root'] = [
        (r'(!)(\w+)(.*\n)', bygroups(Operator, Keyword, Text)),
    ] + list(tokens['root'])


class SpyderIPy2Lexer(IPythonLexer):
    """IPython 2 lexer that also highlights ``!cmd`` lines."""

    # Start from the tokens of our actual parent (IPythonLexer, not
    # IPython3Lexer) and work on a copy, so that no rule is inserted into a
    # token list that belongs to another lexer class.
    tokens = dict(IPythonLexer.tokens)
    # Detect !cmd command and highlight them
    tokens['root'] = [
        (r'(!)(\w+)(.*\n)', bygroups(Operator, Keyword, Text)),
    ] + list(tokens['root'])


class PdbHistory(HistoryManager):
    """History manager whose history file is ``pdb_history.sqlite``."""

    def _get_hist_file_name(self, profile=None):
        """
        Return the path of the pdb history file.

        ``profile`` is accepted only to keep the signature of the parent
        class; its value is never used.
        """
        hist_file_name = get_conf_path('pdb_history.sqlite')
        return hist_file_name


class DebuggingHistoryWidget(RichJupyterWidget):
    """
    Widget with the necessary attributes and methods to override the pdb
    history mechanism while debugging.
    """
    PDB_HIST_MAX = 400

    def __init__(self, *args, **kwargs):
        # Input number for the current history session
        self._pdb_history_input_number = 0
        # Saved input numbers, used for recursive debugging
        self._saved_pdb_history_input_number = []

        # Creating or reading the history file may fail for many reasons. In
        # that case, work with an empty history that is not saved to a file.
        # Fixes spyder-ide/spyder#18531
        history_file = None
        previous_inputs = []
        try:
            history_file = PdbHistory()
            previous_inputs = [
                entry[-1] for entry in history_file.get_tail(
                    self.PDB_HIST_MAX, include_latest=True)
            ]
        except Exception:
            history_file = None
            previous_inputs = []
        self._pdb_history_file = history_file
        self._pdb_history = previous_inputs

        # Temporary history edits
        self._pdb_history_edits = {}
        # Start browsing from the end of the history
        self._pdb_history_index = len(self._pdb_history)

        # The attributes above must exist before the parent initializer runs
        super().__init__(*args, **kwargs)

    # --- Public API --------------------------------------------------
    def new_history_session(self):
        """
        Begin a new history session.

        The input number is reset and, when a history file is available, a
        new session is opened in it.
        """
        self._pdb_history_input_number = 0
        history_file = self._pdb_history_file
        if history_file is None:
            return
        history_file.new_session()

    def end_history_session(self):
        """
        Finish the current history session.

        The input number is reset and, when a history file is available, its
        current session is ended.
        """
        self._pdb_history_input_number = 0
        history_file = self._pdb_history_file
        if history_file is None:
            return
        history_file.end_session()

    def add_to_pdb_history(self, line):
        """
        Record a line typed at the pdb prompt.

        The input number is always incremented, and the browsing index and
        the temporary edits are always reset. The line itself is only kept
        (in memory and, if available, in the history file) when it is not
        empty, differs from the last entry and is not a pdb command given
        without arguments.
        """
        self._pdb_history_input_number += 1
        input_number = self._pdb_history_input_number
        self._pdb_history_index = len(self._pdb_history)
        self._pdb_history_edits = {}

        line = line.rstrip()
        if not line:
            return

        # Don't store the same line twice in a row
        if self._pdb_history and self._pdb_history[-1] == line:
            return

        # Split the first word from the rest of the line
        cmd, separator, _ = line.partition(" ")
        has_args = bool(separator)

        pdb_attributes = dir(pdb.Pdb)
        is_pdb_cmd = bool(
            cmd.strip() and cmd[0] != '!' and "do_" + cmd in pdb_attributes)
        if self.is_pdb_using_exclamantion_mark() and not is_pdb_cmd:
            # In this mode, pdb commands are written as !cmd
            is_pdb_cmd = bool(
                cmd.strip() and cmd[0] == '!'
                and "do_" + cmd[1:] in pdb_attributes)

        # Pdb commands without arguments are not worth remembering
        if not cmd or (is_pdb_cmd and not has_args):
            return

        self._pdb_history.append(line)
        self._pdb_history_index = len(self._pdb_history)
        if self._pdb_history_file is not None:
            self._pdb_history_file.store_inputs(input_number, line)

    # --- Private API (overridden by us) ------------------------------
    @property
    def _history(self):
        """Return the pdb history while debugging, the regular one if not."""
        return self._pdb_history if self.is_debugging() else self.__history

    @_history.setter
    def _history(self, history):
        """Replace the pdb history while debugging, the regular one if not."""
        if self.is_debugging():
            self._pdb_history = history
            return
        self.__history = history

    @property
    def _history_edits(self):
        """Return the temporary edits of the history currently in use."""
        return (self._pdb_history_edits if self.is_debugging()
                else self.__history_edits)

    @_history_edits.setter
    def _history_edits(self, history_edits):
        """Replace the temporary edits of the history currently in use."""
        if self.is_debugging():
            self._pdb_history_edits = history_edits
            return
        self.__history_edits = history_edits

    @property
    def _history_index(self):
        """Return the browsing index of the history currently in use."""
        return (self._pdb_history_index if self.is_debugging()
                else self.__history_index)

    @_history_index.setter
    def _history_index(self, history_index):
        """Set the browsing index of the history currently in use."""
        if self.is_debugging():
            self._pdb_history_index = history_index
            return
        self.__history_index = history_index


class DebuggingWidget(DebuggingHistoryWidget, SpyderConfigurationAccessor):
    """
    Widget with the necessary attributes and methods to handle
    communications between a console in debugging mode and
    Spyder
    """

    CONF_SECTION = 'ipython_console'

    def __init__(self, *args, **kwargs):
        # ---- Communication state
        # Number of debugging loops we are in
        self._pdb_in_loop = 0
        # Can we send a command now?
        self._pdb_input_ready = False
        # Are we waiting on the user?
        self._waiting_pdb_input = False

        # ---- Other state
        # (prompt, password)
        self._pdb_prompt = (None, None)
        # Last command sent to pdb
        self._pdb_last_cmd = ''
        # (fname, lineno)
        self._pdb_frame_loc = (None, None)
        # Give focus to the editor after a command is executed
        self._pdb_focus_to_editor = False

        # ---- Command queue
        # List of (code, hidden, echo_stack_entry, add_history)
        self._pdb_input_queue = []

        # ---- Temporary flags
        self._tmp_reading = False

        # The state above must exist before the parent initializers run
        super().__init__(*args, **kwargs)

        # Adapted from qtconsole/frontend_widget.py
        # This adds the IPdb as a prompt self._highlighter recognises
        output_prefix = re.escape(self.other_output_prefix)
        prompt_pattern = (
            r'^({})?('.format(output_prefix) +
            r'[ \t]*\(*IPdb \[\d+\]\)*: |' +
            r'[ \t]*In \[\d+\]: |[ \t]*\ \ \ \.\.\.+: )')
        self._highlighter._ipy_prompt_re = re.compile(prompt_pattern)

    # --- Public API --------------------------------------------------

    def shutdown(self):
        """
        Close the save thread and database file.
        """
        if self._pdb_history_file is not None:
            try:
                self._pdb_history_file.save_thread.stop()
                # Now that it was called, no need to call it at exit
                atexit.unregister(self._pdb_history_file.save_thread.stop)
            except AttributeError:
                pass

            try:
                self._pdb_history_file.db.close()
            except AttributeError:
                pass

    # --- Comm API --------------------------------------------------

    def set_debug_state(self, is_debugging):
        """Update the debug state."""
        if is_debugging:
            # Start debugging
            if self._pdb_in_loop > 0:
                # Recursive debugging
                self._saved_pdb_history_input_number.append(
                    self._pdb_history_input_number)
                self.end_history_session()
            self.new_history_session()
            self._pdb_in_loop += 1
        elif self._pdb_in_loop > 0:
            # Stop debugging
            self._pdb_in_loop -= 1
            self.end_history_session()
            if self._pdb_in_loop > 0:
                # Still debugging
                self.new_history_session()
                self._pdb_history_input_number = (
                    self._saved_pdb_history_input_number.pop())

        # If debugging starts or stops, clear the input queue.
        self._pdb_input_queue = []
        self._pdb_frame_loc = (None, None)

    def _pdb_cmd_prefix(self):
        """Return the command prefix"""
        prefix = ''
        if (self.spyder_kernel_comm.is_open() and
                self.is_pdb_using_exclamantion_mark()):
            prefix = '!'
        return prefix

    def pdb_execute_command(self, command, focus_to_editor=False):
        """
        Execute a pdb command
        """
        self._pdb_focus_to_editor = focus_to_editor
        self.pdb_execute(
            self._pdb_cmd_prefix() + command, hidden=False,
            echo_stack_entry=False, add_history=False)

    def _handle_input_request(self, msg):
        """
        Process an input request.

        When the comm is not open and the request comes from an "ipdb> "
        prompt, the text printed after the last prompt is searched for a
        "> file.py(lineno)" location, which is emitted through sig_pdb_step.
        """
        comm_closed = not self.spyder_kernel_comm.is_open()
        if comm_closed and msg['content']['prompt'] == "ipdb> ":
            # Check if we can guess a path from the shell content:
            self._flush_pending_stream()
            cursor = self._get_end_cursor()
            cursor.setPosition(self._prompt_pos, QTextCursor.KeepAnchor)
            shell_text = cursor.selection().toPlainText()
            location = re.search(r"> (.*\.py)\((\d+)\)", shell_text)
            if location:
                fname, lineno = location.groups()
                self.sig_pdb_step.emit(fname, int(lineno))
        return super()._handle_input_request(msg)

    def pdb_execute(self, line, hidden=False, echo_stack_entry=True,
                    add_history=True):
        """
        Send line to the pdb kernel if possible.

        If the line can't be sent right now, it is stored in the input queue
        instead.

        Parameters
        ----------
        line: str
            the line to execute

        hidden: bool
            If the line should be hidden

        echo_stack_entry: bool
            If not hidden, if the stack entry should be printed

        add_history: bool
            If not hidden, whether the line should be added to history

        Returns
        -------
        The result of the ``pdb_input_reply`` kernel call when the line is
        sent through the comm, None in every other case.
        """
        # Send line to input if no comm
        if not self.spyder_kernel_comm.is_open():
            if not hidden:
                # Echo the line in the console
                self._append_plain_text(line + '\n')
            self._finalize_input_request()
            self.kernel_client.input(line)
            return

        # Nothing can be sent to pdb if no debugger is running
        if not self.is_debugging():
            return

        if not line.strip():
            # Must get the last genuine command
            line = self._pdb_last_cmd

        if hidden:
            # Don't show stack entry if hidden
            echo_stack_entry = False
        else:
            if not self.is_waiting_pdb_input():
                # We can't execute this if we are not waiting for pdb input
                self._pdb_input_queue.append(
                    (line, hidden, echo_stack_entry, add_history))
                return

            # Remember the command, so that an empty line can repeat it
            if line.strip():
                self._pdb_last_cmd = line

            # Print the text if it is programmatically added.
            if line.strip() != self.input_buffer.strip():
                self.input_buffer = line
            self._append_plain_text('\n')

            if add_history:
                # Save history to browse it later
                self.add_to_pdb_history(line)

            # Set executing to true and save the input buffer
            self._input_buffer_executing = self.input_buffer
            self._executing = True
            self._waiting_pdb_input = False

            # Disable the console
            self._tmp_reading = False
            self._finalize_input_request()
            # The line is already shown in the console. If it has to be
            # queued below, it is stored as a hidden command.
            hidden = True

            # Emit executing
            self.executing.emit(line)
            self.sig_pdb_state_changed.emit(
                False, self.get_pdb_last_step())

        if self._pdb_input_ready:
            # Pdb is ready to receive input: send the line to the kernel now
            self._pdb_input_ready = False
            return self.call_kernel(interrupt=True).pdb_input_reply(
                line, echo_stack_entry=echo_stack_entry)

        # Pdb is not ready yet: keep the line until the next input request
        self._pdb_input_queue.append(
            (line, hidden, echo_stack_entry, add_history))

    def get_pdb_settings(self):
        """Get pdb settings"""
        return {
            "breakpoints": self.get_conf(
                'breakpoints', default={}, section='run'),
            "pdb_ignore_lib": self.get_conf('pdb_ignore_lib'),
            "pdb_execute_events": self.get_conf('pdb_execute_events'),
            "pdb_use_exclamation_mark": self.is_pdb_using_exclamantion_mark(),
            "pdb_stop_first_line": self.get_conf('pdb_stop_first_line'),
        }

    # --- To Sort --------------------------------------------------
    def stop_debugging(self):
        """Stop debugging."""
        if (self.spyder_kernel_comm.is_open() and
                not self.is_waiting_pdb_input()):
            self.interrupt_kernel()
        self.pdb_execute_command("exit")

    def set_spyder_breakpoints(self):
        """Set Spyder breakpoints into a debugging session"""
        self.call_kernel(interrupt=True).set_breakpoints(
            self.get_conf('breakpoints', default={}, section='run'))

    def set_pdb_ignore_lib(self, pdb_ignore_lib):
        """Set pdb_ignore_lib into a debugging session"""
        self.call_kernel(interrupt=True).set_pdb_ignore_lib(
            pdb_ignore_lib)

    def set_pdb_execute_events(self, pdb_execute_events):
        """Set pdb_execute_events into a debugging session"""
        self.call_kernel(interrupt=True).set_pdb_execute_events(
            pdb_execute_events)

    def set_pdb_use_exclamation_mark(self, pdb_use_exclamation_mark):
        """Set pdb_use_exclamation_mark into a debugging session"""
        self.call_kernel(interrupt=True).set_pdb_use_exclamation_mark(
            pdb_use_exclamation_mark)

    def is_pdb_using_exclamantion_mark(self):
        return self.get_conf('pdb_use_exclamation_mark')

    def do_where(self):
        """Where was called, go to the current location."""
        fname, lineno = self._pdb_frame_loc
        if fname:
            self.sig_pdb_step.emit(fname, lineno)

    def refresh_from_pdb(self, pdb_state):
        """
        Refresh Variable Explorer and Editor from a Pdb session,
        after running any pdb command.

        See publish_pdb_state and notify_spyder in spyder_kernels
        """
        if 'step' in pdb_state and 'fname' in pdb_state['step']:
            fname = pdb_state['step']['fname']
            lineno = pdb_state['step']['lineno']

            last_pdb_loc = self._pdb_frame_loc
            self._pdb_frame_loc = (fname, lineno)

            # Only step if the location changed
            if (fname, lineno) != last_pdb_loc:
                self.sig_pdb_step.emit(fname, lineno)

        if 'namespace_view' in pdb_state:
            self.set_namespace_view(pdb_state['namespace_view'])

        if 'var_properties' in pdb_state:
            self.set_var_properties(pdb_state['var_properties'])

    def set_pdb_state(self, pdb_state):
        """Set current pdb state."""
        if pdb_state is not None and isinstance(pdb_state, dict):
            self.refresh_from_pdb(pdb_state)

    def show_pdb_output(self, text):
        """Show Pdb output."""
        self._append_plain_text(self.output_sep, before_prompt=True)
        prompt = self._current_out_prompt()
        self._append_html(
            '<span class="out-prompt">%s</span>' % prompt,
            before_prompt=True
        )
        # If the repr is multiline, make sure we start on a new line,
        # so that its lines are aligned.
        if "\n" in text and not self.output_sep.endswith("\n"):
            self._append_plain_text('\n', before_prompt=True)
        self._append_plain_text(text + self.output_sep2, before_prompt=True)
        self._append_plain_text('\n', before_prompt=True)

    def get_pdb_last_step(self):
        """Get last pdb step retrieved from a Pdb session."""
        fname, lineno = self._pdb_frame_loc
        if fname is None:
            return {}
        return {'fname': fname,
                'lineno': lineno}

    def is_debugging(self):
        """Check if we are debugging."""
        return self._pdb_in_loop > 0

    def is_waiting_pdb_input(self):
        """Check if we are waiting a pdb input."""
        # If the comm is not open, self._pdb_in_loop can not be set
        return self.is_debugging() and self._waiting_pdb_input

    # ---- Public API (overridden by us) --------------------------
    def reset(self, clear=False):
        """
        Resets the widget to its initial state if ``clear`` parameter
        is True

        If the widget was cleared while pdb waits for input, the pdb prompt
        is printed again.
        """
        super().reset(clear)

        if not (clear and self.is_waiting_pdb_input()):
            return

        # Make sure the prompt is printed
        prompt, password = self._pdb_prompt

        try:
            # This is necessary to avoid an error when the iopub channel is
            # closed.
            # See jupyter/qtconsole#574
            if not self.kernel_client.iopub_channel.closed():
                self.kernel_client.iopub_channel.flush()
        except AttributeError:
            self.kernel_client.iopub_channel.flush()

        self._reading = False
        self._readline(
            prompt=prompt,
            callback=self._pdb_readline_callback,
            password=password,
        )

    # --- Private API --------------------------------------------------
    def _current_prompt(self):
        prompt = "IPdb [{}]".format(self._pdb_history_input_number + 1)
        for i in range(self._pdb_in_loop - 1):
            # Add recursive debugger prompt
            prompt = "({})".format(prompt)
        return prompt + ": "

    def _current_out_prompt(self):
        """Get current out prompt."""
        prompt = "Out\u00A0\u00A0[{}]".format(self._pdb_history_input_number)
        for i in range(self._pdb_in_loop - 1):
            # Add recursive debugger prompt
            prompt = "({})".format(prompt)
        return prompt + ": "

    def _handle_kernel_info_reply(self, rep):
        """
        Handle kernel info replies.

        After the parent handled the reply, the lexer of the highlighter is
        replaced by our custom one if the kernel reports 'ipython3' or
        'ipython2' as its pygments lexer.
        """
        super()._handle_kernel_info_reply(rep)

        language_info = rep['content']['language_info']
        lexer_name = language_info.get('pygments_lexer', '')
        if lexer_name == 'ipython3':
            lexer_class = SpyderIPy3Lexer
        elif lexer_name == 'ipython2':
            lexer_class = SpyderIPy2Lexer
        else:
            return

        try:
            # add custom lexer
            self._highlighter._lexer = lexer_class()
        except ClassNotFound:
            pass

    def _redefine_complete_for_dbg(self, client):
        """Redefine kernel client's complete method to work while debugging."""

        original_complete = client.complete

        def complete(code, cursor_pos=None):
            if self.is_waiting_pdb_input() and client.comm_channel:
                shell_channel = client.shell_channel
                client._shell_channel = client.comm_channel
                try:
                    return original_complete(code, cursor_pos)
                finally:
                    client._shell_channel = shell_channel
            else:
                return original_complete(code, cursor_pos)

        client.complete = complete

    def _update_pdb_prompt(self, prompt, password=None):
        """Update the prompt that is recognised as a pdb prompt."""
        if prompt == self._pdb_prompt[0]:
            # Nothing to do
            return

        if password is None:
            password = self._pdb_prompt[1]
        self._pdb_prompt = (prompt, password)

        # Update continuation prompt to reflect (possibly) new prompt length.
        self._set_continuation_prompt(
            self._make_continuation_prompt(prompt), html=True)

    def _is_pdb_complete(self, source):
        """
        Check if the pdb input is ready to be executed.

        Returns a tuple (complete, indent): ``complete`` is False only if
        the source is incomplete, and ``indent`` is a string of spaces or
        None. A leading '!' is not taken into account.
        """
        if source and source.startswith('!'):
            source = source[1:]

        status, indent_width = TransformerManager().check_complete(source)
        indent = None if indent_width is None else indent_width * ' '
        return status != 'incomplete', indent

    def execute(self, source=None, hidden=False, interactive=False):
        """
        Executes source or the input buffer, possibly prompting for more
        input.

        Do not use to run pdb commands (such as `continue`).
        Use pdb_execute instead. This will add a '!' in front of the code.
        """
        if self.is_waiting_pdb_input():
            if source is None:
                if hidden:
                    # Nothing to execute
                    return
                else:
                    source = self.input_buffer
            else:
                if not self.is_pdb_using_exclamantion_mark():
                    source = '!' + source
                if not hidden:
                    self.input_buffer = source

            if interactive:
                # Add a continuation prompt if not complete
                complete, indent = self._is_pdb_complete(source)
                if not complete:
                    self.do_execute(source, complete, indent)
                    return
            if hidden:
                self.pdb_execute(source, hidden)
            else:
                if self._reading_callback:
                    self._reading_callback()
            return

        return super(DebuggingWidget, self).execute(
            source, hidden, interactive)

    def _pdb_readline_callback(self, line):
        """Callback used when the user inputs text in pdb."""
        self.pdb_execute(line)

    def pdb_input(self, prompt, password=None):
        """
        Get input for a command.

        A new prompt is printed if needed, pdb is marked as ready to receive
        input, and then either the startup lines or the next queued command
        are sent.

        Parameters
        ----------
        prompt: str
            Ignored: it is replaced by the numbered prompt of this widget.

        password: bool, optional
            Passed to ``_update_pdb_prompt`` and ``_readline``.
        """

        # Replace with numbered prompt
        prompt = self._current_prompt()
        self._update_pdb_prompt(prompt, password)

        # The prompt should be printed unless:
        # 1. The prompt is already printed (self._reading is True)
        # 2. A hidden command is in the queue
        print_prompt = (not self._reading
                        and (len(self._pdb_input_queue) == 0
                             or not self._pdb_input_queue[0][1]))

        if print_prompt:
            # Make sure that all output from the SUB channel has been processed
            # before writing a new prompt.
            try:
                # This is necessary to avoid an error when the iopub channel is
                # closed.
                # See jupyter/qtconsole#574
                if not self.kernel_client.iopub_channel.closed():
                    self.kernel_client.iopub_channel.flush()
            except AttributeError:
                self.kernel_client.iopub_channel.flush()

            # From now on, the user can type a command
            self._waiting_pdb_input = True
            self._readline(prompt=prompt, callback=self._pdb_readline_callback,
                           password=password)
            self._executing = False
            self._highlighter.highlighting_on = True
            # The previous code finished executing
            self.executed.emit(self._pdb_prompt)
            self.sig_pdb_prompt_ready.emit()
            self.sig_pdb_state_changed.emit(True, self.get_pdb_last_step())

        # pdb_execute can now send a line to the kernel
        self._pdb_input_ready = True

        start_line = self.get_conf('startup/pdb_run_lines', default='')
        # Only run these lines when printing a new prompt
        if start_line and print_prompt and self.is_waiting_pdb_input():
            # Send a few commands
            self.pdb_execute(start_line, hidden=True)
            return

        # While the widget thinks only one input is going on,
        # other functions can be sending messages to the kernel.
        # This must be properly processed to avoid dropping messages.
        # If the kernel was not ready, the messages are queued.
        if len(self._pdb_input_queue) > 0:
            # Send the oldest queued command
            args = self._pdb_input_queue.pop(0)
            self.pdb_execute(*args)
            return

    # --- Private API (overridden by us) --------------------------------------
    def _show_prompt(self, prompt=None, html=False, newline=True,
                     separator=True):
        """
        Writes a new prompt at the end of the buffer.

        If ``prompt`` is the current pdb prompt, it is wrapped in an
        ``in-prompt`` span and shown as html. Any other value, including
        None, is passed unchanged to the parent implementation together with
        ``html``, ``newline`` and ``separator``.
        """
        # self._pdb_prompt starts as (None, None), so a None prompt has to be
        # excluded explicitly. Otherwise it would compare equal to the unset
        # pdb prompt and be displayed as the literal text "None".
        if prompt is not None and prompt == self._pdb_prompt[0]:
            html = True
            prompt = '<span class="in-prompt">%s</span>' % prompt
        super(DebuggingWidget, self)._show_prompt(prompt, html, newline,
                                                  separator)

    def _event_filter_console_keypress(self, event):
        """
        Handle Key_Up/Key_Down while debugging.

        While pdb waits for input, the parent handler is called with
        ``self._reading`` temporarily set to False. In any other case the
        event goes straight to the parent handler.
        """
        if self.is_waiting_pdb_input():
            self._control.current_prompt_pos = self._prompt_pos
            # Pretend this is a regular prompt
            # The saved value is kept in an attribute rather than in a local
            # variable because pdb_execute sets self._tmp_reading to False.
            # If the key press ends up executing a command, self._reading is
            # therefore restored to False below.
            self._tmp_reading = self._reading
            self._reading = False
            try:
                ret = super(DebuggingWidget,
                            self)._event_filter_console_keypress(event)
                return ret
            finally:
                # Runs even if the parent handler raises
                self._reading = self._tmp_reading
        else:
            return super(DebuggingWidget,
                         self)._event_filter_console_keypress(event)

    def _register_is_complete_callback(self, source, callback):
        """Call the callback with the result of is_complete."""
        # Add a continuation prompt if not complete
        if self.is_waiting_pdb_input():
            # As the work is done on this side, check synchronously.
            complete, indent = self._is_pdb_complete(source)
            callback(complete, indent)
        else:
            return super(DebuggingWidget, self)._register_is_complete_callback(
                source, callback)

    # ---- Qt methods ---------------------------------------------------------
    def eventFilter(self, obj, event):
        """Filter out objects that are not QEvent before the parent filter."""
        if isinstance(event, QEvent):
            return super().eventFilter(obj, event)

        # When using PySide, it can happen that "event" is of type QWidgetItem
        # (reason unknown). This causes an exception in eventFilter() in
        # console_widget.py in the QtConsole package: Therein event.type() is
        # accessed which fails due to an AttributeError. So the event is
        # ignored here.
        # Note for debugging: event.layout() or event.widget() SEGFAULTs
        return True
