=== removed file 'COPYING'
@@ -1,165 +0,0 @@
- GNU LESSER GENERAL PUBLIC LICENSE
- Version 3, 29 June 2007
-
- Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
- Everyone is permitted to copy and distribute verbatim copies
- of this license document, but changing it is not allowed.
-
-
- This version of the GNU Lesser General Public License incorporates
-the terms and conditions of version 3 of the GNU General Public
-License, supplemented by the additional permissions listed below.
-
- 0. Additional Definitions.
-
- As used herein, "this License" refers to version 3 of the GNU Lesser
-General Public License, and the "GNU GPL" refers to version 3 of the GNU
-General Public License.
-
- "The Library" refers to a covered work governed by this License,
-other than an Application or a Combined Work as defined below.
-
- An "Application" is any work that makes use of an interface provided
-by the Library, but which is not otherwise based on the Library.
-Defining a subclass of a class defined by the Library is deemed a mode
-of using an interface provided by the Library.
-
- A "Combined Work" is a work produced by combining or linking an
-Application with the Library. The particular version of the Library
-with which the Combined Work was made is also called the "Linked
-Version".
-
- The "Minimal Corresponding Source" for a Combined Work means the
-Corresponding Source for the Combined Work, excluding any source code
-for portions of the Combined Work that, considered in isolation, are
-based on the Application, and not on the Linked Version.
-
- The "Corresponding Application Code" for a Combined Work means the
-object code and/or source code for the Application, including any data
-and utility programs needed for reproducing the Combined Work from the
-Application, but excluding the System Libraries of the Combined Work.
-
- 1. Exception to Section 3 of the GNU GPL.
-
- You may convey a covered work under sections 3 and 4 of this License
-without being bound by section 3 of the GNU GPL.
-
- 2. Conveying Modified Versions.
-
- If you modify a copy of the Library, and, in your modifications, a
-facility refers to a function or data to be supplied by an Application
-that uses the facility (other than as an argument passed when the
-facility is invoked), then you may convey a copy of the modified
-version:
-
- a) under this License, provided that you make a good faith effort to
- ensure that, in the event an Application does not supply the
- function or data, the facility still operates, and performs
- whatever part of its purpose remains meaningful, or
-
- b) under the GNU GPL, with none of the additional permissions of
- this License applicable to that copy.
-
- 3. Object Code Incorporating Material from Library Header Files.
-
- The object code form of an Application may incorporate material from
-a header file that is part of the Library. You may convey such object
-code under terms of your choice, provided that, if the incorporated
-material is not limited to numerical parameters, data structure
-layouts and accessors, or small macros, inline functions and templates
-(ten or fewer lines in length), you do both of the following:
-
- a) Give prominent notice with each copy of the object code that the
- Library is used in it and that the Library and its use are
- covered by this License.
-
- b) Accompany the object code with a copy of the GNU GPL and this license
- document.
-
- 4. Combined Works.
-
- You may convey a Combined Work under terms of your choice that,
-taken together, effectively do not restrict modification of the
-portions of the Library contained in the Combined Work and reverse
-engineering for debugging such modifications, if you also do each of
-the following:
-
- a) Give prominent notice with each copy of the Combined Work that
- the Library is used in it and that the Library and its use are
- covered by this License.
-
- b) Accompany the Combined Work with a copy of the GNU GPL and this license
- document.
-
- c) For a Combined Work that displays copyright notices during
- execution, include the copyright notice for the Library among
- these notices, as well as a reference directing the user to the
- copies of the GNU GPL and this license document.
-
- d) Do one of the following:
-
- 0) Convey the Minimal Corresponding Source under the terms of this
- License, and the Corresponding Application Code in a form
- suitable for, and under terms that permit, the user to
- recombine or relink the Application with a modified version of
- the Linked Version to produce a modified Combined Work, in the
- manner specified by section 6 of the GNU GPL for conveying
- Corresponding Source.
-
- 1) Use a suitable shared library mechanism for linking with the
- Library. A suitable mechanism is one that (a) uses at run time
- a copy of the Library already present on the user's computer
- system, and (b) will operate properly with a modified version
- of the Library that is interface-compatible with the Linked
- Version.
-
- e) Provide Installation Information, but only if you would otherwise
- be required to provide such information under section 6 of the
- GNU GPL, and only to the extent that such information is
- necessary to install and execute a modified version of the
- Combined Work produced by recombining or relinking the
- Application with a modified version of the Linked Version. (If
- you use option 4d0, the Installation Information must accompany
- the Minimal Corresponding Source and Corresponding Application
- Code. If you use option 4d1, you must provide the Installation
- Information in the manner specified by section 6 of the GNU GPL
- for conveying Corresponding Source.)
-
- 5. Combined Libraries.
-
- You may place library facilities that are a work based on the
-Library side by side in a single library together with other library
-facilities that are not Applications and are not covered by this
-License, and convey such a combined library under terms of your
-choice, if you do both of the following:
-
- a) Accompany the combined library with a copy of the same work based
- on the Library, uncombined with any other library facilities,
- conveyed under the terms of this License.
-
- b) Give prominent notice with the combined library that part of it
- is a work based on the Library, and explaining where to find the
- accompanying uncombined form of the same work.
-
- 6. Revised Versions of the GNU Lesser General Public License.
-
- The Free Software Foundation may publish revised and/or new versions
-of the GNU Lesser General Public License from time to time. Such new
-versions will be similar in spirit to the present version, but may
-differ in detail to address new problems or concerns.
-
- Each version is given a distinguishing version number. If the
-Library as you received it specifies that a certain numbered version
-of the GNU Lesser General Public License "or any later version"
-applies to it, you have the option of following the terms and
-conditions either of that published version or of any later version
-published by the Free Software Foundation. If the Library as you
-received it does not specify a version number of the GNU Lesser
-General Public License, you may choose any version of the GNU Lesser
-General Public License ever published by the Free Software Foundation.
-
- If the Library as you received it specifies that a proxy can decide
-whether future versions of the GNU Lesser General Public License shall
-apply, that proxy's public statement of acceptance of any version is
-permanent authorization for you to choose that version for the
-Library.
=== removed file 'HACKING'
@@ -1,17 +0,0 @@
-Tests Code Coverage
-===================
-
-To have a nicely HTML viewable report on tests code coverage, do as follows:
-
-* Install `python-coverage` (`pip install coverage` in case you use pip)
-* Run the following command:
-
- python-coverage run -m unittest lava_tool.tests.test_suite 2>/dev/null && python-coverage html
-
-* The report will be saved in a directory called `lava_tool_coverage`: open
-the `index.html` file in there to see the report.
-
-Notes:
-
- * To re-run the coverage report, you have to delete the `lava_tool_coverage`
-directory first, otherwise `python-coverage` will fail.
=== removed file 'INSTALL'
@@ -1,22 +0,0 @@
-Installation
-============
-
-Installation needs to be done via setup.py. For the impatient,
-
-$ python setup.py develop --user
-
-is a least effort, minimally disruptive starting point.
-
-This will put a script you can invoke in ~/.local/bin/. You may need to include
-that directory into your path.
-
-To undo this process do:
-
-$ python setup.py develop --user --uninstall
-$ rm -f ~/.local/bin/lava-tool
-
-Note that for various reasons the actual program (lava-tool) is not removed by
-this step. Because this program is automatically generated it is safe to remove
-it manually at any time.
-
-
=== removed file 'NEWS'
=== removed file 'README'
@@ -1,54 +0,0 @@
-About
-=====
-
-This source package contains the command-line tool for interacting
-with the various services built by the Linaro (www.linaro.org)
-Infrastructure Team.
-
-Note that this package only contains the core tool; to actually
-interact with a service you'll need to install a corresponding plugin.
-XXX explain where to get some plugins.
-
-Installation
-============
-
-See INSTALL
-
-Usage
-=====
-
-Dealing with jobs
-
- $ lava job new file.json # creates file.json from a template
- $ lava job submit file.json # submits file.json to a remote LAVA server
- $ lava job run file.json # runs file.json on a local LAVA device
-
-Dealing with LAVA Test Shell Test Definitions
-
- $ lava testdef new file.yml # creates file.yml from a template
- $ lava testdef submit file.yml # submits file.yml to a remote LAVA server
- $ lava testdef run file.yml # runs file.yml on a local LAVA device
-
-Dealing with LAVA Test Shell Scripts
-
- $ lava script submit SCRIPT # submits SCRIPT to a remote LAVA server
- $ lava script run SCRIPT # runs SCRIPT on a local LAVA device
-
-Bash completion
-===============
-
-Once lava-tool is installed, you can turn bash completion on for the `lava` and
-`lava-tool` programs with the following commands (which you can also paste in
-your ~/.bashrc):
-
- eval "$(register-python-argcomplete lava)"
- eval "$(register-python-argcomplete lava-tool)"
-
-Then if you type for example "lava-tool su<TAB>", it will complete that "su"
-with "submit-job" for you.
-
-Reporting Bugs
-==============
-
-All bugs should be reported to the launchpad project at
-https://bugs.launchpad.net/lava-tool/+filebug
=== added file 'README.obsolete.txt'
@@ -0,0 +1,3 @@
+This package is obsolete.
+
+See http://git.linaro.org/gitweb?p=lava/lava-tool.git instead
=== removed file 'ci-build'
@@ -1,58 +0,0 @@
-#!/bin/sh
-
-VENV_DIR="/tmp/ci-build-venv"
-# Directory where coverage HTML report will be written.
-COVERAGE_REPORT_DIR="lava_tool_coverage"
-
-set -e
-
-if test -z "$VIRTUAL_ENV"; then
- set -x
- virtualenv $VENV_DIR
- . $VENV_DIR/bin/activate
- python setup.py develop
-fi
-
-# requirement for integration tests
-if ! pip show Flask | grep -q Flask; then
- pip install 'Flask==0.9'
-fi
-if ! pip show PyYAML | grep -q PyYAML; then
- pip install PyYAML
-fi
-# requirement for unit tests
-if ! pip show mocker | grep -q mocker; then
- pip install mocker
-fi
-
-if ! pip show mock | grep -q mock; then
- pip install mock
-fi
-# Requirement to run code coverage tests.
-if ! pip show coverage | grep -q coverage; then
- pip install coverage
-fi
-
-if test -z "$DISPLAY"; then
- # actual CI
-
- # will install tests dependencies automatically. The output is also more
- # verbose
- python setup.py test < /dev/null
-
- # integration-tests will pick this up and provide detailed output
- export VERBOSE=1
-else
- # in a development workstation, this will produce shorter/nicer output, but
- # requires the test dependencies to be installed manually (or by running
- # `python setup.py test` before).
- python -m unittest lava_tool.tests.test_suite < /dev/null
-fi
-
-if test -d $COVERAGE_REPORT_DIR; then
- rm -rf $COVERAGE_REPORT_DIR
-fi
-# Runs python-coverage.
-python-coverage run -m unittest lava_tool.tests.test_suite 2>/dev/null && python-coverage html
-
-./integration-tests
=== removed file 'entry_points.ini'
@@ -1,97 +0,0 @@
-[console_scripts]
-lava-tool = lava_tool.dispatcher:main
-lava = lava.tool.main:LavaDispatcher.run
-lava-dashboard-tool=lava_dashboard_tool.main:main
-
-[lava.commands]
-help = lava.tool.commands.help:help
-scheduler = lava_scheduler_tool.commands:scheduler
-dashboard = lava_dashboard_tool.commands:dashboard
-job = lava.job.commands:job
-device = lava.device.commands:device
-testdef = lava.testdef.commands:testdef
-init = lava.commands:init
-submit = lava.commands:submit
-run = lava.commands:run
-status = lava.job.commands:status
-update = lava.commands:update
-script = lava.script.commands:script
-
-[lava_tool.commands]
-help = lava.tool.commands.help:help
-auth-add = lava_tool.commands.auth:auth_add
-submit-job = lava_scheduler_tool.commands:submit_job
-resubmit-job = lava_scheduler_tool.commands:resubmit_job
-cancel-job = lava_scheduler_tool.commands:cancel_job
-job-output = lava_scheduler_tool.commands:job_output
-job-status = lava_scheduler_tool.commands:job_status
-backup=lava_dashboard_tool.commands:backup
-bundles=lava_dashboard_tool.commands:bundles
-data_views=lava_dashboard_tool.commands:data_views
-deserialize=lava_dashboard_tool.commands:deserialize
-get=lava_dashboard_tool.commands:get
-make_stream=lava_dashboard_tool.commands:make_stream
-pull=lava_dashboard_tool.commands:pull
-put=lava_dashboard_tool.commands:put
-query_data_view=lava_dashboard_tool.commands:query_data_view
-restore=lava_dashboard_tool.commands:restore
-server_version=lava_dashboard_tool.commands:server_version
-streams=lava_dashboard_tool.commands:streams
-version=lava_dashboard_tool.commands:version
-
-[lava.scheduler.commands]
-submit-job = lava_scheduler_tool.commands:submit_job
-resubmit-job = lava_scheduler_tool.commands:resubmit_job
-cancel-job = lava_scheduler_tool.commands:cancel_job
-job-output = lava_scheduler_tool.commands:job_output
-job-status = lava_scheduler_tool.commands:job_status
-
-[lava.dashboard.commands]
-backup=lava_dashboard_tool.commands:backup
-bundles=lava_dashboard_tool.commands:bundles
-data_views=lava_dashboard_tool.commands:data_views
-deserialize=lava_dashboard_tool.commands:deserialize
-get=lava_dashboard_tool.commands:get
-make_stream=lava_dashboard_tool.commands:make_stream
-pull=lava_dashboard_tool.commands:pull
-put=lava_dashboard_tool.commands:put
-query_data_view=lava_dashboard_tool.commands:query_data_view
-restore=lava_dashboard_tool.commands:restore
-server_version=lava_dashboard_tool.commands:server_version
-streams=lava_dashboard_tool.commands:streams
-version=lava_dashboard_tool.commands:version
-
-[lava_dashboard_tool.commands]
-backup=lava_dashboard_tool.commands:backup
-bundles=lava_dashboard_tool.commands:bundles
-data_views=lava_dashboard_tool.commands:data_views
-deserialize=lava_dashboard_tool.commands:deserialize
-get=lava_dashboard_tool.commands:get
-make_stream=lava_dashboard_tool.commands:make_stream
-pull=lava_dashboard_tool.commands:pull
-put=lava_dashboard_tool.commands:put
-query_data_view=lava_dashboard_tool.commands:query_data_view
-restore=lava_dashboard_tool.commands:restore
-server_version=lava_dashboard_tool.commands:server_version
-streams=lava_dashboard_tool.commands:streams
-version=lava_dashboard_tool.commands:version
-
-[lava.job.commands]
-new = lava.job.commands:new
-submit = lava.job.commands:submit
-status = lava.job.commands:status
-run = lava.job.commands:run
-
-[lava.device.commands]
-add = lava.device.commands:add
-remove = lava.device.commands:remove
-config = lava.device.commands:config
-
-[lava.testdef.commands]
-new = lava.testdef.commands:new
-run = lava.testdef.commands:run
-submit = lava.testdef.commands:submit
-
-[lava.script.commands]
-run = lava.script.commands:run
-submit = lava.script.commands:submit
=== removed file 'integration-tests'
@@ -1,80 +0,0 @@
-#!/bin/sh
-
-set -e
-
-green() {
- test -t 1 && printf "\033[0;32;40m$@\033[m\n" || echo "$@"
-}
-
-red() {
- test -t 2 && printf "\033[0;31;40m$@\033[m\n" >&2 || echo "$2" >&2
-}
-
-start_server() {
- server_dir="${base_tmpdir}/_server"
- mkdir -p "${server_dir}"
- server_log="${server_dir}/log"
- python integration-tests.d/lib/server.py > "${server_log}" 2>&1 &
- server_pid=$?
-}
-
-stop_server() {
- curl -q http://localhost:5000/exit
-}
-
-run_test() {
- local testfile="$1"
- local logfile="$2"
- rc=0
- if test -n "$VERBOSE"; then
- sh -x "$testfile" < /dev/null || rc=$?
- else
- sh -x "$testfile" > "${logfile}" 2>&1 < /dev/null || rc=$?
- fi
- if test $rc -eq 0; then
- green "$testname: PASS"
- passed=$(($passed + 1))
- else
- failed=$(($failed + 1))
- red "$testname: FAIL"
- if test -f "$logfile"; then
- cat "$logfile"
- fi
- fi
-}
-
-passed=0
-failed=0
-base_tmpdir=$(mktemp -d)
-logs="${base_tmpdir}/logs"
-mkdir "$logs"
-
-export PATH="$(dirname $0)"/integration-tests.d/lib:$PATH
-
-start_server
-
-tests="$@"
-if test -z "$tests"; then
- tests=$(echo integration-tests.d/*.sh)
-fi
-
-for testfile in $tests; do
- testname=$(basename "$testfile")
- logfile="${logs}/${testname}.log"
- export tmpdir="${base_tmpdir}/${testname}"
- export LAVACONFIG="${tmpdir}/config"
- mkdir "${tmpdir}"
- run_test "$testfile" "$logfile"
-done
-
-stop_server
-
-rm -rf "${base_tmpdir}"
-
-echo
-if [ "$failed" -eq 0 ]; then
- green "$passed tests passed, $failed tests failed."
-else
- red "$passed tests passed, $failed tests failed."
- exit 1
-fi
=== removed directory 'integration-tests.d'
=== removed file 'integration-tests.d/lava-job-new-existing.sh'
@@ -1,5 +0,0 @@
-touch "${tmpdir}/foo.json"
-lava job new "${tmpdir}/foo.json"
-rc="$?"
-test "$rc" -gt 0
-
=== removed file 'integration-tests.d/lava-job-new-with-config.sh'
@@ -1,10 +0,0 @@
-cat > "${tmpdir}/config" <<EOF
-[DEFAULT]
-device_type = panda
-
-[device_type=panda]
-image = file:///path/to/panda.img
-EOF
-LAVACACHE="${tmpdir}/config" lava job new "${tmpdir}/job.json" -n
-cat "${tmpdir}/job.json"
-grep "device_type.*panda" "${tmpdir}/job.json" && grep "image.*path.to.panda.img" "${tmpdir}/job.json"
=== removed file 'integration-tests.d/lava-job-submit.sh'
@@ -1,12 +0,0 @@
-fixed_response 999
-
-lava_config <<EOF
-[DEFAULT]
-server = validation.example.com
-
-[server=validation.example.com]
-rpc_endpoint = http://localhost:5000/ok
-EOF
-
-LAVACACHE="${tmpdir}/config" lava job submit integration-tests.d/sample/nexus.json -n > $tmpdir/output
-grep "Job submitted with job ID 999" $tmpdir/output
=== removed directory 'integration-tests.d/lib'
=== removed file 'integration-tests.d/lib/fixed_response'
@@ -1,11 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import sys
-import xmlrpclib
-
-data = eval(sys.argv[1])
-
-output = os.path.join(os.path.dirname(__file__), 'fixed_response.txt')
-with open(output, 'w') as f:
- f.write(xmlrpclib.dumps((data,), methodresponse=True))
=== removed file 'integration-tests.d/lib/lava_config'
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-set -e
-
-cat > "$LAVACONFIG"
=== removed file 'integration-tests.d/lib/server.py'
@@ -1,58 +0,0 @@
-import os
-import yaml
-
-from flask import (
- Flask,
- request,
-)
-
-app = Flask(__name__)
-
-aliases = {
- 'ok': 200,
- 'forbidden': 403,
- 'notfound': 404,
-}
-
-fixed_response = None
-
-@app.route('/exit')
-def exit():
- # http://werkzeug.pocoo.org/docs/serving/#shutting-down-the-server
- if not 'werkzeug.server.shutdown' in request.environ:
- raise RuntimeError('Not running the development server')
- request.environ['werkzeug.server.shutdown']()
- return ""
-
-@app.route('/<status_code>/', methods=['GET', 'POST'])
-def reply(status_code):
-
- status = int(aliases.get(status_code, status_code))
-
- headers = {}
- for k,v in request.headers:
- headers[k] = v
-
- response_file = os.path.join(os.path.dirname(__file__), 'fixed_response.txt')
- if os.path.exists(response_file):
- response = open(response_file).read()
- os.remove(response_file)
- else:
- response = yaml.dump(
- {
- 'status': status,
- 'headers': headers,
- 'body': request.form.keys(),
- 'query': dict(request.args),
- },
- encoding='utf-8',
- default_flow_style=False,
- )
- return response, status, { 'Content-Type': 'text/plain' }
-
-@app.route('/', methods=['GET','POST'])
-def root():
- return reply(200)
-
-if __name__ == '__main__':
- app.run(debug=('DEBUG' in os.environ))
=== removed directory 'integration-tests.d/sample'
=== removed file 'integration-tests.d/sample/nexus.json'
@@ -1,15 +0,0 @@
-{
- "device_type": "nexus",
- "job_name": "Boot test",
- "actions": [
- {
- "command": "deploy_linaro_image",
- "parameters": {
- "image": "http:///url/to/nexus.img"
- }
- },
- {
- "command": "boot_linaro_image"
- }
- ]
-}
\ No newline at end of file
=== removed directory 'lava'
=== removed file 'lava/__init__.py'
@@ -1,3 +0,0 @@
-__import__('pkg_resources').declare_namespace(__name__)
-# DO NOT ADD ANYTHING TO THIS FILE!
-# IT MUST STAY AS IS (empty apart from the two lines above)
=== removed file 'lava/commands.py'
@@ -1,227 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Lava init commands.
-
-When invoking:
-
- `lava init [DIR]`
-
-the command will create a default directory and files structure as follows:
-
-DIR/
- |
- +- JOB_FILE.json
- +- tests/
- |
- + mytest.sh
- + lavatest.yaml
-
-If DIR is not passed, it will use the current working directory.
-JOB_FILE is a file name that will be asked to the user, along with
-other necessary information to define the tests.
-
-If the user manually updates either the lavatest.yaml or mytest.sh file, it is
-necessary to run the following command in order to update the job definition:
-
- `lava update [JOB|DIR]`
-"""
-
-import copy
-import json
-import os
-import sys
-
-from lava.helper.command import BaseCommand
-from lava.helper.template import (
- expand_template,
- set_value
-)
-from lava.job import (
- JOB_FILE_EXTENSIONS,
-)
-from lava.job.templates import (
- LAVA_TEST_SHELL_TAR_REPO_KEY,
-)
-from lava.parameter import (
- Parameter,
-)
-from lava.testdef import (
- DEFAULT_TESTDEF_FILENAME,
-)
-from lava.tool.errors import CommandError
-from lava_tool.utils import (
- base64_encode,
- create_dir,
- create_tar,
- edit_file,
- retrieve_file,
- write_file,
-)
-
-# Default directory structure name.
-TESTS_DIR = "tests"
-
-# Internal parameter ids.
-JOBFILE_ID = "jobfile"
-
-JOBFILE_PARAMETER = Parameter(JOBFILE_ID)
-JOBFILE_PARAMETER.store = False
-
-INIT_TEMPLATE = {
- JOBFILE_ID: JOBFILE_PARAMETER,
-}
-
-
-class init(BaseCommand):
- """Set-ups the base directory structure."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(init, cls).register_arguments(parser)
- parser.add_argument("DIR",
- help=("The name of the directory to initialize. "
- "Defaults to current working directory."),
- nargs="?",
- default=os.getcwd())
-
- def invoke(self):
- full_path = os.path.abspath(self.args.DIR)
-
- if os.path.isfile(full_path):
- raise CommandError("'{0}' already exists, and is a "
- "file.".format(self.args.DIR))
-
- create_dir(full_path)
- data = self._update_data()
-
- # Create the directory that will contain the test definition and
- # shell script.
- test_path = create_dir(full_path, TESTS_DIR)
- shell_script = self.create_shell_script(test_path)
- # Let the user modify the file.
- edit_file(shell_script)
-
- testdef_file = self.create_test_definition(
- os.path.join(test_path, DEFAULT_TESTDEF_FILENAME))
-
- job = data[JOBFILE_ID]
- self.create_tar_repo_job(
- os.path.join(full_path, job), testdef_file, test_path)
-
- def _update_data(self):
- """Updates the template and ask values to the user.
-
- The template in this case is a layout of the directory structure as it
- would be written to disk.
-
- :return A dictionary containing all the necessary file names to create.
- """
- data = copy.deepcopy(INIT_TEMPLATE)
- expand_template(data, self.config)
-
- return data
-
-
-class run(BaseCommand):
- """Runs a job on the local dispatcher."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(run, cls).register_arguments(parser)
- parser.add_argument("JOB",
- help=("The job file to run, or a directory "
- "containing a job file. If nothing is "
- "passed, it uses the current working "
- "directory."),
- nargs="?",
- default=os.getcwd())
-
- def invoke(self):
- full_path = os.path.abspath(self.args.JOB)
- job_file = retrieve_file(full_path, JOB_FILE_EXTENSIONS)
-
- super(run, self).run(job_file)
-
-
-class submit(BaseCommand):
- """Submits a job to LAVA."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(submit, cls).register_arguments(parser)
- parser.add_argument("JOB",
- help=("The job file to send, or a directory "
- "containing a job file. If nothing is "
- "passed, it uses the current working "
- "directory."),
- nargs="?",
- default=os.getcwd())
-
- def invoke(self):
- full_path = os.path.abspath(self.args.JOB)
- job_file = retrieve_file(full_path, JOB_FILE_EXTENSIONS)
-
- super(submit, self).submit(job_file)
-
-
-class update(BaseCommand):
- """Updates a job file with the correct data."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(update, cls).register_arguments(parser)
- parser.add_argument("JOB",
- help=("Automatically updates a job file "
- "definition. If nothing is passed, it uses"
- "the current working directory."),
- nargs="?",
- default=os.getcwd())
-
- def invoke(self):
- full_path = os.path.abspath(self.args.JOB)
- job_file = self.retrieve_file(full_path, JOB_FILE_EXTENSIONS)
- job_dir = os.path.dirname(job_file)
- tests_dir = os.path.join(job_dir, TESTS_DIR)
-
- if os.path.isdir(tests_dir):
- tar_repo = None
- try:
- tar_repo = create_tar(tests_dir)
- encoded_tests = base64_encode(tar_repo)
-
- json_data = None
- with open(job_file, "r") as json_file:
- try:
- json_data = json.load(json_file)
- set_value(json_data, LAVA_TEST_SHELL_TAR_REPO_KEY,
- encoded_tests)
- except Exception:
- raise CommandError("Cannot read job file "
- "'{0}'.".format(job_file))
-
- content = json.dumps(json_data, indent=4)
- write_file(job_file, content)
-
- print >> sys.stdout, "Job definition updated."
- finally:
- if tar_repo and os.path.isfile(tar_repo):
- os.unlink(tar_repo)
- else:
- raise CommandError("Cannot find tests directory.")
=== removed file 'lava/config.py'
@@ -1,294 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Config class.
-"""
-
-import atexit
-import os
-import readline
-import xdg.BaseDirectory as xdgBaseDir
-
-from ConfigParser import (
- ConfigParser,
- NoOptionError,
- NoSectionError,
-)
-
-from lava.parameter import Parameter
-from lava.tool.errors import CommandError
-
-__all__ = ['Config', 'InteractiveCache', 'InteractiveConfig']
-
-# Store for function calls to be made at exit time.
-AT_EXIT_CALLS = set()
-# Config default section.
-DEFAULT_SECTION = "DEFAULT"
-# This is the default base name used to create XDG resources.
-DEFAULT_XDG_RESOURCE = "linaro"
-# This is the default name for lava-tool resources.
-DEFAULT_LAVA_TOOL_RESOURCE = "lava-tool"
-
-HISTORY = os.path.join(os.path.expanduser("~"), ".lava_history")
-try:
- readline.read_history_file(HISTORY)
-except IOError:
- pass
-atexit.register(readline.write_history_file, HISTORY)
-
-
-def _run_at_exit():
- """Runs all the function at exit."""
- for call in list(AT_EXIT_CALLS):
- call()
-atexit.register(_run_at_exit)
-
-
-class Config(object):
- """A generic config object."""
-
- def __init__(self):
- # The cache where to store parameters.
- self._cache = {}
- self._config_file = None
- self._config_backend = None
- AT_EXIT_CALLS.add(self.save)
-
- @property
- def config_file(self):
- if self._config_file is None:
- self._config_file = (os.environ.get('LAVACONFIG') or
- os.path.join(self._ensure_xdg_dirs(),
- 'lava-tool.ini'))
- return self._config_file
-
- @config_file.setter
- def config_file(self, value):
- self._config_file = value
-
- @property
- def config_backend(self):
- if self._config_backend is None:
- self._config_backend = ConfigParser()
- self._config_backend.read([self.config_file])
- return self._config_backend
-
- def _ensure_xdg_dirs(self):
- """Make sure we have the default resource.
-
- :return The path to the XDG resource.
- """
- return xdgBaseDir.save_config_path(DEFAULT_XDG_RESOURCE,
- DEFAULT_LAVA_TOOL_RESOURCE)
-
- def _calculate_config_section(self, parameter):
- """Calculates the config section of the specified parameter.
-
- :param parameter: The parameter to calculate the section of.
- :type Parameter
- :return The config section.
- """
- section = DEFAULT_SECTION
- if parameter.depends:
- section = "{0}={1}".format(parameter.depends.id,
- self.get(parameter.depends))
- return section
-
- def get(self, parameter, section=None):
- """Retrieves a Parameter value.
-
- The value is taken either from the Parameter itself, or from the cache,
- or from the config file.
-
- :param parameter: The parameter to search.
- :type Parameter
- :return The parameter value, or None if it is not found.
- """
- if not section:
- section = self._calculate_config_section(parameter)
- # Try to get the parameter value first if it has one.
- if parameter.value is not None:
- value = parameter.value
- else:
- value = self._get_from_cache(parameter, section)
-
- if value is None:
- value = self._get_from_backend(parameter, section)
- return value
-
- def get_from_backend(self, parameter, section=None):
- """Gets a configuration parameter directly from the config file."""
- if not section:
- section = self._calculate_config_section(parameter)
- return self._get_from_backend(parameter, section)
-
- def _get_from_backend(self, parameter, section):
- """Gets the parameter value from the config backend.
-
- :param parameter: The Parameter to look up.
- :param section: The section in the Config.
- """
- value = None
- try:
- value = self.config_backend.get(section, parameter.id)
- except (NoOptionError, NoSectionError):
- # Ignore, we return None.
- pass
- return value
-
- def _get_from_cache(self, parameter, section):
- """Looks for the specified parameter in the internal cache.
-
- :param parameter: The parameter to search.
- :type Parameter
- :return The parameter value, of None if it is not found.
- """
- value = None
- if section in self._cache.keys():
- if parameter.id in self._cache[section].keys():
- value = self._cache[section][parameter.id]
- return value
-
- def _put_in_cache(self, key, value, section=DEFAULT_SECTION):
- """Insert the passed parameter in the internal cache.
-
- :param parameter: The parameter to insert.
- :type Parameter
- :param section: The name of the section in the config file.
- :type str
- """
- if section not in self._cache.keys():
- self._cache[section] = {}
- self._cache[section][key] = value
-
- def put(self, key, value, section=DEFAULT_SECTION):
- """Adds a parameter to the config file.
-
- :param key: The key to add.
- :param value: The value to add.
- :param section: The name of the section as in the config file.
- """
- if (not self.config_backend.has_section(section) and
- section != DEFAULT_SECTION):
- self.config_backend.add_section(section)
-
- # This is done to serialize a list when ConfigParser is written to
- # file. Since there is no real support for list in ConfigParser, we
- # serialized it in a common way that can get easily deserialized.
- if isinstance(value, list):
- value = Parameter.serialize(value)
-
- self.config_backend.set(section, key, value)
- # Store in the cache too.
- self._put_in_cache(key, value, section)
-
- def put_parameter(self, parameter, value=None, section=None):
- """Adds a Parameter to the config file and cache.
-
- :param Parameter: The parameter to add.
- :type Parameter
- :param value: The value of the parameter. Defaults to None.
- :param section: The section where this parameter should be stored.
- Defaults to None.
- """
- if not section:
- section = self._calculate_config_section(parameter)
-
- if value is None and parameter.value is not None:
- value = parameter.value
- elif value is None:
- raise CommandError("No value assigned to '{0}'.".format(
- parameter.id))
- self.put(parameter.id, value, section)
-
- def save(self):
- """Saves the config to file."""
- # Since we lazy load the config_backend property, this check is needed
- # when a user enters a wrong command or it will overwrite the 'config'
- # file with empty contents.
- if self._config_backend:
- with open(self.config_file, "w") as write_file:
- self.config_backend.write(write_file)
-
-
-class InteractiveConfig(Config):
- """An interactive config.
-
- If a value is not found in the config file, it will ask it and then stores
- it.
- """
- def __init__(self, force_interactive=True):
- super(InteractiveConfig, self).__init__()
- self._force_interactive = force_interactive
-
- @property
- def force_interactive(self):
- return self._force_interactive
-
- @force_interactive.setter
- def force_interactive(self, value):
- self._force_interactive = value
-
- def get(self, parameter, section=None):
- """Overrides the parent one.
-
- The only difference with the parent one, is that it will ask to type
- a parameter value in case it is not found.
- """
- if not section:
- section = self._calculate_config_section(parameter)
- value = super(InteractiveConfig, self).get(parameter, section)
-
- if value is None or self.force_interactive:
- value = parameter.prompt(old_value=value)
-
- if value is not None and parameter.store:
- self.put(parameter.id, value, section)
- return value
-
-
-class InteractiveCache(InteractiveConfig):
-
- """An interactive cache where parameters that can change are stored.
-
- This class is basically the same as the Confing and InteractiveConfig ones,
- only the base directory where the cache file is stored is different.
-
- In this case it will use the $XDG_CACHE_HOME value as defined in XDG.
- """
-
- @property
- def config_file(self):
- if self._config_file is None:
- self._config_file = (os.environ.get('LAVACACHE') or
- os.path.join(self._ensure_xdg_dirs(),
- 'parameters.ini'))
- return self._config_file
-
- @config_file.setter
- def config_file(self, value):
- self._config_file = value
-
- def _ensure_xdg_dirs(self):
- """Make sure we have the default resource.
-
- :return The path to the XDG resource.
- """
- return xdgBaseDir.save_cache_path(DEFAULT_XDG_RESOURCE,
- DEFAULT_LAVA_TOOL_RESOURCE)
=== removed directory 'lava/device'
=== removed file 'lava/device/__init__.py'
@@ -1,97 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Device class."""
-
-import re
-
-from copy import deepcopy
-
-from lava.device.templates import (
- DEFAULT_TEMPLATE,
- HOSTNAME_PARAMETER,
- KNOWN_TEMPLATES,
-)
-from lava.helper.template import expand_template
-
-
-def __re_compile(name):
- """Creates a generic regex for the specified device name.
-
- :param name: The name of the device.
- :return A Pattern object.
- """
- return re.compile('^.*{0}.*'.format(name), re.I)
-
-
-# Dictionary of know devices.
-# Keys are the general device name taken from lava.device.templates, values
-# are tuples of: a regex matcher to match the device, and the device associated
-# template.
-KNOWN_DEVICES = dict([(device, (__re_compile(device), template))
- for device, template in KNOWN_TEMPLATES.iteritems()])
-
-
-class Device(object):
-
- """A generic device."""
-
- def __init__(self, data, hostname=None):
- self.data = deepcopy(data)
- self.hostname = hostname
-
- def write(self, conf_file):
- """Writes the object to file.
-
- :param conf_file: The full path of the file where to write."""
- with open(conf_file, 'w') as write_file:
- write_file.write(str(self))
-
- def update(self, config):
- """Updates the Device object values based on the provided config.
-
- :param config: A Config instance.
- """
- # We should always have a hostname, since it defaults to the name
- # given on the command line for the config file.
- if self.hostname is not None:
- # We do not ask the user again this parameter.
- self.data[HOSTNAME_PARAMETER.id].value = self.hostname
- self.data[HOSTNAME_PARAMETER.id].asked = True
-
- expand_template(self.data, config)
-
- def __str__(self):
- string_list = []
- for key, value in self.data.iteritems():
- string_list.append("{0} = {1}\n".format(str(key), str(value)))
- return "".join(string_list)
-
-
-def get_known_device(name):
- """Tries to match a device name with a known device type.
-
- :param name: The name of the device we want matched to a real device.
- :return A Device instance.
- """
- instance = Device(DEFAULT_TEMPLATE, hostname=name)
- for _, (matcher, dev_template) in KNOWN_DEVICES.iteritems():
- if matcher.match(name):
- instance = Device(dev_template, hostname=name)
- break
- return instance
=== removed file 'lava/device/commands.py'
@@ -1,122 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Device specific commands class.
-"""
-
-import os
-import sys
-
-from lava.device import get_known_device
-from lava.helper.command import (
- BaseCommand,
-)
-from lava.helper.dispatcher import (
- get_device_file,
- get_devices_path,
-)
-from lava.tool.command import CommandGroup
-from lava.tool.errors import CommandError
-from lava_tool.utils import (
- can_edit_file,
- edit_file,
-)
-
-DEVICE_FILE_SUFFIX = "conf"
-
-
-class device(CommandGroup):
- """LAVA devices handling."""
-
- namespace = "lava.device.commands"
-
-
-class add(BaseCommand):
- """Adds a new device."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(add, cls).register_arguments(parser)
- parser.add_argument("DEVICE", help="The name of the device to add.")
-
- def invoke(self):
- real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
-
- if get_device_file(real_file_name) is not None:
- print >> sys.stdout, ("A device configuration file named '{0}' "
- "already exists.".format(real_file_name))
- print >> sys.stdout, ("Use 'lava device config {0}' to edit "
- "it.".format(self.args.DEVICE))
- sys.exit(-1)
-
- devices_path = get_devices_path()
- device_conf_file = os.path.abspath(os.path.join(devices_path,
- real_file_name))
-
- device = get_known_device(self.args.DEVICE)
- device.update(self.config)
- device.write(device_conf_file)
-
- print >> sys.stdout, ("Created device file '{0}' in: {1}".format(
- real_file_name, devices_path))
- edit_file(device_conf_file)
-
-
-class remove(BaseCommand):
- """Removes the specified device."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(remove, cls).register_arguments(parser)
- parser.add_argument("DEVICE",
- help="The name of the device to remove.")
-
- def invoke(self):
- real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
- device_conf = get_device_file(real_file_name)
-
- if device_conf:
- try:
- os.remove(device_conf)
- print >> sys.stdout, ("Device configuration file '{0}' "
- "removed.".format(real_file_name))
- except OSError:
- raise CommandError("Cannot remove file '{0}' at: {1}.".format(
- real_file_name, os.path.dirname(device_conf)))
- else:
- print >> sys.stdout, ("No device configuration file '{0}' "
- "found.".format(real_file_name))
-
-
-class config(BaseCommand):
- """Opens the specified device config file."""
- @classmethod
- def register_arguments(cls, parser):
- super(config, cls).register_arguments(parser)
- parser.add_argument("DEVICE",
- help="The name of the device to edit.")
-
- def invoke(self):
- real_file_name = ".".join([self.args.DEVICE, DEVICE_FILE_SUFFIX])
- device_conf = get_device_file(real_file_name)
-
- if device_conf and can_edit_file(device_conf):
- edit_file(device_conf)
- else:
- raise CommandError("Cannot edit file '{0}'".format(real_file_name))
=== removed file 'lava/device/templates.py'
@@ -1,82 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-This is just a place where to store a template like dictionary that
-will be used to serialize a Device object.
-"""
-
-from copy import copy
-
-from lava.parameter import Parameter
-
-# The hostname parameter is always in the DEFAULT config section.
-HOSTNAME_PARAMETER = Parameter("hostname")
-DEVICE_TYPE_PARAMETER = Parameter("device_type", depends=HOSTNAME_PARAMETER)
-CONNECTION_COMMAND_PARMAETER = Parameter("connection_command",
- depends=DEVICE_TYPE_PARAMETER)
-
-DEFAULT_TEMPLATE = {
- 'hostname': HOSTNAME_PARAMETER,
- 'device_type': DEVICE_TYPE_PARAMETER,
- 'connection_command': CONNECTION_COMMAND_PARMAETER,
-}
-
-# Specialized copies of the parameters.
-# We need this or we might end up asking the user twice the same parameter due
-# to different object references when one Parameter depends on a "specialized"
-# one, different from the defaults.
-PANDA_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
-PANDA_DEVICE_TYPE.value = "panda"
-PANDA_DEVICE_TYPE.asked = True
-
-PANDA_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
-PANDA_CONNECTION_COMMAND.depends = PANDA_DEVICE_TYPE
-
-VEXPRESS_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
-VEXPRESS_DEVICE_TYPE.value = "vexpress"
-VEXPRESS_DEVICE_TYPE.asked = True
-
-VEXPRESS_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
-VEXPRESS_CONNECTION_COMMAND.depends = VEXPRESS_DEVICE_TYPE
-
-QEMU_DEVICE_TYPE = copy(DEVICE_TYPE_PARAMETER)
-QEMU_DEVICE_TYPE.value = "qemu"
-QEMU_DEVICE_TYPE.asked = True
-
-QEMU_CONNECTION_COMMAND = copy(CONNECTION_COMMAND_PARMAETER)
-QEMU_CONNECTION_COMMAND.depends = QEMU_DEVICE_TYPE
-
-# Dictionary with templates of known devices.
-KNOWN_TEMPLATES = {
- 'panda': {
- 'hostname': HOSTNAME_PARAMETER,
- 'device_type': PANDA_DEVICE_TYPE,
- 'connection_command': PANDA_CONNECTION_COMMAND,
- },
- 'vexpress': {
- 'hostname': HOSTNAME_PARAMETER,
- 'device_type': VEXPRESS_DEVICE_TYPE,
- 'connection_command': VEXPRESS_CONNECTION_COMMAND,
- },
- 'qemu': {
- 'hostname': HOSTNAME_PARAMETER,
- 'device_type': QEMU_DEVICE_TYPE,
- 'connection_command': QEMU_CONNECTION_COMMAND,
- }
-}
=== removed directory 'lava/device/tests'
=== removed file 'lava/device/tests/__init__.py'
=== removed file 'lava/device/tests/test_commands.py'
@@ -1,182 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.device.commands unit tests.
-"""
-
-import os
-
-from mock import (
- MagicMock,
- call,
- patch,
-)
-
-from lava.device.commands import (
- add,
- config,
- remove,
-)
-from lava.helper.tests.helper_test import HelperTest
-from lava.tool.errors import CommandError
-
-
-class AddCommandTest(HelperTest):
-
- def test_register_argument(self):
- # Make sure that the parser add_argument is called and we have the
- # correct argument.
- add_command = add(self.parser, self.args)
- add_command.register_arguments(self.parser)
- name, args, kwargs = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- name, args, kwargs = self.parser.method_calls[1]
- self.assertIn("DEVICE", args)
-
- @patch("lava.device.commands.edit_file", create=True)
- @patch("lava.device.Device.__str__")
- @patch("lava.device.Device.update")
- @patch("lava.device.commands.get_device_file")
- @patch("lava.device.commands.get_devices_path")
- def test_add_invoke_0(self, mocked_get_devices_path,
- mocked_get_device_file, mocked_update, mocked_str,
- mocked_edit_file):
- # Tests invocation of the add command. Verifies that the conf file is
- # written to disk.
- mocked_get_devices_path.return_value = self.temp_dir
- mocked_get_device_file.return_value = None
- mocked_str.return_value = ""
-
- add_command = add(self.parser, self.args)
- add_command.invoke()
-
- expected_path = os.path.join(self.temp_dir,
- ".".join([self.device, "conf"]))
- self.assertTrue(os.path.isfile(expected_path))
-
- @patch("lava.device.commands.edit_file", create=True)
- @patch("lava.device.commands.get_known_device")
- @patch("lava.device.commands.get_devices_path")
- @patch("lava.device.commands.sys.exit")
- @patch("lava.device.commands.get_device_file")
- def test_add_invoke_1(self, mocked_get_device_file, mocked_sys_exit,
- mocked_get_devices_path, mocked_get_known_device,
- mocked_edit_file):
- mocked_get_devices_path.return_value = self.temp_dir
- mocked_get_device_file.return_value = self.temp_file.name
-
- add_command = add(self.parser, self.args)
- add_command.invoke()
-
- self.assertTrue(mocked_sys_exit.called)
-
-
-class RemoveCommandTests(HelperTest):
-
- def test_register_argument(self):
- # Make sure that the parser add_argument is called and we have the
- # correct argument.
- command = remove(self.parser, self.args)
- command.register_arguments(self.parser)
- name, args, kwargs = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- name, args, kwargs = self.parser.method_calls[1]
- self.assertIn("DEVICE", args)
-
- @patch("lava.device.commands.edit_file", create=True)
- @patch("lava.device.Device.__str__", return_value="")
- @patch("lava.device.Device.update")
- @patch("lava.device.commands.get_device_file")
- @patch("lava.device.commands.get_devices_path")
- def test_remove_invoke(self, get_devices_path_mock, get_device_file_mock,
- mocked_update, mocked_str, mocked_edit_file):
- # Tests invocation of the remove command. Verifies that the conf file
- # has been correctly removed.
- # First we add a new conf file, then we remove it.
- get_device_file_mock.return_value = None
- get_devices_path_mock.return_value = self.temp_dir
-
- add_command = add(self.parser, self.args)
- add_command.invoke()
-
- expected_path = os.path.join(self.temp_dir,
- ".".join([self.device, "conf"]))
-
- # Set new values for the mocked function.
- get_device_file_mock.return_value = expected_path
-
- remove_command = remove(self.parser, self.args)
- remove_command.invoke()
-
- self.assertFalse(os.path.isfile(expected_path))
-
- @patch("lava.device.commands.get_device_file",
- new=MagicMock(return_value="/root"))
- def test_remove_invoke_raises(self):
- # Tests invocation of the remove command, with a non existent device
- # configuration file.
- remove_command = remove(self.parser, self.args)
- self.assertRaises(CommandError, remove_command.invoke)
-
-
-class ConfigCommanTests(HelperTest):
-
- def test_register_argument(self):
- # Make sure that the parser add_argument is called and we have the
- # correct argument.
- command = config(self.parser, self.args)
- command.register_arguments(self.parser)
- name, args, kwargs = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- name, args, kwargs = self.parser.method_calls[1]
- self.assertIn("DEVICE", args)
-
- @patch("lava.device.commands.can_edit_file", create=True)
- @patch("lava.device.commands.edit_file", create=True)
- @patch("lava.device.commands.get_device_file")
- def test_config_invoke_0(self, mocked_get_device_file, mocked_edit_file,
- mocked_can_edit_file):
- command = config(self.parser, self.args)
-
- mocked_can_edit_file.return_value = True
- mocked_get_device_file.return_value = self.temp_file.name
- command.invoke()
-
- self.assertTrue(mocked_edit_file.called)
- self.assertEqual([call(self.temp_file.name)],
- mocked_edit_file.call_args_list)
-
- @patch("lava.device.commands.get_device_file",
- new=MagicMock(return_value=None))
- def test_config_invoke_raises_0(self):
- # Tests invocation of the config command, with a non existent device
- # configuration file.
- config_command = config(self.parser, self.args)
- self.assertRaises(CommandError, config_command.invoke)
-
- @patch("lava.device.commands.get_device_file",
- new=MagicMock(return_value="/etc/password"))
- def test_config_invoke_raises_1(self):
- # Tests invocation of the config command, with a non writable file.
- # Hopefully tests are not run as root.
- config_command = config(self.parser, self.args)
- self.assertRaises(CommandError, config_command.invoke)
=== removed file 'lava/device/tests/test_device.py'
@@ -1,119 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Device class unit tests.
-"""
-
-from mock import patch
-
-from lava.config import Config
-from lava.device import (
- Device,
- get_known_device,
-)
-from lava.device.templates import (
- HOSTNAME_PARAMETER,
- PANDA_DEVICE_TYPE,
- PANDA_CONNECTION_COMMAND,
-)
-from lava.helper.tests.helper_test import HelperTest
-from lava.parameter import Parameter
-
-
-class DeviceTest(HelperTest):
-
- def test_get_known_device_panda_0(self):
- # User creates a new device with a guessable name for a device.
- instance = get_known_device('panda_new_01')
- self.assertIsInstance(instance, Device)
- self.assertEqual(instance.data['device_type'].value, 'panda')
-
- def test_get_known_device_panda_1(self):
- # User creates a new device with a guessable name for a device.
- # Name passed has capital letters.
- instance = get_known_device('new_PanDa_02')
- self.assertIsInstance(instance, Device)
- self.assertEqual(instance.data['device_type'].value, 'panda')
-
- def test_get_known_device_vexpress_0(self):
- # User creates a new device with a guessable name for a device.
- # Name passed has capital letters.
- instance = get_known_device('a_VexPress_Device')
- self.assertIsInstance(instance, Device)
- self.assertEqual(instance.data['device_type'].value, 'vexpress')
-
- def test_get_known_device_vexpress_1(self):
- # User creates a new device with a guessable name for a device.
- instance = get_known_device('another-vexpress')
- self.assertIsInstance(instance, Device)
- self.assertIsInstance(instance.data['device_type'], Parameter)
- self.assertEqual(instance.data['device_type'].value, 'vexpress')
-
- @patch("lava.config.Config.save")
- def test_device_update_1(self, patched_save):
- # Tests that when calling update() on a Device, the template gets
- # updated with the correct values from a Config instance.
- hostname = "panda_device"
-
- config = Config()
- config._config_file = self.temp_file.name
- config.put_parameter(HOSTNAME_PARAMETER, hostname)
- config.put_parameter(PANDA_DEVICE_TYPE, "panda")
- config.put_parameter(PANDA_CONNECTION_COMMAND, "test")
-
- expected = {
- "hostname": hostname,
- "device_type": "panda",
- "connection_command": "test"
- }
-
- instance = get_known_device(hostname)
- instance.update(config)
-
- self.assertEqual(expected, instance.data)
-
- @patch("lava.config.Config.save")
- def test_device_write(self, mocked_save):
- # User tries to create a new panda device. The conf file is written
- # and contains the expected results.
- hostname = "panda_device"
-
- config = Config()
- config._config_file = self.temp_file.name
- config.put_parameter(HOSTNAME_PARAMETER, hostname)
- config.put_parameter(PANDA_DEVICE_TYPE, "panda")
- config.put_parameter(PANDA_CONNECTION_COMMAND, "test")
-
- expected = {
- "hostname": hostname,
- "device_type": "panda",
- "connection_command": "test"
- }
-
- instance = get_known_device(hostname)
- instance.update(config)
- instance.write(self.temp_file.name)
-
- expected = ("hostname = panda_device\nconnection_command = test\n"
- "device_type = panda\n")
-
- obtained = ""
- with open(self.temp_file.name) as f:
- obtained = f.read()
- self.assertEqual(expected, obtained)
=== removed directory 'lava/helper'
=== removed file 'lava/helper/__init__.py'
=== removed file 'lava/helper/command.py'
@@ -1,244 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Base command class common to lava commands series."""
-
-import os
-import sys
-import xmlrpclib
-
-from lava.config import (
- InteractiveCache,
-)
-from lava.helper.dispatcher import get_devices
-from lava.job import Job
-from lava.job.templates import (
- LAVA_TEST_SHELL_TAR_REPO,
- LAVA_TEST_SHELL_TAR_REPO_KEY,
- LAVA_TEST_SHELL_TESDEF_KEY,
-)
-from lava.parameter import (
- Parameter,
- SingleChoiceParameter,
-)
-from lava.script import (
- ShellScript,
- DEFAULT_TESTDEF_SCRIPT,
-)
-from lava.testdef import TestDefinition
-from lava.testdef.templates import (
- TESTDEF_STEPS_KEY,
- TESTDEF_TEMPLATE,
-)
-from lava.tool.command import Command
-from lava.tool.errors import CommandError
-from lava_tool.authtoken import (
- AuthenticatingServerProxy,
- KeyringAuthBackend
-)
-from lava_tool.utils import (
- base64_encode,
- create_tar,
- execute,
- has_command,
- to_list,
- verify_and_create_url,
-)
-
-CONFIG = InteractiveCache()
-
-
-class BaseCommand(Command):
-
- """Base command class for all lava commands."""
-
- def __init__(self, parser, args):
- super(BaseCommand, self).__init__(parser, args)
- self.config = CONFIG
- self.config.force_interactive = self.args.non_interactive
-
- @classmethod
- def register_arguments(cls, parser):
- super(BaseCommand, cls).register_arguments(parser)
- parser.add_argument("--non-interactive", "-n",
- action='store_false',
- help=("Do not ask for input parameters."))
-
- def authenticated_server(self):
- """Returns a connection to a LAVA server.
-
- It will ask the user the necessary parameters to establish the
- connection.
- """
- print >> sys.stdout, "\nServer connection parameters:"
-
- server_name_parameter = Parameter("server")
- rpc_endpoint_parameter = Parameter("rpc_endpoint",
- depends=server_name_parameter)
-
- self.config.get(server_name_parameter)
- endpoint = self.config.get(rpc_endpoint_parameter)
-
- rpc_url = verify_and_create_url(endpoint)
- server = AuthenticatingServerProxy(rpc_url,
- auth_backend=KeyringAuthBackend())
- return server
-
- def submit(self, job_file):
- """Submits a job file to a LAVA server.
-
- :param job_file: The job file to submit.
- :return The job ID on success.
- """
- if os.path.isfile(job_file):
- try:
- jobdata = open(job_file, 'rb').read()
- server = self.authenticated_server()
-
- job_id = server.scheduler.submit_job(jobdata)
- print >> sys.stdout, ("Job submitted with job "
- "ID {0}.".format(job_id))
-
- return job_id
- except xmlrpclib.Fault, exc:
- raise CommandError(str(exc))
- else:
- raise CommandError("Job file '{0}' does not exists, or is not "
- "a file.".format(job_file))
-
- def run(self, job_file):
- """Runs a job file on the local LAVA dispatcher.
-
- :param job_file: The job file to run.
- """
- if os.path.isfile(job_file):
- if has_command("lava-dispatch"):
- devices = get_devices()
- if devices:
- if len(devices) > 1:
- device_names = [device.hostname for device in devices]
- device_param = SingleChoiceParameter("device",
- device_names)
- device = device_param.prompt("Device to use: ")
- else:
- device = devices[0].hostname
- execute(["lava-dispatch", "--target", device, job_file])
- else:
- raise CommandError("Cannot find lava-dispatcher installation.")
- else:
- raise CommandError("Job file '{0}' does not exists, or it is not "
- "a file.".format(job_file))
-
- def status(self, job_id):
- """Retrieves the status of a LAVA job.
-
- :param job_id: The ID of the job to look up.
- """
- job_id = str(job_id)
-
- try:
- server = self.authenticated_server()
- job_status = server.scheduler.job_status(job_id)
-
- status = job_status["job_status"].lower()
- bundle = job_status["bundle_sha1"]
-
- print >> sys.stdout, "\nJob id: {0}".format(job_id)
- print >> sys.stdout, "Status: {0}".format(status)
- print >> sys.stdout, "Bundle: {0}".format(bundle)
- except xmlrpclib.Fault, exc:
- raise CommandError(str(exc))
-
- def create_tar_repo_job(self, job_file, testdef_file, tar_content):
- """Creates a job file based on the tar-repo template.
-
- The tar repo is not kept on the file system.
-
- :param job_file: The path of the job file to create.
- :param testdef_file: The path of the test definition file.
- :param tar_content: What should go into the tarball repository.
- :return The path of the job file created.
- """
-
- print >> sys.stdout, "\nCreating job file..."
-
- try:
- tar_repo = create_tar(tar_content)
-
- job_instance = Job(LAVA_TEST_SHELL_TAR_REPO, job_file)
- job_instance.update(self.config)
-
- job_instance.set(LAVA_TEST_SHELL_TAR_REPO_KEY,
- base64_encode(tar_repo))
- job_instance.set(LAVA_TEST_SHELL_TESDEF_KEY,
- os.path.basename(testdef_file))
-
- job_instance.write()
-
- basename = os.path.basename(job_instance.file_name)
- print >> sys.stdout, ("\nCreated job file "
- "'{0}'.".format(basename))
-
- return job_instance.file_name
- finally:
- if os.path.isfile(tar_repo):
- os.unlink(tar_repo)
-
- def create_test_definition(self, testdef_file, template=TESTDEF_TEMPLATE,
- steps=None):
- """Creates a test definition YAML file.
-
- :param testdef_file: The file to create.
- :return The path of the file created.
- """
-
- print >> sys.stdout, "\nCreating test definition file..."
-
- testdef = TestDefinition(template, testdef_file)
- if steps:
- steps = to_list(steps)
- testdef.set(TESTDEF_STEPS_KEY, steps)
- testdef.update(self.config)
- testdef.write()
-
- basename = os.path.basename(testdef.file_name)
- print >> sys.stdout, ("\nCreated test definition "
- "'{0}'.".format(basename))
-
- return testdef.file_name
-
- def create_shell_script(self, test_path,
- script_name=DEFAULT_TESTDEF_SCRIPT):
- """Creates a shell script with some default content.
-
- :param test_path: The directory where to create the script.
- :param script_name: The name of the script.
- :return The full path to the script file.
- """
- default_script = os.path.join(test_path, script_name)
-
- if not os.path.isfile(default_script):
- print >> sys.stdout, "Creating shell script..."
-
- shell_script = ShellScript(default_script)
- shell_script.write()
-
- print >> sys.stdout, ("\nCreated shell script "
- "'{0}'.".format(script_name))
-
- return default_script
=== removed file 'lava/helper/dispatcher.py'
@@ -1,110 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Classes and functions to interact with the lava-dispatcher."""
-
-import random
-import string
-import os
-
-from lava.tool.errors import CommandError
-
-# Default devices path, has to be joined with the dispatcher path.
-DEFAULT_DEVICES_PATH = "devices"
-
-
-def get_dispatcher_paths():
- """Tries to get the dispatcher paths from lava-dispatcher.
-
- :return A list of paths.
- """
- try:
- from lava_dispatcher.config import write_path
- return write_path()
- except ImportError:
- raise CommandError("Cannot find lava-dispatcher installation.")
-
-
-def get_devices():
- """Gets the devices list from the dispatcher.
-
- :return A list of DeviceConfig.
- """
- try:
- from lava_dispatcher.config import get_devices
- return get_devices()
- except ImportError:
- raise CommandError("Cannot find lava-dispatcher installation.")
-
-
-def get_device_file(file_name):
- """Retrieves the config file name specified, if it exists.
-
- :param file_name: The config file name to search.
- :return The path to the file, or None if it does not exist.
- """
- try:
- from lava_dispatcher.config import get_config_file
- return get_config_file(os.path.join(DEFAULT_DEVICES_PATH,
- file_name))
- except ImportError:
- raise CommandError("Cannot find lava-dispatcher installation.")
-
-
-def choose_devices_path(paths):
- """Picks the first path that is writable by the user.
-
- :param paths: A list of paths.
- :return The first path where it is possible to write.
- """
- valid_path = None
- for path in paths:
- path = os.path.join(path, DEFAULT_DEVICES_PATH)
- if os.path.exists(path):
- name = "".join(random.choice(string.ascii_letters)
- for x in range(6))
- test_file = os.path.join(path, name)
- try:
- fp = open(test_file, 'a')
- fp.close()
- except IOError:
- # Cannot write here.
- continue
- else:
- valid_path = path
- if os.path.isfile(test_file):
- os.unlink(test_file)
- break
- else:
- try:
- os.makedirs(path)
- except OSError:
- # Cannot write here either.
- continue
- else:
- valid_path = path
- break
- else:
- raise CommandError("Insufficient permissions to create new "
- "devices.")
- return valid_path
-
-
-def get_devices_path():
- """Gets the path to the devices in the LAVA dispatcher."""
- return choose_devices_path(get_dispatcher_paths())
=== removed file 'lava/helper/template.py'
@@ -1,124 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Helper functions for a template."""
-
-from lava.parameter import Parameter
-
-
-def expand_template(template, config):
- """Updates a template based on the values from the provided config.
-
- :param template: A template to be updated.
- :param config: A Config instance where values should be taken.
- """
-
- def update(data):
- """Internal recursive function."""
- if isinstance(data, dict):
- keys = data.keys()
- elif isinstance(data, list):
- keys = range(len(data))
- else:
- return
- for key in keys:
- entry = data[key]
- if isinstance(entry, Parameter):
- data[key] = config.get(entry)
- else:
- update(entry)
-
- update(template)
-
-
-def get_key(data, search_key):
- """Goes through a template looking for a key.
-
- :param data: The template to traverse.
- :param search_key: The key to look for.
- :return The key value.
- """
- return_value = None
- found = False
-
- if isinstance(data, dict):
- bucket = []
-
- for key, value in data.iteritems():
- if key == search_key:
- return_value = value
- found = True
- break
- else:
- bucket.append(value)
-
- if bucket and not found:
- for element in bucket:
- if isinstance(element, list):
- for element in element:
- bucket.append(element)
- elif isinstance(element, dict):
- for key, value in element.iteritems():
- if key == search_key:
- return_value = value
- found = True
- break
- else:
- bucket.append(value)
- if found:
- break
-
- return return_value
-
-
-def set_value(data, search_key, new_value):
- """Sets a new value for a template key.
-
- :param data: The data structure to update.
- :type dict
- :param search_key: The key to search and update.
- :param new_value: The new value to set.
- """
- is_set = False
-
- if isinstance(data, dict):
- bucket = []
-
- for key, value in data.iteritems():
- if key == search_key:
- data[key] = new_value
- is_set = True
- break
- else:
- bucket.append(value)
-
- if bucket and not is_set:
- for element in bucket:
- if isinstance(element, list):
- for element in element:
- bucket.append(element)
- elif isinstance(element, dict):
- for key, value in element.iteritems():
- if key == search_key:
- element[key] = new_value
- is_set = True
- break
- else:
- bucket.append(value)
- if is_set:
- break
=== removed directory 'lava/helper/tests'
=== removed file 'lava/helper/tests/__init__.py'
=== removed file 'lava/helper/tests/helper_test.py'
@@ -1,81 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-A test helper class.
-
-Here we define a general test class and its own setUp and tearDown methods that
-all other test classes can inherit from.
-"""
-
-import os
-import shutil
-import sys
-import tempfile
-
-from unittest import TestCase
-from mock import (
- MagicMock,
- patch
-)
-
-
-class HelperTest(TestCase):
- """Helper test class that all tests under the lava package can inherit."""
-
- def setUp(self):
- # Need to patch it here, not as a decorator, or running the tests
- # via `./setup.py test` will fail.
- self.at_exit_patcher = patch("lava.config.AT_EXIT_CALLS", spec=set)
- self.at_exit_patcher.start()
- self.original_stdout = sys.stdout
- sys.stdout = open("/dev/null", "w")
- self.original_stderr = sys.stderr
- sys.stderr = open("/dev/null", "w")
- self.original_stdin = sys.stdin
-
- self.device = "a_fake_panda02"
-
- self.temp_file = tempfile.NamedTemporaryFile(delete=False)
- self.temp_dir = tempfile.mkdtemp()
- self.parser = MagicMock()
- self.args = MagicMock()
- self.args.interactive = MagicMock(return_value=False)
- self.args.DEVICE = self.device
-
- def tearDown(self):
- self.at_exit_patcher.stop()
- sys.stdin = self.original_stdin
- sys.stdout = self.original_stdout
- sys.stderr = self.original_stderr
- shutil.rmtree(self.temp_dir)
- os.unlink(self.temp_file.name)
-
- def tmp(self, name):
- """
- Returns the full path to a file, or directory, called `name` in a
- temporary directory.
-
- This method does not create the file, it only gives a full filename
- where you can actually write some data. The file will not be removed
- by this method.
-
- :param name: The name the file/directory should have.
- :return A path.
- """
- return os.path.join(tempfile.gettempdir(), name)
=== removed file 'lava/helper/tests/test_command.py'
@@ -1,47 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""lava.helper.command module tests."""
-
-from mock import MagicMock, patch
-
-
-from lava.helper.command import BaseCommand
-from lava.helper.tests.helper_test import HelperTest
-
-
-class BaseCommandTests(HelperTest):
-
- def test_register_argument(self):
- # Make sure that the parser add_argument is called and we have the
- # correct argument.
- command = BaseCommand(self.parser, self.args)
- command.register_arguments(self.parser)
- name, args, kwargs = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- @patch("lava.helper.command.AuthenticatingServerProxy", create=True)
- def test_authenticated_server(self, mocked_auth_server):
- command = BaseCommand(self.parser, self.args)
- command.config = MagicMock()
- command.config.get = MagicMock()
- command.config.get.side_effect = ["www.example.org", "RPC"]
-
- command.authenticated_server()
-
- self.assertTrue(mocked_auth_server.called)
=== removed file 'lava/helper/tests/test_dispatcher.py'
@@ -1,77 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""lava.helper.dispatcher tests."""
-
-import os
-import tempfile
-
-from mock import patch
-
-from lava.tool.errors import CommandError
-from lava.helper.tests.helper_test import HelperTest
-from lava.helper.dispatcher import (
- choose_devices_path,
-)
-
-
-class DispatcherTests(HelperTest):
-
- def setUp(self):
- super(DispatcherTests, self).setUp()
- self.devices_dir = os.path.join(tempfile.gettempdir(), "devices")
- os.makedirs(self.devices_dir)
-
- def tearDown(self):
- super(DispatcherTests, self).tearDown()
- os.removedirs(self.devices_dir)
-
- def test_choose_devices_path_0(self):
- # Tests that when passing more than one path, the first writable one
- # is returned.
- obtained = choose_devices_path(
- ["/", "/root", self.temp_dir, os.path.expanduser("~")])
- expected = os.path.join(self.temp_dir, "devices")
- self.assertEqual(expected, obtained)
-
- def test_choose_devices_path_1(self):
- # Tests that when passing a path that is not writable, CommandError
- # is raised.
- self.assertRaises(CommandError, choose_devices_path,
- ["/", "/root", "/root/tmpdir"])
-
- def test_choose_devices_path_2(self):
- # Tests that the correct path for devices is created on the filesystem.
- expected_path = os.path.join(self.temp_dir, "devices")
- obtained = choose_devices_path([self.temp_dir])
- self.assertEqual(expected_path, obtained)
- self.assertTrue(os.path.isdir(expected_path))
-
- def test_choose_devices_path_3(self):
- # Tests that returns the already existing devices path.
- obtained = choose_devices_path([tempfile.gettempdir()])
- self.assertEqual(self.devices_dir, obtained)
-
- @patch("__builtin__.open")
- def test_choose_devices_path_4(self, mocked_open):
- # Tests that when IOError is raised and we pass only one dir
- # CommandError is raised.
- mocked_open.side_effect = IOError()
- self.assertRaises(CommandError, choose_devices_path,
- [tempfile.gettempdir()])
- self.assertTrue(mocked_open.called)
=== removed file 'lava/helper/tests/test_template.py'
@@ -1,102 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-""" """
-
-import copy
-from unittest import TestCase
-
-from lava.helper.template import (
- get_key,
- set_value
-)
-
-
-TEST_TEMPLATE = {
- "key1": "value1",
- "key2": [
- "value2", "value3"
- ],
- "key3": [
- {
- "key4": "value4",
- "key5": "value5"
- },
- {
- "key6": "value6",
- "key7": "value7"
- },
- [
- {
- "key8": "value8"
- }
- ]
- ],
- "key10": {
- "key11": "value11"
- }
-}
-
-
-class TestParameter(TestCase):
-
- def test_get_key_simple_key(self):
- expected = "value1"
- obtained = get_key(TEST_TEMPLATE, "key1")
- self.assertEquals(expected, obtained)
-
- def test_get_key_nested_key(self):
- expected = "value4"
- obtained = get_key(TEST_TEMPLATE, "key4")
- self.assertEquals(expected, obtained)
-
- def test_get_key_nested_key_1(self):
- expected = "value7"
- obtained = get_key(TEST_TEMPLATE, "key7")
- self.assertEquals(expected, obtained)
-
- def test_get_key_nested_key_2(self):
- expected = "value8"
- obtained = get_key(TEST_TEMPLATE, "key8")
- self.assertEquals(expected, obtained)
-
- def test_get_key_nested_key_3(self):
- expected = "value11"
- obtained = get_key(TEST_TEMPLATE, "key11")
- self.assertEquals(expected, obtained)
-
- def test_set_value_0(self):
- data = copy.deepcopy(TEST_TEMPLATE)
- expected = "foo"
- set_value(data, "key1", expected)
- obtained = get_key(data, "key1")
- self.assertEquals(expected, obtained)
-
- def test_set_value_1(self):
- data = copy.deepcopy(TEST_TEMPLATE)
- expected = "foo"
- set_value(data, "key6", expected)
- obtained = get_key(data, "key6")
- self.assertEquals(expected, obtained)
-
- def test_set_value_2(self):
- data = copy.deepcopy(TEST_TEMPLATE)
- expected = "foo"
- set_value(data, "key11", expected)
- obtained = get_key(data, "key11")
- self.assertEquals(expected, obtained)
=== removed directory 'lava/job'
=== removed file 'lava/job/__init__.py'
@@ -1,73 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Job class."""
-
-import json
-
-from copy import deepcopy
-
-from lava.helper.template import (
- expand_template,
- set_value,
-)
-from lava_tool.utils import (
- verify_file_extension,
- verify_path_existance,
- write_file
-)
-
-# A default name for job files.
-DEFAULT_JOB_FILENAME = "lava-tool-job.json"
-# Default job file extension.
-DEFAULT_JOB_EXTENSION = "json"
-# Possible extension for a job file.
-JOB_FILE_EXTENSIONS = [DEFAULT_JOB_EXTENSION]
-
-
-class Job(object):
-
- """A Job object.
-
- This class should be used to create new job files. The initialization
- enforces a default file name extension, and makes sure that the file is
- not already present on the file system.
- """
-
- def __init__(self, data, file_name):
- self.file_name = verify_file_extension(file_name,
- DEFAULT_JOB_EXTENSION,
- JOB_FILE_EXTENSIONS)
- verify_path_existance(self.file_name)
- self.data = deepcopy(data)
-
- def set(self, key, value):
- """Set key to the specified value.
-
- :param key: The key to look in the object data.
- :param value: The value to set.
- """
- set_value(self.data, key, value)
-
- def update(self, config):
- """Updates the Job object based on the provided config."""
- expand_template(self.data, config)
-
- def write(self):
- """Writes the Job object to file."""
- write_file(self.file_name, json.dumps(self.data, indent=4))
=== removed file 'lava/job/commands.py'
@@ -1,107 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-LAVA job commands.
-"""
-
-import os
-
-from lava.helper.command import BaseCommand
-from lava.job import Job
-from lava.job.templates import (
- BOOT_TEST_KEY,
- JOB_TYPES,
-)
-from lava.tool.command import CommandGroup
-from lava.tool.errors import CommandError
-
-
-class job(CommandGroup):
- """LAVA job file handling."""
- namespace = 'lava.job.commands'
-
-
-class new(BaseCommand):
- """Creates a new job file."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(new, cls).register_arguments(parser)
- parser.add_argument("FILE", help=("Job file to be created."))
- parser.add_argument("--type",
- help=("The type of job to create. Defaults to "
- "'{0}'.".format(BOOT_TEST_KEY)),
- choices=JOB_TYPES.keys(),
- default=BOOT_TEST_KEY)
-
- def invoke(self, job_template=None):
- if not job_template:
- job_template = JOB_TYPES.get(self.args.type)
-
- full_path = os.path.abspath(self.args.FILE)
-
- job_instance = Job(job_template, full_path)
- job_instance.update(self.config)
- job_instance.write()
-
-
-class submit(BaseCommand):
-
- """Submits the specified job file."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(submit, cls).register_arguments(parser)
- parser.add_argument("FILE", help=("The job file to submit."))
-
- def invoke(self):
- super(submit, self).submit(self.args.FILE)
-
-
-class run(BaseCommand):
-
- """Runs the specified job file on the local dispatcher."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(run, cls).register_arguments(parser)
- parser.add_argument("FILE", help=("The job file to submit."))
-
- def invoke(self):
- super(run, self).run(self.args.FILE)
-
-
-class status(BaseCommand):
-
- """Retrieves the status of a job."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(status, cls).register_arguments(parser)
- parser.add_argument("JOB_ID",
- help=("Prints status information about the "
- "provided job id."),
- nargs="?",
- default=None)
-
- def invoke(self):
- if self.args.JOB_ID:
- super(status, self).status(self.args.JOB_ID)
- else:
- raise CommandError("It is necessary to specify a job id.")
=== removed file 'lava/job/templates.py'
@@ -1,106 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-from lava.parameter import (
- ListParameter,
- Parameter,
-)
-
-LAVA_TEST_SHELL_TAR_REPO_KEY = "tar-repo"
-LAVA_TEST_SHELL_TESDEF_KEY = "testdef"
-
-DEVICE_TYPE_PARAMETER = Parameter("device_type")
-PREBUILT_IMAGE_PARAMETER = Parameter("image", depends=DEVICE_TYPE_PARAMETER)
-
-TESTDEF_URLS_PARAMETER = ListParameter("testdef_urls")
-TESTDEF_URLS_PARAMETER.store = False
-
-BOOT_TEST = {
- "timeout": 18000,
- "job_name": "Boot test",
- "device_type": DEVICE_TYPE_PARAMETER,
- "actions": [
- {
- "command": "deploy_linaro_image",
- "parameters": {
- "image": PREBUILT_IMAGE_PARAMETER
- }
- },
- {
- "command": "boot_linaro_image"
- }
- ]
-}
-
-LAVA_TEST_SHELL = {
- "job_name": "LAVA Test Shell",
- "timeout": 18000,
- "device_type": DEVICE_TYPE_PARAMETER,
- "actions": [
- {
- "command": "deploy_linaro_image",
- "parameters": {
- "image": PREBUILT_IMAGE_PARAMETER,
- }
- },
- {
- "command": "lava_test_shell",
- "parameters": {
- "timeout": 1800,
- "testdef_urls": TESTDEF_URLS_PARAMETER,
- }
- }
- ]
-}
-
-# This is a special case template, only use when automatically create job files
-# starting from a testdef or a script. Never to be used directly by the user.
-LAVA_TEST_SHELL_TAR_REPO = {
- "job_name": "LAVA Test Shell",
- "timeout": 18000,
- "device_type": DEVICE_TYPE_PARAMETER,
- "actions": [
- {
- "command": "deploy_linaro_image",
- "parameters": {
- "image": PREBUILT_IMAGE_PARAMETER,
- }
- },
- {
- "command": "lava_test_shell",
- "parameters": {
- "timeout": 1800,
- "testdef_repos": [
- {
- LAVA_TEST_SHELL_TESDEF_KEY: None,
- LAVA_TEST_SHELL_TAR_REPO_KEY: None,
- }
- ]
- }
- }
- ]
-}
-
-BOOT_TEST_KEY = "boot-test"
-LAVA_TEST_SHELL_KEY = "lava-test-shell"
-
-# Dict with all the user available job templates.
-JOB_TYPES = {
- BOOT_TEST_KEY: BOOT_TEST,
- LAVA_TEST_SHELL_KEY: LAVA_TEST_SHELL,
-}
=== removed directory 'lava/job/tests'
=== removed file 'lava/job/tests/__init__.py'
=== removed file 'lava/job/tests/test_commands.py'
@@ -1,155 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the commands classes
-"""
-
-import json
-import os
-
-from mock import patch
-
-from lava.config import Config
-from lava.helper.tests.helper_test import HelperTest
-from lava.job.commands import (
- new,
- run,
- submit,
- status,
-)
-from lava.parameter import Parameter
-from lava.tool.errors import CommandError
-
-
-class CommandTest(HelperTest):
-
- def setUp(self):
- super(CommandTest, self).setUp()
- self.args.FILE = self.temp_file.name
- self.args.type = "boot-test"
-
- self.device_type = Parameter('device_type')
- self.prebuilt_image = Parameter('prebuilt_image',
- depends=self.device_type)
- self.config = Config()
- self.config.put_parameter(self.device_type, 'foo')
- self.config.put_parameter(self.prebuilt_image, 'bar')
-
-
-class JobNewTest(CommandTest):
-
- def setUp(self):
- super(JobNewTest, self).setUp()
- self.args.FILE = self.tmp("new_file.json")
- self.new_command = new(self.parser, self.args)
- self.new_command.config = self.config
-
- def tearDown(self):
- super(JobNewTest, self).tearDown()
- if os.path.exists(self.args.FILE):
- os.unlink(self.args.FILE)
-
- def test_register_arguments(self):
- new_cmd = new(self.parser, self.args)
- new_cmd.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(3, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("FILE", args)
-
- _, args, _ = self.parser.method_calls[2]
- self.assertIn("--type", args)
-
- def test_create_new_file(self):
- self.new_command.invoke()
- self.assertTrue(os.path.exists(self.args.FILE))
-
- def test_fills_in_template_parameters(self):
- self.new_command.invoke()
-
- data = json.loads(open(self.args.FILE).read())
- self.assertEqual(data['device_type'], 'foo')
-
- def test_wont_overwrite_existing_file(self):
- with open(self.args.FILE, 'w') as f:
- f.write("CONTENTS")
-
- self.assertRaises(CommandError, self.new_command.invoke)
- self.assertEqual("CONTENTS", open(self.args.FILE).read())
-
-
-class JobSubmitTest(CommandTest):
-
- def test_receives_job_file_in_cmdline(self):
- command = submit(self.parser, self.args)
- command.register_arguments(self.parser)
- name, args, kwargs = self.parser.method_calls[1]
- self.assertIn("FILE", args)
-
-
-class JobRunTest(CommandTest):
-
- def test_register_arguments(self):
- run_cmd = run(self.parser, self.args)
- run_cmd.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("FILE", args)
-
- def test_invoke_raises_0(self):
- # Users passes a non existing job file to the run command.
- self.args.FILE = self.tmp("test_invoke_raises_0.json")
- command = run(self.parser, self.args)
- self.assertRaises(CommandError, command.invoke)
-
- @patch("lava.helper.command.has_command", create=True)
- def test_invoke_raises_1(self, mocked_has_command):
- # User passes a valid file to the run command, but she does not have
- # the dispatcher installed.
- mocked_has_command.return_value = False
- command = run(self.parser, self.args)
- self.assertRaises(CommandError, command.invoke)
-
-
-class TestsStatusCommand(CommandTest):
-
- def test_register_arguments(self):
- self.args.JOB_ID = "1"
- status_cmd = status(self.parser, self.args)
- status_cmd.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("JOB_ID", args)
=== removed file 'lava/job/tests/test_job.py'
@@ -1,92 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the Job class
-"""
-
-import os
-import json
-import tempfile
-
-from mock import patch
-
-from lava.config import Config
-from lava.helper.tests.helper_test import HelperTest
-from lava.job import Job
-from lava.job.templates import BOOT_TEST
-from lava.parameter import Parameter
-
-
-class JobTest(HelperTest):
-
- @patch("lava.config.Config.save")
- def setUp(self, mocked_config):
- super(JobTest, self).setUp()
- self.config = Config()
- self.config.config_file = self.temp_file.name
-
- def test_from_template(self):
- template = {}
- job = Job(template, self.temp_file.name)
- self.assertEqual(job.data, template)
- self.assertIsNot(job.data, template)
-
- def test_update_data(self):
- image = "/path/to/panda.img"
- param1 = Parameter("device_type")
- param2 = Parameter("image", depends=param1)
- self.config.put_parameter(param1, "panda")
- self.config.put_parameter(param2, image)
-
- job = Job(BOOT_TEST, self.temp_file.name)
- job.update(self.config)
-
- self.assertEqual(job.data['device_type'], "panda")
- self.assertEqual(job.data['actions'][0]["parameters"]["image"], image)
-
- def test_write(self):
- try:
- orig_data = {"foo": "bar"}
- job_file = os.path.join(tempfile.gettempdir(), "a_json_file.json")
- job = Job(orig_data, job_file)
- job.write()
-
- output = ""
- with open(job_file) as read_file:
- output = read_file.read()
-
- data = json.loads(output)
- self.assertEqual(data, orig_data)
- finally:
- os.unlink(job_file)
-
- def test_writes_nicely_formatted_json(self):
- try:
- orig_data = {"foo": "bar"}
- job_file = os.path.join(tempfile.gettempdir(), "b_json_file.json")
- job = Job(orig_data, job_file)
- job.write()
-
- output = ""
- with open(job_file) as read_file:
- output = read_file.read()
-
- self.assertTrue(output.startswith("{\n"))
- finally:
- os.unlink(job_file)
=== removed file 'lava/parameter.py'
@@ -1,251 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Antonio Terceiro <antonio.terceiro@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Parameter class and its accessory methods/functions.
-"""
-
-import sys
-import types
-
-from lava_tool.utils import to_list
-
-# Character used to join serialized list parameters.
-LIST_SERIALIZE_DELIMITER = ","
-
-
-class Parameter(object):
- """A parameter with an optional dependency."""
- def __init__(self, id, value=None, depends=None):
- """Creates a new parameter.
-
- :param id: The name of this parameter.
- :param value: The value of this parameter. Defaults to None.
- :param depends: If this Parameter depends on another one. Defaults
- to None.
- :type Parameter
- """
- self.id = id
- self.value = value
- self.depends = depends
- self.asked = False
- # Whether to store or not the parameter in the user config file.
- self.store = True
-
- def set(self, value):
- """Sets the value of the parameter.
-
- :param value: The value to set.
- """
- self.value = value
-
- def prompt(self, old_value=None):
- """Gets the parameter value from the user.
-
- To get user input, the builtin `raw_input` function will be used. Input
- will also be stripped of possible whitespace chars. If Enter or any
- sort of whitespace chars in typed, the old Parameter value will be
- returned.
-
- :param old_value: The old parameter value.
- :return The input as typed by the user, or the old value.
- """
- if not self.asked:
- if old_value is not None:
- prompt = "{0} [{1}]: ".format(self.id, old_value)
- else:
- prompt = "{0}: ".format(self.id)
-
- user_input = self.get_user_input(prompt)
-
- if user_input is not None:
- if len(user_input) == 0 and old_value:
- # Keep the old value when user press enter or another
- # whitespace char.
- self.value = old_value
- else:
- self.value = user_input
-
- self.asked = True
-
- return self.value
-
- @classmethod
- def get_user_input(cls, prompt=""):
- """Asks the user for input data.
-
- :param prompt: The prompt that should be given to the user.
- :return A string with what the user typed.
- """
- data = None
- try:
- data = raw_input(prompt).strip()
- except EOFError:
- # Force to return None.
- data = None
- except KeyboardInterrupt:
- sys.exit(-1)
- return data
-
- @classmethod
- def serialize(cls, value):
- """Serializes the passed value to be friendly written to file.
-
- Lists are serialized as a comma separated string of values.
-
- :param value: The value to serialize.
- :return The serialized value as string.
- """
- serialized = ""
- if isinstance(value, list):
- serialized = LIST_SERIALIZE_DELIMITER.join(
- str(x) for x in value if x)
- else:
- serialized = str(value)
- return serialized
-
- @classmethod
- def deserialize(cls, value):
- """Deserialize a value into a list.
-
- The value must have been serialized with the class instance serialize()
- method.
-
- :param value: The string value to be deserialized.
- :type str
- :return A list of values.
- """
- deserialized = []
- if isinstance(value, types.StringTypes):
- deserialized = filter(None, (x.strip() for x in value.split(
- LIST_SERIALIZE_DELIMITER)))
- else:
- deserialized = list(value)
- return deserialized
-
-
-class SingleChoiceParameter(Parameter):
- """A parameter implemeting a single choice between multiple choices."""
- def __init__(self, id, choices):
- super(SingleChoiceParameter, self).__init__(id)
- self.choices = to_list(choices)
-
- def prompt(self, prompt, old_value=None):
- """Asks the user for their choice."""
- # Sliglty different than the other parameters: here we first present
- # the user with what the choices are about.
- print >> sys.stdout, prompt
-
- index = 1
- for choice in self.choices:
- print >> sys.stdout, "\t{0:d}. {1}".format(index, choice)
- index += 1
-
- choices_len = len(self.choices)
- while True:
- user_input = self.get_user_input("Choice: ")
-
- if len(user_input) == 0 and old_value:
- choice = old_value
- break
- elif user_input in [str(x) for x in range(1, choices_len + 1)]:
- choice = self.choices[int(user_input) - 1]
- break
-
- return choice
-
-
-class ListParameter(Parameter):
- """A specialized Parameter to handle list values."""
-
- # This is used as a deletion character. When we have an old value and the
- # user enters this char, it sort of deletes the value.
- DELETE_CHAR = "-"
-
- def __init__(self, id, value=None, depends=None):
- super(ListParameter, self).__init__(id, depends=depends)
- self.value = []
- if value:
- self.set(value)
-
- def set(self, value):
- """Sets the value of the parameter.
-
- :param value: The value to set.
- """
- self.value = to_list(value)
-
- def add(self, value):
- """Adds a new value to the list of values of this parameter.
-
- :param value: The value to add.
- """
- if isinstance(value, list):
- self.value.extend(value)
- else:
- self.value.append(value)
-
- def prompt(self, old_value=None):
- """Gets the parameter in a list form.
-
- To exit the input procedure it is necessary to insert an empty line.
-
- :return The list of values.
- """
-
- if not self.asked:
- if old_value is not None:
- # We might get the old value read from file via ConfigParser,
- # and usually it comes in string format.
- old_value = self.deserialize(old_value)
-
- print >> sys.stdout, "Values for '{0}': ".format(self.id)
-
- index = 1
- while True:
- user_input = None
- if old_value is not None and (0 < len(old_value) >= index):
- prompt = "{0:>3d}.\n\told: {1}\n\tnew: ".format(
- index, old_value[index-1])
- user_input = self.get_user_input(prompt)
- else:
- prompt = "{0:>3d}. ".format(index)
- user_input = self.get_user_input(prompt)
-
- if user_input is not None:
- # The user has pressed Enter.
- if len(user_input) == 0:
- if old_value is not None and \
- (0 < len(old_value) >= index):
- user_input = old_value[index-1]
- else:
- break
-
- if len(user_input) == 1 and user_input == \
- self.DELETE_CHAR and (0 < len(old_value) >= index):
- # We have an old value, user presses the DELETE_CHAR
- # and we do not store anything. This is done to delete
- # an old entry.
- pass
- else:
- self.value.append(user_input)
- index += 1
-
- self.asked = True
-
- return self.value
=== removed directory 'lava/script'
=== removed file 'lava/script/__init__.py'
@@ -1,51 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Scripts handling class."""
-
-import os
-import stat
-
-from lava_tool.utils import write_file
-
-
-DEFAULT_MOD = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
-DEFAULT_TESTDEF_SCRIPT_CONTENT = """#!/bin/sh
-# Automatic generated content by lava-tool.
-# Please add your own instructions.
-#
-# You can use all the avialable Bash commands.
-#
-# For the available LAVA commands, see:
-# http://lava.readthedocs.org/
-#
-"""
-DEFAULT_TESTDEF_SCRIPT = "mytest.sh"
-
-
-class ShellScript(object):
-
- """Creates a shell script on the file system with some content."""
-
- def __init__(self, file_name):
- self.file_name = file_name
-
- def write(self):
- write_file(self.file_name, DEFAULT_TESTDEF_SCRIPT_CONTENT)
- # Make sure the script is executable.
- os.chmod(self.file_name, DEFAULT_MOD)
=== removed file 'lava/script/commands.py'
@@ -1,115 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Commands to run or submit a script."""
-
-import os
-import tempfile
-
-from lava.helper.command import BaseCommand
-from lava.job import DEFAULT_JOB_FILENAME
-from lava.testdef import DEFAULT_TESTDEF_FILENAME
-from lava.tool.command import CommandGroup
-from lava_tool.utils import verify_path_non_existance
-
-
-class script(CommandGroup):
-
- """LAVA script file handling."""
-
- namespace = "lava.script.commands"
-
-
-class ScriptBaseCommand(BaseCommand):
-
- def _create_tmp_job_file(self, script_file):
- """Creates a temporary job file to run or submit the passed file.
-
- The temporary job file and its accessory test definition file are
- not removed by this method.
-
- :param script_file: The script file that has to be run or submitted.
- :return A tuple with the job file path, and the test definition path.
- """
- script_file = os.path.abspath(script_file)
- verify_path_non_existance(script_file)
-
- temp_dir = tempfile.gettempdir()
-
- # The name of the job and testdef files.
- job_file = os.path.join(temp_dir, DEFAULT_JOB_FILENAME)
- testdef_file = os.path.join(temp_dir, DEFAULT_TESTDEF_FILENAME)
-
- # The steps that the testdef file should have. We need to change it
- # from the default one, since the users are passing their own file.
- steps = "./" + os.path.basename(script_file)
- testdef_file = self.create_test_definition(testdef_file,
- steps=steps)
-
- # The content of the tar file.
- tar_content = [script_file, testdef_file]
- job_file = self.create_tar_repo_job(job_file, testdef_file,
- tar_content)
-
- return (job_file, testdef_file)
-
-
-class run(ScriptBaseCommand):
-
- """Runs the specified shell script on a local device."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(run, cls).register_arguments(parser)
- parser.add_argument("FILE", help="Shell script file to run.")
-
- def invoke(self):
- job_file = ""
- testdef_file = ""
-
- try:
- job_file, testdef_file = self._create_tmp_job_file(self.args.FILE)
- super(run, self).run(job_file)
- finally:
- if os.path.isfile(job_file):
- os.unlink(job_file)
- if os.path.isfile(testdef_file):
- os.unlink(testdef_file)
-
-
-class submit(ScriptBaseCommand):
-
- """Submits the specified shell script to a LAVA server."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(submit, cls).register_arguments(parser)
- parser.add_argument("FILE", help="Shell script file to send.")
-
- def invoke(self):
- job_file = ""
- testdef_file = ""
-
- try:
- job_file, testdef_file = self._create_tmp_job_file(self.args.FILE)
- super(submit, self).submit(job_file)
- finally:
- if os.path.isfile(job_file):
- os.unlink(job_file)
- if os.path.isfile(testdef_file):
- os.unlink(testdef_file)
=== removed directory 'lava/script/tests'
=== removed file 'lava/script/tests/__init__.py'
=== removed file 'lava/script/tests/test_commands.py'
@@ -1,59 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Tests for lava.script.commands.
-"""
-
-from lava.helper.tests.helper_test import HelperTest
-from lava.script.commands import (
- run,
- submit,
-)
-
-
-class RunCommandTests(HelperTest):
-
- def test_register_arguments(self):
- run_cmd = run(self.parser, self.args)
- run_cmd.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("FILE", args)
-
-
-class SubmitCommandTests(HelperTest):
-
- def test_register_arguments(self):
- submit_cmd = submit(self.parser, self.args)
- submit_cmd.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("FILE", args)
=== removed file 'lava/script/tests/test_script.py'
@@ -1,80 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unittests for the ShellScript class.
-"""
-
-import os
-import stat
-
-from lava.helper.tests.helper_test import HelperTest
-from lava.script import ShellScript
-
-
-class ShellScriptTests(HelperTest):
-
- """ShellScript tests."""
-
- def test_create_file(self):
- # Tests that a shell script is actually written.
- try:
- temp_file = self.tmp("a_shell_test")
- script = ShellScript(temp_file)
- script.write()
-
- self.assertTrue(os.path.isfile(temp_file))
- finally:
- os.unlink(temp_file)
-
- def test_assure_executable(self):
- # Tests that the shell script created is executable.
- try:
- temp_file = self.tmp("a_shell_test")
- script = ShellScript(temp_file)
- script.write()
-
- expected = (stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH |
- stat.S_IXOTH)
-
- obtained = stat.S_IMODE(os.stat(temp_file).st_mode)
- self.assertEquals(expected, obtained)
- finally:
- os.unlink(temp_file)
-
- def test_shell_script_content(self):
- # Tests that the shell script created contains the exepcted content.
- try:
- temp_file = self.tmp("a_shell_test")
- script = ShellScript(temp_file)
- script.write()
-
- obtained = ""
- with open(temp_file) as read_file:
- obtained = read_file.read()
-
- expected = ("#!/bin/sh\n# Automatic generated "
- "content by lava-tool.\n# Please add your own "
- "instructions.\n#\n# You can use all the avialable "
- "Bash commands.\n#\n# For the available LAVA "
- "commands, see:\n# http://lava.readthedocs.org/\n"
- "#\n")
-
- self.assertEquals(expected, obtained)
- finally:
- os.unlink(temp_file)
=== removed directory 'lava/testdef'
=== removed file 'lava/testdef/__init__.py'
@@ -1,82 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""TestDefinition class."""
-
-import yaml
-
-from copy import deepcopy
-
-from lava.helper.template import (
- expand_template,
- set_value,
-)
-from lava_tool.utils import (
- write_file,
- verify_path_existance,
- verify_file_extension,
-)
-
-# Default name for a test definition.
-DEFAULT_TESTDEF_FILENAME = "lavatest.yaml"
-# Default test def file extension.
-DEFAULT_TESTDEF_EXTENSION = "yaml"
-# Possible extensions for a test def file.
-TESTDEF_FILE_EXTENSIONS = [DEFAULT_TESTDEF_EXTENSION]
-
-
-class TestDefinition(object):
-
- """A test definition object.
-
- This class should be used to create test definitions. The initialization
- enforces a default file name extension, and makes sure that the file is
- not already present on the file system.
- """
-
- def __init__(self, data, file_name):
- """Initialize the object.
-
- :param data: The serializable data to be used, usually a template.
- :type dict
- :param file_name: Where the test definition will be written.
- :type str
- """
- self.file_name = verify_file_extension(file_name,
- DEFAULT_TESTDEF_EXTENSION,
- TESTDEF_FILE_EXTENSIONS)
- verify_path_existance(self.file_name)
-
- self.data = deepcopy(data)
-
- def set(self, key, value):
- """Set key to the specified value.
-
- :param key: The key to look in the object data.
- :param value: The value to set.
- """
- set_value(self.data, key, value)
-
- def write(self):
- """Writes the test definition to file."""
- content = yaml.dump(self.data, default_flow_style=False, indent=4)
- write_file(self.file_name, content)
-
- def update(self, config):
- """Updates the TestDefinition object based on the provided config."""
- expand_template(self.data, config)
=== removed file 'lava/testdef/commands.py'
@@ -1,104 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Test definition commands class.
-"""
-
-import os
-import tempfile
-
-from lava.helper.command import BaseCommand
-from lava.job import DEFAULT_JOB_FILENAME
-from lava.tool.command import CommandGroup
-from lava_tool.utils import verify_path_non_existance
-
-
-class testdef(CommandGroup):
-
- """LAVA test definitions handling."""
-
- namespace = "lava.testdef.commands"
-
-
-class TestdefBaseCommand(BaseCommand):
-
- def _create_tmp_job_file(self, testdef_file):
- testdef_file = os.path.abspath(testdef_file)
- verify_path_non_existance(testdef_file)
-
- job_file = os.path.join(tempfile.gettempdir(),
- DEFAULT_JOB_FILENAME)
-
- tar_content = [testdef_file]
- job_file = self.create_tar_repo_job(job_file, testdef_file,
- tar_content)
-
- return job_file
-
-
-class new(TestdefBaseCommand):
-
- """Creates a new test definition file."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(new, cls).register_arguments(parser)
- parser.add_argument("FILE", help="Test definition file to create.")
-
- def invoke(self):
- full_path = os.path.abspath(self.args.FILE)
- self.create_test_definition(full_path)
-
-
-class run(TestdefBaseCommand):
-
- """Runs the specified test definition on a local device."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(run, cls).register_arguments(parser)
- parser.add_argument("FILE", help="Test definition file to run.")
-
- def invoke(self):
- job_file = ""
- try:
- job_file = self._create_tmp_job_file(self.args.FILE)
- super(run, self).run(job_file)
- finally:
- if os.path.isfile(job_file):
- os.unlink(job_file)
-
-
-class submit(TestdefBaseCommand):
-
- """Submits the specified test definition to a LAVA server."""
-
- @classmethod
- def register_arguments(cls, parser):
- super(submit, cls).register_arguments(parser)
- parser.add_argument("FILE", help="Test definition file to send.")
-
- def invoke(self):
- job_file = ""
- try:
- job_file = self._create_tmp_job_file(self.args.FILE)
- super(submit, self).submit(job_file)
- finally:
- if os.path.isfile(job_file):
- os.unlink(job_file)
=== removed file 'lava/testdef/templates.py'
@@ -1,52 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test definition templates."""
-
-from lava.parameter import (
- Parameter,
-)
-
-DEFAULT_TESTDEF_VERSION = "1.0"
-DEFAULT_TESTDEF_FORMAT = "Lava-Test Test Definition 1.0"
-DEFAULT_ENVIRONMET_VALUE = "lava_test_shell"
-
-# All these parameters will not be stored on the local config file.
-NAME_PARAMETER = Parameter("name")
-NAME_PARAMETER.store = False
-
-DESCRIPTION_PARAMETER = Parameter("description", depends=NAME_PARAMETER)
-DESCRIPTION_PARAMETER.store = False
-
-TESTDEF_STEPS_KEY = "steps"
-
-TESTDEF_TEMPLATE = {
- "metadata": {
- "name": NAME_PARAMETER,
- "format": DEFAULT_TESTDEF_FORMAT,
- "version": DEFAULT_TESTDEF_VERSION,
- "description": DESCRIPTION_PARAMETER,
- "environment": [DEFAULT_ENVIRONMET_VALUE],
- },
- "run": {
- TESTDEF_STEPS_KEY: ["./mytest.sh"]
- },
- "parse": {
- "pattern": r'^\s*(?P<test_case_id>\w+)=(?P<result>\w+)\s*$'
- }
-}
=== removed directory 'lava/testdef/tests'
=== removed file 'lava/testdef/tests/__init__.py'
=== removed file 'lava/testdef/tests/test_commands.py'
@@ -1,158 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Tests for lava.testdef.commands.
-"""
-
-import os
-import tempfile
-import yaml
-
-from mock import (
- MagicMock,
- patch,
-)
-
-from lava.config import InteractiveCache
-from lava.helper.tests.helper_test import HelperTest
-from lava.testdef.commands import (
- new,
-)
-from lava.tool.errors import CommandError
-
-
-class NewCommandTest(HelperTest):
- """Class for the lava.testdef new command tests."""
-
- def setUp(self):
- super(NewCommandTest, self).setUp()
- self.file_name = "fake_testdef.yaml"
- self.file_path = os.path.join(tempfile.gettempdir(), self.file_name)
- self.args.FILE = self.file_path
-
- self.temp_yaml = tempfile.NamedTemporaryFile(suffix=".yaml",
- delete=False)
-
- self.config_file = tempfile.NamedTemporaryFile(delete=False)
- self.config = InteractiveCache()
- self.config.save = MagicMock()
- self.config.config_file = self.config_file.name
- # Patch class raw_input, start it, and stop it on tearDown.
- self.patcher1 = patch("lava.parameter.raw_input", create=True)
- self.mocked_raw_input = self.patcher1.start()
-
- def tearDown(self):
- super(NewCommandTest, self).tearDown()
- if os.path.isfile(self.file_path):
- os.unlink(self.file_path)
- os.unlink(self.config_file.name)
- os.unlink(self.temp_yaml.name)
- self.patcher1.stop()
-
- def test_register_arguments(self):
- # Make sure that the parser add_argument is called and we have the
- # correct argument.
- new_command = new(self.parser, self.args)
- new_command.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("FILE", args)
-
- def test_invoke_0(self):
- # Test that passing a file on the command line, it is created on the
- # file system.
- self.mocked_raw_input.return_value = "\n"
- new_command = new(self.parser, self.args)
- new_command.config = self.config
- new_command.invoke()
- self.assertTrue(os.path.exists(self.file_path))
-
- def test_invoke_1(self):
- # Test that when passing an already existing file, an exception is
- # thrown.
- self.args.FILE = self.temp_yaml.name
- new_command = new(self.parser, self.args)
- new_command.config = self.config
- self.assertRaises(CommandError, new_command.invoke)
-
- def test_invoke_2(self):
- # Tests that when adding a new test definition and writing it to file
- # a correct YAML structure is created.
- self.mocked_raw_input.return_value = "\n"
- new_command = new(self.parser, self.args)
- new_command.config = self.config
- new_command.invoke()
- expected = {'run': {'steps': ["./mytest.sh"]},
- 'metadata': {
- 'environment': ['lava_test_shell'],
- 'format': 'Lava-Test Test Definition 1.0',
- 'version': '1.0',
- 'description': '',
- 'name': ''},
- 'parse': {
- 'pattern':
- '^\\s*(?P<test_case_id>\\w+)=(?P<result>\\w+)\\s*$'
- },
- }
- obtained = None
- with open(self.file_path, 'r') as read_file:
- obtained = yaml.load(read_file)
- self.assertEqual(expected, obtained)
-
- def test_invoke_3(self):
- # Tests that when adding a new test definition and writing it to a file
- # in a directory withour permissions, exception is raised.
- self.args.FILE = "/test_file.yaml"
- self.mocked_raw_input.return_value = "\n"
- new_command = new(self.parser, self.args)
- new_command.config = self.config
- self.assertRaises(CommandError, new_command.invoke)
- self.assertFalse(os.path.exists(self.args.FILE))
-
- def test_invoke_4(self):
- # Tests that when passing values for the "steps" ListParameter, we get
- # back the correct data structure.
- self.mocked_raw_input.side_effect = ["foo", "\n", "\n", "\n", "\n",
- "\n"]
- new_command = new(self.parser, self.args)
- new_command.config = self.config
- new_command.invoke()
- expected = {'run': {'steps': ["./mytest.sh"]},
- 'metadata': {
- 'environment': ['lava_test_shell'],
- 'format': 'Lava-Test Test Definition 1.0',
- 'version': '1.0',
- 'description': '',
- 'name': 'foo'
- },
- 'parse': {
- 'pattern':
- '^\\s*(?P<test_case_id>\\w+)=(?P<result>\\w+)\\s*$'
- },
- }
- obtained = None
- with open(self.file_path, 'r') as read_file:
- obtained = yaml.load(read_file)
- self.assertEqual(expected, obtained)
=== removed directory 'lava/tests'
=== removed file 'lava/tests/__init__.py'
=== removed file 'lava/tests/test_commands.py'
@@ -1,127 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Tests for lava.commands.
-"""
-
-import os
-import tempfile
-
-from mock import (
- MagicMock,
- patch
-)
-
-from lava.commands import (
- init,
- submit,
-)
-from lava.config import Config
-from lava.helper.tests.helper_test import HelperTest
-from lava.tool.errors import CommandError
-
-
-class InitCommandTests(HelperTest):
-
- def setUp(self):
- super(InitCommandTests, self).setUp()
- self.config_file = self.tmp("init_command_tests")
- self.config = Config()
- self.config.config_file = self.config_file
-
- def tearDown(self):
- super(InitCommandTests, self).tearDown()
- if os.path.isfile(self.config_file):
- os.unlink(self.config_file)
-
- def test_register_arguments(self):
- self.args.DIR = os.path.join(tempfile.gettempdir(), "a_fake_dir")
- init_command = init(self.parser, self.args)
- init_command.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("DIR", args)
-
- @patch("lava.commands.edit_file", create=True)
- def test_command_invoke_0(self, mocked_edit_file):
- # Invoke the init command passing a path to a file. Should raise an
- # exception.
- self.args.DIR = self.temp_file.name
- init_command = init(self.parser, self.args)
- self.assertRaises(CommandError, init_command.invoke)
-
- def test_command_invoke_2(self):
- # Invoke the init command passing a path where the user cannot write.
- try:
- self.args.DIR = "/root/a_temp_dir"
- init_command = init(self.parser, self.args)
- self.assertRaises(CommandError, init_command.invoke)
- finally:
- if os.path.exists(self.args.DIR):
- os.removedirs(self.args.DIR)
-
- def test_update_data(self):
- # Make sure the template is updated accordingly with the provided data.
- self.args.DIR = self.temp_file.name
-
- init_command = init(self.parser, self.args)
- init_command.config.get = MagicMock()
- init_command.config.save = MagicMock()
- init_command.config.get.side_effect = ["a_job.json"]
-
- expected = {
- "jobfile": "a_job.json",
- }
-
- obtained = init_command._update_data()
- self.assertEqual(expected, obtained)
-
-
-class SubmitCommandTests(HelperTest):
- def setUp(self):
- super(SubmitCommandTests, self).setUp()
- self.config_file = self.tmp("submit_command_tests")
- self.config = Config()
- self.config.config_file = self.config_file
- self.config.save = MagicMock()
-
- def tearDown(self):
- super(SubmitCommandTests, self).tearDown()
- if os.path.isfile(self.config_file):
- os.unlink(self.config_file)
-
- def test_register_arguments(self):
- self.args.JOB = os.path.join(tempfile.gettempdir(), "a_fake_file")
- submit_command = submit(self.parser, self.args)
- submit_command.register_arguments(self.parser)
-
- # Make sure we do not forget about this test.
- self.assertEqual(2, len(self.parser.method_calls))
-
- _, args, _ = self.parser.method_calls[0]
- self.assertIn("--non-interactive", args)
-
- _, args, _ = self.parser.method_calls[1]
- self.assertIn("JOB", args)
=== removed file 'lava/tests/test_config.py'
@@ -1,320 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.config unit tests.
-"""
-
-import os
-import shutil
-import sys
-import tempfile
-
-from StringIO import StringIO
-from mock import (
- MagicMock,
- call,
- patch,
-)
-
-from lava.config import (
- Config,
- InteractiveCache,
- InteractiveConfig,
-)
-from lava.helper.tests.helper_test import HelperTest
-from lava.parameter import (
- Parameter,
- ListParameter,
-)
-from lava.tool.errors import CommandError
-
-
-class ConfigTestCase(HelperTest):
- """General test case class for the different Config classes."""
- def setUp(self):
- super(ConfigTestCase, self).setUp()
- self.param1 = Parameter("foo")
- self.param2 = Parameter("bar", depends=self.param1)
-
-
-class TestConfigSave(ConfigTestCase):
-
- """Used to test the save() method of config class.
-
- Done here since in the other tests we want to mock the atexit save call
- in order not to write the file, or accidentaly overwrite the real
- user file.
- """
-
- def setUp(self):
- super(TestConfigSave, self).setUp()
- self.config = Config()
- self.config.config_file = self.temp_file.name
-
- def test_config_save(self):
- self.config.put_parameter(self.param1, "foo")
- self.config.save()
-
- expected = "[DEFAULT]\nfoo = foo\n\n"
- obtained = ""
- with open(self.temp_file.name) as tmp_file:
- obtained = tmp_file.read()
- self.assertEqual(expected, obtained)
-
- def test_save_list_param(self):
- # Tests that when saved to file, the ListParameter parameter is stored
- # correctly.
- param_values = ["foo", "more than one words", "bar"]
- list_param = ListParameter("list")
- list_param.set(param_values)
-
- self.config.put_parameter(list_param, param_values)
- self.config.save()
-
- expected = "[DEFAULT]\nlist = " + ",".join(param_values) + "\n\n"
- obtained = ""
- with open(self.temp_file.name, "r") as read_file:
- obtained = read_file.read()
- self.assertEqual(expected, obtained)
-
-
-class ConfigTest(ConfigTestCase):
-
- def setUp(self):
- super(ConfigTest, self).setUp()
-
- self.config_dir = os.path.join(tempfile.gettempdir(), "config")
- self.xdg_resource = os.path.join(self.config_dir, "linaro")
- self.lavatool_resource = os.path.join(self.xdg_resource, "lava-tool")
-
- os.makedirs(self.lavatool_resource)
-
- self.config = Config()
- self.config._ensure_xdg_dirs = MagicMock(
- return_value=self.lavatool_resource)
- self.config.save = MagicMock()
-
- def tearDown(self):
- super(ConfigTest, self).tearDown()
- if os.path.isdir(self.config_dir):
- shutil.rmtree(self.config_dir)
-
- def test_ensure_xdg_dirs(self):
- # Test that xdg can create the correct cache path, we remove it
- # at the end since we patch the default value.
- obtained = self.config._ensure_xdg_dirs()
- self.assertEquals(self.lavatool_resource, obtained)
-
- def test_config_file(self):
- expected = os.path.join(self.lavatool_resource, "lava-tool.ini")
- obtained = self.config.config_file
- self.assertEquals(expected, obtained)
-
- def test_config_put_in_cache_0(self):
- self.config._put_in_cache("key", "value", "section")
- self.assertEqual(self.config._cache["section"]["key"], "value")
-
- def test_config_get_from_cache_0(self):
- self.config._put_in_cache("key", "value", "section")
- obtained = self.config._get_from_cache(Parameter("key"), "section")
- self.assertEqual("value", obtained)
-
- def test_config_get_from_cache_1(self):
- self.config._put_in_cache("key", "value", "DEFAULT")
- obtained = self.config._get_from_cache(Parameter("key"), "DEFAULT")
- self.assertEqual("value", obtained)
-
- def test_config_put_0(self):
- # Puts a value in the DEFAULT section.
- self.config._put_in_cache = MagicMock()
- self.config.put("foo", "foo")
- expected = "foo"
- obtained = self.config._config_backend.get("DEFAULT", "foo")
- self.assertEqual(expected, obtained)
-
- def test_config_put_1(self):
- # Puts a value in a new section.
- self.config._put_in_cache = MagicMock()
- self.config.put("foo", "foo", "bar")
- expected = "foo"
- obtained = self.config._config_backend.get("bar", "foo")
- self.assertEqual(expected, obtained)
-
- def test_config_put_parameter_0(self):
- self.config._calculate_config_section = MagicMock(return_value="")
- self.assertRaises(CommandError, self.config.put_parameter, self.param1)
-
- @patch("lava.config.Config.put")
- def test_config_put_parameter_1(self, mocked_config_put):
- self.config._calculate_config_section = MagicMock(
- return_value="DEFAULT")
-
- self.param1.value = "bar"
- self.config.put_parameter(self.param1)
-
- self.assertEqual(mocked_config_put.mock_calls,
- [call("foo", "bar", "DEFAULT")])
-
- def test_config_get_0(self):
- # Tests that with a non existing parameter, it returns None.
- param = Parameter("baz")
- self.config._get_from_cache = MagicMock(return_value=None)
- self.config._calculate_config_section = MagicMock(
- return_value="DEFAULT")
-
- expected = None
- obtained = self.config.get(param)
- self.assertEqual(expected, obtained)
-
- def test_config_get_1(self):
- self.config.put_parameter(self.param1, "foo")
- self.config._get_from_cache = MagicMock(return_value=None)
- self.config._calculate_config_section = MagicMock(
- return_value="DEFAULT")
-
- expected = "foo"
- obtained = self.config.get(self.param1)
- self.assertEqual(expected, obtained)
-
- def test_calculate_config_section_0(self):
- expected = "DEFAULT"
- obtained = self.config._calculate_config_section(self.param1)
- self.assertEqual(expected, obtained)
-
- def test_calculate_config_section_1(self):
- self.config.put_parameter(self.param1, "foo")
- expected = "foo=foo"
- obtained = self.config._calculate_config_section(self.param2)
- self.assertEqual(expected, obtained)
-
- def test_config_get_from_backend_public(self):
- # Need to to this, since we want a clean Config instance, with
- # a config_file with some content.
- with open(self.config.config_file, "w") as write_config:
- write_config.write("[DEFAULT]\nfoo=bar\n")
- param = Parameter("foo")
- obtained = self.config.get_from_backend(param)
- self.assertEquals("bar", obtained)
-
-
-class InteractiveConfigTest(ConfigTestCase):
-
- def setUp(self):
- super(InteractiveConfigTest, self).setUp()
- self.config = InteractiveConfig()
- self.config.save = MagicMock()
- self.config.config_file = self.temp_file.name
-
- @patch("lava.config.Config.get", new=MagicMock(return_value=None))
- def test_non_interactive_config_0(self):
- # Try to get a value that does not exists, users just press enter when
- # asked for a value. Value will be empty.
- self.config.force_interactive = False
- sys.stdin = StringIO("\n")
- value = self.config.get(Parameter("foo"))
- self.assertEqual("", value)
-
- @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
- def test_non_interactive_config_1(self):
- # Parent class config returns value, but we are not interactive.
- self.config.force_interactive = False
- value = self.config.get(Parameter("foo"))
- self.assertEqual("value", value)
-
- @patch("lava.config.Config.get", new=MagicMock(return_value=None))
- def test_non_interactive_config_2(self):
- self.config.force_interactive = False
- expected = "bar"
- sys.stdin = StringIO(expected)
- value = self.config.get(Parameter("foo"))
- self.assertEqual(expected, value)
-
- @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
- def test_interactive_config_0(self):
- # We force to be interactive, meaning that even if a value is found,
- # it will be asked anyway.
- self.config.force_interactive = True
- expected = "a_new_value"
- sys.stdin = StringIO(expected)
- value = self.config.get(Parameter("foo"))
- self.assertEqual(expected, value)
-
- @patch("lava.config.Config.get", new=MagicMock(return_value="value"))
- def test_interactive_config_1(self):
- # Force to be interactive, but when asked for the new value press
- # Enter. The old value should be returned.
- self.config.force_interactive = True
- sys.stdin = StringIO("\n")
- value = self.config.get(Parameter("foo"))
- self.assertEqual("value", value)
-
- def test_calculate_config_section_0(self):
- self.config.force_interactive = True
- obtained = self.config._calculate_config_section(self.param1)
- expected = "DEFAULT"
- self.assertEqual(expected, obtained)
-
- def test_calculate_config_section_1(self):
- self.param1.set("foo")
- self.param2.depends.asked = True
- self.config.force_interactive = True
- obtained = self.config._calculate_config_section(self.param2)
- expected = "foo=foo"
- self.assertEqual(expected, obtained)
-
- def test_calculate_config_section_2(self):
- self.config.force_interactive = True
- self.config.config_backend.get = MagicMock(return_value=None)
- sys.stdin = StringIO("baz")
- expected = "foo=baz"
- obtained = self.config._calculate_config_section(self.param2)
- self.assertEqual(expected, obtained)
-
- def test_calculate_config_section_3(self):
- # Tests that when a parameter has its value in the cache and also on
- # file, we honor the cached version.
- self.param1.set("bar")
- self.param2.depends.asked = True
- self.config.force_interactive = True
- expected = "foo=bar"
- obtained = self.config._calculate_config_section(self.param2)
- self.assertEqual(expected, obtained)
-
- @patch("lava.config.Config.get", new=MagicMock(return_value=None))
- @patch("lava.parameter.sys.exit")
- @patch("lava.parameter.raw_input", create=True)
- def test_interactive_config_exit(self, mocked_raw, mocked_sys_exit):
- self.config._calculate_config_section = MagicMock(
- return_value="DEFAULT")
-
- mocked_raw.side_effect = KeyboardInterrupt()
-
- self.config.force_interactive = True
- self.config.get(self.param1)
- self.assertTrue(mocked_sys_exit.called)
-
- @patch("lava.parameter.raw_input", create=True)
- def test_interactive_config_with_list_parameter(self, mocked_raw_input):
- # Tests that we get a list back in the Config class when using
- # ListParameter and that it contains the expected values.
- expected = ["foo", "bar"]
- mocked_raw_input.side_effect = expected + ["\n"]
- obtained = self.config.get(ListParameter("list"))
- self.assertIsInstance(obtained, list)
- self.assertEqual(expected, obtained)
=== removed file 'lava/tests/test_parameter.py'
@@ -1,206 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.parameter unit tests.
-"""
-
-from mock import patch
-
-from lava.helper.tests.helper_test import HelperTest
-from lava.parameter import (
- ListParameter,
- Parameter,
- SingleChoiceParameter,
-)
-from lava_tool.utils import to_list
-
-
-class GeneralParameterTest(HelperTest):
- """General class with setUp and tearDown methods for Parameter tests."""
- def setUp(self):
- super(GeneralParameterTest, self).setUp()
- # Patch class raw_input, start it, and stop it on tearDown.
- self.patcher1 = patch("lava.parameter.raw_input", create=True)
- self.mocked_raw_input = self.patcher1.start()
-
- def tearDown(self):
- super(GeneralParameterTest, self).tearDown()
- self.patcher1.stop()
-
-
-class ParameterTest(GeneralParameterTest):
- """Tests for the Parameter class."""
-
- def setUp(self):
- super(ParameterTest, self).setUp()
- self.parameter1 = Parameter("foo", value="baz")
-
- def test_prompt_0(self):
- # Tests that when we have a value in the parameters and the user press
- # Enter, we get the old value back.
- self.mocked_raw_input.return_value = "\n"
- obtained = self.parameter1.prompt()
- self.assertEqual(self.parameter1.value, obtained)
-
- def test_prompt_1(self,):
- # Tests that with a value stored in the parameter, if and EOFError is
- # raised when getting user input, we get back the old value.
- self.mocked_raw_input.side_effect = EOFError()
- obtained = self.parameter1.prompt()
- self.assertEqual(self.parameter1.value, obtained)
-
- def test_to_list_0(self):
- value = "a_value"
- expected = [value]
- obtained = to_list(value)
- self.assertIsInstance(obtained, list)
- self.assertEquals(expected, obtained)
-
- def test_to_list_1(self):
- expected = ["a_value", "b_value"]
- obtained = to_list(expected)
- self.assertIsInstance(obtained, list)
- self.assertEquals(expected, obtained)
-
-
-class ListParameterTest(GeneralParameterTest):
-
- """Tests for the specialized ListParameter class."""
-
- def setUp(self):
- super(ListParameterTest, self).setUp()
- self.list_parameter = ListParameter("list")
-
- def test_prompt_0(self):
- # Test that when pressing Enter, the prompt stops and the list is
- # returned.
- expected = []
- self.mocked_raw_input.return_value = "\n"
- obtained = self.list_parameter.prompt()
- self.assertEqual(expected, obtained)
-
- def test_prompt_1(self):
- # Tests that when passing 3 values, a list with those values
- # is returned
- expected = ["foo", "bar", "foobar"]
- self.mocked_raw_input.side_effect = expected + ["\n"]
- obtained = self.list_parameter.prompt()
- self.assertEqual(expected, obtained)
-
- def test_serialize_0(self):
- # Tests the serialize method of ListParameter passing a list.
- expected = "foo,bar,baz,1"
- to_serialize = ["foo", "bar", "baz", "", 1]
-
- obtained = self.list_parameter.serialize(to_serialize)
- self.assertEqual(expected, obtained)
-
- def test_serialize_1(self):
- # Tests the serialize method of ListParameter passing an int.
- expected = "1"
- to_serialize = 1
-
- obtained = self.list_parameter.serialize(to_serialize)
- self.assertEqual(expected, obtained)
-
- def test_deserialize_0(self):
- # Tests the deserialize method of ListParameter with a string
- # of values.
- expected = ["foo", "bar", "baz"]
- to_deserialize = "foo,bar,,baz,"
- obtained = self.list_parameter.deserialize(to_deserialize)
- self.assertEqual(expected, obtained)
-
- def test_deserialize_1(self):
- # Tests the deserialization method of ListParameter passing a list.
- expected = ["foo", 1, "", "bar"]
- obtained = self.list_parameter.deserialize(expected)
- self.assertEqual(expected, obtained)
-
- def test_set_value_0(self):
- # Pass a string to a ListParameter, expect a list.
- set_value = "foo"
- expected = [set_value]
- self.list_parameter.set(set_value)
- self.assertEquals(expected, self.list_parameter.value)
-
- def test_set_value_1(self):
- # Pass a list to a ListParameter, expect the same list.
- expected = ["foo", "bar"]
- self.list_parameter.set(expected)
- self.assertEquals(expected, self.list_parameter.value)
-
- def test_add_value_0(self):
- # Add a value to a ListParameter, expect a list back.
- add_value = "foo"
- expected = [add_value]
- self.list_parameter.add(add_value)
- self.assertEquals(expected, self.list_parameter.value)
-
- def test_add_value_1(self):
- # Add a list value to a ListParameter with already a value set, expect
- # a list with both values.
- # The ListParameter is initialized with a string.
- add_value = ["foo"]
- list_param = ListParameter("list", value="bar")
- expected = ["bar", "foo"]
- list_param.add(add_value)
- self.assertEquals(expected, list_param.value)
-
- def test_add_value_2(self):
- # Add a list value to a ListParameter with already a value set, expect
- # a list with both values.
- # The ListParameter is initialized with a list.
- add_value = ["foo"]
- list_param = ListParameter("list", value=["bar", "baz"])
- expected = ["bar", "baz", "foo"]
- list_param.add(add_value)
- self.assertEquals(expected, list_param.value)
-
-
-class TestsSingleChoiceParameter(GeneralParameterTest):
-
- def setUp(self):
- super(TestsSingleChoiceParameter, self).setUp()
- self.choices = ["foo", "bar", "baz", "bam"]
- self.param_id = "single_choice"
- self.single_choice_param = SingleChoiceParameter(self.param_id,
- self.choices)
-
- def test_with_old_value(self):
- # There is an old value for a single choice parameter, the user
- # is prompted to select from the list of values, but she presses
- # enter. The old value is returned.
- old_value = "bat"
- self.mocked_raw_input.side_effect = ["\n"]
- obtained = self.single_choice_param.prompt("", old_value=old_value)
- self.assertEquals(old_value, obtained)
-
- def test_without_old_value(self):
- # There is no old value, user just select the first choice.
- self.mocked_raw_input.side_effect = ["1"]
- obtained = self.single_choice_param.prompt("")
- self.assertEquals("foo", obtained)
-
- def test_with_wrong_user_input(self):
- # No old value, user inserts at least two wrong choices, and the select
- # the third one.
- self.mocked_raw_input.side_effect = ["1000", "0", "3"]
- obtained = self.single_choice_param.prompt("")
- self.assertEquals("baz", obtained)
=== removed directory 'lava/tool'
=== removed file 'lava/tool/__init__.py'
@@ -1,27 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.tool
-=========
-
-Generic code for command line utilities for LAVA
-"""
-
-__version__ = (0, 7, 1, "final", 0)
=== removed file 'lava/tool/command.py'
@@ -1,166 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Interface for all lava-tool commands
-"""
-
-import inspect
-
-
-class Command(object):
- """
- Base class for all command line tool sub-commands.
- """
-
- def __init__(self, parser, args):
- """
- Prepare instance for executing commands.
-
- This method is called immediately after all arguments are parsed and
- results are available. This gives subclasses a chance to configure
- themselves. The provided parser is an instance of
- argparse.ArgumentParser but it may not be the top-level parser (it will
- be a parser specific for this command)
-
- The default implementation stores both arguments as instance
- attributes.
- """
- self.parser = parser
- self.args = args
-
- def say(self, message, *args, **kwargs):
- """
- Handy wrapper for print + format
- """
- self.args.dispatcher.say(self, message, *args, **kwargs)
-
- def invoke(self):
- """
- Invoke command action.
- """
- raise NotImplementedError()
-
- def reparse_arguments(self, parser, raw_args):
- """
- Re-parse raw arguments into normal arguments
-
- Parser is the same as in register_arguments (a sub-parser) The true,
- topmost parser is in self.parser.
-
- This method is only needed for specific commands that need to peek at
- the arguments before being able to truly redefine the parser and
- re-parse the raw arguments again.
- """
- raise NotImplementedError()
-
- @classmethod
- def get_name(cls):
- """
- Return the name of this command.
-
- The default implementation strips any leading underscores and replaces
- all other underscores with dashes.
- """
- return cls.__name__.lstrip("_").replace("_", "-")
-
- @classmethod
- def get_help(cls):
- """
- Return the help message of this command
- """
- doc = inspect.getdoc(cls)
- if doc is not None and "" in doc:
- doc = doc[:doc.index("")].rstrip()
- return doc
-
- @classmethod
- def get_epilog(cls):
- """
- Return the epilog of the help message
- """
- doc = inspect.getdoc(cls)
- if doc is not None and "" in doc:
- doc = doc[doc.index("") + 1:].lstrip()
- else:
- doc = None
- return doc
-
- @classmethod
- def register_arguments(cls, parser):
- """
- Register arguments if required.
-
- Subclasses can override this to add any arguments that will be
- exposed to the command line interface.
- """
- pass
-
-
-class CommandGroup(Command):
- """
- Base class for all command sub-command hubs.
-
- This class is needed when one wants to get a custom level of command
- options that can be freely extended, just like the top-level lava-tool
- command.
-
- For example, a CommandGroup 'actions' will load additional commands from a
- the 'lava.actions' namespace. For the end user it will be available as::
-
- $ lava-tool foo actions xxx
-
- Where xxx is one of the Commands that is declared to live in the namespace
- provided by 'foo actions'.
- """
-
- namespace = None
-
- @classmethod
- def get_namespace(cls):
- """
- Return the pkg-resources entry point namespace name from which
- sub-commands will be loaded.
- """
- return cls.namespace
-
- @classmethod
- def register_subcommands(cls, parser):
- """
- Register sub commands.
-
- This method is called around the same time as register_arguments()
- would be called for the plain command classes. It loads commands from
- the entry point namespace returned by get_namespace() and registeres
- them with a Dispatcher class. The parsers used by that dispatcher
- are linked to the calling dispatcher parser so the new commands enrich
- the top-level parser tree.
-
- In addition, the provided parser stores a dispatcher instance in its
- defaults. This is useful when one wants to access it later. To a final
- command instance it shall be available as self.args.dispatcher.
- """
- from lava.tool.dispatcher import Dispatcher
- dispatcher = Dispatcher(parser, name=cls.get_name())
- namespace = cls.get_namespace()
- if namespace is not None:
- dispatcher.import_commands(namespace)
- parser.set_defaults(dispatcher=dispatcher)
-
-
-SubCommand = CommandGroup
=== removed directory 'lava/tool/commands'
=== removed file 'lava/tool/commands/__init__.py'
@@ -1,83 +0,0 @@
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Package with command line commands
-"""
-
-import argparse
-import re
-
-
-class ExperimentalNoticeAction(argparse.Action):
- """
- Argparse action that implements the --experimental-notice
- """
-
- message = """
- Some lc-tool sub-commands are marked as EXPERIMENTAL. Those commands are
- not guaranteed to work identically, or have identical interface between
- subsequent lc-tool releases.
-
- We do that to make it possible to provide good user interface and
- server-side API when working on new features. Once a feature is stabilized
- the UI will be frozen and all subsequent changes will retain backwards
- compatibility.
- """
- message = message.lstrip()
- message = re.sub(re.compile("[ \t]+", re.M), " ", message)
- message = re.sub(re.compile("^ ", re.M), "", message)
-
- def __init__(self,
- option_strings, dest, default=None, required=False,
- help=None):
- super(ExperimentalNoticeAction, self).__init__(
- option_strings=option_strings, dest=dest, default=default, nargs=0,
- help=help)
-
- def __call__(self, parser, namespace, values, option_string=None):
- parser.exit(message=self.message)
-
-
-class ExperimentalCommandMixIn(object):
- """
- Experimental command.
-
- Prints a warning message on each call to invoke()
- """
-
- def invoke(self):
- self.print_experimental_notice()
- return super(ExperimentalCommandMixIn, self).invoke()
-
- @classmethod
- def register_arguments(cls, parser):
- retval = super(ExperimentalCommandMixIn,
- cls).register_arguments(parser)
- parser.register("action", "experimental_notice",
- ExperimentalNoticeAction)
- group = parser.add_argument_group("experimental commands")
- group.add_argument("--experimental-notice",
- action="experimental_notice",
- default=argparse.SUPPRESS,
- help="Explain the nature of experimental commands")
- return retval
-
- def print_experimental_notice(self):
- print ("EXPERIMENTAL - SUBJECT TO CHANGE"
- " (See --experimental-notice for more info)")
=== removed file 'lava/tool/commands/help.py'
@@ -1,35 +0,0 @@
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.tool.commands.help
-=======================
-
-Implementation of `lava help`
-"""
-
-from lava.tool.command import Command
-
-
-class help(Command):
- """
- Show a summary of all available commands
- """
-
- def invoke(self):
- self.parser.print_help()
=== removed file 'lava/tool/dispatcher.py'
@@ -1,158 +0,0 @@
-# Copyright (C) 2010, 2011, 2012 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Module with LavaDispatcher - the command dispatcher
-"""
-
-import argparse
-import argcomplete
-import logging
-import pkg_resources
-import sys
-
-from lava.tool.errors import CommandError
-
-
-class Dispatcher(object):
- """
- Class implementing command line interface for launch control
- """
-
- description = None
- epilog = None
-
- def __init__(self, parser=None, name=None):
- self.parser = parser or self.construct_parser()
- self.subparsers = self.parser.add_subparsers(
- title="Sub-command to invoke")
- self.name = name
-
- def __repr__(self):
- return "%r(name=%r)" % (self.__class__.__name__, self.name)
-
- @classmethod
- def construct_parser(cls):
- """
- Construct a parser for this dispatcher.
-
- This is only used if the parser is not provided by the parent
- dispatcher instance.
- """
- parser_args = dict(add_help=True)
- # Set description based on class description
- if cls.description is not None:
- parser_args['description'] = cls.description
- # Set the epilog based on class epilog
- if cls.epilog is not None:
- parser_args['epilog'] = cls.epilog
- # Return the fresh parser
- return argparse.ArgumentParser(**parser_args)
-
- def import_commands(self, entrypoint_name):
- """
- Import commands from given entry point namespace
- """
- logging.debug("Loading commands in entry point %r", entrypoint_name)
- for entrypoint in pkg_resources.iter_entry_points(entrypoint_name):
- try:
- command_cls = entrypoint.load()
- except (ImportError, pkg_resources.DistributionNotFound) as exc:
- logging.exception("Unable to load command: %s", entrypoint.name)
- else:
- self.add_command_cls(command_cls)
-
- def add_command_cls(self, command_cls):
- """
- Add a new command class to this dispatcher.
-
- The command must be a subclass of Command or SubCommand.
- """
- logging.debug("Loading command class %r", command_cls)
- # Create a sub-parser where the command/sub-command can register
- # things.
- sub_parser = self.subparsers.add_parser(
- command_cls.get_name(),
- help=command_cls.get_help(),
- epilog=command_cls.get_epilog())
- from lava.tool.command import CommandGroup
- if issubclass(command_cls, CommandGroup):
- # Handle CommandGroup somewhat different. Instead of calling
- # register_arguments we call register_subcommands
- command_cls.register_subcommands(sub_parser)
- # Let's also call register arguments in case we need both
- command_cls.register_arguments(sub_parser)
- else:
- # Handle plain commands by recording their commands in the
- # dedicated sub-parser we've crated for them.
- command_cls.register_arguments(sub_parser)
- # In addition, since we don't want to require all sub-classes of
- # Command to super-call register_arguments (everyone would forget
- # this anyway) we manually register the command class for that
- # sub-parser so that dispatch() can look it up later.
- sub_parser.set_defaults(
- command_cls=command_cls,
- parser=sub_parser)
- # Make sure the sub-parser knows about this dispatcher
- sub_parser.set_defaults(dispatcher=self)
-
- def _adjust_logging_level(self, args):
- """
- Adjust logging level after seeing the initial arguments
- """
-
- def dispatch(self, raw_args=None):
- """
- Dispatch a command with the specified arguments.
-
- If arguments are left out they are looked up in sys.argv automatically
- """
- # Before anything, hook into the bash completion
- argcomplete.autocomplete(self.parser)
- # First parse whatever input arguments we've got
- args = self.parser.parse_args(raw_args)
- # Adjust logging level after seeing arguments
- self._adjust_logging_level(args)
- # Then look up the command class and construct it with the parser it
- # belongs to and the parsed arguments.
- command = args.command_cls(args.parser, args)
- try:
- # Give the command a chance to re-parse command line arguments
- command.reparse_arguments(args.parser, raw_args)
- except NotImplementedError:
- pass
- try:
- return command.invoke()
- except CommandError as ex:
- print >> sys.stderr, "ERROR: %s" % (ex,)
- return 1
-
- @classmethod
- def run(cls, args=None):
- """
- Dispatch commandsd and exit
- """
- raise SystemExit(cls().dispatch(args))
-
- def say(self, command, message, *args, **kwargs):
- """
- Handy wrapper for print + format
- """
- print "{0} >>> {1}".format(
- command.get_name(),
- message.format(*args, **kwargs))
=== removed file 'lava/tool/errors.py'
@@ -1,31 +0,0 @@
-# Copyright (C) 2010, 2011, 2012 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.tool.errors
-================
-
-Error classes for LAVA Tool.
-"""
-
-class CommandError(Exception):
- """
- Raise this from a Command's invoke() method to display an error nicely.
-
- lava-tool will exit with a status of 1 if this is raised.
- """
=== removed file 'lava/tool/main.py'
@@ -1,132 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-lava.tool.main
-==============
-
-Implementation of the `lava` shell command.
-"""
-
-import logging
-import sys
-
-from lava.tool.dispatcher import Dispatcher
-
-
-class LavaDispatcher(Dispatcher):
- """
- Dispatcher implementing the `lava` shell command
-
- This dispatcher imports plugins from `lava.commands` pkg_resources
- namespace. Additional plugins can be registered as either
- :class:`lava.command.Command` or :class:`lava.command.SubCommand`
- sub-classes.
- """
-
- def __init__(self):
- # Call this early so that we don't get logging.basicConfig
- # being called by accident. Otherwise we'd have to
- # purge all loggers from the root logger and that sucks
- self.setup_logging()
- # Initialize the base dispatcher
- super(LavaDispatcher, self).__init__()
- # And import the non-flat namespace commands
- self.import_commands('lava.commands')
-
- @classmethod
- def construct_parser(cls):
- """
- Construct a parser for this dispatcher.
-
- This is only used if the parser is not provided by the parent
- dispatcher instance.
- """
- # Construct a basic parser
- parser = super(LavaDispatcher, cls).construct_parser()
- # Add the --verbose flag
- parser.add_argument(
- "-v", "--verbose",
- default=False,
- action="store_true",
- help="Be more verbose (displays more messages globally)")
- # Add the --debug flag
- parser.add_argument(
- "-D", "--debug",
- action="store_true",
- default=False,
- help="Enable debugging on all loggers")
- # Add the --trace flag
- parser.add_argument(
- "-T", "--trace",
- action="append",
- default=[],
- help="Enable debugging of the specified logger, can be specified multiple times")
- # Return the improved parser
- return parser
-
- def setup_logging(self):
- """
- Setup logging for the root dispatcher
- """
- # Enable warning/error message handler
- class OnlyProblemsFilter(logging.Filterer):
- def filter(self, record):
- if record.levelno >= logging.WARN:
- return 1
- return 0
- err_handler = logging.StreamHandler(sys.stderr)
- err_handler.setLevel(logging.WARN)
- err_handler.setFormatter(
- logging.Formatter("%(levelname)s: %(message)s"))
- err_handler.addFilter(OnlyProblemsFilter())
- logging.getLogger().addHandler(err_handler)
- # Enable the debug handler
- class DebugFilter(logging.Filter):
- def filter(self, record):
- if record.levelno == logging.DEBUG:
- return 1
- return 0
- dbg_handler = logging.StreamHandler(sys.stderr)
- dbg_handler.setLevel(logging.DEBUG)
- dbg_handler.setFormatter(
- logging.Formatter("%(levelname)s %(name)s: %(message)s"))
- dbg_handler.addFilter(DebugFilter())
- logging.getLogger().addHandler(dbg_handler)
-
- def _adjust_logging_level(self, args):
- # Enable verbose message handler
- if args.verbose:
- logging.getLogger().setLevel(logging.INFO)
- class OnlyInfoFilter(logging.Filterer):
- def filter(self, record):
- if record.levelno == logging.INFO:
- return 1
- return 0
- msg_handler = logging.StreamHandler(sys.stdout)
- msg_handler.setLevel(logging.INFO)
- msg_handler.setFormatter(
- logging.Formatter("%(message)s"))
- msg_handler.addFilter(OnlyInfoFilter())
- logging.getLogger().addHandler(msg_handler)
- # Enable debugging
- if args.debug:
- logging.getLogger().setLevel(logging.DEBUG)
- # Enable trace loggers
- for name in args.trace:
- logging.getLogger(name).setLevel(logging.DEBUG)
=== removed directory 'lava_dashboard_tool'
=== removed file 'lava_dashboard_tool/__init__.py'
@@ -1,23 +0,0 @@
-# Copyright (C) 2010,2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-dashboard-tool.
-#
-# lava-dashboard-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-dashboard-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-dashboard-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Launch Control Tool package
-"""
-
-__version__ = (0, 8, 0, "dev", 0)
=== removed file 'lava_dashboard_tool/commands.py'
@@ -1,981 +0,0 @@
-# Copyright (C) 2010,2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-dashboard-tool.
-#
-# lava-dashboard-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-dashboard-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-dashboard-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Module with command-line tool commands that interact with the dashboard
-server. All commands listed here should have counterparts in the
-launch_control.dashboard_app.xml_rpc package.
-"""
-
-import argparse
-import contextlib
-import errno
-import os
-import re
-import socket
-import sys
-import urllib
-import urlparse
-import xmlrpclib
-
-import simplejson
-from json_schema_validator.extensions import datetime_extension
-
-from lava_tool.authtoken import AuthenticatingServerProxy, KeyringAuthBackend
-from lava.tool.commands import ExperimentalCommandMixIn
-from lava.tool.command import Command, CommandGroup
-
-
-class dashboard(CommandGroup):
- """
- Commands for interacting with LAVA Dashboard
- """
-
- namespace = "lava.dashboard.commands"
-
-
-class InsufficientServerVersion(Exception):
- """
- Exception raised when server version that a command interacts with is too
- old to support required features.
- """
- def __init__(self, server_version, required_version):
- self.server_version = server_version
- self.required_version = required_version
-
-
-class DataSetRenderer(object):
- """
- Support class for rendering a table out of list of dictionaries.
-
- It supports several features that make printing tabular data easier.
- * Automatic layout
- * Custom column headers
- * Custom cell formatting
- * Custom table captions
- * Custom column ordering
- * Custom Column separators
- * Custom dataset notification
-
- The primary method is render() which does all of the above. You
- need to pass a dataset argument which is a list of dictionaries.
- Each dictionary must have the same keys. In particular the first row
- is used to determine columns.
- """
- def __init__(self, column_map=None, row_formatter=None, empty=None,
- order=None, caption=None, separator=" ", header_separator=None):
- if column_map is None:
- column_map = {}
- if row_formatter is None:
- row_formatter = {}
- if empty is None:
- empty = "There is no data to display"
- self.column_map = column_map
- self.row_formatter = row_formatter
- self.empty = empty
- self.order = order
- self.separator = separator
- self.caption = caption
- self.header_separator = header_separator
-
- def _analyze_dataset(self, dataset):
- """
- Determine the columns that will be displayed and the maximum
- length of each of those columns.
-
- Returns a tuple (dataset, columms, maxlen) where columns is a
- list of column names and maxlen is a dictionary mapping from
- column name to maximum length of any value in the row or the
- column header and the dataset is a copy of the dataset altered
- as necessary.
-
- Some examples:
-
- First the dataset, an array of dictionaries
- >>> dataset = [
- ... {'a': 'shorter', 'bee': ''},
- ... {'a': 'little longer', 'bee': 'b'}]
-
- Note that column 'bee' is actually three characters long as the
- column name made it wider.
- >>> dataset_out, columns, maxlen = DataSetRenderer(
- ... )._analyze_dataset(dataset)
-
- Unless you format rows with a custom function the data is not altered.
- >>> dataset_out is dataset
- True
-
- Columns come out in sorted alphabetic order
- >>> columns
- ['a', 'bee']
-
- Maximum length determines the width of each column. Note that
- the header affects the column width.
- >>> maxlen
- {'a': 13, 'bee': 3}
-
- You can constrain or reorder columns. In that case columns you
- decided to ignore are simply left out of the output.
- >>> dataset_out, columns, maxlen = DataSetRenderer(
- ... order=['bee'])._analyze_dataset(dataset)
- >>> columns
- ['bee']
- >>> maxlen
- {'bee': 3}
-
- You can format values anyway you like:
- >>> dataset_out, columns, maxlen = DataSetRenderer(row_formatter={
- ... 'bee': lambda value: "%10s" % value}
- ... )._analyze_dataset(dataset)
-
- Dataset is altered to take account of the row formatting
- function. The original dataset argument is copied.
- >>> dataset_out
- [{'a': 'shorter', 'bee': ' '}, {'a': 'little longer', 'bee': ' b'}]
- >>> dataset_out is not dataset
- True
-
- Columns stay the same though:
- >>> columns
- ['a', 'bee']
-
- Note how formatting altered the width of the column 'bee'
- >>> maxlen
- {'a': 13, 'bee': 10}
-
- You can also format columns (with nice aliases).Note how
- column 'bee' maximum width is now dominated by the long column
- name:
- >>> dataset_out, columns, maxlen = DataSetRenderer(column_map={
- ... 'bee': "Column B"})._analyze_dataset(dataset)
- >>> maxlen
- {'a': 13, 'bee': 8}
- """
- if self.order:
- columns = self.order
- else:
- columns = sorted(dataset[0].keys())
- if self.row_formatter:
- dataset_out = [dict(row) for row in dataset]
- else:
- dataset_out = dataset
- for row in dataset_out:
- for column in row:
- if column in self.row_formatter:
- row[column] = self.row_formatter[column](row[column])
- maxlen = dict(
- [(column, max(
- len(self.column_map.get(column, column)),
- max([
- len(str(row[column])) for row in dataset_out])))
- for column in columns])
- return dataset_out, columns, maxlen
-
- def _render_header(self, dataset, columns, maxlen):
- """
- Render a header, possibly with a caption string
-
- Caption is controlled by the constructor.
- >>> dataset = [
- ... {'a': 'shorter', 'bee': ''},
- ... {'a': 'little longer', 'bee': 'b'}]
- >>> columns = ['a', 'bee']
- >>> maxlen = {'a': 13, 'bee': 3}
-
- By default there is no caption, just column names:
- >>> DataSetRenderer()._render_header(
- ... dataset, columns, maxlen)
- a bee
-
- If you enable the header separator then column names will be visually
- separated from the first row of data.
- >>> DataSetRenderer(header_separator=True)._render_header(
- ... dataset, columns, maxlen)
- a bee
- -----------------
-
- If you provide a caption it gets rendered as a centered
- underlined text before the data:
- >>> DataSetRenderer(caption="Dataset")._render_header(
- ... dataset, columns, maxlen)
- Dataset
- =================
- a bee
-
- You can use both caption and header separator
- >>> DataSetRenderer(caption="Dataset", header_separator=True)._render_header(
- ... dataset, columns, maxlen)
- Dataset
- =================
- a bee
- -----------------
-
- Observe how the total length of the output horizontal line
- depends on the separator! Also note the columns labels are
- aligned to the center of the column
- >>> DataSetRenderer(caption="Dataset", separator=" | ")._render_header(
- ... dataset, columns, maxlen)
- Dataset
- ===================
- a | bee
- """
- total_len = sum(maxlen.itervalues())
- if len(columns):
- total_len += len(self.separator) * (len(columns) - 1)
- # Print the caption
- if self.caption:
- print "{0:^{1}}".format(self.caption, total_len)
- print "=" * total_len
- # Now print the coulum names
- print self.separator.join([
- "{0:^{1}}".format(self.column_map.get(column, column),
- maxlen[column]) for column in columns])
- # Finally print the header separator
- if self.header_separator:
- print "-" * total_len
-
- def _render_rows(self, dataset, columns, maxlen):
- """
- Render rows of the dataset.
-
- Each row is printed on one line using the maxlen argument to
- determine correct column size. Text is aligned left.
-
- First the dataset, columns and maxlen as produced by
- _analyze_dataset()
- >>> dataset = [
- ... {'a': 'shorter', 'bee': ''},
- ... {'a': 'little longer', 'bee': 'b'}]
- >>> columns = ['a', 'bee']
- >>> maxlen = {'a': 13, 'bee': 3}
-
- Now a plain table. Note! To really understand this test
- you should check out the length of the strings below. There
- are two more spaces after 'b' in the second row
- >>> DataSetRenderer()._render_rows(dataset, columns, maxlen)
- shorter
- little longer b
- """
- for row in dataset:
- print self.separator.join([
- "{0!s:{1}}".format(row[column], maxlen[column])
- for column in columns])
-
- def _render_dataset(self, dataset):
- """
- Render the header followed by the rows of data.
- """
- dataset, columns, maxlen = self._analyze_dataset(dataset)
- self._render_header(dataset, columns, maxlen)
- self._render_rows(dataset, columns, maxlen)
-
- def _render_empty_dataset(self):
- """
- Render empty dataset.
-
- By default it just prints out a fixed sentence:
- >>> DataSetRenderer()._render_empty_dataset()
- There is no data to display
-
- This can be changed by passing an argument to the constructor
- >>> DataSetRenderer(empty="there is no data")._render_empty_dataset()
- there is no data
- """
- print self.empty
-
- def render(self, dataset):
- if len(dataset) > 0:
- self._render_dataset(dataset)
- else:
- self._render_empty_dataset()
-
-
-class XMLRPCCommand(Command):
- """
- Abstract base class for commands that interact with dashboard server
- over XML-RPC.
-
- The only difference is that you should implement invoke_remote()
- instead of invoke(). The provided implementation catches several
- socket and XML-RPC errors and prints a pretty error message.
- """
-
- @staticmethod
- def _construct_xml_rpc_url(url):
- """
- Construct URL to the XML-RPC service out of the given URL
- """
- parts = urlparse.urlsplit(url)
- if not parts.path.endswith("/RPC2/"):
- path = parts.path.rstrip("/") + "/xml-rpc/"
- else:
- path = parts.path
- return urlparse.urlunsplit(
- (parts.scheme, parts.netloc, path, "", ""))
-
- @staticmethod
- def _strict_server_version(version):
- """
- Calculate strict server version (as defined by
- distutils.version.StrictVersion). This works by discarding .candidate
- and .dev release-levels.
- >>> XMLRPCCommand._strict_server_version("0.4.0.candidate.5")
- '0.4.0'
- >>> XMLRPCCommand._strict_server_version("0.4.0.dev.126")
- '0.4.0'
- >>> XMLRPCCommand._strict_server_version("0.4.0.alpha.1")
- '0.4.0a1'
- >>> XMLRPCCommand._strict_server_version("0.4.0.beta.2")
- '0.4.0b2'
- """
- try:
- major, minor, micro, releaselevel, serial = version.split(".")
- except ValueError:
- raise ValueError(
- ("version %r does not follow pattern "
- "'major.minor.micro.releaselevel.serial'") % version)
- if releaselevel in ["dev", "candidate", "final"]:
- return "%s.%s.%s" % (major, minor, micro)
- elif releaselevel == "alpha":
- return "%s.%s.%sa%s" % (major, minor, micro, serial)
- elif releaselevel == "beta":
- return "%s.%s.%sb%s" % (major, minor, micro, serial)
- else:
- raise ValueError(
- ("releaselevel %r is not one of 'final', 'alpha', 'beta', "
- "'candidate' or 'final'") % releaselevel)
-
- def _check_server_version(self, server_obj, required_version):
- """
- Check that server object has is at least required_version.
-
- This method may raise InsufficientServerVersion.
- """
- from distutils.version import StrictVersion, LooseVersion
- # For backwards compatibility the server reports
- # major.minor.micro.releaselevel.serial which is not PEP-386 compliant
- server_version = StrictVersion(
- self._strict_server_version(server_obj.version()))
- required_version = StrictVersion(required_version)
- if server_version < required_version:
- raise InsufficientServerVersion(server_version, required_version)
-
- def __init__(self, parser, args):
- super(XMLRPCCommand, self).__init__(parser, args)
- xml_rpc_url = self._construct_xml_rpc_url(self.args.dashboard_url)
- self.server = AuthenticatingServerProxy(
- xml_rpc_url,
- verbose=args.verbose_xml_rpc,
- allow_none=True,
- use_datetime=True,
- auth_backend=KeyringAuthBackend())
-
- def use_non_legacy_api_if_possible(self, name='server'):
- # Legacy APIs are registered in top-level object, non-legacy APIs are
- # prefixed with extension name.
- if "dashboard.version" in getattr(self, name).system.listMethods():
- setattr(self, name, getattr(self, name).dashboard)
-
- @classmethod
- def register_arguments(cls, parser):
- dashboard_group = parser.add_argument_group("dashboard specific arguments")
- default_dashboard_url = os.getenv("DASHBOARD_URL")
- if default_dashboard_url:
- dashboard_group.add_argument("--dashboard-url",
- metavar="URL", help="URL of your validation dashboard (currently %(default)s)",
- default=default_dashboard_url)
- else:
- dashboard_group.add_argument("--dashboard-url", required=True,
- metavar="URL", help="URL of your validation dashboard")
- debug_group = parser.add_argument_group("debugging arguments")
- debug_group.add_argument("--verbose-xml-rpc",
- action="store_true", default=False,
- help="Show XML-RPC data")
- return dashboard_group
-
- @contextlib.contextmanager
- def safety_net(self):
- try:
- yield
- except socket.error as ex:
- print >> sys.stderr, "Unable to connect to server at %s" % (
- self.args.dashboard_url,)
- # It seems that some errors are reported as -errno
- # while others as +errno.
- ex.errno = abs(ex.errno)
- if ex.errno == errno.ECONNREFUSED:
- print >> sys.stderr, "Connection was refused."
- parts = urlparse.urlsplit(self.args.dashboard_url)
- if parts.netloc == "localhost:8000":
- print >> sys.stderr, "Perhaps the server is not running?"
- elif ex.errno == errno.ENOENT:
- print >> sys.stderr, "Unable to resolve address"
- else:
- print >> sys.stderr, "Socket %d: %s" % (ex.errno, ex.strerror)
- except xmlrpclib.ProtocolError as ex:
- print >> sys.stderr, "Unable to exchange XML-RPC message with dashboard server"
- print >> sys.stderr, "HTTP error code: %d/%s" % (ex.errcode, ex.errmsg)
- except xmlrpclib.Fault as ex:
- self.handle_xmlrpc_fault(ex.faultCode, ex.faultString)
- except InsufficientServerVersion as ex:
- print >> sys.stderr, ("This command requires at least server version "
- "%s, actual server version is %s" %
- (ex.required_version, ex.server_version))
-
- def invoke(self):
- with self.safety_net():
- self.use_non_legacy_api_if_possible()
- return self.invoke_remote()
-
- def handle_xmlrpc_fault(self, faultCode, faultString):
- if faultCode == 500:
- print >> sys.stderr, "Dashboard server has experienced internal error"
- print >> sys.stderr, faultString
- else:
- print >> sys.stderr, "XML-RPC error %d: %s" % (faultCode, faultString)
-
- def invoke_remote(self):
- raise NotImplementedError()
-
-
-class server_version(XMLRPCCommand):
- """
- Display dashboard server version
- """
-
- def invoke_remote(self):
- print "Dashboard server version: %s" % (self.server.version(),)
-
-
-class put(XMLRPCCommand):
- """
- Upload a bundle on the server
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(put, cls).register_arguments(parser)
- parser.add_argument("LOCAL",
- type=argparse.FileType("rb"),
- help="pathname on the local file system")
- parser.add_argument("REMOTE",
- default="/anonymous/", nargs='?',
- help="pathname on the server")
-
- def invoke_remote(self):
- content = self.args.LOCAL.read()
- filename = self.args.LOCAL.name
- pathname = self.args.REMOTE
- content_sha1 = self.server.put(content, filename, pathname)
- print "Stored as bundle {0}".format(content_sha1)
-
- def handle_xmlrpc_fault(self, faultCode, faultString):
- if faultCode == 404:
- print >> sys.stderr, "Bundle stream %s does not exist" % (
- self.args.REMOTE)
- elif faultCode == 409:
- print >> sys.stderr, "You have already uploaded this bundle to the dashboard"
- else:
- super(put, self).handle_xmlrpc_fault(faultCode, faultString)
-
-
-class get(XMLRPCCommand):
- """
- Download a bundle from the server
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(get, cls).register_arguments(parser)
- parser.add_argument("SHA1",
- type=str,
- help="SHA1 of the bundle to download")
- parser.add_argument("--overwrite",
- action="store_true",
- help="Overwrite files on the local disk")
- parser.add_argument("--output", "-o",
- type=argparse.FileType("wb"),
- default=None,
- help="Alternate name of the output file")
-
- def invoke_remote(self):
- response = self.server.get(self.args.SHA1)
- if self.args.output is None:
- filename = self.args.SHA1
- if os.path.exists(filename) and not self.args.overwrite:
- print >> sys.stderr, "File {filename!r} already exists".format(
- filename=filename)
- print >> sys.stderr, "You may pass --overwrite to write over it"
- return -1
- stream = open(filename, "wb")
- else:
- stream = self.args.output
- filename = self.args.output.name
- stream.write(response['content'])
- print "Downloaded bundle {0} to file {1!r}".format(
- self.args.SHA1, filename)
-
- def handle_xmlrpc_fault(self, faultCode, faultString):
- if faultCode == 404:
- print >> sys.stderr, "Bundle {sha1} does not exist".format(
- sha1=self.args.SHA1)
- else:
- super(get, self).handle_xmlrpc_fault(faultCode, faultString)
-
-
-class deserialize(XMLRPCCommand):
- """
- Deserialize a bundle on the server
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(deserialize, cls).register_arguments(parser)
- parser.add_argument("SHA1",
- type=str,
- help="SHA1 of the bundle to deserialize")
-
- def invoke_remote(self):
- response = self.server.deserialize(self.args.SHA1)
- print "Bundle {sha1} deserialized".format(
- sha1=self.args.SHA1)
-
- def handle_xmlrpc_fault(self, faultCode, faultString):
- if faultCode == 404:
- print >> sys.stderr, "Bundle {sha1} does not exist".format(
- sha1=self.args.SHA1)
- elif faultCode == 409:
- print >> sys.stderr, "Unable to deserialize bundle {sha1}".format(
- sha1=self.args.SHA1)
- print >> sys.stderr, faultString
- else:
- super(deserialize, self).handle_xmlrpc_fault(faultCode, faultString)
-
-
-def _get_pretty_renderer(**kwargs):
- if "separator" not in kwargs:
- kwargs["separator"] = " | "
- if "header_separator" not in kwargs:
- kwargs["header_separator"] = True
- return DataSetRenderer(**kwargs)
-
-
-class streams(XMLRPCCommand):
- """
- Show streams you have access to
- """
-
- renderer = _get_pretty_renderer(
- order=('pathname', 'bundle_count', 'name'),
- column_map={
- 'pathname': 'Pathname',
- 'bundle_count': 'Number of bundles',
- 'name': 'Name'},
- row_formatter={
- 'name': lambda name: name or "(not set)"},
- empty="There are no streams you can access on the server",
- caption="Bundle streams")
-
- def invoke_remote(self):
- self.renderer.render(self.server.streams())
-
-
-class bundles(XMLRPCCommand):
- """
- Show bundles in the specified stream
- """
-
- renderer = _get_pretty_renderer(
- column_map={
- 'uploaded_by': 'Uploader',
- 'uploaded_on': 'Upload date',
- 'content_filename': 'File name',
- 'content_sha1': 'SHA1',
- 'is_deserialized': "Deserialized?"},
- row_formatter={
- 'is_deserialized': lambda x: "yes" if x else "no",
- 'uploaded_by': lambda x: x or "(anonymous)",
- 'uploaded_on': lambda x: x},
- order=('content_sha1', 'content_filename', 'uploaded_by',
- 'uploaded_on', 'is_deserialized'),
- empty="There are no bundles in this stream",
- caption="Bundles",
- separator=" | ")
-
- @classmethod
- def register_arguments(cls, parser):
- super(bundles, cls).register_arguments(parser)
- parser.add_argument("PATHNAME",
- default="/anonymous/", nargs='?',
- help="pathname on the server (defaults to %(default)s)")
-
- def invoke_remote(self):
- self.renderer.render(self.server.bundles(self.args.PATHNAME))
-
- def handle_xmlrpc_fault(self, faultCode, faultString):
- if faultCode == 404:
- print >> sys.stderr, "Bundle stream %s does not exist" % (
- self.args.PATHNAME)
- else:
- super(bundles, self).handle_xmlrpc_fault(faultCode, faultString)
-
-
-class make_stream(XMLRPCCommand):
- """
- Create a bundle stream on the server
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(make_stream, cls).register_arguments(parser)
- parser.add_argument(
- "pathname",
- type=str,
- help="Pathname of the bundle stream to create")
- parser.add_argument(
- "--name",
- type=str,
- default="",
- help="Name of the bundle stream (description)")
-
- def invoke_remote(self):
- self._check_server_version(self.server, "0.3")
- pathname = self.server.make_stream(self.args.pathname, self.args.name)
- print "Bundle stream {pathname} created".format(pathname=pathname)
-
-
-class backup(XMLRPCCommand):
- """
- Backup data uploaded to a dashboard instance.
-
- Not all data is preserved. The following data is lost: identity of the user
- that uploaded each bundle, time of uploading and deserialization on the
- server, name of the bundle stream that contained the data
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(backup, cls).register_arguments(parser)
- parser.add_argument("BACKUP_DIR", type=str,
- help="Directory to backup to")
-
- def invoke_remote(self):
- if not os.path.exists(self.args.BACKUP_DIR):
- os.mkdir(self.args.BACKUP_DIR)
- for bundle_stream in self.server.streams():
- print "Processing stream %s" % bundle_stream["pathname"]
- bundle_stream_dir = os.path.join(self.args.BACKUP_DIR, urllib.quote_plus(bundle_stream["pathname"]))
- if not os.path.exists(bundle_stream_dir):
- os.mkdir(bundle_stream_dir)
- with open(os.path.join(bundle_stream_dir, "metadata.json"), "wt") as stream:
- simplejson.dump({
- "pathname": bundle_stream["pathname"],
- "name": bundle_stream["name"],
- "user": bundle_stream["user"],
- "group": bundle_stream["group"],
- }, stream)
- for bundle in self.server.bundles(bundle_stream["pathname"]):
- print " * Backing up bundle %s" % bundle["content_sha1"]
- data = self.server.get(bundle["content_sha1"])
- bundle_pathname = os.path.join(bundle_stream_dir, bundle["content_sha1"])
- # Note: we write bundles as binary data to preserve anything the user might have dumped on us
- with open(bundle_pathname + ".json", "wb") as stream:
- stream.write(data["content"])
- with open(bundle_pathname + ".metadata.json", "wt") as stream:
- simplejson.dump({
- "uploaded_by": bundle["uploaded_by"],
- "uploaded_on": datetime_extension.to_json(bundle["uploaded_on"]),
- "content_filename": bundle["content_filename"],
- "content_sha1": bundle["content_sha1"],
- "content_size": bundle["content_size"],
- }, stream)
-
-
-class restore(XMLRPCCommand):
- """
- Restore a dashboard instance from backup
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(restore, cls).register_arguments(parser)
- parser.add_argument("BACKUP_DIR", type=str,
- help="Directory to backup from")
-
- def invoke_remote(self):
- self._check_server_version(self.server, "0.3")
- for stream_pathname_quoted in os.listdir(self.args.BACKUP_DIR):
- filesystem_stream_pathname = os.path.join(self.args.BACKUP_DIR, stream_pathname_quoted)
- if not os.path.isdir(filesystem_stream_pathname):
- continue
- stream_pathname = urllib.unquote(stream_pathname_quoted)
- if os.path.exists(os.path.join(filesystem_stream_pathname, "metadata.json")):
- with open(os.path.join(filesystem_stream_pathname, "metadata.json"), "rt") as stream:
- stream_metadata = simplejson.load(stream)
- else:
- stream_metadata = {}
- print "Processing stream %s" % stream_pathname
- try:
- self.server.make_stream(stream_pathname, stream_metadata.get("name", "Restored from backup"))
- except xmlrpclib.Fault as ex:
- if ex.faultCode != 409:
- raise
- for content_sha1 in [item[:-len(".json")] for item in os.listdir(filesystem_stream_pathname) if item.endswith(".json") and not item.endswith(".metadata.json") and item != "metadata.json"]:
- filesystem_content_filename = os.path.join(filesystem_stream_pathname, content_sha1 + ".json")
- if not os.path.isfile(filesystem_content_filename):
- continue
- with open(os.path.join(filesystem_stream_pathname, content_sha1) + ".metadata.json", "rt") as stream:
- bundle_metadata = simplejson.load(stream)
- with open(filesystem_content_filename, "rb") as stream:
- content = stream.read()
- print " * Restoring bundle %s" % content_sha1
- try:
- self.server.put(content, bundle_metadata["content_filename"], stream_pathname)
- except xmlrpclib.Fault as ex:
- if ex.faultCode != 409:
- raise
-
-
-class pull(ExperimentalCommandMixIn, XMLRPCCommand):
- """
- Copy bundles and bundle streams from one dashboard to another.
-
- This command checks for two environment varialbes:
- The value of DASHBOARD_URL is used as a replacement for --dashbard-url.
- The value of REMOTE_DASHBOARD_URL as a replacement for FROM.
- Their presence automatically makes the corresponding argument optional.
- """
-
- def __init__(self, parser, args):
- super(pull, self).__init__(parser, args)
- remote_xml_rpc_url = self._construct_xml_rpc_url(self.args.FROM)
- self.remote_server = AuthenticatingServerProxy(
- remote_xml_rpc_url,
- verbose=args.verbose_xml_rpc,
- use_datetime=True,
- allow_none=True,
- auth_backend=KeyringAuthBackend())
- self.use_non_legacy_api_if_possible('remote_server')
-
- @classmethod
- def register_arguments(cls, parser):
- group = super(pull, cls).register_arguments(parser)
- default_remote_dashboard_url = os.getenv("REMOTE_DASHBOARD_URL")
- if default_remote_dashboard_url:
- group.add_argument(
- "FROM", nargs="?",
- help="URL of the remote validation dashboard (currently %(default)s)",
- default=default_remote_dashboard_url)
- else:
- group.add_argument(
- "FROM",
- help="URL of the remote validation dashboard)")
- group.add_argument("STREAM", nargs="*", help="Streams to pull from (all by default)")
-
- @staticmethod
- def _filesizeformat(num_bytes):
- """
- Formats the value like a 'human-readable' file size (i.e. 13 KB, 4.1 MB,
- 102 num_bytes, etc).
- """
- try:
- num_bytes = float(num_bytes)
- except (TypeError, ValueError, UnicodeDecodeError):
- return "%(size)d byte", "%(size)d num_bytes" % {'size': 0}
-
- filesize_number_format = lambda value: "%0.2f" % (round(value, 1),)
-
- if num_bytes < 1024:
- return "%(size)d bytes" % {'size': num_bytes}
- if num_bytes < 1024 * 1024:
- return "%s KB" % filesize_number_format(num_bytes / 1024)
- if num_bytes < 1024 * 1024 * 1024:
- return "%s MB" % filesize_number_format(num_bytes / (1024 * 1024))
- return "%s GB" % filesize_number_format(num_bytes / (1024 * 1024 * 1024))
-
- def invoke_remote(self):
- self._check_server_version(self.server, "0.3")
-
- print "Checking local and remote streams"
- remote = self.remote_server.streams()
- if self.args.STREAM:
- # Check that all requested streams are available remotely
- requested_set = frozenset(self.args.STREAM)
- remote_set = frozenset((stream["pathname"] for stream in remote))
- unavailable_set = requested_set - remote_set
- if unavailable_set:
- print >> sys.stderr, "Remote stream not found: %s" % ", ".join(unavailable_set)
- return -1
- # Limit to requested streams if necessary
- remote = [stream for stream in remote if stream["pathname"] in requested_set]
- local = self.server.streams()
- missing_pathnames = set([stream["pathname"] for stream in remote]) - set([stream["pathname"] for stream in local])
- for stream in remote:
- if stream["pathname"] in missing_pathnames:
- self.server.make_stream(stream["pathname"], stream["name"])
- local_bundles = []
- else:
- local_bundles = [bundle for bundle in self.server.bundles(stream["pathname"])]
- remote_bundles = [bundle for bundle in self.remote_server.bundles(stream["pathname"])]
- missing_bundles = set((bundle["content_sha1"] for bundle in remote_bundles))
- missing_bundles -= set((bundle["content_sha1"] for bundle in local_bundles))
- try:
- missing_bytes = sum(
- (bundle["content_size"]
- for bundle in remote_bundles
- if bundle["content_sha1"] in missing_bundles))
- except KeyError as ex:
- # Older servers did not return content_size so this part is optional
- missing_bytes = None
- if missing_bytes:
- print "Stream %s needs update (%s)" % (stream["pathname"], self._filesizeformat(missing_bytes))
- elif missing_bundles:
- print "Stream %s needs update (no estimate available)" % (stream["pathname"],)
- else:
- print "Stream %s is up to date" % (stream["pathname"],)
- for content_sha1 in missing_bundles:
- print "Getting %s" % (content_sha1,),
- sys.stdout.flush()
- data = self.remote_server.get(content_sha1)
- print "got %s, storing" % (self._filesizeformat(len(data["content"]))),
- sys.stdout.flush()
- try:
- self.server.put(data["content"], data["content_filename"], stream["pathname"])
- except xmlrpclib.Fault as ex:
- if ex.faultCode == 409: # duplicate
- print "already present (in another stream)"
- else:
- raise
- else:
- print "done"
-
-
-class data_views(ExperimentalCommandMixIn, XMLRPCCommand):
- """
- Show data views defined on the server
- """
- renderer = _get_pretty_renderer(
- column_map={
- 'name': 'Name',
- 'summary': 'Summary',
- },
- order=('name', 'summary'),
- empty="There are no data views defined yet",
- caption="Data Views")
-
- def invoke_remote(self):
- self._check_server_version(self.server, "0.4")
- self.renderer.render(self.server.data_views())
- print
- print "Tip: to invoke a data view try `lc-tool query-data-view`"
-
-
-class query_data_view(ExperimentalCommandMixIn, XMLRPCCommand):
- """
- Invoke a specified data view
- """
- @classmethod
- def register_arguments(cls, parser):
- super(query_data_view, cls).register_arguments(parser)
- parser.add_argument("QUERY", metavar="QUERY", nargs="...",
- help="Data view name and any optional and required arguments")
-
- def _probe_data_views(self):
- """
- Probe the server for information about data views
- """
- with self.safety_net():
- self.use_non_legacy_api_if_possible()
- self._check_server_version(self.server, "0.4")
- return self.server.data_views()
-
- def reparse_arguments(self, parser, raw_args):
- self.data_views = self._probe_data_views()
- if self.data_views is None:
- return
- # Here we hack a little, the last actuin is the QUERY action added
- # in register_arguments above. By removing it we make the output
- # of lc-tool query-data-view NAME --help more consistent.
- del parser._actions[-1]
- subparsers = parser.add_subparsers(
- title="Data views available on the server")
- for data_view in self.data_views:
- data_view_parser = subparsers.add_parser(
- data_view["name"],
- help=data_view["summary"],
- epilog=data_view["documentation"])
- data_view_parser.set_defaults(data_view=data_view)
- group = data_view_parser.add_argument_group("Data view parameters")
- for argument in data_view["arguments"]:
- if argument["default"] is None:
- group.add_argument(
- "--{name}".format(name=argument["name"].replace("_", "-")),
- dest=argument["name"],
- help=argument["help"],
- type=str,
- required=True)
- else:
- group.add_argument(
- "--{name}".format(name=argument["name"].replace("_", "-")),
- dest=argument["name"],
- help=argument["help"],
- type=str,
- default=argument["default"])
- self.args = self.parser.parse_args(raw_args)
-
- def invoke(self):
- # Override and _not_ call 'use_non_legacy_api_if_possible' as we
- # already did this reparse_arguments
- with self.safety_net():
- return self.invoke_remote()
-
- def invoke_remote(self):
- if self.data_views is None:
- return -1
- self._check_server_version(self.server, "0.4")
- # Build a collection of arguments for data view
- data_view_args = {}
- for argument in self.args.data_view["arguments"]:
- arg_name = argument["name"]
- if arg_name in self.args:
- data_view_args[arg_name] = getattr(self.args, arg_name)
- # Invoke the data view
- response = self.server.query_data_view(self.args.data_view["name"], data_view_args)
- # Create a pretty-printer
- renderer = _get_pretty_renderer(
- caption=self.args.data_view["summary"],
- order=[item["name"] for item in response["columns"]])
- # Post-process the data so that it fits the printer
- data_for_renderer = [
- dict(zip(
- [column["name"] for column in response["columns"]],
- row))
- for row in response["rows"]]
- # Print the data
- renderer.render(data_for_renderer)
-
-
-class version(Command):
- """
- Show dashboard client version
- """
- def invoke(self):
- import versiontools
- from lava_dashboard_tool import __version__
- print "Dashboard client version: {version}".format(
- version=versiontools.format_version(__version__))
=== removed file 'lava_dashboard_tool/main.py'
@@ -1,37 +0,0 @@
-# Copyright (C) 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-dashboard-tool.
-#
-# lava-dashboard-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-dashboard-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-dashboard-tool. If not, see <http://www.gnu.org/licenses/>.
-
-
-from lava_tool.dispatcher import LavaDispatcher, run_with_dispatcher_class
-
-
-class LaunchControlDispatcher(LavaDispatcher):
-
- toolname = 'lava_dashboard_tool'
- description = """
- Command line tool for interacting with Launch Control
- """
- epilog = """
- Please report all bugs using the Launchpad bug tracker:
- http://bugs.launchpad.net/lava-dashboard-tool/+filebug
- """
-
-
-def main():
- run_with_dispatcher_class(LaunchControlDispatcher)
=== removed directory 'lava_dashboard_tool/tests'
=== removed file 'lava_dashboard_tool/tests/__init__.py'
=== removed file 'lava_dashboard_tool/tests/test_commands.py'
@@ -1,44 +0,0 @@
-# Copyright (C) 2010,2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-dashboard-tool.
-#
-# lava-dashboard-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-dashboard-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-dashboard-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the launch_control.commands package
-"""
-
-from unittest import TestCase
-
-from lava_dashboard_tool.commands import XMLRPCCommand
-
-
-class XMLRPCCommandTestCase(TestCase):
-
- def test_construct_xml_rpc_url_preserves_path(self):
- self.assertEqual(
- XMLRPCCommand._construct_xml_rpc_url("http://domain/path"),
- "http://domain/path/xml-rpc/")
- self.assertEqual(
- XMLRPCCommand._construct_xml_rpc_url("http://domain/path/"),
- "http://domain/path/xml-rpc/")
-
- def test_construct_xml_rpc_url_adds_proper_suffix(self):
- self.assertEqual(
- XMLRPCCommand._construct_xml_rpc_url("http://domain/"),
- "http://domain/xml-rpc/")
- self.assertEqual(
- XMLRPCCommand._construct_xml_rpc_url("http://domain"),
- "http://domain/xml-rpc/")
=== removed directory 'lava_scheduler_tool'
=== removed file 'lava_scheduler_tool/__init__.py'
@@ -1,19 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-scheduler-tool.
-#
-# lava-scheduler-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-scheduler-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-scheduler-tool. If not, see <http://www.gnu.org/licenses/>.
-
-__version__ = (0, 6, 0, "dev", 0)
=== removed file 'lava_scheduler_tool/commands.py'
@@ -1,157 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-scheduler-tool.
-#
-# lava-scheduler-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-scheduler-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-scheduler-tool. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import sys
-import argparse
-import xmlrpclib
-
-from lava_tool.authtoken import AuthenticatingServerProxy, KeyringAuthBackend
-from lava.tool.command import Command, CommandGroup
-from lava.tool.errors import CommandError
-from lava.tool.commands import ExperimentalCommandMixIn
-
-
-class scheduler(CommandGroup):
- """
- Interact with LAVA Scheduler
- """
-
- namespace = "lava.scheduler.commands"
-
-
-class submit_job(ExperimentalCommandMixIn, Command):
- """
- Submit a job to lava-scheduler
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(submit_job, cls).register_arguments(parser)
- parser.add_argument("SERVER")
- parser.add_argument("JSON_FILE")
-
- def invoke(self):
- self.print_experimental_notice()
- server = AuthenticatingServerProxy(
- self.args.SERVER, auth_backend=KeyringAuthBackend())
- with open(self.args.JSON_FILE, 'rb') as stream:
- command_text = stream.read()
- try:
- job_id = server.scheduler.submit_job(command_text)
- except xmlrpclib.Fault, e:
- raise CommandError(str(e))
- else:
- print "submitted as job id:", job_id
-
-
-class resubmit_job(ExperimentalCommandMixIn, Command):
-
- @classmethod
- def register_arguments(self, parser):
- parser.add_argument("SERVER")
- parser.add_argument("JOB_ID", type=int)
-
- def invoke(self):
- self.print_experimental_notice()
- server = AuthenticatingServerProxy(
- self.args.SERVER, auth_backend=KeyringAuthBackend())
- try:
- job_id = server.scheduler.resubmit_job(self.args.JOB_ID)
- except xmlrpclib.Fault, e:
- raise CommandError(str(e))
- else:
- print "resubmitted as job id:", job_id
-
-
-class cancel_job(ExperimentalCommandMixIn, Command):
-
- @classmethod
- def register_arguments(self, parser):
- parser.add_argument("SERVER")
- parser.add_argument("JOB_ID", type=int)
-
- def invoke(self):
- self.print_experimental_notice()
- server = AuthenticatingServerProxy(
- self.args.SERVER, auth_backend=KeyringAuthBackend())
- server.scheduler.cancel_job(self.args.JOB_ID)
-
-
-class job_output(Command):
- """
- Get job output from the scheduler.
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(job_output, cls).register_arguments(parser)
- parser.add_argument("SERVER")
- parser.add_argument("JOB_ID",
- type=int,
- help="Job ID to download output file")
- parser.add_argument("--overwrite",
- action="store_true",
- help="Overwrite files on the local disk")
- parser.add_argument("--output", "-o",
- type=argparse.FileType("wb"),
- default=None,
- help="Alternate name of the output file")
-
- def invoke(self):
- if self.args.output is None:
- filename = str(self.args.JOB_ID) + '_output.txt'
- if os.path.exists(filename) and not self.args.overwrite:
- print >> sys.stderr, "File {filename!r} already exists".format(
- filename=filename)
- print >> sys.stderr, "You may pass --overwrite to write over it"
- return -1
- stream = open(filename, "wb")
- else:
- stream = self.args.output
- filename = self.args.output.name
-
- server = AuthenticatingServerProxy(
- self.args.SERVER, auth_backend=KeyringAuthBackend())
- stream.write(server.scheduler.job_output(self.args.JOB_ID).data)
-
- print "Downloaded job output of {0} to file {1!r}".format(
- self.args.JOB_ID, filename)
-
-
-class job_status(Command):
- """
- Get job status and bundle sha1, if it existed, from the scheduler.
- """
-
- @classmethod
- def register_arguments(cls, parser):
- super(job_status, cls).register_arguments(parser)
- parser.add_argument("SERVER")
- parser.add_argument("JOB_ID",
- type=int,
- help="Job ID to check the status")
-
- def invoke(self):
- server = AuthenticatingServerProxy(
- self.args.SERVER, auth_backend=KeyringAuthBackend())
- job_status = server.scheduler.job_status(self.args.JOB_ID)
-
- print "Job ID: %d\nJob Status: %s\nBundle SHA1: %s" %(self.args.JOB_ID,
- job_status['job_status'], job_status['bundle_sha1'])
-
=== removed directory 'lava_tool'
=== removed file 'lava_tool/__init__.py'
@@ -1,24 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Deprecated lava_tool package
-"""
-
-from lava.tool import __version__
=== removed file 'lava_tool/authtoken.py'
@@ -1,112 +0,0 @@
-# Copyright (C) 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-import base64
-import urllib
-import urllib2
-import os
-import xmlrpclib
-
-import keyring.core
-
-from lava_tool.interface import LavaCommandError
-
-
-class AuthBackend(object):
-
- def add_token(self, username, endpoint_url, token):
- raise NotImplementedError
-
- def get_token_for_endpoint(self, user, endpoint_url):
- raise NotImplementedError
-
-
-class KeyringAuthBackend(AuthBackend):
-
- def add_token(self, username, endpoint_url, token):
- keyring.core.set_password(
- "lava-tool-%s" % endpoint_url, username, token)
-
- def get_token_for_endpoint(self, username, endpoint_url):
- return keyring.core.get_password(
- "lava-tool-%s" % endpoint_url, username)
-
-
-class MemoryAuthBackend(AuthBackend):
-
- def __init__(self, user_endpoint_token_list):
- self._tokens = {}
- for user, endpoint, token in user_endpoint_token_list:
- self._tokens[(user, endpoint)] = token
-
- def add_token(self, username, endpoint_url, token):
- self._tokens[(username, endpoint_url)] = token
-
- def get_token_for_endpoint(self, username, endpoint_url):
- return self._tokens.get((username, endpoint_url))
-
-
-class XMLRPCTransport(xmlrpclib.Transport):
-
- def __init__(self, scheme, auth_backend):
- xmlrpclib.Transport.__init__(self)
- self._scheme = scheme
- self.auth_backend = auth_backend
- self._opener = urllib2.build_opener()
- self.verbose = 0
-
- def request(self, host, handler, request_body, verbose=0):
- self.verbose = verbose
- request = self.build_http_request(host, handler, request_body)
- try:
- response = self._opener.open(request)
- except urllib2.HTTPError as e:
- raise xmlrpclib.ProtocolError(
- host + handler, e.code, e.msg, e.info())
- return self.parse_response(response)
-
- def build_http_request(self, host, handler, request_body):
- token = None
- user = None
- auth, host = urllib.splituser(host)
- if auth:
- user, token = urllib.splitpasswd(auth)
- url = self._scheme + "://" + host + handler
- if user is not None and token is None:
- token = self.auth_backend.get_token_for_endpoint(user, url)
- if token is None:
- raise LavaCommandError(
- "Username provided but no token found.")
- request = urllib2.Request(url, request_body)
- request.add_header("Content-Type", "text/xml")
- if token:
- auth = base64.b64encode(urllib.unquote(user + ':' + token))
- request.add_header("Authorization", "Basic " + auth)
-
- return request
-
-
-class AuthenticatingServerProxy(xmlrpclib.ServerProxy):
-
- def __init__(self, uri, transport=None, encoding=None, verbose=0,
- allow_none=0, use_datetime=0, auth_backend=None):
- if transport is None:
- scheme = urllib.splittype(uri)[0]
- transport = XMLRPCTransport(scheme, auth_backend=auth_backend)
- xmlrpclib.ServerProxy.__init__(
- self, uri, transport, encoding, verbose, allow_none, use_datetime)
=== removed directory 'lava_tool/commands'
=== removed file 'lava_tool/commands/__init__.py'
@@ -1,24 +0,0 @@
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Package with command line commands
-"""
-
-
-from lava.tool.commands import ExperimentalNoticeAction, ExperimentalCommandMixIn
=== removed file 'lava_tool/commands/auth.py'
@@ -1,127 +0,0 @@
-# Copyright (C) 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-import getpass
-import urlparse
-import xmlrpclib
-
-from lava_tool.authtoken import (
- AuthenticatingServerProxy,
- KeyringAuthBackend,
- MemoryAuthBackend,
- )
-from lava_tool.interface import Command, LavaCommandError
-
-
-def normalize_xmlrpc_url(uri):
- if '://' not in uri:
- uri = 'http://' + uri
- if not uri.endswith('/'):
- uri += '/'
- if not uri.endswith('/RPC2/'):
- uri += 'RPC2/'
- return uri
-
-
-class auth_add(Command):
- """
- Add an authentication token.
- """
-
- def __init__(self, parser, args, auth_backend=None):
- super(auth_add, self).__init__(parser, args)
- if auth_backend is None:
- auth_backend = KeyringAuthBackend()
- self.auth_backend = auth_backend
-
- @classmethod
- def register_arguments(cls, parser):
- super(auth_add, cls).register_arguments(parser)
- parser.add_argument(
- "HOST",
- help=("Endpoint to add token for, in the form "
- "scheme://username@host. The username will default to "
- "the currently logged in user."))
- parser.add_argument(
- "--token-file", default=None,
- help="Read the secret from here rather than prompting for it.")
- parser.add_argument(
- "--no-check", action='store_true',
- help=("By default, a call to the remote server is made to check "
- "that the added token works before remembering it. "
- "Passing this option prevents this check."))
-
- def invoke(self):
- uri = normalize_xmlrpc_url(self.args.HOST)
- parsed_host = urlparse.urlparse(uri)
-
- if parsed_host.username:
- username = parsed_host.username
- else:
- username = getpass.getuser()
-
- host = parsed_host.hostname
- if parsed_host.port:
- host += ':' + str(parsed_host.port)
-
- uri = '%s://%s@%s%s' % (
- parsed_host.scheme, username, host, parsed_host.path)
-
- if self.args.token_file:
- if parsed_host.password:
- raise LavaCommandError(
- "Token specified in url but --token-file also passed.")
- else:
- try:
- token_file = open(self.args.token_file)
- except IOError as ex:
- raise LavaCommandError(
- "opening %r failed: %s" % (self.args.token_file, ex))
- token = token_file.read().strip()
- else:
- if parsed_host.password:
- token = parsed_host.password
- else:
- token = getpass.getpass("Paste token for %s: " % uri)
-
- userless_uri = '%s://%s%s' % (
- parsed_host.scheme, host, parsed_host.path)
-
- if not self.args.no_check:
- sp = AuthenticatingServerProxy(
- uri, auth_backend=MemoryAuthBackend(
- [(username, userless_uri, token)]))
- try:
- token_user = sp.system.whoami()
- except xmlrpclib.ProtocolError as ex:
- if ex.errcode == 401:
- raise LavaCommandError(
- "Token rejected by server for user %s." % username)
- else:
- raise
- except xmlrpclib.Fault as ex:
- raise LavaCommandError(
- "Server reported error during check: %s." % ex)
- if token_user != username:
- raise LavaCommandError(
- "whoami() returned %s rather than expected %s -- this is "
- "a bug." % (token_user, username))
-
- self.auth_backend.add_token(username, userless_uri, token)
-
- print 'Token added successfully for user %s.' % username
=== removed file 'lava_tool/dispatcher.py'
@@ -1,49 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Module with LavaDispatcher - the command dispatcher
-"""
-
-from lava.tool.dispatcher import Dispatcher
-from lava.tool.main import LavaDispatcher as LavaNonLegacyDispatcher
-from lava_tool.interface import LavaCommandError
-
-
-class LavaDispatcher(Dispatcher):
- """
- Class implementing command line interface for launch control
- """
-
- toolname = None
-
- def __init__(self):
- super(LavaDispatcher, self).__init__()
- prefixes = ['lava_tool']
- if self.toolname is not None:
- prefixes.append(self.toolname)
- for prefix in prefixes:
- self.import_commands("%s.commands" % prefix)
-
-
-def run_with_dispatcher_class(cls):
- raise cls.run()
-
-
-def main():
- LavaDispatcher.run()
=== removed file 'lava_tool/interface.py'
@@ -1,24 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Interface for all lava-tool commands
-"""
-
-from lava.tool.errors import CommandError as LavaCommandError
-from lava.tool.command import Command, CommandGroup as SubCommand
=== removed directory 'lava_tool/tests'
=== removed file 'lava_tool/tests/__init__.py'
@@ -1,73 +0,0 @@
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Package with unit tests for lava_tool
-"""
-
-import doctest
-import unittest
-
-
-def app_modules():
- return [
- 'lava_tool.commands',
- 'lava_tool.dispatcher',
- 'lava_tool.interface',
- 'lava_dashboard_tool.commands',
- ]
-
-
-def test_modules():
- return [
- 'lava.device.tests.test_commands',
- 'lava.device.tests.test_device',
- 'lava.helper.tests.test_command',
- 'lava.helper.tests.test_dispatcher',
- 'lava.helper.tests.test_template',
- 'lava.job.tests.test_commands',
- 'lava.job.tests.test_job',
- 'lava.script.tests.test_commands',
- 'lava.script.tests.test_script',
- 'lava.testdef.tests.test_commands',
- 'lava.tests.test_commands',
- 'lava.tests.test_config',
- 'lava.tests.test_parameter',
- 'lava_dashboard_tool.tests.test_commands',
- 'lava_tool.tests.test_auth_commands',
- 'lava_tool.tests.test_authtoken',
- 'lava_tool.tests.test_commands',
- 'lava_tool.tests.test_utils',
- ]
-
-
-def test_suite():
- """
- Build an unittest.TestSuite() object with all the tests in _modules.
- Each module is harvested for both regular unittests and doctests
- """
- modules = app_modules() + test_modules()
- suite = unittest.TestSuite()
- loader = unittest.TestLoader()
-
- for name in modules:
- unit_suite = loader.loadTestsFromName(name)
- suite.addTests(unit_suite)
- doc_suite = doctest.DocTestSuite(name)
- suite.addTests(doc_suite)
- return suite
=== removed file 'lava_tool/tests/test_auth_commands.py'
@@ -1,259 +0,0 @@
-# Copyright (C) 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the lava_tool.commands.auth package
-"""
-
-import StringIO
-import sys
-import tempfile
-import xmlrpclib
-
-from mocker import ARGS, KWARGS, CONTAINS, MockerTestCase
-
-from lava_tool.authtoken import MemoryAuthBackend
-from lava_tool.interface import LavaCommandError
-from lava_tool.commands.auth import auth_add
-
-
-class FakeArgs:
- token_file = None
- no_check = False
-
-
-class AuthAddTests(MockerTestCase):
-
- def setUp(self):
- MockerTestCase.setUp(self)
- self.saved_stdout = sys.stdout
- sys.stdout = StringIO.StringIO()
- self.saved_stderr = sys.stderr
- sys.stderr = StringIO.StringIO()
-
- def tearDown(self):
- MockerTestCase.tearDown(self)
- sys.stdout = self.saved_stdout
- sys.stderr = self.saved_stderr
-
- def make_command(self, auth_backend, **kwargs):
- args = FakeArgs()
- args.__dict__.update(kwargs)
- return auth_add(None, args, auth_backend)
-
- def test_token_taken_from_argument(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com/RPC2/',
- no_check=True)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_RPC2_implied(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=True)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_scheme_recorded(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='https://user:TOKEN@example.com/RPC2/',
- no_check=True)
- cmd.invoke()
- self.assertEqual(
- None,
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'https://example.com/RPC2/'))
-
- def test_path_on_server_recorded(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='https://user:TOKEN@example.com/path',
- no_check=True)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'https://example.com/path/RPC2/'))
-
- def test_token_taken_from_getpass(self):
- mocked_getpass = self.mocker.replace(
- 'getpass.getpass', passthrough=False)
- mocked_getpass(CONTAINS('Paste token'))
- self.mocker.result("TOKEN")
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user@example.com', no_check=True)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_token_taken_from_file(self):
- auth_backend = MemoryAuthBackend([])
- token_file = tempfile.NamedTemporaryFile('w')
- token_file.write("TOKEN")
- token_file.flush()
- cmd = self.make_command(
- auth_backend, HOST='http://user@example.com', no_check=True,
- token_file=token_file.name)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_token_file_and_in_url_conflict(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=True,
- token_file='some-file-name')
- self.assertRaises(LavaCommandError, cmd.invoke)
-
- def test_non_existent_token_reported(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=True,
- token_file='does-not-exist')
- self.assertRaises(LavaCommandError, cmd.invoke)
-
- def test_user_taken_from_getuser(self):
- mocked_getuser = self.mocker.replace(
- 'getpass.getuser', passthrough=False)
- mocked_getuser()
- self.mocker.result("user")
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- token_file = tempfile.NamedTemporaryFile('w')
- token_file.write("TOKEN")
- token_file.flush()
- cmd = self.make_command(
- auth_backend, HOST='http://example.com', no_check=True,
- token_file=token_file.name)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_port_included(self):
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend,
- HOST='http://user:TOKEN@example.com:1234',
- no_check=True)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com:1234/RPC2/'))
-
- def test_check_made(self):
- mocked_AuthenticatingServerProxy = self.mocker.replace(
- 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
- mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
- # nospec() is required because of
- # https://bugs.launchpad.net/mocker/+bug/794351
- self.mocker.nospec()
- mocked_sp.system.whoami()
- self.mocker.result('user')
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
- cmd.invoke()
- self.assertEqual(
- 'TOKEN',
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_check_auth_failure_reported_nicely(self):
- mocked_AuthenticatingServerProxy = self.mocker.replace(
- 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
- mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
- # nospec() is required because of
- # https://bugs.launchpad.net/mocker/+bug/794351
- self.mocker.nospec()
- mocked_sp.system.whoami()
- self.mocker.throw(xmlrpclib.ProtocolError('', 401, '', []))
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
- self.assertRaises(LavaCommandError, cmd.invoke)
-
- def test_check_fails_token_not_recorded(self):
- mocked_AuthenticatingServerProxy = self.mocker.replace(
- 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
- mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
- self.mocker.nospec()
- mocked_sp.system.whoami()
- self.mocker.throw(xmlrpclib.ProtocolError('', 401, '', []))
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
- self.assertRaises(LavaCommandError, cmd.invoke)
- self.assertEqual(
- None,
- auth_backend.get_token_for_endpoint(
- 'user', 'http://example.com/RPC2/'))
-
- def test_check_other_http_failure_just_raised(self):
- mocked_AuthenticatingServerProxy = self.mocker.replace(
- 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
- mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
- # nospec() is required because of
- # https://bugs.launchpad.net/mocker/+bug/794351
- self.mocker.nospec()
- mocked_sp.system.whoami()
- self.mocker.throw(xmlrpclib.ProtocolError('', 500, '', []))
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
- self.assertRaises(xmlrpclib.ProtocolError, cmd.invoke)
-
- def test_fault_reported(self):
- mocked_AuthenticatingServerProxy = self.mocker.replace(
- 'lava_tool.authtoken.AuthenticatingServerProxy', passthrough=False)
- mocked_sp = mocked_AuthenticatingServerProxy(ARGS, KWARGS)
- # nospec() is required because of
- # https://bugs.launchpad.net/mocker/+bug/794351
- self.mocker.nospec()
- mocked_sp.system.whoami()
- self.mocker.throw(xmlrpclib.Fault(100, 'faultString'))
- self.mocker.replay()
- auth_backend = MemoryAuthBackend([])
- cmd = self.make_command(
- auth_backend, HOST='http://user:TOKEN@example.com', no_check=False)
- self.assertRaises(LavaCommandError, cmd.invoke)
=== removed file 'lava_tool/tests/test_authtoken.py'
@@ -1,117 +0,0 @@
-# Copyright (C) 2011 Linaro Limited
-#
-# Author: Michael Hudson-Doyle <michael.hudson@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the lava_tool.authtoken package
-"""
-
-import base64
-import StringIO
-from unittest import TestCase
-import urlparse
-import xmlrpclib
-
-from mocker import ARGS, KWARGS, Mocker
-
-from lava_tool.authtoken import (
- AuthenticatingServerProxy,
- XMLRPCTransport,
- MemoryAuthBackend,
- )
-from lava_tool.interface import LavaCommandError
-
-class TestAuthenticatingServerProxy(TestCase):
-
- def auth_headers_for_method_call_on(self, url, auth_backend):
- parsed = urlparse.urlparse(url)
-
- mocker = Mocker()
- transport = mocker.mock()
-
- auth_data = []
-
- def intercept_request(host, handler, request_body, verbose=0):
- actual_transport = XMLRPCTransport(parsed.scheme, auth_backend)
- request = actual_transport.build_http_request(host, handler, request_body)
- if (request.has_header('Authorization')):
- auth_data.append(request.get_header('Authorization'))
-
- response_body = xmlrpclib.dumps((1,), methodresponse=True)
- response = StringIO.StringIO(response_body)
- response.status = 200
- response.__len__ = lambda: len(response_body)
-
- transport.request(ARGS, KWARGS)
- mocker.call(intercept_request)
- mocker.result(response)
-
- with mocker:
- server_proxy = AuthenticatingServerProxy(
- url, auth_backend=auth_backend, transport=transport)
- server_proxy.method()
-
- return auth_data
-
- def user_and_password_from_auth_data(self, auth_data):
- if len(auth_data) != 1:
- self.fail("expected exactly 1 header, got %r" % len(auth_data))
- [value] = auth_data
- if not value.startswith("Basic "):
- self.fail("non-basic auth header found in %r" % auth_data)
- auth = base64.b64decode(value[len("Basic "):])
- if ':' in auth:
- return tuple(auth.split(':', 1))
- else:
- return (auth, None)
-
- def test_no_user_no_auth(self):
- auth_headers = self.auth_headers_for_method_call_on(
- 'http://localhost/RPC2/', MemoryAuthBackend([]))
- self.assertEqual([], auth_headers)
-
- def test_token_used_for_auth_http(self):
- auth_headers = self.auth_headers_for_method_call_on(
- 'http://user@localhost/RPC2/',
- MemoryAuthBackend([('user', 'http://localhost/RPC2/', 'TOKEN')]))
- self.assertEqual(
- ('user', 'TOKEN'),
- self.user_and_password_from_auth_data(auth_headers))
-
- def test_token_used_for_auth_https(self):
- auth_headers = self.auth_headers_for_method_call_on(
- 'https://user@localhost/RPC2/',
- MemoryAuthBackend([('user', 'https://localhost/RPC2/', 'TOKEN')]))
- self.assertEqual(
- ('user', 'TOKEN'),
- self.user_and_password_from_auth_data(auth_headers))
-
- def test_port_included(self):
- auth_headers = self.auth_headers_for_method_call_on(
- 'http://user@localhost:1234/RPC2/',
- MemoryAuthBackend(
- [('user', 'http://localhost:1234/RPC2/', 'TOKEN')]))
- self.assertEqual(
- ('user', 'TOKEN'),
- self.user_and_password_from_auth_data(auth_headers))
-
- def test_error_when_user_but_no_token(self):
- self.assertRaises(
- LavaCommandError,
- self.auth_headers_for_method_call_on,
- 'http://user@localhost/RPC2/',
- MemoryAuthBackend([]))
=== removed file 'lava_tool/tests/test_commands.py'
@@ -1,137 +0,0 @@
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-Unit tests for the launch_control.commands package
-"""
-
-from mocker import MockerTestCase, ARGS
-
-from lava_tool.interface import (
- Command,
- LavaCommandError,
- )
-from lava_tool.dispatcher import (
- LavaDispatcher,
- main,
- )
-
-
-class CommandTestCase(MockerTestCase):
-
- def test_register_arguments_does_nothing(self):
- parser = self.mocker.mock()
- self.mocker.replay()
- Command.register_arguments(parser)
-
- def test_not_implemented(self):
- self.assertRaises(NotImplementedError, Command(None, None).invoke)
-
- def test_get_name_uses_class_name(self):
- class Foo(Command):
- pass
- self.assertEqual(Foo.get_name(), "Foo")
-
- def test_get_name_strips_leading_underscore(self):
- class _Bar(Command):
- pass
- self.assertEqual(_Bar.get_name(), "Bar")
-
- def test_get_name_converts_underscore_to_dash(self):
- class froz_bot(Command):
- pass
- self.assertEqual(froz_bot.get_name(), "froz-bot")
-
- def test_get_help_uses_docstring(self):
- class ASDF(Command):
- """
- This command was named after the lisp package management system
- """
- self.assertEqual(
- ASDF.get_help(),
- 'This command was named after the lisp package management system')
-
- def test_get_help_defaults_to_None(self):
- class mysterious(Command):
- pass
-
- self.assertEqual(mysterious.get_help(), None)
-
- def test_get_epilog_defaults_to_None(self):
- class mysterious(Command):
- pass
- self.assertEqual(mysterious.get_epilog(), None)
-
- def test_get_epilog_returns_data_after_carriage_L(self):
- # The dot after 'before' is to make pep8 happy
- class help_with_epilog(Command):
- """
- before
- .
- after
- """
- self.assertEqual(help_with_epilog.get_epilog(), "after")
-
- def test_get_help_returns_data_before_carriage_L(self):
- # The dot after 'before' is to make pep8 happy
- class help_with_epilog(Command):
- """
- before
- .
- after
- """
- self.assertEqual(help_with_epilog.get_help(), "before\n.")
-
-
-class DispatcherTestCase(MockerTestCase):
-
- def test_main(self):
- dispatcher = self.mocker.patch(LavaDispatcher)
- dispatcher.dispatch(ARGS)
- self.mocker.replay()
-
- self.assertRaises(SystemExit, main)
-
- def test_add_command_cls(self):
- test_calls = []
-
- class test(Command):
-
- def invoke(self):
- test_calls.append(None)
-
- dispatcher = LavaDispatcher()
- dispatcher.add_command_cls(test)
- dispatcher.dispatch(raw_args=['test'])
- self.assertEqual(1, len(test_calls))
-
- def test_print_LavaCommandError_nicely(self):
- stderr = self.mocker.replace('sys.stderr', passthrough=False)
- stderr.write("ERROR: error message")
- stderr.write("\n")
- self.mocker.replay()
-
- class error(Command):
-
- def invoke(self):
- raise LavaCommandError("error message")
-
- dispatcher = LavaDispatcher()
- dispatcher.add_command_cls(error)
- exit_code = dispatcher.dispatch(raw_args=['error'])
- self.assertEquals(1, exit_code)
=== removed file 'lava_tool/tests/test_utils.py'
@@ -1,282 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-"""lava_tool.utils tests."""
-
-import os
-import shutil
-import subprocess
-import sys
-import tempfile
-
-from unittest import TestCase
-from mock import (
- MagicMock,
- call,
- patch,
-)
-
-from lava.tool.errors import CommandError
-from lava_tool.utils import (
- can_edit_file,
- create_dir,
- edit_file,
- execute,
- has_command,
- retrieve_file,
- verify_and_create_url,
- verify_file_extension,
-)
-
-
-class UtilTests(TestCase):
-
- def setUp(self):
- self.original_stdout = sys.stdout
- sys.stdout = open("/dev/null", "w")
- self.original_stderr = sys.stderr
- sys.stderr = open("/dev/null", "w")
- self.original_stdin = sys.stdin
- self.temp_file = tempfile.NamedTemporaryFile(delete=False)
-
- def tearDown(self):
- sys.stdin = self.original_stdin
- sys.stdout = self.original_stdout
- sys.stderr = self.original_stderr
- os.unlink(self.temp_file.name)
-
- @patch("lava_tool.utils.subprocess.check_call")
- def test_has_command_0(self, mocked_check_call):
- # Make sure we raise an exception when the subprocess is called.
- mocked_check_call.side_effect = subprocess.CalledProcessError(0, "")
- self.assertFalse(has_command(""))
-
- @patch("lava_tool.utils.subprocess.check_call")
- def test_has_command_1(self, mocked_check_call):
- # Check that a "command" exists. The call to subprocess is mocked.
- mocked_check_call.return_value = 0
- self.assertTrue(has_command(""))
-
- def test_verify_file_extension_with_extension(self):
- extension = ".test"
- supported = [extension[1:]]
- try:
- temp_file = tempfile.NamedTemporaryFile(suffix=extension,
- delete=False)
- obtained = verify_file_extension(
- temp_file.name, extension[1:], supported)
- self.assertEquals(temp_file.name, obtained)
- finally:
- if os.path.isfile(temp_file.name):
- os.unlink(temp_file.name)
-
- def test_verify_file_extension_without_extension(self):
- extension = "json"
- supported = [extension]
- expected = "/tmp/a_fake.json"
- obtained = verify_file_extension("/tmp/a_fake", extension, supported)
- self.assertEquals(expected, obtained)
-
- def test_verify_file_extension_with_unsupported_extension(self):
- extension = "json"
- supported = [extension]
- expected = "/tmp/a_fake.json"
- obtained = verify_file_extension(
- "/tmp/a_fake.extension", extension, supported)
- self.assertEquals(expected, obtained)
-
- @patch("os.listdir")
- def test_retrieve_job_file_0(self, mocked_os_listdir):
- # Make sure that exception is raised if we go through all the elements
- # returned by os.listdir().
- mocked_os_listdir.return_value = ["a_file"]
- self.assertRaises(CommandError, retrieve_file,
- "a_path", ["ext"])
-
- @patch("os.listdir")
- def test_retrieve_job_file_1(self, mocked_os_listdir):
- # Pass some files and directories to retrieve_file(), and make
- # sure a file with .json suffix is returned.
- # Pass also a hidden file.
- try:
- json_file = tempfile.NamedTemporaryFile(suffix=".json")
- json_file_name = os.path.basename(json_file.name)
-
- file_name_no_suffix = tempfile.NamedTemporaryFile(delete=False)
- file_name_with_suffix = tempfile.NamedTemporaryFile(
- suffix=".bork", delete=False)
-
- temp_dir_name = "submit_command_test_tmp_dir"
- temp_dir_path = os.path.join(tempfile.gettempdir(), temp_dir_name)
- os.makedirs(temp_dir_path)
-
- hidden_file = tempfile.NamedTemporaryFile(
- prefix=".tmp", delete=False)
-
- mocked_os_listdir.return_value = [
- temp_dir_name, file_name_no_suffix.name,
- file_name_with_suffix.name, json_file_name, hidden_file.name]
-
- obtained = retrieve_file(tempfile.gettempdir(), ["json"])
- self.assertEqual(json_file.name, obtained)
- finally:
- os.removedirs(temp_dir_path)
- os.unlink(file_name_no_suffix.name)
- os.unlink(file_name_with_suffix.name)
- os.unlink(hidden_file.name)
-
- def test_retrieve_job_file_2(self):
- # Pass a file with the valid extension.
- temp_file = tempfile.NamedTemporaryFile(suffix=".json")
- obtained = retrieve_file(temp_file.name, ["json"])
- self.assertEquals(temp_file.name, obtained)
-
- def test_retrieve_job_file_3(self):
- # Pass a file with a non-valid extension.
- temp_file = tempfile.NamedTemporaryFile(suffix=".bork")
- self.assertRaises(
- CommandError, retrieve_file, temp_file.name, ["json"])
-
- @patch("os.listdir")
- def test_retrieve_job_file_4(self, mocked_os_listdir):
- # Pass hidden and wrong files and make sure exception is thrown.
- a_hidden_file = ".a_hidden.json"
- b_hidden_file = ".b_hidden.json"
- c_wrong_file = "a_wrong_file.bork"
-
- mocked_os_listdir.return_value = [a_hidden_file, b_hidden_file, c_wrong_file]
- self.assertRaises(
- CommandError, retrieve_file, tempfile.gettempdir(), ["json"])
-
- @patch("lava_tool.utils.subprocess")
- def test_execute_0(self, mocked_subprocess):
- mocked_subprocess.check_call = MagicMock()
- execute("foo")
- self.assertEqual(mocked_subprocess.check_call.call_args_list,
- [call(["foo"])])
- self.assertTrue(mocked_subprocess.check_call.called)
-
- @patch("lava_tool.utils.subprocess.check_call")
- def test_execute_1(self, mocked_check_call):
- mocked_check_call.side_effect = subprocess.CalledProcessError(1, "foo")
- self.assertRaises(CommandError, execute, ["foo"])
-
- @patch("lava_tool.utils.subprocess")
- @patch("lava_tool.utils.has_command", return_value=False)
- @patch("lava_tool.utils.os.environ.get", return_value=None)
- @patch("lava_tool.utils.sys.exit")
- def test_edit_file_0(self, mocked_sys_exit, mocked_env_get,
- mocked_has_command, mocked_subprocess):
- edit_file(self.temp_file.name)
- self.assertTrue(mocked_sys_exit.called)
-
- @patch("lava_tool.utils.subprocess")
- @patch("lava_tool.utils.has_command", side_effect=[True, False])
- @patch("lava_tool.utils.os.environ.get", return_value=None)
- def test_edit_file_1(self, mocked_env_get, mocked_has_command,
- mocked_subprocess):
- mocked_subprocess.Popen = MagicMock()
- edit_file(self.temp_file.name)
- expected = [call(["sensible-editor", self.temp_file.name])]
- self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
-
- @patch("lava_tool.utils.subprocess")
- @patch("lava_tool.utils.has_command", side_effect=[False, True])
- @patch("lava_tool.utils.os.environ.get", return_value=None)
- def test_edit_file_2(self, mocked_env_get, mocked_has_command,
- mocked_subprocess):
- mocked_subprocess.Popen = MagicMock()
- edit_file(self.temp_file.name)
- expected = [call(["xdg-open", self.temp_file.name])]
- self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
-
- @patch("lava_tool.utils.subprocess")
- @patch("lava_tool.utils.has_command", return_value=False)
- @patch("lava_tool.utils.os.environ.get", return_value="vim")
- def test_edit_file_3(self, mocked_env_get, mocked_has_command,
- mocked_subprocess):
- mocked_subprocess.Popen = MagicMock()
- edit_file(self.temp_file.name)
- expected = [call(["vim", self.temp_file.name])]
- self.assertEqual(expected, mocked_subprocess.Popen.call_args_list)
-
- @patch("lava_tool.utils.subprocess")
- @patch("lava_tool.utils.has_command", return_value=False)
- @patch("lava_tool.utils.os.environ.get", return_value="vim")
- def test_edit_file_4(self, mocked_env_get, mocked_has_command,
- mocked_subprocess):
- mocked_subprocess.Popen = MagicMock()
- mocked_subprocess.Popen.side_effect = Exception()
- self.assertRaises(CommandError, edit_file, self.temp_file.name)
-
- def test_can_edit_file(self):
- # Tests the can_edit_file method of the config command.
- # This is to make sure the device config file is not erased when
- # checking if it is possible to open it.
- expected = ("hostname = a_fake_panda02\nconnection_command = \n"
- "device_type = panda\n")
-
- with open(self.temp_file.name, "w") as f:
- f.write(expected)
-
- self.assertTrue(can_edit_file(self.temp_file.name))
- obtained = ""
- with open(self.temp_file.name) as f:
- obtained = f.read()
-
- self.assertEqual(expected, obtained)
-
- def test_verify_and_create_url_0(self):
- expected = "https://www.example.org/"
- obtained = verify_and_create_url("www.example.org")
- self.assertEquals(expected, obtained)
-
- def test_verify_and_create_url_1(self):
- expected = "http://www.example.org/"
- obtained = verify_and_create_url("http://www.example.org")
- self.assertEquals(expected, obtained)
-
- def test_verify_and_create_url_2(self):
- expected = "http://www.example.org/RPC/"
- obtained = verify_and_create_url("http://www.example.org/RPC")
- self.assertEquals(expected, obtained)
-
- def test_verify_and_create_url_3(self):
- expected = "https://www.example.org/RPC/"
- obtained = verify_and_create_url("www.example.org/RPC")
- self.assertEquals(expected, obtained)
-
- def test_create_dir_0(self):
- try:
- temp_dir = os.path.join(tempfile.gettempdir(), "a_dir")
- create_dir(temp_dir)
- self.assertTrue(os.path.isdir(temp_dir))
- finally:
- shutil.rmtree(temp_dir)
-
- def test_create_dir_1(self):
- try:
- temp_dir = os.path.join(tempfile.gettempdir(), "a_dir")
- create_dir(temp_dir, "subdir")
- self.assertTrue(os.path.isdir(os.path.join(temp_dir, "subdir")))
- finally:
- shutil.rmtree(temp_dir)
-
- def test_create_dir_2(self):
- temp_dir = os.path.join("/", "a_temp_dir")
- self.assertRaises(CommandError, create_dir, temp_dir)
=== removed file 'lava_tool/utils.py'
@@ -1,329 +0,0 @@
-# Copyright (C) 2013 Linaro Limited
-#
-# Author: Milo Casagrande <milo.casagrande@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-import StringIO
-import base64
-import os
-import tarfile
-import tempfile
-import types
-import subprocess
-import sys
-import urlparse
-
-from lava.tool.errors import CommandError
-
-
-def has_command(command):
- """Checks that the given command is available.
-
- :param command: The name of the command to check availability.
- """
- command_available = True
- try:
- subprocess.check_call(["which", command],
- stdout=open(os.path.devnull, 'w'))
- except subprocess.CalledProcessError:
- command_available = False
- return command_available
-
-
-def to_list(value):
- """Return a list from the passed value.
-
- :param value: The parameter to turn into a list.
- """
- return_value = []
- if isinstance(value, types.StringType):
- return_value = [value]
- else:
- return_value = list(value)
- return return_value
-
-
-def create_tar(paths):
- """Creates a temporary tar file with the provided paths.
-
- The tar file is not deleted at the end, it has to be delete by who calls
- this function.
-
- If just a directory is passed, it will be flattened out: its contents will
- be added, but not the directory itself.
-
- :param paths: List of paths to be included in the tar archive.
- :type list
- :return The path to the temporary tar file.
- """
- paths = to_list(paths)
- try:
- temp_tar_file = tempfile.NamedTemporaryFile(suffix=".tar",
- delete=False)
- with tarfile.open(temp_tar_file.name, "w") as tar_file:
- for path in paths:
- full_path = os.path.abspath(path)
- if os.path.isfile(full_path):
- arcname = os.path.basename(full_path)
- tar_file.add(full_path, arcname=arcname)
- elif os.path.isdir(full_path):
- # If we pass a directory, flatten it out.
- # List its contents, and add them as they are.
- for element in os.listdir(full_path):
- arcname = element
- tar_file.add(os.path.join(full_path, element),
- arcname=arcname)
- return temp_tar_file.name
- except tarfile.TarError:
- raise CommandError("Error creating the temporary tar archive.")
-
-
-def base64_encode(path):
- """Encode in base64 the provided file.
-
- :param path: The path to a file.
- :return The file content encoded in base64.
- """
- if os.path.isfile(path):
- encoded_content = StringIO.StringIO()
-
- try:
- with open(path) as read_file:
- base64.encode(read_file, encoded_content)
-
- return encoded_content.getvalue().strip()
- except IOError:
- raise CommandError("Cannot read file "
- "'{0}'.".format(path))
- else:
- raise CommandError("Provided path does not exists or is not a file: "
- "{0}.".format(path))
-
-
-def retrieve_file(path, extensions):
- """Searches for a file that has one of the supported extensions.
-
- The path of the first file that matches one of the supported provided
- extensions will be returned. The files are examined in alphabetical
- order.
-
- :param path: Where to look for the file.
- :param extensions: A list of extensions the file to look for should
- have.
- :return The full path of the file.
- """
- if os.path.isfile(path):
- if check_valid_extension(path, extensions):
- retrieved_path = path
- else:
- raise CommandError("The provided file '{0}' is not "
- "valid: extension not supported.".format(path))
- else:
- dir_listing = os.listdir(path)
- dir_listing.sort()
-
- for element in dir_listing:
- if element.startswith("."):
- continue
-
- element_path = os.path.join(path, element)
- if os.path.isdir(element_path):
- continue
- elif os.path.isfile(element_path):
- if check_valid_extension(element_path, extensions):
- retrieved_path = element_path
- break
- else:
- raise CommandError("No suitable file found in '{0}'".format(path))
-
- return retrieved_path
-
-
-def check_valid_extension(path, extensions):
- """Checks that a file has one of the supported extensions.
-
- :param path: The file to check.
- :param extensions: A list of supported extensions.
- """
- is_valid = False
-
- local_path, file_name = os.path.split(path)
- name, full_extension = os.path.splitext(file_name)
-
- if full_extension:
- extension = full_extension[1:].strip().lower()
- if extension in extensions:
- is_valid = True
- return is_valid
-
-
-def verify_file_extension(path, default, supported):
- """Verifies if a file has a supported extensions.
-
- If the file does not have one, it will add the default extension
- provided.
-
- :param path: The path of a file to verify.
- :param default: The default extension to use.
- :param supported: A list of supported extensions to check against.
- :return The path of the file.
- """
- full_path, file_name = os.path.split(path)
- name, extension = os.path.splitext(file_name)
- if not extension:
- path = ".".join([path, default])
- elif extension[1:].lower() not in supported:
- path = os.path.join(full_path, ".".join([name, default]))
- return path
-
-
-def verify_path_existance(path):
- """Verifies if a given path exists on the file system.
-
- Raises a CommandError in case it exists.
-
- :param path: The path to verify.
- """
- if os.path.exists(path):
- raise CommandError("{0} already exists.".format(path))
-
-
-def verify_path_non_existance(path):
- """Verifies if a given path does not exist on the file system.
-
- Raises a CommandError in case it does not exist.
-
- :param path: The path to verify.
- """
- if not os.path.exists(path):
- raise CommandError("{0} does not exists.".format(path))
-
-
-def write_file(path, content):
- """Creates a file with the specified content.
-
- :param path: The path of the file to write.
- :param content: What to write in the file.
- """
- try:
- with open(path, "w") as to_write:
- to_write.write(content)
- except (OSError, IOError):
- raise CommandError("Error writing file '{0}'".format(path))
-
-
-def execute(cmd_args):
- """Executes the supplied command args.
-
- :param cmd_args: The command, and its optional arguments, to run.
- :return The command execution return code.
- """
- cmd_args = to_list(cmd_args)
- try:
- return subprocess.check_call(cmd_args)
- except subprocess.CalledProcessError:
- raise CommandError("Error running the following command: "
- "{0}".format(" ".join(cmd_args)))
-
-
-def can_edit_file(path):
- """Checks if a file can be opend in write mode.
-
- :param path: The path to the file.
- :return True if it is possible to write on the file, False otherwise.
- """
- can_edit = True
- try:
- fp = open(path, "a")
- fp.close()
- except IOError:
- can_edit = False
- return can_edit
-
-
-def edit_file(file_to_edit):
- """Opens the specified file with the default file editor.
-
- :param file_to_edit: The file to edit.
- """
- editor = os.environ.get("EDITOR", None)
- if editor is None:
- if has_command("sensible-editor"):
- editor = "sensible-editor"
- elif has_command("xdg-open"):
- editor = "xdg-open"
- else:
- # We really do not know how to open a file.
- print >> sys.stdout, ("Cannot find an editor to open the "
- "file '{0}'.".format(file_to_edit))
- print >> sys.stdout, ("Either set the 'EDITOR' environment "
- "variable, or install 'sensible-editor' "
- "or 'xdg-open'.")
- sys.exit(-1)
- try:
- subprocess.Popen([editor, file_to_edit]).wait()
- except Exception:
- raise CommandError("Error opening the file '{0}' with the "
- "following editor: {1}.".format(file_to_edit,
- editor))
-
-
-def verify_and_create_url(endpoint):
- """Checks that the provided values make a correct URL.
-
- If the server address does not contain a scheme, by default it will use
- HTTPS.
- The endpoint is then added at the URL.
-
- :param server: A server URL to verify.
- :return A URL.
- """
- url = ""
- if endpoint:
- scheme, netloc, path, params, query, fragment = \
- urlparse.urlparse(endpoint)
- if not scheme:
- scheme = "https"
- if not netloc:
- netloc, path = path, ""
-
- url = urlparse.urlunparse(
- (scheme, netloc, path, params, query, fragment))
-
- if url[-1:] != "/":
- url += "/"
-
- return url
-
-
-def create_dir(path, dir_name=None):
- """Checks if a directory does not exists, and creates it.
-
- :param path: The path where the directory should be created.
- :param dir_name: An optional name for a directory to be created at
- path (dir_name will be joined with path).
- :return The path of the created directory."""
- created_dir = path
- if dir_name:
- created_dir = os.path.join(path, dir_name)
-
- if not os.path.isdir(created_dir):
- try:
- os.makedirs(created_dir)
- except OSError:
- raise CommandError("Cannot create directory "
- "'{0}'.".format(created_dir))
- return created_dir
=== removed file 'setup.cfg'
@@ -1,2 +0,0 @@
-[upload]
-sign=True
=== removed file 'setup.py'
@@ -1,62 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2010 Linaro Limited
-#
-# Author: Zygmunt Krynicki <zygmunt.krynicki@linaro.org>
-#
-# This file is part of lava-tool.
-#
-# lava-tool is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3
-# as published by the Free Software Foundation
-#
-# lava-tool is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with lava-tool. If not, see <http://www.gnu.org/licenses/>.
-
-from setuptools import setup, find_packages
-from os.path import dirname, join
-
-entry_points = open(join(dirname(__file__), 'entry_points.ini')).read()
-
-setup(
- name='lava-tool',
- version=":versiontools:lava.tool:__version__",
- author="Zygmunt Krynicki",
- author_email="zygmunt.krynicki@linaro.org",
- namespace_packages=['lava'],
- packages=find_packages(),
- description="Command line utility for Linaro validation services",
- url='https://launchpad.net/lava-tool',
- test_suite='lava_tool.tests.test_suite',
- license="LGPLv3",
- entry_points=entry_points,
- classifiers=[
- "Development Status :: 4 - Beta",
- "Intended Audience :: Developers",
- ("License :: OSI Approved :: GNU Library or Lesser General Public"
- " License (LGPL)"),
- "Operating System :: OS Independent",
- "Programming Language :: Python :: 2.6",
- "Programming Language :: Python :: 2.7",
- "Topic :: Software Development :: Testing",
- ],
- install_requires=[
- 'PyYAML >= 3.10',
- 'argparse >= 1.1',
- 'argcomplete >= 0.3',
- 'keyring',
- 'json-schema-validator >= 2.0',
- 'versiontools >= 1.3.1',
- 'pyxdg == 0.25',
- ],
- setup_requires=['versiontools >= 1.3.1'],
- tests_require=[
- 'mocker >= 1.0',
- 'mock >= 0.7.2'
- ],
- zip_safe=True)