Hackfut Security File Manager
Current Path:
/opt/alt/python37/lib64/python3.7/site-packages/psycopg2/tests
opt
/
alt
/
python37
/
lib64
/
python3.7
/
site-packages
/
psycopg2
/
tests
/
📁
..
📄
__init__.py
(3.03 KB)
📁
__pycache__
📄
dbapi20.py
(31.52 KB)
📄
dbapi20_tpc.py
(4.04 KB)
📄
test_async.py
(15.39 KB)
📄
test_bugX000.py
(1.64 KB)
📄
test_bug_gc.py
(1.68 KB)
📄
test_cancel.py
(3.64 KB)
📄
test_connection.py
(38.73 KB)
📄
test_copy.py
(11.65 KB)
📄
test_cursor.py
(20.8 KB)
📄
test_dates.py
(23.54 KB)
📄
test_errcodes.py
(2.09 KB)
📄
test_extras_dictcursor.py
(17.03 KB)
📄
test_green.py
(4.11 KB)
📄
test_lobject.py
(14.87 KB)
📄
test_module.py
(10.93 KB)
📄
test_notify.py
(6.98 KB)
📄
test_psycopg2_dbapi20.py
(1.9 KB)
📄
test_quote.py
(8.13 KB)
📄
test_transaction.py
(9.01 KB)
📄
test_types_basic.py
(18.23 KB)
📄
test_types_extras.py
(61.78 KB)
📄
test_with.py
(7.21 KB)
📄
testconfig.py
(1.22 KB)
📄
testutils.py
(10.88 KB)
Editing: dbapi20_tpc.py
""" Python DB API 2.0 driver Two Phase Commit compliance test suite. """ import unittest class TwoPhaseCommitTests(unittest.TestCase): driver = None def connect(self): """Make a database connection.""" raise NotImplementedError _last_id = 0 _global_id_prefix = "dbapi20_tpc:" def make_xid(self, con): id = TwoPhaseCommitTests._last_id TwoPhaseCommitTests._last_id += 1 return con.xid(42, "%s%d" % (self._global_id_prefix, id), "qualifier") def test_xid(self): con = self.connect() try: xid = con.xid(42, "global", "bqual") except self.driver.NotSupportedError: self.fail("Driver does not support transaction IDs.") self.assertEqual(xid[0], 42) self.assertEqual(xid[1], "global") self.assertEqual(xid[2], "bqual") # Try some extremes for the transaction ID: xid = con.xid(0, "", "") self.assertEqual(tuple(xid), (0, "", "")) xid = con.xid(0x7fffffff, "a" * 64, "b" * 64) self.assertEqual(tuple(xid), (0x7fffffff, "a" * 64, "b" * 64)) def test_tpc_begin(self): con = self.connect() try: xid = self.make_xid(con) try: con.tpc_begin(xid) except self.driver.NotSupportedError: self.fail("Driver does not support tpc_begin()") finally: con.close() def test_tpc_commit_without_prepare(self): con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) cursor = con.cursor() cursor.execute("SELECT 1") con.tpc_commit() finally: con.close() def test_tpc_rollback_without_prepare(self): con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) cursor = con.cursor() cursor.execute("SELECT 1") con.tpc_rollback() finally: con.close() def test_tpc_commit_with_prepare(self): con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) cursor = con.cursor() cursor.execute("SELECT 1") con.tpc_prepare() con.tpc_commit() finally: con.close() def test_tpc_rollback_with_prepare(self): con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) cursor = con.cursor() cursor.execute("SELECT 1") con.tpc_prepare() con.tpc_rollback() finally: con.close() def test_tpc_begin_in_transaction_fails(self): con = self.connect() try: xid = self.make_xid(con) cursor = con.cursor() cursor.execute("SELECT 1") self.assertRaises(self.driver.ProgrammingError, con.tpc_begin, xid) finally: con.close() def test_tpc_begin_in_tpc_transaction_fails(self): con = self.connect() try: xid = self.make_xid(con) cursor = con.cursor() cursor.execute("SELECT 1") self.assertRaises(self.driver.ProgrammingError, con.tpc_begin, xid) finally: con.close() def test_commit_in_tpc_fails(self): # calling commit() within a TPC transaction fails with # ProgrammingError. con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) self.assertRaises(self.driver.ProgrammingError, con.commit) finally: con.close() def test_rollback_in_tpc_fails(self): # calling rollback() within a TPC transaction fails with # ProgrammingError. con = self.connect() try: xid = self.make_xid(con) con.tpc_begin(xid) self.assertRaises(self.driver.ProgrammingError, con.rollback) finally: con.close()
Upload File
Create Folder