Hackfut Security File Manager
Current Path:
/opt/alt/python37/lib64/python3.7/site-packages/psycopg2/tests
opt
/
alt
/
python37
/
lib64
/
python3.7
/
site-packages
/
psycopg2
/
tests
/
📁
..
📄
__init__.py
(3.03 KB)
📁
__pycache__
📄
dbapi20.py
(31.52 KB)
📄
dbapi20_tpc.py
(4.04 KB)
📄
test_async.py
(15.39 KB)
📄
test_bugX000.py
(1.64 KB)
📄
test_bug_gc.py
(1.68 KB)
📄
test_cancel.py
(3.64 KB)
📄
test_connection.py
(38.73 KB)
📄
test_copy.py
(11.65 KB)
📄
test_cursor.py
(20.8 KB)
📄
test_dates.py
(23.54 KB)
📄
test_errcodes.py
(2.09 KB)
📄
test_extras_dictcursor.py
(17.03 KB)
📄
test_green.py
(4.11 KB)
📄
test_lobject.py
(14.87 KB)
📄
test_module.py
(10.93 KB)
📄
test_notify.py
(6.98 KB)
📄
test_psycopg2_dbapi20.py
(1.9 KB)
📄
test_quote.py
(8.13 KB)
📄
test_transaction.py
(9.01 KB)
📄
test_types_basic.py
(18.23 KB)
📄
test_types_extras.py
(61.78 KB)
📄
test_with.py
(7.21 KB)
📄
testconfig.py
(1.22 KB)
📄
testutils.py
(10.88 KB)
Editing: test_green.py
#!/usr/bin/env python # test_green.py - unit test for _async wait callback # # Copyright (C) 2010-2011 Daniele Varrazzo <daniele.varrazzo@gmail.com> # # psycopg2 is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # In addition, as a special exception, the copyright holders give # permission to link this program with the OpenSSL library (or with # modified versions of OpenSSL that use the same license as OpenSSL), # and distribute linked combinations including the two. # # You must obey the GNU Lesser General Public License in all respects for # all of the code used other than OpenSSL. # # psycopg2 is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. import unittest import psycopg2 import psycopg2.extensions import psycopg2.extras from .testutils import ConnectingTestCase class ConnectionStub(object): """A `connection` wrapper allowing analysis of the `poll()` calls.""" def __init__(self, conn): self.conn = conn self.polls = [] def fileno(self): return self.conn.fileno() def poll(self): rv = self.conn.poll() self.polls.append(rv) return rv class GreenTestCase(ConnectingTestCase): def setUp(self): self._cb = psycopg2.extensions.get_wait_callback() psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) ConnectingTestCase.setUp(self) def tearDown(self): ConnectingTestCase.tearDown(self) psycopg2.extensions.set_wait_callback(self._cb) def set_stub_wait_callback(self, conn): stub = ConnectionStub(conn) psycopg2.extensions.set_wait_callback( lambda conn: psycopg2.extras.wait_select(stub)) return stub def test_flush_on_write(self): # a very large query requires a flush loop to be sent to the backend conn = self.conn stub = self.set_stub_wait_callback(conn) curs = conn.cursor() for mb in 1, 5, 10, 20, 50: size = mb * 1024 * 1024 del stub.polls[:] curs.execute("select %s;", ('x' * size,)) self.assertEqual(size, len(curs.fetchone()[0])) if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1: return # This is more a testing glitch than an error: it happens # on high load on linux: probably because the kernel has more # buffers ready. A warning may be useful during development, # but an error is bad during regression testing. import warnings warnings.warn("sending a large query didn't trigger block on write.") def test_error_in_callback(self): # behaviour changed after issue #113: if there is an error in the # callback for the moment we don't have a way to reset the connection # without blocking (ticket #113) so just close it. conn = self.conn curs = conn.cursor() curs.execute("select 1") # have a BEGIN curs.fetchone() # now try to do something that will fail in the callback psycopg2.extensions.set_wait_callback(lambda conn: 1//0) self.assertRaises(ZeroDivisionError, curs.execute, "select 2") self.assertTrue(conn.closed) def test_dont_freak_out(self): # if there is an error in a green query, don't freak out and close # the connection conn = self.conn curs = conn.cursor() self.assertRaises(psycopg2.ProgrammingError, curs.execute, "select the unselectable") # check that the connection is left in an usable state self.assertTrue(not conn.closed) conn.rollback() curs.execute("select 1") self.assertEqual(curs.fetchone()[0], 1) def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) if __name__ == "__main__": unittest.main()
Upload File
Create Folder