# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kadi_apy.lib.commons import RequestMixin
from kadi_apy.lib.commons import VerboseMixin
from kadi_apy.lib.imports import import_eln
from kadi_apy.lib.imports import import_json_schema
from kadi_apy.lib.imports import import_shacl
from kadi_apy.lib.utils import _get_record_identifiers
from kadi_apy.lib.utils import append_identifier_suffix
from kadi_apy.lib.utils import error
from kadi_apy.lib.utils import get_resource_type
from kadi_apy.lib.utils import paginate
[docs]
class Miscellaneous(RequestMixin, VerboseMixin):
"""Model to handle miscellaneous functionality.
:param manager: Manager to use for all API requests.
:type manager: KadiManager
"""
[docs]
def get_deleted_resources(self, **params):
r"""Get a list of deleted resources in the trash. Supports pagination.
:param \**params: Additional query parameters.
:return: The response object.
"""
endpoint = "/trash"
return self._get(endpoint, params=params)
[docs]
def restore(self, item, item_id):
"""Restore an item from the trash.
:param item: The resource type defined either as string or class.
:param item_id: The ID of the item to restore.
:type item_id: int
:return: The response object.
"""
if isinstance(item, str):
item = get_resource_type(item)
endpoint = f"{item.base_path}/{item_id}/restore"
return self._post(endpoint)
[docs]
def purge(self, item, item_id):
"""Purge an item from the trash.
:param item: The resource type defined either as string or class.
:param item_id: The ID of the item to restore.
:type item_id: int
:return: The response object.
"""
if isinstance(item, str):
item = get_resource_type(item)
endpoint = f"{item.base_path}/{item_id}/purge"
return self._post(endpoint)
[docs]
def get_licenses(self, **params):
r"""Get a list of available licenses. Supports pagination.
:param \**params: Additional query parameters.
:return: The response object.
"""
endpoint = "/licenses"
return self._get(endpoint, params=params)
[docs]
def get_kadi_info(self):
"""Get information about the Kadi instance.
:return: The response object.
"""
endpoint = "/info"
return self._get(endpoint)
[docs]
def get_roles(self):
"""Get all possible roles and corresponding permissions of all resources.
:return: The response object.
"""
endpoint = "/roles"
return self._get(endpoint)
[docs]
def import_eln(self, file_path):
"""Import an RO-Crate file following the "ELN" file specification.
:param file_path: The path of the file.
:type file_path: str
:raises KadiAPYInputError: If the structure of the RO-Crate is not valid.
:raises KadiAPYRequestError: If any request was not successful while importing
the data and metadata.
"""
import_eln(self.manager, file_path)
[docs]
def import_json_schema(self, file_path, template_type="extras"):
"""Import JSON Schema file and create a template.
Note that only JSON Schema draft 2020-12 is fully supported, but older schemas
might still work.
:param file_path: The path of the file.
:type file_path: str
:param template_type: Type of the template. Can either be ``"record"`` or
``"extras"``.
:type template_type: str
:raises KadiAPYInputError: If the structure of the Schema is not valid.
:raises KadiAPYRequestError: If any request was not successful while importing
the metadata.
"""
import_json_schema(self.manager, file_path, template_type)
[docs]
def import_shacl(self, file_path, template_type="extras"):
"""Import SHACL Shapes file and create a template.
:param file_path: The path of the file.
:type file_path: str
:param template_type: Type of the template. Can either be ``"record"`` or
``"extras"``.
:type template_type: str
:raises KadiAPYInputError: If the structure of the Shapes is not valid.
:raises KadiAPYRequestError: If any request was not successful while importing
the metadata.
"""
import_shacl(self.manager, file_path, template_type)
[docs]
def reuse_collection(self, collection_id, suffix, linked_record_id=None):
"""Reuse the collection with the given ID.
Creates a new collection containing two types of records. 1. Modified records:
Modified Copies of existing records with new identifiers using a given suffix.
2. Linked records: Unchanged records reused for maintaining link integrity.
It ensures that all record links are preserved within the modified collection.
Note that currently only the metadata of records is copied.
:param collection_id: The persistent ID of the collection to be reused.
:param suffix: A string to append to each record and collection identifier.
:parm linked_record_id: A string containing IDs of records to be linked
to the modified collection. If multiple, they should be
comma-separated (e.g., "123,456").
Defaults to None if no records are to be linked.
:raises KadiAPYRequestError: If any request was not successful during the
duplication process.
"""
collection = self.manager.collection(id=collection_id)
new_collection_identifier = append_identifier_suffix(
collection.meta["identifier"], suffix
)
new_collection = self.manager.collection(
title=collection.meta["title"],
identifier=new_collection_identifier,
description=collection.meta["description"],
tags=collection.meta["tags"],
visibility=collection.meta["visibility"],
create=True,
)
id_to_identifier = _get_record_identifiers(collection)
record_identifiers = list(id_to_identifier.values())
if linked_record_id:
linked_record_id = [
int(i.strip()) for i in str(linked_record_id).split(",")
]
else:
linked_record_id = []
linked_record_map = {}
for record_id in linked_record_id:
if record_id not in id_to_identifier:
error(
f"Record ID {record_id} not found in the collection."
f"Availible IDs: {list(id_to_identifier.keys())}"
)
identifier = id_to_identifier[record_id]
linked_record_map[identifier] = identifier
_copy_records(
self.manager, new_collection, record_identifiers, suffix, linked_record_map
)
# Link the new collection as a child to the existing collection.
collection.add_collection_link(collection_id=new_collection.id)
def _copy_records(
manager,
collection,
record_identifiers,
suffix,
linked_record_map=None,
extras_value=False,
):
def clean_meta(meta):
meta = meta.copy()
# Handle dict type
if meta.get("type") == "dict" and isinstance(meta.get("value"), list):
meta["value"] = [clean_meta(m) for m in meta["value"]]
# Handle list type and dict inside of list
elif meta.get("type") == "list" and isinstance(meta.get("value"), list):
new_list = []
for item in meta["value"]:
if isinstance(item, dict) and "type" in item and "value" in item:
new_list.append(clean_meta(item))
else:
new_list.append(item)
meta["value"] = new_list
else:
# handle other datatypes and drops value if not required
is_required = meta.get("validation", {}).get("required", False)
if not is_required:
meta.pop("value", None)
return meta
def clean_extras(original_extras):
return [clean_meta(meta) for meta in original_extras]
linked_record_map = linked_record_map or {}
records = {}
for identifier in record_identifiers:
record = manager.record(identifier=identifier)
if identifier in linked_record_map:
# Reuse the same record without modification.
reused_record = record
records[identifier] = {
"old_record": record,
"new_record": reused_record,
}
collection.add_record_link(record_id=reused_record.id)
else:
new_identifier = append_identifier_suffix(identifier, suffix)
new_record = manager.record(
title=record.meta["title"],
identifier=new_identifier,
type=record.meta["type"],
description=record.meta["description"],
extras=(
record.meta["extras"]
if extras_value
else clean_extras(record.meta["extras"])
),
tags=record.meta["tags"],
visibility=record.meta["visibility"],
license=record.meta["license"],
create=True,
)
records[identifier] = {
"old_record": record,
"new_record": new_record,
}
# Add the new record to the new collection.
collection.add_record_link(record_id=new_record.id)
# Process links only after all records are created.
for record_data in records.values():
old_record = record_data["old_record"]
new_record = record_data["new_record"]
def _process_record_links(page, old_record=old_record, new_record=new_record):
response = old_record.get_record_links(
direction="out", per_page=100, page=page
).json()
for link in response["items"]:
record_to_identifier = link["record_to"]["identifier"]
if record_to_identifier in records:
new_record_to = records[record_to_identifier]["new_record"]
new_record.link_record(
record_to=new_record_to.id, name=link["name"]
)
return response
paginate(_process_record_links)