diff --git a/olot/oci/oci_common.py b/olot/oci/oci_common.py index dbbcc1f..6a624a9 100644 --- a/olot/oci/oci_common.py +++ b/olot/oci/oci_common.py @@ -1,26 +1,32 @@ from typing import Annotated, List from pydantic import AnyUrl, Field -from pathlib import Path -import os -from olot.utils.files import tarball_from_file, targz_from_file MediaType = Annotated[str, Field( ..., pattern=r'^[A-Za-z0-9][A-Za-z0-9!#$&^_.+-]{0,126}/[A-Za-z0-9][A-Za-z0-9!#$&^_.+-]{0,126}$' )] -class MediaTypes: - """Constant values from OCI Image Manifest spec - See also: https://github.com/opencontainers/image-spec/blob/main/media-types.md - """ - manifest: MediaType = "application/vnd.oci.image.manifest.v1+json" - index: MediaType = "application/vnd.oci.image.index.v1+json" - layer: MediaType = "application/vnd.oci.image.layer.v1.tar" - layer_gzip: MediaType = "application/vnd.oci.image.layer.v1.tar+gzip" +class Keys: + image_title_annotation = "org.opencontainers.image.title" + image_created_annotation = "org.opencontainers.image.created" + +class Values: + empty_digest = "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a" + empty_data = "e30=" +class MediaTypes: + """Constant values from OCI Image Manifest spec + + See also: https://github.com/opencontainers/image-spec/blob/main/media-types.md + """ + manifest: MediaType = "application/vnd.oci.image.manifest.v1+json" + index: MediaType = "application/vnd.oci.image.index.v1+json" + layer: MediaType = "application/vnd.oci.image.layer.v1.tar" + layer_gzip: MediaType = "application/vnd.oci.image.layer.v1.tar+gzip" + empty: MediaType = "application/vnd.oci.empty.v1+json" Digest = Annotated[str, Field( ..., @@ -34,26 +40,3 @@ class MediaTypes: )] -def create_blobs(source_dir: Path, oci_dir: Path): - if not source_dir.exists(): - raise ValueError(f"Input directory '{source_dir}' does not exist.") - - sha256_path = oci_dir / "blobs" / "sha256" - os.makedirs(sha256_path, exist_ok=True) - - layers = {} # layer digest : diff_id - - # assume flat structure for source_dir for now - # TODO: handle subdirectories appropriately - model_files = [source_dir / Path(f) for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))] - - for model_file in model_files: - - # handle model card file if encountered - assume README.md is the modelcard - if os.path.basename(os.path.normpath(model_file)).endswith("README.md"): - postcomp_chksum, precomp_chksum = targz_from_file(model_file, sha256_path) - layers[postcomp_chksum] = precomp_chksum - else: - checksum = tarball_from_file(model_file, sha256_path) - layers[checksum] = checksum - return layers \ No newline at end of file diff --git a/olot/oci/oci_image_manifest.py b/olot/oci/oci_image_manifest.py index fc25f27..525009d 100644 --- a/olot/oci/oci_image_manifest.py +++ b/olot/oci/oci_image_manifest.py @@ -5,11 +5,15 @@ from __future__ import annotations from typing import Annotated, List, Optional +import os +import subprocess +from pathlib import Path from pydantic import BaseModel, Field -from olot.oci.oci_common import MediaType, Digest, Urls +from olot.oci.oci_common import Digest, Urls, Keys, Values, MediaTypes, MediaType from olot.utils.types import Int64, Base64, Annotations +from olot.utils.files import MIMETypes # class MediaType(BaseModel): # __root__: constr( @@ -126,3 +130,75 @@ class OCIImageManifest(BaseModel): subject: Optional[ContentDescriptor] = None layers: List[ContentDescriptor] = Field(..., min_length=1) annotations: Optional[Annotations] = None + +def empty_config() -> ContentDescriptor: + return ContentDescriptor( + mediaType=MediaTypes.empty, + size=2, + digest=Values.empty_digest, + data=Values.empty_data, + urls=None, + artifactType=None, + ) + +def create_oci_image_manifest( + schemaVersion: int = 2, + mediaType: Optional[str] = MediaTypes.manifest, + artifactType: Optional[str] = None, + config: ContentDescriptor = empty_config(), + subject: Optional[ContentDescriptor] = None, + layers: List[ContentDescriptor] = [], + annotations: Optional[Annotations] = None, +) -> OCIImageManifest: + return OCIImageManifest( + schemaVersion=schemaVersion, + mediaType=mediaType, + artifactType=artifactType, + config=config, + subject=subject, + layers=layers, + annotations=annotations, + ) + +def get_file_media_type(file_path: os.PathLike) -> str: + """ + Get the MIME type of a file using the `file` command. + """ + try: + result = subprocess.run(['file', '--mime-type', '-b', file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + mime_type = result.stdout.decode('utf-8').strip() + return mime_type + except subprocess.CalledProcessError as e: + print(f"Error occurred while getting MIME type: {e}") + return MIMETypes.octet_stream + except Exception as e: + print(f"Unexpected error: {e}") + return MIMETypes.octet_stream + + +def create_manifest_layers(files: List[Path], blob_layers: dict) -> List[ContentDescriptor]: + """ + Create a list of ContentDescriptor objects representing the layers of an OCI image manifest. + + Args: + files (List[os.PathLike]): A list of file paths to be used as layers in the manifest. + Returns: + List[ContentDescriptor]: A list of ContentDescriptor objects representing the layers of the manifest + """ + layers: List[ContentDescriptor] = [] + for file in files: + precomp, postcomp = blob_layers[os.path.basename(file)] + file_digest = postcomp if postcomp != "" else precomp + layer = ContentDescriptor( + mediaType=get_file_media_type(file), + size=os.stat(file).st_size, + digest=f"sha256:{file_digest}", + urls=None, + data=None, + artifactType=None, + annotations= { + Keys.image_title_annotation: os.path.basename(file) + } + ) + layers.append(layer) + return layers \ No newline at end of file diff --git a/olot/oci_artifact.py b/olot/oci_artifact.py new file mode 100644 index 0000000..9150679 --- /dev/null +++ b/olot/oci_artifact.py @@ -0,0 +1,75 @@ +from pathlib import Path +import os +import datetime +import json +import argparse +from typing import List + +from olot.oci.oci_image_manifest import create_oci_image_manifest, create_manifest_layers +from olot.oci.oci_common import Keys +from olot.utils.files import MIMETypes, tarball_from_file, targz_from_file +from olot.utils.types import compute_hash_of_str + +def create_oci_artifact_from_model(source_dir: Path, dest_dir: Path): + if not source_dir.exists(): + raise NotADirectoryError(f"Input directory '{source_dir}' does not exist.") + + if dest_dir is None: + dest_dir = source_dir / "oci" + os.makedirs(dest_dir, exist_ok=True) + + sha256_path = dest_dir / "blobs" / "sha256" + os.makedirs(sha256_path, exist_ok=True) + + # assume flat structure for source_dir for now + # TODO: handle subdirectories appropriately + model_files = [source_dir / Path(f) for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))] + + # Populate blobs directory + layers = create_blobs(model_files, dest_dir) + + # Create the OCI image manifest + manifest_layers = create_manifest_layers(model_files, layers) + annotations = { + Keys.image_created_annotation: datetime.datetime.now().isoformat() + } + artifactType = MIMETypes.mlmodel + manifest = create_oci_image_manifest( + artifactType=artifactType, + layers=manifest_layers, + annotations=annotations + ) + manifest_json = json.dumps(manifest.dict(), indent=4, sort_keys=True) + manifest_SHA = compute_hash_of_str(manifest_json) + with open(sha256_path / manifest_SHA, "w") as f: + f.write(manifest_json) + + +def create_blobs(model_files: List[Path], dest_dir: Path): + """ + Create the blobs directory for an OCI artifact. + """ + layers = {} # layer digest : (precomp, postcomp) + sha256_path = dest_dir / "blobs" / "sha256" + + for model_file in model_files: + file_name = os.path.basename(os.path.normpath(model_file)) + # handle model card file if encountered - assume README.md is the modelcard + if file_name.endswith("README.md"): + postcomp_chksum, precomp_chksum = targz_from_file(model_file, sha256_path) + layers[file_name] = (precomp_chksum, postcomp_chksum) + else: + checksum = tarball_from_file(model_file, sha256_path) + layers[file_name] = (checksum, "") + return layers + +# create a main function to test the function +def main(): + parser = argparse.ArgumentParser(description="Create OCI artifact from model") + parser.add_argument('source_dir', type=str, help='Path to the source directory') + args = parser.parse_args() + + source_dir = Path(args.source_dir) + create_oci_artifact_from_model(source_dir, None) + + diff --git a/olot/utils/files.py b/olot/utils/files.py index 127fb89..ac40439 100644 --- a/olot/utils/files.py +++ b/olot/utils/files.py @@ -19,6 +19,9 @@ def tell(self): def close(self): self.base_writer.close() +class MIMETypes: + mlmodel = "application/x-mlmodel" + octet_stream = "application/octet-stream" def get_file_hash(path) -> str: h = hashlib.sha256() diff --git a/tests/oci/oci_common_test.py b/tests/oci/oci_artifact_test.py similarity index 58% rename from tests/oci/oci_common_test.py rename to tests/oci/oci_artifact_test.py index 5f841d4..d6e94c9 100644 --- a/tests/oci/oci_common_test.py +++ b/tests/oci/oci_artifact_test.py @@ -1,22 +1,26 @@ from tests.common import sample_model_path, file_checksums_with_compression, file_checksums_without_compression -from olot.oci.oci_common import create_blobs +from olot.oci_artifact import create_blobs +import os +from pathlib import Path def test_create_blobs(tmp_path): source_dir = sample_model_path() dest_dir = tmp_path - layers = create_blobs(source_dir, dest_dir) + model_files = [source_dir / Path(f) for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))] + + layers = create_blobs(model_files, dest_dir) expected_layers = {} result = file_checksums_with_compression(source_dir / "README.md", dest_dir) - expected_layers[result[0]] = result[1] + expected_layers["README.md"] = result[1] result = file_checksums_without_compression(source_dir / "hello.md", dest_dir) - expected_layers[result] = result + expected_layers["hello.md"] = result result = file_checksums_without_compression(source_dir / "model.joblib", dest_dir) - expected_layers[result] = result + expected_layers["model.joblib"] = result assert sorted(layers) == sorted(expected_layers) \ No newline at end of file