#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
"""Round-trip tests for the bb.compress file wrappers (lz4 and zstd)."""

import contextlib
import os
import shutil
import subprocess
import tempfile
import unittest
from pathlib import Path

import bb.compress.lz4
import bb.compress.zstd


class CompressionTests:
    """Shared test cases for one compression backend.

    This is a mixin: it is meant to be combined with ``unittest.TestCase``
    in a subclass that also provides ``do_open(*args, **kwargs)``, a context
    manager yielding a file-like object for the backend under test.
    """

    def setUp(self):
        """Create a per-test temporary directory, removed on cleanup."""
        self._t = tempfile.TemporaryDirectory()
        self.tmpdir = Path(self._t.name)
        self.addCleanup(self._t.cleanup)

    def _file_helper(self, mode_suffix, data):
        """Write *data* to a file through ``do_open`` and read it back.

        ``mode_suffix`` is appended to ``"w"`` / ``"r"`` to form the open
        mode (the tests below pass ``"t"`` or ``"b"``). Asserts that what
        is read equals what was written.
        """
        tmp_file = self.tmpdir / "compressed"

        with self.do_open(tmp_file, mode="w" + mode_suffix) as f:
            f.write(data)

        with self.do_open(tmp_file, mode="r" + mode_suffix) as f:
            read_data = f.read()

        self.assertEqual(read_data, data)

    def test_text_file(self):
        """Round-trip a ``str`` through a file opened in text mode."""
        self._file_helper("t", "Hello")

    def test_binary_file(self):
        """Round-trip ``bytes`` through a file opened in binary mode."""
        self._file_helper("b", "Hello".encode("utf-8"))

    def _pipe_helper(self, mode_suffix, data):
        """Write *data* into an OS pipe through ``do_open`` and read it back.

        Same contract as ``_file_helper`` but the underlying objects are the
        two ends of ``os.pipe()`` wrapped as binary file objects, rather
        than a path on disk.
        """
        rfd, wfd = os.pipe()
        with open(rfd, "rb") as r, open(wfd, "wb") as w:
            with self.do_open(r, mode="r" + mode_suffix) as decompress:
                with self.do_open(w, mode="w" + mode_suffix) as compress:
                    compress.write(data)
                # The writer's context has exited before the read is issued;
                # presumably this is what completes the compressed stream so
                # the reader can see end-of-data -- confirm against
                # bb.compress before reordering.
                read_data = decompress.read()

        self.assertEqual(read_data, data)

    def test_text_pipe(self):
        """Round-trip a ``str`` through a pipe in text mode."""
        self._pipe_helper("t", "Hello")

    def test_binary_pipe(self):
        """Round-trip ``bytes`` through a pipe in binary mode."""
        self._pipe_helper("b", "Hello".encode("utf-8"))

    def test_bad_decompress(self):
        """Reading a file holding a single NUL byte must raise ``OSError``."""
        tmp_file = self.tmpdir / "compressed"
        with tmp_file.open("wb") as f:
            f.write(b"\x00")

        with self.assertRaises(OSError):
            # stderr is sent to DEVNULL; presumably to keep the backend's
            # own error output out of the test log.
            with self.do_open(tmp_file, mode="rb", stderr=subprocess.DEVNULL) as f:
                f.read()


class LZ4Tests(CompressionTests, unittest.TestCase):
    """Run the shared cases against ``bb.compress.lz4``."""

    def setUp(self):
        """Skip the test when the ``lz4c`` executable is not on PATH."""
        if shutil.which("lz4c") is None:
            self.skipTest("'lz4c' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Open via ``bb.compress.lz4.open`` and yield the file object."""
        with bb.compress.lz4.open(*args, **kwargs) as f:
            yield f


class ZStdTests(CompressionTests, unittest.TestCase):
    """Run the shared cases against ``bb.compress.zstd``."""

    def setUp(self):
        """Skip the test when the ``zstd`` executable is not on PATH."""
        if shutil.which("zstd") is None:
            self.skipTest("'zstd' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Open via ``bb.compress.zstd.open`` and yield the file object."""
        with bb.compress.zstd.open(*args, **kwargs) as f:
            yield f


class PZStdTests(CompressionTests, unittest.TestCase):
    """Run the shared cases against ``bb.compress.zstd`` with two threads."""

    def setUp(self):
        """Skip the test when the ``pzstd`` executable is not on PATH."""
        if shutil.which("pzstd") is None:
            self.skipTest("'pzstd' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Open via ``bb.compress.zstd.open`` with ``num_threads=2``.

        NOTE(review): presumably a thread count above one is what makes the
        backend use ``pzstd`` -- confirm against bb.compress.zstd.
        """
        with bb.compress.zstd.open(*args, num_threads=2, **kwargs) as f:
            yield f