aboutsummaryrefslogtreecommitdiff
path: root/rhodecode/tests/scripts
diff options
context:
space:
mode:
authorMarcin Kuzminski <marcin@python-works.com>2012-07-01 16:24:15 +0200
committerMarcin Kuzminski <marcin@python-works.com>2012-07-01 16:24:15 +0200
commitf4eb9d6e084e7d604909dd8ec52646537622e299 (patch)
tree42b5e9f1e86850682a08e509768e9e72583b4184 /rhodecode/tests/scripts
parent70588803b7d19a45699dbe4a8b19a03de6a5c277 (diff)
orginized test module
- separated test models into separate files - added scripts for script tests - few fixes and renames for readability --HG-- branch : beta rename : rhodecode/tests/test_models.py => rhodecode/tests/models/test_notifications.py rename : rhodecode/tests/mem_watch => rhodecode/tests/scripts/mem_watch rename : rhodecode/tests/_test_concurency.py => rhodecode/tests/scripts/test_concurency.py rename : rhodecode/tests/rhodecode_crawler.py => rhodecode/tests/scripts/test_crawler.py rename : rhodecode/tests/test_hg_operations.py => rhodecode/tests/scripts/test_scm_operations.py
Diffstat (limited to 'rhodecode/tests/scripts')
-rwxr-xr-xrhodecode/tests/scripts/mem_watch1
-rw-r--r--rhodecode/tests/scripts/test_concurency.py213
-rwxr-xr-xrhodecode/tests/scripts/test_crawler.py184
-rwxr-xr-xrhodecode/tests/scripts/test_scm_operations.py410
4 files changed, 808 insertions, 0 deletions
diff --git a/rhodecode/tests/scripts/mem_watch b/rhodecode/tests/scripts/mem_watch
new file mode 100755
index 00000000..81f65557
--- /dev/null
+++ b/rhodecode/tests/scripts/mem_watch
@@ -0,0 +1 @@
+ps -eo size,pid,user,command --sort -size | awk '{ hr=$1/1024 ; printf("%13.2f Mb ",hr) } { for ( x=4 ; x<=NF ; x++ ) { printf("%s ",$x) } print "" }'|grep [p]aster
diff --git a/rhodecode/tests/scripts/test_concurency.py b/rhodecode/tests/scripts/test_concurency.py
new file mode 100644
index 00000000..b75c3133
--- /dev/null
+++ b/rhodecode/tests/scripts/test_concurency.py
@@ -0,0 +1,213 @@
+# -*- coding: utf-8 -*-
+"""
+ rhodecode.tests.test_hg_operations
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Test suite for making push/pull operations
+
+ :created_on: Dec 30, 2010
+ :author: marcink
+ :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+ :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import shutil
+import logging
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from paste.deploy import appconfig
+from pylons import config
+from sqlalchemy import engine_from_config
+
+from rhodecode.lib.utils import add_cache
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import User, Repository
+from rhodecode.lib.auth import get_crypt_password
+
+from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
+from rhodecode.config.environment import load_environment
+
+rel_path = dn(dn(dn(os.path.abspath(__file__))))
+conf = appconfig('config:development.ini', relative_to=rel_path)
+load_environment(conf.global_conf, conf.local_conf)
+
+add_cache(conf)
+
+USER = 'test_admin'
+PASS = 'test12'
+HOST = 'hg.local'
+METHOD = 'pull'
+DEBUG = True
+log = logging.getLogger(__name__)
+
+
+class Command(object):
+
+ def __init__(self, cwd):
+ self.cwd = cwd
+
+ def execute(self, cmd, *args):
+ """Runs command on the system with given ``args``.
+ """
+
+ command = cmd + ' ' + ' '.join(args)
+ log.debug('Executing %s' % command)
+ if DEBUG:
+ print command
+ p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+ stdout, stderr = p.communicate()
+ if DEBUG:
+ print stdout, stderr
+ return stdout, stderr
+
+
+def get_session():
+ engine = engine_from_config(conf, 'sqlalchemy.db1.')
+ init_model(engine)
+ sa = meta.Session
+ return sa
+
+
+def create_test_user(force=True):
+ print 'creating test user'
+ sa = get_session()
+
+ user = sa.query(User).filter(User.username == USER).scalar()
+
+ if force and user is not None:
+ print 'removing current user'
+ for repo in sa.query(Repository).filter(Repository.user == user).all():
+ sa.delete(repo)
+ sa.delete(user)
+ sa.commit()
+
+ if user is None or force:
+ print 'creating new one'
+ new_usr = User()
+ new_usr.username = USER
+ new_usr.password = get_crypt_password(PASS)
+ new_usr.email = 'mail@mail.com'
+ new_usr.name = 'test'
+ new_usr.lastname = 'lasttestname'
+ new_usr.active = True
+ new_usr.admin = True
+ sa.add(new_usr)
+ sa.commit()
+
+ print 'done'
+
+
+def create_test_repo(force=True):
+ print 'creating test repo'
+ from rhodecode.model.repo import RepoModel
+ sa = get_session()
+
+ user = sa.query(User).filter(User.username == USER).scalar()
+ if user is None:
+ raise Exception('user not found')
+
+ repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
+
+ if repo is None:
+ print 'repo not found creating'
+
+ form_data = {'repo_name':HG_REPO,
+ 'repo_type':'hg',
+ 'private':False,
+ 'clone_uri':'' }
+ rm = RepoModel(sa)
+ rm.base_path = '/home/hg'
+ rm.create(form_data, user)
+
+ print 'done'
+
+
+def set_anonymous_access(enable=True):
+ sa = get_session()
+ user = sa.query(User).filter(User.username == 'default').one()
+ user.active = enable
+ sa.add(user)
+ sa.commit()
+
+
+def get_anonymous_access():
+ sa = get_session()
+ return sa.query(User).filter(User.username == 'default').one().active
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
+ seq=None):
+ cwd = path = jn(TESTS_TMP_PATH, repo)
+
+ if seq == None:
+ seq = _RandomNameSequence().next()
+
+ try:
+ shutil.rmtree(path, ignore_errors=True)
+ os.makedirs(path)
+ #print 'made dirs %s' % jn(path)
+ except OSError:
+ raise
+
+ clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+ {'user':USER,
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':repo, }
+
+ dest = path + seq
+ if method == 'pull':
+ stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url)
+ else:
+ stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest)
+
+ if no_errors is False:
+ assert """adding file changes""" in stdout, 'no messages about cloning'
+ assert """abort""" not in stderr , 'got error from clone'
+
+if __name__ == '__main__':
+ try:
+ create_test_user(force=False)
+ seq = None
+ import time
+
+ try:
+ METHOD = sys.argv[3]
+ except:
+ pass
+
+ if METHOD == 'pull':
+ seq = _RandomNameSequence().next()
+ test_clone_with_credentials(repo=sys.argv[1], method='clone',
+ seq=seq)
+ s = time.time()
+ for i in range(1, int(sys.argv[2]) + 1):
+ print 'take', i
+ test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
+ seq=seq)
+ print 'time taken %.3f' % (time.time() - s)
+ except Exception, e:
+ raise
+ sys.exit('stop on %s' % e)
diff --git a/rhodecode/tests/scripts/test_crawler.py b/rhodecode/tests/scripts/test_crawler.py
new file mode 100755
index 00000000..4da18a7f
--- /dev/null
+++ b/rhodecode/tests/scripts/test_crawler.py
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+"""
+ rhodecode.tests.test_crawer
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Test for crawling a project for memory usage
+ This should be runned just as regular script together
+ with a watch script that will show memory usage.
+
+ watch -n1 ./rhodecode/tests/mem_watch
+
+ :created_on: Apr 21, 2010
+ :author: marcink
+ :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+ :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import cookielib
+import urllib
+import urllib2
+import time
+import os
+import sys
+from os.path import join as jn
+from os.path import dirname as dn
+
+__here__ = os.path.abspath(__file__)
+__root__ = dn(dn(dn(__here__)))
+sys.path.append(__root__)
+
+from rhodecode.lib import vcs
+from rhodecode.lib.compat import OrderedSet
+from rhodecode.lib.vcs.exceptions import RepositoryError
+
+PASES = 3
+HOST = 'http://127.0.0.1'
+PORT = 5000
+BASE_URI = '%s:%s/' % (HOST, PORT)
+
+if len(sys.argv) == 2:
+ BASE_URI = sys.argv[1]
+
+if not BASE_URI.endswith('/'):
+ BASE_URI += '/'
+
+print 'Crawling @ %s' % BASE_URI
+BASE_URI += '%s'
+PROJECT_PATH = jn('/', 'home', 'marcink', 'hg_repos')
+PROJECTS = [
+ 'linux-magx-pbranch',
+ 'CPython',
+ 'rhodecode_tip',
+]
+
+
+cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt')
+o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
+o.addheaders = [
+ ('User-agent', 'rhodecode-crawler'),
+ ('Accept-Language', 'en - us, en;q = 0.5')
+]
+
+urllib2.install_opener(o)
+
+
+def _get_repo(proj):
+ if isinstance(proj, basestring):
+ repo = vcs.get_repo(jn(PROJECT_PATH, proj))
+ proj = proj
+ else:
+ repo = proj
+ proj = repo.name
+
+ return repo, proj
+
+
+def test_changelog_walk(proj, pages=100):
+ repo, proj = _get_repo(proj)
+
+ total_time = 0
+ for i in range(1, pages):
+
+ page = '/'.join((proj, 'changelog',))
+
+ full_uri = (BASE_URI % page) + '?' + urllib.urlencode({'page':i})
+ s = time.time()
+ f = o.open(full_uri)
+ size = len(f.read())
+ e = time.time() - s
+ total_time += e
+ print 'visited %s size:%s req:%s ms' % (full_uri, size, e)
+
+ print 'total_time', total_time
+ print 'average on req', total_time / float(pages)
+
+
+def test_changeset_walk(proj, limit=None):
+ repo, proj = _get_repo(proj)
+
+ print 'processing', jn(PROJECT_PATH, proj)
+ total_time = 0
+
+ cnt = 0
+ for i in repo:
+ cnt += 1
+ raw_cs = '/'.join((proj, 'changeset', i.raw_id))
+ if limit and limit == cnt:
+ break
+
+ full_uri = (BASE_URI % raw_cs)
+ print '%s visiting %s\%s' % (cnt, full_uri, i)
+ s = time.time()
+ f = o.open(full_uri)
+ size = len(f.read())
+ e = time.time() - s
+ total_time += e
+ print '%s visited %s\%s size:%s req:%s ms' % (cnt, full_uri, i, size, e)
+
+ print 'total_time', total_time
+ print 'average on req', total_time / float(cnt)
+
+
+def test_files_walk(proj, limit=100):
+ repo, proj = _get_repo(proj)
+
+ print 'processing', jn(PROJECT_PATH, proj)
+ total_time = 0
+
+ paths_ = OrderedSet([''])
+ try:
+ tip = repo.get_changeset('tip')
+ for topnode, dirs, files in tip.walk('/'):
+
+ for dir in dirs:
+ paths_.add(dir.path)
+ for f in dir:
+ paths_.add(f.path)
+
+ for f in files:
+ paths_.add(f.path)
+
+ except RepositoryError, e:
+ pass
+
+ cnt = 0
+ for f in paths_:
+ cnt += 1
+ if limit and limit == cnt:
+ break
+
+ file_path = '/'.join((proj, 'files', 'tip', f))
+ full_uri = (BASE_URI % file_path)
+ print '%s visiting %s' % (cnt, full_uri)
+ s = time.time()
+ f = o.open(full_uri)
+ size = len(f.read())
+ e = time.time() - s
+ total_time += e
+ print '%s visited OK size:%s req:%s ms' % (cnt, size, e)
+
+ print 'total_time', total_time
+ print 'average on req', total_time / float(cnt)
+
+if __name__ == '__main__':
+ for path in PROJECTS:
+ repo = vcs.get_repo(jn(PROJECT_PATH, path))
+ for i in range(PASES):
+ print 'PASS %s/%s' % (i, PASES)
+ test_changelog_walk(repo, pages=80)
+ test_changeset_walk(repo, limit=100)
+ test_files_walk(repo, limit=100)
diff --git a/rhodecode/tests/scripts/test_scm_operations.py b/rhodecode/tests/scripts/test_scm_operations.py
new file mode 100755
index 00000000..13d2bb0a
--- /dev/null
+++ b/rhodecode/tests/scripts/test_scm_operations.py
@@ -0,0 +1,410 @@
+# -*- coding: utf-8 -*-
+"""
+ rhodecode.tests.test_hg_operations
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Test suite for making push/pull operations
+
+ :created_on: Dec 30, 2010
+ :author: marcink
+ :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+ :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import sys
+import shutil
+import logging
+
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from paste.deploy import appconfig
+from pylons import config
+from sqlalchemy import engine_from_config
+
+from rhodecode.lib.utils import add_cache
+from rhodecode.model import init_model
+from rhodecode.model import meta
+from rhodecode.model.db import User, Repository, UserLog
+from rhodecode.lib.auth import get_crypt_password
+
+from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
+from rhodecode.config.environment import load_environment
+
+rel_path = dn(dn(dn(os.path.abspath(__file__))))
+
+conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
+load_environment(conf.global_conf, conf.local_conf)
+
+add_cache(conf)
+
+USER = 'test_admin'
+PASS = 'test12'
+HOST = '127.0.0.1:5000'
+DEBUG = False
+print 'DEBUG:', DEBUG
+log = logging.getLogger(__name__)
+
+engine = engine_from_config(conf, 'sqlalchemy.db1.')
+init_model(engine)
+sa = meta.Session()
+
+
+class Command(object):
+
+ def __init__(self, cwd):
+ self.cwd = cwd
+
+ def execute(self, cmd, *args):
+ """Runs command on the system with given ``args``.
+ """
+
+ command = cmd + ' ' + ' '.join(args)
+ log.debug('Executing %s' % command)
+ if DEBUG:
+ print command
+ p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+ stdout, stderr = p.communicate()
+ if DEBUG:
+ print stdout, stderr
+ return stdout, stderr
+
+
+def test_wrapp(func):
+
+ def __wrapp(*args, **kwargs):
+ print '>>>%s' % func.__name__
+ try:
+ res = func(*args, **kwargs)
+ except Exception, e:
+ print ('###############\n-'
+ '--%s failed %s--\n'
+ '###############\n' % (func.__name__, e))
+ sys.exit()
+ print '++OK++'
+ return res
+ return __wrapp
+
+
+def create_test_user(force=True):
+ print '\tcreating test user'
+
+ user = User.get_by_username(USER)
+
+ if force and user is not None:
+ print '\tremoving current user'
+ for repo in Repository.query().filter(Repository.user == user).all():
+ sa.delete(repo)
+ sa.delete(user)
+ sa.commit()
+
+ if user is None or force:
+ print '\tcreating new one'
+ new_usr = User()
+ new_usr.username = USER
+ new_usr.password = get_crypt_password(PASS)
+ new_usr.email = 'mail@mail.com'
+ new_usr.name = 'test'
+ new_usr.lastname = 'lasttestname'
+ new_usr.active = True
+ new_usr.admin = True
+ sa.add(new_usr)
+ sa.commit()
+
+ print '\tdone'
+
+
+def create_test_repo(force=True):
+ from rhodecode.model.repo import RepoModel
+
+ user = User.get_by_username(USER)
+ if user is None:
+ raise Exception('user not found')
+
+ repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
+
+ if repo is None:
+ print '\trepo not found creating'
+
+ form_data = {'repo_name':HG_REPO,
+ 'repo_type':'hg',
+ 'private':False,
+ 'clone_uri':'' }
+ rm = RepoModel(sa)
+ rm.base_path = '/home/hg'
+ rm.create(form_data, user)
+
+
+def set_anonymous_access(enable=True):
+ user = User.get_by_username('default')
+ user.active = enable
+ sa.add(user)
+ sa.commit()
+ print '\tanonymous access is now:', enable
+ if enable != User.get_by_username('default').active:
+ raise Exception('Cannot set anonymous access')
+
+
+def get_anonymous_access():
+ user = User.get_by_username('default')
+ return user.active
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+@test_wrapp
+def test_clone_with_credentials(no_errors=False):
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+ try:
+ shutil.rmtree(path, ignore_errors=True)
+ os.makedirs(path)
+ #print 'made dirs %s' % jn(path)
+ except OSError:
+ raise
+
+ print '\tchecking if anonymous access is enabled'
+ anonymous_access = get_anonymous_access()
+ if anonymous_access:
+ print '\tenabled, disabling it '
+ set_anonymous_access(enable=False)
+
+ clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
+ {'user':USER,
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO,
+ 'dest':path}
+
+ stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+ if no_errors is False:
+ assert """adding file changes""" in stdout, 'no messages about cloning'
+ assert """abort""" not in stderr , 'got error from clone'
+
+
+@test_wrapp
+def test_clone_anonymous():
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+ try:
+ shutil.rmtree(path, ignore_errors=True)
+ os.makedirs(path)
+ #print 'made dirs %s' % jn(path)
+ except OSError:
+ raise
+
+
+ print '\tchecking if anonymous access is enabled'
+ anonymous_access = get_anonymous_access()
+ if not anonymous_access:
+ print '\tnot enabled, enabling it '
+ set_anonymous_access(enable=True)
+
+ clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
+ {'user':USER,
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO,
+ 'dest':path}
+
+ stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+ assert """adding file changes""" in stdout, 'no messages about cloning'
+ assert """abort""" not in stderr , 'got error from clone'
+
+ #disable if it was enabled
+ if not anonymous_access:
+ print '\tdisabling anonymous access'
+ set_anonymous_access(enable=False)
+
+
+@test_wrapp
+def test_clone_wrong_credentials():
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+
+ try:
+ shutil.rmtree(path, ignore_errors=True)
+ os.makedirs(path)
+ #print 'made dirs %s' % jn(path)
+ except OSError:
+ raise
+
+ print '\tchecking if anonymous access is enabled'
+ anonymous_access = get_anonymous_access()
+ if anonymous_access:
+ print '\tenabled, disabling it '
+ set_anonymous_access(enable=False)
+
+ clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
+ {'user':USER + 'error',
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO,
+ 'dest':path}
+
+ stdout, stderr = Command(cwd).execute('hg clone', clone_url)
+
+ if not """abort: authorization failed""" in stderr:
+ raise Exception('Failure')
+
+
+@test_wrapp
+def test_pull():
+ pass
+
+
+@test_wrapp
+def test_push_modify_file(f_name='setup.py'):
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+ modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
+ for i in xrange(5):
+ cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
+ Command(cwd).execute(cmd)
+
+ cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
+ Command(cwd).execute(cmd)
+
+ Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
+
+
+@test_wrapp
+def test_push_new_file(commits=15, with_clone=True):
+
+ if with_clone:
+ test_clone_with_credentials(no_errors=True)
+
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+ added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+
+ Command(cwd).execute('touch %s' % added_file)
+
+ Command(cwd).execute('hg add %s' % added_file)
+
+ for i in xrange(commits):
+ cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+ Command(cwd).execute(cmd)
+
+ cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
+ 'Marcin Kuźminski <marcin@python-blog.com>',
+ added_file)
+ Command(cwd).execute(cmd)
+
+ push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+ {'user':USER,
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO,
+ 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+ Command(cwd).execute('hg push --verbose --debug %s' % push_url)
+
+
+@test_wrapp
+def test_push_wrong_credentials():
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+ clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+ {'user':USER + 'xxx',
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO,
+ 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+ modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
+ for i in xrange(5):
+ cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
+ Command(cwd).execute(cmd)
+
+ cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
+ Command(cwd).execute(cmd)
+
+ Command(cwd).execute('hg push %s' % clone_url)
+
+
+@test_wrapp
+def test_push_wrong_path():
+ cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+ added_file = jn(path, 'somefile.py')
+
+ try:
+ shutil.rmtree(path, ignore_errors=True)
+ os.makedirs(path)
+ print '\tmade dirs %s' % jn(path)
+ except OSError:
+ raise
+
+ Command(cwd).execute("""echo '' > %s""" % added_file)
+ Command(cwd).execute("""hg init %s""" % path)
+ Command(cwd).execute("""hg add %s""" % added_file)
+
+ for i in xrange(2):
+ cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+ Command(cwd).execute(cmd)
+
+ cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
+ Command(cwd).execute(cmd)
+
+ clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
+ {'user':USER,
+ 'pass':PASS,
+ 'host':HOST,
+ 'cloned_repo':HG_REPO + '_error',
+ 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
+
+ stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
+ if not """abort: HTTP Error 403: Forbidden""" in stderr:
+ raise Exception('Failure')
+
+
+@test_wrapp
+def get_logs():
+ return UserLog.query().all()
+
+
+@test_wrapp
+def test_logs(initial):
+ logs = UserLog.query().all()
+ operations = 4
+ if len(initial) + operations != len(logs):
+ raise Exception("missing number of logs initial:%s vs current:%s" % \
+ (len(initial), len(logs)))
+
+
+if __name__ == '__main__':
+ create_test_user(force=False)
+ create_test_repo()
+
+ initial_logs = get_logs()
+ print 'initial activity logs: %s' % len(initial_logs)
+ s = time.time()
+ #test_push_modify_file()
+ test_clone_with_credentials()
+ test_clone_wrong_credentials()
+
+ test_push_new_file(commits=2, with_clone=True)
+
+ test_clone_anonymous()
+ test_push_wrong_path()
+
+ test_push_wrong_credentials()
+
+ test_logs(initial_logs)
+ print 'finished ok in %.3f' % (time.time() - s)