1#!/usr/bin/env python3 2 3# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com> 4# Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com> 5# 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14# General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 19 20import datetime 21import os 22import requests # URL checking 23import distutils.version 24import time 25import gzip 26import sys 27import operator 28 29try: 30 import ijson 31 # backend is a module in < 2.5, a string in >= 2.5 32 if 'python' in getattr(ijson.backend, '__name__', ijson.backend): 33 try: 34 import ijson.backends.yajl2_cffi as ijson 35 except ImportError: 36 sys.stderr.write('Warning: Using slow ijson python backend\n') 37except ImportError: 38 sys.stderr.write("You need ijson to parse NVD for CVE check\n") 39 exit(1) 40 41sys.path.append('utils/') 42 43NVD_START_YEAR = 2002 44NVD_JSON_VERSION = "1.1" 45NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION 46 47ops = { 48 '>=': operator.ge, 49 '>': operator.gt, 50 '<=': operator.le, 51 '<': operator.lt, 52 '=': operator.eq 53} 54 55 56# Check if two CPE IDs match each other 57def cpe_matches(cpe1, cpe2): 58 cpe1_elems = cpe1.split(":") 59 cpe2_elems = cpe2.split(":") 60 61 remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1], 62 zip(cpe1_elems, cpe2_elems)) 63 return len(list(remains)) == 0 64 65 66def cpe_product(cpe): 67 return cpe.split(':')[4] 68 69 70def cpe_version(cpe): 71 return cpe.split(':')[5] 72 73 74class CVE: 75 """An accessor class for CVE Items in NVD files""" 76 CVE_AFFECTS = 1 77 CVE_DOESNT_AFFECT = 2 78 CVE_UNKNOWN = 3 79 80 def __init__(self, nvd_cve): 81 """Initialize a CVE from its NVD JSON representation""" 82 self.nvd_cve = nvd_cve 83 84 @staticmethod 85 def download_nvd_year(nvd_path, year): 86 metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year) 87 path_metaf = os.path.join(nvd_path, metaf) 88 jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year) 89 path_jsonf_gz = os.path.join(nvd_path, jsonf_gz) 90 91 # If the database file is less than a day old, we assume the NVD data 92 # locally available is recent enough. 93 if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400: 94 return path_jsonf_gz 95 96 # If not, we download the meta file 97 url = "%s/%s" % (NVD_BASE_URL, metaf) 98 print("Getting %s" % url) 99 page_meta = requests.get(url) 100 page_meta.raise_for_status() 101 102 # If the meta file already existed, we compare the existing 103 # one with the data newly downloaded. If they are different, 104 # we need to re-download the database. 105 # If the database does not exist locally, we need to redownload it in 106 # any case. 107 if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz): 108 meta_known = open(path_metaf, "r").read() 109 if page_meta.text == meta_known: 110 return path_jsonf_gz 111 112 # Grab the compressed JSON NVD, and write files to disk 113 url = "%s/%s" % (NVD_BASE_URL, jsonf_gz) 114 print("Getting %s" % url) 115 page_json = requests.get(url) 116 page_json.raise_for_status() 117 open(path_jsonf_gz, "wb").write(page_json.content) 118 open(path_metaf, "w").write(page_meta.text) 119 return path_jsonf_gz 120 121 @classmethod 122 def read_nvd_dir(cls, nvd_dir): 123 """ 124 Iterate over all the CVEs contained in NIST Vulnerability Database 125 feeds since NVD_START_YEAR. If the files are missing or outdated in 126 nvd_dir, a fresh copy will be downloaded, and kept in .json.gz 127 """ 128 for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1): 129 filename = CVE.download_nvd_year(nvd_dir, year) 130 try: 131 content = ijson.items(gzip.GzipFile(filename), 'CVE_Items.item') 132 except: # noqa: E722 133 print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename) 134 raise 135 for cve in content: 136 yield cls(cve) 137 138 def each_product(self): 139 """Iterate over each product section of this cve""" 140 for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']: 141 for product in vendor['product']['product_data']: 142 yield product 143 144 def parse_node(self, node): 145 """ 146 Parse the node inside the configurations section to extract the 147 cpe information usefull to know if a product is affected by 148 the CVE. Actually only the product name and the version 149 descriptor are needed, but we also provide the vendor name. 150 """ 151 152 # The node containing the cpe entries matching the CVE can also 153 # contain sub-nodes, so we need to manage it. 154 for child in node.get('children', ()): 155 for parsed_node in self.parse_node(child): 156 yield parsed_node 157 158 for cpe in node.get('cpe_match', ()): 159 if not cpe['vulnerable']: 160 return 161 product = cpe_product(cpe['cpe23Uri']) 162 version = cpe_version(cpe['cpe23Uri']) 163 # ignore when product is '-', which means N/A 164 if product == '-': 165 return 166 op_start = '' 167 op_end = '' 168 v_start = '' 169 v_end = '' 170 171 if version != '*' and version != '-': 172 # Version is defined, this is a '=' match 173 op_start = '=' 174 v_start = version 175 else: 176 # Parse start version, end version and operators 177 if 'versionStartIncluding' in cpe: 178 op_start = '>=' 179 v_start = cpe['versionStartIncluding'] 180 181 if 'versionStartExcluding' in cpe: 182 op_start = '>' 183 v_start = cpe['versionStartExcluding'] 184 185 if 'versionEndIncluding' in cpe: 186 op_end = '<=' 187 v_end = cpe['versionEndIncluding'] 188 189 if 'versionEndExcluding' in cpe: 190 op_end = '<' 191 v_end = cpe['versionEndExcluding'] 192 193 yield { 194 'id': cpe['cpe23Uri'], 195 'v_start': v_start, 196 'op_start': op_start, 197 'v_end': v_end, 198 'op_end': op_end 199 } 200 201 def each_cpe(self): 202 for node in self.nvd_cve['configurations']['nodes']: 203 for cpe in self.parse_node(node): 204 yield cpe 205 206 @property 207 def identifier(self): 208 """The CVE unique identifier""" 209 return self.nvd_cve['cve']['CVE_data_meta']['ID'] 210 211 @property 212 def affected_products(self): 213 """The set of CPE products referred by this CVE definition""" 214 return set(cpe_product(p['id']) for p in self.each_cpe()) 215 216 def affects(self, name, version, cve_ignore_list, cpeid=None): 217 """ 218 True if the Buildroot Package object passed as argument is affected 219 by this CVE. 220 """ 221 if self.identifier in cve_ignore_list: 222 return self.CVE_DOESNT_AFFECT 223 224 pkg_version = distutils.version.LooseVersion(version) 225 if not hasattr(pkg_version, "version"): 226 print("Cannot parse package '%s' version '%s'" % (name, version)) 227 pkg_version = None 228 229 # if we don't have a cpeid, build one based on name and version 230 if not cpeid: 231 cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version) 232 # if we have a cpeid, use its version instead of the package 233 # version, as they might be different due to 234 # <pkg>_CPE_ID_VERSION 235 else: 236 pkg_version = distutils.version.LooseVersion(cpe_version(cpeid)) 237 238 for cpe in self.each_cpe(): 239 if not cpe_matches(cpe['id'], cpeid): 240 continue 241 if not cpe['v_start'] and not cpe['v_end']: 242 return self.CVE_AFFECTS 243 if not pkg_version: 244 continue 245 246 if cpe['v_start']: 247 try: 248 cve_affected_version = distutils.version.LooseVersion(cpe['v_start']) 249 inrange = ops.get(cpe['op_start'])(pkg_version, cve_affected_version) 250 except TypeError: 251 return self.CVE_UNKNOWN 252 253 # current package version is before v_start, so we're 254 # not affected by the CVE 255 if not inrange: 256 continue 257 258 if cpe['v_end']: 259 try: 260 cve_affected_version = distutils.version.LooseVersion(cpe['v_end']) 261 inrange = ops.get(cpe['op_end'])(pkg_version, cve_affected_version) 262 except TypeError: 263 return self.CVE_UNKNOWN 264 265 # current package version is after v_end, so we're 266 # not affected by the CVE 267 if not inrange: 268 continue 269 270 # We're in the version range affected by this CVE 271 return self.CVE_AFFECTS 272 273 return self.CVE_DOESNT_AFFECT 274