Spaces:
Running
Running
File size: 5,727 Bytes
1380717 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Test for object db"""
from gitdb.test.lib import (
TestBase,
DummyStream,
make_bytes,
make_object,
fixture_path
)
from gitdb import (
DecompressMemMapReader,
FDCompressedSha1Writer,
LooseObjectDB,
Sha1Writer,
MemoryDB,
IStream,
)
from gitdb.util import hex_to_bin
import zlib
from gitdb.typ import (
str_blob_type
)
import tempfile
import os
from io import BytesIO
class TestStream(TestBase):
"""Test stream classes"""
data_sizes = (15, 10000, 1000 * 1024 + 512)
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
"""Make stream tests - the orig_stream is seekable, allowing it to be
rewound and reused
:param cdata: the data we expect to read from stream, the contents
:param rewind_stream: function called to rewind the stream to make it ready
for reuse"""
ns = 10
assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
# read in small steps
ss = len(cdata) // ns
for i in range(ns):
data = stream.read(ss)
chunk = cdata[i * ss:(i + 1) * ss]
assert data == chunk
# END for each step
rest = stream.read()
if rest:
assert rest == cdata[-len(rest):]
# END handle rest
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
rewind_stream(stream)
# read everything
rdata = stream.read()
assert rdata == cdata
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
def test_decompress_reader(self):
for close_on_deletion in range(2):
for with_size in range(2):
for ds in self.data_sizes:
cdata = make_bytes(ds, randomize=False)
# zdata = zipped actual data
# cdata = original content data
# create reader
if with_size:
# need object data
zdata = zlib.compress(make_object(str_blob_type, cdata))
typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
assert size == len(cdata)
assert typ == str_blob_type
# even if we don't set the size, it will be set automatically on first read
test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
assert test_reader._s == len(cdata)
else:
# here we need content data
zdata = zlib.compress(cdata)
reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
assert reader._s == len(cdata)
# END get reader
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
# put in a dummy stream for closing
dummy = DummyStream()
reader._m = dummy
assert not dummy.closed
del(reader)
assert dummy.closed == close_on_deletion
# END for each datasize
# END whether size should be used
# END whether stream should be closed when deleted
def test_sha_writer(self):
writer = Sha1Writer()
assert 2 == writer.write(b"hi")
assert len(writer.sha(as_hex=1)) == 40
assert len(writer.sha(as_hex=0)) == 20
# make sure it does something ;)
prev_sha = writer.sha()
writer.write(b"hi again")
assert writer.sha() != prev_sha
def test_compressed_writer(self):
for ds in self.data_sizes:
fd, path = tempfile.mkstemp()
ostream = FDCompressedSha1Writer(fd)
data = make_bytes(ds, randomize=False)
# for now, just a single write, code doesn't care about chunking
assert len(data) == ostream.write(data)
ostream.close()
# its closed already
self.assertRaises(OSError, os.close, fd)
# read everything back, compare to data we zip
fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
written_data = os.read(fd, os.path.getsize(path))
assert len(written_data) == os.path.getsize(path)
os.close(fd)
assert written_data == zlib.compress(data, 1) # best speed
os.remove(path)
# END for each os
def test_decompress_reader_special_case(self):
odb = LooseObjectDB(fixture_path('objects'))
mdb = MemoryDB()
for sha in (b'888401851f15db0eed60eb1bc29dec5ddcace911',
b'7bb839852ed5e3a069966281bb08d50012fb309b',):
ostream = odb.stream(hex_to_bin(sha))
# if there is a bug, we will be missing one byte exactly !
data = ostream.read()
assert len(data) == ostream.size
# Putting it back in should yield nothing new - after all, we have
dump = mdb.store(IStream(ostream.type, ostream.size, BytesIO(data)))
assert dump.hexsha == sha
# end for each loose object sha to test
|