
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (2)
@@ -9,8 +9,23 @@ We welcome contributions in the form of bug fixes or feature additions / enhance
Any major feature additions should be raised as a proposal on the `Mailing List <https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid/>`_ to be discussed, and then eventually followed up with an issue here on GitLab. We recommend that you propose the feature in advance of commencing work. We are also on IRC, but do not have our own dedicated channel - you can find us on #buildstream on GIMPNet and #bazel on Freenode.
The author of any patch is expected to take ownership of that code and is to support it for a reasonable
time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced.
The author of any patch is expected to take ownership of that code and to support it for a reasonable time frame. This means addressing any unforeseen side effects and quirks the feature may have introduced. More on this below in 'Granting Committer Access'.
Granting Committer Access
-------------------------
We'll hand out commit access to anyone who has successfully landed a single patch to the code base. Please request this via IRC or the mailing list.
This of course relies on contributors being responsive and showing willingness to address problems after landing branches; as long as that holds, there should not be any problems here.
What we expect of committers, in general, is to escalate the review in cases of uncertainty:
* If the patch/branch is very trivial (obvious few-line changes, typos, etc.), and you are confident of the change, there is no need for review.
* If the patch/branch is non-trivial, please obtain a review from another committer who is familiar with the area which the branch affects. An approval from someone who is not the patch author will be needed before any merge.
We don't have any detailed policy for "bad actors", but will of course handle things on a case-by-case basis. Commit access should not result in commit wars or be used as a tool to subvert the project when disagreements arise; such incidents (if any) would surely lead to temporary suspension of commit rights.
Patch Submissions
-----------------
@@ -34,12 +34,12 @@ class ExecutionInstance():
self.logger = logging.getLogger(__name__)
self._scheduler = scheduler
def execute(self, action_digest, skip_cache_lookup):
def execute(self, action_digest, skip_cache_lookup, message_queue=None):
""" Sends a job for execution.
Queues an action and creates an Operation instance to be associated with
this action.
"""
job = Job(action_digest)
job = Job(action_digest, message_queue)
self.logger.info("Operation name: {}".format(job.name))
if not skip_cache_lookup:
@@ -70,3 +70,15 @@ class ExecutionInstance():
def cancel_operation(self, name):
# TODO: Cancel leases
raise NotImplementedError("Cancelled operations not supported")
    def register_message_client(self, name, queue):
        try:
            self._scheduler.register_client(name, queue)
        except KeyError:
            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

    def unregister_message_client(self, name, queue):
        try:
            self._scheduler.unregister_client(name, queue)
        except KeyError:
            raise InvalidArgumentError("Operation name does not exist: {}".format(name))
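These helpers only forward registration to the scheduler and translate a missing operation name into the API-level error. A minimal, self-contained sketch of that lookup-and-translate pattern, using a plain dict and a stand-in exception rather than the real BuildGrid scheduler (the names below are illustrative only):

import queue

class InvalidArgumentError(Exception):
    """Stand-in for buildgrid's InvalidArgumentError."""

client_queues = {}  # operation name -> list of registered message queues

def register_message_client(name, message_queue):
    try:
        client_queues[name].append(message_queue)  # KeyError if the operation is unknown
    except KeyError:
        raise InvalidArgumentError("Operation name does not exist: {}".format(name))

client_queues["operation-1"] = []
register_message_client("operation-1", queue.Queue())  # succeeds

try:
    register_message_client("unknown-operation", queue.Queue())
except InvalidArgumentError as e:
    print(e)  # Operation name does not exist: unknown-operation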
@@ -22,10 +22,9 @@ ExecutionService
Serves remote execution requests.
"""
import copy
import grpc
import logging
import time
import queue
from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
@@ -35,21 +34,27 @@ from ._exceptions import InvalidArgumentError
class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def __init__(self, instance):
self._instance = instance
self.logger = logging.getLogger(__name__)
self._instance = instance
def Execute(self, request, context):
# Ignore request.instance_name for now
# Have only one instance
try:
message_queue = queue.Queue()
operation = self._instance.execute(request.action_digest,
request.skip_cache_lookup)
request.skip_cache_lookup,
message_queue)
yield from self._stream_operation_updates(operation.name)
remove_client = lambda : self._remove_client(operation.name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation.name)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
except NotImplementedError as e:
@@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
def WaitExecution(self, request, context):
try:
yield from self._stream_operation_updates(request.name)
message_queue = queue.Queue()
operation_name = request.name
self._instance.register_message_client(operation_name, message_queue)
remove_client = lambda : self._remove_client(operation_name, message_queue)
context.add_callback(remove_client)
yield from self._stream_operation_updates(message_queue,
operation_name)
except InvalidArgumentError as e:
self.logger.error(e)
context.set_details(str(e))
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
def _stream_operation_updates(self, name):
stream_previous = None
while True:
stream = self._instance.get_operation(name)
if stream != stream_previous:
yield stream
if stream.done == True: break
stream_previous = copy.deepcopy(stream)
time.sleep(1)
    def _remove_client(self, operation_name, message_queue):
        self._instance.unregister_message_client(operation_name, message_queue)

    def _stream_operation_updates(self, message_queue, operation_name):
        operation = message_queue.get()
        while not operation.done:
            yield operation
            operation = message_queue.get()
        yield operation
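The old implementation polled `get_operation()` once per second and compared snapshots; the new generator simply blocks on the per-client queue and yields each update until one arrives with `done` set. A self-contained sketch of that consumer pattern, with a namedtuple standing in for the real `Operation` message (the names here are illustrative, not BuildGrid API):

import queue
import threading
from collections import namedtuple

Update = namedtuple("Update", ["stage", "done"])  # stand-in for operations_pb2.Operation

def stream_operation_updates(message_queue):
    # Block until each update arrives, yield it, and stop after the final one.
    operation = message_queue.get()
    while not operation.done:
        yield operation
        operation = message_queue.get()
    yield operation

updates = queue.Queue()

def produce():
    # Stand-in for the job publishing stage changes into the client's queue.
    updates.put(Update("QUEUED", False))
    updates.put(Update("EXECUTING", False))
    updates.put(Update("COMPLETED", True))

threading.Thread(target=produce).start()

for update in stream_operation_updates(updates):
    print(update.stage)  # QUEUED, EXECUTING, COMPLETED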
@@ -51,9 +51,8 @@ class LeaseState(Enum):
class Job():
def __init__(self, action):
self.action = action
self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
def __init__(self, action_digest, message_queue=None):
self.action_digest = action_digest
self.execute_stage = ExecuteStage.UNKNOWN
self.lease = None
self.logger = logging.getLogger(__name__)
@@ -62,10 +61,24 @@ class Job():
self._n_tries = 0
self._operation = operations_pb2.Operation(name = self.name)
self._operation_update_queues = []
if message_queue is not None:
self.register_client(message_queue)
    def check_job_finished(self):
        if not self._operation_update_queues:
            return self._operation.done
        return False

    def register_client(self, queue):
        self._operation_update_queues.append(queue)

    def unregister_client(self, queue):
        self._operation_update_queues.remove(queue)

    def get_operation(self):
        self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))

        if self.result is not None:
            self._operation.done = True
            response = ExecuteResponse()
@@ -81,10 +94,10 @@ class Job():
return meta
def create_lease(self):
action = self._pack_any(self.action)
action_digest = self._pack_any(self.action_digest)
lease = bots_pb2.Lease(id = self.name,
payload = action,
payload = action_digest,
state = LeaseState.PENDING.value)
self.lease = lease
return lease
@@ -92,6 +105,11 @@ class Job():
def get_operations(self):
return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    def update_execute_stage(self, stage):
        self.execute_stage = stage
        for queue in self._operation_update_queues:
            queue.put(self.get_operation())

    def _pack_any(self, pack):
        any = any_pb2.Any()
        any.Pack(pack)
@@ -35,8 +35,17 @@ class Scheduler():
self.jobs = {}
self.queue = deque()
    def register_client(self, name, queue):
        self.jobs[name].register_client(queue)

    def unregister_client(self, name, queue):
        job = self.jobs[name]
        job.unregister_client(queue)

        if job.check_job_finished():
            del self.jobs[name]

    def append_job(self, job):
        job.execute_stage = ExecuteStage.QUEUED
        job.update_execute_stage(ExecuteStage.QUEUED)
        self.jobs[job.name] = job
        self.queue.append(job)
@@ -45,9 +54,9 @@ class Scheduler():
if job.n_tries >= self.MAX_N_TRIES:
# TODO: Decide what to do with these jobs
job.execute_stage = ExecuteStage.COMPLETED
job.update_execute_stage(ExecuteStage.COMPLETED)
else:
job.execute_stage = ExecuteStage.QUEUED
job.update_execute_stage(ExecuteStage.QUEUED)
job.n_tries += 1
self.queue.appendleft(job)
@@ -56,15 +65,14 @@ class Scheduler():
def create_job(self):
if len(self.queue) > 0:
job = self.queue.popleft()
job.execute_stage = ExecuteStage.EXECUTING
job.update_execute_stage(ExecuteStage.EXECUTING)
self.jobs[job.name] = job
return job
return None
def job_complete(self, name, result):
job = self.jobs[name]
job.execute_stage = ExecuteStage.COMPLETED
job.result = result
job.update_execute_stage(ExecuteStage.COMPLETED)
self.jobs[name] = job
def get_operations(self):
@@ -122,3 +130,7 @@ class Scheduler():
if state == LeaseState.PENDING.value or \
state == LeaseState.ACTIVE.value:
self.retry_job(name)
    def _update_execute_stage(self, job, stage):
        job.update_execute_stage(stage)
        return job
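Taken together, the scheduler now drives every stage change through `update_execute_stage()`, which fans the current operation state out to each registered queue, while `unregister_client()` drops a finished job once its last listener detaches. A rough, self-contained sketch of that bookkeeping; `FakeJob` and the string stages are stand-ins, not BuildGrid code:

import queue

class FakeJob:
    """Stand-in for Job: tracks registered client queues and a done flag."""

    def __init__(self, name):
        self.name = name
        self.done = False
        self._queues = []

    def register_client(self, message_queue):
        self._queues.append(message_queue)

    def unregister_client(self, message_queue):
        self._queues.remove(message_queue)

    def update_execute_stage(self, stage):
        # Fan the new state out to every registered client queue.
        for q in self._queues:
            q.put((self.name, stage, self.done))

    def check_job_finished(self):
        # A job can be dropped once it is done and nobody is listening.
        return self.done if not self._queues else False

jobs = {}
job = FakeJob("operation-1")
jobs[job.name] = job

client = queue.Queue()
job.register_client(client)

job.update_execute_stage("QUEUED")
job.update_execute_stage("EXECUTING")
job.done = True
job.update_execute_stage("COMPLETED")

print([client.get()[1] for _ in range(3)])  # ['QUEUED', 'EXECUTING', 'COMPLETED']

job.unregister_client(client)
if job.check_job_finished():
    del jobs[job.name]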
@@ -72,14 +72,17 @@ def test_wait_execution(instance, context):
action_digest = remote_execution_pb2.Digest()
action_digest.hash = 'zhora'
execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
action_digest = action_digest,
skip_cache_lookup = True)
execution_response = next(instance.Execute(execution_request, context))
j = job.Job(action_digest, None)
j._operation.done = True
request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
instance._instance._scheduler.jobs[j.name] = j
response = next(instance.WaitExecution(request, context))
action_result_any = any_pb2.Any()
action_result = remote_execution_pb2.ActionResult()
action_result_any.Pack(action_result)
assert response == execution_response
instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
response = instance.WaitExecution(request, context)