'''RHUI CLI tests'''
from __future__ import print_function
import logging
from os.path import basename, getsize, join
import re
from shutil import rmtree
from tempfile import mkdtemp
import time
try:
from configparser import ConfigParser # Python 3+
except ImportError:
from ConfigParser import ConfigParser # Python 2
import nose
from stitches.expect import Expect
import yaml
from rhui3_tests_lib.conmgr import ConMgr
from rhui3_tests_lib.rhuimanager import RHUIManager
from rhui3_tests_lib.rhuimanager_cmdline import RHUIManagerCLI, \
CustomRepoAlreadyExists, \
CustomRepoGpgKeyNotFound
from rhui3_tests_lib.subscription import RHSMRHUI
from rhui3_tests_lib.util import Util
# configure the root logger at DEBUG level
logging.basicConfig(level=logging.DEBUG)
# object returned by ConMgr.connect(); handed as the first argument to the RHUIManager*,
# RHSMRHUI, Util and Expect calls in the tests (presumably the connection to the RHUA node --
# confirm in rhui3_tests_lib.conmgr)
RHUA = ConMgr.connect()
# IDs of the custom repos created in test_02
CUSTOM_REPOS = ["my_custom_repo", "another_custom_repo", "yet_another_custom_repo"]
# display names derived from the IDs: underscores become spaces, words are title-cased
CR_NAMES = [cr.replace("_", " ").title() for cr in CUSTOM_REPOS]
# name passed to client_content_source in test_26; also part of the checked RPM path
ALT_CONTENT_SRC_NAME = "atomic_cs"
# client configuration parameters; judging by the RPM path built in test_22, these are
# the name, the version, and the release of the client configuration RPM
CLI_CFG = ["test-rhui", "1.0", "0.1"]
# directory from which the paths to the GPG key, the test RPM and the certificates are built
DATADIR = "/tmp/extra_rhui_files"
# file name (in DATADIR) of the GPG public key used for the second custom repo
KEYFILE = "test_gpg_key"
# file name (in DATADIR) of the RPM uploaded in test_05
TEST_RPM = "rhui-rpm-upload-test-1-1.noarch.rpm"
# package name prefix searched for in test_17; also used as a substring to filter out
# repo paths in test_26
OST_PKG = "ostree"
# certificate file names (in DATADIR), keyed by the scenario they are used for
CERTS = {"Atomic": "rhcert_atomic.pem",
"expired": "rhcert_expired.pem",
"incompatible": "rhcert_incompatible.pem",
"partial": "rhcert_partially_invalid.pem",
"empty": "rhcert_empty.pem"}
# local temporary directory, created when this module is imported and removed in test_99
TMPDIR = mkdtemp()
# files in TMPDIR used to pass data from one test to a later one:
# the available pool ID (written in test_30), ...
AVAILABLE_POOL_FILE = join(TMPDIR, "available")
# ... the registered pool ID (written in test_32), ...
REGISTERED_POOL_FILE = join(TMPDIR, "registered")
# ... and the local copy of the yum repo file fetched in test_23
YUM_REPO_FILE = join(TMPDIR, "rh-cloud.repo")
class TestCLI(object):
    '''
       class for CLI tests

       The tests are numbered because they depend on each other: later tests use the repos,
       certificates and files created by earlier ones. Data that must survive from one test
       to another is kept in files in TMPDIR (pool IDs, the fetched yum repo file).

       Methods whose names start with an underscore are helpers, not tests.
    '''

    def __init__(self):
        '''load the names, IDs, labels and paths of the tested repos from the YAML config'''
        with open("/etc/rhui3_tests/tested_repos.yaml") as configfile:
            # safe_load only builds plain data and does not need an explicit Loader,
            # which newer PyYAML versions require for yaml.load
            doc = yaml.safe_load(configfile)

        cli_repos = [doc["CLI_repo1"], doc["CLI_repo2"]]
        self.yum_repo_names = [repo["name"] for repo in cli_repos]
        self.yum_repo_ids = [repo["id"] for repo in cli_repos]
        self.yum_repo_labels = [repo["label"] for repo in cli_repos]
        self.yum_repo_paths = [repo["path"] for repo in cli_repos]
        self.product = doc["CLI_product"]
        self.remote_content = doc["remote_content"]
        self.subscriptions = doc["subscriptions"]

    @staticmethod
    def _read_pool_id(pool_file, error_message):
        '''
           return the content of the given pool ID file

           raises RuntimeError with the given message if the file cannot be read
        '''
        try:
            with open(pool_file) as pool_fd:
                return pool_fd.read()
        except IOError:
            raise RuntimeError(error_message)

    @staticmethod
    def _check_pool_id(pool_id):
        '''assert that the given pool ID consists of lowercase hexadecimal digits only'''
        nose.tools.ok_(re.match(r"^[0-9a-f]+$", pool_id),
                       msg="invalid pool ID: '%s'" % pool_id)

    @staticmethod
    def _parse_repo_file(repo_file):
        '''
           parse the given (previously fetched) yum repo file and return the ConfigParser object

           raises RuntimeError if the file is missing or empty
        '''
        try:
            unusable = not getsize(repo_file)
        except OSError:
            # getsize raises if the file does not exist, e.g. if the fetch never happened
            unusable = True
        if unusable:
            raise RuntimeError("configuration not created, can't test it")
        yum_cfg = ConfigParser()
        yum_cfg.read(repo_file)
        return yum_cfg

    @staticmethod
    def setup_class():
        '''
           announce the beginning of the test run
        '''
        print("*** Running %s: *** " % basename(__file__))

    @staticmethod
    def test_01_init_repo_check():
        '''log in to RHUI, check if the repo list is empty'''
        RHUIManager.initial_run(RHUA)
        repolist = RHUIManagerCLI.repo_list(RHUA, True)
        nose.tools.ok_(not repolist, msg="there are some repos already: %s" % repolist)

    @staticmethod
    def test_02_create_custom_repos():
        '''create three custom repos for testing'''
        # the first repo will be unprotected, with default parameters
        RHUIManagerCLI.repo_create_custom(RHUA, CUSTOM_REPOS[0])
        # the second repo will have a lot of custom parameters; it will be a protected repo
        RHUIManagerCLI.repo_create_custom(RHUA,
                                          repo_id=CUSTOM_REPOS[1],
                                          path="huh-%s" % CUSTOM_REPOS[1],
                                          display_name=CR_NAMES[1],
                                          legacy_md=True,
                                          protected=True,
                                          gpg_public_keys="%s/%s" % (DATADIR, KEYFILE))
        # the third repo will also be protected
        RHUIManagerCLI.repo_create_custom(RHUA,
                                          repo_id=CUSTOM_REPOS[2],
                                          protected=True)

    @staticmethod
    def test_03_custom_repo_checks():
        '''check if the custom repo cannot be added twice and if the GPG key path is validated'''
        nose.tools.assert_raises(CustomRepoAlreadyExists,
                                 RHUIManagerCLI.repo_create_custom,
                                 RHUA,
                                 CUSTOM_REPOS[0])
        nose.tools.assert_raises(CustomRepoGpgKeyNotFound,
                                 RHUIManagerCLI.repo_create_custom,
                                 RHUA,
                                 CUSTOM_REPOS[0] + "2",
                                 gpg_public_keys="/this_file_cant_be_there")

    @staticmethod
    def test_04_check_custom_repos():
        '''check if the custom repos were actually created'''
        # try a delimiter this time
        delimiter = ","
        repos_expected = delimiter.join(sorted(CUSTOM_REPOS))
        repos_actual = RHUIManagerCLI.repo_list(RHUA, True, False, delimiter)
        nose.tools.eq_(repos_expected, repos_actual)
        # ^ also checks if the repo IDs are sorted

    @staticmethod
    def test_05_upload_local_rpms():
        '''upload content from a local directory to one of the custom repos'''
        RHUIManagerCLI.packages_upload(RHUA, CUSTOM_REPOS[0], "%s/%s" % (DATADIR, TEST_RPM))
        # also supply the whole directory
        RHUIManagerCLI.packages_upload(RHUA, CUSTOM_REPOS[0], DATADIR)

    def test_06_upload_remote_rpms(self):
        '''upload content from remote servers to the custom repos'''
        # try single RPMs first
        RHUIManagerCLI.packages_remote(RHUA, CUSTOM_REPOS[1], self.remote_content["rpm"])
        RHUIManagerCLI.packages_remote(RHUA, CUSTOM_REPOS[1], self.remote_content["ftp"])
        # and now an HTML page with links to RPMs
        RHUIManagerCLI.packages_remote(RHUA,
                                       CUSTOM_REPOS[2],
                                       self.remote_content["html_with_links"])
        # and finally also some bad stuff
        # NOTE(review): both checks below pass if no RuntimeError is raised at all;
        # confirm whether that is intended
        rhua = ConMgr.get_rhua_hostname()
        try:
            RHUIManagerCLI.packages_remote(RHUA, CUSTOM_REPOS[2], "https://%s/" % rhua)
        except RuntimeError as err:
            # the RHUA listens on port 443 and uses a self-signed cert, which should be refused
            nose.tools.ok_("CERTIFICATE_VERIFY_FAILED" in str(err))
        # the RHUA also listens on port 80
        # create an empty subdirectory, supply it to rhui-manager, and expect no RPMs
        test_dir = "r"
        Expect.expect_retval(RHUA, "mkdir -p /var/www/html/%s" % test_dir)
        try:
            RHUIManagerCLI.packages_remote(RHUA, CUSTOM_REPOS[2], "http://%s/%s" % (rhua, test_dir))
        except RuntimeError as err:
            nose.tools.ok_("Found 0 RPMs" in str(err))
        finally:
            # remove the subdirectory no matter how the upload attempt ended
            Expect.expect_retval(RHUA, "rmdir /var/www/html/%s" % test_dir)

    def test_07_check_packages(self):
        '''check that the uploaded packages are now in the repos'''
        package_lists = [RHUIManagerCLI.packages_list(RHUA, repo) for repo in CUSTOM_REPOS]
        nose.tools.eq_(package_lists[0], Util.get_rpms_in_dir(RHUA, DATADIR))
        rpm_ftp_combined = sorted([basename(self.remote_content[p]) for p in ["rpm", "ftp"]])
        nose.tools.eq_(package_lists[1], rpm_ftp_combined)
        linked_rpms = sorted(Util.get_rpm_links(self.remote_content["html_with_links"]))
        nose.tools.eq_(package_lists[2], linked_rpms)

    @staticmethod
    def test_08_upload_certificate():
        '''upload the Atomic (the small) entitlement certificate'''
        RHUIManagerCLI.cert_upload(RHUA, "%s/%s" % (DATADIR, CERTS["Atomic"]))

    def test_09_check_certificate_info(self):
        '''check certificate info for validity'''
        ent_list = RHUIManagerCLI.cert_info(RHUA)
        nose.tools.ok_(self.yum_repo_names[0] in ent_list,
                       msg="%s not found in %s" % (self.yum_repo_names[0], ent_list))

    @staticmethod
    def test_10_check_certificate_exp():
        '''check if the certificate expiration date is OK'''
        RHUIManager.cacert_expiration(RHUA)

    def test_11_check_unused_product(self):
        '''check if a repo is available'''
        unused_repos = RHUIManagerCLI.repo_unused(RHUA)
        nose.tools.ok_(self.yum_repo_names[0] in unused_repos,
                       msg="%s not found in %s" % (self.yum_repo_names[0], unused_repos))

    def test_12_add_rh_repo_by_id(self):
        '''add a Red Hat repo by its ID'''
        RHUIManagerCLI.repo_add_by_repo(RHUA, [self.yum_repo_ids[1]])

    def test_13_add_rh_repo_by_product(self):
        '''add a Red Hat repo by its product name'''
        RHUIManagerCLI.repo_add(RHUA, self.yum_repo_names[0])

    def test_14_repo_list(self):
        '''check the added repos'''
        repolist_actual = RHUIManagerCLI.repo_list(RHUA, True, True).splitlines()
        nose.tools.eq_(self.yum_repo_ids, repolist_actual)

    def test_15_start_syncing_repo(self):
        '''sync one of the repos'''
        RHUIManagerCLI.repo_sync(RHUA, self.yum_repo_ids[1], self.yum_repo_names[1])

    def test_16_repo_info(self):
        '''verify that the repo name is part of the information about the specified repo ID'''
        info = RHUIManagerCLI.repo_info(RHUA, self.yum_repo_ids[1])
        nose.tools.eq_(info["name"], self.yum_repo_names[1])

    def test_17_check_package_in_repo(self):
        '''check a random package in the repo'''
        package_list = RHUIManagerCLI.packages_list(RHUA, self.yum_repo_ids[1])
        test_package_list = [package for package in package_list if package.startswith(OST_PKG)]
        nose.tools.ok_(test_package_list,
                       msg="no %s* in %s" % (OST_PKG, package_list))

    def test_18_list_labels(self):
        '''check repo labels'''
        actual_labels = RHUIManagerCLI.client_labels(RHUA)
        nose.tools.ok_(all(repo in actual_labels for repo in self.yum_repo_labels),
                       msg="%s not found in %s" % (self.yum_repo_labels, actual_labels))

    def test_19_generate_certificate(self):
        '''generate an entitlement certificate'''
        # generate it for RH repos and the first protected custom repo
        # the label is the repo ID in the case of custom repos
        RHUIManagerCLI.client_cert(RHUA,
                                   self.yum_repo_labels + [CUSTOM_REPOS[1]],
                                   CLI_CFG[0],
                                   365,
                                   "/tmp")

    @staticmethod
    def test_20_check_cli_crt_sig():
        '''check if SHA-256 is used in the client certificate signature'''
        # for RHBZ#1628957
        sigs_expected = ["sha256", "sha256"]
        _, stdout, _ = RHUA.exec_command("openssl x509 -noout -text -in " +
                                         "/tmp/%s.crt" % CLI_CFG[0])
        cert_details = stdout.read().decode()
        sigs_actual = re.findall("sha[0-9]+", cert_details)
        nose.tools.eq_(sigs_expected, sigs_actual)

    def test_21_check_stray_custom_repo(self):
        '''check if only the wanted repos are in the certificate'''
        repo_labels_expected = ["custom-%s" % CUSTOM_REPOS[1]] + self.yum_repo_labels
        _, stdout, _ = RHUA.exec_command("cat /tmp/%s-extensions.txt" % CLI_CFG[0])
        extensions = stdout.read().decode()
        # the labels are literal strings, so escape them before using them in a pattern
        escaped_labels = [re.escape(label) for label in self.yum_repo_labels]
        repo_labels_actual = re.findall("|".join(["custom-.*"] + escaped_labels),
                                        extensions)
        nose.tools.eq_(sorted(repo_labels_expected), sorted(repo_labels_actual))

    @staticmethod
    def test_22_create_cli_config_rpm():
        '''create a client configuration RPM'''
        RHUIManagerCLI.client_rpm(RHUA,
                                  ["/tmp/%s.key" % CLI_CFG[0], "/tmp/%s.crt" % CLI_CFG[0]],
                                  CLI_CFG,
                                  "/tmp",
                                  [CUSTOM_REPOS[0]],
                                  "_none_")
        # check if the rpm was created
        conf_rpm = "/tmp/%s-%s/build/RPMS/noarch/%s-%s-%s.noarch.rpm" % tuple(CLI_CFG[:2] + CLI_CFG)
        Expect.expect_retval(RHUA, "test -f %s" % conf_rpm)

    def test_23_ensure_gpgcheck_config(self):
        '''ensure that GPG checking is configured in the client configuration as expected'''
        # for RHBZ#1428756
        # we'll need the repo file in a few tests; fetch it now
        remote_repo_file = "/tmp/%s-%s/build/BUILD/%s-%s/rh-cloud.repo" % tuple(CLI_CFG[:2] * 2)
        try:
            Util.fetch(RHUA, remote_repo_file, YUM_REPO_FILE)
        except IOError:
            raise RuntimeError("configuration not created, can't test it")
        yum_cfg = ConfigParser()
        yum_cfg.read(YUM_REPO_FILE)
        # check RH repos: they all must have GPG checking enabled; get a list of those that don't
        bad = [r for r in self.yum_repo_labels if not yum_cfg.getboolean("rhui-%s" % r, "gpgcheck")]
        # check custom repos: the 2nd must have GPG checking enabled:
        if not yum_cfg.getboolean("rhui-custom-%s" % CUSTOM_REPOS[1], "gpgcheck"):
            bad.append(CUSTOM_REPOS[1])
        # the first one mustn't:
        if yum_cfg.getboolean("rhui-custom-%s" % CUSTOM_REPOS[0], "gpgcheck"):
            bad.append(CUSTOM_REPOS[0])
        nose.tools.ok_(not bad, msg="Unexpected GPG checking configuration for %s" % bad)

    @staticmethod
    def test_24_ensure_proxy_config():
        '''ensure that the proxy setting is used in the client configuration'''
        # for RHBZ#1658088
        # reuse the fetched file if possible
        yum_cfg = TestCLI._parse_repo_file(YUM_REPO_FILE)
        nose.tools.ok_(all(yum_cfg.get(r, "proxy") == "_none_" for r in yum_cfg.sections()))

    @staticmethod
    def test_25_custom_repo_used():
        '''check if the protected custom repo is included in the client configuration'''
        # for RHBZ#1663422
        # reuse the fetched file if possible
        yum_cfg = TestCLI._parse_repo_file(YUM_REPO_FILE)
        nose.tools.ok_("rhui-custom-%s" % CUSTOM_REPOS[1] in yum_cfg.sections())

    def test_26_create_acs_config_rpm(self):
        '''create an alternate content source configuration RPM'''
        # for RHBZ#1695464
        name = ALT_CONTENT_SRC_NAME
        RHUIManagerCLI.client_content_source(RHUA,
                                             self.yum_repo_labels,
                                             [name],
                                             "/tmp")
        # extract the configuration file from the RPM and print what follows "paths:" in it
        cmd = "rpm2cpio /tmp/%s-2.0/build/RPMS/noarch/%s-2.0-1.noarch.rpm | " % (name, name) + \
              r"cpio -i --to-stdout \*.conf | " + \
              "sed -n -e '/^paths:/,$p' | " + \
              "sed s/paths://"
        _, stdout, _ = RHUA.exec_command(cmd)
        paths_actual_raw = stdout.read().decode().splitlines()
        # the paths are indented, let's get rid of the formatting
        paths_actual = [p.lstrip() for p in paths_actual_raw]
        # the OSTree repo must not be included
        paths_expected = [p for p in self.yum_repo_paths if OST_PKG not in p]
        nose.tools.eq_(paths_expected, paths_actual)

    @staticmethod
    def test_27_upload_expired_cert():
        '''check expired certificate handling'''
        # NOTE(review): the test passes if the upload raises nothing; confirm that's intended
        try:
            RHUIManagerCLI.cert_upload(RHUA, "%s/%s" % (DATADIR, CERTS["expired"]))
        except RuntimeError as err:
            nose.tools.ok_("The provided certificate is expired or invalid" in str(err),
                           msg="unexpected error: %s" % err)

    @staticmethod
    def test_28_upload_incompat_cert():
        '''check incompatible certificate handling'''
        cert = "%s/%s" % (DATADIR, CERTS["incompatible"])
        if Util.cert_expired(RHUA, cert):
            raise nose.exc.SkipTest("The given certificate has already expired.")
        # NOTE(review): the test passes if the upload raises nothing; confirm that's intended
        try:
            RHUIManagerCLI.cert_upload(RHUA, cert)
        except RuntimeError as err:
            nose.tools.ok_("does not contain any entitlements" in str(err),
                           msg="unexpected error: %s" % err)

    def test_29_register_system(self):
        '''register the system in RHSM, attach the RHUI subscription'''
        RHSMRHUI.register_system(RHUA)
        RHSMRHUI.attach_subscription(RHUA, self.subscriptions["RHUI"])

    @staticmethod
    def test_30_fetch_available_pool():
        '''fetch the available pool ID'''
        available_pools = RHUIManagerCLI.subscriptions_list(RHUA, "available", True)
        nose.tools.ok_(available_pools, msg="no available pool")
        available_pool = available_pools[0]
        TestCLI._check_pool_id(available_pool)
        # save the ID for the tests that follow
        with open(AVAILABLE_POOL_FILE, "w") as apf:
            apf.write(available_pool)

    @staticmethod
    def test_31_register_subscription():
        '''register the subscription using the fetched pool ID'''
        available_pool = TestCLI._read_pool_id(AVAILABLE_POOL_FILE, "pool ID was not fetched")
        TestCLI._check_pool_id(available_pool)
        RHUIManagerCLI.subscriptions_register(RHUA, available_pool)

    @staticmethod
    def test_32_fetch_registered_pool():
        '''fetch the registered pool ID'''
        registered_pools = RHUIManagerCLI.subscriptions_list(RHUA, "registered", True)
        nose.tools.ok_(registered_pools, msg="no registered pool")
        registered_pool = registered_pools[0]
        TestCLI._check_pool_id(registered_pool)
        # save the ID for the tests that follow
        with open(REGISTERED_POOL_FILE, "w") as rpf:
            rpf.write(registered_pool)

    @staticmethod
    def test_33_compare_pools():
        '''check if the previously available and now registered pool IDs are the same'''
        available_pool = TestCLI._read_pool_id(AVAILABLE_POOL_FILE,
                                               "no known available pool ID")
        registered_pool = TestCLI._read_pool_id(REGISTERED_POOL_FILE,
                                                "no known registered pool ID")
        nose.tools.eq_(available_pool, registered_pool)

    def test_34_check_reg_pool_for_rhui(self):
        '''check if the registered subscription's description is RHUI for CCSP'''
        reg_sub = RHUIManagerCLI.subscriptions_list(RHUA)
        nose.tools.ok_(reg_sub, msg="no subscription is registered")
        nose.tools.eq_(self.subscriptions["RHUI"],
                       list(reg_sub.keys())[0],
                       msg="Expected subscription not registered in RHUI! Got: %s" % reg_sub)

    @staticmethod
    def test_35_unregister_subscription():
        '''remove the subscription from RHUI'''
        registered_pool = TestCLI._read_pool_id(REGISTERED_POOL_FILE,
                                                "no known registered pool ID")
        TestCLI._check_pool_id(registered_pool)
        RHUIManagerCLI.subscriptions_unregister(RHUA, registered_pool)

    @staticmethod
    def test_36_unregister_system():
        '''unregister the system from RHSM'''
        RHSMRHUI.unregister_system(RHUA)

    def test_37_resync_repo(self):
        '''sync the repo again'''
        RHUIManagerCLI.repo_sync(RHUA, self.yum_repo_ids[1], self.yum_repo_names[1])

    @staticmethod
    def test_38_resync_no_warning():
        '''check if the syncs did not cause known unnecessary warnings'''
        # grep is expected to exit with 1, that is, to find nothing
        # for RHBZ#1506872
        Expect.expect_retval(RHUA, "grep 'pulp.*metadata:WARNING' /var/log/messages", 1)
        # for RHBZ#1579294
        Expect.expect_retval(RHUA, "grep 'pulp.*publish:WARNING' /var/log/messages", 1)
        # for RHBZ#1487523
        Expect.expect_retval(RHUA,
                             "grep 'pulp.*Purging duplicate NEVRA can' /var/log/messages", 1)

    @staticmethod
    def test_39_list_repos():
        '''get a list of available repos for further examination'''
        Expect.expect_retval(RHUA,
                             "rhui-manager repo unused > /tmp/repos.stdout 2> /tmp/repos.stderr",
                             timeout=1200)

    @staticmethod
    def test_40_check_iso_repos():
        '''check if non-RPM repos were ignored'''
        # for RHBZ#1199426
        Expect.expect_retval(RHUA,
                             "egrep 'Containers|Images|ISOs|Kickstart' /tmp/repos.stdout", 1)

    @staticmethod
    def test_41_check_pygiwarning():
        '''check if PyGIWarning was not issued'''
        # for RHBZ#1450430
        Expect.expect_retval(RHUA, "grep PyGIWarning /tmp/repos.stderr", 1)

    def test_42_check_repo_sorting(self):
        '''check if repo lists are sorted'''
        # for RHBZ#1601478
        repolist_expected = sorted(CUSTOM_REPOS + self.yum_repo_ids)
        repolist_actual = RHUIManagerCLI.repo_list(RHUA, True).splitlines()
        nose.tools.eq_(repolist_expected, repolist_actual)

    def test_43_upload_semi_bad_cert(self):
        '''check that a partially invalid certificate can still be accepted'''
        # for RHBZ#1588931 & RHBZ#1584527
        # delete currently used certificates and repos first
        RHUIManager.remove_rh_certs(RHUA)
        for repo in CUSTOM_REPOS + self.yum_repo_ids:
            RHUIManagerCLI.repo_delete(RHUA, repo)
        repolist = RHUIManagerCLI.repo_list(RHUA, True)
        nose.tools.ok_(not repolist, msg="can't continue as some repos remain: %s" % repolist)
        # try uploading the cert now
        cert = "%s/%s" % (DATADIR, CERTS["partial"])
        if Util.cert_expired(RHUA, cert):
            raise nose.exc.SkipTest("The given certificate has already expired.")
        RHUIManagerCLI.cert_upload(RHUA, cert)
        # the RHUI log must contain the fact that an invalid path was found in the cert
        Expect.ping_pong(RHUA, "tail /root/.rhui/rhui.log", "Invalid entitlement path")
        RHUIManager.remove_rh_certs(RHUA)

    @staticmethod
    def test_44_upload_empty_cert():
        '''check that an empty certificate is rejected (no traceback)'''
        # for RHBZ#1497028
        cert = "%s/%s" % (DATADIR, CERTS["empty"])
        if Util.cert_expired(RHUA, cert):
            raise nose.exc.SkipTest("The given certificate has already expired.")
        # NOTE(review): the test passes if the upload raises nothing; confirm that's intended
        try:
            RHUIManagerCLI.cert_upload(RHUA, cert)
        except RuntimeError as err:
            nose.tools.ok_("does not contain any entitlements" in str(err),
                           msg="unexpected error: %s" % err)

    def test_45_multi_repo_product(self):
        '''check that all repos in a multi-repo product get added'''
        # for RHBZ#1651638
        RHUIManagerCLI.cert_upload(RHUA, "%s/%s" % (DATADIR, CERTS["Atomic"]))
        RHUIManagerCLI.repo_add(RHUA, self.product["name"])
        # wait a few seconds for the repo to actually get added
        time.sleep(4)
        repolist_actual = RHUIManagerCLI.repo_list(RHUA, True).splitlines()
        nose.tools.eq_([self.product["id"]], repolist_actual)

    def test_99_cleanup(self):
        '''cleanup: remove repos and temporary files'''
        RHUIManagerCLI.repo_delete(RHUA, self.product["id"])
        RHUIManager.remove_rh_certs(RHUA)
        # each command removes the files and then lists them to prove that they're gone
        Expect.ping_pong(RHUA, "rm -rf /tmp/%s* ; " % CLI_CFG[0] +
                         "ls /tmp/%s* 2>&1" % CLI_CFG[0],
                         "No such file or directory")
        Expect.ping_pong(RHUA, "rm -f /tmp/repos.std{out,err} ; " +
                         "ls /tmp/repos.std{out,err} 2>&1",
                         "No such file or directory")
        Expect.ping_pong(RHUA, "rm -rf /tmp/%s* ; " % ALT_CONTENT_SRC_NAME +
                         "ls /tmp/%s* 2>&1" % ALT_CONTENT_SRC_NAME,
                         "No such file or directory")
        # the local temporary directory with the pool ID files and the repo file
        rmtree(TMPDIR)

    @staticmethod
    def teardown_class():
        '''
           announce the end of the test run
        '''
        print("*** Finished running %s. *** " % basename(__file__))