[Branch,~linaro-validation/lava-scheduler/trunk] Rev 196: Changes required for the scheduler to properly run jobs via celery remotely

Message ID 20120711002411.1145.77691.launchpad@ackee.canonical.com
State Accepted
Headers show

Commit Message

Andy Doan July 11, 2012, 12:24 a.m.
Merge authors:
  Andy Doan (doanac)
Related merge proposals:
  https://code.launchpad.net/~doanac/lava-scheduler/celery-support/+merge/114092
  proposed by: Andy Doan (doanac)
  review: Approve - Michael Hudson-Doyle (mwhudson)
------------------------------------------------------------
revno: 196 [merge]
committer: Andy Doan <andy.doan@linaro.org>
branch nick: lava-scheduler
timestamp: Tue 2012-07-10 19:22:46 -0500
message:
  Changes required for the scheduler to properly run jobs via celery remotely
added:
  lava_scheduler_app/migrations/0023_auto__add_field_devicetype_use_celery.py
modified:
  lava_scheduler_app/management/commands/schedulermonitor.py
  lava_scheduler_app/models.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/dbjobsource.py
  lava_scheduler_daemon/service.py


--
lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

You are subscribed to branch lp:lava-scheduler.
To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription

Patch

=== modified file 'lava_scheduler_app/management/commands/schedulermonitor.py'
--- lava_scheduler_app/management/commands/schedulermonitor.py	2012-03-27 05:41:09 +0000
+++ lava_scheduler_app/management/commands/schedulermonitor.py	2012-07-09 02:56:07 +0000
@@ -16,6 +16,7 @@ 
 # You should have received a copy of the GNU Affero General Public License
 # along with LAVA Scheduler.  If not, see <http://www.gnu.org/licenses/>.
 
+import os
 import simplejson
 
 
@@ -35,9 +36,13 @@ 
         daemon_options = self._configure(options)
         source = DatabaseJobSource()
         dispatcher, board_name, json_file = args
+
+        log_to_stdout = os.getenv("CELERY_CONFIG_MODULE", False)
+
         job = Job(
             simplejson.load(open(json_file)), dispatcher,
-            source, board_name, reactor, daemon_options=daemon_options)
+            source, board_name, reactor, daemon_options=daemon_options,
+            log_to_stdout=log_to_stdout)
         def run():
             job.run().addCallback(lambda result: reactor.stop())
         reactor.callWhenRunning(run)

=== added file 'lava_scheduler_app/migrations/0023_auto__add_field_devicetype_use_celery.py'
--- lava_scheduler_app/migrations/0023_auto__add_field_devicetype_use_celery.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_app/migrations/0023_auto__add_field_devicetype_use_celery.py	2012-07-08 16:48:10 +0000
@@ -0,0 +1,149 @@ 
+# -*- coding: utf-8 -*-
+import datetime
+from south.db import db
+from south.v2 import SchemaMigration
+from django.db import models
+
+
+class Migration(SchemaMigration):
+
+    def forwards(self, orm):
+        # Adding field 'DeviceType.use_celery'
+        db.add_column('lava_scheduler_app_devicetype', 'use_celery',
+                      self.gf('django.db.models.fields.BooleanField')(default=False),
+                      keep_default=False)
+
+
+    def backwards(self, orm):
+        # Deleting field 'DeviceType.use_celery'
+        db.delete_column('lava_scheduler_app_devicetype', 'use_celery')
+
+
+    models = {
+        'auth.group': {
+            'Meta': {'object_name': 'Group'},
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '80'}),
+            'permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'})
+        },
+        'auth.permission': {
+            'Meta': {'ordering': "('content_type__app_label', 'content_type__model', 'codename')", 'unique_together': "(('content_type', 'codename'),)", 'object_name': 'Permission'},
+            'codename': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+            'content_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['contenttypes.ContentType']"}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
+        },
+        'auth.user': {
+            'Meta': {'object_name': 'User'},
+            'date_joined': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+            'email': ('django.db.models.fields.EmailField', [], {'max_length': '75', 'blank': 'True'}),
+            'first_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+            'groups': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Group']", 'symmetrical': 'False', 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'is_active': ('django.db.models.fields.BooleanField', [], {'default': 'True'}),
+            'is_staff': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'is_superuser': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'last_login': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.now'}),
+            'last_name': ('django.db.models.fields.CharField', [], {'max_length': '30', 'blank': 'True'}),
+            'password': ('django.db.models.fields.CharField', [], {'max_length': '128'}),
+            'user_permissions': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['auth.Permission']", 'symmetrical': 'False', 'blank': 'True'}),
+            'username': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '30'})
+        },
+        'contenttypes.contenttype': {
+            'Meta': {'ordering': "('name',)", 'unique_together': "(('app_label', 'model'),)", 'object_name': 'ContentType', 'db_table': "'django_content_type'"},
+            'app_label': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'model': ('django.db.models.fields.CharField', [], {'max_length': '100'}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '100'})
+        },
+        'dashboard_app.bundle': {
+            'Meta': {'ordering': "['-uploaded_on']", 'object_name': 'Bundle'},
+            '_gz_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'gz_content'"}),
+            '_raw_content': ('django.db.models.fields.files.FileField', [], {'max_length': '100', 'null': 'True', 'db_column': "'content'"}),
+            'bundle_stream': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'bundles'", 'to': "orm['dashboard_app.BundleStream']"}),
+            'content_filename': ('django.db.models.fields.CharField', [], {'max_length': '256'}),
+            'content_sha1': ('django.db.models.fields.CharField', [], {'max_length': '40', 'unique': 'True', 'null': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'is_deserialized': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'uploaded_by': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'uploaded_bundles'", 'null': 'True', 'to': "orm['auth.User']"}),
+            'uploaded_on': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime.utcnow'})
+        },
+        'dashboard_app.bundlestream': {
+            'Meta': {'object_name': 'BundleStream'},
+            'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'is_anonymous': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'name': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+            'pathname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '128'}),
+            'slug': ('django.db.models.fields.CharField', [], {'max_length': '64', 'blank': 'True'}),
+            'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+        },
+        'lava_scheduler_app.device': {
+            'Meta': {'object_name': 'Device'},
+            'current_job': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'+'", 'unique': 'True', 'null': 'True', 'to': "orm['lava_scheduler_app.TestJob']"}),
+            'device_type': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.DeviceType']"}),
+            'health_status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'hostname': ('django.db.models.fields.CharField', [], {'max_length': '200', 'primary_key': 'True'}),
+            'last_health_report_job': ('django.db.models.fields.related.ForeignKey', [], {'blank': 'True', 'related_name': "'+'", 'unique': 'True', 'null': 'True', 'to': "orm['lava_scheduler_app.TestJob']"}),
+            'status': ('django.db.models.fields.IntegerField', [], {'default': '1'}),
+            'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'})
+        },
+        'lava_scheduler_app.devicestatetransition': {
+            'Meta': {'object_name': 'DeviceStateTransition'},
+            'created_by': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'}),
+            'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'device': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'transitions'", 'to': "orm['lava_scheduler_app.Device']"}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'job': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['lava_scheduler_app.TestJob']", 'null': 'True', 'blank': 'True'}),
+            'message': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+            'new_state': ('django.db.models.fields.IntegerField', [], {}),
+            'old_state': ('django.db.models.fields.IntegerField', [], {})
+        },
+        'lava_scheduler_app.devicetype': {
+            'Meta': {'object_name': 'DeviceType'},
+            'health_check_job': ('django.db.models.fields.TextField', [], {'default': 'None', 'null': 'True', 'blank': 'True'}),
+            'name': ('django.db.models.fields.SlugField', [], {'max_length': '50', 'primary_key': 'True'}),
+            'use_celery': ('django.db.models.fields.BooleanField', [], {'default': 'False'})
+        },
+        'lava_scheduler_app.tag': {
+            'Meta': {'object_name': 'Tag'},
+            'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'name': ('django.db.models.fields.SlugField', [], {'unique': 'True', 'max_length': '50'})
+        },
+        'lava_scheduler_app.testjob': {
+            'Meta': {'object_name': 'TestJob'},
+            '_results_bundle': ('django.db.models.fields.related.OneToOneField', [], {'to': "orm['dashboard_app.Bundle']", 'unique': 'True', 'null': 'True', 'db_column': "'results_bundle_id'", 'blank': 'True'}),
+            '_results_link': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '400', 'null': 'True', 'db_column': "'results_link'", 'blank': 'True'}),
+            'actual_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+            'definition': ('django.db.models.fields.TextField', [], {}),
+            'description': ('django.db.models.fields.CharField', [], {'default': 'None', 'max_length': '200', 'null': 'True', 'blank': 'True'}),
+            'end_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+            'group': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.Group']", 'null': 'True', 'blank': 'True'}),
+            'health_check': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'is_public': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
+            'log_file': ('django.db.models.fields.files.FileField', [], {'default': 'None', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
+            'requested_device': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.Device']"}),
+            'requested_device_type': ('django.db.models.fields.related.ForeignKey', [], {'default': 'None', 'related_name': "'+'", 'null': 'True', 'blank': 'True', 'to': "orm['lava_scheduler_app.DeviceType']"}),
+            'start_time': ('django.db.models.fields.DateTimeField', [], {'null': 'True', 'blank': 'True'}),
+            'status': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
+            'submit_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
+            'submit_token': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['linaro_django_xmlrpc.AuthToken']", 'null': 'True', 'blank': 'True'}),
+            'submitter': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'to': "orm['auth.User']"}),
+            'tags': ('django.db.models.fields.related.ManyToManyField', [], {'to': "orm['lava_scheduler_app.Tag']", 'symmetrical': 'False', 'blank': 'True'}),
+            'user': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['auth.User']", 'null': 'True', 'blank': 'True'})
+        },
+        'linaro_django_xmlrpc.authtoken': {
+            'Meta': {'object_name': 'AuthToken'},
+            'created_on': ('django.db.models.fields.DateTimeField', [], {'auto_now': 'True', 'blank': 'True'}),
+            'description': ('django.db.models.fields.TextField', [], {'default': "''", 'blank': 'True'}),
+            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
+            'last_used_on': ('django.db.models.fields.DateTimeField', [], {'null': 'True'}),
+            'secret': ('django.db.models.fields.CharField', [], {'default': "'fxxmj9mnu0ox1a771w9c53gxjh07mnlc5bj5gthygd6jraelz0wpac744ls1ucrz0dyb5s9sbrojbk00lcw7tx4iczlu3le1qi63aejomaiuc4bnr7e3uhesli16em4r'", 'unique': 'True', 'max_length': '128'}),
+            'user': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'auth_tokens'", 'to': "orm['auth.User']"})
+        }
+    }
+
+    complete_apps = ['lava_scheduler_app']
\ No newline at end of file

=== modified file 'lava_scheduler_app/models.py'
--- lava_scheduler_app/models.py	2012-06-29 03:26:14 +0000
+++ lava_scheduler_app/models.py	2012-07-08 16:48:10 +0000
@@ -55,6 +55,10 @@ 
     health_check_job = models.TextField(
         null=True, blank=True, default=None, validators=[validate_job_json])
 
+    use_celery = models.BooleanField(default=False,
+        help_text=("Denotes the job should be run via the celery "\
+            "schedulermonitor rather than the local one"))
+
     @models.permalink
     def get_absolute_url(self):
         return ("lava.scheduler.device_type.detail", [self.pk])
@@ -128,6 +132,9 @@ 
     def get_device_health_url(self):
         return ("lava.scheduler.labhealth.detail", [self.pk])
 
+    def use_celery(self):
+        return self.device_type.use_celery
+
     def recent_jobs(self):
         return TestJob.objects.select_related(
             "actual_device",

=== modified file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	2012-03-29 04:56:44 +0000
+++ lava_scheduler_daemon/board.py	2012-07-10 22:45:19 +0000
@@ -1,6 +1,7 @@ 
 import json
 import os
 import signal
+import sys
 import tempfile
 import logging
 
@@ -48,6 +49,7 @@ 
         self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
         self.deferred = deferred
         self.log_file = log_file
+        self.log_size = 0
         self.job = job
         self.oob_data = OOBDataProtocol(
             job.source, job.board_name, job._source_lock)
@@ -56,7 +58,8 @@ 
         if childFD == OOB_FD:
             self.oob_data.dataReceived(data)
         self.log_file.write(data)
-        if self.log_file.tell() > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
+        self.log_size += len(data)
+        if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
             if not self.job._killing:
                 self.job.cancel("exceeded log size limit")
         self.log_file.flush()
@@ -70,7 +73,7 @@ 
 
 
     def __init__(self, job_data, dispatcher, source, board_name, reactor,
-                 daemon_options):
+                 daemon_options, log_to_stdout=False):
         self.job_data = job_data
         self.dispatcher = dispatcher
         self.source = source
@@ -85,6 +88,7 @@ 
         self._time_limit_call = None
         self._killing = False
         self.job_log_file = None
+        self._log_to_stdout = log_to_stdout
 
     def _checkCancel(self):
         if self._killing:
@@ -124,6 +128,9 @@ 
         self.cancel("killing job for exceeding timeout")
 
     def run(self):
+        if self._log_to_stdout:
+            return self._run(sys.stdout)
+
         d = self.source.getLogFileForJobOnBoard(self.board_name)
         return d.addCallback(self._run).addErrback(
             catchall_errback(self.logger))
@@ -174,7 +181,7 @@ 
 
 
     def __init__(self, job_data, dispatcher, source, board_name, reactor,
-                 daemon_options):
+                 daemon_options, use_celery=False):
         self.logger = logging.getLogger(__name__ + '.MonitorJob')
         self.job_data = job_data
         self.dispatcher = dispatcher
@@ -182,6 +189,7 @@ 
         self.board_name = board_name
         self.reactor = reactor
         self.daemon_options = daemon_options
+        self.use_celery = use_celery
         self._json_file = None
 
     def run(self):
@@ -190,12 +198,18 @@ 
         fd, self._json_file = tempfile.mkstemp()
         with os.fdopen(fd, 'wb') as f:
             json.dump(json_data, f)
-        args = [
-            'setsid', 'lava-server', 'manage', 'schedulermonitor',
-            self.dispatcher, str(self.board_name), self._json_file,
-            '-l', self.daemon_options['LOG_LEVEL']]
-        if self.daemon_options['LOG_FILE_PATH']:
-            args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
+
+        if self.use_celery:
+            args = [
+                'setsid', 'lava', 'celery-schedulermonitor',
+                self.dispatcher, str(self.board_name), self._json_file]
+        else:
+            args = [
+                'setsid', 'lava-server', 'manage', 'schedulermonitor',
+                self.dispatcher, str(self.board_name), self._json_file,
+                '-l', self.daemon_options['LOG_LEVEL']]
+            if self.daemon_options['LOG_FILE_PATH']:
+                args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
         self.logger.info('executing "%s"', ' '.join(args))
         self.reactor.spawnProcess(
             SimplePP(d), 'setsid', childFDs={0:0, 1:1, 2:2},
@@ -259,7 +273,8 @@ 
 
     job_cls = MonitorJob
 
-    def __init__(self, source, board_name, dispatcher, reactor, daemon_options, job_cls=None):
+    def __init__(self, source, board_name, dispatcher, reactor, daemon_options,
+                use_celery=False, job_cls=None):
         self.source = source
         self.board_name = board_name
         self.dispatcher = dispatcher
@@ -272,6 +287,7 @@ 
         self._stopping_deferreds = []
         self.logger = logging.getLogger(__name__ + '.Board.' + board_name)
         self.checking = False
+        self.use_celery = use_celery
 
     def _state_name(self):
         if self.running_job:
@@ -345,7 +361,7 @@ 
         self.logger.info("starting job %r", job_data)
         self.running_job = self.job_cls(
             job_data, self.dispatcher, self.source, self.board_name,
-            self.reactor, self.daemon_options)
+            self.reactor, self.daemon_options, self.use_celery)
         d = self.running_job.run()
         d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
 

=== modified file 'lava_scheduler_daemon/dbjobsource.py'
--- lava_scheduler_daemon/dbjobsource.py	2012-06-29 03:14:32 +0000
+++ lava_scheduler_daemon/dbjobsource.py	2012-07-08 16:48:10 +0000
@@ -88,7 +88,8 @@ 
         return self.deferToThread(wrapper, *args, **kw)
 
     def getBoardList_impl(self):
-        return [d.hostname for d in Device.objects.all()]
+        return [ {'hostname': d.hostname, 'use_celery': d.use_celery()}
+            for d in Device.objects.all()]
 
     def getBoardList(self):
         return self.deferForDB(self.getBoardList_impl)

=== modified file 'lava_scheduler_daemon/service.py'
--- lava_scheduler_daemon/service.py	2012-03-29 04:56:44 +0000
+++ lava_scheduler_daemon/service.py	2012-07-08 16:48:10 +0000
@@ -24,20 +24,28 @@ 
         return self.source.getBoardList().addCallback(
             self._cbUpdateBoards).addErrback(catchall_errback(self.logger))
 
-    def _cbUpdateBoards(self, board_names):
-        if set(board_names) == set(self.boards):
-            return
-        self.logger.info("New board list %s", board_names)
+    def _cbUpdateBoards(self, board_cfgs):
+        '''board_cfgs is an array of dicts {hostname=name, use_celery=...} '''
         new_boards = {}
-        for board_name in board_names:
-            if board_name in self.boards:
-                new_boards[board_name] = self.boards.pop(board_name)
+        for board_cfg in board_cfgs:
+            board_name = board_cfg['hostname']
+            use_celery = board_cfg['use_celery']
+
+            if board_cfg['hostname'] in self.boards:
+                board = self.boards.pop(board_name)
+                if use_celery != board.use_celery:
+                    board.use_celery = use_celery
+                    self.logger.info("use_celery changed for %s to '%s'" % \
+                        (board_name, use_celery))
+                new_boards[board_name] = board
             else:
+                self.logger.info("Adding board: %s" % board_name)
                 new_boards[board_name] = Board(
                     self.source, board_name, self.dispatcher, self.reactor,
-                    self.daemon_options)
+                    self.daemon_options, use_celery)
                 new_boards[board_name].start()
         for board in self.boards.values():
+            self.logger.info("Removing board: %s" % board.board_name)
             board.stop()
         self.boards = new_boards