=== removed file 'linaro-fetch-image'
@@ -1,79 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2010, 2011 Linaro
-#
-# Author: James Tunnicliffe <james.tunnicliffe@linaro.org>
-#
-# This file is part of Linaro Image Tools.
-#
-# Linaro Image Tools is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# Linaro Image Tools is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linaro Image Tools; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-import sys
-import os
-import linaro_image_tools.fetch_image as fetch_image
-import logging
-
-
-def main():
- file_handler = fetch_image.FileHandler()
- config = fetch_image.FetchImageConfig()
-
- # Unfortunately we need to do a bit of a hack here and look for some
- # options before performing a full options parse.
- clean_cache = ("--clean-cache" in sys.argv[1:]
- or "-x" in sys.argv[1:])
-
- force_download = ("--force-download" in sys.argv[1:]
- or "-d" in sys.argv[1:])
-
- if clean_cache:
- file_handler.clean_cache()
-
- # If the settings file and server index need updating, grab them
- file_handler.update_files_from_server(force_download)
-
- # Load settings YAML, which defines the parameters we ask for and
- # acceptable responses from the user
- config.read_config(file_handler.settings_file)
-
- # Using the settings that the YAML defines as what we need for a build,
- # generate a command line parser and parse the command line
- config.parse_args(sys.argv[1:])
-
- if config.args['platform'] == "snapshot":
- config.args['release_or_snapshot'] = "snapshot"
- else:
- config.args['release_or_snapshot'] = "release"
-
- # Using the config we have, look up URLs to download data from in the
- # server index
- db = fetch_image.DB(file_handler.index_file)
-
- image_url, hwpack_url = db.get_image_and_hwpack_urls(config.args)
-
- if(image_url and hwpack_url):
-
- tools_dir = os.path.dirname(__file__)
- if tools_dir == '':
- tools_dir = None
-
- file_handler.create_media(image_url, hwpack_url,
- config.args, tools_dir)
- else:
- logging.error(
- "Unable to find files that match the parameters specified")
-
-if __name__ == '__main__':
- main()
=== removed file 'linaro-fetch-image-ui'
@@ -1,1760 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2010, 2011 Linaro
-#
-# Author: James Tunnicliffe <james.tunnicliffe@linaro.org>
-#
-# This file is part of Linaro Image Tools.
-#
-# Linaro Image Tools is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# Linaro Image Tools is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linaro Image Tools; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-import wx
-import wx.html
-import wx.wizard
-import wx.wizard as wiz
-import sys
-import re
-import os
-import linaro_image_tools.fetch_image as fetch_image
-import string
-import operator
-import Queue
-import time
-import datetime
-from linaro_image_tools.fetch_image import (QEMU, HARDWARE)
-from linaro_image_tools.media_create.check_device import (
- _get_system_bus_and_udisks_iface,
- _get_dbus_property,
- )
-import dbus
-
-def add_button(bind_to,
- sizer,
- label,
- style,
- select_event,
- hover_event,
- unhover_event):
-
- """Create a radio button with event bindings."""
- if(style != None):
- radio_button = wx.RadioButton(bind_to, label=label, style=style)
- else:
- radio_button = wx.RadioButton(bind_to, label=label)
-
- sizer.Add(radio_button, 0, wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP, 5)
- bind_to.Bind(wx.EVT_RADIOBUTTON, select_event, radio_button)
- wx.EVT_ENTER_WINDOW(radio_button, hover_event)
- wx.EVT_LEAVE_WINDOW(radio_button, unhover_event)
-
- return radio_button
-
-
-class ReleaseOrSnapshotPage(wiz.PyWizardPage):
- """Ask the user if they want to use a release or a snapshot"""
-
- def __init__(self, parent, config, db, pages, width):
- wiz.PyWizardPage.__init__(self, parent)
- self.config = config
- self.settings = self.config.settings
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.next = None
- self.prev = None
- self.db = db
- self.wizard = parent
- self.pages = pages
- self.width = width
- self.settings['image'] = None
- self.os_selected = True
- self.width = width
-
- message = ("Would you like to use a Linaro release, or a more up to "
- "date, but possibly unstable build?")
- header = wx.StaticText(self, -1, message)
- header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide
-
- self.box1 = wx.BoxSizer(wx.VERTICAL)
-
- self.button_text = {'release': "I would like to run stable, "
- "tested software.",
- 'snapshot': "I would like to run untested, but "
- "more up-to-date software."}
-
- self.rel_btn = add_button(self, self.box1, self.button_text['release'],
- wx.RB_GROUP, self.event_radio_button_select,
- None, None)
-
- # Save the setting for the default selected value
- self.settings['release_or_snapshot'] = "release"
-
- self.snap_btn = add_button(self, self.box1,
- self.button_text['snapshot'], None,
- self.event_radio_button_select, None, None)
-
- self.cp = wx.CollapsiblePane(self, label="Advanced Options",
- style=wx.CP_DEFAULT_STYLE |
- wx.CP_NO_TLW_RESIZE)
- self.Bind(wx.EVT_COLLAPSIBLEPANE_CHANGED, self.event_on_pane_changed,
- self.cp)
- self.make_pane_content(self.cp.GetPane())
- self.box2 = wx.BoxSizer(wx.VERTICAL)
- self.box2.Add(self.cp, 0)
-
- self.help_text_main = wx.StaticText(self, -1, "")
-
- self.sizer.Add(header)
- self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.sizer.Add(self.help_text_main, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.cp.SetSizer(self.sizer)
- self.SetSizerAndFit(self.sizer)
- self.sizer.Fit(self)
- self.Move((50, 50))
-
- def make_pane_content(self, pane):
- self.adv_box = wx.BoxSizer(wx.VERTICAL)
-
- message = ("You have the option of selecting from several OS images "
- "and builds.")
- header = wx.StaticText(pane, -1, message)
- header.Wrap(self.width - 10) # -10 because boarder below is 5 px wide
- self.adv_box.Add(header)
- self.grid1 = wx.FlexGridSizer(0, 2, 0, 0)
- self.grid2 = wx.FlexGridSizer(0, 2, 0, 0)
- self.grid3 = wx.FlexGridSizer(0, 1, 0, 0)
-
- platforms = []
- for key, value in self.settings['choice']['platform'].items():
- platforms.append(key)
-
- style = wx.CB_DROPDOWN | wx.CB_READONLY
- self.settings['platform'] = None
-
- self.cb_release = wx.ComboBox(pane, style=style)
- self.Bind(wx.EVT_COMBOBOX, self.event_cb_release, self.cb_release)
-
- self.cb_build = wx.ComboBox(pane, style=style)
- self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_build, self.cb_build)
-
- self.cb_image = wx.ComboBox(pane, style=style)
- self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_os, self.cb_image)
-
- self.help_text = wx.StaticText(pane, -1, "")
-
- alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP
- align_text = alignment | wx.ALIGN_CENTER_VERTICAL
- self.grid1.Add(self.cb_image, 0, alignment, 5)
- self.grid1.Add(wx.StaticText(pane, -1, "The OS to run"),
- 0, align_text, 5)
-
- self.grid2.Add(self.cb_release, 0, alignment, 5)
- self.grid2.Add(wx.StaticText(pane, -1, "Available Linaro releases"),
- 0, align_text, 5)
-
- self.grid2.Add(self.cb_build, 0, alignment, 5)
- self.grid2.Add(wx.StaticText(pane, -1, "Available milestones"),
- 0, align_text, 5)
-
- self.adv_box.Add(self.grid1, 0, alignment, 0)
- self.adv_box.Add(self.grid2, 0, alignment, 0)
- self.adv_box.Add(self.grid3, 0, alignment, 0)
- self.update_visibility()
- self.adv_box.Add(self.help_text, 0, alignment, 5)
-
- pane.SetSizer(self.adv_box)
- self.adv_box.Fit(pane)
-
- # The following line is here because when the pane is shown and grid2
- # is not visible and an option is then selected to make it visible,
- # the pane does not (and seemingly can not) resize to accommodate
- # the contents of grid2. This results in some content staying
- # invisible until the pane is hidden and shown again. This blank bit
- # of text is just there to bulk up the size of the pane so work around
- # this. Padding is the size of grid2 / 2 (padding is applied all around
- # the StaticText).
-
- self.grid3.Add(wx.StaticText(pane, -1, ""), 0, align_text,
- self.grid2.GetSize().height/2)
- self.adv_box.Fit(pane)
-
- def update_visibility(self):
- if self.settings['release_or_snapshot'] == "snapshot":
- self.adv_box.Hide(self.grid2, True)
- else:
- self.adv_box.Show(self.grid2, True)
-
- def update_next_active(self):
- if self.build_available and self.os_selected:
- self.wizard.FindWindowById(wx.ID_FORWARD).Enable()
- else:
- self.wizard.FindWindowById(wx.ID_FORWARD).Disable()
-
- def update_build_box(self):
- """Depending on what hardware has been chosen, the OS list may be
- restricted. Filter out anything that is unavailable."""
- self.cb_build.Clear()
-
- if self.settings['release_or_snapshot'] == "snapshot":
- # This combo box is not used by snapshot builds
- self.cb_build.SetValue("Not applicable")
- self.build_available = True
- return
-
- builds = self.db.get_builds(self.settings['platform'])
- self.cb_build.SetValue("No build available")
-
- for build in builds:
- if(self.db.hardware_is_available_for_platform_build(
- self.settings['compatable_hwpacks'],
- self.settings['platform'],
- build)
- and self.db.build_is_available_for_platform_image(
- "release_binaries",
- self.settings['platform'],
- self.settings['image'],
- build)):
-
- self.cb_build.Append(build)
- self.cb_build.SetValue(build)
- self.settings['release_build'] = build
-
- available_hwpacks = (
- self.db.get_available_hwpacks_for_hardware_build_plaform(
- self.settings['compatable_hwpacks'],
- self.settings['platform'],
- self.settings['release_build']))
-
- if len(available_hwpacks):
- self.settings['hwpack'] = available_hwpacks[0]
- self.build_available = True
- else:
- self.build_available = False
-
- self.update_next_active()
-
- def update_release_and_build_boxes(self):
- """Depending on what hardware has been chosen, some builds may be
- unavailable..."""
- self.cb_release.Clear()
-
- if self.settings['release_or_snapshot'] == "snapshot":
- # This combo box is not used by snapshot builds
- self.cb_release.SetValue("Not applicable")
- return
-
- if not self.os_selected:
- self.cb_release.SetValue("-")
- self.update_build_box()
- self.update_next_active()
- return
-
- default_release = None
- for platform, value in self.settings['choice']['platform'].items():
- if(self.db.hardware_is_available_for_platform(
- self.settings['compatable_hwpacks'],
- platform)
- and len(self.db.execute_return_list(
- 'select * from release_binaries '
- 'where platform == ? and image == ?',
- (platform, self.settings['image'])))):
-
- if platform in self.settings['UI']['translate']:
- platform = self.settings['UI']['translate'][platform]
-
- self.cb_release.Append(platform, platform.upper())
- if not default_release or default_release < platform:
- default_release = platform
-
- if default_release in self.settings['UI']['reverse-translate']:
- default = self.settings['UI']['reverse-translate'][default_release]
- else:
- default = default_release
-
- if not self.settings['platform']:
- # No platform has been chose, go with the default
- self.settings['platform'] = default
- self.cb_release.SetValue(default_release)
- else:
- # Don't change the value of platform if it is set.
- pf = self.settings['UI']['translate'][self.settings['platform']]
- self.cb_release.SetValue(pf)
-
- self.update_build_box()
- self.update_next_active()
-
- def get_human_os_name(self, item):
- """Given an OS name from the database, return a human name (either
- translated from the YAML settings, or just prettified) and if it is a
- LEB OS or not"""
-
- item = re.sub("linaro-", "", item) # Remove any linaro- decoration
-
- if item in self.settings['UI']['descriptions']:
- human_name = self.settings['UI']['descriptions'][item]
- else:
- # Make human_name look nicer...
- human_name = string.capwords(item)
-
- leb_search = re.search("^LEB:\s*(.*)$", human_name)
-
- if leb_search:
- return leb_search.group(1), True
-
- return human_name, False
-
- def fill_os_list(self):
- """Filter the list of OS's from the config file based on the users
- preferences so all choices in the list are valid (i.e. their hardware
- is supported for the build they have chosen)."""
-
- # select unique image from snapshot_binaries/release_binaries to
- # generate list
- os_list = None
- if self.settings['release_or_snapshot'] == "release":
- os_list = self.db.get_os_list_from('release_binaries')
- else:
- os_list = self.db.get_os_list_from('snapshot_binaries')
-
- self.cb_image.Clear()
-
- printed_tag = None
- last_name = None
- current_image_setting_valid = False
-
- for state in ["LEB", "other"]:
- for item in os_list:
- if item == "old":
- # Old is a directory that sometimes hangs around,
- # but isn't one we want to display
- continue
-
- # Save the original, untouched image name for use later.
- # We give it a more human name for display
- original = item
- item = re.sub("linaro-", "", item)
-
- os_hardware_combo_available = (
- self.db.image_hardware_combo_available(
- self.settings['release_or_snapshot'],
- original,
- self.settings['compatable_hwpacks']))
-
- if os_hardware_combo_available:
- human_name, is_LEB = self.get_human_os_name(item)
-
- if item == self.settings['image']:
- current_image_setting_valid = True
-
- if state == "LEB" and is_LEB:
-
- if printed_tag != state:
- self.cb_image.Append(
- "- Linaro Supported Releases -")
- printed_tag = state
-
- self.cb_image.Append(human_name, original)
-
- if self.settings['image'] == None:
- self.settings['image'] = original
- self.os_selected = True
-
- elif state != "LEB" and not is_LEB:
- if printed_tag != state:
- self.cb_image.Append(
- "- Community Supported Releases -")
- printed_tag = state
-
- self.cb_image.Append(human_name, original)
-
- last_name = original
-
- if( self.settings['image'] != None
- and current_image_setting_valid == False):
- # If we have an image setting, but it doesn't match the OS list, we
- # have switched OS list. It may be that adding/removing "linaro-"
- # from the name will get a match.
-
- if re.search("linaro-", self.settings['image']):
- test_name = re.sub("linaro-", "", self.settings['image'])
- else:
- test_name = "linaro-" + self.settings['image']
-
- if test_name in os_list:
- # Success! We have translated the name and can retain the
- # "old setting"
- self.settings['image'] = test_name
- current_image_setting_valid = True
- self.os_selected = True
-
- if( self.settings['image'] == None
- or current_image_setting_valid == False):
- # This should only get hit if there are no LEBs available
- self.settings['image'] = last_name
- self.os_selected = True
-
- assert self.settings['image']
-
- # Make sure the visible selected value matches the saved setting
- self.cb_image.SetValue(
- self.get_human_os_name(self.settings['image'])[0])
-
- def force_rel_snap_if_hw_requires(self):
- """
- If a hardware pack is only available on a release or snapshot build
- then force self.settings['release_or_snapshot'] and grey out the
- other radio button. Also display a helpful message.
- """
-
- snapshot_ok = False
- release_ok = False
-
- for hwpack in self.settings['compatable_hwpacks']:
- if self.db.hardware_is_available_in_table("snapshot_hwpacks",
- hwpack):
- snapshot_ok = True
- if self.db.hardware_is_available_in_table("release_hwpacks",
- hwpack):
- release_ok = True
-
- assert release_ok or snapshot_ok, ("release or snapshot should have"
- "a hardware pack available")
-
- self.rel_btn.Enable()
- self.snap_btn.Enable()
- message = ""
-
- if not release_ok:
- self.settings['release_or_snapshot'] = "snapshot"
- self.rel_btn.Disable()
- self.rel_btn.SetValue(False)
- self.snap_btn.SetValue(True)
- message = ("The hardware you have chosen is only available for "
- "snapshot builds, so release builds have been disabled")
-
- if not snapshot_ok:
- self.settings['release_or_snapshot'] = "release"
- self.snap_btn.Disable()
- self.snap_btn.SetValue(False)
- self.rel_btn.SetValue(True)
- message = ("The hardware you have chosen is only available for "
- "release builds, so snapshot builds have been disabled")
-
- self.help_text_main.SetLabel(message)
- self.help_text_main.Wrap(self.width - 10)
-
- def GetNext(self):
- if self.settings['release_or_snapshot'] == "release":
- return self.pages['lmc_settings']
- else:
- return self.pages['select_snapshot']
-
- def GetPrev(self):
- return self.pages['hardware_details']
-
- #--- Event(s) ---
- def event_radio_button_select(self, event):
- # The radio button can be release or snapshot
- self.radio_selected = event.GetEventObject().GetLabel()
-
- if self.radio_selected == self.button_text['release']:
- self.settings['release_or_snapshot'] = "release"
- else:
- self.settings['release_or_snapshot'] = "snapshot"
-
- self.event_on_pane_changed(event)
-
- def event_on_pane_changed(self, event):
- self.fill_os_list()
- self.update_release_and_build_boxes()
- self.update_visibility()
- self.Layout()
- self.update_next_active()
-
- def event_cb_release(self, evt):
- str = evt.GetString().encode('ascii').lower()
- if str in self.settings['UI']['reverse-translate']:
- str = self.settings['UI']['reverse-translate'][str]
- self.settings['platform'] = str
-
- self.update_build_box()
-
- def event_combo_box_build(self, evt):
- self.settings['release_build'] = evt.GetString().encode('ascii')
-
- def event_combo_box_os(self, evt):
- self.settings['image'] = self.cb_image.GetClientData(
- evt.GetSelection())
-
- if self.settings['image']: # Is None for items that aren't an OS
- self.os_selected = True
- image = re.sub("linaro-", "", self.settings['image'])
-
- if image + "::long" in self.settings['UI']['descriptions']:
- self.help_text.SetLabel(self.settings['UI']
- ['descriptions']
- [image + "::long"])
- else:
- self.help_text.SetLabel("")
-
- else: # Have selected help text
- self.os_selected = False
- self.help_text.SetLabel("Please select an operating system to run "
- "on your chosen hardware.")
-
- self.help_text.Wrap(self.width - 10)
- self.update_release_and_build_boxes()
- self.update_next_active()
- #--- END event(s) ---
-
-
-class AboutMyHardwarePage(wiz.PyWizardPage):
- """Ask the user about their hardware. This only asks about the board, not
- any specific hardware packs because there can be multiple names for the
- same hardware pack or sometimes a hardware pack is only available in the
- releases or snapshots repository. We whittle down the choice as we go
- and the user can chose a hardare pack (if they don't like the default)
- under advanced options in the Linaro Media Create options
- page"""
-
- def __init__(self, parent, config, db):
- wiz.PyWizardPage.__init__(self, parent)
- self.settings = config.settings
- self.db = db
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.box1 = wx.BoxSizer(wx.VERTICAL)
- self.box2 = wx.BoxSizer(wx.VERTICAL)
- self.next = None
-
- message = """\
-This Wizard will write an operating system of your choosing to either a disk
-image or to an MMC card. First we need to know what hardware you have."""
- header = wx.StaticText(self, label=message)
-
- #--- Hardware Combo Box ---
- # Make sure that the displayed release is the one set in settings if
- # no selection is made
- if "panda" in self.settings['choice']['hardware'].keys():
- default_hardware = "panda"
- else:
- default_hardware = self.settings['choice']['hardware'].keys()[-1]
-
- self.settings['hardware'] = default_hardware
- self.settings['compatable_hwpacks'] = (
- self.settings['choice']['hwpack'][self.settings['hardware']])
-
- self.cb_hardware = wx.ComboBox(self,
- value=self.settings['choice']
- ['hardware']
- [default_hardware],
- style=wx.CB_DROPDOWN | wx.CB_READONLY)
-
- self.Bind(wx.EVT_COMBOBOX,
- self.event_combo_box_hardware,
- self.cb_hardware)
-
- file_dev_grid = wx.FlexGridSizer(0, 1, 0, 0)
- line_1_grid = wx.FlexGridSizer(0, 2, 0, 0)
- self.box2.Add(file_dev_grid, 0, wx.EXPAND)
-
- # self.settings['write_to_file_or_device'] should match the first
- # button below...
- self.button_text = {'hardware': "I have a",
- 'sim': "I want to run on a hardware simulation."}
-
- self.settings['hw_or_qemu'] = HARDWARE
- add_button(self,
- line_1_grid,
- self.button_text['hardware'],
- wx.RB_GROUP,
- self.event_radio_button_select,
- None, None)
-
- line_1_grid.Add(self.cb_hardware)
- file_dev_grid.Add(line_1_grid)
-
- add_button(self,
- file_dev_grid,
- self.button_text['sim'],
- None,
- self.event_radio_button_select,
- None, None)
-
- self.sizer.Add(header)
- self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.SetSizerAndFit(self.sizer)
- self.sizer.Fit(self)
- self.Move((50, 50))
-
- def on_page_changing(self):
- self.update_hardware_box()
-
- def update_hardware_box(self):
- self.cb_hardware.Clear()
-
- sorted_hardware_names = sorted(self.settings['choice']['hardware']
- .iteritems(),
- key=operator.itemgetter(1))
-
- for device_name, human_readable_name in sorted_hardware_names:
- for hwpack in self.settings['choice']['hwpack'][device_name]:
- if(self.db.hardware_is_available_in_table("snapshot_hwpacks",
- hwpack)
- or self.db.hardware_is_available_in_table("release_hwpacks",
- hwpack)):
- self.cb_hardware.Append(human_readable_name, device_name)
- break
-
- #--- Event(s) ---
- def event_combo_box_hardware(self, event):
- self.settings['hardware'] = (event
- .GetEventObject()
- .GetClientData(event.GetSelection())
- .encode('ascii'))
-
- self.settings['compatable_hwpacks'] = (
- self.settings['choice']['hwpack'][self.settings['hardware']])
- def event_radio_button_select(self, event):
- val = event.GetEventObject().GetLabel()
- if val == self.button_text['sim']:
- self.settings['hw_or_qemu'] = QEMU
- assert "beagle" in self.settings['choice']['hardware'].keys()
- self.settings['hardware'] = "beagle"
- self.settings['compatable_hwpacks'] = (
- self.settings['choice']['hwpack'][self.settings['hardware']])
-
- elif val == self.button_text['hardware']:
- self.settings['hw_or_qemu'] = HARDWARE
- #--- END event(s) ---
-
- def SetNext(self, next):
- self.next = next
-
- def GetNext(self):
- return self.next
-
-class SelectSnapshot(wiz.WizardPageSimple):
- """Present the user with a calendar widget and a list of builds available
- on the selected date so they can chose a snapshot. Filter out days when
- their chosen hardware does not have an available build."""
-
- def __init__(self, parent, config, db, width):
- wiz.WizardPageSimple.__init__(self, parent)
- self.settings = config.settings
- self.db = db
- self.wizard = parent
- self.width = width
- self.sizer = wx.BoxSizer(wx.VERTICAL)
-
- header = wx.StaticText(self,
- label="Builds are created most days. First "
- "please select the day on which the "
- "build you would like to use was built,"
- " then, if there was more than one "
- "build that day you will be able to "
- "select the build number.")
- header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide
-
- box1 = wx.BoxSizer(wx.VERTICAL)
- self.sizer.Add(header)
-
- # Set today as the default build date in settings
- # (matches the date picker)
- self.today = wx.DateTime()
- self.today.SetToCurrent()
- self.settings['build_date'] = self.today.FormatISODate().encode('ascii')
-
- dpc = wx.DatePickerCtrl(self, size=(120, -1),
- style=wx.DP_DEFAULT)
- self.Bind(wx.EVT_DATE_CHANGED, self.on_date_changed, dpc)
-
- #--- Build number Combo Box ---
- # Make sure that the displayed build is the one set in settings if no
- # selection is made
- self.settings['build_number'] = 0
- self.update_build()
- self.cb_build = wx.ComboBox(self,
- style=wx.CB_DROPDOWN | wx.CB_READONLY)
- self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_build, self.cb_build)
-
- #--- Layout ---
- # -- Combo boxes for hardware and image selection --
-
- grid2 = wx.FlexGridSizer(0, 2, 0, 0)
- grid2.Add(dpc, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- grid2.Add(self.cb_build, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- box1.Add(grid2, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- self.sizer.Add(box1, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- self.help_text = wx.StaticText(self)
- self.sizer.Add(self.help_text, 1, wx.EXPAND, 5)
-
- self.SetSizer(self.sizer)
- self.sizer.Fit(self)
- self.Move((50, 50))
-
- def update_platform(self):
- build_and_date = self.settings['snapshot_build'].split(":")
-
- if len(build_and_date) == 2:
- self.settings['platform'] = (
- self.db.execute_return_list(
- "select platform from snapshot_binaries "
- "where date == ? and build == ?",
- (build_and_date[0], build_and_date[1])))
-
- if len(self.settings['platform']) > 0:
- self.settings['platform'] = self.settings['platform'][0][0]
-
- def update_build(self):
- small_date = re.sub('-', '', self.settings['build_date'])
- self.settings['snapshot_build'] = (small_date
- + ":"
- + str(self.settings['build_number']))
-
- def fill_build_combo_box_for_date(self, date):
- """Every time a date is chosen, this function should be called. It will
- check to see if a compatible build is available. If there isn't, it
- will search for one and provide some help text to tell the user when
- compatable builds were built."""
- # Re-populate the build combo box
-
- self.cb_build.Clear()
-
- builds = self.db.get_binary_builds_on_day_from_db(
- self.settings['image'],
- date,
- self.settings['compatable_hwpacks'])
-
- if len(builds):
- max = 0
- for item in builds:
- #Always get a tuple, only interested in the first entry
- item = item[0]
- self.cb_build.Append(item, item.upper())
-
- if item > max:
- max = item
-
- self.cb_build.SetValue(max)
- self.wizard.FindWindowById(wx.ID_FORWARD).Enable()
- self.help_text.SetLabel("")
-
- else:
- self.cb_build.SetValue("No builds available")
- future_date, past_date = self.db.get_next_prev_day_with_builds(
- self.settings['image'],
- date,
- self.settings['compatable_hwpacks'])
-
- help_text = None
-
- if future_date and past_date:
- help_text = ("There are no builds that match your "
- "specifications available on the selected date. "
- "The previous build was on " + past_date +
- " and the next build was on " + future_date + ".")
- elif future_date:
- help_text = ("There are no builds that match your "
- "specifications available on the selected date. "
- "The next build was on " + future_date +
- " and I couldn't find a past build (looked one "
- "year back from the selected date).")
- elif past_date:
- help_text = ("There are no builds that match your "
- "specifications available on the selected date. "
- "The previous build was on " + past_date)
- if date != self.today.FormatISODate().encode('ascii'):
- help_text += (" and I couldn't find a future build (I "
- "looked up to one year forward from the "
- "selected date).")
- else:
- help_text = ("I could not find any builds that match your "
- "specifications close to the selected date (I "
- "looked forward and back one year from the "
- "selected date).")
-
- self.help_text.SetLabel(help_text)
- self.help_text.Wrap(self.width - 10)
- self.wizard.FindWindowById(wx.ID_FORWARD).Disable()
-
- #--- Event(s) ---
- def on_date_changed(self, evt):
- self.settings['build_date'] = evt.GetDate().FormatISODate().encode('ascii')
- self.fill_build_combo_box_for_date(self.settings['build_date'])
- self.update_build()
-
- def event_combo_box_build(self, evt):
- self.settings['build_number'] = evt.GetString().encode('ascii').lower()
- self.update_build()
- #--- END event(s) ---
-
-
-class LMC_settings(wiz.WizardPageSimple):
- """Present the user with, intially, the choice of writing the file system
- they are going to have created to a file, or directly to a device. Ask
- which file/device to write to.
-
- If writing to a device, the user is asked to tick a box saying that they
- understand that the device they have chosen will be erased.
-
- If the user ticks the advanced box, more options are shown."""
-
- def __init__(self, parent, config, db, width):
- wiz.WizardPageSimple.__init__(self, parent)
- self.settings = config.settings
- self.wizard = parent
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.yes_use_mmc = False
- self.db = db
-
- self.settings['path_selected'] = ""
-
- header = wx.StaticText(self,
- label="Media Creation Settings\n\n"
- "Please select if you would like to write the "
- "file system I am about to create to a memory "
- "card, or to a file on the local file system.")
- header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide
-
- #--- Layout ---
- self.sizer.Add(header, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- box1 = wx.BoxSizer(wx.VERTICAL)
- file_dev_grid = wx.FlexGridSizer(0, 2, 0, 0)
- box1.Add(file_dev_grid, 0, wx.EXPAND)
-
- # self.settings['write_to_file_or_device'] should match the first
- # button below...
- self.settings['write_to_file_or_device'] = "file"
- self.file_button = add_button(
- self,
- file_dev_grid,
- "Write to file",
- wx.RB_GROUP,
- self.event_radio_button_select,
- None, None)
-
- self.device_button = add_button(
- self,
- file_dev_grid,
- "Write to device",
- None,
- self.event_radio_button_select,
- None, None)
-
- self.help_text_values = {"device": "Please select a device to write "
- "the file system to:",
-
- "file": "Please select a file to write the "
- "file system to:"}
-
- self.help_text = wx.StaticText(
- self,
- label=self.help_text_values[
- self.settings['write_to_file_or_device']])
- self.help_text.Wrap(width - 10)
-
- #-- File/dev picker --
- file_browse_button = wx.Button(self, -1, "Browse")
- file_browse_grid = wx.FlexGridSizer(0, 2, 0, 0)
- self.file_path_and_name = wx.TextCtrl(self, -1, "", size=(300, -1))
-
- file_browse_grid.Add(self.file_path_and_name, 0, wx.EXPAND)
- file_browse_grid.Add(file_browse_button, 0, wx.EXPAND)
-
- self.Bind(wx.EVT_BUTTON,
- self.event_open_file_control,
- file_browse_button)
-
- self.Bind(wx.EVT_TEXT,
- self.event_file_path_and_name,
- self.file_path_and_name)
-
- box1.Add(self.help_text, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- box1.Add(file_browse_grid, 0, wx.EXPAND)
-
- # Advanced settings collapsible pane
- self.cp = wx.CollapsiblePane(self, label="Advanced Options",
- style=wx.CP_DEFAULT_STYLE |
- wx.CP_NO_TLW_RESIZE)
- self.Bind(wx.EVT_COLLAPSIBLEPANE_CHANGED, self.event_on_pane_changed,
- self.cp)
- self.make_pane_content(self.cp.GetPane())
- self.box2 = wx.BoxSizer(wx.VERTICAL)
- self.box2.Add(self.cp)
-
- self.sizer.Add(box1, 0, wx.ALIGN_LEFT | wx.ALL, 0)
- self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 0)
- self.SetSizer(self.sizer)
- self.sizer.Fit(self)
- self.Move((50, 50))
-
- def make_pane_content(self, pane):
- self.box = wx.BoxSizer(wx.VERTICAL)
- grid1 = wx.FlexGridSizer(0, 2, 0, 0)
-
- #--- Build some widgets ---
- #-- Target file system --
- file_systems = ["ext4", "btrfs", "ext3", "ext2"]
- default_target = file_systems[0]
- self.settings['rootfs'] = default_target
- cb_rootfs = wx.ComboBox(pane,
- value=default_target,
- style=wx.CB_DROPDOWN | wx.CB_READONLY)
-
- for item in file_systems:
- cb_rootfs.Append(item, item.upper())
-
- self.Bind(wx.EVT_COMBOBOX, self.event_combo_box_rootfs, cb_rootfs)
-
- #-- Image size spinner
- self.image_size_spinner = wx.SpinCtrl(pane, -1, "")
- self.Bind(wx.EVT_SPINCTRL,
- self.event_image_size,
- self.image_size_spinner)
-
- #-- Swap size spinner
- self.swap_size_spinner = wx.SpinCtrl(pane, -1, "")
- self.Bind(wx.EVT_SPINCTRL,
- self.event_swap_size,
- self.swap_size_spinner)
-
-
- alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP
- grid1.Add(cb_rootfs, 0, alignment, 5)
-
- grid1.Add(wx.StaticText(pane,
- label="The root file system of the image"),
- 0, alignment, 5)
-
- # We want to sub-devide the cell, to add another grid sizer...
- file_size_grid = wx.FlexGridSizer(0, 2, 0, 0)
-
- grid1.Add(file_size_grid,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP)
-
- # Add a spinner that allows us to type/click a numerical value (defined above)
- file_size_grid.Add(self.image_size_spinner,
- 0,
- alignment,
- 5)
-
- # Add a choice of MB or GB for size input
- units = ["GB", "MB"]
- self.size_unit = units[0] # Set the default unit
- unit_choice = wx.Choice(pane, -1, (100, 50), choices=units)
- self.Bind(wx.EVT_CHOICE, self.event_chose_unit, unit_choice)
- file_size_grid.Add(unit_choice, 0, wx.ALIGN_RIGHT | wx.TOP, 5)
-
- # Back out of the extra grid, add some help text
- grid1.Add(wx.StaticText(
- pane,
- label="Writing to file only: Image file size"),
- 0,
- alignment,
- 5)
-
- # The swap size (MB only)
- grid1.Add(self.swap_size_spinner,
- 0,
- alignment,
- 5)
-
- grid1.Add(wx.StaticText(pane, label="Swap file size in MB"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.cb_hwpacks = wx.ComboBox(
- pane,
- value=self.settings['compatable_hwpacks'][0],
- style=wx.CB_DROPDOWN | wx.CB_READONLY)
-
- self.Bind(wx.EVT_COMBOBOX,
- self.event_combo_box_hwpack,
- self.cb_hwpacks)
-
- grid1.Add(self.cb_hwpacks,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- grid1.Add(wx.StaticText(pane, label="Compatible hardware packs"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
- desc_link = wx.HyperlinkCtrl(
- pane, id=-1, url="",
- label="Hardware pack descriptions")
- self.Bind(wx.EVT_HYPERLINK, self.event_hwpack_link, desc_link)
- grid1.Add(desc_link,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.box.Add(grid1, 0, alignment, 0)
- pane.SetSizer(self.box)
- self.box.Fit(pane)
-
- def on_activate(self):
- self.update_forward_active_and_mmc_confirm_box_visible()
- self.set_hwpacks_for_hardware()
- self.update_dev_file_buttons()
-
- def update_dev_file_buttons(self):
- if self.settings['hw_or_qemu'] == QEMU:
- self.device_button.Disable()
- self.device_button.SetValue(False)
- self.file_button.SetValue(True)
- else:
- self.device_button.Enable()
-
- def set_hwpacks_for_hardware(self):
- self.cb_hwpacks.Clear()
-
- if self.settings['release_or_snapshot'] == "snapshot":
- self.settings['build'] = self.settings['snapshot_build']
-
- date_and_build = self.settings['build'].split(":")
-
- compatable_hwpacks = (
- self.db.get_available_hwpacks_for_hardware_snapshot_build(
- self.settings['compatable_hwpacks'],
- self.settings['platform'],
- date_and_build[0],
- date_and_build[1]))
- else:
- self.settings['build'] = self.settings['release_build']
- compatable_hwpacks = (
- self.db.get_available_hwpacks_for_hardware_build_plaform(
- self.settings['compatable_hwpacks'],
- self.settings['platform'],
- self.settings['build']))
-
- for hwpack in compatable_hwpacks:
- self.cb_hwpacks.Append(hwpack)
-
- self.cb_hwpacks.SetStringSelection(compatable_hwpacks[0])
- self.settings['hwpack'] = compatable_hwpacks[0]
-
- def update_forward_active_and_mmc_confirm_box_visible(self):
- if( self.settings['path_selected']
- and self.settings['path_selected'] != ""):
- self.wizard.FindWindowById(wx.ID_FORWARD).Enable()
- else:
- self.wizard.FindWindowById(wx.ID_FORWARD).Disable()
-
- # --- Event Handlers ---
- def event_on_pane_changed(self, event):
- self.Layout()
-
- def event_open_file_control(self, event):
- if self.settings['write_to_file_or_device'] == "file":
-
- dlg = wx.FileDialog(self,
- message="Save file as ...",
- defaultDir=os.getcwd(),
- defaultFile="",
- style=wx.SAVE)
-
- elif self.settings['write_to_file_or_device'] == "device":
- dlg = DevChoser(self, -1, "Please chose a device", self.settings)
-
- result = dlg.ShowModal()
- file_or_dev = self.settings['write_to_file_or_device']
-
- if result == wx.ID_OK or file_or_dev == "device":
- # The dev chooser doesn't do ok/cancel, it just finishes.
- if self.settings['write_to_file_or_device'] == "file":
- self.settings['path_selected'] = dlg.GetPaths()[0]
- self.file_path_and_name.SetValue(self.settings['path_selected'])
-
- dlg.Destroy()
- self.update_forward_active_and_mmc_confirm_box_visible()
-
- def event_file_path_and_name(self, event):
- self.settings['path_selected'] = event.GetString()
- self.update_forward_active_and_mmc_confirm_box_visible()
-
- def event_combo_box_hwpack(self, event):
- self.settings['hwpack'] = event.GetString().encode('ascii')
-
- def event_combo_box_rootfs(self, evt):
- self.settings['rootfs'] = evt.GetString().encode('ascii').lower()
-
- def event_hwpack_link(self, event):
- hw_desc = self.settings['UI']['hwpack-descriptions']
- body = "<h3>%s</h3><ul>" % self.settings['hardware']
- for hwpack in self.cb_hwpacks.GetItems():
- desc = ''
- if hw_desc.has_key(hwpack):
- desc = " - %s" % hw_desc[hwpack]
- body += "<li><b>%s</b><i>%s</i></li>" % (hwpack, desc)
- body += "</ul>"
- HtmlDialog( event.GetEventObject().GetParent(),
- "Hardware Pack Descriptions: %s",
- body)
-
- def event_radio_button_select(self, event):
- """Search the label of the button that has been selected to work out
- what we are writing to."""
- setting_search = re.search(
- "write to (\w+)",
- event
- .GetEventObject()
- .GetLabel()
- .encode('ascii')
- .lower())
-
- assert setting_search
-
- self.settings['write_to_file_or_device'] = setting_search.group(1)
-
- self.help_text.SetLabel(
- self.help_text_values[self.settings['write_to_file_or_device']])
-
- self.update_forward_active_and_mmc_confirm_box_visible()
-
- def event_pick_file_path(self, evt):
- self.settings['path_selected'] = os.path.abspath(evt.GetPath())
- self.update_forward_active_and_mmc_confirm_box_visible()
-
- def update_image_size_setting(self):
- if(self.image_size_spinner.GetValue() > 0):
- self.settings['image_size'] = (str(self.image_size_spinner
- .GetValue())
- + self.size_unit[0])
- else:
- self.settings['image_size'] = None
-
- def event_image_size(self, event):
- self.update_image_size_setting()
-
- def event_chose_unit(self, event):
- self.size_unit = event.GetString()
- self.update_image_size_setting()
-
- def event_swap_size(self, event):
- self.settings['swap_file'] = str(self.image_size_spinner.GetValue())
-
-class HtmlDialog(wx.Dialog):
- def __init__(self, parent, title, content):
- wx.Dialog.__init__(self, parent, -1, title)
- html = wx.html.HtmlWindow(self)
-
- html.SetPage(content)
- self.SetSize(wx.Size(500,450))
- self.ShowModal()
- self.Destroy()
-
-class DevChoser(wx.Dialog):
- def __init__(self, parent, id, title, settings):
- self.settings = settings
- wx.Dialog.__init__(self, parent, id, title)
-
- vbox = wx.BoxSizer(wx.VERTICAL)
-
- dev_info = self.get_device_info()
- dev_number = 1
-
- grid1 = wx.FlexGridSizer(0, len(dev_info[0].keys())+1, 0, 0)
- group = wx.RB_GROUP
- alignment = wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP
- keys = dev_info[0].keys()
-
- grid1.Add(wx.StaticText(self, label=""))
- for key in keys:
- grid1.Add(wx.StaticText(self, label=key), 0, alignment, 5)
-
- self.id_dev_map = {}
- first = True
- for info in dev_info:
- button = add_button(self, grid1, str(dev_number), group,
- self.event_radio_button_select, None, None)
- group = None
-
- for key in keys:
- message = info[key]
- grid1.Add(wx.StaticText(self, label=message), 0, alignment, 5)
-
- if key == "path":
- self.id_dev_map[str(dev_number)] = info[key]
-
- if self.settings['path_selected'] == info[key]:
- button.SetValue(True)
-
- if first and not self.settings['path_selected']:
- self.settings['path_selected'] = info[key]
-
- dev_number += 1
-
- vbox.Add(grid1)
- hbox = wx.BoxSizer(wx.HORIZONTAL)
- okButton = wx.Button(self, -1, 'Use Selected Device')
- okButton.Bind(wx.EVT_BUTTON, self.OnCloseMe)
- hbox.Add(okButton, 1)
-
- vbox.Add(hbox, 1, wx.ALIGN_CENTER | wx.TOP | wx.BOTTOM, 10)
- vbox.Fit(self)
- self.SetSizer(vbox)
-
- def event_radio_button_select(self, event):
- val = event.GetEventObject().GetLabel()
- self.settings['path_selected'] = self.id_dev_map[val]
-
- def get_device_info(self):
- """Get information about devices found on the system.
- """
- bus, udisks = _get_system_bus_and_udisks_iface()
- devices = udisks.get_dbus_method('EnumerateDevices')()
- devices.sort()
- dev_info = []
- for path in devices:
- device = bus.get_object("org.freedesktop.UDisks", path)
- props = ['drive-model', 'drive-vendor',
- 'drive-connection-interface', 'drive-media']
- info = {'path': _get_dbus_property('DeviceFile', device, path)}
- if(not _get_dbus_property('device-is-partition', device, path) and
- _get_dbus_property('device-is-media-available', device, path)):
- for prop in props:
- info[prop] = str(_get_dbus_property(prop, device, path))
- dev_info.append(info)
-
- return dev_info
-
- def OnCloseMe(self, event):
- self.Close(True)
-
- def OnCloseWindow(self, event):
- self.Destroy()
-
-class RunLMC(wiz.WizardPageSimple):
- """Present the user with some information about their choices and a button
- to start linaro-media-create. The linaro-media-create process is started in
- a new thread and important events are communicated back to the UI through a
- queue."""
-
- def __init__(self, parent, config, db, width):
- wiz.WizardPageSimple.__init__(self, parent)
- self.settings = config.settings
- self.sizer = wx.BoxSizer(wx.VERTICAL)
- self.db = db
- self.width = width
- self.wizard = parent
-
- header = wx.StaticText(self, label="""Installing...""")
- header.Wrap(width - 10) # -10 because boarder below is 5 pixels wide
-
- self.sizer.Add(header)
- self.box1 = wx.BoxSizer(wx.VERTICAL)
-
- # We expect to print 4 lines of information, reserve space using blank
- # lines.
- self.settings_summary_text = wx.StaticText(self, label="\n\n\n\n")
- self.settings_summary_text.Wrap(width - 10)
-
- self.box1.Add(self.settings_summary_text, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- self.start_button = wx.Button(self, 10, "Start", (20, 20))
- self.Bind(wx.EVT_BUTTON, self.start_lmc, self.start_button)
-
- self.start_button.SetToolTipString("Start creating an image, using the"
- "above settings.")
-
- self.start_button.SetSize(self.start_button.GetBestSize())
- self.box1.Add(self.start_button, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- self.download_guage = wx.Gauge(self,
- -1,
- 1000,
- size=(self.width * 2 / 3, 25))
-
- self.status_grid = wx.FlexGridSizer(0, 2)
-
- self.status_grid.Add(wx.StaticText(self, label="Downloading files"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.downloading_files_status = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.downloading_files_status,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.status_grid.Add(self.download_guage,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.downloading_files_info = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.downloading_files_info,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.status_grid.Add(wx.StaticText(self, label="Unpacking downloads"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.unpacking_files_status = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.unpacking_files_status,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.status_grid.Add(wx.StaticText(self, label="Installing packages"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.installing_packages_status = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.installing_packages_status,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.status_grid.Add(wx.StaticText(self, label="Create file system"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.create_file_system_status = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.create_file_system_status,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.status_grid.Add(wx.StaticText(self, label="Populate file system"),
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.populate_file_system_status = wx.StaticText(self, label="")
-
- self.status_grid.Add(self.populate_file_system_status,
- 0,
- wx.ALIGN_LEFT | wx.LEFT | wx.RIGHT | wx.TOP,
- 5)
-
- self.box2 = wx.BoxSizer(wx.VERTICAL)
- self.blank = wx.StaticText(self, label="") # Just like a spacer GIF...
- self.messages = wx.StaticText(self, label="")
- self.box2.Add(self.blank, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.box2.Add(self.messages, 0, wx.ALIGN_LEFT | wx.ALL, 5)
-
- self.sizer.Add(self.box1, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.sizer.Add(self.status_grid, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.sizer.Add(self.box2, 0, wx.ALIGN_LEFT | wx.ALL, 5)
- self.SetSizerAndFit(self.sizer)
- self.sizer.Fit(self)
- self.Move((50, 50))
-
- def on_activate(self):
- """Called just before the page is displayed to update the text based on
- the users preferences."""
-
- # The build is stored in different forms depending on if we are using a
- # release or snapshot but from here on in it is a common value
- if self.settings['release_or_snapshot'] == "snapshot":
- self.settings['build'] = self.settings['snapshot_build']
- else:
- self.settings['build'] = self.settings['release_build']
-
- settings_summary = ("Press start to create an image with the "
- "following settings:\n")
- settings_summary += "Operating System: " + self.settings['image'] + "\n"
- settings_summary += "Hardware: " + self.settings['hardware'] + "\n"
-
- # Assumption is that a file may be in a long path, we don't know how
- # big the font is and we don't want to allow the path to run off the
- # end of the line, so if a file is chosen, just show the file name.
- # Devices are (probably) /dev/some_short_name and the user really needs
- # to check them, so we show the whole thing.
- path = self.settings['path_selected']
- if self.settings['write_to_file_or_device'] == "file":
- path = self.settings['path_selected'].split(os.sep)[-1]
-
- settings_summary += ( "Writing image to "
- + self.settings['write_to_file_or_device']
- + " "
- + path)
-
- self.settings_summary_text.SetLabel(settings_summary)
- self.settings_summary_text.Wrap(self.width - 10)
-
- def start_lmc(self, event):
- """Start a thread that runs linaro-media-create and a timer, which
- checks for UI updates every 100ms"""
-
- if self.settings['write_to_file_or_device'] == "file":
- self.settings['image_file'] = self.settings['path_selected']
- elif self.settings['write_to_file_or_device'] == "device":
- self.settings['mmc'] = self.settings['path_selected']
-
- title = 'Are you sure?'
- text = "Completely erase {0}?".format(self.settings['mmc'])
- dlg = wx.MessageDialog(None, text, title,
- wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION)
- if dlg.ShowModal() != wx.ID_YES:
- return
- else:
- assert False, ("self.config.settings['write_to_file_or_device'] "
- "was an unexpected value"
- + self.settings['write_to_file_or_device'])
-
- image_url, hwpack_url = self.db.get_image_and_hwpack_urls(self.settings)
-
- # Currently the UI is blocked when LMC is running, so grey out the
- # buttons to indicate to the user that they won't work!
- self.wizard.FindWindowById(wx.ID_BACKWARD).Disable()
- self.wizard.FindWindowById(wx.ID_CANCEL).Disable()
-
- if(image_url and hwpack_url):
-
- self.file_handler = fetch_image.FileHandler()
-
- self.timer = wx.Timer(self)
- self.Bind(wx.EVT_TIMER, self.timer_ping, self.timer)
- self.timer.Start(milliseconds=100, oneShot=True)
-
- tools_dir = os.path.dirname(__file__)
- if tools_dir == '':
- tools_dir = None
-
- self.start_button.Disable()
- self.event_queue = Queue.Queue()
- self.lmc_thread = self.file_handler.LinaroMediaCreate(
- image_url,
- hwpack_url,
- self.file_handler,
- self.event_queue,
- self.settings,
- tools_dir)
- self.lmc_thread.start()
- else:
- print >> sys.stderr, ("Unable to find files that match the"
- "parameters specified")
-
- def timer_ping(self, event):
- """During start_lmc a timer is started to poll for events from
- linaro-media-create every 100ms. This is the function which is called
- to do that polling."""
-
- while not self.event_queue.empty():
- event = self.event_queue.get()
-
- if event[0] == "start":
- self.event_start(event[1])
-
- elif event[0] == "end":
- self.event_end(event[1])
-
- elif event == "terminate" or event == "abort":
- # Process complete. Enable next button.
- self.wizard.FindWindowById(wx.ID_FORWARD).Enable()
- if event == "terminate":
- self.populate_file_system_status.SetLabel("Done")
- else:
- self.populate_file_system_status.SetLabel("Failed")
- return # Even if queue isn't empty, stop processing it
-
- elif event[0] == "update":
- self.event_update(event[1], event[2], event[3])
-
- elif event[0] == "message":
- self.messages.SetLabel(event[1])
-
- else:
- print >> sys.stderr, "timer_ping: Unhandled event", event
-
- self.timer.Start(milliseconds=50, oneShot=True)
-
- def unsigned_packages_query(self, package_list):
- message = ('In order to continue, I need to install some unsigned'
- 'packages into the image. Is this OK? The packages are:'
- '\n\n' + package_list)
-
- dlg = wx.MessageDialog(self,
- message,
- 'Install Unsigned Packages Into Image?',
- wx.YES_NO | wx.NO_DEFAULT)
-
- choice = dlg.ShowModal()
- dlg.Destroy()
-
- return choice == wx.ID_YES
-
- #--- Event(s) ---
- def event_start(self, event):
- if event == "download":
- pass
- elif event == "unpack":
- self.unpacking_files_status.SetLabel("Running")
- elif event == "installing packages":
- self.installing_packages_status.SetLabel("Running")
-
- elif re.search('^unverified_packages:', event):
- # Get rid of event ID and whitespace invariance
- packages = " ".join(event.split()[1:])
- install_unsigned_packages = self.unsigned_packages_query(packages)
-
- if install_unsigned_packages == False:
- # TODO: Tidy up other threads
- sys.exit(1)
- else:
- self.lmc_thread.send_to_create_process("y")
-
- elif event == "create file system":
- self.create_file_system_status.SetLabel("Running")
- elif event == "populate file system":
- self.populate_file_system_status.SetLabel("Running")
- else:
- print "Unhandled start event:", event
-
- def event_end(self, event):
- if event == "download":
- self.downloading_files_status.SetLabel("Done")
- elif event == "unpack":
- self.unpacking_files_status.SetLabel("Done")
- elif event == "installing packages":
- self.installing_packages_status.SetLabel("Done")
- elif event == "create file system":
- self.create_file_system_status.SetLabel("Done")
- elif event == "populate file system":
- self.populate_file_system_status.SetLabel("Done")
- else:
- print "Unhhandled end event:", event
-
- def event_update(self, task, update_type, value):
- if task == "download":
- if update_type == "progress":
- self.total_bytes_downloaded += value
-
- time_difference = time.time() - self.old_time
-
- if time_difference > 1.0:
- self.old_time = time.time()
-
- # More than a second has passed since we calculated data
- # rate
- speed = ( float( self.total_bytes_downloaded
- - self.old_bytes_downloaded)
- / time_difference)
-
- self.old_bytes_downloaded = self.total_bytes_downloaded
-
- self.speeds.append(speed)
-
- average_speed = 0
- speeds_accumulated = 0
- for speed in reversed(self.speeds):
- average_speed += speed
- speeds_accumulated += 1
-
- if speeds_accumulated == 6:
- break # do rolling average of 6 seconds
-
- average_speed /= speeds_accumulated
-
- time_remaining = ( ( self.total_bytes_to_download
- - self.total_bytes_downloaded)
- / speed)
-
- pretty_time = str(datetime.timedelta(seconds=int(
- time_remaining)))
-
- # Following table assumes we don't get past TBps internet
- # connections soon :-)
- units = ["Bps", "kBps", "MBps", "GBps", "TBps"]
- units_index = 0
- while speed > 1024:
- speed /= 1024
- units_index += 1
-
- info = "Downloading at {0:.1f} {1}".format(
- speed,
- units[units_index])
-
- self.downloading_files_status.SetLabel(info)
-
- info = "{0} remaining".format(
- pretty_time)
-
- self.downloading_files_info.SetLabel(info)
-
- self.download_guage.SetValue( 1000
- * self.total_bytes_downloaded
- / self.total_bytes_to_download)
-
- elif update_type == "total bytes":
- self.old_time = time.time()
- self.old_bytes_downloaded = 0
- self.total_bytes_to_download = value
- self.total_bytes_downloaded = 0
- self.speeds = [] # keep an array of speeds used to calculate
- # the estimated time remaining - by not just using the
- # current speed we can stop the ETA bouncing around too much.
-
- elif update_type == "message":
- self.downloading_files_status.SetLabel(value)
-
- def event_combo_box_release(self, evt):
- pass
-
- def event_combo_box_build(self, evt):
- pass
- #--- END event(s) ---
-
-
-class TestDriveWizard(wx.wizard.Wizard):
- def __init__(self, title):
- wx.wizard.Wizard.__init__(self, None, -1, title, wx.NullBitmap)
- self.Bind(wx.wizard.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
-
- def on_page_changing(self, evt):
- """Executed before the page changes."""
-
- hw_details_pg = self.pages['hardware_details']
- rel_or_snap_pg = self.pages['release_or_snapshot']
- select_snap_pg = self.pages['select_snapshot']
- lmc_settings_pg = self.pages['lmc_settings']
-
- page = evt.GetPage()
-
- if evt.GetDirection(): # If going forwards...
- # Always enable back button if going forwards
- self.wizard.FindWindowById(wx.ID_BACKWARD).Enable()
-
- # If going from a select snapshot or select release page, record
- # which we were on so the back button of the next page works
- if self.config.settings['release_or_snapshot'] == "release":
- lmc_settings_pg.SetPrev(rel_or_snap_pg)
- else:
- select_snap_pg.SetNext(lmc_settings_pg)
-
- lmc_settings_pg.SetPrev(select_snap_pg)
- select_snap_pg.SetPrev(rel_or_snap_pg)
-
- if page == rel_or_snap_pg:
- select_snap_pg.fill_build_combo_box_for_date(
- self.config.settings['build_date'])
-
- if page == hw_details_pg:
- rel_or_snap_pg.force_rel_snap_if_hw_requires()
- rel_or_snap_pg.fill_os_list()
- rel_or_snap_pg.update_release_and_build_boxes()
-
- if page == select_snap_pg:
- # Execute when exiting page
- select_snap_pg.update_platform()
-
- if( page == select_snap_pg
- or (page == rel_or_snap_pg and
- self.config.settings['release_or_snapshot'] == "release")):
- lmc_settings_pg.on_activate()
-
- if page == lmc_settings_pg:
- # Forward stays disabled until LMC has finished running
- self.wizard.FindWindowById(wx.ID_FORWARD).Disable()
- self.pages['run_lmc'].on_activate()
-
- else: # Always enable the forward button if reversing into a page
- self.wizard.FindWindowById(wx.ID_FORWARD).Enable()
-
- def go(self):
- file_handler = fetch_image.FileHandler()
- self.config = fetch_image.FetchImageConfig()
- self.config.settings["force_download"] = False
- self.config.settings['compatable_hwpacks'] = ['foo']
-
- # If the settings file and server index need updating, grab them
- file_handler.update_files_from_server()
-
- # Load settings YAML, which defines the parameters we ask for and
- # acceptable responses from the user
- self.config.read_config(file_handler.settings_file)
-
- # Using the config we have, look up URLs to download data from in
- # the server index
- db = fetch_image.DB(file_handler.index_file)
-
- # Create the wizard and the pages
- self.wizard = wiz.Wizard(self, -1, "Linaro Media Builder")
-
- self.pages = {}
- self.pages['hardware_details'] = AboutMyHardwarePage(self.wizard,
- self.config,
- db)
-
- self.wizard.FitToPage(self.pages['hardware_details'])
- (width, height) = self.wizard.GetSize()
-
- self.pages['release_or_snapshot'] = ReleaseOrSnapshotPage(self.wizard,
- self.config,
- db,
- self.pages,
- width)
-
- self.pages['select_snapshot'] = SelectSnapshot(self.wizard,
- self.config,
- db,
- width)
-
- self.pages['lmc_settings'] = LMC_settings(self.wizard,
- self.config,
- db,
- width)
-
- self.pages['run_lmc'] = RunLMC(self.wizard,
- self.config,
- db,
- width)
-
- self.pages['hardware_details'].SetNext(
- self.pages['release_or_snapshot'])
-
- self.pages['lmc_settings'].SetNext(self.pages['run_lmc'])
- self.pages['run_lmc'].SetPrev(self.pages['lmc_settings'])
-
- for (name, page) in self.pages.items():
- self.wizard.GetPageAreaSizer().Add(page)
-
- self.pages['hardware_details'].on_page_changing()
- self.wizard.RunWizard(self.pages['hardware_details'])
-
-
-def run():
- """Wrapper around the full wizard. Is encapsulated in its own function to
- allow a restart to be performed, as described in __main___, easily"""
- app = wx.PySimpleApp() # Start the application
- if app:
- pass # We don't use this directly. Stop pyflakes complaining!
-
- w = TestDriveWizard('Simple Wizard')
- return w.go()
-
-if __name__ == '__main__':
- run()
=== removed file 'linaro-image-indexer'
@@ -1,238 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2010, 2011 Linaro
-#
-# Author: James Tunnicliffe <james.tunnicliffe@linaro.org>
-#
-# This file is part of Linaro Image Tools.
-#
-# Linaro Image Tools is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# Linaro Image Tools is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linaro Image Tools; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-import os
-import re
-import urlparse
-import logging
-import bz2
-import linaro_image_tools.fetch_image
-
-RELEASES_WWW_DOCUMENT_ROOT = "/srv/releases.linaro.org/www"
-RELEASE_URL = "http://releases.linaro.org/"
-OLD_RELEASES_WWW_DOCUMENT_ROOT = "/srv/releases.linaro.org/www/platform"
-OLD_RELEASE_URL = "http://releases.linaro.org/platform/"
-SNAPSHOTS_WWW_DOCUMENT_ROOT = "/srv/snapshots.linaro.org/www/"
-SNAPSHOTS_URL = "http://snapshots.linaro.org/"
-
-class ServerIndexer():
- """Create a database of files on the linaro image servers for use by image
- creation tools."""
- def reset(self):
- self.url_parse = []
-
- def __init__(self):
- self.reset()
- self.db_file_name = "server_index"
- self.db = linaro_image_tools.fetch_image.DB(self.db_file_name)
-
- def regexp_list_matches_some(self, to_search, list):
- assert len(list), "empty list passed"
-
- for item in list:
- if re.search(item, to_search):
- return True
-
- return False
-
- def regexp_list_matches_all(self, to_search, list):
- assert len(list), "empty list passed"
-
- for item in list:
- if not re.search(item, to_search):
- return False
-
- return True
-
- def crawl(self):
- self.db.set_url_parse_info(self.url_parse)
- logging.getLogger("linaro_image_tools").info(self.url_parse)
-
- for index in range(len(self.url_parse)):
- info = self.url_parse[index]
- table = info["table"]
-
- logging.getLogger("linaro_image_tools").info("%s %s %s %s %s" % \
- (info["base_dir"], info["base_url"], table,
- info["url_validator"], info["url_chunks"]))
-
- self.go(info, table, index)
-
- logging.getLogger("linaro_image_tools").info("")
-
-
- def go(self, info, table, index):
- root_url = info["base_url"]
- root_dir = info["base_dir"]
-
- for root, subFolders, files in os.walk( root_dir ):
- for file in files:
- relative_location = re.sub(root_dir, "",
- os.path.join(root, file))
- relative_location = relative_location.lstrip("/")
-
- to_match = info["url_validator"][0]
- not_match = info["url_validator"][1]
-
- url = urlparse.urljoin(root_url, relative_location)
- url = urlparse.urljoin(url, file)
-
- to_match_ok = False
- if len(to_match) == 0:
- to_match_ok = True
- if len(to_match) and self.regexp_list_matches_all(
- relative_location, to_match):
- to_match_ok = True
-
- not_match_ok = True
- if len(not_match) and self.regexp_list_matches_some(
- relative_location, not_match):
- not_match_ok = False
-
- if( not (to_match_ok and not_match_ok)
- or not re.search("\.gz$", file)):
- continue # URL doesn't match the validator. Ignore.
-
- logging.getLogger("linaro_image_tools").info(url)
- self.db.record_url(url, index)
-
- self.dump()
-
- def dump(self):
- self.db.commit()
-
- def close_and_bzip2(self):
- # After finishing creating the database, create a compressed version
- # for more efficient downloads
- self.db.close()
- bz2_db_file = bz2.BZ2File(self.db_file_name + ".bz2", "w")
- db_file = open(self.db_file_name)
- bz2_db_file.write(db_file.read())
- bz2_db_file.close()
-
- def add_directory_parse_list(self,
- base_dir,
- base_url,
- url_validator,
- db_columns,
- table,
- url_chunks):
-
- if not id in self.url_parse:
- self.url_parse.append({"base_dir": base_dir,
- "base_url": base_url,
- "url_validator": url_validator,
- "db_columns": db_columns,
- "url_chunks": url_chunks,
- "table": table})
- logging.getLogger("linaro_image_tools").info(base_dir)
-
- # Construct data needed to create the table
- items = []
- for item in url_chunks:
- if(item != ""):
- # If the entry is a tuple, it indicates it is of the
- # form name, regexp
- if(isinstance(item, tuple)):
- items.append(item[0])
- else:
- items.append(item)
-
- self.db.create_table_with_name_columns(table, db_columns)
-
- def clean_removed_urls_from_db(self):
- self.db.clean_removed_urls_from_db()
-
-if __name__ == '__main__':
- crawler = ServerIndexer()
-
- ch = logging.StreamHandler()
- ch.setLevel(logging.CRITICAL)
- formatter = logging.Formatter("%(message)s")
- ch.setFormatter(formatter)
- logger = logging.getLogger("linaro_image_tools")
- logger.setLevel(logging.CRITICAL)
- logger.addHandler(ch)
-
- #linaro-n/ubuntu-desktop/11.09
- crawler.add_directory_parse_list(OLD_RELEASES_WWW_DOCUMENT_ROOT,
- OLD_RELEASE_URL,
- ([], ["platform/", "old/", "hwpack",
- "alpha", "beta", "final", "leb",
- "leb", "release-candidate"]),
- ["platform", "image", "build=final"],
- "release_binaries",
- ["", "image", "platform"])
-
- #linaro-n/hwpacks/11.09
- crawler.add_directory_parse_list(OLD_RELEASES_WWW_DOCUMENT_ROOT,
- OLD_RELEASE_URL,
- (["/hwpacks/"],
- ["alpha", "beta", "final", "leb",
- "release-candidate"]),
- ["platform", "hardware", "build=final"],
- "release_hwpacks",
- ["", "", "platform",
- ("hardware", r"hwpack_linaro-(.*?)_")])
-
- # 11.10/ubuntu/oneiric-images/ubuntu-desktop/
- # NOT images/...
- crawler.add_directory_parse_list(RELEASES_WWW_DOCUMENT_ROOT,
- RELEASE_URL,
- (["\d+\.\d+", "ubuntu", "oneiric-images"],
- ["latest/", "platform/", "old/",
- "hwpack", "^images/"]),
- ["platform", "image", "build=final"],
- "release_binaries",
- ["platform", "", "", "image"])
-
- # 11.10/ubuntu/oneiric-hwpacks/
- crawler.add_directory_parse_list(RELEASES_WWW_DOCUMENT_ROOT,
- RELEASE_URL,
- (["\d+\.\d+", "ubuntu", "oneiric-hwpacks"],
- ["latest/", "platform/", "old/",
- "^images/"]),
- ["platform", "hardware", "build=final"],
- "release_hwpacks",
- ["platform", "", "",
- ("hardware", r"hwpack_linaro-(.*?)_")])
-
- #oneiric/linaro-o-alip/20111026/0/images/tar/
- crawler.add_directory_parse_list(SNAPSHOTS_WWW_DOCUMENT_ROOT,
- SNAPSHOTS_URL,
- (["^oneiric/"], ["/hwpack"]),
- ["platform", "image", "date", "build"],
- "snapshot_binaries",
- ["platform", "image", "date", "build"])
-
- #oneiric/lt-panda-oneiric/20111026/0/images/hwpack/
- crawler.add_directory_parse_list(SNAPSHOTS_WWW_DOCUMENT_ROOT,
- SNAPSHOTS_URL,
- (["^oneiric/", "/hwpack"], []),
- ["platform", "hardware", "date", "build"],
- "snapshot_hwpacks",
- ["platform", "hardware", "date", "build"])
-
- crawler.crawl()
- crawler.clean_removed_urls_from_db()
- crawler.dump()
- crawler.close_and_bzip2()
=== removed file 'linaro_image_tools/fetch_image.py'
@@ -1,1607 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2010, 2011 Linaro
-#
-# Author: James Tunnicliffe <james.tunnicliffe@linaro.org>
-#
-# This file is part of Linaro Image Tools.
-#
-# Linaro Image Tools is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# Linaro Image Tools is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Linaro Image Tools; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-import os
-import sys
-import re
-import urllib2
-import argparse
-import sqlite3
-import yaml
-import urlparse
-import logging
-import bz2
-import time
-import shutil
-import datetime
-import threading
-import subprocess
-import utils
-import xdg.BaseDirectory as xdgBaseDir
-
-
-QEMU = "qemu"
-HARDWARE = "hardware"
-
-class DownloadManager():
- def __init__(self, cachedir):
- self.cachedir = cachedir
- self.image_url = None
- self.hwpack_url = None
- self.settings = None
- self.event_queue = None
- self.to_download = None
- self.sha1_files = None
- self.downloaded_files = None
- self.sig_files = None
- self.verified_files = None
- self.gpg_sig_ok = None
- self.have_sha1sums = None
- self.have_gpg_sigs = None
-
- def name_and_path_from_url(self, url):
- """Return the file name and the path at which the file will be stored
- on the local system based on the URL we are downloading from"""
- # Use urlparse to get rid of everything but the host and path
- scheme, host, path, params, query, fragment = urlparse.urlparse(url)
-
- url_search = re.search(r"^(.*)/(.*?)$", host + path)
- assert url_search, "URL in unexpectd format" + host + path
-
- # Everything in url_search.group(1) should be directories and the
- # server name and url_search.group(2) the file name
- file_path = self.cachedir + os.sep + url_search.group(1)
- file_name = url_search.group(2)
-
- return file_name, file_path
-
- def urllib2_open(self, url):
- maxtries = 10
- for trycount in range(0, maxtries):
- try:
- response = urllib2.urlopen(url)
- except:
- if trycount < maxtries - 1:
- print "Unable to connect to", url, "retrying in 5 seconds."
- time.sleep(5)
- continue
- else:
- print "Connect failed:", url
- raise
- return None
- else:
- return response
-
- return None
-
- def list_files_in_dir_of_url(self, url):
- """
- return a directory listing of the directory that url sits in
- """
-
- import BeautifulSoup
-
- url = os.path.dirname(url)
- response = self.urllib2_open(url)
- page = response.read()
-
- dir = []
- # Use BeautifulSoup to parse the HTML and iterate over all 'a' tags
- soup = BeautifulSoup.BeautifulSoup(page)
- for link in soup.findAll('a'):
- for attr, value in link.attrs:
- if( attr == "href"
- and not re.search(r'[\?=;/]', value)):
- # Ignore links that don't look like plain files
- dir.append('/'.join([url, value]))
-
- return dir
-
- def sha1sum_file_download_list(self):
- """
- Need more than just the hwpack and OS image if we want signature
- verification to work, we need sha1sums, signatures for sha1sums
- and manifest files.
-
- Note that this code is a bit sloppy and may result in downloading
- more sums than are strictly required, but the
- files are small and the wrong files won't be used.
- """
-
- downloads_list = []
-
- # Get list of files in OS image directory
- image_dir = self.list_files_in_dir_of_url(self.image_url)
- hwpack_dir = self.list_files_in_dir_of_url(self.hwpack_url)
-
- for link in image_dir:
- if re.search("sha1sums\.txt$", link):
- downloads_list.append(link)
-
- for link in hwpack_dir:
- if( re.search(self.settings['hwpack'], link)
- and re.search("sha1sums\.txt$", link)):
- downloads_list.append(link)
-
- return downloads_list
-
- def generate_download_list(self):
- """
- Generate a list of files based on what is in the sums files that
- we have downloaded. We may have downloaded some sig files based on
- a rather sloppy file name match that we don't want to use. For the
- hwpack sig files, check to see if the hwpack listed matches the hwpack
- URL. If it doesn't ignore it.
-
- 1. Download sig file(s) that match the hardware spec (done by this
- point).
- 2. Find which sig file really matches the hardware pack we have
- downloaded. (this function calculates this list)
- 3. Download all the files listed in the sig file (done by another func)
-
- We go through this process because sometimes a directory will have
- more than one hardware pack that will match the hardware pack name,
- for example panda and panda-x11 will both match "panda". These checks
- make sure we only try and validate the signatures of the files that
- we should be downloading and not try and validatate a signature of a
- file that there is no reason for us to download, which would result in
- an an invalid warning about installing unsigned packages when running
- linaro-media-create.
- """
-
- downloads_list = [self.image_url, self.hwpack_url]
-
- for sha1sum_file_name in self.sha1_files.values():
- sha1sum_file = open(sha1sum_file_name)
-
- common_prefix = None
-
- files = []
- for line in sha1sum_file:
- line = line.split() # line[1] is now the file name
-
- if line[1] == os.path.basename(self.image_url):
- # Found a line that matches an image or hwpack URL - keep
- # the contents of this sig file
- common_prefix = os.path.dirname(self.image_url)
-
- if line[1] == os.path.basename(self.hwpack_url):
- # Found a line that matches an image or hwpack URL - keep
- # the contents of this sig file
- common_prefix = os.path.dirname(self.hwpack_url)
-
- if common_prefix:
- for file_name in files:
- downloads_list.append('/'.join([common_prefix,
- file_name]))
-
- dir = self.list_files_in_dir_of_url(common_prefix + '/')
-
- # We include the sha1sum files that pointed to the files that
- # we are going to download so the full file list can be parsed
- # later. The files won't be re-downloaded because they will be
- # cached.
- file_name = os.path.basename(sha1sum_file_name)
- downloads_list.append('/'.join([common_prefix, file_name]))
- signed_sums = os.path.basename(sha1sum_file_name) + ".asc"
-
- for link in dir:
- if re.search(signed_sums, link):
- downloads_list.append(link)
-
- sha1sum_file.close()
-
- return downloads_list
-
- def download_files(self,
- downloads_list,
- settings,
- event_queue=None,
- force_download=False):
- """
- Download files specified in the downloads_list, which is a list of
- url, name tuples.
- """
-
- force_download = settings["force_download"] or force_download
-
- downloaded_files = {}
-
- bytes_to_download = 0
-
- for url in downloads_list:
- file_name, file_path = self.name_and_path_from_url(url)
-
- file_name = file_path + os.sep + file_name
- if os.path.exists(file_name) and not force_download:
- continue # If file already exists, don't download it
-
- response = self.urllib2_open(url)
- if response:
- bytes_to_download += int(response.info()
- .getheader('Content-Length').strip())
- response.close()
-
- if event_queue:
- event_queue.put(("start", "download"))
- event_queue.put(("update",
- "download",
- "total bytes",
- bytes_to_download))
-
- for url in downloads_list:
- path = None
- try:
- path = self.download(url,
- event_queue,
- force_download)
- except Exception:
- # Download error. Hardly matters what, we can't continue.
- print "Unexpected error:", sys.exc_info()[0]
- logging.error("Unable to download " + url + " - aborting.")
-
- if path == None: # User hit cancel when downloading
- sys.exit(0)
-
- downloaded_files[url] = path
- logging.info("Have downloaded {0} to {1}".format(url, path))
-
- if event_queue:
- event_queue.put(("end", "download"))
-
- return downloaded_files
-
- def download(self,
- url,
- event_queue,
- force_download=False):
- """Downloads the file requested buy URL to the local cache and returns
- the full path to the downloaded file"""
-
- file_name, file_path = self.name_and_path_from_url(url)
- file_name = file_path + os.sep + file_name
-
- if not os.path.isdir(file_path):
- os.makedirs(file_path)
-
- if force_download != True and os.path.exists(file_name):
- logging.info(file_name + " already cached. Not downloading (use "
- "--force-download to override).")
- return file_name
-
- logging.info("Fetching " + url)
-
- response = self.urllib2_open(url)
-
- self.do_download = True
- file_out = open(file_name, 'w')
- download_size_in_bytes = int(response.info()
- .getheader('Content-Length').strip())
- chunks_downloaded = 0
-
- show_progress = download_size_in_bytes > 1024 * 200
-
- if show_progress:
- chunk_size = download_size_in_bytes / 1000
- if not event_queue:
- print "Fetching", url
- else:
- chunk_size = download_size_in_bytes
-
- printed_progress = False
- while self.do_download:
- chunk = response.read(chunk_size)
- if len(chunk):
- # Print a % download complete so we don't get too bored
- if show_progress:
- if event_queue:
- event_queue.put(("update",
- "download",
- "progress",
- len(chunk)))
- else:
- # Have 1000 chunks so div by 10 to get %...
- sys.stdout.write("\r%d%%" % (chunks_downloaded / 10))
- printed_progress = True
- sys.stdout.flush()
-
- file_out.write(chunk)
- chunks_downloaded += 1
-
- else:
- if printed_progress:
- print ""
- break
-
- file_out.close()
-
- if self.do_download == False:
- os.remove(file_name)
- return None
-
- return file_name
-
- def download_if_old(self, url, event_queue, force_download):
- file_name, file_path = self.name_and_path_from_url(url)
-
- file_path_and_name = file_path + os.sep + file_name
-
- if(not os.path.isdir(file_path)):
- os.makedirs(file_path)
- try:
- force_download = (force_download == True
- or ( time.mktime(time.localtime())
- - os.path.getmtime(file_path_and_name)
- > 60 * 60))
- except OSError:
- force_download = True # File not found...
-
- return self.download(url, event_queue, force_download)
-
- def get_sig_files(self):
- """
- Find sha1sum.txt files in downloaded_files, append ".asc" and return
- list.
-
- The reason for not just searching for .asc files is because if they
- don't exist, utils.verify_file_integrity(sig_files) won't check the
- sha1sums that do exist, and they are useful to us. Trying to check
- a GPG signature that doesn't exist just results in the signature check
- failing, which is correct and we act accordingly.
- """
-
- self.sig_files = []
-
- for filename in self.downloaded_files.values():
- if re.search(r"sha1sums\.txt$", filename):
- self.sig_files.append(filename + ".asc")
-
- def _download_sigs_gen_download_list(self, force_download=False):
-
- to_download = self.sha1sum_file_download_list()
- self.sha1_files = self.download_files(
- to_download, self.settings,
- force_download=force_download)
-
- self.to_download = self.generate_download_list()
-
- gpg_urls = [f for f in self.to_download if re.search('\.asc$', f)]
- self.download_files(
- gpg_urls, self.settings, force_download=force_download)
-
- def _check_downloads(self):
- self.get_sig_files()
-
- (self.verified_files, self.gpg_sig_ok,
- self.gpg_out) = utils.verify_file_integrity(self.sig_files)
-
- # Expect to have 2 sha1sum files (one for hwpack, one for OS bin)
- self.have_sha1sums = len(self.sha1_files) ==2
-
- # We expect 2 GPG signatures. Check that we have both of them.
-
- self.have_gpg_sigs = (len(self.sig_files) == 2 and
- os.path.exists(self.sig_files[0]) and
- os.path.exists(self.sig_files[1]))
-
- def _unverified_files(self):
- """
- Return a list of URLs of files that didn't get verified
- """
- verified_files = self.verified_files
-
- unverified_files = []
- for sig_file in self.sig_files:
- name = os.path.basename(sig_file)
- verified_files.append(name)
- verified_files.append(name[0:-len('.asc')])
-
- # Generate a list of files that didn't verify
- for url in self.to_download:
- url_file = os.path.basename(url)
- if url_file not in verified_files:
- unverified_files.append(url)
-
- return unverified_files
-
- def download_files_and_verify(self, image_url, hwpack_url,
- settings, event_queue=None):
-
- self.image_url = image_url
- self.hwpack_url = hwpack_url
- self.settings = settings
- self.event_queue = event_queue
-
- self._download_sigs_gen_download_list()
-
- self.downloaded_files = self.download_files(self.to_download,
- self.settings,
- self.event_queue)
-
- self._check_downloads()
-
- if(self.have_sha1sums and self.have_gpg_sigs
- and not self.gpg_sig_ok):
- # GPG signatures failed for sha1sum files. Re-download the
- # sha1sums and GPG signatues. If the GPG signature then
- # matches the sha1sums we will re-download any failing hwpack
- # and OS binary files in the if below.
-
- no_pubkey_search = re.search("\[GNUPG:\] NO_PUBKEY (\S+)",
- self.gpg_out)
- if no_pubkey_search:
- message = ("Package signature check failed.\n"
- "To check package signatures, please import "
- "key {0}")
- # The GPG output we are using gives us the long key format,
- # which doesn't match anything in the key management app
- # that ships with Ubuntu Desktop. The last 8 digits though
- # are the short key, which are what we normally deal with.
- # That is, this seems to be the case. I haven't found any
- # answers after searching around about the long keyID format,
- # but this works for keys I have tested with...
- message = message.format(no_pubkey_search.group(1)[-8:])
- if self.event_queue:
- self.event_queue.put(("message", message))
- else:
- print >> sys.stderr, message
-
- else:
- self._download_sigs_gen_download_list(force_download=True)
- self._check_downloads()
-
- if(self.have_sha1sums and self.have_gpg_sigs
- and not self.gpg_sig_ok):
- # If after re-trying the downloads we still can't get a GPG
- # signature match on a sha1sum file (and both files exist)
- # tell the user.
- message = "Package signature check failed"
- if self.event_queue:
- self.event_queue.put(("message", message))
- self.event_queue.put("abort")
- else:
- print >> sys.stderr, message
-
- return [], False
-
- if(self.have_sha1sums and
- self.gpg_sig_ok or not self.have_gpg_sigs):
- # The GPG signature is OK, so the sha1sums file and GPG sig are
- # both good. OR We have sha1sum files and no GPG sigs. We can
- # Still validate downloads against the sha1sums, just not mark
- # the packages as signed.
-
- to_retry = self._unverified_files()
-
- if len(to_retry):
- if self.event_queue:
- self.event_queue.put(("message", "Retrying bad files"))
- else:
- print "Re-downloading corrupt files"
- # There are some files to re-download
- self.download_files(to_retry,
- self.settings,
- self.event_queue,
- force_download=True)
-
- (self.verified_files, self.gpg_sig_ok,
- self.gpg_out) = utils.verify_file_integrity(self.sig_files)
-
- to_retry = self._unverified_files()
-
- if len(to_retry):
- # We can't get valid package downloads. Either the sha1sums
- # file was incorrectly generated or the download is
- # corrupt. Display a message to the user and quit.
- message = "Download retry failed. Aborting"
- if self.event_queue:
- self.event_queue.put(("message", message))
- self.event_queue.put("abort")
- else:
- print >> sys.stderr, message
-
- return [], False
-
- hwpack = os.path.basename(self.downloaded_files[hwpack_url])
- hwpack_verified = (hwpack in self.verified_files) and self.gpg_sig_ok
-
- return self.downloaded_files, hwpack_verified
-
-
-class FileHandler():
- """Downloads files and creates images from them by calling
- linaro-media-create"""
- def __init__(self):
- self.datadir = os.path.join(xdgBaseDir.xdg_data_home,
- "linaro",
- "image-tools",
- "fetch_image")
-
- self.cachedir = os.path.join(xdgBaseDir.xdg_cache_home,
- "linaro",
- "image-tools",
- "fetch_image")
-
- self.downloader = DownloadManager(self.cachedir)
-
- def has_key_and_evaluates_True(self, dictionary, key):
- return bool(key in dictionary and dictionary[key])
-
- def append_setting_to(self, list, dictionary, key, setting_name=None):
- if not setting_name:
- setting_name = "--" + key
-
- if self.has_key_and_evaluates_True(dictionary, key):
- list.append(setting_name)
- list.append(dictionary[key])
-
- def build_lmc_command(self,
- binary_file,
- hwpack_file,
- settings,
- tools_dir,
- hwpack_verified,
- run_in_gui=False):
-
- import linaro_image_tools.utils
-
- args = ["pkexec"]
-
- # Prefer a local linaro-media-create (from a bzr checkout for instance)
- # to an installed one
- lmc_command = linaro_image_tools.utils.find_command(
- 'linaro-media-create',
- tools_dir)
-
- if lmc_command:
- args.append(os.path.abspath(lmc_command))
- else:
- args.append("linaro-media-create")
-
- if run_in_gui:
- args.append("--nocheck-mmc")
-
- if hwpack_verified:
- # We verify the hwpack before calling linaro-media-create because
- # these tools run linaro-media-create as root and to get the GPG
- # signature verification to work, root would need to have GPG set
- # up with the Canonical Linaro Image Build Automatic Signing Key
- # imported. It seems far more likely that users will import keys
- # as themselves, not root!
- args.append("--hwpack-force-yes")
-
- if 'rootfs' in settings and settings['rootfs']:
- args.append("--rootfs")
- args.append(settings['rootfs'])
-
- assert(self.has_key_and_evaluates_True(settings, 'image_file') ^
- self.has_key_and_evaluates_True(settings, 'mmc')), ("Please "
- "specify either an image file, or an mmc target, not both.")
-
- self.append_setting_to(args, settings, 'mmc')
- self.append_setting_to(args, settings, 'image_file', '--image-file')
- self.append_setting_to(args, settings, 'image_size', '--image-size')
- self.append_setting_to(args, settings, 'swap_size', '--swap-size')
- self.append_setting_to(args, settings, 'swap_file', '--swap-file')
- self.append_setting_to(args, settings, 'yes_to_mmc_selection',
- '--nocheck-mmc')
-
- args.append("--dev")
- args.append(settings['hardware'])
- args.append("--binary")
- args.append(binary_file)
- args.append("--hwpack")
- args.append(hwpack_file)
-
- if not os.path.isdir(self.datadir):
- os.makedirs(self.datadir)
- with open(os.path.join(self.datadir, "linaro-media-create.log"),
- mode='w') as lmc_log:
- lmc_log.write(" ".join(args) + "\n")
-
- logging.info(" ".join(args))
-
- return args
-
- def create_media(self, image_url, hwpack_url, settings, tools_dir):
- """Create a command line for linaro-media-create based on the settings
- provided then run linaro-media-create, either in a separate thread
- (GUI mode) or in the current one (CLI mode)."""
-
- downloaded_files, hwpack_verified = (
- self.downloader.download_files_and_verify(image_url, hwpack_url,
- settings))
-
- if len(downloaded_files) == 0:
- return
-
- lmc_command = self.build_lmc_command(downloaded_files[image_url],
- downloaded_files[hwpack_url],
- settings,
- tools_dir,
- hwpack_verified)
-
- self.create_process = subprocess.Popen(lmc_command)
- self.create_process.wait()
-
- class LinaroMediaCreate(threading.Thread):
- """Thread class for running linaro-media-create"""
- def __init__(self,
- image_url,
- hwpack_url,
- file_handler,
- event_queue,
- settings,
- tools_dir):
-
- threading.Thread.__init__(self)
-
- self.image_url = image_url
- self.hwpack_url = hwpack_url
- self.file_handler = file_handler
- self.downloader = file_handler.downloader
- self.event_queue = event_queue
- self.settings = settings
- self.tools_dir = tools_dir
-
- self.datadir = os.path.join(xdgBaseDir.xdg_data_home,
- "linaro",
- "image-tools",
- "fetch_image")
-
- def run(self):
- """
- 1. Download required files.
- 2. Build linaro-media-create command
- 3. Start linaro-media-create and look for lines in the output that:
- 1. Tell us that an event has happened that we can use to update
- the UI progress.
- 2. Tell us that linaro-media-create is asking a question that
- needs to be re-directed to the GUI
- """
-
- self.event_queue.put(("update",
- "download",
- "message",
- "Downloading"))
-
- files, hwpack_ok = self.downloader.download_files_and_verify(
- self.image_url, self.hwpack_url,
- self.settings, self.event_queue)
-
- if len(files) == 0:
- return
-
- lmc_command = self.file_handler.build_lmc_command(
- files[self.image_url],
- files[self.hwpack_url],
- self.settings,
- self.tools_dir,
- hwpack_ok,
- True)
-
- self.create_process = subprocess.Popen(lmc_command,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
-
- self.line = ""
- self.saved_lines = ""
- self.save_lines = False
- self.state = None
- self.waiting_for_event_response = False
- self.event_queue.put(("start", "unpack"))
-
- lmc_log = open(os.path.join(self.datadir,
- "linaro-media-create.log"), mode='a')
-
- while(1):
- if self.create_process.poll() != None:
- # linaro-media-create has finished. Tell the GUI the return
- # code so it can report pass/fail.
- if self.create_process.returncode:
- self.event_queue.put("abort")
- else:
- self.event_queue.put("terminate")
- return
-
- self.input = self.create_process.stdout.read(1)
-
- # We build up lines by extracting data from linaro-media-create
- # a single character at a time. This is because if we fetch
- # whole lines, using Popen.communicate a character is
- # automatically sent back to the running process, which if the
- # process is waiting for user input, can trigger a default
- # action. By using stdout.read(1) we pick off a character at a
- # time and avoid this problem.
- if self.input == '\n':
- if self.save_lines:
- self.saved_lines += self.line
-
- lmc_log.write(self.line + "\n")
- lmc_log.flush()
- self.line = ""
- else:
- self.line += self.input
-
- if self.line == "Updating apt package lists ...":
- self.event_queue.put(("end", "unpack"))
- self.event_queue.put(("start", "installing packages"))
- self.state = "apt"
-
- elif(self.line ==
- "WARNING: The following packages cannot be authenticated!"):
- self.saved_lines = ""
- self.save_lines = True
-
- elif(self.line ==
- "Install these packages without verification [y/N]? "):
- self.saved_lines = re.sub("WARNING: The following packages"
- " cannot be authenticated!",
- "", self.saved_lines)
- self.event_queue.put(("start",
- "unverified_packages:"
- + self.saved_lines))
- self.line = ""
- self.waiting_for_event_response = True # Wait for restart
-
- elif self.line == "Do you want to continue [Y/n]? ":
- self.send_to_create_process("y")
- self.line = ""
-
- elif self.line == "Done" and self.state == "apt":
- self.state = "create file system"
- self.event_queue.put(("end", "installing packages"))
- self.event_queue.put(("start", "create file system"))
-
- elif( self.line == "Created:"
- and self.state == "create file system"):
- self.event_queue.put(("end", "create file system"))
- self.event_queue.put(("start", "populate file system"))
- self.state = "populate file system"
-
- while self.waiting_for_event_response:
- time.sleep(0.2)
-
- lmc_log.close()
-
- def send_to_create_process(self, text):
- print >> self.create_process.stdin, text
- self.waiting_for_event_response = False
-
-
- def update_files_from_server(self, force_download=False,
- event_queue=None):
-
- settings_url = "http://releases.linaro.org/fetch_image/fetch_image_settings.yaml"
- server_index_url = "http://releases.linaro.org/fetch_image/server_index.bz2"
-
- self.settings_file = self.downloader.download_if_old(settings_url,
- event_queue,
- force_download)
-
- self.index_file = self.downloader.download_if_old(server_index_url,
- event_queue,
- force_download)
-
- zip_search = re.search(r"^(.*)\.bz2$", self.index_file)
-
- if zip_search:
- # index file is compressed, unzip it
- zipped = bz2.BZ2File(self.index_file)
- unzipped = open(zip_search.group(1), "w")
-
- unzipped.write(zipped.read())
-
- zipped.close()
- unzipped.close()
-
- self.index_file = zip_search.group(1)
-
- def clean_cache(self):
- shutil.rmtree(self.cachedir)
-
-
-class FetchImageConfig():
- """Reads settings from the settings YAML file as well as those from the
- command line providing a central settings repository."""
-
- def __init__(self):
- self.settings = {}
-
- def read_config(self, file_name):
- try:
- f = open(file_name, "r")
-
- if(f):
- self.settings = dict(self.settings.items() +
- yaml.load(f.read()).items())
-
- except IOError:
- print "Unable to read settings file %s", file_name
- logging.error("Unable to read settings file %s", file_name)
- sys.exit(0)
-
- self.settings['write_to_file_or_device'] = "file"
-
- # At this point, assume settings have been loaded.
- # We need some reverse mappings set up
- self.settings['UI']['reverse-descriptions'] = {}
- for (key, value) in self.settings['UI']['descriptions'].items():
- if isinstance(value, basestring):
- value = re.sub('LEB:\s*', '', value)
- self.settings['UI']['reverse-descriptions'][value] = key
-
- # If an item doesn't have a translation, just add in a null one
- if not self.settings['UI']['translate']:
- self.settings['UI']['translate'] = {}
-
- for key, value in self.settings['choice']['platform'].items():
- if not key in self.settings['UI']['translate']:
- self.settings['UI']['translate'][key] = key
-
- self.settings['UI']['reverse-translate'] = {}
- for (key, value) in self.settings['UI']['translate'].items():
- self.settings['UI']['reverse-translate'][value] = key
-
- def parse_args(self, args):
- parser = argparse.ArgumentParser(description="Create a board image, "
- "first downloading any required "
- "files.")
-
- for (key, value) in self.settings['choice'].items():
- parser.add_argument(
- "-" + self.settings['UI']['cmdline short names'][key],
- "--" + key,
- help=self.settings['UI']['descriptions']['choice'][key],
- required=True)
-
- parser.add_argument("-x", "--clean-cache",
- help="Delete all cached downloads",
- action='store_true')
- parser.add_argument("-d", "--force-download",
- help="Force re-downloading of cached files",
- action='store_true')
- parser.add_argument(
- "-t", "--image-file",
- help="Where to write image file to (use this XOR mmc)")
- parser.add_argument(
- "-m", "--mmc",
- help="What disk to write image to (use this XOR image-file)")
- parser.add_argument("--swap-file",
- help="Swap file size for created image")
- parser.add_argument("--image-size",
- help="Image file size for created image")
- parser.add_argument("--rootfs",
- help="Root file system type for created image")
-
- self.args = vars(parser.parse_args(args))
-
-
-class DB():
- """Interacts with the database storing URLs of files that are available
- for download to create images with. Provides functions for both creation
- and reading."""
-
- def __init__(self, index_name):
- # http://www.sqlite.org/lang_transaction.html - defer acquiring a locK
- # until it is required.
- self.db_file_name = index_name
- self.database = sqlite3.connect(index_name, isolation_level="DEFERRED")
- self.c = self.database.cursor()
- self.touched_urls = {}
-
- def close(self):
- self.database.close()
- self.database = None
-
- def set_url_parse_info(self, url_parse):
- self.url_parse = url_parse
-
- def record_url(self, url, index):
- """Check to see if the record exists in the index, if not, add it"""
-
- assert self.url_parse[index]["base_url"], ("Can not match the "
- "URL received (%s) to an entry provided by add_url_parse_list",
- url)
- assert re.search('^' + self.url_parse[index]["base_url"], url), (
- "Base url is not part of the url to record.")
-
- logging.info("Recording URL %s %d", url, index)
-
- assert url not in self.touched_urls, ("URLs expected to only be added "
- "to 1 place\n" + url)
-
- self.touched_urls[url] = True
- table = self.url_parse[index]["table"]
-
- # Do not add the record if it already exists
- self.c.execute("select url from " + table + " where url == ?", (url,))
- if self.c.fetchone():
- return
-
- url_match = re.search(self.url_parse[index]["base_url"] + r"(.*)$",
- url)
- url_chunks = url_match.group(1).lstrip('/').encode('ascii').split('/')
- # url_chunks now contains all parts of the url, split on /,
- # not including the base URL
-
- # We now construct an SQL command to insert the index data into the
- # database using the information we have.
-
- sqlcmd = "INSERT INTO " + table + " ("
- length = 0
- for name in (self.url_parse[index]["url_chunks"] + ["url"]):
- if name != "":
- if isinstance(name, tuple):
- name = name[0]
- sqlcmd += name + ", "
- length += 1
-
- # Handle fixed value columns
- for name in self.url_parse[index]["db_columns"]:
- name_search = re.search("(\w+)=(.*)", name)
- if name_search:
- sqlcmd += name_search.group(1) + ", "
- length += 1
-
- sqlcmd = sqlcmd.rstrip(", ") # get rid of unwanted space & comma
- sqlcmd += ") VALUES ("
-
- # Add the appropriate number of ?s (+1 is so we have room for url)
- sqlcmd += "".join(["?, " for x in range(length)])
- sqlcmd = sqlcmd.rstrip(", ") # get rid of unwanted space and comma
- sqlcmd += ")"
-
- # Get the parameters from the URL to record in the SQL database
- sqlparams = []
- chunk_index = 0
- for name in self.url_parse[index]["url_chunks"]:
- # If this part of the URL isn't a parameter, don't insert it
- if name != "":
- # If the entry is a tuple, it indicates it is of the form
- # name, regexp
- if isinstance(name, tuple):
- # use stored regexp to extract data for the database
- match = re.search(name[1], url_chunks[chunk_index])
- assert match, ("Unable to match regexp to string ",
- + url_chunks[chunk_index] + " " + name[1])
- sqlparams.append(match.group(1))
-
- else:
- sqlparams.append(url_chunks[chunk_index])
-
- chunk_index += 1
-
- sqlparams.append(url)
-
- # Handle fixed value columns
- for name in self.url_parse[index]["db_columns"]:
- name_search = re.search("(\w+)=(.*)", name)
- if name_search:
- sqlparams.append(name_search.group(2))
-
- logging.info("{0}: {1}".format(sqlcmd, sqlparams))
- self.c.execute(sqlcmd, tuple(sqlparams))
-
- def commit(self):
- self.database.commit()
-
- def __del__(self):
- if(self.database):
- self.commit()
- self.database.close()
-
- def create_table_with_name_columns(self, table, items):
- cmd = "create table if not exists "
- cmd += table + " ("
-
- # Handle fixed items (kept in because a field can no longer be derived)
- for item in items:
- item = re.sub("=.*", "", item)
- cmd += item + " TEXT, "
-
- cmd += "url TEXT)"
-
- self.execute(cmd)
-
- def execute(self, cmd, params=None):
- self.c = self.database.cursor()
- logging.info(cmd)
- if(params):
- logging.info(params)
- self.c.execute(cmd, params)
- else:
- self.c.execute(cmd)
-
- def execute_return_list(self, cmd, params=None):
- self.execute(cmd, params)
- list = []
- item = self.c.fetchone()
- while item:
- list.append(item)
- item = self.c.fetchone()
-
- return list
-
- def delete_by_url(self, table, url):
- self.execute("delete from " + table + " where url == ?", (url,))
-
- def clean_removed_urls_from_db(self):
- self.c = self.database.cursor()
-
- for info in self.url_parse:
- table = info["table"]
-
- self.c.execute("select url from " + table)
- to_delete = []
-
- while(1):
- url = self.c.fetchone()
- if(url == None):
- break
-
- if(url[0] not in self.touched_urls):
- to_delete.append(url[0])
-
- # We can't delete an item from the database while iterating on the
- # results of a previous query, so we store the URLs of entries to
- # delete, and batch them up
- for url in to_delete:
- self.delete_by_url(table, url)
-
- self.commit()
-
- def get_url(self, table, key_value_pairs, sql_extras=None):
- """Return the first matching URL from the specified table building up a
- SQL query based on the values inkey_value_pairs creating key == value
- sections of the query. sql_extras is appended to the created SQL query,
- so we can get the first entry in an ordered list for instance"""
- self.c = self.database.cursor()
-
- cmd = "select url from " + table + " where "
- params = []
-
- first = True
- for key, value in key_value_pairs:
- if(first == False):
- cmd += " AND "
- else:
- first = False
-
- cmd += key + " == ? "
- params.append(value)
-
- if(sql_extras):
- cmd += " " + sql_extras
-
- self.c.execute(cmd, tuple(params))
- url = self.c.fetchone()
-
- return url
-
- def get_platforms(self, table):
- self.execute("select distinct platform from ?", table)
-
- platforms = []
- platform = self.c.fetchone()
-
- while(platform):
- platforms.append(platform)
- platform = self.c.fetchone()
-
- return platforms
-
- def get_builds(self, platform, image=None):
- """return a complete list of builds available from the releases
- repository"""
-
- build_tuples = self.execute_return_list(
- 'select distinct build from release_binaries '
- 'where platform == "' + platform + '"')
- hwpack_build_tuples = self.execute_return_list(
- 'select distinct build from release_hwpacks'
- ' where platform == "' + platform + '"')
-
- # Need to also check hwpacks and image -> some builds are only
- # Available for a particular image (OS). Guess this makes sense as OS
- # customisations may come and go.
-
- image_builds = [build[0] for build in build_tuples]
- hwpack_builds = [build[0] for build in hwpack_build_tuples]
-
- builds = []
- for build in image_builds:
- if build in hwpack_builds:
- # Only use a build if it exists for the hardware pack as well
- # as the image and platform chosen
- builds.append(build)
-
- builds.sort()
-
- # Just so final is always the last item, if it exists...
- if "final" in builds:
- builds.remove("final")
- builds.append("final")
-
- return builds
-
- def get_hwpacks(self, table, platform=None):
- """Return a list of all the hardware packs available in the specified
- table"""
-
- query = 'select distinct hardware from ' + table
- if platform:
- query += ' where platform == "' + platform + '"'
-
- results = self.execute_return_list(query)
-
- hwpacks = [item[0] for item in results]
- return hwpacks
-
- def get_dates_and_builds_of_snapshots(self):
- """Return all the dates that match an an OS and hardware pack build"""
- # First get all dates from hwpack table
- self.execute("select distinct date from snapshot_hwpacks "
- "order by date")
-
- date_with_hwpack = {}
- date = self.c.fetchone()
-
- while date:
- date_with_hwpack[date] = 1
- date = self.c.fetchone()
-
- # Now make sure there is a file system image for that date as well
- self.execute("select distinct date from snapshot_binaries "
- "order by date")
-
- date = self.c.fetchone()
- while date:
- if date in date_with_hwpack:
- date_with_hwpack[date] = 2
- date = self.c.fetchone()
-
- # Now anything with both a hwpack and a file system has a value of 2
- # recorded in date_with_hwpack[date]
- date_with_build = {}
-
- for key, date in date_with_hwpack.items():
- if(key == 2):
- date_with_build[date] = True
-
- return date_with_build
-
- def get_hardware_from_db(self, table):
- """Get a list of hardware available from the given table"""
- self.execute("select distinct hardware from " + table +
- " order by hardware")
-
- hardware = []
- hw = self.c.fetchone()
- while hw:
- hardware.append(hw[0])
- hw = self.c.fetchone()
-
- return hardware
-
- def image_hardware_combo_available(self,
- release_or_snapshot,
- image,
- hwpacks):
- """Try and find a matching plaform, build pair for both the provided
- image and one of the hwpacks in the list."""
- binary_table = release_or_snapshot + "_binaries"
- hwpack_table = release_or_snapshot + "_hwpacks"
-
- binary_list = self.execute_return_list(
- 'select distinct platform, build from '
- + binary_table + ' where image == ?', (image,))
-
- for hwpack in hwpacks:
- hwpack_list = self.execute_return_list(
- 'select distinct platform, build from '
- + hwpack_table +
- ' where hardware == ?', (hwpack,))
-
- for item in binary_list:
- if item in hwpack_list:
- return True
-
- return False
-
- def hardware_is_available_in_table(self, table, hardware):
- return len(self.execute_return_list(
- 'select url from ' + table +
- ' where hardware == ?', (hardware,))) > 0
-
- def hardware_is_available_for_platform(self,
- hardware_list,
- platform,
- table="release_hwpacks"):
-
- for hardware in self.execute_return_list(
- 'select distinct hardware from ' + table +
- ' where platform == ?', (platform,)):
- if hardware[0] in hardware_list:
- return True
-
- return False
-
- def get_available_hwpacks_for_hardware_build_plaform(self,
- hardware_list,
- platform,
- build):
- hwpacks = []
- for hardware in self.execute_return_list(
- 'select distinct hardware from release_hwpacks '
- 'where platform == ? and build == ?',
- (platform, build)):
-
- if hardware[0] in hardware_list:
- hwpacks.append(hardware[0])
-
- return hwpacks
-
- def image_is_available_for_platform(self,
- table,
- platform,
- image):
-
- return len(self.execute_return_list(
- 'select * from ' + table +
- ' where platform == ? and image == ?',
- (platform, image))) > 0
-
- def hardware_is_available_for_platform_build(self,
- hardware_list,
- platform,
- build):
-
- for hardware in self.execute_return_list(
- 'select distinct hardware from release_hwpacks '
- 'where platform == ? and build == ?',
- (platform, build)):
-
- if hardware[0] in hardware_list:
- return True
-
- return False
-
- def build_is_available_for_platform_image(self,
- table,
- platform,
- image,
- build):
-
- return len(self.execute_return_list(
- 'select * from ' + table +
- ' where platform == ? and image == ? and build == ?',
- (platform, image, build))) > 0
-
- def get_available_hwpacks_for_hardware_snapshot_build(self,
- hardware_list,
- platform,
- date,
- build):
- hwpacks = []
- for hardware in self.execute_return_list(
- 'select distinct hardware from snapshot_hwpacks '
- 'where platform == ? '
- 'and date == ? '
- 'and build == ?',
- (platform, date, build)):
- if hardware[0] in hardware_list:
- hwpacks.append(hardware[0])
-
- return hwpacks
-
- def get_binary_builds_on_day_from_db(self, image, date, hwpacks):
- """Return a list of build numbers that are available on the given date
- for the given image ensuring that the reqiested hardware is also
- available."""
-
- # Remove the dashes from the date we get in (YYYY-MM-DD --> YYYYMMDD)
- date = re.sub('-', '', date)
-
- binaries = self.execute_return_list(
- "select build from snapshot_binaries "
- "where date == ? and image == ?",
- (date, image))
-
- if len(binaries) == 0:
- # Try adding "linaro-" to the beginning of the image name
- binaries = self.execute_return_list(
- "select build from snapshot_binaries "
- "where date == ? and image == ?",
- (date, "linaro-" + image))
-
- for hwpack in hwpacks:
- builds = self.execute_return_list(
- "select build from snapshot_hwpacks "
- "where date == ? and hardware == ?",
- (date, hwpack))
-
- if len(builds):
- # A hardware pack exists for that date, return a list of builds
- # that exist for both hwpack and binary
- ret = []
- for build in builds:
- if build in binaries:
- ret.append(build)
- return ret
-
- # No hardware pack exists for the date requested, return empty table
- return []
-
- def get_next_prev_day_with_builds(self, image, date, hwpacks):
- """Searches forwards and backwards in the database for a date with a
- build. Will look 1 year in each direction. Returns a tuple of dates
- in YYYYMMDD format."""
-
- # Split ISO date into year, month, day
- date_chunks = date.split('-')
-
- current_date = datetime.date(int(date_chunks[0]),
- int(date_chunks[1]),
- int(date_chunks[2]))
-
- one_day = datetime.timedelta(days=1)
-
- # In case of time zone issues we add 1 day to max_search_date
- max_search_date = datetime.date.today() + one_day
-
- day_count = 0
- # Look in the future & past from the given date until we find a day
- # with a build on it
-
- test_date = {'future': current_date,
- 'past': current_date}
-
- for in_the in ["future", "past"]:
- test_date[in_the] = None
-
- if in_the == "future":
- loop_date_increment = one_day
- else:
- loop_date_increment = -one_day
-
- test_date[in_the] = current_date
-
- while test_date[in_the] <= max_search_date:
- test_date[in_the] += loop_date_increment
-
- builds = []
- for hwpack in hwpacks:
- builds = self.get_binary_builds_on_day_from_db(
- image,
- test_date[in_the].isoformat(),
- [hwpack])
- if len(builds):
- break
-
- if len(builds):
- break
-
- day_count += 1
- if day_count > 365:
- test_date[in_the] = None
- break
-
- if test_date[in_the] and test_date[in_the] > max_search_date:
- test_date[in_the] = None
-
- if test_date[in_the]:
- test_date[in_the] = test_date[in_the].isoformat()
-
- return(test_date['future'], test_date['past'])
-
- def get_os_list_from(self, table):
- return [item[0] for item in self.execute_return_list(
- "select distinct image from "
- + table)]
-
- def get_image_and_hwpack_urls(self, args):
- """ We need a platform image and a hardware pack. These are specified
- either as
- Release:
- Image: platform, image, build
- HW Pack: platform, build, hardware
- Snapshot:
- Image: platform, image, date, build
- HW Pack: platform, hardware, date, build"""
- image_url = None
- hwpack_url = None
- old_image = args['image']
- build_bits = None
-
- if(args['release_or_snapshot'] == "snapshot"):
- count = 0
- while(1): # Just so we can try several times...
- if(args['build'] == "latest"):
- # Start from today, add 1 day because
- # get_next_prev_day_with_builds aways searches from the
- # day passed to it, not including that day. This will give
- # us the latest build dated today or earlier with both
- # a hwpack and OS binary
- date = (datetime.date.today() +
- datetime.timedelta(days=1)).isoformat()
-
- _, past_date = self.get_next_prev_day_with_builds(
- args['image'],
- date,
- [args['hwpack']])
-
- builds = self.get_binary_builds_on_day_from_db(
- args['image'], past_date, [args['hwpack']])
-
- # Work out latest build
- build = None
- for b in builds:
- if build == None or b[0] > build:
- build = b[0]
-
- # We store dates in the DB in ISO format with the dashes
- # missing
- date = re.sub('-', '', past_date)
-
- image_url = self.get_url("snapshot_binaries",
- [
- ("image", args['image']),
- ("build", build),
- ("date", date)])
-
- hwpack_url = self.get_url("snapshot_hwpacks",
- [
- ("hardware", args['hwpack']),
- ("build", build),
- ("date", date)])
-
- else:
- build_bits = args['build'].split(":")
- assert re.search("^\d+$", build_bits[0]), (
- "Unexpected date format in build parameter "
- + build_bits[0])
- assert re.search("^\d+$", build_bits[1]), (
- "Build number in shapshot build parameter should "
- "be an integer " + build_bits[1])
-
- image_url = self.get_url("snapshot_binaries",
- [
- ("image", args['image']),
- ("build", build_bits[1]),
- ("date", build_bits[0])])
-
- hwpack_url = self.get_url("snapshot_hwpacks",
- [
- ("hardware", args['hwpack']),
- ("build", build_bits[1]),
- ("date", build_bits[0])])
-
- if(count == 1):
- break
- count += 1
-
- if(image_url == None):
- # If we didn't match anything, try prepending "linaro-"
- # to the image name
- args['image'] = "linaro-" + args['image']
- else:
- break # Got a URL, go.
-
- elif args['release_or_snapshot'] == "release":
- image_url = self.get_url("release_binaries",
- [
- ("image", args['image']),
- ("build", args['build']),
- ("platform", args['platform'])])
-
- hwpack_url = self.get_url("release_hwpacks",
- [
- ("hardware", args['hwpack']),
- ("build", args['build']),
- ("platform", args['platform'])])
-
- else:
- message = "Unexpected args['release_or_snapshot']: {0}".format(
- args['release_or_snapshot'])
- raise AssertionError(message)
-
- if(not image_url):
- # If didn't get an image URL set up something so the return line
- # doesn't crash
- image_url = [None]
-
- if args['release_or_snapshot'] == "snapshot":
- table = "snapshot_binaries"
- else:
- table = "release_binaries"
-
- if not (self.get_url(table, [("image", old_image)]) or
- self.get_url(table, [("image", args['image'])])):
- msg = "{0} does not match any OS indexed".format(old_image)
- logging.error(msg)
-
- if args['release_or_snapshot'] == "snapshot":
- if not self.get_url("snapshot_binaries",
- [("date", build_bits[0])]):
- msg = "Can not find requested OS on date {0}".format(
- build_bits[0])
- logging.error(msg)
-
- elif not self.get_url("snapshot_binaries",
- [("build", build_bits[1])]):
- msg = "Can not find build {0} of requested OS".format(
- build_bits[1])
- logging.error(msg)
-
- else: # Release...
- if not self.get_url("release_binaries",
- [("build", args['build'])]):
- msg = "Can not find build {0} of selected OS".format(
- args['build'])
- logging.error(msg)
-
- if not self.get_url("release_binaries",
- [("platform", args['platform'])]):
- msg = "Can not find OS for platform {0}".format(
- args['platform'])
- logging.error(msg)
-
- if(not hwpack_url):
- # If didn't get a hardware pack URL set up something so the return
- # line doesn't crash
- hwpack_url = [None]
-
- table = None
- if args['release_or_snapshot'] == "snapshot":
- table = "snapshot_hwpacks"
- build = build_bits[1]
-
- elif args['release_or_snapshot'] == "release":
- table = "release_hwpacks"
- build = args['build']
-
- if not self.get_url(table, [("hardware", args['hwpack'])]):
- msg = "{0} does not match any hardware pack indexed".format(
- args['hwpack'])
- logging.error(msg)
-
- if args['release_or_snapshot'] == "snapshot":
- if not self.get_url("snapshot_hwpacks",
- [("date", build_bits[0])]):
- msg = "Hardware pack does not exist on date {0}".format(
- build_bits[0])
- logging.error(msg)
-
- else: # Release...
- if not self.get_url("release_hwpacks",
- [("platform", args['platform'])]):
- msg = "Hardware pack unavailable for platform {0}".format(
- args['platform'])
-
- if not self.get_url(table, [("build", build)]):
- msg = ("Build {0} doesn't exist for any hardware "
- "pack indexed".format(build))
- logging.error(msg)
-
- return(image_url[0], hwpack_url[0])
=== removed file 'linaro_image_tools/fetch_image_settings.yaml'
@@ -1,177 +0,0 @@
-UI:
- descriptions:
- alip: ARM Internet Platform
- choice:
- build: The build (alpha-1 / beta / final) or date of snapshot (YYYYMMDD:build_number or 'latest')
- to fetch.
- hardware: The board you are building the image to run on.
- hwpack: The hardware pack to use when building the image.
- image: A distribuion image, e.g. headless.
- platform: Which Linaro platform to build an image from. Specify 'snapshot' to
- use a snapshot rather than a release.
- developer: Developer Tools
- o-developer: Developer Tools
- graphics: Graphics
- o-graphics: Graphics
- multimedia: Multimedia
- o-multimedia: Multimedia
- nano: Nano
- o-nano: Nano
- ubuntu-desktop: 'LEB: Linaro Ubuntu Desktop'
- o-ubuntu-desktop: 'LEB: Linaro Ubuntu Desktop'
- ubuntu-desktop::long: Linux for humans on low power machines
- ubuntu-desktop::release_note: Shiny!
- o-alip: ARM Linux Internet Platform
- alip: ARM Linux Internet Platform
- server: Server
- o-server: Server
- hwpack-descriptions:
- omap3: Basic support for Beagle boards
- omap3-x11-base: Includes support for 3D acceleration
- overo: Same kernel as omap3.
- panda: Basic support for Panda from linux-linaro-3.0
- panda-x11-base: Basic support for Panda from linux-linaro-3.0 plus support for 3D acceleration
- bsp-omap4: (DEPRECATED) - Used in Maverick release
- lt-panda: TI Landing Team kernel (kernel-tilt.git)
- lt-panda-x11-base-natty: TI Landing Team kernel (kernel-tilt.git) plus support for 3D acceleration
- s5pv310: Support for this has been dropped in order to focus development efforts on Origen
- lt-s5pv310: Support for this has been dropped in order to focus development efforts on Origen
-
- help:
- latest snapshot: Latest Snapshot help text
- release: Release help text
- snapshot: Snapshot help text
- translate:
- cmdline short names:
- build: b
- hardware: w
- platform: a
- image: i
- hwpack: p
-choice:
- build: {}
- hardware:
- beagle: Beagle Board
- igep: IGEPv2
- panda: Panda Board
- vexpress: Versatile Express
- ux500: UX500
- efikamx: EFIKA MX Smarttop
- efikasb: EFIKA MX Smartbook
- mx51evk: i.MX51
- mx53loco: i.MX53 Quick Start
- # I don't have a --dev setting to linaro-media-create for a u8500 yet.
- # u8500: U8500
- overo: Overo
- smdkv310: S5PV310
- origen: Origen
- # UI may need some work to support snowball. Currently under investigation.
- #snowball_sd: Snowball
-
- hwpack:
- beagle:
- - omap3
- - omap3-x11-base
- - omap3-oneiric
- - omap3-x11-base-oneiric
-
- igep:
- - igep
- - igep-oneiric
-
- panda:
- - panda
- - panda-x11-base
- - bsp-omap4
- - lt-panda-x11-base-natty
- - lt-panda
- - panda-oneiric
- - panda-x11-base-oneiric
- - lt-panda-x11-base-oneiric
- - lt-panda-oneiric
-
- vexpress:
- - vexpress
- - vexpress-oneiric
- - lt-vexpress-a9-oneiric
-
- ux500:
- - bsp-ux500
-
- efikamx:
- - efikamx
- - efikamx-oneiric
-
- efikasb:
- - efikamx
- - efikamx-oneiric
-
- mx51evk:
- - imx51
- - imx51-oneiric
-
- mx53loco:
- - lt-mx5
- - lt-mx5-oneiric
-
- u8500:
- - lt-u8500
-
- overo:
- - overo
- - overo-oneiric
-
- smdkv310:
- - lt-s5pv310
- - s5pv310
-
- origen:
- - lt-origen
- - lt-origen-oneiric
-
- snowball_sd:
- - lt-snowball
- - lt-snowball-v2
- - lt-snowball-v3-oneiric
- - lt-snowball-v2-oneiric
-
- image:
- - alip
- - developer
- - headless
- - ubuntu-desktop
- platform:
- linaro-m:
- - final
- linaro-n:
- - alpha-1
- - alpha-2
- - alpha-3
- - beta
- - rc
- - final
-
- platform:
- '11.05':
- - final
- '11.06':
- - final
- '11.07':
- - final
- '11.08':
- - final
- '11.09':
- - final
- '11.10':
- - final
- '11.11':
- - final
- '11.12':
- - final
- '12.01':
- - final
-
-platform: linaro-n
-repository: release
-
-
=== modified file 'linaro_image_tools/tests/__init__.py'
@@ -11,7 +11,6 @@
module_names = [
'linaro_image_tools.tests.test_cmd_runner',
'linaro_image_tools.tests.test_utils',
- 'linaro_image_tools.tests.test_fetch_image',
]
# if pyflakes is installed and we're running from a bzr checkout...
if has_command('pyflakes') and not os.path.isabs(__file__):
=== removed file 'linaro_image_tools/tests/test_fetch_image.py'
@@ -1,196 +0,0 @@
-# Copyright (C) 2010, 2011 Linaro
-#
-# Author: James Tunnicliffe <james.tunnicliffe@linaro.org>
-#
-# This file is part of Linaro Image Tools.
-#
-# Linaro Image Tools is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Linaro Image Tools is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
-from linaro_image_tools.testing import TestCaseWithFixtures
-import re
-import linaro_image_tools.fetch_image as fetch_image
-
-
-class TestURLLookupFunctions(TestCaseWithFixtures):
-
- def setUp(self):
- # We use local files for testing, so get paths sorted.
- this_file = os.path.abspath(__file__)
- this_dir = os.path.dirname(this_file)
- yaml_file_location = os.path.join(this_dir, "../"
- "fetch_image_settings.yaml")
- sample_db_location = os.path.join(this_dir, "test_server_index.sqlite")
- self.file_handler = fetch_image.FileHandler()
- self.config = fetch_image.FetchImageConfig()
- self.config.settings["force_download"] = False
-
- # Load settings YAML, which defines the parameters we ask for and
- # acceptable responses from the user
- self.config.read_config(yaml_file_location)
-
- # Using the config we have, look up URLs to download data from in the
- # server index
- self.db = fetch_image.DB(sample_db_location)
-
- super(TestURLLookupFunctions, self).setUp()
-
- def test_url_lookup(self):
- self.settings = self.config.settings
- self.settings['release_or_snapshot'] = "snapshot"
-
- #--- Test first with a snapshot build lookup ---
- # -- Fix a build date --
- # We only need to look up a single snapshot date. We just use the
- # latest in the database (we could use today and search from it, but
- # the database is just one that is checked in, so it could be old
- # and db.get_next_prev_day_with_builds may give up before finding it).
- date = self.db.execute_return_list(
- "SELECT MAX(date) FROM snapshot_binaries")[0][0]
- d = re.search("(\d{4})(\d{2})(\d{2})", date)
- date = (d.group(1) + "-" + d.group(2) + "-" + d.group(3))
-
- # -- Don't iterate through platforms for snapshot --
-
- # -- Select hardware --
- for self.settings['hardware'] in (
- self.settings['choice']['hardware'].keys()):
-
- compatable_hwpacks = self.settings['choice']['hwpack'][
- self.settings['hardware']]
-
- future_date, past_date = self.db.get_next_prev_day_with_builds(
- "linaro-alip",
- date,
- compatable_hwpacks)
-
- if past_date == None:
- # Some hardware packs are not available in the snapshot repo,
- # so just skip if they aren't
- continue
-
- builds = self.db.get_binary_builds_on_day_from_db(
- "linaro-alip",
- past_date,
- compatable_hwpacks)
-
- self.assertTrue(len(builds))
- # If the above assert fails, either the DB is empty, or
- # db.get_binary_builds_on_day_from_db failed
-
- small_date = re.sub('-', '', past_date)
- self.settings['build'] = small_date + ":" + "0"
-
- # -- Iterate through hardware packs --
- for self.settings['hwpack'] in compatable_hwpacks:
-
- # If hardware pack is available...
- if(self.settings['hwpack']
- in self.db.get_hwpacks('snapshot_hwpacks')):
-
- # -- Iterate through images
- os_list = self.db.get_os_list_from('snapshot_binaries')
-
- for self.settings['image'] in os_list:
- if re.search('old', self.settings['image']):
- # Directories with old in the name are of no
- # interest to us
- continue
-
- # -- Check build which matches these parameters
- # (builds that don't match are excluded in UI) --
- if( len(self.db.execute_return_list(
- 'select * from snapshot_hwpacks '
- 'where hardware == ? '
- 'and date == ? '
- 'and build == ?',
- (self.settings['hwpack'],
- small_date,
- "0")))
- and len(self.db.execute_return_list(
- 'select * from snapshot_binaries '
- 'where image == ? '
- 'and date == ? '
- 'and build == ?',
- (self.settings['image'],
- small_date,
- "0")))):
-
- # - Run the function under test! -
- image_url, hwpack_url = (
- self.db.get_image_and_hwpack_urls(self.settings))
-
- self.assertTrue(image_url)
- self.assertTrue(hwpack_url)
-
- #--- Now test release build lookup ---
- self.settings['release_or_snapshot'] = "release"
- # -- Select hardware --
- for self.settings['hardware'] in (
- self.settings['choice']['hardware'].keys()):
- compatable_hwpacks = (
- self.settings['choice']['hwpack'][self.settings['hardware']])
-
- # -- Iterate through hardware packs --
- for self.settings['hwpack'] in compatable_hwpacks:
-
- # If hardware pack is available...
- if(self.settings['hwpack']
- in self.db.get_hwpacks('release_hwpacks')):
-
- # -- Iterate through images
- os_list = self.db.get_os_list_from('release_binaries')
-
- for self.settings['image'] in os_list:
- if re.search('old', self.settings['image']):
- # Directories with old in the name are of no
- # interest to us
- continue
-
- for platform, ignore in (
- self.settings['choice']['platform'].items()):
- self.settings['platform'] = platform
-
- # -- Iterate through available builds --
- builds = self.db.get_builds(
- self.settings['platform'],
- self.settings['image'])
-
- for build in builds:
- self.settings['build'] = build
-
- # -- Check build which matches these parameters
- #(builds that don't match are excluded in UI)--
- if( len(self.db.execute_return_list(
- 'select * from release_hwpacks '
- 'where platform == ? '
- 'and hardware == ? '
- 'and build == ?',
- (self.settings['platform'],
- self.settings['hwpack'],
- self.settings['build'])))
- and len(self.db.execute_return_list(
- 'select * from release_binaries '
- 'where platform == ? '
- 'and image == ? '
- 'and build == ?',
- (self.settings['platform'],
- self.settings['image'],
- self.settings['build'])))):
-
- # - Run the function under test! -
- image_url, hwpack_url = (
- self.db.get_image_and_hwpack_urls(
- self.settings))
- self.assertTrue(image_url)
- self.assertTrue(hwpack_url)
=== removed file 'linaro_image_tools/tests/test_server_index.sqlite'
Binary files linaro_image_tools/tests/test_server_index.sqlite 2011-08-11 16:18:34 +0000 and linaro_image_tools/tests/test_server_index.sqlite 1970-01-01 00:00:00 +0000 differ