diff mbox

[Branch,~linaro-validation/lava-scheduler/trunk] Rev 5: first cut at a scheduler daemon: this one just uses the filesystem as its datastore

Message ID 20110622225313.6818.78445.launchpad@loganberry.canonical.com
State Accepted
Headers show

Commit Message

Michael-Doyle Hudson June 22, 2011, 10:53 p.m. UTC
Merge authors:
  Michael Hudson-Doyle (mwhudson)
Related merge proposals:
  https://code.launchpad.net/~mwhudson/lava-scheduler/daemon-v0/+merge/65287
  proposed by: Michael Hudson-Doyle (mwhudson)
  review: Approve - Paul Larson (pwlars)
------------------------------------------------------------
revno: 5 [merge]
committer: Michael-Doyle Hudson <michael.hudson@linaro.org>
branch nick: trunk
timestamp: Thu 2011-06-23 10:51:42 +1200
message:
  first cut at a scheduler daemon: this one just uses the filesystem as its datastore
added:
  fake-dispatcher
  lava-scheduler-daemon.tac
  lava_scheduler_daemon/
  lava_scheduler_daemon/__init__.py
  lava_scheduler_daemon/board.py
  lava_scheduler_daemon/jobsource.py
  lava_scheduler_daemon/service.py
  lava_scheduler_daemon/tests/
  lava_scheduler_daemon/tests/__init__.py
  lava_scheduler_daemon/tests/test_board.py
modified:
  .bzrignore


--
lp:lava-scheduler
https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk

You are subscribed to branch lp:lava-scheduler.
To unsubscribe from this branch go to https://code.launchpad.net/~linaro-validation/lava-scheduler/trunk/+edit-subscription
diff mbox

Patch

=== modified file '.bzrignore'
--- .bzrignore	2011-06-07 06:10:56 +0000
+++ .bzrignore	2011-06-21 00:58:19 +0000
@@ -1,1 +1,3 @@ 
 *.egg-info
+./twistd.pid
+./_trial_temp

=== added file 'fake-dispatcher'
--- fake-dispatcher	1970-01-01 00:00:00 +0000
+++ fake-dispatcher	2011-06-15 04:57:02 +0000
@@ -0,0 +1,6 @@ 
+#!/bin/sh
+echo starting processing $1
+echo error >&2
+sleep 10
+cat $1
+echo ending

=== added file 'lava-scheduler-daemon.tac'
--- lava-scheduler-daemon.tac	1970-01-01 00:00:00 +0000
+++ lava-scheduler-daemon.tac	2011-06-20 01:08:32 +0000
@@ -0,0 +1,22 @@ 
+import logging
+import sys
+
+from twisted.application import service
+from twisted.application import internet
+from twisted.python import filepath
+from twisted.internet import reactor
+
+from lava_scheduler_daemon.service import BoardSet
+from lava_scheduler_daemon.jobsource import DirectoryJobSource
+
+application = service.Application("lava scheduler daemon")
+
+source = DirectoryJobSource(filepath.FilePath('/tmp/lava-jobs'))
+board_set = BoardSet(source, 'fake-dispatcher', reactor)
+board_set.setServiceParent(application)
+
+logger = logging.getLogger('')
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
+logger.addHandler(handler)
+logger.setLevel(logging.DEBUG)

=== added directory 'lava_scheduler_daemon'
=== added file 'lava_scheduler_daemon/__init__.py'
=== added file 'lava_scheduler_daemon/board.py'
--- lava_scheduler_daemon/board.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/board.py	2011-06-21 03:10:04 +0000
@@ -0,0 +1,223 @@ 
+import json
+import os
+import tempfile
+import logging
+
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet import defer
+
+
+logger = logging.getLogger(__name__)
+
+
+class DispatcherProcessProtocol(ProcessProtocol):
+
+    logger = logger.getChild('DispatcherProcessProtocol')
+
+    def __init__(self, deferred):
+        self.deferred = deferred
+
+    def connectionMade(self):
+        fd, self._logpath = tempfile.mkstemp()
+        self._output = os.fdopen(fd, 'wb')
+
+    def outReceived(self, text):
+        self._output.write(text)
+
+    errReceived = outReceived
+
+    def _cleanUp(self, result):
+        os.unlink(self._logpath)
+        return result
+
+    def processEnded(self, reason):
+        # This discards the process exit value.
+        self._output.close()
+        self.deferred.callback(self._logpath)
+        self.deferred.addCallback(self._cleanUp)
+
+
+class Job(object):
+
+    logger = logger.getChild('Job')
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._json_file = None
+
+    def run(self):
+        d = defer.Deferred()
+        fd, self._json_file = tempfile.mkstemp()
+        with os.fdopen(fd, 'wb') as f:
+            json.dump(self.json_data, f)
+        self.reactor.spawnProcess(
+            DispatcherProcessProtocol(d), self.dispatcher,
+            args=[self.dispatcher, self._json_file],
+            childFDs={0:0, 1:'r', 2:'r'})
+        d.addBoth(self._exited)
+        return d
+
+    def _exited(self, log_file_path):
+        self.logger.info("job finished on %s", self.json_data['target'])
+        if self._json_file is not None:
+            os.unlink(self._json_file)
+        return log_file_path
+
+
+class Board(object):
+    """
+
+    A board runs jobs.  A board can be in four main states:
+
+     * stopped (S)
+       * the board is not looking for or processing jobs
+     * checking (C)
+       * a call to check for a new job is in progress
+     * waiting (W)
+       * no job was found by the last call to getJobForBoard and so the board
+         is waiting for a while before calling again.
+     * running (R)
+       * a job is running (or a job has completed but the call to jobCompleted
+         on the job source has not)
+
+    In addition, because we can't stop a job instantly nor abort a check for a
+    new job safely (because if getJobForBoard returns a job, it has already
+    been marked as started), there are variations on the 'checking' and
+    'running' states -- 'checking with stop requested' (C+S) and 'running with
+    stop requested' (R+S).  Even this is a little simplistic as there is the
+    possibility of .start() being called before the process of stopping
+    completes, but we deal with this by deferring any actions taken by
+    .start() until the board is really stopped.
+
+    Events that cause state transitions are:
+
+     * start() is called.  We cheat and pretend that this can only happen in
+       the stopped state by stopping first, and then move into the C state.
+
+     * stop() is called.  If we are in the C or R state we move to C+S or R+S
+       respectively.  If we are in S, C+S or R+S, we stay there.  If we are
+       in W, we just move straight to S.
+
+     * getJobForBoard() returns a job.  We can only be in C or C+S here, and
+       move into R or R+S respectively.
+
+     * getJobForBoard() indicates that there is no job to perform.  Again we
+       can only be in C or C+S and move into W or S respectively.
+
+     * a job completes (i.e. the call to jobCompleted() on the source
+       returns).  We can only be in R or R+S and move to C or S respectively.
+
+     * the timer that being in state W implies expires.  We move into C.
+
+    The cheating around start means that interleaving start and stop calls may
+    not always do what you expect.  So don't mess around in that way please.
+    """
+
+    logger = logger.getChild('Board')
+
+    job_cls = Job
+
+    def __init__(self, source, board_name, dispatcher, reactor, job_cls=None):
+        self.source = source
+        self.board_name = board_name
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        if job_cls is not None:
+            self.job_cls = job_cls
+        self.running_job = None
+        self._check_call = None
+        self._stopping_deferreds = []
+        self.logger = self.logger.getChild(board_name)
+        self.checking = False
+
+    def _state_name(self):
+        if self.running_job:
+            state = "R"
+        elif self._check_call:
+            assert not self._stopping_deferreds
+            state = "W"
+        elif self.checking:
+            state = "C"
+        else:
+            assert not self._stopping_deferreds
+            state = "S"
+        if self._stopping_deferreds:
+            state += "+S"
+        return state
+
+    def start(self):
+        self.logger.debug("start requested")
+        self.stop().addCallback(self._start)
+
+    def _start(self, ignored):
+        self.logger.debug("starting")
+        self._stopping_deferreds = []
+        self._checkForJob()
+
+    def stop(self):
+        self.logger.debug("stopping")
+        if self._check_call is not None:
+            self._check_call.cancel()
+            self._check_call = None
+
+        if self.running_job is not None or self.checking:
+            self.logger.debug("job running; deferring stop")
+            self._stopping_deferreds.append(defer.Deferred())
+            return self._stopping_deferreds[-1]
+        else:
+            self.logger.debug("stopping immediately")
+            return defer.succeed(None)
+
+    def _checkForJob(self):
+        self.logger.debug("checking for job")
+        self._check_call = None
+        self.checking = True
+        self.source.getJobForBoard(self.board_name).addCallbacks(
+            self._maybeStartJob, self._ebCheckForJob)
+
+    def _ebCheckForJob(self, result):
+        self.logger.exception(result.value)
+        self._maybeStartJob(None)
+
+    def _finish_stop(self):
+        self.logger.debug(
+            "calling %s deferreds returned from stop()",
+            len(self._stopping_deferreds))
+        for d in self._stopping_deferreds:
+            d.callback(None)
+        self._stopping_deferreds = []
+
+    def _maybeStartJob(self, json_data):
+        self.checking = False
+        if json_data is None:
+            self.logger.debug("no job found")
+            if self._stopping_deferreds:
+                self._finish_stop()
+            else:
+                self._check_call = self.reactor.callLater(
+                    10, self._checkForJob)
+            return
+        self.logger.debug("starting job")
+        self.running_job = self.job_cls(
+            json_data, self.dispatcher, self.reactor)
+        d = self.running_job.run()
+        d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
+
+    def _cbJobFinished(self, log_file_path):
+        self.logger.info("reporting job completed")
+        self.source.jobCompleted(
+            self.board_name, log_file_path). addCallback(
+            self._cbJobCompleted)
+
+    def _ebJobFinished(self, result):  # errback: the job's deferred failed
+        self.logger.exception(result.value)
+        self._cbJobCompleted(None)  # reset running_job; honour pending stop()
+
+    def _cbJobCompleted(self, result):
+        self.running_job = None
+        if self._stopping_deferreds:
+            self._finish_stop()
+        else:
+            self._checkForJob()

=== added file 'lava_scheduler_daemon/jobsource.py'
--- lava_scheduler_daemon/jobsource.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/jobsource.py	2011-06-21 03:15:13 +0000
@@ -0,0 +1,95 @@ 
+import json
+import logging
+
+from twisted.internet import defer
+
+from zope.interface import (
+    implements,
+    Interface,
+    )
+
+logger = logging.getLogger(__name__)
+
+
+class IJobSource(Interface):
+
+    def getBoardList():
+        """Get the list of currently configured board names."""
+
+    def getJobForBoard(board_name):
+        """Return the json data of a job for board_name to run.
+
+        The job should be marked as started before it is returned.
+        """
+
+    def jobCompleted(board_name, log_file_path):
+        """Mark the job currently running on `board_name` as completed."""
+
+
+class DirectoryJobSource(object):
+
+    implements(IJobSource)
+
+    logger = logger.getChild('DirectoryJobSource')
+
+    def __init__(self, directory):
+        self.directory = directory
+        if not self.directory.isdir():
+            self.logger.critical("%s is not a directory", self.directory)
+            raise RuntimeError("%s must be a directory" % self.directory)
+        boards = self.directory.child('boards')
+        if not boards.isdir():
+            self.logger.critical("%s is not a directory", boards)
+            raise RuntimeError("%s must be a directory" % boards)
+        for subdir in 'incoming', 'completed', 'broken':
+            subdir = self.directory.child(subdir)
+            if not subdir.isdir():
+                subdir.createDirectory()
+        self.logger.info("starting to look for jobs in %s", self.directory)
+
+    def _getBoardList(self):
+        return self.directory.child('boards').listdir()
+
+    def getBoardList(self):
+        return defer.maybeDeferred(self._getBoardList)
+
+    def _jsons(self, kind):  # yields (parsed json, FilePath) per *.json file
+        files = self.directory.child(kind).globChildren("*.json")
+        for json_file in files:  # getContent() reads and closes the file
+            yield (json.loads(json_file.getContent()), json_file)
+
+    def _board_dir(self, board_name):
+        return self.directory.child('boards').child(board_name)
+
+    def _getJobForBoard(self, board_name):
+        self.logger.debug('getting job for %s', board_name)
+        board_dir = self._board_dir(board_name)
+        if board_dir.listdir() != []:
+            self.logger.debug('board %s busy', board_name)
+            return None
+        for json_data, json_file in self._jsons('incoming'):
+            self.logger.debug('considering %s for %s', json_file, board_name)
+            if json_data['target'] == board_name:
+                self.logger.debug('running %s on %s', json_file, board_name)
+                json_file.moveTo(board_dir.child(json_file.basename()))
+                return json_data
+        else:
+            return None
+
+    def getJobForBoard(self, board_name):
+        return defer.maybeDeferred(self._getJobForBoard, board_name)
+
+    def _jobCompleted(self, board_name, log_file_path):
+        [json_file] = self._board_dir(board_name).children()
+        completed = self.directory.child('completed')
+        counter = 0
+        while True:
+            fname = '%03d%s' % (counter, json_file.basename())
+            if not completed.child(fname).exists():
+                break
+            counter += 1
+        json_file.moveTo(completed.child(fname))
+
+    def jobCompleted(self, board_name, log_file_path):
+        return defer.maybeDeferred(
+            self._jobCompleted, board_name, log_file_path)

=== added file 'lava_scheduler_daemon/service.py'
--- lava_scheduler_daemon/service.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/service.py	2011-06-17 02:44:13 +0000
@@ -0,0 +1,56 @@ 
+import logging
+
+from twisted.application.service import Service
+from twisted.internet import defer
+from twisted.internet.task import LoopingCall
+
+from lava_scheduler_daemon.board import Board
+
+
+logger = logging.getLogger(__name__)
+
+
+class BoardSet(Service):
+
+    logger = logger.getChild('BoardSet')
+
+    def __init__(self, source, dispatcher, reactor):
+        self.source = source
+        self.boards = {}
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self._update_boards_call = LoopingCall(self._updateBoards)
+        self._update_boards_call.clock = reactor
+
+    def _updateBoards(self):
+        self.logger.info("Refreshing board list")
+        return self.source.getBoardList().addCallback(self._cbUpdateBoards)
+
+    def _cbUpdateBoards(self, board_names):
+        self.logger.info("New board list %s", board_names)
+        new_boards = {}
+        for board_name in board_names:
+            if board_name in self.boards:
+                new_boards[board_name] = self.boards.pop(board_name)
+            else:
+                new_boards[board_name] = Board(
+                    self.source, board_name, self.dispatcher, self.reactor)
+                new_boards[board_name].start()
+        for board in self.boards.values():
+            board.stop()
+        self.boards = new_boards
+
+    def startService(self):
+        self._update_boards_call.start(20)
+
+    def stopService(self):
+        self._update_boards_call.stop()
+        ds = []
+        dead_boards = []
+        for board in self.boards.itervalues():
+            ds.append(board.stop().addCallback(dead_boards.append))
+        self.logger.info(
+            "waiting for %s boards", len(self.boards) - len(dead_boards))
+        return defer.gatherResults(ds)
+
+

=== added directory 'lava_scheduler_daemon/tests'
=== added file 'lava_scheduler_daemon/tests/__init__.py'
=== added file 'lava_scheduler_daemon/tests/test_board.py'
--- lava_scheduler_daemon/tests/test_board.py	1970-01-01 00:00:00 +0000
+++ lava_scheduler_daemon/tests/test_board.py	2011-06-21 03:10:04 +0000
@@ -0,0 +1,184 @@ 
+from collections import defaultdict
+import logging
+
+from twisted.internet import defer
+from twisted.internet.task import Clock
+from twisted.trial.unittest import TestCase
+
+from lava_scheduler_daemon.board import Board
+
+def stub_method(method_name):  # build a fake source method returning a Deferred
+    def method_impl(self, board_name, *args):
+        assert board_name not in self._requests[method_name], (
+            'overlapping call to %s on %s' % (method_name, board_name))
+        d = self._requests[method_name][board_name] = defer.Deferred()
+        def _remove_request(result):
+            del self._requests[method_name][board_name]
+            return result
+        d.addBoth(_remove_request)  # forget the request once it has fired
+        self._calls[board_name][method_name].append(args)
+        return d
+    return method_impl
+
+
+class TestJobSource(object):
+
+    def __init__(self):
+        self._calls = defaultdict(lambda :defaultdict(list))
+        self._requests = defaultdict(dict)
+
+    jobCompleted = stub_method('jobCompleted')
+    getJobForBoard = stub_method('getJobForBoard')
+
+    def _completeCall(self, method_name, board_name, result):
+        self._requests[method_name][board_name].callback(result)
+
+class TestJob(object):
+
+    def __init__(self, json_data, dispatcher, reactor):
+        self.json_data = json_data
+        self.dispatcher = dispatcher
+        self.reactor = reactor
+        self.deferred = defer.Deferred()
+
+    def run(self):
+        return self.deferred
+
+
+class AppendingHandler(logging.Handler):
+
+    def __init__(self, target_list):
+        logging.Handler.__init__(self)
+        self.target_list = target_list
+
+    def emit(self, record):
+        self.target_list.append((record.levelno, self.format(record)))
+
+
+class TestBoard(TestCase):
+
+    def setUp(self):
+        TestCase.setUp(self)
+        self.clock = Clock()
+        self.source = TestJobSource()
+        self._log_messages = []
+        self._handler = AppendingHandler(self._log_messages)
+        self.addCleanup(self._checkNoLogs)
+
+    def _checkNoLogs(self):
+        warnings = [message for (level, message) in self._log_messages
+                    if level >= logging.WARNING]
+        if warnings:
+            self.fail("Logged warnings: %s" % warnings)
+
+    def make_board(self, board_name):
+        board = Board(self.source, board_name, 'script', self.clock, TestJob)
+        board.logger.addHandler(self._handler)
+        board.logger.setLevel(logging.DEBUG)
+        return board
+
+    def test_initial_state_is_stopped(self):
+        b = self.make_board('board')
+        self.assertEqual('S', b._state_name())
+
+    def test_start_checks(self):
+        b = self.make_board('board')
+        b.start()
+        self.assertEqual('C', b._state_name())
+
+    def test_no_job_waits(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual('W', b._state_name())
+
+    def test_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+
+    def test_completion_calls_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual(
+            1, len(self.source._calls['board']['jobCompleted']))
+
+    def test_still_running_during_jobCompleted(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.assertEqual('R', b._state_name())
+
+    def test_check_again_on_completion(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback('path')
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual('C', b._state_name())
+
+    def test_stop_while_checking_moves_to_check_plus_stop(self):
+        b = self.make_board('board')
+        b.start()
+        b.stop()
+        self.assertEqual('C+S', b._state_name())
+
+    def test_stop_while_checking_no_job_stops(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_checking_actual_job_runs(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual(0, len(stop_results))
+        self.assertEqual('R+S', b._state_name())
+
+    def test_stop_while_checking_actual_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        self.source._completeCall('getJobForBoard', 'board', {})
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_stop_while_running_job_stops_on_complete(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', {})
+        self.assertEqual('R', b._state_name())
+        s = b.stop()
+        stop_results = []
+        s.addCallback(stop_results.append)
+        self.assertEqual(0, len(stop_results))
+        b.running_job.deferred.callback(None)
+        self.source._completeCall('jobCompleted', 'board', None)
+        self.assertEqual(1, len(stop_results))
+        self.assertEqual('S', b._state_name())
+
+    def test_wait_expires_check_again(self):
+        b = self.make_board('board')
+        b.start()
+        self.source._completeCall('getJobForBoard', 'board', None)
+        self.clock.advance(10000) # hack: the delay should be config data
+        self.assertEqual('C', b._state_name())