# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Test for object db"""

from gitdb.test.lib import (
    TestBase,
    DummyStream,
    make_bytes,
    make_object,
    fixture_path
)

from gitdb import (
    DecompressMemMapReader,
    FDCompressedSha1Writer,
    LooseObjectDB,
    Sha1Writer,
    MemoryDB,
    IStream,
)
from gitdb.util import hex_to_bin
from gitdb.typ import str_blob_type

import zlib
import tempfile
import os
from io import BytesIO


class TestStream(TestBase):

    """Test stream classes"""

    data_sizes = (15, 10000, 1000 * 1024 + 512)

    def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
        """Run the standard read checks against the given stream. The stream must
        support being rewound via ``rewind_stream`` so it can be read a second time.

        :param cdata: the content data we expect to read from the stream
        :param rewind_stream: function called to rewind the stream, making it ready for reuse"""
        ns = 10
        assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))

        # read in small steps
        ss = len(cdata) // ns
        for i in range(ns):
            data = stream.read(ss)
            chunk = cdata[i * ss:(i + 1) * ss]
            assert data == chunk
        # END for each step
        rest = stream.read()
        if rest:
            assert rest == cdata[-len(rest):]
        # END handle rest

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

        rewind_stream(stream)

        # read everything
        rdata = stream.read()
        assert rdata == cdata

        if isinstance(stream, DecompressMemMapReader):
            assert len(stream.data()) == stream.compressed_bytes_read()
        # END handle special type

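    # DecompressMemMapReader wraps zlib-compressed data. The test covers both full
    # object data (header plus content, via DecompressMemMapReader.new) and bare
    # content with an explicitly supplied size, and verifies that the wrapped
    # stream is closed on deletion only when requested.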
    def test_decompress_reader(self):
        for close_on_deletion in range(2):
            for with_size in range(2):
                for ds in self.data_sizes:
                    cdata = make_bytes(ds, randomize=False)

                    # zdata = zipped actual data
                    # cdata = original content data

                    # create reader
                    if with_size:
                        # need object data
                        zdata = zlib.compress(make_object(str_blob_type, cdata))
                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                        assert size == len(cdata)
                        assert typ == str_blob_type

                        # even if we don't set the size, it will be set automatically on first read
                        test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                        assert test_reader._s == len(cdata)
                    else:
                        # here we need content data
                        zdata = zlib.compress(cdata)
                        reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                        assert reader._s == len(cdata)
                    # END get reader

                    self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))

                    # put in a dummy stream for closing
                    dummy = DummyStream()
                    reader._m = dummy

                    assert not dummy.closed
                    del reader
                    assert dummy.closed == close_on_deletion
                # END for each datasize
            # END whether size should be used
        # END whether stream should be closed when deleted

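    # Sha1Writer accumulates a SHA1 over everything written to it: write() returns the
    # number of bytes consumed, and sha() yields 40 hex characters or 20 raw bytes.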
    def test_sha_writer(self):
        writer = Sha1Writer()
        assert 2 == writer.write(b"hi")
        assert len(writer.sha(as_hex=1)) == 40
        assert len(writer.sha(as_hex=0)) == 20

        # make sure it does something ;)
        prev_sha = writer.sha()
        writer.write(b"hi again")
        assert writer.sha() != prev_sha

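    # FDCompressedSha1Writer zlib-compresses everything written to it onto the given
    # file descriptor and closes that descriptor itself; the bytes on disk should
    # match zlib.compress(data, 1).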
    def test_compressed_writer(self):
        for ds in self.data_sizes:
            fd, path = tempfile.mkstemp()
            ostream = FDCompressedSha1Writer(fd)
            data = make_bytes(ds, randomize=False)

            # for now, just a single write, code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()

            # it's closed already
            self.assertRaises(OSError, os.close, fd)

            # read everything back, compare to data we zip
            fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)  # best speed

            os.remove(path)
        # END for each data size

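    # Regression guard for loose-object decompression: with the bug this targets,
    # exactly one byte would go missing, so the decompressed length must equal the
    # advertised stream size and re-storing the data must reproduce the original sha.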
    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        mdb = MemoryDB()
        for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
                    b'7bb839852ed5e3a069966281bb08d50012fb309b',):
            ostream = odb.stream(hex_to_bin(sha))

            # if there is a bug, we will be missing one byte exactly !
            data = ostream.read()
            assert len(data) == ostream.size

            # Putting it back in should yield nothing new - after all, we have
            # stored the very same data, so the sha must not change
            dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
            assert dump.hexsha == sha
        # end for each loose object sha to test