diff options
author | Ryan S. Arnold <ryan.arnold@linaro.org> | 2016-08-10 14:03:55 -0500 |
---|---|---|
committer | Ryan S. Arnold <ryan.arnold@linaro.org> | 2016-08-10 14:03:55 -0500 |
commit | d3848d12854f5b19351bf1ff5e2d7cc95d8fc95c (patch) | |
tree | 13c8c3a9615b641b25e6ae23aaaf55e92fc2d821 | |
parent | e5eb475b3cf929953b3e59eb31a04d4d364a1566 (diff) |
Generate release notes with python scripting.
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | clone.py | 312 | ||||
-rw-r--r-- | custom_wordwrap.py | 31 | ||||
-rw-r--r-- | gccclone.py | 27 | ||||
-rw-r--r-- | gitrepo.py | 234 | ||||
-rw-r--r-- | handle_exit.py | 157 | ||||
-rw-r--r-- | linaroseries.py | 362 | ||||
-rw-r--r-- | proj.py | 117 | ||||
-rw-r--r-- | rn.py | 135 | ||||
-rw-r--r-- | rngen.py | 303 | ||||
-rw-r--r-- | rninput.py | 35 | ||||
-rw-r--r-- | rnseries.py | 193 | ||||
-rw-r--r-- | series.py | 550 | ||||
-rwxr-xr-x | tcwg-release.sh | 30 | ||||
-rw-r--r-- | template.py | 94 | ||||
-rw-r--r-- | vers.py | 615 | ||||
-rw-r--r-- | workdir.py | 190 |
17 files changed, 3390 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c09b696 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.pyc +*.textile +*.html +*.csv +announce*.txt diff --git a/clone.py b/clone.py new file mode 100644 index 0000000..a3544bf --- /dev/null +++ b/clone.py @@ -0,0 +1,312 @@ +import unittest +import logging +import os + +from sh import git +from sh import ln +from sh import mkdir +from sh import ErrorReturnCode +from sh import chmod + +# In order to set directory read/write permissions for test cases +from stat import * + +from proj import Proj +from gitrepo import cd +from gitrepo import GitRepo + +import time + +class Clone(GitRepo): + def __init__(self, proj, clonedir=None, remote=None ): + super(Clone,self).__init__(proj) + + # If the user passes a clonedir then they want to reuse an existing + # clone. This will often be the case with large repositories like + # gcc, which take 20 minutes to clone. + + # Clone a new repo from the remote + if clonedir is None or (isinstance(clonedir, basestring) and clonedir == ""): + if remote=="" or remote is None: + raise TypeError('Clone input parameter \'remote\' can not be empty') + + # TODO test that remote is a well formed URL? + # TODO if there's a trailing / strip it. + self.remote=remote + + # Strip the end off the repo url to find the clonedir name. + clone_dirname=remote.rsplit('/', 1)[-1] + + logging.info('Cloning %s repository into %s/%s' % (self.remote, self.proj.projdir, clone_dirname)) + + self.repodir=self.proj.projdir + '/' + clone_dirname + try: + with cd(self.proj.projdir): + git("clone", self.remote, clone_dirname, _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError("Git unable to clone into " + self.repodir) + + + # Reuse an existing git clone directory + else: + if not isinstance(clonedir, basestring): + raise TypeError('clonedir must be of type basestring') + elif not os.path.isdir(clonedir): + raise EnvironmentError("Specified clonedir directory '%s' does not exist" % clonedir) + + # Check to see if clonedir is a git repository. + with cd(clonedir): + # There might be a file called .git + if not os.path.isdir(clonedir + '/.git'): + try: + git("rev-parse", _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError('Specified clonedir is not a git repository.') + + # Strip off the directory name + clone_dirname=clonedir.rsplit('/', 1)[-1] + self.repodir=self.proj.projdir + '/' + clone_dirname + + # TODO: verify that we don't get another clone dir in the projdir. + # It's possible the clonedir is already in the projdir. + if os.path.exists(self.repodir): + logging.info('The clonedir %s already exists in %s' % (clonedir, self.repodir)) + else: + logging.info('Reusing existing clonedir %s and symlinking as %s in %s' % (clonedir, self.repodir, self.proj.projdir)) + try: + ln("-s", clonedir, self.repodir) + except ErrorReturnCode: + raise EnvironmentError("Can't create symlink %s to %s." % (clonedir, self.repodir)) + + logging.info('Clone directory set as %s' % (self.repodir)) + return + + def clonedir(self): + return self.repodir + + def __repr__(self): + return "<Clone Class: remote = " + self.remote + ", clonedir =" + self.repodir +">" + +class TestClone(unittest.TestCase): + testdirprefix="CloneUT" + + @classmethod + def setUpClass(cls): + # We need a Proj dir to store the mother clone in. + cls.proj=Proj(prefix=TestClone.testdirprefix) + + # Setup a single remote clone in a Proj directory and then all of the + # other tests in this module should clone from this resulting local + # clone as if it were a remote. This will reduce the network burden on + # the remote git server being tested. If this fails then the entire + # TestClone will be marked as not executed. + + #TODO: Perform a connectivity test to determine online or offline + # testing mode. + + cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') + cls.startbranch=cls.rnclone.getbranch() + + cls.rtremote=u'http://git.linaro.org/toolchain/tcwg-release-tools.git' + #OFFLINE: cls.rtremote=u'/var/run/media/ryanarn/sidecar/releases/tcwg-release-tools' + + # All further tests will use this as the remote. + cls.rnremote=cls.rnclone.repodir + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Every instance of TestClass requires its own proj directory. + def setUp(self): + # TODO: Is this redundant with the setUpClass()? + self.proj=Proj(prefix=TestClone.testdirprefix) + + def tearDown(self): + # TODO: Is this redundant with the tearDownClass()? + self.proj.cleanup() + + # These immediate tests clone from a remote repository + def test_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_two_different_clones_in_one_projdir(self): + self.clonern=Clone(self.proj,remote=TestClone.rnremote) + self.clonert=Clone(self.proj,remote=TestClone.rtremote) + self.assertTrue(os.path.isdir(self.clonern.clonedir()) and os.path.isdir(self.clonert.clonedir())) + + # All clones after this point should use TestClone.rnremote as the remote. + + def test_str_function(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertEqual(str(self.clone), self.clone.clonedir()) + + def test_empty_string_clonedir_with_remote(self): + # This will use the remote because clonedir is an empty string. + self.clone=Clone(self.proj,remote=TestClone.rnremote, clonedir="") + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_none_clonedir_with_remote(self): + # This will use the remote because there is no clonedir + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertTrue(os.path.isdir(self.clone.clonedir())) + + def test_empty_remote_empty_clonedir(self): + # A bogus directory will fail. + with self.assertRaises(TypeError): + self.clone=Clone(self.proj,remote="", clonedir="") + + def test_non_string_clonedir(self): + self.foo=Clone(self.proj,remote=TestClone.rnremote, clonedir="") + with self.assertRaises(TypeError): + self.clone=Clone(self.proj, clonedir=self.foo) + + def test_no_string_clonedir_empty_remote(self): + # remote can't be be empty if clonedir is not set. + with self.assertRaises(TypeError): + self.clone=Clone(self.proj,remote="") + + def test_existing_clonedir(self): + # Clone's clonedir parameter allows a user to specify an existing + # directory use for the clone. It will setup a symlink in the proj + # directory pointing to the tree. This is a useful feature for + # very large repositories. + + # Use the clonedir from TestClone.rnclone + self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) + + # Verify that a symlink is used for the clone. + self.assertTrue(os.path.islink(self.clone.clonedir())) + + def test_failed_symlink(self): + # We'll make the projdir read-only so we can force symlink creation + # to fail. + with cd(self.proj.projdir): + # Set to read-only to force symlink to fail. + os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) + + # Use the clonedir from TestClone.rnclone + with self.assertRaises(EnvironmentError): + # Verify that this throws an exception. + self.clone=Clone(self.proj,clonedir=TestClone.rnclone.clonedir()) + + with cd(self.proj.projdir): + # Reset to writable so we can remove the projdir + os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) + + def test_clone_failure_due_to_write_permissions(self): + with cd(self.proj.projdir): + # Set to read-only to force the git clone to fail. + os.chmod(self.proj.projdir, S_IRUSR | S_IXUSR) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + with cd(self.proj.projdir): + # Reset to writable so we can remove the projdir + os.chmod(self.proj.projdir, S_IWUSR | S_IXUSR) + + def test_nonexistant_clonedir(self): + with self.assertRaises(EnvironmentError): + self.clone_one=Clone(self.proj, clonedir="/a/nonexistant/directory/") + + def test_not_gitdir(self): + # Verify that an exception is raised when attempting to clone something + # that's not a git repository. + nongitdir=self.proj.projdir + "/nongitdir" + with cd (self.proj.projdir): + os.mkdir(nongitdir) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote="", clonedir=nongitdir) + + def test_not_gitdir_but_with_dot_git(self): + # Verify that an exception is raised when attempting to clone a + # directory that has a .git file but is NOT a git repository. + nongitdir=self.proj.projdir + "/nongitdir" + with cd (self.proj.projdir): + os.mkdir(nongitdir) + # Create and empty .git file. + with open(".git", 'a'): + os.utime(".git", None) + + with self.assertRaises(EnvironmentError): + self.clone=Clone(self.proj,remote="", clonedir=nongitdir) + + def test_incorrect_input_proj_type(self): + # Clone requires a valid Proj type as an input parameter. Verify that + # it throws an exception. + proj='stringtypeonpurpose' + with self.assertRaises(TypeError): + self.clone=Clone(proj,remote=TestClone.rnremote) + + def test_non_destructive_cleanup(self): + # Make sure that the file removal method in proj.cleanup() doesn't + # follow and remove symlinks. + self.clone_one=Clone(self.proj,remote=TestClone.rnremote) + self.proj_two=Proj(prefix=TestClone.testdirprefix) + # Use the clonedir from the first project as the clonedir for the + # second project. + self.clone_two=Clone(self.proj_two,clonedir=self.clone_one.clonedir()) + self.assertTrue(os.path.islink(self.clone_two.clonedir())) + + # Cleanup proj_two + self.proj_two.cleanup() + + # Make sure clone_one.clonedir wasn't removed when proj_two was removed + # which is a concern since proj_two used a clone directory from + # proj. + self.assertTrue(os.path.isdir(self.clone_one.clonedir())) + + def test_get_branch_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + def test_checkoutbranch_clone(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + # Test a new branch + with self.clone.checkoutbranch("TestClone", "origin/HEAD"): + self.assertEqual("TestClone", self.clone.getbranch()) + + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + # Test being able to switch to a branch that already exists. + with self.clone.checkoutbranch("TestClone"): + self.assertEqual("TestClone", self.clone.getbranch()) + + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + #TODO test a non-default 'track' passed to checkoutbranch. + + def test_checkoutbranch_clone_with_local_changes(self): + self.clone=Clone(self.proj,remote=TestClone.rnremote) + + # create an empty file and add it to the repository. + with cd (self.clone.repodir): + # Create and empty file. + with open("checkouttest", 'a'): + os.utime("checkouttest", None) + + self.clone.add(self.clone.repodir + '/checkouttest') + + with self.clone.checkoutbranch("TestClone", "origin/HEAD"): + self.assertEqual("TestClone", self.clone.getbranch()) + + # Verify there's no file name 'checkouttest' when we switch branch. + with cd (self.clone.repodir): + self.assertFalse(os.path.isfile(self.clone.repodir + '/checkouttest')) + + # Verify that we've returned to the correct branch. + self.assertEqual(TestClone.startbranch, self.clone.getbranch()) + + # Show that checkouttest does exist once were out of the + # checkoutbranch context. + with cd (self.clone.repodir): + self.assertTrue(os.path.isfile(self.clone.repodir + '/checkouttest')) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/custom_wordwrap.py b/custom_wordwrap.py new file mode 100644 index 0000000..4ed3e25 --- /dev/null +++ b/custom_wordwrap.py @@ -0,0 +1,31 @@ +#from jinja2.utils import Markup, escape, pformat, urlize, soft_unicode, \ +# unicode_urlencode +from jinja2.runtime import Undefined +#from jinja2.exceptions import FilterArgumentError +#from jinja2._compat import imap, string_types, text_type, iteritems + +def environmentfilter(f): + """Decorator for marking environment dependent filters. The current + :class:`Environment` is passed to the filter as first argument. + """ + f.environmentfilter = True + return f + +@environmentfilter +def do_customwordwrap(environment, s, width=79, break_long_words=True, + wrapstring=None, break_on_hyphens=False): + """ + Return a copy of the string passed to the filter wrapped after + ``79`` characters. You can override this default using the first + parameter. If you set the second parameter to `false` Jinja will not + split words apart if they are longer than `width`. By default, the newlines + will be the default newlines for the environment, but this can be changed + using the wrapstring keyword argument. + """ + if not wrapstring: + wrapstring = environment.newline_sequence + import textwrap + return wrapstring.join(textwrap.wrap(s, width=width, expand_tabs=False, + replace_whitespace=False, + break_long_words=break_long_words, + break_on_hyphens=break_on_hyphens)) diff --git a/gccclone.py b/gccclone.py new file mode 100644 index 0000000..dcb8b71 --- /dev/null +++ b/gccclone.py @@ -0,0 +1,27 @@ + +from clone import Clone +from sh import git +from gitrepo import cd + +class GCCClone(Clone): + def __init__(self, proj, clonedir=None, remote=None ): + super(GCCClone,self).__init__(proj, clonedir, remote) + + with cd(clonedir): + try: + with open('gcc/BASE-VER', 'r') as f: + self.basever = f.readline().strip() + except: + raise IOError('gcc/BASE-VER not found in ' + clonedir) + + log=git("log", "-n 1", "--grep=Merge branch") + for logline in log: + if logline.lstrip().startswith("Merge"): + # For some reason rsplit('.') isn't working. + self.fsfrev=logline.lstrip().rsplit(None,1)[1].split('.',1)[0] + + def get_base_version(self): + return self.basever + + def get_fsf_revision(self): + return self.fsfrev diff --git a/gitrepo.py b/gitrepo.py new file mode 100644 index 0000000..06a79de --- /dev/null +++ b/gitrepo.py @@ -0,0 +1,234 @@ +import unittest +import logging +import os + +from sh import git +from sh import ErrorReturnCode +from sh import ErrorReturnCode_128 + +import subprocess + +# Use this to manage 'cd' context +from contextlib import contextmanager + +from proj import Proj +# Return to prevdir when context is exited. +# Use in the following manner: +# +# with cd("foodir"): +# operation1_in_foodir +# operation2_in_foodir +# +# operation3_in_prevdir +# +@contextmanager +def cd(newdir): + prevdir = os.getcwd() + os.chdir(os.path.expanduser(newdir)) + try: + yield + finally: + os.chdir(prevdir) + +# This is called 'GitRepo' because a repository for a project doesn't only have +# to be a git repository. Abstract base class used to define the operations +# that both Clone and Workdir share. Notice that there is no actual repodir +# specified here. You must instantiate a subclass in order to use/test these +# operations. This is because 'clones' and 'workdirs' are created differently +# but you operate on them in the exact same way. +class GitRepo(object): + def __init__(self, proj): + self.repodir=u"" + if isinstance(proj, Proj): + self.proj=proj + else: + raise TypeError('proj input parameter is not of type Proj') + + def branchexists(self, branch): + with cd(self.repodir): + try: + # Quote the branchname because it might have strange + # characters in it. + br="%s" % branch + git("rev-parse", "--verify", br) + except ErrorReturnCode_128: + return False + return True + + def getbranch(self): + try: + with cd(self.repodir): + branch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + return branch + + # TODO: Fully test this. + # Stash current changes and change to new branch. + # Return to previous branch when context is exited. + @contextmanager + def checkoutbranch(self, branch, track=""): + with cd(self.repodir): + # The stashid will remain empty if there were no changes in the + # directory to stash. + stashid=u"" + + try: + self.prevbranch=git("rev-parse", "--abbrev-ref", "HEAD").stdout.rstrip() + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + + try: + # TODO: do we want to do a blanket git add *? + stashid=git("stash", "create").stdout.rstrip() + # If there's no stash then there's no reason to save. + if stashid!="": + git("stash", "save") + except ErrorReturnCode: + raise EnvironmentError("Unable to get the current branch") + + try: + # TODO This is a nop as it is. + git("rev-parse", "--verify", branch) + # if the branch exists the previous statement won't cause an + # exception so we know we can just check it out. + git("checkout", branch) + except ErrorReturnCode: + try: + # if it doesn't exist we need to checkout a new branch + git("checkout", "-b", branch, track).stdout.rstrip() + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) + try: + yield + finally: + try: + with cd(self.repodir): + git("checkout", self.prevbranch) + # If there's no stashid then there were no files stashed, + # so don't stash apply. + if stashid!="": + git("stash", "apply", stashid) + except ErrorReturnCode, exc: + raise EnvironmentError("Unable to return to the previous branch: %s" % exc.stderr) + + # TODO: Write a unit test for this. + def add(self, filetogitadd): + # TODO: Determine if filetogitadd is relative or absolute. + # TODO: verify that filetogitadd is in self.repodir + try: + with cd(self.repodir): + git("add", filetogitadd) + except ErrorReturnCode: + raise EnvironmentError("Unable to git add " + filetogitadd) + + # TODO: Write a unit test for this. + # TODO: fix the default + def commit(self, message="default"): + print "Committing changes." + try: + with cd(self.repodir): + # Git commit first with a boiler plate message and then allow the user + # to amend. + if git("status", "--porcelain"): + # print git("commit", "-m", '%s' % message, _err_to_out=True) + subprocess.call(["git", "commit", "-m", message]) + # using python sh will suppress the git editor + subprocess.call(["git", "commit", "--amend"]) + except ErrorReturnCode: + raise EnvironmentError("Unable to git commit ") + + def log(self, number): + try: + with cd(self.repodir): + print git("log", "-n %d" % number) + except ErrorReturnCode: + raise EnvironmentError("Unable to git add " + filetogitadd) + + # TODO: Does this need to 'cd' first? + def editFile(self, toedit): + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + os.system(editor + ' ' + self.repodir + '/' + toedit) + + self.add(toedit) + + return + + # TODO: Test this function with both dryrun=True and dryrun=False + def pushToBranch(self, tobranch, dryrun=True): + try: + # TODO: Check for un-merged commits. + with cd(self.repodir): + branch=self.getbranch() + if dryrun: + git("push", "--dry-run", branch, tobranch) + else: + git("push", branch, tobranch) + except ErrorReturnCode: + raise EnvironmentError("Unable to push branch %s to %s" % (branch, tobranch)) + + def __str__(self): + return self.repodir + +# TestGitRepo is an abstract baseclass. Verify that the constructor +# will throw an exception if you try to run operations without +# defining a subclass. +class TestGitRepo(unittest.TestCase): + repoprefix="GitRepoUT" + + # This class is only used to test the GitRepo functions since, as an, + # abstract-baseclass, GitRepo doesn't define repodir, and we need an + # actual repository to test git based functions. + class DerivedGitRepo(GitRepo): + def __init__(self, proj): + super(TestGitRepo.DerivedGitRepo,self).__init__(proj) + self.remote=u'http://git.linaro.org/toolchain/release-notes.git' + dgr_dirname="release-notes" + self.repodir=self.proj.projdir + "/" + dgr_dirname + try: + with cd(self.proj.projdir): + git("clone", self.remote, dgr_dirname, _err_to_out=True, _out="/dev/null") + except ErrorReturnCode: + raise EnvironmentError("Git unable to clone into " + self.repodir) + + @classmethod + def setUpClass(cls): + cls.proj=Proj(prefix=TestGitRepo.repoprefix, persist=False) + cls.dgr=TestGitRepo.DerivedGitRepo(cls.proj) + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Test that the abstract baseclass throws an exception if one of the + # functions are called from the baseclass without a derived class. + def test_repo(self): + self.repo=GitRepo(self.proj) + with self.assertRaises(OSError): + # We haven't set a repodir so this should throw an exception. + branch=self.repo.getbranch() + + # TODO: Create a test-branch in the repo so it's always there. + + def test_branchexists(self): + self.assertTrue(self.dgr.branchexists("remotes/origin/releases/linaro-4.9-2015.05")) + + def test_not_branchexists(self): + self.assertFalse(self.dgr.branchexists("foobar")) + + def test_getbranch(self): + with cd(self.dgr.repodir): + try: + git("checkout", "-b", "master_test", "origin/master").stdout.rstrip() + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout -b %s %s" % (branch, track)) + self.assertEquals(self.dgr.getbranch(), "master_test") + + # TODO: Test checkoutbranch with various combinations of polluted + # directories. + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/handle_exit.py b/handle_exit.py new file mode 100644 index 0000000..7eb4ae1 --- /dev/null +++ b/handle_exit.py @@ -0,0 +1,157 @@ +# Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com> +# License: MIT + +from __future__ import with_statement +import contextlib +import signal +import sys + + +def _sigterm_handler(signum, frame): + sys.exit(0) +_sigterm_handler.__enter_ctx__ = False + + +@contextlib.contextmanager +def handle_exit(callback=None, append=False): + """A context manager which properly handles SIGTERM and SIGINT + (KeyboardInterrupt) signals, registering a function which is + guaranteed to be called after signals are received. + Also, it makes sure to execute previously registered signal + handlers as well (if any). + + >>> app = App() + >>> with handle_exit(app.stop): + ... app.start() + ... + >>> + + If append == False raise RuntimeError if there's already a handler + registered for SIGTERM, otherwise both new and old handlers are + executed in this order. + """ + old_handler = signal.signal(signal.SIGTERM, _sigterm_handler) + if (old_handler != signal.SIG_DFL) and (old_handler != _sigterm_handler): + if not append: + raise RuntimeError("there is already a handler registered for " + "SIGTERM: %r" % old_handler) + + def handler(signum, frame): + try: + _sigterm_handler(signum, frame) + finally: + old_handler(signum, frame) + signal.signal(signal.SIGTERM, handler) + + if _sigterm_handler.__enter_ctx__: + raise RuntimeError("can't use nested contexts") + _sigterm_handler.__enter_ctx__ = True + + try: + yield + except KeyboardInterrupt: + pass + except SystemExit, err: + # code != 0 refers to an application error (e.g. explicit + # sys.exit('some error') call). + # We don't want that to pass silently. + # Nevertheless, the 'finally' clause below will always + # be executed. + if err.code != 0: + raise + finally: + _sigterm_handler.__enter_ctx__ = False + if callback is not None: + callback() + + +if __name__ == '__main__': + # =============================================================== + # --- test suite + # =============================================================== + + import unittest + import os + + class TestOnExit(unittest.TestCase): + + def setUp(self): + # reset signal handlers + signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.flag = None + + def tearDown(self): + # make sure we exited the ctx manager + self.assertTrue(self.flag is not None) + + def test_base(self): + with handle_exit(): + pass + self.flag = True + + def test_callback(self): + callback = [] + with handle_exit(lambda: callback.append(None)): + pass + self.flag = True + self.assertEqual(callback, [None]) + + def test_kinterrupt(self): + with handle_exit(): + raise KeyboardInterrupt + self.flag = True + + def test_sigterm(self): + with handle_exit(): + os.kill(os.getpid(), signal.SIGTERM) + self.flag = True + + def test_sigint(self): + with handle_exit(): + os.kill(os.getpid(), signal.SIGINT) + self.flag = True + + def test_sigterm_old(self): + # make sure the old handler gets executed + queue = [] + signal.signal(signal.SIGTERM, lambda s, f: queue.append('old')) + with handle_exit(lambda: queue.append('new'), append=True): + os.kill(os.getpid(), signal.SIGTERM) + self.flag = True + self.assertEqual(queue, ['old', 'new']) + + def test_sigint_old(self): + # make sure the old handler gets executed + queue = [] + signal.signal(signal.SIGINT, lambda s, f: queue.append('old')) + with handle_exit(lambda: queue.append('new'), append=True): + os.kill(os.getpid(), signal.SIGINT) + self.flag = True + self.assertEqual(queue, ['old', 'new']) + + def test_no_append(self): + # make sure we can't use the context manager if there's + # already a handler registered for SIGTERM + signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0)) + try: + with handle_exit(lambda: self.flag.append(None)): + pass + except RuntimeError: + pass + else: + self.fail("exception not raised") + finally: + self.flag = True + + def test_nested_context(self): + self.flag = True + try: + with handle_exit(): + with handle_exit(): + pass + except RuntimeError: + pass + else: + self.fail("exception not raised") + + unittest.main() diff --git a/linaroseries.py b/linaroseries.py new file mode 100644 index 0000000..382ed08 --- /dev/null +++ b/linaroseries.py @@ -0,0 +1,362 @@ +import unittest +import copy + +from series import Series +from series import seriesFromBranchname +from datetime import datetime +from dateutil.relativedelta import relativedelta +from vers import Spin +from vers import Rc + + +# Progression: +# The Linaro release progression follows. +# +# Snapshot 2016.01 +# spin 1 2016.01-1 +# spin 2 2016.01-2 +# Candidate 2016.01-rc1 +# rc 2 2016.01-rc2 +# Release 2016.01 +# +# Note: Remember, if a snapshot is respun to fix a +# bug then the fix should be cherry-picked into the +# release branch and a release-candidate spun from +# from the release branch. +# +# Candidate 2016.01-1-rc1 +# rc 2 2016.01-1-rc2 +# Release 2016.01-1 + +# Inherit Series in order to provide Linaro specific rules on transitions +# between series types. The baseclass Series doesn't have the toNext* +# functions defined. +class LinaroSeries(Series): + def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): + + # This is a dispatch table so that we can use a 'toNext' function based + # on an input type and it will call the correct toNextFoo function. + # This will work if the key is an integer or a string. Ideally this + # would be added to Series and we wouldn't need to derive, but I + # couldn't figure out how to get the pointer tables correct. + self.dispatchnext = { + Series.series.index("candidate"): self.toNextCandidate, + 'candidate': self.toNextCandidate, + Series.series.index("snapshot"): self.toNextSnapshot, + 'snapshot': self.toNextSnapshot, + Series.series.index("release"): self.toNextRelease, + 'release': self.toNextRelease, + } + + super(LinaroSeries,self).__init__(seriestype, vendor, package, date,spin,rc, strict) + + def toNextCandidate(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextCandidate on an unknown series type.') + + candidate=copy.deepcopy(self) + candidate.seriestype = Series.series.index("candidate") + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + candidate.date=date + + if self.seriestype == Series.series.index("candidate"): + candidate.rc.increment() + elif self.seriestype == Series.series.index("release"): + rc=Rc(1) + candidate.rc=rc + candidate.spin.increment() + elif self.seriestype == Series.series.index("snapshot"): + spin=Spin(None) + rc=Rc(1) + candidate.rc=rc + candidate.spin=spin + + # If the user hasn't specified a date, the implicit behavior is to + # increment the date by 1 month when moving from a snapshot to a + # candidate. + if not date: + candidate.incrementMonth() + # If the user did specify a date and we're in strict mode we need to + # verify that the date they chose is exactly one month difference. + elif strict: + if candidate.date != self.date + relativedelta(months=1): + raise ValueError('toNextCandidate invoked with strict=True. Snapshot date to candidate date can only be +1 month difference.') + return candidate + + if strict and candidate.date != self.date: + raise ValueError('Candidate date can only change if current series is a Snapshot when toNextCandidate is invoked with strict=True.') + + return candidate + + def toNextRelease(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextRelease called on an unknown series type.') + elif self.seriestype == Series.series.index("release"): + raise TypeError('A release series can not be the basis for another release. Move to a release candidate first.') + elif self.seriestype == Series.series.index("snapshot"): + raise TypeError('A snapshot series can not be the basis for a release. Move to a release candidate first.') + + # Only a candidate can turn into a release. + release=copy.deepcopy(self) + # if we have a candidate and ask for a release rc needs to be None. + rc=Rc() + release.rc=rc + release.seriestype = Series.series.index("release") + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + release.date=date + + if strict and release.date != self.date: + raise ValueError('Release date can not change as toNextRelease was invoked with strict=True') + + return release + + def toNextSnapshot(self, date=None, strict=False): + if self.seriestype < 0 or self.seriestype >= len(Series.series): + raise TypeError('toNextRelease called on an unknown series type.') + elif self.seriestype == Series.series.index("candidate"): + raise TypeError('Series of type candidate can not be the basis for a snapshot.') + if self.seriestype == Series.series.index("release"): + raise TypeError('Series of type release can not be the basis for a snapshot.') + + # Only snapshots can be snapshots next. + snapshot=copy.deepcopy(self) + snapshot.seriestype = Series.series.index("snapshot") + snapshot.spin.increment() + + if date: + if not isinstance(date,datetime): + raise TypeError('date is not of type datetime.') + snapshot.date=date + + if strict and snapshot.date != self.date: + raise ValueError('Snapshot date can not change as toNextSnapshot was invoked with strict=True') + + return snapshot + + # toNext will take a 'key' as either a string representing the + # Series.series type or the index of the Series.series type, e.g., + # 'candidate' or Series.series.index("candidate") and it will forward + # to the correct toNext* function as mapped in the dispatchnext + # dictionary. + def toNext(self, seriestype, date=None, strict=False): + return self.dispatchnext[seriestype](date, strict) + +# Helper function which creates a LinaroSeries from a properly formed branch name +# input string. +def linaroSeriesFromBranchname(branch=None): + + # Create a Series using the helper function and then populate a + # LinaroSeries. TODO: There's probably a better way to do this such + # as a SeriesFactory that is overridden. + series=seriesFromBranchname(branch) + linaroseries=LinaroSeries(Series.series[series.seriestype], vendor="linaro", package=series.package ,date=series.date , spin=series.spin, rc=series.rc, strict=True ) + return linaroseries + +class TestLinaroSeries(unittest.TestCase): + + def test_candidate_to_candidate(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + candidate2=candidate.toNextCandidate() + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-1-rc2") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + self.assertEqual(candidate.date, candidate2.date) + + #TODO test date and strict=True + + def test_release_to_candidate(self): + # Test where spin is None and needs to be incremented. + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15)) + candidate=release.toNextCandidate() + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + self.assertEqual(candidate.date, release.date) + + # Now test where spin is already non-zero. + release2=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + candidate2=release2.toNextCandidate() + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.05-2-rc1") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + + #TODO test date and strict=True + + def test_snapshot_to_candidate(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + candidate=snapshot.toNextCandidate() + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + self.assertNotEqual(candidate.date, snapshot.date) + # Verify that it's one month apart. + self.assertEqual(candidate.date, snapshot.date + relativedelta(months=1)) + + # Make sure that the default date increment is just one month. + candidate2=snapshot.toNextCandidate(strict=True) + self.assertEqual(candidate2.date, snapshot.date + relativedelta(months=1)) + + # whether strict=True or not. + candidate3=snapshot.toNextCandidate(strict=False) + self.assertEqual(candidate3.date, snapshot.date + relativedelta(months=1)) + + # If the user passed a date and strict=yes, make sure the increment is only one month. + candidate4=snapshot.toNextCandidate(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + plusonem=snapshot.date + relativedelta(months=1) + self.assertEqual(candidate4.date,plusonem) + + # Make sure the date can be set without error when strict is False + candidate5=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=False) + self.assertEqual(candidate5.date, snapshot.date + relativedelta(months=2)) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + candidate6=snapshot.toNextCandidate(date=datetime.strptime("2016.07.15", "%Y.%m.%d"), strict=True) + + def test_unknown_to_candidate(self): + unknown=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + candidate=unknown.toNextCandidate() + + def test_candidate_to_release(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + release=candidate.toNextRelease() + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") + self.assertEqual(release.seriestype, Series.series.index("release")) + self.assertEqual(candidate.date, release.date) + + release2=candidate.toNextRelease(strict=True) + self.assertEqual(release2.getBranchname(), "releases/linaro-5.3-2016.05-1") + self.assertEqual(release2.date, candidate.date) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + release3=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + + # If strict=False and the date is changed, go ahead and change it. + release4=candidate.toNextRelease(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) + self.assertEqual(release4.getBranchname(), "releases/linaro-5.3-2016.06-1") + self.assertNotEqual(release4.date, release.date) + + with self.assertRaises(TypeError): + release5=candidate.toNextRelease("2016.06", strict=False) + + def test_snapshot_to_release(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + with self.assertRaises(TypeError): + release=snapshot.toNextRelease() + + def test_release_to_release(self): + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + with self.assertRaises(TypeError): + release2=release.toNextRelease() + + def test_unknown_to_release(self): + unknown=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + release=unknown.toNextRelease() + + def test_candidate_to_snapshot(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + with self.assertRaises(TypeError): + snapshot=candidate.toNextSnapshot() + # This would/should be the case if this wasn't and invalid option. + # self.assertEqual(snapshot.seriestype, "snapshot") + + def test_release_to_snapshot(self): + release=LinaroSeries("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + with self.assertRaises(TypeError): + snapshot=release.toNextSnapshot() + + def test_snapshot_to_snapshot(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6") + snapshot2=snapshot.toNextSnapshot() + + self.assertEqual(snapshot2.getBranchname(), "snapshots/linaro-5.3-2016.05-7") + self.assertEqual(snapshot2.seriestype, Series.series.index("snapshot")) + self.assertEqual(snapshot.date, snapshot2.date) + + # Verify that the original hasn't changed. This verifies that a + # deepcopy is used on the toNext* functions. + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6") + + snapshot3=snapshot.toNextSnapshot(strict=True) + self.assertEqual(snapshot3.getBranchname(), "snapshots/linaro-5.3-2016.05-7") + self.assertEqual(snapshot3.seriestype, Series.series.index("snapshot")) + self.assertEqual(snapshot.date, snapshot3.date) + + with self.assertRaises(ValueError): + # Attempting to increment the date when strict=True should raise an exception. + snapshot4=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=True) + + # If strict=False and the date is changed, go ahead and change it. + snapshot5=snapshot.toNextSnapshot(date=datetime.strptime("2016.06.15", "%Y.%m.%d"), strict=False) + self.assertEqual(snapshot5.getBranchname(), "snapshots/linaro-5.3-2016.06-7") + self.assertNotEqual(snapshot5.date, snapshot.date) + + # Verify that a date as a string generates an exception. + with self.assertRaises(TypeError): + snapshot6=snapshot.toNextSnapshot("2016.06.15", strict=False) + + def test_unknown_to_snapshot(self): + unknown=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + # Purposely override with an incorrect seriestype. + unknown.seriestype=99 + with self.assertRaises(TypeError): + snapshot=unknown.toNextSnapshot() + + def test_snapshot_toNext(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") + + candidate=snapshot.toNext("candidate") + # A Snapshot -> Candidate traversal always results in spin being set to zero + # and the first release candidate being created. + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate.seriestype, Series.series.index("candidate")) + + # Make sure toNext works with 'index' as well as the type string. + candidate2=snapshot.toNext(Series.series.index("candidate")) + # A Snapshot -> Candidate traversal always results in spin being set to zero + # and the first release candidate being created. + self.assertEqual(candidate2.getBranchname(), "releases/linaro-5.3-2016.06-rc1") + self.assertEqual(candidate2.seriestype, Series.series.index("candidate")) + + with self.assertRaises(TypeError): + candidate=snapshot.toNext("release") + candidate=snapshot.toNext(Series.series.index("release")) + + def test_toNext_incorrect_keys(self): + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-1") + with self.assertRaises(KeyError): + candidate=snapshot.toNext("candidat") + candidate=snapshot.toNext("snapsho") + candidate=snapshot.toNext("releas") + candidate=snapshot.toNext(-1) + candidate=snapshot.toNext(len(Series.series)+1) + # Index out of range of the Series.series array. + + # Test to see if the objects returned from the toNext* functions are + # new instances and not references to the input Series. + def test_toNextFOO_return_new_objects(self): + candidate=LinaroSeries("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + candidate2=candidate.toNext("candidate") + release=candidate.toNext("release") + + self.assertIsNot(candidate,candidate2) + self.assertIsNot(candidate,release) + + snapshot=LinaroSeries("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1") + snapshot2=snapshot.toNext("snapshot") + self.assertIsNot(snapshot,snapshot2) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() @@ -0,0 +1,117 @@ +import unittest +import logging +import os +import tempfile + +# Execute shell commands with the 'sh' package. +from sh import rm + +# TODO: Take an existing directory as an input, which allows the benefits of a +# Proj without having to create the directory. BUT make sure that +# persist=True is forced in such cases. + +# This class represents the project temporary directory whose lifetime is +# managed. It is not persistent by default. +class Proj(object): + # @ persist - True, False or None. Only True persists. + # @ prefix - Optional prefix appended to projdir directory name. + def __init__(self, prefix="", persist=None): + # This allows string or bool inputs. + if str(persist).lower() == "true": + self.persist=True + longevity="persistent" + else: + self.persist=False + longevity="temporary" + + #TODO: Take an option variable to pass to mkdtemp to collect all + # instances of Proj in to for testing purposes. + self.projdir=unicode(tempfile.mkdtemp(prefix=prefix), "utf-8") + logging.info("Created %s project directory in %s" % (longevity, self.projdir)) + + def cleanup(self): + #FIXME: Only rm -rf if the directory exists. Make sure it's not / or /home +# #TODO: make sure that persisdir isn't a mount-bind to /home + if not self.persist and self.projdir != "/" and self.projdir != "/home": + logging.info("Removing project dir %s" % self.projdir) + rm("-rf", "--preserve-root", self.projdir) + self.projdir=None + else: + logging.info("Project dir %s will persist" % self.projdir) + + +class TestProj(unittest.TestCase): + + def setUp(self): + self.persistdir="" + + def test_create(self): + self.proj=Proj() + self.assertNotEqual(self.proj.projdir, "") + + def test_create_dir(self): + self.proj=Proj() + self.assertTrue(os.path.isdir(self.proj.projdir)) + + def test_create_with_prefix(self): + self.proj=Proj("testprefix") + self.assertTrue("testprefix" in self.proj.projdir) + + def test_create_with_persist_bool(self): + self.proj=Proj(persist=True) + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_create_with_persist_str(self): + self.proj=Proj(persist="true") + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_create_with_persist_false_str(self): + self.proj=Proj(persist="false") + self.assertFalse(self.proj.persist) + # Just in-case it wasn't set properly + self.persistdir=self.proj.projdir + + def test_create_with_persist_false_bool(self): + self.proj=Proj(persist=False) + self.assertFalse(self.proj.persist) + # Just in-case it wasn't removed properly + self.persistdir=self.proj.projdir + + def test_create_with_persist_capstr(self): + self.proj=Proj(persist="TRUE") + self.assertTrue(self.proj.persist) + self.persistdir=self.proj.projdir + + def test_cleanup(self): + self.proj=Proj() + projdir=self.proj.projdir + self.proj.cleanup() + # Verify that the directory has been removed. + self.assertFalse(os.path.isdir(projdir)) + + def test_cleanup_with_persist(self): + self.proj=Proj(persist=True) + projdir=self.proj.projdir + self.proj.cleanup() + self.assertTrue(os.path.isdir(projdir)) + self.persistdir=self.proj.projdir + + def test_cleanup_with_persist_and_prefix(self): + self.proj=Proj(persist=True, prefix="thingy") + projdir=self.proj.projdir + self.proj.cleanup() + self.assertTrue(os.path.isdir(projdir)) + self.persistdir=self.proj.projdir + + def tearDown(self): + # if persistdir is set and isdir then remove it or we'll pollute /tmp. + if os.path.isdir(self.persistdir): + if self.persistdir != "/" and self.persistdir != "/home": + rm("-rf", "--preserve-root", self.persistdir) + + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() @@ -0,0 +1,135 @@ +# We use argparse rather than getopt because it allows us to setup a parser as +# a separate object and then we can test input combination with an external +# python script that uses the unittest module. +import argparse +import logging +import os +import sys +# handle_exit is a context manager which guarantees that the project temporary +# directories are cleaned up when there are signals. +from handle_exit import handle_exit + +from datetime import datetime + +from series import Series +from gccclone import GCCClone +from clone import Clone +from linaroseries import LinaroSeries +from linaroseries import linaroSeriesFromBranchname +from proj import Proj +from gitrepo import cd +from template import TemplateRN +from rnseries import CandidateRN +from rngen import rngen + +rnProj=[] + +# Cleanup the temporary project directory if necessary. +def rncleanup(): + if not rnProj: + # This will be the case if the script is run via the test driver. + print "No cleanup needed" + else: + rnProj[0].cleanup() + +def generate(track, todate, toseries, gccsource): + # Delay creating the Proj directory until now so that the parser (and + # parser validation functions) can be tested in the unittests without + # invoking the internals of this driver. + rnProj.append(Proj(prefix='rn', persist=True)) + print "proj dir is: " + rnProj[0].projdir + + # This will raise an exception if gccsource is not a git repository. + gccclone=GCCClone(rnProj[0], clonedir=gccsource) + + #use gccsource to figure out the GCC Base Version and the FSF Version from the git commit history. + print 'gccbaseversion is ' + gccclone.get_base_version() + print 'fsf revision is ' + gccclone.get_fsf_revision() + + trackSeries=linaroSeriesFromBranchname(track) + + #rnclone=Clone(rnProj[0], remote=u'http://git.linaro.org/toolchain/release-notes.git') + rnclone=Clone(rnProj[0], remote=u'ssh://git@git.linaro.org/toolchain/release-notes.git') + + # TODO: Check toseries to create the right *RN object. + nexRN=CandidateRN(rnProj[0], rnrepo=rnclone.clonedir(), trackseries=trackSeries) + + nextSeries=nexRN.get_next_series() + + if todate != nextSeries.date: + raise RuntimeError('The date passed to this driver does not equal the date computed by LinaroSeries.toNext()') +# print "toNext: " + nextSeries.getBranchname() +# print "toNext: " + format(nextSeries, '%N/%d') + + # TODO: What if 'track' is not a snapshot branch? To do this right we'd + # probably have to preserve basedon information in the release notes + # repository as a yaml file. +# basedon='http://snapshots.linaro.org/components/toolchain/gcc-linaro/' +# basedon=basedon + trackSeries.getDir() +# print "basedon: " + basedon + + rngen(nexRN, gccclone) + +# print 'gccbaseversion is ' + gccclone.get_base_version() +# print 'fsf revision is ' + gccclone.get_fsf_revision() + + # TODO: After the release notes are generated, the generated files need + # to be added, commit, and pushed. + # These changes are on the CandidateRN workdir branch. + +# Verify that the GCC Source is located where it says it is. +class VerifyGCCSourceAction(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + if nargs is not None: + raise ValueError("nargs not allowed") + super(VerifyGCCSourceAction, self).__init__(option_strings, dest, **kwargs) + def __call__(self, parser, namespace, values, option_string=None): + print('%r %r %r' % (namespace, values, option_string)) + setattr(namespace, self.dest, values) + print "gccsource: " + values + # We simply want to test that the directory exists. We'll prove that + # it is a git directory in a later step. + if not os.path.isdir(values): + sys.exit(2) + +def str_to_datetime(datestr): + # strptime will throw an exception if the input date is not a string or is + # not parsable. + if len(datestr) < 10: + inputdate=datetime.strptime(datestr, "%Y.%m") + else: + inputdate=datetime.strptime(datestr, "%Y.%m.%d") + # Force the 'day' to 15 to unify comparisons. + return inputdate.replace(day=15) + +def create_parser(): + parser = argparse.ArgumentParser( + prog="rn.py", + + description='''Generate release notes.''' + ) + + # Positionals are required by default. + parser.add_argument('track', help='branchname of series to track.') + + parser.add_argument('-g', '--gccsource', dest='gccsource', required=True, action=VerifyGCCSourceAction, help='location of the gcc source') + + parser.add_argument('-d', '--date', dest='todate', required=True, help='the next series date in "YYYY.MM" form.', type=str_to_datetime) + + # At least one of the following arguments are required. + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('-c', '--candidate', dest='toseries', action='store_const', const=LinaroSeries.series.index("candidate")) + group.add_argument('-r', '--release', dest='toseries', action='store_const', const=LinaroSeries.series.index("release")) + group.add_argument('-s', '--snapshot', dest='toseries', action='store_const', const=LinaroSeries.series.index("snapshot")) + + return parser + +def main(): + parser=create_parser() + args = parser.parse_args() + generate(args.track, args.todate, args.toseries, args.gccsource) + +if __name__ == '__main__': +# logging.basicConfig(level="INFO") + with handle_exit(rncleanup): + main() diff --git a/rngen.py b/rngen.py new file mode 100644 index 0000000..004ad4d --- /dev/null +++ b/rngen.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python +# vim: set tw=78 expandtab tabstop=4 shiftwidth=4 autoindent smartindent: + +import csv +import sys, getopt +reload(sys) +sys.setdefaultencoding('utf8') + +# For fileIO +import os.path + +# Generate html files from textile files. +import textile + +# Generate text files from html. +#from BeautifulSoup import BeautifulSoup +from bs4 import BeautifulSoup + +# Jinja2 is the templating engine. +from jinja2 import Template, FileSystemLoader, Environment + +from custom_wordwrap import do_customwordwrap + +from cStringIO import StringIO + +from datetime import datetime + +from shutil import copyfile + +# To test if a remote file exists. +import urllib2 + +from series import Series +from linaroseries import LinaroSeries +from rnseries import CandidateRN +from gccclone import GCCClone + +from rninput import finput + +def get_gcc_version(gccclone): + dct= {} + (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") + return dct + +def rngen(candidate, gccclone): + #TODO Test that candidate is a CandidateRN + + trackseries=candidate.get_track_series() + nextseries=candidate.get_next_series() + + print "toNext: " + nextseries.getBranchname() + print "toNext: " + format(nextseries, '%N/%d') + + # TODO: What if 'track' is not a snapshot branch? To do this right we'd + # probably have to preserve basedon information in the release notes + # repository as a yaml file. + basedon='http://snapshots.linaro.org/components/toolchain/gcc-linaro/' + basedon=basedon + trackseries.getDir() + print "basedon: " + basedon + basis_url=basedon + +# print 'gccbaseversion is ' + gccclone.get_base_version() +# dct= {} +# (dct["major"],dct["minor"],dct["revision"]) = gccclone.get_base_version().split(".") +# print 'fsf revision is ' + gccclone.get_fsf_revision() + ver=get_gcc_version(gccclone) + print 'gcc major ' + ver['major'] + print 'gcc minor ' + ver['minor'] + + fsf=gccclone.get_fsf_revision() + + # The version numbering scheme changed with GCC 5. Now, every new release + # has the major version incremented. Prior to version five the minor was + # incremented per-release. + # TODO: read stable and maintenance information from a config file. + if int(ver['major']) <= 4: + gcc_version_path= u'' + ver['major'] + '.' + ver['minor'] + stab_maint='Maintenance' + else: + gcc_version_path=ver['major'] + stab_maint='Stable' + + print "This is a %s candidate for GCC version %s" % (stab_maint, + gcc_version_path) + + history=finput('Please enter the location of the changelog csv file: ', "6.1-2016.06.csv") + + print "using the following .csv file: " + history + + #month_year=format(nextseries, '%D') + month_year=nextseries.date + + outfile=u'GCC-' + ver['major'] + u'.' + ver['minor'] + u'-' + month_year.strftime("%Y.%m") + +# if spin != "": +# outfile=outfile + u'-' + spin +# +# if rc != "": +# outfile=outfile + u'-rc' + rc + outfile=outfile + str(nextseries.spin) + str(nextseries.rc) + print "outfile: " + outfile + + with open(history, mode='r') as backports: + backports.seek(0) + backports_content = backports.read() + in_memory_backports = StringIO(backports_content) + + backportsDict = csv.DictReader(in_memory_backports, fieldnames=['rev', 'fsf', 'linaro', 'summary', 'target', 'type', 'notes', 'backport', 'owner', 'author'], delimiter=',') + backports.close(); + + release_type=nextseries.longuppertype() + + spin=str(nextseries.spin).strip('-') + rc=str(nextseries.rc).strip("-rc") + + template_dir=candidate.rntemplate.workdir() + series_dir=candidate.rnworkdir.workdir() + + # Verify that the basis url is actually valid, otherwise the release notes + # will contain erroneous information. + url=urllib2 + try: + ret = urllib2.urlopen(basis_url) + except url.URLError as e: + print "%s does not exist!" % basis_url + exit(1) + + # TODO verify that the basis_url is a valid url and warn if not. + basis=u'' + basis_ver=basis_url.strip("/") + basis_ver=basis_ver.rsplit("/",1)[1] + if basis_url.find("releases") == 1: + basis="Linaro Release GCC " + basis_ver + elif basis_url.find("-rc") == 1: + basis="Linaro Release-Candidate GCC " + basis_ver + else: + basis="Linaro Snapshot GCC " + basis_ver + + release_string="Linaro GCC %s.%s-%s.%s %s" % (ver['major'],ver['minor'],month_year.strftime("%Y"), month_year.strftime("%m"),release_type) + + print "The release notes for this %s will be generated based on: %s" % (release_string, basis) + + # Get ready to do the rendering of the jinja2 templates into .textile + # format. + + # relative path to the README.textile templates. + src_readme_path=u'/components/toolchain/gcc-linaro/5/README.textile' + bin_readme_path=u'/components/toolchain/binaries/README.textile' + + # absolute path to README.textile templates. + template_src_readme=template_dir + src_readme_path + template_bin_readme=template_dir + bin_readme_path + + # relative path to the README.textile.series templates. + # Note, the src templates don't have an extends (.series) file. + src_extends_path=u'/components/toolchain/gcc-linaro/5/README.textile' + bin_extends_path=u'/components/toolchain/binaries/README.textile.series' + + # absolute path to README.textile.series template overrides. These are + # what are passed to the jinja2 rendering engine because they might + # override the default templates with version specific information. + series_src_extends=series_dir + src_extends_path + series_bin_extends=series_dir + bin_extends_path + + # These are the README.textile files that exist in the series directory. + series_src_readme=series_dir + src_readme_path + series_bin_readme=series_dir + bin_readme_path + + # Copy the template README.textile over the series README.textile file + # because the jinja2 engine will need access to the parent template. This + # will be overwritten with the generated README.textile later. + copyfile(template_src_readme, series_src_readme) + copyfile(template_bin_readme, series_bin_readme) + + # After the script runs the *_series_template_path files are overwritten + # with the generated README.textile files. + src_readme=series_src_readme + bin_readme=series_bin_readme + + + # Setting lstrip_blocks=True lets us use indenting in jinja2 templates. + env = Environment(loader=FileSystemLoader(series_dir), lstrip_blocks=True, trim_blocks=True) + + # We need to use a custom wordwrap filter that doesn't line-break urls. + env.filters['customwordwrap'] = do_customwordwrap + + source_archive_template = env.get_template(src_extends_path) + binary_archive_template = env.get_template(bin_extends_path) + announce_template = env.get_template(bin_extends_path) + + # Generate the toolchain GCC source archive release notes. + source_textile_rendering = source_archive_template.render( + backports=backportsDict, + GCC_base_version=ver['major'] + u'.' + ver['minor'] + u'.' + ver['revision'], + FSF_rev=fsf, + basis=basis, + basis_url=basis_url, + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + stab_maint=stab_maint, + release_type=release_type, + package="GCC", + major=ver['major'], + minor=ver['minor']) + + source_textile_rendering = source_textile_rendering.encode('utf-8') + + out_source=u'' + outfile + out_binary=u'Linaro-' + outfile + + try: + with open(out_source + u'.textile','w') as f: + f.write(source_textile_rendering) + print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_source + u'.textile' + except IOError as e: + sys.exit('Error rendering textile source archive release notes.') + + source_html_rendering = textile.textile(source_textile_rendering) + + try: + with open(out_source + u'.html','w') as f: + f.write(source_html_rendering) + print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_source + u'.html' + except IOError as e: + sys.exit('Error rendering html release notes.') + + # Generate the binary toolchain archive release notes. + binary_textile_rendering = binary_archive_template.render( + announce='', + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + major=ver['major'], + minor=ver['minor'], + stab_maint=stab_maint, + snap_rc_release=release_type) + + binary_textile_rendering = binary_textile_rendering.encode('utf-8') + try: + with open(out_binary + u'.textile','w') as f: + f.write(binary_textile_rendering) + print 'Wrote textile rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.textile' + except IOError as e: + sys.exit('Error rendering textile binary archive release notes.') + + binary_html_rendering = textile.textile(binary_textile_rendering) + + try: + with open(out_binary + u'.html','w') as f: + f.write(binary_html_rendering) + print 'Wrote html rendering to file %s' % os.getcwd() + u'/' + out_binary + u'.html' + except IOError as e: + sys.exit('Error rendering html release notes.') + + # Generate the binary toolchain announce message. + announce_rendering = announce_template.render( + announce='yes', + year=month_year.strftime("%Y"), + dd_month=month_year.strftime("%m"), + rc=rc, + spin=spin, + major=ver['major'], + minor=ver['minor'], + stab_maint=stab_maint, + snap_rc_release=release_type) + + announce_rendering = announce_rendering.encode('utf-8') + announce_file=u'announce-' + out_binary + u'.txt' + try: + with open(announce_file,'w') as f: + f.write(announce_rendering) + print 'Wrote text rendering to file %s' % os.getcwd() + u'/' + announce_file + except IOError as e: + sys.exit('Error rendering text announce message.') + + + # Copy the generated .textile file over the + # readme_series_dir/README.textile template file. + # WARNING: This must be done AFTER you generate the announce message + # because it uses the same README.textile template as the textile rendered + # release notes. + copyfile(out_binary + u'.textile' ,bin_readme) + print 'Wrote generated file ' + out_binary + u'.textile' + " to " + bin_readme + + # Copy the generated .textile file over the + # readme_series_dir/README.textile template file. + copyfile(out_source + u'.textile' ,src_readme ) + print 'Wrote generated file ' + out_source + u'.textile' + " to " + src_readme + + candidate.rnworkdir.add(bin_readme) + candidate.rnworkdir.add(src_readme) + candidate.rnworkdir.commit("Generated Release Notes for " + nextseries.label()) + candidate.rnworkdir.log(1) + + # TODO: Ask if the user would like to push the release notes changes. + + quit(0); + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/rninput.py b/rninput.py new file mode 100644 index 0000000..86d3d90 --- /dev/null +++ b/rninput.py @@ -0,0 +1,35 @@ +# For fileIO +import os.path + +# @message - The question to display to the user. +# @default - The default if <enter> is hit. +# @accept - The list of valid responses. +# @retry - Whether to retry on a malformed input. +# Returns 'y' or 'no' +def yninput(message, default="n", accept=['yes', 'y', 'no', 'n'], retry=True): + # TODO: Test this + if default.lower() not in accept: + raise TypeError('Default as %s is not in list of accepted responses.' % default) + + default_msg=" [y/N]: " + if default.lower() == "y" or default.lower() == "yes": + default_msg=" [Y/n]: " + + while(1): + answer=raw_input(message + default_msg) or default.lower() + if answer.lower() not in accept and retry: + print "'%s' is an invalid response. Please try again." % answer.lower() + continue + else: + if answer.lower() == "yes" or answer.lower() == "y": + return "y" + return "n" + +# TODO: Test with a directory returned as the answer. +def finput(message,orval): + while(1): + answer=raw_input(message) or orval + if os.path.exists(answer) and os.path.isfile(answer): + return answer + + print "%s doesn't exist or isn't a regular file. Try again." % answer diff --git a/rnseries.py b/rnseries.py new file mode 100644 index 0000000..1b02373 --- /dev/null +++ b/rnseries.py @@ -0,0 +1,193 @@ +import unittest +import logging +import os + +from proj import Proj +from sh import ls +from shutil import copyfile + +from gitrepo import cd +from clone import Clone +from workdir import Workdir +from series import Series +from series import seriesFromBranchname +from linaroseries import LinaroSeries +from linaroseries import linaroSeriesFromBranchname +from template import TemplateRN + +from rninput import yninput + +# Abstract base class for a release-notes series +# Every RNSeries has a templateRN instance and a workdir. The template +# instance is used for the base template. +class RNSeries(object): + #rnremote=u'http://git.linaro.org/toolchain/release-notes.git' + # use ssh:// so that we can push to the remote. + rnremote=u'ssh://git@git.linaro.org/toolchain/release-notes.git' + _series='components/toolchain/binaries/README.textile.series' + # @rnrepo - path to the existing releases notes repository if there is one. + # If there isn't one a new one will be cloned. + # @trackseries - an instance of a LinaroSeries to track. + # @nextseries - an instance of a LinaroSeries that is the next series. + def __init__(self, proj, rnrepo=None, trackseries=None, nextseries=None): + + # Create the release-notes repository clone. The Clone constructor + # will throw an exception if proj is not a Proj. That is an + # unrecoverable error so don't catch it here. The Clone constructor + # will also throw and exception if rnrepo is not a string. + + if rnrepo: + logging.info('RNSeries() clone already exists. Using existing.') + self.rnclone=Clone(proj, clonedir=rnrepo) + else: + self.rnclone=Clone(proj, remote=RNSeries.rnremote) + + # TODO: Write a testcase that exposes this. + if not trackseries: + raise TypeError("Input variable trackseries is required.") + + # TODO: Write a testcase that exposes this. + if not isinstance(trackseries, LinaroSeries): + raise TypeError("Input variable trackseries not of type Series.") + + # TODO: Write a testcase that exposes this. + if not nextseries: + raise TypeError("Input variable nextseries is required.") + + # TODO: Write a testcase that exposes this. + if not isinstance(nextseries, LinaroSeries): + raise TypeError("Input variable nextseries not of type Series.") + + self.trackseries=trackseries + self.nextseries=nextseries + + self.proj=proj + + templ_branchname=self.nextseries.shorttype() + "_" + self.nextseries.getDir() + + logging.info('RNSeries() Creating TemplateRN') + # Every release-notes series contains a 'template' workdir checked out + # into a new templ_branchname branch that tracks master. + self.rntemplate=TemplateRN(self.proj,self.rnclone, branchname_suffix=templ_branchname) + + # We do this now because the SeriesRN workdir might be branched from the template. + self.rntemplate.updateTemplateReadme() + updateseries=self.rntemplate.updateTemplateSeries() + + self.rntemplate.commit() + + print "Please verify that your changes have been committed on the template branch." + self.rntemplate.log(1) + + # It's possible that self.trackseries.getBranchname() doesn't exist + # (such is the case if we're creating the first candidate from a + # snapshot). In this case 'track' should be the local template branch + # so that we pick up the template changes. + if not self.rntemplate.branchexists(self.trackseries.getBranchname()): + print("Creating RNSeries based on branch " + self.rntemplate.getbranch()) + self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.rntemplate.getbranch(), branchname=self.nextseries.getBranchname()) + else: + print "You updated the template README.textile.series but this will not be reflected in the Candidate README.textile.series." + print "Creating RNSeries based on branch " + self.trackseries.getBranchname() + self.rnworkdir=Workdir(proj=self.proj, clone=self.rnclone, workdir=self.nextseries.shorttype(), track=self.trackseries.getBranchname(), branchname=self.nextseries.getBranchname()) + + print "Log of last commit on workdir branch:" + self.rnworkdir.log(1) + + print "If you would like to make changes to the series release notes please update the the Candidate README.textile.series file:" + answer=yninput("Do you need to update the Candidate README.textile.series file?", "N") + if answer == "y": + self.rnworkdir.editFile(self._series) + else: + print "The Candidate README.textile.series file has not been updated." + + self.rnworkdir.commit() + + print "Log of last commit on workdir branch:" + self.rnworkdir.log(1) + + # TODO: Don't forget to push template and workdir changes. + # self.rntemplate.pushToBranch("origin/master") + # TODO: Should the workdir know where to push to? + # self.rnworkdir.pushToBranch(self.nextseries.getBranchname()) + + def get_next_series(self): + return self.nextseries + + def get_track_series(self): + return self.trackseries + +# Do we even need these, especially if they're all the same? +class SnapshotRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The series input must be of type LinaroSeries') + + nextseries=trackseries.toNextSnapshot(strict=True) + + super(SnapshotRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) + +class CandidateRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The trackseries input must be of type LinaroSeries') + + self.nextseries=trackseries.toNextCandidate(strict=True) + + super(CandidateRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=self.nextseries) + +class ReleaseRN(RNSeries): + def __init__(self, proj, trackseries, rnrepo=None): + if not isinstance(trackseries, LinaroSeries): + raise TypeError('The series input must be of type LinaroSeries') + + nextseries=trackseries.toNextRelease(strict=True) + + super(ReleaseRN,self).__init__(proj, rnrepo, trackseries=trackseries, nextseries=nextseries) + +class TestCandidateRN(unittest.TestCase): + #logging.basicConfig(level="INFO") + testdirprefix="CandidateRNUT" + + @classmethod + def setUpClass(cls): + cls.proj=Proj(prefix=TestCandidateRN.testdirprefix, persist=True) + logging.info('11111 Created Proj dir') + cls.rnrepo=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + logging.info('22222 Created clone dir') + cls.branch=u'snapshots/linaro-6.1-2016.06' + cls.series=linaroSeriesFromBranchname(branch=cls.branch) + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + def test_candidate_newclone(self): + logging.info('33333 Going To Create CandidateRN') + candidatern=CandidateRN(self.proj, rnrepo=self.rnrepo.repodir, trackseries=self.series) + logging.info('44444 Created CandidateRN') + self.assertEqual(candidatern.nextseries.longuppertype(), "Release-Candidate") + self.assertEqual(candidatern.nextseries.longlowertype(), "release-candidate") + self.assertTrue(os.path.isdir(self.proj.projdir + "/release-notes.git")) + self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) + self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) + +# def test_candidate_existingclone(self): +# # Since this reuses an existing clone of a repository the +# # release-notes.git directory should actually be a symlink. +# candidatern=CandidateRN(self.proj, trackseries=self.series, rnrepo=TestCandidateRN.rnrepo.repodir) +# self.assertEqual(candidatern.series.longuppertype(), "Release-Candidate") +# self.assertEqual(candidatern.series.longlowertype(), "release-candidate") +# self.assertTrue(os.path.islink(self.proj.projdir + "/release-notes.git")) +# self.assertTrue(os.path.isdir(self.proj.projdir + "/candidate")) +# self.assertTrue(os.path.isdir(self.proj.projdir + "/template")) +# +# self.assertEqual(candidatern.rntemplate.getbranch().split("_")[0], "template") +## self.assertTrue(candidatern.rnworkdir.getbranch().startswith("releases/")) +# +# print "Based on: %s" % self.series.getBranchname() +# print "Template Branch: %s" % candidatern.rntemplate.getbranch() +# print "Candidate Branch: %s" % candidatern.rnworkdir.getbranch() + +if __name__ == '__main__': + unittest.main() diff --git a/series.py b/series.py new file mode 100644 index 0000000..6475fe3 --- /dev/null +++ b/series.py @@ -0,0 +1,550 @@ +import unittest +import logging +import os +import uuid + +from vers import Spin +from vers import Rc +from vers import Vendor +from vers import Package + +from datetime import datetime +from dateutil.relativedelta import relativedelta + +from vers import packageFromStr + +# A Series stores the instance characteristic that describes the output +class Series(object): + + series = ['candidate', 'snapshot', 'release'] + serieslongupper = ['Release-Candidate', 'Snapshot', 'Release'] + + # @ seriestype - 'type' of either 'candidate', 'snapshot', or 'release' + # @ package - String or Package object representing the series package + # name with version string, e.g., 'GCC-5.3.1' + # @ vendor - String or Vendor object representing the vendor that is + # tagged on this series. + # @ date - String or 'datetime' object representing the YYYY.MM of this + # series. It defaults to 'today' + # @ spin - Optional Spin, str, or int + # @ rc - Optional Rc, str, or int + # This will make sure snapshot doesn't have an rc and release doesn't have + # an rc for instance. + def __init__(self, seriestype, vendor=None, package=None, date=datetime.today(), spin=None, rc=None, strict=True): + if isinstance(spin, Spin): + self.spin=spin + else: + # Spin will raise an exception if spin is the wrong type. + # A None parameter will create a Spin with an internal value of None. + self.spin=Spin(spin) + + if isinstance(rc, Rc): + self.rc=rc + else: + # Rc will raise an exception if rc is the wrong type. A None parameter + # will create an Rc with an internal value of None. + self.rc=Rc(rc) + + if isinstance(date, datetime): + # Force all of the days of the month to 15 to unify comparisons. + date.replace(day=15) + self.date=date + else: + # 'date' is a string. If the input date can't be parsed by datetime + # it will throw an exception. We can't recover from it so just pass + # it up. + if len(date) < 10: + tmpdate=datetime.strptime(date, "%Y.%m") + else: + tmpdate=datetime.strptime(date, "%Y.%m.%d") + # Force all of the days of the month to 15 to unify comparisons. + self.date=tmpdate.replace(day=15) + + # If the input vendor=None just instantiate the default vendor. + if not vendor: + self.vendor=Vendor() + else: + self.vendor=vendor + + if not package: + raise TypeError('Series requires an input package.') + elif isinstance(package, Package): + self.package=package + elif isinstance(package, basestring): + self.package=packageFromStr(package) + else: + # There can't be a defaut package because it requires a version. + raise TypeError("Series 'package' unrecognized type " + str(type(package))) + + # We might want to uniquely identify a particular Series object. + self.uniqueid=str(uuid.uuid4()) + + # We store a seriestype as an integer into an enumeration array + # so that the names can be changed as desired. + try: + self.seriestype = Series.series.index(seriestype.lower()) + except ValueError: + self.seriestype = -1 # no match + raise TypeError('Invalid series type %s.' % seriestype) + + # rc can only be non-None for release-candidates. + if strict: + if self.seriestype == Series.series.index("snapshot"): + if self.rc.val != 0: + raise ValueError('A snapshot series cannot have an rc.') + elif self.seriestype == Series.series.index("release"): + if self.rc.val != 0: + raise ValueError('A release series cannot have an rc.') + + + if self.seriestype == Series.series.index("candidate") and self.rc.val is 0: + raise TypeError('A candidate series must have an rc specified.') + + # namespace/vendor-package-version-YYYY.MM-spin-rc + self.fmt = { + '%N': self.getNamespace, + '%L': self.serieslabel, + '%P': self._getPackageName, + '%p': self._getPackageNameLower, + '%V': self._getVendor, + '%v': self._getVendorLower, + '%E': self._getVersion, + '%D': self._getYYYYMM, + '%d': self.getDir, + '%S': self._getSpin, + '%R': self._getRc, + } + + def _getPackageName(self): + return self.package.pv + + def _getPackageNameLower(self): + return self.package.lower() + + def _getVendor(self): + return self.vendor.__str__() + + def _getVendorLower(self): + return self.vendor.lower() + + def _getVersion(self): + return self.package.version.strfversion("%M%m") + + def _getYYYYMM(self): + return self.date.strftime("%Y.%m") + + def _getSpin(self): + return str(self.spin).strip('-') + + def _getRc(self): + return str(self.rc).strip('-') + + def __format__(self,format_spec): + ret='' + # Iterate across the dictionary and for each key found and invoke the + # function. + # self.fmt[idx]() + import string + ret=format_spec + for key in self.fmt.iterkeys(): + # TOOD: Crap this isn't going to work because each function takes different parameters. + replac=self.fmt[key]() + ret=string.replace(ret,key,replac) + return ret + + def serieslabel(self): + # Only the 'snapshot-' is used in a label. + if self.seriestype == Series.series.index("snapshot"): + return Series.series[self.seriestype] + u'-' + else: + return u'' + + def shorttype(self): + return Series.series[self.seriestype] + + def longlowertype(self): + return Series.serieslongupper[self.seriestype].lower() + + def longuppertype(self): + return Series.serieslongupper[self.seriestype] + + def __str__(self): + return Series.series[self.seriestype] + "_" + self.uniqueid + + def label(self): + label=str(self.vendor) + u'-' + self.serieslabel() + str(self.package) + label=label + u'-' + self.date.strftime("%Y.%m") + label=label + str(self.spin) + str(self.rc) + return label + + def incrementMonth(self): + self.date = self.date + relativedelta(months=1) + + # TODO: provide a format function which can change the separator and + # capitalization, etc. + + def getDir(self): + dirstr=self.package.version.strfversion("%M%m") + dirstr=dirstr + u'-' + self.date.strftime("%Y.%m") + dirstr=dirstr + str(self.spin) + dirstr=dirstr + str(self.rc) + return dirstr + + def getNamespace(self): + namespace=u'releases' + if self.seriestype == Series.series.index("snapshot"): + namespace=u'snapshots' + return namespace + + # TODO: Document what a conformant branch name looks like. + # Return a conformant branch name from the Series information. + def getBranchname(self): + branchname=u'' + if self.seriestype == Series.series.index("snapshot"): + branchname=branchname + u'snapshots/' + else: + branchname=branchname + u'releases/' + + branchname=branchname + self.vendor.lower() + branchname=branchname + u'-' + self.package.version.strfversion("%M%m") + branchname=branchname + u'-' + self.date.strftime("%Y.%m") + branchname=branchname + str(self.spin) + branchname=branchname + str(self.rc) + + return branchname + +# Helper function which creates a Series from a properly formed branch name +# input string. +def seriesFromBranchname(branch=None): + if not isinstance(branch, basestring): + raise TypeError('seriesFromBranchname requires a basestring as input') + + if not branch: + raise ValueError('seriesFromBranchname requires a non-empty string as input') + + # Get the part before the '/'. That's the namespace + try: + namespace=branch.rsplit('/', 1)[0] + except IndexError: + raise RuntimeError('string must be <namespace>/<everything_else>') + + # Get everything after the '/'. + try: + seriesstr=branch.rsplit('/', 1)[1] + except IndexError: + raise RuntimeError('string must be <package_name>-<package_version>') + + if namespace == u'': + raise TypeError("Couldn't parse a namespace from input string") + if seriesstr == u'': + raise TypeError("Couldn't parse a series from input string") + + keys=['vendor', 'version', 'date', 'spin', 'rc'] + values=seriesstr.split('-') + dictionary=dict(zip(keys, values)) + + # if there is no spin but there is an 'rcX' in the spin key:value pair + # it means that there's really no spin but should be in the rc key:value + # pair. + if dictionary.has_key("spin") and "rc" in dictionary["spin"]: + dictionary["rc"]=dictionary["spin"] + dictionary.pop("spin", None) + + # We need to have None fields in the missing keys for when we call the + # Series constructor. + if not dictionary.has_key("rc"): + dictionary["rc"]=None + else: + # strip the "rc" and just leave the int. + if dictionary["rc"].startswith("rc"): + dictionary["rc"]=dictionary["rc"][2:] + + # We need to have None fields in the missing keys for when we call the + # Series constructor. + if not dictionary.has_key("spin"): + dictionary["spin"]=None + + seriesdate=datetime.today + if dictionary["date"]: + datekeys=['year', 'month', 'day'] + datevalues=dictionary["date"].split('.') + datefields=dict(zip(datekeys, datevalues)) + if not datefields.has_key("day"): + datefields["day"]="15" + seriesdate=datetime(int(datefields["year"]), int(datefields["month"]), int(datefields["day"])) + + if "snapshots" in namespace and dictionary["rc"]: + raise RuntimeError('A snapshots namespace can not have an rc. This is a non-conforming input.') + elif "releases" not in namespace and dictionary["rc"]: + raise RuntimeError('An rc must have a "releases" namespace. This is a non-conforming input.') + elif dictionary["rc"]: + seriesname="candidate" + elif "snapshots" in namespace: + seriesname="snapshot" + elif "releases" in namespace: + seriesname="release" + elif "snapshot" in namespace: + raise RuntimeError('"snapshot" is not a complete namespace. A conforming namespace is "snapshots".') + else: + # TODO test for unknown namespace. + raise RuntimeError('Unknown namespace in input string.') + + package=Package(package="GCC", version=dictionary["version"]) + series=Series(seriesname, package=package, date=seriesdate, spin=dictionary["spin"], rc=dictionary["rc"], strict=True) + + return series + +from vers import versionFromStr + +class TestSeries(unittest.TestCase): + + def test_missing_seriestype(self): + with self.assertRaises(TypeError): + spin=Spin() + rc=Rc() + candidate=Series(package="GCC-5.3.1", spin=spin, rc=rc) + + def test_no_match(self): + with self.assertRaises(TypeError): + candidate=Series("foobar", package="GCC-5.3.1") + + def test_partial_seriestype_match(self): + with self.assertRaises(TypeError): + candidate=Series("candid", package="GCC-5.3.1") + + def test_excessive_seriestype_match(self): + with self.assertRaises(TypeError): + candidate=Series("candidate2", package="GCC-5.3.1") + + def test_match_release(self): + release=Series("release", package="GCC-5.3.1") + self.assertEqual(str(release).split("_")[0], "release") + + def test_match_candidate_wrongcase(self): + candidate=Series("Candidate", package="GCC-5.3.1",rc="1") + self.assertEqual(str(candidate).split("_")[0], "candidate") + + def test_match_snapshot_wrongcase(self): + snapshot=Series("SNAPSHOT", package="GCC-5.3.1") + self.assertEqual(str(snapshot).split("_")[0], "snapshot") + + def test_longlowertype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.longlowertype(), "release-candidate") + + def test_longuppertype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="4") + self.assertEqual(candidate.longuppertype(), "Release-Candidate") + + def test_shorttype_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.shorttype(), "candidate") + + def test_serieslabel_candidate(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="1") + self.assertEqual(candidate.serieslabel(), "") + + def test_serieslabel_release(self): + candidate=Series("release", package="GCC-5.3.1") + self.assertEqual(candidate.serieslabel(), "") + + def test_serieslabel_snapshot(self): + candidate=Series("snapshot", package="GCC-5.3.1") + self.assertEqual(candidate.serieslabel(), "snapshot-") + + def test_empty_rc(self): + rc=Rc(7) + candidate=Series("candidate", package="GCC-5.3.1", rc=rc) + self.assertEqual(candidate.rc.vers, 7) + self.assertEqual(str(candidate.rc), "FOO") + + def test_empty_rc(self): + rc=Rc() + release=Series("release", package="GCC-5.3.1", rc=rc) + self.assertEqual(str(release.rc), "") + + release2=Series("release", package="GCC-5.3.1") + self.assertEqual(str(release2.rc), "") + + def test_specified_rc(self): + rc=Rc(7) + candidate=Series("candidate", package="GCC-5.3.1", rc=rc) + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_candidate_with_no_rc(self): + with self.assertRaises(TypeError): + candidate=Series("candidate", package="GCC-5.3.1") + + with self.assertRaises(TypeError): + rc=Rc() + candidate2=Series("candidate", package="GCC-5.3.1", rc=rc) + + def test_rc_as_string(self): + candidate=Series("candidate", package="GCC-5.3.1", rc="7") + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_rc_as_int(self): + candidate=Series("candidate", package="GCC-5.3.1", rc=7) + self.assertEqual(candidate.rc.val, 7) + self.assertEqual(str(candidate.rc), "-rc7") + + def test_rc_as_typeerror(self): + floatrc=7.0 + with self.assertRaises(TypeError): + snapshot=Series("snapshot", package="GCC-5.3.1", rc=floatrc) + + def test_rc_as_negative(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", rc="-1") + + def test_missing_spin(self): + snapshot=Series("snapshot", package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 0) + self.assertEqual(str(snapshot.spin), "") + + def test_specified_spin(self): + spin=Spin(7) + snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_empty_spin(self): + spin=Spin() + snapshot=Series("snapshot", package="GCC-5.3.1", spin=spin) + self.assertEqual(str(snapshot.spin), "") + + def test_spin_as_string(self): + snapshot=Series("snapshot", spin="7", package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_spin_as_int(self): + snapshot=Series("snapshot", spin=7, package="GCC-5.3.1") + self.assertEqual(snapshot.spin.val, 7) + self.assertEqual(str(snapshot.spin), "-7") + + def test_spin_as_typeerror(self): + floatspin=7.0 + with self.assertRaises(TypeError): + snapshot=Series("snapshot", spin=floatspin, package="GCC-5.3.1") + + def test_spin_as_negative(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", spin="-1", package="GCC-5.3.1") + + def test_empty_spin_and_rc(self): + release=Series("release", package="GCC-5.3.1") + self.assertEqual(release.spin.val, 0) + self.assertEqual(release.rc.val, 0) + self.assertEqual(str(release.spin), "") + self.assertEqual(str(release.rc), "") + + def test_package_as_Package(self): + package = Package("GCC", "5.3.1") + release=Series("release", package) + self.assertEqual(str(release.package), "GCC-5.3.1") + + def test_package_as_Package(self): + # Create a Version instead of a package + package = versionFromStr("5.3.1") + with self.assertRaises(TypeError): + candidate=Series("candidate", package) + + def test_package_as_None(self): + package=None + with self.assertRaises(TypeError): + candidate=Series("candidate", package) + + def test_getbranchname(self): + candidate=Series("candidate", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + self.assertEqual(candidate.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc=None) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1") + + def test_date_string(self): + candidate=Series("candidate", package="GCC-5.3.1", date="2016.05.27", spin="1", rc="1") + self.assertEqual(datetime(2016,05,15),candidate.date) + + candidate2=Series("candidate", package="GCC-5.3.1", date="2016.05", spin="1", rc="1") + self.assertEqual(datetime(2016,05,15),candidate2.date) + + with self.assertRaises(TypeError): + candidate3=Series("candidate", package="GCC-5.3.1", date=datetime("20161034234"), spin="1", rc="1") + + def test_series_strict_true(self): + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) + + with self.assertRaises(ValueError): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + + with self.assertRaises(ValueError): + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=True) + + with self.assertRaises(ValueError): + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1") + + def test_series_strict_false(self): + snapshot=Series("snapshot", package="GCC-5.3.1", date=datetime(2016,05,15), spin="6", rc="1", strict=False) + self.assertEqual(snapshot.getBranchname(), "snapshots/linaro-5.3-2016.05-6-rc1") + + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), spin="1", rc="1", strict=False) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-1-rc1") + + release=Series("release", package="GCC-5.3.1", date=datetime(2016,05,15), rc="1", strict=False) + self.assertEqual(release.getBranchname(), "releases/linaro-5.3-2016.05-rc1") + + # TODO: Test combinations of branch names with and without spins/rcs. + + # TODO: Test whether seriesFromBranchname can detect non-conforming branch names. + + # These tests will verify that a branchname can be read, a series created, + # and then the same branch name recreated. + def test_seriesFromBranchname(self): + branch=u'snapshots/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series=seriesFromBranchname(branch=branch) + + branch2=u'snapshots/linaro-5.3-2016.05-6' + series2=seriesFromBranchname(branch=branch2) + self.assertEqual(series2.getBranchname(),u'snapshots/linaro-5.3-2016.05-6') + + branch3=u'snapshots/linaro-5.3-2016.05' + series3=seriesFromBranchname(branch=branch3) + self.assertEqual(series3.getBranchname(),u'snapshots/linaro-5.3-2016.05') + + branch4=u'releases/linaro-5.3-2016.05-6-rc1' + series4=seriesFromBranchname(branch=branch4) + self.assertEqual(series4.getBranchname(),u'releases/linaro-5.3-2016.05-6-rc1') + + branch5=u'releases/linaro-5.3-2016.05-rc1' + series5=seriesFromBranchname(branch=branch5) + self.assertEqual(series5.getBranchname(),u'releases/linaro-5.3-2016.05-rc1') + + branch6=u'releases/linaro-5.3-2016.05-6' + series6=seriesFromBranchname(branch=branch6) + self.assertEqual(series6.getBranchname(),u'releases/linaro-5.3-2016.05-6') + + branch7=u'releases/linaro-5.3-2016.05' + series7=seriesFromBranchname(branch=branch7) + self.assertEqual(series7.getBranchname(),u'releases/linaro-5.3-2016.05') + + branch8=u'snapshots/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series8=seriesFromBranchname(branch=branch8) + + branch9=u'snapshot/linaro-5.3-2016.05-6-rc1' + with self.assertRaises(RuntimeError): + series9=seriesFromBranchname(branch=branch9) + + branch10=u'snapshot/linaro-5.3-2016.05-6' + with self.assertRaises(RuntimeError): + series10=seriesFromBranchname(branch=branch10) + + # TODO: Test series.label (as there was a runtime bug) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() diff --git a/tcwg-release.sh b/tcwg-release.sh index ef2f3a9..73dfb97 100755 --- a/tcwg-release.sh +++ b/tcwg-release.sh @@ -696,6 +696,8 @@ elif (( VERBOSITY >= 3 )); then set -x; fi +# TODO: Validate BRANCH or TAG as valid. + # Track either the BRANCH or the TAG track="${BRANCH:+${BRANCH}}${TAG:+${TAG}}" @@ -828,6 +830,17 @@ print_info "${bold}Saved branch: ${save_branch}" print_info "${bold}Checking out branch \"${local_branch}\"..." pushd $REL_DIR/git 1>/dev/null git checkout -B ${local_branch} ${track} 2>/dev/null + +# if a tag get the branch: git branch -a --contains tag +# otherwise get the branch and use that for TRACK +# Pass $REL_DIR/git as the gcc source directory for pulling the GCC base version +# and the FSF version. +# Determine the basis URL: +# basis_url="http://snapshots.linaro.org/components/toolchain/gcc-linaro/${basedon}" +# CANDIDATE, RELEASE, or SNAPSHOT + + + popd 1>/dev/null print_info "${bold}Creating ${SNAPSHOT:+snapshot}${RELEASE:+release}${CANDIDATE:+release candidate} ChangeLog entries..." @@ -881,6 +894,7 @@ pushd $REL_DIR 1>/dev/null print_info "${bold}Creating tarball gcc-linaro-${rname}.tar.xz..." tar cfJ gcc-linaro-${rname}.tar.xz gcc-linaro-${rname} md5sum gcc-linaro-${rname}.tar.xz > gcc-linaro-${rname}.tar.xz.asc +# TODO Move this to after the release notes generation? ask "Do you want to upload the tarball to ${FILESERVER} [N/y] ?" user_ok if [ "$user_ok" == "y" ]; then scp gcc-linaro-${rname}.tar.* "${FILESERVER}" @@ -915,3 +929,19 @@ popd 1>/dev/null # Output the info a second time so it's all at the end of any captured logs. info hint + +print_info "${bold}Generating release notes..." +pushd $REL_DIR/git 1>/dev/null + +mydir="$(dirname "$(readlink -f "$0")")" + +print_info "${bold}Generating release notes into ${REL_DIR}/git/" + +set -x + python ${mydir}/rn.py ${track} -c -g ${REL_DIR}/git --date ${DATE} +set +x + +popd 1>/dev/null + + + diff --git a/template.py b/template.py new file mode 100644 index 0000000..4f6a865 --- /dev/null +++ b/template.py @@ -0,0 +1,94 @@ +import unittest +import logging +import os +import sys +import uuid + +from sh import ls +from proj import Proj +from clone import Clone +from workdir import Workdir +from rninput import yninput + +class TemplateRN(Workdir): + _readme='components/toolchain/binaries/README.textile' + _series='components/toolchain/binaries/README.textile.series' + + def __init__(self, proj, rnclone, branchname_suffix=None): + if branchname_suffix: + branchname='template_' + branchname_suffix + else: + branchname='template_' + str(uuid.uuid4()) + super(TemplateRN, self).__init__(proj, rnclone, "template", track="origin/master", branchname=branchname) + # Todo move template workdir out of detached head state. + + # TODO: Allow the user to edit BOTH the gcc-linaro/ and binaries/ + # README.textile file as currently this function only does the binary one. + # TODO: Create Unit Test + def updateTemplateReadme(self): + answer=yninput("Would you like to update the Template README.textile file?", "N") + if answer != "y": + print "The Template README.textile file has not been updated." + return False + + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + print "Edit the README.textile template fragment if changes are needed." + os.system(editor + ' ' + self.repodir + '/' + TemplateRN._readme) + + self.add(TemplateRN._readme) + return True + + # TODO: Create Unit Test + def updateTemplateSeries(self): + answer=yninput("Would you like to update the Template README.textile.series file?", "N") + if answer != "y": + print "The Template README.textile.series file has not been updated." + return False + + editor = os.getenv('EDITOR') + if not editor: + editor='/usr/bin/vim' + print "Edit the README.textile.series template fragment if changes are needed." + os.system(editor + ' ' + self.repodir + '/' + TemplateRN._series) + + self.add(TemplateRN._series) + return True + + def getReadme(self): + return self.repodir + '/' + TemplateRN._readme + + def getSeries(self): + return self.repodir + '/' + TemplateRN._series + +class TestTemplateRN(unittest.TestCase): + testdirprefix="TemplateRNUT" + + # Every instance of TestClass requires its own proj directory. + def setUp(self): + self.proj=Proj(prefix=TestTemplateRN.testdirprefix, persist=True) + + def tearDown(self): + self.proj.cleanup() + + def test_getREADME(self): + self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + self.rntemplate=TemplateRN(self.proj,self.rnrepo) + compare=self.proj.projdir + '/template/' + TemplateRN._readme + self.assertEqual(self.rntemplate.getReadme(),compare) + self.assertTrue(os.path.isfile(self.rntemplate.getReadme())) + + def test_getSERIES(self): + self.rnrepo=Clone(self.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + self.rntemplate=TemplateRN(self.proj,self.rnrepo) + compare=self.proj.projdir + '/template/' + TemplateRN._series + self.assertEqual(self.rntemplate.getSeries(),compare) + self.assertTrue(os.path.isfile(self.rntemplate.getSeries())) + + + # TODO: Test branchname_suffix + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() @@ -0,0 +1,615 @@ +import unittest +import logging + +# Abstract baseclass for Vendor and Package that provides some basic +# typechecking. +class PV(object): + def __init__(self, pv=None): + if isinstance(pv, basestring): + self.pv=pv + else: + raise TypeError("Input to PV must be of type basestring.") + + def __str__(self): + return self.pv + + def lower(self): + return self.pv.lower() + +class Vendor(PV): + def __init__(self, vendor="Linaro"): + try: + super(Vendor,self).__init__(vendor) + except TypeError: + raise TypeError("Input 'vendor' must be of type basestring.") + +class Version(object): + # @major - required int or str representation of int. + # @minor - optional int or str representation of int, otherwise None. + # If None, then minor will not be output. + # @point - optional int or str representation of int, otherwise None. + # If None, then point will not be output. + def __init__(self, major, minor=None, point=None): + + if isinstance(major, int) or isinstance(major, basestring): + # This will throw an exception if 'major' is an empty string. + self.major=int(major) + else: + raise TypeError("major must be of type int, string.") + + if isinstance(minor, int) or isinstance(minor, basestring): + # This will throw an exception if 'minor' is an empty string. + self.minor=int(minor) + elif not minor: + self.minor=None + else: + raise TypeError("minor must be of type int, string, or None.") + + if isinstance(point, int) or isinstance(point, basestring): + + # This is illegal, as it'd mess up the output and make it look like + # there's a minor that's really a point. + if not minor and point: + raise RuntimeError("minor must be specified if point is specified.") + + # This will throw an exception if 'minor' is an empty string. + self.point=int(point) + elif not point: + self.point=None + else: + raise TypeError("point must be of type int, string, or None.") + + def __str__(self): + ver=str(self.major) + if self.minor: + ver=ver + u'.' + str(self.minor) + if self.point: + ver=ver + u'.' + str(self.point) + return ver + + # TODO: I'm sure there's a more pythonic way to do this but this is + # quick and dirty. + # %M - Major + # %m - Minor + # %p - Point + # delimiter - Delimiter character to use in output: space or dash. + # TODO: Use a dictionary with replace + def strfversion(self, formatstr, delimiter='.'): + toks=[str(x) for x in formatstr.split('%') if x.strip()] + + delstoks=len(toks) + verstoks=0 + + if self.major: + verstoks=verstoks+1 + if self.minor: + verstoks=verstoks+1 + if self.point: + verstoks=verstoks+1 + + if delstoks > verstoks: + numtoks=verstoks - 1 + else: + numtoks=delstoks - 1 + + vers=u'' + for tok in toks: + if tok == 'M': + vers=vers + str(self.major) + elif tok == 'm': + vers=vers + str(self.minor) + elif tok == 'p': + vers=vers + str(self.point) + if numtoks > 0: + vers=vers + delimiter + numtoks=numtoks-1 + return vers + +# Helper function which returns a Version. +def versionFromStr(version): + if not isinstance(version, basestring): + raise TypeError('input must be of type basestring') + + major=version.split('.', 1)[0] + + # split('.',1) will return index error on "5" but will return empty string on "5." + # Any empty string is not a valid input into Version. + if major == u'': + major=None + + try: + minor=version.split('.', 2)[1] + except IndexError: + minor=None + + if minor == u'': + minor=None + + try: + point=version.split('.', 3)[2] + except IndexError: + point=None + + if point == u'': + point=None + + return Version(major, minor, point) + +class Package(PV): + # @package - default = GCC + # @version - required. Either a string representing a version or a Version + # instance. + def __init__(self, package="GCC", version=None): + #TODO: Make sure package is not None. + if package == u'': + raise TypeError("Package must not be an empty string") + + # TODO: Provide a list of known inputs and test against those? + try: + super(Package,self).__init__(package) + except TypeError: + raise TypeError("Input 'package' must be of type basestring.") + + if not version: + raise RuntimeError("Package requires a version.") + elif isinstance(version, Version): + self.version=version + else: + self.version=versionFromStr(version) + + def get_package(self): + return super(Package,self).__str__() + + def get_version(self): + return str(self.version) + + def __str__(self): + return super(Package,self).__str__() + u'-' + str(self.version) + + def lower(self): + return super(Package,self).lower() + u'-' + str(self.version) + + #TODO: Create format function which can change the separator to ' ' or to '-'. + +def packageFromStr(package=None): + if not isinstance(package, basestring): + raise TypeError('packageFromStr requires a string as input') + + # Get the part before the version numbers. That's the package + try: + package_name=package.rsplit('-', 1)[0] + except IndexError: + raise RuntimeError('string must be <package_name>-<package_version>') + + try: + package_version=package.rsplit('-', 1)[1] + except IndexError: + raise RuntimeError('string must be <package_name>-<package_version>') + + if package_name == u'': + raise TypeError("Couldn't parse a package name from input string") + if package_version == u'': + raise TypeError("Couldn't parse a package version from input string") + + # This will throw exceptions if the input are malformed. + package=Package(package_name, package_version) + return package + +class Vers(object): + def __init__(self, val=None): + + if val is None: + self.val=0 + return + + if isinstance(val, int) or isinstance(val, basestring): + # This should raise an exception if the string can't be converted + # to an int. If val is already an int it's a nop. + intval=int(val) + + if intval < 0: + raise ValueError('Input val must not be negative') + self.val=intval + else: + raise TypeError("Input val must be of type int or string.") + + def increment(self): + self.val=self.val+1 + + # Return the "-<val>" string or an empty string. + def __str__(self): + # Don't return + if self.val is None or self.val == 0: + return "" + else: + return "-" + str(self.val) + +class Spin(Vers): + def __init__(self, spin=None): + try: + super(Spin,self).__init__(spin) + except ValueError: + raise ValueError('Input spin must not be negative') + +class Rc(Vers): + def __init__(self, rc=None): + try: + super(Rc,self).__init__(rc) + except ValueError: + raise ValueError('Input rc must not be negative') + + # The string representation differs slighty from Spin + def __str__(self): + # Don't return + if self.val is None or self.val == 0: + return "" + else: + return "-rc" + str(self.val) + +class TestVersion(unittest.TestCase): + def test_no_major(self): + with self.assertRaises(TypeError): + version=Version(major=None) + with self.assertRaises(TypeError): + version=Version() + + def test_empty_major(self): + with self.assertRaises(ValueError): + version=Version(major="") + + def test_empty_minor(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor="") + + def test_empty_point(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor=3, point="") + + def test_non_int_major(self): + with self.assertRaises(ValueError): + version=Version('A') + + def test_int_major(self): + version=Version(5) + self.assertEqual(str(version), "5") + version2=Version(major=5) + self.assertEqual(str(version2), "5") + + def test_intstr_major(self): + version=Version("5") + self.assertEqual(str(version), "5") + version2=Version(major="5") + self.assertEqual(str(version2), "5") + + def test_non_int_minor(self): + with self.assertRaises(ValueError): + version=Version(major="5", minor="A") + + def test_int_minor(self): + version=Version(major=5, minor=3) + self.assertEqual(str(version), "5.3") + + def test_intstr_minor(self): + version=Version(major="5", minor="3") + self.assertEqual(str(version), "5.3") + + def test_no_minor(self): + version=Version(major="5", minor=None) + self.assertEqual(str(version), "5") + + def test_point_no_minor(self): + with self.assertRaises(RuntimeError): + version=Version(major=5, minor=None, point=1) + + def test_int_point(self): + version=Version(major=5, minor=3, point=1) + self.assertEqual(str(version), "5.3.1") + + def test_non_int_point(self): + with self.assertRaises(ValueError): + version=Version(major=5, minor=3, point="A") + + def test_intstr_point(self): + version=Version(major="5", minor="3", point="1") + self.assertEqual(str(version), "5.3.1") + + def test_no_point(self): + version=Version(major="5", minor="3", point=None) + self.assertEqual(str(version), "5.3") + + def test_strfversion(self): + version=Version(major="5", minor="3", point="1") + self.assertEqual(version.strfversion("%M%m%p", delimiter='-'),"5-3-1") + self.assertEqual(version.strfversion("%M%m%p"),"5.3.1") + +class TestVendor(unittest.TestCase): + def test_vendor_default(self): + vendor=Vendor() + self.assertEqual(str(vendor), "Linaro") + self.assertEqual(vendor.lower(), "linaro") + + def test_none_input(self): + with self.assertRaises(TypeError): + vendor=Vendor(vendor=None) + + def test_str_input(self): + vendor=Vendor("TestVendor") + self.assertEqual(str(vendor), "TestVendor") + self.assertEqual(vendor.lower(), "testvendor") + +class TestPackage(unittest.TestCase): + + def test_package_no_version(self): + with self.assertRaises(RuntimeError): + # We require a version. + package=Package() + + def test_package_default(self): + package=Package(version="5.3.1") + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + self.assertEqual(package.pv, "GCC") + + def test_none_package(self): + with self.assertRaises(TypeError): + package=Package(package=None, version="5.3.1") + + def test_none_version(self): + with self.assertRaises(RuntimeError): + package=Package(package="GCC", version=None) + + def test_with_Version(self): + version=Version(5,3,1) + package=Package(version=version) + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + + def test_package_version(self): + version=Version(5,3,1) + package=Package(version=version) + self.assertEqual(str(package.version), "5.3.1") + self.assertEqual(package.get_version(), "5.3.1") + self.assertEqual(package.get_package(), "GCC") + + def test_str_input(self): + package=Package("TestPackage", version="5.3.1") + self.assertEqual(str(package), "TestPackage-5.3.1") + self.assertEqual(package.lower(), "testpackage-5.3.1") + + def test_empty_package_name(self): + with self.assertRaises(TypeError): + package=Package(package="", version="5.3.1") + +class TestSpin(unittest.TestCase): + + def test_spin_val(self): + spin=Spin(1) + self.assertEqual(spin.val, 1) + + def test_spin_val_str(self): + spin=Spin(1) + self.assertEqual(str(spin), "-1") + + def test_spin_increment_val(self): + spin=Spin(1) + spin.increment() + self.assertEqual(spin.val, 2) + + def test_spin_increment_str(self): + spin=Spin(1) + spin.increment() + self.assertEqual(str(spin), "-2") + + def test_zero_spin(self): + spin=Spin(0) + self.assertEqual(spin.val, 0) + + def test_zero_spin_str(self): + spin=Spin(0) + self.assertEqual(str(spin), "") + + def test_none_spin(self): + spin=Spin(None) + self.assertEqual(spin.val, 0) + + def test_none_spin_increment(self): + spin=Spin(None) + spin.increment() + self.assertEqual(spin.val, 1) + + def test_none_spin_str(self): + spin=Spin(None) + self.assertEqual(str(spin), "") + + def test_default_spin(self): + spin=Spin() + self.assertEqual(spin.val, 0) + + def test_default_spin_str(self): + spin=Spin() + self.assertEqual(str(spin), "") + + def test_negative_val(self): + with self.assertRaises(ValueError): + spin=Spin(-1) + + def test_default_spin_str(self): + spin=Spin() + self.assertEqual(str(spin), "") + + def test_spin_string_input(self): + spin=Spin("9") + self.assertEqual(spin.val, 9) + self.assertEqual(str(spin), "-9") + + def test_spin_negative_string_input(self): + with self.assertRaises(ValueError): + spin=Spin("-9") + + def test_float_type(self): + with self.assertRaises(TypeError): + spin=Spin(7.0) + +class TestRc(unittest.TestCase): + def test_rc_val(self): + rc=Rc(1) + self.assertEqual(rc.val, 1) + + def test_rc_str(self): + rc=Rc(1) + self.assertEqual(str(rc), "-rc1") + + def test_negative_val(self): + with self.assertRaises(ValueError): + rc=Rc(-1) + + def test_zero_rc_str(self): + rc=Rc(0) + self.assertEqual(str(rc), "") + + def test_none_rc_str(self): + rc=Rc(None) + self.assertEqual(str(rc), "") + + def test_default_rc_str(self): + rc=Rc() + self.assertEqual(str(rc), "") + + def test_rc_increment_val(self): + rc=Rc(1) + rc.increment() + self.assertEqual(rc.val, 2) + + def test_rc_increment_str(self): + rc=Rc(1) + rc.increment() + self.assertEqual(str(rc), "-rc2") + + def test_none_rc_increment_str(self): + rc=Rc(None) + rc.increment() + self.assertEqual(str(rc), "-rc1") + + def test_default_rc_str(self): + rc=Rc() + self.assertEqual(str(rc), "") + + def test_rc_string_input(self): + rc=Rc("9") + self.assertEqual(rc.val, 9) + self.assertEqual(str(rc), "-rc9") + + def test_rc_negative_string_input(self): + with self.assertRaises(ValueError): + rc=Rc("-9") + + def test_float_type(self): + with self.assertRaises(TypeError): + rc=Rc(7.0) + +class TestPackageFromString(unittest.TestCase): + + def test_none(self): + with self.assertRaises(TypeError): + package=packageFromStr(package=None) + + def test_with_package(self): + package=Package("GCC", "5.3.1") + with self.assertRaises(TypeError): + package=packageFromStr(package=package) + + def test_with_no_package_name(self): + with self.assertRaises(TypeError): + package=packageFromStr(package="-5.3.1") + + with self.assertRaises(RuntimeError): + package=packageFromStr(package="5.3.1") + + def test_with_no_package_version(self): + with self.assertRaises(TypeError): + package=packageFromStr(package="GCC-") + + with self.assertRaises(RuntimeError): + package=packageFromStr(package="GCC") + + def test_with_space(self): + with self.assertRaises(RuntimeError): + package=packageFromStr(package="GCC 5.3.1") + + def test_correct_usage(self): + package=packageFromStr(package="GCC-5.3.1") + self.assertEqual(str(package), "GCC-5.3.1") + self.assertEqual(package.lower(), "gcc-5.3.1") + self.assertEqual(package.pv, "GCC") + self.assertEqual(str(package.version), "5.3.1") + self.assertEqual(package.version.major, 5) + self.assertEqual(package.version.minor, 3) + self.assertEqual(package.version.point, 1) + +class TestVersionFromStr(unittest.TestCase): + + def test_extra_dot(self): + version=versionFromStr("5.3.1.4") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, 1) + + def test_major_minor_point(self): + version=versionFromStr("5.3.1") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, 1) + + def test_major_minor_dot(self): + version=versionFromStr("5.3.") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, None) + + def test_major_minor(self): + version=versionFromStr("5.3") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, 3) + self.assertEqual(version.point, None) + + def test_major_dot(self): + version=versionFromStr("5.") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, None) + self.assertEqual(version.point, None) + + def test_major(self): + version=versionFromStr("5") + self.assertEqual(version.major, 5) + self.assertEqual(version.minor, None) + self.assertEqual(version.point, None) + + def test_empty(self): + with self.assertRaises(TypeError): + version=versionFromStr("") + + def test_non_int_major(self): + with self.assertRaises(ValueError): + version=versionFromStr("a.b.c") + + def test_non_int_minor(self): + with self.assertRaises(ValueError): + version=versionFromStr("5.b.1") + + def test_non_int_point(self): + with self.assertRaises(ValueError): + version=versionFromStr("5.3.a") + + def test_non_string(self): + version=Version(5,3,1) + with self.assertRaises(TypeError): + version=versionFromStr(version) + + def test_none(self): + with self.assertRaises(TypeError): + version=versionFromStr(version=None) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() + diff --git a/workdir.py b/workdir.py new file mode 100644 index 0000000..e939dd0 --- /dev/null +++ b/workdir.py @@ -0,0 +1,190 @@ +import unittest +import logging +import os + +import uuid + +from proj import Proj + +from gitrepo import cd +from gitrepo import GitRepo + +from clone import Clone +from clone import cd + +from sh import git +from sh import git_new_workdir +from sh import ErrorReturnCode + +# TODO: Set the git repo pushurl. + +class Workdir(GitRepo): + # @ clone - clone this workdir from this Clone. + # @ workdir - name of the proposed workdir. + # @ track - derive the workdir from the branch designated by the string + # in 'track'. + # @ branchname - the name of the local branch + def __init__(self, proj, clone=None, workdir="", track=None, branchname=""): + super(Workdir,self).__init__(proj) + + # TODO: The Clone can be in a different project dir than proj. if + # clone.clonedir is not in proj.projdir then create a symlink in the + # current projdir. + + if not isinstance(clone, Clone): + raise TypeError('Clone input parameter is not of type Clone') + + if not isinstance(workdir, str): + raise TypeError('Workdir workdir parameter must be a string') + + # An empty workdir would result in an empty variable expansion + # and a malformed git new-workdir expression. + if workdir=="": + raise EnvironmentError('You must specify a workdir directory when creating a Workdir') + + if branchname=="": + raise EnvironmentError('You must specify a branchname when creating a Workdir') + + try: + with cd(self.proj.projdir): + logging.info("Workdir(): going to call git new-workdir tracking %s" % track) + if track: + logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, track)) + #TODO: Do we want to prevalidate that 'track' is a valid branch or tag? + # If we don't this will just throw an exception. + print git_new_workdir(clone.clonedir(), workdir, track, _err_to_out=True, _out="/dev/null") + else: + logging.info("Workdir(): Calling git new-workdir for workdir %s, tracking %s" % (workdir, "origin/HEAD")) + # Always just checkout HEAD if the track is not specified. + print git_new_workdir(clone.clonedir(), workdir, "origin/HEAD", _err_to_out=True, _out="/dev/null") + + except ErrorReturnCode as exc: + # if 'workdir' is None + # If 'workdir' already exists. + # If clone.clonedir() doesn't exist. + # If track isn't a valid branch. + raise EnvironmentError("Unable to create a git workdir ") + + self.repodir=self.proj.projdir + '/' + workdir + + try: + with cd(self.repodir): + git("checkout", "-b", branchname) + except ErrorReturnCode as exc: + raise EnvironmentError("Unable to checkout branch %s in workdir %s" % (branchname, self.repodir)) + + + #TODO: Verify that self.repodir exists or 'raise' exception? + + # TODO: write this function or the workdir will remain in detached + # head state if it's not checked out into a branch. + # def checkout(self, ): + + def workdir(self): + return self.repodir + +class TestWorkdir(unittest.TestCase): + testdirprefix="WorkdirUT" + + @classmethod + def setUpClass(cls): + # We need a Proj dir to store the necessary Clone in. + cls.proj=Proj(prefix=TestWorkdir.testdirprefix) + + #TODO: Perform a connectivity test to determine online or offline + # testing mode. + + # We'll create a Clone to be used by all the Workdir tests in order to + # be nice to the git server. + cls.rnclone=Clone(cls.proj,remote=u'http://git.linaro.org/toolchain/release-notes.git') + #OFFLINE: cls.rnclone=Clone(cls.proj,remote=u'/var/run/media/ryanarn/sidecar/reldir/release-notes') + + # All further tests will use this as the remote if they need to + # create more clones. + cls.rnremote=cls.rnclone.repodir + + # A common name to use for the workdir for all of the Workdir tests. + cls.workdir="testworkdir" + + @classmethod + def tearDownClass(cls): + cls.proj.cleanup() + + # Every instance of TestWorkdir requires its own proj directory. + def setUp(self): + self.proj=Proj(prefix=TestWorkdir.testdirprefix) + self.branchname="testworkdir-" + str(uuid.uuid4()) + + def tearDown(self): + self.proj.cleanup() + + def test_workdir(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track=None, branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_default_track(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_track(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + def test_workdir_with_checkoutbranch(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname="startbranch") + self.assertTrue(os.path.isdir(self.workdir.workdir())) + + # Test a new branch (derived from the existing branch) + with self.workdir.checkoutbranch("TestWorkdir", self.workdir.getbranch()): + self.assertEqual("TestWorkdir", self.workdir.getbranch()) + + # Verify that leaving the context restored the original branch. + self.assertEqual(self.workdir.getbranch(), "startbranch") + + # Test being able to switch to a branch that already exists. + with self.workdir.checkoutbranch("TestWorkdir"): + self.assertEqual("TestWorkdir", self.workdir.getbranch()) + + self.assertEqual(self.workdir.getbranch(), "startbranch") + + def test_workdir_already_exists(self): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + + # Should fail because TestWorkdir.workdir should already exist. + with self.assertRaises(EnvironmentError): + self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname + '_2') + + def test_workdir_exceptions(self): + with self.assertRaises(TypeError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=None, branchname=self.branchname) + + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="someinvalidbranchname", branchname=self.branchname) + + # Test if clone.clonedir() doesn't exist by breaking repodir. + brokenclone=Clone(TestWorkdir.proj,TestWorkdir.rnremote) + brokenclone.repodir="/breakthisrepodir" + + # Verify that a Clone with a broken repodir/clonedir results in an + # exception. + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, branchname=self.branchname) + + # 'clone' isn't a valid Clone type. + with self.assertRaises(TypeError): + self.workdir=Workdir(self.proj, clone="foobar", workdir=TestWorkdir.workdir, branchname=self.branchname) + + def test_workdir_exceptions(self): + #TODO: branchname is empty. + with self.assertRaises(EnvironmentError): + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname="") + + def test_workdir_exceptions(self): + #TODO: branchname already exists. + self.workdir=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + with self.assertRaises(EnvironmentError): + self.workdir2=Workdir(self.proj, clone=TestWorkdir.rnclone, workdir=TestWorkdir.workdir, track="origin/HEAD", branchname=self.branchname) + +if __name__ == '__main__': + #logging.basicConfig(level="INFO") + unittest.main() |