mirror of
https://github.com/miurahr/aqtinstall.git
synced 2025-12-17 04:34:37 +03:00
Merge pull request #504 from ddalcino/improve-get_hash
[Security] Improve `get_hash`
This commit is contained in:
@@ -19,7 +19,6 @@
|
||||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
import binascii
|
||||
import posixpath
|
||||
from dataclasses import dataclass, field
|
||||
from logging import getLogger
|
||||
@@ -27,7 +26,7 @@ from typing import Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
from defusedxml import ElementTree
|
||||
|
||||
from aqt.exceptions import ArchiveDownloadError, ArchiveListError, ChecksumDownloadFailure, NoPackageFound
|
||||
from aqt.exceptions import ArchiveDownloadError, ArchiveListError, NoPackageFound
|
||||
from aqt.helper import Settings, get_hash, getUrl, ssplit
|
||||
from aqt.metadata import QtRepoProperty, Version
|
||||
|
||||
@@ -231,9 +230,7 @@ class QtArchives:
|
||||
|
||||
def _download_update_xml(self, update_xml_path):
|
||||
"""Hook for unit test."""
|
||||
xml_hash = binascii.unhexlify(get_hash(update_xml_path, "sha256", self.timeout))
|
||||
if xml_hash == "":
|
||||
raise ChecksumDownloadFailure(f"Checksum for '{update_xml_path}' is empty")
|
||||
xml_hash = get_hash(update_xml_path, "sha256", self.timeout)
|
||||
update_xml_text = getUrl(posixpath.join(self.base, update_xml_path), self.timeout, xml_hash)
|
||||
self.update_xml_text = update_xml_text
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
import binascii
|
||||
import configparser
|
||||
import hashlib
|
||||
import json
|
||||
@@ -176,16 +176,28 @@ def iter_list_reps(_list: List, num_reps: int) -> Generator:
|
||||
list_index = 0
|
||||
|
||||
|
||||
def get_hash(archive_path: str, algorithm: str, timeout) -> str:
|
||||
def get_hash(archive_path: str, algorithm: str, timeout) -> bytes:
|
||||
"""
|
||||
Downloads a checksum and unhexlifies it to a `bytes` object, guaranteed to be the right length.
|
||||
Raises ChecksumDownloadFailure if the download failed, or if the checksum was un unexpected length.
|
||||
|
||||
:param archive_path: The path to the file that we want to check, not the path to the checksum.
|
||||
:param algorithm: sha256 is the only safe value to use here.
|
||||
:param timeout: The timeout used by getUrl.
|
||||
:return: A checksum in `bytes`
|
||||
"""
|
||||
logger = getLogger("aqt.helper")
|
||||
hash_lengths = {"sha256": 64, "sha1": 40, "md5": 32}
|
||||
for base_url in iter_list_reps(Settings.trusted_mirrors, Settings.max_retries_to_retrieve_hash):
|
||||
url = posixpath.join(base_url, f"{archive_path}.{algorithm}")
|
||||
logger.debug(f"Attempt to download checksum at {url}")
|
||||
try:
|
||||
r = getUrl(url, timeout)
|
||||
# sha256 & md5 files are: "some_hash archive_filename"
|
||||
return r.split(" ")[0]
|
||||
except (ArchiveConnectionError, ArchiveDownloadError):
|
||||
_hash = r.split(" ")[0]
|
||||
if len(_hash) == hash_lengths[algorithm]:
|
||||
return binascii.unhexlify(_hash)
|
||||
except (ArchiveConnectionError, ArchiveDownloadError, binascii.Incomplete, binascii.Error):
|
||||
pass
|
||||
filename = archive_path.split("/")[-1]
|
||||
raise ChecksumDownloadFailure(
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
import argparse
|
||||
import binascii
|
||||
import gc
|
||||
import multiprocessing
|
||||
import os
|
||||
@@ -1018,7 +1017,7 @@ def installer(
|
||||
timeout = (Settings.connection_timeout, Settings.response_timeout)
|
||||
else:
|
||||
timeout = (Settings.connection_timeout, response_timeout)
|
||||
hash = binascii.unhexlify(get_hash(qt_package.archive_path, algorithm="sha256", timeout=timeout))
|
||||
hash = get_hash(qt_package.archive_path, algorithm="sha256", timeout=timeout)
|
||||
|
||||
def download_bin(_base_url):
|
||||
url = posixpath.join(_base_url, qt_package.archive_path)
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
import binascii
|
||||
import itertools
|
||||
import operator
|
||||
import posixpath
|
||||
@@ -574,7 +573,7 @@ class MetadataFactory:
|
||||
@staticmethod
|
||||
def fetch_http(rest_of_url: str, is_check_hash: bool = True) -> str:
|
||||
timeout = (Settings.connection_timeout, Settings.response_timeout)
|
||||
expected_hash = binascii.unhexlify(get_hash(rest_of_url, "sha256", timeout)) if is_check_hash else None
|
||||
expected_hash = get_hash(rest_of_url, "sha256", timeout) if is_check_hash else None
|
||||
base_urls = Settings.baseurl, random.choice(Settings.fallbacks)
|
||||
for i, base_url in enumerate(base_urls):
|
||||
try:
|
||||
|
||||
@@ -201,6 +201,8 @@ def test_helper_retry_on_error(num_attempts_before_success, num_retries_allowed)
|
||||
)
|
||||
def test_helper_get_hash_retries(monkeypatch, num_tries_required, num_retries_allowed):
|
||||
num_tries = 0
|
||||
expected_hash = "a" * 64
|
||||
rest_of_url = "online/qtsdkrepository/some/path/to/archive.7z"
|
||||
|
||||
def mock_getUrl(url, *args, **kwargs):
|
||||
nonlocal num_tries
|
||||
@@ -210,20 +212,44 @@ def test_helper_get_hash_retries(monkeypatch, num_tries_required, num_retries_al
|
||||
parsed = urlparse(url)
|
||||
base = f"{parsed.scheme}://{parsed.netloc}"
|
||||
assert base in Settings.trusted_mirrors
|
||||
# Check that the url was composed properly
|
||||
assert url[len(base) :] == f"/{rest_of_url}.sha256"
|
||||
|
||||
hash_filename = str(parsed.path.split("/")[-1])
|
||||
assert hash_filename == "archive.7z.sha256"
|
||||
return "MOCK_HASH archive.7z"
|
||||
return f"{expected_hash} archive.7z"
|
||||
|
||||
monkeypatch.setattr("aqt.helper.getUrl", mock_getUrl)
|
||||
|
||||
if num_tries_required > num_retries_allowed:
|
||||
with pytest.raises(ChecksumDownloadFailure) as e:
|
||||
result = get_hash("http://insecure.mirror.com/some/path/to/archive.7z", "sha256", (5, 5))
|
||||
get_hash(rest_of_url, "sha256", (5, 5))
|
||||
assert e.type == ChecksumDownloadFailure
|
||||
else:
|
||||
result = get_hash("http://insecure.mirror.com/some/path/to/archive.7z", "sha256", (5, 5))
|
||||
assert result == "MOCK_HASH"
|
||||
result = get_hash(rest_of_url, "sha256", (5, 5))
|
||||
assert result == binascii.unhexlify(expected_hash)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"received_hash",
|
||||
(
|
||||
"", # Empty
|
||||
"a" * 40, # Hash length for sha1 checksums
|
||||
"q" * 64, # Not a hex digit; you can't unhexlify this
|
||||
),
|
||||
)
|
||||
def test_helper_get_hash_bad_hash(monkeypatch, received_hash):
|
||||
def mock_getUrl(url, *args, **kwargs):
|
||||
hash_filename = str(urlparse(url).path.split("/")[-1])
|
||||
assert hash_filename.endswith(".sha256")
|
||||
filename = hash_filename[: -len(".sha256")]
|
||||
return f"{received_hash} {filename}"
|
||||
|
||||
monkeypatch.setattr("aqt.helper.getUrl", mock_getUrl)
|
||||
|
||||
with pytest.raises(ChecksumDownloadFailure) as e:
|
||||
get_hash("online/qtsdkrepository/some/path/to/archive.7z", "sha256", (5, 5))
|
||||
assert e.type == ChecksumDownloadFailure
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Reference in New Issue
Block a user