barotrauma-sunken-tapes/gui/deploy.py

310 lines
12 KiB
Python

import jinja2 as j2
import yaml
import shutil
import requests
import os
import os.path
import sys
import time
import logging
import certifi
import math
import vdf
from PIL import Image
from mutagen.oggvorbis import OggVorbis
from distutils.dir_util import copy_tree
from pathlib import Path
from subprocess import PIPE, STDOUT, CalledProcessError, CompletedProcess, Popen
def rmfulldir(dirpath):
try:
shutil.rmtree(dirpath)
except FileNotFoundError:
pass
def download_via_requests(url_source, file_name):
response = requests.get(url_source, stream=True, verify=certifi.where())
with open(file_name, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
def get_ffmpeg_version():
url_ffmpeg_version = "https://www.gyan.dev/ffmpeg/builds/release-version"
fp = requests.get(url_ffmpeg_version, verify=certifi.where())
ffmpeg_version = fp.text
del fp
return ffmpeg_version
def stream_command(args, *, stdout_handler=logging.info, check=True, text=True,
stdout=PIPE, stderr=STDOUT, **kwargs):
"""Mimic subprocess.run, while processing the command output in real time."""
with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
for line in process.stdout:
stdout_handler(line[:-1])
return_code = process.poll()
if check and return_code:
raise CalledProcessError(return_code, process.args)
return CompletedProcess(process.args, return_code)
class Deployer:
def __init__(self, logging_handler: logging.Handler = None):
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
self.logger = logging.getLogger()
if logging_handler is not None:
self.logger.addHandler(logging_handler)
self.utils_dir = "./utils"
self.build_dir = "./build"
self.music_dir = "./build/music"
self.source_dir = "./source"
self.install_dir = self.find_install_dir()
def find_install_dir(self):
self.logger.info("Trying to find Barotraumma install folder")
library_folders = Path("C:\Program Files (x86)\Steam\steamapps\libraryfolders.vdf")
game_id = "602960"
data = vdf.load(open(library_folders))
install_path = ""
for (id, folder) in data["libraryfolders"].items():
if game_id in folder["apps"]:
install_path = Path(folder["path"]) / Path("steamapps\common\Barotrauma")
self.logger.info(f"Barotrauma is installed in {install_path.as_posix()}")
return install_path.as_posix()
def download_ffmpeg(self, clean=False):
if clean:
rmfulldir(f"{self.utils_dir}/ffmpeg-" + get_ffmpeg_version() + "-full_build")
url_ffmpeg_source = "https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z"
out_ffmpeg_archive = f"{self.utils_dir}/ffmpeg-release-full.7z.exe"
self.download_and_extract(url_ffmpeg_source, out_ffmpeg_archive)
def download_git(self, clean=False):
if clean:
rmfulldir(f"{self.utils_dir}/git")
url_git_source = ("https://github.com/git-for-windows/git/releases/download/"
"v2.46.2.windows.1/PortableGit-2.46.2-64-bit.7z.exe")
out_git_archive = f"{self.utils_dir}/PortableGit-2.46.2-64-bit.7z.exe"
self.download_and_extract(url_git_source, out_git_archive, subdir="/git")
def download_and_extract(self, url_source, out_archive, subdir=""):
self.logger.info(f"Downloading {url_source}. This may take a while.")
download_via_requests(url_source, out_archive)
time.sleep(0.7)
self.logger.info("Download complete.")
self.logger.info(f"Extracting {out_archive} with 7z")
extract = [f"{self.utils_dir}/7z/7za.exe", "x", out_archive, "-o" f"{self.utils_dir}{subdir}"]
stream_command(extract, stdout_handler=self.logger.info)
time.sleep(0.7)
self.logger.info("Removing " + out_archive)
os.remove(out_archive)
def update(self):
self.logger.info(f"Checking for updates via git pull.")
pull = [f"{self.utils_dir}/git/bin/git.exe", "pull"]
stream_command(pull, stdout_handler=self.logger.info)
def fetch_and_cut_song(self, tape, ffmpeg_version):
python_executable = f"{self.utils_dir}/python-3.9.7-embed-amd64/python.exe"
script = "./fetch_song.py"
if type(tape["source"]) == str:
shutil.copy(tape["source"], f"{self.music_dir}/{tape['identifier']}.ogg")
else:
fetch = [python_executable, script, tape["source"]["url"], "-x", "--audio-format", "vorbis",
"--audio-quality", "0", "-o", f"{self.build_dir}/tmp_music/{tape['identifier']}.ogg"]
# this is done in a separate python script because subprocess.call makes sure that the
# download process is finished before trying to cut the song
stream_command(fetch, stdout_handler=self.logger.info)
time.sleep(0.1)
cut = [f"{self.utils_dir}/ffmpeg-" + ffmpeg_version + "-full_build/bin/ffmpeg.exe",
"-y", "-ss", f"{tape['source']['start']}",
"-i", f"{self.build_dir}/tmp_music/{tape['identifier']}.ogg", "-acodec", "libvorbis",
"-ac", "1", "-af", f"volume={tape['source']['volume']}dB",
f"{self.music_dir}/{tape['identifier']}.ogg"]
if tape["source"]["end"] != -1:
cut = cut[:-7] + ["-to", f"{tape['source']['end']}"] + cut[-7:]
try:
stream_command(cut, stdout_handler=self.logger.info)
except FileNotFoundError:
self.logger.error("ffmpeg not in utils directory.")
return
walkman = [f"{self.utils_dir}/ffmpeg-" + ffmpeg_version + "-full_build/bin/ffmpeg.exe",
"-i", f"{self.music_dir}/{tape['identifier']}.ogg",
"-af", f"highpass=f=300,lowpass=f=12000",
f"{self.music_dir}/{tape['identifier']}-walkman.ogg"]
try:
stream_command(walkman, stdout_handler=self.logger.info)
except FileNotFoundError:
self.logger.error("ffmpeg not in utils directory.")
return
def assemble_png_images(self, tapes, outfile: str, resize=None):
img_names = [f"{self.source_dir}/images/{tape['identifier']}.png" for tape in tapes]
images = [Image.open(x) for x in img_names]
if resize is not None:
for i, (im, tape) in enumerate(zip(images, tapes)):
im.thumbnail((128, 82), Image.ANTIALIAS)
if resize == (64, 41) and tape["icon_resize"] == "blur":
im.thumbnail(resize, Image.ANTIALIAS)
else:
im.thumbnail(resize, Image.NEAREST)
images[i] = im
widths, heights = zip(*(i.size for i in images))
columns = int((len(images)) ** 0.5)
rows = int(math.ceil(len(images) / columns))
total_width = max(widths) * columns
max_height = max(heights) * rows
new_im = Image.new('RGBA', (total_width, max_height))
for i, im in enumerate(images):
x_offset = max(widths) * (i % columns)
y_offset = max(heights) * math.floor(i / columns)
new_im.paste(im, (x_offset, y_offset))
new_im.save(f"{self.build_dir}/{outfile}.png")
def prepare_music(self, data):
try:
os.mkdir(f'{self.music_dir}')
except FileExistsError:
pass
ffmpeg_version = get_ffmpeg_version()
self.logger.info("Downloading and cutting the songs.")
for i, tape in enumerate(data):
if not (os.path.exists(f"{self.music_dir}/{tape['identifier']}.ogg") or os.path.exists(
f"{self.music_dir}/{tape['identifier']}-walkman.ogg")):
self.logger.info(f"{i + 1}/{len(data)} Downloading: {tape['name']}")
self.fetch_and_cut_song(tape, ffmpeg_version)
else:
self.logger.info(f"{i + 1}/{len(data)} Already exists: {tape['name']}")
self.logger.info(f"Removing temporary music folder.")
rmfulldir(f"{self.build_dir}/tmp_music/")
self.logger.info(f"Copying the sound effects to build.")
copy_tree(f"{self.source_dir}/sound_effects", f"{self.build_dir}/sound_effects")
def prepare_images(self, data):
self.logger.info(f"Assembling covers and icons into png files.")
self.assemble_png_images(data, "covers")
self.assemble_png_images(data, "icons", resize=(64, 41))
self.assemble_png_images(data, "sprites", resize=(33, 21))
self.logger.info(f"Copying other images.")
shutil.copy(f"{self.source_dir}/images/players_icons.png", f"{self.build_dir}/players_icons.png")
shutil.copy(f"{self.source_dir}/images/players_sprites.png", f"{self.build_dir}/players_sprites.png")
shutil.copy(f"{self.source_dir}/images/PreviewImage.png", f"{self.build_dir}/PreviewImage.png")
def build_xml_code(self, data, config):
self.logger.info(f"Calculate the value that lets you use the songs n-times.")
song_lengths = [OggVorbis(f"{self.music_dir}/{tape['identifier']}.ogg").info.length
for tape in data]
use_lengths = [song_length * n for song_length, n in
zip(song_lengths, [tape["no_of_uses"] for tape in data])]
condition_delta = [f"{1 / use_length:0.5f}" for use_length in use_lengths]
affliction_delta = [100 / song_length for song_length in song_lengths]
columns = int((len(data)) ** 0.5)
positions = [{"column": i % columns, "row": math.floor(i / columns)} for i in range(len(data))]
self.logger.info(f"Creating jinja environment.")
# create jinja2 environment
j2env = j2.Environment(loader=j2.FileSystemLoader(Path(".")))
j2env.globals.update(zip=zip)
# load the template file
template0 = j2env.get_template(f"{self.source_dir}/filelist_template.xml")
template1 = j2env.get_template(f"{self.source_dir}/sunken_tapes_template.xml")
template2 = j2env.get_template(f"{self.source_dir}/sunken_tapes_style_template.xml")
self.logger.info(f"Rendering the xml files.")
with open(f"{self.build_dir}/filelist.xml", "w+", encoding="utf8") as output_file:
# render the template
output_file.write(template0.render(config=config, tapes=data))
with open(f"{self.build_dir}/{config['slug']}.xml", "w+", encoding="utf8") as output_file:
# render the template
output_file.write(template1.render(tapes=data, config=config,
condition_delta=condition_delta,
affliction_delta=affliction_delta,
song_lengths=song_lengths,
positions=positions))
with open(f"{self.build_dir}/{config['slug']}_style.xml", "w+", encoding="utf8") as output_file:
# render the template
output_file.write(template2.render(tapes=data, config=config, positions=positions))
def deploy(self, local_mod_name: str, config):
try:
os.mkdir(self.build_dir)
except FileExistsError:
self.logger.info(f"Removing old XML files in {self.build_dir}/:")
for f in Path(self.build_dir).glob("*.xml"):
self.logger.info(f" {f}")
os.remove(f)
pass
self.logger.info("Reading tapes.yaml")
data_file = Path(f"{self.source_dir}/tapes.yaml")
# load yaml file
with data_file.open(encoding='utf-8') as fr:
data = yaml.load(fr, Loader=yaml.SafeLoader)
self.prepare_music(data)
self.prepare_images(data)
self.build_xml_code(data, config)
mod_directory = f"{self.install_dir}/LocalMods/{local_mod_name}/"
self.logger.info(f"Removing the old installed mod directory {mod_directory}.")
rmfulldir(mod_directory)
self.logger.info(f"Copying the new build.")
if Path(f"{self.install_dir}").is_dir():
copy_tree(self.build_dir, mod_directory)
else:
raise FileNotFoundError(
f"{self.install_dir} does not exist. Set up the correct Barotrauma installation directory")
self.logger.info(f"Done!")