1import pexpect 2 3import infra 4 5 6class Emulator(object): 7 8 def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier): 9 self.qemu = None 10 self.downloaddir = downloaddir 11 self.logfile = infra.open_log_file(builddir, "run", logtofile) 12 # We use elastic runners on the cloud to runs our tests. Those runners 13 # can take a long time to run the emulator. Use a timeout multiplier 14 # when running the tests to avoid sporadic failures. 15 self.timeout_multiplier = timeout_multiplier 16 17 # Start Qemu to boot the system 18 # 19 # arch: Qemu architecture to use 20 # 21 # kernel: path to the kernel image, or the special string 22 # 'builtin'. 'builtin' means a pre-built kernel image will be 23 # downloaded from ARTEFACTS_URL and suitable options are 24 # automatically passed to qemu and added to the kernel cmdline. So 25 # far only armv5, armv7 and i386 builtin kernels are available. 26 # If None, then no kernel is used, and we assume a bootable device 27 # will be specified. 28 # 29 # kernel_cmdline: array of kernel arguments to pass to Qemu -append option 30 # 31 # options: array of command line options to pass to Qemu 32 # 33 def boot(self, arch, kernel=None, kernel_cmdline=None, options=None): 34 if arch in ["armv7", "armv5"]: 35 qemu_arch = "arm" 36 else: 37 qemu_arch = arch 38 39 qemu_cmd = ["qemu-system-{}".format(qemu_arch), 40 "-serial", "stdio", 41 "-display", "none", 42 "-m", "256"] 43 44 if options: 45 qemu_cmd += options 46 47 if kernel_cmdline is None: 48 kernel_cmdline = [] 49 50 if kernel: 51 if kernel == "builtin": 52 if arch in ["armv7", "armv5"]: 53 kernel_cmdline.append("console=ttyAMA0") 54 55 if arch == "armv7": 56 kernel = infra.download(self.downloaddir, 57 "kernel-vexpress-5.10.7") 58 dtb = infra.download(self.downloaddir, 59 "vexpress-v2p-ca9-5.10.7.dtb") 60 qemu_cmd += ["-dtb", dtb] 61 qemu_cmd += ["-M", "vexpress-a9"] 62 elif arch == "armv5": 63 kernel = infra.download(self.downloaddir, 64 "kernel-versatile-5.10.7") 65 dtb = infra.download(self.downloaddir, 66 "versatile-pb-5.10.7.dtb") 67 qemu_cmd += ["-dtb", dtb] 68 qemu_cmd += ["-M", "versatilepb"] 69 qemu_cmd += ["-device", "virtio-rng-pci"] 70 71 qemu_cmd += ["-kernel", kernel] 72 73 if kernel_cmdline: 74 qemu_cmd += ["-append", " ".join(kernel_cmdline)] 75 76 self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd)) 77 self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:], 78 timeout=5 * self.timeout_multiplier, 79 encoding='utf-8', 80 codec_errors='replace', 81 env={"QEMU_AUDIO_DRV": "none"}) 82 # We want only stdout into the log to avoid double echo 83 self.qemu.logfile_read = self.logfile 84 85 # Wait for the login prompt to appear, and then login as root with 86 # the provided password, or no password if not specified. 87 def login(self, password=None): 88 # The login prompt can take some time to appear when running multiple 89 # instances in parallel, so set the timeout to a large value 90 index = self.qemu.expect(["buildroot login:", pexpect.TIMEOUT], 91 timeout=60 * self.timeout_multiplier) 92 if index != 0: 93 self.logfile.write("==> System does not boot") 94 raise SystemError("System does not boot") 95 96 self.qemu.sendline("root") 97 if password: 98 self.qemu.expect("Password:") 99 self.qemu.sendline(password) 100 index = self.qemu.expect(["# ", pexpect.TIMEOUT]) 101 if index != 0: 102 raise SystemError("Cannot login") 103 self.run("dmesg -n 1") 104 # Prevent the shell from wrapping the commands at 80 columns. 105 self.run("stty columns 29999") 106 107 # Run the given 'cmd' with a 'timeout' on the target 108 # return a tuple (output, exit_code) 109 def run(self, cmd, timeout=-1): 110 self.qemu.sendline(cmd) 111 if timeout != -1: 112 timeout *= self.timeout_multiplier 113 self.qemu.expect("# ", timeout=timeout) 114 # Remove double carriage return from qemu stdout so str.splitlines() 115 # works as expected. 116 output = self.qemu.before.replace("\r\r", "\r").splitlines()[1:] 117 118 self.qemu.sendline("echo $?") 119 self.qemu.expect("# ") 120 exit_code = self.qemu.before.splitlines()[2] 121 exit_code = int(exit_code) 122 123 return output, exit_code 124 125 def stop(self): 126 if self.qemu is None: 127 return 128 self.qemu.terminate(force=True) 129