diff options
author | Marcin Kuzminski <marcin@python-works.com> | 2012-07-01 16:24:15 +0200 |
---|---|---|
committer | Marcin Kuzminski <marcin@python-works.com> | 2012-07-01 16:24:15 +0200 |
commit | f4eb9d6e084e7d604909dd8ec52646537622e299 (patch) | |
tree | 42b5e9f1e86850682a08e509768e9e72583b4184 /rhodecode/tests/scripts | |
parent | 70588803b7d19a45699dbe4a8b19a03de6a5c277 (diff) |
orginized test module
- separated test models into separate files
- added scripts for script tests
- few fixes and renames for readability
--HG--
branch : beta
rename : rhodecode/tests/test_models.py => rhodecode/tests/models/test_notifications.py
rename : rhodecode/tests/mem_watch => rhodecode/tests/scripts/mem_watch
rename : rhodecode/tests/_test_concurency.py => rhodecode/tests/scripts/test_concurency.py
rename : rhodecode/tests/rhodecode_crawler.py => rhodecode/tests/scripts/test_crawler.py
rename : rhodecode/tests/test_hg_operations.py => rhodecode/tests/scripts/test_scm_operations.py
Diffstat (limited to 'rhodecode/tests/scripts')
-rwxr-xr-x | rhodecode/tests/scripts/mem_watch | 1 | ||||
-rw-r--r-- | rhodecode/tests/scripts/test_concurency.py | 213 | ||||
-rwxr-xr-x | rhodecode/tests/scripts/test_crawler.py | 184 | ||||
-rwxr-xr-x | rhodecode/tests/scripts/test_scm_operations.py | 410 |
4 files changed, 808 insertions, 0 deletions
diff --git a/rhodecode/tests/scripts/mem_watch b/rhodecode/tests/scripts/mem_watch new file mode 100755 index 00000000..81f65557 --- /dev/null +++ b/rhodecode/tests/scripts/mem_watch @@ -0,0 +1 @@ +ps -eo size,pid,user,command --sort -size | awk '{ hr=$1/1024 ; printf("%13.2f Mb ",hr) } { for ( x=4 ; x<=NF ; x++ ) { printf("%s ",$x) } print "" }'|grep [p]aster diff --git a/rhodecode/tests/scripts/test_concurency.py b/rhodecode/tests/scripts/test_concurency.py new file mode 100644 index 00000000..b75c3133 --- /dev/null +++ b/rhodecode/tests/scripts/test_concurency.py @@ -0,0 +1,213 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_hg_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations + + :created_on: Dec 30, 2010 + :author: marcink + :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import sys +import shutil +import logging +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from paste.deploy import appconfig +from pylons import config +from sqlalchemy import engine_from_config + +from rhodecode.lib.utils import add_cache +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import User, Repository +from rhodecode.lib.auth import get_crypt_password + +from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO +from rhodecode.config.environment import load_environment + +rel_path = dn(dn(dn(os.path.abspath(__file__)))) +conf = appconfig('config:development.ini', relative_to=rel_path) +load_environment(conf.global_conf, conf.local_conf) + +add_cache(conf) + +USER = 'test_admin' +PASS = 'test12' +HOST = 'hg.local' +METHOD = 'pull' +DEBUG = True +log = logging.getLogger(__name__) + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + log.debug('Executing %s' % command) + if DEBUG: + print command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def get_session(): + engine = engine_from_config(conf, 'sqlalchemy.db1.') + init_model(engine) + sa = meta.Session + return sa + + +def create_test_user(force=True): + print 'creating test user' + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + + if force and user is not None: + print 'removing current user' + for repo in sa.query(Repository).filter(Repository.user == user).all(): + sa.delete(repo) + sa.delete(user) + sa.commit() + + if user is None or force: + print 'creating new one' + new_usr = User() + new_usr.username = USER + new_usr.password = get_crypt_password(PASS) + new_usr.email = 'mail@mail.com' + new_usr.name = 'test' + new_usr.lastname = 'lasttestname' + new_usr.active = True + new_usr.admin = True + sa.add(new_usr) + sa.commit() + + print 'done' + + +def create_test_repo(force=True): + print 'creating test repo' + from rhodecode.model.repo import RepoModel + sa = get_session() + + user = sa.query(User).filter(User.username == USER).scalar() + if user is None: + raise Exception('user not found') + + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print 'repo not found creating' + + form_data = {'repo_name':HG_REPO, + 'repo_type':'hg', + 'private':False, + 'clone_uri':'' } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) + + print 'done' + + +def set_anonymous_access(enable=True): + sa = get_session() + user = sa.query(User).filter(User.username == 'default').one() + user.active = enable + sa.add(user) + sa.commit() + + +def get_anonymous_access(): + sa = get_session() + return sa.query(User).filter(User.username == 'default').one().active + + +#============================================================================== +# TESTS +#============================================================================== +def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, + seq=None): + cwd = path = jn(TESTS_TMP_PATH, repo) + + if seq == None: + seq = _RandomNameSequence().next() + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':repo, } + + dest = path + seq + if method == 'pull': + stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url) + else: + stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest) + + if no_errors is False: + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + +if __name__ == '__main__': + try: + create_test_user(force=False) + seq = None + import time + + try: + METHOD = sys.argv[3] + except: + pass + + if METHOD == 'pull': + seq = _RandomNameSequence().next() + test_clone_with_credentials(repo=sys.argv[1], method='clone', + seq=seq) + s = time.time() + for i in range(1, int(sys.argv[2]) + 1): + print 'take', i + test_clone_with_credentials(repo=sys.argv[1], method=METHOD, + seq=seq) + print 'time taken %.3f' % (time.time() - s) + except Exception, e: + raise + sys.exit('stop on %s' % e) diff --git a/rhodecode/tests/scripts/test_crawler.py b/rhodecode/tests/scripts/test_crawler.py new file mode 100755 index 00000000..4da18a7f --- /dev/null +++ b/rhodecode/tests/scripts/test_crawler.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_crawer + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test for crawling a project for memory usage + This should be runned just as regular script together + with a watch script that will show memory usage. + + watch -n1 ./rhodecode/tests/mem_watch + + :created_on: Apr 21, 2010 + :author: marcink + :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + + +import cookielib +import urllib +import urllib2 +import time +import os +import sys +from os.path import join as jn +from os.path import dirname as dn + +__here__ = os.path.abspath(__file__) +__root__ = dn(dn(dn(__here__))) +sys.path.append(__root__) + +from rhodecode.lib import vcs +from rhodecode.lib.compat import OrderedSet +from rhodecode.lib.vcs.exceptions import RepositoryError + +PASES = 3 +HOST = 'http://127.0.0.1' +PORT = 5000 +BASE_URI = '%s:%s/' % (HOST, PORT) + +if len(sys.argv) == 2: + BASE_URI = sys.argv[1] + +if not BASE_URI.endswith('/'): + BASE_URI += '/' + +print 'Crawling @ %s' % BASE_URI +BASE_URI += '%s' +PROJECT_PATH = jn('/', 'home', 'marcink', 'hg_repos') +PROJECTS = [ + 'linux-magx-pbranch', + 'CPython', + 'rhodecode_tip', +] + + +cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt') +o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) +o.addheaders = [ + ('User-agent', 'rhodecode-crawler'), + ('Accept-Language', 'en - us, en;q = 0.5') +] + +urllib2.install_opener(o) + + +def _get_repo(proj): + if isinstance(proj, basestring): + repo = vcs.get_repo(jn(PROJECT_PATH, proj)) + proj = proj + else: + repo = proj + proj = repo.name + + return repo, proj + + +def test_changelog_walk(proj, pages=100): + repo, proj = _get_repo(proj) + + total_time = 0 + for i in range(1, pages): + + page = '/'.join((proj, 'changelog',)) + + full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page':i}) + s = time.time() + f = o.open(full_uri) + size = len(f.read()) + e = time.time() - s + total_time += e + print 'visited %s size:%s req:%s ms' % (full_uri, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(pages) + + +def test_changeset_walk(proj, limit=None): + repo, proj = _get_repo(proj) + + print 'processing', jn(PROJECT_PATH, proj) + total_time = 0 + + cnt = 0 + for i in repo: + cnt += 1 + raw_cs = '/'.join((proj, 'changeset', i.raw_id)) + if limit and limit == cnt: + break + + full_uri = (BASE_URI % raw_cs) + print '%s visiting %s\%s' % (cnt, full_uri, i) + s = time.time() + f = o.open(full_uri) + size = len(f.read()) + e = time.time() - s + total_time += e + print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(cnt) + + +def test_files_walk(proj, limit=100): + repo, proj = _get_repo(proj) + + print 'processing', jn(PROJECT_PATH, proj) + total_time = 0 + + paths_ = OrderedSet(['']) + try: + tip = repo.get_changeset('tip') + for topnode, dirs, files in tip.walk('/'): + + for dir in dirs: + paths_.add(dir.path) + for f in dir: + paths_.add(f.path) + + for f in files: + paths_.add(f.path) + + except RepositoryError, e: + pass + + cnt = 0 + for f in paths_: + cnt += 1 + if limit and limit == cnt: + break + + file_path = '/'.join((proj, 'files', 'tip', f)) + full_uri = (BASE_URI % file_path) + print '%s visiting %s' % (cnt, full_uri) + s = time.time() + f = o.open(full_uri) + size = len(f.read()) + e = time.time() - s + total_time += e + print '%s visited OK size:%s req:%s ms' % (cnt, size, e) + + print 'total_time', total_time + print 'average on req', total_time / float(cnt) + +if __name__ == '__main__': + for path in PROJECTS: + repo = vcs.get_repo(jn(PROJECT_PATH, path)) + for i in range(PASES): + print 'PASS %s/%s' % (i, PASES) + test_changelog_walk(repo, pages=80) + test_changeset_walk(repo, limit=100) + test_files_walk(repo, limit=100) diff --git a/rhodecode/tests/scripts/test_scm_operations.py b/rhodecode/tests/scripts/test_scm_operations.py new file mode 100755 index 00000000..13d2bb0a --- /dev/null +++ b/rhodecode/tests/scripts/test_scm_operations.py @@ -0,0 +1,410 @@ +# -*- coding: utf-8 -*- +""" + rhodecode.tests.test_hg_operations + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + Test suite for making push/pull operations + + :created_on: Dec 30, 2010 + :author: marcink + :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> + :license: GPLv3, see COPYING for more details. +""" +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import sys +import shutil +import logging + +from os.path import join as jn +from os.path import dirname as dn + +from tempfile import _RandomNameSequence +from subprocess import Popen, PIPE + +from paste.deploy import appconfig +from pylons import config +from sqlalchemy import engine_from_config + +from rhodecode.lib.utils import add_cache +from rhodecode.model import init_model +from rhodecode.model import meta +from rhodecode.model.db import User, Repository, UserLog +from rhodecode.lib.auth import get_crypt_password + +from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO +from rhodecode.config.environment import load_environment + +rel_path = dn(dn(dn(os.path.abspath(__file__)))) + +conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path) +load_environment(conf.global_conf, conf.local_conf) + +add_cache(conf) + +USER = 'test_admin' +PASS = 'test12' +HOST = '127.0.0.1:5000' +DEBUG = False +print 'DEBUG:', DEBUG +log = logging.getLogger(__name__) + +engine = engine_from_config(conf, 'sqlalchemy.db1.') +init_model(engine) +sa = meta.Session() + + +class Command(object): + + def __init__(self, cwd): + self.cwd = cwd + + def execute(self, cmd, *args): + """Runs command on the system with given ``args``. + """ + + command = cmd + ' ' + ' '.join(args) + log.debug('Executing %s' % command) + if DEBUG: + print command + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) + stdout, stderr = p.communicate() + if DEBUG: + print stdout, stderr + return stdout, stderr + + +def test_wrapp(func): + + def __wrapp(*args, **kwargs): + print '>>>%s' % func.__name__ + try: + res = func(*args, **kwargs) + except Exception, e: + print ('###############\n-' + '--%s failed %s--\n' + '###############\n' % (func.__name__, e)) + sys.exit() + print '++OK++' + return res + return __wrapp + + +def create_test_user(force=True): + print '\tcreating test user' + + user = User.get_by_username(USER) + + if force and user is not None: + print '\tremoving current user' + for repo in Repository.query().filter(Repository.user == user).all(): + sa.delete(repo) + sa.delete(user) + sa.commit() + + if user is None or force: + print '\tcreating new one' + new_usr = User() + new_usr.username = USER + new_usr.password = get_crypt_password(PASS) + new_usr.email = 'mail@mail.com' + new_usr.name = 'test' + new_usr.lastname = 'lasttestname' + new_usr.active = True + new_usr.admin = True + sa.add(new_usr) + sa.commit() + + print '\tdone' + + +def create_test_repo(force=True): + from rhodecode.model.repo import RepoModel + + user = User.get_by_username(USER) + if user is None: + raise Exception('user not found') + + repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() + + if repo is None: + print '\trepo not found creating' + + form_data = {'repo_name':HG_REPO, + 'repo_type':'hg', + 'private':False, + 'clone_uri':'' } + rm = RepoModel(sa) + rm.base_path = '/home/hg' + rm.create(form_data, user) + + +def set_anonymous_access(enable=True): + user = User.get_by_username('default') + user.active = enable + sa.add(user) + sa.commit() + print '\tanonymous access is now:', enable + if enable != User.get_by_username('default').active: + raise Exception('Cannot set anonymous access') + + +def get_anonymous_access(): + user = User.get_by_username('default') + return user.active + + +#============================================================================== +# TESTS +#============================================================================== +@test_wrapp +def test_clone_with_credentials(no_errors=False): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + print '\tchecking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print '\tenabled, disabling it ' + set_anonymous_access(enable=False) + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO, + 'dest':path} + + stdout, stderr = Command(cwd).execute('hg clone', clone_url) + + if no_errors is False: + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + + +@test_wrapp +def test_clone_anonymous(): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + + print '\tchecking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if not anonymous_access: + print '\tnot enabled, enabling it ' + set_anonymous_access(enable=True) + + clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO, + 'dest':path} + + stdout, stderr = Command(cwd).execute('hg clone', clone_url) + + assert """adding file changes""" in stdout, 'no messages about cloning' + assert """abort""" not in stderr , 'got error from clone' + + #disable if it was enabled + if not anonymous_access: + print '\tdisabling anonymous access' + set_anonymous_access(enable=False) + + +@test_wrapp +def test_clone_wrong_credentials(): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + #print 'made dirs %s' % jn(path) + except OSError: + raise + + print '\tchecking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print '\tenabled, disabling it ' + set_anonymous_access(enable=False) + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ + {'user':USER + 'error', + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO, + 'dest':path} + + stdout, stderr = Command(cwd).execute('hg clone', clone_url) + + if not """abort: authorization failed""" in stderr: + raise Exception('Failure') + + +@test_wrapp +def test_pull(): + pass + + +@test_wrapp +def test_push_modify_file(f_name='setup.py'): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) + for i in xrange(5): + cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) + Command(cwd).execute(cmd) + + cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) + Command(cwd).execute(cmd) + + Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) + + +@test_wrapp +def test_push_new_file(commits=15, with_clone=True): + + if with_clone: + test_clone_with_credentials(no_errors=True) + + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) + + Command(cwd).execute('touch %s' % added_file) + + Command(cwd).execute('hg add %s' % added_file) + + for i in xrange(commits): + cmd = """echo 'added_line%s' >> %s""" % (i, added_file) + Command(cwd).execute(cmd) + + cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, + 'Marcin Kuźminski <marcin@python-blog.com>', + added_file) + Command(cwd).execute(cmd) + + push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO, + 'dest':jn(TESTS_TMP_PATH, HG_REPO)} + + Command(cwd).execute('hg push --verbose --debug %s' % push_url) + + +@test_wrapp +def test_push_wrong_credentials(): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ + {'user':USER + 'xxx', + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO, + 'dest':jn(TESTS_TMP_PATH, HG_REPO)} + + modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') + for i in xrange(5): + cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) + Command(cwd).execute(cmd) + + cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) + Command(cwd).execute(cmd) + + Command(cwd).execute('hg push %s' % clone_url) + + +@test_wrapp +def test_push_wrong_path(): + cwd = path = jn(TESTS_TMP_PATH, HG_REPO) + added_file = jn(path, 'somefile.py') + + try: + shutil.rmtree(path, ignore_errors=True) + os.makedirs(path) + print '\tmade dirs %s' % jn(path) + except OSError: + raise + + Command(cwd).execute("""echo '' > %s""" % added_file) + Command(cwd).execute("""hg init %s""" % path) + Command(cwd).execute("""hg add %s""" % added_file) + + for i in xrange(2): + cmd = """echo 'added_line%s' >> %s""" % (i, added_file) + Command(cwd).execute(cmd) + + cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) + Command(cwd).execute(cmd) + + clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ + {'user':USER, + 'pass':PASS, + 'host':HOST, + 'cloned_repo':HG_REPO + '_error', + 'dest':jn(TESTS_TMP_PATH, HG_REPO)} + + stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) + if not """abort: HTTP Error 403: Forbidden""" in stderr: + raise Exception('Failure') + + +@test_wrapp +def get_logs(): + return UserLog.query().all() + + +@test_wrapp +def test_logs(initial): + logs = UserLog.query().all() + operations = 4 + if len(initial) + operations != len(logs): + raise Exception("missing number of logs initial:%s vs current:%s" % \ + (len(initial), len(logs))) + + +if __name__ == '__main__': + create_test_user(force=False) + create_test_repo() + + initial_logs = get_logs() + print 'initial activity logs: %s' % len(initial_logs) + s = time.time() + #test_push_modify_file() + test_clone_with_credentials() + test_clone_wrong_credentials() + + test_push_new_file(commits=2, with_clone=True) + + test_clone_anonymous() + test_push_wrong_path() + + test_push_wrong_credentials() + + test_logs(initial_logs) + print 'finished ok in %.3f' % (time.time() - s) |