|
""" Unit tests version 2.6.1.0 for adodbapi""" |
|
""" |
|
adodbapi - A python DB API 2.0 interface to Microsoft ADO |
|
|
|
Copyright (C) 2002 Henrik Ekelund |
|
|
|
This library is free software; you can redistribute it and/or |
|
modify it under the terms of the GNU Lesser General Public |
|
License as published by the Free Software Foundation; either |
|
version 2.1 of the License, or (at your option) any later version. |
|
|
|
This library is distributed in the hope that it will be useful, |
|
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
Lesser General Public License for more details. |
|
|
|
You should have received a copy of the GNU Lesser General Public |
|
License along with this library; if not, write to the Free Software |
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
|
|
|
Updates by Vernon Cole |
|
""" |
|
|
|
import copy |
|
import datetime |
|
import decimal |
|
import random |
|
import string |
|
import sys |
|
import unittest |
|
|
|
try: |
|
import win32com.client |
|
|
|
win32 = True |
|
except ImportError: |
|
win32 = False |
|
|
|
|
|
import adodbapitestconfig as config |
|
|
|
|
|
import tryconnection |
|
|
|
import adodbapi |
|
import adodbapi.apibase as api |
|
|
|
try: |
|
import adodbapi.ado_consts as ado_consts |
|
except ImportError: |
|
try: |
|
import ado_consts |
|
except ImportError: |
|
from adodbapi import ado_consts |
|
|
|
|
|
def str2bytes(sval): |
|
return sval.encode("latin1") |
|
|
|
|
|
long = int |
|
|
|
|
|
def randomstring(length): |
|
return "".join([random.choice(string.ascii_letters) for n in range(32)]) |
|
|
|
|
|
class CommonDBTests(unittest.TestCase): |
|
"Self contained super-simple tests in easy syntax, should work on everything between mySQL and Oracle" |
|
|
|
def setUp(self): |
|
self.engine = "unknown" |
|
|
|
def getEngine(self): |
|
return self.engine |
|
|
|
def getConnection(self): |
|
raise NotImplementedError |
|
|
|
def getCursor(self): |
|
return self.getConnection().cursor() |
|
|
|
def testConnection(self): |
|
crsr = self.getCursor() |
|
assert crsr.__class__.__name__ == "Cursor" |
|
|
|
def testErrorHandlerInherits(self): |
|
if not self.remote: |
|
conn = self.getConnection() |
|
mycallable = lambda connection, cursor, errorclass, errorvalue: 1 |
|
conn.errorhandler = mycallable |
|
crsr = conn.cursor() |
|
assert ( |
|
crsr.errorhandler == mycallable |
|
), "Error handler on crsr should be same as on connection" |
|
|
|
def testDefaultErrorHandlerConnection(self): |
|
if not self.remote: |
|
conn = self.getConnection() |
|
del conn.messages[:] |
|
try: |
|
conn.close() |
|
conn.commit() |
|
except: |
|
assert len(conn.messages) == 1 |
|
assert len(conn.messages[0]) == 2 |
|
assert conn.messages[0][0] == api.ProgrammingError |
|
|
|
def testOwnErrorHandlerConnection(self): |
|
if self.remote: |
|
return |
|
mycallable = ( |
|
lambda connection, cursor, errorclass, errorvalue: 1 |
|
) |
|
conn = self.getConnection() |
|
conn.errorhandler = mycallable |
|
conn.close() |
|
conn.commit() |
|
assert len(conn.messages) == 0 |
|
|
|
conn.errorhandler = None |
|
try: |
|
conn.close() |
|
conn.commit() |
|
except: |
|
pass |
|
|
|
assert ( |
|
len(conn.messages) > 0 |
|
), "Setting errorhandler to none should bring back the standard error handler" |
|
|
|
def testDefaultErrorHandlerCursor(self): |
|
crsr = self.getConnection().cursor() |
|
if not self.remote: |
|
del crsr.messages[:] |
|
try: |
|
crsr.execute("SELECT abbtytddrf FROM dasdasd") |
|
except: |
|
assert len(crsr.messages) == 1 |
|
assert len(crsr.messages[0]) == 2 |
|
assert crsr.messages[0][0] == api.DatabaseError |
|
|
|
def testOwnErrorHandlerCursor(self): |
|
if self.remote: |
|
return |
|
mycallable = ( |
|
lambda connection, cursor, errorclass, errorvalue: 1 |
|
) |
|
crsr = self.getConnection().cursor() |
|
crsr.errorhandler = mycallable |
|
crsr.execute("SELECT abbtytddrf FROM dasdasd") |
|
assert len(crsr.messages) == 0 |
|
|
|
crsr.errorhandler = None |
|
try: |
|
crsr.execute("SELECT abbtytddrf FROM dasdasd") |
|
except: |
|
pass |
|
|
|
assert ( |
|
len(crsr.messages) > 0 |
|
), "Setting errorhandler to none should bring back the standard error handler" |
|
|
|
def testUserDefinedConversions(self): |
|
if self.remote: |
|
return |
|
try: |
|
duplicatingConverter = lambda aStringField: aStringField * 2 |
|
assert duplicatingConverter("gabba") == "gabbagabba" |
|
|
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
|
|
self.assertRaises(AttributeError, lambda x: conn.variantConversions[x], [2]) |
|
if not self.remote: |
|
|
|
conn.variantConversions = copy.copy(api.variantConversions) |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
"CREATE TABLE xx_%s (fldData VARCHAR(100) NOT NULL, fld2 VARCHAR(20))" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
crsr.execute( |
|
"INSERT INTO xx_%s(fldData,fld2) VALUES('gabba','booga')" |
|
% config.tmp |
|
) |
|
crsr.execute( |
|
"INSERT INTO xx_%s(fldData,fld2) VALUES('hey','yo')" % config.tmp |
|
) |
|
|
|
conn.variantConversions[api.adoStringTypes] = duplicatingConverter |
|
crsr.execute( |
|
"SELECT fldData,fld2 FROM xx_%s ORDER BY fldData" % config.tmp |
|
) |
|
|
|
rows = crsr.fetchall() |
|
row = rows[0] |
|
self.assertEqual(row[0], "gabbagabba") |
|
row = rows[1] |
|
self.assertEqual(row[0], "heyhey") |
|
self.assertEqual(row[1], "yoyo") |
|
|
|
upcaseConverter = lambda aStringField: aStringField.upper() |
|
assert upcaseConverter("upThis") == "UPTHIS" |
|
|
|
|
|
rows.converters[1] = upcaseConverter |
|
self.assertEqual(row[0], "heyhey") |
|
self.assertEqual(row[1], "YO") |
|
|
|
finally: |
|
try: |
|
del conn.variantConversions |
|
except: |
|
pass |
|
self.helpRollbackTblTemp() |
|
|
|
def testUserDefinedConversionForExactNumericTypes(self): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not self.remote and sys.version_info < (3, 0): |
|
oldconverter = adodbapi.variantConversions[ |
|
ado_consts.adNumeric |
|
] |
|
|
|
|
|
try: |
|
adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtFloat |
|
self.helpTestDataType( |
|
"decimal(18,2)", "NUMBER", 3.45, compareAlmostEqual=1 |
|
) |
|
self.helpTestDataType( |
|
"numeric(18,2)", "NUMBER", 3.45, compareAlmostEqual=1 |
|
) |
|
|
|
adodbapi.variantConversions[ado_consts.adNumeric] = adodbapi.cvtString |
|
self.helpTestDataType("numeric(18,2)", "NUMBER", "3.45") |
|
|
|
adodbapi.variantConversions[ado_consts.adNumeric] = ( |
|
lambda x: "!!This function returns a funny unicode string %s!!" % x |
|
) |
|
self.helpTestDataType( |
|
"numeric(18,2)", |
|
"NUMBER", |
|
"3.45", |
|
allowedReturnValues=[ |
|
"!!This function returns a funny unicode string 3.45!!" |
|
], |
|
) |
|
finally: |
|
|
|
adodbapi.variantConversions[ |
|
ado_consts.adNumeric |
|
] = oldconverter |
|
|
|
def helpTestDataType( |
|
self, |
|
sqlDataTypeString, |
|
DBAPIDataTypeString, |
|
pyData, |
|
pyDataInputAlternatives=None, |
|
compareAlmostEqual=None, |
|
allowedReturnValues=None, |
|
): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldData """ |
|
% config.tmp |
|
+ sqlDataTypeString |
|
+ ")\n" |
|
) |
|
|
|
crsr.execute(tabdef) |
|
|
|
|
|
crsr.execute("INSERT INTO xx_%s (fldId) VALUES (1)" % config.tmp) |
|
|
|
crsr.execute("SELECT fldId,fldData FROM xx_%s" % config.tmp) |
|
rs = crsr.fetchone() |
|
self.assertEqual(rs[1], None) |
|
assert rs[0] == 1 |
|
|
|
|
|
descTuple = crsr.description[1] |
|
assert descTuple[0] in ["fldData", "flddata"], 'was "%s" expected "%s"' % ( |
|
descTuple[0], |
|
"fldData", |
|
) |
|
|
|
if DBAPIDataTypeString == "STRING": |
|
assert descTuple[1] == api.STRING, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.STRING.values, |
|
) |
|
elif DBAPIDataTypeString == "NUMBER": |
|
assert descTuple[1] == api.NUMBER, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.NUMBER.values, |
|
) |
|
elif DBAPIDataTypeString == "BINARY": |
|
assert descTuple[1] == api.BINARY, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.BINARY.values, |
|
) |
|
elif DBAPIDataTypeString == "DATETIME": |
|
assert descTuple[1] == api.DATETIME, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.DATETIME.values, |
|
) |
|
elif DBAPIDataTypeString == "ROWID": |
|
assert descTuple[1] == api.ROWID, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.ROWID.values, |
|
) |
|
elif DBAPIDataTypeString == "UUID": |
|
assert descTuple[1] == api.OTHER, 'was "%s" expected "%s"' % ( |
|
descTuple[1], |
|
api.OTHER.values, |
|
) |
|
else: |
|
raise NotImplementedError |
|
|
|
|
|
inputs = [pyData] |
|
if pyDataInputAlternatives: |
|
inputs.extend(pyDataInputAlternatives) |
|
inputs = set(inputs) |
|
fldId = 1 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_%s (fldId,fldData) VALUES (?,?)" % config.tmp, |
|
(fldId, inParam), |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldData FROM xx_%s WHERE ?=fldID" % config.tmp, [fldId] |
|
) |
|
rs = crsr.fetchone() |
|
if allowedReturnValues: |
|
allowedTypes = tuple([type(aRV) for aRV in allowedReturnValues]) |
|
assert isinstance( |
|
rs[0], allowedTypes |
|
), 'result type "%s" must be one of %s' % (type(rs[0]), allowedTypes) |
|
else: |
|
assert isinstance( |
|
rs[0], type(pyData) |
|
), 'result type "%s" must be instance of %s' % ( |
|
type(rs[0]), |
|
type(pyData), |
|
) |
|
|
|
if compareAlmostEqual and DBAPIDataTypeString == "DATETIME": |
|
iso1 = adodbapi.dateconverter.DateObjectToIsoFormatString(rs[0]) |
|
iso2 = adodbapi.dateconverter.DateObjectToIsoFormatString(pyData) |
|
self.assertEqual(iso1, iso2) |
|
elif compareAlmostEqual: |
|
s = float(pyData) |
|
v = float(rs[0]) |
|
assert ( |
|
abs(v - s) / s < 0.00001 |
|
), "Values not almost equal recvd=%s, expected=%f" % (rs[0], s) |
|
else: |
|
if allowedReturnValues: |
|
ok = False |
|
self.assertTrue( |
|
rs[0] in allowedReturnValues, |
|
'Value "%s" not in %s' % (repr(rs[0]), allowedReturnValues), |
|
) |
|
else: |
|
self.assertEqual( |
|
rs[0], |
|
pyData, |
|
'Values are not equal recvd="%s", expected="%s"' |
|
% (rs[0], pyData), |
|
) |
|
|
|
def testDataTypeFloat(self): |
|
self.helpTestDataType("real", "NUMBER", 3.45, compareAlmostEqual=True) |
|
self.helpTestDataType("float", "NUMBER", 1.79e37, compareAlmostEqual=True) |
|
|
|
def testDataTypeDecmal(self): |
|
self.helpTestDataType( |
|
"decimal(18,2)", |
|
"NUMBER", |
|
3.45, |
|
allowedReturnValues=["3.45", "3,45", decimal.Decimal("3.45")], |
|
) |
|
self.helpTestDataType( |
|
"numeric(18,2)", |
|
"NUMBER", |
|
3.45, |
|
allowedReturnValues=["3.45", "3,45", decimal.Decimal("3.45")], |
|
) |
|
self.helpTestDataType( |
|
"decimal(20,2)", |
|
"NUMBER", |
|
444444444444444444, |
|
allowedReturnValues=[ |
|
"444444444444444444.00", |
|
"444444444444444444,00", |
|
decimal.Decimal("444444444444444444"), |
|
], |
|
) |
|
if self.getEngine() == "MSSQL": |
|
self.helpTestDataType( |
|
"uniqueidentifier", |
|
"UUID", |
|
"{71A4F49E-39F3-42B1-A41E-48FF154996E6}", |
|
allowedReturnValues=["{71A4F49E-39F3-42B1-A41E-48FF154996E6}"], |
|
) |
|
|
|
def testDataTypeMoney(self): |
|
if self.getEngine() == "MySQL": |
|
self.helpTestDataType( |
|
"DECIMAL(20,4)", "NUMBER", decimal.Decimal("-922337203685477.5808") |
|
) |
|
elif self.getEngine() == "PostgreSQL": |
|
self.helpTestDataType( |
|
"money", |
|
"NUMBER", |
|
decimal.Decimal("-922337203685477.5808"), |
|
compareAlmostEqual=True, |
|
allowedReturnValues=[ |
|
-922337203685477.5808, |
|
decimal.Decimal("-922337203685477.5808"), |
|
], |
|
) |
|
else: |
|
self.helpTestDataType("smallmoney", "NUMBER", decimal.Decimal("214748.02")) |
|
self.helpTestDataType( |
|
"money", "NUMBER", decimal.Decimal("-922337203685477.5808") |
|
) |
|
|
|
def testDataTypeInt(self): |
|
if self.getEngine() != "PostgreSQL": |
|
self.helpTestDataType("tinyint", "NUMBER", 115) |
|
self.helpTestDataType("smallint", "NUMBER", -32768) |
|
if self.getEngine() not in ["ACCESS", "PostgreSQL"]: |
|
self.helpTestDataType( |
|
"bit", "NUMBER", 1 |
|
) |
|
if self.getEngine() in ["MSSQL", "PostgreSQL"]: |
|
self.helpTestDataType( |
|
"bigint", |
|
"NUMBER", |
|
3000000000, |
|
allowedReturnValues=[3000000000, int(3000000000)], |
|
) |
|
self.helpTestDataType("int", "NUMBER", 2147483647) |
|
|
|
def testDataTypeChar(self): |
|
for sqlDataType in ("char(6)", "nchar(6)"): |
|
self.helpTestDataType( |
|
sqlDataType, |
|
"STRING", |
|
"spam ", |
|
allowedReturnValues=["spam", "spam", "spam ", "spam "], |
|
) |
|
|
|
def testDataTypeVarChar(self): |
|
if self.getEngine() == "MySQL": |
|
stringKinds = ["varchar(10)", "text"] |
|
elif self.getEngine() == "PostgreSQL": |
|
stringKinds = ["varchar(10)", "text", "character varying"] |
|
else: |
|
stringKinds = [ |
|
"varchar(10)", |
|
"nvarchar(10)", |
|
"text", |
|
"ntext", |
|
] |
|
|
|
for sqlDataType in stringKinds: |
|
self.helpTestDataType(sqlDataType, "STRING", "spam", ["spam"]) |
|
|
|
def testDataTypeDate(self): |
|
if self.getEngine() == "PostgreSQL": |
|
dt = "timestamp" |
|
else: |
|
dt = "datetime" |
|
self.helpTestDataType( |
|
dt, "DATETIME", adodbapi.Date(2002, 10, 28), compareAlmostEqual=True |
|
) |
|
if self.getEngine() not in ["MySQL", "PostgreSQL"]: |
|
self.helpTestDataType( |
|
"smalldatetime", |
|
"DATETIME", |
|
adodbapi.Date(2002, 10, 28), |
|
compareAlmostEqual=True, |
|
) |
|
if tag != "pythontime" and self.getEngine() not in [ |
|
"MySQL", |
|
"PostgreSQL", |
|
]: |
|
self.helpTestDataType( |
|
dt, |
|
"DATETIME", |
|
adodbapi.Timestamp(2002, 10, 28, 12, 15, 1), |
|
compareAlmostEqual=True, |
|
) |
|
|
|
def testDataTypeBinary(self): |
|
binfld = str2bytes("\x07\x00\xE2\x40*") |
|
arv = [binfld, adodbapi.Binary(binfld), bytes(binfld)] |
|
if self.getEngine() == "PostgreSQL": |
|
self.helpTestDataType( |
|
"bytea", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv |
|
) |
|
else: |
|
self.helpTestDataType( |
|
"binary(5)", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv |
|
) |
|
self.helpTestDataType( |
|
"varbinary(100)", |
|
"BINARY", |
|
adodbapi.Binary(binfld), |
|
allowedReturnValues=arv, |
|
) |
|
if self.getEngine() != "MySQL": |
|
self.helpTestDataType( |
|
"image", "BINARY", adodbapi.Binary(binfld), allowedReturnValues=arv |
|
) |
|
|
|
def helpRollbackTblTemp(self): |
|
self.helpForceDropOnTblTemp() |
|
|
|
def helpForceDropOnTblTemp(self): |
|
conn = self.getConnection() |
|
with conn.cursor() as crsr: |
|
try: |
|
crsr.execute("DROP TABLE xx_%s" % config.tmp) |
|
if not conn.autocommit: |
|
conn.commit() |
|
except: |
|
pass |
|
|
|
def helpCreateAndPopulateTableTemp(self, crsr): |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldData INTEGER |
|
) |
|
""" |
|
% config.tmp |
|
) |
|
try: |
|
crsr.execute(tabdef) |
|
except api.DatabaseError: |
|
self.helpForceDropOnTblTemp() |
|
crsr.execute(tabdef) |
|
for i in range(9): |
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES (%i)" % (config.tmp, i)) |
|
|
|
|
|
def testFetchAll(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 9 |
|
|
|
i = 3 |
|
for row in rs[3:-2]: |
|
assert row[0] == i |
|
i += 1 |
|
self.helpRollbackTblTemp() |
|
|
|
def testPreparedStatement(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.prepare("SELECT fldData FROM xx_%s" % config.tmp) |
|
crsr.execute(crsr.command) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 9 |
|
assert rs[2][0] == 2 |
|
self.helpRollbackTblTemp() |
|
|
|
def testWrongPreparedStatement(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.prepare("SELECT * FROM nowhere") |
|
crsr.execute( |
|
"SELECT fldData FROM xx_%s" % config.tmp |
|
) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 9 |
|
assert rs[2][0] == 2 |
|
self.helpRollbackTblTemp() |
|
|
|
def testIterator(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
for i, row in enumerate( |
|
crsr |
|
): |
|
assert row[0] == i |
|
self.helpRollbackTblTemp() |
|
|
|
def testExecuteMany(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
seq_of_values = [(111,), (222,)] |
|
crsr.executemany( |
|
"INSERT INTO xx_%s (fldData) VALUES (?)" % config.tmp, seq_of_values |
|
) |
|
if crsr.rowcount == -1: |
|
print( |
|
self.getEngine() |
|
+ " Provider does not support rowcount (on .executemany())" |
|
) |
|
else: |
|
self.assertEqual(crsr.rowcount, 2) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 11 |
|
self.helpRollbackTblTemp() |
|
|
|
def testRowCount(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
if crsr.rowcount == -1: |
|
|
|
pass |
|
else: |
|
self.assertEqual(crsr.rowcount, 9) |
|
self.helpRollbackTblTemp() |
|
|
|
def testRowCountNoRecordset(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("DELETE FROM xx_%s WHERE fldData >= 5" % config.tmp) |
|
if crsr.rowcount == -1: |
|
print(self.getEngine() + " Provider does not support rowcount (on DELETE)") |
|
else: |
|
self.assertEqual(crsr.rowcount, 4) |
|
self.helpRollbackTblTemp() |
|
|
|
def testFetchMany(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
rs = crsr.fetchmany(3) |
|
assert len(rs) == 3 |
|
rs = crsr.fetchmany(5) |
|
assert len(rs) == 5 |
|
rs = crsr.fetchmany(5) |
|
assert len(rs) == 1 |
|
self.helpRollbackTblTemp() |
|
|
|
def testFetchManyWithArraySize(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("SELECT fldData FROM xx_%s" % config.tmp) |
|
rs = crsr.fetchmany() |
|
assert len(rs) == 1 |
|
crsr.arraysize = 4 |
|
rs = crsr.fetchmany() |
|
assert len(rs) == 4 |
|
rs = crsr.fetchmany() |
|
assert len(rs) == 4 |
|
rs = crsr.fetchmany() |
|
assert len(rs) == 0 |
|
self.helpRollbackTblTemp() |
|
|
|
def testErrorConnect(self): |
|
conn = self.getConnection() |
|
kw = {} |
|
if "proxy_host" in conn.kwargs: |
|
kw["proxy_host"] = conn.kwargs["proxy_host"] |
|
conn.close() |
|
self.assertRaises(api.DatabaseError, self.db, "not a valid connect string", kw) |
|
|
|
def testRowIterator(self): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldTwo integer, |
|
fldThree integer, |
|
fldFour integer) |
|
""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
|
|
inputs = [(2, 3, 4), (102, 103, 104)] |
|
fldId = 1 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_%s (fldId,fldTwo,fldThree,fldFour) VALUES (?,?,?,?)" |
|
% config.tmp, |
|
(fldId, inParam[0], inParam[1], inParam[2]), |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldTwo,fldThree,fldFour FROM xx_%s WHERE ?=fldID" % config.tmp, |
|
[fldId], |
|
) |
|
rec = crsr.fetchone() |
|
|
|
for j in range(len(inParam)): |
|
assert ( |
|
rec[j] == inParam[j] |
|
), 'returned value:"%s" != test value:"%s"' % (rec[j], inParam[j]) |
|
|
|
assert tuple(rec) == inParam, 'returned value:"%s" != test value:"%s"' % ( |
|
repr(rec), |
|
repr(inParam), |
|
) |
|
|
|
slice1 = tuple(rec[:-1]) |
|
slice2 = tuple(inParam[0:2]) |
|
assert slice1 == slice2, 'returned value:"%s" != test value:"%s"' % ( |
|
repr(slice1), |
|
repr(slice2), |
|
) |
|
|
|
assert rec["fldTwo"] == inParam[0] |
|
assert rec.fldThree == inParam[1] |
|
assert rec.fldFour == inParam[2] |
|
|
|
|
|
crsr.execute("select fldThree,fldFour,fldTwo from xx_%s" % config.tmp) |
|
recs = crsr.fetchall() |
|
assert recs[1][0] == 103 |
|
assert recs[0][1] == 4 |
|
assert recs[1]["fldFour"] == 104 |
|
assert recs[0, 0] == 3 |
|
assert recs[0, "fldTwo"] == 2 |
|
assert recs[1, 2] == 102 |
|
for i in range(1): |
|
for j in range(2): |
|
assert recs[i][j] == recs[i, j] |
|
|
|
def testFormatParamstyle(self): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
conn.paramstyle = "format" |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldData varchar(10), |
|
fldConst varchar(30)) |
|
""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
|
|
inputs = ["one", "two", "three"] |
|
fldId = 2 |
|
for inParam in inputs: |
|
fldId += 1 |
|
sql = ( |
|
"INSERT INTO xx_" |
|
+ config.tmp |
|
+ " (fldId,fldConst,fldData) VALUES (%s,'thi%s :may cause? trouble', %s)" |
|
) |
|
try: |
|
crsr.execute(sql, (fldId, inParam)) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE %s=fldID", |
|
[fldId], |
|
) |
|
rec = crsr.fetchone() |
|
self.assertEqual( |
|
rec[0], |
|
inParam, |
|
'returned value:"%s" != test value:"%s"' % (rec[0], inParam), |
|
) |
|
self.assertEqual(rec[1], "thi%s :may cause? trouble") |
|
|
|
|
|
sel = ( |
|
"insert into xx_" + config.tmp + " (fldId,fldData) VALUES (%s,'four%sfive')" |
|
) |
|
params = (20,) |
|
crsr.execute(sel, params) |
|
|
|
|
|
assert "(?," in crsr.query, 'expected:"%s" in "%s"' % ("(?,", crsr.query) |
|
|
|
assert crsr.command == sel, 'expected:"%s" but found "%s"' % (sel, crsr.command) |
|
|
|
|
|
if not self.remote: |
|
self.assertEqual(crsr.parameters, params) |
|
|
|
crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=20" % config.tmp) |
|
rec = crsr.fetchone() |
|
self.assertEqual(rec[0], "four%sfive") |
|
|
|
def testNamedParamstyle(self): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
crsr.paramstyle = "named" |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldData varchar(10)) |
|
""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
|
|
inputs = ["four", "five", "six"] |
|
fldId = 10 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" |
|
% config.tmp, |
|
{"f_Val": inParam, "Id": fldId}, |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldData FROM xx_%s WHERE fldID=:Id" % config.tmp, {"Id": fldId} |
|
) |
|
rec = crsr.fetchone() |
|
self.assertEqual( |
|
rec[0], |
|
inParam, |
|
'returned value:"%s" != test value:"%s"' % (rec[0], inParam), |
|
) |
|
|
|
crsr.execute( |
|
"insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp, |
|
{"xyz": 30}, |
|
) |
|
crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) |
|
rec = crsr.fetchone() |
|
self.assertEqual(rec[0], "six:five") |
|
|
|
def testPyformatParamstyle(self): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
crsr.paramstyle = "pyformat" |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldData varchar(10)) |
|
""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
|
|
inputs = ["four", "five", "six"] |
|
fldId = 10 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_%s (fldId,fldData) VALUES (%%(Id)s,%%(f_Val)s)" |
|
% config.tmp, |
|
{"f_Val": inParam, "Id": fldId}, |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldData FROM xx_%s WHERE fldID=%%(Id)s" % config.tmp, |
|
{"Id": fldId}, |
|
) |
|
rec = crsr.fetchone() |
|
self.assertEqual( |
|
rec[0], |
|
inParam, |
|
'returned value:"%s" != test value:"%s"' % (rec[0], inParam), |
|
) |
|
|
|
crsr.execute( |
|
"insert into xx_%s (fldId,fldData) VALUES (%%(xyz)s,'six%%five')" |
|
% config.tmp, |
|
{"xyz": 30}, |
|
) |
|
crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) |
|
rec = crsr.fetchone() |
|
self.assertEqual(rec[0], "six%five") |
|
|
|
def testAutomaticParamstyle(self): |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
conn.paramstyle = "dynamic" |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
fldId integer NOT NULL, |
|
fldData varchar(10), |
|
fldConst varchar(30)) |
|
""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
inputs = ["one", "two", "three"] |
|
fldId = 2 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_" |
|
+ config.tmp |
|
+ " (fldId,fldConst,fldData) VALUES (?,'thi%s :may cause? troub:1e', ?)", |
|
(fldId, inParam), |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
trouble = "thi%s :may cause? troub:1e" |
|
crsr.execute( |
|
"SELECT fldData, fldConst FROM xx_" + config.tmp + " WHERE ?=fldID", |
|
[fldId], |
|
) |
|
rec = crsr.fetchone() |
|
self.assertEqual( |
|
rec[0], |
|
inParam, |
|
'returned value:"%s" != test value:"%s"' % (rec[0], inParam), |
|
) |
|
self.assertEqual(rec[1], trouble) |
|
|
|
fldId = 10 |
|
for inParam in inputs: |
|
fldId += 1 |
|
try: |
|
crsr.execute( |
|
"INSERT INTO xx_%s (fldId,fldData) VALUES (:Id,:f_Val)" |
|
% config.tmp, |
|
{"f_Val": inParam, "Id": fldId}, |
|
) |
|
except: |
|
if self.remote: |
|
for message in crsr.messages: |
|
print(message) |
|
else: |
|
conn.printADOerrors() |
|
raise |
|
crsr.execute( |
|
"SELECT fldData FROM xx_%s WHERE :Id=fldID" % config.tmp, {"Id": fldId} |
|
) |
|
rec = crsr.fetchone() |
|
self.assertEqual( |
|
rec[0], |
|
inParam, |
|
'returned value:"%s" != test value:"%s"' % (rec[0], inParam), |
|
) |
|
|
|
ppdcmd = ( |
|
"insert into xx_%s (fldId,fldData) VALUES (:xyz,'six:five')" % config.tmp |
|
) |
|
crsr.prepare(ppdcmd) |
|
crsr.execute(ppdcmd, {"xyz": 30}) |
|
crsr.execute("SELECT fldData FROM xx_%s WHERE fldID=30" % config.tmp) |
|
rec = crsr.fetchone() |
|
self.assertEqual(rec[0], "six:five") |
|
|
|
def testRollBack(self): |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
assert not crsr.connection.autocommit, "Unexpected beginning condition" |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.connection.commit() |
|
|
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) |
|
|
|
selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp |
|
crsr.execute(selectSql) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 1 |
|
self.conn.rollback() |
|
crsr.execute(selectSql) |
|
assert ( |
|
crsr.fetchone() == None |
|
), "cursor.fetchone should return None if a query retrieves no rows" |
|
crsr.execute("SELECT fldData from xx_%s" % config.tmp) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 9, "the original records should still be present" |
|
self.helpRollbackTblTemp() |
|
|
|
def testCommit(self): |
|
try: |
|
con2 = self.getAnotherConnection() |
|
except NotImplementedError: |
|
return |
|
assert not con2.autocommit, "default should be manual commit" |
|
crsr = con2.cursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
|
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) |
|
con2.commit() |
|
|
|
selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp |
|
crsr.execute(selectSql) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 1 |
|
crsr.close() |
|
con2.close() |
|
conn = self.getConnection() |
|
crsr = self.getCursor() |
|
with conn.cursor() as crsr: |
|
crsr.execute(selectSql) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 1 |
|
assert rs[0][0] == 100 |
|
self.helpRollbackTblTemp() |
|
|
|
def testAutoRollback(self): |
|
try: |
|
con2 = self.getAnotherConnection() |
|
except NotImplementedError: |
|
return |
|
assert not con2.autocommit, "unexpected beginning condition" |
|
crsr = con2.cursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) |
|
selectSql = "SELECT fldData FROM xx_%s WHERE fldData=100" % config.tmp |
|
crsr.execute(selectSql) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 1 |
|
crsr.close() |
|
con2.close() |
|
crsr = self.getCursor() |
|
try: |
|
crsr.execute( |
|
selectSql |
|
) |
|
row = crsr.fetchone() |
|
except api.DatabaseError: |
|
row = None |
|
assert row == None, ( |
|
"cursor.fetchone should return None if a query retrieves no rows. Got %s" |
|
% repr(row) |
|
) |
|
self.helpRollbackTblTemp() |
|
|
|
def testAutoCommit(self): |
|
try: |
|
ac_conn = self.getAnotherConnection({"autocommit": True}) |
|
except NotImplementedError: |
|
return |
|
crsr = ac_conn.cursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) |
|
crsr.close() |
|
with self.getCursor() as crsr: |
|
selectSql = "SELECT fldData from xx_%s" % config.tmp |
|
crsr.execute( |
|
selectSql |
|
) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 10, "all records should still be present" |
|
ac_conn.close() |
|
self.helpRollbackTblTemp() |
|
|
|
def testSwitchedAutoCommit(self): |
|
try: |
|
ac_conn = self.getAnotherConnection() |
|
except NotImplementedError: |
|
return |
|
ac_conn.autocommit = True |
|
crsr = ac_conn.cursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
crsr.execute("INSERT INTO xx_%s (fldData) VALUES(100)" % config.tmp) |
|
crsr.close() |
|
conn = self.getConnection() |
|
ac_conn.close() |
|
with self.getCursor() as crsr: |
|
selectSql = "SELECT fldData from xx_%s" % config.tmp |
|
crsr.execute( |
|
selectSql |
|
) |
|
rs = crsr.fetchall() |
|
assert len(rs) == 10, "all records should still be present" |
|
self.helpRollbackTblTemp() |
|
|
|
def testExtendedTypeHandling(self): |
|
class XtendString(str): |
|
pass |
|
|
|
class XtendInt(int): |
|
pass |
|
|
|
class XtendFloat(float): |
|
pass |
|
|
|
xs = XtendString(randomstring(30)) |
|
xi = XtendInt(random.randint(-100, 500)) |
|
xf = XtendFloat(random.random()) |
|
self.helpForceDropOnTblTemp() |
|
conn = self.getConnection() |
|
crsr = conn.cursor() |
|
tabdef = ( |
|
""" |
|
CREATE TABLE xx_%s ( |
|
s VARCHAR(40) NOT NULL, |
|
i INTEGER NOT NULL, |
|
f REAL NOT NULL)""" |
|
% config.tmp |
|
) |
|
crsr.execute(tabdef) |
|
crsr.execute( |
|
"INSERT INTO xx_%s (s, i, f) VALUES (?, ?, ?)" % config.tmp, (xs, xi, xf) |
|
) |
|
crsr.close() |
|
conn = self.getConnection() |
|
with self.getCursor() as crsr: |
|
selectSql = "SELECT s, i, f from xx_%s" % config.tmp |
|
crsr.execute( |
|
selectSql |
|
) |
|
row = crsr.fetchone() |
|
self.assertEqual(row.s, xs) |
|
self.assertEqual(row.i, xi) |
|
self.assertAlmostEqual(row.f, xf) |
|
self.helpRollbackTblTemp() |
|
|
|
|
|
class TestADOwithSQLServer(CommonDBTests): |
|
def setUp(self): |
|
self.conn = config.dbSqlServerconnect( |
|
*config.connStrSQLServer[0], **config.connStrSQLServer[1] |
|
) |
|
self.conn.timeout = 30 |
|
self.engine = "MSSQL" |
|
self.db = config.dbSqlServerconnect |
|
self.remote = config.connStrSQLServer[2] |
|
|
|
def tearDown(self): |
|
try: |
|
self.conn.rollback() |
|
except: |
|
pass |
|
try: |
|
self.conn.close() |
|
except: |
|
pass |
|
self.conn = None |
|
|
|
def getConnection(self): |
|
return self.conn |
|
|
|
def getAnotherConnection(self, addkeys=None): |
|
keys = dict(config.connStrSQLServer[1]) |
|
if addkeys: |
|
keys.update(addkeys) |
|
return config.dbSqlServerconnect(*config.connStrSQLServer[0], **keys) |
|
|
|
def testVariableReturningStoredProcedure(self): |
|
crsr = self.conn.cursor() |
|
spdef = """ |
|
CREATE PROCEDURE sp_DeleteMeOnlyForTesting |
|
@theInput varchar(50), |
|
@theOtherInput varchar(50), |
|
@theOutput varchar(100) OUTPUT |
|
AS |
|
SET @theOutput=@theInput+@theOtherInput |
|
""" |
|
try: |
|
crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") |
|
self.conn.commit() |
|
except: |
|
pass |
|
crsr.execute(spdef) |
|
|
|
retvalues = crsr.callproc( |
|
"sp_DeleteMeOnlyForTesting", ("Dodsworth", "Anne", " ") |
|
) |
|
assert retvalues[0] == "Dodsworth", '%s is not "Dodsworth"' % repr(retvalues[0]) |
|
assert retvalues[1] == "Anne", '%s is not "Anne"' % repr(retvalues[1]) |
|
assert retvalues[2] == "DodsworthAnne", '%s is not "DodsworthAnne"' % repr( |
|
retvalues[2] |
|
) |
|
self.conn.rollback() |
|
|
|
def testMultipleSetReturn(self): |
|
crsr = self.getCursor() |
|
self.helpCreateAndPopulateTableTemp(crsr) |
|
|
|
spdef = """ |
|
CREATE PROCEDURE sp_DeleteMe_OnlyForTesting |
|
AS |
|
SELECT fldData FROM xx_%s ORDER BY fldData ASC |
|
SELECT fldData From xx_%s where fldData = -9999 |
|
SELECT fldData FROM xx_%s ORDER BY fldData DESC |
|
""" % ( |
|
config.tmp, |
|
config.tmp, |
|
config.tmp, |
|
) |
|
try: |
|
crsr.execute("DROP PROCEDURE sp_DeleteMe_OnlyForTesting") |
|
self.conn.commit() |
|
except: |
|
pass |
|
crsr.execute(spdef) |
|
|
|
retvalues = crsr.callproc("sp_DeleteMe_OnlyForTesting") |
|
row = crsr.fetchone() |
|
self.assertEqual(row[0], 0) |
|
assert crsr.nextset() == True, "Operation should succeed" |
|
assert not crsr.fetchall(), "Should be an empty second set" |
|
assert crsr.nextset() == True, "third set should be present" |
|
rowdesc = crsr.fetchall() |
|
self.assertEqual(rowdesc[0][0], 8) |
|
assert crsr.nextset() == None, "No more return sets, should return None" |
|
|
|
self.helpRollbackTblTemp() |
|
|
|
def testDatetimeProcedureParameter(self): |
|
crsr = self.conn.cursor() |
|
spdef = """ |
|
CREATE PROCEDURE sp_DeleteMeOnlyForTesting |
|
@theInput DATETIME, |
|
@theOtherInput varchar(50), |
|
@theOutput varchar(100) OUTPUT |
|
AS |
|
SET @theOutput = CONVERT(CHARACTER(20), @theInput, 0) + @theOtherInput |
|
""" |
|
try: |
|
crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") |
|
self.conn.commit() |
|
except: |
|
pass |
|
crsr.execute(spdef) |
|
|
|
result = crsr.callproc( |
|
"sp_DeleteMeOnlyForTesting", |
|
[adodbapi.Timestamp(2014, 12, 25, 0, 1, 0), "Beep", " " * 30], |
|
) |
|
|
|
assert result[2] == "Dec 25 2014 12:01AM Beep", 'value was="%s"' % result[2] |
|
self.conn.rollback() |
|
|
|
def testIncorrectStoredProcedureParameter(self): |
|
crsr = self.conn.cursor() |
|
spdef = """ |
|
CREATE PROCEDURE sp_DeleteMeOnlyForTesting |
|
@theInput DATETIME, |
|
@theOtherInput varchar(50), |
|
@theOutput varchar(100) OUTPUT |
|
AS |
|
SET @theOutput = CONVERT(CHARACTER(20), @theInput) + @theOtherInput |
|
""" |
|
try: |
|
crsr.execute("DROP PROCEDURE sp_DeleteMeOnlyForTesting") |
|
self.conn.commit() |
|
except: |
|
pass |
|
crsr.execute(spdef) |
|
|
|
|
|
result = tryconnection.try_operation_with_expected_exception( |
|
(api.DataError, api.DatabaseError), |
|
crsr.callproc, |
|
["sp_DeleteMeOnlyForTesting"], |
|
{"parameters": ["this is wrong", "Anne", "not Alice"]}, |
|
) |
|
if result[0]: |
|
assert "@theInput" in str(result[1]) or "DatabaseError" in str( |
|
result |
|
), "Identifies the wrong erroneous parameter" |
|
else: |
|
assert result[0], result[1] |
|
self.conn.rollback() |
|
|
|
|
|
class TestADOwithAccessDB(CommonDBTests): |
|
def setUp(self): |
|
self.conn = config.dbAccessconnect( |
|
*config.connStrAccess[0], **config.connStrAccess[1] |
|
) |
|
self.conn.timeout = 30 |
|
self.engine = "ACCESS" |
|
self.db = config.dbAccessconnect |
|
self.remote = config.connStrAccess[2] |
|
|
|
def tearDown(self): |
|
try: |
|
self.conn.rollback() |
|
except: |
|
pass |
|
try: |
|
self.conn.close() |
|
except: |
|
pass |
|
self.conn = None |
|
|
|
def getConnection(self): |
|
return self.conn |
|
|
|
def getAnotherConnection(self, addkeys=None): |
|
raise NotImplementedError("Jet cannot use a second connection to the database") |
|
|
|
def testOkConnect(self): |
|
c = self.db(*config.connStrAccess[0], **config.connStrAccess[1]) |
|
assert c != None |
|
c.close() |
|
|
|
|
|
class TestADOwithMySql(CommonDBTests): |
|
def setUp(self): |
|
self.conn = config.dbMySqlconnect( |
|
*config.connStrMySql[0], **config.connStrMySql[1] |
|
) |
|
self.conn.timeout = 30 |
|
self.engine = "MySQL" |
|
self.db = config.dbMySqlconnect |
|
self.remote = config.connStrMySql[2] |
|
|
|
def tearDown(self): |
|
try: |
|
self.conn.rollback() |
|
except: |
|
pass |
|
try: |
|
self.conn.close() |
|
except: |
|
pass |
|
self.conn = None |
|
|
|
def getConnection(self): |
|
return self.conn |
|
|
|
def getAnotherConnection(self, addkeys=None): |
|
keys = dict(config.connStrMySql[1]) |
|
if addkeys: |
|
keys.update(addkeys) |
|
return config.dbMySqlconnect(*config.connStrMySql[0], **keys) |
|
|
|
def testOkConnect(self): |
|
c = self.db(*config.connStrMySql[0], **config.connStrMySql[1]) |
|
assert c != None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestADOwithPostgres(CommonDBTests): |
|
def setUp(self): |
|
self.conn = config.dbPostgresConnect( |
|
*config.connStrPostgres[0], **config.connStrPostgres[1] |
|
) |
|
self.conn.timeout = 30 |
|
self.engine = "PostgreSQL" |
|
self.db = config.dbPostgresConnect |
|
self.remote = config.connStrPostgres[2] |
|
|
|
def tearDown(self): |
|
try: |
|
self.conn.rollback() |
|
except: |
|
pass |
|
try: |
|
self.conn.close() |
|
except: |
|
pass |
|
self.conn = None |
|
|
|
def getConnection(self): |
|
return self.conn |
|
|
|
def getAnotherConnection(self, addkeys=None): |
|
keys = dict(config.connStrPostgres[1]) |
|
if addkeys: |
|
keys.update(addkeys) |
|
return config.dbPostgresConnect(*config.connStrPostgres[0], **keys) |
|
|
|
def testOkConnect(self): |
|
c = self.db(*config.connStrPostgres[0], **config.connStrPostgres[1]) |
|
assert c != None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TimeConverterInterfaceTest(unittest.TestCase): |
|
def testIDate(self): |
|
assert self.tc.Date(1990, 2, 2) |
|
|
|
def testITime(self): |
|
assert self.tc.Time(13, 2, 2) |
|
|
|
def testITimestamp(self): |
|
assert self.tc.Timestamp(1990, 2, 2, 13, 2, 1) |
|
|
|
def testIDateObjectFromCOMDate(self): |
|
assert self.tc.DateObjectFromCOMDate(37435.7604282) |
|
|
|
def testICOMDate(self): |
|
assert hasattr(self.tc, "COMDate") |
|
|
|
def testExactDate(self): |
|
d = self.tc.Date(1994, 11, 15) |
|
comDate = self.tc.COMDate(d) |
|
correct = 34653.0 |
|
assert comDate == correct, comDate |
|
|
|
def testExactTimestamp(self): |
|
d = self.tc.Timestamp(1994, 11, 15, 12, 0, 0) |
|
comDate = self.tc.COMDate(d) |
|
correct = 34653.5 |
|
self.assertEqual(comDate, correct) |
|
|
|
d = self.tc.Timestamp(2003, 5, 6, 14, 15, 17) |
|
comDate = self.tc.COMDate(d) |
|
correct = 37747.593946759262 |
|
self.assertEqual(comDate, correct) |
|
|
|
def testIsoFormat(self): |
|
d = self.tc.Timestamp(1994, 11, 15, 12, 3, 10) |
|
iso = self.tc.DateObjectToIsoFormatString(d) |
|
self.assertEqual(str(iso[:19]), "1994-11-15 12:03:10") |
|
|
|
dt = self.tc.Date(2003, 5, 2) |
|
iso = self.tc.DateObjectToIsoFormatString(dt) |
|
self.assertEqual(str(iso[:10]), "2003-05-02") |
|
|
|
|
|
if config.doMxDateTimeTest: |
|
import mx.DateTime |
|
|
|
|
|
class TestMXDateTimeConverter(TimeConverterInterfaceTest): |
|
def setUp(self): |
|
self.tc = api.mxDateTimeConverter() |
|
|
|
def testCOMDate(self): |
|
t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2) |
|
cmd = self.tc.COMDate(t) |
|
assert cmd == t.COMDate() |
|
|
|
def testDateObjectFromCOMDate(self): |
|
cmd = self.tc.DateObjectFromCOMDate(37435.7604282) |
|
t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 0) |
|
t2 = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 2) |
|
assert t2 > cmd > t |
|
|
|
def testDate(self): |
|
assert mx.DateTime.Date(1980, 11, 4) == self.tc.Date(1980, 11, 4) |
|
|
|
def testTime(self): |
|
assert mx.DateTime.Time(13, 11, 4) == self.tc.Time(13, 11, 4) |
|
|
|
def testTimestamp(self): |
|
t = mx.DateTime.DateTime(2002, 6, 28, 18, 15, 1) |
|
obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 1) |
|
assert t == obj |
|
|
|
|
|
import time |
|
|
|
|
|
class TestPythonTimeConverter(TimeConverterInterfaceTest): |
|
def setUp(self): |
|
self.tc = api.pythonTimeConverter() |
|
|
|
def testCOMDate(self): |
|
mk = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) |
|
t = time.localtime(mk) |
|
|
|
cmd = self.tc.COMDate(t) |
|
assert abs(cmd - 37435.7604282) < 1.0 / 24, "%f more than an hour wrong" % cmd |
|
|
|
def testDateObjectFromCOMDate(self): |
|
cmd = self.tc.DateObjectFromCOMDate(37435.7604282) |
|
t1 = time.gmtime( |
|
time.mktime((2002, 6, 28, 0, 14, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) |
|
) |
|
|
|
t2 = time.gmtime( |
|
time.mktime((2002, 6, 29, 12, 14, 2, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) |
|
) |
|
assert t1 < cmd < t2, '"%s" should be about 2002-6-28 12:15:01' % repr(cmd) |
|
|
|
def testDate(self): |
|
t1 = time.mktime((2002, 6, 28, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 30, 0)) |
|
t2 = time.mktime((2002, 6, 30, 18, 15, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, 0)) |
|
obj = self.tc.Date(2002, 6, 29) |
|
assert t1 < time.mktime(obj) < t2, obj |
|
|
|
def testTime(self): |
|
self.assertEqual( |
|
self.tc.Time(18, 15, 2), time.gmtime(18 * 60 * 60 + 15 * 60 + 2) |
|
) |
|
|
|
def testTimestamp(self): |
|
t1 = time.localtime( |
|
time.mktime((2002, 6, 28, 18, 14, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) |
|
) |
|
t2 = time.localtime( |
|
time.mktime((2002, 6, 28, 18, 16, 1, 4, 31 + 28 + 31 + 30 + 31 + 28, -1)) |
|
) |
|
obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2) |
|
assert t1 < obj < t2, obj |
|
|
|
|
|
class TestPythonDateTimeConverter(TimeConverterInterfaceTest): |
|
def setUp(self): |
|
self.tc = api.pythonDateTimeConverter() |
|
|
|
def testCOMDate(self): |
|
t = datetime.datetime(2002, 6, 28, 18, 15, 1) |
|
|
|
cmd = self.tc.COMDate(t) |
|
assert abs(cmd - 37435.7604282) < 1.0 / 24, "more than an hour wrong" |
|
|
|
def testDateObjectFromCOMDate(self): |
|
cmd = self.tc.DateObjectFromCOMDate(37435.7604282) |
|
t1 = datetime.datetime(2002, 6, 28, 18, 14, 1) |
|
t2 = datetime.datetime(2002, 6, 28, 18, 16, 1) |
|
assert t1 < cmd < t2, cmd |
|
|
|
tx = datetime.datetime( |
|
2002, 6, 28, 18, 14, 1, 900000 |
|
) |
|
c1 = self.tc.DateObjectFromCOMDate(self.tc.COMDate(tx)) |
|
assert t1 < c1 < t2, c1 |
|
|
|
def testDate(self): |
|
t1 = datetime.date(2002, 6, 28) |
|
t2 = datetime.date(2002, 6, 30) |
|
obj = self.tc.Date(2002, 6, 29) |
|
assert t1 < obj < t2, obj |
|
|
|
def testTime(self): |
|
self.assertEqual(self.tc.Time(18, 15, 2).isoformat()[:8], "18:15:02") |
|
|
|
def testTimestamp(self): |
|
t1 = datetime.datetime(2002, 6, 28, 18, 14, 1) |
|
t2 = datetime.datetime(2002, 6, 28, 18, 16, 1) |
|
obj = self.tc.Timestamp(2002, 6, 28, 18, 15, 2) |
|
assert t1 < obj < t2, obj |
|
|
|
|
|
suites = [] |
|
suites.append(unittest.makeSuite(TestPythonDateTimeConverter, "test")) |
|
if config.doMxDateTimeTest: |
|
suites.append(unittest.makeSuite(TestMXDateTimeConverter, "test")) |
|
if config.doTimeTest: |
|
suites.append(unittest.makeSuite(TestPythonTimeConverter, "test")) |
|
|
|
if config.doAccessTest: |
|
suites.append(unittest.makeSuite(TestADOwithAccessDB, "test")) |
|
if config.doSqlServerTest: |
|
suites.append(unittest.makeSuite(TestADOwithSQLServer, "test")) |
|
if config.doMySqlTest: |
|
suites.append(unittest.makeSuite(TestADOwithMySql, "test")) |
|
if config.doPostgresTest: |
|
suites.append(unittest.makeSuite(TestADOwithPostgres, "test")) |
|
|
|
|
|
class cleanup_manager(object): |
|
def __enter__(self): |
|
pass |
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
config.cleanup(config.testfolder, config.mdb_name) |
|
|
|
|
|
suite = unittest.TestSuite(suites) |
|
if __name__ == "__main__": |
|
mysuite = copy.deepcopy(suite) |
|
with cleanup_manager(): |
|
defaultDateConverter = adodbapi.dateconverter |
|
print(__doc__) |
|
print("Default Date Converter is %s" % (defaultDateConverter,)) |
|
dateconverter = defaultDateConverter |
|
tag = "datetime" |
|
unittest.TextTestRunner().run(mysuite) |
|
|
|
if config.iterateOverTimeTests: |
|
for test, dateconverter, tag in ( |
|
(config.doTimeTest, api.pythonTimeConverter, "pythontime"), |
|
(config.doMxDateTimeTest, api.mxDateTimeConverter, "mx"), |
|
): |
|
if test: |
|
mysuite = copy.deepcopy( |
|
suite |
|
) |
|
adodbapi.adodbapi.dateconverter = dateconverter() |
|
print("Changed dateconverter to ") |
|
print(adodbapi.adodbapi.dateconverter) |
|
unittest.TextTestRunner().run(mysuite) |
|
|