1
0
Fork 0
mirror of https://github.com/zeldaret/oot.git synced 2024-12-01 15:26:01 +00:00
oot/tools/compress.py

343 lines
10 KiB
Python
Raw Normal View History

Setup rom decompression and compression (#1614) * decompress baserom * cleanup * specific hash check * rename baserom * git subrepo clone (merge) --branch=5da3132606e4fd427a8d72b8428e4f921cd6e56f git@github.com:z64tools/z64compress.git tools/z64compress subrepo: subdir: "tools/z64compress" merged: "5da313260" upstream: origin: "git@github.com:z64tools/z64compress.git" branch: "5da3132606e4fd427a8d72b8428e4f921cd6e56f" commit: "5da313260" git-subrepo: version: "0.4.3" origin: "https://github.com/ingydotnet/git-subrepo.git" commit: "2f68596" * setup compression * Add all compressed segments to the spec * Update md5 files * readme instructions * cleanup * Setup python dependencies on Jenkinsfile * Update Makefile Co-authored-by: cadmic <cadmic24@gmail.com> * review * . .venv/bin/activate * update readme * whoops * Yeet other versions from decompress_baserom.py * my bad * Move everything to baseroms/VERSION/ * Active venv on each command * jenkinsfile: use multiline syntax * Put the correct path on the jenkinsfile * Forgot to call per_version_fixes * CC0 * Update readme * Change where baserom segments are put into * Update Makefile Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * Update crunch64 * Label compressed instead of uncompressed * Update README.md Co-authored-by: fig02 <fig02srl@gmail.com> * Fix * `make rom` * baserom-uncompressed * Update README.md Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * Update README.md Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * Update README.md Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * Update README.md Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * Update README.md Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> * review * baserom-decompressed.z64 * ignore baseroms * rm -rf tools/z64compress * wip crunch64-based compress.py * OK compress * use ipl3checksum sum directly for cic update on compressed rom * multithreading * "docs" * fix MT: move set_sigint_ignored to global level for pickling * license compress.py * rm junk * Fix (or at least sort out) compress_ranges.txt dependencies * Update tools/overlayhelpers/damage_table.py Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> --------- Co-authored-by: cadmic <cadmic24@gmail.com> Co-authored-by: Dragorn421 <Dragorn421@users.noreply.github.com> Co-authored-by: fig02 <fig02srl@gmail.com>
2024-01-24 18:00:10 +00:00
# SPDX-FileCopyrightText: 2024 zeldaret
# SPDX-License-Identifier: CC0-1.0
from __future__ import annotations
import argparse
from pathlib import Path
import dataclasses
import struct
import time
import multiprocessing
import multiprocessing.pool
import crunch64
STRUCT_IIII = struct.Struct(">IIII")
@dataclasses.dataclass
class DmaEntry:
"""
A Python counterpart to the dmadata entry struct:
```c
typedef struct {
/* 0x00 */ uintptr_t vromStart;
/* 0x04 */ uintptr_t vromEnd;
/* 0x08 */ uintptr_t romStart;
/* 0x0C */ uintptr_t romEnd;
} DmaEntry;
```
"""
vromStart: int
vromEnd: int
romStart: int
romEnd: int
def __repr__(self):
return (
"DmaEntry("
f"vromStart=0x{self.vromStart:08X}, "
f"vromEnd=0x{self.vromEnd:08X}, "
f"romStart=0x{self.romStart:08X}, "
f"romEnd=0x{self.romEnd:08X}"
")"
)
SIZE_BYTES = STRUCT_IIII.size
def to_bin(self, data: memoryview):
STRUCT_IIII.pack_into(
data,
0,
self.vromStart,
self.vromEnd,
self.romStart,
self.romEnd,
)
@staticmethod
def from_bin(data: memoryview):
return DmaEntry(*STRUCT_IIII.unpack_from(data))
DMA_ENTRY_ZERO = DmaEntry(0, 0, 0, 0)
def align(v: int):
v += 0xF
return v // 0x10 * 0x10
@dataclasses.dataclass
class RomSegment:
vromStart: int
vromEnd: int
is_compressed: bool
data: memoryview | None
data_async: multiprocessing.pool.AsyncResult | None
@property
def uncompressed_size(self):
return self.vromEnd - self.vromStart
# Make interrupting the compression with ^C less jank
# https://stackoverflow.com/questions/72967793/keyboardinterrupt-with-python-multiprocessing-pool
def set_sigint_ignored():
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
def compress_rom(
rom_data: memoryview,
dmadata_offset_start: int,
dmadata_offset_end: int,
compress_entries_indices: set[int],
n_threads: int = None,
):
"""
rom_data: the uncompressed rom data
dmadata_offset_start: the offset in the rom where the dmadata starts (inclusive)
dmadata_offset_end: the offset in the rom where the dmadata ends (exclusive)
compress_entries_indices: the indices in the dmadata of the segments that should be compressed
n_threads: how many cores to use for compression
"""
# Segments of the compressed rom (not all are compressed)
compressed_rom_segments: list[RomSegment] = []
with multiprocessing.Pool(n_threads, initializer=set_sigint_ignored) as p:
# Extract each segment from the input rom
for entry_index, dmadata_offset in enumerate(
range(dmadata_offset_start, dmadata_offset_end, DmaEntry.SIZE_BYTES)
):
dma_entry = DmaEntry.from_bin(rom_data[dmadata_offset:])
if dma_entry == DMA_ENTRY_ZERO:
continue
segment_rom_start = dma_entry.romStart
segment_rom_end = dma_entry.romStart + (
dma_entry.vromEnd - dma_entry.vromStart
)
segment_data_uncompressed = rom_data[segment_rom_start:segment_rom_end]
is_compressed = entry_index in compress_entries_indices
if is_compressed:
segment_data = None
segment_data_async = p.apply_async(
crunch64.yaz0.compress,
(bytes(segment_data_uncompressed),),
)
else:
segment_data = segment_data_uncompressed
segment_data_async = None
compressed_rom_segments.append(
RomSegment(
dma_entry.vromStart,
dma_entry.vromEnd,
is_compressed,
segment_data,
segment_data_async,
)
)
# Technically optional but required for matching.
compressed_rom_segments.sort(key=lambda segment: segment.vromStart)
# Wait on compression of all compressed segments
waiting_on_segments = [
segment for segment in compressed_rom_segments if segment.is_compressed
]
total_uncompressed_size_of_data_to_compress = sum(
segment.uncompressed_size for segment in waiting_on_segments
)
uncompressed_size_of_data_compressed_so_far = 0
while waiting_on_segments:
# Show progress
progress = (
uncompressed_size_of_data_compressed_so_far
/ total_uncompressed_size_of_data_to_compress
)
print(f"Compressing... {progress * 100:.1f}%", end="\r")
# The segments for which the compression is not finished yet are
# added to this list
still_waiting_on_segments = []
got_some_results = False
for segment in waiting_on_segments:
assert segment.data is None
assert segment.data_async is not None
try:
compressed_data = segment.data_async.get(0)
except multiprocessing.TimeoutError:
# Compression not finished yet
still_waiting_on_segments.append(segment)
else:
# Compression finished!
assert isinstance(compressed_data, bytes)
segment.data = memoryview(compressed_data)
uncompressed_size_of_data_compressed_so_far += (
segment.uncompressed_size
)
got_some_results = True
segment.data_async = None
if not got_some_results and still_waiting_on_segments:
# Nothing happened this wait iteration, idle a bit
time.sleep(0.010)
waiting_on_segments = still_waiting_on_segments
print("Putting together the compressed rom...")
# Put together the compressed rom
compressed_rom_size = sum(
align(len(segment.data)) for segment in compressed_rom_segments
)
pad_to_multiple_of = 8 * 2**20 # 8 MiB
compressed_rom_size_padded = (
(compressed_rom_size + pad_to_multiple_of - 1)
// pad_to_multiple_of
* pad_to_multiple_of
)
compressed_rom_data = memoryview(bytearray(compressed_rom_size_padded))
compressed_rom_dma_entries: list[DmaEntry] = []
rom_offset = 0
for segment in compressed_rom_segments:
assert segment.data is not None
segment_rom_start = rom_offset
segment_rom_end = align(segment_rom_start + len(segment.data))
i = segment_rom_start + len(segment.data)
assert i <= len(compressed_rom_data)
compressed_rom_data[segment_rom_start:i] = segment.data
compressed_rom_dma_entries.append(
DmaEntry(
segment.vromStart,
segment.vromEnd,
segment_rom_start,
segment_rom_end if segment.is_compressed else 0,
)
)
rom_offset = segment_rom_end
assert rom_offset == compressed_rom_size
# Pad the compressed rom with the pattern matching the baseroms
for i in range(compressed_rom_size, compressed_rom_size_padded):
compressed_rom_data[i] = i % 256
# Write the new dmadata
dmadata_offset = dmadata_offset_start
for dma_entry in compressed_rom_dma_entries:
assert dmadata_offset + DmaEntry.SIZE_BYTES <= dmadata_offset_end
dma_entry.to_bin(compressed_rom_data[dmadata_offset:])
dmadata_offset += DmaEntry.SIZE_BYTES
return compressed_rom_data
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--in",
dest="in_rom",
required=True,
help="path to an uncompressed rom to be compressed",
)
parser.add_argument(
"--out",
dest="out_rom",
required=True,
help="path of the compressed rom to write out",
)
parser.add_argument(
"--dma-range",
dest="dma_range",
required=True,
help=(
"The dmadata location in the rom, in format"
" 'start_inclusive-end_exclusive' and using hexadecimal offsets"
" (e.g. '0x12f70-0x19030')."
),
)
parser.add_argument(
"--compress",
dest="compress_ranges",
required=True,
help=(
"The indices in the dmadata of the entries to be compressed,"
" where 0 is the first entry."
" It is a comma-separated list of individual indices and inclusive ranges."
" e.g. '0-1,3,5,6-9' is all indices from 0 to 9 (included) except 2 and 4."
),
)
parser.add_argument(
"--threads",
dest="n_threads",
type=int,
default=1,
help="how many cores to use for parallel compression",
)
args = parser.parse_args()
in_rom_p = Path(args.in_rom)
if not in_rom_p.exists():
parser.error(f"Input rom file {in_rom_p} doesn't exist.")
out_rom_p = Path(args.out_rom)
dma_range_str: str = args.dma_range
dma_range_ends_str = dma_range_str.split("-")
assert len(dma_range_ends_str) == 2, dma_range_str
dmadata_offset_start, dmadata_offset_end = (
int(v_str, 16) for v_str in dma_range_ends_str
)
assert dmadata_offset_start < dmadata_offset_end, dma_range_str
compress_ranges_str: str = args.compress_ranges
compress_entries_indices = set()
for compress_range_str in compress_ranges_str.split(","):
compress_range_ends_str = compress_range_str.split("-")
assert len(compress_range_ends_str) <= 2, (
compress_range_ends_str,
compress_range_str,
compress_ranges_str,
)
compress_range_ends = [int(v_str) for v_str in compress_range_ends_str]
if len(compress_range_ends) == 1:
compress_entries_indices.add(compress_range_ends[0])
else:
assert len(compress_range_ends) == 2
compress_range_first, compress_range_last = compress_range_ends
compress_entries_indices.update(
range(compress_range_first, compress_range_last + 1)
)
n_threads = args.n_threads
in_rom_data = in_rom_p.read_bytes()
out_rom_data = compress_rom(
memoryview(in_rom_data),
dmadata_offset_start,
dmadata_offset_end,
compress_entries_indices,
n_threads,
)
out_rom_p.write_bytes(out_rom_data)
if __name__ == "__main__":
main()