From b1c0ef094b94ec2105a7b86e2c18217544755f2c Mon Sep 17 00:00:00 2001
From: Martin Blanchard <martin.blanchard@codethink.co.uk>
Date: Tue, 30 Oct 2018 13:08:06 +0000
Subject: [PATCH] Keep track of peers watching jobs

https://gitlab.com/BuildGrid/buildgrid/issues/75
---
 buildgrid/server/execution/instance.py  | 24 +++++++--------
 buildgrid/server/execution/service.py   | 16 ++++++----
 buildgrid/server/job.py                 | 30 ++++++++++--------
 buildgrid/server/operations/instance.py | 23 --------------
 buildgrid/server/scheduler.py           | 41 ++++++++++++++++++++-----
 5 files changed, 72 insertions(+), 62 deletions(-)

diff --git a/buildgrid/server/execution/instance.py b/buildgrid/server/execution/instance.py
index d1574dc56..7d72a2b8a 100644
--- a/buildgrid/server/execution/instance.py
+++ b/buildgrid/server/execution/instance.py
@@ -21,7 +21,7 @@ An instance of the Remote Execution Service.
 
 import logging
 
-from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
+from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError, NotFoundError
 from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
 
 from ..job import Job
@@ -46,7 +46,7 @@ class ExecutionInstance:
     def hash_type(self):
         return get_hash_type()
 
-    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
+    def execute(self, action_digest, skip_cache_lookup, peer=None, message_queue=None):
         """ Sends a job for execution.
         Queues an action and creates an Operation instance to be associated with
         this action.
@@ -58,26 +58,26 @@ class ExecutionInstance:
             raise FailedPreconditionError("Could not get action from storage.")
 
         job = Job(action, action_digest)
-        if message_queue is not None:
-            job.register_client(message_queue)
+        if peer is not None and message_queue is not None:
+            job.register_operation_client(peer, message_queue)
 
         self._scheduler.queue_job(job, skip_cache_lookup)
 
         return job.operation
 
-    def register_message_client(self, name, queue):
+    def register_operation_client(self, job_name, peer, message_queue):
         try:
-            self._scheduler.register_client(name, queue)
+            self._scheduler.register_operation_client(job_name, peer, message_queue)
 
-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
 
-    def unregister_message_client(self, name, queue):
+    def unregister_operation_client(self, job_name, peer):
         try:
-            self._scheduler.unregister_client(name, queue)
+            self._scheduler.unregister_operation_client(job_name, peer)
 
-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
+        except NotFoundError:
+            raise InvalidArgumentError("Operation name does not exist: [{}]".format(job_name))
 
     def stream_operation_updates(self, message_queue, operation_name):
         job = message_queue.get()
diff --git a/buildgrid/server/execution/service.py b/buildgrid/server/execution/service.py
index 111f4a8e5..64477b06d 100644
--- a/buildgrid/server/execution/service.py
+++ b/buildgrid/server/execution/service.py
@@ -96,12 +96,14 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 
         try:
             instance = self._get_instance(instance_name)
+
             operation = instance.execute(request.action_digest,
                                          request.skip_cache_lookup,
-                                         message_queue)
+                                         peer=peer,
+                                         message_queue=message_queue)
 
             context.add_callback(partial(self._rpc_termination_callback,
-                                         peer, instance_name, operation.name, message_queue))
+                                         peer, instance_name, operation.name))
 
             if self._is_instrumented:
                 if peer not in self.__peers:
@@ -157,9 +159,11 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
         try:
             instance = self._get_instance(instance_name)
 
-            instance.register_message_client(operation_name, message_queue)
+            instance.register_operation_client(operation_name,
+                                               peer, message_queue)
+
             context.add_callback(partial(self._rpc_termination_callback,
-                                         peer, instance_name, operation_name, message_queue))
+                                         peer, instance_name, operation_name))
 
             if self._is_instrumented:
                 if peer not in self.__peers:
@@ -208,10 +212,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
 
     # --- Private API ---
 
-    def _rpc_termination_callback(self, peer, instance_name, job_name, message_queue):
+    def _rpc_termination_callback(self, peer, instance_name, job_name):
         instance = self._get_instance(instance_name)
 
-        instance.unregister_message_client(job_name, message_queue)
+        instance.unregister_operation_client(job_name, peer)
 
         if self._is_instrumented:
             if self.__peers[peer] > 1:
diff --git a/buildgrid/server/job.py b/buildgrid/server/job.py
index 950cb0b48..10cb35700 100644
--- a/buildgrid/server/job.py
+++ b/buildgrid/server/job.py
@@ -46,6 +46,7 @@ class Job:
         self.__worker_start_timestamp = timestamp_pb2.Timestamp()
         self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
 
+        self.__operation_message_queues = {}
         self.__operation_cancelled = False
         self.__lease_cancelled = False
 
@@ -54,7 +55,6 @@ class Job:
 
         self._action.CopyFrom(action)
         self._do_not_cache = self._action.do_not_cache
-        self._operation_update_queues = []
         self._operation.name = self._name
         self._operation.done = False
         self._n_tries = 0
@@ -132,26 +132,30 @@ class Job:
 
     @property
     def n_clients(self):
-        return len(self._operation_update_queues)
+        return len(self.__operation_message_queues)
 
-    def register_client(self, queue):
-        """Subscribes to the job's :class:`Operation` stage change events.
+    def register_operation_client(self, peer, message_queue):
+        """Subscribes to the job's :class:`Operation` stage changes.
 
         Queues this :object:`Job` instance.
 
         Args:
-            queue (queue.Queue): the event queue to register.
+            peer (str): a unique string identifying the client.
+            message_queue (queue.Queue): the event queue to register.
         """
-        self._operation_update_queues.append(queue)
-        queue.put(self)
+        if peer not in self.__operation_message_queues:
+            self.__operation_message_queues[peer] = message_queue
 
-    def unregister_client(self, queue):
-        """Unsubscribes to the job's :class:`Operation` stage change events.
+        message_queue.put(self)
+
+    def unregister_operation_client(self, peer):
+        """Unsubscribes from the job's :class:`Operation` stage changes.
 
         Args:
-            queue (queue.Queue): the event queue to unregister.
+            peer (str): a unique string identifying the client.
         """
-        self._operation_update_queues.remove(queue)
+        if peer in self.__operation_message_queues:
+            del self.__operation_message_queues[peer]
 
     def set_cached_result(self, action_result):
         """Allows specifying an action result form the action cache for the job.
@@ -262,8 +266,8 @@ class Job:
 
         self._operation.metadata.Pack(self.__operation_metadata)
 
-        for queue in self._operation_update_queues:
-            queue.put(self)
+        for message_queue in self.__operation_message_queues.values():
+            message_queue.put(self)
 
     def check_operation_status(self):
         """Reports errors on unexpected job's :class:Operation state.
diff --git a/buildgrid/server/operations/instance.py b/buildgrid/server/operations/instance.py
index abacd5c97..d8d00ce4a 100644
--- a/buildgrid/server/operations/instance.py
+++ b/buildgrid/server/operations/instance.py
@@ -75,26 +75,3 @@ class OperationsInstance:
 
         except KeyError:
             raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
-
-    def register_message_client(self, name, queue):
-        try:
-            self._scheduler.register_client(name, queue)
-
-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
-
-    def unregister_message_client(self, name, queue):
-        try:
-            self._scheduler.unregister_client(name, queue)
-
-        except KeyError:
-            raise InvalidArgumentError("Operation name does not exist: [{}]".format(name))
-
-    def stream_operation_updates(self, message_queue, operation_name):
-        job = message_queue.get()
-        while not job.operation.done:
-            yield job.operation
-            job = message_queue.get()
-            job.check_operation_status()
-
-        yield job.operation
diff --git a/buildgrid/server/scheduler.py b/buildgrid/server/scheduler.py
index fdb5a2523..f2781cf5d 100644
--- a/buildgrid/server/scheduler.py
+++ b/buildgrid/server/scheduler.py
@@ -52,18 +52,43 @@ class Scheduler:
 
     # --- Public API ---
 
-    def register_client(self, job_name, queue):
-        job = self.jobs[job_name]
+    def register_operation_client(self, job_name, peer, message_queue):
+        """Subscribes to one of the job's :class:`Operation` stage changes.
 
-        job.register_client(queue)
+        Args:
+            job_name (str): name of the job to subscribe to.
+            peer (str): a unique string identifying the client.
+            message_queue (queue.Queue): the event queue to register.
 
-    def unregister_client(self, job_name, queue):
-        job = self.jobs[job_name]
+        Raises:
+            NotFoundError: If no job with `job_name` exists.
+        """
+        try:
+            job = self.jobs[job_name]
+        except KeyError:
+            raise NotFoundError('No job named {} found.'.format(job_name))
+
+        job.register_operation_client(peer, message_queue)
+
+    def unregister_operation_client(self, job_name, peer):
+        """Unsubscribes from one of the job's :class:`Operation` stage changes.
+
+        Args:
+            job_name (str): name of the job to unsubscribe from.
+            peer (str): a unique string identifying the client.
+
+        Raises:
+            NotFoundError: If no job with `job_name` exists.
+        """
+        try:
+            job = self.jobs[job_name]
+        except KeyError:
+            raise NotFoundError('No job named {} found.'.format(job_name))
 
-        job.unregister_client(queue)
+        job.unregister_operation_client(peer)
 
-        if not job.n_clients and job.operation.done:
-            del self.jobs[job_name]
+        if job.n_clients == 0 and job.operation.done:
+            del self.jobs[job.name]
 
             if self._is_instrumented:
                 self.__operations_by_stage[OperationStage.CACHE_CHECK].discard(job_name)
-- 
GitLab