# Source code for kadi_apy.lib.commons

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from datetime import timedelta

from kadi_apy.lib.exceptions import KadiAPYRequestError
from kadi_apy.lib.utils import _utcnow
from kadi_apy.lib.utils import chunked_response


class RequestMixin:
    """Mixin that routes HTTP-style requests through a manager object.

    Every helper simply delegates to the method of the same name on
    ``self.manager``. The ``_patch`` and ``_delete`` helpers additionally reset
    a ``_meta`` attribute to ``None`` before sending the request, if the
    instance has such an attribute.
    """

    def __init__(self, manager):
        self.manager = manager

    def _drop_cached_meta(self):
        """Reset ``_meta`` to ``None`` if this object has such an attribute."""
        if not hasattr(self, "_meta"):
            return
        self._meta = None

    def _get(self, endpoint, **kwargs):
        send = self.manager._get
        return send(endpoint, **kwargs)

    def _post(self, endpoint, **kwargs):
        send = self.manager._post
        return send(endpoint, **kwargs)

    def _patch(self, endpoint, **kwargs):
        # The cached value is discarded before the request is issued.
        self._drop_cached_meta()
        send = self.manager._patch
        return send(endpoint, **kwargs)

    def _put(self, endpoint, **kwargs):
        send = self.manager._put
        return send(endpoint, **kwargs)

    def _delete(self, endpoint, **kwargs):
        # The cached value is discarded before the request is issued.
        self._drop_cached_meta()
        send = self.manager._delete
        return send(endpoint, **kwargs)


class VerboseMixin:
    """Mixin exposing the manager's logging helpers on the object itself.

    Each method forwards its arguments to the method of the same name on
    ``self.manager``.
    """

    def error(self, text, **kwargs):
        r"""Output a message on the error level.

        :param text: Text to be printed via :func:`click.echo()`.
        :type text: str
        :param \**kwargs: Additional arguments to pass to :func:`click.echo()`.
        """
        emit = self.manager.error
        emit(text=text, **kwargs)

    def warning(self, text, **kwargs):
        r"""Output a message on the warning level.

        :param text: Text to be printed via :func:`click.echo()`.
        :type text: str
        :param \**kwargs: Additional arguments to pass to :func:`click.echo()`.
        """
        emit = self.manager.warning
        emit(text=text, **kwargs)

    def info(self, text, **kwargs):
        r"""Output a message on the info level.

        :param text: Text to be printed via :func:`click.echo()`.
        :type text: str
        :param \**kwargs: Additional arguments to pass to :func:`click.echo()`.
        """
        emit = self.manager.info
        emit(text=text, **kwargs)

    def debug(self, text, **kwargs):
        r"""Output a message on the debug level.

        :param text: Text to be printed via :func:`click.echo()`.
        :type text: str
        :param \**kwargs: Additional arguments to pass to :func:`click.echo()`.
        """
        emit = self.manager.debug
        emit(text=text, **kwargs)

    def is_verbose(self, **kwargs):
        r"""Query the manager's verbose level.

        :param \**kwargs: Keyword arguments forwarded unchanged to the manager.
        :return: See :meth:`.KadiManager.is_verbose`.
        """
        check = self.manager.is_verbose
        return check(**kwargs)


class PermissionMixin:
    """Mixin providing role management for users and groups of a resource.

    The methods build their endpoints from ``self.base_path`` and ``self.id``
    and send them through the ``_post``, ``_patch`` and ``_delete`` helpers of
    the object.
    """

    def _roles_path(self, subject):
        """Return the roles endpoint of this resource for the given subject kind."""
        return f"{self.base_path}/{self.id}/roles/{subject}"

    def add_user(self, user_id, role_name):
        """Grant a role to a user.

        :param user_id: The ID of the user to add.
        :type user_id: int
        :param role_name: Name of the role the user should get.
        :type role_name: str
        :return: The response object.
        """
        target = self._roles_path("users")
        payload = {"role": {"name": role_name}, "user": {"id": user_id}}
        return self._post(target, json=payload)

    def change_user_role(self, user_id, role_name):
        """Replace the role of a user.

        :param user_id: The ID of the user whose role should be changed.
        :type user_id: int
        :param role_name: Name of the new role.
        :type role_name: str
        :return: The response object.
        """
        users_path = self._roles_path("users")
        target = f"{users_path}/{user_id}"
        return self._patch(target, json={"name": role_name})

    def remove_user(self, user_id):
        """Revoke the role of a user.

        :param user_id: The ID of the user to remove.
        :type user_id: int
        :return: The response object.
        """
        users_path = self._roles_path("users")
        target = f"{users_path}/{user_id}"
        return self._delete(target, json=None)

    def add_group_role(self, group_id, role_name):
        """Grant a role to a group.

        :param group_id: The ID of the group to add.
        :type group_id: int
        :param role_name: Name of the role the group should get.
        :type role_name: str
        :return: The response object.
        """
        target = self._roles_path("groups")
        payload = {"role": {"name": role_name}, "group": {"id": group_id}}
        return self._post(target, json=payload)

    def change_group_role(self, group_id, role_name):
        """Replace the role of a group.

        :param group_id: The ID of the group whose role should be changed.
        :type group_id: int
        :param role_name: Name of the new role.
        :type role_name: str
        :return: The response object.
        """
        groups_path = self._roles_path("groups")
        target = f"{groups_path}/{group_id}"
        return self._patch(target, json={"name": role_name})

    def remove_group_role(self, group_id):
        """Revoke the role of a group.

        :param group_id: The ID of the group to remove.
        :type group_id: int
        :return: The response object.
        """
        groups_path = self._roles_path("groups")
        target = f"{groups_path}/{group_id}"
        return self._delete(target, json=None)


class TagMixin:
    """Mixin providing tag management for a resource.

    Tags are read from the ``"tags"`` entry of ``self.meta`` and written back
    with a patch request to ``{base_path}/{id}``.
    """

    def get_tags(self):
        """Return the tags stored in the metadata of the resource.

        :return: A list of all tags.
        :type: list
        """
        metadata = self.meta
        return metadata["tags"]

    def check_tag(self, tag):
        """Tell whether the lowercased tag is among the current tags.

        :param tag: The tag to check.
        :type tag: str
        :return: ``True`` if tag already exists, otherwise ``False``.
        :rtype: bool
        """
        wanted = tag.lower()
        return wanted in self.get_tags()

    def add_tag(self, tag):
        """Append the lowercased tag to the current tags and send the result.

        :param tag: The tag to add.
        :type tag: str
        :return: The response object.
        """
        target = f"{self.base_path}/{self.id}"
        current = self.get_tags()
        # Concatenation builds a new list, so the cached tag list is untouched.
        payload = {"tags": current + [tag.lower()]}
        return self._patch(target, json=payload)

    def remove_tag(self, tag):
        """Send the current tags without any entry equal to the lowercased tag.

        :param tag: The tag to remove.
        :type tag: str
        :return: The response object.
        """
        target = f"{self.base_path}/{self.id}"
        unwanted = tag.lower()

        remaining = []
        for existing in self.get_tags():
            if existing != unwanted:
                remaining.append(existing)

        return self._patch(target, json={"tags": remaining})


class ExportMixin:
    """Mixin providing export of a resource in a chosen format."""

    def export(self, path, export_type="json", pipe=False, **params):
        r"""Request an export of the resource and optionally store it in a file.

        A ``filter`` query parameter given as a dictionary is serialized to a
        JSON string before the request is sent. The response is requested in
        streaming mode.

        :param path: The path (including name of the file) to store the exported data.
        :type path: str
        :param export_type: The export format.
        :type export_type: str
        :param pipe: If ``True``, nothing is written here.
        :type pipe: bool
        :param \**params: Additional query parameters.
        :return: The response object.
        """
        export_filter = params.get("filter")
        if isinstance(export_filter, dict):
            params["filter"] = json.dumps(export_filter)

        target = f"{self.base_path}/{self.id}/export/{export_type}"
        response = self._get(target, params=params, stream=True)

        # Only a successful response is written, and only when not piping.
        if pipe or response.status_code != 200:
            return response

        chunked_response(path, response)
        return response


class ResourceMeta(RequestMixin, VerboseMixin):
    """Mixin class to add cached resource metadata handling functionality.

    The metadata is fetched from ``{base_path}/{id}`` on first access and kept
    in ``_meta`` together with the time of retrieval in ``_last_update``.
    """

    def __init__(self, manager):
        super().__init__(manager)
        # Nothing has been fetched yet.
        self._last_update = None
        self._meta = None

    @property
    def meta(self):
        """Get all metadata of the resource.

        The cached metadata is returned as long as it is present and not older
        than 5 minutes. Once it was invalidated, either by that timeout or by
        ``_meta`` having been reset to ``None``, a request is sent to retrieve
        the possibly updated metadata again.

        :return: The metadata of the resource.
        :raises KadiAPYRequestError: If requesting the metadata was not successful.
        """
        fetched_at = self._last_update

        # Invalidate the cached metadata automatically after 5 minutes.
        if fetched_at is not None and (_utcnow() - fetched_at) > timedelta(minutes=5):
            self._meta = None

        if self._meta is not None:
            return self._meta

        response = self._get(f"{self.base_path}/{self.id}")
        if response.status_code != 200:
            raise KadiAPYRequestError(response.json())

        self._meta = response.json()
        self._last_update = _utcnow()
        return self._meta