skyline-apiserver/libs/skyline-policy-manager/tests/unit/cmd/test_manage.py
Gao Hanxiang 1d14b4067c test: Add skyline-policy-manager unit test
1. Add skyline-policy-manager unit test case and tools
2. Update `setup` method type annotation
3. Move some constants to `skyline_policy_manager.constants`
4. Adjust import to use modules instead of functions
5. Update code format

Change-Id: Ic72e21126de0b16e4d969ad48ce64c57542c4667
2021-09-29 02:50:31 +00:00

159 lines
5.5 KiB
Python

# Copyright 2021 99cloud
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import asdict
from importlib import metadata
from importlib.metadata import EntryPoint
from pathlib import Path
from typing import Dict, List, Tuple, Union
import pytest
from click.testing import CliRunner
from oslo_policy.policy import DocumentedRuleDefault, RuleDefault
from skyline_policy_manager import constants, policies
from skyline_policy_manager.cmd.manage import (
generate_conf,
generate_rule,
generate_sample,
policy_manager,
validate,
)
from tests import fake
from tests.fake import FAKE_NS, FAKE_SERVICE_EPS, FAKER, FakeDocumentedRuleData, FakeRuleData
from tests.models import ArgumentData, TestData
class TestPolicyManager:
@pytest.fixture(autouse=True)
def setup_entry_points(self, monkeypatch) -> None:
eps = []
for ep_names in FAKE_SERVICE_EPS.values():
for ep_name in ep_names:
fake_rules: List[Union[DocumentedRuleDefault, RuleDefault]]
fake_rules = [
DocumentedRuleDefault(**asdict(FakeDocumentedRuleData()))
for _ in range(FAKER.numbers.integer_number(1, 10))
]
fake_rules.extend(
[
RuleDefault(**asdict(FakeRuleData()))
for _ in range(FAKER.numbers.integer_number(1, 3))
],
)
monkeypatch.setattr(fake, f"{ep_name}_list_rules", lambda: fake_rules)
eps.append(
EntryPoint(
name=ep_name,
value=f"tests.fake:{ep_name}_list_rules",
group=FAKE_NS,
),
)
def entry_points() -> Dict[str, Tuple[EntryPoint, ...]]:
return {FAKE_NS: tuple(eps)}
monkeypatch.setattr(metadata, "entry_points", entry_points)
monkeypatch.setattr(constants, "POLICY_NS", FAKE_NS)
monkeypatch.setattr(constants, "SUPPORTED_SERVICE_EPS", FAKE_SERVICE_EPS)
@pytest.fixture
def runner(self) -> CliRunner:
runner = CliRunner()
return runner
@pytest.mark.ddt(
TestData(
arguments=("dir_path",),
argument_data_set=[
ArgumentData(
id="str_dir_path",
values=(FAKER.text.word(),),
),
],
),
)
def test_generate_sample(self, runner: CliRunner, tmp_path: Path, dir_path: str) -> None:
sample_dir = tmp_path.joinpath(dir_path)
sample_dir.mkdir(parents=True, exist_ok=True)
policy_manager.add_command(generate_sample)
result = runner.invoke(
policy_manager,
["generate-sample", "--dir", sample_dir.as_posix()],
)
assert result.exit_code == 0
for service in FAKE_SERVICE_EPS:
assert sample_dir.joinpath(service).exists()
assert sample_dir.joinpath(service).joinpath("policy.yaml.sample").exists()
@pytest.mark.ddt(
TestData(
arguments=("dir_path",),
argument_data_set=[
ArgumentData(
id="str_dir_path",
values=(FAKER.text.word(),),
),
],
),
TestData(
arguments=("description",),
argument_data_set=[
ArgumentData(
id="str_description",
values=(FAKER.text.text(),),
),
],
),
)
def test_generate_conf(
self,
runner: CliRunner,
tmp_path: Path,
dir_path: str,
description: str,
) -> None:
conf_dir = tmp_path.joinpath(dir_path)
conf_dir.mkdir(parents=True, exist_ok=True)
policy_manager.add_command(generate_conf)
result = runner.invoke(
policy_manager,
["generate-conf", "--dir", conf_dir.as_posix(), "--desc", description],
)
service_rules = policies.get_service_rules()
assert result.exit_code == 0
for service in service_rules:
assert conf_dir.joinpath(service).exists()
assert conf_dir.joinpath(service).joinpath("policy.yaml").exists()
assert description in conf_dir.joinpath(service).joinpath("policy.yaml").read_text()
def test_generate_rule(self, runner: CliRunner) -> None:
policy_manager.add_command(generate_rule)
for ep_names in FAKE_SERVICE_EPS.values():
for ep_name in ep_names:
result = runner.invoke(policy_manager, ["generate-rule", ep_name])
assert result.exit_code == 0
def test_validate(self, runner: CliRunner) -> None:
policy_manager.add_command(validate)
result = runner.invoke(
policy_manager,
[
"validate",
"--diff",
],
)
assert result.exit_code == 0