mirror of
https://github.com/zeldaret/oot.git
synced 2025-01-15 04:36:59 +00:00
208 lines
8.1 KiB
Python
Executable file
208 lines
8.1 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import signal
|
|
import time
|
|
import multiprocessing
|
|
from pathlib import Path
|
|
|
|
import version_config
|
|
|
|
|
|
def SignalHandler(sig, frame):
|
|
print(f'Signal {sig} received. Aborting...')
|
|
mainAbort.set()
|
|
# Don't exit immediately to update the extracted assets file.
|
|
|
|
def ExtractFile(assetConfig: version_config.AssetConfig, outputPath: Path, outputSourcePath: Path):
|
|
name = assetConfig.name
|
|
xmlPath = assetConfig.xml_path
|
|
version = globalVersionConfig.version
|
|
if globalAbort.is_set():
|
|
# Don't extract if another file wasn't extracted properly.
|
|
return
|
|
|
|
zapdPath = Path("tools") / "ZAPD" / "ZAPD.out"
|
|
configPath = Path("tools") / "ZAPDConfigs" / version / "Config.xml"
|
|
|
|
outputPath.mkdir(parents=True, exist_ok=True)
|
|
outputSourcePath.mkdir(parents=True, exist_ok=True)
|
|
|
|
execStr = f"{zapdPath} e -eh -i {xmlPath} -b {globalBaseromSegmentsDir} -o {outputPath} -osf {outputSourcePath} -gsf 1 -rconf {configPath} --cs-float both {ZAPDArgs}"
|
|
|
|
if name.startswith("code/") or name.startswith("n64dd/") or name.startswith("overlays/"):
|
|
assert assetConfig.start_offset is not None
|
|
assert assetConfig.end_offset is not None
|
|
|
|
execStr += f" --start-offset 0x{assetConfig.start_offset:X}"
|
|
execStr += f" --end-offset 0x{assetConfig.end_offset:X}"
|
|
|
|
if name.startswith("overlays/"):
|
|
overlayName = name.split("/")[1]
|
|
baseAddress = globalVersionConfig.dmadata_segments[overlayName].vram + assetConfig.start_offset
|
|
|
|
execStr += f" --base-address 0x{baseAddress:X}"
|
|
execStr += " --static"
|
|
|
|
if globalUnaccounted:
|
|
execStr += " -Wunaccounted"
|
|
|
|
print(execStr)
|
|
exitValue = os.system(execStr)
|
|
if exitValue != 0:
|
|
globalAbort.set()
|
|
print("\n")
|
|
print(f"Error when extracting from file {xmlPath}", file=os.sys.stderr)
|
|
print("Aborting...", file=os.sys.stderr)
|
|
print("\n")
|
|
|
|
def ExtractFunc(assetConfig: version_config.AssetConfig):
|
|
objectName = assetConfig.name
|
|
xml_path = assetConfig.xml_path
|
|
xml_path_str = str(xml_path)
|
|
|
|
outPath = globalOutputDir / objectName
|
|
outSourcePath = outPath
|
|
|
|
if xml_path_str in globalExtractedAssetsTracker:
|
|
timestamp = globalExtractedAssetsTracker[xml_path_str]["timestamp"]
|
|
modificationTime = int(os.path.getmtime(xml_path))
|
|
if modificationTime < timestamp:
|
|
# XML has not been modified since last extraction.
|
|
return
|
|
|
|
currentTimeStamp = int(time.time())
|
|
|
|
ExtractFile(assetConfig, outPath, outSourcePath)
|
|
|
|
if not globalAbort.is_set():
|
|
# Only update timestamp on successful extractions
|
|
if xml_path_str not in globalExtractedAssetsTracker:
|
|
globalExtractedAssetsTracker[xml_path_str] = globalManager.dict()
|
|
globalExtractedAssetsTracker[xml_path_str]["timestamp"] = currentTimeStamp
|
|
|
|
def initializeWorker(versionConfig: version_config.VersionConfig, abort, unaccounted: bool, extractedAssetsTracker: dict, manager, baseromSegmentsDir: Path, outputDir: Path):
|
|
global globalVersionConfig
|
|
global globalAbort
|
|
global globalUnaccounted
|
|
global globalExtractedAssetsTracker
|
|
global globalManager
|
|
global globalBaseromSegmentsDir
|
|
global globalOutputDir
|
|
globalVersionConfig = versionConfig
|
|
globalAbort = abort
|
|
globalUnaccounted = unaccounted
|
|
globalExtractedAssetsTracker = extractedAssetsTracker
|
|
globalManager = manager
|
|
globalBaseromSegmentsDir = baseromSegmentsDir
|
|
globalOutputDir = outputDir
|
|
|
|
def processZAPDArgs(argsZ):
|
|
badZAPDArg = False
|
|
for z in argsZ:
|
|
if z[0] == '-':
|
|
print(f'error: argument "{z}" starts with "-", which is not supported.', file=os.sys.stderr)
|
|
badZAPDArg = True
|
|
|
|
if badZAPDArg:
|
|
exit(1)
|
|
|
|
ZAPDArgs = " ".join(f"-{z}" for z in argsZ)
|
|
print("Using extra ZAPD arguments: " + ZAPDArgs)
|
|
return ZAPDArgs
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="baserom asset extractor")
|
|
parser.add_argument(
|
|
"baserom_segments_dir",
|
|
type=Path,
|
|
help="Directory of uncompressed ROM segments",
|
|
)
|
|
parser.add_argument(
|
|
"output_dir",
|
|
type=Path,
|
|
help="Output directory to place files in",
|
|
)
|
|
parser.add_argument("-v", "--version", dest="oot_version", help="OOT game version", default="gc-eu-mq-dbg")
|
|
parser.add_argument("-s", "--single", help="Extract a single asset by name, e.g. objects/gameplay_keep")
|
|
parser.add_argument("-f", "--force", help="Force the extraction of every xml instead of checking the touched ones (overwriting current files).", action="store_true")
|
|
parser.add_argument("-j", "--jobs", help="Number of cpu cores to extract with.")
|
|
parser.add_argument("-u", "--unaccounted", help="Enables ZAPD unaccounted detector warning system.", action="store_true")
|
|
parser.add_argument("-Z", help="Pass the argument on to ZAPD, e.g. `-ZWunaccounted` to warn about unaccounted blocks in XMLs. Each argument should be passed separately, *without* the leading dash.", metavar="ZAPD_ARG", action="append")
|
|
args = parser.parse_args()
|
|
|
|
baseromSegmentsDir: Path = args.baserom_segments_dir
|
|
version: str = args.oot_version
|
|
outputDir: Path = args.output_dir
|
|
|
|
args.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
versionConfig = version_config.load_version_config(version)
|
|
|
|
global ZAPDArgs
|
|
ZAPDArgs = processZAPDArgs(args.Z) if args.Z else ""
|
|
|
|
global mainAbort
|
|
mainAbort = multiprocessing.Event()
|
|
manager = multiprocessing.Manager()
|
|
signal.signal(signal.SIGINT, SignalHandler)
|
|
|
|
extraction_times_p = outputDir / "assets_extraction_times.json"
|
|
extractedAssetsTracker = manager.dict()
|
|
if extraction_times_p.exists() and not args.force:
|
|
with extraction_times_p.open(encoding='utf-8') as f:
|
|
extractedAssetsTracker.update(json.load(f, object_hook=manager.dict))
|
|
|
|
singleAssetName = args.single
|
|
if singleAssetName is not None:
|
|
assetConfig = None
|
|
for asset in versionConfig.assets:
|
|
if asset.name == singleAssetName:
|
|
assetConfig = asset
|
|
break
|
|
else:
|
|
print(f"Error. Asset {singleAssetName} not found in config.", file=os.sys.stderr)
|
|
exit(1)
|
|
|
|
initializeWorker(versionConfig, mainAbort, args.unaccounted, extractedAssetsTracker, manager, baseromSegmentsDir, outputDir)
|
|
# Always extract if -s is used.
|
|
xml_path_str = str(assetConfig.xml_path)
|
|
if xml_path_str in extractedAssetsTracker:
|
|
del extractedAssetsTracker[xml_path_str]
|
|
ExtractFunc(assetConfig)
|
|
else:
|
|
class CannotMultiprocessError(Exception):
|
|
pass
|
|
|
|
try:
|
|
numCores = int(args.jobs or 0)
|
|
if numCores <= 0:
|
|
numCores = 1
|
|
print("Extracting assets with " + str(numCores) + " CPU core" + ("s" if numCores > 1 else "") + ".")
|
|
try:
|
|
mp_context = multiprocessing.get_context("fork")
|
|
except ValueError as e:
|
|
raise CannotMultiprocessError() from e
|
|
with mp_context.Pool(numCores, initializer=initializeWorker, initargs=(versionConfig, mainAbort, args.unaccounted, extractedAssetsTracker, manager, baseromSegmentsDir, outputDir)) as p:
|
|
p.map(ExtractFunc, versionConfig.assets)
|
|
except (multiprocessing.ProcessError, TypeError, CannotMultiprocessError):
|
|
print("Warning: Multiprocessing exception occurred.", file=os.sys.stderr)
|
|
print("Disabling mutliprocessing.", file=os.sys.stderr)
|
|
|
|
initializeWorker(versionConfig, mainAbort, args.unaccounted, extractedAssetsTracker, manager, baseromSegmentsDir, outputDir)
|
|
for assetConfig in versionConfig.assets:
|
|
ExtractFunc(assetConfig)
|
|
|
|
with extraction_times_p.open('w', encoding='utf-8') as f:
|
|
serializableDict = dict()
|
|
for xml, data in extractedAssetsTracker.items():
|
|
serializableDict[xml] = dict(data)
|
|
json.dump(dict(serializableDict), f, ensure_ascii=False, indent=4)
|
|
|
|
if mainAbort.is_set():
|
|
exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|