1# 2# SPDX-License-Identifier: GPL-2.0-only 3# 4 5import logging 6import os 7import signal 8import subprocess 9 10from wic import WicError 11from wic.pluginbase import SourcePlugin 12from wic.misc import exec_cmd, get_bitbake_var 13from wic.filemap import sparse_copy 14 15logger = logging.getLogger('wic') 16 17class RawCopyPlugin(SourcePlugin): 18 """ 19 Populate partition content from raw image file. 20 """ 21 22 name = 'rawcopy' 23 24 @staticmethod 25 def do_image_label(fstype, dst, label): 26 if fstype.startswith('ext'): 27 cmd = 'tune2fs -L %s %s' % (label, dst) 28 elif fstype in ('msdos', 'vfat'): 29 cmd = 'dosfslabel %s %s' % (dst, label) 30 elif fstype == 'btrfs': 31 cmd = 'btrfs filesystem label %s %s' % (dst, label) 32 elif fstype == 'swap': 33 cmd = 'mkswap -L %s %s' % (label, dst) 34 elif fstype in ('squashfs', 'erofs'): 35 raise WicError("It's not possible to update a %s " 36 "filesystem label '%s'" % (fstype, label)) 37 else: 38 raise WicError("Cannot update filesystem label: " 39 "Unknown fstype: '%s'" % (fstype)) 40 41 exec_cmd(cmd) 42 43 @staticmethod 44 def do_image_uncompression(src, dst, workdir): 45 def subprocess_setup(): 46 # Python installs a SIGPIPE handler by default. This is usually not what 47 # non-Python subprocesses expect. 48 # SIGPIPE errors are known issues with gzip/bash 49 signal.signal(signal.SIGPIPE, signal.SIG_DFL) 50 51 extension = os.path.splitext(src)[1] 52 decompressor = { 53 ".bz2": "bzip2", 54 ".gz": "gzip", 55 ".xz": "xz" 56 }.get(extension) 57 if not decompressor: 58 raise WicError("Not supported compressor filename extension: %s" % extension) 59 cmd = "%s -dc %s > %s" % (decompressor, src, dst) 60 subprocess.call(cmd, preexec_fn=subprocess_setup, shell=True, cwd=workdir) 61 62 @classmethod 63 def do_prepare_partition(cls, part, source_params, cr, cr_workdir, 64 oe_builddir, bootimg_dir, kernel_dir, 65 rootfs_dir, native_sysroot): 66 """ 67 Called to do the actual content population for a partition i.e. it 68 'prepares' the partition to be incorporated into the image. 69 """ 70 if not kernel_dir: 71 kernel_dir = get_bitbake_var("DEPLOY_DIR_IMAGE") 72 if not kernel_dir: 73 raise WicError("Couldn't find DEPLOY_DIR_IMAGE, exiting") 74 75 logger.debug('Kernel dir: %s', kernel_dir) 76 77 if 'file' not in source_params: 78 raise WicError("No file specified") 79 80 if 'unpack' in source_params: 81 img = os.path.join(kernel_dir, source_params['file']) 82 src = os.path.join(cr_workdir, os.path.splitext(source_params['file'])[0]) 83 RawCopyPlugin.do_image_uncompression(img, src, cr_workdir) 84 else: 85 src = os.path.join(kernel_dir, source_params['file']) 86 87 dst = os.path.join(cr_workdir, "%s.%s" % (os.path.basename(source_params['file']), part.lineno)) 88 89 if not os.path.exists(os.path.dirname(dst)): 90 os.makedirs(os.path.dirname(dst)) 91 92 if 'skip' in source_params: 93 sparse_copy(src, dst, skip=int(source_params['skip'])) 94 else: 95 sparse_copy(src, dst) 96 97 # get the size in the right units for kickstart (kB) 98 du_cmd = "du -Lbks %s" % dst 99 out = exec_cmd(du_cmd) 100 filesize = int(out.split()[0]) 101 102 if filesize > part.size: 103 part.size = filesize 104 105 if part.label: 106 RawCopyPlugin.do_image_label(part.fstype, dst, part.label) 107 108 part.source_file = dst 109