Source code for ts_benchmark.utils.compress

# -*- coding: utf-8 -*-

from __future__ import absolute_import

import gzip
import tarfile
from io import BytesIO
from typing import Dict, Optional


[docs] def compress_gz(data: Dict[str, str]) -> bytes: """ Compress in gz format """ outbuf = BytesIO() with tarfile.open(fileobj=outbuf, mode="w:gz") as tar: for k, v in data.items(): info = tarfile.TarInfo(name=k) v_bytes = v.encode("utf8") info.size = len(v_bytes) tar.addfile(info, fileobj=BytesIO(v_bytes)) return outbuf.getvalue()
[docs] def compress_gzip(data: Dict[str, str]) -> bytes: """ Compress data using Gzip compression. """ outbuf = BytesIO() with gzip.GzipFile(fileobj=outbuf, mode="wb") as gz: for k, v in data.items(): v_bytes = v.encode("utf8") gz.write(v_bytes) return outbuf.getvalue()
[docs] def decompress_gzip(compressed_data: bytes) -> Dict[str, str]: """ Decompress Gzip-compressed data and return the original dictionary. """ decompressed_data = {} compressed_buf = BytesIO(compressed_data) with gzip.GzipFile(fileobj=compressed_buf, mode="rb") as gz: while True: chunk = gz.read(1024) # Read a chunk of decompressed data (adjust chunk size if needed) if not chunk: break # No more data to read chunk_str = chunk.decode("utf8") key_values = chunk_str.split("\n") for key_value in key_values: if key_value: key, value = key_value.split(":") decompressed_data[key] = value return decompressed_data
[docs] def decompress_gz(data: bytes) -> Dict[str, str]: ret = {} with tarfile.open(fileobj=BytesIO(data), mode="r:gz") as tar: for member in tar.getmembers(): if member.isfile(): ret[member.name] = tar.extractfile(member).read().decode("utf8") return ret
[docs] def compress(data: Dict[str, str], method: str = "gz") -> bytes: if method != "gz": compress_gzip(data) # raise NotImplementedError("Only 'gz' method is supported by now") return compress_gz(data)
[docs] def decompress(data: bytes, method: str = "gz") -> Dict[str, str]: if method != "gz": decompress_gzip(data) # raise NotImplementedError("Only 'gz' method is supported by now") return decompress_gz(data)
[docs] def get_compress_file_ext(method: str) -> str: if method != "gz": return "gzip" # raise NotImplementedError("Only 'gz' method is supported by now") return "tar.gz"
[docs] def get_compress_method_from_ext(ext: str) -> Optional[str]: return { "tar.gz": "gz" }.get(ext)