
    cn                         d Z ddlmZ ddlZddlZddlZddlZddlZddlZ	 ddl	Z	n# e
$ r ddlZ	Y nw xY wddlZddlmZ ddlmZ  ej        d          ZdZdZ G d d	          Zd
 Zedk    r e             dS dS )a/  Worker ("slave") process used in computing distributed Latent Dirichlet Allocation
(LDA, :class:`~gensim.models.ldamodel.LdaModel`).

Run this script on every node in your cluster. If you wish, you may even run it multiple times on a single machine,
to make better use of multiple cores (just beware that memory footprint increases accordingly).


How to use distributed :class:`~gensim.models.ldamodel.LdaModel`
----------------------------------------------------------------

#. Install needed dependencies (Pyro4) ::

    pip install gensim[distributed]

#. Setup serialization (on each machine) ::

    export PYRO_SERIALIZERS_ACCEPTED=pickle
    export PYRO_SERIALIZER=pickle

#. Run nameserver ::

    python -m Pyro4.naming -n 0.0.0.0 &

#. Run workers (on each machine) ::

    python -m gensim.models.lda_worker &

#. Run dispatcher ::

    python -m gensim.models.lda_dispatcher &

#. Run :class:`~gensim.models.ldamodel.LdaModel` in distributed mode :

.. sourcecode:: pycon

    >>> from gensim.test.utils import common_corpus, common_dictionary
    >>> from gensim.models import LdaModel
    >>>
    >>> model = LdaModel(common_corpus, id2word=common_dictionary, distributed=True)


Command line arguments
----------------------

.. program-output:: python -m gensim.models.lda_worker --help
   :ellipsis: 0, -7

    )with_statementN)ldamodel)utilszgensim.models.lda_workerzgensim.lda_workerc                   |   e Zd ZdZd Zej        d             Zej        ej        d                         Z	 e
j        d          d             Zej        d             Zej         e
j        d          d                         Zej         e
j        d          d	                         Zej        d
             ZdS )WorkerzUsed as a Pyro4 class with exposed methods.

    Exposes every non-private method and property of the class automatically to be available for remote access.

    c                     d| _         dS )zPartly initialize the model.N)modelselfs    8lib/python3.11/site-packages/gensim/models/lda_worker.py__init__zWorker.__init__X   s    


    c                     t          j                    | _        d| _        || _        || _        d| _        t                              d|           t          j
        di || _        dS )a  Fully initialize the worker.

        Parameters
        ----------
        myid : int
            An ID number used to identify this worker in the dispatcher object.
        dispatcher : :class:`~gensim.models.lda_dispatcher.Dispatcher`
            The dispatcher responsible for scheduling this worker.
        **model_params
            Keyword parameters to initialize the inner LDA model,see :class:`~gensim.models.ldamodel.LdaModel`.

        r   Fzinitializing worker #%sN )	threadingLocklock_updatejobsdonemyid
dispatcherfinishedloggerinfor   LdaModelr	   )r   r   r   model_paramss       r   
initializezWorker.initialize\   sa     %>++	$-t444&6666


r   c                    | j         t          d          d}|F| j        s?	 | j                            | j                  }n# t          j        $ r Y ;w xY w|| j        ?|\t          	                    d| j        | j
                   |                     |           | j                            | j                   dS t          	                    d| j                   dS )a   Request jobs from the dispatcher, in a perpetual loop until :meth:`gensim.models.lda_worker.Worker.getstate`
        is called.

        Raises
        ------
        RuntimeError
            If `self.model` is None (i.e. worker non initialized).

        Nz0worker must be initialized before receiving jobszworker #%s received job #%iz#worker #%i stopping asking for jobs)r	   RuntimeErrorr   r   getjobr   QueueEmptyr   r   r   
processjobjobdone)r   jobs     r   
requestjobzWorker.requestjobs   s     : 	SQRRR 	$- 	o,,TY77;     	$- 	  	JKK5ty$-PPPOOC   O##DI.....KK=tyIIIIIs   A AAr   c                    t                               d| j                   | j                            |           | xj        dz  c_        t
          r^| j        t
          z  dk    rKt          j                            t          j
                    d          }| j                            |           t                               d| j        dz
             dS )zIncrementally process the job and potentially logs progress.

        Parameters
        ----------
        job : iterable of list of (int, float)
            Corpus in BoW format.

        zstarting to process job #%i   r   zlda_worker.pklzfinished processing job #%iN)r   debugr   r	   do_estep
SAVE_DEBUGospathjointempfile
gettempdirsaver   )r   r$   fnames      r   r"   zWorker.processjob   s     	2DMBBB
C    	#$-*49 	#GLL!4!6!68HIIEJOOE"""14=13DEEEEEr   c                     dS )z"Test the connectivity with Worker.Tr   r
   s    r   pingzWorker.ping   s	     tr   c                     t                               d| j        | j                   | j        j        }t          |t          j                  sJ | j        	                                 d| _
        |S )zLog and get the LDA model's current state.

        Returns
        -------
        result : :class:`~gensim.models.ldamodel.LdaState`
            The current state.

        z,worker #%i returning its state after %s jobsT)r   r   r   r   r	   state
isinstancer   LdaStateclearr   )r   results     r   getstatezWorker.getstate   sb     	BDIt}]]]!&("344444
r   c                     |J t                               d| j                   || j        _        | j                                         | j        j                                         d| _        dS )zReset the worker by setting sufficient stats to 0.

        Parameters
        ----------
        state : :class:`~gensim.models.ldamodel.LdaState`
            Encapsulates information for distributed computation of LdaModel objects.

        Nzresetting worker #%iF)r   r   r   r	   r5   
sync_stateresetr   )r   r5   s     r   r=   zWorker.reset   sg        *DI666 


   r   c                 n    t                               d| j                   t          j        d           dS )zTerminate the worker.zterminating worker #%ir   N)r   r   r   r+   _exitr
   s    r   exitzWorker.exit   s,     	,di888
r   N)__name__
__module____qualname____doc__r   Pyro4exposer   onewayr%   r   synchronousr"   r3   r:   r=   r@   r   r   r   r   r   Q   sF           \7 7 \7, \
\J J \ \J6 U}%%F F &%F" \  \ \U}%%  &% \  \U}%%  &% \  \  \  r   r   c            	      V   t          j        t          d d         t           j                  } |                     ddd            |                     ddd t
                     |                     d	d
ddd           |                     ddd            |                     dddddt          j        t          j                   | 	                                }t          j
        d|j                   t                              dd                    t          j                             |j        |j        |j        |j        d}t+          j        t.          t1                      d|           t                              dd                    t          j                             d S )Ni~)descriptionformatter_classz--hostz*Nameserver hostname (default: %(default)s))helpdefaultz--portz&Nameserver port (default: %(default)s))rL   rM   typez--no-broadcastz(Disable broadcast (default: %(default)s)store_constTF)rL   actionrM   constz--hmacz*Nameserver hmac key (default: %(default)s)z-vz	--verbosezVerbose flagloglevel)rL   rP   destrQ   rM   z)%(asctime)s : %(levelname)s : %(message)s)formatlevelz
running %s )	broadcasthostporthmac_key)random_suffixns_confzfinished running %s)argparseArgumentParserrD   RawTextHelpFormatteradd_argumentintloggingINFOWARNING
parse_argsbasicConfigrR   r   r   r-   sysargvno_broadcastrX   rY   hmacr   pyro_daemonLDA_WORKER_PREFIXr   )parserargsr\   s      r   mainro      s   $$QYQnoooF
'S]abbb
'OY]dghhh
IR_E     'S]abbb
k}:lGO     DJRVR_````
KKchhsx00111 &		I	 G 
'wWWWW
KK%sxx'9'9:::::r   __main__)rD   
__future__r   r+   rg   rb   r   r.   r]   r    ImportErrorqueuerE   gensim.modelsr   gensimr   	getLoggerr   r*   rl   r   ro   rA   r   r   r   <module>rw      sO  / /b & % % % % % 				 



       LLLL     " " " " " "      		5	6	6 
' ~ ~ ~ ~ ~ ~ ~ ~B; ; ;8 z DFFFFF s   ' 	33