From 622da8d1c381ac634635d0905679d6f7641698eb Mon Sep 17 00:00:00 2001 From: "Ryan S. Arnold" Date: Wed, 17 Aug 2016 11:56:01 -0500 Subject: Re-organize python code to use a directory module layout --- clone.py | 312 --------------------- custom_wordwrap.py | 31 --- gccclone.py | 27 -- gitrepo.py | 234 ---------------- handle_exit.py | 157 ----------- linaropy/__init__.py | 1 + linaropy/git/__init__.py | 0 linaropy/git/clone.py | 312 +++++++++++++++++++++ linaropy/git/gitrepo.py | 234 ++++++++++++++++ linaropy/git/workdir.py | 190 +++++++++++++ linaropy/handle_exit.py | 157 +++++++++++ linaropy/proj.py | 116 ++++++++ linaropy/rn/__init__.py | 0 linaropy/rn/custom_wordwrap.py | 31 +++ linaropy/rn/gccclone.py | 27 ++ linaropy/rn/linaroseries.py | 361 ++++++++++++++++++++++++ linaropy/rn/rngen.py | 303 ++++++++++++++++++++ linaropy/rn/rnseries.py | 193 +++++++++++++ linaropy/rn/template.py | 94 +++++++ linaropy/rninput.py | 35 +++ linaropy/series.py | 550 ++++++++++++++++++++++++++++++++++++ linaropy/vers.py | 615 +++++++++++++++++++++++++++++++++++++++++ linaroseries.py | 362 ------------------------ proj.py | 117 -------- rn.py | 22 +- rngen.py | 303 -------------------- rninput.py | 35 --- rnseries.py | 193 ------------- series.py | 550 ------------------------------------ template.py | 94 ------- testrn.py | 24 ++ vers.py | 615 ----------------------------------------- workdir.py | 190 ------------- 33 files changed, 3254 insertions(+), 3231 deletions(-) delete mode 100644 clone.py delete mode 100644 custom_wordwrap.py delete mode 100644 gccclone.py delete mode 100644 gitrepo.py delete mode 100644 handle_exit.py create mode 100644 linaropy/__init__.py create mode 100644 linaropy/git/__init__.py create mode 100644 linaropy/git/clone.py create mode 100644 linaropy/git/gitrepo.py create mode 100644 linaropy/git/workdir.py create mode 100644 linaropy/handle_exit.py create mode 100644 linaropy/proj.py create mode 100644 linaropy/rn/__init__.py create mode 100644 linaropy/rn/custom_wordwrap.py create mode 100644 linaropy/rn/gccclone.py create mode 100644 linaropy/rn/linaroseries.py create mode 100644 linaropy/rn/rngen.py create mode 100644 linaropy/rn/rnseries.py create mode 100644 linaropy/rn/template.py create mode 100644 linaropy/rninput.py create mode 100644 linaropy/series.py create mode 100644 linaropy/vers.py delete mode 100644 linaroseries.py delete mode 100644 proj.py delete mode 100644 rngen.py delete mode 100644 rninput.py delete mode 100644 rnseries.py delete mode 100644 series.py delete mode 100644 template.py create mode 100644 testrn.py delete mode 100644 vers.py delete mode 100644 workdir.py diff --git a/clone.py b/clone.py deleted file mode 100644 index a3544bf..0000000 --- a/clone.py +++ /dev/null @@ -1,312 +0,0 @@ -import unittest -import logging -import os - -from sh import git -from sh import ln -from sh import mkdir -from sh import ErrorReturnCode -from sh import chmod - -# In order to set directory read/write permissions for test cases -from stat import * - -from proj import Proj -from gitrepo import cd -from gitrepo import GitRepo - -import time - -class Clone(GitRepo): - def __init__(self, proj, clonedir=None, remote=None ): - super(Clone,self).__init__(proj) - - # If the user passes a clonedir then they want to reuse an existing - # clone. This will often be the case with large repositories like - # gcc, which take 20 minutes to clone. - - # Clone a new repo from the remote - if clonedir is None or (isinstance(clonedir, basestring) and clonedir == ""): - if remote=="" or remote is None: - raise TypeError('Clone input parameter \'remote\' can not be empty') - - # TODO test that remote is a well formed URL? - # TODO if there's a trailing / strip it. - self.remote=remote - - # Strip the end off the repo url to find the clonedir name. - clone_dirname=remote.rsplit('/', 1)[-1] - - logging.info('Cloning %s repository into %s/%s' % (self.remote, self.proj.projdir, clone_dirname)) - - self.repodir=self.proj.projdir + '/' + clone_dirname - try: - with cd(self.proj.projdir): - git("clone", self.remote, clone_dirname, _err_to_out=True, _out="/dev/null") - except ErrorReturnCode: - raise EnvironmentError("Git unable to clone into " + self.repodir) - - - # Reuse an existing git clone directory - else: - if not isinstance(clonedir, basestring): - raise TypeError('clonedir must be of type basestring') - elif not os.path.isdir(clonedir): - raise EnvironmentError("Specified clonedir directory '%s' does not exist" % clonedir) - - # Check to see if clonedir is a git repository. - with cd(clonedir): - # There might be a file called .git - if not os.path.isdir(clonedir + '/.git'): - try: - git("rev-parse", _err_to_out=True, _out="/dev/null") - except ErrorReturnCode: - raise EnvironmentError('Specified clonedir is not a git repository.') - - # Strip off the directory name - clone_dirname=clonedir.rsplit('/', 1)[-1] - self.repodir=self.proj.projdir + '/' + clone_dirname - - # TODO: verify that we don't get another clone dir in the projdir. - # It's possible the clonedir is already in the projdir. - if os.path.exists(self.repodir): - logging.info('The clonedir %s already exists in %s' % (clonedir, self.repodir)) - else: - logging.info('Reusing existing clonedir %s and symlinking as %s in %s' % (clonedir, self.repodir, self.proj.projdir)) - try: - ln("-s", clonedir, self.repodir) - except ErrorReturnCode: - raise EnvironmentError("Can't create symlink %s to %s." % (clonedir, self.repodir)) - - logging.info('Clone directory set as %s' % (self.repodir)) - return - - def clonedir(self): - return self.repodir - - def __repr__(self): - return "" - -class TestClone(unittest.TestCase): - testdirprefix="CloneUT" - - @classmethod - def setUpClass(cls): - # We need a Proj dir to store the mother clone in. - cls.proj=Proj(prefix=TestClone.testdirprefix) - - # Setup a single remote clone in a Proj directory and then all of the - # other tests in this module should clone from this resulting local - # clone as if it were a remote. This will reduce the network burden on - # the remote git server being tested. If this fails then the entire - # TestClone will be marked as not executed. - - #TODO: Perform a connectivity test to determine online or offline - # testing mode. - - cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') - #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') - cls.startbranch=cls.rnclone.getbranch() - - cls.rtremote=u'http://git.linaro.org/toolchain/tcwg-release-tools.git' - #OFFLINE: cls.rtremote=u'/var/run/media/ryanarn/sidecar/releases/tcwg-release-tools' - - # All further tests will use this as the remote. - cls.rnremote=cls.rnclone.repodir - - @classmethod - def tearDownClass(cls): - cls.proj.cleanup() - - # Every instance of TestClass requires its own proj directory. - def setUp(self): - # TODO: Is this redundant with the setUpClass()? - self.proj=Proj(prefix=TestClone.testdirprefix) - - def tearDown(self): - # TODO: Is this redundant with the tearDownClass()? - self.proj.cleanup() - - # These immediate tests clone from a remote repository - def test_clone(self): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - self.assertTrue(os.path.isdir(self.clone.clonedir())) - - def test_two_different_clones_in_one_projdir(self): - self.clonern=Clone(self.proj,remote=TestClone.rnremote) - self.clonert=Clone(self.proj,remote=TestClone.rtremote) - self.assertTrue(os.path.isdir(self.clonern.clonedir()) and os.path.isdir(self.clonert.clonedir())) - - # All clones after this point should use TestClone.rnremote as the remote. - - def test_str_function(self): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - self.assertEqual(str(self.clone), self.clone.clonedir()) - - def test_empty_string_clonedir_with_remote(self): - # This will use the remote because clonedir is an empty string. - self.clone=Clone(self.proj,remote=TestClone.rnremote, clonedir="") - self.assertTrue(os.path.isdir(self.clone.clonedir())) - - def test_none_clonedir_with_remote(self): - # This will use the remote because there is no clonedir - self.clone=Clone(self.proj,remote=TestClone.rnremote) - self.assertTrue(os.path.isdir(self.clone.clonedir())) - - def test_empty_remote_empty_clonedir(self): - # A bogus directory will fail. - with self.assertRaises(TypeError): - self.clone=Clone(self.proj,remote="", clonedir="") - - def test_non_string_clonedir(self): - self.foo=Clone(self.proj,remote=TestClone.rnremote, clonedir="") - with self.assertRaises(TypeError): - self.clone=Clone(self.proj, clonedir=self.foo) - - def test_no_string_clonedir_empty_remote(self): - # remote can't be be empty if clonedir is not set. - with self.assertRaises(TypeError): - self.clone=Clone(self.proj,remote="") - - def test_existing_clonedir(self): - # Clone's clonedir parameter allows a user to specify an existing - # directory use for the clone. It will setup a symlink in the proj - # directory pointing to the tree. This is a useful feature for - # very large repositories. - - # Use the clonedir from TestClone.rnclone - self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) - - # Verify that a symlink is used for the clone. - self.assertTrue(os.path.islink(self.clone.clonedir())) - - def test_failed_symlink(self): - # We'll make the projdir read-only so we can force symlink creation - # to fail. - with cd(self.proj.projdir): - # Set to read-only to force symlink to fail. - os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) - - # Use the clonedir from TestClone.rnclone - with self.assertRaises(EnvironmentError): - # Verify that this throws an exception. - self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) - - with cd(self.proj.projdir): - # Reset to writable so we can remove the projdir - os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) - - def test_clone_failure_due_to_write_permissions(self): - with cd(self.proj.projdir): - # Set to read-only to force the git clone to fail. - os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) - - with self.assertRaises(EnvironmentError): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - - with cd(self.proj.projdir): - # Reset to writable so we can remove the projdir - os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) - - def test_nonexistant_clonedir(self): - with self.assertRaises(EnvironmentError): - self.clone_one=Clone(self.proj, clonedir="/a/nonexistant/directory/") - - def test_not_gitdir(self): - # Verify that an exception is raised when attempting to clone something - # that's not a git repository. - nongitdir=self.proj.projdir + "/nongitdir" - with cd (self.proj.projdir): - os.mkdir(nongitdir) - - with self.assertRaises(EnvironmentError): - self.clone=Clone(self.proj,remote="", clonedir=nongitdir) - - def test_not_gitdir_but_with_dot_git(self): - # Verify that an exception is raised when attempting to clone a - # directory that has a .git file but is NOT a git repository. - nongitdir=self.proj.projdir + "/nongitdir" - with cd (self.proj.projdir): - os.mkdir(nongitdir) - # Create and empty .git file. - with open(".git", 'a'): - os.utime(".git", None) - - with self.assertRaises(EnvironmentError): - self.clone=Clone(self.proj,remote="", clonedir=nongitdir) - - def test_incorrect_input_proj_type(self): - # Clone requires a valid Proj type as an input parameter. Verify that - # it throws an exception. - proj='stringtypeonpurpose' - with self.assertRaises(TypeError): - self.clone=Clone(proj,remote=TestClone.rnremote) - - def test_non_destructive_cleanup(self): - # Make sure that the file removal method in proj.cleanup() doesn't - # follow and remove symlinks. - self.clone_one=Clone(self.proj,remote=TestClone.rnremote) - self.proj_two=Proj(prefix=TestClone.testdirprefix) - # Use the clonedir from the first project as the clonedir for the - # second project. - self.clone_two=Clone(self.proj_two,clonedir=self.clone_one.clonedir()) - self.assertTrue(os.path.islink(self.clone_two.clonedir())) - - # Cleanup proj_two - self.proj_two.cleanup() - - # Make sure clone_one.clonedir wasn't removed when proj_two was removed - # which is a concern since proj_two used a clone directory from - # proj. - self.assertTrue(os.path.isdir(self.clone_one.clonedir())) - - def test_get_branch_clone(self): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - self.assertEqual(TestClone.startbranch, self.clone.getbranch()) - - def test_checkoutbranch_clone(self): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - - # Test a new branch - with self.clone.checkoutbranch("TestClone", "origin/HEAD"): - self.assertEqual("TestClone", self.clone.getbranch()) - - self.assertEqual(TestClone.startbranch, self.clone.getbranch()) - - # Test being able to switch to a branch that already exists. - with self.clone.checkoutbranch("TestClone"): - self.assertEqual("TestClone", self.clone.getbranch()) - - self.assertEqual(TestClone.startbranch, self.clone.getbranch()) - - #TODO test a non-default 'track' passed to checkoutbranch. - - def test_checkoutbranch_clone_with_local_changes(self): - self.clone=Clone(self.proj,remote=TestClone.rnremote) - - # create an empty file and add it to the repository. - with cd (self.clone.repodir): - # Create and empty file. - with open("checkouttest", 'a'): - os.utime("checkouttest", None) - - self.clone.add(self.clone.repodir + '/checkouttest') - - with self.clone.checkoutbranch("TestClone", "origin/HEAD"): - self.assertEqual("TestClone", self.clone.getbranch()) - - # Verify there's no file name 'checkouttest' when we switch branch. - with cd (self.clone.repodir): - self.assertFalse(os.path.isfile(self.clone.repodir + '/checkouttest')) - - # Verify that we've returned to the correct branch. - self.assertEqual(TestClone.startbranch, self.clone.getbranch()) - - # Show that checkouttest does exist once were out of the - # checkoutbranch context. - with cd (self.clone.repodir): - self.assertTrue(os.path.isfile(self.clone.repodir + '/checkouttest')) - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/custom_wordwrap.py b/custom_wordwrap.py deleted file mode 100644 index 4ed3e25..0000000 --- a/custom_wordwrap.py +++ /dev/null @@ -1,31 +0,0 @@ -#from jinja2.utils import Markup, escape, pformat, urlize, soft_unicode, \ -# unicode_urlencode -from jinja2.runtime import Undefined -#from jinja2.exceptions import FilterArgumentError -#from jinja2._compat import imap, string_types, text_type, iteritems - -def environmentfilter(f): - """Decorator for marking environment dependent filters. The current - :class:`Environment` is passed to the filter as first argument. - """ - f.environmentfilter = True - return f - -@environmentfilter -def do_customwordwrap(environment, s, width=79, break_long_words=True, - wrapstring=None, break_on_hyphens=False): - """ - Return a copy of the string passed to the filter wrapped after - ``79`` characters. You can override this default using the first - parameter. If you set the second parameter to `false` Jinja will not - split words apart if they are longer than `width`. By default, the newlines - will be the default newlines for the environment, but this can be changed - using the wrapstring keyword argument. - """ - if not wrapstring: - wrapstring = environment.newline_sequence - import textwrap - return wrapstring.join(textwrap.wrap(s, width=width, expand_tabs=False, - replace_whitespace=False, - break_long_words=break_long_words, - break_on_hyphens=break_on_hyphens)) diff --git a/gccclone.py b/gccclone.py deleted file mode 100644 index dcb8b71..0000000 --- a/gccclone.py +++ /dev/null @@ -1,27 +0,0 @@ - -from clone import Clone -from sh import git -from gitrepo import cd - -class GCCClone(Clone): - def __init__(self, proj, clonedir=None, remote=None ): - super(GCCClone,self).__init__(proj, clonedir, remote) - - with cd(clonedir): - try: - with open('gcc/BASE-VER', 'r') as f: - self.basever = f.readline().strip() - except: - raise IOError('gcc/BASE-VER not found in ' + clonedir) - - log=git("log", "-n 1", "--grep=Merge branch") - for logline in log: - if logline.lstrip().startswith("Merge"): - # For some reason rsplit('.') isn't working. - self.fsfrev=logline.lstrip().rsplit(None,1)[1].split('.',1)[0] - - def get_base_version(self): - return self.basever - - def get_fsf_revision(self): - return self.fsfrev diff --git a/gitrepo.py b/gitrepo.py deleted file mode 100644 index 06a79de..0000000 --- a/gitrepo.py +++ /dev/null @@ -1,234 +0,0 @@ -import unittest -import logging -import os - -from sh import git -from sh import ErrorReturnCode -from sh import ErrorReturnCode_128 - -import subprocess - -# Use this to manage 'cd' context -from contextlib import contextmanager - -from proj import Proj -# Return to prevdir when context is exited. -# Use in the following manner: -# -# with cd("foodir"): -# operation1_in_foodir -# operation2_in_foodir -# -# operation3_in_prevdir -# -@contextmanager -def cd(newdir): - prevdir = os.getcwd() - os.chdir(os.path.expanduser(newdir)) - try: - yield - finally: - os.chdir(prevdir) - -# This is called 'GitRepo' because a repository for a project doesn't only have -# to be a git repository. Abstract base class used to define the operations -# that both Clone and Workdir share. Notice that there is no actual repodir -# specified here. You must instantiate a subclass in order to use/test these -# operations. This is because 'clones' and 'workdirs' are created differently -# but you operate on them in the exact same way. -class GitRepo(object): - def __init__(self, proj): - self.repodir=u"" - if isinstance(proj, Proj): - self.proj=proj - else: - raise TypeError('proj input parameter is not of type Proj') - - def branchexists(self, branch): - with cd(self.repodir): - try: - # Quote the branchname because it might have strange - # characters in it. - br="%s" % branch - git("rev-parse", "--verify", br) - except ErrorReturnCode_128: - return False - return True - - def getbranch(self): - try: - with cd(self.repodir): - branch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() - except ErrorReturnCode: - raise EnvironmentError("Unable to get the current branch") - return branch - - # TODO: Fully test this. - # Stash current changes and change to new branch. - # Return to previous branch when context is exited. - @contextmanager - def checkoutbranch(self, branch, track=""): - with cd(self.repodir): - # The stashid will remain empty if there were no changes in the - # directory to stash. - stashid=u"" - - try: - self.prevbranch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() - except ErrorReturnCode: - raise EnvironmentError("Unable to get the current branch") - - try: - # TODO: do we want to do a blanket git add *? - stashid=git("stash", "create").stdout.rstrip() - # If there's no stash then there's no reason to save. - if stashid!="": - git("stash", "save") - except ErrorReturnCode: - raise EnvironmentError("Unable to get the current branch") - - try: - # TODO This is a nop as it is. - git("rev-parse", "--verify", branch) - # if the branch exists the previous statement won't cause an - # exception so we know we can just check it out. - git("checkout", branch) - except ErrorReturnCode: - try: - # if it doesn't exist we need to checkout a new branch - git("checkout", "-b", branch, track).stdout.rstrip() - except ErrorReturnCode as exc: - raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) - try: - yield - finally: - try: - with cd(self.repodir): - git("checkout", self.prevbranch) - # If there's no stashid then there were no files stashed, - # so don't stash apply. - if stashid!="": - git("stash", "apply", stashid) - except ErrorReturnCode, exc: - raise EnvironmentError("Unable to return to the previous branch: %s" % exc.stderr) - - # TODO: Write a unit test for this. - def add(self, filetogitadd): - # TODO: Determine if filetogitadd is relative or absolute. - # TODO: verify that filetogitadd is in self.repodir - try: - with cd(self.repodir): - git("add", filetogitadd) - except ErrorReturnCode: - raise EnvironmentError("Unable to git add " + filetogitadd) - - # TODO: Write a unit test for this. - # TODO: fix the default - def commit(self, message="default"): - print "Committing changes." - try: - with cd(self.repodir): - # Git commit first with a boiler plate message and then allow the user - # to amend. - if git("status", "--porcelain"): - # print git("commit", "-m", '%s' % message, _err_to_out=True) - subprocess.call(["git", "commit", "-m", message]) - # using python sh will suppress the git editor - subprocess.call(["git", "commit", "--amend"]) - except ErrorReturnCode: - raise EnvironmentError("Unable to git commit ") - - def log(self, number): - try: - with cd(self.repodir): - print git("log", "-n %d" % number) - except ErrorReturnCode: - raise EnvironmentError("Unable to git add " + filetogitadd) - - # TODO: Does this need to 'cd' first? - def editFile(self, toedit): - editor = os.getenv('EDITOR') - if not editor: - editor='/usr/bin/vim' - os.system(editor + ' ' + self.repodir + '/' + toedit) - - self.add(toedit) - - return - - # TODO: Test this function with both dryrun=True and dryrun=False - def pushToBranch(self, tobranch, dryrun=True): - try: - # TODO: Check for un-merged commits. - with cd(self.repodir): - branch=self.getbranch() - if dryrun: - git("push", "--dry-run", branch, tobranch) - else: - git("push", branch, tobranch) - except ErrorReturnCode: - raise EnvironmentError("Unable to push branch %s to %s" % (branch, tobranch)) - - def __str__(self): - return self.repodir - -# TestGitRepo is an abstract baseclass. Verify that the constructor -# will throw an exception if you try to run operations without -# defining a subclass. -class TestGitRepo(unittest.TestCase): - repoprefix="GitRepoUT" - - # This class is only used to test the GitRepo functions since, as an, - # abstract-baseclass, GitRepo doesn't define repodir, and we need an - # actual repository to test git based functions. - class DerivedGitRepo(GitRepo): - def __init__(self, proj): - super(TestGitRepo.DerivedGitRepo,self).__init__(proj) - self.remote=u'http://git.linaro.org/toolchain/release-notes.git' - dgr_dirname="release-notes" - self.repodir=self.proj.projdir + "/" + dgr_dirname - try: - with cd(self.proj.projdir): - git("clone", self.remote, dgr_dirname, _err_to_out=True, _out="/dev/null") - except ErrorReturnCode: - raise EnvironmentError("Git unable to clone into " + self.repodir) - - @classmethod - def setUpClass(cls): - cls.proj=Proj(prefix=TestGitRepo.repoprefix, persist=False) - cls.dgr=TestGitRepo.DerivedGitRepo(cls.proj) - - @classmethod - def tearDownClass(cls): - cls.proj.cleanup() - - # Test that the abstract baseclass throws an exception if one of the - # functions are called from the baseclass without a derived class. - def test_repo(self): - self.repo=GitRepo(self.proj) - with self.assertRaises(OSError): - # We haven't set a repodir so this should throw an exception. - branch=self.repo.getbranch() - - # TODO: Create a test-branch in the repo so it's always there. - - def test_branchexists(self): - self.assertTrue(self.dgr.branchexists("remotes/origin/releases/linaro-4.9-2015.05")) - - def test_not_branchexists(self): - self.assertFalse(self.dgr.branchexists("foobar")) - - def test_getbranch(self): - with cd(self.dgr.repodir): - try: - git("checkout", "-b", "master_test", "origin/master").stdout.rstrip() - except ErrorReturnCode as exc: - raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) - self.assertEquals(self.dgr.getbranch(), "master_test") - - # TODO: Test checkoutbranch with various combinations of polluted - # directories. - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/handle_exit.py b/handle_exit.py deleted file mode 100644 index 7eb4ae1..0000000 --- a/handle_exit.py +++ /dev/null @@ -1,157 +0,0 @@ -# Author: Giampaolo Rodola' -# License: MIT - -from __future__ import with_statement -import contextlib -import signal -import sys - - -def _sigterm_handler(signum, frame): - sys.exit(0) -_sigterm_handler.__enter_ctx__ = False - - -@contextlib.contextmanager -def handle_exit(callback=None, append=False): - """A context manager which properly handles SIGTERM and SIGINT - (KeyboardInterrupt) signals, registering a function which is - guaranteed to be called after signals are received. - Also, it makes sure to execute previously registered signal - handlers as well (if any). - - >>> app = App() - >>> with handle_exit(app.stop): - ... app.start() - ... - >>> - - If append == False raise RuntimeError if there's already a handler - registered for SIGTERM, otherwise both new and old handlers are - executed in this order. - """ - old_handler = signal.signal(signal.SIGTERM, _sigterm_handler) - if (old_handler != signal.SIG_DFL) and (old_handler != _sigterm_handler): - if not append: - raise RuntimeError("there is already a handler registered for " - "SIGTERM: %r" % old_handler) - - def handler(signum, frame): - try: - _sigterm_handler(signum, frame) - finally: - old_handler(signum, frame) - signal.signal(signal.SIGTERM, handler) - - if _sigterm_handler.__enter_ctx__: - raise RuntimeError("can't use nested contexts") - _sigterm_handler.__enter_ctx__ = True - - try: - yield - except KeyboardInterrupt: - pass - except SystemExit, err: - # code != 0 refers to an application error (e.g. explicit - # sys.exit('some error') call). - # We don't want that to pass silently. - # Nevertheless, the 'finally' clause below will always - # be executed. - if err.code != 0: - raise - finally: - _sigterm_handler.__enter_ctx__ = False - if callback is not None: - callback() - - -if __name__ == '__main__': - # =============================================================== - # --- test suite - # =============================================================== - - import unittest - import os - - class TestOnExit(unittest.TestCase): - - def setUp(self): - # reset signal handlers - signal.signal(signal.SIGTERM, signal.SIG_DFL) - self.flag = None - - def tearDown(self): - # make sure we exited the ctx manager - self.assertTrue(self.flag is not None) - - def test_base(self): - with handle_exit(): - pass - self.flag = True - - def test_callback(self): - callback = [] - with handle_exit(lambda: callback.append(None)): - pass - self.flag = True - self.assertEqual(callback, [None]) - - def test_kinterrupt(self): - with handle_exit(): - raise KeyboardInterrupt - self.flag = True - - def test_sigterm(self): - with handle_exit(): - os.kill(os.getpid(), signal.SIGTERM) - self.flag = True - - def test_sigint(self): - with handle_exit(): - os.kill(os.getpid(), signal.SIGINT) - self.flag = True - - def test_sigterm_old(self): - # make sure the old handler gets executed - queue = [] - signal.signal(signal.SIGTERM, lambda s, f: queue.append('old')) - with handle_exit(lambda: queue.append('new'), append=True): - os.kill(os.getpid(), signal.SIGTERM) - self.flag = True - self.assertEqual(queue, ['old', 'new']) - - def test_sigint_old(self): - # make sure the old handler gets executed - queue = [] - signal.signal(signal.SIGINT, lambda s, f: queue.append('old')) - with handle_exit(lambda: queue.append('new'), append=True): - os.kill(os.getpid(), signal.SIGINT) - self.flag = True - self.assertEqual(queue, ['old', 'new']) - - def test_no_append(self): - # make sure we can't use the context manager if there's - # already a handler registered for SIGTERM - signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) - try: - with handle_exit(lambda: self.flag.append(None)): - pass - except RuntimeError: - pass - else: - self.fail("exception not raised") - finally: - self.flag = True - - def test_nested_context(self): - self.flag = True - try: - with handle_exit(): - with handle_exit(): - pass - except RuntimeError: - pass - else: - self.fail("exception not raised") - - unittest.main() diff --git a/linaropy/__init__.py b/linaropy/__init__.py new file mode 100644 index 0000000..987d6df --- /dev/null +++ b/linaropy/__init__.py @@ -0,0 +1 @@ +#__all__ = ["linarorn", "linarogit"] diff --git a/linaropy/git/__init__.py b/linaropy/git/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/linaropy/git/clone.py b/linaropy/git/clone.py new file mode 100644 index 0000000..d8fe126 --- /dev/null +++ b/linaropy/git/clone.py @@ -0,0 +1,312 @@ +import unittest +import logging +import os + +from sh import git +from sh import ln +from sh import mkdir +from sh import ErrorReturnCode +from sh import chmod + +# In order to set directory read/write permissions for test cases +from stat import * + +from ..proj import Proj +from gitrepo import cd +from gitrepo import GitRepo + +import time + +class Clone(GitRepo): + def __init__(self, proj, clonedir=None, remote=None ): + super(Clone,self).__init__(proj) + + # If the user passes a clonedir then they want to reuse an existing + # clone. This will often be the case with large repositories like + # gcc, which take 20 minutes to clone. + + # Clone a new repo from the remote + if clonedir is None or (isinstance(clonedir, basestring) and clonedir == ""): + if remote=="" or remote is None: + raise TypeError('Clone input parameter \'remote\' can not be empty') + + # TODO test that remote is a well formed URL? + # TODO if there's a trailing / strip it. + self.remote=remote + + # Strip the end off the repo url to find the clonedir name. + clone_dirname=remote.rsplit('/', 1)[-1] + + logging.info('Cloning %s repository into %s/%s' % (self.remote, self.proj.projdir, clone_dirname)) + + self.repodir=self.proj.projdir + '/' + clone_dirname + try: + with cd(self.proj.projdir): + git("clone", self.remote, clone_dirname, _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError("Git unable to clone into " + self.repodir) + + + # Reuse an existing git clone directory + else: + if not isinstance(clonedir, basestring): + raise TypeError('clonedir must be of type basestring') + elif not os.path.isdir(clonedir): + raise EnvironmentError("Specified clonedir directory '%s' does not exist" % clonedir) + + # Check to see if clonedir is a git repository. + with cd(clonedir): + # There might be a file called .git + if not os.path.isdir(clonedir + '/.git'): + try: + git("rev-parse", _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError('Specified clonedir is not a git repository.') + + # Strip off the directory name + clone_dirname=clonedir.rsplit('/', 1)[-1] + self.repodir=self.proj.projdir + '/' + clone_dirname + + # TODO: verify that we don't get another clone dir in the projdir. + # It's possible the clonedir is already in the projdir. + if os.path.exists(self.repodir): + logging.info('The clonedir %s already exists in %s' % (clonedir, self.repodir)) + else: + logging.info('Reusing existing clonedir %s and symlinking as %s in %s' % (clonedir, self.repodir, self.proj.projdir)) + try: + ln("-s", clonedir, self.repodir) + except ErrorReturnCode: + raise EnvironmentError("Can't create symlink %s to %s." % (clonedir, self.repodir)) + + logging.info('Clone directory set as %s' % (self.repodir)) + return + + def clonedir(self): + return self.repodir + + def __repr__(self): + return "" + +class TestClone(unittest.TestCase): + testdirprefix="CloneUT" + + @classmethod + def setUpClass(cls): + # We need a Proj dir to store the mother clone in. + cls.proj=Proj(prefix=TestClone.testdirprefix) + + # Setup a single remote clone in a Proj directory and then all of the + # other tests in this module should clone from this resulting local + # clone as if it were a remote. This will reduce the network burden on + # the remote git server being tested. If this fails then the entire + # TestClone will be marked as not executed. + + #TODO: Perform a connectivity test to determine online or offline + # testing mode. + + cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') + cls.startbranch=cls.rnclone.getbranch() + + cls.rtremote=u'http://git.linaro.org/toolchain/tcwg-release-tools.git' + #OFFLINE: cls.rtremote=u'/var/run/media/ryanarn/sidecar/releases/tcwg-release-tools' + + # All further tests will use this as the remote. + cls.rnremote=cls.rnclone.repodir + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Every instance of TestClass requires its own proj directory. + def setUp(self): + # TODO: Is this redundant with the setUpClass()? + self.proj=Proj(prefix=TestClone.testdirprefix) + + def tearDown(self): + # TODO: Is this redundant with the tearDownClass()? + self.proj.cleanup() + + # These immediate tests clone from a remote repository + def test_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_two_different_clones_in_one_projdir(self): + self.clonern=Clone(self.proj,remote=TestClone.rnremote) + self.clonert=Clone(self.proj,remote=TestClone.rtremote) + self.assertTrue(os.path.isdir(self.clonern.clonedir()) and os.path.isdir(self.clonert.clonedir())) + + # All clones after this point should use TestClone.rnremote as the remote. + + def test_str_function(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertEqual(str(self.clone), self.clone.clonedir()) + + def test_empty_string_clonedir_with_remote(self): + # This will use the remote because clonedir is an empty string. + self.clone=Clone(self.proj,remote=TestClone.rnremote, clonedir="") + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_none_clonedir_with_remote(self): + # This will use the remote because there is no clonedir + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_empty_remote_empty_clonedir(self): + # A bogus directory will fail. + with self.assertRaises(TypeError): + self.clone=Clone(self.proj,remote="", clonedir="") + + def test_non_string_clonedir(self): + self.foo=Clone(self.proj,remote=TestClone.rnremote, clonedir="") + with self.assertRaises(TypeError): + self.clone=Clone(self.proj, clonedir=self.foo) + + def test_no_string_clonedir_empty_remote(self): + # remote can't be be empty if clonedir is not set. + with self.assertRaises(TypeError): + self.clone=Clone(self.proj,remote="") + + def test_existing_clonedir(self): + # Clone's clonedir parameter allows a user to specify an existing + # directory use for the clone. It will setup a symlink in the proj + # directory pointing to the tree. This is a useful feature for + # very large repositories. + + # Use the clonedir from TestClone.rnclone + self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) + + # Verify that a symlink is used for the clone. + self.assertTrue(os.path.islink(self.clone.clonedir())) + + def test_failed_symlink(self): + # We'll make the projdir read-only so we can force symlink creation + # to fail. + with cd(self.proj.projdir): + # Set to read-only to force symlink to fail. + os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) + + # Use the clonedir from TestClone.rnclone + with self.assertRaises(EnvironmentError): + # Verify that this throws an exception. + self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) + + with cd(self.proj.projdir): + # Reset to writable so we can remove the projdir + os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) + + def test_clone_failure_due_to_write_permissions(self): + with cd(self.proj.projdir): + # Set to read-only to force the git clone to fail. + os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + with cd(self.proj.projdir): + # Reset to writable so we can remove the projdir + os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) + + def test_nonexistant_clonedir(self): + with self.assertRaises(EnvironmentError): + self.clone_one=Clone(self.proj, clonedir="/a/nonexistant/directory/") + + def test_not_gitdir(self): + # Verify that an exception is raised when attempting to clone something + # that's not a git repository. + nongitdir=self.proj.projdir + "/nongitdir" + with cd (self.proj.projdir): + os.mkdir(nongitdir) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote="", clonedir=nongitdir) + + def test_not_gitdir_but_with_dot_git(self): + # Verify that an exception is raised when attempting to clone a + # directory that has a .git file but is NOT a git repository. + nongitdir=self.proj.projdir + "/nongitdir" + with cd (self.proj.projdir): + os.mkdir(nongitdir) + # Create and empty .git file. + with open(".git", 'a'): + os.utime(".git", None) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote="", clonedir=nongitdir) + + def test_incorrect_input_proj_type(self): + # Clone requires a valid Proj type as an input parameter. Verify that + # it throws an exception. + proj='stringtypeonpurpose' + with self.assertRaises(TypeError): + self.clone=Clone(proj,remote=TestClone.rnremote) + + def test_non_destructive_cleanup(self): + # Make sure that the file removal method in proj.cleanup() doesn't + # follow and remove symlinks. + self.clone_one=Clone(self.proj,remote=TestClone.rnremote) + self.proj_two=Proj(prefix=TestClone.testdirprefix) + # Use the clonedir from the first project as the clonedir for the + # second project. + self.clone_two=Clone(self.proj_two,clonedir=self.clone_one.clonedir()) + self.assertTrue(os.path.islink(self.clone_two.clonedir())) + + # Cleanup proj_two + self.proj_two.cleanup() + + # Make sure clone_one.clonedir wasn't removed when proj_two was removed + # which is a concern since proj_two used a clone directory from + # proj. + self.assertTrue(os.path.isdir(self.clone_one.clonedir())) + + def test_get_branch_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + def test_checkoutbranch_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + # Test a new branch + with self.clone.checkoutbranch("TestClone", "origin/HEAD"): + self.assertEqual("TestClone", self.clone.getbranch()) + + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + # Test being able to switch to a branch that already exists. + with self.clone.checkoutbranch("TestClone"): + self.assertEqual("TestClone", self.clone.getbranch()) + + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + #TODO test a non-default 'track' passed to checkoutbranch. + + def test_checkoutbranch_clone_with_local_changes(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + # create an empty file and add it to the repository. + with cd (self.clone.repodir): + # Create and empty file. + with open("checkouttest", 'a'): + os.utime("checkouttest", None) + + self.clone.add(self.clone.repodir + '/checkouttest') + + with self.clone.checkoutbranch("TestClone", "origin/HEAD"): + self.assertEqual("TestClone", self.clone.getbranch()) + + # Verify there's no file name 'checkouttest' when we switch branch. + with cd (self.clone.repodir): + self.assertFalse(os.path.isfile(self.clone.repodir + '/checkouttest')) + + # Verify that we've returned to the correct branch. + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + # Show that checkouttest does exist once were out of the + # checkoutbranch context. + with cd (self.clone.repodir): + self.assertTrue(os.path.isfile(self.clone.repodir + '/checkouttest')) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/git/gitrepo.py b/linaropy/git/gitrepo.py new file mode 100644 index 0000000..99b2573 --- /dev/null +++ b/linaropy/git/gitrepo.py @@ -0,0 +1,234 @@ +import unittest +import logging +import os + +from sh import git +from sh import ErrorReturnCode +from sh import ErrorReturnCode_128 + +import subprocess + +# Use this to manage 'cd' context +from contextlib import contextmanager + +from ..proj import Proj +# Return to prevdir when context is exited. +# Use in the following manner: +# +# with cd("foodir"): +# operation1_in_foodir +# operation2_in_foodir +# +# operation3_in_prevdir +# +@contextmanager +def cd(newdir): + prevdir = os.getcwd() + os.chdir(os.path.expanduser(newdir)) + try: + yield + finally: + os.chdir(prevdir) + +# This is called 'GitRepo' because a repository for a project doesn't only have +# to be a git repository. Abstract base class used to define the operations +# that both Clone and Workdir share. Notice that there is no actual repodir +# specified here. You must instantiate a subclass in order to use/test these +# operations. This is because 'clones' and 'workdirs' are created differently +# but you operate on them in the exact same way. +class GitRepo(object): + def __init__(self, proj): + self.repodir=u"" + if isinstance(proj, Proj): + self.proj=proj + else: + raise TypeError('proj input parameter is not of type Proj') + + def branchexists(self, branch): + with cd(self.repodir): + try: + # Quote the branchname because it might have strange + # characters in it. + br="%s" % branch + git("rev-parse", "--verify", br) + except ErrorReturnCode_128: + return False + return True + + def getbranch(self): + try: + with cd(self.repodir): + branch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + return branch + + # TODO: Fully test this. + # Stash current changes and change to new branch. + # Return to previous branch when context is exited. + @contextmanager + def checkoutbranch(self, branch, track=""): + with cd(self.repodir): + # The stashid will remain empty if there were no changes in the + # directory to stash. + stashid=u"" + + try: + self.prevbranch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + + try: + # TODO: do we want to do a blanket git add *? + stashid=git("stash", "create").stdout.rstrip() + # If there's no stash then there's no reason to save. + if stashid!="": + git("stash", "save") + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + + try: + # TODO This is a nop as it is. + git("rev-parse", "--verify", branch) + # if the branch exists the previous statement won't cause an + # exception so we know we can just check it out. + git("checkout", branch) + except ErrorReturnCode: + try: + # if it doesn't exist we need to checkout a new branch + git("checkout", "-b", branch, track).stdout.rstrip() + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) + try: + yield + finally: + try: + with cd(self.repodir): + git("checkout", self.prevbranch) + # If there's no stashid then there were no files stashed, + # so don't stash apply. + if stashid!="": + git("stash", "apply", stashid) + except ErrorReturnCode, exc: + raise EnvironmentError("Unable to return to the previous branch: %s" % exc.stderr) + + # TODO: Write a unit test for this. + def add(self, filetogitadd): + # TODO: Determine if filetogitadd is relative or absolute. + # TODO: verify that filetogitadd is in self.repodir + try: + with cd(self.repodir): + git("add", filetogitadd) + except ErrorReturnCode: + raise EnvironmentError("Unable to git add " + filetogitadd) + + # TODO: Write a unit test for this. + # TODO: fix the default + def commit(self, message="default"): + print "Committing changes." + try: + with cd(self.repodir): + # Git commit first with a boiler plate message and then allow the user + # to amend. + if git("status", "--porcelain"): + # print git("commit", "-m", '%s' % message, _err_to_out=True) + subprocess.call(["git", "commit", "-m", message]) + # using python sh will suppress the git editor + subprocess.call(["git", "commit", "--amend"]) + except ErrorReturnCode: + raise EnvironmentError("Unable to git commit ") + + def log(self, number): + try: + with cd(self.repodir): + print git("log", "-n %d" % number) + except ErrorReturnCode: + raise EnvironmentError("Unable to git add " + filetogitadd) + + # TODO: Does this need to 'cd' first? + def editFile(self, toedit): + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + os.system(editor + ' ' + self.repodir + '/' + toedit) + + self.add(toedit) + + return + + # TODO: Test this function with both dryrun=True and dryrun=False + def pushToBranch(self, tobranch, dryrun=True): + try: + # TODO: Check for un-merged commits. + with cd(self.repodir): + branch=self.getbranch() + if dryrun: + git("push", "--dry-run", branch, tobranch) + else: + git("push", branch, tobranch) + except ErrorReturnCode: + raise EnvironmentError("Unable to push branch %s to %s" % (branch, tobranch)) + + def __str__(self): + return self.repodir + +# TestGitRepo is an abstract baseclass. Verify that the constructor +# will throw an exception if you try to run operations without +# defining a subclass. +class TestGitRepo(unittest.TestCase): + repoprefix="GitRepoUT" + + # This class is only used to test the GitRepo functions since, as an, + # abstract-baseclass, GitRepo doesn't define repodir, and we need an + # actual repository to test git based functions. + class DerivedGitRepo(GitRepo): + def __init__(self, proj): + super(TestGitRepo.DerivedGitRepo,self).__init__(proj) + self.remote=u'http://git.linaro.org/toolchain/release-notes.git' + dgr_dirname="release-notes" + self.repodir=self.proj.projdir + "/" + dgr_dirname + try: + with cd(self.proj.projdir): + git("clone", self.remote, dgr_dirname, _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError("Git unable to clone into " + self.repodir) + + @classmethod + def setUpClass(cls): + cls.proj=Proj(prefix=TestGitRepo.repoprefix, persist=False) + cls.dgr=TestGitRepo.DerivedGitRepo(cls.proj) + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Test that the abstract baseclass throws an exception if one of the + # functions are called from the baseclass without a derived class. + def test_repo(self): + self.repo=GitRepo(self.proj) + with self.assertRaises(OSError): + # We haven't set a repodir so this should throw an exception. + branch=self.repo.getbranch() + + # TODO: Create a test-branch in the repo so it's always there. + + def test_branchexists(self): + self.assertTrue(self.dgr.branchexists("remotes/origin/releases/linaro-4.9-2015.05")) + + def test_not_branchexists(self): + self.assertFalse(self.dgr.branchexists("foobar")) + + def test_getbranch(self): + with cd(self.dgr.repodir): + try: + git("checkout", "-b", "master_test", "origin/master").stdout.rstrip() + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) + self.assertEquals(self.dgr.getbranch(), "master_test") + + # TODO: Test checkoutbranch with various combinations of polluted + # directories. + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/git/workdir.py b/linaropy/git/workdir.py new file mode 100644 index 0000000..77fa22a --- /dev/null +++ b/linaropy/git/workdir.py @@ -0,0 +1,190 @@ +import unittest +import logging +import os + +import uuid + +from ..proj import Proj + +from gitrepo import cd +from gitrepo import GitRepo + +from clone import Clone +from clone import cd + +from sh import git +from sh import git_new_workdir +from sh import ErrorReturnCode + +# TODO: Set the git repo pushurl. + +class Workdir(GitRepo): + # @ clone - clone this workdir from this Clone. + # @ workdir - name of the proposed workdir. + # @ track - derive the workdir from the branch designated by the string + # in 'track'. + # @ branchname - the name of the local branch + def __init__(self, proj, clone=None, workdir="", track=None, branchname=""): + super(Workdir,self).__init__(proj) + + # TODO: The Clone can be in a different project dir than proj. if + # clone.clonedir is not in proj.projdir then create a symlink in the + # current projdir. + + if not isinstance(clone, Clone): + raise TypeError('Clone input parameter is not of type Clone') + + if not isinstance(workdir, str): + raise TypeError('Workdir workdir parameter must be a string') + + # An empty workdir would result in an empty variable expansion + # and a malformed git new-workdir expression. + if workdir=="": + raise EnvironmentError('You must specify a workdir directory when creating a Workdir') + + if branchname=="": + raise EnvironmentError('You must specify a branchname when creating a Workdir') + + try: + with cd(self.proj.projdir): + logging.info("Workdir(): going to call git new-workdir tracking %s" % track) + if track: + logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, track)) + #TODO: Do we want to prevalidate that 'track' is a valid branch or tag? + # If we don't this will just throw an exception. + print git_new_workdir(clone.clonedir(), workdir, track, _err_to_out=True, _out="/dev/null") + else: + logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, "origin/HEAD")) + # Always just checkout HEAD if the track is not specified. + print git_new_workdir(clone.clonedir(), workdir, "origin/HEAD", _err_to_out=True, _out="/dev/null") + + except ErrorReturnCode as exc: + # if 'workdir' is None + # If 'workdir' already exists. + # If clone.clonedir() doesn't exist. + # If track isn't a valid branch. + raise EnvironmentError("Unable to create a git workdir ") + + self.repodir=self.proj.projdir + '/' + workdir + + try: + with cd(self.repodir): + git("checkout", "-b", branchname) + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout branch %s in workdir %s" % (branchname, self.repodir)) + + + #TODO: Verify that self.repodir exists or 'raise' exception? + + # TODO: write this function or the workdir will remain in detached + # head state if it's not checked out into a branch. + # def checkout(self, ): + + def workdir(self): + return self.repodir + +class TestWorkdir(unittest.TestCase): + testdirprefix="WorkdirUT" + + @classmethod + def setUpClass(cls): + # We need a Proj dir to store the necessary Clone in. + cls.proj=Proj(prefix=TestWorkdir.testdirprefix) + + #TODO: Perform a connectivity test to determine online or offline + # testing mode. + + # We'll create a Clone to be used by all the Workdir tests in order to + # be nice to the git server. + cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') + + # All further tests will use this as the remote if they need to + # create more clones. + cls.rnremote=cls.rnclone.repodir + + # A common name to use for the workdir for all of the Workdir tests. + cls.workdir="testworkdir" + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Every instance of TestWorkdir requires its own proj directory. + def setUp(self): + self.proj=Proj(prefix=TestWorkdir.testdirprefix) + self.branchname="testworkdir-" + str(uuid.uuid4()) + + def tearDown(self): + self.proj.cleanup() + + def test_workdir(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track=None, branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_default_track(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_track(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_checkoutbranch(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname="startbranch") + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + # Test a new branch (derived from the existing branch) + with self.workdir.checkoutbranch("TestWorkdir", self.workdir.getbranch()): + self.assertEqual("TestWorkdir", self.workdir.getbranch()) + + # Verify that leaving the context restored the original branch. + self.assertEqual(self.workdir.getbranch(), "startbranch") + + # Test being able to switch to a branch that already exists. + with self.workdir.checkoutbranch("TestWorkdir"): + self.assertEqual("TestWorkdir", self.workdir.getbranch()) + + self.assertEqual(self.workdir.getbranch(), "startbranch") + + def test_workdir_already_exists(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + + # Should fail because TestWorkdir.workdir should already exist. + with self.assertRaises(EnvironmentError): + self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname + '_2') + + def test_workdir_exceptions(self): + with self.assertRaises(TypeError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=None, branchname=self.branchname) + + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="someinvalidbranchname", branchname=self.branchname) + + # Test if clone.clonedir() doesn't exist by breaking repodir. + brokenclone=Clone(TestWorkdir.proj,TestWorkdir.rnremote) + brokenclone.repodir="/breakthisrepodir" + + # Verify that a Clone with a broken repodir/clonedir results in an + # exception. + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + + # 'clone' isn't a valid Clone type. + with self.assertRaises(TypeError): + self.workdir=Workdir(self.proj, clone="foobar", workdir=TestWorkdir.workdir, branchname=self.branchname) + + def test_workdir_exceptions(self): + #TODO: branchname is empty. + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname="") + + def test_workdir_exceptions(self): + #TODO: branchname already exists. + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + with self.assertRaises(EnvironmentError): + self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/handle_exit.py b/linaropy/handle_exit.py new file mode 100644 index 0000000..7eb4ae1 --- /dev/null +++ b/linaropy/handle_exit.py @@ -0,0 +1,157 @@ +# Author: Giampaolo Rodola' +# License: MIT + +from __future__ import with_statement +import contextlib +import signal +import sys + + +def _sigterm_handler(signum, frame): + sys.exit(0) +_sigterm_handler.__enter_ctx__ = False + + +@contextlib.contextmanager +def handle_exit(callback=None, append=False): + """A context manager which properly handles SIGTERM and SIGINT + (KeyboardInterrupt) signals, registering a function which is + guaranteed to be called after signals are received. + Also, it makes sure to execute previously registered signal + handlers as well (if any). + + >>> app = App() + >>> with handle_exit(app.stop): + ... app.start() + ... + >>> + + If append == False raise RuntimeError if there's already a handler + registered for SIGTERM, otherwise both new and old handlers are + executed in this order. + """ + old_handler = signal.signal(signal.SIGTERM, _sigterm_handler) + if (old_handler != signal.SIG_DFL) and (old_handler != _sigterm_handler): + if not append: + raise RuntimeError("there is already a handler registered for " + "SIGTERM: %r" % old_handler) + + def handler(signum, frame): + try: + _sigterm_handler(signum, frame) + finally: + old_handler(signum, frame) + signal.signal(signal.SIGTERM, handler) + + if _sigterm_handler.__enter_ctx__: + raise RuntimeError("can't use nested contexts") + _sigterm_handler.__enter_ctx__ = True + + try: + yield + except KeyboardInterrupt: + pass + except SystemExit, err: + # code != 0 refers to an application error (e.g. explicit + # sys.exit('some error') call). + # We don't want that to pass silently. + # Nevertheless, the 'finally' clause below will always + # be executed. + if err.code != 0: + raise + finally: + _sigterm_handler.__enter_ctx__ = False + if callback is not None: + callback() + + +if __name__ == '__main__': + # =============================================================== + # --- test suite + # =============================================================== + + import unittest + import os + + class TestOnExit(unittest.TestCase): + + def setUp(self): + # reset signal handlers + signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.flag = None + + def tearDown(self): + # make sure we exited the ctx manager + self.assertTrue(self.flag is not None) + + def test_base(self): + with handle_exit(): + pass + self.flag = True + + def test_callback(self): + callback = [] + with handle_exit(lambda: callback.append(None)): + pass + self.flag = True + self.assertEqual(callback, [None]) + + def test_kinterrupt(self): + with handle_exit(): + raise KeyboardInterrupt + self.flag = True + + def test_sigterm(self): + with handle_exit(): + os.kill(os.getpid(), signal.SIGTERM) + self.flag = True + + def test_sigint(self): + with handle_exit(): + os.kill(os.getpid(), signal.SIGINT) + self.flag = True + + def test_sigterm_old(self): + # make sure the old handler gets executed + queue = [] + signal.signal(signal.SIGTERM, lambda s, f: queue.append('old')) + with handle_exit(lambda: queue.append('new'), append=True): + os.kill(os.getpid(), signal.SIGTERM) + self.flag = True + self.assertEqual(queue, ['old', 'new']) + + def test_sigint_old(self): + # make sure the old handler gets executed + queue = [] + signal.signal(signal.SIGINT, lambda s, f: queue.append('old')) + with handle_exit(lambda: queue.append('new'), append=True): + os.kill(os.getpid(), signal.SIGINT) + self.flag = True + self.assertEqual(queue, ['old', 'new']) + + def test_no_append(self): + # make sure we can't use the context manager if there's + # already a handler registered for SIGTERM + signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) + try: + with handle_exit(lambda: self.flag.append(None)): + pass + except RuntimeError: + pass + else: + self.fail("exception not raised") + finally: + self.flag = True + + def test_nested_context(self): + self.flag = True + try: + with handle_exit(): + with handle_exit(): + pass + except RuntimeError: + pass + else: + self.fail("exception not raised") + + unittest.main() diff --git a/linaropy/proj.py b/linaropy/proj.py new file mode 100644 index 0000000..819cbe4 --- /dev/null +++ b/linaropy/proj.py @@ -0,0 +1,116 @@ +import unittest +import logging +import os +import tempfile + +# Execute shell commands with the 'sh' package. +from sh import rm + +# TODO: Take an existing directory as an input, which allows the benefits of a +# Proj without having to create the directory. BUT make sure that +# persist=True is forced in such cases. + +# This class represents the project temporary directory whose lifetime is +# managed. It is not persistent by default. +class Proj(object): + # @ persist - True, False or None. Only True persists. + # @ prefix - Optional prefix appended to projdir directory name. + def __init__(self, prefix="", persist=None): + # This allows string or bool inputs. + if str(persist).lower() == "true": + self.persist=True + longevity="persistent" + else: + self.persist=False + longevity="temporary" + + self.projdir=unicode(tempfile.mkdtemp(prefix=prefix), "utf-8") + logging.info("Created %s project directory in %s" % (longevity, self.projdir)) + + def cleanup(self): + + #FIXME: Only rm -rf if the directory exists. Make sure it's not / or /home +# #TODO: make sure that persisdir isn't a mount-bind to /home + if not self.persist and self.projdir != "/" and self.projdir != "/home": + logging.info("Removing project dir %s" % self.projdir) + rm("-rf", "--preserve-root", self.projdir) + self.projdir=None + else: + logging.info("Project dir %s will persist" % self.projdir) + + +class TestProj(unittest.TestCase): + + def setUp(self): + self.persistdir="" + + def test_create(self): + self.proj=Proj() + self.assertNotEqual(self.proj.projdir, "") + + def test_create_dir(self): + self.proj=Proj() + self.assertTrue(os.path.isdir(self.proj.projdir)) + + def test_create_with_prefix(self): + self.proj=Proj("testprefix") + self.assertTrue("testprefix" in self.proj.projdir) + + def test_create_with_persist_bool(self): + self.proj=Proj(persist=True) + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_create_with_persist_str(self): + self.proj=Proj(persist="true") + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_create_with_persist_false_str(self): + self.proj=Proj(persist="false") + self.assertFalse(self.proj.persist) + # Just in-case it wasn't set properly + self.persistdir=self.proj.projdir + + def test_create_with_persist_false_bool(self): + self.proj=Proj(persist=False) + self.assertFalse(self.proj.persist) + # Just in-case it wasn't removed properly + self.persistdir=self.proj.projdir + + def test_create_with_persist_capstr(self): + self.proj=Proj(persist="TRUE") + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_cleanup(self): + self.proj=Proj() + projdir=self.proj.projdir + self.proj.cleanup() + # Verify that the directory has been removed. + self.assertFalse(os.path.isdir(projdir)) + + def test_cleanup_with_persist(self): + self.proj=Proj(persist=True) + projdir=self.proj.projdir + self.proj.cleanup() + self.assertTrue(os.path.isdir(projdir)) + self.persistdir=self.proj.projdir + + def test_cleanup_with_persist_and_prefix(self): + self.proj=Proj(persist=True, prefix="thingy") + projdir=self.proj.projdir + self.proj.cleanup() + self.assertTrue(os.path.isdir(projdir)) + self.persistdir=self.proj.projdir + + def tearDown(self): + # if persistdir is set and isdir then remove it or we'll pollute /tmp. + if os.path.isdir(self.persistdir): + if self.persistdir != "/" and self.persistdir != "/home": + rm("-rf", "--preserve-root", self.persistdir) + + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/rn/__init__.py b/linaropy/rn/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/linaropy/rn/custom_wordwrap.py b/linaropy/rn/custom_wordwrap.py new file mode 100644 index 0000000..4ed3e25 --- /dev/null +++ b/linaropy/rn/custom_wordwrap.py @@ -0,0 +1,31 @@ +#from jinja2.utils import Markup, escape, pformat, urlize, soft_unicode, \ +# unicode_urlencode +from jinja2.runtime import Undefined +#from jinja2.exceptions import FilterArgumentError +#from jinja2._compat import imap, string_types, text_type, iteritems + +def environmentfilter(f): + """Decorator for marking environment dependent filters. The current + :class:`Environment` is passed to the filter as first argument. + """ + f.environmentfilter = True + return f + +@environmentfilter +def do_customwordwrap(environment, s, width=79, break_long_words=True, + wrapstring=None, break_on_hyphens=False): + """ + Return a copy of the string passed to the filter wrapped after + ``79`` characters. You can override this default using the first + parameter. If you set the second parameter to `false` Jinja will not + split words apart if they are longer than `width`. By default, the newlines + will be the default newlines for the environment, but this can be changed + using the wrapstring keyword argument. + """ + if not wrapstring: + wrapstring = environment.newline_sequence + import textwrap + return wrapstring.join(textwrap.wrap(s, width=width, expand_tabs=False, + replace_whitespace=False, + break_long_words=break_long_words, + break_on_hyphens=break_on_hyphens)) diff --git a/linaropy/rn/gccclone.py b/linaropy/rn/gccclone.py new file mode 100644 index 0000000..36f610b --- /dev/null +++ b/linaropy/rn/gccclone.py @@ -0,0 +1,27 @@ + +from sh import git +from ..git.clone import Clone +from ..git.gitrepo import cd + +class GCCClone(Clone): + def __init__(self, proj, clonedir=None, remote=None ): + super(GCCClone,self).__init__(proj, clonedir, remote) + + with cd(clonedir): + try: + with open('gcc/BASE-VER', 'r') as f: + self.basever = f.readline().strip() + except: + raise IOError('gcc/BASE-VER not found in ' + clonedir) + + log=git("log", "-n 1", "--grep=Merge branch") + for logline in log: + if logline.lstrip().startswith("Merge"): + # For some reason rsplit('.') isn't working. + self.fsfrev=logline.lstrip().rsplit(None,1)[1].split('.',1)[0] + + def get_base_version(self): + return self.basever + + def get_fsf_revision(self): + return self.fsfrev diff --git a/linaropy/rn/linaroseries.py b/linaropy/rn/linaroseries.py new file mode 100644 index 0000000..f978655 --- /dev/null +++ b/linaropy/rn/linaroseries.py @@ -0,0 +1,361 @@ +import unittest +import copy + +from datetime import datetime +from dateutil.relativedelta import relativedelta +from ..vers import Spin +from ..vers import Rc +from ..series import Series +from ..series import seriesFromBranchname + +# Progression: +# The Linaro release progression follows. +# +# Snapshot 2016.01 +# spin 1 2016.01-1 +# spin 2 2016.01-2 +# Candidate 2016.01-rc1 +# rc 2 2016.01-rc2 +# Release 2016.01 +# +# Note: Remember, if a snapshot is respun to fix a +# bug then the fix should be cherry-picked into the +# release branch and a release-candidate spun from +# from the release branch. +# +# Candidate 2016.01-1-rc1 +# rc 2 2016.01-1-rc2 +# Release 2016.01-1 + +# Inherit Series in order to provide Linaro specific rules on transitions +# between series types. The baseclass Series doesn't have the toNext* +# functions defined. +class LinaroSeries(Series): + def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): + + # This is a dispatch table so that we can use a 'toNext' function based + # on an input type and it will call the correct toNextFoo function. + # This will work if the key is an integer or a string. Ideally this + # would be added to Series and we wouldn't need to derive, but I + # couldn't figure out how to get the pointer tables correct. + self.dispatchnext = { + Series.series.index("candidate"): self.toNextCandidate, + 'candidate': self.toNextCandidate, + Series.series.index("snapshot"): self.toNextSnapshot, + 'snapshot': self.toNextSnapshot, + Series.series.index("release"): self.toNextRelease, + 'release': self.toNextRelease, + } + + super(LinaroSeries,self).__init__(seriestype, vendor, package, date,spin,rc, strict) + + def toNextCandidate(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextCandidate on an unknown series type.') + + candidate=copy.deepcopy(self) + candidate.seriestype = Series.series.index("candidate") + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + candidate.date=date + + if self.seriestype == Series.series.index("candidate"): + candidate.rc.increment() + elif self.seriestype == Series.series.index("release"): + rc=Rc(1) + candidate.rc=rc + candidate.spin.increment() + elif self.seriestype == Series.series.index("snapshot"): + spin=Spin(None) + rc=Rc(1) + candidate.rc=rc + candidate.spin=spin + + # If the user hasn't specified a date, the implicit behavior is to + # increment the date by 1 month when moving from a snapshot to a + # candidate. + if not date: + candidate.incrementMonth() + # If the user did specify a date and we're in strict mode we need to + # verify that the date they chose is exactly one month difference. + elif strict: + if candidate.date != self.date + relativedelta(months=1): + raise ValueError('toNextCandidate invoked with strict=True. Snapshot date to candidate date can only be +1 month difference.') + return candidate + + if strict and candidate.date != self.date: + raise ValueError('Candidate date can only change if current series is a Snapshot when toNextCandidate is invoked with strict=True.') + + return candidate + + def toNextRelease(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextRelease called on an unknown series type.') + elif self.seriestype == Series.series.index("release"): + raise TypeError('A release series can not be the basis for another release. Move to a release candidate first.') + elif self.seriestype == Series.series.index("snapshot"): + raise TypeError('A snapshot series can not be the basis for a release. Move to a release candidate first.') + + # Only a candidate can turn into a release. + release=copy.deepcopy(self) + # if we have a candidate and ask for a release rc needs to be None. + rc=Rc() + release.rc=rc + release.seriestype = Series.series.index("release") + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + release.date=date + + if strict and release.date != self.date: + raise ValueError('Release date can not change as toNextRelease was invoked with strict=True') + + return release + + def toNextSnapshot(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextRelease called on an unknown series type.') + elif self.seriestype == Series.series.index("candidate"): + raise TypeError('Series of type candidate can not be the basis for a snapshot.') + if self.seriestype == Series.series.index("release"): + raise TypeError('Series of type release can not be the basis for a snapshot.') + + # Only snapshots can be snapshots next. + snapshot=copy.deepcopy(self) + snapshot.seriestype = Series.series.index("snapshot") + snapshot.spin.increment() + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + snapshot.date=date + + if strict and snapshot.date != self.date: + raise ValueError('Snapshot date can not change as toNextSnapshot was invoked with strict=True') + + return snapshot + + # toNext will take a 'key' as either a string representing the + # Series.series type or the index of the Series.series type, e.g., + # 'candidate' or Series.series.index("candidate") and it will forward + # to the correct toNext* function as mapped in the dispatchnext + # dictionary. + def toNext(self, seriestype, date=None, strict=False): + return self.dispatchnext[seriestype](date, strict) + +# Helper function which creates a LinaroSeries from a properly formed branch name +# input string. +def linaroSeriesFromBranchname(branch=None): + + # Create a Series using the helper function and then populate a + # LinaroSeries. TODO: There's probably a better way to do this such + # as a SeriesFactory that is overridden. + series=seriesFromBranchname(branch) + linaroseries=LinaroSeries(Series.series[series.seriestype], vendor="linaro", package=series.package ,date=series.date , spin=series.spin, rc=series.rc, strict=True ) + return linaroseries + +class TestLinaroSeries(unittest.TestCase): + + def test_candidate_to_candidate(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + candidate2=candidate.toNextCandidate() + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-1-rc2") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + self.assertEqual(candidate.date, candidate2.date) + + #TODO test date and strict=True + + def test_release_to_candidate(self): + # Test where spin is None and needs to be incremented. + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15)) + candidate=release.toNextCandidate() + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + self.assertEqual(candidate.date, release.date) + + # Now test where spin is already non-zero. + release2=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + candidate2=release2.toNextCandidate() + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-2-rc1") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + + #TODO test date and strict=True + + def test_snapshot_to_candidate(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + candidate=snapshot.toNextCandidate() + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + self.assertNotEqual(candidate.date, snapshot.date) + # Verify that it's one month apart. + self.assertEqual(candidate.date, snapshot.date + relativedelta(months=1)) + + # Make sure that the default date increment is just one month. + candidate2=snapshot.toNextCandidate(strict=True) + self.assertEqual(candidate2.date, snapshot.date + relativedelta(months=1)) + + # whether strict=True or not. + candidate3=snapshot.toNextCandidate(strict=False) + self.assertEqual(candidate3.date, snapshot.date + relativedelta(months=1)) + + # If the user passed a date and strict=yes, make sure the increment is only one month. + candidate4=snapshot.toNextCandidate(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + plusonem=snapshot.date + relativedelta(months=1) + self.assertEqual(candidate4.date,plusonem) + + # Make sure the date can be set without error when strict is False + candidate5=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=False) + self.assertEqual(candidate5.date, snapshot.date + relativedelta(months=2)) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + candidate6=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=True) + + def test_unknown_to_candidate(self): + unknown=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + candidate=unknown.toNextCandidate() + + def test_candidate_to_release(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + release=candidate.toNextRelease() + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") + self.assertEqual(release.seriestype, Series.series.index("release")) + self.assertEqual(candidate.date, release.date) + + release2=candidate.toNextRelease(strict=True) + self.assertEqual(release2.getBranchname(), "releases/linaro-5.3-2016.05-1") + self.assertEqual(release2.date, candidate.date) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + release3=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + + # If strict=False and the date is changed, go ahead and change it. + release4=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) + self.assertEqual(release4.getBranchname(), "releases/linaro-5.3-2016.06-1") + self.assertNotEqual(release4.date, release.date) + + with self.assertRaises(TypeError): + release5=candidate.toNextRelease("2016.06", strict=False) + + def test_snapshot_to_release(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + with self.assertRaises(TypeError): + release=snapshot.toNextRelease() + + def test_release_to_release(self): + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + with self.assertRaises(TypeError): + release2=release.toNextRelease() + + def test_unknown_to_release(self): + unknown=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + release=unknown.toNextRelease() + + def test_candidate_to_snapshot(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + with self.assertRaises(TypeError): + snapshot=candidate.toNextSnapshot() + # This would/should be the case if this wasn't and invalid option. + # self.assertEqual(snapshot.seriestype, "snapshot") + + def test_release_to_snapshot(self): + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + with self.assertRaises(TypeError): + snapshot=release.toNextSnapshot() + + def test_snapshot_to_snapshot(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + snapshot2=snapshot.toNextSnapshot() + + self.assertEqual(snapshot2.getBranchname(), "snapshots/linaro-5.3-2016.05-7") + self.assertEqual(snapshot2.seriestype, Series.series.index("snapshot")) + self.assertEqual(snapshot.date, snapshot2.date) + + # Verify that the original hasn't changed. This verifies that a + # deepcopy is used on the toNext* functions. + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6") + + snapshot3=snapshot.toNextSnapshot(strict=True) + self.assertEqual(snapshot3.getBranchname(), "snapshots/linaro-5.3-2016.05-7") + self.assertEqual(snapshot3.seriestype, Series.series.index("snapshot")) + self.assertEqual(snapshot.date, snapshot3.date) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + snapshot4=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + + # If strict=False and the date is changed, go ahead and change it. + snapshot5=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) + self.assertEqual(snapshot5.getBranchname(), "snapshots/linaro-5.3-2016.06-7") + self.assertNotEqual(snapshot5.date, snapshot.date) + + # Verify that a date as a string generates an exception. + with self.assertRaises(TypeError): + snapshot6=snapshot.toNextSnapshot("2016.06.15", strict=False) + + def test_unknown_to_snapshot(self): + unknown=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + snapshot=unknown.toNextSnapshot() + + def test_snapshot_toNext(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") + + candidate=snapshot.toNext("candidate") + # A Snapshot -> Candidate traversal always results in spin being set to zero + # and the first release candidate being created. + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + + # Make sure toNext works with 'index' as well as the type string. + candidate2=snapshot.toNext(Series.series.index("candidate")) + # A Snapshot -> Candidate traversal always results in spin being set to zero + # and the first release candidate being created. + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + + with self.assertRaises(TypeError): + candidate=snapshot.toNext("release") + candidate=snapshot.toNext(Series.series.index("release")) + + def test_toNext_incorrect_keys(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") + with self.assertRaises(KeyError): + candidate=snapshot.toNext("candidat") + candidate=snapshot.toNext("snapsho") + candidate=snapshot.toNext("releas") + candidate=snapshot.toNext(-1) + candidate=snapshot.toNext(len(Series.series)+1) + # Index out of range of the Series.series array. + + # Test to see if the objects returned from the toNext* functions are + # new instances and not references to the input Series. + def test_toNextFOO_return_new_objects(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + candidate2=candidate.toNext("candidate") + release=candidate.toNext("release") + + self.assertIsNot(candidate,candidate2) + self.assertIsNot(candidate,release) + + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + snapshot2=snapshot.toNext("snapshot") + self.assertIsNot(snapshot,snapshot2) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/rn/rngen.py b/linaropy/rn/rngen.py new file mode 100644 index 0000000..b030a3e --- /dev/null +++ b/linaropy/rn/rngen.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python +# vim: set tw=78 expandtab tabstop=4 shiftwidth=4 autoindent smartindent: + +import csv +import sys, getopt +reload(sys) +sys.setdefaultencoding('utf8') + +# For fileIO +import os.path + +# Generate html files from textile files. +import textile + +# Generate text files from html. +#from BeautifulSoup import BeautifulSoup +from bs4 import BeautifulSoup + +# Jinja2 is the templating engine. +from jinja2 import Template, FileSystemLoader, Environment + +from custom_wordwrap import do_customwordwrap + +from cStringIO import StringIO + +from datetime import datetime + +from shutil import copyfile + +# To test if a remote file exists. +import urllib2 + +from ..series import Series +from linaroseries import LinaroSeries +from rnseries import CandidateRN +from gccclone import GCCClone + +from ..rninput import finput + +def get_gcc_version(gccclone): + dct= {} + (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") + return dct + +def rngen(candidate, gccclone): + #TODO Test that candidate is a CandidateRN + + trackseries=candidate.get_track_series() + nextseries=candidate.get_next_series() + + print "toNext: " + nextseries.getBranchname() + print "toNext: " + format(nextseries, '%N/%d') + + # TODO: What if 'track' is not a snapshot branch? To do this right we'd + # probably have to preserve basedon information in the release notes + # repository as a yaml file. + basedon='http://snapshots.linaro.org/components/toolchain/gcc-linaro/' + basedon=basedon + trackseries.getDir() + print "basedon: " + basedon + basis_url=basedon + +# print 'gccbaseversion is ' + gccclone.get_base_version() +# dct= {} +# (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") +# print 'fsf revision is ' + gccclone.get_fsf_revision() + ver=get_gcc_version(gccclone) + print 'gcc major ' + ver['major'] + print 'gcc minor ' + ver['minor'] + + fsf=gccclone.get_fsf_revision() + + # The version numbering scheme changed with GCC 5. Now, every new release + # has the major version incremented. Prior to version five the minor was + # incremented per-release. + # TODO: read stable and maintenance information from a config file. + if int(ver['major']) <= 4: + gcc_version_path= u'' + ver['major'] + '.' + ver['minor'] + stab_maint='Maintenance' + else: + gcc_version_path=ver['major'] + stab_maint='Stable' + + print "This is a %s candidate for GCC version %s" % (stab_maint, + gcc_version_path) + + history=finput('Please enter the location of the changelog csv file: ', "6.1-2016.06.csv") + + print "using the following .csv file: " + history + + #month_year=format(nextseries, '%D') + month_year=nextseries.date + + outfile=u'GCC-' + ver['major'] + u'.' + ver['minor'] + u'-' + month_year.strftime("%Y.%m") + +# if spin != "": +# outfile=outfile + u'-' + spin +# +# if rc != "": +# outfile=outfile + u'-rc' + rc + outfile=outfile + str(nextseries.spin) + str(nextseries.rc) + print "outfile: " + outfile + + with open(history, mode='r') as backports: + backports.seek(0) + backports_content = backports.read() + in_memory_backports = StringIO(backports_content) + + backportsDict = csv.DictReader(in_memory_backports, fieldnames=['rev', 'fsf', 'linaro', 'summary', 'target', 'type', 'notes', 'backport', 'owner', 'author'], delimiter=',') + backports.close(); + + release_type=nextseries.longuppertype() + + spin=str(nextseries.spin).strip('-') + rc=str(nextseries.rc).strip("-rc") + + template_dir=candidate.rntemplate.workdir() + series_dir=candidate.rnworkdir.workdir() + + # Verify that the basis url is actually valid, otherwise the release notes + # will contain erroneous information. + url=urllib2 + try: + ret = urllib2.urlopen(basis_url) + except url.URLError as e: + print "%s does not exist!" % basis_url + exit(1) + + # TODO verify that the basis_url is a valid url and warn if not. + basis=u'' + basis_ver=basis_url.strip("/") + basis_ver=basis_ver.rsplit("/",1)[1] + if basis_url.find("releases") == 1: + basis="Linaro Release GCC " + basis_ver + elif basis_url.find("-rc") == 1: + basis="Linaro Release-Candidate GCC " + basis_ver + else: + basis="Linaro Snapshot GCC " + basis_ver + + release_string="Linaro GCC %s.%s-%s.%s %s" % (ver['major'],ver['minor'],month_year.strftime("%Y"), month_year.strftime("%m"),release_type) + + print "The release notes for this %s will be generated based on: %s" % (release_string, basis) + + # Get ready to do the rendering of the jinja2 templates into .textile + # format. + + # relative path to the README.textile templates. + src_readme_path=u'/components/toolchain/gcc-linaro/5/README.textile' + bin_readme_path=u'/components/toolchain/binaries/README.textile' + + # absolute path to README.textile templates. + template_src_readme=template_dir + src_readme_path + template_bin_readme=template_dir + bin_readme_path + + # relative path to the README.textile.series templates. + # Note, the src templates don't have an extends (.series) file. + src_extends_path=u'/components/toolchain/gcc-linaro/5/README.textile' + bin_extends_path=u'/components/toolchain/binaries/README.textile.series' + + # absolute path to README.textile.series template overrides. These are + # what are passed to the jinja2 rendering engine because they might + # override the default templates with version specific information. + series_src_extends=series_dir + src_extends_path + series_bin_extends=series_dir + bin_extends_path + + # These are the README.textile files that exist in the series directory. + series_src_readme=series_dir + src_readme_path + series_bin_readme=series_dir + bin_readme_path + + # Copy the template README.textile over the series README.textile file + # because the jinja2 engine will need access to the parent template. This + # will be overwritten with the generated README.textile later. + copyfile(template_src_readme, series_src_readme) + copyfile(template_bin_readme, series_bin_readme) + + # After the script runs the *_series_template_path files are overwritten + # with the generated README.textile files. + src_readme=series_src_readme + bin_readme=series_bin_readme + + + # Setting lstrip_blocks=True lets us use indenting in jinja2 templates. + env = Environment(loader=FileSystemLoader(series_dir), lstrip_blocks=True, trim_blocks=True) + + # We need to use a custom wordwrap filter that doesn't line-break urls. + env.filters['customwordwrap'] = do_customwordwrap + + source_archive_template = env.get_template(src_extends_path) + binary_archive_template = env.get_template(bin_extends_path) + announce_template = env.get_template(bin_extends_path) + + # Generate the toolchain GCC source archive release notes. + source_textile_rendering = source_archive_template.render( + backports=backportsDict, + GCC_base_version=ver['major'] + u'.' + ver['minor'] + u'.' + ver['revision'], + FSF_rev=fsf, + basis=basis, + basis_url=basis_url, + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + stab_maint=stab_maint, + release_type=release_type, + package="GCC", + major=ver['major'], + minor=ver['minor']) + + source_textile_rendering = source_textile_rendering.encode('utf-8') + + out_source=u'' + outfile + out_binary=u'Linaro-' + outfile + + try: + with open(out_source + u'.textile','w') as f: + f.write(source_textile_rendering) + print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_source + u'.textile' + except IOError as e: + sys.exit('Error rendering textile source archive release notes.') + + source_html_rendering = textile.textile(source_textile_rendering) + + try: + with open(out_source + u'.html','w') as f: + f.write(source_html_rendering) + print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_source + u'.html' + except IOError as e: + sys.exit('Error rendering html release notes.') + + # Generate the binary toolchain archive release notes. + binary_textile_rendering = binary_archive_template.render( + announce='', + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + major=ver['major'], + minor=ver['minor'], + stab_maint=stab_maint, + snap_rc_release=release_type) + + binary_textile_rendering = binary_textile_rendering.encode('utf-8') + try: + with open(out_binary + u'.textile','w') as f: + f.write(binary_textile_rendering) + print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.textile' + except IOError as e: + sys.exit('Error rendering textile binary archive release notes.') + + binary_html_rendering = textile.textile(binary_textile_rendering) + + try: + with open(out_binary + u'.html','w') as f: + f.write(binary_html_rendering) + print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.html' + except IOError as e: + sys.exit('Error rendering html release notes.') + + # Generate the binary toolchain announce message. + announce_rendering = announce_template.render( + announce='yes', + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + major=ver['major'], + minor=ver['minor'], + stab_maint=stab_maint, + snap_rc_release=release_type) + + announce_rendering = announce_rendering.encode('utf-8') + announce_file=u'announce-' + out_binary + u'.txt' + try: + with open(announce_file,'w') as f: + f.write(announce_rendering) + print 'Wrote text rendering to file %s' % os.getcwd() + u'/' + announce_file + except IOError as e: + sys.exit('Error rendering text announce message.') + + + # Copy the generated .textile file over the + # readme_series_dir/README.textile template file. + # WARNING: This must be done AFTER you generate the announce message + # because it uses the same README.textile template as the textile rendered + # release notes. + copyfile(out_binary + u'.textile' ,bin_readme) + print 'Wrote generated file ' + out_binary + u'.textile' + " to " + bin_readme + + # Copy the generated .textile file over the + # readme_series_dir/README.textile template file. + copyfile(out_source + u'.textile' ,src_readme ) + print 'Wrote generated file ' + out_source + u'.textile' + " to " + src_readme + + candidate.rnworkdir.add(bin_readme) + candidate.rnworkdir.add(src_readme) + candidate.rnworkdir.commit("Generated Release Notes for " + nextseries.label()) + candidate.rnworkdir.log(1) + + # TODO: Ask if the user would like to push the release notes changes. + + quit(0); + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/linaropy/rn/rnseries.py b/linaropy/rn/rnseries.py new file mode 100644 index 0000000..d3e2c3f --- /dev/null +++ b/linaropy/rn/rnseries.py @@ -0,0 +1,193 @@ +import unittest +import logging +import os + +from sh import ls +from shutil import copyfile + +from ..proj import Proj +from ..git.gitrepo import cd +from ..git.clone import Clone +from ..git.workdir import Workdir +from ..series import Series +from ..series import seriesFromBranchname +from linaroseries import LinaroSeries +from linaroseries import linaroSeriesFromBranchname +from template import TemplateRN + +from ..rninput import yninput + +# Abstract base class for a release-notes series +# Every RNSeries has a templateRN instance and a workdir. The template +# instance is used for the base template. +class RNSeries(object): + #rnremote=u'http://git.linaro.org/toolchain/release-notes.git' + # use ssh:// so that we can push to the remote. + rnremote=u'ssh://git@git.linaro.org/toolchain/release-notes.git' + _series='components/toolchain/binaries/README.textile.series' + # @rnrepo - path to the existing releases notes repository if there is one. + # If there isn't one a new one will be cloned. + # @trackseries - an instance of a LinaroSeries to track. + # @nextseries - an instance of a LinaroSeries that is the next series. + def __init__(self, proj, rnrepo=None, trackseries=None, nextseries=None, headless=False): + + # Create the release-notes repository clone. The Clone constructor + # will throw an exception if proj is not a Proj. That is an + # unrecoverable error so don't catch it here. The Clone constructor + # will also throw and exception if rnrepo is not a string. + + if rnrepo: + logging.info('RNSeries() clone already exists. Using existing.') + self.rnclone=Clone(proj, clonedir=rnrepo) + else: + self.rnclone=Clone(proj, remote=RNSeries.rnremote) + + # TODO: Write a testcase that exposes this. + if not trackseries: + raise TypeError("Input variable trackseries is required.") + + # TODO: Write a testcase that exposes this. + if not isinstance(trackseries, LinaroSeries): + raise TypeError("Input variable trackseries not of type Series.") + + # TODO: Write a testcase that exposes this. + if not nextseries: + raise TypeError("Input variable nextseries is required.") + + # TODO: Write a testcase that exposes this. + if not isinstance(nextseries, LinaroSeries): + raise TypeError("Input variable nextseries not of type Series.") + + self.trackseries=trackseries + self.nextseries=nextseries + + self.proj=proj + + templ_branchname=self.nextseries.shorttype() + "_" + self.nextseries.getDir() + + logging.info('RNSeries() Creating TemplateRN') + # Every release-notes series contains a 'template' workdir checked out + # into a new templ_branchname branch that tracks master. + self.rntemplate=TemplateRN(self.proj,self.rnclone, branchname_suffix=templ_branchname) + + # We do this now because the SeriesRN workdir might be branched from the template. + self.rntemplate.updateTemplateReadme() + updateseries=self.rntemplate.updateTemplateSeries() + + self.rntemplate.commit() + + print "Please verify that your changes have been committed on the template branch." + self.rntemplate.log(1) + + # It's possible that self.trackseries.getBranchname() doesn't exist + # (such is the case if we're creating the first candidate from a + # snapshot). In this case 'track' should be the local template branch + # so that we pick up the template changes. + if not self.rntemplate.branchexists(self.trackseries.getBranchname()): + print("Creating RNSeries based on branch " + self.rntemplate.getbranch()) + self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.rntemplate.getbranch(), branchname=self.nextseries.getBranchname()) + else: + print "You updated the template README.textile.series but this will not be reflected in the Candidate README.textile.series." + print "Creating RNSeries based on branch " + self.trackseries.getBranchname() + self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.trackseries.getBranchname(), branchname=self.nextseries.getBranchname()) + + print "Log of last commit on workdir branch:" + self.rnworkdir.log(1) + + print "If you would like to make changes to the series release notes please update the the Candidate README.textile.series file:" + answer=yninput("Do you need to update the Candidate README.textile.series file?", "N") + if answer == "y": + self.rnworkdir.editFile(self._series) + else: + print "The Candidate README.textile.series file has not been updated." + + self.rnworkdir.commit() + + print "Log of last commit on workdir branch:" + self.rnworkdir.log(1) + + # TODO: Don't forget to push template and workdir changes. + # self.rntemplate.pushToBranch("origin/master") + # TODO: Should the workdir know where to push to? + # self.rnworkdir.pushToBranch(self.nextseries.getBranchname()) + + def get_next_series(self): + return self.nextseries + + def get_track_series(self): + return self.trackseries + +# Do we even need these, especially if they're all the same? +class SnapshotRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The series input must be of type LinaroSeries') + + nextseries=trackseries.toNextSnapshot(strict=True) + + super(SnapshotRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) + +class CandidateRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The trackseries input must be of type LinaroSeries') + + self.nextseries=trackseries.toNextCandidate(strict=True) + + super(CandidateRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=self.nextseries) + +class ReleaseRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The series input must be of type LinaroSeries') + + nextseries=trackseries.toNextRelease(strict=True) + + super(ReleaseRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) + +class TestCandidateRN(unittest.TestCase): + #logging.basicConfig(level="INFO") + testdirprefix="CandidateRNUT" + + @classmethod + def setUpClass(cls): + cls.proj=Proj(prefix=TestCandidateRN.testdirprefix, persist=True) + logging.info('11111 Created Proj dir') + cls.rnrepo=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + logging.info('22222 Created clone dir') + cls.branch=u'snapshots/linaro-6.1-2016.06' + cls.series=linaroSeriesFromBranchname(branch=cls.branch) + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + def test_candidate_newclone(self): + logging.info('33333 Going To Create CandidateRN') + candidatern=CandidateRN(self.proj, rnrepo=self.rnrepo.repodir, trackseries=self.series) + logging.info('44444 Created CandidateRN') + self.assertEqual(candidatern.nextseries.longuppertype(), "Release-Candidate") + self.assertEqual(candidatern.nextseries.longlowertype(), "release-candidate") + self.assertTrue(os.path.isdir(self.proj.projdir + "/release-notes.git")) + self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) + self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) + +# def test_candidate_existingclone(self): +# # Since this reuses an existing clone of a repository the +# # release-notes.git directory should actually be a symlink. +# candidatern=CandidateRN(self.proj, trackseries=self.series, rnrepo=TestCandidateRN.rnrepo.repodir) +# self.assertEqual(candidatern.series.longuppertype(), "Release-Candidate") +# self.assertEqual(candidatern.series.longlowertype(), "release-candidate") +# self.assertTrue(os.path.islink(self.proj.projdir + "/release-notes.git")) +# self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) +# self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) +# +# self.assertEqual(candidatern.rntemplate.getbranch().split("_")[0], "template") +## self.assertTrue(candidatern.rnworkdir.getbranch().startswith("releases/")) +# +# print "Based on: %s" % self.series.getBranchname() +# print "Template Branch: %s" % candidatern.rntemplate.getbranch() +# print "Candidate Branch: %s" % candidatern.rnworkdir.getbranch() + +if __name__ == '__main__': + unittest.main() diff --git a/linaropy/rn/template.py b/linaropy/rn/template.py new file mode 100644 index 0000000..c036add --- /dev/null +++ b/linaropy/rn/template.py @@ -0,0 +1,94 @@ +import unittest +import logging +import os +import sys +import uuid + +from sh import ls +from ..proj import Proj +from ..git.clone import Clone +from ..git.workdir import Workdir +from ..rninput import yninput + +class TemplateRN(Workdir): + _readme='components/toolchain/binaries/README.textile' + _series='components/toolchain/binaries/README.textile.series' + + def __init__(self, proj, rnclone, branchname_suffix=None): + if branchname_suffix: + branchname='template_' + branchname_suffix + else: + branchname='template_' + str(uuid.uuid4()) + super(TemplateRN, self).__init__(proj, rnclone, "template", track="origin/master", branchname=branchname) + # Todo move template workdir out of detached head state. + + # TODO: Allow the user to edit BOTH the gcc-linaro/ and binaries/ + # README.textile file as currently this function only does the binary one. + # TODO: Create Unit Test + def updateTemplateReadme(self): + answer=yninput("Would you like to update the Template README.textile file?", "N") + if answer != "y": + print "The Template README.textile file has not been updated." + return False + + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + print "Edit the README.textile template fragment if changes are needed." + os.system(editor + ' ' + self.repodir + '/' + TemplateRN._readme) + + self.add(TemplateRN._readme) + return True + + # TODO: Create Unit Test + def updateTemplateSeries(self): + answer=yninput("Would you like to update the Template README.textile.series file?", "N") + if answer != "y": + print "The Template README.textile.series file has not been updated." + return False + + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + print "Edit the README.textile.series template fragment if changes are needed." + os.system(editor + ' ' + self.repodir + '/' + TemplateRN._series) + + self.add(TemplateRN._series) + return True + + def getReadme(self): + return self.repodir + '/' + TemplateRN._readme + + def getSeries(self): + return self.repodir + '/' + TemplateRN._series + +class TestTemplateRN(unittest.TestCase): + testdirprefix="TemplateRNUT" + + # Every instance of TestClass requires its own proj directory. + def setUp(self): + self.proj=Proj(prefix=TestTemplateRN.testdirprefix, persist=True) + + def tearDown(self): + self.proj.cleanup() + + def test_getREADME(self): + self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + self.rntemplate=TemplateRN(self.proj,self.rnrepo) + compare=self.proj.projdir + '/template/' + TemplateRN._readme + self.assertEqual(self.rntemplate.getReadme(),compare) + self.assertTrue(os.path.isfile(self.rntemplate.getReadme())) + + def test_getSERIES(self): + self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + self.rntemplate=TemplateRN(self.proj,self.rnrepo) + compare=self.proj.projdir + '/template/' + TemplateRN._series + self.assertEqual(self.rntemplate.getSeries(),compare) + self.assertTrue(os.path.isfile(self.rntemplate.getSeries())) + + + # TODO: Test branchname_suffix + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/rninput.py b/linaropy/rninput.py new file mode 100644 index 0000000..86d3d90 --- /dev/null +++ b/linaropy/rninput.py @@ -0,0 +1,35 @@ +# For fileIO +import os.path + +# @message - The question to display to the user. +# @default - The default if is hit. +# @accept - The list of valid responses. +# @retry - Whether to retry on a malformed input. +# Returns 'y' or 'no' +def yninput(message, default="n", accept=['yes', 'y', 'no', 'n'], retry=True): + # TODO: Test this + if default.lower() not in accept: + raise TypeError('Default as %s is not in list of accepted responses.' % default) + + default_msg=" [y/N]: " + if default.lower() == "y" or default.lower() == "yes": + default_msg=" [Y/n]: " + + while(1): + answer=raw_input(message + default_msg) or default.lower() + if answer.lower() not in accept and retry: + print "'%s' is an invalid response. Please try again." % answer.lower() + continue + else: + if answer.lower() == "yes" or answer.lower() == "y": + return "y" + return "n" + +# TODO: Test with a directory returned as the answer. +def finput(message,orval): + while(1): + answer=raw_input(message) or orval + if os.path.exists(answer) and os.path.isfile(answer): + return answer + + print "%s doesn't exist or isn't a regular file. Try again." % answer diff --git a/linaropy/series.py b/linaropy/series.py new file mode 100644 index 0000000..3db2c97 --- /dev/null +++ b/linaropy/series.py @@ -0,0 +1,550 @@ +import unittest +import logging +import os +import uuid + +from vers import Spin +from vers import Rc +from vers import Vendor +from vers import Package +from vers import packageFromStr + +from datetime import datetime +from dateutil.relativedelta import relativedelta + + +# A Series stores the instance characteristic that describes the output +class Series(object): + + series = ['candidate', 'snapshot', 'release'] + serieslongupper = ['Release-Candidate', 'Snapshot', 'Release'] + + # @ seriestype - 'type' of either 'candidate', 'snapshot', or 'release' + # @ package - String or Package object representing the series package + # name with version string, e.g., 'GCC-5.3.1' + # @ vendor - String or Vendor object representing the vendor that is + # tagged on this series. + # @ date - String or 'datetime' object representing the YYYY.MM of this + # series. It defaults to 'today' + # @ spin - Optional Spin, str, or int + # @ rc - Optional Rc, str, or int + # This will make sure snapshot doesn't have an rc and release doesn't have + # an rc for instance. + def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): + if isinstance(spin, Spin): + self.spin=spin + else: + # Spin will raise an exception if spin is the wrong type. + # A None parameter will create a Spin with an internal value of None. + self.spin=Spin(spin) + + if isinstance(rc, Rc): + self.rc=rc + else: + # Rc will raise an exception if rc is the wrong type. A None parameter + # will create an Rc with an internal value of None. + self.rc=Rc(rc) + + if isinstance(date, datetime): + # Force all of the days of the month to 15 to unify comparisons. + date.replace(day=15) + self.date=date + else: + # 'date' is a string. If the input date can't be parsed by datetime + # it will throw an exception. We can't recover from it so just pass + # it up. + if len(date) < 10: + tmpdate=datetime.strptime(date, "%Y.%m") + else: + tmpdate=datetime.strptime(date, "%Y.%m.%d") + # Force all of the days of the month to 15 to unify comparisons. + self.date=tmpdate.replace(day=15) + + # If the input vendor=None just instantiate the default vendor. + if not vendor: + self.vendor=Vendor() + else: + self.vendor=vendor + + if not package: + raise TypeError('Series requires an input package.') + elif isinstance(package, Package): + self.package=package + elif isinstance(package, basestring): + self.package=packageFromStr(package) + else: + # There can't be a defaut package because it requires a version. + raise TypeError("Series 'package' unrecognized type " + str(type(package))) + + # We might want to uniquely identify a particular Series object. + self.uniqueid=str(uuid.uuid4()) + + # We store a seriestype as an integer into an enumeration array + # so that the names can be changed as desired. + try: + self.seriestype = Series.series.index(seriestype.lower()) + except ValueError: + self.seriestype = -1 # no match + raise TypeError('Invalid series type %s.' % seriestype) + + # rc can only be non-None for release-candidates. + if strict: + if self.seriestype == Series.series.index("snapshot"): + if self.rc.val != 0: + raise ValueError('A snapshot series cannot have an rc.') + elif self.seriestype == Series.series.index("release"): + if self.rc.val != 0: + raise ValueError('A release series cannot have an rc.') + + + if self.seriestype == Series.series.index("candidate") and self.rc.val is 0: + raise TypeError('A candidate series must have an rc specified.') + + # namespace/vendor-package-version-YYYY.MM-spin-rc + self.fmt = { + '%N': self.getNamespace, + '%L': self.serieslabel, + '%P': self._getPackageName, + '%p': self._getPackageNameLower, + '%V': self._getVendor, + '%v': self._getVendorLower, + '%E': self._getVersion, + '%D': self._getYYYYMM, + '%d': self.getDir, + '%S': self._getSpin, + '%R': self._getRc, + } + + def _getPackageName(self): + return self.package.pv + + def _getPackageNameLower(self): + return self.package.lower() + + def _getVendor(self): + return self.vendor.__str__() + + def _getVendorLower(self): + return self.vendor.lower() + + def _getVersion(self): + return self.package.version.strfversion("%M%m") + + def _getYYYYMM(self): + return self.date.strftime("%Y.%m") + + def _getSpin(self): + return str(self.spin).strip('-') + + def _getRc(self): + return str(self.rc).strip('-') + + def __format__(self,format_spec): + ret='' + # Iterate across the dictionary and for each key found and invoke the + # function. + # self.fmt[idx]() + import string + ret=format_spec + for key in self.fmt.iterkeys(): + # TOOD: Crap this isn't going to work because each function takes different parameters. + replac=self.fmt[key]() + ret=string.replace(ret,key,replac) + return ret + + def serieslabel(self): + # Only the 'snapshot-' is used in a label. + if self.seriestype == Series.series.index("snapshot"): + return Series.series[self.seriestype] + u'-' + else: + return u'' + + def shorttype(self): + return Series.series[self.seriestype] + + def longlowertype(self): + return Series.serieslongupper[self.seriestype].lower() + + def longuppertype(self): + return Series.serieslongupper[self.seriestype] + + def __str__(self): + return Series.series[self.seriestype] + "_" + self.uniqueid + + def label(self): + label=str(self.vendor) + u'-' + self.serieslabel() + str(self.package) + label=label + u'-' + self.date.strftime("%Y.%m") + label=label + str(self.spin) + str(self.rc) + return label + + def incrementMonth(self): + self.date = self.date + relativedelta(months=1) + + # TODO: provide a format function which can change the separator and + # capitalization, etc. + + def getDir(self): + dirstr=self.package.version.strfversion("%M%m") + dirstr=dirstr + u'-' + self.date.strftime("%Y.%m") + dirstr=dirstr + str(self.spin) + dirstr=dirstr + str(self.rc) + return dirstr + + def getNamespace(self): + namespace=u'releases' + if self.seriestype == Series.series.index("snapshot"): + namespace=u'snapshots' + return namespace + + # TODO: Document what a conformant branch name looks like. + # Return a conformant branch name from the Series information. + def getBranchname(self): + branchname=u'' + if self.seriestype == Series.series.index("snapshot"): + branchname=branchname + u'snapshots/' + else: + branchname=branchname + u'releases/' + + branchname=branchname + self.vendor.lower() + branchname=branchname + u'-' + self.package.version.strfversion("%M%m") + branchname=branchname + u'-' + self.date.strftime("%Y.%m") + branchname=branchname + str(self.spin) + branchname=branchname + str(self.rc) + + return branchname + +# Helper function which creates a Series from a properly formed branch name +# input string. +def seriesFromBranchname(branch=None): + if not isinstance(branch, basestring): + raise TypeError('seriesFromBranchname requires a basestring as input') + + if not branch: + raise ValueError('seriesFromBranchname requires a non-empty string as input') + + # Get the part before the '/'. That's the namespace + try: + namespace=branch.rsplit('/', 1)[0] + except IndexError: + raise RuntimeError('string must be /') + + # Get everything after the '/'. + try: + seriesstr=branch.rsplit('/', 1)[1] + except IndexError: + raise RuntimeError('string must be -') + + if namespace == u'': + raise TypeError("Couldn't parse a namespace from input string") + if seriesstr == u'': + raise TypeError("Couldn't parse a series from input string") + + keys=['vendor', 'version', 'date', 'spin', 'rc'] + values=seriesstr.split('-') + dictionary=dict(zip(keys, values)) + + # if there is no spin but there is an 'rcX' in the spin key:value pair + # it means that there's really no spin but should be in the rc key:value + # pair. + if dictionary.has_key("spin") and "rc" in dictionary["spin"]: + dictionary["rc"]=dictionary["spin"] + dictionary.pop("spin", None) + + # We need to have None fields in the missing keys for when we call the + # Series constructor. + if not dictionary.has_key("rc"): + dictionary["rc"]=None + else: + # strip the "rc" and just leave the int. + if dictionary["rc"].startswith("rc"): + dictionary["rc"]=dictionary["rc"][2:] + + # We need to have None fields in the missing keys for when we call the + # Series constructor. + if not dictionary.has_key("spin"): + dictionary["spin"]=None + + seriesdate=datetime.today + if dictionary["date"]: + datekeys=['year', 'month', 'day'] + datevalues=dictionary["date"].split('.') + datefields=dict(zip(datekeys, datevalues)) + if not datefields.has_key("day"): + datefields["day"]="15" + seriesdate=datetime(int(datefields["year"]), int(datefields["month"]), int(datefields["day"])) + + if "snapshots" in namespace and dictionary["rc"]: + raise RuntimeError('A snapshots namespace can not have an rc. This is a non-conforming input.') + elif "releases" not in namespace and dictionary["rc"]: + raise RuntimeError('An rc must have a "releases" namespace. This is a non-conforming input.') + elif dictionary["rc"]: + seriesname="candidate" + elif "snapshots" in namespace: + seriesname="snapshot" + elif "releases" in namespace: + seriesname="release" + elif "snapshot" in namespace: + raise RuntimeError('"snapshot" is not a complete namespace. A conforming namespace is "snapshots".') + else: + # TODO test for unknown namespace. + raise RuntimeError('Unknown namespace in input string.') + + package=Package(package="GCC", version=dictionary["version"]) + series=Series(seriesname, package=package, date=seriesdate, spin=dictionary["spin"], rc=dictionary["rc"], strict=True) + + return series + +from vers import versionFromStr + +class TestSeries(unittest.TestCase): + + def test_missing_seriestype(self): + with self.assertRaises(TypeError): + spin=Spin() + rc=Rc() + candidate=Series(package="GCC-5.3.1", spin=spin, rc=rc) + + def test_no_match(self): + with self.assertRaises(TypeError): + candidate=Series("foobar", package="GCC-5.3.1") + + def test_partial_seriestype_match(self): + with self.assertRaises(TypeError): + candidate=Series("candid", package="GCC-5.3.1") + + def test_excessive_seriestype_match(self): + with self.assertRaises(TypeError): + candidate=Series("candidate2", package="GCC-5.3.1") + + def test_match_release(self): + release=Series("release", package="GCC-5.3.1") + self.assertEqual(str(release).split("_")[0], "release") + + def test_match_candidate_wrongcase(self): + candidate=Series("Candidate", package="GCC-5.3.1",rc="1") + self.assertEqual(str(candidate).split("_")[0], "candidate") + + def test_match_snapshot_wrongcase(self): + snapshot=Series("SNAPSHOT", package="GCC-5.3.1") + self.assertEqual(str(snapshot).split("_")[0], "snapshot") + + def test_longlowertype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.longlowertype(), "release-candidate") + + def test_longuppertype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="4") + self.assertEqual(candidate.longuppertype(), "Release-Candidate") + + def test_shorttype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.shorttype(), "candidate") + + def test_serieslabel_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.serieslabel(), "") + + def test_serieslabel_release(self): + candidate=Series("release", package="GCC-5.3.1") + self.assertEqual(candidate.serieslabel(), "") + + def test_serieslabel_snapshot(self): + candidate=Series("snapshot", package="GCC-5.3.1") + self.assertEqual(candidate.serieslabel(), "snapshot-") + + def test_empty_rc(self): + rc=Rc(7) + candidate=Series("candidate", package="GCC-5.3.1", rc=rc) + self.assertEqual(candidate.rc.vers, 7) + self.assertEqual(str(candidate.rc), "FOO") + + def test_empty_rc(self): + rc=Rc() + release=Series("release", package="GCC-5.3.1", rc=rc) + self.assertEqual(str(release.rc), "") + + release2=Series("release", package="GCC-5.3.1") + self.assertEqual(str(release2.rc), "") + + def test_specified_rc(self): + rc=Rc(7) + candidate=Series("candidate", package="GCC-5.3.1", rc=rc) + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_candidate_with_no_rc(self): + with self.assertRaises(TypeError): + candidate=Series("candidate", package="GCC-5.3.1") + + with self.assertRaises(TypeError): + rc=Rc() + candidate2=Series("candidate", package="GCC-5.3.1", rc=rc) + + def test_rc_as_string(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="7") + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_rc_as_int(self): + candidate=Series("candidate", package="GCC-5.3.1", rc=7) + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_rc_as_typeerror(self): + floatrc=7.0 + with self.assertRaises(TypeError): + snapshot=Series("snapshot", package="GCC-5.3.1", rc=floatrc) + + def test_rc_as_negative(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", rc="-1") + + def test_missing_spin(self): + snapshot=Series("snapshot", package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 0) + self.assertEqual(str(snapshot.spin), "") + + def test_specified_spin(self): + spin=Spin(7) + snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_empty_spin(self): + spin=Spin() + snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) + self.assertEqual(str(snapshot.spin), "") + + def test_spin_as_string(self): + snapshot=Series("snapshot", spin="7", package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_spin_as_int(self): + snapshot=Series("snapshot", spin=7, package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_spin_as_typeerror(self): + floatspin=7.0 + with self.assertRaises(TypeError): + snapshot=Series("snapshot", spin=floatspin, package="GCC-5.3.1") + + def test_spin_as_negative(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", spin="-1", package="GCC-5.3.1") + + def test_empty_spin_and_rc(self): + release=Series("release", package="GCC-5.3.1") + self.assertEqual(release.spin.val, 0) + self.assertEqual(release.rc.val, 0) + self.assertEqual(str(release.spin), "") + self.assertEqual(str(release.rc), "") + + def test_package_as_Package(self): + package = Package("GCC", "5.3.1") + release=Series("release", package) + self.assertEqual(str(release.package), "GCC-5.3.1") + + def test_package_as_Package(self): + # Create a Version instead of a package + package = versionFromStr("5.3.1") + with self.assertRaises(TypeError): + candidate=Series("candidate", package) + + def test_package_as_None(self): + package=None + with self.assertRaises(TypeError): + candidate=Series("candidate", package) + + def test_getbranchname(self): + candidate=Series("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc=None) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") + + def test_date_string(self): + candidate=Series("candidate", package="GCC-5.3.1", date="2016.05.27", spin="1", rc="1") + self.assertEqual(datetime(2016,05,15),candidate.date) + + candidate2=Series("candidate", package="GCC-5.3.1", date="2016.05", spin="1", rc="1") + self.assertEqual(datetime(2016,05,15),candidate2.date) + + with self.assertRaises(TypeError): + candidate3=Series("candidate", package="GCC-5.3.1", date=datetime("20161034234"), spin="1", rc="1") + + def test_series_strict_true(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) + + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + + with self.assertRaises(ValueError): + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) + + with self.assertRaises(ValueError): + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + + def test_series_strict_false(self): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6", rc="1", strict=False) + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6-rc1") + + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=False) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), rc="1", strict=False) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-rc1") + + # TODO: Test combinations of branch names with and without spins/rcs. + + # TODO: Test whether seriesFromBranchname can detect non-conforming branch names. + + # These tests will verify that a branchname can be read, a series created, + # and then the same branch name recreated. + def test_seriesFromBranchname(self): + branch=u'snapshots/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series=seriesFromBranchname(branch=branch) + + branch2=u'snapshots/linaro-5.3-2016.05-6' + series2=seriesFromBranchname(branch=branch2) + self.assertEqual(series2.getBranchname(),u'snapshots/linaro-5.3-2016.05-6') + + branch3=u'snapshots/linaro-5.3-2016.05' + series3=seriesFromBranchname(branch=branch3) + self.assertEqual(series3.getBranchname(),u'snapshots/linaro-5.3-2016.05') + + branch4=u'releases/linaro-5.3-2016.05-6-rc1' + series4=seriesFromBranchname(branch=branch4) + self.assertEqual(series4.getBranchname(),u'releases/linaro-5.3-2016.05-6-rc1') + + branch5=u'releases/linaro-5.3-2016.05-rc1' + series5=seriesFromBranchname(branch=branch5) + self.assertEqual(series5.getBranchname(),u'releases/linaro-5.3-2016.05-rc1') + + branch6=u'releases/linaro-5.3-2016.05-6' + series6=seriesFromBranchname(branch=branch6) + self.assertEqual(series6.getBranchname(),u'releases/linaro-5.3-2016.05-6') + + branch7=u'releases/linaro-5.3-2016.05' + series7=seriesFromBranchname(branch=branch7) + self.assertEqual(series7.getBranchname(),u'releases/linaro-5.3-2016.05') + + branch8=u'snapshots/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series8=seriesFromBranchname(branch=branch8) + + branch9=u'snapshot/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series9=seriesFromBranchname(branch=branch9) + + branch10=u'snapshot/linaro-5.3-2016.05-6' + with self.assertRaises(RuntimeError): + series10=seriesFromBranchname(branch=branch10) + + # TODO: Test series.label (as there was a runtime bug) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/linaropy/vers.py b/linaropy/vers.py new file mode 100644 index 0000000..6126a01 --- /dev/null +++ b/linaropy/vers.py @@ -0,0 +1,615 @@ +import unittest +import logging + +# Abstract baseclass for Vendor and Package that provides some basic +# typechecking. +class PV(object): + def __init__(self, pv=None): + if isinstance(pv, basestring): + self.pv=pv + else: + raise TypeError("Input to PV must be of type basestring.") + + def __str__(self): + return self.pv + + def lower(self): + return self.pv.lower() + +class Vendor(PV): + def __init__(self, vendor="Linaro"): + try: + super(Vendor,self).__init__(vendor) + except TypeError: + raise TypeError("Input 'vendor' must be of type basestring.") + +class Version(object): + # @major - required int or str representation of int. + # @minor - optional int or str representation of int, otherwise None. + # If None, then minor will not be output. + # @point - optional int or str representation of int, otherwise None. + # If None, then point will not be output. + def __init__(self, major, minor=None, point=None): + + if isinstance(major, int) or isinstance(major, basestring): + # This will throw an exception if 'major' is an empty string. + self.major=int(major) + else: + raise TypeError("major must be of type int, string.") + + if isinstance(minor, int) or isinstance(minor, basestring): + # This will throw an exception if 'minor' is an empty string. + self.minor=int(minor) + elif not minor: + self.minor=None + else: + raise TypeError("minor must be of type int, string, or None.") + + if isinstance(point, int) or isinstance(point, basestring): + + # This is illegal, as it'd mess up the output and make it look like + # there's a minor that's really a point. + if not minor and point: + raise RuntimeError("minor must be specified if point is specified.") + + # This will throw an exception if 'minor' is an empty string. + self.point=int(point) + elif not point: + self.point=None + else: + raise TypeError("point must be of type int, string, or None.") + + def __str__(self): + ver=str(self.major) + if self.minor: + ver=ver + u'.' + str(self.minor) + if self.point: + ver=ver + u'.' + str(self.point) + return ver + + # TODO: I'm sure there's a more pythonic way to do this but this is + # quick and dirty. + # %M - Major + # %m - Minor + # %p - Point + # delimiter - Delimiter character to use in output: space or dash. + # TODO: Use a dictionary with replace + def strfversion(self, formatstr, delimiter='.'): + toks=[str(x) for x in formatstr.split('%') if x.strip()] + + delstoks=len(toks) + verstoks=0 + + if self.major: + verstoks=verstoks+1 + if self.minor: + verstoks=verstoks+1 + if self.point: + verstoks=verstoks+1 + + if delstoks > verstoks: + numtoks=verstoks - 1 + else: + numtoks=delstoks - 1 + + vers=u'' + for tok in toks: + if tok == 'M': + vers=vers + str(self.major) + elif tok == 'm': + vers=vers + str(self.minor) + elif tok == 'p': + vers=vers + str(self.point) + if numtoks > 0: + vers=vers + delimiter + numtoks=numtoks-1 + return vers + +# Helper function which returns a Version. +def versionFromStr(version): + if not isinstance(version, basestring): + raise TypeError('input must be of type basestring') + + major=version.split('.', 1)[0] + + # split('.',1) will return index error on "5" but will return empty string on "5." + # Any empty string is not a valid input into Version. + if major == u'': + major=None + + try: + minor=version.split('.', 2)[1] + except IndexError: + minor=None + + if minor == u'': + minor=None + + try: + point=version.split('.', 3)[2] + except IndexError: + point=None + + if point == u'': + point=None + + return Version(major, minor, point) + +class Package(PV): + # @package - default = GCC + # @version - required. Either a string representing a version or a Version + # instance. + def __init__(self, package="GCC", version=None): + #TODO: Make sure package is not None. + if package == u'': + raise TypeError("Package must not be an empty string") + + # TODO: Provide a list of known inputs and test against those? + try: + super(Package,self).__init__(package) + except TypeError: + raise TypeError("Input 'package' must be of type basestring.") + + if not version: + raise RuntimeError("Package requires a version.") + elif isinstance(version, Version): + self.version=version + else: + self.version=versionFromStr(version) + + def get_package(self): + return super(Package,self).__str__() + + def get_version(self): + return str(self.version) + + def __str__(self): + return super(Package,self).__str__() + u'-' + str(self.version) + + def lower(self): + return super(Package,self).lower() + u'-' + str(self.version) + + #TODO: Create format function which can change the separator to ' ' or to '-'. + +def packageFromStr(package=None): + if not isinstance(package, basestring): + raise TypeError('packageFromStr requires a string as input') + + # Get the part before the version numbers. That's the package + try: + package_name=package.rsplit('-', 1)[0] + except IndexError: + raise RuntimeError('string must be -') + + try: + package_version=package.rsplit('-', 1)[1] + except IndexError: + raise RuntimeError('string must be -') + + if package_name == u'': + raise TypeError("Couldn't parse a package name from input string") + if package_version == u'': + raise TypeError("Couldn't parse a package version from input string") + + # This will throw exceptions if the input are malformed. + package=Package(package_name, package_version) + return package + +class Vers(object): + def __init__(self, val=None): + + if val is None: + self.val=0 + return + + if isinstance(val, int) or isinstance(val, basestring): + # This should raise an exception if the string can't be converted + # to an int. If val is already an int it's a nop. + intval=int(val) + + if intval < 0: + raise ValueError('Input val must not be negative') + self.val=intval + else: + raise TypeError("Input val must be of type int or string.") + + def increment(self): + self.val=self.val+1 + + # Return the "-" string or an empty string. + def __str__(self): + # Don't return + if self.val is None or self.val == 0: + return "" + else: + return "-" + str(self.val) + +class Spin(Vers): + def __init__(self, spin=None): + try: + super(Spin,self).__init__(spin) + except ValueError: + raise ValueError('Input spin must not be negative') + +class Rc(Vers): + def __init__(self, rc=None): + try: + super(Rc,self).__init__(rc) + except ValueError: + raise ValueError('Input rc must not be negative') + + # The string representation differs slighty from Spin + def __str__(self): + # Don't return + if self.val is None or self.val == 0: + return "" + else: + return "-rc" + str(self.val) + +class TestVersion(unittest.TestCase): + def test_no_major(self): + with self.assertRaises(TypeError): + version=Version(major=None) + with self.assertRaises(TypeError): + version=Version() + + def test_empty_major(self): + with self.assertRaises(ValueError): + version=Version(major="") + + def test_empty_minor(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor="") + + def test_empty_point(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor=3, point="") + + def test_non_int_major(self): + with self.assertRaises(ValueError): + version=Version('A') + + def test_int_major(self): + version=Version(5) + self.assertEqual(str(version), "5") + version2=Version(major=5) + self.assertEqual(str(version2), "5") + + def test_intstr_major(self): + version=Version("5") + self.assertEqual(str(version), "5") + version2=Version(major="5") + self.assertEqual(str(version2), "5") + + def test_non_int_minor(self): + with self.assertRaises(ValueError): + version=Version(major="5", minor="A") + + def test_int_minor(self): + version=Version(major=5, minor=3) + self.assertEqual(str(version), "5.3") + + def test_intstr_minor(self): + version=Version(major="5", minor="3") + self.assertEqual(str(version), "5.3") + + def test_no_minor(self): + version=Version(major="5", minor=None) + self.assertEqual(str(version), "5") + + def test_point_no_minor(self): + with self.assertRaises(RuntimeError): + version=Version(major=5, minor=None, point=1) + + def test_int_point(self): + version=Version(major=5, minor=3, point=1) + self.assertEqual(str(version), "5.3.1") + + def test_non_int_point(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor=3, point="A") + + def test_intstr_point(self): + version=Version(major="5", minor="3", point="1") + self.assertEqual(str(version), "5.3.1") + + def test_no_point(self): + version=Version(major="5", minor="3", point=None) + self.assertEqual(str(version), "5.3") + + def test_strfversion(self): + version=Version(major="5", minor="3", point="1") + self.assertEqual(version.strfversion("%M%m%p", delimiter='-'),"5-3-1") + self.assertEqual(version.strfversion("%M%m%p"),"5.3.1") + +class TestVendor(unittest.TestCase): + def test_vendor_default(self): + vendor=Vendor() + self.assertEqual(str(vendor), "Linaro") + self.assertEqual(vendor.lower(), "linaro") + + def test_none_input(self): + with self.assertRaises(TypeError): + vendor=Vendor(vendor=None) + + def test_str_input(self): + vendor=Vendor("TestVendor") + self.assertEqual(str(vendor), "TestVendor") + self.assertEqual(vendor.lower(), "testvendor") + +class TestPackage(unittest.TestCase): + + def test_package_no_version(self): + with self.assertRaises(RuntimeError): + # We require a version. + package=Package() + + def test_package_default(self): + package=Package(version="5.3.1") + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + self.assertEqual(package.pv, "GCC") + + def test_none_package(self): + with self.assertRaises(TypeError): + package=Package(package=None, version="5.3.1") + + def test_none_version(self): + with self.assertRaises(RuntimeError): + package=Package(package="GCC", version=None) + + def test_with_Version(self): + version=Version(5,3,1) + package=Package(version=version) + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + + def test_package_version(self): + version=Version(5,3,1) + package=Package(version=version) + self.assertEqual(str(package.version), "5.3.1") + self.assertEqual(package.get_version(), "5.3.1") + self.assertEqual(package.get_package(), "GCC") + + def test_str_input(self): + package=Package("TestPackage", version="5.3.1") + self.assertEqual(str(package), "TestPackage-5.3.1") + self.assertEqual(package.lower(), "testpackage-5.3.1") + + def test_empty_package_name(self): + with self.assertRaises(TypeError): + package=Package(package="", version="5.3.1") + +class TestSpin(unittest.TestCase): + + def test_spin_val(self): + spin=Spin(1) + self.assertEqual(spin.val, 1) + + def test_spin_val_str(self): + spin=Spin(1) + self.assertEqual(str(spin), "-1") + + def test_spin_increment_val(self): + spin=Spin(1) + spin.increment() + self.assertEqual(spin.val, 2) + + def test_spin_increment_str(self): + spin=Spin(1) + spin.increment() + self.assertEqual(str(spin), "-2") + + def test_zero_spin(self): + spin=Spin(0) + self.assertEqual(spin.val, 0) + + def test_zero_spin_str(self): + spin=Spin(0) + self.assertEqual(str(spin), "") + + def test_none_spin(self): + spin=Spin(None) + self.assertEqual(spin.val, 0) + + def test_none_spin_increment(self): + spin=Spin(None) + spin.increment() + self.assertEqual(spin.val, 1) + + def test_none_spin_str(self): + spin=Spin(None) + self.assertEqual(str(spin), "") + + def test_default_spin(self): + spin=Spin() + self.assertEqual(spin.val, 0) + + def test_default_spin_str(self): + spin=Spin() + self.assertEqual(str(spin), "") + + def test_negative_val(self): + with self.assertRaises(ValueError): + spin=Spin(-1) + + def test_default_spin_str(self): + spin=Spin() + self.assertEqual(str(spin), "") + + def test_spin_string_input(self): + spin=Spin("9") + self.assertEqual(spin.val, 9) + self.assertEqual(str(spin), "-9") + + def test_spin_negative_string_input(self): + with self.assertRaises(ValueError): + spin=Spin("-9") + + def test_float_type(self): + with self.assertRaises(TypeError): + spin=Spin(7.0) + +class TestRc(unittest.TestCase): + def test_rc_val(self): + rc=Rc(1) + self.assertEqual(rc.val, 1) + + def test_rc_str(self): + rc=Rc(1) + self.assertEqual(str(rc), "-rc1") + + def test_negative_val(self): + with self.assertRaises(ValueError): + rc=Rc(-1) + + def test_zero_rc_str(self): + rc=Rc(0) + self.assertEqual(str(rc), "") + + def test_none_rc_str(self): + rc=Rc(None) + self.assertEqual(str(rc), "") + + def test_default_rc_str(self): + rc=Rc() + self.assertEqual(str(rc), "") + + def test_rc_increment_val(self): + rc=Rc(1) + rc.increment() + self.assertEqual(rc.val, 2) + + def test_rc_increment_str(self): + rc=Rc(1) + rc.increment() + self.assertEqual(str(rc), "-rc2") + + def test_none_rc_increment_str(self): + rc=Rc(None) + rc.increment() + self.assertEqual(str(rc), "-rc1") + + def test_default_rc_str(self): + rc=Rc() + self.assertEqual(str(rc), "") + + def test_rc_string_input(self): + rc=Rc("9") + self.assertEqual(rc.val, 9) + self.assertEqual(str(rc), "-rc9") + + def test_rc_negative_string_input(self): + with self.assertRaises(ValueError): + rc=Rc("-9") + + def test_float_type(self): + with self.assertRaises(TypeError): + rc=Rc(7.0) + +class TestPackageFromString(unittest.TestCase): + + def test_none(self): + with self.assertRaises(TypeError): + package=packageFromStr(package=None) + + def test_with_package(self): + package=Package("GCC", "5.3.1") + with self.assertRaises(TypeError): + package=packageFromStr(package=package) + + def test_with_no_package_name(self): + with self.assertRaises(TypeError): + package=packageFromStr(package="-5.3.1") + + with self.assertRaises(RuntimeError): + package=packageFromStr(package="5.3.1") + + def test_with_no_package_version(self): + with self.assertRaises(TypeError): + package=packageFromStr(package="GCC-") + + with self.assertRaises(RuntimeError): + package=packageFromStr(package="GCC") + + def test_with_space(self): + with self.assertRaises(RuntimeError): + package=packageFromStr(package="GCC 5.3.1") + + def test_correct_usage(self): + package=packageFromStr(package="GCC-5.3.1") + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + self.assertEqual(package.pv, "GCC") + self.assertEqual(str(package.version), "5.3.1") + self.assertEqual(package.version.major, 5) + self.assertEqual(package.version.minor, 3) + self.assertEqual(package.version.point, 1) + +class TestVersionFromStr(unittest.TestCase): + + def test_extra_dot(self): + version=versionFromStr("5.3.1.4") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, 1) + + def test_major_minor_point(self): + version=versionFromStr("5.3.1") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, 1) + + def test_major_minor_dot(self): + version=versionFromStr("5.3.") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, None) + + def test_major_minor(self): + version=versionFromStr("5.3") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, None) + + def test_major_dot(self): + version=versionFromStr("5.") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, None) + self.assertEqual(version.point, None) + + def test_major(self): + version=versionFromStr("5") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, None) + self.assertEqual(version.point, None) + + def test_empty(self): + with self.assertRaises(TypeError): + version=versionFromStr("") + + def test_non_int_major(self): + with self.assertRaises(ValueError): + version=versionFromStr("a.b.c") + + def test_non_int_minor(self): + with self.assertRaises(ValueError): + version=versionFromStr("5.b.1") + + def test_non_int_point(self): + with self.assertRaises(ValueError): + version=versionFromStr("5.3.a") + + def test_non_string(self): + version=Version(5,3,1) + with self.assertRaises(TypeError): + version=versionFromStr(version) + + def test_none(self): + with self.assertRaises(TypeError): + version=versionFromStr(version=None) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() + diff --git a/linaroseries.py b/linaroseries.py deleted file mode 100644 index 382ed08..0000000 --- a/linaroseries.py +++ /dev/null @@ -1,362 +0,0 @@ -import unittest -import copy - -from series import Series -from series import seriesFromBranchname -from datetime import datetime -from dateutil.relativedelta import relativedelta -from vers import Spin -from vers import Rc - - -# Progression: -# The Linaro release progression follows. -# -# Snapshot 2016.01 -# spin 1 2016.01-1 -# spin 2 2016.01-2 -# Candidate 2016.01-rc1 -# rc 2 2016.01-rc2 -# Release 2016.01 -# -# Note: Remember, if a snapshot is respun to fix a -# bug then the fix should be cherry-picked into the -# release branch and a release-candidate spun from -# from the release branch. -# -# Candidate 2016.01-1-rc1 -# rc 2 2016.01-1-rc2 -# Release 2016.01-1 - -# Inherit Series in order to provide Linaro specific rules on transitions -# between series types. The baseclass Series doesn't have the toNext* -# functions defined. -class LinaroSeries(Series): - def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): - - # This is a dispatch table so that we can use a 'toNext' function based - # on an input type and it will call the correct toNextFoo function. - # This will work if the key is an integer or a string. Ideally this - # would be added to Series and we wouldn't need to derive, but I - # couldn't figure out how to get the pointer tables correct. - self.dispatchnext = { - Series.series.index("candidate"): self.toNextCandidate, - 'candidate': self.toNextCandidate, - Series.series.index("snapshot"): self.toNextSnapshot, - 'snapshot': self.toNextSnapshot, - Series.series.index("release"): self.toNextRelease, - 'release': self.toNextRelease, - } - - super(LinaroSeries,self).__init__(seriestype, vendor, package, date,spin,rc, strict) - - def toNextCandidate(self, date=None, strict=False): - if self.seriestype < 0 or self.seriestype >= len(Series.series): - raise TypeError('toNextCandidate on an unknown series type.') - - candidate=copy.deepcopy(self) - candidate.seriestype = Series.series.index("candidate") - - if date: - if not isinstance(date,datetime): - raise TypeError('date is not of type datetime.') - candidate.date=date - - if self.seriestype == Series.series.index("candidate"): - candidate.rc.increment() - elif self.seriestype == Series.series.index("release"): - rc=Rc(1) - candidate.rc=rc - candidate.spin.increment() - elif self.seriestype == Series.series.index("snapshot"): - spin=Spin(None) - rc=Rc(1) - candidate.rc=rc - candidate.spin=spin - - # If the user hasn't specified a date, the implicit behavior is to - # increment the date by 1 month when moving from a snapshot to a - # candidate. - if not date: - candidate.incrementMonth() - # If the user did specify a date and we're in strict mode we need to - # verify that the date they chose is exactly one month difference. - elif strict: - if candidate.date != self.date + relativedelta(months=1): - raise ValueError('toNextCandidate invoked with strict=True. Snapshot date to candidate date can only be +1 month difference.') - return candidate - - if strict and candidate.date != self.date: - raise ValueError('Candidate date can only change if current series is a Snapshot when toNextCandidate is invoked with strict=True.') - - return candidate - - def toNextRelease(self, date=None, strict=False): - if self.seriestype < 0 or self.seriestype >= len(Series.series): - raise TypeError('toNextRelease called on an unknown series type.') - elif self.seriestype == Series.series.index("release"): - raise TypeError('A release series can not be the basis for another release. Move to a release candidate first.') - elif self.seriestype == Series.series.index("snapshot"): - raise TypeError('A snapshot series can not be the basis for a release. Move to a release candidate first.') - - # Only a candidate can turn into a release. - release=copy.deepcopy(self) - # if we have a candidate and ask for a release rc needs to be None. - rc=Rc() - release.rc=rc - release.seriestype = Series.series.index("release") - - if date: - if not isinstance(date,datetime): - raise TypeError('date is not of type datetime.') - release.date=date - - if strict and release.date != self.date: - raise ValueError('Release date can not change as toNextRelease was invoked with strict=True') - - return release - - def toNextSnapshot(self, date=None, strict=False): - if self.seriestype < 0 or self.seriestype >= len(Series.series): - raise TypeError('toNextRelease called on an unknown series type.') - elif self.seriestype == Series.series.index("candidate"): - raise TypeError('Series of type candidate can not be the basis for a snapshot.') - if self.seriestype == Series.series.index("release"): - raise TypeError('Series of type release can not be the basis for a snapshot.') - - # Only snapshots can be snapshots next. - snapshot=copy.deepcopy(self) - snapshot.seriestype = Series.series.index("snapshot") - snapshot.spin.increment() - - if date: - if not isinstance(date,datetime): - raise TypeError('date is not of type datetime.') - snapshot.date=date - - if strict and snapshot.date != self.date: - raise ValueError('Snapshot date can not change as toNextSnapshot was invoked with strict=True') - - return snapshot - - # toNext will take a 'key' as either a string representing the - # Series.series type or the index of the Series.series type, e.g., - # 'candidate' or Series.series.index("candidate") and it will forward - # to the correct toNext* function as mapped in the dispatchnext - # dictionary. - def toNext(self, seriestype, date=None, strict=False): - return self.dispatchnext[seriestype](date, strict) - -# Helper function which creates a LinaroSeries from a properly formed branch name -# input string. -def linaroSeriesFromBranchname(branch=None): - - # Create a Series using the helper function and then populate a - # LinaroSeries. TODO: There's probably a better way to do this such - # as a SeriesFactory that is overridden. - series=seriesFromBranchname(branch) - linaroseries=LinaroSeries(Series.series[series.seriestype], vendor="linaro", package=series.package ,date=series.date , spin=series.spin, rc=series.rc, strict=True ) - return linaroseries - -class TestLinaroSeries(unittest.TestCase): - - def test_candidate_to_candidate(self): - candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - candidate2=candidate.toNextCandidate() - self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-1-rc2") - self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) - self.assertEqual(candidate.date, candidate2.date) - - #TODO test date and strict=True - - def test_release_to_candidate(self): - # Test where spin is None and needs to be incremented. - release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15)) - candidate=release.toNextCandidate() - self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") - self.assertEqual(candidate.seriestype, Series.series.index("candidate")) - self.assertEqual(candidate.date, release.date) - - # Now test where spin is already non-zero. - release2=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - candidate2=release2.toNextCandidate() - self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-2-rc1") - self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) - - #TODO test date and strict=True - - def test_snapshot_to_candidate(self): - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") - candidate=snapshot.toNextCandidate() - self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") - self.assertEqual(candidate.seriestype, Series.series.index("candidate")) - self.assertNotEqual(candidate.date, snapshot.date) - # Verify that it's one month apart. - self.assertEqual(candidate.date, snapshot.date + relativedelta(months=1)) - - # Make sure that the default date increment is just one month. - candidate2=snapshot.toNextCandidate(strict=True) - self.assertEqual(candidate2.date, snapshot.date + relativedelta(months=1)) - - # whether strict=True or not. - candidate3=snapshot.toNextCandidate(strict=False) - self.assertEqual(candidate3.date, snapshot.date + relativedelta(months=1)) - - # If the user passed a date and strict=yes, make sure the increment is only one month. - candidate4=snapshot.toNextCandidate(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) - plusonem=snapshot.date + relativedelta(months=1) - self.assertEqual(candidate4.date,plusonem) - - # Make sure the date can be set without error when strict is False - candidate5=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=False) - self.assertEqual(candidate5.date, snapshot.date + relativedelta(months=2)) - - with self.assertRaises(ValueError): - # Attempting to increment the date when strict=True should raise an exception. - candidate6=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=True) - - def test_unknown_to_candidate(self): - unknown=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - # Purposely override with an incorrect seriestype. - unknown.seriestype=99 - with self.assertRaises(TypeError): - candidate=unknown.toNextCandidate() - - def test_candidate_to_release(self): - candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - release=candidate.toNextRelease() - self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") - self.assertEqual(release.seriestype, Series.series.index("release")) - self.assertEqual(candidate.date, release.date) - - release2=candidate.toNextRelease(strict=True) - self.assertEqual(release2.getBranchname(), "releases/linaro-5.3-2016.05-1") - self.assertEqual(release2.date, candidate.date) - - with self.assertRaises(ValueError): - # Attempting to increment the date when strict=True should raise an exception. - release3=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) - - # If strict=False and the date is changed, go ahead and change it. - release4=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) - self.assertEqual(release4.getBranchname(), "releases/linaro-5.3-2016.06-1") - self.assertNotEqual(release4.date, release.date) - - with self.assertRaises(TypeError): - release5=candidate.toNextRelease("2016.06", strict=False) - - def test_snapshot_to_release(self): - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") - with self.assertRaises(TypeError): - release=snapshot.toNextRelease() - - def test_release_to_release(self): - release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") - with self.assertRaises(TypeError): - release2=release.toNextRelease() - - def test_unknown_to_release(self): - unknown=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - # Purposely override with an incorrect seriestype. - unknown.seriestype=99 - with self.assertRaises(TypeError): - release=unknown.toNextRelease() - - def test_candidate_to_snapshot(self): - candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - with self.assertRaises(TypeError): - snapshot=candidate.toNextSnapshot() - # This would/should be the case if this wasn't and invalid option. - # self.assertEqual(snapshot.seriestype, "snapshot") - - def test_release_to_snapshot(self): - release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - with self.assertRaises(TypeError): - snapshot=release.toNextSnapshot() - - def test_snapshot_to_snapshot(self): - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") - snapshot2=snapshot.toNextSnapshot() - - self.assertEqual(snapshot2.getBranchname(), "snapshots/linaro-5.3-2016.05-7") - self.assertEqual(snapshot2.seriestype, Series.series.index("snapshot")) - self.assertEqual(snapshot.date, snapshot2.date) - - # Verify that the original hasn't changed. This verifies that a - # deepcopy is used on the toNext* functions. - self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6") - - snapshot3=snapshot.toNextSnapshot(strict=True) - self.assertEqual(snapshot3.getBranchname(), "snapshots/linaro-5.3-2016.05-7") - self.assertEqual(snapshot3.seriestype, Series.series.index("snapshot")) - self.assertEqual(snapshot.date, snapshot3.date) - - with self.assertRaises(ValueError): - # Attempting to increment the date when strict=True should raise an exception. - snapshot4=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) - - # If strict=False and the date is changed, go ahead and change it. - snapshot5=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) - self.assertEqual(snapshot5.getBranchname(), "snapshots/linaro-5.3-2016.06-7") - self.assertNotEqual(snapshot5.date, snapshot.date) - - # Verify that a date as a string generates an exception. - with self.assertRaises(TypeError): - snapshot6=snapshot.toNextSnapshot("2016.06.15", strict=False) - - def test_unknown_to_snapshot(self): - unknown=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - # Purposely override with an incorrect seriestype. - unknown.seriestype=99 - with self.assertRaises(TypeError): - snapshot=unknown.toNextSnapshot() - - def test_snapshot_toNext(self): - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") - - candidate=snapshot.toNext("candidate") - # A Snapshot -> Candidate traversal always results in spin being set to zero - # and the first release candidate being created. - self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") - self.assertEqual(candidate.seriestype, Series.series.index("candidate")) - - # Make sure toNext works with 'index' as well as the type string. - candidate2=snapshot.toNext(Series.series.index("candidate")) - # A Snapshot -> Candidate traversal always results in spin being set to zero - # and the first release candidate being created. - self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.06-rc1") - self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) - - with self.assertRaises(TypeError): - candidate=snapshot.toNext("release") - candidate=snapshot.toNext(Series.series.index("release")) - - def test_toNext_incorrect_keys(self): - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") - with self.assertRaises(KeyError): - candidate=snapshot.toNext("candidat") - candidate=snapshot.toNext("snapsho") - candidate=snapshot.toNext("releas") - candidate=snapshot.toNext(-1) - candidate=snapshot.toNext(len(Series.series)+1) - # Index out of range of the Series.series array. - - # Test to see if the objects returned from the toNext* functions are - # new instances and not references to the input Series. - def test_toNextFOO_return_new_objects(self): - candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - candidate2=candidate.toNext("candidate") - release=candidate.toNext("release") - - self.assertIsNot(candidate,candidate2) - self.assertIsNot(candidate,release) - - snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") - snapshot2=snapshot.toNext("snapshot") - self.assertIsNot(snapshot,snapshot2) - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/proj.py b/proj.py deleted file mode 100644 index 8f8fa7b..0000000 --- a/proj.py +++ /dev/null @@ -1,117 +0,0 @@ -import unittest -import logging -import os -import tempfile - -# Execute shell commands with the 'sh' package. -from sh import rm - -# TODO: Take an existing directory as an input, which allows the benefits of a -# Proj without having to create the directory. BUT make sure that -# persist=True is forced in such cases. - -# This class represents the project temporary directory whose lifetime is -# managed. It is not persistent by default. -class Proj(object): - # @ persist - True, False or None. Only True persists. - # @ prefix - Optional prefix appended to projdir directory name. - def __init__(self, prefix="", persist=None): - # This allows string or bool inputs. - if str(persist).lower() == "true": - self.persist=True - longevity="persistent" - else: - self.persist=False - longevity="temporary" - - #TODO: Take an option variable to pass to mkdtemp to collect all - # instances of Proj in to for testing purposes. - self.projdir=unicode(tempfile.mkdtemp(prefix=prefix), "utf-8") - logging.info("Created %s project directory in %s" % (longevity, self.projdir)) - - def cleanup(self): - #FIXME: Only rm -rf if the directory exists. Make sure it's not / or /home -# #TODO: make sure that persisdir isn't a mount-bind to /home - if not self.persist and self.projdir != "/" and self.projdir != "/home": - logging.info("Removing project dir %s" % self.projdir) - rm("-rf", "--preserve-root", self.projdir) - self.projdir=None - else: - logging.info("Project dir %s will persist" % self.projdir) - - -class TestProj(unittest.TestCase): - - def setUp(self): - self.persistdir="" - - def test_create(self): - self.proj=Proj() - self.assertNotEqual(self.proj.projdir, "") - - def test_create_dir(self): - self.proj=Proj() - self.assertTrue(os.path.isdir(self.proj.projdir)) - - def test_create_with_prefix(self): - self.proj=Proj("testprefix") - self.assertTrue("testprefix" in self.proj.projdir) - - def test_create_with_persist_bool(self): - self.proj=Proj(persist=True) - self.assertTrue(self.proj.persist) - self.persistdir=self.proj.projdir - - def test_create_with_persist_str(self): - self.proj=Proj(persist="true") - self.assertTrue(self.proj.persist) - self.persistdir=self.proj.projdir - - def test_create_with_persist_false_str(self): - self.proj=Proj(persist="false") - self.assertFalse(self.proj.persist) - # Just in-case it wasn't set properly - self.persistdir=self.proj.projdir - - def test_create_with_persist_false_bool(self): - self.proj=Proj(persist=False) - self.assertFalse(self.proj.persist) - # Just in-case it wasn't removed properly - self.persistdir=self.proj.projdir - - def test_create_with_persist_capstr(self): - self.proj=Proj(persist="TRUE") - self.assertTrue(self.proj.persist) - self.persistdir=self.proj.projdir - - def test_cleanup(self): - self.proj=Proj() - projdir=self.proj.projdir - self.proj.cleanup() - # Verify that the directory has been removed. - self.assertFalse(os.path.isdir(projdir)) - - def test_cleanup_with_persist(self): - self.proj=Proj(persist=True) - projdir=self.proj.projdir - self.proj.cleanup() - self.assertTrue(os.path.isdir(projdir)) - self.persistdir=self.proj.projdir - - def test_cleanup_with_persist_and_prefix(self): - self.proj=Proj(persist=True, prefix="thingy") - projdir=self.proj.projdir - self.proj.cleanup() - self.assertTrue(os.path.isdir(projdir)) - self.persistdir=self.proj.projdir - - def tearDown(self): - # if persistdir is set and isdir then remove it or we'll pollute /tmp. - if os.path.isdir(self.persistdir): - if self.persistdir != "/" and self.persistdir != "/home": - rm("-rf", "--preserve-root", self.persistdir) - - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/rn.py b/rn.py index 313c6ef..c15f637 100644 --- a/rn.py +++ b/rn.py @@ -7,20 +7,20 @@ import os import sys # handle_exit is a context manager which guarantees that the project temporary # directories are cleaned up when there are signals. -from handle_exit import handle_exit +from linaropy.handle_exit import handle_exit from datetime import datetime -from series import Series -from gccclone import GCCClone -from clone import Clone -from linaroseries import LinaroSeries -from linaroseries import linaroSeriesFromBranchname -from proj import Proj -from gitrepo import cd -from template import TemplateRN -from rnseries import CandidateRN -from rngen import rngen +from linaropy.series import Series +from linaropy.rn.gccclone import GCCClone +from linaropy.git.clone import Clone +from linaropy.rn.linaroseries import LinaroSeries +from linaropy.rn.linaroseries import linaroSeriesFromBranchname +from linaropy.proj import Proj +from linaropy.git.gitrepo import cd +from linaropy.rn.template import TemplateRN +from linaropy.rn.rnseries import CandidateRN +from linaropy.rn.rngen import rngen rnProj=[] diff --git a/rngen.py b/rngen.py deleted file mode 100644 index 004ad4d..0000000 --- a/rngen.py +++ /dev/null @@ -1,303 +0,0 @@ -#!/usr/bin/env python -# vim: set tw=78 expandtab tabstop=4 shiftwidth=4 autoindent smartindent: - -import csv -import sys, getopt -reload(sys) -sys.setdefaultencoding('utf8') - -# For fileIO -import os.path - -# Generate html files from textile files. -import textile - -# Generate text files from html. -#from BeautifulSoup import BeautifulSoup -from bs4 import BeautifulSoup - -# Jinja2 is the templating engine. -from jinja2 import Template, FileSystemLoader, Environment - -from custom_wordwrap import do_customwordwrap - -from cStringIO import StringIO - -from datetime import datetime - -from shutil import copyfile - -# To test if a remote file exists. -import urllib2 - -from series import Series -from linaroseries import LinaroSeries -from rnseries import CandidateRN -from gccclone import GCCClone - -from rninput import finput - -def get_gcc_version(gccclone): - dct= {} - (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") - return dct - -def rngen(candidate, gccclone): - #TODO Test that candidate is a CandidateRN - - trackseries=candidate.get_track_series() - nextseries=candidate.get_next_series() - - print "toNext: " + nextseries.getBranchname() - print "toNext: " + format(nextseries, '%N/%d') - - # TODO: What if 'track' is not a snapshot branch? To do this right we'd - # probably have to preserve basedon information in the release notes - # repository as a yaml file. - basedon='http://snapshots.linaro.org/components/toolchain/gcc-linaro/' - basedon=basedon + trackseries.getDir() - print "basedon: " + basedon - basis_url=basedon - -# print 'gccbaseversion is ' + gccclone.get_base_version() -# dct= {} -# (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") -# print 'fsf revision is ' + gccclone.get_fsf_revision() - ver=get_gcc_version(gccclone) - print 'gcc major ' + ver['major'] - print 'gcc minor ' + ver['minor'] - - fsf=gccclone.get_fsf_revision() - - # The version numbering scheme changed with GCC 5. Now, every new release - # has the major version incremented. Prior to version five the minor was - # incremented per-release. - # TODO: read stable and maintenance information from a config file. - if int(ver['major']) <= 4: - gcc_version_path= u'' + ver['major'] + '.' + ver['minor'] - stab_maint='Maintenance' - else: - gcc_version_path=ver['major'] - stab_maint='Stable' - - print "This is a %s candidate for GCC version %s" % (stab_maint, - gcc_version_path) - - history=finput('Please enter the location of the changelog csv file: ', "6.1-2016.06.csv") - - print "using the following .csv file: " + history - - #month_year=format(nextseries, '%D') - month_year=nextseries.date - - outfile=u'GCC-' + ver['major'] + u'.' + ver['minor'] + u'-' + month_year.strftime("%Y.%m") - -# if spin != "": -# outfile=outfile + u'-' + spin -# -# if rc != "": -# outfile=outfile + u'-rc' + rc - outfile=outfile + str(nextseries.spin) + str(nextseries.rc) - print "outfile: " + outfile - - with open(history, mode='r') as backports: - backports.seek(0) - backports_content = backports.read() - in_memory_backports = StringIO(backports_content) - - backportsDict = csv.DictReader(in_memory_backports, fieldnames=['rev', 'fsf', 'linaro', 'summary', 'target', 'type', 'notes', 'backport', 'owner', 'author'], delimiter=',') - backports.close(); - - release_type=nextseries.longuppertype() - - spin=str(nextseries.spin).strip('-') - rc=str(nextseries.rc).strip("-rc") - - template_dir=candidate.rntemplate.workdir() - series_dir=candidate.rnworkdir.workdir() - - # Verify that the basis url is actually valid, otherwise the release notes - # will contain erroneous information. - url=urllib2 - try: - ret = urllib2.urlopen(basis_url) - except url.URLError as e: - print "%s does not exist!" % basis_url - exit(1) - - # TODO verify that the basis_url is a valid url and warn if not. - basis=u'' - basis_ver=basis_url.strip("/") - basis_ver=basis_ver.rsplit("/",1)[1] - if basis_url.find("releases") == 1: - basis="Linaro Release GCC " + basis_ver - elif basis_url.find("-rc") == 1: - basis="Linaro Release-Candidate GCC " + basis_ver - else: - basis="Linaro Snapshot GCC " + basis_ver - - release_string="Linaro GCC %s.%s-%s.%s %s" % (ver['major'],ver['minor'],month_year.strftime("%Y"), month_year.strftime("%m"),release_type) - - print "The release notes for this %s will be generated based on: %s" % (release_string, basis) - - # Get ready to do the rendering of the jinja2 templates into .textile - # format. - - # relative path to the README.textile templates. - src_readme_path=u'/components/toolchain/gcc-linaro/5/README.textile' - bin_readme_path=u'/components/toolchain/binaries/README.textile' - - # absolute path to README.textile templates. - template_src_readme=template_dir + src_readme_path - template_bin_readme=template_dir + bin_readme_path - - # relative path to the README.textile.series templates. - # Note, the src templates don't have an extends (.series) file. - src_extends_path=u'/components/toolchain/gcc-linaro/5/README.textile' - bin_extends_path=u'/components/toolchain/binaries/README.textile.series' - - # absolute path to README.textile.series template overrides. These are - # what are passed to the jinja2 rendering engine because they might - # override the default templates with version specific information. - series_src_extends=series_dir + src_extends_path - series_bin_extends=series_dir + bin_extends_path - - # These are the README.textile files that exist in the series directory. - series_src_readme=series_dir + src_readme_path - series_bin_readme=series_dir + bin_readme_path - - # Copy the template README.textile over the series README.textile file - # because the jinja2 engine will need access to the parent template. This - # will be overwritten with the generated README.textile later. - copyfile(template_src_readme, series_src_readme) - copyfile(template_bin_readme, series_bin_readme) - - # After the script runs the *_series_template_path files are overwritten - # with the generated README.textile files. - src_readme=series_src_readme - bin_readme=series_bin_readme - - - # Setting lstrip_blocks=True lets us use indenting in jinja2 templates. - env = Environment(loader=FileSystemLoader(series_dir), lstrip_blocks=True, trim_blocks=True) - - # We need to use a custom wordwrap filter that doesn't line-break urls. - env.filters['customwordwrap'] = do_customwordwrap - - source_archive_template = env.get_template(src_extends_path) - binary_archive_template = env.get_template(bin_extends_path) - announce_template = env.get_template(bin_extends_path) - - # Generate the toolchain GCC source archive release notes. - source_textile_rendering = source_archive_template.render( - backports=backportsDict, - GCC_base_version=ver['major'] + u'.' + ver['minor'] + u'.' + ver['revision'], - FSF_rev=fsf, - basis=basis, - basis_url=basis_url, - year=month_year.strftime("%Y"), - dd_month=month_year.strftime("%m"), - rc=rc, - spin=spin, - stab_maint=stab_maint, - release_type=release_type, - package="GCC", - major=ver['major'], - minor=ver['minor']) - - source_textile_rendering = source_textile_rendering.encode('utf-8') - - out_source=u'' + outfile - out_binary=u'Linaro-' + outfile - - try: - with open(out_source + u'.textile','w') as f: - f.write(source_textile_rendering) - print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_source + u'.textile' - except IOError as e: - sys.exit('Error rendering textile source archive release notes.') - - source_html_rendering = textile.textile(source_textile_rendering) - - try: - with open(out_source + u'.html','w') as f: - f.write(source_html_rendering) - print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_source + u'.html' - except IOError as e: - sys.exit('Error rendering html release notes.') - - # Generate the binary toolchain archive release notes. - binary_textile_rendering = binary_archive_template.render( - announce='', - year=month_year.strftime("%Y"), - dd_month=month_year.strftime("%m"), - rc=rc, - spin=spin, - major=ver['major'], - minor=ver['minor'], - stab_maint=stab_maint, - snap_rc_release=release_type) - - binary_textile_rendering = binary_textile_rendering.encode('utf-8') - try: - with open(out_binary + u'.textile','w') as f: - f.write(binary_textile_rendering) - print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.textile' - except IOError as e: - sys.exit('Error rendering textile binary archive release notes.') - - binary_html_rendering = textile.textile(binary_textile_rendering) - - try: - with open(out_binary + u'.html','w') as f: - f.write(binary_html_rendering) - print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.html' - except IOError as e: - sys.exit('Error rendering html release notes.') - - # Generate the binary toolchain announce message. - announce_rendering = announce_template.render( - announce='yes', - year=month_year.strftime("%Y"), - dd_month=month_year.strftime("%m"), - rc=rc, - spin=spin, - major=ver['major'], - minor=ver['minor'], - stab_maint=stab_maint, - snap_rc_release=release_type) - - announce_rendering = announce_rendering.encode('utf-8') - announce_file=u'announce-' + out_binary + u'.txt' - try: - with open(announce_file,'w') as f: - f.write(announce_rendering) - print 'Wrote text rendering to file %s' % os.getcwd() + u'/' + announce_file - except IOError as e: - sys.exit('Error rendering text announce message.') - - - # Copy the generated .textile file over the - # readme_series_dir/README.textile template file. - # WARNING: This must be done AFTER you generate the announce message - # because it uses the same README.textile template as the textile rendered - # release notes. - copyfile(out_binary + u'.textile' ,bin_readme) - print 'Wrote generated file ' + out_binary + u'.textile' + " to " + bin_readme - - # Copy the generated .textile file over the - # readme_series_dir/README.textile template file. - copyfile(out_source + u'.textile' ,src_readme ) - print 'Wrote generated file ' + out_source + u'.textile' + " to " + src_readme - - candidate.rnworkdir.add(bin_readme) - candidate.rnworkdir.add(src_readme) - candidate.rnworkdir.commit("Generated Release Notes for " + nextseries.label()) - candidate.rnworkdir.log(1) - - # TODO: Ask if the user would like to push the release notes changes. - - quit(0); - -if __name__ == "__main__": - main(sys.argv[1:]) diff --git a/rninput.py b/rninput.py deleted file mode 100644 index 86d3d90..0000000 --- a/rninput.py +++ /dev/null @@ -1,35 +0,0 @@ -# For fileIO -import os.path - -# @message - The question to display to the user. -# @default - The default if is hit. -# @accept - The list of valid responses. -# @retry - Whether to retry on a malformed input. -# Returns 'y' or 'no' -def yninput(message, default="n", accept=['yes', 'y', 'no', 'n'], retry=True): - # TODO: Test this - if default.lower() not in accept: - raise TypeError('Default as %s is not in list of accepted responses.' % default) - - default_msg=" [y/N]: " - if default.lower() == "y" or default.lower() == "yes": - default_msg=" [Y/n]: " - - while(1): - answer=raw_input(message + default_msg) or default.lower() - if answer.lower() not in accept and retry: - print "'%s' is an invalid response. Please try again." % answer.lower() - continue - else: - if answer.lower() == "yes" or answer.lower() == "y": - return "y" - return "n" - -# TODO: Test with a directory returned as the answer. -def finput(message,orval): - while(1): - answer=raw_input(message) or orval - if os.path.exists(answer) and os.path.isfile(answer): - return answer - - print "%s doesn't exist or isn't a regular file. Try again." % answer diff --git a/rnseries.py b/rnseries.py deleted file mode 100644 index 1b02373..0000000 --- a/rnseries.py +++ /dev/null @@ -1,193 +0,0 @@ -import unittest -import logging -import os - -from proj import Proj -from sh import ls -from shutil import copyfile - -from gitrepo import cd -from clone import Clone -from workdir import Workdir -from series import Series -from series import seriesFromBranchname -from linaroseries import LinaroSeries -from linaroseries import linaroSeriesFromBranchname -from template import TemplateRN - -from rninput import yninput - -# Abstract base class for a release-notes series -# Every RNSeries has a templateRN instance and a workdir. The template -# instance is used for the base template. -class RNSeries(object): - #rnremote=u'http://git.linaro.org/toolchain/release-notes.git' - # use ssh:// so that we can push to the remote. - rnremote=u'ssh://git@git.linaro.org/toolchain/release-notes.git' - _series='components/toolchain/binaries/README.textile.series' - # @rnrepo - path to the existing releases notes repository if there is one. - # If there isn't one a new one will be cloned. - # @trackseries - an instance of a LinaroSeries to track. - # @nextseries - an instance of a LinaroSeries that is the next series. - def __init__(self, proj, rnrepo=None, trackseries=None, nextseries=None): - - # Create the release-notes repository clone. The Clone constructor - # will throw an exception if proj is not a Proj. That is an - # unrecoverable error so don't catch it here. The Clone constructor - # will also throw and exception if rnrepo is not a string. - - if rnrepo: - logging.info('RNSeries() clone already exists. Using existing.') - self.rnclone=Clone(proj, clonedir=rnrepo) - else: - self.rnclone=Clone(proj, remote=RNSeries.rnremote) - - # TODO: Write a testcase that exposes this. - if not trackseries: - raise TypeError("Input variable trackseries is required.") - - # TODO: Write a testcase that exposes this. - if not isinstance(trackseries, LinaroSeries): - raise TypeError("Input variable trackseries not of type Series.") - - # TODO: Write a testcase that exposes this. - if not nextseries: - raise TypeError("Input variable nextseries is required.") - - # TODO: Write a testcase that exposes this. - if not isinstance(nextseries, LinaroSeries): - raise TypeError("Input variable nextseries not of type Series.") - - self.trackseries=trackseries - self.nextseries=nextseries - - self.proj=proj - - templ_branchname=self.nextseries.shorttype() + "_" + self.nextseries.getDir() - - logging.info('RNSeries() Creating TemplateRN') - # Every release-notes series contains a 'template' workdir checked out - # into a new templ_branchname branch that tracks master. - self.rntemplate=TemplateRN(self.proj,self.rnclone, branchname_suffix=templ_branchname) - - # We do this now because the SeriesRN workdir might be branched from the template. - self.rntemplate.updateTemplateReadme() - updateseries=self.rntemplate.updateTemplateSeries() - - self.rntemplate.commit() - - print "Please verify that your changes have been committed on the template branch." - self.rntemplate.log(1) - - # It's possible that self.trackseries.getBranchname() doesn't exist - # (such is the case if we're creating the first candidate from a - # snapshot). In this case 'track' should be the local template branch - # so that we pick up the template changes. - if not self.rntemplate.branchexists(self.trackseries.getBranchname()): - print("Creating RNSeries based on branch " + self.rntemplate.getbranch()) - self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.rntemplate.getbranch(), branchname=self.nextseries.getBranchname()) - else: - print "You updated the template README.textile.series but this will not be reflected in the Candidate README.textile.series." - print "Creating RNSeries based on branch " + self.trackseries.getBranchname() - self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.trackseries.getBranchname(), branchname=self.nextseries.getBranchname()) - - print "Log of last commit on workdir branch:" - self.rnworkdir.log(1) - - print "If you would like to make changes to the series release notes please update the the Candidate README.textile.series file:" - answer=yninput("Do you need to update the Candidate README.textile.series file?", "N") - if answer == "y": - self.rnworkdir.editFile(self._series) - else: - print "The Candidate README.textile.series file has not been updated." - - self.rnworkdir.commit() - - print "Log of last commit on workdir branch:" - self.rnworkdir.log(1) - - # TODO: Don't forget to push template and workdir changes. - # self.rntemplate.pushToBranch("origin/master") - # TODO: Should the workdir know where to push to? - # self.rnworkdir.pushToBranch(self.nextseries.getBranchname()) - - def get_next_series(self): - return self.nextseries - - def get_track_series(self): - return self.trackseries - -# Do we even need these, especially if they're all the same? -class SnapshotRN(RNSeries): - def __init__(self, proj, trackseries, rnrepo=None): - if not isinstance(trackseries, LinaroSeries): - raise TypeError('The series input must be of type LinaroSeries') - - nextseries=trackseries.toNextSnapshot(strict=True) - - super(SnapshotRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) - -class CandidateRN(RNSeries): - def __init__(self, proj, trackseries, rnrepo=None): - if not isinstance(trackseries, LinaroSeries): - raise TypeError('The trackseries input must be of type LinaroSeries') - - self.nextseries=trackseries.toNextCandidate(strict=True) - - super(CandidateRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=self.nextseries) - -class ReleaseRN(RNSeries): - def __init__(self, proj, trackseries, rnrepo=None): - if not isinstance(trackseries, LinaroSeries): - raise TypeError('The series input must be of type LinaroSeries') - - nextseries=trackseries.toNextRelease(strict=True) - - super(ReleaseRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) - -class TestCandidateRN(unittest.TestCase): - #logging.basicConfig(level="INFO") - testdirprefix="CandidateRNUT" - - @classmethod - def setUpClass(cls): - cls.proj=Proj(prefix=TestCandidateRN.testdirprefix, persist=True) - logging.info('11111 Created Proj dir') - cls.rnrepo=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') - logging.info('22222 Created clone dir') - cls.branch=u'snapshots/linaro-6.1-2016.06' - cls.series=linaroSeriesFromBranchname(branch=cls.branch) - - @classmethod - def tearDownClass(cls): - cls.proj.cleanup() - - def test_candidate_newclone(self): - logging.info('33333 Going To Create CandidateRN') - candidatern=CandidateRN(self.proj, rnrepo=self.rnrepo.repodir, trackseries=self.series) - logging.info('44444 Created CandidateRN') - self.assertEqual(candidatern.nextseries.longuppertype(), "Release-Candidate") - self.assertEqual(candidatern.nextseries.longlowertype(), "release-candidate") - self.assertTrue(os.path.isdir(self.proj.projdir + "/release-notes.git")) - self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) - self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) - -# def test_candidate_existingclone(self): -# # Since this reuses an existing clone of a repository the -# # release-notes.git directory should actually be a symlink. -# candidatern=CandidateRN(self.proj, trackseries=self.series, rnrepo=TestCandidateRN.rnrepo.repodir) -# self.assertEqual(candidatern.series.longuppertype(), "Release-Candidate") -# self.assertEqual(candidatern.series.longlowertype(), "release-candidate") -# self.assertTrue(os.path.islink(self.proj.projdir + "/release-notes.git")) -# self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) -# self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) -# -# self.assertEqual(candidatern.rntemplate.getbranch().split("_")[0], "template") -## self.assertTrue(candidatern.rnworkdir.getbranch().startswith("releases/")) -# -# print "Based on: %s" % self.series.getBranchname() -# print "Template Branch: %s" % candidatern.rntemplate.getbranch() -# print "Candidate Branch: %s" % candidatern.rnworkdir.getbranch() - -if __name__ == '__main__': - unittest.main() diff --git a/series.py b/series.py deleted file mode 100644 index 6475fe3..0000000 --- a/series.py +++ /dev/null @@ -1,550 +0,0 @@ -import unittest -import logging -import os -import uuid - -from vers import Spin -from vers import Rc -from vers import Vendor -from vers import Package - -from datetime import datetime -from dateutil.relativedelta import relativedelta - -from vers import packageFromStr - -# A Series stores the instance characteristic that describes the output -class Series(object): - - series = ['candidate', 'snapshot', 'release'] - serieslongupper = ['Release-Candidate', 'Snapshot', 'Release'] - - # @ seriestype - 'type' of either 'candidate', 'snapshot', or 'release' - # @ package - String or Package object representing the series package - # name with version string, e.g., 'GCC-5.3.1' - # @ vendor - String or Vendor object representing the vendor that is - # tagged on this series. - # @ date - String or 'datetime' object representing the YYYY.MM of this - # series. It defaults to 'today' - # @ spin - Optional Spin, str, or int - # @ rc - Optional Rc, str, or int - # This will make sure snapshot doesn't have an rc and release doesn't have - # an rc for instance. - def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): - if isinstance(spin, Spin): - self.spin=spin - else: - # Spin will raise an exception if spin is the wrong type. - # A None parameter will create a Spin with an internal value of None. - self.spin=Spin(spin) - - if isinstance(rc, Rc): - self.rc=rc - else: - # Rc will raise an exception if rc is the wrong type. A None parameter - # will create an Rc with an internal value of None. - self.rc=Rc(rc) - - if isinstance(date, datetime): - # Force all of the days of the month to 15 to unify comparisons. - date.replace(day=15) - self.date=date - else: - # 'date' is a string. If the input date can't be parsed by datetime - # it will throw an exception. We can't recover from it so just pass - # it up. - if len(date) < 10: - tmpdate=datetime.strptime(date, "%Y.%m") - else: - tmpdate=datetime.strptime(date, "%Y.%m.%d") - # Force all of the days of the month to 15 to unify comparisons. - self.date=tmpdate.replace(day=15) - - # If the input vendor=None just instantiate the default vendor. - if not vendor: - self.vendor=Vendor() - else: - self.vendor=vendor - - if not package: - raise TypeError('Series requires an input package.') - elif isinstance(package, Package): - self.package=package - elif isinstance(package, basestring): - self.package=packageFromStr(package) - else: - # There can't be a defaut package because it requires a version. - raise TypeError("Series 'package' unrecognized type " + str(type(package))) - - # We might want to uniquely identify a particular Series object. - self.uniqueid=str(uuid.uuid4()) - - # We store a seriestype as an integer into an enumeration array - # so that the names can be changed as desired. - try: - self.seriestype = Series.series.index(seriestype.lower()) - except ValueError: - self.seriestype = -1 # no match - raise TypeError('Invalid series type %s.' % seriestype) - - # rc can only be non-None for release-candidates. - if strict: - if self.seriestype == Series.series.index("snapshot"): - if self.rc.val != 0: - raise ValueError('A snapshot series cannot have an rc.') - elif self.seriestype == Series.series.index("release"): - if self.rc.val != 0: - raise ValueError('A release series cannot have an rc.') - - - if self.seriestype == Series.series.index("candidate") and self.rc.val is 0: - raise TypeError('A candidate series must have an rc specified.') - - # namespace/vendor-package-version-YYYY.MM-spin-rc - self.fmt = { - '%N': self.getNamespace, - '%L': self.serieslabel, - '%P': self._getPackageName, - '%p': self._getPackageNameLower, - '%V': self._getVendor, - '%v': self._getVendorLower, - '%E': self._getVersion, - '%D': self._getYYYYMM, - '%d': self.getDir, - '%S': self._getSpin, - '%R': self._getRc, - } - - def _getPackageName(self): - return self.package.pv - - def _getPackageNameLower(self): - return self.package.lower() - - def _getVendor(self): - return self.vendor.__str__() - - def _getVendorLower(self): - return self.vendor.lower() - - def _getVersion(self): - return self.package.version.strfversion("%M%m") - - def _getYYYYMM(self): - return self.date.strftime("%Y.%m") - - def _getSpin(self): - return str(self.spin).strip('-') - - def _getRc(self): - return str(self.rc).strip('-') - - def __format__(self,format_spec): - ret='' - # Iterate across the dictionary and for each key found and invoke the - # function. - # self.fmt[idx]() - import string - ret=format_spec - for key in self.fmt.iterkeys(): - # TOOD: Crap this isn't going to work because each function takes different parameters. - replac=self.fmt[key]() - ret=string.replace(ret,key,replac) - return ret - - def serieslabel(self): - # Only the 'snapshot-' is used in a label. - if self.seriestype == Series.series.index("snapshot"): - return Series.series[self.seriestype] + u'-' - else: - return u'' - - def shorttype(self): - return Series.series[self.seriestype] - - def longlowertype(self): - return Series.serieslongupper[self.seriestype].lower() - - def longuppertype(self): - return Series.serieslongupper[self.seriestype] - - def __str__(self): - return Series.series[self.seriestype] + "_" + self.uniqueid - - def label(self): - label=str(self.vendor) + u'-' + self.serieslabel() + str(self.package) - label=label + u'-' + self.date.strftime("%Y.%m") - label=label + str(self.spin) + str(self.rc) - return label - - def incrementMonth(self): - self.date = self.date + relativedelta(months=1) - - # TODO: provide a format function which can change the separator and - # capitalization, etc. - - def getDir(self): - dirstr=self.package.version.strfversion("%M%m") - dirstr=dirstr + u'-' + self.date.strftime("%Y.%m") - dirstr=dirstr + str(self.spin) - dirstr=dirstr + str(self.rc) - return dirstr - - def getNamespace(self): - namespace=u'releases' - if self.seriestype == Series.series.index("snapshot"): - namespace=u'snapshots' - return namespace - - # TODO: Document what a conformant branch name looks like. - # Return a conformant branch name from the Series information. - def getBranchname(self): - branchname=u'' - if self.seriestype == Series.series.index("snapshot"): - branchname=branchname + u'snapshots/' - else: - branchname=branchname + u'releases/' - - branchname=branchname + self.vendor.lower() - branchname=branchname + u'-' + self.package.version.strfversion("%M%m") - branchname=branchname + u'-' + self.date.strftime("%Y.%m") - branchname=branchname + str(self.spin) - branchname=branchname + str(self.rc) - - return branchname - -# Helper function which creates a Series from a properly formed branch name -# input string. -def seriesFromBranchname(branch=None): - if not isinstance(branch, basestring): - raise TypeError('seriesFromBranchname requires a basestring as input') - - if not branch: - raise ValueError('seriesFromBranchname requires a non-empty string as input') - - # Get the part before the '/'. That's the namespace - try: - namespace=branch.rsplit('/', 1)[0] - except IndexError: - raise RuntimeError('string must be /') - - # Get everything after the '/'. - try: - seriesstr=branch.rsplit('/', 1)[1] - except IndexError: - raise RuntimeError('string must be -') - - if namespace == u'': - raise TypeError("Couldn't parse a namespace from input string") - if seriesstr == u'': - raise TypeError("Couldn't parse a series from input string") - - keys=['vendor', 'version', 'date', 'spin', 'rc'] - values=seriesstr.split('-') - dictionary=dict(zip(keys, values)) - - # if there is no spin but there is an 'rcX' in the spin key:value pair - # it means that there's really no spin but should be in the rc key:value - # pair. - if dictionary.has_key("spin") and "rc" in dictionary["spin"]: - dictionary["rc"]=dictionary["spin"] - dictionary.pop("spin", None) - - # We need to have None fields in the missing keys for when we call the - # Series constructor. - if not dictionary.has_key("rc"): - dictionary["rc"]=None - else: - # strip the "rc" and just leave the int. - if dictionary["rc"].startswith("rc"): - dictionary["rc"]=dictionary["rc"][2:] - - # We need to have None fields in the missing keys for when we call the - # Series constructor. - if not dictionary.has_key("spin"): - dictionary["spin"]=None - - seriesdate=datetime.today - if dictionary["date"]: - datekeys=['year', 'month', 'day'] - datevalues=dictionary["date"].split('.') - datefields=dict(zip(datekeys, datevalues)) - if not datefields.has_key("day"): - datefields["day"]="15" - seriesdate=datetime(int(datefields["year"]), int(datefields["month"]), int(datefields["day"])) - - if "snapshots" in namespace and dictionary["rc"]: - raise RuntimeError('A snapshots namespace can not have an rc. This is a non-conforming input.') - elif "releases" not in namespace and dictionary["rc"]: - raise RuntimeError('An rc must have a "releases" namespace. This is a non-conforming input.') - elif dictionary["rc"]: - seriesname="candidate" - elif "snapshots" in namespace: - seriesname="snapshot" - elif "releases" in namespace: - seriesname="release" - elif "snapshot" in namespace: - raise RuntimeError('"snapshot" is not a complete namespace. A conforming namespace is "snapshots".') - else: - # TODO test for unknown namespace. - raise RuntimeError('Unknown namespace in input string.') - - package=Package(package="GCC", version=dictionary["version"]) - series=Series(seriesname, package=package, date=seriesdate, spin=dictionary["spin"], rc=dictionary["rc"], strict=True) - - return series - -from vers import versionFromStr - -class TestSeries(unittest.TestCase): - - def test_missing_seriestype(self): - with self.assertRaises(TypeError): - spin=Spin() - rc=Rc() - candidate=Series(package="GCC-5.3.1", spin=spin, rc=rc) - - def test_no_match(self): - with self.assertRaises(TypeError): - candidate=Series("foobar", package="GCC-5.3.1") - - def test_partial_seriestype_match(self): - with self.assertRaises(TypeError): - candidate=Series("candid", package="GCC-5.3.1") - - def test_excessive_seriestype_match(self): - with self.assertRaises(TypeError): - candidate=Series("candidate2", package="GCC-5.3.1") - - def test_match_release(self): - release=Series("release", package="GCC-5.3.1") - self.assertEqual(str(release).split("_")[0], "release") - - def test_match_candidate_wrongcase(self): - candidate=Series("Candidate", package="GCC-5.3.1",rc="1") - self.assertEqual(str(candidate).split("_")[0], "candidate") - - def test_match_snapshot_wrongcase(self): - snapshot=Series("SNAPSHOT", package="GCC-5.3.1") - self.assertEqual(str(snapshot).split("_")[0], "snapshot") - - def test_longlowertype_candidate(self): - candidate=Series("candidate", package="GCC-5.3.1", rc="1") - self.assertEqual(candidate.longlowertype(), "release-candidate") - - def test_longuppertype_candidate(self): - candidate=Series("candidate", package="GCC-5.3.1", rc="4") - self.assertEqual(candidate.longuppertype(), "Release-Candidate") - - def test_shorttype_candidate(self): - candidate=Series("candidate", package="GCC-5.3.1", rc="1") - self.assertEqual(candidate.shorttype(), "candidate") - - def test_serieslabel_candidate(self): - candidate=Series("candidate", package="GCC-5.3.1", rc="1") - self.assertEqual(candidate.serieslabel(), "") - - def test_serieslabel_release(self): - candidate=Series("release", package="GCC-5.3.1") - self.assertEqual(candidate.serieslabel(), "") - - def test_serieslabel_snapshot(self): - candidate=Series("snapshot", package="GCC-5.3.1") - self.assertEqual(candidate.serieslabel(), "snapshot-") - - def test_empty_rc(self): - rc=Rc(7) - candidate=Series("candidate", package="GCC-5.3.1", rc=rc) - self.assertEqual(candidate.rc.vers, 7) - self.assertEqual(str(candidate.rc), "FOO") - - def test_empty_rc(self): - rc=Rc() - release=Series("release", package="GCC-5.3.1", rc=rc) - self.assertEqual(str(release.rc), "") - - release2=Series("release", package="GCC-5.3.1") - self.assertEqual(str(release2.rc), "") - - def test_specified_rc(self): - rc=Rc(7) - candidate=Series("candidate", package="GCC-5.3.1", rc=rc) - self.assertEqual(candidate.rc.val, 7) - self.assertEqual(str(candidate.rc), "-rc7") - - def test_candidate_with_no_rc(self): - with self.assertRaises(TypeError): - candidate=Series("candidate", package="GCC-5.3.1") - - with self.assertRaises(TypeError): - rc=Rc() - candidate2=Series("candidate", package="GCC-5.3.1", rc=rc) - - def test_rc_as_string(self): - candidate=Series("candidate", package="GCC-5.3.1", rc="7") - self.assertEqual(candidate.rc.val, 7) - self.assertEqual(str(candidate.rc), "-rc7") - - def test_rc_as_int(self): - candidate=Series("candidate", package="GCC-5.3.1", rc=7) - self.assertEqual(candidate.rc.val, 7) - self.assertEqual(str(candidate.rc), "-rc7") - - def test_rc_as_typeerror(self): - floatrc=7.0 - with self.assertRaises(TypeError): - snapshot=Series("snapshot", package="GCC-5.3.1", rc=floatrc) - - def test_rc_as_negative(self): - with self.assertRaises(ValueError): - snapshot=Series("snapshot", package="GCC-5.3.1", rc="-1") - - def test_missing_spin(self): - snapshot=Series("snapshot", package="GCC-5.3.1") - self.assertEqual(snapshot.spin.val, 0) - self.assertEqual(str(snapshot.spin), "") - - def test_specified_spin(self): - spin=Spin(7) - snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) - self.assertEqual(snapshot.spin.val, 7) - self.assertEqual(str(snapshot.spin), "-7") - - def test_empty_spin(self): - spin=Spin() - snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) - self.assertEqual(str(snapshot.spin), "") - - def test_spin_as_string(self): - snapshot=Series("snapshot", spin="7", package="GCC-5.3.1") - self.assertEqual(snapshot.spin.val, 7) - self.assertEqual(str(snapshot.spin), "-7") - - def test_spin_as_int(self): - snapshot=Series("snapshot", spin=7, package="GCC-5.3.1") - self.assertEqual(snapshot.spin.val, 7) - self.assertEqual(str(snapshot.spin), "-7") - - def test_spin_as_typeerror(self): - floatspin=7.0 - with self.assertRaises(TypeError): - snapshot=Series("snapshot", spin=floatspin, package="GCC-5.3.1") - - def test_spin_as_negative(self): - with self.assertRaises(ValueError): - snapshot=Series("snapshot", spin="-1", package="GCC-5.3.1") - - def test_empty_spin_and_rc(self): - release=Series("release", package="GCC-5.3.1") - self.assertEqual(release.spin.val, 0) - self.assertEqual(release.rc.val, 0) - self.assertEqual(str(release.spin), "") - self.assertEqual(str(release.rc), "") - - def test_package_as_Package(self): - package = Package("GCC", "5.3.1") - release=Series("release", package) - self.assertEqual(str(release.package), "GCC-5.3.1") - - def test_package_as_Package(self): - # Create a Version instead of a package - package = versionFromStr("5.3.1") - with self.assertRaises(TypeError): - candidate=Series("candidate", package) - - def test_package_as_None(self): - package=None - with self.assertRaises(TypeError): - candidate=Series("candidate", package) - - def test_getbranchname(self): - candidate=Series("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") - release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc=None) - self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") - - def test_date_string(self): - candidate=Series("candidate", package="GCC-5.3.1", date="2016.05.27", spin="1", rc="1") - self.assertEqual(datetime(2016,05,15),candidate.date) - - candidate2=Series("candidate", package="GCC-5.3.1", date="2016.05", spin="1", rc="1") - self.assertEqual(datetime(2016,05,15),candidate2.date) - - with self.assertRaises(TypeError): - candidate3=Series("candidate", package="GCC-5.3.1", date=datetime("20161034234"), spin="1", rc="1") - - def test_series_strict_true(self): - with self.assertRaises(ValueError): - snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) - - with self.assertRaises(ValueError): - snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - - with self.assertRaises(ValueError): - release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) - - with self.assertRaises(ValueError): - release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") - - def test_series_strict_false(self): - snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6", rc="1", strict=False) - self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6-rc1") - - release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=False) - self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") - - release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), rc="1", strict=False) - self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-rc1") - - # TODO: Test combinations of branch names with and without spins/rcs. - - # TODO: Test whether seriesFromBranchname can detect non-conforming branch names. - - # These tests will verify that a branchname can be read, a series created, - # and then the same branch name recreated. - def test_seriesFromBranchname(self): - branch=u'snapshots/linaro-5.3-2016.05-6-rc1' - with self.assertRaises(RuntimeError): - series=seriesFromBranchname(branch=branch) - - branch2=u'snapshots/linaro-5.3-2016.05-6' - series2=seriesFromBranchname(branch=branch2) - self.assertEqual(series2.getBranchname(),u'snapshots/linaro-5.3-2016.05-6') - - branch3=u'snapshots/linaro-5.3-2016.05' - series3=seriesFromBranchname(branch=branch3) - self.assertEqual(series3.getBranchname(),u'snapshots/linaro-5.3-2016.05') - - branch4=u'releases/linaro-5.3-2016.05-6-rc1' - series4=seriesFromBranchname(branch=branch4) - self.assertEqual(series4.getBranchname(),u'releases/linaro-5.3-2016.05-6-rc1') - - branch5=u'releases/linaro-5.3-2016.05-rc1' - series5=seriesFromBranchname(branch=branch5) - self.assertEqual(series5.getBranchname(),u'releases/linaro-5.3-2016.05-rc1') - - branch6=u'releases/linaro-5.3-2016.05-6' - series6=seriesFromBranchname(branch=branch6) - self.assertEqual(series6.getBranchname(),u'releases/linaro-5.3-2016.05-6') - - branch7=u'releases/linaro-5.3-2016.05' - series7=seriesFromBranchname(branch=branch7) - self.assertEqual(series7.getBranchname(),u'releases/linaro-5.3-2016.05') - - branch8=u'snapshots/linaro-5.3-2016.05-6-rc1' - with self.assertRaises(RuntimeError): - series8=seriesFromBranchname(branch=branch8) - - branch9=u'snapshot/linaro-5.3-2016.05-6-rc1' - with self.assertRaises(RuntimeError): - series9=seriesFromBranchname(branch=branch9) - - branch10=u'snapshot/linaro-5.3-2016.05-6' - with self.assertRaises(RuntimeError): - series10=seriesFromBranchname(branch=branch10) - - # TODO: Test series.label (as there was a runtime bug) - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/template.py b/template.py deleted file mode 100644 index 4f6a865..0000000 --- a/template.py +++ /dev/null @@ -1,94 +0,0 @@ -import unittest -import logging -import os -import sys -import uuid - -from sh import ls -from proj import Proj -from clone import Clone -from workdir import Workdir -from rninput import yninput - -class TemplateRN(Workdir): - _readme='components/toolchain/binaries/README.textile' - _series='components/toolchain/binaries/README.textile.series' - - def __init__(self, proj, rnclone, branchname_suffix=None): - if branchname_suffix: - branchname='template_' + branchname_suffix - else: - branchname='template_' + str(uuid.uuid4()) - super(TemplateRN, self).__init__(proj, rnclone, "template", track="origin/master", branchname=branchname) - # Todo move template workdir out of detached head state. - - # TODO: Allow the user to edit BOTH the gcc-linaro/ and binaries/ - # README.textile file as currently this function only does the binary one. - # TODO: Create Unit Test - def updateTemplateReadme(self): - answer=yninput("Would you like to update the Template README.textile file?", "N") - if answer != "y": - print "The Template README.textile file has not been updated." - return False - - editor = os.getenv('EDITOR') - if not editor: - editor='/usr/bin/vim' - print "Edit the README.textile template fragment if changes are needed." - os.system(editor + ' ' + self.repodir + '/' + TemplateRN._readme) - - self.add(TemplateRN._readme) - return True - - # TODO: Create Unit Test - def updateTemplateSeries(self): - answer=yninput("Would you like to update the Template README.textile.series file?", "N") - if answer != "y": - print "The Template README.textile.series file has not been updated." - return False - - editor = os.getenv('EDITOR') - if not editor: - editor='/usr/bin/vim' - print "Edit the README.textile.series template fragment if changes are needed." - os.system(editor + ' ' + self.repodir + '/' + TemplateRN._series) - - self.add(TemplateRN._series) - return True - - def getReadme(self): - return self.repodir + '/' + TemplateRN._readme - - def getSeries(self): - return self.repodir + '/' + TemplateRN._series - -class TestTemplateRN(unittest.TestCase): - testdirprefix="TemplateRNUT" - - # Every instance of TestClass requires its own proj directory. - def setUp(self): - self.proj=Proj(prefix=TestTemplateRN.testdirprefix, persist=True) - - def tearDown(self): - self.proj.cleanup() - - def test_getREADME(self): - self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') - self.rntemplate=TemplateRN(self.proj,self.rnrepo) - compare=self.proj.projdir + '/template/' + TemplateRN._readme - self.assertEqual(self.rntemplate.getReadme(),compare) - self.assertTrue(os.path.isfile(self.rntemplate.getReadme())) - - def test_getSERIES(self): - self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') - self.rntemplate=TemplateRN(self.proj,self.rnrepo) - compare=self.proj.projdir + '/template/' + TemplateRN._series - self.assertEqual(self.rntemplate.getSeries(),compare) - self.assertTrue(os.path.isfile(self.rntemplate.getSeries())) - - - # TODO: Test branchname_suffix - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() diff --git a/testrn.py b/testrn.py new file mode 100644 index 0000000..196cbaf --- /dev/null +++ b/testrn.py @@ -0,0 +1,24 @@ +from unittest +from rn import create_parser + +class TestRn(TestCase): +""" +Base TestCase class, sets up a CLI parser +""" +@classmethod +def setUpClass(cls): + parser = create_parser() + cls.parser = parser + +def test_foo(self): +# args = self.parser.parse_args(['foo', '-R', '', '-A', 'idbs81839']) +# result = ping(args.tags, args.region, args.ami) +# self.assertIsNotNone(result) + +# TODO: test %Y.%m.%d dates +# TODO: test %Y.%m dates +# TODO: test non-conformant dates +# TODO: snapshots/linaro-5.3-2016.05-6 -c -d "2016.06." --strict + +if __name__ == '__main__': + unittest.main() diff --git a/vers.py b/vers.py deleted file mode 100644 index 6126a01..0000000 --- a/vers.py +++ /dev/null @@ -1,615 +0,0 @@ -import unittest -import logging - -# Abstract baseclass for Vendor and Package that provides some basic -# typechecking. -class PV(object): - def __init__(self, pv=None): - if isinstance(pv, basestring): - self.pv=pv - else: - raise TypeError("Input to PV must be of type basestring.") - - def __str__(self): - return self.pv - - def lower(self): - return self.pv.lower() - -class Vendor(PV): - def __init__(self, vendor="Linaro"): - try: - super(Vendor,self).__init__(vendor) - except TypeError: - raise TypeError("Input 'vendor' must be of type basestring.") - -class Version(object): - # @major - required int or str representation of int. - # @minor - optional int or str representation of int, otherwise None. - # If None, then minor will not be output. - # @point - optional int or str representation of int, otherwise None. - # If None, then point will not be output. - def __init__(self, major, minor=None, point=None): - - if isinstance(major, int) or isinstance(major, basestring): - # This will throw an exception if 'major' is an empty string. - self.major=int(major) - else: - raise TypeError("major must be of type int, string.") - - if isinstance(minor, int) or isinstance(minor, basestring): - # This will throw an exception if 'minor' is an empty string. - self.minor=int(minor) - elif not minor: - self.minor=None - else: - raise TypeError("minor must be of type int, string, or None.") - - if isinstance(point, int) or isinstance(point, basestring): - - # This is illegal, as it'd mess up the output and make it look like - # there's a minor that's really a point. - if not minor and point: - raise RuntimeError("minor must be specified if point is specified.") - - # This will throw an exception if 'minor' is an empty string. - self.point=int(point) - elif not point: - self.point=None - else: - raise TypeError("point must be of type int, string, or None.") - - def __str__(self): - ver=str(self.major) - if self.minor: - ver=ver + u'.' + str(self.minor) - if self.point: - ver=ver + u'.' + str(self.point) - return ver - - # TODO: I'm sure there's a more pythonic way to do this but this is - # quick and dirty. - # %M - Major - # %m - Minor - # %p - Point - # delimiter - Delimiter character to use in output: space or dash. - # TODO: Use a dictionary with replace - def strfversion(self, formatstr, delimiter='.'): - toks=[str(x) for x in formatstr.split('%') if x.strip()] - - delstoks=len(toks) - verstoks=0 - - if self.major: - verstoks=verstoks+1 - if self.minor: - verstoks=verstoks+1 - if self.point: - verstoks=verstoks+1 - - if delstoks > verstoks: - numtoks=verstoks - 1 - else: - numtoks=delstoks - 1 - - vers=u'' - for tok in toks: - if tok == 'M': - vers=vers + str(self.major) - elif tok == 'm': - vers=vers + str(self.minor) - elif tok == 'p': - vers=vers + str(self.point) - if numtoks > 0: - vers=vers + delimiter - numtoks=numtoks-1 - return vers - -# Helper function which returns a Version. -def versionFromStr(version): - if not isinstance(version, basestring): - raise TypeError('input must be of type basestring') - - major=version.split('.', 1)[0] - - # split('.',1) will return index error on "5" but will return empty string on "5." - # Any empty string is not a valid input into Version. - if major == u'': - major=None - - try: - minor=version.split('.', 2)[1] - except IndexError: - minor=None - - if minor == u'': - minor=None - - try: - point=version.split('.', 3)[2] - except IndexError: - point=None - - if point == u'': - point=None - - return Version(major, minor, point) - -class Package(PV): - # @package - default = GCC - # @version - required. Either a string representing a version or a Version - # instance. - def __init__(self, package="GCC", version=None): - #TODO: Make sure package is not None. - if package == u'': - raise TypeError("Package must not be an empty string") - - # TODO: Provide a list of known inputs and test against those? - try: - super(Package,self).__init__(package) - except TypeError: - raise TypeError("Input 'package' must be of type basestring.") - - if not version: - raise RuntimeError("Package requires a version.") - elif isinstance(version, Version): - self.version=version - else: - self.version=versionFromStr(version) - - def get_package(self): - return super(Package,self).__str__() - - def get_version(self): - return str(self.version) - - def __str__(self): - return super(Package,self).__str__() + u'-' + str(self.version) - - def lower(self): - return super(Package,self).lower() + u'-' + str(self.version) - - #TODO: Create format function which can change the separator to ' ' or to '-'. - -def packageFromStr(package=None): - if not isinstance(package, basestring): - raise TypeError('packageFromStr requires a string as input') - - # Get the part before the version numbers. That's the package - try: - package_name=package.rsplit('-', 1)[0] - except IndexError: - raise RuntimeError('string must be -') - - try: - package_version=package.rsplit('-', 1)[1] - except IndexError: - raise RuntimeError('string must be -') - - if package_name == u'': - raise TypeError("Couldn't parse a package name from input string") - if package_version == u'': - raise TypeError("Couldn't parse a package version from input string") - - # This will throw exceptions if the input are malformed. - package=Package(package_name, package_version) - return package - -class Vers(object): - def __init__(self, val=None): - - if val is None: - self.val=0 - return - - if isinstance(val, int) or isinstance(val, basestring): - # This should raise an exception if the string can't be converted - # to an int. If val is already an int it's a nop. - intval=int(val) - - if intval < 0: - raise ValueError('Input val must not be negative') - self.val=intval - else: - raise TypeError("Input val must be of type int or string.") - - def increment(self): - self.val=self.val+1 - - # Return the "-" string or an empty string. - def __str__(self): - # Don't return - if self.val is None or self.val == 0: - return "" - else: - return "-" + str(self.val) - -class Spin(Vers): - def __init__(self, spin=None): - try: - super(Spin,self).__init__(spin) - except ValueError: - raise ValueError('Input spin must not be negative') - -class Rc(Vers): - def __init__(self, rc=None): - try: - super(Rc,self).__init__(rc) - except ValueError: - raise ValueError('Input rc must not be negative') - - # The string representation differs slighty from Spin - def __str__(self): - # Don't return - if self.val is None or self.val == 0: - return "" - else: - return "-rc" + str(self.val) - -class TestVersion(unittest.TestCase): - def test_no_major(self): - with self.assertRaises(TypeError): - version=Version(major=None) - with self.assertRaises(TypeError): - version=Version() - - def test_empty_major(self): - with self.assertRaises(ValueError): - version=Version(major="") - - def test_empty_minor(self): - with self.assertRaises(ValueError): - version=Version(major=5, minor="") - - def test_empty_point(self): - with self.assertRaises(ValueError): - version=Version(major=5, minor=3, point="") - - def test_non_int_major(self): - with self.assertRaises(ValueError): - version=Version('A') - - def test_int_major(self): - version=Version(5) - self.assertEqual(str(version), "5") - version2=Version(major=5) - self.assertEqual(str(version2), "5") - - def test_intstr_major(self): - version=Version("5") - self.assertEqual(str(version), "5") - version2=Version(major="5") - self.assertEqual(str(version2), "5") - - def test_non_int_minor(self): - with self.assertRaises(ValueError): - version=Version(major="5", minor="A") - - def test_int_minor(self): - version=Version(major=5, minor=3) - self.assertEqual(str(version), "5.3") - - def test_intstr_minor(self): - version=Version(major="5", minor="3") - self.assertEqual(str(version), "5.3") - - def test_no_minor(self): - version=Version(major="5", minor=None) - self.assertEqual(str(version), "5") - - def test_point_no_minor(self): - with self.assertRaises(RuntimeError): - version=Version(major=5, minor=None, point=1) - - def test_int_point(self): - version=Version(major=5, minor=3, point=1) - self.assertEqual(str(version), "5.3.1") - - def test_non_int_point(self): - with self.assertRaises(ValueError): - version=Version(major=5, minor=3, point="A") - - def test_intstr_point(self): - version=Version(major="5", minor="3", point="1") - self.assertEqual(str(version), "5.3.1") - - def test_no_point(self): - version=Version(major="5", minor="3", point=None) - self.assertEqual(str(version), "5.3") - - def test_strfversion(self): - version=Version(major="5", minor="3", point="1") - self.assertEqual(version.strfversion("%M%m%p", delimiter='-'),"5-3-1") - self.assertEqual(version.strfversion("%M%m%p"),"5.3.1") - -class TestVendor(unittest.TestCase): - def test_vendor_default(self): - vendor=Vendor() - self.assertEqual(str(vendor), "Linaro") - self.assertEqual(vendor.lower(), "linaro") - - def test_none_input(self): - with self.assertRaises(TypeError): - vendor=Vendor(vendor=None) - - def test_str_input(self): - vendor=Vendor("TestVendor") - self.assertEqual(str(vendor), "TestVendor") - self.assertEqual(vendor.lower(), "testvendor") - -class TestPackage(unittest.TestCase): - - def test_package_no_version(self): - with self.assertRaises(RuntimeError): - # We require a version. - package=Package() - - def test_package_default(self): - package=Package(version="5.3.1") - self.assertEqual(str(package), "GCC-5.3.1") - self.assertEqual(package.lower(), "gcc-5.3.1") - self.assertEqual(package.pv, "GCC") - - def test_none_package(self): - with self.assertRaises(TypeError): - package=Package(package=None, version="5.3.1") - - def test_none_version(self): - with self.assertRaises(RuntimeError): - package=Package(package="GCC", version=None) - - def test_with_Version(self): - version=Version(5,3,1) - package=Package(version=version) - self.assertEqual(str(package), "GCC-5.3.1") - self.assertEqual(package.lower(), "gcc-5.3.1") - - def test_package_version(self): - version=Version(5,3,1) - package=Package(version=version) - self.assertEqual(str(package.version), "5.3.1") - self.assertEqual(package.get_version(), "5.3.1") - self.assertEqual(package.get_package(), "GCC") - - def test_str_input(self): - package=Package("TestPackage", version="5.3.1") - self.assertEqual(str(package), "TestPackage-5.3.1") - self.assertEqual(package.lower(), "testpackage-5.3.1") - - def test_empty_package_name(self): - with self.assertRaises(TypeError): - package=Package(package="", version="5.3.1") - -class TestSpin(unittest.TestCase): - - def test_spin_val(self): - spin=Spin(1) - self.assertEqual(spin.val, 1) - - def test_spin_val_str(self): - spin=Spin(1) - self.assertEqual(str(spin), "-1") - - def test_spin_increment_val(self): - spin=Spin(1) - spin.increment() - self.assertEqual(spin.val, 2) - - def test_spin_increment_str(self): - spin=Spin(1) - spin.increment() - self.assertEqual(str(spin), "-2") - - def test_zero_spin(self): - spin=Spin(0) - self.assertEqual(spin.val, 0) - - def test_zero_spin_str(self): - spin=Spin(0) - self.assertEqual(str(spin), "") - - def test_none_spin(self): - spin=Spin(None) - self.assertEqual(spin.val, 0) - - def test_none_spin_increment(self): - spin=Spin(None) - spin.increment() - self.assertEqual(spin.val, 1) - - def test_none_spin_str(self): - spin=Spin(None) - self.assertEqual(str(spin), "") - - def test_default_spin(self): - spin=Spin() - self.assertEqual(spin.val, 0) - - def test_default_spin_str(self): - spin=Spin() - self.assertEqual(str(spin), "") - - def test_negative_val(self): - with self.assertRaises(ValueError): - spin=Spin(-1) - - def test_default_spin_str(self): - spin=Spin() - self.assertEqual(str(spin), "") - - def test_spin_string_input(self): - spin=Spin("9") - self.assertEqual(spin.val, 9) - self.assertEqual(str(spin), "-9") - - def test_spin_negative_string_input(self): - with self.assertRaises(ValueError): - spin=Spin("-9") - - def test_float_type(self): - with self.assertRaises(TypeError): - spin=Spin(7.0) - -class TestRc(unittest.TestCase): - def test_rc_val(self): - rc=Rc(1) - self.assertEqual(rc.val, 1) - - def test_rc_str(self): - rc=Rc(1) - self.assertEqual(str(rc), "-rc1") - - def test_negative_val(self): - with self.assertRaises(ValueError): - rc=Rc(-1) - - def test_zero_rc_str(self): - rc=Rc(0) - self.assertEqual(str(rc), "") - - def test_none_rc_str(self): - rc=Rc(None) - self.assertEqual(str(rc), "") - - def test_default_rc_str(self): - rc=Rc() - self.assertEqual(str(rc), "") - - def test_rc_increment_val(self): - rc=Rc(1) - rc.increment() - self.assertEqual(rc.val, 2) - - def test_rc_increment_str(self): - rc=Rc(1) - rc.increment() - self.assertEqual(str(rc), "-rc2") - - def test_none_rc_increment_str(self): - rc=Rc(None) - rc.increment() - self.assertEqual(str(rc), "-rc1") - - def test_default_rc_str(self): - rc=Rc() - self.assertEqual(str(rc), "") - - def test_rc_string_input(self): - rc=Rc("9") - self.assertEqual(rc.val, 9) - self.assertEqual(str(rc), "-rc9") - - def test_rc_negative_string_input(self): - with self.assertRaises(ValueError): - rc=Rc("-9") - - def test_float_type(self): - with self.assertRaises(TypeError): - rc=Rc(7.0) - -class TestPackageFromString(unittest.TestCase): - - def test_none(self): - with self.assertRaises(TypeError): - package=packageFromStr(package=None) - - def test_with_package(self): - package=Package("GCC", "5.3.1") - with self.assertRaises(TypeError): - package=packageFromStr(package=package) - - def test_with_no_package_name(self): - with self.assertRaises(TypeError): - package=packageFromStr(package="-5.3.1") - - with self.assertRaises(RuntimeError): - package=packageFromStr(package="5.3.1") - - def test_with_no_package_version(self): - with self.assertRaises(TypeError): - package=packageFromStr(package="GCC-") - - with self.assertRaises(RuntimeError): - package=packageFromStr(package="GCC") - - def test_with_space(self): - with self.assertRaises(RuntimeError): - package=packageFromStr(package="GCC 5.3.1") - - def test_correct_usage(self): - package=packageFromStr(package="GCC-5.3.1") - self.assertEqual(str(package), "GCC-5.3.1") - self.assertEqual(package.lower(), "gcc-5.3.1") - self.assertEqual(package.pv, "GCC") - self.assertEqual(str(package.version), "5.3.1") - self.assertEqual(package.version.major, 5) - self.assertEqual(package.version.minor, 3) - self.assertEqual(package.version.point, 1) - -class TestVersionFromStr(unittest.TestCase): - - def test_extra_dot(self): - version=versionFromStr("5.3.1.4") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, 3) - self.assertEqual(version.point, 1) - - def test_major_minor_point(self): - version=versionFromStr("5.3.1") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, 3) - self.assertEqual(version.point, 1) - - def test_major_minor_dot(self): - version=versionFromStr("5.3.") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, 3) - self.assertEqual(version.point, None) - - def test_major_minor(self): - version=versionFromStr("5.3") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, 3) - self.assertEqual(version.point, None) - - def test_major_dot(self): - version=versionFromStr("5.") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, None) - self.assertEqual(version.point, None) - - def test_major(self): - version=versionFromStr("5") - self.assertEqual(version.major, 5) - self.assertEqual(version.minor, None) - self.assertEqual(version.point, None) - - def test_empty(self): - with self.assertRaises(TypeError): - version=versionFromStr("") - - def test_non_int_major(self): - with self.assertRaises(ValueError): - version=versionFromStr("a.b.c") - - def test_non_int_minor(self): - with self.assertRaises(ValueError): - version=versionFromStr("5.b.1") - - def test_non_int_point(self): - with self.assertRaises(ValueError): - version=versionFromStr("5.3.a") - - def test_non_string(self): - version=Version(5,3,1) - with self.assertRaises(TypeError): - version=versionFromStr(version) - - def test_none(self): - with self.assertRaises(TypeError): - version=versionFromStr(version=None) - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() - diff --git a/workdir.py b/workdir.py deleted file mode 100644 index e939dd0..0000000 --- a/workdir.py +++ /dev/null @@ -1,190 +0,0 @@ -import unittest -import logging -import os - -import uuid - -from proj import Proj - -from gitrepo import cd -from gitrepo import GitRepo - -from clone import Clone -from clone import cd - -from sh import git -from sh import git_new_workdir -from sh import ErrorReturnCode - -# TODO: Set the git repo pushurl. - -class Workdir(GitRepo): - # @ clone - clone this workdir from this Clone. - # @ workdir - name of the proposed workdir. - # @ track - derive the workdir from the branch designated by the string - # in 'track'. - # @ branchname - the name of the local branch - def __init__(self, proj, clone=None, workdir="", track=None, branchname=""): - super(Workdir,self).__init__(proj) - - # TODO: The Clone can be in a different project dir than proj. if - # clone.clonedir is not in proj.projdir then create a symlink in the - # current projdir. - - if not isinstance(clone, Clone): - raise TypeError('Clone input parameter is not of type Clone') - - if not isinstance(workdir, str): - raise TypeError('Workdir workdir parameter must be a string') - - # An empty workdir would result in an empty variable expansion - # and a malformed git new-workdir expression. - if workdir=="": - raise EnvironmentError('You must specify a workdir directory when creating a Workdir') - - if branchname=="": - raise EnvironmentError('You must specify a branchname when creating a Workdir') - - try: - with cd(self.proj.projdir): - logging.info("Workdir(): going to call git new-workdir tracking %s" % track) - if track: - logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, track)) - #TODO: Do we want to prevalidate that 'track' is a valid branch or tag? - # If we don't this will just throw an exception. - print git_new_workdir(clone.clonedir(), workdir, track, _err_to_out=True, _out="/dev/null") - else: - logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, "origin/HEAD")) - # Always just checkout HEAD if the track is not specified. - print git_new_workdir(clone.clonedir(), workdir, "origin/HEAD", _err_to_out=True, _out="/dev/null") - - except ErrorReturnCode as exc: - # if 'workdir' is None - # If 'workdir' already exists. - # If clone.clonedir() doesn't exist. - # If track isn't a valid branch. - raise EnvironmentError("Unable to create a git workdir ") - - self.repodir=self.proj.projdir + '/' + workdir - - try: - with cd(self.repodir): - git("checkout", "-b", branchname) - except ErrorReturnCode as exc: - raise EnvironmentError("Unable to checkout branch %s in workdir %s" % (branchname, self.repodir)) - - - #TODO: Verify that self.repodir exists or 'raise' exception? - - # TODO: write this function or the workdir will remain in detached - # head state if it's not checked out into a branch. - # def checkout(self, ): - - def workdir(self): - return self.repodir - -class TestWorkdir(unittest.TestCase): - testdirprefix="WorkdirUT" - - @classmethod - def setUpClass(cls): - # We need a Proj dir to store the necessary Clone in. - cls.proj=Proj(prefix=TestWorkdir.testdirprefix) - - #TODO: Perform a connectivity test to determine online or offline - # testing mode. - - # We'll create a Clone to be used by all the Workdir tests in order to - # be nice to the git server. - cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') - #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') - - # All further tests will use this as the remote if they need to - # create more clones. - cls.rnremote=cls.rnclone.repodir - - # A common name to use for the workdir for all of the Workdir tests. - cls.workdir="testworkdir" - - @classmethod - def tearDownClass(cls): - cls.proj.cleanup() - - # Every instance of TestWorkdir requires its own proj directory. - def setUp(self): - self.proj=Proj(prefix=TestWorkdir.testdirprefix) - self.branchname="testworkdir-" + str(uuid.uuid4()) - - def tearDown(self): - self.proj.cleanup() - - def test_workdir(self): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track=None, branchname=self.branchname) - self.assertTrue(os.path.isdir(self.workdir.workdir())) - - def test_workdir_with_default_track(self): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) - self.assertTrue(os.path.isdir(self.workdir.workdir())) - - def test_workdir_with_track(self): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) - self.assertTrue(os.path.isdir(self.workdir.workdir())) - - def test_workdir_with_checkoutbranch(self): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname="startbranch") - self.assertTrue(os.path.isdir(self.workdir.workdir())) - - # Test a new branch (derived from the existing branch) - with self.workdir.checkoutbranch("TestWorkdir", self.workdir.getbranch()): - self.assertEqual("TestWorkdir", self.workdir.getbranch()) - - # Verify that leaving the context restored the original branch. - self.assertEqual(self.workdir.getbranch(), "startbranch") - - # Test being able to switch to a branch that already exists. - with self.workdir.checkoutbranch("TestWorkdir"): - self.assertEqual("TestWorkdir", self.workdir.getbranch()) - - self.assertEqual(self.workdir.getbranch(), "startbranch") - - def test_workdir_already_exists(self): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) - - # Should fail because TestWorkdir.workdir should already exist. - with self.assertRaises(EnvironmentError): - self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname + '_2') - - def test_workdir_exceptions(self): - with self.assertRaises(TypeError): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=None, branchname=self.branchname) - - with self.assertRaises(EnvironmentError): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="someinvalidbranchname", branchname=self.branchname) - - # Test if clone.clonedir() doesn't exist by breaking repodir. - brokenclone=Clone(TestWorkdir.proj,TestWorkdir.rnremote) - brokenclone.repodir="/breakthisrepodir" - - # Verify that a Clone with a broken repodir/clonedir results in an - # exception. - with self.assertRaises(EnvironmentError): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) - - # 'clone' isn't a valid Clone type. - with self.assertRaises(TypeError): - self.workdir=Workdir(self.proj, clone="foobar", workdir=TestWorkdir.workdir, branchname=self.branchname) - - def test_workdir_exceptions(self): - #TODO: branchname is empty. - with self.assertRaises(EnvironmentError): - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname="") - - def test_workdir_exceptions(self): - #TODO: branchname already exists. - self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) - with self.assertRaises(EnvironmentError): - self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) - -if __name__ == '__main__': - #logging.basicConfig(level="INFO") - unittest.main() -- cgit v1.2.3