
[Branch,~linaro-validation/lava-scheduler/trunk] Rev 160: limit job run time and log file size in the monitor

Message ID: 20120402002016.6872.57414.launchpad@ackee.canonical.com
State: Accepted

Commit Message

Michael Hudson-Doyle April 2, 2012, 12:20 a.m. UTC
Merge authors:
  Michael Hudson-Doyle (mwhudson)
Related merge proposals:
  https://code.launchpad.net/~mwhudson/lava-scheduler/limit-duration-log-size-kill-harder-bug-937443/+merge/99469
  proposed by: Michael Hudson-Doyle (mwhudson)
  review: Approve - Paul Larson (pwlars)
------------------------------------------------------------
revno: 160 [merge]
committer: Michael Hudson-Doyle <michael.hudson@linaro.org>
branch nick: trunk
timestamp: Mon 2012-04-02 12:17:56 +1200
message:
  limit job run time and log file size in the monitor
  use fiercer signals if needed to kill jobs
modified:
  doc/changes.rst
  fake-dispatcher
  lava_scheduler_app/extension.py
  lava_scheduler_app/management/commands/__init__.py
  lava_scheduler_app/management/commands/scheduler.py
  lava_scheduler_app/management/commands/schedulermonitor.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/service.py
  lava_scheduler_daemon/tests/test_board.py


--
lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

Patch

=== modified file 'doc/changes.rst'
--- doc/changes.rst	2012-03-23 01:41:47 +0000
+++ doc/changes.rst	2012-04-02 00:06:13 +0000
@@ -2,6 +2,16 @@ 
 ***************
 
 
+.. _version_0_12_1:
+
+Version 0.12.1
+==============
+
+* Enforce limits on how long jobs can run for and how large log files
+  can grow to in the scheduler monitor.
+* When killing off a process, escalate through SIGINT, SIGTERM,
+  SIGKILL signals.
+
 .. _version_0_12:
 
 Version 0.12

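The practical effect of the first changelog entry: the monitor takes the larger of the job's own timeout and the configured floor. A minimal sketch of that calculation (the job value here is invented; only MIN_JOB_TIMEOUT's one-day default comes from the patch):

    # MIN_JOB_TIMEOUT defaults to a day; the job-specified value is made up.
    MIN_JOB_TIMEOUT = 24 * 60 * 60
    job_timeout = 600  # jobs always specify one, but it is often too low
    effective_timeout = max(job_timeout, MIN_JOB_TIMEOUT)  # never below the floor
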
=== modified file 'fake-dispatcher'
--- fake-dispatcher	2012-03-19 04:32:52 +0000
+++ fake-dispatcher	2012-03-27 21:30:51 +0000
@@ -1,4 +1,14 @@ 
 #!/bin/sh
+
+# This is just a silly fake dispatcher script that can be used to test
+# the scheduler without requiring real hardware to be present.  To use
+# it, run "lava-scheduler manage scheduler --use-fake".  If you want
+# to test something in particular, just hack it as required...
+
+# This makes this script not exit when sent SIGINT, to test the
+# killing of jobs that do not die easily.
+trap "" 2
+
 echo starting processing $1
 echo error >&2
 for i in `seq 100`; do

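The `trap "" 2` line makes the fake dispatcher immune to SIGINT, which is exactly what forces the monitor to escalate. For reference, an equivalent stubborn child process in Python (a hypothetical stand-in for testing, not part of the patch) looks like:

    import signal
    import time

    # Ignore SIGINT (signal 2), like `trap "" 2` in the shell script, so the
    # monitor has to escalate to SIGTERM or SIGKILL to get rid of us.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    for i in range(100):
        print('processing step', i)
        time.sleep(1)
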
=== modified file 'lava_scheduler_app/extension.py'
--- lava_scheduler_app/extension.py	2012-02-29 23:59:27 +0000
+++ lava_scheduler_app/extension.py	2012-03-27 05:41:09 +0000
@@ -65,3 +65,20 @@ 
     def contribute_to_settings(self, settings_module):
         super(SchedulerExtension, self).contribute_to_settings(settings_module)
         settings_module['INSTALLED_APPS'].append('django_tables2')
+        from_module = settings_module.get('SCHEDULER_DAEMON_OPTIONS', {})
+        settings_module['SCHEDULER_DAEMON_OPTIONS'] = {
+            'LOG_FILE_PATH': None,
+            'LOG_LEVEL': "WARNING",
+            # 500 megs should be enough for anyone
+            'LOG_FILE_SIZE_LIMIT': 500*1024*1024,
+            # Jobs always specify a timeout, but I suspect it's often too low.
+            # So we don't let it go below this value, which defaults to a day.
+            'MIN_JOB_TIMEOUT': 24*60*60,
+            }
+        settings_module['SCHEDULER_DAEMON_OPTIONS'].update(from_module)
+
+    def contribute_to_settings_ex(self, settings_module, settings_object):
+        super(SchedulerExtension, self).contribute_to_settings_ex(
+            settings_module, settings_object)
+        settings_module['SCHEDULER_DAEMON_OPTIONS'].update(
+            settings_object.get_setting('SCHEDULER_DAEMON_OPTIONS', {}))

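The layering here is: hard-coded defaults, overridden by whatever the settings module already contains, overridden again by the instance's settings object in contribute_to_settings_ex. A toy sketch of the same precedence with plain dicts (all values invented):

    defaults = {'LOG_LEVEL': 'WARNING', 'MIN_JOB_TIMEOUT': 24 * 60 * 60}
    module_overrides = {'LOG_LEVEL': 'INFO'}        # from the settings module
    instance_overrides = {'MIN_JOB_TIMEOUT': 3600}  # from the settings object

    options = dict(defaults)
    options.update(module_overrides)    # module settings win over defaults
    options.update(instance_overrides)  # instance settings win over both
    assert options == {'LOG_LEVEL': 'INFO', 'MIN_JOB_TIMEOUT': 3600}
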
=== modified file 'lava_scheduler_app/management/commands/__init__.py'
--- lava_scheduler_app/management/commands/__init__.py	2012-02-16 01:46:46 +0000
+++ lava_scheduler_app/management/commands/__init__.py	2012-03-30 03:04:34 +0000
@@ -1,23 +1,99 @@ 
-import logging
-import sys
+import logging.config
+from optparse import make_option
 
 from django.core.management.base import BaseCommand
 
 
 class SchedulerCommand(BaseCommand):
 
+    option_list = BaseCommand.option_list + (
+        make_option('-l', '--loglevel',
+                    action='store',
+                    default=None,
+                    help="Log level, default is taken from settings."),
+        make_option('-f', '--logfile',
+                    action='store',
+                    default=None,
+                    help="Path to log file, default is taken from settings."),
+        )
+
     log_prefix = ''
 
-    def _configure_logging(self, loglevel, logfile=None):
-        logger = logging.getLogger('')
-        if logfile is None:
-            handler = logging.StreamHandler(sys.stderr)
+
+    _DEFAULT_LOGGING = {
+        'version': 1,
+        'disable_existing_loggers': True,
+        'root': {
+            'level': None,
+            'handlers': ['default'],
+        },
+        'formatters': {
+            'default': {
+                'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
+            },
+        },
+        'handlers': {
+            'default': {
+                'level': 'DEBUG',
+                'class': 'logging.FileHandler',
+                'formatter': 'default'
+            }
+        },
+    }
+
+
+    def _configure(self, options):
+        from django.conf import settings
+
+        daemon_options = settings.SCHEDULER_DAEMON_OPTIONS.copy()
+        if options['logfile'] is not None:
+            daemon_options['LOG_FILE_PATH'] = options['logfile']
+        if options['loglevel'] is not None:
+            daemon_options['LOG_LEVEL'] = options['loglevel']
+
+        if daemon_options['LOG_FILE_PATH'] in [None, '-']:
+            handler = {
+                'level': 'DEBUG',
+                'class': 'logging.StreamHandler',
+                'formatter': 'default',
+                }
         else:
-            handler = logging.FileHandler(logfile)
+            handler = {
+                'level': 'DEBUG',
+                'class': 'logging.FileHandler',
+                'filename': daemon_options['LOG_FILE_PATH'],
+                'formatter': 'default'
+                }
+
         fmt = "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
         if self.log_prefix:
             fmt = self.log_prefix + ' ' + fmt
-        handler.setFormatter(logging.Formatter(fmt))
-        logger.addHandler(handler)
-        logger.setLevel(getattr(logging, loglevel.upper()))
+
+
+        LOGGING = {
+            'version': 1,
+            'disable_existing_loggers': True,
+            'root': {
+                'level': daemon_options['LOG_LEVEL'].upper(),
+                'handlers': ['default'],
+            },
+            'formatters': {'default': {'format': fmt}},
+            'handlers': {'default': handler}
+            }
+
+        try:
+            import lava.raven
+        except ImportError:
+            pass
+        else:
+            LOGGING['handlers']['sentry'] = {
+                'level': 'ERROR',
+                'class': 'raven.contrib.django.handlers.SentryHandler',
+            }
+            LOGGING['root']['handlers'].append('sentry')
+
+
+        logging.config.dictConfig(LOGGING)
+
+        return daemon_options
 

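_configure now hands logging setup to logging.config.dictConfig, picking a StreamHandler when LOG_FILE_PATH is unset or "-" and a FileHandler otherwise, and bolting on a Sentry handler only if lava.raven is importable. A self-contained sketch of the core pattern (the path and level are placeholders):

    import logging.config

    log_file_path = None  # e.g. '/var/log/lava-scheduler.log'; None or '-' means stderr

    if log_file_path in (None, '-'):
        handler = {'level': 'DEBUG', 'class': 'logging.StreamHandler',
                   'formatter': 'default'}
    else:
        handler = {'level': 'DEBUG', 'class': 'logging.FileHandler',
                   'filename': log_file_path, 'formatter': 'default'}

    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': True,
        'root': {'level': 'WARNING', 'handlers': ['default']},
        'formatters': {'default': {
            'format': '%(asctime)s [%(levelname)s] [%(name)s] %(message)s'}},
        'handlers': {'default': handler},
    })

    logging.getLogger(__name__).warning('configured via dictConfig')
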
=== modified file 'lava_scheduler_app/management/commands/scheduler.py'
--- lava_scheduler_app/management/commands/scheduler.py	2012-03-19 04:31:48 +0000
+++ lava_scheduler_app/management/commands/scheduler.py	2012-03-27 05:41:09 +0000
@@ -35,14 +35,6 @@ 
                     dest="dispatcher",
                     default="lava-dispatch",
                     help="Dispatcher command to invoke"),
-        make_option('-l', '--loglevel',
-                    action='store',
-                    default='WARNING',
-                    help="Log level, default is WARNING"),
-        make_option('-f', '--logfile',
-                    action='store',
-                    default=None,
-                    help="Path to log file"),
     )
 
     def handle(self, *args, **options):
@@ -51,10 +43,9 @@ 
         from twisted.internet import reactor
 
         from lava_scheduler_daemon.service import BoardSet
-
         from lava_scheduler_daemon.dbjobsource import DatabaseJobSource
 
-        self._configure_logging(options['loglevel'], options['logfile'])
+        daemon_options = self._configure(options)
 
         source = DatabaseJobSource()
 
@@ -67,7 +58,6 @@ 
         else:
             dispatcher = options['dispatcher']
         service = BoardSet(
-            source, dispatcher, reactor, log_file=options['logfile'],
-            log_level=options['loglevel'])
+            source, dispatcher, reactor, daemon_options=daemon_options)
         reactor.callWhenRunning(service.startService)
         reactor.run()

=== modified file 'lava_scheduler_app/management/commands/schedulermonitor.py'
--- lava_scheduler_app/management/commands/schedulermonitor.py	2012-02-16 01:39:54 +0000
+++ lava_scheduler_app/management/commands/schedulermonitor.py	2012-03-27 05:41:09 +0000
@@ -16,7 +16,6 @@ 
 # You should have received a copy of the GNU Affero General Public License
 # along with LAVA Scheduler.  If not, see <http://www.gnu.org/licenses/>.
 
-from optparse import make_option
 import simplejson
 
 
@@ -27,40 +26,19 @@ 
 class Command(SchedulerCommand):
 
     help = "Run the LAVA test job scheduler"
-    option_list = SchedulerCommand.option_list + (
-        make_option('--use-fake',
-                    action='store_true',
-                    dest='use_fake',
-                    default=False,
-                    help="Use fake dispatcher (for testing)"),
-        make_option('--dispatcher',
-                    action="store",
-                    dest="dispatcher",
-                    default="lava-dispatch",
-                    help="Dispatcher command to invoke"),
-        make_option('-l', '--loglevel',
-                    action='store',
-                    default='WARNING',
-                    help="Log level, default is WARNING"),
-        make_option('-f', '--logfile',
-                    action='store',
-                    default=None,
-                    help="Path to log file"),
-    )
 
     log_prefix = 'M'
 
     def handle(self, *args, **options):
         from twisted.internet import reactor
         from lava_scheduler_daemon.board import Job
+        daemon_options = self._configure(options)
         source = DatabaseJobSource()
         dispatcher, board_name, json_file = args
         job = Job(
             simplejson.load(open(json_file)), dispatcher,
-            source, board_name, reactor, log_file=options['logfile'],
-            log_level=options['loglevel'])
+            source, board_name, reactor, daemon_options=daemon_options)
         def run():
             job.run().addCallback(lambda result: reactor.stop())
         reactor.callWhenRunning(run)
-        self._configure_logging(options['loglevel'], options['logfile'])
         reactor.run()

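With the option parsing moved up into SchedulerCommand, the monitor reduces to: build one Job from the positional arguments, run it under the reactor, and stop the reactor when the job's deferred fires. The shutdown idiom in isolation (defer.succeed stands in for job.run()'s Deferred; this is a sketch, not the command's code):

    from twisted.internet import defer, reactor

    def run():
        d = defer.succeed(0)  # job.run() returns a Deferred in the real code
        d.addCallback(lambda exit_code: reactor.stop())

    reactor.callWhenRunning(run)
    reactor.run()
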
=== modified file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	2012-02-16 01:35:16 +0000
+++ lava_scheduler_daemon/board.py	2012-03-29 04:56:44 +0000
@@ -4,6 +4,7 @@ 
 import tempfile
 import logging
 
+from twisted.internet.error import ProcessExitedAlready
 from twisted.internet.protocol import ProcessProtocol
 from twisted.internet import defer, task
 from twisted.protocols.basic import LineReceiver
@@ -16,14 +17,15 @@ 
             failure.getTraceback())
     return eb
 
+OOB_FD = 3
+
 
 class OOBDataProtocol(LineReceiver):
 
-    logger = logging.getLogger(__name__ + '.OOBDataProtocol')
-
     delimiter = '\n'
 
     def __init__(self, source, board_name, _source_lock):
+        self.logger = logging.getLogger(__name__ + '.OOBDataProtocol')
         self.source = source
         self.board_name = board_name
         self._source_lock = _source_lock
@@ -41,18 +43,22 @@ 
 
 class DispatcherProcessProtocol(ProcessProtocol):
 
-    logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
 
-    def __init__(self, deferred, log_file, source, board_name, _source_lock):
+    def __init__(self, deferred, log_file, job):
+        self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
         self.deferred = deferred
         self.log_file = log_file
-        self.source = source
-        self.oob_data = OOBDataProtocol(source, board_name, _source_lock)
+        self.job = job
+        self.oob_data = OOBDataProtocol(
+            job.source, job.board_name, job._source_lock)
 
     def childDataReceived(self, childFD, data):
-        if childFD == 3:
+        if childFD == OOB_FD:
             self.oob_data.dataReceived(data)
         self.log_file.write(data)
+        if self.log_file.tell() > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
+            if not self.job._killing:
+                self.job.cancel("exceeded log size limit")
         self.log_file.flush()
 
     def processEnded(self, reason):
@@ -62,55 +68,96 @@ 
 
 class Job(object):
 
-    logger = logging.getLogger(__name__ + '.Job')
 
     def __init__(self, job_data, dispatcher, source, board_name, reactor,
-                 log_file, log_level):
+                 daemon_options):
         self.job_data = job_data
         self.dispatcher = dispatcher
         self.source = source
         self.board_name = board_name
+        self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
         self.reactor = reactor
+        self.daemon_options = daemon_options
         self._json_file = None
         self._source_lock = defer.DeferredLock()
         self._checkCancel_call = task.LoopingCall(self._checkCancel)
+        self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
+        self._time_limit_call = None
+        self._killing = False
+        self.job_log_file = None
 
     def _checkCancel(self):
-        return self._source_lock.run(
-            self.source.jobCheckForCancellation, self.board_name).addCallback(
-            self._maybeCancel)
+        if self._killing:
+            self.cancel()
+        else:
+            return self._source_lock.run(
+                self.source.jobCheckForCancellation, self.board_name).addCallback(
+                self._maybeCancel)
+
+    def cancel(self, reason=None):
+        if not self._killing and reason is None:
+            reason = "killing job for unknown reason"
+        if not self._killing:
+            self.logger.info(reason)
+            self.job_log_file.write("\n%s\n" % reason.upper())
+        self._killing = True
+        if self._signals:
+            signame = self._signals.pop(0)
+        else:
+            self.logger.warning("self._signals is empty!")
+            signame = 'SIGKILL'
+        self.logger.info(
+            'attempting to kill job with signal %s' % signame)
+        try:
+            self._protocol.transport.signalProcess(getattr(signal, signame))
+        except ProcessExitedAlready:
+            pass
 
     def _maybeCancel(self, cancel):
         if cancel:
-            self._protocol.transport.signalProcess(signal.SIGINT)
+            self.cancel("killing job by user request")
+        else:
+            logging.debug('not cancelling')
+
+    def _time_limit_exceeded(self):
+        self._time_limit_call = None
+        self.cancel("killing job for exceeding timeout")
 
     def run(self):
         d = self.source.getLogFileForJobOnBoard(self.board_name)
         return d.addCallback(self._run).addErrback(
             catchall_errback(self.logger))
 
-    def _run(self, log_file):
+    def _run(self, job_log_file):
         d = defer.Deferred()
         json_data = self.job_data
         fd, self._json_file = tempfile.mkstemp()
         with os.fdopen(fd, 'wb') as f:
             json.dump(json_data, f)
         self._protocol = DispatcherProcessProtocol(
-            d, log_file, self.source, self.board_name, self._source_lock)
+            d, job_log_file, self)
+        self.job_log_file = job_log_file
         self.reactor.spawnProcess(
             self._protocol, self.dispatcher, args=[
-                self.dispatcher, self._json_file, '--oob-fd', '3'],
-            childFDs={0:0, 1:'r', 2:'r', 3:'r'}, env=None)
+                self.dispatcher, self._json_file, '--oob-fd', str(OOB_FD)],
+            childFDs={0:0, 1:'r', 2:'r', OOB_FD:'r'}, env=None)
         self._checkCancel_call.start(10)
+        timeout = max(
+            json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
+        self._time_limit_call = self.reactor.callLater(
+            timeout, self._time_limit_exceeded)
         d.addBoth(self._exited)
         return d
 
+
     def _exited(self, exit_code):
         self.logger.info("job finished on %s", self.job_data['target'])
         if self._json_file is not None:
             os.unlink(self._json_file)
         self.logger.info("reporting job completed")
-        self._source_lock.run(self._checkCancel_call.stop)
+        if self._time_limit_call is not None:
+            self._time_limit_call.cancel()
+        self._checkCancel_call.stop()
         return self._source_lock.run(
             self.source.jobCompleted, self.board_name, exit_code).addCallback(
             lambda r:exit_code)
@@ -125,17 +172,16 @@ 
 
 class MonitorJob(object):
 
-    logger = logging.getLogger(__name__ + '.MonitorJob')
 
     def __init__(self, job_data, dispatcher, source, board_name, reactor,
-                 log_file, log_level):
+                 daemon_options):
+        self.logger = logging.getLogger(__name__ + '.MonitorJob')
         self.job_data = job_data
         self.dispatcher = dispatcher
         self.source = source
         self.board_name = board_name
         self.reactor = reactor
-        self.log_file = log_file
-        self.log_level = log_level
+        self.daemon_options = daemon_options
         self._json_file = None
 
     def run(self):
@@ -147,9 +193,9 @@ 
         args = [
             'setsid', 'lava-server', 'manage', 'schedulermonitor',
             self.dispatcher, str(self.board_name), self._json_file,
-            '-l', self.log_level]
-        if self.log_file:
-            args.extend(['-f', self.log_file])
+            '-l', self.daemon_options['LOG_LEVEL']]
+        if self.daemon_options['LOG_FILE_PATH']:
+            args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
         self.logger.info('executing "%s"', ' '.join(args))
         self.reactor.spawnProcess(
             SimplePP(d), 'setsid', childFDs={0:0, 1:1, 2:2},
@@ -213,14 +259,12 @@ 
 
     job_cls = MonitorJob
 
-    def __init__(self, source, board_name, dispatcher, reactor, log_file,
-                 log_level, job_cls=None):
+    def __init__(self, source, board_name, dispatcher, reactor, daemon_options, job_cls=None):
         self.source = source
         self.board_name = board_name
         self.dispatcher = dispatcher
         self.reactor = reactor
-        self.log_file = log_file
-        self.log_level = log_level
+        self.daemon_options = daemon_options
         if job_cls is not None:
             self.job_cls = job_cls
         self.running_job = None
@@ -301,7 +345,7 @@ 
         self.logger.info("starting job %r", job_data)
         self.running_job = self.job_cls(
             job_data, self.dispatcher, self.source, self.board_name,
-            self.reactor, self.log_file, self.log_level)
+            self.reactor, self.daemon_options)
         d = self.running_job.run()
         d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
 

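The kill logic is the heart of the patch: each call to cancel() pops the next signal from the job's escalation list (SIGINT twice, SIGTERM twice, then SIGKILL), and because _checkCancel keeps firing every ten seconds while _killing is set, a process that shrugs off the polite signals gets hit with a harder one on each pass. The same machinery serves the other two limits: childDataReceived cancels the job once the log file passes LOG_FILE_SIZE_LIMIT, and _run schedules a reactor.callLater at max(job timeout, MIN_JOB_TIMEOUT), so an unrealistically short job timeout is raised to at least a day. A standalone sketch of the escalation idea, using subprocess rather than Twisted, so it illustrates the technique without mirroring the scheduler's API:

    import signal
    import subprocess
    import time

    # Same escalation order as the patch: polite first, fiercer if ignored.
    SIGNALS = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']

    proc = subprocess.Popen(['sleep', '60'])
    remaining = list(SIGNALS)

    while proc.poll() is None:
        signame = remaining.pop(0) if remaining else 'SIGKILL'
        print('attempting to kill job with signal', signame)
        try:
            proc.send_signal(getattr(signal, signame))
        except ProcessLookupError:
            break  # process exited already
        time.sleep(1)  # the real monitor re-checks every 10 seconds
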
=== modified file 'lava_scheduler_daemon/service.py'
--- lava_scheduler_daemon/service.py	2012-02-16 01:35:16 +0000
+++ lava_scheduler_daemon/service.py	2012-03-29 04:56:44 +0000
@@ -9,15 +9,13 @@ 
 
 class BoardSet(Service):
 
-    logger = logging.getLogger(__name__ + '.BoardSet')
-
-    def __init__(self, source, dispatcher, reactor, log_file, log_level):
+    def __init__(self, source, dispatcher, reactor, daemon_options):
+        self.logger = logging.getLogger(__name__ + '.BoardSet')
         self.source = source
         self.boards = {}
         self.dispatcher = dispatcher
         self.reactor = reactor
-        self.log_file = log_file
-        self.log_level = log_level
+        self.daemon_options = daemon_options
         self._update_boards_call = LoopingCall(self._updateBoards)
         self._update_boards_call.clock = reactor
 
@@ -37,7 +35,7 @@ 
             else:
                 new_boards[board_name] = Board(
                     self.source, board_name, self.dispatcher, self.reactor,
-                    self.log_file, self.log_level)
+                    self.daemon_options)
                 new_boards[board_name].start()
         for board in self.boards.values():
             board.stop()

=== modified file 'lava_scheduler_daemon/tests/test_board.py'
--- lava_scheduler_daemon/tests/test_board.py	2012-02-16 01:35:16 +0000
+++ lava_scheduler_daemon/tests/test_board.py	2012-03-27 02:57:52 +0000
@@ -36,7 +36,7 @@ 
 
 class TestJob(object):
 
-    def __init__(self, job_data, dispatcher, source, board_name, reactor):
+    def __init__(self, job_data, dispatcher, source, board_name, reactor, options):
         self.json_data = job_data
         self.dispatcher = dispatcher
         self.reactor = reactor
@@ -76,7 +76,7 @@ 
 
     def make_board(self, board_name):
         board = Board(
-            self.source, board_name, 'script', self.clock, job_cls=TestJob)
+            self.source, board_name, 'script', self.clock, None, job_cls=TestJob)
         board.logger.addHandler(self._handler)
         board.logger.setLevel(logging.DEBUG)
         return board