"""The main Signal class and SignalInstance class.

A note on the "reemission" parameter in Signal and SignalInstances.  This controls the
behavior of the signal when a callback emits the signal.

Since it can be a little confusing, take the following example of a Signal that emits an
integer.  We'll connect three callbacks to it, two of which re-emit the same signal with
a different value:

```python
from psygnal import SignalInstance

# a signal that emits an integer
sig = SignalInstance((int,), reemission="...")


def cb1(value: int) -> None:
    print(f"calling cb1 with: {value}")
    if value == 1:
        # cb1 ALSO triggers an emission of the value 2
        sig.emit(2)


def cb2(value: int) -> None:
    print(f"calling cb2 with: {value}")
    if value == 2:
        # cb2 ALSO triggers an emission of the value 3
        sig.emit(3)


def cb3(value: int) -> None:
    print(f"calling cb3 with: {value}")


sig.connect(cb1)
sig.connect(cb2)
sig.connect(cb3)
sig.emit(1)
```

with `reemission="queued"` above: you see a breadth-first pattern:
ALL callbacks are called with the first emitted value, before ANY of them are called
with the second emitted value (emitted by the first connected callback  cb1)

```
calling cb1 with: 1
calling cb2 with: 1
calling cb3 with: 1
calling cb1 with: 2
calling cb2 with: 2
calling cb3 with: 2
calling cb1 with: 3
calling cb2 with: 3
calling cb3 with: 3
```

with `reemission='immediate'` signals emitted by callbacks are immediately processed by
all callbacks in a deeper level, before returning back to the original loop level to
call the remaining callbacks with the original value.

```
calling cb1 with: 1
calling cb1 with: 2
calling cb2 with: 2
calling cb1 with: 3
calling cb2 with: 3
calling cb3 with: 3
calling cb3 with: 2
calling cb2 with: 1
calling cb3 with: 1
```

with `reemission='latest'`, just as with 'immediate', signals emitted by callbacks are
immediately processed by all callbacks in a deeper level.  But in this case, the
remaining callbacks in the current level are never called with the original value.

```
calling cb1 with: 1
calling cb1 with: 2
calling cb2 with: 2
calling cb1 with: 3
calling cb2 with: 3
calling cb3 with: 3
# cb2 is never called with 1
# cb3 is never called with 1 or 2
```

The real-world scenario in which this usually arises is an EventedModel or dataclass.
Evented models emit signals on `setattr`:


```python
class MyModel(EventedModel):
    x: int = 1


m = MyModel(x=1)
print("starting value", m.x)


@m.events.x.connect
def ensure_at_least_20(val: int):
    print("trying to set to", val)
    m.x = max(val, 20)


m.x = 5
print("ending value", m.x)
```

```
starting value 1
trying to set to 5
trying to set to 20
ending value 20
```

With EventedModel.__setattr__, you can easily end up with some complicated recursive
behavior if you connect an on-change callback that also sets the value of the model. In
this case `reemission='latest'` is probably the most appropriate, as it will prevent
the callback from being called with the original (now-stale) value.  But one can
conceive of other scenarios where `reemission='immediate'` or `reemission='queued'`
might be more appropriate.  Qt's default behavior, for example, is similar to
`immediate`, but can also be configured to be like `queued` by changing the
connection type (in that case, depending on threading).
"""

from __future__ import annotations

import inspect
import sys
import threading
import warnings
import weakref
from collections import deque
from contextlib import contextmanager, suppress
from functools import lru_cache, partial, reduce
from inspect import Parameter, Signature, isclass
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    ContextManager,
    Final,
    Iterable,
    Iterator,
    Literal,
    NoReturn,
    Type,
    TypeVar,
    Union,
    cast,
    get_args,
    get_origin,
    get_type_hints,
    overload,
)

from ._exceptions import EmitLoopError
from ._mypyc import mypyc_attr
from ._queue import QueuedCallback
from ._weak_callback import (
    StrongFunction,
    WeakCallback,
    WeakSetattr,
    WeakSetitem,
    weak_callback,
)

if TYPE_CHECKING:
    from ._group import EmissionInfo
    from ._weak_callback import RefErrorChoice

    # single function that does all the work of reducing an iterable of args
    # to a single args
    ReducerOneArg = Callable[[Iterable[tuple]], tuple]
    # function that takes two args tuples. it will be passed to itertools.reduce
    ReducerTwoArgs = Callable[[tuple, tuple], tuple]
    ReducerFunc = Union[ReducerOneArg, ReducerTwoArgs]


__all__ = ["Signal", "SignalInstance", "_compiled"]

_NULL = object()
F = TypeVar("F", bound=Callable)
RECURSION_LIMIT = sys.getrecursionlimit()

ReemissionVal = Literal["immediate", "queued", "latest-only"]
VALID_REEMISSION = set(ReemissionVal.__args__)  # type: ignore
DEFAULT_REEMISSION: ReemissionVal = "immediate"


# using basic class instead of enum for easier mypyc compatibility
# this isn't exposed publicly anyway.
class ReemissionMode:
    """Enumeration of reemission strategies."""

    IMMEDIATE: Final = "immediate"
    QUEUED: Final = "queued"
    LATEST: Final = "latest-only"

    @staticmethod
    def validate(value: str) -> str:
        value = str(value).lower()
        if value not in ReemissionMode._members():
            raise ValueError(
                f"Invalid reemission value. Must be one of "
                f"{', '.join(ReemissionMode._members())}. Not {value!r}"
            )
        return value

    @staticmethod
    def _members() -> set[str]:
        return VALID_REEMISSION


class Signal:
    """Declares a signal emitter on a class.

    This is class implements the [descriptor
    protocol](https://docs.python.org/3/howto/descriptor.html#descriptorhowto)
    and is designed to be used as a class attribute, with the supported signature types
    provided in the constructor:

    ```python
    from psygnal import Signal


    class MyEmitter:
        changed = Signal(int)


    def receiver(arg: int):
        print("new value:", arg)


    emitter = MyEmitter()
    emitter.changed.connect(receiver)
    emitter.changed.emit(1)  # prints 'new value: 1'
    ```

    !!! note

        in the example above, `MyEmitter.changed` is an instance of `Signal`,
        and `emitter.changed` is an instance of `SignalInstance`.  See the
        documentation on [`SignalInstance`][psygnal.SignalInstance] for details
        on how to connect to and/or emit a signal on an instance of an object
        that has a `Signal`.


    Parameters
    ----------
    *types : Type[Any] | Signature
        A sequence of individual types, or a *single* [`inspect.Signature`][] object.
    description : str
        Optional descriptive text for the signal.  (not used internally).
    name : str | None
        Optional name of the signal. If it is not specified then the name of the
        class attribute that is bound to the signal will be used. default None
    check_nargs_on_connect : bool
        Whether to check the number of positional args against `signature` when
        connecting a new callback. This can also be provided at connection time using
        `.connect(..., check_nargs=True)`. By default, `True`.
    check_types_on_connect : bool
        Whether to check the callback parameter types against `signature` when
        connecting a new callback. This can also be provided at connection time using
        `.connect(..., check_types=True)`. By default, `False`.
    reemission : Literal["immediate", "queued", "latest-only"] | None
        Determines the order and manner in which connected callbacks are invoked when a
        callback re-emits a signal. Default is `"immediate"`.

        * `"immediate"`: Signals emitted by callbacks are immediately processed in a
            deeper emission loop, before returning to process signals emitted at the
            current level (after all callbacks in the deeper level have been called).

        * `"queued"`: Signals emitted by callbacks are enqueued for emission after the
            current level of emission is complete. This ensures *all* connected
            callbacks are called with the first emitted value, before *any* of them are
            called with values emitted while calling callbacks.

        * `"latest-only"`: Signals emitted by callbacks are immediately processed in a
            deeper emission loop, and remaining callbacks in the current level are never
            called with the original value.
    """

    # _signature: Signature  # callback signature for this signal

    _current_emitter: ClassVar[SignalInstance | None] = None

    def __init__(
        self,
        *types: type[Any] | Signature,
        description: str = "",
        name: str | None = None,
        check_nargs_on_connect: bool = True,
        check_types_on_connect: bool = False,
        reemission: ReemissionVal = DEFAULT_REEMISSION,
    ) -> None:
        self._name = name
        self.description = description
        self._check_nargs_on_connect = check_nargs_on_connect
        self._check_types_on_connect = check_types_on_connect
        self._reemission = reemission
        self._signal_instance_class: type[SignalInstance] = SignalInstance
        self._signal_instance_cache: dict[int, SignalInstance] = {}

        if types and isinstance(types[0], Signature):
            self._signature = types[0]
            if len(types) > 1:
                warnings.warn(
                    "Only a single argument is accepted when directly providing a"
                    f" `Signature`.  These args were ignored: {types[1:]}",
                    stacklevel=2,
                )
        else:
            self._signature = _build_signature(*cast("tuple[Type[Any], ...]", types))

    @property
    def signature(self) -> Signature:
        """[Signature][inspect.Signature] supported by this Signal."""
        return self._signature

    def __set_name__(self, owner: type[Any], name: str) -> None:
        """Set name of signal when declared as a class attribute on `owner`."""
        if self._name is None:
            self._name = name

    @overload
    def __get__(self, instance: None, owner: type[Any] | None = None) -> Signal: ...

    @overload
    def __get__(
        self, instance: Any, owner: type[Any] | None = None
    ) -> SignalInstance: ...

    def __get__(
        self, instance: Any, owner: type[Any] | None = None
    ) -> Signal | SignalInstance:
        """Get signal instance.

        This is called when accessing a Signal instance.  If accessed as an
        attribute on the class `owner`, instance, will be `None`.  Otherwise,
        if `instance` is not None, we're being accessed on an instance of `owner`.

            class Emitter:
                signal = Signal()

            e = Emitter()

            E.signal  # instance will be None, owner will be Emitter
            e.signal  # instance will be e, owner will be Emitter

        Returns
        -------
        Signal or SignalInstance
            Depending on how this attribute is accessed.
        """
        if instance is None:
            return self
        if id(instance) in self._signal_instance_cache:
            return self._signal_instance_cache[id(instance)]
        signal_instance = self._create_signal_instance(instance)

        # cache this signal instance so that future access returns the same instance.
        try:
            # first, try to assign it to instance.name ... this essentially breaks the
            # descriptor, (i.e. __get__ will never again be called for this instance)
            # (note, this is the same mechanism used in the `cached_property` decorator)
            setattr(instance, cast("str", self._name), signal_instance)
        except AttributeError:
            # if that fails, which may happen in slotted classes, then we fall back to
            # our internal cache
            self._cache_signal_instance(instance, signal_instance)

        return signal_instance

    def _cache_signal_instance(
        self, instance: Any, signal_instance: SignalInstance
    ) -> None:
        """Cache a signal instance on the instance."""
        # fallback signal instance cache as last resort.  We use the object id
        # instead a WeakKeyDictionary because we can't guarantee that the instance
        # is hashable or weak-referenceable.  and we use a finalize to remove the
        # cache when the instance is destroyed (if the object is weak-referenceable).
        obj_id = id(instance)
        self._signal_instance_cache[obj_id] = signal_instance
        with suppress(TypeError):
            weakref.finalize(instance, self._signal_instance_cache.pop, obj_id, None)

    def _create_signal_instance(
        self, instance: Any, name: str | None = None
    ) -> SignalInstance:
        return self._signal_instance_class(
            self.signature,
            instance=instance,
            name=name or self._name,
            check_nargs_on_connect=self._check_nargs_on_connect,
            check_types_on_connect=self._check_types_on_connect,
            reemission=self._reemission,
        )

    @classmethod
    @contextmanager
    def _emitting(cls, emitter: SignalInstance) -> Iterator[None]:
        """Context that sets the sender on a receiver object while emitting a signal."""
        previous, cls._current_emitter = cls._current_emitter, emitter
        try:
            yield
        finally:
            cls._current_emitter = previous

    @classmethod
    def current_emitter(cls) -> SignalInstance | None:
        """Return currently emitting `SignalInstance`, if any.

        This will typically be used in a callback.

        Examples
        --------
        ```python
        from psygnal import Signal


        def my_callback():
            source = Signal.current_emitter()
        ```
        """
        return cls._current_emitter

    @classmethod
    def sender(cls) -> Any:
        """Return currently emitting object, if any.

        This will typically be used in a callback.
        """
        return getattr(cls._current_emitter, "instance", None)


_empty_signature = Signature()


@mypyc_attr(allow_interpreted_subclasses=True)
class SignalInstance:
    """A signal instance (optionally) bound to an object.

    In most cases, users will not create a `SignalInstance` directly -- instead
    creating a [Signal][psygnal.Signal] class attribute.  This object will be
    instantiated by the `Signal.__get__` method (i.e. the descriptor protocol),
    when a `Signal` instance is accessed from an *instance* of a class with `Signal`
    attribute.

    However, it is the `SignalInstance` that you will most often be interacting
    with when you access the name of a `Signal` on an instance -- so understanding
    the `SignalInstance` API is key to using psygnal.

    ```python
    class Emitter:
        signal = Signal()


    e = Emitter()

    # when accessed on an *instance* of Emitter,
    # the signal attribute will be a SignalInstance
    e.signal

    # This is what you will use to connect your callbacks
    e.signal.connect(some_callback)
    ```

    Parameters
    ----------
    signature : Signature | None
        The signature that this signal accepts and will emit, by default `Signature()`.
    instance : Any
        An object to which this signal is bound. Normally this will be provided by the
        `Signal.__get__` method (see above).  However, an unbound `SignalInstance`
        may also be created directly. by default `None`.
    name : str | None
        An optional name for this signal.  Normally this will be provided by the
        `Signal.__get__` method. by default `None`
    check_nargs_on_connect : bool
        Whether to check the number of positional args against `signature` when
        connecting a new callback. This can also be provided at connection time using
        `.connect(..., check_nargs=True)`. By default, `True`.
    check_types_on_connect : bool
        Whether to check the callback parameter types against `signature` when
        connecting a new callback. This can also be provided at connection time using
        `.connect(..., check_types=True)`. By default, `False`.
    reemission : Literal["immediate", "queued", "latest-only"] | None
        See docstring for [`Signal`][psygnal.Signal] for details.
        By default, `"immediate"`.

    Raises
    ------
    TypeError
        If `signature` is neither an instance of `inspect.Signature`, or a `tuple`
        of types.
    """

    _is_blocked: bool = False
    _is_paused: bool = False
    _debug_hook: ClassVar[Callable[[EmissionInfo], None] | None] = None

    def __init__(
        self,
        signature: Signature | tuple = _empty_signature,
        *,
        instance: Any = None,
        name: str | None = None,
        check_nargs_on_connect: bool = True,
        check_types_on_connect: bool = False,
        reemission: ReemissionVal = DEFAULT_REEMISSION,
    ) -> None:
        if isinstance(signature, (list, tuple)):
            signature = _build_signature(*signature)
        elif not isinstance(signature, Signature):  # pragma: no cover
            raise TypeError(
                "`signature` must be either a sequence of types, or an "
                "instance of `inspect.Signature`"
            )

        self._reemission = ReemissionMode.validate(reemission)
        self._name = name
        self._instance: Callable = self._instance_ref(instance)
        self._args_queue: list[tuple] = []  # filled when paused
        self._signature = signature
        self._check_nargs_on_connect = check_nargs_on_connect
        self._check_types_on_connect = check_types_on_connect
        self._slots: list[WeakCallback] = []
        self._is_blocked: bool = False
        self._is_paused: bool = False
        self._lock = threading.RLock()
        self._emit_queue: deque[tuple] = deque()
        self._recursion_depth: int = 0
        self._max_recursion_depth: int = 0
        self._run_emit_loop_inner: Callable[[], None]
        if self._reemission == ReemissionMode.QUEUED:
            self._run_emit_loop_inner = self._run_emit_loop_queued
        elif self._reemission == ReemissionMode.LATEST:
            self._run_emit_loop_inner = self._run_emit_loop_latest_only
        else:
            self._run_emit_loop_inner = self._run_emit_loop_immediate

        # whether any slots in self._slots have a priority other than 0
        self._priority_in_use = False

    @staticmethod
    def _instance_ref(instance: Any) -> Callable[[], Any]:
        if instance is None:
            return lambda: None

        try:
            return weakref.ref(instance)
        except TypeError:
            # fall back to strong reference if instance is not weak-referenceable
            return lambda: instance

    @property
    def signature(self) -> Signature:
        """Signature supported by this `SignalInstance`."""
        return self._signature

    @property
    def instance(self) -> Any:
        """Object that emits this `SignalInstance`."""
        return self._instance()

    @property
    def name(self) -> str:
        """Name of this `SignalInstance`."""
        return self._name or ""

    def __repr__(self) -> str:
        """Return repr."""
        name = f" {self._name!r}" if self._name else ""
        instance = f" on {self.instance!r}" if self.instance is not None else ""
        return f"<{type(self).__name__}{name}{instance}>"

    @overload
    def connect(
        self,
        *,
        thread: threading.Thread | Literal["main", "current"] | None = ...,
        check_nargs: bool | None = ...,
        check_types: bool | None = ...,
        unique: bool | str = ...,
        max_args: int | None = None,
        on_ref_error: RefErrorChoice = ...,
        priority: int = ...,
    ) -> Callable[[F], F]: ...

    @overload
    def connect(
        self,
        slot: F,
        *,
        thread: threading.Thread | Literal["main", "current"] | None = ...,
        check_nargs: bool | None = ...,
        check_types: bool | None = ...,
        unique: bool | str = ...,
        max_args: int | None = None,
        on_ref_error: RefErrorChoice = ...,
        priority: int = ...,
    ) -> F: ...

    def connect(
        self,
        slot: F | None = None,
        *,
        thread: threading.Thread | Literal["main", "current"] | None = None,
        check_nargs: bool | None = None,
        check_types: bool | None = None,
        unique: bool | str = False,
        max_args: int | None = None,
        on_ref_error: RefErrorChoice = "warn",
        priority: int = 0,
    ) -> Callable[[F], F] | F:
        """Connect a callback (`slot`) to this signal.

        `slot` is compatible if:

        * it requires no more than the number of positional arguments emitted by this
          `SignalInstance`.  (It *may* require less)
        * it has no *required* keyword arguments (keyword only arguments that have
          no default).
        * if `check_types` is `True`, the parameter types in the callback signature must
          match the signature of this `SignalInstance`.

        This method may be used as a decorator.

        ```python
        @signal.connect
        def my_function(): ...
        ```

        !!!important
            If a signal is connected with `thread != None`, then it is up to the user
            to ensure that `psygnal.emit_queued` is called, or that one of the backend
            convenience functions is used (e.g. `psygnal.qt.start_emitting_from_queue`).
            Otherwise, callbacks that are connected to signals that are emitted from
            another thread will never be called.

        Parameters
        ----------
        slot : Callable
            A callable to connect to this signal.  If the callable accepts less
            arguments than the signature of this slot, then they will be discarded when
            calling the slot.
        check_nargs : Optional[bool]
            If `True` and the provided `slot` requires more positional arguments than
            the signature of this Signal, raise `TypeError`. by default `True`.
        thread: Thread | Literal["main", "current"] | None
            If `None` (the default), this slot will be invoked immediately when a signal
            is emitted, from whatever thread emitted the signal. If a thread object is
            provided, then the callback will only be immediately invoked if the signal
            is emitted from that thread.  Otherwise, the callback will be added to a
            queue. **Note!**, when using the `thread` parameter, the user is responsible
            for calling `psygnal.emit_queued()` in the corresponding thread, otherwise
            the slot will never be invoked. (See note above). (The strings `"main"` and
            `"current"` are also accepted, and will be interpreted as the
            `threading.main_thread()` and `threading.current_thread()`, respectively).
        check_types : Optional[bool]
            If `True`, An additional check will be performed to make sure that types
            declared in the slot signature are compatible with the signature
            declared by this signal, by default `False`.
        unique : Union[bool, str, None]
            If `True`, returns without connecting if the slot has already been
            connected.  If the literal string "raise" is passed to `unique`, then a
            `ValueError` will be raised if the slot is already connected.
            By default `False`.
        max_args : Optional[int]
            If provided, `slot` will be called with no more more than `max_args` when
            this SignalInstance is emitted.  (regardless of how many arguments are
            emitted).
        on_ref_error : {'raise', 'warn', 'ignore'}, optional
            What to do if a weak reference cannot be created.  If 'raise', a
            ReferenceError will be raised.  If 'warn' (default), a warning will be
            issued and a strong-reference will be used. If 'ignore' a strong-reference
            will be used (silently).
        priority : int
            The priority of the callback. This is used to determine the order in which
            callbacks are called when multiple are connected to the same signal.
            Higher priority callbacks are called first. Negative values are allowed.
            The default is 0.

        Raises
        ------
        TypeError
            If a non-callable object is provided.
        ValueError
            If the provided slot fails validation, either due to mismatched positional
            argument requirements, or failed type checking.
        ValueError
            If `unique` is `True` and `slot` has already been connected.
        """
        if check_nargs is None:
            check_nargs = self._check_nargs_on_connect
        if check_types is None:
            check_types = self._check_types_on_connect

        def _wrapper(
            slot: F,
            max_args: int | None = max_args,
            _on_ref_err: RefErrorChoice = on_ref_error,
        ) -> F:
            if not callable(slot):
                raise TypeError(f"Cannot connect to non-callable object: {slot}")

            with self._lock:
                if unique and slot in self:
                    if unique == "raise":
                        raise ValueError(
                            "Slot already connect. Use `connect(..., unique=False)` "
                            "to allow duplicate connections"
                        )
                    return slot

                slot_sig: Signature | None = None
                if check_nargs and (max_args is None):
                    slot_sig, max_args, isqt = self._check_nargs(slot, self.signature)
                    if isqt:
                        _on_ref_err = "ignore"
                if check_types:
                    slot_sig = slot_sig or signature(slot)
                    if not _parameter_types_match(slot, self.signature, slot_sig):
                        extra = f"- Slot types {slot_sig} do not match types in signal."
                        self._raise_connection_error(slot, extra)

                cb = weak_callback(
                    slot,
                    max_args=max_args,
                    finalize=self._try_discard,
                    on_ref_error=_on_ref_err,
                    priority=priority,
                )
                if thread is not None:
                    cb = QueuedCallback(cb, thread=thread)
                self._append_slot(cb)
            return slot

        return _wrapper if slot is None else _wrapper(slot)

    def _append_slot(self, slot: WeakCallback) -> None:
        """Append a slot to the list of slots.

        Implementing this as a method allows us to override/extend it in subclasses.
        """
        # if no previously connected slots have a priority, and this slot also
        # has no priority, we can just (quickly) append it to the end of the list.
        if not self._priority_in_use:
            if not slot.priority:
                self._slots.append(slot)
                return
            # remember that we have a priority in use, so we skip this check
            self._priority_in_use = True

        # otherwise we need to (slowly) iterate over self._slots to
        # insert the slot in the correct position based on priority.
        # High priority slots are placed at the front of the list
        # low/negative priority slots are at the end of the list
        for i, s in enumerate(self._slots):
            if s.priority < slot.priority:
                self._slots.insert(i, slot)
                return
        self._slots.append(slot)

    def _remove_slot(self, slot: Literal["all"] | int | WeakCallback) -> None:
        """Remove a slot from the list of slots."""
        # implementing this as a method allows us to override/extend it in subclasses
        if slot == "all":
            self._slots.clear()
        elif isinstance(slot, int):
            self._slots.pop(slot)
        else:
            self._slots.remove(cast("WeakCallback", slot))

    def _try_discard(self, callback: WeakCallback, missing_ok: bool = True) -> None:
        """Try to discard a callback from the list of slots.

        Parameters
        ----------
        callback : WeakCallback
            A callback to discard.
        missing_ok : bool, optional
            If `True`, do not raise an error if the callback is not found in the list.
        """
        try:
            self._remove_slot(callback)
        except ValueError:
            if not missing_ok:
                raise

    def connect_setattr(
        self,
        obj: object,
        attr: str,
        maxargs: int | None | object = _NULL,
        *,
        on_ref_error: RefErrorChoice = "warn",
        priority: int = 0,
    ) -> WeakCallback[None]:
        """Bind an object attribute to the emitted value of this signal.

        Equivalent to calling `self.connect(functools.partial(setattr, obj, attr))`,
        but with additional weakref safety (i.e. a strong reference to `obj` will not
        be retained). The return object can be used to
        [`disconnect()`][psygnal.SignalInstance.disconnect], (or you can use
        [`disconnect_setattr()`][psygnal.SignalInstance.disconnect_setattr]).

        Parameters
        ----------
        obj : object
            An object.
        attr : str
            The name of an attribute on `obj` that should be set to the value of this
            signal when emitted.
        maxargs : Optional[int]
            max number of positional args to accept
        on_ref_error: {'raise', 'warn', 'ignore'}, optional
            What to do if a weak reference cannot be created.  If 'raise', a
            ReferenceError will be raised.  If 'warn' (default), a warning will be
            issued and a strong-reference will be used. If 'ignore' a strong-reference
            will be used (silently).
        priority : int
            The priority of the callback. This is used to determine the order in which
            callbacks are called when multiple are connected to the same signal.
            Higher priority callbacks are called first. Negative values are allowed.
            The default is 0.

        Returns
        -------
        Tuple
            (weakref.ref, name, callable).  Reference to the object, name of the
            attribute, and setattr closure.  Can be used to disconnect the slot.

        Raises
        ------
        ValueError
            If this is not a single-value signal
        AttributeError
            If `obj` has no attribute `attr`.

        Examples
        --------
        >>> class T:
        ...     sig = Signal(int)
        >>> class SomeObj:
        ...     x = 1
        >>> t = T()
        >>> my_obj = SomeObj()
        >>> t.sig.connect_setattr(my_obj, "x")
        >>> t.sig.emit(5)
        >>> assert my_obj.x == 5
        """
        if maxargs is _NULL:
            warnings.warn(
                "The default value of maxargs will change from `None` to `1` in "
                "version 0.11. To silence this warning, provide an explicit value for "
                "maxargs (`None` for current behavior, `1` for future behavior).",
                FutureWarning,
                stacklevel=2,
            )
            maxargs = None

        if not hasattr(obj, attr):
            raise AttributeError(f"Object {obj} has no attribute {attr!r}")

        with self._lock:
            caller = WeakSetattr(
                obj,
                attr,
                max_args=cast("int | None", maxargs),
                finalize=self._try_discard,
                on_ref_error=on_ref_error,
                priority=priority,
            )
            self._append_slot(caller)
        return caller

    def disconnect_setattr(
        self, obj: object, attr: str, missing_ok: bool = True
    ) -> None:
        """Disconnect a previously connected attribute setter.

        Parameters
        ----------
        obj : object
            An object.
        attr : str
            The name of an attribute on `obj` that was previously used for
            `connect_setattr`.
        missing_ok : bool
            If `False` and the provided `slot` is not connected, raises `ValueError`.
            by default `True`

        Raises
        ------
        ValueError
            If `missing_ok` is `True` and no attribute setter is connected.
        """
        with self._lock:
            cb = WeakSetattr(obj, attr, on_ref_error="ignore")
            self._try_discard(cb, missing_ok)

    def connect_setitem(
        self,
        obj: object,
        key: str,
        maxargs: int | None | object = _NULL,
        *,
        on_ref_error: RefErrorChoice = "warn",
        priority: int = 0,
    ) -> WeakCallback[None]:
        """Bind a container item (such as a dict key) to emitted value of this signal.

        Equivalent to calling `self.connect(functools.partial(obj.__setitem__, attr))`,
        but with additional weakref safety (i.e. a strong reference to `obj` will not
        be retained). The return object can be used to
        [`disconnect()`][psygnal.SignalInstance.disconnect], (or you can use
        [`disconnect_setitem()`][psygnal.SignalInstance.disconnect_setitem]).

        Parameters
        ----------
        obj : object
            An object.
        key : str
            Name of the key in `obj` that should be set to the value of this
            signal when emitted
        maxargs : Optional[int]
            max number of positional args to accept
        on_ref_error: {'raise', 'warn', 'ignore'}, optional
            What to do if a weak reference cannot be created.  If 'raise', a
            ReferenceError will be raised.  If 'warn' (default), a warning will be
            issued and a strong-reference will be used. If 'ignore' a strong-reference
            will be used (silently).
        priority : int
            The priority of the callback. This is used to determine the order in which
            callbacks are called when multiple are connected to the same signal.
            Higher priority callbacks are called first. Negative values are allowed.
            The default is 0.

        Returns
        -------
        Tuple
            (weakref.ref, name, callable).  Reference to the object, name of the
            attribute, and setitem closure.  Can be used to disconnect the slot.

        Raises
        ------
        ValueError
            If this is not a single-value signal
        TypeError
            If `obj` does not support __setitem__.

        Examples
        --------
        >>> class T:
        ...     sig = Signal(int)
        >>> t = T()
        >>> my_obj = dict()
        >>> t.sig.connect_setitem(my_obj, "x")
        >>> t.sig.emit(5)
        >>> assert my_obj == {"x": 5}
        """
        if maxargs is _NULL:
            warnings.warn(
                "The default value of maxargs will change from `None` to `1` in"
                "version 0.11. To silence this warning, provide an explicit value for "
                "maxargs (`None` for current behavior, `1` for future behavior).",
                FutureWarning,
                stacklevel=2,
            )
            maxargs = None

        if not hasattr(obj, "__setitem__"):
            raise TypeError(f"Object {obj} does not support __setitem__")

        with self._lock:
            caller = WeakSetitem(
                obj,
                key,
                max_args=cast("int | None", maxargs),
                finalize=self._try_discard,
                on_ref_error=on_ref_error,
                priority=priority,
            )
            self._append_slot(caller)

        return caller

    def disconnect_setitem(
        self, obj: object, key: str, missing_ok: bool = True
    ) -> None:
        """Disconnect a previously connected item setter.

        Parameters
        ----------
        obj : object
            An object.
        key : str
            The name of a key in `obj` that was previously used for
            `connect_setitem`.
        missing_ok : bool
            If `False` and the provided `slot` is not connected, raises `ValueError`.
            by default `True`

        Raises
        ------
        ValueError
            If `missing_ok` is `True` and no item setter is connected.
        """
        if not hasattr(obj, "__setitem__"):
            raise TypeError(f"Object {obj} does not support __setitem__")

        with self._lock:
            caller = WeakSetitem(obj, key, on_ref_error="ignore")
            self._try_discard(caller, missing_ok)

    def _check_nargs(
        self, slot: Callable, spec: Signature
    ) -> tuple[Signature | None, int | None, bool]:
        """Make sure slot is compatible with signature.

        Also returns the maximum number of arguments that we can pass to the slot

        Returns
        -------
        slot_sig : Signature | None
            The signature of the slot, or None if it could not be determined.
        maxargs : int | None
            The maximum number of arguments that we can pass to the slot.
        is_qt : bool
            Whether the slot is a Qt slot.
        """
        try:
            slot_sig = _get_signature_possibly_qt(slot)
        except ValueError as e:
            warnings.warn(
                f"{e}. To silence this warning, connect with " "`check_nargs=False`",
                stacklevel=2,
            )
            return None, None, False
        try:
            minargs, maxargs = _acceptable_posarg_range(slot_sig)
        except ValueError as e:
            if isinstance(slot, partial):
                raise ValueError(
                    f"{e}. (Note: prefer using positional args with "
                    "functools.partials when possible)."
                ) from e
            raise

        # if `slot` requires more arguments than we will provide, raise.
        if minargs > (n_spec_params := len(spec.parameters)):
            extra = (
                f"- Slot requires at least {minargs} positional "
                f"arguments, but spec only provides {n_spec_params}"
            )
            self._raise_connection_error(slot, extra)

        return None if isinstance(slot_sig, str) else slot_sig, maxargs, True

    def _raise_connection_error(self, slot: Callable, extra: str = "") -> NoReturn:
        name = getattr(slot, "__name__", str(slot))
        msg = f"Cannot connect slot {name!r} with signature: {signature(slot)}:\n"
        msg += extra
        msg += f"\n\nAccepted signature: {self.signature}"
        raise ValueError(msg)

    def _slot_index(self, slot: Callable) -> int:
        """Get index of `slot` in `self._slots`.  Return -1 if not connected."""
        with self._lock:
            normed = weak_callback(slot, on_ref_error="ignore")
            # NOTE:
            # the == method here relies on the __eq__ method of each SlotCaller subclass
            return next((i for i, s in enumerate(self._slots) if s == normed), -1)

    def disconnect(self, slot: Callable | None = None, missing_ok: bool = True) -> None:
        """Disconnect slot from signal.

        Parameters
        ----------
        slot : callable, optional
            The specific slot to disconnect.  If `None`, all slots will be disconnected,
            by default `None`
        missing_ok : Optional[bool]
            If `False` and the provided `slot` is not connected, raises `ValueError.
            by default `True`

        Raises
        ------
        ValueError
            If `slot` is not connected and `missing_ok` is False.
        """
        with self._lock:
            if slot is None:
                # NOTE: clearing an empty list is actually a RuntimeError in Qt
                self._remove_slot("all")
                return

            idx = self._slot_index(slot)
            if idx != -1:
                self._remove_slot(idx)
            elif not missing_ok:
                raise ValueError(f"slot is not connected: {slot}")

    def __contains__(self, slot: Callable) -> bool:
        """Return `True` if slot is connected."""
        return self._slot_index(slot) >= 0

    def __len__(self) -> int:
        """Return number of connected slots."""
        return len(self._slots)

    def emit(
        self, *args: Any, check_nargs: bool = False, check_types: bool = False
    ) -> None:
        """Emit this signal with arguments `args`.

        !!! note

            `check_args` and `check_types` both add overhead when calling emit.

        Parameters
        ----------
        *args : Any
            These arguments will be passed when calling each slot (unless the slot
            accepts fewer arguments, in which case extra args will be discarded.)
        check_nargs : bool
            If `False` and the provided arguments cannot be successfully bound to the
            signature of this Signal, raise `TypeError`.  Incurs some overhead.
            by default False.
        check_types : bool
            If `False` and the provided arguments do not match the types declared by
            the signature of this Signal, raise `TypeError`.  Incurs some overhead.
            by default False.

        Raises
        ------
        TypeError
            If `check_nargs` and/or `check_types` are `True`, and the corresponding
            checks fail.
        """
        if self._is_blocked:
            return

        if check_nargs:
            try:
                self.signature.bind(*args)
            except TypeError as e:
                raise TypeError(
                    f"Cannot emit args {args} from signal {self!r} with "
                    f"signature {self.signature}:\n{e}"
                ) from e

        if check_types and not _parameter_types_match(
            lambda: None, self.signature, _build_signature(*[type(a) for a in args])
        ):
            raise TypeError(
                f"Types provided to '{self.name}.emit' "
                f"{tuple(type(a).__name__ for a in args)} do not match signal "
                f"signature: {self.signature}"
            )

        if self._is_paused:
            self._args_queue.append(args)
            return

        if SignalInstance._debug_hook is not None:
            from ._group import EmissionInfo

            SignalInstance._debug_hook(EmissionInfo(self, args))

        self._run_emit_loop(args)

    def __call__(
        self, *args: Any, check_nargs: bool = False, check_types: bool = False
    ) -> None:
        """Alias for `emit()`. But prefer using `emit()` for clarity."""
        return self.emit(*args, check_nargs=check_nargs, check_types=check_types)

    def _run_emit_loop(self, args: tuple[Any, ...]) -> None:
        with self._lock:
            self._emit_queue.append(args)
            if len(self._emit_queue) > 1:
                return
            try:
                # allow receiver to query sender with Signal.current_emitter()
                self._recursion_depth += 1
                self._max_recursion_depth = max(
                    self._max_recursion_depth, self._recursion_depth
                )
                with Signal._emitting(self):
                    self._run_emit_loop_inner()
            except RecursionError as e:
                raise RecursionError(
                    f"RecursionError when "
                    f"emitting signal {self.name!r} with args {args}"
                ) from e
            except Exception as cb_err:
                if isinstance(cb_err, EmitLoopError):
                    raise cb_err
                loop_err = EmitLoopError(
                    exc=cb_err,
                    signal=self,
                    recursion_depth=self._recursion_depth - 1,
                    reemission=self._reemission,
                    emit_queue=self._emit_queue,
                ).with_traceback(cb_err.__traceback__)
                # this comment will show up in the traceback
                raise loop_err from cb_err  # emit() call ABOVE || callback error BELOW
            finally:
                self._recursion_depth -= 1
                # we're back to the root level of the emit loop, reset max_depth
                if self._recursion_depth <= 0:
                    self._max_recursion_depth = 0
                    self._recursion_depth = 0
                self._emit_queue.clear()

    def _run_emit_loop_immediate(self) -> None:
        args = self._emit_queue.popleft()
        for caller in self._slots:
            caller.cb(args)

    def _run_emit_loop_latest_only(self) -> None:
        self._args = args = self._emit_queue.popleft()
        for caller in self._slots:
            if self._recursion_depth < self._max_recursion_depth:
                # we've already entered a deeper emit loop
                # we should drop the remaining slots in this round and return
                break
            self._caller = caller
            caller.cb(args)

    def _run_emit_loop_queued(self) -> None:
        i = 0
        while i < len(self._emit_queue):
            args = self._emit_queue[i]
            for caller in self._slots:
                caller.cb(args)
                if len(self._emit_queue) > RECURSION_LIMIT:
                    raise RecursionError
            i += 1

    def block(self, exclude: Iterable[str | SignalInstance] = ()) -> None:
        """Block this signal from emitting.

        NOTE: the `exclude` argument is only for SignalGroup subclass, but we
        have to include it here to make mypyc happy.
        """
        self._is_blocked = True

    def unblock(self) -> None:
        """Unblock this signal, allowing it to emit."""
        self._is_blocked = False

    def blocked(self) -> ContextManager[None]:
        """Context manager to temporarily block this signal.

        Useful if you need to temporarily block all emission of a given signal,
        (for example, to avoid a recursive signal loop)

        Examples
        --------
        ```python
        class MyEmitter:
            changed = Signal()

            def make_a_change(self):
                self.changed.emit()

        obj = MyEmitter()

        with obj.changed.blocked()
            obj.make_a_change()  # will NOT emit a changed signal.
        ```
        """
        return _SignalBlocker(self)

    def pause(self) -> None:
        """Pause all emission and collect *args tuples from emit().

        args passed to `emit` will be collected and re-emitted when `resume()` is
        called. For a context manager version, see `paused()`.
        """
        self._is_paused = True

    def resume(self, reducer: ReducerFunc | None = None, initial: Any = _NULL) -> None:
        """Resume (unpause) this signal, emitting everything in the queue.

        Parameters
        ----------
        reducer : Callable | None
            A optional function to reduce the args collected while paused into a single
            emitted group of args.  If not provided, all emissions will be re-emitted
            as they were collected when the signal is resumed. May be:

            - a function that takes two args tuples and returns a single args tuple.
              This will be passed to `functools.reduce` and is expected to reduce all
              collected/emitted args into a single tuple.
              For example, three `emit(1)` events would be reduced and re-emitted as
              follows: `self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))`
            - a function that takes a single argument (an iterable of args tuples) and
              returns a tuple (the reduced args). This will be *not* be passed to
              `functools.reduce`. If `reducer` is a function that takes a single
              argument, `initial` will be ignored.
        initial: any, optional
            initial value to pass to `functools.reduce`

        Examples
        --------
        >>> class T:
        ...     sig = Signal(int)
        >>> t = T()
        >>> t.sig.pause()
        >>> t.sig.emit(1)
        >>> t.sig.emit(2)
        >>> t.sig.emit(3)
        >>> t.sig.resume(lambda a, b: (a[0].union(set(b)),), (set(),))
        >>> # results in t.sig.emit({1, 2, 3})
        """
        self._is_paused = False
        # not sure why this attribute wouldn't be set, but when resuming in
        # EventedModel.update, it may be undefined (as seen in tests)
        if not getattr(self, "_args_queue", None):
            return
        if len(self._slots) == 0:
            self._args_queue.clear()
            return

        if reducer is not None:
            if len(inspect.signature(reducer).parameters) == 1:
                args = cast("ReducerOneArg", reducer)(self._args_queue)
            else:
                reducer = cast("ReducerTwoArgs", reducer)
                if initial is _NULL:
                    args = reduce(reducer, self._args_queue)
                else:
                    args = reduce(reducer, self._args_queue, initial)
            self._run_emit_loop(args)
        else:
            for args in self._args_queue:
                self._run_emit_loop(args)
        self._args_queue.clear()

    def paused(
        self, reducer: ReducerFunc | None = None, initial: Any = _NULL
    ) -> ContextManager[None]:
        """Context manager to temporarily pause this signal.

        Parameters
        ----------
        reducer : Callable | None
            A optional function to reduce the args collected while paused into a single
            emitted group of args.  If not provided, all emissions will be re-emitted
            as they were collected when the signal is resumed. May be:

            - a function that takes two args tuples and returns a single args tuple.
              This will be passed to `functools.reduce` and is expected to reduce all
              collected/emitted args into a single tuple.
              For example, three `emit(1)` events would be reduced and re-emitted as
              follows: `self.emit(*functools.reduce(reducer, [(1,), (1,), (1,)]))`
            - a function that takes a single argument (an iterable of args tuples) and
              returns a tuple (the reduced args). This will be *not* be passed to
              `functools.reduce`. If `reducer` is a function that takes a single
              argument, `initial` will be ignored.
        initial: any, optional
            initial value to pass to `functools.reduce`

        Examples
        --------
        >>> with obj.signal.paused(lambda a, b: (a[0].union(set(b)),), (set(),)):
        ...     t.sig.emit(1)
        ...     t.sig.emit(2)
        ...     t.sig.emit(3)
        >>> # results in obj.signal.emit({1, 2, 3})
        """
        return _SignalPauser(self, reducer, initial)

    def __getstate__(self) -> dict:
        """Return dict of current state, for pickle."""
        attrs = (
            "_signature",
            "_name",
            "_is_blocked",
            "_is_paused",
            "_args_queue",
            "_check_nargs_on_connect",
            "_check_types_on_connect",
            "_emit_queue",
            "_priority_in_use",
            "_reemission",
            "_max_recursion_depth",
            "_recursion_depth",
        )
        dd = {slot: getattr(self, slot) for slot in attrs}
        dd["_instance"] = self._instance()
        dd["_slots"] = [x for x in self._slots if isinstance(x, StrongFunction)]
        if len(self._slots) > len(dd["_slots"]):
            warnings.warn(
                "Pickling a SignalInstance does not copy connected weakly referenced "
                "slots.",
                stacklevel=2,
            )

        return dd

    def __setstate__(self, state: dict) -> None:
        """Restore state from pickle."""
        # don't use __dict__, mypyc doesn't have it
        for k, v in state.items():
            if k == "_instance":
                self._instance = self._instance_ref(v)
            else:
                setattr(self, k, v)
        self._lock = threading.RLock()
        if self._reemission == ReemissionMode.QUEUED:  # pragma: no cover
            self._run_emit_loop_inner = self._run_emit_loop_queued
        elif self._reemission == ReemissionMode.LATEST:  # pragma: no cover
            self._run_emit_loop_inner = self._run_emit_loop_latest_only
        else:
            self._run_emit_loop_inner = self._run_emit_loop_immediate


class _SignalBlocker:
    """Context manager to block and unblock a signal."""

    def __init__(
        self, signal: SignalInstance, exclude: Iterable[str | SignalInstance] = ()
    ) -> None:
        self._signal = signal
        self._exclude = exclude
        self._was_blocked = signal._is_blocked

    def __enter__(self) -> None:
        self._signal.block(exclude=self._exclude)

    def __exit__(self, *args: Any) -> None:
        if not self._was_blocked:
            self._signal.unblock()


class _SignalPauser:
    """Context manager to pause and resume a signal."""

    def __init__(
        self, signal: SignalInstance, reducer: ReducerFunc | None, initial: Any
    ) -> None:
        self._was_paused = signal._is_paused
        self._signal = signal
        self._reducer = reducer
        self._initial = initial

    def __enter__(self) -> None:
        self._signal.pause()

    def __exit__(self, *args: Any) -> None:
        if not self._was_paused:
            self._signal.resume(self._reducer, self._initial)


# #############################################################################
# #############################################################################


def signature(obj: Any) -> inspect.Signature:
    try:
        return inspect.signature(obj)
    except ValueError as e:
        with suppress(Exception):
            if not inspect.ismethod(obj):
                return _stub_sig(obj)
        raise e from e


_ANYSIG = Signature(
    [
        Parameter(name="args", kind=Parameter.VAR_POSITIONAL),
        Parameter(name="kwargs", kind=Parameter.VAR_KEYWORD),
    ]
)


@lru_cache(maxsize=None)
def _stub_sig(obj: Any) -> Signature:
    """Called as a backup when inspect.signature fails."""
    import builtins

    # this nonsense is here because it's hard to get the signature of mypyc-compiled
    # objects, but we still want to be able to connect a signal instance.
    if (
        type(getattr(obj, "__self__", None)) is SignalInstance
        and getattr(obj, "__name__", None) == "emit"
    ) or type(obj) is SignalInstance:
        # we won't reach this in testing because
        # Compiled functions don't trigger profiling and tracing hooks
        return _ANYSIG  # pragma: no cover

    # just a common case
    if obj is builtins.print:
        params = [
            Parameter(name="value", kind=Parameter.VAR_POSITIONAL),
            Parameter(name="sep", kind=Parameter.KEYWORD_ONLY, default=" "),
            Parameter(name="end", kind=Parameter.KEYWORD_ONLY, default="\n"),
            Parameter(name="file", kind=Parameter.KEYWORD_ONLY, default=None),
            Parameter(name="flush", kind=Parameter.KEYWORD_ONLY, default=False),
        ]
        return Signature(params)
    raise ValueError("unknown object")


def _build_signature(*types: type[Any]) -> Signature:
    params = [
        Parameter(name=f"p{i}", kind=Parameter.POSITIONAL_ONLY, annotation=t)
        for i, t in enumerate(types)
    ]
    return Signature(params)


# def f(a, /, b, c=None, *d, f=None, **g): print(locals())
#
# a: kind=POSITIONAL_ONLY,       default=Parameter.empty    # 1 required posarg
# b: kind=POSITIONAL_OR_KEYWORD, default=Parameter.empty    # 1 requires posarg
# c: kind=POSITIONAL_OR_KEYWORD, default=None               # 1 optional posarg
# d: kind=VAR_POSITIONAL,        default=Parameter.empty    # N optional posargs
# e: kind=KEYWORD_ONLY,          default=Parameter.empty    # 1 REQUIRED kwarg
# f: kind=KEYWORD_ONLY,          default=None               # 1 optional kwarg
# g: kind=VAR_KEYWORD,           default=Parameter.empty    # N optional kwargs


def _get_signature_possibly_qt(slot: Callable) -> Signature | str:
    # checking qt has to come first, since the signature of the emit method
    # of a Qt SignalInstance is just <Signature (*args: typing.Any) -> None>
    # https://bugreports.qt.io/browse/PYSIDE-1713
    sig = _guess_qtsignal_signature(slot)
    return signature(slot) if sig is None else sig


def _acceptable_posarg_range(
    sig: Signature | str, forbid_required_kwarg: bool = True
) -> tuple[int, int | None]:
    """Return tuple of (min, max) accepted positional arguments.

    Parameters
    ----------
    sig : Signature
        Signature object to evaluate
    forbid_required_kwarg : Optional[bool]
        Whether to allow required KEYWORD_ONLY parameters. by default True.

    Returns
    -------
    arg_range : Tuple[int, int]
        minimum, maximum number of acceptable positional arguments

    Raises
    ------
    ValueError
        If the signature has a required keyword_only parameter and
        `forbid_required_kwarg` is `True`.
    """
    if isinstance(sig, str):
        if "(" not in sig:  # pragma: no cover
            raise ValueError(f"Unrecognized string signature format: {sig!r}")
        inner = sig.split("(", 1)[1].split(")", 1)[0]
        minargs = maxargs = inner.count(",") + 1 if inner else 0
        return minargs, maxargs

    required = 0
    optional = 0
    posargs_unlimited = False
    _pos_required = {Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD}
    for param in sig.parameters.values():
        if param.kind in _pos_required:
            if param.default is Parameter.empty:
                required += 1
            else:
                optional += 1
        elif param.kind is Parameter.VAR_POSITIONAL:
            posargs_unlimited = True
        elif (
            param.kind is Parameter.KEYWORD_ONLY
            and param.default is Parameter.empty
            and forbid_required_kwarg
        ):
            raise ValueError(f"Unsupported KEYWORD_ONLY parameters in signature: {sig}")
    return (required, None if posargs_unlimited else required + optional)


def _parameter_types_match(
    function: Callable, spec: Signature, func_sig: Signature | None = None
) -> bool:
    """Return True if types in `function` signature match those in `spec`.

    Parameters
    ----------
    function : Callable
        A function to validate
    spec : Signature
        The Signature against which the `function` should be validated.
    func_sig : Signature, optional
        Signature for `function`, if `None`, signature will be inspected.
        by default None

    Returns
    -------
    bool
        True if the parameter types match.
    """
    fsig = func_sig or signature(function)

    func_hints: dict | None = None
    for f_param, spec_param in zip(fsig.parameters.values(), spec.parameters.values()):
        f_anno = f_param.annotation
        if f_anno is fsig.empty:
            # if function parameter is not type annotated, allow it.
            continue

        if isinstance(f_anno, str):
            if func_hints is None:
                func_hints = get_type_hints(function)
            f_anno = func_hints.get(f_param.name)

        if not _is_subclass(f_anno, spec_param.annotation):
            return False
    return True


def _is_subclass(left: type[Any], right: type) -> bool:
    """Variant of issubclass with support for unions."""
    if not isclass(left) and get_origin(left) is Union:
        return any(issubclass(i, right) for i in get_args(left))
    return issubclass(left, right)


def _guess_qtsignal_signature(obj: Any) -> str | None:
    """Return string signature if `obj` is a SignalInstance or Qt emit method.

    This is a bit of a hack, but we found no better way:
    https://stackoverflow.com/q/69976089/1631624
    https://bugreports.qt.io/browse/PYSIDE-1713
    """
    # on my machine, this takes ~700ns on PyQt5 and 8.7µs on PySide2
    type_ = type(obj)
    if "pyqtBoundSignal" in type_.__name__:
        return cast("str", obj.signal)
    qualname = getattr(obj, "__qualname__", "")
    if qualname == "pyqtBoundSignal.emit":
        return cast("str", obj.__self__.signal)

    # note: this IS all actually covered in tests... but only in the Qt tests,
    # so it (annoyingly) briefly looks like it fails coverage.
    if qualname == "SignalInstance.emit" and type_.__name__.startswith("builtin"):
        # we likely have the emit method of a SignalInstance
        # call it with ridiculous params to get the err
        return _ridiculously_call_emit(obj.__self__.emit)  # pragma: no cover
    if "SignalInstance" in type_.__name__ and "QtCore" in getattr(
        type_, "__module__", ""
    ):  # pragma: no cover
        return _ridiculously_call_emit(obj.emit)
    return None


_CRAZY_ARGS = (1,) * 255


# note: this IS all actually covered in tests... but only in the Qt tests,
# so it (annoyingly) briefly looks like it fails coverage.
def _ridiculously_call_emit(emitter: Any) -> str | None:  # pragma: no cover
    """Call SignalInstance emit() to get the signature from err message."""
    try:
        emitter(*_CRAZY_ARGS)
    except TypeError as e:
        if "only accepts" in str(e):
            return str(e).split("only accepts")[0].strip()
    return None  # pragma: no cover


_compiled: bool


def __getattr__(name: str) -> Any:
    if name == "_compiled":
        return hasattr(Signal, "__mypyc_attrs__")
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
