2019-04-11 13:56:20 +00:00
# Copyright (C) 2010, 2011 Sebastian Thiel ( and contributors
# This module is part of GitDB and is released under
# the New BSD License:
"""Utilities used in ODB testing"""
from gitdb import OStream
from gitdb.utils.compat import xrange
import sys
import random
from array import array
from io import BytesIO
import glob
import unittest
import tempfile
import shutil
import os
import gc
import logging
from functools import wraps
#{ Bases
class TestBase(unittest.TestCase):
"""Base class for all tests
TestCase providing access to readonly repositories using the following member variables.
* gitrepopath
* read-only base path of the git source repository, i.e. .../git/.git
#{ Invvariants
k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE"
#} END invariants
def setUpClass(cls):
super(TestBase, cls).setUpClass()
except AttributeError:
cls.gitrepopath = os.environ.get(cls.k_env_git_repo)
if not cls.gitrepopath:
"You can set the %s environment variable to a .git repository of your choice - defaulting to the gitdb repository", cls.k_env_git_repo)
ospd = os.path.dirname
cls.gitrepopath = os.path.join(ospd(ospd(ospd(__file__))), '.git')
# end assure gitrepo is set
assert cls.gitrepopath.endswith('.git')
#} END bases
#{ Decorators
def skip_on_travis_ci(func):
"""All tests decorated with this one will raise SkipTest when run on travis ci.
Use it to workaround difficult to solve issues
NOTE: copied from bcore ("""
def wrapper(self, *args, **kwargs):
if 'TRAVIS' in os.environ:
import nose
raise nose.SkipTest("Cannot run on travis-ci")
# end check for travis ci
return func(self, *args, **kwargs)
# end wrapper
return wrapper
def with_rw_directory(func):
"""Create a temporary directory which can be written to, remove it if the
test succeeds, but leave it otherwise to aid additional debugging"""
def wrapper(self):
path = tempfile.mktemp(prefix=func.__name__)
keep = False
return func(self, path)
except Exception:
sys.stderr.write("Test {}.{} failed, output is at {!r}\n".format(type(self).__name__, func.__name__, path))
keep = True
# Need to collect here to be sure all handles have been closed. It appears
# a windows-only issue. In fact things should be deleted, as well as
# memory maps closed, once objects go out of scope. For some reason
# though this is not the case here unless we collect explicitly.
if not keep:
# END handle exception
# END wrapper
wrapper.__name__ = func.__name__
return wrapper
def with_packs_rw(func):
"""Function that provides a path into which the packs for testing should be
copied. Will pass on the path to the actual function afterwards"""
def wrapper(self, path):
src_pack_glob = fixture_path('packs/*')
copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
return func(self, path)
# END wrapper
wrapper.__name__ = func.__name__
return wrapper
#} END decorators
#{ Routines
def fixture_path(relapath=''):
""":return: absolute path into the fixture directory
:param relapath: relative path into the fixtures directory, or ''
to obtain the fixture directory itself"""
return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
"""Copy all files found according to the given source glob into the target directory
:param hard_link_ok: if True, hard links will be created if possible. Otherwise
the files will be copied"""
for src_file in glob.glob(source_glob):
if hard_link_ok and hasattr(os, 'link'):
target = os.path.join(target_dir, os.path.basename(src_file))
try:, target)
except OSError:
shutil.copy(src_file, target_dir)
# END handle cross device links ( and resulting failure )
shutil.copy(src_file, target_dir)
# END try hard link
# END for each file to copy
def make_bytes(size_in_bytes, randomize=False):
""":return: string with given size in bytes
:param randomize: try to produce a very random stream"""
actual_size = size_in_bytes // 4
producer = xrange(actual_size)
if randomize:
producer = list(producer)
# END randomize
a = array('i', producer)
return a.tostring()
def make_object(type, data):
""":return: bytes resembling an uncompressed object"""
odata = "blob %i\0" % len(data)
return odata.encode("ascii") + data
def make_memory_file(size_in_bytes, randomize=False):
""":return: tuple(size_of_stream, stream)
:param randomize: try to produce a very random stream"""
d = make_bytes(size_in_bytes, randomize)
return len(d), BytesIO(d)
#} END routines
#{ Stream Utilities
class DummyStream(object):
def __init__(self):
self.was_read = False
self.bytes = 0
self.closed = False
def read(self, size):
self.was_read = True
self.bytes = size
def close(self):
self.closed = True
def _assert(self):
assert self.was_read
class DeriveTest(OStream):
def __init__(self, sha, type, size, stream, *args, **kwargs):
self.myarg = kwargs.pop('myarg')
self.args = args
def _assert(self):
assert self.args
assert self.myarg
#} END stream utilitiess