From patchwork Wed Jan 30 01:32:11 2013 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Michael-Doyle Hudson X-Patchwork-Id: 14346 Return-Path: X-Original-To: patchwork@peony.canonical.com Delivered-To: patchwork@peony.canonical.com Received: from fiordland.canonical.com (fiordland.canonical.com [91.189.94.145]) by peony.canonical.com (Postfix) with ESMTP id 8608023E00 for ; Wed, 30 Jan 2013 01:32:22 +0000 (UTC) Received: from mail-vc0-f176.google.com (mail-vc0-f176.google.com [209.85.220.176]) by fiordland.canonical.com (Postfix) with ESMTP id C28F7A18C52 for ; Wed, 30 Jan 2013 01:32:21 +0000 (UTC) Received: by mail-vc0-f176.google.com with SMTP id fy27so708884vcb.21 for ; Tue, 29 Jan 2013 17:32:21 -0800 (PST) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=20120113; h=x-received:x-forwarded-to:x-forwarded-for:delivered-to:x-received :received-spf:content-type:mime-version:x-launchpad-project :x-launchpad-branch:x-launchpad-message-rationale :x-launchpad-branch-revision-number:x-launchpad-notification-type:to :from:subject:message-id:date:reply-to:sender:errors-to:precedence :x-generated-by:x-launchpad-hash:x-gm-message-state; bh=SwWupz8DOwPRVw4rzfk4+J/tOoP41id0p3NxVUfM02M=; b=cKC7zEMFMUlYEvdaKyVortD6rn5KmO+lju0IUjqnNW2i5d0iPrkgIaj6g6SVTpPOeV nAkEv07uFMr/jLBsk6cROiqJCBNvVviBh1nZilkv+q7EwHEfZeMwoXx8O29mxMn2TTHy WBLoGOKk7Mbn7BbcFsW1gBc99gXtIfPrLjOd3zc4iE9usCEO/GF3FMpcr+QV8MALlkS+ HP+YGFatsyOfoCtWtZ44vrPFtl7cKybb31VfD3wFbi8Sjf3CIdEbWIYBeMYnPJplGdXf tJyGY3K9ysQqzK0fjdXiC0HlLtoFjEmdlkjAeejWS7gg+k8kVPWrmcOgDovXVC9Pc2Wv GZvQ== X-Received: by 10.52.70.205 with SMTP id o13mr2941834vdu.75.1359509541121; Tue, 29 Jan 2013 17:32:21 -0800 (PST) X-Forwarded-To: linaro-patchwork@canonical.com X-Forwarded-For: patch@linaro.org linaro-patchwork@canonical.com Delivered-To: patches@linaro.org Received: by 10.58.145.101 with SMTP id st5csp165462veb; Tue, 29 Jan 2013 17:32:20 -0800 (PST) X-Received: by 10.180.99.72 with SMTP id eo8mr5750447wib.34.1359509539619; Tue, 29 Jan 2013 17:32:19 -0800 (PST) Received: from indium.canonical.com (indium.canonical.com. [91.189.90.7]) by mx.google.com with ESMTPS id j5si5167461wjr.218.2013.01.29.17.32.11 (version=TLSv1 cipher=RC4-SHA bits=128/128); Tue, 29 Jan 2013 17:32:19 -0800 (PST) Received-SPF: pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) client-ip=91.189.90.7; Authentication-Results: mx.google.com; spf=pass (google.com: best guess record for domain of bounces@canonical.com designates 91.189.90.7 as permitted sender) smtp.mail=bounces@canonical.com Received: from ackee.canonical.com ([91.189.89.26]) by indium.canonical.com with esmtp (Exim 4.71 #1 (Debian)) id 1U0MX5-0003bk-RG for ; Wed, 30 Jan 2013 01:32:11 +0000 Received: from ackee.canonical.com (localhost [127.0.0.1]) by ackee.canonical.com (Postfix) with ESMTP id BF2C5E0415 for ; Wed, 30 Jan 2013 01:32:11 +0000 (UTC) MIME-Version: 1.0 X-Launchpad-Project: lava-scheduler X-Launchpad-Branch: ~linaro-validation/lava-scheduler/trunk X-Launchpad-Message-Rationale: Subscriber X-Launchpad-Branch-Revision-Number: 239 X-Launchpad-Notification-Type: branch-revision To: Linaro Patch Tracker From: noreply@launchpad.net Subject: [Branch ~linaro-validation/lava-scheduler/trunk] Rev 239: use the --output-dir flag of lava-dispatcher to get structured data out of it rather than --oob-fd Message-Id: <20130130013211.8348.27287.launchpad@ackee.canonical.com> Date: Wed, 30 Jan 2013 01:32:11 -0000 Reply-To: noreply@launchpad.net Sender: bounces@canonical.com Errors-To: bounces@canonical.com Precedence: bulk X-Generated-By: Launchpad (canonical.com); Revision="16455"; Instance="launchpad-lazr.conf" X-Launchpad-Hash: 4b2ac3f2751df6aebce159b377e9ca3790a8b7f6 X-Gm-Message-State: ALoCoQnyxXjSD+RWM4VSVh10LoWYGWkOvjNNdFUal62b4ZjSdxxRwmhfUqg2CS0B7AjwDu/cb+V3 Merge authors: Michael Hudson-Doyle (mwhudson) Related merge proposals: https://code.launchpad.net/~mwhudson/lava-scheduler/use-dispatcher-output-dir/+merge/144615 proposed by: Michael Hudson-Doyle (mwhudson) ------------------------------------------------------------ revno: 239 [merge] committer: Michael Hudson-Doyle branch nick: trunk timestamp: Wed 2013-01-30 14:31:16 +1300 message: use the --output-dir flag of lava-dispatcher to get structured data out of it rather than --oob-fd modified: fake-dispatcher lava_scheduler_app/models.py lava_scheduler_app/views.py lava_scheduler_daemon/board.py lava_scheduler_daemon/dbjobsource.py --- lp:lava-scheduler https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk You are subscribed to branch lp:lava-scheduler. To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription === modified file 'fake-dispatcher' --- fake-dispatcher 2012-08-30 03:58:19 +0000 +++ fake-dispatcher 2013-01-15 02:23:49 +0000 @@ -15,4 +15,3 @@ echo p $i sleep 1 done -echo dashboard-put-result: http://disney.com >&3 === modified file 'lava_scheduler_app/models.py' --- lava_scheduler_app/models.py 2013-01-15 20:53:29 +0000 +++ lava_scheduler_app/models.py 2013-01-30 01:31:16 +0000 @@ -1,4 +1,5 @@ import logging +import os import simplejson import urlparse @@ -326,6 +327,25 @@ log_file = models.FileField( upload_to='lava-logs', default=None, null=True, blank=True) + @property + def output_dir(self): + return os.path.join(settings.MEDIA_ROOT, 'job-output', 'job-%s' % self.id) + + def output_file(self): + output_path = os.path.join(self.output_dir, 'output.txt') + if os.path.exists(output_path): + return open(output_path) + elif self.log_file: + log_file = self.log_file + if log_file: + try: + log_file.open() + except IOError: + log_file = None + return log_file + else: + return None + failure_tags = models.ManyToManyField( JobFailureTag, blank=True, related_name='failure_tags') failure_comment = models.TextField(null=True, blank=True) === modified file 'lava_scheduler_app/views.py' --- lava_scheduler_app/views.py 2013-01-15 17:44:36 +0000 +++ lava_scheduler_app/views.py 2013-01-30 01:31:16 +0000 @@ -567,12 +567,7 @@ 'show_reload_page': job.status <= TestJob.RUNNING, } - log_file = job.log_file - if log_file: - try: - log_file.open() - except IOError: - log_file = None + log_file = job.output_file() if log_file: job_errors = getDispatcherErrors(log_file) @@ -584,13 +579,16 @@ for level, msg, _ in job_log_messages: levels[level] += 1 levels = sorted(levels.items(), key=lambda (k,v):logging._levelNames.get(k)) + with job.output_file() as f: + f.seek(0, 2) + job_file_size = f.tell() data.update({ 'job_file_present': True, 'job_errors' : job_errors, 'job_has_error' : len(job_errors) > 0, 'job_log_messages' : job_log_messages, 'levels': levels, - 'job_file_size' : log_file.size, + 'job_file_size' : job_file_size, }) else: data.update({ @@ -603,12 +601,7 @@ def job_definition(request, pk): job = get_restricted_job(request.user, pk) - log_file = job.log_file - if log_file: - try: - log_file.open() - except IOError: - log_file = None + log_file = job.output_file() return render_to_response( "lava_scheduler_app/job_definition.html", { @@ -628,21 +621,24 @@ @BreadCrumb("Complete log", parent=job_detail, needs=['pk']) def job_log_file(request, pk): job = get_restricted_job(request.user, pk) - content = formatLogFile(job.log_file) + content = formatLogFile(job.output_file()) + with job.output_file() as f: + f.seek(0, 2) + job_file_size = f.tell() return render_to_response( "lava_scheduler_app/job_log_file.html", { 'job': TestJob.objects.get(pk=pk), - 'job_file_present': bool(job.log_file), + 'job_file_present': bool(job.output_file()), 'sections' : content, - 'job_file_size' : job.log_file.size, + 'job_file_size' : job_file_size, }, RequestContext(request)) def job_log_file_plain(request, pk): job = get_restricted_job(request.user, pk) - response = HttpResponse(job.log_file, mimetype='text/plain') + response = HttpResponse(job.output_file(), mimetype='text/plain') response['Content-Disposition'] = "attachment; filename=job_%d.log"%job.id return response @@ -650,7 +646,7 @@ def job_log_incremental(request, pk): start = int(request.GET.get('start', 0)) job = get_restricted_job(request.user, pk) - log_file = job.log_file + log_file = job.output_file() log_file.seek(start) new_content = log_file.read() m = getDispatcherLogMessages(StringIO.StringIO(new_content)) @@ -665,7 +661,7 @@ def job_full_log_incremental(request, pk): start = int(request.GET.get('start', 0)) job = get_restricted_job(request.user, pk) - log_file = job.log_file + log_file = job.output_file() log_file.seek(start) new_content = log_file.read() nl_index = new_content.rfind('\n', -NEWLINE_SCAN_SIZE) @@ -692,7 +688,7 @@ return HttpResponseBadRequest("invalid start") count_present = 'count' in request.GET job = get_restricted_job(request.user, pk) - log_file = job.log_file + log_file = job.output_file() log_file.seek(0, os.SEEK_END) size = int(request.GET.get('count', log_file.tell())) if size - start > LOG_CHUNK_SIZE and not count_present: === modified file 'lava_scheduler_daemon/board.py' --- lava_scheduler_daemon/board.py 2012-12-03 05:03:38 +0000 +++ lava_scheduler_daemon/board.py 2013-01-24 00:02:29 +0000 @@ -1,14 +1,12 @@ import json import os import signal -import sys import tempfile import logging from twisted.internet.error import ProcessDone, ProcessExitedAlready from twisted.internet.protocol import ProcessProtocol from twisted.internet import defer, task -from twisted.protocols.basic import LineReceiver def catchall_errback(logger): @@ -18,51 +16,21 @@ failure.getTraceback()) return eb -OOB_FD = 3 - - -class OOBDataProtocol(LineReceiver): - - delimiter = '\n' - - def __init__(self, source, board_name, _source_lock): - self.logger = logging.getLogger(__name__ + '.OOBDataProtocol') - self.source = source - self.board_name = board_name - self._source_lock = _source_lock - - def lineReceived(self, line): - if ':' not in line: - self.logger.error('malformed oob data: %r' % line) - return - key, value = line.split(':', 1) - self._source_lock.run( - self.source.jobOobData, self.board_name, key, - value.lstrip()).addErrback( - catchall_errback(self.logger)) class DispatcherProcessProtocol(ProcessProtocol): - - def __init__(self, deferred, log_file, job): + def __init__(self, deferred, job): self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol') self.deferred = deferred - self.log_file = log_file self.log_size = 0 self.job = job - self.oob_data = OOBDataProtocol( - job.source, job.board_name, job._source_lock) def childDataReceived(self, childFD, data): - if childFD == OOB_FD: - self.oob_data.dataReceived(data) - self.log_file.write(data) self.log_size += len(data) if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']: if not self.job._killing: self.job.cancel("exceeded log size limit") - self.log_file.flush() def childConnectionLost(self, childFD): self.logger.info("childConnectionLost for %s: %s", @@ -75,13 +43,11 @@ def processEnded(self, reason): self.logger.info("processEnded for %s: %s", self.job.board_name, reason.value) - self.log_file.close() self.deferred.callback(reason.value.exitCode) class Job(object): - def __init__(self, job_data, dispatcher, source, board_name, reactor, daemon_options): self.job_data = job_data @@ -97,7 +63,7 @@ self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL'] self._time_limit_call = None self._killing = False - self.job_log_file = None + self._kill_reason = '' def _checkCancel(self): if self._killing: @@ -108,11 +74,11 @@ self._maybeCancel) def cancel(self, reason=None): - if not self._killing and reason is None: - reason = "killing job for unknown reason" if not self._killing: + if reason is None: + reason = "killing job for unknown reason" + self._kill_reason = reason self.logger.info(reason) - self.job_log_file.write("\n%s\n" % reason.upper()) self._killing = True if self._signals: signame = self._signals.pop(0) @@ -137,23 +103,21 @@ self.cancel("killing job for exceeding timeout") def run(self): - d = self.source.getLogFileForJobOnBoard(self.board_name) + d = self.source.getOutputDirForJobOnBoard(self.board_name) return d.addCallback(self._run).addErrback( catchall_errback(self.logger)) - def _run(self, job_log_file): + def _run(self, output_dir): d = defer.Deferred() json_data = self.job_data fd, self._json_file = tempfile.mkstemp() with os.fdopen(fd, 'wb') as f: json.dump(json_data, f) - self._protocol = DispatcherProcessProtocol( - d, job_log_file, self) - self.job_log_file = job_log_file + self._protocol = DispatcherProcessProtocol(d, self) self.reactor.spawnProcess( self._protocol, self.dispatcher, args=[ - self.dispatcher, self._json_file, '--oob-fd', str(OOB_FD)], - childFDs={0:0, 1:'r', 2:'r', OOB_FD:'r'}, env=None) + self.dispatcher, self._json_file, '--output-dir', output_dir], + childFDs={0:0, 1:'r', 2:'r'}, env=None) self._checkCancel_call.start(10) timeout = max( json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT']) @@ -171,8 +135,11 @@ self._time_limit_call.cancel() self._checkCancel_call.stop() return self._source_lock.run( - self.source.jobCompleted, self.board_name, exit_code).addCallback( - lambda r:exit_code) + self.source.jobCompleted, + self.board_name, + exit_code, + self._killing).addCallback( + lambda r:exit_code) class SchedulerMonitorPP(ProcessProtocol): === modified file 'lava_scheduler_daemon/dbjobsource.py' --- lava_scheduler_daemon/dbjobsource.py 2012-12-03 05:03:38 +0000 +++ lava_scheduler_daemon/dbjobsource.py 2013-01-24 00:19:27 +0000 @@ -1,10 +1,10 @@ import datetime import logging import os +import shutil import urlparse from dashboard_app.models import Bundle -import django.core.exceptions from django.contrib.auth.models import User from django.core.files.base import ContentFile @@ -198,6 +198,7 @@ job.start_time = datetime.datetime.utcnow() job.actual_device = device device.status = Device.RUNNING + shutil.rmtree(job.output_dir, ignore_errors=True) device.current_job = job try: # The unique constraint on current_job may cause this to @@ -229,18 +230,15 @@ def getJobForBoard(self, board_name): return self.deferForDB(self.getJobForBoard_impl, board_name) - def getLogFileForJobOnBoard_impl(self, board_name): + def getOutputDirForJobOnBoard_impl(self, board_name): device = Device.objects.get(hostname=board_name) job = device.current_job - log_file = job.log_file - log_file.file.close() - log_file.open('wb') - return log_file - - def getLogFileForJobOnBoard(self, board_name): - return self.deferForDB(self.getLogFileForJobOnBoard_impl, board_name) - - def jobCompleted_impl(self, board_name, exit_code): + return job.output_dir + + def getOutputDirForJobOnBoard(self, board_name): + return self.deferForDB(self.getOutputDirForJobOnBoard_impl, board_name) + + def jobCompleted_impl(self, board_name, exit_code, kill_reason): self.logger.debug('marking job as complete on %s', board_name) device = Device.objects.get(hostname=board_name) old_device_status = device.status @@ -279,6 +277,19 @@ elif job.status == TestJob.COMPLETE: device.health_status = Device.HEALTH_PASS + bundle_file = os.path.join(job.output_dir, 'result-bundle') + if os.path.exists(bundle_file): + with open(bundle_file) as f: + results_link = f.read().strip() + job._results_link = results_link + sha1 = results_link.strip('/').split('/')[-1] + try: + bundle = Bundle.objects.get(content_sha1=sha1) + except Bundle.DoesNotExist: + pass + else: + job._results_bundle = bundle + job.end_time = datetime.datetime.utcnow() token = job.submit_token job.submit_token = None @@ -294,27 +305,8 @@ 'sending job summary mails for job %r failed', job.pk) transaction.commit() - def jobCompleted(self, board_name, exit_code): - return self.deferForDB(self.jobCompleted_impl, board_name, exit_code) - - def jobOobData_impl(self, board_name, key, value): - self.logger.info( - "oob data received for %s: %s: %s", board_name, key, value) - if key == 'dashboard-put-result': - device = Device.objects.get(hostname=board_name) - device.current_job._results_link = value - sha1 = value.strip('/').split('/')[-1] - try: - bundle = Bundle.objects.get(content_sha1=sha1) - except Bundle.DoesNotExist: - pass - else: - device.current_job._results_bundle = bundle - device.current_job.save() - transaction.commit() - - def jobOobData(self, board_name, key, value): - return self.deferForDB(self.jobOobData_impl, board_name, key, value) + def jobCompleted(self, board_name, exit_code, kill_reason): + return self.deferForDB(self.jobCompleted_impl, board_name, exit_code, kill_reason) def jobCheckForCancellation_impl(self, board_name): device = Device.objects.get(hostname=board_name)