xref: /OK3568_Linux_fs/buildroot/support/scripts/cve.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#!/usr/bin/env python3
2
3# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
4# Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20import datetime
21import os
22import requests  # URL checking
23import distutils.version
24import time
25import gzip
26import sys
27import operator
28
29try:
30    import ijson
31    # backend is a module in < 2.5, a string in >= 2.5
32    if 'python' in getattr(ijson.backend, '__name__', ijson.backend):
33        try:
34            import ijson.backends.yajl2_cffi as ijson
35        except ImportError:
36            sys.stderr.write('Warning: Using slow ijson python backend\n')
37except ImportError:
38    sys.stderr.write("You need ijson to parse NVD for CVE check\n")
39    exit(1)
40
41sys.path.append('utils/')
42
43NVD_START_YEAR = 2002
44NVD_JSON_VERSION = "1.1"
45NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION
46
47ops = {
48    '>=': operator.ge,
49    '>': operator.gt,
50    '<=': operator.le,
51    '<': operator.lt,
52    '=': operator.eq
53}
54
55
56# Check if two CPE IDs match each other
57def cpe_matches(cpe1, cpe2):
58    cpe1_elems = cpe1.split(":")
59    cpe2_elems = cpe2.split(":")
60
61    remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1],
62                     zip(cpe1_elems, cpe2_elems))
63    return len(list(remains)) == 0
64
65
66def cpe_product(cpe):
67    return cpe.split(':')[4]
68
69
70def cpe_version(cpe):
71    return cpe.split(':')[5]
72
73
74class CVE:
75    """An accessor class for CVE Items in NVD files"""
76    CVE_AFFECTS = 1
77    CVE_DOESNT_AFFECT = 2
78    CVE_UNKNOWN = 3
79
80    def __init__(self, nvd_cve):
81        """Initialize a CVE from its NVD JSON representation"""
82        self.nvd_cve = nvd_cve
83
84    @staticmethod
85    def download_nvd_year(nvd_path, year):
86        metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
87        path_metaf = os.path.join(nvd_path, metaf)
88        jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
89        path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)
90
91        # If the database file is less than a day old, we assume the NVD data
92        # locally available is recent enough.
93        if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
94            return path_jsonf_gz
95
96        # If not, we download the meta file
97        url = "%s/%s" % (NVD_BASE_URL, metaf)
98        print("Getting %s" % url)
99        page_meta = requests.get(url)
100        page_meta.raise_for_status()
101
102        # If the meta file already existed, we compare the existing
103        # one with the data newly downloaded. If they are different,
104        # we need to re-download the database.
105        # If the database does not exist locally, we need to redownload it in
106        # any case.
107        if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
108            meta_known = open(path_metaf, "r").read()
109            if page_meta.text == meta_known:
110                return path_jsonf_gz
111
112        # Grab the compressed JSON NVD, and write files to disk
113        url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
114        print("Getting %s" % url)
115        page_json = requests.get(url)
116        page_json.raise_for_status()
117        open(path_jsonf_gz, "wb").write(page_json.content)
118        open(path_metaf, "w").write(page_meta.text)
119        return path_jsonf_gz
120
121    @classmethod
122    def read_nvd_dir(cls, nvd_dir):
123        """
124        Iterate over all the CVEs contained in NIST Vulnerability Database
125        feeds since NVD_START_YEAR. If the files are missing or outdated in
126        nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
127        """
128        for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
129            filename = CVE.download_nvd_year(nvd_dir, year)
130            try:
131                content = ijson.items(gzip.GzipFile(filename), 'CVE_Items.item')
132            except:  # noqa: E722
133                print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
134                raise
135            for cve in content:
136                yield cls(cve)
137
138    def each_product(self):
139        """Iterate over each product section of this cve"""
140        for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']:
141            for product in vendor['product']['product_data']:
142                yield product
143
144    def parse_node(self, node):
145        """
146        Parse the node inside the configurations section to extract the
147        cpe information usefull to know if a product is affected by
148        the CVE. Actually only the product name and the version
149        descriptor are needed, but we also provide the vendor name.
150        """
151
152        # The node containing the cpe entries matching the CVE can also
153        # contain sub-nodes, so we need to manage it.
154        for child in node.get('children', ()):
155            for parsed_node in self.parse_node(child):
156                yield parsed_node
157
158        for cpe in node.get('cpe_match', ()):
159            if not cpe['vulnerable']:
160                return
161            product = cpe_product(cpe['cpe23Uri'])
162            version = cpe_version(cpe['cpe23Uri'])
163            # ignore when product is '-', which means N/A
164            if product == '-':
165                return
166            op_start = ''
167            op_end = ''
168            v_start = ''
169            v_end = ''
170
171            if version != '*' and version != '-':
172                # Version is defined, this is a '=' match
173                op_start = '='
174                v_start = version
175            else:
176                # Parse start version, end version and operators
177                if 'versionStartIncluding' in cpe:
178                    op_start = '>='
179                    v_start = cpe['versionStartIncluding']
180
181                if 'versionStartExcluding' in cpe:
182                    op_start = '>'
183                    v_start = cpe['versionStartExcluding']
184
185                if 'versionEndIncluding' in cpe:
186                    op_end = '<='
187                    v_end = cpe['versionEndIncluding']
188
189                if 'versionEndExcluding' in cpe:
190                    op_end = '<'
191                    v_end = cpe['versionEndExcluding']
192
193            yield {
194                'id': cpe['cpe23Uri'],
195                'v_start': v_start,
196                'op_start': op_start,
197                'v_end': v_end,
198                'op_end': op_end
199            }
200
201    def each_cpe(self):
202        for node in self.nvd_cve['configurations']['nodes']:
203            for cpe in self.parse_node(node):
204                yield cpe
205
206    @property
207    def identifier(self):
208        """The CVE unique identifier"""
209        return self.nvd_cve['cve']['CVE_data_meta']['ID']
210
211    @property
212    def affected_products(self):
213        """The set of CPE products referred by this CVE definition"""
214        return set(cpe_product(p['id']) for p in self.each_cpe())
215
216    def affects(self, name, version, cve_ignore_list, cpeid=None):
217        """
218        True if the Buildroot Package object passed as argument is affected
219        by this CVE.
220        """
221        if self.identifier in cve_ignore_list:
222            return self.CVE_DOESNT_AFFECT
223
224        pkg_version = distutils.version.LooseVersion(version)
225        if not hasattr(pkg_version, "version"):
226            print("Cannot parse package '%s' version '%s'" % (name, version))
227            pkg_version = None
228
229        # if we don't have a cpeid, build one based on name and version
230        if not cpeid:
231            cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version)
232        # if we have a cpeid, use its version instead of the package
233        # version, as they might be different due to
234        # <pkg>_CPE_ID_VERSION
235        else:
236            pkg_version = distutils.version.LooseVersion(cpe_version(cpeid))
237
238        for cpe in self.each_cpe():
239            if not cpe_matches(cpe['id'], cpeid):
240                continue
241            if not cpe['v_start'] and not cpe['v_end']:
242                return self.CVE_AFFECTS
243            if not pkg_version:
244                continue
245
246            if cpe['v_start']:
247                try:
248                    cve_affected_version = distutils.version.LooseVersion(cpe['v_start'])
249                    inrange = ops.get(cpe['op_start'])(pkg_version, cve_affected_version)
250                except TypeError:
251                    return self.CVE_UNKNOWN
252
253                # current package version is before v_start, so we're
254                # not affected by the CVE
255                if not inrange:
256                    continue
257
258            if cpe['v_end']:
259                try:
260                    cve_affected_version = distutils.version.LooseVersion(cpe['v_end'])
261                    inrange = ops.get(cpe['op_end'])(pkg_version, cve_affected_version)
262                except TypeError:
263                    return self.CVE_UNKNOWN
264
265                # current package version is after v_end, so we're
266                # not affected by the CVE
267                if not inrange:
268                    continue
269
270            # We're in the version range affected by this CVE
271            return self.CVE_AFFECTS
272
273        return self.CVE_DOESNT_AFFECT
274