=== modified file 'fake-dispatcher'
@@ -15,4 +15,3 @@
echo p $i
sleep 1
done
-echo dashboard-put-result: http://disney.com >&3
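
The out-of-band fd 3 channel is gone: rather than echoing dashboard-put-result: to a dedicated descriptor, a dispatcher now reports its results by writing the bundle link into a result-bundle file inside the directory passed via --output-dir, which jobCompleted_impl below picks up. A minimal sketch of the new reporting path, in Python for consistency with the rest of the patch; the function name and arguments are illustrative:

    import os

    def report_result_bundle(output_dir, results_link):
        # jobCompleted_impl reads this file with read().strip(), so a
        # trailing newline is harmless.
        with open(os.path.join(output_dir, 'result-bundle'), 'w') as f:
            f.write(results_link + '\n')

    report_result_bundle('/tmp/job-output', 'http://disney.com')
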
=== modified file 'lava_scheduler_app/models.py'
@@ -1,4 +1,5 @@
import logging
+import os
import simplejson
import urlparse
@@ -326,6 +327,24 @@
log_file = models.FileField(
upload_to='lava-logs', default=None, null=True, blank=True)
+ @property
+ def output_dir(self):
+ return os.path.join(settings.MEDIA_ROOT, 'job-output', 'job-%s' % self.id)
+
+ def output_file(self):
+ output_path = os.path.join(self.output_dir, 'output.txt')
+ if os.path.exists(output_path):
+ return open(output_path)
+        elif self.log_file:
+            log_file = self.log_file
+            try:
+                log_file.open()
+            except IOError:
+                log_file = None
+            return log_file
+        else:
+            return None
+
failure_tags = models.ManyToManyField(
JobFailureTag, blank=True, related_name='failure_tags')
failure_comment = models.TextField(null=True, blank=True)
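
output_file() prefers the on-disk output.txt under MEDIA_ROOT and only falls back to the legacy log_file FileField for jobs that ran before this change; either way it can return None, so callers should check. A sketch of defensive use (the job id is illustrative):

    job = TestJob.objects.get(pk=42)
    log = job.output_file()
    if log is not None:
        try:
            print log.read()
        finally:
            log.close()
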
=== modified file 'lava_scheduler_app/views.py'
@@ -567,12 +567,7 @@
'show_reload_page': job.status <= TestJob.RUNNING,
}
- log_file = job.log_file
- if log_file:
- try:
- log_file.open()
- except IOError:
- log_file = None
+ log_file = job.output_file()
if log_file:
job_errors = getDispatcherErrors(log_file)
@@ -584,13 +579,16 @@
for level, msg, _ in job_log_messages:
levels[level] += 1
levels = sorted(levels.items(), key=lambda (k,v):logging._levelNames.get(k))
+    with job.output_file() as f:
+        f.seek(0, os.SEEK_END)
+        job_file_size = f.tell()
data.update({
'job_file_present': True,
'job_errors' : job_errors,
'job_has_error' : len(job_errors) > 0,
'job_log_messages' : job_log_messages,
'levels': levels,
- 'job_file_size' : log_file.size,
+ 'job_file_size' : job_file_size,
})
else:
data.update({
@@ -603,12 +601,7 @@
def job_definition(request, pk):
job = get_restricted_job(request.user, pk)
- log_file = job.log_file
- if log_file:
- try:
- log_file.open()
- except IOError:
- log_file = None
+ log_file = job.output_file()
return render_to_response(
"lava_scheduler_app/job_definition.html",
{
@@ -628,21 +621,24 @@
@BreadCrumb("Complete log", parent=job_detail, needs=['pk'])
def job_log_file(request, pk):
job = get_restricted_job(request.user, pk)
- content = formatLogFile(job.log_file)
+ content = formatLogFile(job.output_file())
+    with job.output_file() as f:
+        f.seek(0, os.SEEK_END)
+        job_file_size = f.tell()
return render_to_response(
"lava_scheduler_app/job_log_file.html",
{
'job': TestJob.objects.get(pk=pk),
- 'job_file_present': bool(job.log_file),
+ 'job_file_present': bool(job.output_file()),
'sections' : content,
- 'job_file_size' : job.log_file.size,
+ 'job_file_size' : job_file_size,
},
RequestContext(request))
def job_log_file_plain(request, pk):
job = get_restricted_job(request.user, pk)
- response = HttpResponse(job.log_file, mimetype='text/plain')
+ response = HttpResponse(job.output_file(), mimetype='text/plain')
response['Content-Disposition'] = "attachment; filename=job_%d.log"%job.id
return response
@@ -650,7 +646,7 @@
def job_log_incremental(request, pk):
start = int(request.GET.get('start', 0))
job = get_restricted_job(request.user, pk)
- log_file = job.log_file
+ log_file = job.output_file()
log_file.seek(start)
new_content = log_file.read()
m = getDispatcherLogMessages(StringIO.StringIO(new_content))
@@ -665,7 +661,7 @@
def job_full_log_incremental(request, pk):
start = int(request.GET.get('start', 0))
job = get_restricted_job(request.user, pk)
- log_file = job.log_file
+ log_file = job.output_file()
log_file.seek(start)
new_content = log_file.read()
nl_index = new_content.rfind('\n', -NEWLINE_SCAN_SIZE)
@@ -692,7 +688,7 @@
return HttpResponseBadRequest("invalid start")
count_present = 'count' in request.GET
job = get_restricted_job(request.user, pk)
- log_file = job.log_file
+ log_file = job.output_file()
log_file.seek(0, os.SEEK_END)
size = int(request.GET.get('count', log_file.tell()))
if size - start > LOG_CHUNK_SIZE and not count_present:
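
The views now size and slice a plain file object returned by output_file() instead of a FileField, so the old log_file.size attribute is replaced by the usual seek-to-end idiom, shown standalone:

    import os

    def file_size(f):
        # tell() after seeking to the end reports the byte size; this is
        # the value handed to the templates as job_file_size.
        f.seek(0, os.SEEK_END)
        return f.tell()
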
=== modified file 'lava_scheduler_daemon/board.py'
@@ -1,14 +1,12 @@
import json
import os
import signal
-import sys
import tempfile
import logging
from twisted.internet.error import ProcessDone, ProcessExitedAlready
from twisted.internet.protocol import ProcessProtocol
from twisted.internet import defer, task
-from twisted.protocols.basic import LineReceiver
def catchall_errback(logger):
@@ -18,51 +16,21 @@
failure.getTraceback())
return eb
-OOB_FD = 3
-
-
-class OOBDataProtocol(LineReceiver):
-
- delimiter = '\n'
-
- def __init__(self, source, board_name, _source_lock):
- self.logger = logging.getLogger(__name__ + '.OOBDataProtocol')
- self.source = source
- self.board_name = board_name
- self._source_lock = _source_lock
-
- def lineReceived(self, line):
- if ':' not in line:
- self.logger.error('malformed oob data: %r' % line)
- return
- key, value = line.split(':', 1)
- self._source_lock.run(
- self.source.jobOobData, self.board_name, key,
- value.lstrip()).addErrback(
- catchall_errback(self.logger))
class DispatcherProcessProtocol(ProcessProtocol):
-
- def __init__(self, deferred, log_file, job):
+ def __init__(self, deferred, job):
self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
self.deferred = deferred
- self.log_file = log_file
self.log_size = 0
self.job = job
- self.oob_data = OOBDataProtocol(
- job.source, job.board_name, job._source_lock)
def childDataReceived(self, childFD, data):
- if childFD == OOB_FD:
- self.oob_data.dataReceived(data)
- self.log_file.write(data)
self.log_size += len(data)
if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
if not self.job._killing:
self.job.cancel("exceeded log size limit")
- self.log_file.flush()
def childConnectionLost(self, childFD):
self.logger.info("childConnectionLost for %s: %s",
@@ -75,13 +43,11 @@
def processEnded(self, reason):
self.logger.info("processEnded for %s: %s",
self.job.board_name, reason.value)
- self.log_file.close()
self.deferred.callback(reason.value.exitCode)
class Job(object):
-
def __init__(self, job_data, dispatcher, source, board_name, reactor,
daemon_options):
self.job_data = job_data
@@ -97,7 +63,7 @@
self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
self._time_limit_call = None
self._killing = False
- self.job_log_file = None
+ self._kill_reason = ''
def _checkCancel(self):
if self._killing:
@@ -108,11 +74,11 @@
self._maybeCancel)
def cancel(self, reason=None):
- if not self._killing and reason is None:
- reason = "killing job for unknown reason"
if not self._killing:
+ if reason is None:
+ reason = "killing job for unknown reason"
+ self._kill_reason = reason
self.logger.info(reason)
- self.job_log_file.write("\n%s\n" % reason.upper())
self._killing = True
if self._signals:
signame = self._signals.pop(0)
@@ -137,23 +103,21 @@
self.cancel("killing job for exceeding timeout")
def run(self):
- d = self.source.getLogFileForJobOnBoard(self.board_name)
+ d = self.source.getOutputDirForJobOnBoard(self.board_name)
return d.addCallback(self._run).addErrback(
catchall_errback(self.logger))
- def _run(self, job_log_file):
+ def _run(self, output_dir):
d = defer.Deferred()
json_data = self.job_data
fd, self._json_file = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
json.dump(json_data, f)
- self._protocol = DispatcherProcessProtocol(
- d, job_log_file, self)
- self.job_log_file = job_log_file
+ self._protocol = DispatcherProcessProtocol(d, self)
self.reactor.spawnProcess(
self._protocol, self.dispatcher, args=[
- self.dispatcher, self._json_file, '--oob-fd', str(OOB_FD)],
- childFDs={0:0, 1:'r', 2:'r', OOB_FD:'r'}, env=None)
+ self.dispatcher, self._json_file, '--output-dir', output_dir],
+ childFDs={0:0, 1:'r', 2:'r'}, env=None)
self._checkCancel_call.start(10)
timeout = max(
json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
@@ -171,8 +135,11 @@
self._time_limit_call.cancel()
self._checkCancel_call.stop()
return self._source_lock.run(
- self.source.jobCompleted, self.board_name, exit_code).addCallback(
- lambda r:exit_code)
+ self.source.jobCompleted,
+ self.board_name,
+ exit_code,
+ self._killing).addCallback(
+            lambda r: exit_code)
class SchedulerMonitorPP(ProcessProtocol):
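
DispatcherProcessProtocol no longer copies the child's output into a Django file: the dispatcher writes output.txt itself under --output-dir, and the protocol merely counts bytes to enforce LOG_FILE_SIZE_LIMIT. Schematically, the spawned command line becomes the following; the dispatcher path and directories are illustrative:

    # Roughly what reactor.spawnProcess runs after this change.
    import subprocess
    subprocess.call([
        'lava-dispatch',            # self.dispatcher
        '/tmp/tmpXYZ123',           # self._json_file holding the job JSON
        '--output-dir', '/var/lib/lava-server/media/job-output/job-42',
    ])
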
=== modified file 'lava_scheduler_daemon/dbjobsource.py'
@@ -1,10 +1,10 @@
import datetime
import logging
import os
+import shutil
import urlparse
from dashboard_app.models import Bundle
-import django.core.exceptions
from django.contrib.auth.models import User
from django.core.files.base import ContentFile
@@ -198,6 +198,7 @@
job.start_time = datetime.datetime.utcnow()
job.actual_device = device
device.status = Device.RUNNING
+ shutil.rmtree(job.output_dir, ignore_errors=True)
device.current_job = job
try:
# The unique constraint on current_job may cause this to
@@ -229,18 +230,15 @@
def getJobForBoard(self, board_name):
return self.deferForDB(self.getJobForBoard_impl, board_name)
- def getLogFileForJobOnBoard_impl(self, board_name):
+ def getOutputDirForJobOnBoard_impl(self, board_name):
device = Device.objects.get(hostname=board_name)
job = device.current_job
- log_file = job.log_file
- log_file.file.close()
- log_file.open('wb')
- return log_file
-
- def getLogFileForJobOnBoard(self, board_name):
- return self.deferForDB(self.getLogFileForJobOnBoard_impl, board_name)
-
- def jobCompleted_impl(self, board_name, exit_code):
+ return job.output_dir
+
+ def getOutputDirForJobOnBoard(self, board_name):
+ return self.deferForDB(self.getOutputDirForJobOnBoard_impl, board_name)
+
+ def jobCompleted_impl(self, board_name, exit_code, kill_reason):
self.logger.debug('marking job as complete on %s', board_name)
device = Device.objects.get(hostname=board_name)
old_device_status = device.status
@@ -279,6 +277,19 @@
elif job.status == TestJob.COMPLETE:
device.health_status = Device.HEALTH_PASS
+ bundle_file = os.path.join(job.output_dir, 'result-bundle')
+ if os.path.exists(bundle_file):
+ with open(bundle_file) as f:
+ results_link = f.read().strip()
+ job._results_link = results_link
+ sha1 = results_link.strip('/').split('/')[-1]
+ try:
+ bundle = Bundle.objects.get(content_sha1=sha1)
+ except Bundle.DoesNotExist:
+ pass
+ else:
+ job._results_bundle = bundle
+
job.end_time = datetime.datetime.utcnow()
token = job.submit_token
job.submit_token = None
@@ -294,27 +305,8 @@
'sending job summary mails for job %r failed', job.pk)
transaction.commit()
- def jobCompleted(self, board_name, exit_code):
- return self.deferForDB(self.jobCompleted_impl, board_name, exit_code)
-
- def jobOobData_impl(self, board_name, key, value):
- self.logger.info(
- "oob data received for %s: %s: %s", board_name, key, value)
- if key == 'dashboard-put-result':
- device = Device.objects.get(hostname=board_name)
- device.current_job._results_link = value
- sha1 = value.strip('/').split('/')[-1]
- try:
- bundle = Bundle.objects.get(content_sha1=sha1)
- except Bundle.DoesNotExist:
- pass
- else:
- device.current_job._results_bundle = bundle
- device.current_job.save()
- transaction.commit()
-
- def jobOobData(self, board_name, key, value):
- return self.deferForDB(self.jobOobData_impl, board_name, key, value)
+ def jobCompleted(self, board_name, exit_code, kill_reason):
+ return self.deferForDB(self.jobCompleted_impl, board_name, exit_code, kill_reason)
def jobCheckForCancellation_impl(self, board_name):
device = Device.objects.get(hostname=board_name)
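
jobCompleted_impl now does what jobOobData_impl used to do for the dashboard-put-result key, but reads the link from the result-bundle file instead: the bundle's content_sha1 is simply the last path component of the link. For example (the link is illustrative):

    results_link = 'http://example.com/dashboard/streams/anonymous/bundles/4e1243bd/'
    sha1 = results_link.strip('/').split('/')[-1]
    assert sha1 == '4e1243bd'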