"""Various Security Tests"""
from __future__ import print_function
import csv
import logging
from os.path import basename
import subprocess
import nose
from rhui3_tests_lib.conmgr import ConMgr
from rhui3_tests_lib.rhuimanager import RHUIManager
from rhui3_tests_lib.rhuimanager_instance import RHUIManagerInstance
# let the root logger pass everything from the DEBUG level up
logging.basicConfig(level=logging.DEBUG)

# host names of the nodes involved, as provided by the connection manager
HOSTNAMES = {
    "RHUA": ConMgr.get_rhua_hostname(),
    "CDS": ConMgr.get_cds_hostnames(),
    "HAProxy": ConMgr.get_haproxy_hostnames(),
}

# ports of the services whose protocol support gets checked
PORTS = {
    "puppet": 8140,
    "https": 443,
    "crane": 5000,
}

# shell command template; the placeholders are: s_client protocol option, host name, port
# the trailing "echo $?" appends the exit status of the pipeline to the output
PROTOCOL_TEST_CMD = "echo | openssl s_client -%s -connect %s:%s; echo $?"

# s_client protocol options (spelled without the leading dash), grouped by whether
# the services are expected to accept them ("good") or refuse them ("bad")
PROTOCOLS = {
    "good": ["tls1_2"],
    "bad": ["ssl3", "tls1", "tls1_1"],
}

# the s_client output line expected for each group of protocols
RESULTS = {
    "good": "Secure Renegotiation IS supported",
    "bad": "Secure Renegotiation IS NOT supported",
}

# connections to the RHUA and the HAProxy nodes (opened when this module is imported)
RHUA = ConMgr.connect()
HAPROXIES = [ConMgr.connect(haproxy_hostname) for haproxy_hostname in HOSTNAMES["HAProxy"]]
def _check_protocols(hostname, port):
    """helper method to try various protocols on hostname:port

    Runs the PROTOCOL_TEST_CMD shell command once for every protocol listed in PROTOCOLS.
    For protocols listed as "good", the output must contain the RESULTS["good"] line
    and the reported exit status must be 0. For protocols listed as "bad", the output
    must contain the RESULTS["bad"] line and the reported exit status must be 1.

    :param hostname: host name to connect to
    :param port: port on that host
    """
    # allowed protocols are checked first, disallowed ones afterwards;
    # each group is paired with the exit status expected for it
    for group, expected_status in (("good", 0), ("bad", 1)):
        expected_line = RESULTS[group]
        for protocol in PROTOCOLS[group]:
            # the command ends with "echo $?", so the exit status of the s_client pipeline
            # is printed as the last line of the output instead of being returned
            # by the shell; stderr is merged into the captured output
            raw_output = subprocess.check_output(PROTOCOL_TEST_CMD % (protocol, hostname, port),
                                                 shell=True,
                                                 stderr=subprocess.STDOUT)
            output_lines = raw_output.decode().splitlines()
            # check for the line that indicates the expected result
            nose.tools.ok_(expected_line in output_lines,
                           msg="s_client didn't print '%s' when using %s with %s:%s" % \
                               (expected_line, protocol, hostname, port))
            # also check the exit status (the last line); say what was tested if it is wrong
            actual_status = int(output_lines[-1])
            nose.tools.eq_(actual_status, expected_status,
                           msg="s_client exited with %s instead of %s when using %s with %s:%s" % \
                               (actual_status, expected_status, protocol, hostname, port))
def setup():
    """print a banner with the name of this file before the tests start"""
    script_name = basename(__file__)
    print("*** Running %s: *** " % script_name)
def test_01_login_add_cds_hap():
    """log in to RHUI, then add all the CDS nodes and all the HAProxy nodes"""
    RHUIManager.initial_run(RHUA)
    # (instance type for rhui-manager, key in HOSTNAMES); CDS nodes go first
    node_groups = (("cds", "CDS"),
                   ("loadbalancers", "HAProxy"))
    for instance_type, hostnames_key in node_groups:
        for hostname in HOSTNAMES[hostnames_key]:
            RHUIManagerInstance.add_instance(RHUA, instance_type, hostname)
def test_02_puppet():
    """check which protocols Puppet on the RHUA allows"""
    # related to RHBZ#1637261
    rhua_hostname = HOSTNAMES["RHUA"]
    puppet_port = PORTS["puppet"]
    _check_protocols(rhua_hostname, puppet_port)
def test_03_https_rhua():
    """check which protocols Apache on the RHUA allows"""
    # related to RHBZ#1637261
    rhua_hostname = HOSTNAMES["RHUA"]
    https_port = PORTS["https"]
    _check_protocols(rhua_hostname, https_port)
def test_04_https_cds():
    """check which protocols Apache on each of the CDS nodes allows"""
    # related to RHBZ#1637261
    https_port = PORTS["https"]
    for cds_hostname in HOSTNAMES["CDS"]:
        _check_protocols(cds_hostname, https_port)
def test_05_crane_cds():
    """check which protocols Crane on each of the CDS nodes allows"""
    # related to RHBZ#1637261
    crane_port = PORTS["crane"]
    for cds_hostname in HOSTNAMES["CDS"]:
        _check_protocols(cds_hostname, crane_port)
def test_06_haproxy_stats():
    """check the statuses reported by the stats socket on each HAProxy node"""
    # related to RHBZ#1718066
    proxy_names = ("crane00", "https00")
    for haproxy in HAPROXIES:
        # ask the stats socket for the statistics and parse the output as CSV
        _, stdout, _ = haproxy.exec_command("echo 'show stat' | nc -U /var/lib/haproxy/stats")
        rows = list(csv.DictReader(stdout))
        # for each proxy, map the service name (svname) to its status;
        # the proxy name is read from the column named "# pxname"
        statuses = {}
        for proxy_name in proxy_names:
            statuses[proxy_name] = {row["svname"]: row["status"]
                                    for row in rows if row["# pxname"] == proxy_name}
        # check the frontend, the backend, and the CDS nodes; crane first, https second
        for proxy_name in proxy_names:
            proxy_statuses = statuses[proxy_name]
            nose.tools.eq_(proxy_statuses["FRONTEND"], "OPEN")
            nose.tools.eq_(proxy_statuses["BACKEND"], "UP")
            for cds in HOSTNAMES["CDS"]:
                nose.tools.eq_(proxy_statuses[cds], "UP")
def test_99_cleanup():
    """delete all the HAProxy nodes and then all the CDS nodes"""
    for instance_type in ("loadbalancers", "cds"):
        RHUIManagerInstance.delete_all(RHUA, instance_type)
def teardown():
    """print a banner with the name of this file after the tests have finished"""
    script_name = basename(__file__)
    print("*** Finished running %s. *** " % script_name)