Plan 9 from Bell Labs’s /usr/web/sources/contrib/bichued/root/sys/lib/python/bsddb/test/test_compare.py

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


"""
TestCases for python DB Btree key comparison function.
"""

import sys, os, re
import test_all
from cStringIO import StringIO

import unittest
try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbshelve
except ImportError:
    # For Python 2.3
    from bsddb import db, dbshelve

lexical_cmp = cmp

def lowercase_cmp(left, right):
    return cmp (left.lower(), right.lower())

def make_reverse_comparator (cmp):
    def reverse (left, right, delegate=cmp):
        return - delegate (left, right)
    return reverse

_expected_lexical_test_data = ['', 'CCCP', 'a', 'aaa', 'b', 'c', 'cccce', 'ccccf']
_expected_lowercase_test_data = ['', 'a', 'aaa', 'b', 'c', 'CC', 'cccce', 'ccccf', 'CCCP']

class ComparatorTests (unittest.TestCase):
    def comparator_test_helper (self, comparator, expected_data):
        data = expected_data[:]
        data.sort (comparator)
        self.failUnless (data == expected_data,
                         "comparator `%s' is not right: %s vs. %s"
                         % (comparator, expected_data, data))
    def test_lexical_comparator (self):
        self.comparator_test_helper (lexical_cmp, _expected_lexical_test_data)
    def test_reverse_lexical_comparator (self):
        rev = _expected_lexical_test_data[:]
        rev.reverse ()
        self.comparator_test_helper (make_reverse_comparator (lexical_cmp),
                                     rev)
    def test_lowercase_comparator (self):
        self.comparator_test_helper (lowercase_cmp,
                                     _expected_lowercase_test_data)

class AbstractBtreeKeyCompareTestCase (unittest.TestCase):
    env = None
    db = None

    def setUp (self):
        self.filename = self.__class__.__name__ + '.db'
        homeDir = os.path.join (os.path.dirname (sys.argv[0]), 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir (homeDir)
        except os.error:
            pass

        env = db.DBEnv ()
        env.open (homeDir,
                  db.DB_CREATE | db.DB_INIT_MPOOL
                  | db.DB_INIT_LOCK | db.DB_THREAD)
        self.env = env

    def tearDown (self):
        self.closeDB ()
        if self.env is not None:
            self.env.close ()
            self.env = None
        import glob
        map (os.remove, glob.glob (os.path.join (self.homeDir, '*')))

    def addDataToDB (self, data):
        i = 0
        for item in data:
            self.db.put (item, str (i))
            i = i + 1

    def createDB (self, key_comparator):
        self.db = db.DB (self.env)
        self.setupDB (key_comparator)
        self.db.open (self.filename, "test", db.DB_BTREE, db.DB_CREATE)

    def setupDB (self, key_comparator):
        self.db.set_bt_compare (key_comparator)

    def closeDB (self):
        if self.db is not None:
            self.db.close ()
            self.db = None

    def startTest (self):
        pass

    def finishTest (self, expected = None):
        if expected is not None:
            self.check_results (expected)
        self.closeDB ()

    def check_results (self, expected):
        curs = self.db.cursor ()
        try:
            index = 0
            rec = curs.first ()
            while rec:
                key, ignore = rec
                self.failUnless (index < len (expected),
                                 "to many values returned from cursor")
                self.failUnless (expected[index] == key,
                                 "expected value `%s' at %d but got `%s'"
                                 % (expected[index], index, key))
                index = index + 1
                rec = curs.next ()
            self.failUnless (index == len (expected),
                             "not enough values returned from cursor")
        finally:
            curs.close ()

class BtreeKeyCompareTestCase (AbstractBtreeKeyCompareTestCase):
    def runCompareTest (self, comparator, data):
        self.startTest ()
        self.createDB (comparator)
        self.addDataToDB (data)
        self.finishTest (data)

    def test_lexical_ordering (self):
        self.runCompareTest (lexical_cmp, _expected_lexical_test_data)

    def test_reverse_lexical_ordering (self):
        expected_rev_data = _expected_lexical_test_data[:]
        expected_rev_data.reverse ()
        self.runCompareTest (make_reverse_comparator (lexical_cmp),
                             expected_rev_data)

    def test_compare_function_useless (self):
        self.startTest ()
        def socialist_comparator (l, r):
            return 0
        self.createDB (socialist_comparator)
        self.addDataToDB (['b', 'a', 'd'])
        # all things being equal the first key will be the only key
        # in the database...  (with the last key's value fwiw)
        self.finishTest (['b'])


class BtreeExceptionsTestCase (AbstractBtreeKeyCompareTestCase):
    def test_raises_non_callable (self):
        self.startTest ()
        self.assertRaises (TypeError, self.createDB, 'abc')
        self.assertRaises (TypeError, self.createDB, None)
        self.finishTest ()

    def test_set_bt_compare_with_function (self):
        self.startTest ()
        self.createDB (lexical_cmp)
        self.finishTest ()

    def check_results (self, results):
        pass

    def test_compare_function_incorrect (self):
        self.startTest ()
        def bad_comparator (l, r):
            return 1
        # verify that set_bt_compare checks that comparator('', '') == 0
        self.assertRaises (TypeError, self.createDB, bad_comparator)
        self.finishTest ()

    def verifyStderr(self, method, successRe):
        """
        Call method() while capturing sys.stderr output internally and
        call self.fail() if successRe.search() does not match the stderr
        output.  This is used to test for uncatchable exceptions.
        """
        stdErr = sys.stderr
        sys.stderr = StringIO()
        try:
            method()
        finally:
            temp = sys.stderr
            sys.stderr = stdErr
            errorOut = temp.getvalue()
            if not successRe.search(errorOut):
                self.fail("unexpected stderr output:\n"+errorOut)

    def _test_compare_function_exception (self):
        self.startTest ()
        def bad_comparator (l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            raise RuntimeError, "i'm a naughty comparison function"
        self.createDB (bad_comparator)
        #print "\n*** test should print 2 uncatchable tracebacks ***"
        self.addDataToDB (['a', 'b', 'c'])  # this should raise, but...
        self.finishTest ()

    def test_compare_function_exception(self):
        self.verifyStderr(
                self._test_compare_function_exception,
                re.compile('(^RuntimeError:.* naughty.*){2}', re.M|re.S)
        )

    def _test_compare_function_bad_return (self):
        self.startTest ()
        def bad_comparator (l, r):
            if l == r:
                # pass the set_bt_compare test
                return 0
            return l
        self.createDB (bad_comparator)
        #print "\n*** test should print 2 errors about returning an int ***"
        self.addDataToDB (['a', 'b', 'c'])  # this should raise, but...
        self.finishTest ()

    def test_compare_function_bad_return(self):
        self.verifyStderr(
                self._test_compare_function_bad_return,
                re.compile('(^TypeError:.* return an int.*){2}', re.M|re.S)
        )


    def test_cannot_assign_twice (self):

        def my_compare (a, b):
            return 0

        self.startTest ()
        self.createDB (my_compare)
        try:
            self.db.set_bt_compare (my_compare)
            assert False, "this set should fail"

        except RuntimeError, msg:
            pass

def test_suite ():
    res = unittest.TestSuite ()

    res.addTest (unittest.makeSuite (ComparatorTests))
    if db.version () >= (3, 3, 11):
        res.addTest (unittest.makeSuite (BtreeExceptionsTestCase))
        res.addTest (unittest.makeSuite (BtreeKeyCompareTestCase))
    return res

if __name__ == '__main__':
    unittest.main (defaultTest = 'suite')

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to webmaster@9p.io.