[Branch,~linaro-validation/lava-scheduler/trunk] Rev 239: use the --output-dir flag of lava-dispatcher to get structured data out of it rather than --oob-fd

Message ID 20130130013211.8348.27287.launchpad@ackee.canonical.com
State Accepted
Headers show

Commit Message

Michael-Doyle Hudson Jan. 30, 2013, 1:32 a.m.
Merge authors:
  Michael Hudson-Doyle (mwhudson)
Related merge proposals:
  https://code.launchpad.net/~mwhudson/lava-scheduler/use-dispatcher-output-dir/+merge/144615
  proposed by: Michael Hudson-Doyle (mwhudson)
------------------------------------------------------------
revno: 239 [merge]
committer: Michael Hudson-Doyle <michael.hudson@linaro.org>
branch nick: trunk
timestamp: Wed 2013-01-30 14:31:16 +1300
message:
  use the --output-dir flag of lava-dispatcher to get structured data out of it rather than --oob-fd
modified:
  fake-dispatcher
  lava_scheduler_app/models.py
  lava_scheduler_app/views.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/dbjobsource.py


--
lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

You are subscribed to branch lp:lava-scheduler.
To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription

Patch

=== modified file 'fake-dispatcher'
--- fake-dispatcher	2012-08-30 03:58:19 +0000
+++ fake-dispatcher	2013-01-15 02:23:49 +0000
@@ -15,4 +15,3 @@ 
 echo p $i
 sleep 1
 done
-echo dashboard-put-result: http://disney.com >&3

=== modified file 'lava_scheduler_app/models.py'
--- lava_scheduler_app/models.py	2013-01-15 20:53:29 +0000
+++ lava_scheduler_app/models.py	2013-01-30 01:31:16 +0000
@@ -1,4 +1,5 @@ 
 import logging
+import os
 import simplejson
 import urlparse
 
@@ -326,6 +327,25 @@ 
     log_file = models.FileField(
         upload_to='lava-logs', default=None, null=True, blank=True)
 
+    @property
+    def output_dir(self):
+        return os.path.join(settings.MEDIA_ROOT, 'job-output', 'job-%s' % self.id)
+
+    def output_file(self):
+        output_path = os.path.join(self.output_dir, 'output.txt')
+        if os.path.exists(output_path):
+            return open(output_path)
+        elif self.log_file:
+            log_file = self.log_file
+            if log_file:
+                try:
+                    log_file.open()
+                except IOError:
+                    log_file = None
+            return log_file
+        else:
+            return None
+
     failure_tags = models.ManyToManyField(
         JobFailureTag, blank=True, related_name='failure_tags')
     failure_comment = models.TextField(null=True, blank=True)

=== modified file 'lava_scheduler_app/views.py'
--- lava_scheduler_app/views.py	2013-01-15 17:44:36 +0000
+++ lava_scheduler_app/views.py	2013-01-30 01:31:16 +0000
@@ -567,12 +567,7 @@ 
         'show_reload_page': job.status <= TestJob.RUNNING,
     }
 
-    log_file = job.log_file
-    if log_file:
-        try:
-            log_file.open()
-        except IOError:
-            log_file = None
+    log_file = job.output_file()
 
     if log_file:
         job_errors = getDispatcherErrors(log_file)
@@ -584,13 +579,16 @@ 
         for level, msg, _ in job_log_messages:
             levels[level] += 1
         levels = sorted(levels.items(), key=lambda (k,v):logging._levelNames.get(k))
+        with job.output_file() as f:
+            f.seek(0, 2)
+            job_file_size = f.tell()
         data.update({
             'job_file_present': True,
             'job_errors' : job_errors,
             'job_has_error' : len(job_errors) > 0,
             'job_log_messages' : job_log_messages,
             'levels': levels,
-            'job_file_size' : log_file.size,
+            'job_file_size' : job_file_size,
             })
     else:
         data.update({
@@ -603,12 +601,7 @@ 
 
 def job_definition(request, pk):
     job = get_restricted_job(request.user, pk)
-    log_file = job.log_file
-    if log_file:
-        try:
-            log_file.open()
-        except IOError:
-            log_file = None
+    log_file = job.output_file()
     return render_to_response(
         "lava_scheduler_app/job_definition.html",
         {
@@ -628,21 +621,24 @@ 
 @BreadCrumb("Complete log", parent=job_detail, needs=['pk'])
 def job_log_file(request, pk):
     job = get_restricted_job(request.user, pk)
-    content = formatLogFile(job.log_file)
+    content = formatLogFile(job.output_file())
+    with job.output_file() as f:
+        f.seek(0, 2)
+        job_file_size = f.tell()
     return render_to_response(
         "lava_scheduler_app/job_log_file.html",
         {
             'job': TestJob.objects.get(pk=pk),
-            'job_file_present': bool(job.log_file),
+            'job_file_present': bool(job.output_file()),
             'sections' : content,
-            'job_file_size' : job.log_file.size,
+            'job_file_size' : job_file_size,
         },
         RequestContext(request))
 
 
 def job_log_file_plain(request, pk):
     job = get_restricted_job(request.user, pk)
-    response = HttpResponse(job.log_file, mimetype='text/plain')
+    response = HttpResponse(job.output_file(), mimetype='text/plain')
     response['Content-Disposition'] = "attachment; filename=job_%d.log"%job.id
     return response
 
@@ -650,7 +646,7 @@ 
 def job_log_incremental(request, pk):
     start = int(request.GET.get('start', 0))
     job = get_restricted_job(request.user, pk)
-    log_file = job.log_file
+    log_file = job.output_file()
     log_file.seek(start)
     new_content = log_file.read()
     m = getDispatcherLogMessages(StringIO.StringIO(new_content))
@@ -665,7 +661,7 @@ 
 def job_full_log_incremental(request, pk):
     start = int(request.GET.get('start', 0))
     job = get_restricted_job(request.user, pk)
-    log_file = job.log_file
+    log_file = job.output_file()
     log_file.seek(start)
     new_content = log_file.read()
     nl_index = new_content.rfind('\n', -NEWLINE_SCAN_SIZE)
@@ -692,7 +688,7 @@ 
         return HttpResponseBadRequest("invalid start")
     count_present = 'count' in request.GET
     job = get_restricted_job(request.user, pk)
-    log_file = job.log_file
+    log_file = job.output_file()
     log_file.seek(0, os.SEEK_END)
     size = int(request.GET.get('count', log_file.tell()))
     if size - start > LOG_CHUNK_SIZE and not count_present:

=== modified file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	2012-12-03 05:03:38 +0000
+++ lava_scheduler_daemon/board.py	2013-01-24 00:02:29 +0000
@@ -1,14 +1,12 @@ 
 import json
 import os
 import signal
-import sys
 import tempfile
 import logging
 
 from twisted.internet.error import ProcessDone, ProcessExitedAlready
 from twisted.internet.protocol import ProcessProtocol
 from twisted.internet import defer, task
-from twisted.protocols.basic import LineReceiver
 
 
 def catchall_errback(logger):
@@ -18,51 +16,21 @@ 
             failure.getTraceback())
     return eb
 
-OOB_FD = 3
-
-
-class OOBDataProtocol(LineReceiver):
-
-    delimiter = '\n'
-
-    def __init__(self, source, board_name, _source_lock):
-        self.logger = logging.getLogger(__name__ + '.OOBDataProtocol')
-        self.source = source
-        self.board_name = board_name
-        self._source_lock = _source_lock
-
-    def lineReceived(self, line):
-        if ':' not in line:
-            self.logger.error('malformed oob data: %r' % line)
-            return
-        key, value = line.split(':', 1)
-        self._source_lock.run(
-            self.source.jobOobData, self.board_name, key,
-            value.lstrip()).addErrback(
-                catchall_errback(self.logger))
 
 
 class DispatcherProcessProtocol(ProcessProtocol):
 
-
-    def __init__(self, deferred, log_file, job):
+    def __init__(self, deferred, job):
         self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
         self.deferred = deferred
-        self.log_file = log_file
         self.log_size = 0
         self.job = job
-        self.oob_data = OOBDataProtocol(
-            job.source, job.board_name, job._source_lock)
 
     def childDataReceived(self, childFD, data):
-        if childFD == OOB_FD:
-            self.oob_data.dataReceived(data)
-        self.log_file.write(data)
         self.log_size += len(data)
         if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
             if not self.job._killing:
                 self.job.cancel("exceeded log size limit")
-        self.log_file.flush()
 
     def childConnectionLost(self, childFD):
         self.logger.info("childConnectionLost for %s: %s",
@@ -75,13 +43,11 @@ 
     def processEnded(self, reason):
         self.logger.info("processEnded for %s: %s",
             self.job.board_name, reason.value)
-        self.log_file.close()
         self.deferred.callback(reason.value.exitCode)
 
 
 class Job(object):
 
-
     def __init__(self, job_data, dispatcher, source, board_name, reactor,
                  daemon_options):
         self.job_data = job_data
@@ -97,7 +63,7 @@ 
         self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
         self._time_limit_call = None
         self._killing = False
-        self.job_log_file = None
+        self._kill_reason = ''
 
     def _checkCancel(self):
         if self._killing:
@@ -108,11 +74,11 @@ 
                 self._maybeCancel)
 
     def cancel(self, reason=None):
-        if not self._killing and reason is None:
-            reason = "killing job for unknown reason"
         if not self._killing:
+            if reason is None:
+                reason = "killing job for unknown reason"
+            self._kill_reason = reason
             self.logger.info(reason)
-            self.job_log_file.write("\n%s\n" % reason.upper())
         self._killing = True
         if self._signals:
             signame = self._signals.pop(0)
@@ -137,23 +103,21 @@ 
         self.cancel("killing job for exceeding timeout")
 
     def run(self):
-        d = self.source.getLogFileForJobOnBoard(self.board_name)
+        d = self.source.getOutputDirForJobOnBoard(self.board_name)
         return d.addCallback(self._run).addErrback(
             catchall_errback(self.logger))
 
-    def _run(self, job_log_file):
+    def _run(self, output_dir):
         d = defer.Deferred()
         json_data = self.job_data
         fd, self._json_file = tempfile.mkstemp()
         with os.fdopen(fd, 'wb') as f:
             json.dump(json_data, f)
-        self._protocol = DispatcherProcessProtocol(
-            d, job_log_file, self)
-        self.job_log_file = job_log_file
+        self._protocol = DispatcherProcessProtocol(d, self)
         self.reactor.spawnProcess(
             self._protocol, self.dispatcher, args=[
-                self.dispatcher, self._json_file, '--oob-fd', str(OOB_FD)],
-            childFDs={0:0, 1:'r', 2:'r', OOB_FD:'r'}, env=None)
+                self.dispatcher, self._json_file, '--output-dir', output_dir],
+            childFDs={0:0, 1:'r', 2:'r'}, env=None)
         self._checkCancel_call.start(10)
         timeout = max(
             json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
@@ -171,8 +135,11 @@ 
             self._time_limit_call.cancel()
         self._checkCancel_call.stop()
         return self._source_lock.run(
-            self.source.jobCompleted, self.board_name, exit_code).addCallback(
-            lambda r:exit_code)
+            self.source.jobCompleted,
+            self.board_name,
+            exit_code,
+            self._killing).addCallback(
+                lambda r:exit_code)
 
 
 class SchedulerMonitorPP(ProcessProtocol):

=== modified file 'lava_scheduler_daemon/dbjobsource.py'
--- lava_scheduler_daemon/dbjobsource.py	2012-12-03 05:03:38 +0000
+++ lava_scheduler_daemon/dbjobsource.py	2013-01-24 00:19:27 +0000
@@ -1,10 +1,10 @@ 
 import datetime
 import logging
 import os
+import shutil
 import urlparse
 
 from dashboard_app.models import Bundle
-import django.core.exceptions
 
 from django.contrib.auth.models import User
 from django.core.files.base import ContentFile
@@ -198,6 +198,7 @@ 
                 job.start_time = datetime.datetime.utcnow()
                 job.actual_device = device
                 device.status = Device.RUNNING
+                shutil.rmtree(job.output_dir, ignore_errors=True)
                 device.current_job = job
                 try:
                     # The unique constraint on current_job may cause this to
@@ -229,18 +230,15 @@ 
     def getJobForBoard(self, board_name):
         return self.deferForDB(self.getJobForBoard_impl, board_name)
 
-    def getLogFileForJobOnBoard_impl(self, board_name):
+    def getOutputDirForJobOnBoard_impl(self, board_name):
         device = Device.objects.get(hostname=board_name)
         job = device.current_job
-        log_file = job.log_file
-        log_file.file.close()
-        log_file.open('wb')
-        return log_file
-
-    def getLogFileForJobOnBoard(self, board_name):
-        return self.deferForDB(self.getLogFileForJobOnBoard_impl, board_name)
-
-    def jobCompleted_impl(self, board_name, exit_code):
+        return job.output_dir
+
+    def getOutputDirForJobOnBoard(self, board_name):
+        return self.deferForDB(self.getOutputDirForJobOnBoard_impl, board_name)
+
+    def jobCompleted_impl(self, board_name, exit_code, kill_reason):
         self.logger.debug('marking job as complete on %s', board_name)
         device = Device.objects.get(hostname=board_name)
         old_device_status = device.status
@@ -279,6 +277,19 @@ 
                 elif job.status == TestJob.COMPLETE:
                     device.health_status = Device.HEALTH_PASS
 
+        bundle_file = os.path.join(job.output_dir, 'result-bundle')
+        if os.path.exists(bundle_file):
+            with open(bundle_file) as f:
+                results_link = f.read().strip()
+            job._results_link = results_link
+            sha1 = results_link.strip('/').split('/')[-1]
+            try:
+                bundle = Bundle.objects.get(content_sha1=sha1)
+            except Bundle.DoesNotExist:
+                pass
+            else:
+                job._results_bundle = bundle
+
         job.end_time = datetime.datetime.utcnow()
         token = job.submit_token
         job.submit_token = None
@@ -294,27 +305,8 @@ 
                 'sending job summary mails for job %r failed', job.pk)
         transaction.commit()
 
-    def jobCompleted(self, board_name, exit_code):
-        return self.deferForDB(self.jobCompleted_impl, board_name, exit_code)
-
-    def jobOobData_impl(self, board_name, key, value):
-        self.logger.info(
-            "oob data received for %s: %s: %s", board_name, key, value)
-        if key == 'dashboard-put-result':
-            device = Device.objects.get(hostname=board_name)
-            device.current_job._results_link = value
-            sha1 = value.strip('/').split('/')[-1]
-            try:
-                bundle = Bundle.objects.get(content_sha1=sha1)
-            except Bundle.DoesNotExist:
-                pass
-            else:
-                device.current_job._results_bundle = bundle
-            device.current_job.save()
-            transaction.commit()
-
-    def jobOobData(self, board_name, key, value):
-        return self.deferForDB(self.jobOobData_impl, board_name, key, value)
+    def jobCompleted(self, board_name, exit_code, kill_reason):
+        return self.deferForDB(self.jobCompleted_impl, board_name, exit_code, kill_reason)
 
     def jobCheckForCancellation_impl(self, board_name):
         device = Device.objects.get(hostname=board_name)