1# 2# BitBake Test for lib/bb/persist_data/ 3# 4# Copyright (C) 2018 Garmin Ltd. 5# 6# SPDX-License-Identifier: GPL-2.0-only 7# 8 9import unittest 10import bb.data 11import bb.persist_data 12import tempfile 13import threading 14 15class PersistDataTest(unittest.TestCase): 16 def _create_data(self): 17 return bb.persist_data.persist('TEST_PERSIST_DATA', self.d) 18 19 def setUp(self): 20 self.d = bb.data.init() 21 self.tempdir = tempfile.TemporaryDirectory() 22 self.d['PERSISTENT_DIR'] = self.tempdir.name 23 self.data = self._create_data() 24 self.items = { 25 'A1': '1', 26 'B1': '2', 27 'C2': '3' 28 } 29 self.stress_count = 10000 30 self.thread_count = 5 31 32 for k,v in self.items.items(): 33 self.data[k] = v 34 35 def tearDown(self): 36 self.tempdir.cleanup() 37 38 def _iter_helper(self, seen, iterator): 39 with iter(iterator): 40 for v in iterator: 41 self.assertTrue(v in seen) 42 seen.remove(v) 43 self.assertEqual(len(seen), 0, '%s not seen' % seen) 44 45 def test_get(self): 46 for k, v in self.items.items(): 47 self.assertEqual(self.data[k], v) 48 49 self.assertIsNone(self.data.get('D')) 50 with self.assertRaises(KeyError): 51 self.data['D'] 52 53 def test_set(self): 54 for k, v in self.items.items(): 55 self.data[k] += '-foo' 56 57 for k, v in self.items.items(): 58 self.assertEqual(self.data[k], v + '-foo') 59 60 def test_delete(self): 61 self.data['D'] = '4' 62 self.assertEqual(self.data['D'], '4') 63 del self.data['D'] 64 self.assertIsNone(self.data.get('D')) 65 with self.assertRaises(KeyError): 66 self.data['D'] 67 68 def test_contains(self): 69 for k in self.items: 70 self.assertTrue(k in self.data) 71 self.assertTrue(self.data.has_key(k)) 72 self.assertFalse('NotFound' in self.data) 73 self.assertFalse(self.data.has_key('NotFound')) 74 75 def test_len(self): 76 self.assertEqual(len(self.data), len(self.items)) 77 78 def test_iter(self): 79 self._iter_helper(set(self.items.keys()), self.data) 80 81 def test_itervalues(self): 82 self._iter_helper(set(self.items.values()), self.data.itervalues()) 83 84 def test_iteritems(self): 85 self._iter_helper(set(self.items.items()), self.data.iteritems()) 86 87 def test_get_by_pattern(self): 88 self._iter_helper({'1', '2'}, self.data.get_by_pattern('_1')) 89 90 def _stress_read(self, data): 91 for i in range(self.stress_count): 92 for k in self.items: 93 data[k] 94 95 def _stress_write(self, data): 96 for i in range(self.stress_count): 97 for k, v in self.items.items(): 98 data[k] = v + str(i) 99 100 def _validate_stress(self): 101 for k, v in self.items.items(): 102 self.assertEqual(self.data[k], v + str(self.stress_count - 1)) 103 104 def test_stress(self): 105 self._stress_read(self.data) 106 self._stress_write(self.data) 107 self._validate_stress() 108 109 def test_stress_threads(self): 110 def read_thread(): 111 data = self._create_data() 112 self._stress_read(data) 113 114 def write_thread(): 115 data = self._create_data() 116 self._stress_write(data) 117 118 threads = [] 119 for i in range(self.thread_count): 120 threads.append(threading.Thread(target=read_thread)) 121 threads.append(threading.Thread(target=write_thread)) 122 123 for t in threads: 124 t.start() 125 self._stress_read(self.data) 126 for t in threads: 127 t.join() 128 self._validate_stress() 129 130