=== modified file '.bzrignore'
@@ -3,3 +3,6 @@
./build
./dist
/tags
+.testrepository
+*.egg
+lava_tool_coverage
=== added file '.coveragerc'
@@ -0,0 +1,14 @@
+[run]
+branch = True
+source = .
+omit =
+ setup*
+ */tests/*
+
+[report]
+precision = 2
+show_missing = True
+
+[html]
+title = Code Coverage of lava-tool
+directory = lava_tool_coverage
=== added file '.testr.conf'
@@ -0,0 +1,3 @@
+[DEFAULT]
+test_command=python -m subunit.run $IDLIST
+test_id_list_default=lava_tool.tests.test_suite
=== added file 'HACKING'
@@ -0,0 +1,17 @@
+Tests Code Coverage
+===================
+
+To have a nicely HTML viewable report on tests code coverage, do as follows:
+
+* Install `python-coverage` (`pip install coverage` in case you use pip)
+* Run the following command:
+
+ python-coverage run -m unittest lava_tool.tests.test_suite 2>/dev/null && python-coverage html
+
+* The report will be saved in a directory called `lava_tool_coverage`: open
+the `index.html` file in there to see the report.
+
+Notes:
+
+ * To re-run the coverage report, you have to delete the `lava_tool_coverage`
+directory first, otherwise `python-coverage` will fail.
=== modified file 'ci-build'
@@ -1,11 +1,15 @@
#!/bin/sh
+VENV_DIR="/tmp/ci-build-venv"
+# Directory where coverage HTML report will be written.
+COVERAGE_REPORT_DIR="lava_tool_coverage"
+
set -e
if test -z "$VIRTUAL_ENV"; then
set -x
- virtualenv ci-build-venv
- . ci-build-venv/bin/activate
+ virtualenv $VENV_DIR
+ . $VENV_DIR/bin/activate
python setup.py develop
fi
@@ -21,6 +25,14 @@
pip install mocker
fi
+if ! pip show mock | grep -q mock; then
+ pip install mock
+fi
+# Requirement to run code coverage tests.
+if ! pip show coverage | grep -q coverage; then
+ pip install coverage
+fi
+
export LAVACONFIG=/dev/null
if test -z "$DISPLAY"; then
@@ -39,4 +51,10 @@
python -m unittest lava_tool.tests.test_suite < /dev/null
fi
+if test -d $COVERAGE_REPORT_DIR; then
+ rm -rf $COVERAGE_REPORT_DIR
+fi
+# Runs python-coverage.
+python-coverage run -m unittest lava_tool.tests.test_suite 2>/dev/null && python-coverage html
+
./integration-tests
=== modified file 'entry_points.ini'
@@ -8,6 +8,14 @@
scheduler = lava_scheduler_tool.commands:scheduler
dashboard = lava_dashboard_tool.commands:dashboard
job = lava.job.commands:job
+device = lava.device.commands:device
+testdef = lava.testdef.commands:testdef
+init = lava.commands:init
+submit = lava.commands:submit
+run = lava.commands:run
+status = lava.job.commands:status
+update = lava.commands:update
+script = lava.script.commands:script
[lava_tool.commands]
help = lava.tool.commands.help:help
@@ -71,4 +79,19 @@
[lava.job.commands]
new = lava.job.commands:new
submit = lava.job.commands:submit
+status = lava.job.commands:status
run = lava.job.commands:run
+
+[lava.device.commands]
+add = lava.device.commands:add
+remove = lava.device.commands:remove
+config = lava.device.commands:config
+
+[lava.testdef.commands]
+new = lava.testdef.commands:new
+run = lava.testdef.commands:run
+submit = lava.testdef.commands:submit
+
+[lava.script.commands]
+run = lava.script.commands:run
+submit = lava.script.commands:submit
=== added file 'lava/commands.py'
@@ -0,0 +1,227 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Lava init commands.
+
+When invoking:
+
+ `lava init [DIR]`
+
+the command will create a default directory and files structure as follows:
+
+DIR/
+ |
+ +- JOB_FILE.json
+ +- tests/
+ |
+ + mytest.sh
+ + lavatest.yaml
+
+If DIR is not passed, it will use the current working directory.
+JOB_FILE is a file name that will be asked to the user, along with
+other necessary information to define the tests.
+
+If the user manually updates either the lavatest.yaml or mytest.sh file, it is
+necessary to run the following command in order to update the job definition:
+
+ `lava update [JOB|DIR]`
+"""
+
+import copy
+import json
+import os
+import sys
+
+from lava.helper.command import BaseCommand
+from lava.helper.template import (
+ expand_template,
+ set_value
+)
+from lava.job import (
+ JOB_FILE_EXTENSIONS,
+)
+from lava.job.templates import (
+ LAVA_TEST_SHELL_TAR_REPO_KEY,
+)
+from lava.parameter import (
+ Parameter,
+)
+from lava.testdef import (
+ DEFAULT_TESTDEF_FILENAME,
+)
+from lava.tool.errors import CommandError
+from lava_tool.utils import (
+ base64_encode,
+ create_dir,
+ create_tar,
+ edit_file,
+ retrieve_file,
+ write_file,
+)
+
+# Default directory structure name.
+TESTS_DIR = "tests"
+
+# Internal parameter ids.
+JOBFILE_ID = "jobfile"
+
+JOBFILE_PARAMETER = Parameter(JOBFILE_ID)
+JOBFILE_PARAMETER.store = False
+
+INIT_TEMPLATE = {
+ JOBFILE_ID: JOBFILE_PARAMETER,
+}
+
+
+class init(BaseCommand):
+ """Set-ups the base directory structure."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(init, cls).register_arguments(parser)
+ parser.add_argument("DIR",
+ help=("The name of the directory to initialize. "
+ "Defaults to current working directory."),
+ nargs="?",
+ default=os.getcwd())
+
+ def invoke(self):
+ full_path = os.path.abspath(self.args.DIR)
+
+ if os.path.isfile(full_path):
+ raise CommandError("'{0}' already exists, and is a "
+ "file.".format(self.args.DIR))
+
+ create_dir(full_path)
+ data = self._update_data()
+
+ # Create the directory that will contain the test definition and
+ # shell script.
+ test_path = create_dir(full_path, TESTS_DIR)
+ shell_script = self.create_shell_script(test_path)
+ # Let the user modify the file.
+ edit_file(shell_script)
+
+ testdef_file = self.create_test_definition(
+ os.path.join(test_path, DEFAULT_TESTDEF_FILENAME))
+
+ job = data[JOBFILE_ID]
+ self.create_tar_repo_job(
+ os.path.join(full_path, job), testdef_file, test_path)
+
+ def _update_data(self):
+ """Updates the template and ask values to the user.
+
+ The template in this case is a layout of the directory structure as it
+ would be written to disk.
+
+ :return A dictionary containing all the necessary file names to create.
+ """
+ data = copy.deepcopy(INIT_TEMPLATE)
+ expand_template(data, self.config)
+
+ return data
+
+
+class run(BaseCommand):
+ """Runs a job on the local dispatcher."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(run, cls).register_arguments(parser)
+ parser.add_argument("JOB",
+ help=("The job file to run, or a directory "
+ "containing a job file. If nothing is "
+ "passed, it uses the current working "
+ "directory."),
+ nargs="?",
+ default=os.getcwd())
+
+ def invoke(self):
+ full_path = os.path.abspath(self.args.JOB)
+ job_file = retrieve_file(full_path, JOB_FILE_EXTENSIONS)
+
+ super(run, self).run(job_file)
+
+
+class submit(BaseCommand):
+ """Submits a job to LAVA."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(submit, cls).register_arguments(parser)
+ parser.add_argument("JOB",
+ help=("The job file to send, or a directory "
+ "containing a job file. If nothing is "
+ "passed, it uses the current working "
+ "directory."),
+ nargs="?",
+ default=os.getcwd())
+
+ def invoke(self):
+ full_path = os.path.abspath(self.args.JOB)
+ job_file = retrieve_file(full_path, JOB_FILE_EXTENSIONS)
+
+ super(submit, self).submit(job_file)
+
+
+class update(BaseCommand):
+ """Updates a job file with the correct data."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(update, cls).register_arguments(parser)
+ parser.add_argument("JOB",
+ help=("Automatically updates a job file "
+ "definition. If nothing is passed, it uses"
+ "the current working directory."),
+ nargs="?",
+ default=os.getcwd())
+
+ def invoke(self):
+ full_path = os.path.abspath(self.args.JOB)
+ job_file = self.retrieve_file(full_path, JOB_FILE_EXTENSIONS)
+ job_dir = os.path.dirname(job_file)
+ tests_dir = os.path.join(job_dir, TESTS_DIR)
+
+ if os.path.isdir(tests_dir):
+ tar_repo = None
+ try:
+ tar_repo = create_tar(tests_dir)
+ encoded_tests = base64_encode(tar_repo)
+
+ json_data = None
+ with open(job_file, "r") as json_file:
+ try:
+ json_data = json.load(json_file)
+ set_value(json_data, LAVA_TEST_SHELL_TAR_REPO_KEY,
+ encoded_tests)
+ except Exception:
+ raise CommandError("Cannot read job file "
+ "'{0}'.".format(job_file))
+
+ content = json.dumps(json_data, indent=4)
+ write_file(job_file, content)
+
+ print >> sys.stdout, "Job definition updated."
+ finally:
+ if tar_repo and os.path.isfile(tar_repo):
+ os.unlink(tar_repo)
+ else:
+ raise CommandError("Cannot find tests directory.")
=== modified file 'lava/config.py'
@@ -16,80 +16,235 @@
# You should have received a copy of the GNU Lesser General Public License
# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+"""
+Config class.
+"""
+
import atexit
-from ConfigParser import ConfigParser, NoOptionError, NoSectionError
import os
import readline
-__all__ = ['InteractiveConfig', 'NonInteractiveConfig']
-
-history = os.path.join(os.path.expanduser("~"), ".lava_history")
+from ConfigParser import (
+ ConfigParser,
+ NoOptionError,
+ NoSectionError,
+)
+
+from lava.parameter import Parameter
+from lava.tool.errors import CommandError
+
+__all__ = ['Config', 'InteractiveConfig']
+
+# Store for function calls to be made at exit time.
+AT_EXIT_CALLS = set()
+# Config default section.
+DEFAULT_SECTION = "DEFAULT"
+
+HISTORY = os.path.join(os.path.expanduser("~"), ".lava_history")
try:
- readline.read_history_file(history)
+ readline.read_history_file(HISTORY)
except IOError:
pass
-atexit.register(readline.write_history_file, history)
-
-config_file = os.environ.get('LAVACONFIG') or os.path.join(os.path.expanduser('~'), '.lavaconfig')
-config_backend = ConfigParser()
-config_backend.read([config_file])
-def save_config():
- with open(config_file, 'w') as f:
- config_backend.write(f)
-atexit.register(save_config)
-
-class InteractiveConfig(object):
-
- def __init__(self, force_interactive=False):
- self._force_interactive = force_interactive
+atexit.register(readline.write_history_file, HISTORY)
+
+
+def _run_at_exit():
+ """Runs all the function at exit."""
+ for call in list(AT_EXIT_CALLS):
+ call()
+atexit.register(_run_at_exit)
+
+
+class Config(object):
+ """A generic config object."""
+
+ def __init__(self):
+ # The cache where to store parameters.
self._cache = {}
-
- def get(self, parameter):
- key = parameter.id
- value = None
+ self._config_file = None
+ self._config_backend = None
+ AT_EXIT_CALLS.add(self.save)
+
+ @property
+ def config_file(self):
+ if self._config_file is None:
+ self._config_file = (os.environ.get('LAVACONFIG') or
+ os.path.join(os.path.expanduser('~'),
+ '.lavaconfig'))
+ return self._config_file
+
+ @config_file.setter
+ def config_file(self, value):
+ self._config_file = value
+
+ @property
+ def config_backend(self):
+ if self._config_backend is None:
+ self._config_backend = ConfigParser()
+ self._config_backend.read([self.config_file])
+ return self._config_backend
+
+ def _calculate_config_section(self, parameter):
+ """Calculates the config section of the specified parameter.
+
+ :param parameter: The parameter to calculate the section of.
+ :type Parameter
+ :return The config section.
+ """
+ section = DEFAULT_SECTION
if parameter.depends:
- pass
- config_section = parameter.depends.id + '=' + self.get(parameter.depends)
+ section = "{0}={1}".format(parameter.depends.id,
+ self.get(parameter.depends))
+ return section
+
+ def get(self, parameter, section=None):
+ """Retrieves a Parameter value.
+
+ The value is taken either from the Parameter itself, or from the cache,
+ or from the config file.
+
+ :param parameter: The parameter to search.
+ :type Parameter
+ :return The parameter value, or None if it is not found.
+ """
+ if not section:
+ section = self._calculate_config_section(parameter)
+ # Try to get the parameter value first if it has one.
+ if parameter.value is not None:
+ value = parameter.value
else:
- config_section = "DEFAULT"
-
- if config_section in self._cache:
- if key in self._cache[config_section]:
- return self._cache[config_section][key]
-
- prompt = '%s: ' % key
-
+ value = self._get_from_cache(parameter, section)
+
+ if value is None:
+ value = self._get_from_backend(parameter, section)
+ return value
+
+ def get_from_backend(self, parameter, section=None):
+ """Gets a configuration parameter directly from the config file."""
+ if not section:
+ section = self._calculate_config_section(parameter)
+ return self._get_from_backend(parameter, section)
+
+ def _get_from_backend(self, parameter, section):
+ """Gets the parameter value from the config backend.
+
+ :param parameter: The Parameter to look up.
+ :param section: The section in the Config.
+ """
+ value = None
try:
- value = config_backend.get(config_section, key)
+ value = self.config_backend.get(section, parameter.id)
except (NoOptionError, NoSectionError):
+ # Ignore, we return None.
pass
- if value:
- if self._force_interactive:
- prompt = "%s[%s]: " % (key, value)
- else:
- return value
- try:
- user_input = raw_input(prompt).strip()
- except EOFError:
- user_input = None
- if user_input:
- value = user_input
- if not config_backend.has_section(config_section) and config_section != 'DEFAULT':
- config_backend.add_section(config_section)
- config_backend.set(config_section, key, value)
-
- if value:
- if config_section not in self._cache:
- self._cache[config_section] = {}
- self._cache[config_section][key] = value
- return value
- else:
- raise KeyError(key)
-
-class NonInteractiveConfig(object):
-
- def __init__(self, data):
- self.data = data
-
- def get(self, parameter):
- return self.data[parameter.id]
+ return value
+
+ def _get_from_cache(self, parameter, section):
+ """Looks for the specified parameter in the internal cache.
+
+ :param parameter: The parameter to search.
+ :type Parameter
+ :return The parameter value, of None if it is not found.
+ """
+ value = None
+ if section in self._cache.keys():
+ if parameter.id in self._cache[section].keys():
+ value = self._cache[section][parameter.id]
+ return value
+
+ def _put_in_cache(self, key, value, section=DEFAULT_SECTION):
+ """Insert the passed parameter in the internal cache.
+
+ :param parameter: The parameter to insert.
+ :type Parameter
+ :param section: The name of the section in the config file.
+ :type str
+ """
+ if section not in self._cache.keys():
+ self._cache[section] = {}
+ self._cache[section][key] = value
+
+ def put(self, key, value, section=DEFAULT_SECTION):
+ """Adds a parameter to the config file.
+
+ :param key: The key to add.
+ :param value: The value to add.
+ :param section: The name of the section as in the config file.
+ """
+ if (not self.config_backend.has_section(section) and
+ section != DEFAULT_SECTION):
+ self.config_backend.add_section(section)
+
+ # This is done to serialize a list when ConfigParser is written to
+ # file. Since there is no real support for list in ConfigParser, we
+ # serialized it in a common way that can get easily deserialized.
+ if isinstance(value, list):
+ value = Parameter.serialize(value)
+
+ self.config_backend.set(section, key, value)
+ # Store in the cache too.
+ self._put_in_cache(key, value, section)
+
+ def put_parameter(self, parameter, value=None, section=None):
+ """Adds a Parameter to the config file and cache.
+
+ :param Parameter: The parameter to add.
+ :type Parameter
+ :param value: The value of the parameter. Defaults to None.
+ :param section: The section where this parameter should be stored.
+ Defaults to None.
+ """
+ if not section:
+ section = self._calculate_config_section(parameter)
+
+ if value is None and parameter.value is not None:
+ value = parameter.value
+ elif value is None:
+ raise CommandError("No value assigned to '{0}'.".format(
+ parameter.id))
+ self.put(parameter.id, value, section)
+
+ def save(self):
+ """Saves the config to file."""
+ # Since we lazy load the config_backend property, this check is needed
+ # when a user enters a wrong command or it will overwrite the 'config'
+ # file with empty contents.
+ if self._config_backend:
+ with open(self.config_file, "w") as write_file:
+ self.config_backend.write(write_file)
+
+
+class InteractiveConfig(Config):
+ """An interactive config.
+
+ If a value is not found in the config file, it will ask it and then stores
+ it.
+ """
+ def __init__(self, force_interactive=True):
+ super(InteractiveConfig, self).__init__()
+ self._force_interactive = force_interactive
+
+ @property
+ def force_interactive(self):
+ return self._force_interactive
+
+ @force_interactive.setter
+ def force_interactive(self, value):
+ self._force_interactive = value
+
+ def get(self, parameter, section=None):
+ """Overrides the parent one.
+
+ The only difference with the parent one, is that it will ask to type
+ a parameter value in case it is not found.
+ """
+ if not section:
+ section = self._calculate_config_section(parameter)
+ value = super(InteractiveConfig, self).get(parameter, section)
+
+ if value is None or self.force_interactive:
+ value = parameter.prompt(old_value=value)
+
+ if value is not None and parameter.store:
+ self.put(parameter.id, value, section)
+ return value
=== added directory 'lava/device'
=== added file 'lava/device/__init__.py'
@@ -0,0 +1,97 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Device class."""
+
+import re
+
+from copy import deepcopy
+
+from lava.device.templates import (
+ DEFAULT_TEMPLATE,
+ HOSTNAME_PARAMETER,
+ KNOWN_TEMPLATES,
+)
+from lava.helper.template import expand_template
+
+
+def __re_compile(name):
+ """Creates a generic regex for the specified device name.
+
+ :param name: The name of the device.
+ :return A Pattern object.
+ """
+ return re.compile('^.*{0}.*'.format(name), re.I)
+
+
+# Dictionary of know devices.
+# Keys are the general device name taken from lava.device.templates, values
+# are tuples of: a regex matcher to match the device, and the device associated
+# template.
+KNOWN_DEVICES = dict([(device, (__re_compile(device), template))
+ for device, template in KNOWN_TEMPLATES.iteritems()])
+
+
+class Device(object):
+
+ """A generic device."""
+
+ def __init__(self, data, hostname=None):
+ self.data = deepcopy(data)
+ self.hostname = hostname
+
+ def write(self, conf_file):
+ """Writes the object to file.
+
+ :param conf_file: The full path of the file where to write."""
+ with open(conf_file, 'w') as write_file:
+ write_file.write(str(self))
+
+ def update(self, config):
+ """Updates the Device object values based on the provided config.
+
+ :param config: A Config instance.
+ """
+ # We should always have a hostname, since it defaults to the name
+ # given on the command line for the config file.
+ if self.hostname is not None:
+ # We do not ask the user again this parameter.
+ self.data[HOSTNAME_PARAMETER.id].value = self.hostname
+ self.data[HOSTNAME_PARAMETER.id].asked = True
+
+ expand_template(self.data, config)
+
+ def __str__(self):
+ string_list = []
+ for key, value in self.data.iteritems():
+ string_list.append("{0} = {1}\n".format(str(key), str(value)))
+ return "".join(string_list)
+
+
+def get_known_device(name):
+ """Tries to match a device name with a known device type.
+
+ :param name: The name of the device we want matched to a real device.
+ :return A Device instance.
+ """
+ instance = Device(DEFAULT_TEMPLATE, hostname=name)
+ for _, (matcher, dev_template) in KNOWN_DEVICES.iteritems():
+ if matcher.match(name):
+ instance = Device(dev_template, hostname=name)
+ break
+ return instance
=== added file 'lava/device/commands.py'
@@ -0,0 +1,122 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Device specific commands class.
+"""
+
+import os
+import sys
+
+from lava.device import get_known_device
+from lava.helper.command import (
+ BaseCommand,
+)
+from lava.helper.dispatcher import (
+ get_device_file,
+ get_devices_path,
+)
+from lava.tool.command import CommandGroup
+from lava.tool.errors import CommandError
+from lava_tool.utils import (
+ can_edit_file,
+ edit_file,
+)
+
+DEVICE_FILE_SUFFIX = "conf"
+
+
+class device(CommandGroup):
+ """LAVA devices handling."""
+
+ namespace = "lava.device.commands"
+
+
+class add(BaseCommand):
+ """Adds a new device."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(add, cls).register_arguments(parser)
+ parser.add_argument("DEVICE", help="The name of the device to add.")
+
+ def invoke(self):
+ real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
+
+ if get_device_file(real_file_name) is not None:
+ print >> sys.stdout, ("A device configuration file named '{0}' "
+ "already exists.".format(real_file_name))
+ print >> sys.stdout, ("Use 'lava device config {0}' to edit "
+ "it.".format(self.args.DEVICE))
+ sys.exit(-1)
+
+ devices_path = get_devices_path()
+ device_conf_file = os.path.abspath(os.path.join(devices_path,
+ real_file_name))
+
+ device = get_known_device(self.args.DEVICE)
+ device.update(self.config)
+ device.write(device_conf_file)
+
+ print >> sys.stdout, ("Created device file '{0}' in: {1}".format(
+ real_file_name, devices_path))
+ edit_file(device_conf_file)
+
+
+class remove(BaseCommand):
+ """Removes the specified device."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(remove, cls).register_arguments(parser)
+ parser.add_argument("DEVICE",
+ help="The name of the device to remove.")
+
+ def invoke(self):
+ real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
+ device_conf = get_device_file(real_file_name)
+
+ if device_conf:
+ try:
+ os.remove(device_conf)
+ print >> sys.stdout, ("Device configuration file '{0}' "
+ "removed.".format(real_file_name))
+ except OSError:
+ raise CommandError("Cannot remove file '{0}' at: {1}.".format(
+ real_file_name, os.path.dirname(device_conf)))
+ else:
+ print >> sys.stdout, ("No device configuration file '{0}' "
+ "found.".format(real_file_name))
+
+
+class config(BaseCommand):
+ """Opens the specified device config file."""
+ @classmethod
+ def register_arguments(cls, parser):
+ super(config, cls).register_arguments(parser)
+ parser.add_argument("DEVICE",
+ help="The name of the device to edit.")
+
+ def invoke(self):
+ real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
+ device_conf = get_device_file(real_file_name)
+
+ if device_conf and can_edit_file(device_conf):
+ edit_file(device_conf)
+ else:
+ raise CommandError("Cannot edit file '{0}'".format(real_file_name))
=== added file 'lava/device/templates.py'
@@ -0,0 +1,82 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+This is just a place where to store a template like dictionary that
+will be used to serialize a Device object.
+"""
+
+from copy import copy
+
+from lava.parameter import Parameter
+
+# The hostname parameter is always in the DEFAULT config section.
+HOSTNAME_PARAMETER = Parameter("hostname")
+DEVICE_TYPE_PARAMETER = Parameter("device_type", depends=HOSTNAME_PARAMETER)
+CONNECTION_COMMAND_PARMAETER = Parameter("connection_command",
+ depends=DEVICE_TYPE_PARAMETER)
+
+DEFAULT_TEMPLATE = {
+ 'hostname': HOSTNAME_PARAMETER,
+ 'device_type': DEVICE_TYPE_PARAMETER,
+ 'connection_command': CONNECTION_COMMAND_PARMAETER,
+}
+
+# Specialized copies of the parameters.
+# We need this or we might end up asking the user twice the same parameter due
+# to different object references when one Parameter depends on a "specialized"
+# one, different from the defaults.
+PANDA_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
+PANDA_DEVICE_TYPE.value = "panda"
+PANDA_DEVICE_TYPE.asked = True
+
+PANDA_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
+PANDA_CONNECTION_COMMAND.depends = PANDA_DEVICE_TYPE
+
+VEXPRESS_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
+VEXPRESS_DEVICE_TYPE.value = "vexpress"
+VEXPRESS_DEVICE_TYPE.asked = True
+
+VEXPRESS_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
+VEXPRESS_CONNECTION_COMMAND.depends = VEXPRESS_DEVICE_TYPE
+
+QEMU_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
+QEMU_DEVICE_TYPE.value = "qemu"
+QEMU_DEVICE_TYPE.asked = True
+
+QEMU_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
+QEMU_CONNECTION_COMMAND.depends = QEMU_DEVICE_TYPE
+
+# Dictionary with templates of known devices.
+KNOWN_TEMPLATES = {
+ 'panda': {
+ 'hostname': HOSTNAME_PARAMETER,
+ 'device_type': PANDA_DEVICE_TYPE,
+ 'connection_command': PANDA_CONNECTION_COMMAND,
+ },
+ 'vexpress': {
+ 'hostname': HOSTNAME_PARAMETER,
+ 'device_type': VEXPRESS_DEVICE_TYPE,
+ 'connection_command': VEXPRESS_CONNECTION_COMMAND,
+ },
+ 'qemu': {
+ 'hostname': HOSTNAME_PARAMETER,
+ 'device_type': QEMU_DEVICE_TYPE,
+ 'connection_command': QEMU_CONNECTION_COMMAND,
+ }
+}
=== added directory 'lava/device/tests'
=== added file 'lava/device/tests/__init__.py'
=== added file 'lava/device/tests/test_commands.py'
@@ -0,0 +1,182 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+lava.device.commands unit tests.
+"""
+
+import os
+
+from mock import (
+ MagicMock,
+ call,
+ patch,
+)
+
+from lava.device.commands import (
+ add,
+ config,
+ remove,
+)
+from lava.helper.tests.helper_test import HelperTest
+from lava.tool.errors import CommandError
+
+
+class AddCommandTest(HelperTest):
+
+ def test_register_argument(self):
+ # Make sure that the parser add_argument is called and we have the
+ # correct argument.
+ add_command = add(self.parser, self.args)
+ add_command.register_arguments(self.parser)
+ name, args, kwargs = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ name, args, kwargs = self.parser.method_calls[1]
+ self.assertIn("DEVICE", args)
+
+ @patch("lava.device.commands.edit_file", create=True)
+ @patch("lava.device.Device.__str__")
+ @patch("lava.device.Device.update")
+ @patch("lava.device.commands.get_device_file")
+ @patch("lava.device.commands.get_devices_path")
+ def test_add_invoke_0(self, mocked_get_devices_path,
+ mocked_get_device_file, mocked_update, mocked_str,
+ mocked_edit_file):
+ # Tests invocation of the add command. Verifies that the conf file is
+ # written to disk.
+ mocked_get_devices_path.return_value = self.temp_dir
+ mocked_get_device_file.return_value = None
+ mocked_str.return_value = ""
+
+ add_command = add(self.parser, self.args)
+ add_command.invoke()
+
+ expected_path = os.path.join(self.temp_dir,
+ ".".join([self.device, "conf"]))
+ self.assertTrue(os.path.isfile(expected_path))
+
+ @patch("lava.device.commands.edit_file", create=True)
+ @patch("lava.device.commands.get_known_device")
+ @patch("lava.device.commands.get_devices_path")
+ @patch("lava.device.commands.sys.exit")
+ @patch("lava.device.commands.get_device_file")
+ def test_add_invoke_1(self, mocked_get_device_file, mocked_sys_exit,
+ mocked_get_devices_path, mocked_get_known_device,
+ mocked_edit_file):
+ mocked_get_devices_path.return_value = self.temp_dir
+ mocked_get_device_file.return_value = self.temp_file.name
+
+ add_command = add(self.parser, self.args)
+ add_command.invoke()
+
+ self.assertTrue(mocked_sys_exit.called)
+
+
+class RemoveCommandTests(HelperTest):
+
+ def test_register_argument(self):
+ # Make sure that the parser add_argument is called and we have the
+ # correct argument.
+ command = remove(self.parser, self.args)
+ command.register_arguments(self.parser)
+ name, args, kwargs = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ name, args, kwargs = self.parser.method_calls[1]
+ self.assertIn("DEVICE", args)
+
+ @patch("lava.device.commands.edit_file", create=True)
+ @patch("lava.device.Device.__str__", return_value="")
+ @patch("lava.device.Device.update")
+ @patch("lava.device.commands.get_device_file")
+ @patch("lava.device.commands.get_devices_path")
+ def test_remove_invoke(self, get_devices_path_mock, get_device_file_mock,
+ mocked_update, mocked_str, mocked_edit_file):
+ # Tests invocation of the remove command. Verifies that the conf file
+ # has been correctly removed.
+ # First we add a new conf file, then we remove it.
+ get_device_file_mock.return_value = None
+ get_devices_path_mock.return_value = self.temp_dir
+
+ add_command = add(self.parser, self.args)
+ add_command.invoke()
+
+ expected_path = os.path.join(self.temp_dir,
+ ".".join([self.device, "conf"]))
+
+ # Set new values for the mocked function.
+ get_device_file_mock.return_value = expected_path
+
+ remove_command = remove(self.parser, self.args)
+ remove_command.invoke()
+
+ self.assertFalse(os.path.isfile(expected_path))
+
+ @patch("lava.device.commands.get_device_file",
+ new=MagicMock(return_value="/root"))
+ def test_remove_invoke_raises(self):
+ # Tests invocation of the remove command, with a non existent device
+ # configuration file.
+ remove_command = remove(self.parser, self.args)
+ self.assertRaises(CommandError, remove_command.invoke)
+
+
+class ConfigCommanTests(HelperTest):
+
+ def test_register_argument(self):
+ # Make sure that the parser add_argument is called and we have the
+ # correct argument.
+ command = config(self.parser, self.args)
+ command.register_arguments(self.parser)
+ name, args, kwargs = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ name, args, kwargs = self.parser.method_calls[1]
+ self.assertIn("DEVICE", args)
+
+ @patch("lava.device.commands.can_edit_file", create=True)
+ @patch("lava.device.commands.edit_file", create=True)
+ @patch("lava.device.commands.get_device_file")
+ def test_config_invoke_0(self, mocked_get_device_file, mocked_edit_file,
+ mocked_can_edit_file):
+ command = config(self.parser, self.args)
+
+ mocked_can_edit_file.return_value = True
+ mocked_get_device_file.return_value = self.temp_file.name
+ command.invoke()
+
+ self.assertTrue(mocked_edit_file.called)
+ self.assertEqual([call(self.temp_file.name)],
+ mocked_edit_file.call_args_list)
+
+ @patch("lava.device.commands.get_device_file",
+ new=MagicMock(return_value=None))
+ def test_config_invoke_raises_0(self):
+ # Tests invocation of the config command, with a non existent device
+ # configuration file.
+ config_command = config(self.parser, self.args)
+ self.assertRaises(CommandError, config_command.invoke)
+
+ @patch("lava.device.commands.get_device_file",
+ new=MagicMock(return_value="/etc/password"))
+ def test_config_invoke_raises_1(self):
+ # Tests invocation of the config command, with a non writable file.
+ # Hopefully tests are not run as root.
+ config_command = config(self.parser, self.args)
+ self.assertRaises(CommandError, config_command.invoke)
=== added file 'lava/device/tests/test_device.py'
@@ -0,0 +1,119 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Device class unit tests.
+"""
+
+from mock import patch
+
+from lava.config import Config
+from lava.device import (
+ Device,
+ get_known_device,
+)
+from lava.device.templates import (
+ HOSTNAME_PARAMETER,
+ PANDA_DEVICE_TYPE,
+ PANDA_CONNECTION_COMMAND,
+)
+from lava.helper.tests.helper_test import HelperTest
+from lava.parameter import Parameter
+
+
+class DeviceTest(HelperTest):
+
+ def test_get_known_device_panda_0(self):
+ # User creates a new device with a guessable name for a device.
+ instance = get_known_device('panda_new_01')
+ self.assertIsInstance(instance, Device)
+ self.assertEqual(instance.data['device_type'].value, 'panda')
+
+ def test_get_known_device_panda_1(self):
+ # User creates a new device with a guessable name for a device.
+ # Name passed has capital letters.
+ instance = get_known_device('new_PanDa_02')
+ self.assertIsInstance(instance, Device)
+ self.assertEqual(instance.data['device_type'].value, 'panda')
+
+ def test_get_known_device_vexpress_0(self):
+ # User creates a new device with a guessable name for a device.
+ # Name passed has capital letters.
+ instance = get_known_device('a_VexPress_Device')
+ self.assertIsInstance(instance, Device)
+ self.assertEqual(instance.data['device_type'].value, 'vexpress')
+
+ def test_get_known_device_vexpress_1(self):
+ # User creates a new device with a guessable name for a device.
+ instance = get_known_device('another-vexpress')
+ self.assertIsInstance(instance, Device)
+ self.assertIsInstance(instance.data['device_type'], Parameter)
+ self.assertEqual(instance.data['device_type'].value, 'vexpress')
+
+ @patch("lava.config.Config.save")
+ def test_device_update_1(self, patched_save):
+ # Tests that when calling update() on a Device, the template gets
+ # updated with the correct values from a Config instance.
+ hostname = "panda_device"
+
+ config = Config()
+ config._config_file = self.temp_file.name
+ config.put_parameter(HOSTNAME_PARAMETER, hostname)
+ config.put_parameter(PANDA_DEVICE_TYPE, "panda")
+ config.put_parameter(PANDA_CONNECTION_COMMAND, "test")
+
+ expected = {
+ "hostname": hostname,
+ "device_type": "panda",
+ "connection_command": "test"
+ }
+
+ instance = get_known_device(hostname)
+ instance.update(config)
+
+ self.assertEqual(expected, instance.data)
+
+ @patch("lava.config.Config.save")
+ def test_device_write(self, mocked_save):
+ # User tries to create a new panda device. The conf file is written
+ # and contains the expected results.
+ hostname = "panda_device"
+
+ config = Config()
+ config._config_file = self.temp_file.name
+ config.put_parameter(HOSTNAME_PARAMETER, hostname)
+ config.put_parameter(PANDA_DEVICE_TYPE, "panda")
+ config.put_parameter(PANDA_CONNECTION_COMMAND, "test")
+
+ expected = {
+ "hostname": hostname,
+ "device_type": "panda",
+ "connection_command": "test"
+ }
+
+ instance = get_known_device(hostname)
+ instance.update(config)
+ instance.write(self.temp_file.name)
+
+ expected = ("hostname = panda_device\nconnection_command = test\n"
+ "device_type = panda\n")
+
+ obtained = ""
+ with open(self.temp_file.name) as f:
+ obtained = f.read()
+ self.assertEqual(expected, obtained)
=== added directory 'lava/helper'
=== added file 'lava/helper/__init__.py'
=== added file 'lava/helper/command.py'
@@ -0,0 +1,242 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Base command class common to lava commands series."""
+
+import os
+import sys
+import xmlrpclib
+
+from lava.config import InteractiveConfig
+from lava.helper.dispatcher import get_devices
+from lava.job import Job
+from lava.job.templates import (
+ LAVA_TEST_SHELL_TAR_REPO,
+ LAVA_TEST_SHELL_TAR_REPO_KEY,
+ LAVA_TEST_SHELL_TESDEF_KEY,
+)
+from lava.parameter import (
+ Parameter,
+ SingleChoiceParameter,
+)
+from lava.script import (
+ ShellScript,
+ DEFAULT_TESTDEF_SCRIPT,
+)
+from lava.testdef import TestDefinition
+from lava.testdef.templates import (
+ TESTDEF_STEPS_KEY,
+ TESTDEF_TEMPLATE,
+)
+from lava.tool.command import Command
+from lava.tool.errors import CommandError
+from lava_tool.authtoken import (
+ AuthenticatingServerProxy,
+ KeyringAuthBackend
+)
+from lava_tool.utils import (
+ base64_encode,
+ create_tar,
+ execute,
+ has_command,
+ to_list,
+ verify_and_create_url,
+)
+
+CONFIG = InteractiveConfig()
+
+
+class BaseCommand(Command):
+
+ """Base command class for all lava commands."""
+
+ def __init__(self, parser, args):
+ super(BaseCommand, self).__init__(parser, args)
+ self.config = CONFIG
+ self.config.force_interactive = self.args.non_interactive
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(BaseCommand, cls).register_arguments(parser)
+ parser.add_argument("--non-interactive", "-n",
+ action='store_false',
+ help=("Do not ask for input parameters."))
+
+ def authenticated_server(self):
+ """Returns a connection to a LAVA server.
+
+ It will ask the user the necessary parameters to establish the
+ connection.
+ """
+ print >> sys.stdout, "\nServer connection parameters:"
+
+ server_name_parameter = Parameter("server")
+ rpc_endpoint_parameter = Parameter("rpc_endpoint",
+ depends=server_name_parameter)
+
+ server_url = self.config.get(server_name_parameter)
+ endpoint = self.config.get(rpc_endpoint_parameter)
+
+ rpc_url = verify_and_create_url(server_url, endpoint)
+ server = AuthenticatingServerProxy(rpc_url,
+ auth_backend=KeyringAuthBackend())
+ return server
+
+ def submit(self, job_file):
+ """Submits a job file to a LAVA server.
+
+ :param job_file: The job file to submit.
+ :return The job ID on success.
+ """
+ if os.path.isfile(job_file):
+ try:
+ jobdata = open(job_file, 'rb').read()
+ server = self.authenticated_server()
+
+ job_id = server.scheduler.submit_job(jobdata)
+ print >> sys.stdout, ("Job submitted with job "
+ "ID {0}.".format(job_id))
+
+ return job_id
+ except xmlrpclib.Fault, exc:
+ raise CommandError(str(exc))
+ else:
+ raise CommandError("Job file '{0}' does not exists, or is not "
+ "a file.".format(job_file))
+
+ def run(self, job_file):
+ """Runs a job file on the local LAVA dispatcher.
+
+ :param job_file: The job file to run.
+ """
+ if os.path.isfile(job_file):
+ if has_command("lava-dispatch"):
+ devices = get_devices()
+ if devices:
+ if len(devices) > 1:
+ device_names = [device.hostname for device in devices]
+ device_param = SingleChoiceParameter("device",
+ device_names)
+ device = device_param.prompt("Device to use: ")
+ else:
+ device = devices[0].hostname
+ execute(["lava-dispatch", "--target", device, job_file])
+ else:
+ raise CommandError("Cannot find lava-dispatcher installation.")
+ else:
+ raise CommandError("Job file '{0}' does not exists, or it is not "
+ "a file.".format(job_file))
+
+ def status(self, job_id):
+ """Retrieves the status of a LAVA job.
+
+ :param job_id: The ID of the job to look up.
+ """
+ job_id = str(job_id)
+
+ try:
+ server = self.authenticated_server()
+ job_status = server.scheduler.job_status(job_id)
+
+ status = job_status["job_status"].lower()
+ bundle = job_status["bundle_sha1"]
+
+ print >> sys.stdout, "\nJob id: {0}".format(job_id)
+ print >> sys.stdout, "Status: {0}".format(status)
+ print >> sys.stdout, "Bundle: {0}".format(bundle)
+ except xmlrpclib.Fault, exc:
+ raise CommandError(str(exc))
+
+ def create_tar_repo_job(self, job_file, testdef_file, tar_content):
+ """Creates a job file based on the tar-repo template.
+
+ The tar repo is not kept on the file system.
+
+ :param job_file: The path of the job file to create.
+ :param testdef_file: The path of the test definition file.
+ :param tar_content: What should go into the tarball repository.
+ :return The path of the job file created.
+ """
+
+ print >> sys.stdout, "\nCreating job file..."
+
+ try:
+ tar_repo = create_tar(tar_content)
+
+ job_instance = Job(LAVA_TEST_SHELL_TAR_REPO, job_file)
+ job_instance.update(self.config)
+
+ job_instance.set(LAVA_TEST_SHELL_TAR_REPO_KEY,
+ base64_encode(tar_repo))
+ job_instance.set(LAVA_TEST_SHELL_TESDEF_KEY,
+ os.path.basename(testdef_file))
+
+ job_instance.write()
+
+ basename = os.path.basename(job_instance.file_name)
+ print >> sys.stdout, ("\nCreated job file "
+ "'{0}'.".format(basename))
+
+ return job_instance.file_name
+ finally:
+ if os.path.isfile(tar_repo):
+ os.unlink(tar_repo)
+
+ def create_test_definition(self, testdef_file, template=TESTDEF_TEMPLATE,
+ steps=None):
+ """Creates a test definition YAML file.
+
+ :param testdef_file: The file to create.
+ :return The path of the file created.
+ """
+
+ print >> sys.stdout, "\nCreating test definition file..."
+
+ testdef = TestDefinition(template, testdef_file)
+ if steps:
+ steps = to_list(steps)
+ testdef.set(TESTDEF_STEPS_KEY, steps)
+ testdef.update(self.config)
+ testdef.write()
+
+ basename = os.path.basename(testdef.file_name)
+ print >> sys.stdout, ("\nCreated test definition "
+ "'{0}'.".format(basename))
+
+ return testdef.file_name
+
+ def create_shell_script(self, test_path,
+ script_name=DEFAULT_TESTDEF_SCRIPT):
+ """Creates a shell script with some default content.
+
+ :param test_path: The directory where to create the script.
+ :param script_name: The name of the script.
+ :return The full path to the script file.
+ """
+ default_script = os.path.join(test_path, script_name)
+
+ if not os.path.isfile(default_script):
+ print >> sys.stdout, "Creating shell script..."
+
+ shell_script = ShellScript(default_script)
+ shell_script.write()
+
+ print >> sys.stdout, ("\nCreated shell script "
+ "'{0}'.".format(script_name))
+
+ return default_script
=== added file 'lava/helper/dispatcher.py'
@@ -0,0 +1,110 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Classes and functions to interact with the lava-dispatcher."""
+
+import random
+import string
+import os
+
+from lava.tool.errors import CommandError
+
+# Default devices path, has to be joined with the dispatcher path.
+DEFAULT_DEVICES_PATH = "devices"
+
+
+def get_dispatcher_paths():
+ """Tries to get the dispatcher paths from lava-dispatcher.
+
+ :return A list of paths.
+ """
+ try:
+ from lava_dispatcher.config import write_path
+ return write_path()
+ except ImportError:
+ raise CommandError("Cannot find lava-dispatcher installation.")
+
+
+def get_devices():
+ """Gets the devices list from the dispatcher.
+
+ :return A list of DeviceConfig.
+ """
+ try:
+ from lava_dispatcher.config import get_devices
+ return get_devices()
+ except ImportError:
+ raise CommandError("Cannot find lava-dispatcher installation.")
+
+
+def get_device_file(file_name):
+ """Retrieves the config file name specified, if it exists.
+
+ :param file_name: The config file name to search.
+ :return The path to the file, or None if it does not exist.
+ """
+ try:
+ from lava_dispatcher.config import get_config_file
+ return get_config_file(os.path.join(DEFAULT_DEVICES_PATH,
+ file_name))
+ except ImportError:
+ raise CommandError("Cannot find lava-dispatcher installation.")
+
+
+def choose_devices_path(paths):
+ """Picks the first path that is writable by the user.
+
+ :param paths: A list of paths.
+ :return The first path where it is possible to write.
+ """
+ valid_path = None
+ for path in paths:
+ path = os.path.join(path, DEFAULT_DEVICES_PATH)
+ if os.path.exists(path):
+ name = "".join(random.choice(string.ascii_letters)
+ for x in range(6))
+ test_file = os.path.join(path, name)
+ try:
+ fp = open(test_file, 'a')
+ fp.close()
+ except IOError:
+ # Cannot write here.
+ continue
+ else:
+ valid_path = path
+ if os.path.isfile(test_file):
+ os.unlink(test_file)
+ break
+ else:
+ try:
+ os.makedirs(path)
+ except OSError:
+ # Cannot write here either.
+ continue
+ else:
+ valid_path = path
+ break
+ else:
+ raise CommandError("Insufficient permissions to create new "
+ "devices.")
+ return valid_path
+
+
+def get_devices_path():
+ """Gets the path to the devices in the LAVA dispatcher."""
+ return choose_devices_path(get_dispatcher_paths())
=== added file 'lava/helper/template.py'
@@ -0,0 +1,124 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Helper functions for a template."""
+
+from lava.parameter import Parameter
+
+
+def expand_template(template, config):
+ """Updates a template based on the values from the provided config.
+
+ :param template: A template to be updated.
+ :param config: A Config instance where values should be taken.
+ """
+
+ def update(data):
+ """Internal recursive function."""
+ if isinstance(data, dict):
+ keys = data.keys()
+ elif isinstance(data, list):
+ keys = range(len(data))
+ else:
+ return
+ for key in keys:
+ entry = data[key]
+ if isinstance(entry, Parameter):
+ data[key] = config.get(entry)
+ else:
+ update(entry)
+
+ update(template)
+
+
+def get_key(data, search_key):
+ """Goes through a template looking for a key.
+
+ :param data: The template to traverse.
+ :param search_key: The key to look for.
+ :return The key value.
+ """
+ return_value = None
+ found = False
+
+ if isinstance(data, dict):
+ bucket = []
+
+ for key, value in data.iteritems():
+ if key == search_key:
+ return_value = value
+ found = True
+ break
+ else:
+ bucket.append(value)
+
+ if bucket and not found:
+ for element in bucket:
+ if isinstance(element, list):
+ for element in element:
+ bucket.append(element)
+ elif isinstance(element, dict):
+ for key, value in element.iteritems():
+ if key == search_key:
+ return_value = value
+ found = True
+ break
+ else:
+ bucket.append(value)
+ if found:
+ break
+
+ return return_value
+
+
+def set_value(data, search_key, new_value):
+ """Sets a new value for a template key.
+
+ :param data: The data structure to update.
+ :type dict
+ :param search_key: The key to search and update.
+ :param new_value: The new value to set.
+ """
+ is_set = False
+
+ if isinstance(data, dict):
+ bucket = []
+
+ for key, value in data.iteritems():
+ if key == search_key:
+ data[key] = new_value
+ is_set = True
+ break
+ else:
+ bucket.append(value)
+
+ if bucket and not is_set:
+ for element in bucket:
+ if isinstance(element, list):
+ for element in element:
+ bucket.append(element)
+ elif isinstance(element, dict):
+ for key, value in element.iteritems():
+ if key == search_key:
+ element[key] = new_value
+ is_set = True
+ break
+ else:
+ bucket.append(value)
+ if is_set:
+ break
=== added directory 'lava/helper/tests'
=== added file 'lava/helper/tests/__init__.py'
=== added file 'lava/helper/tests/helper_test.py'
@@ -0,0 +1,81 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+A test helper class.
+
+Here we define a general test class and its own setUp and tearDown methods that
+all other test classes can inherit from.
+"""
+
+import os
+import shutil
+import sys
+import tempfile
+
+from unittest import TestCase
+from mock import (
+ MagicMock,
+ patch
+)
+
+
+class HelperTest(TestCase):
+ """Helper test class that all tests under the lava package can inherit."""
+
+ def setUp(self):
+ # Need to patch it here, not as a decorator, or running the tests
+ # via `./setup.py test` will fail.
+ self.at_exit_patcher = patch("lava.config.AT_EXIT_CALLS", spec=set)
+ self.at_exit_patcher.start()
+ self.original_stdout = sys.stdout
+ sys.stdout = open("/dev/null", "w")
+ self.original_stderr = sys.stderr
+ sys.stderr = open("/dev/null", "w")
+ self.original_stdin = sys.stdin
+
+ self.device = "a_fake_panda02"
+
+ self.temp_file = tempfile.NamedTemporaryFile(delete=False)
+ self.temp_dir = tempfile.mkdtemp()
+ self.parser = MagicMock()
+ self.args = MagicMock()
+ self.args.interactive = MagicMock(return_value=False)
+ self.args.DEVICE = self.device
+
+ def tearDown(self):
+ self.at_exit_patcher.stop()
+ sys.stdin = self.original_stdin
+ sys.stdout = self.original_stdout
+ sys.stderr = self.original_stderr
+ shutil.rmtree(self.temp_dir)
+ os.unlink(self.temp_file.name)
+
+ def tmp(self, name):
+ """
+ Returns the full path to a file, or directory, called `name` in a
+ temporary directory.
+
+ This method does not create the file, it only gives a full filename
+ where you can actually write some data. The file will not be removed
+ by this method.
+
+ :param name: The name the file/directory should have.
+ :return A path.
+ """
+ return os.path.join(tempfile.gettempdir(), name)
=== added file 'lava/helper/tests/test_command.py'
@@ -0,0 +1,47 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""lava.helper.command module tests."""
+
+from mock import MagicMock, patch
+
+
+from lava.helper.command import BaseCommand
+from lava.helper.tests.helper_test import HelperTest
+
+
+class BaseCommandTests(HelperTest):
+
+ def test_register_argument(self):
+ # Make sure that the parser add_argument is called and we have the
+ # correct argument.
+ command = BaseCommand(self.parser, self.args)
+ command.register_arguments(self.parser)
+ name, args, kwargs = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ @patch("lava.helper.command.AuthenticatingServerProxy", create=True)
+ def test_authenticated_server(self, mocked_auth_server):
+ command = BaseCommand(self.parser, self.args)
+ command.config = MagicMock()
+ command.config.get = MagicMock()
+ command.config.get.side_effect = ["www.example.org", "RPC"]
+
+ command.authenticated_server()
+
+ self.assertTrue(mocked_auth_server.called)
=== added file 'lava/helper/tests/test_dispatcher.py'
@@ -0,0 +1,77 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""lava.helper.dispatcher tests."""
+
+import os
+import tempfile
+
+from mock import patch
+
+from lava.tool.errors import CommandError
+from lava.helper.tests.helper_test import HelperTest
+from lava.helper.dispatcher import (
+ choose_devices_path,
+)
+
+
+class DispatcherTests(HelperTest):
+
+ def setUp(self):
+ super(DispatcherTests, self).setUp()
+ self.devices_dir = os.path.join(tempfile.gettempdir(), "devices")
+ os.makedirs(self.devices_dir)
+
+ def tearDown(self):
+ super(DispatcherTests, self).tearDown()
+ os.removedirs(self.devices_dir)
+
+ def test_choose_devices_path_0(self):
+ # Tests that when passing more than one path, the first writable one
+ # is returned.
+ obtained = choose_devices_path(
+ ["/", "/root", self.temp_dir, os.path.expanduser("~")])
+ expected = os.path.join(self.temp_dir, "devices")
+ self.assertEqual(expected, obtained)
+
+ def test_choose_devices_path_1(self):
+ # Tests that when passing a path that is not writable, CommandError
+ # is raised.
+ self.assertRaises(CommandError, choose_devices_path,
+ ["/", "/root", "/root/tmpdir"])
+
+ def test_choose_devices_path_2(self):
+ # Tests that the correct path for devices is created on the filesystem.
+ expected_path = os.path.join(self.temp_dir, "devices")
+ obtained = choose_devices_path([self.temp_dir])
+ self.assertEqual(expected_path, obtained)
+ self.assertTrue(os.path.isdir(expected_path))
+
+ def test_choose_devices_path_3(self):
+ # Tests that returns the already existing devices path.
+ obtained = choose_devices_path([tempfile.gettempdir()])
+ self.assertEqual(self.devices_dir, obtained)
+
+ @patch("__builtin__.open")
+ def test_choose_devices_path_4(self, mocked_open):
+ # Tests that when IOError is raised and we pass only one dir
+ # CommandError is raised.
+ mocked_open.side_effect = IOError()
+ self.assertRaises(CommandError, choose_devices_path,
+ [tempfile.gettempdir()])
+ self.assertTrue(mocked_open.called)
=== added file 'lava/helper/tests/test_template.py'
@@ -0,0 +1,102 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+""" """
+
+import copy
+from unittest import TestCase
+
+from lava.helper.template import (
+ get_key,
+ set_value
+)
+
+
+TEST_TEMPLATE = {
+ "key1": "value1",
+ "key2": [
+ "value2", "value3"
+ ],
+ "key3": [
+ {
+ "key4": "value4",
+ "key5": "value5"
+ },
+ {
+ "key6": "value6",
+ "key7": "value7"
+ },
+ [
+ {
+ "key8": "value8"
+ }
+ ]
+ ],
+ "key10": {
+ "key11": "value11"
+ }
+}
+
+
+class TestParameter(TestCase):
+
+ def test_get_key_simple_key(self):
+ expected = "value1"
+ obtained = get_key(TEST_TEMPLATE, "key1")
+ self.assertEquals(expected, obtained)
+
+ def test_get_key_nested_key(self):
+ expected = "value4"
+ obtained = get_key(TEST_TEMPLATE, "key4")
+ self.assertEquals(expected, obtained)
+
+ def test_get_key_nested_key_1(self):
+ expected = "value7"
+ obtained = get_key(TEST_TEMPLATE, "key7")
+ self.assertEquals(expected, obtained)
+
+ def test_get_key_nested_key_2(self):
+ expected = "value8"
+ obtained = get_key(TEST_TEMPLATE, "key8")
+ self.assertEquals(expected, obtained)
+
+ def test_get_key_nested_key_3(self):
+ expected = "value11"
+ obtained = get_key(TEST_TEMPLATE, "key11")
+ self.assertEquals(expected, obtained)
+
+ def test_set_value_0(self):
+ data = copy.deepcopy(TEST_TEMPLATE)
+ expected = "foo"
+ set_value(data, "key1", expected)
+ obtained = get_key(data, "key1")
+ self.assertEquals(expected, obtained)
+
+ def test_set_value_1(self):
+ data = copy.deepcopy(TEST_TEMPLATE)
+ expected = "foo"
+ set_value(data, "key6", expected)
+ obtained = get_key(data, "key6")
+ self.assertEquals(expected, obtained)
+
+ def test_set_value_2(self):
+ data = copy.deepcopy(TEST_TEMPLATE)
+ expected = "foo"
+ set_value(data, "key11", expected)
+ obtained = get_key(data, "key11")
+ self.assertEquals(expected, obtained)
=== modified file 'lava/job/__init__.py'
@@ -16,32 +16,58 @@
# You should have received a copy of the GNU Lesser General Public License
# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+"""Job class."""
+
+import json
+
from copy import deepcopy
-import json
-
-from lava.job.templates import Parameter
-
-class Job:
-
- def __init__(self, template):
- self.data = deepcopy(template)
-
- def fill_in(self, config):
- def insert_data(data):
- if isinstance(data, dict):
- keys = data.keys()
- elif isinstance(data, list):
- keys = range(len(data))
- else:
- return
- for key in keys:
- entry = data[key]
- if isinstance(entry, Parameter):
- data[key] = config.get(entry)
- else:
- insert_data(entry)
- insert_data(self.data)
-
- def write(self, stream):
- stream.write(json.dumps(self.data, indent=4))
-
+
+from lava.helper.template import (
+ expand_template,
+ set_value,
+)
+from lava_tool.utils import (
+ verify_file_extension,
+ verify_path_existance,
+ write_file
+)
+
+# A default name for job files.
+DEFAULT_JOB_FILENAME = "lava-tool-job.json"
+# Default job file extension.
+DEFAULT_JOB_EXTENSION = "json"
+# Possible extension for a job file.
+JOB_FILE_EXTENSIONS = [DEFAULT_JOB_EXTENSION]
+
+
+class Job(object):
+
+ """A Job object.
+
+ This class should be used to create new job files. The initialization
+ enforces a default file name extension, and makes sure that the file is
+ not already present on the file system.
+ """
+
+ def __init__(self, data, file_name):
+ self.file_name = verify_file_extension(file_name,
+ DEFAULT_JOB_EXTENSION,
+ JOB_FILE_EXTENSIONS)
+ verify_path_existance(self.file_name)
+ self.data = deepcopy(data)
+
+ def set(self, key, value):
+ """Set key to the specified value.
+
+ :param key: The key to look in the object data.
+ :param value: The value to set.
+ """
+ set_value(self.data, key, value)
+
+ def update(self, config):
+ """Updates the Job object based on the provided config."""
+ expand_template(self.data, config)
+
+ def write(self):
+ """Writes the Job object to file."""
+ write_file(self.file_name, json.dumps(self.data, indent=4))
=== modified file 'lava/job/commands.py'
@@ -16,79 +16,92 @@
# You should have received a copy of the GNU Lesser General Public License
# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-from os.path import exists
-
-from lava.config import InteractiveConfig
+"""
+LAVA job commands.
+"""
+
+import os
+
+from lava.helper.command import BaseCommand
from lava.job import Job
-from lava.job.templates import *
-from lava.tool.command import Command, CommandGroup
+from lava.job.templates import (
+ BOOT_TEST_KEY,
+ JOB_TYPES,
+)
+from lava.tool.command import CommandGroup
from lava.tool.errors import CommandError
-from lava_tool.authtoken import AuthenticatingServerProxy, KeyringAuthBackend
-import xmlrpclib
class job(CommandGroup):
- """
- LAVA job file handling
- """
-
+ """LAVA job file handling."""
namespace = 'lava.job.commands'
-class BaseCommand(Command):
-
- def __init__(self, parser, args):
- super(BaseCommand, self).__init__(parser, args)
- self.config = InteractiveConfig(force_interactive=self.args.interactive)
-
- @classmethod
- def register_arguments(cls, parser):
- super(BaseCommand, cls).register_arguments(parser)
- parser.add_argument(
- "-i", "--interactive",
- action='store_true',
- help=("Forces asking for input parameters even if we already "
- "have them cached."))
class new(BaseCommand):
+ """Creates a new job file."""
@classmethod
def register_arguments(cls, parser):
super(new, cls).register_arguments(parser)
parser.add_argument("FILE", help=("Job file to be created."))
-
- def invoke(self):
- if exists(self.args.FILE):
- raise CommandError('%s already exists' % self.args.FILE)
-
- with open(self.args.FILE, 'w') as f:
- job = Job(BOOT_TEST)
- job.fill_in(self.config)
- job.write(f)
+ parser.add_argument("--type",
+ help=("The type of job to create. Defaults to "
+ "'{0}'.".format(BOOT_TEST_KEY)),
+ choices=JOB_TYPES.keys(),
+ default=BOOT_TEST_KEY)
+
+ def invoke(self, job_template=None):
+ if not job_template:
+ job_template = JOB_TYPES.get(self.args.type)
+
+ full_path = os.path.abspath(self.args.FILE)
+
+ job_instance = Job(job_template, full_path)
+ job_instance.update(self.config)
+ job_instance.write()
class submit(BaseCommand):
+
+ """Submits the specified job file."""
+
@classmethod
def register_arguments(cls, parser):
super(submit, cls).register_arguments(parser)
- parser.add_argument("FILE", help=("The job file to submit"))
+ parser.add_argument("FILE", help=("The job file to submit."))
def invoke(self):
- jobfile = self.args.FILE
- jobdata = open(jobfile, 'rb').read()
-
- server_name = Parameter('server')
- rpc_endpoint = Parameter('rpc_endpoint', depends=server_name)
- self.config.get(server_name)
- endpoint = self.config.get(rpc_endpoint)
-
- server = AuthenticatingServerProxy(endpoint,
- auth_backend=KeyringAuthBackend())
- try:
- job_id = server.scheduler.submit_job(jobdata)
- print "Job submitted with job ID %d" % job_id
- except xmlrpclib.Fault, e:
- raise CommandError(str(e))
+ super(submit, self).submit(self.args.FILE)
+
class run(BaseCommand):
- def invoke(self):
- print("hello world")
+
+ """Runs the specified job file on the local dispatcher."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(run, cls).register_arguments(parser)
+ parser.add_argument("FILE", help=("The job file to submit."))
+
+ def invoke(self):
+ super(run, self).run(self.args.FILE)
+
+
+class status(BaseCommand):
+
+ """Retrieves the status of a job."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(status, cls).register_arguments(parser)
+ parser.add_argument("JOB_ID",
+ help=("Prints status information about the "
+ "provided job id."),
+ nargs="?",
+ default=None)
+
+ def invoke(self):
+ if self.args.JOB_ID:
+ super(status, self).status(self.args.JOB_ID)
+ else:
+ raise CommandError("It is necessary to specify a job id.")
=== modified file 'lava/job/templates.py'
@@ -16,23 +16,33 @@
# You should have received a copy of the GNU Lesser General Public License
# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-class Parameter(object):
-
- def __init__(self, id, depends=None):
- self.id = id
- self.depends = depends
-
-device_type = Parameter("device_type")
-prebuilt_image = Parameter("prebuilt_image", depends=device_type)
+from lava.parameter import (
+ ListParameter,
+ Parameter,
+)
+
+LAVA_TEST_SHELL_TAR_REPO_KEY = "tar-repo"
+LAVA_TEST_SHELL_TESDEF_KEY = "testdef"
+
+DEVICE_TYPE_PARAMETER = Parameter("device_type")
+PREBUILT_IMAGE_PARAMETER = Parameter("image", depends=DEVICE_TYPE_PARAMETER)
+
+TESTDEF_URLS_PARAMETER = ListParameter("testdef_urls")
+TESTDEF_URLS_PARAMETER.store = False
+
+# Use another ID for the server parameter, might be different.
+SERVER_PARAMETER = Parameter("stream_server")
+STREAM_PARAMETER = Parameter("stream")
BOOT_TEST = {
+ "timeout": 18000,
"job_name": "Boot test",
- "device_type": device_type,
+ "device_type": DEVICE_TYPE_PARAMETER,
"actions": [
{
"command": "deploy_linaro_image",
"parameters": {
- "image": prebuilt_image
+ "image": PREBUILT_IMAGE_PARAMETER
}
},
{
@@ -43,21 +53,72 @@
LAVA_TEST_SHELL = {
"job_name": "LAVA Test Shell",
- "device_type": device_type,
- "actions": [
- {
- "command": "deploy_linaro_image",
- "parameters": {
- "image": prebuilt_image,
- }
- },
- {
- "command": "lava_test_shell",
- "parameters": {
- "testdef_urls": [
- Parameter("testdef_url")
+ "timeout": 18000,
+ "device_type": DEVICE_TYPE_PARAMETER,
+ "actions": [
+ {
+ "command": "deploy_linaro_image",
+ "parameters": {
+ "image": PREBUILT_IMAGE_PARAMETER,
+ }
+ },
+ {
+ "command": "lava_test_shell",
+ "parameters": {
+ "timeout": 1800,
+ "testdef_urls": TESTDEF_URLS_PARAMETER,
+ }
+ },
+ {
+ "command": "submit_results",
+ "parameters": {
+ "stream": STREAM_PARAMETER,
+ "server": SERVER_PARAMETER
+ }
+ }
+ ]
+}
+
+# This is a special case template, only use when automatically create job files
+# starting from a testdef or a script. Never to be used directly by the user.
+LAVA_TEST_SHELL_TAR_REPO = {
+ "job_name": "LAVA Test Shell",
+ "timeout": 18000,
+ "device_type": DEVICE_TYPE_PARAMETER,
+ "actions": [
+ {
+ "command": "deploy_linaro_image",
+ "parameters": {
+ "image": PREBUILT_IMAGE_PARAMETER,
+ }
+ },
+ {
+ "command": "lava_test_shell",
+ "parameters": {
+ "timeout": 1800,
+ "testdef_repos": [
+ {
+ LAVA_TEST_SHELL_TESDEF_KEY: None,
+ LAVA_TEST_SHELL_TAR_REPO_KEY: None,
+ }
]
}
+ },
+ {
+ "command": "submit_results",
+ "parameters": {
+ "stream": STREAM_PARAMETER,
+ "server": SERVER_PARAMETER
+ }
}
]
}
+
+BOOT_TEST_KEY = "boot-test"
+LAVA_TEST_SHELL_KEY = "lava-test-shell"
+
+# Dict with all the user available job templates.
+JOB_TYPES = {
+ BOOT_TEST_KEY: BOOT_TEST,
+ LAVA_TEST_SHELL_KEY: LAVA_TEST_SHELL,
+}
=== modified file 'lava/job/tests/test_commands.py'
@@ -20,74 +20,136 @@
Unit tests for the commands classes
"""
-from argparse import ArgumentParser
import json
-from os import (
- makedirs,
- removedirs,
-)
-from os.path import(
- exists,
- join,
-)
-from shutil import(
- rmtree,
-)
-from tempfile import mkdtemp
-from unittest import TestCase
-
-from lava.config import NonInteractiveConfig
-from lava.job.commands import *
+import os
+
+from mock import patch
+
+from lava.config import Config
+from lava.helper.tests.helper_test import HelperTest
+from lava.job.commands import (
+ new,
+ run,
+ submit,
+ status,
+)
+from lava.parameter import Parameter
from lava.tool.errors import CommandError
-from mocker import Mocker
-
-def make_command(command, *args):
- parser = ArgumentParser(description="fake argument parser")
- command.register_arguments(parser)
- the_args = parser.parse_args(*args)
- cmd = command(parser, the_args)
- cmd.config = NonInteractiveConfig({ 'device_type': 'foo', 'prebuilt_image': 'bar' })
- return cmd
-
-class CommandTest(TestCase):
-
- def setUp(self):
- self.tmpdir = mkdtemp()
+
+class CommandTest(HelperTest):
+
+ def setUp(self):
+ super(CommandTest, self).setUp()
+ self.args.FILE = self.temp_file.name
+ self.args.type = "boot-test"
+
+ self.device_type = Parameter('device_type')
+ self.prebuilt_image = Parameter('prebuilt_image',
+ depends=self.device_type)
+ self.config = Config()
+ self.config.put_parameter(self.device_type, 'foo')
+ self.config.put_parameter(self.prebuilt_image, 'bar')
+
+
+class JobNewTest(CommandTest):
+
+ def setUp(self):
+ super(JobNewTest, self).setUp()
+ self.args.FILE = self.tmp("new_file.json")
+ self.new_command = new(self.parser, self.args)
+ self.new_command.config = self.config
def tearDown(self):
- rmtree(self.tmpdir)
-
- def tmp(self, filename):
- return join(self.tmpdir, filename)
-
-class JobNewTest(CommandTest):
+ super(JobNewTest, self).tearDown()
+ if os.path.exists(self.args.FILE):
+ os.unlink(self.args.FILE)
+
+ def test_register_arguments(self):
+ new_cmd = new(self.parser, self.args)
+ new_cmd.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(3, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
+
+ _, args, _ = self.parser.method_calls[2]
+ self.assertIn("--type", args)
def test_create_new_file(self):
- f = self.tmp('file.json')
- command = make_command(new, [f])
- command.invoke()
- self.assertTrue(exists(f))
+ self.new_command.invoke()
+ self.assertTrue(os.path.exists(self.args.FILE))
def test_fills_in_template_parameters(self):
- f = self.tmp('myjob.json')
- command = make_command(new, [f])
- command.invoke()
+ self.new_command.invoke()
- data = json.loads(open(f).read())
+ data = json.loads(open(self.args.FILE).read())
self.assertEqual(data['device_type'], 'foo')
- def test_wont_overwriteexisting_file(self):
- existing = self.tmp('existing.json')
- with open(existing, 'w') as f:
+ def test_wont_overwrite_existing_file(self):
+ with open(self.args.FILE, 'w') as f:
f.write("CONTENTS")
- command = make_command(new, [existing])
- with self.assertRaises(CommandError):
- command.invoke()
- self.assertEqual("CONTENTS", open(existing).read())
+
+ self.assertRaises(CommandError, self.new_command.invoke)
+ self.assertEqual("CONTENTS", open(self.args.FILE).read())
+
class JobSubmitTest(CommandTest):
def test_receives_job_file_in_cmdline(self):
- cmd = make_command(new, ['FOO.json'])
- self.assertEqual('FOO.json', cmd.args.FILE)
+ command = submit(self.parser, self.args)
+ command.register_arguments(self.parser)
+ name, args, kwargs = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
+
+
+class JobRunTest(CommandTest):
+
+ def test_register_arguments(self):
+ run_cmd = run(self.parser, self.args)
+ run_cmd.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
+
+ def test_invoke_raises_0(self):
+ # Users passes a non existing job file to the run command.
+ self.args.FILE = self.tmp("test_invoke_raises_0.json")
+ command = run(self.parser, self.args)
+ self.assertRaises(CommandError, command.invoke)
+
+ @patch("lava.helper.command.has_command", create=True)
+ def test_invoke_raises_1(self, mocked_has_command):
+ # User passes a valid file to the run command, but she does not have
+ # the dispatcher installed.
+ mocked_has_command.return_value = False
+ command = run(self.parser, self.args)
+ self.assertRaises(CommandError, command.invoke)
+
+
+class TestsStatusCommand(CommandTest):
+
+ def test_register_arguments(self):
+ self.args.JOB_ID = "1"
+ status_cmd = status(self.parser, self.args)
+ status_cmd.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("JOB_ID", args)
=== modified file 'lava/job/tests/test_job.py'
@@ -20,49 +20,73 @@
Unit tests for the Job class
"""
+import os
import json
-from unittest import TestCase
-from StringIO import StringIO
-
-from lava.config import NonInteractiveConfig
-from lava.job.templates import *
+import tempfile
+
+from mock import patch
+
+from lava.config import Config
+from lava.helper.tests.helper_test import HelperTest
from lava.job import Job
-
-class JobTest(TestCase):
+from lava.job.templates import BOOT_TEST
+from lava.parameter import Parameter
+
+
+class JobTest(HelperTest):
+
+ @patch("lava.config.Config.save")
+ def setUp(self, mocked_config):
+ super(JobTest, self).setUp()
+ self.config = Config()
+ self.config.config_file = self.temp_file.name
def test_from_template(self):
template = {}
- job = Job(template)
+ job = Job(template, self.temp_file.name)
self.assertEqual(job.data, template)
self.assertIsNot(job.data, template)
- def test_fill_in_data(self):
- job = Job(BOOT_TEST)
+ def test_update_data(self):
image = "/path/to/panda.img"
- config = NonInteractiveConfig(
- {
- "device_type": "panda",
- "prebuilt_image": image,
- }
- )
- job.fill_in(config)
+ param1 = Parameter("device_type")
+ param2 = Parameter("image", depends=param1)
+ self.config.put_parameter(param1, "panda")
+ self.config.put_parameter(param2, image)
+
+ job = Job(BOOT_TEST, self.temp_file.name)
+ job.update(self.config)
self.assertEqual(job.data['device_type'], "panda")
self.assertEqual(job.data['actions'][0]["parameters"]["image"], image)
def test_write(self):
- orig_data = { "foo": "bar" }
- job = Job(orig_data)
- output = StringIO()
- job.write(output)
-
- data = json.loads(output.getvalue())
- self.assertEqual(data, orig_data)
+ try:
+ orig_data = {"foo": "bar"}
+ job_file = os.path.join(tempfile.gettempdir(), "a_json_file.json")
+ job = Job(orig_data, job_file)
+ job.write()
+
+ output = ""
+ with open(job_file) as read_file:
+ output = read_file.read()
+
+ data = json.loads(output)
+ self.assertEqual(data, orig_data)
+ finally:
+ os.unlink(job_file)
def test_writes_nicely_formatted_json(self):
- orig_data = { "foo": "bar" }
- job = Job(orig_data)
- output = StringIO()
- job.write(output)
-
- self.assertTrue(output.getvalue().startswith("{\n"))
+ try:
+ orig_data = {"foo": "bar"}
+ job_file = os.path.join(tempfile.gettempdir(), "b_json_file.json")
+ job = Job(orig_data, job_file)
+ job.write()
+
+ output = ""
+ with open(job_file) as read_file:
+ output = read_file.read()
+
+ self.assertTrue(output.startswith("{\n"))
+ finally:
+ os.unlink(job_file)
=== added file 'lava/parameter.py'
@@ -0,0 +1,251 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Parameter class and its accessory methods/functions.
+"""
+
+import sys
+import types
+
+from lava_tool.utils import to_list
+
+# Character used to join serialized list parameters.
+LIST_SERIALIZE_DELIMITER = ","
+
+
+class Parameter(object):
+ """A parameter with an optional dependency."""
+ def __init__(self, id, value=None, depends=None):
+ """Creates a new parameter.
+
+ :param id: The name of this parameter.
+ :param value: The value of this parameter. Defaults to None.
+ :param depends: If this Parameter depends on another one. Defaults
+ to None.
+ :type Parameter
+ """
+ self.id = id
+ self.value = value
+ self.depends = depends
+ self.asked = False
+ # Whether to store or not the parameter in the user config file.
+ self.store = True
+
+ def set(self, value):
+ """Sets the value of the parameter.
+
+ :param value: The value to set.
+ """
+ self.value = value
+
+ def prompt(self, old_value=None):
+ """Gets the parameter value from the user.
+
+ To get user input, the builtin `raw_input` function will be used. Input
+ will also be stripped of possible whitespace chars. If Enter or any
+ sort of whitespace chars in typed, the old Parameter value will be
+ returned.
+
+ :param old_value: The old parameter value.
+ :return The input as typed by the user, or the old value.
+ """
+ if not self.asked:
+ if old_value is not None:
+ prompt = "{0} [{1}]: ".format(self.id, old_value)
+ else:
+ prompt = "{0}: ".format(self.id)
+
+ user_input = self.get_user_input(prompt)
+
+ if user_input is not None:
+ if len(user_input) == 0 and old_value:
+ # Keep the old value when user press enter or another
+ # whitespace char.
+ self.value = old_value
+ else:
+ self.value = user_input
+
+ self.asked = True
+
+ return self.value
+
+ @classmethod
+ def get_user_input(cls, prompt=""):
+ """Asks the user for input data.
+
+ :param prompt: The prompt that should be given to the user.
+ :return A string with what the user typed.
+ """
+ data = None
+ try:
+ data = raw_input(prompt).strip()
+ except EOFError:
+ # Force to return None.
+ data = None
+ except KeyboardInterrupt:
+ sys.exit(-1)
+ return data
+
+ @classmethod
+ def serialize(cls, value):
+ """Serializes the passed value to be friendly written to file.
+
+ Lists are serialized as a comma separated string of values.
+
+ :param value: The value to serialize.
+ :return The serialized value as string.
+ """
+ serialized = ""
+ if isinstance(value, list):
+ serialized = LIST_SERIALIZE_DELIMITER.join(
+ str(x) for x in value if x)
+ else:
+ serialized = str(value)
+ return serialized
+
+ @classmethod
+ def deserialize(cls, value):
+ """Deserialize a value into a list.
+
+ The value must have been serialized with the class instance serialize()
+ method.
+
+ :param value: The string value to be deserialized.
+ :type str
+ :return A list of values.
+ """
+ deserialized = []
+ if isinstance(value, types.StringTypes):
+ deserialized = filter(None, (x.strip() for x in value.split(
+ LIST_SERIALIZE_DELIMITER)))
+ else:
+ deserialized = list(value)
+ return deserialized
+
+
+class SingleChoiceParameter(Parameter):
+ """A parameter implemeting a single choice between multiple choices."""
+ def __init__(self, id, choices):
+ super(SingleChoiceParameter, self).__init__(id)
+ self.choices = to_list(choices)
+
+ def prompt(self, prompt, old_value=None):
+ """Asks the user for their choice."""
+ # Sliglty different than the other parameters: here we first present
+ # the user with what the choices are about.
+ print >> sys.stdout, prompt
+
+ index = 1
+ for choice in self.choices:
+ print >> sys.stdout, "\t{0:d}. {1}".format(index, choice)
+ index += 1
+
+ choices_len = len(self.choices)
+ while True:
+ user_input = self.get_user_input("Choice: ")
+
+ if len(user_input) == 0 and old_value:
+ choice = old_value
+ break
+ elif user_input in [str(x) for x in range(1, choices_len + 1)]:
+ choice = self.choices[int(user_input) - 1]
+ break
+
+ return choice
+
+
+class ListParameter(Parameter):
+ """A specialized Parameter to handle list values."""
+
+ # This is used as a deletion character. When we have an old value and the
+ # user enters this char, it sort of deletes the value.
+ DELETE_CHAR = "-"
+
+ def __init__(self, id, value=None, depends=None):
+ super(ListParameter, self).__init__(id, depends=depends)
+ self.value = []
+ if value:
+ self.set(value)
+
+ def set(self, value):
+ """Sets the value of the parameter.
+
+ :param value: The value to set.
+ """
+ self.value = to_list(value)
+
+ def add(self, value):
+ """Adds a new value to the list of values of this parameter.
+
+ :param value: The value to add.
+ """
+ if isinstance(value, list):
+ self.value.extend(value)
+ else:
+ self.value.append(value)
+
+ def prompt(self, old_value=None):
+ """Gets the parameter in a list form.
+
+ To exit the input procedure it is necessary to insert an empty line.
+
+ :return The list of values.
+ """
+
+ if not self.asked:
+ if old_value is not None:
+ # We might get the old value read from file via ConfigParser,
+ # and usually it comes in string format.
+ old_value = self.deserialize(old_value)
+
+ print >> sys.stdout, "Values for '{0}': ".format(self.id)
+
+ index = 1
+ while True:
+ user_input = None
+ if old_value is not None and (0 < len(old_value) >= index):
+ prompt = "{0:>3d}.\n\told: {1}\n\tnew: ".format(
+ index, old_value[index-1])
+ user_input = self.get_user_input(prompt)
+ else:
+ prompt = "{0:>3d}. ".format(index)
+ user_input = self.get_user_input(prompt)
+
+ if user_input is not None:
+ # The user has pressed Enter.
+ if len(user_input) == 0:
+ if old_value is not None and \
+ (0 < len(old_value) >= index):
+ user_input = old_value[index-1]
+ else:
+ break
+
+ if len(user_input) == 1 and user_input == \
+ self.DELETE_CHAR and (0 < len(old_value) >= index):
+ # We have an old value, user presses the DELETE_CHAR
+ # and we do not store anything. This is done to delete
+ # an old entry.
+ pass
+ else:
+ self.value.append(user_input)
+ index += 1
+
+ self.asked = True
+
+ return self.value
=== added directory 'lava/script'
=== added file 'lava/script/__init__.py'
@@ -0,0 +1,51 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Scripts handling class."""
+
+import os
+import stat
+
+from lava_tool.utils import write_file
+
+
+DEFAULT_MOD = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
+DEFAULT_TESTDEF_SCRIPT_CONTENT = """#!/bin/sh
+# Automatic generated content by lava-tool.
+# Please add your own instructions.
+#
+# You can use all the avialable Bash commands.
+#
+# For the available LAVA commands, see:
+# http://lava.readthedocs.org/
+#
+"""
+DEFAULT_TESTDEF_SCRIPT = "mytest.sh"
+
+
+class ShellScript(object):
+
+ """Creates a shell script on the file system with some content."""
+
+ def __init__(self, file_name):
+ self.file_name = file_name
+
+ def write(self):
+ write_file(self.file_name, DEFAULT_TESTDEF_SCRIPT_CONTENT)
+ # Make sure the script is executable.
+ os.chmod(self.file_name, DEFAULT_MOD)
=== added file 'lava/script/commands.py'
@@ -0,0 +1,115 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Commands to run or submit a script."""
+
+import os
+import tempfile
+
+from lava.helper.command import BaseCommand
+from lava.job import DEFAULT_JOB_FILENAME
+from lava.testdef import DEFAULT_TESTDEF_FILENAME
+from lava.tool.command import CommandGroup
+from lava_tool.utils import verify_path_non_existance
+
+
+class script(CommandGroup):
+
+ """LAVA script file handling."""
+
+ namespace = "lava.script.commands"
+
+
+class ScriptBaseCommand(BaseCommand):
+
+ def _create_tmp_job_file(self, script_file):
+ """Creates a temporary job file to run or submit the passed file.
+
+ The temporary job file and its accessory test definition file are
+ not removed by this method.
+
+ :param script_file: The script file that has to be run or submitted.
+ :return A tuple with the job file path, and the test definition path.
+ """
+ script_file = os.path.abspath(script_file)
+ verify_path_non_existance(script_file)
+
+ temp_dir = tempfile.gettempdir()
+
+ # The name of the job and testdef files.
+ job_file = os.path.join(temp_dir, DEFAULT_JOB_FILENAME)
+ testdef_file = os.path.join(temp_dir, DEFAULT_TESTDEF_FILENAME)
+
+ # The steps that the testdef file should have. We need to change it
+ # from the default one, since the users are passing their own file.
+ steps = "./" + os.path.basename(script_file)
+ testdef_file = self.create_test_definition(testdef_file,
+ steps=steps)
+
+ # The content of the tar file.
+ tar_content = [script_file, testdef_file]
+ job_file = self.create_tar_repo_job(job_file, testdef_file,
+ tar_content)
+
+ return (job_file, testdef_file)
+
+
+class run(ScriptBaseCommand):
+
+ """Runs the specified shell script on a local device."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(run, cls).register_arguments(parser)
+ parser.add_argument("FILE", help="Shell script file to run.")
+
+ def invoke(self):
+ job_file = ""
+ testdef_file = ""
+
+ try:
+ job_file, testdef_file = self._create_tmp_job_file(self.args.FILE)
+ super(run, self).run(job_file)
+ finally:
+ if os.path.isfile(job_file):
+ os.unlink(job_file)
+ if os.path.isfile(testdef_file):
+ os.unlink(testdef_file)
+
+
+class submit(ScriptBaseCommand):
+
+ """Submits the specified shell script to a LAVA server."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(submit, cls).register_arguments(parser)
+ parser.add_argument("FILE", help="Shell script file to send.")
+
+ def invoke(self):
+ job_file = ""
+ testdef_file = ""
+
+ try:
+ job_file, testdef_file = self._create_tmp_job_file(self.args.FILE)
+ super(submit, self).submit(job_file)
+ finally:
+ if os.path.isfile(job_file):
+ os.unlink(job_file)
+ if os.path.isfile(testdef_file):
+ os.unlink(testdef_file)
=== added directory 'lava/script/tests'
=== added file 'lava/script/tests/__init__.py'
=== added file 'lava/script/tests/test_commands.py'
@@ -0,0 +1,59 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for lava.script.commands.
+"""
+
+from lava.helper.tests.helper_test import HelperTest
+from lava.script.commands import (
+ run,
+ submit,
+)
+
+
+class RunCommandTests(HelperTest):
+
+ def test_register_arguments(self):
+ run_cmd = run(self.parser, self.args)
+ run_cmd.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
+
+
+class SubmitCommandTests(HelperTest):
+
+ def test_register_arguments(self):
+ submit_cmd = submit(self.parser, self.args)
+ submit_cmd.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
=== added file 'lava/script/tests/test_script.py'
@@ -0,0 +1,80 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Unittests for the ShellScript class.
+"""
+
+import os
+import stat
+
+from lava.helper.tests.helper_test import HelperTest
+from lava.script import ShellScript
+
+
+class ShellScriptTests(HelperTest):
+
+ """ShellScript tests."""
+
+ def test_create_file(self):
+ # Tests that a shell script is actually written.
+ try:
+ temp_file = self.tmp("a_shell_test")
+ script = ShellScript(temp_file)
+ script.write()
+
+ self.assertTrue(os.path.isfile(temp_file))
+ finally:
+ os.unlink(temp_file)
+
+ def test_assure_executable(self):
+ # Tests that the shell script created is executable.
+ try:
+ temp_file = self.tmp("a_shell_test")
+ script = ShellScript(temp_file)
+ script.write()
+
+ expected = (stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH |
+ stat.S_IXOTH)
+
+ obtained = stat.S_IMODE(os.stat(temp_file).st_mode)
+ self.assertEquals(expected, obtained)
+ finally:
+ os.unlink(temp_file)
+
+ def test_shell_script_content(self):
+ # Tests that the shell script created contains the exepcted content.
+ try:
+ temp_file = self.tmp("a_shell_test")
+ script = ShellScript(temp_file)
+ script.write()
+
+ obtained = ""
+ with open(temp_file) as read_file:
+ obtained = read_file.read()
+
+ expected = ("#!/bin/sh\n# Automatic generated "
+ "content by lava-tool.\n# Please add your own "
+ "instructions.\n#\n# You can use all the avialable "
+ "Bash commands.\n#\n# For the available LAVA "
+ "commands, see:\n# http://lava.readthedocs.org/\n"
+ "#\n")
+
+ self.assertEquals(expected, obtained)
+ finally:
+ os.unlink(temp_file)
=== added directory 'lava/testdef'
=== added file 'lava/testdef/__init__.py'
@@ -0,0 +1,82 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""TestDefinition class."""
+
+import yaml
+
+from copy import deepcopy
+
+from lava.helper.template import (
+ expand_template,
+ set_value,
+)
+from lava_tool.utils import (
+ write_file,
+ verify_path_existance,
+ verify_file_extension,
+)
+
+# Default name for a test definition.
+DEFAULT_TESTDEF_FILENAME = "lavatest.yaml"
+# Default test def file extension.
+DEFAULT_TESTDEF_EXTENSION = "yaml"
+# Possible extensions for a test def file.
+TESTDEF_FILE_EXTENSIONS = [DEFAULT_TESTDEF_EXTENSION]
+
+
+class TestDefinition(object):
+
+ """A test definition object.
+
+ This class should be used to create test definitions. The initialization
+ enforces a default file name extension, and makes sure that the file is
+ not already present on the file system.
+ """
+
+ def __init__(self, data, file_name):
+ """Initialize the object.
+
+ :param data: The serializable data to be used, usually a template.
+ :type dict
+ :param file_name: Where the test definition will be written.
+ :type str
+ """
+ self.file_name = verify_file_extension(file_name,
+ DEFAULT_TESTDEF_EXTENSION,
+ TESTDEF_FILE_EXTENSIONS)
+ verify_path_existance(self.file_name)
+
+ self.data = deepcopy(data)
+
+ def set(self, key, value):
+ """Set key to the specified value.
+
+ :param key: The key to look in the object data.
+ :param value: The value to set.
+ """
+ set_value(self.data, key, value)
+
+ def write(self):
+ """Writes the test definition to file."""
+ content = yaml.dump(self.data, default_flow_style=False, indent=4)
+ write_file(self.file_name, content)
+
+ def update(self, config):
+ """Updates the TestDefinition object based on the provided config."""
+ expand_template(self.data, config)
=== added file 'lava/testdef/commands.py'
@@ -0,0 +1,104 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test definition commands class.
+"""
+
+import os
+import tempfile
+
+from lava.helper.command import BaseCommand
+from lava.job import DEFAULT_JOB_FILENAME
+from lava.tool.command import CommandGroup
+from lava_tool.utils import verify_path_non_existance
+
+
+class testdef(CommandGroup):
+
+ """LAVA test definitions handling."""
+
+ namespace = "lava.testdef.commands"
+
+
+class TestdefBaseCommand(BaseCommand):
+
+ def _create_tmp_job_file(self, testdef_file):
+ testdef_file = os.path.abspath(testdef_file)
+ verify_path_non_existance(testdef_file)
+
+ job_file = os.path.join(tempfile.gettempdir(),
+ DEFAULT_JOB_FILENAME)
+
+ tar_content = [testdef_file]
+ job_file = self.create_tar_repo_job(job_file, testdef_file,
+ tar_content)
+
+ return job_file
+
+
+class new(TestdefBaseCommand):
+
+ """Creates a new test definition file."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(new, cls).register_arguments(parser)
+ parser.add_argument("FILE", help="Test definition file to create.")
+
+ def invoke(self):
+ full_path = os.path.abspath(self.args.FILE)
+ self.create_test_definition(full_path)
+
+
+class run(TestdefBaseCommand):
+
+ """Runs the specified test definition on a local device."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(run, cls).register_arguments(parser)
+ parser.add_argument("FILE", help="Test definition file to run.")
+
+ def invoke(self):
+ job_file = ""
+ try:
+ job_file = self._create_tmp_job_file(self.args.FILE)
+ super(run, self).run(job_file)
+ finally:
+ if os.path.isfile(job_file):
+ os.unlink(job_file)
+
+
+class submit(TestdefBaseCommand):
+
+ """Submits the specified test definition to a LAVA server."""
+
+ @classmethod
+ def register_arguments(cls, parser):
+ super(submit, cls).register_arguments(parser)
+ parser.add_argument("FILE", help="Test definition file to send.")
+
+ def invoke(self):
+ job_file = ""
+ try:
+ job_file = self._create_tmp_job_file(self.args.FILE)
+ super(submit, self).submit(job_file)
+ finally:
+ if os.path.isfile(job_file):
+ os.unlink(job_file)
=== added file 'lava/testdef/templates.py'
@@ -0,0 +1,52 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test definition templates."""
+
+from lava.parameter import (
+ Parameter,
+)
+
+DEFAULT_TESTDEF_VERSION = "1.0"
+DEFAULT_TESTDEF_FORMAT = "Lava-Test Test Definition 1.0"
+DEFAULT_ENVIRONMET_VALUE = "lava_test_shell"
+
+# All these parameters will not be stored on the local config file.
+NAME_PARAMETER = Parameter("name")
+NAME_PARAMETER.store = False
+
+DESCRIPTION_PARAMETER = Parameter("description", depends=NAME_PARAMETER)
+DESCRIPTION_PARAMETER.store = False
+
+TESTDEF_STEPS_KEY = "steps"
+
+TESTDEF_TEMPLATE = {
+ "metadata": {
+ "name": NAME_PARAMETER,
+ "format": DEFAULT_TESTDEF_FORMAT,
+ "version": DEFAULT_TESTDEF_VERSION,
+ "description": DESCRIPTION_PARAMETER,
+ "environment": [DEFAULT_ENVIRONMET_VALUE],
+ },
+ "run": {
+ TESTDEF_STEPS_KEY: ["./mytest.sh"]
+ },
+ "parse": {
+ "pattern": r'^\s*(?P<test_case_id>\w+)=(?P<result>\w+)\s*$'
+ }
+}
=== added directory 'lava/testdef/tests'
=== added file 'lava/testdef/tests/__init__.py'
=== added file 'lava/testdef/tests/test_commands.py'
@@ -0,0 +1,153 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for lava.testdef.commands.
+"""
+
+import os
+import tempfile
+import yaml
+
+from mock import (
+ patch,
+)
+
+from lava.config import InteractiveConfig
+from lava.helper.tests.helper_test import HelperTest
+from lava.testdef.commands import (
+ new,
+)
+from lava.tool.errors import CommandError
+
+
+class NewCommandTest(HelperTest):
+ """Class for the lava.testdef new command tests."""
+
+ @patch("lava.config.Config.save")
+ def setUp(self, mocked_save):
+ super(NewCommandTest, self).setUp()
+ self.file_name = "fake_testdef.yaml"
+ self.file_path = os.path.join(tempfile.gettempdir(), self.file_name)
+ self.args.FILE = self.file_path
+
+ self.temp_yaml = tempfile.NamedTemporaryFile(suffix=".yaml",
+ delete=False)
+
+ self.config_file = tempfile.NamedTemporaryFile(delete=False)
+ self.config = InteractiveConfig()
+ self.config.config_file = self.config_file.name
+ # Patch class raw_input, start it, and stop it on tearDown.
+ self.patcher1 = patch("lava.parameter.raw_input", create=True)
+ self.mocked_raw_input = self.patcher1.start()
+
+ def tearDown(self):
+ super(NewCommandTest, self).tearDown()
+ if os.path.isfile(self.file_path):
+ os.unlink(self.file_path)
+ os.unlink(self.config_file.name)
+ os.unlink(self.temp_yaml.name)
+ self.patcher1.stop()
+
+ def test_register_arguments(self):
+ # Make sure that the parser add_argument is called and we have the
+ # correct argument.
+ new_command = new(self.parser, self.args)
+ new_command.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("FILE", args)
+
+ def test_invoke_0(self):
+ # Test that passing a file on the command line, it is created on the
+ # file system.
+ self.mocked_raw_input.return_value = "\n"
+ new_command = new(self.parser, self.args)
+ new_command.invoke()
+ self.assertTrue(os.path.exists(self.file_path))
+
+ def test_invoke_1(self):
+ # Test that when passing an already existing file, an exception is
+ # thrown.
+ self.args.FILE = self.temp_yaml.name
+ new_command = new(self.parser, self.args)
+ self.assertRaises(CommandError, new_command.invoke)
+
+ def test_invoke_2(self):
+ # Tests that when adding a new test definition and writing it to file
+ # a correct YAML structure is created.
+ self.mocked_raw_input.return_value = "\n"
+ new_command = new(self.parser, self.args)
+ new_command.config = self.config
+ new_command.invoke()
+ expected = {'run': {'steps': ["./mytest.sh"]},
+ 'metadata': {
+ 'environment': ['lava_test_shell'],
+ 'format': 'Lava-Test Test Definition 1.0',
+ 'version': '1.0',
+ 'description': '',
+ 'name': ''},
+ 'parse': {
+ 'pattern':
+ '^\\s*(?P<test_case_id>\\w+)=(?P<result>\\w+)\\s*$'
+ },
+ }
+ obtained = None
+ with open(self.file_path, 'r') as read_file:
+ obtained = yaml.load(read_file)
+ self.assertEqual(expected, obtained)
+
+ def test_invoke_3(self):
+ # Tests that when adding a new test definition and writing it to a file
+ # in a directory withour permissions, exception is raised.
+ self.args.FILE = "/test_file.yaml"
+ self.mocked_raw_input.return_value = "\n"
+ new_command = new(self.parser, self.args)
+ self.assertRaises(CommandError, new_command.invoke)
+ self.assertFalse(os.path.exists(self.args.FILE))
+
+ def test_invoke_4(self):
+ # Tests that when passing values for the "steps" ListParameter, we get
+ # back the correct data structure.
+ self.mocked_raw_input.side_effect = ["foo", "\n", "\n", "\n", "\n",
+ "\n"]
+ new_command = new(self.parser, self.args)
+ new_command.invoke()
+ expected = {'run': {'steps': ["./mytest.sh"]},
+ 'metadata': {
+ 'environment': ['lava_test_shell'],
+ 'format': 'Lava-Test Test Definition 1.0',
+ 'version': '1.0',
+ 'description': '',
+ 'name': 'foo'
+ },
+ 'parse': {
+ 'pattern':
+ '^\\s*(?P<test_case_id>\\w+)=(?P<result>\\w+)\\s*$'
+ },
+ }
+ obtained = None
+ with open(self.file_path, 'r') as read_file:
+ obtained = yaml.load(read_file)
+ self.assertEqual(expected, obtained)
=== added directory 'lava/tests'
=== added file 'lava/tests/__init__.py'
=== added file 'lava/tests/test_commands.py'
@@ -0,0 +1,127 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Tests for lava.commands.
+"""
+
+import os
+import tempfile
+
+from mock import (
+ MagicMock,
+ patch
+)
+
+from lava.commands import (
+ init,
+ submit,
+)
+from lava.config import Config
+from lava.helper.tests.helper_test import HelperTest
+from lava.tool.errors import CommandError
+
+
+class InitCommandTests(HelperTest):
+
+ def setUp(self):
+ super(InitCommandTests, self).setUp()
+ self.config_file = self.tmp("init_command_tests")
+ self.config = Config()
+ self.config.config_file = self.config_file
+
+ def tearDown(self):
+ super(InitCommandTests, self).tearDown()
+ if os.path.isfile(self.config_file):
+ os.unlink(self.config_file)
+
+ def test_register_arguments(self):
+ self.args.DIR = os.path.join(tempfile.gettempdir(), "a_fake_dir")
+ init_command = init(self.parser, self.args)
+ init_command.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("DIR", args)
+
+ @patch("lava.commands.edit_file", create=True)
+ def test_command_invoke_0(self, mocked_edit_file):
+ # Invoke the init command passing a path to a file. Should raise an
+ # exception.
+ self.args.DIR = self.temp_file.name
+ init_command = init(self.parser, self.args)
+ self.assertRaises(CommandError, init_command.invoke)
+
+ def test_command_invoke_2(self):
+ # Invoke the init command passing a path where the user cannot write.
+ try:
+ self.args.DIR = "/root/a_temp_dir"
+ init_command = init(self.parser, self.args)
+ self.assertRaises(CommandError, init_command.invoke)
+ finally:
+ if os.path.exists(self.args.DIR):
+ os.removedirs(self.args.DIR)
+
+ def test_update_data(self):
+ # Make sure the template is updated accordingly with the provided data.
+ self.args.DIR = self.temp_file.name
+
+ init_command = init(self.parser, self.args)
+ init_command.config.get = MagicMock()
+ init_command.config.save = MagicMock()
+ init_command.config.get.side_effect = ["a_job.json"]
+
+ expected = {
+ "jobfile": "a_job.json",
+ }
+
+ obtained = init_command._update_data()
+ self.assertEqual(expected, obtained)
+
+
+class SubmitCommandTests(HelperTest):
+ def setUp(self):
+ super(SubmitCommandTests, self).setUp()
+ self.config_file = self.tmp("submit_command_tests")
+ self.config = Config()
+ self.config.config_file = self.config_file
+ self.config.save = MagicMock()
+
+ def tearDown(self):
+ super(SubmitCommandTests, self).tearDown()
+ if os.path.isfile(self.config_file):
+ os.unlink(self.config_file)
+
+ def test_register_arguments(self):
+ self.args.JOB = os.path.join(tempfile.gettempdir(), "a_fake_file")
+ submit_command = submit(self.parser, self.args)
+ submit_command.register_arguments(self.parser)
+
+ # Make sure we do not forget about this test.
+ self.assertEqual(2, len(self.parser.method_calls))
+
+ _, args, _ = self.parser.method_calls[0]
+ self.assertIn("--non-interactive", args)
+
+ _, args, _ = self.parser.method_calls[1]
+ self.assertIn("JOB", args)
=== added file 'lava/tests/test_config.py'
@@ -0,0 +1,281 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+lava.config unit tests.
+"""
+
+import sys
+
+from StringIO import StringIO
+from mock import (
+ MagicMock,
+ call,
+ patch,
+)
+
+from lava.config import (
+ Config,
+ InteractiveConfig,
+)
+from lava.helper.tests.helper_test import HelperTest
+from lava.parameter import (
+ Parameter,
+ ListParameter,
+)
+from lava.tool.errors import CommandError
+
+
+class ConfigTestCase(HelperTest):
+ """General test case class for the different Config classes."""
+ def setUp(self):
+ super(ConfigTestCase, self).setUp()
+ self.param1 = Parameter("foo")
+ self.param2 = Parameter("bar", depends=self.param1)
+
+
+class ConfigTest(ConfigTestCase):
+
+ @patch("lava.config.Config.save")
+ def setUp(self, mocked_save):
+ super(ConfigTest, self).setUp()
+ self.config = Config()
+ self.config.config_file = self.temp_file.name
+
+ def test_assert_temp_config_file(self):
+ # Dummy test to make sure we are overriding correctly the Config class.
+ self.assertEqual(self.config.config_file, self.temp_file.name)
+
+ def test_config_put_in_cache_0(self):
+ self.config._put_in_cache("key", "value", "section")
+ self.assertEqual(self.config._cache["section"]["key"], "value")
+
+ def test_config_get_from_cache_0(self):
+ self.config._put_in_cache("key", "value", "section")
+ obtained = self.config._get_from_cache(Parameter("key"), "section")
+ self.assertEqual("value", obtained)
+
+ def test_config_get_from_cache_1(self):
+ self.config._put_in_cache("key", "value", "DEFAULT")
+ obtained = self.config._get_from_cache(Parameter("key"), "DEFAULT")
+ self.assertEqual("value", obtained)
+
+ def test_config_put_0(self):
+ # Puts a value in the DEFAULT section.
+ self.config._put_in_cache = MagicMock()
+ self.config.put("foo", "foo")
+ expected = "foo"
+ obtained = self.config._config_backend.get("DEFAULT", "foo")
+ self.assertEqual(expected, obtained)
+
+ def test_config_put_1(self):
+ # Puts a value in a new section.
+ self.config._put_in_cache = MagicMock()
+ self.config.put("foo", "foo", "bar")
+ expected = "foo"
+ obtained = self.config._config_backend.get("bar", "foo")
+ self.assertEqual(expected, obtained)
+
+ def test_config_put_parameter_0(self):
+ self.config._calculate_config_section = MagicMock(return_value="")
+ self.assertRaises(CommandError, self.config.put_parameter, self.param1)
+
+ @patch("lava.config.Config.put")
+ def test_config_put_parameter_1(self, mocked_config_put):
+ self.config._calculate_config_section = MagicMock(
+ return_value="DEFAULT")
+
+ self.param1.value = "bar"
+ self.config.put_parameter(self.param1)
+
+ self.assertEqual(mocked_config_put.mock_calls,
+ [call("foo", "bar", "DEFAULT")])
+
+ def test_config_get_0(self):
+ # Tests that with a non existing parameter, it returns None.
+ param = Parameter("baz")
+ self.config._get_from_cache = MagicMock(return_value=None)
+ self.config._calculate_config_section = MagicMock(
+ return_value="DEFAULT")
+
+ expected = None
+ obtained = self.config.get(param)
+ self.assertEqual(expected, obtained)
+
+ def test_config_get_1(self):
+ self.config.put_parameter(self.param1, "foo")
+ self.config._get_from_cache = MagicMock(return_value=None)
+ self.config._calculate_config_section = MagicMock(
+ return_value="DEFAULT")
+
+ expected = "foo"
+ obtained = self.config.get(self.param1)
+ self.assertEqual(expected, obtained)
+
+ def test_calculate_config_section_0(self):
+ expected = "DEFAULT"
+ obtained = self.config._calculate_config_section(self.param1)
+ self.assertEqual(expected, obtained)
+
+ def test_calculate_config_section_1(self):
+ self.config.put_parameter(self.param1, "foo")
+ expected = "foo=foo"
+ obtained = self.config._calculate_config_section(self.param2)
+ self.assertEqual(expected, obtained)
+
+ def test_config_save(self):
+ self.config.put_parameter(self.param1, "foo")
+ self.config.save()
+
+ expected = "[DEFAULT]\nfoo = foo\n\n"
+ obtained = ""
+ with open(self.temp_file.name) as tmp_file:
+ obtained = tmp_file.read()
+ self.assertEqual(expected, obtained)
+
+ def test_config_get_from_backend_public(self):
+ # Need to to this, since we want a clean Config instance, with
+ # a config_file with some content.
+ with open(self.config.config_file, "w") as write_config:
+ write_config.write("[DEFAULT]\nfoo=bar\n")
+ param = Parameter("foo")
+ obtained = self.config.get_from_backend(param)
+ self.assertEquals("bar", obtained)
+
+
+class InteractiveConfigTest(ConfigTestCase):
+
+ @patch("lava.config.Config.save")
+ def setUp(self, mocked_save):
+ super(InteractiveConfigTest, self).setUp()
+ self.config = InteractiveConfig()
+ self.config.config_file = self.temp_file.name
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value=None))
+ def test_non_interactive_config_0(self):
+ # Try to get a value that does not exists, users just press enter when
+ # asked for a value. Value will be empty.
+ self.config.force_interactive = False
+ sys.stdin = StringIO("\n")
+ value = self.config.get(Parameter("foo"))
+ self.assertEqual("", value)
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
+ def test_non_interactive_config_1(self):
+ # Parent class config returns value, but we are not interactive.
+ self.config.force_interactive = False
+ value = self.config.get(Parameter("foo"))
+ self.assertEqual("value", value)
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value=None))
+ def test_non_interactive_config_2(self):
+ self.config.force_interactive = False
+ expected = "bar"
+ sys.stdin = StringIO(expected)
+ value = self.config.get(Parameter("foo"))
+ self.assertEqual(expected, value)
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
+ def test_interactive_config_0(self):
+ # We force to be interactive, meaning that even if a value is found,
+ # it will be asked anyway.
+ self.config.force_interactive = True
+ expected = "a_new_value"
+ sys.stdin = StringIO(expected)
+ value = self.config.get(Parameter("foo"))
+ self.assertEqual(expected, value)
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
+ def test_interactive_config_1(self):
+ # Force to be interactive, but when asked for the new value press
+ # Enter. The old value should be returned.
+ self.config.force_interactive = True
+ sys.stdin = StringIO("\n")
+ value = self.config.get(Parameter("foo"))
+ self.assertEqual("value", value)
+
+ def test_calculate_config_section_0(self):
+ self.config.force_interactive = True
+ obtained = self.config._calculate_config_section(self.param1)
+ expected = "DEFAULT"
+ self.assertEqual(expected, obtained)
+
+ def test_calculate_config_section_1(self):
+ self.param1.set("foo")
+ self.param2.depends.asked = True
+ self.config.force_interactive = True
+ obtained = self.config._calculate_config_section(self.param2)
+ expected = "foo=foo"
+ self.assertEqual(expected, obtained)
+
+ def test_calculate_config_section_2(self):
+ self.config.force_interactive = True
+ self.config.config_backend.get = MagicMock(return_value=None)
+ sys.stdin = StringIO("baz")
+ expected = "foo=baz"
+ obtained = self.config._calculate_config_section(self.param2)
+ self.assertEqual(expected, obtained)
+
+ def test_calculate_config_section_3(self):
+ # Tests that when a parameter has its value in the cache and also on
+ # file, we honor the cached version.
+ self.param1.set("bar")
+ self.param2.depends.asked = True
+ self.config.force_interactive = True
+ expected = "foo=bar"
+ obtained = self.config._calculate_config_section(self.param2)
+ self.assertEqual(expected, obtained)
+
+ @patch("lava.config.Config.get", new=MagicMock(return_value=None))
+ @patch("lava.parameter.sys.exit")
+ @patch("lava.parameter.raw_input", create=True)
+ def test_interactive_config_exit(self, mocked_raw, mocked_sys_exit):
+ self.config._calculate_config_section = MagicMock(
+ return_value="DEFAULT")
+
+ mocked_raw.side_effect = KeyboardInterrupt()
+
+ self.config.force_interactive = True
+ self.config.get(self.param1)
+ self.assertTrue(mocked_sys_exit.called)
+
+ @patch("lava.parameter.raw_input", create=True)
+ def test_interactive_config_with_list_parameter(self, mocked_raw_input):
+ # Tests that we get a list back in the Config class when using
+ # ListParameter and that it contains the expected values.
+ expected = ["foo", "bar"]
+ mocked_raw_input.side_effect = expected + ["\n"]
+ obtained = self.config.get(ListParameter("list"))
+ self.assertIsInstance(obtained, list)
+ self.assertEqual(expected, obtained)
+
+ def test_interactive_save_list_param(self):
+ # Tests that when saved to file, the ListParameter parameter is stored
+ # correctly.
+ param_values = ["foo", "more than one words", "bar"]
+ list_param = ListParameter("list")
+ list_param.set(param_values)
+
+ self.config.put_parameter(list_param, param_values)
+ self.config.save()
+
+ expected = "[DEFAULT]\nlist = " + ",".join(param_values) + "\n\n"
+ obtained = ""
+ with open(self.temp_file.name, "r") as read_file:
+ obtained = read_file.read()
+ self.assertEqual(expected, obtained)
=== added file 'lava/tests/test_parameter.py'
@@ -0,0 +1,206 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+lava.parameter unit tests.
+"""
+
+from mock import patch
+
+from lava.helper.tests.helper_test import HelperTest
+from lava.parameter import (
+ ListParameter,
+ Parameter,
+ SingleChoiceParameter,
+)
+from lava_tool.utils import to_list
+
+
+class GeneralParameterTest(HelperTest):
+ """General class with setUp and tearDown methods for Parameter tests."""
+ def setUp(self):
+ super(GeneralParameterTest, self).setUp()
+ # Patch class raw_input, start it, and stop it on tearDown.
+ self.patcher1 = patch("lava.parameter.raw_input", create=True)
+ self.mocked_raw_input = self.patcher1.start()
+
+ def tearDown(self):
+ super(GeneralParameterTest, self).tearDown()
+ self.patcher1.stop()
+
+
+class ParameterTest(GeneralParameterTest):
+ """Tests for the Parameter class."""
+
+ def setUp(self):
+ super(ParameterTest, self).setUp()
+ self.parameter1 = Parameter("foo", value="baz")
+
+ def test_prompt_0(self):
+ # Tests that when we have a value in the parameters and the user press
+ # Enter, we get the old value back.
+ self.mocked_raw_input.return_value = "\n"
+ obtained = self.parameter1.prompt()
+ self.assertEqual(self.parameter1.value, obtained)
+
+ def test_prompt_1(self,):
+ # Tests that with a value stored in the parameter, if and EOFError is
+ # raised when getting user input, we get back the old value.
+ self.mocked_raw_input.side_effect = EOFError()
+ obtained = self.parameter1.prompt()
+ self.assertEqual(self.parameter1.value, obtained)
+
+ def test_to_list_0(self):
+ value = "a_value"
+ expected = [value]
+ obtained = to_list(value)
+ self.assertIsInstance(obtained, list)
+ self.assertEquals(expected, obtained)
+
+ def test_to_list_1(self):
+ expected = ["a_value", "b_value"]
+ obtained = to_list(expected)
+ self.assertIsInstance(obtained, list)
+ self.assertEquals(expected, obtained)
+
+
+class ListParameterTest(GeneralParameterTest):
+
+ """Tests for the specialized ListParameter class."""
+
+ def setUp(self):
+ super(ListParameterTest, self).setUp()
+ self.list_parameter = ListParameter("list")
+
+ def test_prompt_0(self):
+ # Test that when pressing Enter, the prompt stops and the list is
+ # returned.
+ expected = []
+ self.mocked_raw_input.return_value = "\n"
+ obtained = self.list_parameter.prompt()
+ self.assertEqual(expected, obtained)
+
+ def test_prompt_1(self):
+ # Tests that when passing 3 values, a list with those values
+ # is returned
+ expected = ["foo", "bar", "foobar"]
+ self.mocked_raw_input.side_effect = expected + ["\n"]
+ obtained = self.list_parameter.prompt()
+ self.assertEqual(expected, obtained)
+
+ def test_serialize_0(self):
+ # Tests the serialize method of ListParameter passing a list.
+ expected = "foo,bar,baz,1"
+ to_serialize = ["foo", "bar", "baz", "", 1]
+
+ obtained = self.list_parameter.serialize(to_serialize)
+ self.assertEqual(expected, obtained)
+
+ def test_serialize_1(self):
+ # Tests the serialize method of ListParameter passing an int.
+ expected = "1"
+ to_serialize = 1
+
+ obtained = self.list_parameter.serialize(to_serialize)
+ self.assertEqual(expected, obtained)
+
+ def test_deserialize_0(self):
+ # Tests the deserialize method of ListParameter with a string
+ # of values.
+ expected = ["foo", "bar", "baz"]
+ to_deserialize = "foo,bar,,baz,"
+ obtained = self.list_parameter.deserialize(to_deserialize)
+ self.assertEqual(expected, obtained)
+
+ def test_deserialize_1(self):
+ # Tests the deserialization method of ListParameter passing a list.
+ expected = ["foo", 1, "", "bar"]
+ obtained = self.list_parameter.deserialize(expected)
+ self.assertEqual(expected, obtained)
+
+ def test_set_value_0(self):
+ # Pass a string to a ListParameter, expect a list.
+ set_value = "foo"
+ expected = [set_value]
+ self.list_parameter.set(set_value)
+ self.assertEquals(expected, self.list_parameter.value)
+
+ def test_set_value_1(self):
+ # Pass a list to a ListParameter, expect the same list.
+ expected = ["foo", "bar"]
+ self.list_parameter.set(expected)
+ self.assertEquals(expected, self.list_parameter.value)
+
+ def test_add_value_0(self):
+ # Add a value to a ListParameter, expect a list back.
+ add_value = "foo"
+ expected = [add_value]
+ self.list_parameter.add(add_value)
+ self.assertEquals(expected, self.list_parameter.value)
+
+ def test_add_value_1(self):
+ # Add a list value to a ListParameter with already a value set, expect
+ # a list with both values.
+ # The ListParameter is initialized with a string.
+ add_value = ["foo"]
+ list_param = ListParameter("list", value="bar")
+ expected = ["bar", "foo"]
+ list_param.add(add_value)
+ self.assertEquals(expected, list_param.value)
+
+ def test_add_value_2(self):
+ # Add a list value to a ListParameter with already a value set, expect
+ # a list with both values.
+ # The ListParameter is initialized with a list.
+ add_value = ["foo"]
+ list_param = ListParameter("list", value=["bar", "baz"])
+ expected = ["bar", "baz", "foo"]
+ list_param.add(add_value)
+ self.assertEquals(expected, list_param.value)
+
+
+class TestsSingleChoiceParameter(GeneralParameterTest):
+
+ def setUp(self):
+ super(TestsSingleChoiceParameter, self).setUp()
+ self.choices = ["foo", "bar", "baz", "bam"]
+ self.param_id = "single_choice"
+ self.single_choice_param = SingleChoiceParameter(self.param_id,
+ self.choices)
+
+ def test_with_old_value(self):
+ # There is an old value for a single choice parameter, the user
+ # is prompted to select from the list of values, but she presses
+ # enter. The old value is returned.
+ old_value = "bat"
+ self.mocked_raw_input.side_effect = ["\n"]
+ obtained = self.single_choice_param.prompt("", old_value=old_value)
+ self.assertEquals(old_value, obtained)
+
+ def test_without_old_value(self):
+ # There is no old value, user just select the first choice.
+ self.mocked_raw_input.side_effect = ["1"]
+ obtained = self.single_choice_param.prompt("")
+ self.assertEquals("foo", obtained)
+
+ def test_with_wrong_user_input(self):
+ # No old value, user inserts at least two wrong choices, and the select
+ # the third one.
+ self.mocked_raw_input.side_effect = ["1000", "0", "3"]
+ obtained = self.single_choice_param.prompt("")
+ self.assertEquals("baz", obtained)
=== modified file 'lava_tool/tests/__init__.py'
@@ -26,22 +26,34 @@
def app_modules():
return [
- 'lava_tool.commands',
- 'lava_tool.dispatcher',
- 'lava_tool.interface',
- 'lava_dashboard_tool.commands',
- ]
+ 'lava_tool.commands',
+ 'lava_tool.dispatcher',
+ 'lava_tool.interface',
+ 'lava_dashboard_tool.commands',
+ ]
def test_modules():
return [
- 'lava_tool.tests.test_authtoken',
- 'lava_tool.tests.test_auth_commands',
- 'lava_tool.tests.test_commands',
- 'lava_dashboard_tool.tests.test_commands',
- 'lava.job.tests.test_job',
- 'lava.job.tests.test_commands',
- ]
+ 'lava.device.tests.test_commands',
+ 'lava.device.tests.test_device',
+ 'lava.helper.tests.test_command',
+ 'lava.helper.tests.test_dispatcher',
+ 'lava.helper.tests.test_template',
+ 'lava.job.tests.test_commands',
+ 'lava.job.tests.test_job',
+ 'lava.script.tests.test_commands',
+ 'lava.script.tests.test_script',
+ 'lava.testdef.tests.test_commands',
+ 'lava.tests.test_commands',
+ 'lava.tests.test_config',
+ 'lava.tests.test_parameter',
+ 'lava_dashboard_tool.tests.test_commands',
+ 'lava_tool.tests.test_auth_commands',
+ 'lava_tool.tests.test_authtoken',
+ 'lava_tool.tests.test_commands',
+ 'lava_tool.tests.test_utils',
+ ]
def test_suite():
@@ -52,6 +64,7 @@
modules = app_modules() + test_modules()
suite = unittest.TestSuite()
loader = unittest.TestLoader()
+
for name in modules:
unit_suite = loader.loadTestsFromName(name)
suite.addTests(unit_suite)
=== modified file 'lava_tool/tests/test_authtoken.py'
@@ -24,7 +24,6 @@
import StringIO
from unittest import TestCase
import urlparse
-import sys
import xmlrpclib
from mocker import ARGS, KWARGS, Mocker
=== added file 'lava_tool/tests/test_utils.py'
@@ -0,0 +1,282 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+"""lava_tool.utils tests."""
+
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+from unittest import TestCase
+from mock import (
+ MagicMock,
+ call,
+ patch,
+)
+
+from lava.tool.errors import CommandError
+from lava_tool.utils import (
+ can_edit_file,
+ create_dir,
+ edit_file,
+ execute,
+ has_command,
+ retrieve_file,
+ verify_and_create_url,
+ verify_file_extension,
+)
+
+
+class UtilTests(TestCase):
+
+ def setUp(self):
+ self.original_stdout = sys.stdout
+ sys.stdout = open("/dev/null", "w")
+ self.original_stderr = sys.stderr
+ sys.stderr = open("/dev/null", "w")
+ self.original_stdin = sys.stdin
+ self.temp_file = tempfile.NamedTemporaryFile(delete=False)
+
+ def tearDown(self):
+ sys.stdin = self.original_stdin
+ sys.stdout = self.original_stdout
+ sys.stderr = self.original_stderr
+ os.unlink(self.temp_file.name)
+
+ @patch("lava_tool.utils.subprocess.check_call")
+ def test_has_command_0(self, mocked_check_call):
+ # Make sure we raise an exception when the subprocess is called.
+ mocked_check_call.side_effect = subprocess.CalledProcessError(0, "")
+ self.assertFalse(has_command(""))
+
+ @patch("lava_tool.utils.subprocess.check_call")
+ def test_has_command_1(self, mocked_check_call):
+ # Check that a "command" exists. The call to subprocess is mocked.
+ mocked_check_call.return_value = 0
+ self.assertTrue(has_command(""))
+
+ def test_verify_file_extension_with_extension(self):
+ extension = ".test"
+ supported = [extension[1:]]
+ try:
+ temp_file = tempfile.NamedTemporaryFile(suffix=extension,
+ delete=False)
+ obtained = verify_file_extension(
+ temp_file.name, extension[1:], supported)
+ self.assertEquals(temp_file.name, obtained)
+ finally:
+ if os.path.isfile(temp_file.name):
+ os.unlink(temp_file.name)
+
+ def test_verify_file_extension_without_extension(self):
+ extension = "json"
+ supported = [extension]
+ expected = "/tmp/a_fake.json"
+ obtained = verify_file_extension("/tmp/a_fake", extension, supported)
+ self.assertEquals(expected, obtained)
+
+ def test_verify_file_extension_with_unsupported_extension(self):
+ extension = "json"
+ supported = [extension]
+ expected = "/tmp/a_fake.json"
+ obtained = verify_file_extension(
+ "/tmp/a_fake.extension", extension, supported)
+ self.assertEquals(expected, obtained)
+
+ @patch("os.listdir")
+ def test_retrieve_job_file_0(self, mocked_os_listdir):
+ # Make sure that exception is raised if we go through all the elements
+ # returned by os.listdir().
+ mocked_os_listdir.return_value = ["a_file"]
+ self.assertRaises(CommandError, retrieve_file,
+ "a_path", ["ext"])
+
+ @patch("os.listdir")
+ def test_retrieve_job_file_1(self, mocked_os_listdir):
+ # Pass some files and directories to retrieve_file(), and make
+ # sure a file with .json suffix is returned.
+ # Pass also a hidden file.
+ try:
+ json_file = tempfile.NamedTemporaryFile(suffix=".json")
+ json_file_name = os.path.basename(json_file.name)
+
+ file_name_no_suffix = tempfile.NamedTemporaryFile(delete=False)
+ file_name_with_suffix = tempfile.NamedTemporaryFile(
+ suffix=".bork", delete=False)
+
+ temp_dir_name = "submit_command_test_tmp_dir"
+ temp_dir_path = os.path.join(tempfile.gettempdir(), temp_dir_name)
+ os.makedirs(temp_dir_path)
+
+ hidden_file = tempfile.NamedTemporaryFile(
+ prefix=".tmp", delete=False)
+
+ mocked_os_listdir.return_value = [
+ temp_dir_name, file_name_no_suffix.name,
+ file_name_with_suffix.name, json_file_name, hidden_file.name]
+
+ obtained = retrieve_file(tempfile.gettempdir(), ["json"])
+ self.assertEqual(json_file.name, obtained)
+ finally:
+ os.removedirs(temp_dir_path)
+ os.unlink(file_name_no_suffix.name)
+ os.unlink(file_name_with_suffix.name)
+ os.unlink(hidden_file.name)
+
+ def test_retrieve_job_file_2(self):
+ # Pass a file with the valid extension.
+ temp_file = tempfile.NamedTemporaryFile(suffix=".json")
+ obtained = retrieve_file(temp_file.name, ["json"])
+ self.assertEquals(temp_file.name, obtained)
+
+ def test_retrieve_job_file_3(self):
+ # Pass a file with a non-valid extension.
+ temp_file = tempfile.NamedTemporaryFile(suffix=".bork")
+ self.assertRaises(
+ CommandError, retrieve_file, temp_file.name, ["json"])
+
+ @patch("os.listdir")
+ def test_retrieve_job_file_4(self, mocked_os_listdir):
+ # Pass hidden and wrong files and make sure exception is thrown.
+ a_hidden_file = ".a_hidden.json"
+ b_hidden_file = ".b_hidden.json"
+ c_wrong_file = "a_wrong_file.bork"
+
+ mocked_os_listdir.return_value = [a_hidden_file, b_hidden_file, c_wrong_file]
+ self.assertRaises(
+ CommandError, retrieve_file, tempfile.gettempdir(), ["json"])
+
+ @patch("lava_tool.utils.subprocess")
+ def test_execute_0(self, mocked_subprocess):
+ mocked_subprocess.check_call = MagicMock()
+ execute("foo")
+ self.assertEqual(mocked_subprocess.check_call.call_args_list,
+ [call(["foo"])])
+ self.assertTrue(mocked_subprocess.check_call.called)
+
+ @patch("lava_tool.utils.subprocess.check_call")
+ def test_execute_1(self, mocked_check_call):
+ mocked_check_call.side_effect = subprocess.CalledProcessError(1, "foo")
+ self.assertRaises(CommandError, execute, ["foo"])
+
+ @patch("lava_tool.utils.subprocess")
+ @patch("lava_tool.utils.has_command", return_value=False)
+ @patch("lava_tool.utils.os.environ.get", return_value=None)
+ @patch("lava_tool.utils.sys.exit")
+ def test_edit_file_0(self, mocked_sys_exit, mocked_env_get,
+ mocked_has_command, mocked_subprocess):
+ edit_file(self.temp_file.name)
+ self.assertTrue(mocked_sys_exit.called)
+
+ @patch("lava_tool.utils.subprocess")
+ @patch("lava_tool.utils.has_command", side_effect=[True, False])
+ @patch("lava_tool.utils.os.environ.get", return_value=None)
+ def test_edit_file_1(self, mocked_env_get, mocked_has_command,
+ mocked_subprocess):
+ mocked_subprocess.Popen = MagicMock()
+ edit_file(self.temp_file.name)
+ expected = [call(["sensible-editor", self.temp_file.name])]
+ self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
+
+ @patch("lava_tool.utils.subprocess")
+ @patch("lava_tool.utils.has_command", side_effect=[False, True])
+ @patch("lava_tool.utils.os.environ.get", return_value=None)
+ def test_edit_file_2(self, mocked_env_get, mocked_has_command,
+ mocked_subprocess):
+ mocked_subprocess.Popen = MagicMock()
+ edit_file(self.temp_file.name)
+ expected = [call(["xdg-open", self.temp_file.name])]
+ self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
+
+ @patch("lava_tool.utils.subprocess")
+ @patch("lava_tool.utils.has_command", return_value=False)
+ @patch("lava_tool.utils.os.environ.get", return_value="vim")
+ def test_edit_file_3(self, mocked_env_get, mocked_has_command,
+ mocked_subprocess):
+ mocked_subprocess.Popen = MagicMock()
+ edit_file(self.temp_file.name)
+ expected = [call(["vim", self.temp_file.name])]
+ self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
+
+ @patch("lava_tool.utils.subprocess")
+ @patch("lava_tool.utils.has_command", return_value=False)
+ @patch("lava_tool.utils.os.environ.get", return_value="vim")
+ def test_edit_file_4(self, mocked_env_get, mocked_has_command,
+ mocked_subprocess):
+ mocked_subprocess.Popen = MagicMock()
+ mocked_subprocess.Popen.side_effect = Exception()
+ self.assertRaises(CommandError, edit_file, self.temp_file.name)
+
+ def test_can_edit_file(self):
+ # Tests the can_edit_file method of the config command.
+ # This is to make sure the device config file is not erased when
+ # checking if it is possible to open it.
+ expected = ("hostname = a_fake_panda02\nconnection_command = \n"
+ "device_type = panda\n")
+
+ with open(self.temp_file.name, "w") as f:
+ f.write(expected)
+
+ self.assertTrue(can_edit_file(self.temp_file.name))
+ obtained = ""
+ with open(self.temp_file.name) as f:
+ obtained = f.read()
+
+ self.assertEqual(expected, obtained)
+
+ def test_verify_and_create_url_0(self):
+ expected = "https://www.example.org/"
+ obtained = verify_and_create_url("www.example.org", "")
+ self.assertEquals(expected, obtained)
+
+ def test_verify_and_create_url_1(self):
+ expected = "http://www.example.org/"
+ obtained = verify_and_create_url("http://www.example.org")
+ self.assertEquals(expected, obtained)
+
+ def test_verify_and_create_url_2(self):
+ expected = "http://www.example.org/RPC/"
+ obtained = verify_and_create_url("http://www.example.org", "RPC")
+ self.assertEquals(expected, obtained)
+
+ def test_verify_and_create_url_3(self):
+ expected = "https://www.example.org/RPC/"
+ obtained = verify_and_create_url("www.example.org/", "/RPC/")
+ self.assertEquals(expected, obtained)
+
+ def test_create_dir_0(self):
+ try:
+ temp_dir = os.path.join(tempfile.gettempdir(), "a_dir")
+ create_dir(temp_dir)
+ self.assertTrue(os.path.isdir(temp_dir))
+ finally:
+ shutil.rmtree(temp_dir)
+
+ def test_create_dir_1(self):
+ try:
+ temp_dir = os.path.join(tempfile.gettempdir(), "a_dir")
+ create_dir(temp_dir, "subdir")
+ self.assertTrue(os.path.isdir(os.path.join(temp_dir, "subdir")))
+ finally:
+ shutil.rmtree(temp_dir)
+
+ def test_create_dir_2(self):
+ temp_dir = os.path.join("/", "a_temp_dir")
+ self.assertRaises(CommandError, create_dir, temp_dir)
=== added file 'lava_tool/utils.py'
@@ -0,0 +1,332 @@
+# Copyright (C) 2013 Linaro Limited
+#
+# Author: Milo Casagrande <milo.casagrande@linaro.org>
+#
+# This file is part of lava-tool.
+#
+# lava-tool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation
+#
+# lava-tool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
+
+import StringIO
+import base64
+import os
+import tarfile
+import tempfile
+import types
+import subprocess
+import sys
+import urlparse
+
+from lava.tool.errors import CommandError
+
+
+def has_command(command):
+ """Checks that the given command is available.
+
+ :param command: The name of the command to check availability.
+ """
+ command_available = True
+ try:
+ subprocess.check_call(["which", command],
+ stdout=open(os.path.devnull, 'w'))
+ except subprocess.CalledProcessError:
+ command_available = False
+ return command_available
+
+
+def to_list(value):
+ """Return a list from the passed value.
+
+ :param value: The parameter to turn into a list.
+ """
+ return_value = []
+ if isinstance(value, types.StringType):
+ return_value = [value]
+ else:
+ return_value = list(value)
+ return return_value
+
+
+def create_tar(paths):
+ """Creates a temporary tar file with the provided paths.
+
+ The tar file is not deleted at the end, it has to be delete by who calls
+ this function.
+
+ If just a directory is passed, it will be flattened out: its contents will
+ be added, but not the directory itself.
+
+ :param paths: List of paths to be included in the tar archive.
+ :type list
+ :return The path to the temporary tar file.
+ """
+ paths = to_list(paths)
+ try:
+ temp_tar_file = tempfile.NamedTemporaryFile(suffix=".tar",
+ delete=False)
+ with tarfile.open(temp_tar_file.name, "w") as tar_file:
+ for path in paths:
+ full_path = os.path.abspath(path)
+ if os.path.isfile(full_path):
+ arcname = os.path.basename(full_path)
+ tar_file.add(full_path, arcname=arcname)
+ elif os.path.isdir(full_path):
+ # If we pass a directory, flatten it out.
+ # List its contents, and add them as they are.
+ for element in os.listdir(full_path):
+ arcname = element
+ tar_file.add(os.path.join(full_path, element),
+ arcname=arcname)
+ return temp_tar_file.name
+ except tarfile.TarError:
+ raise CommandError("Error creating the temporary tar archive.")
+
+
+def base64_encode(path):
+ """Encode in base64 the provided file.
+
+ :param path: The path to a file.
+ :return The file content encoded in base64.
+ """
+ if os.path.isfile(path):
+ encoded_content = StringIO.StringIO()
+
+ try:
+ with open(path) as read_file:
+ base64.encode(read_file, encoded_content)
+
+ return encoded_content.getvalue().strip()
+ except IOError:
+ raise CommandError("Cannot read file "
+ "'{0}'.".format(path))
+ else:
+ raise CommandError("Provided path does not exists or is not a file: "
+ "{0}.".format(path))
+
+
+def retrieve_file(path, extensions):
+ """Searches for a file that has one of the supported extensions.
+
+ The path of the first file that matches one of the supported provided
+ extensions will be returned. The files are examined in alphabetical
+ order.
+
+ :param path: Where to look for the file.
+ :param extensions: A list of extensions the file to look for should
+ have.
+ :return The full path of the file.
+ """
+ if os.path.isfile(path):
+ if check_valid_extension(path, extensions):
+ retrieved_path = path
+ else:
+ raise CommandError("The provided file '{0}' is not "
+ "valid: extension not supported.".format(path))
+ else:
+ dir_listing = os.listdir(path)
+ dir_listing.sort()
+
+ for element in dir_listing:
+ if element.startswith("."):
+ continue
+
+ element_path = os.path.join(path, element)
+ if os.path.isdir(element_path):
+ continue
+ elif os.path.isfile(element_path):
+ if check_valid_extension(element_path, extensions):
+ retrieved_path = element_path
+ break
+ else:
+ raise CommandError("No suitable file found in '{0}'".format(path))
+
+ return retrieved_path
+
+
+def check_valid_extension(path, extensions):
+ """Checks that a file has one of the supported extensions.
+
+ :param path: The file to check.
+ :param extensions: A list of supported extensions.
+ """
+ is_valid = False
+
+ local_path, file_name = os.path.split(path)
+ name, full_extension = os.path.splitext(file_name)
+
+ if full_extension:
+ extension = full_extension[1:].strip().lower()
+ if extension in extensions:
+ is_valid = True
+ return is_valid
+
+
+def verify_file_extension(path, default, supported):
+ """Verifies if a file has a supported extensions.
+
+ If the file does not have one, it will add the default extension
+ provided.
+
+ :param path: The path of a file to verify.
+ :param default: The default extension to use.
+ :param supported: A list of supported extensions to check against.
+ :return The path of the file.
+ """
+ full_path, file_name = os.path.split(path)
+ name, extension = os.path.splitext(file_name)
+ if not extension:
+ path = ".".join([path, default])
+ elif extension[1:].lower() not in supported:
+ path = os.path.join(full_path, ".".join([name, default]))
+ return path
+
+
+def verify_path_existance(path):
+ """Verifies if a given path exists on the file system.
+
+ Raises a CommandError in case it exists.
+
+ :param path: The path to verify.
+ """
+ if os.path.exists(path):
+ raise CommandError("{0} already exists.".format(path))
+
+
+def verify_path_non_existance(path):
+ """Verifies if a given path does not exist on the file system.
+
+ Raises a CommandError in case it does not exist.
+
+ :param path: The path to verify.
+ """
+ if not os.path.exists(path):
+ raise CommandError("{0} does not exists.".format(path))
+
+
+def write_file(path, content):
+ """Creates a file with the specified content.
+
+ :param path: The path of the file to write.
+ :param content: What to write in the file.
+ """
+ try:
+ with open(path, "w") as to_write:
+ to_write.write(content)
+ except (OSError, IOError):
+ raise CommandError("Error writing file '{0}'".format(path))
+
+
+def execute(cmd_args):
+ """Executes the supplied command args.
+
+ :param cmd_args: The command, and its optional arguments, to run.
+ :return The command execution return code.
+ """
+ cmd_args = to_list(cmd_args)
+ try:
+ return subprocess.check_call(cmd_args)
+ except subprocess.CalledProcessError:
+ raise CommandError("Error running the following command: "
+ "{0}".format(" ".join(cmd_args)))
+
+
+def can_edit_file(path):
+ """Checks if a file can be opend in write mode.
+
+ :param path: The path to the file.
+ :return True if it is possible to write on the file, False otherwise.
+ """
+ can_edit = True
+ try:
+ fp = open(path, "a")
+ fp.close()
+ except IOError:
+ can_edit = False
+ return can_edit
+
+
+def edit_file(file_to_edit):
+ """Opens the specified file with the default file editor.
+
+ :param file_to_edit: The file to edit.
+ """
+ editor = os.environ.get("EDITOR", None)
+ if editor is None:
+ if has_command("sensible-editor"):
+ editor = "sensible-editor"
+ elif has_command("xdg-open"):
+ editor = "xdg-open"
+ else:
+ # We really do not know how to open a file.
+ print >> sys.stdout, ("Cannot find an editor to open the "
+ "file '{0}'.".format(file_to_edit))
+ print >> sys.stdout, ("Either set the 'EDITOR' environment "
+ "variable, or install 'sensible-editor' "
+ "or 'xdg-open'.")
+ sys.exit(-1)
+ try:
+ subprocess.Popen([editor, file_to_edit]).wait()
+ except Exception:
+ raise CommandError("Error opening the file '{0}' with the "
+ "following editor: {1}.".format(file_to_edit,
+ editor))
+
+
+def verify_and_create_url(server, endpoint=""):
+ """Checks that the provided values make a correct URL.
+
+ If the server address does not contain a scheme, by default it will use
+ HTTPS.
+ The endpoint is then added at the URL.
+
+ :param server: A server URL to verify.
+ :return A URL.
+ """
+ scheme, netloc, path, params, query, fragment = \
+ urlparse.urlparse(server)
+ if not scheme:
+ scheme = "https"
+ if not netloc:
+ netloc, path = path, ""
+
+ if not netloc[-1:] == "/":
+ netloc += "/"
+
+ if endpoint:
+ if endpoint[0] == "/":
+ endpoint = endpoint[1:]
+ if not endpoint[-1:] == "/":
+ endpoint += "/"
+ netloc += endpoint
+
+ return urlparse.urlunparse(
+ (scheme, netloc, path, params, query, fragment))
+
+
+def create_dir(path, dir_name=None):
+ """Checks if a directory does not exists, and creates it.
+
+ :param path: The path where the directory should be created.
+ :param dir_name: An optional name for a directory to be created at
+ path (dir_name will be joined with path).
+ :return The path of the created directory."""
+ created_dir = path
+ if dir_name:
+ created_dir = os.path.join(path, dir_name)
+
+ if not os.path.isdir(created_dir):
+ try:
+ os.makedirs(created_dir)
+ except OSError:
+ raise CommandError("Cannot create directory "
+ "'{0}'.".format(created_dir))
+ return created_dir
=== modified file 'setup.py'
@@ -46,6 +46,7 @@
"Topic :: Software Development :: Testing",
],
install_requires=[
+ 'PyYAML >= 3.10',
'argparse >= 1.1',
'argcomplete >= 0.3',
'keyring',
@@ -53,5 +54,8 @@
'versiontools >= 1.3.1'
],
setup_requires=['versiontools >= 1.3.1'],
- tests_require=['mocker >= 1.0'],
+ tests_require=[
+ 'mocker >= 1.0',
+ 'mock >= 0.7.2'
+ ],
zip_safe=True)