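# NOTE: the three lines originally here ("Spaces:", "Running", "Running") were
# web-page header residue picked up when this file was copied, not source code.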
import gc
import gzip
import io
import locale
import os
import re
import sys
import threading
import time
import warnings
from ctypes import c_bool
from datetime import datetime
from io import BytesIO, StringIO
from multiprocessing import Process, Value
from pathlib import Path
from tempfile import NamedTemporaryFile

import numpy as np
import numpy.ma as ma
import pytest
from numpy.compat import asbytes
from numpy.lib._iotools import ConverterError, ConversionWarning
from numpy.ma.testutils import assert_equal
from numpy.testing import (
    assert_warns, assert_, assert_raises_regex, assert_raises,
    assert_allclose, assert_array_equal, temppath, tempdir, IS_PYPY,
    HAS_REFCOUNT, suppress_warnings, assert_no_gc_cycles, assert_no_warnings,
    break_cycles
    )
from numpy.testing._private.utils import requires_memory
class TextIO(BytesIO):
    """Helper IO class.

    Writes encode strings to bytes if needed, reads return bytes.
    This makes it easier to emulate files opened in binary mode
    without needing to explicitly convert strings to bytes in
    setting up the test data.

    """

    def __init__(self, s=""):
        super().__init__(asbytes(s))

    def write(self, s):
        # Return the byte count reported by BytesIO.write so this override
        # keeps the usual ``write`` contract instead of returning None.
        return super().write(asbytes(s))

    def writelines(self, lines):
        super().writelines([asbytes(s) for s in lines])
# True when the interpreter's maximum container size exceeds 2**32.
IS_64BIT = sys.maxsize > 2**32

# bz2 and lzma may be missing from a Python build; record whether each one
# imported so tests that need them can be guarded.
try:
    import bz2
    HAS_BZ2 = True
except ImportError:
    HAS_BZ2 = False

try:
    import lzma
    HAS_LZMA = True
except ImportError:
    HAS_LZMA = False
def strptime(s, fmt=None):
    """Parse a date string (or latin1-encoded bytes) into a `datetime`.

    Only the year, month and day parsed by `time.strptime` are kept.

    Parameters
    ----------
    s : str or bytes
        Text to parse; bytes are decoded as latin1 first.
    fmt : str, optional
        Format passed to `time.strptime`.  When omitted, `time.strptime`
        is called with its own default format.

    Returns
    -------
    datetime
        Date at midnight built from the parsed year, month and day.

    """
    if isinstance(s, bytes):
        s = s.decode("latin1")
    # time.strptime rejects ``None`` as a format, so only forward ``fmt``
    # when the caller actually supplied one.
    parsed = time.strptime(s) if fmt is None else time.strptime(s, fmt)
    return datetime(*parsed[:3])
class RoundtripTest:
    """Round-trip checks shared by the save/load test classes.

    Subclasses override `roundtrip` to bind the save function and to
    compare ``self.arr`` with ``self.arr_reloaded``.
    """

    def roundtrip(self, save_func, *args, **kwargs):
        """
        save_func : callable
            Function used to save arrays to file.
        file_on_disk : bool
            If true, store the file on disk, instead of in a
            string buffer.
        save_kwds : dict
            Parameters passed to `save_func`.
        load_kwds : dict
            Parameters passed to `numpy.load`.
        args : tuple of arrays
            Arrays stored to file.

        """
        save_kwds = kwargs.get('save_kwds', {})
        load_kwds = kwargs.get('load_kwds', {"allow_pickle": True})
        file_on_disk = kwargs.get('file_on_disk', False)

        if file_on_disk:
            target_file = NamedTemporaryFile(delete=False)
            load_file = target_file.name
        else:
            target_file = BytesIO()
            load_file = target_file

        # Stays None when saving or loading raises, so the cleanup below
        # still removes the temporary file instead of leaking it.
        arr_reloaded = None
        try:
            save_func(target_file, *args, **save_kwds)
            target_file.flush()
            target_file.seek(0)

            if sys.platform == 'win32' and file_on_disk:
                target_file.close()

            arr_reloaded = np.load(load_file, **load_kwds)

            self.arr = args
            self.arr_reloaded = arr_reloaded
        finally:
            if file_on_disk:
                target_file.close()
                # An NpzFile holds an open file descriptor, so the file
                # can't be deleted here on win; the caller removes it.
                if not isinstance(arr_reloaded, np.lib.npyio.NpzFile):
                    os.remove(target_file.name)

    def check_roundtrips(self, a):
        """Round-trip `a` in C, Fortran and non-contiguous layouts."""
        self.roundtrip(a)
        self.roundtrip(a, file_on_disk=True)
        self.roundtrip(np.asfortranarray(a))
        self.roundtrip(np.asfortranarray(a), file_on_disk=True)
        if a.shape[0] > 1:
            # neither C nor Fortran contiguous for 2D arrays or more
            self.roundtrip(np.asfortranarray(a)[1:])
            self.roundtrip(np.asfortranarray(a)[1:], file_on_disk=True)

    def test_array(self):
        a = np.array([], float)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], float)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], int)
        self.check_roundtrips(a)

        a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle)
        self.check_roundtrips(a)

        a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble)
        self.check_roundtrips(a)

    def test_array_object(self):
        a = np.array([], object)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], object)
        self.check_roundtrips(a)

    def test_1D(self):
        a = np.array([1, 2, 3, 4], int)
        self.roundtrip(a)

    # The memory map keeps the file open, which blocks its removal on win32.
    @pytest.mark.skipif(sys.platform == 'win32', reason="Fails on Win32")
    def test_mmap(self):
        a = np.array([[1, 2.5], [4, 7.3]])
        self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'})

        a = np.asfortranarray([[1, 2.5], [4, 7.3]])
        self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'})

    def test_record(self):
        a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
        self.check_roundtrips(a)

    def test_format_2_0(self):
        # 500 fields with 100-character names.
        dt = [(("%d" % i) * 100, float) for i in range(500)]
        a = np.ones(1000, dtype=dt)
        with warnings.catch_warnings(record=True):
            warnings.filterwarnings('always', '', UserWarning)
            self.check_roundtrips(a)
class TestSaveLoad(RoundtripTest):
    """Run the shared round-trip checks through ``np.save``/``np.load``."""

    def roundtrip(self, *args, **kwargs):
        """Save the first array with ``np.save`` and compare the reload."""
        super().roundtrip(np.save, *args, **kwargs)
        saved = self.arr[0]
        reloaded = self.arr_reloaded
        assert_equal(saved, reloaded)
        assert_equal(saved.dtype, reloaded.dtype)
        assert_equal(saved.flags.fnc, reloaded.flags.fnc)
class TestSavezLoad(RoundtripTest):
    """Run the shared round-trip checks through ``np.savez``/``np.load``."""

    def roundtrip(self, *args, **kwargs):
        """Save all arrays with ``np.savez`` and compare each reload."""
        super().roundtrip(np.savez, *args, **kwargs)
        npz = self.arr_reloaded
        try:
            for n, arr in enumerate(self.arr):
                reloaded = npz['arr_%d' % n]
                assert_equal(arr, reloaded)
                assert_equal(arr.dtype, reloaded.dtype)
                assert_equal(arr.flags.fnc, reloaded.flags.fnc)
        finally:
            # delete tempfile, must be done here on windows
            if npz.fid:
                name = npz.fid.name
                npz.fid.close()
                os.remove(name)

    # The array below is larger than 2**31 bytes.
    @pytest.mark.skipif(not IS_64BIT, reason="Needs 64bit platform")
    def test_big_arrays(self):
        L = (1 << 31) + 100000
        a = np.empty(L, dtype=np.uint8)
        with temppath(prefix="numpy_test_big_arrays_", suffix=".npz") as tmp:
            np.savez(tmp, a=a)
            del a
            # The context manager closes the archive even if loading fails.
            with np.load(tmp) as npfile:
                a = npfile['a']  # Should succeed
            del a  # Avoid pyflakes unused variable warning.

    def test_multiple_arrays(self):
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        self.roundtrip(a, b)

    def test_named_arrays(self):
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        c = BytesIO()
        np.savez(c, file_a=a, file_b=b)
        c.seek(0)
        with np.load(c) as npz:
            assert_equal(a, npz['file_a'])
            assert_equal(b, npz['file_b'])

    def test_BagObj(self):
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        c = BytesIO()
        np.savez(c, file_a=a, file_b=b)
        c.seek(0)
        with np.load(c) as npz:
            assert_equal(sorted(dir(npz.f)), ['file_a', 'file_b'])
            assert_equal(a, npz.f.file_a)
            assert_equal(b, npz.f.file_b)

    def test_savez_filename_clashes(self):
        # Test that issue #852 is fixed
        # and savez functions in multithreaded environment

        def writer(error_list):
            with temppath(suffix='.npz') as tmp:
                arr = np.random.randn(500, 500)
                try:
                    np.savez(tmp, arr=arr)
                except OSError as err:
                    error_list.append(err)

        errors = []
        threads = [threading.Thread(target=writer, args=(errors,))
                   for _ in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        if errors:
            raise AssertionError(errors)

    def test_not_closing_opened_fid(self):
        # Test that issue #2178 is fixed:
        # verify could seek on 'loaded' file
        with temppath(suffix='.npz') as tmp:
            with open(tmp, 'wb') as fp:
                np.savez(fp, data='LOVELY LOAD')
            with open(tmp, 'rb', 10000) as fp:
                fp.seek(0)
                assert_(not fp.closed)
                np.load(fp)['data']
                # fp must not get closed by .load
                assert_(not fp.closed)
                fp.seek(0)
                assert_(not fp.closed)

    def test_closing_fid(self):
        # Test that issue #1517 (too many opened files) remains closed
        # It might be a "weak" test since failed to get triggered on
        # e.g. Debian sid of 2012 Jul 05 but was reported to
        # trigger the failure on Ubuntu 10.04:
        # http://projects.scipy.org/numpy/ticket/1517#comment:2
        with temppath(suffix='.npz') as tmp:
            np.savez(tmp, data='LOVELY LOAD')
            # We need to check if the garbage collector can properly close
            # numpy npz file returned by np.load when their reference count
            # goes to zero.  Python 3 running in debug mode raises a
            # ResourceWarning when file closing is left to the garbage
            # collector, so we catch the warnings.
            with suppress_warnings() as sup:
                sup.filter(ResourceWarning)  # TODO: specify exact message
                for _ in range(1, 1025):
                    try:
                        np.load(tmp)["data"]
                    except Exception as e:
                        msg = "Failed to load data from a file: %s" % e
                        # Chain the cause so the original traceback is kept.
                        raise AssertionError(msg) from e
                    finally:
                        if IS_PYPY:
                            gc.collect()

    def test_closing_zipfile_after_load(self):
        # Check that zipfile owns file and can close it.  This needs to
        # pass a file name to load for the test. On windows failure will
        # cause a second error will be raised when the attempt to remove
        # the open file is made.
        prefix = 'numpy_test_closing_zipfile_after_load_'
        with temppath(suffix='.npz', prefix=prefix) as tmp:
            np.savez(tmp, lab='place holder')
            data = np.load(tmp)
            fp = data.zip.fp
            data.close()
            assert_(fp.closed)
class TestSaveTxt: | |
def test_array(self): | |
a = np.array([[1, 2], [3, 4]], float) | |
fmt = "%.18e" | |
c = BytesIO() | |
np.savetxt(c, a, fmt=fmt) | |
c.seek(0) | |
assert_equal(c.readlines(), | |
[asbytes((fmt + ' ' + fmt + '\n') % (1, 2)), | |
asbytes((fmt + ' ' + fmt + '\n') % (3, 4))]) | |
a = np.array([[1, 2], [3, 4]], int) | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%d') | |
c.seek(0) | |
assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) | |
def test_1D(self): | |
a = np.array([1, 2, 3, 4], int) | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%d') | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n']) | |
def test_0D_3D(self): | |
c = BytesIO() | |
assert_raises(ValueError, np.savetxt, c, np.array(1)) | |
assert_raises(ValueError, np.savetxt, c, np.array([[[1], [2]]])) | |
def test_structured(self): | |
a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')]) | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%d') | |
c.seek(0) | |
assert_equal(c.readlines(), [b'1 2\n', b'3 4\n']) | |
def test_structured_padded(self): | |
# gh-13297 | |
a = np.array([(1, 2, 3),(4, 5, 6)], dtype=[ | |
('foo', 'i4'), ('bar', 'i4'), ('baz', 'i4') | |
]) | |
c = BytesIO() | |
np.savetxt(c, a[['foo', 'baz']], fmt='%d') | |
c.seek(0) | |
assert_equal(c.readlines(), [b'1 3\n', b'4 6\n']) | |
def test_multifield_view(self): | |
a = np.ones(1, dtype=[('x', 'i4'), ('y', 'i4'), ('z', 'f4')]) | |
v = a[['x', 'z']] | |
with temppath(suffix='.npy') as path: | |
path = Path(path) | |
np.save(path, v) | |
data = np.load(path) | |
assert_array_equal(data, v) | |
def test_delimiter(self): | |
a = np.array([[1., 2.], [3., 4.]]) | |
c = BytesIO() | |
np.savetxt(c, a, delimiter=',', fmt='%d') | |
c.seek(0) | |
assert_equal(c.readlines(), [b'1,2\n', b'3,4\n']) | |
def test_format(self): | |
a = np.array([(1, 2), (3, 4)]) | |
c = BytesIO() | |
# Sequence of formats | |
np.savetxt(c, a, fmt=['%02d', '%3.1f']) | |
c.seek(0) | |
assert_equal(c.readlines(), [b'01 2.0\n', b'03 4.0\n']) | |
# A single multiformat string | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%02d : %3.1f') | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) | |
# Specify delimiter, should be overridden | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',') | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n']) | |
# Bad fmt, should raise a ValueError | |
c = BytesIO() | |
assert_raises(ValueError, np.savetxt, c, a, fmt=99) | |
def test_header_footer(self): | |
# Test the functionality of the header and footer keyword argument. | |
c = BytesIO() | |
a = np.array([(1, 2), (3, 4)], dtype=int) | |
test_header_footer = 'Test header / footer' | |
# Test the header keyword argument | |
np.savetxt(c, a, fmt='%1d', header=test_header_footer) | |
c.seek(0) | |
assert_equal(c.read(), | |
asbytes('# ' + test_header_footer + '\n1 2\n3 4\n')) | |
# Test the footer keyword argument | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%1d', footer=test_header_footer) | |
c.seek(0) | |
assert_equal(c.read(), | |
asbytes('1 2\n3 4\n# ' + test_header_footer + '\n')) | |
# Test the commentstr keyword argument used on the header | |
c = BytesIO() | |
commentstr = '% ' | |
np.savetxt(c, a, fmt='%1d', | |
header=test_header_footer, comments=commentstr) | |
c.seek(0) | |
assert_equal(c.read(), | |
asbytes(commentstr + test_header_footer + '\n' + '1 2\n3 4\n')) | |
# Test the commentstr keyword argument used on the footer | |
c = BytesIO() | |
commentstr = '% ' | |
np.savetxt(c, a, fmt='%1d', | |
footer=test_header_footer, comments=commentstr) | |
c.seek(0) | |
assert_equal(c.read(), | |
asbytes('1 2\n3 4\n' + commentstr + test_header_footer + '\n')) | |
def test_file_roundtrip(self): | |
with temppath() as name: | |
a = np.array([(1, 2), (3, 4)]) | |
np.savetxt(name, a) | |
b = np.loadtxt(name) | |
assert_array_equal(a, b) | |
def test_complex_arrays(self): | |
ncols = 2 | |
nrows = 2 | |
a = np.zeros((ncols, nrows), dtype=np.complex128) | |
re = np.pi | |
im = np.e | |
a[:] = re + 1.0j * im | |
# One format only | |
c = BytesIO() | |
np.savetxt(c, a, fmt=' %+.3e') | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal( | |
lines, | |
[b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n', | |
b' ( +3.142e+00+ +2.718e+00j) ( +3.142e+00+ +2.718e+00j)\n']) | |
# One format for each real and imaginary part | |
c = BytesIO() | |
np.savetxt(c, a, fmt=' %+.3e' * 2 * ncols) | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal( | |
lines, | |
[b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n', | |
b' +3.142e+00 +2.718e+00 +3.142e+00 +2.718e+00\n']) | |
# One format for each complex number | |
c = BytesIO() | |
np.savetxt(c, a, fmt=['(%.3e%+.3ej)'] * ncols) | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal( | |
lines, | |
[b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n', | |
b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n']) | |
def test_complex_negative_exponent(self): | |
# Previous to 1.15, some formats generated x+-yj, gh 7895 | |
ncols = 2 | |
nrows = 2 | |
a = np.zeros((ncols, nrows), dtype=np.complex128) | |
re = np.pi | |
im = np.e | |
a[:] = re - 1.0j * im | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%.3e') | |
c.seek(0) | |
lines = c.readlines() | |
assert_equal( | |
lines, | |
[b' (3.142e+00-2.718e+00j) (3.142e+00-2.718e+00j)\n', | |
b' (3.142e+00-2.718e+00j) (3.142e+00-2.718e+00j)\n']) | |
def test_custom_writer(self): | |
class CustomWriter(list): | |
def write(self, text): | |
self.extend(text.split(b'\n')) | |
w = CustomWriter() | |
a = np.array([(1, 2), (3, 4)]) | |
np.savetxt(w, a) | |
b = np.loadtxt(w) | |
assert_array_equal(a, b) | |
def test_unicode(self): | |
utf8 = b'\xcf\x96'.decode('UTF-8') | |
a = np.array([utf8], dtype=np.unicode_) | |
with tempdir() as tmpdir: | |
# set encoding as on windows it may not be unicode even on py3 | |
np.savetxt(os.path.join(tmpdir, 'test.csv'), a, fmt=['%s'], | |
encoding='UTF-8') | |
def test_unicode_roundtrip(self): | |
utf8 = b'\xcf\x96'.decode('UTF-8') | |
a = np.array([utf8], dtype=np.unicode_) | |
# our gz wrapper support encoding | |
suffixes = ['', '.gz'] | |
if HAS_BZ2: | |
suffixes.append('.bz2') | |
if HAS_LZMA: | |
suffixes.extend(['.xz', '.lzma']) | |
with tempdir() as tmpdir: | |
for suffix in suffixes: | |
np.savetxt(os.path.join(tmpdir, 'test.csv' + suffix), a, | |
fmt=['%s'], encoding='UTF-16-LE') | |
b = np.loadtxt(os.path.join(tmpdir, 'test.csv' + suffix), | |
encoding='UTF-16-LE', dtype=np.unicode_) | |
assert_array_equal(a, b) | |
def test_unicode_bytestream(self): | |
utf8 = b'\xcf\x96'.decode('UTF-8') | |
a = np.array([utf8], dtype=np.unicode_) | |
s = BytesIO() | |
np.savetxt(s, a, fmt=['%s'], encoding='UTF-8') | |
s.seek(0) | |
assert_equal(s.read().decode('UTF-8'), utf8 + '\n') | |
def test_unicode_stringstream(self): | |
utf8 = b'\xcf\x96'.decode('UTF-8') | |
a = np.array([utf8], dtype=np.unicode_) | |
s = StringIO() | |
np.savetxt(s, a, fmt=['%s'], encoding='UTF-8') | |
s.seek(0) | |
assert_equal(s.read(), utf8 + '\n') | |
def test_unicode_and_bytes_fmt(self, fmt, iotype): | |
# string type of fmt should not matter, see also gh-4053 | |
a = np.array([1.]) | |
s = iotype() | |
np.savetxt(s, a, fmt=fmt) | |
s.seek(0) | |
if iotype is StringIO: | |
assert_equal(s.read(), u"%f\n" % 1.) | |
else: | |
assert_equal(s.read(), b"%f\n" % 1.) | |
def test_large_zip(self): | |
def check_large_zip(memoryerror_raised): | |
memoryerror_raised.value = False | |
try: | |
# The test takes at least 6GB of memory, writes a file larger | |
# than 4GB. This tests the ``allowZip64`` kwarg to ``zipfile`` | |
test_data = np.asarray([np.random.rand( | |
np.random.randint(50,100),4) | |
for i in range(800000)], dtype=object) | |
with tempdir() as tmpdir: | |
np.savez(os.path.join(tmpdir, 'test.npz'), | |
test_data=test_data) | |
except MemoryError: | |
memoryerror_raised.value = True | |
raise | |
# run in a subprocess to ensure memory is released on PyPy, see gh-15775 | |
# Use an object in shared memory to re-raise the MemoryError exception | |
# in our process if needed, see gh-16889 | |
memoryerror_raised = Value(c_bool) | |
p = Process(target=check_large_zip, args=(memoryerror_raised,)) | |
p.start() | |
p.join() | |
if memoryerror_raised.value: | |
raise MemoryError("Child process raised a MemoryError exception") | |
# -9 indicates a SIGKILL, probably an OOM. | |
if p.exitcode == -9: | |
pytest.xfail("subprocess got a SIGKILL, apparently free memory was not sufficient") | |
assert p.exitcode == 0 | |
class LoadTxtBase:
    """Compression and encoding tests run against ``self.loadfunc``.

    Subclasses provide ``loadfunc``, the text loader under test.
    """

    def check_compressed(self, fopen, suffixes):
        """Write data with `fopen` for each suffix and load it back.

        Parameters
        ----------
        fopen : callable
            Opener accepting ``(name, mode, encoding=...)``.
        suffixes : iterable of str
            File name suffixes to test.

        """
        # Test that we can load data from a compressed file
        wanted = np.arange(6).reshape((2, 3))
        linesep = ('\n', '\r\n', '\r')
        for sep in linesep:
            data = '0 1 2' + sep + '3 4 5'
            for suffix in suffixes:
                with temppath(suffix=suffix) as name:
                    with fopen(name, mode='wt', encoding='UTF-32-LE') as f:
                        f.write(data)
                    res = self.loadfunc(name, encoding='UTF-32-LE')
                    assert_array_equal(res, wanted)
                    with fopen(name, "rt", encoding='UTF-32-LE') as f:
                        res = self.loadfunc(f)
                    assert_array_equal(res, wanted)

    def test_compressed_gzip(self):
        self.check_compressed(gzip.open, ('.gz',))

    # ``bz2`` is only bound when its import succeeded.
    @pytest.mark.skipif(not HAS_BZ2, reason="Needs bz2")
    def test_compressed_bz2(self):
        self.check_compressed(bz2.open, ('.bz2',))

    # ``lzma`` is only bound when its import succeeded.
    @pytest.mark.skipif(not HAS_LZMA, reason="Needs lzma")
    def test_compressed_lzma(self):
        self.check_compressed(lzma.open, ('.xz', '.lzma'))

    def test_encoding(self):
        with temppath() as path:
            with open(path, "wb") as f:
                f.write('0.\n1.\n2.'.encode("UTF-16"))
            x = self.loadfunc(path, encoding="UTF-16")
            assert_array_equal(x, [0., 1., 2.])

    def test_stringload(self):
        # umlaute
        nonascii = b'\xc3\xb6\xc3\xbc\xc3\xb6'.decode("UTF-8")
        with temppath() as path:
            with open(path, "wb") as f:
                f.write(nonascii.encode("UTF-16"))
            x = self.loadfunc(path, encoding="UTF-16", dtype=np.str_)
            assert_array_equal(x, nonascii)

    def test_binary_decode(self):
        utf16 = b'\xff\xfeh\x04 \x00i\x04 \x00j\x04'
        v = self.loadfunc(BytesIO(utf16), dtype=np.str_, encoding='UTF-16')
        assert_array_equal(v, np.array(utf16.decode('UTF-16').split()))

    def test_converters_decode(self):
        # test converters that decode strings
        c = TextIO()
        c.write(b'\xcf\x96')
        c.seek(0)
        x = self.loadfunc(c, dtype=np.str_,
                          converters={0: lambda x: x.decode('UTF-8')})
        a = np.array([b'\xcf\x96'.decode('UTF-8')])
        assert_array_equal(x, a)

    def test_converters_nodecode(self):
        # test native string converters enabled by setting an encoding
        utf8 = b'\xcf\x96'.decode('UTF-8')
        with temppath() as path:
            with open(path, 'wt', encoding='UTF-8') as f:
                f.write(utf8)
            x = self.loadfunc(path, dtype=np.str_,
                              converters={0: lambda x: x + 't'},
                              encoding='UTF-8')
            a = np.array([utf8 + 't'])
            assert_array_equal(x, a)
class TestLoadTxt(LoadTxtBase): | |
loadfunc = staticmethod(np.loadtxt) | |
def setup(self): | |
# lower chunksize for testing | |
self.orig_chunk = np.lib.npyio._loadtxt_chunksize | |
np.lib.npyio._loadtxt_chunksize = 1 | |
def teardown(self): | |
np.lib.npyio._loadtxt_chunksize = self.orig_chunk | |
def test_record(self):
    """Load whitespace-separated rows into structured dtypes."""
    c = TextIO()
    c.write('1 2\n3 4')
    c.seek(0)
    x = np.loadtxt(c, dtype=[('x', np.int32), ('y', np.int32)])
    a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
    assert_array_equal(x, a)

    d = TextIO()
    d.write('M 64.0 75.0\nF 25.0 60.0')
    d.seek(0)
    mydescriptor = {'names': ('gender', 'age', 'weight'),
                    'formats': ('S1', 'i4', 'f4')}
    b = np.array([('M', 64.0, 75.0),
                  ('F', 25.0, 60.0)], dtype=mydescriptor)
    y = np.loadtxt(d, dtype=mydescriptor)
    assert_array_equal(y, b)
def test_array(self):
    """Load the same 2x2 text as int and then as float."""
    c = TextIO()
    c.write('1 2\n3 4')

    c.seek(0)
    x = np.loadtxt(c, dtype=int)
    a = np.array([[1, 2], [3, 4]], int)
    assert_array_equal(x, a)

    c.seek(0)
    x = np.loadtxt(c, dtype=float)
    a = np.array([[1, 2], [3, 4]], float)
    assert_array_equal(x, a)
def test_1D(self):
    """A single column and a single delimited row both load as 1-D."""
    c = TextIO()
    c.write('1\n2\n3\n4\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int)
    a = np.array([1, 2, 3, 4], int)
    assert_array_equal(x, a)

    c = TextIO()
    c.write('1,2,3,4\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',')
    a = np.array([1, 2, 3, 4], int)
    assert_array_equal(x, a)
def test_missing(self):
    """A converter supplies -999 for an empty field."""
    c = TextIO()
    c.write('1,2,3,,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   converters={3: lambda s: int(s or - 999)})
    a = np.array([1, 2, 3, -999, 5], int)
    assert_array_equal(x, a)
def test_converters_with_usecols(self):
    """Converters are keyed by file column even when usecols selects."""
    c = TextIO()
    c.write('1,2,3,,5\n6,7,8,9,10\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   converters={3: lambda s: int(s or - 999)},
                   usecols=(1, 3,))
    a = np.array([[2, -999], [7, 9]], int)
    assert_array_equal(x, a)
def test_comments_unicode(self):
    """A str comment marker skips the comment line."""
    c = TextIO()
    c.write('# comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   comments=u'#')
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)
def test_comments_byte(self):
    """A bytes comment marker skips the comment line."""
    c = TextIO()
    c.write('# comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   comments=b'#')
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)
def test_comments_multiple(self):
    """Several comment markers can be given as a list."""
    c = TextIO()
    c.write('# comment\n1,2,3\n@ comment2\n4,5,6 // comment3')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   comments=['#', '@', '//'])
    a = np.array([[1, 2, 3], [4, 5, 6]], int)
    assert_array_equal(x, a)
def test_comments_multi_chars(self):
    """A multi-character marker is one marker, not a set of characters."""
    c = TextIO()
    c.write('/* comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   comments='/*')
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)

    # Check that '/*' is not transformed to ['/', '*']
    c = TextIO()
    c.write('*/ comment\n1,2,3,5\n')
    c.seek(0)
    assert_raises(ValueError, np.loadtxt, c, dtype=int, delimiter=',',
                  comments='/*')
def test_skiprows(self):
    """``skiprows=1`` drops the first line, comment-styled or not."""
    c = TextIO()
    c.write('comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   skiprows=1)
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)

    c = TextIO()
    c.write('# comment\n1,2,3,5\n')
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',',
                   skiprows=1)
    a = np.array([1, 2, 3, 5], int)
    assert_array_equal(x, a)
def test_usecols(self):
    """``usecols`` accepts tuples, arrays, ints and ``__index__`` objects."""
    a = np.array([[1, 2], [3, 4]], float)
    c = BytesIO()
    np.savetxt(c, a)
    c.seek(0)
    x = np.loadtxt(c, dtype=float, usecols=(1,))
    assert_array_equal(x, a[:, 1])

    a = np.array([[1, 2, 3], [3, 4, 5]], float)
    c = BytesIO()
    np.savetxt(c, a)
    c.seek(0)
    x = np.loadtxt(c, dtype=float, usecols=(1, 2))
    assert_array_equal(x, a[:, 1:])

    # Testing with arrays instead of tuples.
    c.seek(0)
    x = np.loadtxt(c, dtype=float, usecols=np.array([1, 2]))
    assert_array_equal(x, a[:, 1:])

    # Testing with an integer instead of a sequence
    for int_type in [int, np.int8, np.int16,
                     np.int32, np.int64, np.uint8, np.uint16,
                     np.uint32, np.uint64]:
        to_read = int_type(1)
        c.seek(0)
        x = np.loadtxt(c, dtype=float, usecols=to_read)
        assert_array_equal(x, a[:, 1])

    # Testing with some crazy custom integer type
    class CrazyInt:
        def __index__(self):
            return 1

    crazy_int = CrazyInt()
    c.seek(0)
    x = np.loadtxt(c, dtype=float, usecols=crazy_int)
    assert_array_equal(x, a[:, 1])

    c.seek(0)
    x = np.loadtxt(c, dtype=float, usecols=(crazy_int,))
    assert_array_equal(x, a[:, 1])

    # Checking with dtypes defined converters.
    data = 'JOE 70.1 25.3\nBOB 60.5 27.9\n'
    c = TextIO(data)
    names = ['stid', 'temp']
    dtypes = ['S4', 'f8']
    arr = np.loadtxt(c, usecols=(0, 2), dtype=list(zip(names, dtypes)))
    assert_equal(arr['stid'], [b"JOE", b"BOB"])
    assert_equal(arr['temp'], [25.3, 27.9])

    # Testing non-ints in usecols
    c.seek(0)
    bogus_idx = 1.5
    assert_raises_regex(
        TypeError,
        '^usecols must be.*%s' % type(bogus_idx),
        np.loadtxt, c, usecols=bogus_idx
        )

    assert_raises_regex(
        TypeError,
        '^usecols must be.*%s' % type(bogus_idx),
        np.loadtxt, c, usecols=[0, bogus_idx, 0]
        )
def test_fancy_dtype(self):
    """Load rows into a dtype that contains a nested structured field."""
    c = TextIO()
    c.write('1,2,3.0\n4,5,6.0\n')
    c.seek(0)
    dt = np.dtype([('x', int), ('y', [('t', int), ('s', float)])])
    x = np.loadtxt(c, dtype=dt, delimiter=',')
    a = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dt)
    assert_array_equal(x, a)
def test_shaped_dtype(self):
    """Load a row into a dtype with a (2, 3) sub-array field."""
    c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6")
    dt = np.dtype([('name', 'S4'), ('x', float), ('y', float),
                   ('block', int, (2, 3))])
    x = np.loadtxt(c, dtype=dt)
    a = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])],
                 dtype=dt)
    assert_array_equal(x, a)
def test_3d_shaped_dtype(self):
    """Load a row into a dtype with a (2, 2, 3) sub-array field."""
    c = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6 7 8 9 10 11 12")
    dt = np.dtype([('name', 'S4'), ('x', float), ('y', float),
                   ('block', int, (2, 2, 3))])
    x = np.loadtxt(c, dtype=dt)
    a = np.array([('aaaa', 1.0, 8.0,
                   [[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]])],
                 dtype=dt)
    assert_array_equal(x, a)
def test_str_dtype(self): | |
# see gh-8033 | |
c = ["str1", "str2"] | |
for dt in (str, np.bytes_): | |
a = np.array(["str1", "str2"], dtype=dt) | |
x = np.loadtxt(c, dtype=dt) | |
assert_array_equal(x, a) | |
def test_empty_file(self):
    """An empty input gives a shape-(0,) array of the requested dtype."""
    with suppress_warnings() as sup:
        sup.filter(message="loadtxt: Empty input file:")
        c = TextIO()
        x = np.loadtxt(c)
        assert_equal(x.shape, (0,))
        x = np.loadtxt(c, dtype=np.int64)
        assert_equal(x.shape, (0,))
        assert_(x.dtype == np.int64)
def test_unused_converter(self):
    """A converter only affects output when its column is in usecols."""
    c = TextIO()
    c.writelines(['1 21\n', '3 42\n'])
    c.seek(0)
    # Converter on column 0, which usecols does not select.
    data = np.loadtxt(c, usecols=(1,),
                      converters={0: lambda s: int(s, 16)})
    assert_array_equal(data, [21, 42])

    c.seek(0)
    # Converter on the selected column: values are parsed as hexadecimal.
    data = np.loadtxt(c, usecols=(1,),
                      converters={1: lambda s: int(s, 16)})
    assert_array_equal(data, [33, 66])
def test_dtype_with_object(self):
    """A converter can fill an object field with ``datetime`` values."""
    # Test using an explicit dtype with an object
    data = " 1; 2001-01-01\n 2; 2002-01-31 "
    ndtype = [('idx', int), ('code', object)]

    def func(s):
        return strptime(s.strip(), "%Y-%m-%d")

    converters = {1: func}
    test = np.loadtxt(TextIO(data), delimiter=";", dtype=ndtype,
                      converters=converters)
    control = np.array(
        [(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))],
        dtype=ndtype)
    assert_equal(test, control)
def test_uint64_type(self):
    """Values above the int64 range load exactly as uint64."""
    tgt = (9223372043271415339, 9223372043271415853)
    c = TextIO()
    c.write("%s %s" % tgt)
    c.seek(0)
    res = np.loadtxt(c, dtype=np.uint64)
    assert_equal(res, tgt)
def test_int64_type(self):
    """Large-magnitude positive and negative values load as int64."""
    tgt = (-9223372036854775807, 9223372036854775807)
    c = TextIO()
    c.write("%s %s" % tgt)
    c.seek(0)
    res = np.loadtxt(c, dtype=np.int64)
    assert_equal(res, tgt)
def test_from_float_hex(self):
    """Hexadecimal float literals load as float and float32."""
    # IEEE doubles and floats only, otherwise the float32
    # conversion may fail.
    tgt = np.logspace(-10, 10, 5).astype(np.float32)
    tgt = np.hstack((tgt, -tgt)).astype(float)
    inp = '\n'.join(map(float.hex, tgt))
    c = TextIO()
    c.write(inp)
    for dt in [float, np.float32]:
        c.seek(0)
        res = np.loadtxt(c, dtype=dt)
        assert_equal(res, tgt, err_msg="%s" % dt)
def test_from_complex(self):
    """Python's complex ``str`` form loads back with ``dtype=complex``."""
    tgt = (complex(1, 1), complex(1, -1))
    c = TextIO()
    c.write("%s %s" % tgt)
    c.seek(0)
    res = np.loadtxt(c, dtype=complex)
    assert_equal(res, tgt)
def test_complex_misformatted(self): | |
# test for backward compatibility | |
# some complex formats used to generate x+-yj | |
a = np.zeros((2, 2), dtype=np.complex128) | |
re = np.pi | |
im = np.e | |
a[:] = re - 1.0j * im | |
c = BytesIO() | |
np.savetxt(c, a, fmt='%.16e') | |
c.seek(0) | |
txt = c.read() | |
c.seek(0) | |
# misformat the sign on the imaginary part, gh 7895 | |
txt_bad = txt.replace(b'e+00-', b'e00+-') | |
assert_(txt_bad != txt) | |
c.write(txt_bad) | |
c.seek(0) | |
res = np.loadtxt(c, dtype=complex) | |
assert_equal(res, a) | |
def test_universal_newline(self): | |
with temppath() as name: | |
with open(name, 'w') as f: | |
f.write('1 21\r3 42\r') | |
data = np.loadtxt(name) | |
assert_array_equal(data, [[1, 21], [3, 42]]) | |
def test_empty_field_after_tab(self):
    """Blank or empty trailing tab-separated fields are kept as given."""
    c = TextIO()
    c.write('1 \t2 \t3\tstart \n4\t5\t6\t \n7\t8\t9.5\t')
    c.seek(0)
    dt = {'names': ('x', 'y', 'z', 'comment'),
          'formats': ('<i4', '<i4', '<f4', '|S8')}
    x = np.loadtxt(c, dtype=dt, delimiter='\t')
    a = np.array([b'start ', b' ', b''])
    assert_array_equal(x['comment'], a)
def test_unpack_structured(self):
    """``unpack=True`` returns one array per structured field."""
    txt = TextIO("M 21 72\nF 35 58")
    dt = {'names': ('a', 'b', 'c'), 'formats': ('|S1', '<i4', '<f4')}
    a, b, c = np.loadtxt(txt, dtype=dt, unpack=True)
    assert_(a.dtype.str == '|S1')
    assert_(b.dtype.str == '<i4')
    assert_(c.dtype.str == '<f4')
    assert_array_equal(a, np.array([b'M', b'F']))
    assert_array_equal(b, np.array([21, 35]))
    assert_array_equal(c, np.array([72., 58.]))
def test_ndmin_keyword(self):
    """``ndmin`` validates its value and sets the minimum result rank."""
    c = TextIO()
    c.write('1,2,3\n4,5,6')
    c.seek(0)
    assert_raises(ValueError, np.loadtxt, c, ndmin=3)
    c.seek(0)
    assert_raises(ValueError, np.loadtxt, c, ndmin=1.5)
    c.seek(0)
    x = np.loadtxt(c, dtype=int, delimiter=',', ndmin=1)
    a = np.array([[1, 2, 3], [4, 5, 6]])
    assert_array_equal(x, a)

    # A single row.
    d = TextIO()
    d.write('0,1,2')
    d.seek(0)
    x = np.loadtxt(d, dtype=int, delimiter=',', ndmin=2)
    assert_(x.shape == (1, 3))
    d.seek(0)
    x = np.loadtxt(d, dtype=int, delimiter=',', ndmin=1)
    assert_(x.shape == (3,))
    d.seek(0)
    x = np.loadtxt(d, dtype=int, delimiter=',', ndmin=0)
    assert_(x.shape == (3,))

    # A single column.
    e = TextIO()
    e.write('0\n1\n2')
    e.seek(0)
    x = np.loadtxt(e, dtype=int, delimiter=',', ndmin=2)
    assert_(x.shape == (3, 1))
    e.seek(0)
    x = np.loadtxt(e, dtype=int, delimiter=',', ndmin=1)
    assert_(x.shape == (3,))
    e.seek(0)
    x = np.loadtxt(e, dtype=int, delimiter=',', ndmin=0)
    assert_(x.shape == (3,))

    # Test ndmin kw with empty file.
    with suppress_warnings() as sup:
        sup.filter(message="loadtxt: Empty input file:")
        f = TextIO()
        assert_(np.loadtxt(f, ndmin=2).shape == (0, 1,))
        assert_(np.loadtxt(f, ndmin=1).shape == (0,))
def test_generator_source(self): | |
def count(): | |
for i in range(10): | |
yield "%d" % i | |
res = np.loadtxt(count()) | |
assert_array_equal(res, np.arange(10)) | |
def test_bad_line(self):
    """A row with too few columns raises and reports its line number."""
    ragged = TextIO('1 2 3\n4 5 6\n2 3')
    # The short row is the third line, so the message must contain "3".
    assert_raises_regex(ValueError, "3", np.loadtxt, ragged)
def test_none_as_string(self):
    """gh-5155: the text ``None`` loads as a string in an 'S' field."""
    stream = TextIO('100,foo,200\n300,None,400')
    record = np.dtype([('x', int), ('a', 'S10'), ('y', int)])
    # Passing means simply not raising.
    np.loadtxt(stream, delimiter=',', dtype=record, comments=None)
def test_binary_load(self): | |
butf8 = b"5,6,7,\xc3\x95scarscar\n\r15,2,3,hello\n\r"\ | |
b"20,2,3,\xc3\x95scar\n\r" | |
sutf8 = butf8.decode("UTF-8").replace("\r", "").splitlines() | |
with temppath() as path: | |
with open(path, "wb") as f: | |
f.write(butf8) | |
with open(path, "rb") as f: | |
x = np.loadtxt(f, encoding="UTF-8", dtype=np.unicode_) | |
assert_array_equal(x, sutf8) | |
# test broken latin1 conversion people now rely on | |
with open(path, "rb") as f: | |
x = np.loadtxt(f, encoding="UTF-8", dtype="S") | |
x = [b'5,6,7,\xc3\x95scarscar', b'15,2,3,hello', b'20,2,3,\xc3\x95scar'] | |
assert_array_equal(x, np.array(x, dtype="S")) | |
def test_max_rows(self):
    """``max_rows=1`` stops after the first data row."""
    stream = TextIO('1,2,3,5\n4,5,7,8\n2,1,4,5')
    first_row = np.loadtxt(stream, dtype=int, delimiter=',', max_rows=1)
    assert_array_equal(first_row, np.array([1, 2, 3, 5], int))
def test_max_rows_with_skiprows(self):
    """``max_rows`` counts rows after ``skiprows`` has been applied."""
    one_row = np.loadtxt(TextIO('comments\n1,2,3,5\n4,5,7,8\n2,1,4,5'),
                         dtype=int, delimiter=',', skiprows=1, max_rows=1)
    assert_array_equal(one_row, np.array([1, 2, 3, 5], int))

    two_rows = np.loadtxt(TextIO('comment\n1,2,3,5\n4,5,7,8\n2,1,4,5'),
                          dtype=int, delimiter=',', skiprows=1, max_rows=2)
    assert_array_equal(two_rows,
                       np.array([[1, 2, 3, 5], [4, 5, 7, 8]], int))
def test_max_rows_with_read_continuation(self):
    """A second loadtxt on the same handle resumes after ``max_rows``."""
    stream = TextIO('1,2,3,5\n4,5,7,8\n2,1,4,5')
    head = np.loadtxt(stream, dtype=int, delimiter=',', max_rows=2)
    assert_array_equal(head, np.array([[1, 2, 3, 5], [4, 5, 7, 8]], int))
    # No seek here: the remaining row is read from the current position.
    tail = np.loadtxt(stream, dtype=int, delimiter=',')
    assert_array_equal(tail, np.array([2, 1, 4, 5], int))
def test_max_rows_larger(self):
    """``max_rows`` larger than the number of rows reads every row."""
    stream = TextIO('comment\n1,2,3,5\n4,5,7,8\n2,1,4,5')
    everything = np.loadtxt(stream, dtype=int, delimiter=',',
                            skiprows=1, max_rows=6)
    wanted = np.array([[1, 2, 3, 5], [4, 5, 7, 8], [2, 1, 4, 5]], int)
    assert_array_equal(everything, wanted)
class Testfromregex: | |
def test_record(self): | |
c = TextIO() | |
c.write('1.312 foo\n1.534 bar\n4.444 qux') | |
c.seek(0) | |
dt = [('num', np.float64), ('val', 'S3')] | |
x = np.fromregex(c, r"([0-9.]+)\s+(...)", dt) | |
a = np.array([(1.312, 'foo'), (1.534, 'bar'), (4.444, 'qux')], | |
dtype=dt) | |
assert_array_equal(x, a) | |
def test_record_2(self): | |
c = TextIO() | |
c.write('1312 foo\n1534 bar\n4444 qux') | |
c.seek(0) | |
dt = [('num', np.int32), ('val', 'S3')] | |
x = np.fromregex(c, r"(\d+)\s+(...)", dt) | |
a = np.array([(1312, 'foo'), (1534, 'bar'), (4444, 'qux')], | |
dtype=dt) | |
assert_array_equal(x, a) | |
def test_record_3(self): | |
c = TextIO() | |
c.write('1312 foo\n1534 bar\n4444 qux') | |
c.seek(0) | |
dt = [('num', np.float64)] | |
x = np.fromregex(c, r"(\d+)\s+...", dt) | |
a = np.array([(1312,), (1534,), (4444,)], dtype=dt) | |
assert_array_equal(x, a) | |
def test_record_unicode(self): | |
utf8 = b'\xcf\x96' | |
with temppath() as path: | |
with open(path, 'wb') as f: | |
f.write(b'1.312 foo' + utf8 + b' \n1.534 bar\n4.444 qux') | |
dt = [('num', np.float64), ('val', 'U4')] | |
x = np.fromregex(path, r"(?u)([0-9.]+)\s+(\w+)", dt, encoding='UTF-8') | |
a = np.array([(1.312, 'foo' + utf8.decode('UTF-8')), (1.534, 'bar'), | |
(4.444, 'qux')], dtype=dt) | |
assert_array_equal(x, a) | |
regexp = re.compile(r"([0-9.]+)\s+(\w+)", re.UNICODE) | |
x = np.fromregex(path, regexp, dt, encoding='UTF-8') | |
assert_array_equal(x, a) | |
def test_compiled_bytes(self): | |
regexp = re.compile(b'(\\d)') | |
c = BytesIO(b'123') | |
dt = [('num', np.float64)] | |
a = np.array([1, 2, 3], dtype=dt) | |
x = np.fromregex(c, regexp, dt) | |
assert_array_equal(x, a) | |
#####-------------------------------------------------------------------------- | |
class TestFromTxt(LoadTxtBase): | |
loadfunc = staticmethod(np.genfromtxt) | |
def test_record(self):
    """Explicit structured dtypes, given as a list and as a dict."""
    pairs = np.genfromtxt(TextIO('1 2\n3 4'),
                          dtype=[('x', np.int32), ('y', np.int32)])
    assert_equal(pairs, np.array([(1, 2), (3, 4)],
                                 dtype=[('x', 'i4'), ('y', 'i4')]))

    people_type = {'names': ('gender', 'age', 'weight'),
                   'formats': ('S1', 'i4', 'f4')}
    wanted = np.array([('M', 64.0, 75.0), ('F', 25.0, 60.0)],
                      dtype=people_type)
    people = np.genfromtxt(TextIO('M 64.0 75.0\nF 25.0 60.0'),
                           dtype=people_type)
    assert_equal(people, wanted)
def test_array(self):
    """A plain dtype yields a standard 2-D ndarray."""
    stream = TextIO('1 2\n3 4')
    as_int = np.genfromtxt(stream, dtype=int)
    assert_array_equal(as_int, np.array([[1, 2], [3, 4]], dtype=int))
    # Rewind and read the same stream as floats; this half uses loadtxt.
    stream.seek(0)
    as_float = np.loadtxt(stream, dtype=float)
    assert_array_equal(as_float, np.array([[1, 2], [3, 4]], dtype=float))
def test_1D(self):
    """Single-column and single-row inputs are squeezed to 1-D."""
    wanted = np.array([1, 2, 3, 4], int)
    as_column = np.genfromtxt(TextIO('1\n2\n3\n4\n'), dtype=int)
    assert_array_equal(as_column, wanted)
    as_row = np.genfromtxt(TextIO('1,2,3,4\n'), dtype=int, delimiter=',')
    assert_array_equal(as_row, wanted)
def test_comments(self):
    """Comments are stripped whether on their own line or trailing."""
    wanted = np.array([1, 2, 3, 5], int)
    # First a comment on its own line, then one at the end of a data line.
    for text in ('# comment\n1,2,3,5\n', '1,2,3,5# comment\n'):
        parsed = np.genfromtxt(TextIO(text), dtype=int, delimiter=',',
                               comments='#')
        assert_equal(parsed, wanted)
def test_skiprows(self):
    """Leading rows can be skipped before parsing starts."""
    wanted = np.array([1, 2, 3, 5], int)
    common = {'dtype': int, 'delimiter': ','}

    via_genfromtxt = np.genfromtxt(TextIO('comment\n1,2,3,5\n'),
                                   skip_header=1, **common)
    assert_equal(via_genfromtxt, wanted)

    # The second half goes through loadtxt and its ``skiprows`` keyword.
    via_loadtxt = np.loadtxt(TextIO('# comment\n1,2,3,5\n'),
                             skiprows=1, **common)
    assert_equal(via_loadtxt, wanted)
def test_skip_footer(self):
    """``skip_footer`` drops trailing rows, including a malformed one."""
    comments = ["# %i" % i for i in range(1, 6)]
    rows = ["%i,%3.1f,%03s" % (i, i, i) for i in range(51)]
    # The last row has too few columns; it lies inside the skipped footer.
    rows[-1] = "99,99"
    text = "\n".join(comments + ["A, B, C"] + rows)
    parsed = np.genfromtxt(TextIO(text), delimiter=",", names=True,
                           skip_header=5, skip_footer=10)
    wanted = np.array([("%f" % i, "%f" % i, "%f" % i) for i in range(41)],
                      dtype=[(name, float) for name in "ABC"])
    assert_equal(parsed, wanted)
def test_skip_footer_with_invalid(self):
    """``skip_footer`` combined with rows that have too few columns."""
    with suppress_warnings() as sup:
        sup.filter(ConversionWarning)
        text = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n'
        first_four = np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]])
        # Footer too small to get rid of all invalid values
        assert_raises(ValueError, np.genfromtxt,
                      TextIO(text), skip_footer=1)
        tolerant = np.genfromtxt(TextIO(text), skip_footer=1,
                                 invalid_raise=False)
        assert_equal(tolerant, first_four)
        # A footer of three removes every short row.
        trimmed = np.genfromtxt(TextIO(text), skip_footer=3)
        assert_equal(trimmed, first_four)

        # Short rows in the middle of the data as well.
        text = '1 1\n2 \n3 3\n4 4\n5 \n6 6\n7 7\n'
        for footer, rows in ((1, [[1., 1.], [3., 3.], [4., 4.], [6., 6.]]),
                             (3, [[1., 1.], [3., 3.], [4., 4.]])):
            parsed = np.genfromtxt(TextIO(text), skip_footer=footer,
                                   invalid_raise=False)
            assert_equal(parsed, np.array(rows))
def test_header(self):
    """Field names are taken from the header row when ``names=True``."""
    stream = TextIO('gender age weight\nM 64.0 75.0\nF 25.0 60.0')
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(stream, dtype=None, names=True)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    wanted = {'gender': np.array([b'M', b'F']),
              'age': np.array([64.0, 25.0]),
              'weight': np.array([75.0, 60.0])}
    for field in ('gender', 'age', 'weight'):
        assert_equal(parsed[field], wanted[field])
def test_auto_dtype(self):
    """``dtype=None`` infers one type per column."""
    stream = TextIO('A 64 75.0 3+4j True\nBCD 25 60.0 5+6j False')
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(stream, dtype=None)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    columns = [np.array([b'A', b'BCD']),
               np.array([64, 25]),
               np.array([75.0, 60.0]),
               np.array([3 + 4j, 5 + 6j]),
               np.array([True, False]), ]
    assert_equal(parsed.dtype.names, ['f0', 'f1', 'f2', 'f3', 'f4'])
    for index, column in enumerate(columns):
        assert_equal(parsed['f%i' % index], column)
def test_auto_dtype_uniform(self):
    """All-integer columns with ``dtype=None`` give a plain 2-D array."""
    parsed = np.genfromtxt(TextIO('1 2 3 4\n5 6 7 8\n'), dtype=None)
    assert_equal(parsed, np.array([[1, 2, 3, 4], [5, 6, 7, 8]]))
def test_fancy_dtype(self):
    """A nested structured dtype is filled from flat columns."""
    nested = np.dtype([('x', int), ('y', [('t', int), ('s', float)])])
    parsed = np.genfromtxt(TextIO('1,2,3.0\n4,5,6.0\n'),
                           dtype=nested, delimiter=',')
    wanted = np.array([(1, (2, 3.0)), (4, (5, 6.0))], dtype=nested)
    assert_equal(parsed, wanted)
def test_names_overwrite(self):
    """``names`` overrides the field names carried by ``dtype``."""
    original = {'names': ('g', 'a', 'w'),
                'formats': ('S1', 'i4', 'f4')}
    new_names = ('gender', 'age', 'weight')
    parsed = np.genfromtxt(TextIO(b'M 64.0 75.0\nF 25.0 60.0'),
                           dtype=original, names=new_names)
    # The control uses the same formats under the replacement names.
    renamed = dict(original, names=new_names)
    wanted = np.array([('M', 64.0, 75.0),
                       ('F', 25.0, 60.0)], dtype=renamed)
    assert_equal(parsed, wanted)
def test_commented_header(self):
    """Names are recovered from a header line that is commented out."""
    wanted = np.array(
        [('M', 21, 72.1), ('F', 35, 58.33), ('M', 33, 21.99)],
        dtype=[('gender', '|S1'), ('age', int), ('weight', float)])

    # The # is part of the first name and should be deleted automatically.
    glued = TextIO("\n"
                   "#gender age weight\n"
                   "M 21 72.100000\n"
                   "F 35 58.330000\n"
                   "M 33 21.99\n")
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(glued, names=True, dtype=None)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    assert_equal(parsed, wanted)

    # Ditto, but we should get rid of the first element
    spaced = TextIO(b"\n"
                    b"# gender age weight\n"
                    b"M 21 72.100000\n"
                    b"F 35 58.330000\n"
                    b"M 33 21.99\n")
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(spaced, names=True, dtype=None)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    assert_equal(parsed, wanted)
def test_names_and_comments_none(self):
    """gh-10780: ``names=True`` still works when ``comments`` is None."""
    stream = TextIO('col1 col2\n 1 2\n 3 4')
    parsed = np.genfromtxt(stream, dtype=(int, int), comments=None,
                           names=True)
    wanted = np.array([(1, 2), (3, 4)],
                      dtype=[('col1', int), ('col2', int)])
    assert_equal(parsed, wanted)
def test_file_is_closed_on_error(self):
    """gh-13200: a decode failure must not emit any warning."""
    with tempdir() as tmpdir:
        fpath = os.path.join(tmpdir, "test.csv")
        with open(fpath, "wb") as handle:
            handle.write(u'\N{GREEK PI SYMBOL}'.encode('utf8'))

        # ResourceWarnings are emitted from a destructor, so won't be
        # detected by regular propagation to errors.
        with assert_no_warnings(), pytest.raises(UnicodeDecodeError):
            np.genfromtxt(fpath, encoding="ascii")
def test_autonames_and_usecols(self):
    """Columns can be selected by header name when ``names=True``."""
    stream = TextIO('A B C D\n aaaa 121 45 9.1')
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(stream, usecols=('A', 'C', 'D'),
                               names=True, dtype=None)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    wanted = np.array(('aaaa', 45, 9.1),
                      dtype=[('A', '|S4'), ('C', int), ('D', float)])
    assert_equal(parsed, wanted)
def test_converters_with_usecols(self):
    """A user converter is applied to a column chosen by ``usecols``."""
    def int_or_sentinel(s):
        # An empty field is falsy, so it becomes -999.
        return int(s or - 999)

    stream = TextIO('1,2,3,,5\n6,7,8,9,10\n')
    parsed = np.genfromtxt(stream, dtype=int, delimiter=',',
                           converters={3: int_or_sentinel},
                           usecols=(1, 3,))
    assert_equal(parsed, np.array([[2, -999], [7, 9]], int))
def test_converters_with_usecols_and_names(self):
    """Converters may be keyed by the header name of a selected column."""
    def doubled(s):
        return 2 * int(s)

    stream = TextIO('A B C D\n aaaa 121 45 9.1')
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        parsed = np.genfromtxt(stream, usecols=('A', 'C', 'D'), names=True,
                               dtype=None, converters={'C': doubled})
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    wanted = np.array(('aaaa', 90, 9.1),
                      dtype=[('A', '|S4'), ('C', int), ('D', float)])
    assert_equal(parsed, wanted)
def test_converters_cornercases(self):
    """A converter returning ``datetime`` gives an object field."""
    def parse_date(s):
        return strptime(s, '%Y-%m-%d %H:%M:%SZ')

    stream = TextIO('2009-02-03 12:00:00Z, 72214.0')
    parsed = np.genfromtxt(stream, delimiter=',', dtype=None,
                           names=['date', 'stid'],
                           converters={'date': parse_date})
    wanted = np.array((datetime(2009, 2, 3), 72214.),
                      dtype=[('date', np.object_), ('stid', float)])
    assert_equal(parsed, wanted)
def test_converters_cornercases2(self):
    """A converter returning ``datetime64`` gives a datetime64 field."""
    def parse_date64(s):
        return np.datetime64(strptime(s, '%Y-%m-%d %H:%M:%SZ'))

    stream = TextIO('2009-02-03 12:00:00Z, 72214.0')
    parsed = np.genfromtxt(stream, delimiter=',', dtype=None,
                           names=['date', 'stid'],
                           converters={'date': parse_date64})
    wanted = np.array((datetime(2009, 2, 3), 72214.),
                      dtype=[('date', 'datetime64[us]'), ('stid', float)])
    assert_equal(parsed, wanted)
def test_unused_converter(self):
    """A converter for a column outside ``usecols`` has no effect."""
    def from_hex(s):
        return int(s, 16)

    stream = TextIO("1 21\n 3 42\n")
    # Converter on column 0, which is not selected: values read as-is.
    untouched = np.genfromtxt(stream, usecols=(1,),
                              converters={0: from_hex})
    assert_equal(untouched, [21, 42])

    # Converter on the selected column: values are parsed as hex.
    stream.seek(0)
    converted = np.genfromtxt(stream, usecols=(1,),
                              converters={1: from_hex})
    assert_equal(converted, [33, 66])
def test_invalid_converter(self):
    """genfromtxt raises ConverterError for this input and converters."""
    def strip_rand(x):
        return float((b'r' in x.lower() and x.split()[-1]) or
                     (b'r' not in x.lower() and x.strip() or 0.0))

    def strip_per(x):
        return float((b'%' in x.lower() and x.split()[0]) or
                     (b'%' not in x.lower() and x.strip() or 0.0))

    stream = TextIO("D01N01,10/1/2003 ,1 %,R 75,400,600\r\n"
                    "L24U05,12/5/2003, 2 %,1,300, 150.5\r\n"
                    "D02N03,10/10/2004,R 1,,7,145.55")
    options = {'converters': {2: strip_per, 3: strip_rand},
               'delimiter': ",",
               'dtype': None}
    assert_raises(ConverterError, np.genfromtxt, stream, **options)
def test_tricky_converter_bug1666(self):
    """A converter that strips a one-character prefix before float()."""
    def drop_prefix(s):
        return float(s[1:])

    stream = TextIO('q1,2\nq3,4')
    parsed = np.genfromtxt(stream, delimiter=',',
                           converters={0: drop_prefix})
    assert_equal(parsed, np.array([[1., 2.], [3., 4.]]))
def test_dtype_with_converters(self):
    """A converter overrides ``dtype`` for its own column."""
    text = "2009; 23; 46"
    # ``bytes`` on column 0 turns the result into a structured array.
    mixed = np.genfromtxt(TextIO(text,), delimiter=";", dtype=float,
                          converters={0: bytes})
    wanted = np.array([('2009', 23., 46)],
                      dtype=[('f0', '|S4'), ('f1', float), ('f2', float)])
    assert_equal(mixed, wanted)

    # ``float`` on column 0 keeps the result a plain float array.
    uniform = np.genfromtxt(TextIO(text,), delimiter=";", dtype=float,
                            converters={0: float})
    assert_equal(uniform, np.array([2009., 23., 46],))
def test_dtype_with_converters_and_usecols(self):
    """recfromcsv with an explicit dtype, converters and ``usecols``."""
    text = "1,5,-1,1:1\n2,8,-1,1:n\n3,3,-2,m:n\n"
    codes = {'1:1': 0, '1:n': 1, 'm:1': 2, 'm:n': 3}

    def to_code(raw):
        return codes[raw.decode()]

    converters = {0: int, 1: int, 2: int, 3: to_code}

    # All four columns.
    full_type = [('e1', 'i4'), ('e2', 'i4'), ('e3', 'i2'), ('n', 'i1')]
    parsed = np.recfromcsv(TextIO(text,), dtype=full_type, delimiter=',',
                           names=None, converters=converters)
    wanted = np.rec.array([(1, 5, -1, 0), (2, 8, -1, 1), (3, 3, -2, 3)],
                          dtype=full_type)
    assert_equal(parsed, wanted)

    # Same data with the third column left out.
    partial_type = [('e1', 'i4'), ('e2', 'i4'), ('n', 'i1')]
    parsed = np.recfromcsv(TextIO(text,), dtype=partial_type, delimiter=',',
                           usecols=(0, 1, 3), names=None,
                           converters=converters)
    wanted = np.rec.array([(1, 5, 0), (2, 8, 1), (3, 3, 3)],
                          dtype=partial_type)
    assert_equal(parsed, wanted)
def test_dtype_with_object(self):
    """Object fields work with a converter; nested dtypes are refused."""
    text = " 1; 2001-01-01\n2; 2002-01-31 "

    def to_date(s):
        return strptime(s.strip(), "%Y-%m-%d")

    converters = {1: to_date}

    flat = [('idx', int), ('code', object)]
    parsed = np.genfromtxt(TextIO(text), delimiter=";", dtype=flat,
                           converters=converters)
    wanted = np.array(
        [(1, datetime(2001, 1, 1)), (2, datetime(2002, 1, 31))],
        dtype=flat)
    assert_equal(parsed, wanted)

    # Nested fields are rejected, and so are nested but empty fields.
    unsupported = (
        [('nest', [('idx', int), ('code', object)])],
        [('idx', int), ('code', object), ('nest', [])],
    )
    for nested in unsupported:
        with assert_raises_regex(NotImplementedError,
                                 'Nested fields.* not supported.*'):
            np.genfromtxt(TextIO(text), delimiter=";",
                          dtype=nested, converters=converters)
def test_dtype_with_object_no_converter(self):
    """Without a converter an object dtype stores the raw bytes."""
    for text, raw in (("1", b"1"), ("string", b"string")):
        parsed = np.genfromtxt(TextIO(text), dtype=object)
        assert parsed[()] == raw
def test_userconverters_with_explicit_dtype(self):
    """A user converter combined with an explicit (standard) dtype."""
    stream = TextIO('skip,skip,2001-01-01,1.0,skip')
    parsed = np.genfromtxt(stream, delimiter=",", names=None, dtype=float,
                           usecols=(2, 3), converters={2: bytes})
    wanted = np.array([('2001-01-01', 1.)],
                      dtype=[('', '|S10'), ('', float)])
    assert_equal(parsed, wanted)
def test_utf8_userconverters_with_explicit_dtype(self): | |
utf8 = b'\xcf\x96' | |
with temppath() as path: | |
with open(path, 'wb') as f: | |
f.write(b'skip,skip,2001-01-01' + utf8 + b',1.0,skip') | |
test = np.genfromtxt(path, delimiter=",", names=None, dtype=float, | |
usecols=(2, 3), converters={2: np.compat.unicode}, | |
encoding='UTF-8') | |
control = np.array([('2001-01-01' + utf8.decode('UTF-8'), 1.)], | |
dtype=[('', '|U11'), ('', float)]) | |
assert_equal(test, control) | |
def test_spacedelimiter(self):
    """With no delimiter given, columns are split on whitespace."""
    parsed = np.genfromtxt(TextIO("1 2 3 4 5\n6 7 8 9 10"))
    wanted = np.array([[1., 2., 3., 4., 5.],
                       [6., 7., 8., 9., 10.]])
    assert_equal(parsed, wanted)
def test_integer_delimiter(self):
    """An integer delimiter is passed to read three-character fields.

    Every value is right-aligned in a three-character field, so each row
    is exactly nine characters long. The padding spaces are significant:
    without them the fields no longer line up with the control values.
    """
    data = "  1  2  3\n  4  5 67\n890123  4"
    test = np.genfromtxt(TextIO(data), delimiter=3)
    control = np.array([[1, 2, 3], [4, 5, 67], [890, 123, 4]])
    assert_equal(test, control)
def test_missing(self):
    """An empty field is filled in by a user converter."""
    def int_or_sentinel(s):
        # An empty field is falsy, so it becomes -999.
        return int(s or - 999)

    parsed = np.genfromtxt(TextIO('1,2,3,,5\n'), dtype=int, delimiter=',',
                           converters={3: int_or_sentinel})
    assert_equal(parsed, np.array([1, 2, 3, -999, 5], int))
def test_missing_with_tabs(self):
    """Empty tab-delimited fields are masked when ``usemask=True``."""
    text = "1\t2\t3\n\t2\t\n1\t\t3"
    parsed = np.genfromtxt(TextIO(text), delimiter="\t", usemask=True,)
    wanted_data = np.array(
        [(1, 2, 3), (np.nan, 2, np.nan), (1, np.nan, 3)],)
    wanted_mask = np.array(
        [(0, 0, 0), (1, 0, 1), (0, 1, 0)], dtype=bool)
    assert_equal(parsed.data, wanted_data)
    assert_equal(parsed.mask, wanted_mask)
def test_usecols(self):
    """Select columns with a tuple and with an array of indices."""
    # One column out of two.
    two_wide = np.array([[1, 2], [3, 4]], float)
    stream = TextIO()
    np.savetxt(stream, two_wide)
    stream.seek(0)
    picked = np.genfromtxt(stream, dtype=float, usecols=(1,))
    assert_equal(picked, two_wide[:, 1])

    # Two columns out of three, as a tuple and then as an array.
    three_wide = np.array([[1, 2, 3], [3, 4, 5]], float)
    stream = TextIO()
    np.savetxt(stream, three_wide)
    for selection in ((1, 2), np.array([1, 2])):
        stream.seek(0)
        picked = np.genfromtxt(stream, dtype=float, usecols=selection)
        assert_equal(picked, three_wide[:, 1:])
def test_usecols_as_css(self):
    """``usecols`` may be given as a comma-separated string of names."""
    parsed = np.genfromtxt(TextIO("1 2 3\n4 5 6"),
                           names="a, b, c", usecols="a, c")
    wanted = np.array([(1, 3), (4, 6)],
                      dtype=[(name, float) for name in "ac"])
    assert_equal(parsed, wanted)
def test_usecols_with_structured_dtype(self):
    """``usecols`` combined with an explicit structured dtype."""
    stream = TextIO("JOE 70.1 25.3\nBOB 60.5 27.9")
    record = list(zip(['stid', 'temp'], ['S4', 'f8']))
    parsed = np.genfromtxt(stream, usecols=(0, 2), dtype=record)
    assert_equal(parsed['stid'], [b"JOE", b"BOB"])
    assert_equal(parsed['temp'], [25.3, 27.9])
def test_usecols_with_integer(self):
    """A bare integer is accepted for ``usecols``."""
    first_column = np.genfromtxt(TextIO(b"1 2 3\n4 5 6"), usecols=0)
    assert_equal(first_column, np.array([1., 4.]))
def test_usecols_with_named_columns(self):
    """Named columns can be selected by index or by name."""
    wanted = np.array([(1, 3), (4, 6)],
                      dtype=[('a', float), ('c', float)])
    text = "1 2 3\n4 5 6"
    # Indices (including a negative one) first, then the names.
    for selection in ((0, -1), ('a', 'c')):
        parsed = np.genfromtxt(TextIO(text), usecols=selection,
                               names="a, b, c")
        assert_equal(parsed, wanted)
def test_empty_file(self):
    """An empty input produces an empty array."""
    with suppress_warnings() as sup:
        sup.filter(message="genfromtxt: Empty input file:")
        empty = TextIO()
        nothing = np.array([])
        assert_equal(np.genfromtxt(empty), nothing)
        # when skip_header > 0
        assert_equal(np.genfromtxt(empty, skip_header=1), nothing)
def test_fancy_dtype_alt(self):
    """A nested structured dtype also works with ``usemask=True``."""
    nested = np.dtype([('x', int), ('y', [('t', int), ('s', float)])])
    parsed = np.genfromtxt(TextIO('1,2,3.0\n4,5,6.0\n'), dtype=nested,
                           delimiter=',', usemask=True)
    wanted = ma.array([(1, (2, 3.0)), (4, (5, 6.0))], dtype=nested)
    assert_equal(parsed, wanted)
def test_shaped_dtype(self):
    """A field with a (2, 3) sub-array shape takes six columns."""
    stream = TextIO("aaaa 1.0 8.0 1 2 3 4 5 6")
    record = np.dtype([('name', 'S4'), ('x', float), ('y', float),
                       ('block', int, (2, 3))])
    parsed = np.genfromtxt(stream, dtype=record)
    wanted = np.array([('aaaa', 1.0, 8.0, [[1, 2, 3], [4, 5, 6]])],
                      dtype=record)
    assert_array_equal(parsed, wanted)
def test_withmissing(self):
    """A missing-value marker is masked, for inferred and float dtypes."""
    stream = TextIO('A,B\n0,1\n2,N/A')
    options = {'delimiter': ",", 'missing_values': "N/A", 'names': True}
    mask = [(False, False), (False, True)]

    # Inferred dtype: both columns are integers.
    parsed = np.genfromtxt(stream, dtype=None, usemask=True, **options)
    wanted = ma.array([(0, 1), (2, -1)], mask=mask,
                      dtype=[('A', int), ('B', int)])
    assert_equal(parsed, wanted)
    assert_equal(parsed.mask, wanted.mask)

    # Default dtype: both columns are floats.
    stream.seek(0)
    parsed = np.genfromtxt(stream, usemask=True, **options)
    wanted = ma.array([(0, 1), (2, -1)], mask=mask,
                      dtype=[('A', float), ('B', float)])
    assert_equal(parsed, wanted)
    assert_equal(parsed.mask, wanted.mask)
def test_user_missing_values(self):
    """Missing values given as one string and as per-column mappings."""
    text = "A, B, C\n0, 0., 0j\n1, N/A, 1j\n-9, 2.2, N/A\n3, -99, 3j"
    record = [('A', int), ('B', float), ('C', complex)]
    values = [(0, 0.0, 0j), (1, -999, 1j),
              (-9, 2.2, -999j), (3, -99, 3j)]
    common = {'delimiter': ",", 'names': True}

    # One marker for every column, dtype inferred.
    parsed = np.genfromtxt(TextIO(text), missing_values="N/A",
                           dtype=None, **common)
    wanted = ma.array(values,
                      mask=[(0, 0, 0), (0, 1, 0), (0, 0, 1), (0, 0, 0)],
                      dtype=record)
    assert_equal(parsed, wanted)

    # Per-column markers keyed by index, then by a mix of index and name;
    # both are compared against the same mask.
    per_column_mask = [(0, 0, 0), (0, 1, 0), (1, 0, 1), (0, 1, 0)]
    for markers in ({0: -9, 1: -99, 2: -999j},
                    {0: -9, 'B': -99, 'C': -999j}):
        parsed = np.genfromtxt(TextIO(text), missing_values=markers,
                               usemask=True, dtype=record, **common)
        wanted = ma.array(values, mask=per_column_mask, dtype=record)
        assert_equal(parsed, wanted)
def test_user_filling_values(self):
    """Missing markers are replaced by the configured filling values.

    Covers per-column mappings (keyed by index and by name), the same
    mappings with ``usecols``, and one global marker and fill value.
    """
    data = "N/A, 2, 3\n4, ,???"
    kwargs = dict(delimiter=",",
                  dtype=int,
                  names="a,b,c",
                  missing_values={0: "N/A", 'b': " ", 2: "???"},
                  filling_values={0: 0, 'b': 0, 2: -999})
    # All three columns. (The original also built a control array before
    # this call that was overwritten without ever being used.)
    test = np.genfromtxt(TextIO(data), **kwargs)
    ctrl = np.array([(0, 2, 3), (4, 0, -999)],
                    dtype=[(name, int) for name in "abc"])
    assert_equal(test, ctrl)
    # First and last column only.
    test = np.genfromtxt(TextIO(data), usecols=(0, -1), **kwargs)
    ctrl = np.array([(0, 3), (4, -999)],
                    dtype=[(name, int) for name in "ac"])
    assert_equal(test, ctrl)

    # A single marker with a single fill value, for two fill values.
    data2 = "1,2,*,4\n5,*,7,8\n"
    test = np.genfromtxt(TextIO(data2), delimiter=',', dtype=int,
                         missing_values="*", filling_values=0)
    ctrl = np.array([[1, 2, 0, 4], [5, 0, 7, 8]])
    assert_equal(test, ctrl)
    test = np.genfromtxt(TextIO(data2), delimiter=',', dtype=int,
                         missing_values="*", filling_values=-1)
    ctrl = np.array([[1, 2, -1, 4], [5, -1, 7, 8]])
    assert_equal(test, ctrl)
def test_withmissing_float(self):
    """A float missing marker is checked against a float field."""
    stream = TextIO('A,B\n0,1.5\n2,-999.00')
    parsed = np.genfromtxt(stream, dtype=None, delimiter=',',
                           missing_values='-999.0', names=True,
                           usemask=True)
    wanted = ma.array([(0, 1.5), (2, -1.)],
                      mask=[(False, False), (False, True)],
                      dtype=[('A', int), ('B', float)])
    assert_equal(parsed, wanted)
    assert_equal(parsed.mask, wanted.mask)
def test_with_masked_column_uniform(self):
    """Mask a whole column of a uniformly typed array."""
    parsed = np.genfromtxt(TextIO('1 2 3\n4 5 6\n'), dtype=None,
                           missing_values='2,5', usemask=True)
    wanted = ma.array([[1, 2, 3], [4, 5, 6]],
                      mask=[[0, 1, 0], [0, 1, 0]])
    assert_equal(parsed, wanted)
def test_with_masked_column_various(self):
    """Mask a whole column when the columns have different types."""
    parsed = np.genfromtxt(TextIO('True 2 3\nFalse 5 6\n'), dtype=None,
                           missing_values='2,5', usemask=True)
    wanted = ma.array([(1, 2, 3), (0, 5, 6)],
                      mask=[(0, 1, 0), (0, 1, 0)],
                      dtype=[('f0', bool), ('f1', bool), ('f2', int)])
    assert_equal(parsed, wanted)
def test_invalid_raise(self):
    """Malformed rows are skipped with a warning, or raise by default."""
    lines = ["1, 1, 1, 1, 1"] * 50
    # Every tenth row is missing a delimiter and so has four columns.
    for bad in range(0, 50, 10):
        lines[bad] = "2, 2, 2, 2 2"
    lines.insert(0, "a, b, c, d, e")
    stream = TextIO("\n".join(lines))

    parsed = assert_warns(ConversionWarning, np.genfromtxt, stream,
                          invalid_raise=False, delimiter=",", dtype=None,
                          names=True)
    assert_equal(len(parsed), 45)
    assert_equal(parsed,
                 np.ones(45, dtype=[(name, int) for name in 'abcde']))

    # With the default invalid_raise the same input is an error.
    stream.seek(0)
    assert_raises(ValueError, np.genfromtxt, stream,
                  delimiter=",", names=True)
def test_invalid_raise_with_usecols(self):
    """Malformed rows only matter when ``usecols`` touches them."""
    lines = ["1, 1, 1, 1, 1"] * 50
    bad_rows = list(range(0, 50, 10))
    # These rows are missing the last delimiter.
    for bad in bad_rows:
        lines[bad] = "2, 2, 2, 2 2"
    lines.insert(0, "a, b, c, d, e")
    stream = TextIO("\n".join(lines))
    options = {'delimiter': ",", 'dtype': None, 'names': True,
               'invalid_raise': False}

    # Asking for the last column: the five bad rows are dropped.
    parsed = assert_warns(ConversionWarning, np.genfromtxt, stream,
                          usecols=(0, 4), **options)
    assert_equal(len(parsed), 45)
    assert_equal(parsed, np.ones(45, dtype=[(name, int) for name in 'ae']))

    # Asking for the first two columns only: every row is kept.
    stream.seek(0)
    parsed = np.genfromtxt(stream, usecols=(0, 1), **options)
    assert_equal(len(parsed), 50)
    wanted = np.ones(50, dtype=[(name, int) for name in 'ab'])
    wanted[bad_rows] = (2, 2)
    assert_equal(parsed, wanted)
def test_inconsistent_dtype(self):
    """A converter output that does not fit the int dtype raises."""
    def parenthesise(x):
        return "(%s)" % x.decode()

    stream = TextIO("\n".join(["1, 1, 1, 1, -1.1"] * 50))
    options = {'delimiter': ",",
               'converters': {4: parenthesise},
               'dtype': [(name, int) for name in 'abcde']}
    assert_raises(ValueError, np.genfromtxt, stream, **options)
def test_default_field_format(self):
    """``defaultfmt`` names the auto-generated fields."""
    text = "0, 1, 2.3\n4, 5, 6.7"
    parsed = np.genfromtxt(TextIO(text), delimiter=",", dtype=None,
                           defaultfmt="f%02i")
    wanted = np.array([(0, 1, 2.3), (4, 5, 6.7)],
                      dtype=[("f00", int), ("f01", int), ("f02", float)])
    assert_equal(parsed, wanted)
def test_single_dtype_wo_names(self):
    """A single dtype without names gives a plain 2-D array."""
    text = "0, 1, 2.3\n4, 5, 6.7"
    parsed = np.genfromtxt(TextIO(text), delimiter=",", dtype=float,
                           defaultfmt="f%02i")
    wanted = np.array([[0., 1., 2.3], [4., 5., 6.7]], dtype=float)
    assert_equal(parsed, wanted)
def test_single_dtype_w_explicit_names(self):
    """A single dtype plus explicit names gives a structured array."""
    text = "0, 1, 2.3\n4, 5, 6.7"
    parsed = np.genfromtxt(TextIO(text), delimiter=",", dtype=float,
                           names="a, b, c")
    wanted = np.array([(0., 1., 2.3), (4., 5., 6.7)],
                      dtype=[(name, float) for name in "abc"])
    assert_equal(parsed, wanted)
def test_single_dtype_w_implicit_names(self):
    """A single dtype plus header names gives a structured array."""
    text = "a, b, c\n0, 1, 2.3\n4, 5, 6.7"
    parsed = np.genfromtxt(TextIO(text), delimiter=",", dtype=float,
                           names=True)
    wanted = np.array([(0., 1., 2.3), (4., 5., 6.7)],
                      dtype=[(name, float) for name in "abc"])
    assert_equal(parsed, wanted)
def test_easy_structured_dtype(self):
    """A tuple of types becomes a structured dtype with default names."""
    text = "0, 1, 2.3\n4, 5, 6.7"
    parsed = np.genfromtxt(TextIO(text), delimiter=",",
                           dtype=(int, float, float), defaultfmt="f_%02i")
    wanted = np.array(
        [(0, 1., 2.3), (4, 5., 6.7)],
        dtype=[("f_00", int), ("f_01", float), ("f_02", float)])
    assert_equal(parsed, wanted)
def test_autostrip(self):
    """Compare field contents with and without ``autostrip``.

    The first field has two trailing spaces (12 bytes) and the last has
    three leading spaces (8 bytes). Without ``autostrip`` the padding is
    kept, matching the ``S12``/``S8`` control; with it the fields shrink
    to ``S10``/``S5``.
    """
    data = "01/01/2003  , 1.3,   abcde"
    kwargs = dict(delimiter=",", dtype=None)
    # Default: surrounding whitespace is part of the value.
    with warnings.catch_warnings(record=True) as w:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        mtest = np.genfromtxt(TextIO(data), **kwargs)
        assert_(w[0].category is np.VisibleDeprecationWarning)
    ctrl = np.array([('01/01/2003  ', 1.3, '   abcde')],
                    dtype=[('f0', '|S12'), ('f1', float), ('f2', '|S8')])
    assert_equal(mtest, ctrl)
    # autostrip=True: whitespace around each value is removed.
    with warnings.catch_warnings(record=True) as w:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        mtest = np.genfromtxt(TextIO(data), autostrip=True, **kwargs)
        assert_(w[0].category is np.VisibleDeprecationWarning)
    ctrl = np.array([('01/01/2003', 1.3, 'abcde')],
                    dtype=[('f0', '|S10'), ('f1', float), ('f2', '|S5')])
    assert_equal(mtest, ctrl)
def test_replace_space(self):
    """Check how ``replace_space`` and ``deletechars`` rewrite header names."""
    txt = "A.A, B (B), C:C\n1, 2, 3.14"
    # Each case pairs extra genfromtxt options with the expected names.
    cases = (
        # default: replace ' ' by '_' and delete non-alphanum chars
        ({}, ("AA", "B_B", "CC")),
        # no replace, no delete
        (dict(replace_space='', deletechars=''), ("A.A", "B (B)", "C:C")),
        # no delete (spaces are replaced by _)
        (dict(deletechars=''), ("A.A", "B_(B)", "C:C")),
    )
    for options, names in cases:
        test = np.genfromtxt(TextIO(txt), delimiter=",", names=True,
                             dtype=None, **options)
        ctrl_dtype = list(zip(names, (int, int, float)))
        ctrl = np.array((1, 2, 3.14), dtype=ctrl_dtype)
        assert_equal(test, ctrl)
def test_replace_space_known_dtype(self):
    """Same name-rewriting checks as above, but with an explicit dtype."""
    txt = "A.A, B (B), C:C\n1, 2, 3"
    # Each case pairs extra genfromtxt options with the expected names.
    cases = (
        # default: replace ' ' by '_' and delete non-alphanum chars
        ({}, ("AA", "B_B", "CC")),
        # no replace, no delete
        (dict(replace_space='', deletechars=''), ("A.A", "B (B)", "C:C")),
        # no delete (spaces are replaced by _)
        (dict(deletechars=''), ("A.A", "B_(B)", "C:C")),
    )
    for options, names in cases:
        test = np.genfromtxt(TextIO(txt), delimiter=",", names=True,
                             dtype=int, **options)
        ctrl_dtype = [(name, int) for name in names]
        ctrl = np.array((1, 2, 3), dtype=ctrl_dtype)
        assert_equal(test, ctrl)
def test_incomplete_names(self):
    """Blank header entries are replaced by default field names.

    The header ``A,,C`` leaves the middle column unnamed; the expected
    field names are ``A``, ``f0`` and ``C`` both when the dtype is
    inferred (``dtype=None``) and when the default dtype is used.
    """
    data = "A,,C\n0,1,2\n3,4,5"
    kwargs = dict(delimiter=",", names=True)
    names = ('A', 'f0', 'C')
    rows = [(0, 1, 2), (3, 4, 5)]
    # w/ dtype=None: the columns are expected to be inferred as integers
    ctrl = np.array(rows, dtype=[(_, int) for _ in names])
    test = np.genfromtxt(TextIO(data), dtype=None, **kwargs)
    assert_equal(test, ctrl)
    # w/ default dtype: the columns are expected to be read as floats
    ctrl = np.array(rows, dtype=[(_, float) for _ in names])
    test = np.genfromtxt(TextIO(data), **kwargs)
    # This comparison used to be missing, leaving the default-dtype
    # result unchecked.
    assert_equal(test, ctrl)
def test_names_auto_completion(self):
    """A partial ``names`` list is completed with default field names."""
    source = TextIO("1 2 3\n 4 5 6")
    loaded = np.genfromtxt(source, dtype=(int, float, int), names="a")
    # Only 'a' was supplied; the remaining fields get default names.
    expected_dtype = [('a', int), ('f0', float), ('f1', int)]
    expected = np.array([(1, 2, 3), (4, 5, 6)], dtype=expected_dtype)
    assert_equal(loaded, expected)
def test_names_with_usecols_bug1636(self):
    """Field names must follow the columns picked by ``usecols`` (bug 1636)."""
    text = "A,B,C,D,E\n0,1,2,3,4\n0,1,2,3,4\n0,1,2,3,4"
    expected_names = ("A", "C", "E")
    # Columns selected by index, then by name, then by name with a
    # single scalar dtype.
    variants = (
        ((int, int, int), (0, 2, 4)),
        ((int, int, int), ("A", "C", "E")),
        (int, ("A", "C", "E")),
    )
    for dtype, usecols in variants:
        loaded = np.genfromtxt(TextIO(text), dtype=dtype, delimiter=",",
                               usecols=usecols, names=True)
        assert_equal(loaded.dtype.names, expected_names)
def test_fixed_width_names(self):
    """Read a fixed-width table whose first row holds the field names.

    Every row is laid out in right-aligned columns of 5, 5 and 4
    characters so that it can be split both with the explicit widths
    ``(5, 5, 4)`` and with a single width of 5.
    """
    data = "    A    B   C\n    0    1 2.3\n   45   67  9."
    ctrl = np.array([(0, 1, 2.3), (45, 67, 9.)],
                    dtype=[('A', int), ('B', int), ('C', float)])
    # Per-column widths
    kwargs = dict(delimiter=(5, 5, 4), names=True, dtype=None)
    test = np.genfromtxt(TextIO(data), **kwargs)
    assert_equal(test, ctrl)
    # One width shared by all columns
    kwargs = dict(delimiter=5, names=True, dtype=None)
    test = np.genfromtxt(TextIO(data), **kwargs)
    assert_equal(test, ctrl)
def test_filling_values(self):
    """Empty fields are replaced by the ``filling_values`` argument."""
    raw = b"1, 2, 3\n1, , 5\n0, 6, \n"
    loaded = np.genfromtxt(TextIO(raw), delimiter=",", dtype=None,
                           filling_values=-999)
    expected = np.array([[1, 2, 3], [1, -999, 5], [0, 6, -999]], dtype=int)
    assert_equal(loaded, expected)
def test_comments_is_none(self):
    """``comments=None`` must not act as the comment string 'None' (gh-329)."""
    # (input text, expected second field) -- the second case also checks
    # that a leading space in the field is kept.
    cases = (
        ("test1,testNonetherestofthedata", b'testNonetherestofthedata'),
        ("test1, testNonetherestofthedata", b' testNonetherestofthedata'),
    )
    for text, second_field in cases:
        with warnings.catch_warnings(record=True) as caught:
            warnings.filterwarnings('always', '',
                                    np.VisibleDeprecationWarning)
            loaded = np.genfromtxt(TextIO(text), dtype=None, comments=None,
                                   delimiter=',')
            assert_(caught[0].category is np.VisibleDeprecationWarning)
        assert_equal(loaded[1], second_field)
def test_latin1(self):
    """Latin-1 data is read both as raw bytes and as decoded text."""
    accented = b'\xf6\xfc\xf6'
    plain_row = b"norm1,norm2,norm3\n"
    accented_row = b"test1,testNonethe" + accented + b",test3\n"
    payload = plain_row + accented_row + plain_row
    common = dict(dtype=None, comments=None, delimiter=',')

    # No encoding given: fields are compared as bytes and a deprecation
    # warning is expected.
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        loaded = np.genfromtxt(TextIO(payload), **common)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    assert_equal(loaded[1, 0], b"test1")
    assert_equal(loaded[1, 1], b"testNonethe" + accented)
    assert_equal(loaded[1, 2], b"test3")

    # Explicit encoding: fields are compared as decoded strings.
    loaded = np.genfromtxt(TextIO(payload), encoding='latin1', **common)
    assert_equal(loaded[1, 0], "test1")
    assert_equal(loaded[1, 1], "testNonethe" + accented.decode('latin1'))
    assert_equal(loaded[1, 2], "test3")

    # Mixed int/bytes row, again without an encoding.
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        loaded = np.genfromtxt(TextIO(b"0,testNonethe" + accented),
                               **common)
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    assert_equal(loaded['f0'], 0)
    assert_equal(loaded['f1'], b"testNonethe" + accented)
def test_binary_decode_autodtype(self): | |
utf16 = b'\xff\xfeh\x04 \x00i\x04 \x00j\x04' | |
v = self.loadfunc(BytesIO(utf16), dtype=None, encoding='UTF-16') | |
assert_array_equal(v, np.array(utf16.decode('UTF-16').split())) | |
def test_utf8_byte_encoding(self):
    """UTF-8 encoded bytes are returned unchanged when no encoding is given."""
    encoded = b"\xcf\x96"
    plain_row = b"norm1,norm2,norm3\n"
    special_row = b"test1,testNonethe" + encoded + b",test3\n"
    payload = plain_row + special_row + plain_row
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
        loaded = np.genfromtxt(TextIO(payload),
                               dtype=None, comments=None, delimiter=',')
        assert_(caught[0].category is np.VisibleDeprecationWarning)
    plain = [b'norm1', b'norm2', b'norm3']
    expected = np.array([
        plain,
        [b'test1', b'testNonethe' + encoded, b'test3'],
        plain])
    assert_array_equal(loaded, expected)
def test_utf8_file(self): | |
utf8 = b"\xcf\x96" | |
with temppath() as path: | |
with open(path, "wb") as f: | |
f.write((b"test1,testNonethe" + utf8 + b",test3\n") * 2) | |
test = np.genfromtxt(path, dtype=None, comments=None, | |
delimiter=',', encoding="UTF-8") | |
ctl = np.array([ | |
["test1", "testNonethe" + utf8.decode("UTF-8"), "test3"], | |
["test1", "testNonethe" + utf8.decode("UTF-8"), "test3"]], | |
dtype=np.unicode_) | |
assert_array_equal(test, ctl) | |
# test a mixed dtype | |
with open(path, "wb") as f: | |
f.write(b"0,testNonethe" + utf8) | |
test = np.genfromtxt(path, dtype=None, comments=None, | |
delimiter=',', encoding="UTF-8") | |
assert_equal(test['f0'], 0) | |
assert_equal(test['f1'], "testNonethe" + utf8.decode("UTF-8")) | |
def test_utf8_file_nodtype_unicode(self):
    """bytes encoding with non-latin1 content -> unicode upcast."""
    pi_symbol = '\u03d6'
    accented = '\xf6\xfc\xf6'
    # The file below is written with io.open's default encoding, which is
    # assumed to be the locale's preferred encoding. Skip the test when
    # that encoding cannot represent the non-latin1 test character.
    try:
        preferred = locale.getpreferredencoding()
        pi_symbol.encode(preferred)
    except (UnicodeError, ImportError):
        pytest.skip('Skipping test_utf8_file_nodtype_unicode, '
                    'unable to encode utf8 in preferred encoding')

    rows = (
        "norm1,norm2,norm3\n",
        "norm1," + accented + ",norm3\n",
        "test1,testNonethe" + pi_symbol + ",test3\n",
    )
    with temppath() as path:
        with io.open(path, "wt") as out:
            for row in rows:
                out.write(row)

        with warnings.catch_warnings(record=True) as caught:
            warnings.filterwarnings('always', '',
                                    np.VisibleDeprecationWarning)
            loaded = np.genfromtxt(path, dtype=None, comments=None,
                                   delimiter=',')
            # Check for warning when encoding not specified.
            assert_(caught[0].category is np.VisibleDeprecationWarning)
        expected = np.array([
            ["norm1", "norm2", "norm3"],
            ["norm1", accented, "norm3"],
            ["test1", "testNonethe" + pi_symbol, "test3"]],
            dtype=np.unicode_)
        assert_array_equal(loaded, expected)
def test_recfromtxt(self):
    """Exercise ``np.recfromtxt`` with and without masked output."""
    options = dict(delimiter=",", missing_values="N/A", names=True)
    fields = [('A', int), ('B', int)]

    # Complete data: the result is expected to be a plain record array.
    result = np.recfromtxt(TextIO('A,B\n0,1\n2,3'), **options)
    expected = np.array([(0, 1), (2, 3)], dtype=fields)
    assert_(isinstance(result, np.recarray))
    assert_equal(result, expected)

    # A missing value shows up as a masked entry when usemask=True.
    result = np.recfromtxt(TextIO('A,B\n0,1\n2,N/A'), dtype=None,
                           usemask=True, **options)
    expected = ma.array([(0, 1), (2, -1)],
                        mask=[(False, False), (False, True)],
                        dtype=fields)
    assert_equal(result, expected)
    assert_equal(result.mask, expected.mask)
    assert_equal(result.A, [0, 2])
def test_recfromcsv(self):
    """Exercise ``np.recfromcsv`` with names, masks, dtypes and converters."""
    options = dict(missing_values="N/A", names=True, case_sensitive=True)
    upper_fields = [('A', int), ('B', int)]

    # Case-sensitive names on complete data.
    result = np.recfromcsv(TextIO('A,B\n0,1\n2,3'), dtype=None, **options)
    expected = np.array([(0, 1), (2, 3)], dtype=upper_fields)
    assert_(isinstance(result, np.recarray))
    assert_equal(result, expected)

    # A missing value shows up as a masked entry when usemask=True.
    result = np.recfromcsv(TextIO('A,B\n0,1\n2,N/A'), dtype=None,
                           usemask=True, **options)
    expected = ma.array([(0, 1), (2, -1)],
                        mask=[(False, False), (False, True)],
                        dtype=upper_fields)
    assert_equal(result, expected)
    assert_equal(result.mask, expected.mask)
    assert_equal(result.A, [0, 2])

    # Default options: the control uses lower-case field names.
    result = np.recfromcsv(TextIO('A,B\n0,1\n2,3'), missing_values='N/A',)
    expected = np.array([(0, 1), (2, 3)], dtype=[('a', int), ('b', int)])
    assert_(isinstance(result, np.recarray))
    assert_equal(result, expected)

    # Explicit structured dtype.
    fields = [('a', int), ('b', float)]
    result = np.recfromcsv(TextIO('A,B\n0,1\n2,3'), missing_values='N/A',
                           dtype=fields)
    expected = np.array([(0, 1), (2, 3)], dtype=fields)
    assert_(isinstance(result, np.recarray))
    assert_equal(result, expected)

    # gh-10394
    result = np.recfromcsv(TextIO('color\n"red"\n"blue"'),
                           converters={0: lambda x: x.strip(b'\"')})
    expected = np.array([('red',), ('blue',)],
                        dtype=[('color', (bytes, 4))])
    assert_equal(result.dtype, expected.dtype)
    assert_equal(result, expected)
def test_max_rows(self):
    """Exercise the ``max_rows`` keyword argument of genfromtxt."""
    text = '1 2\n3 4\n5 6\n7 8\n9 10\n'
    stream = TextIO(text)
    # Two successive reads on one stream: the second is expected to
    # return the rows the first one left unread.
    head = np.genfromtxt(stream, max_rows=3)
    tail = np.genfromtxt(stream)
    assert_equal(head, [[1, 2], [3, 4], [5, 6]])
    assert_equal(tail, [[7, 8], [9, 10]])

    # max_rows must be at least 1.
    assert_raises(ValueError, np.genfromtxt, TextIO(text), max_rows=0)

    # An input with several invalid rows.
    text = '1 1\n2 2\n0 \n3 3\n4 4\n5 \n6 \n7 \n'

    loaded = np.genfromtxt(TextIO(text), max_rows=2)
    expected = np.array([[1., 1.], [2., 2.]])
    assert_equal(loaded, expected)

    # Test keywords conflict
    assert_raises(ValueError, np.genfromtxt, TextIO(text), skip_footer=1,
                  max_rows=4)

    # Test with invalid value
    assert_raises(ValueError, np.genfromtxt, TextIO(text), max_rows=4)

    # Test with invalid not raise
    with suppress_warnings() as sup:
        sup.filter(ConversionWarning)

        four_rows = np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]])
        for limit in (4, 5):
            loaded = np.genfromtxt(TextIO(text), max_rows=limit,
                                   invalid_raise=False)
            assert_equal(loaded, four_rows)

    # Structured array with field names.
    text = 'a b\n#c d\n1 1\n2 2\n#0 \n3 3\n4 4\n5 5\n'
    named_dtype = [('c', '<f8'), ('d', '<f8')]

    # Test with header, names and comments
    stream = TextIO(text)
    loaded = np.genfromtxt(stream, skip_header=1, max_rows=3, names=True)
    expected = np.array([(1.0, 1.0), (2.0, 2.0), (3.0, 3.0)],
                        dtype=named_dtype)
    assert_equal(loaded, expected)
    # To continue reading the same "file", don't use skip_header or
    # names, and use the previously determined dtype.
    loaded = np.genfromtxt(stream, max_rows=None, dtype=loaded.dtype)
    expected = np.array([(4.0, 4.0), (5.0, 5.0)], dtype=named_dtype)
    assert_equal(loaded, expected)
def test_gft_using_filename(self): | |
# Test that we can load data from a filename as well as a file | |
# object | |
tgt = np.arange(6).reshape((2, 3)) | |
linesep = ('\n', '\r\n', '\r') | |
for sep in linesep: | |
data = '0 1 2' + sep + '3 4 5' | |
with temppath() as name: | |
with open(name, 'w') as f: | |
f.write(data) | |
res = np.genfromtxt(name) | |
assert_array_equal(res, tgt) | |
def test_gft_from_gzip(self): | |
# Test that we can load data from a gzipped file | |
wanted = np.arange(6).reshape((2, 3)) | |
linesep = ('\n', '\r\n', '\r') | |
for sep in linesep: | |
data = '0 1 2' + sep + '3 4 5' | |
s = BytesIO() | |
with gzip.GzipFile(fileobj=s, mode='w') as g: | |
g.write(asbytes(data)) | |
with temppath(suffix='.gz2') as name: | |
with open(name, 'w') as f: | |
f.write(data) | |
assert_array_equal(np.genfromtxt(name), wanted) | |
def test_gft_using_generator(self): | |
# gft doesn't work with unicode. | |
def count(): | |
for i in range(10): | |
yield asbytes("%d" % i) | |
res = np.genfromtxt(count()) | |
assert_array_equal(res, np.arange(10)) | |
def test_auto_dtype_largeint(self):
    # Regression test for numpy/numpy#5635, where large integers could
    # cause OverflowErrors while the output dtype was being determined
    # automatically.
    #
    # 2**66 = 73786976294838206464 => should convert to float
    # 2**34 = 17179869184 => should convert to int64
    # 2**10 = 1024 => should convert to int (int32 on 32-bit systems,
    #                 int64 on 64-bit systems)
    stream = TextIO('73786976294838206464 17179869184 1024')
    loaded = np.genfromtxt(stream, dtype=None)

    assert_equal(loaded.dtype.names, ['f0', 'f1', 'f2'])

    # One inferred type per column, in column order.
    expected_types = (('f0', float), ('f1', np.int64), ('f2', np.int_))
    for field, kind in expected_types:
        assert_(loaded.dtype[field] == kind)

    assert_allclose(loaded['f0'], 73786976294838206464.)
    assert_equal(loaded['f1'], 17179869184)
    assert_equal(loaded['f2'], 1024)
def test_unpack_structured(self):
    """Regression test for gh-4341: unpacking works on structured arrays."""
    stream = TextIO("M 21 72\nF 35 58")
    layout = {'names': ('a', 'b', 'c'), 'formats': ('S1', 'i4', 'f4')}
    columns = np.genfromtxt(stream, dtype=layout, unpack=True)
    a, b, c = columns

    # Each unpacked column keeps the dtype of its field ...
    for column, code in zip((a, b, c), ('S1', 'i4', 'f4')):
        assert_equal(column.dtype, np.dtype(code))
    # ... and holds that field's values.
    assert_array_equal(a, np.array([b'M', b'F']))
    assert_array_equal(b, np.array([21, 35]))
    assert_array_equal(c, np.array([72., 58.]))
def test_unpack_auto_dtype(self):
    """Regression test for gh-4341: unpacking works when ``dtype=None``."""
    stream = TextIO("M 21 72.\nF 35 58.")
    wanted_columns = (
        np.array(["M", "F"]),
        np.array([21, 35]),
        np.array([72., 58.]),
    )
    columns = np.genfromtxt(stream, dtype=None, unpack=True,
                            encoding="utf-8")
    # Compare column by column, values first and then dtypes.
    for wanted, got in zip(wanted_columns, columns):
        assert_array_equal(wanted, got)
        assert_equal(wanted.dtype, got.dtype)
def test_unpack_single_name(self):
    """Regression test for gh-4341: unpacking a one-field structured dtype."""
    stream = TextIO("21\n35")
    layout = {'names': ('a',), 'formats': ('i4',)}
    wanted = np.array([21, 35], dtype=np.int32)
    got = np.genfromtxt(stream, dtype=layout, unpack=True)
    assert_array_equal(wanted, got)
    assert_equal(wanted.dtype, got.dtype)
def test_squeeze_scalar(self):
    """Regression test for gh-4341: a scalar input gives zero-dim output.

    This must hold even when the dtype is structured.
    """
    stream = TextIO("1")
    layout = {'names': ('a',), 'formats': ('i4',)}
    wanted = np.array((1,), dtype=np.int32)
    got = np.genfromtxt(stream, dtype=layout, unpack=True)
    assert_array_equal(wanted, got)
    assert_equal((), got.shape)
    assert_equal(wanted.dtype, got.dtype)
class TestPathUsage:
    """Check that ``pathlib.Path`` objects are accepted by the I/O functions."""

    def test_loadtxt(self):
        """Round-trip an array through savetxt/loadtxt using a Path."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            a = np.array([[1.1, 2], [3, 4]])

            np.savetxt(path, a)
            x = np.loadtxt(path)

            assert_array_equal(x, a)

    def test_save_load(self):
        """Test that pathlib.Path instances can be used with save."""
        with temppath(suffix='.npy') as path:
            path = Path(path)
            a = np.array([[1, 2], [3, 4]], int)
            np.save(path, a)
            data = np.load(path)
            assert_array_equal(data, a)

    def test_save_load_memmap(self):
        """Test that pathlib.Path instances can be loaded mem-mapped."""
        with temppath(suffix='.npy') as path:
            path = Path(path)
            a = np.array([[1, 2], [3, 4]], int)
            np.save(path, a)
            data = np.load(path, mmap_mode='r')
            assert_array_equal(data, a)
            # close the mem-mapped file
            del data
            if IS_PYPY:
                break_cycles()
                break_cycles()

    def test_save_load_memmap_readwrite(self):
        """Test that pathlib.Path instances can be written mem-mapped."""
        with temppath(suffix='.npy') as path:
            path = Path(path)
            a = np.array([[1, 2], [3, 4]], int)
            np.save(path, a)
            b = np.load(path, mmap_mode='r+')
            # Apply the same change in memory and through the mapping.
            a[0][0] = 5
            b[0][0] = 5
            del b  # closes the file
            if IS_PYPY:
                break_cycles()
                break_cycles()
            data = np.load(path)
            assert_array_equal(data, a)

    def test_savez_load(self):
        """Test that pathlib.Path instances can be used with savez."""
        with temppath(suffix='.npz') as path:
            path = Path(path)
            np.savez(path, lab='place holder')
            with np.load(path) as data:
                assert_array_equal(data['lab'], 'place holder')

    def test_savez_compressed_load(self):
        """Test that pathlib.Path instances work with savez_compressed."""
        with temppath(suffix='.npz') as path:
            path = Path(path)
            np.savez_compressed(path, lab='place holder')
            # The context manager closes the archive even when the
            # assertion fails, mirroring test_savez_load.
            with np.load(path) as data:
                assert_array_equal(data['lab'], 'place holder')

    def test_genfromtxt(self):
        """Round-trip an array through savetxt/genfromtxt using a Path."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            a = np.array([(1, 2), (3, 4)])
            np.savetxt(path, a)
            data = np.genfromtxt(path)
            assert_array_equal(a, data)

    def test_ndfromtxt(self):
        """Test outputting a standard ndarray."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            with path.open('w') as f:
                f.write('1 2\n3 4')

            control = np.array([[1, 2], [3, 4]], dtype=int)
            test = np.genfromtxt(path, dtype=int)
            assert_array_equal(test, control)

    def test_mafromtxt(self):
        """Test masked-array output (``usemask=True``) from a Path."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            with path.open('w') as f:
                f.write('1,2,3.0\n4,5,6.0\n')

            test = np.genfromtxt(path, delimiter=',', usemask=True)
            control = ma.array([(1.0, 2.0, 3.0), (4.0, 5.0, 6.0)])
            assert_equal(test, control)

    def test_recfromtxt(self):
        """Test that ``np.recfromtxt`` accepts a Path."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            with path.open('w') as f:
                f.write('A,B\n0,1\n2,3')

            kwargs = dict(delimiter=",", missing_values="N/A", names=True)
            test = np.recfromtxt(path, **kwargs)
            control = np.array([(0, 1), (2, 3)],
                               dtype=[('A', int), ('B', int)])
            assert_(isinstance(test, np.recarray))
            assert_equal(test, control)

    def test_recfromcsv(self):
        """Test that ``np.recfromcsv`` accepts a Path."""
        with temppath(suffix='.txt') as path:
            path = Path(path)
            with path.open('w') as f:
                f.write('A,B\n0,1\n2,3')

            kwargs = dict(missing_values="N/A", names=True,
                          case_sensitive=True)
            test = np.recfromcsv(path, dtype=None, **kwargs)
            control = np.array([(0, 1), (2, 3)],
                               dtype=[('A', int), ('B', int)])
            assert_(isinstance(test, np.recarray))
            assert_equal(test, control)
def test_gzip_load():
    """Round-trip an array through ``np.save``/``np.load`` on gzip streams."""
    a = np.random.random((5, 5))

    s = BytesIO()
    # Context managers close both gzip wrappers, including the reader,
    # which used to be left open, and also close them when a call raises.
    with gzip.GzipFile(fileobj=s, mode="w") as f:
        np.save(f, a)

    # Rewind the in-memory buffer before reading the compressed data back.
    s.seek(0)
    with gzip.GzipFile(fileobj=s, mode="r") as f:
        assert_array_equal(np.load(f), a)
# These next two classes encode the minimal API needed to save()/load() arrays.
# The `test_ducktyping` test ensures they work correctly.
class JustWriter:
    """Wrapper exposing only ``write`` and ``flush`` of an underlying object."""

    def __init__(self, base):
        # The wrapped object every call is forwarded to.
        self.base = base

    def write(self, s):
        """Forward ``s`` to the wrapped object's ``write`` and return its result."""
        written = self.base.write(s)
        return written

    def flush(self):
        """Forward to the wrapped object's ``flush`` and return its result."""
        outcome = self.base.flush()
        return outcome
class JustReader:
    """Wrapper exposing only ``read`` and ``seek`` of an underlying object."""

    def __init__(self, base):
        # The wrapped object every call is forwarded to.
        self.base = base

    def read(self, n):
        """Forward to the wrapped object's ``read(n)`` and return its result."""
        chunk = self.base.read(n)
        return chunk

    def seek(self, off, whence=0):
        """Forward to the wrapped object's ``seek`` and return its result."""
        position = self.base.seek(off, whence)
        return position
def test_ducktyping():
    """np.save / np.load work with objects offering only the minimal API."""
    original = np.random.random((5, 5))

    buffer = BytesIO()
    writer = JustWriter(buffer)

    np.save(writer, original)
    writer.flush()
    # Rewind the shared buffer before handing it to the reader wrapper.
    buffer.seek(0)

    reader = JustReader(buffer)
    assert_array_equal(np.load(reader), original)
def test_gzip_loadtxt():
    # NamedTemporaryFile cannot be used here: on Windows, a file created
    # by it cannot be reopened by another open call. Instead, the gzipped
    # reference data is built in memory, written to a securely opened
    # temporary file, and that file is then read by loadtxt.
    buffer = BytesIO()
    compressor = gzip.GzipFile(fileobj=buffer, mode='w')
    compressor.write(b'1 2 3\n')
    compressor.close()
    buffer.seek(0)

    with temppath(suffix='.gz') as name:
        with open(name, 'wb') as out:
            out.write(buffer.read())
        loaded = np.loadtxt(name)
    buffer.close()

    assert_array_equal(loaded, [1, 2, 3])
def test_gzip_loadtxt_from_string():
    """np.loadtxt reads directly from an open gzip stream."""
    s = BytesIO()
    # Context managers close both gzip wrappers, including the reader,
    # which used to be left open.
    with gzip.GzipFile(fileobj=s, mode="w") as f:
        f.write(b'1 2 3\n')

    # Rewind the in-memory buffer before reading the compressed data back.
    s.seek(0)
    with gzip.GzipFile(fileobj=s, mode="r") as f:
        assert_array_equal(np.loadtxt(f), [1, 2, 3])
def test_npzfile_dict():
    """The object returned by np.load for an .npz archive acts like a dict."""
    s = BytesIO()
    x = np.zeros((3, 3))
    y = np.zeros((3, 3))

    np.savez(s, x=x, y=y)
    s.seek(0)

    # The context manager closes the archive, which used to be left open.
    with np.load(s) as z:
        # Membership on the archive itself and on its keys().
        assert_('x' in z)
        assert_('y' in z)
        assert_('x' in z.keys())
        assert_('y' in z.keys())

        # items() yields (name, array) pairs for both stored arrays.
        for f, a in z.items():
            assert_(f in ['x', 'y'])
            assert_equal(a.shape, (3, 3))

        assert_(len(z.items()) == 2)

        # Iterating the archive yields the stored names.
        for f in z:
            assert_(f in ['x', 'y'])

        assert_('x' in z.keys())
def test_load_refcount():
    # Check that objects returned by np.load are directly freed based on
    # their refcount, rather than needing the gc to collect them.
    buffer = BytesIO()
    np.savez(buffer, [1, 2, 3])
    buffer.seek(0)

    # The loaded object is deliberately not bound to a name.
    with assert_no_gc_cycles():
        np.load(buffer)

    buffer.seek(0)
    # Two fields, each holding two unsigned bytes.
    record_dtype = [("a", 'u1', 2), ("b", 'u1', 2)]
    with assert_no_gc_cycles():
        loaded = np.loadtxt(TextIO("0 1 2 3"), dtype=record_dtype)
        expected = np.array([((0, 1), (2, 3))], dtype=record_dtype)
        assert_equal(loaded, expected)