WIP use checksums for updates.xml files

This commit is contained in:
David Dalcino
2022-02-27 16:59:21 -08:00
parent 2c5c261592
commit f979d80899
3 changed files with 33 additions and 11 deletions

View File

@@ -19,15 +19,15 @@
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import binascii
import posixpath
import xml.etree.ElementTree as ElementTree
from dataclasses import dataclass, field
from logging import getLogger
from typing import Dict, Iterable, List, Optional, Tuple
from aqt.exceptions import ArchiveDownloadError, ArchiveListError, NoPackageFound
from aqt.helper import Settings, getUrl, ssplit
from aqt.exceptions import ArchiveDownloadError, ArchiveListError, ChecksumDownloadFailure, NoPackageFound
from aqt.helper import Settings, get_hash, getUrl, ssplit
from aqt.metadata import QtRepoProperty, Version
@@ -224,13 +224,17 @@ class QtArchives:
self.target,
f"qt{self.version.major}_{self._version_str()}{self._arch_ext()}",
)
update_xml_url = posixpath.join(self.base, os_target_folder, "Updates.xml")
self._download_update_xml(update_xml_url)
update_xml_path = posixpath.join(os_target_folder, "Updates.xml")
self._download_update_xml(update_xml_path)
self._parse_update_xml(os_target_folder, self._target_packages())
def _download_update_xml(self, update_xml_url):
def _download_update_xml(self, update_xml_path):
"""Hook for unit test."""
self.update_xml_text = getUrl(update_xml_url, self.timeout)
xml_hash = binascii.unhexlify(get_hash(update_xml_path, "sha256", self.timeout))
if xml_hash == "":
raise ChecksumDownloadFailure(f"Checksum for '{update_xml_path}' is empty")
update_xml_text = getUrl(posixpath.join(self.base, update_xml_path), self.timeout, xml_hash)
self.update_xml_text = update_xml_text
def _parse_update_xml(self, os_target_folder, target_packages: Optional[ModuleToPackage]):
if not target_packages:
@@ -439,7 +443,7 @@ class ToolArchives(QtArchives):
# tools_ifw/
self.tool_name,
)
update_xml_url = posixpath.join(self.base, os_target_folder, "Updates.xml")
update_xml_url = posixpath.join(os_target_folder, "Updates.xml")
self._download_update_xml(update_xml_url) # call super method.
self._parse_update_xml(os_target_folder, None)

View File

@@ -31,7 +31,7 @@ import xml.etree.ElementTree as ElementTree
from logging import getLogger
from logging.handlers import QueueListener
from pathlib import Path
from typing import Callable, Dict, Generator, List, Tuple
from typing import Callable, Dict, Generator, List, Optional, Tuple
from urllib.parse import urlparse
import requests
@@ -55,7 +55,13 @@ def _check_content_type(ct: str) -> bool:
return any(ct.startswith(t) for t in candidate)
def getUrl(url: str, timeout) -> str:
def getUrl(url: str, timeout, expected_hash: Optional[bytes] = None) -> str:
"""
Gets a file from `url` via HTTP GET.
No caller should call this function without providing an expected_hash, unless
the caller is `get_hash`, which cannot know what the expected hash should be.
"""
logger = getLogger("aqt.helper")
with requests.sessions.Session() as session:
retries = requests.adapters.Retry(
@@ -84,6 +90,14 @@ def getUrl(url: str, timeout) -> str:
msg = f"Failed to retrieve file at {url}\nServer response code: {r.status_code}, reason: {r.reason}"
raise ArchiveDownloadError(msg)
result = r.text
filename = url.split("/")[-1]
actual_hash = hashlib.sha256(bytes(result, "utf-8")).digest()
if expected_hash is not None and expected_hash != actual_hash:
raise ArchiveChecksumError(
f"Downloaded file {filename} is corrupted! Detect checksum error.\n"
f"Expect {expected_hash.hex()}: {url}\n"
f"Actual {actual_hash.hex()}: {filename}"
)
return result

View File

@@ -1,3 +1,4 @@
import hashlib
import json
import os
import re
@@ -73,7 +74,10 @@ def corrupt_xmlfile():
),
)
def test_qtarchive_parse_corrupt_xmlfile(monkeypatch, corrupt_xmlfile, archives_class, init_args):
monkeypatch.setattr("aqt.archives.getUrl", lambda self, url: corrupt_xmlfile)
monkeypatch.setattr("aqt.archives.getUrl", lambda *args, **kwargs: corrupt_xmlfile)
monkeypatch.setattr(
"aqt.archives.get_hash", lambda *args, **kwargs: hashlib.sha256(bytes(corrupt_xmlfile, "utf-8")).hexdigest()
)
with pytest.raises(ArchiveListError) as error:
archives_class(*init_args)