#! /usr/bin/python3
# encoding=utf-8

# game-data-packager command-line launcher stub

# Copyright © 2015-2016 Simon McVittie <smcv@debian.org>
# SPDX-License-Identifier: GPL-2.0-or-later

import argparse
import json
import glob
import logging
import os
import shlex
import shutil
import string
import sys
import traceback
import zipfile
from typing import (Any, overload)

HERE = os.path.dirname(os.path.realpath(__file__))

if 'GDP_UNINSTALLED' in os.environ:
    _srcdir = HERE

    if 'GDP_BUILDDIR' in os.environ:
        RUNTIME_BUILT = os.environ['GDP_BUILDDIR']
    else:
        RUNTIME_BUILT = os.path.join(_srcdir, 'out')

    RUNTIME_SOURCE = os.path.join(_srcdir, 'runtime')
else:
    RUNTIME_BUILT = '/usr/share/games/game-data-packager-runtime'
    RUNTIME_SOURCE = '/usr/share/games/game-data-packager-runtime'

if HERE not in sys.path:
    sys.path[:0] = [HERE]

from gdp_launcher_version import GAME_PACKAGE_VERSION   # noqa

# Normalize environment so we can use ${XDG_DATA_HOME} unconditionally.
# Do this before we use GLib functions that might create worker threads,
# because setenv() is not thread-safe.
ORIG_ENVIRON = os.environ.copy()
os.environ.setdefault('XDG_CACHE_HOME', os.path.expanduser('~/.cache'))
os.environ.setdefault('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
os.environ.setdefault('XDG_CONFIG_DIRS', '/etc/xdg')
os.environ.setdefault('XDG_DATA_HOME', os.path.expanduser('~/.local/share'))
os.environ.setdefault('XDG_STATE_HOME', os.path.expanduser('~/.local/state'))
os.environ.setdefault('XDG_DATA_DIRS', '/usr/local/share:/usr/share')

logger = logging.getLogger('game-data-packager.launcher.base')

if os.environ.get('GDP_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

DISTRO = 'the distribution'

try:
    os_release = open('/usr/lib/os-release')
except OSError:
    pass
else:
    for line in os_release:
        if line.startswith('NAME='):
            line = line[5:].strip()
            if line.startswith('"'):
                line = line.strip('"')
            elif line.startswith("'"):
                line = line.strip("'")
            DISTRO = line
    os_release.close()


@overload
def expand(path: str, **kwargs: str | None) -> str:
    ...


@overload
def expand(path: None, **kwargs: str | None) -> None:
    ...


def expand(path: str | None, **kwargs: str | None) -> str | None:
    if path is None:
        return None

    return os.path.expanduser(
        string.Template(path).substitute(os.environ, **kwargs)
    )


def force_list(x: str | list[str] | set[str]) -> list[str]:
    if isinstance(x, str):
        return x.split()
    else:
        return list(x)


__cached_prefered_langs: list[str] | None = None


# Keep in sync with game_data_packager/util.py
def prefered_langs() -> list[str]:
    global __cached_prefered_langs

    if __cached_prefered_langs is not None:
        return __cached_prefered_langs

    lang_raw: list[str] = []
    if 'LANGUAGE' in os.environ:
        lang_raw = os.environ['LANGUAGE'].split(':')
    if 'LANG' in os.environ:
        lang_raw.append(os.environ['LANG'])
    lang_raw.append('en')

    __cached_prefered_langs = []
    for lang in lang_raw:
        lang = lang.split('.')[0]
        if not lang or lang == 'C':
            continue
        if lang not in __cached_prefered_langs:
            __cached_prefered_langs.append(lang)
        lang = lang[0:2]
        if lang not in __cached_prefered_langs:
            __cached_prefered_langs.append(lang)

    return __cached_prefered_langs


class Launcher:
    warning_stamp: str | None

    def __init__(self, argv: str | None = None) -> None:
        name = os.path.basename(sys.argv[0])

        if name.endswith('.py'):
            name = name[:-3]

        parser = argparse.ArgumentParser(
                description="game-data-packager's game launcher",
                allow_abbrev=False)
        parser.add_argument(
            '--id', default=name,
            help='identity of launched game (default: from argv[0])',
        )
        parser.add_argument(
            '--demo', default=False, action='store_true',
            help='run a demo version even if the full version is available',
        )
        parser.add_argument(
            '--engine', default=None,
            help='use the specified game engine, if supported',
        )
        parser.add_argument(
            '--expansion', default=None,
            help='expansion to launch',
        )
        parser.add_argument(
            '--smp', default=False, action='store_true',
            help='use a multi-threaded game engine, if supported',
        )
        parser.add_argument(
            '--print-backtrace', default=False, action='store_true',
            help='print backtrace on crash',
        )
        parser.add_argument(
            '--debugger', default=None,
            help='run engine under a debugger',
        )
        parser.add_argument(
            '--quiet', '-q', default=False, action='store_true',
            help='silence console logging',
        )
        parser.add_argument(
            '--allow-binary-only', default=False, action='store_true',
            help='Allow running binary-only games',
        )
        parser.add_argument(
            '--version', action='version',
            version='game-data-packager launcher ' + GAME_PACKAGE_VERSION,
        )
        parser.add_argument(
            'arguments', nargs=argparse.REMAINDER,
            help='arguments for the launched game',
        )
        self.args, rest = parser.parse_known_args(argv)
        self.args.arguments[:0] = rest

        if self.args.id == 'launcher':
            parser.print_help()
            sys.exit(2)

        self.id: str = self.args.id
        self.name: str = self.id
        self.expansion_name: str | None = self.args.expansion

        self.set_id()

        self.load_launch_data()

        self.game = self.data.get('game', self.id)
        self.package = self.data.get('package', self.id)
        self.binary_only = self.data.get('binary_only', False)
        self.cdkey = self.data.get('cdkey', None)
        logger.debug('Binary-only: %r', self.binary_only)
        self.required_files = force_list(self.data.get('required_files', []))
        logger.debug('Checked files: %r', sorted(self.required_files))
        self.limit_gl_extensions = self.data.get('limit_gl_extensions', False)
        logger.debug('Limit_GL_EXTENSIONS: %r', self.limit_gl_extensions)

        self.extra_env = self.data.get('extra_env', {})
        self.expand_tokens = {}

        if self.id == 'ut99':
            if (
                os.uname().machine == 'x86_64'
                and os.path.exists('/usr/lib/ut99/System64/ut-bin')
            ):
                self.expand_tokens['UT99System'] = 'System64'
            elif (
                os.uname().machine == 'aarch64'
                and os.path.exists('/usr/lib/ut99/SystemARM64/ut-bin')
            ):
                self.expand_tokens['UT99System'] = 'SystemARM64'
            else:
                self.expand_tokens['UT99System'] = 'System'

        self.dot_directory = self.expand(
            self.data.get(
                'dot_directory',
                '${XDG_DATA_HOME}/' + self.id,
            )
        )

        if (
            self.id in ('quake3', 'quake3-server')
            and os.path.exists('/usr/lib/ioquake3')
            and not os.path.exists('/usr/lib/ioquake3/xdg-migration.stamp')
        ):
            self.dot_directory = self.expand('~/.q3a')

        logger.debug('Dot directory: %s', self.dot_directory)

        self.old_dot_directories = []

        for old in self.data.get('old_dot_directories', []):
            self.old_dot_directories.append(self.expand(old))
            logger.debug(
                'Migrating from old dot directory: %s',
                self.expand(old),
            )

        if 'engine' in self.data:
            self.engines = [self.data['engine']]
        elif 'engines' in self.data:
            self.engines = self.data['engines']
        else:
            self.engines = []

        if self.args.smp:
            if 'smp_engine' in self.data:
                self.engines.insert(0, self.data['smp_engine'])
            else:
                raise SystemExit(
                    'This game does not have a separate SMP/threaded engine',
                )

        if self.engines and self.args.engine is not None:
            self.engines.insert(0, self.args.engine)

        self.engine = None

        self.base_directories = list(
            map(
                self.expand,
                self.data.get('base_directories', ['/usr/lib/' + self.id]),
            ),
        )
        logger.debug('Base directories: %r', self.base_directories)

        self.library_preload = self.data.get('library_preload', [])
        logger.debug('Library preload: %r', self.library_preload)

        self.working_directory = self.expand(
            self.data.get('working_directory', None),
        )
        logger.debug('Working directory: %s', self.working_directory)

        self.argv = force_list(self.data.get('argv', []))
        extra_argv_quiet = force_list(self.data.get('extra_argv_quiet', []))

        self.languages: list[str] = []
        if 'languages' in self.data:
            for lang in self.data.get("languages").get('choices'):
                lang_to_add = False
                if lang in self.data.get("languages").get('checks', []):
                    f = self.data.get("languages").get('checks').get(lang)
                    for p in self.base_directories:
                        if os.path.exists(os.path.join(p, f)):
                            lang_to_add = True
                else:
                    lang_to_add = True

                if lang_to_add:
                    self.languages.append(lang)

        self.exit_status = 1

        self.symlink_into_dot_directory = self.data.get(
            'symlink_into_dot_directory',
            [],
        )
        self.symlink_into_dot_directory_dests = self.data.get(
            'symlink_into_dot_directory_dests',
            [self.dot_directory],
        )

        for p in self.base_directories:
            logger.debug('Searching: %s' % p)

        # sanity check: game engines often don't cope well with missing data
        self.have_all_data = self.check_required_files(
            self.base_directories,
            self.required_files,
        )

        if (self.args.demo or not self.have_all_data) and 'demo' in self.data:
            p = self.data['demo'].get('dot_directory', '')

            if p:
                self.dot_directory = self.expand(p)
                logger.debug('Changed dot directory to %s for demo', p)

            demo_directories = list(
                map(
                    self.expand,
                    self.data['demo'].get(
                        'base_directories', self.data['base_directories']
                    )
                )
            )
            if self.check_required_files(
                demo_directories,
                self.data['demo'].get('required_files', self.required_files)
            ):
                self.have_all_data = True
                self.base_directories = demo_directories

            if 'argv' in self.data['demo']:
                self.argv = force_list(self.data['demo']['argv'])
        else:
            # assume expansions only work with non-demo data
            for expansion, data in self.data.get('expansions', {}).items():
                base_directories = list(
                    map(
                        self.expand,
                        data.get('base_directories', [])
                    )
                ) + self.base_directories

                if self.check_required_files(
                    base_directories,
                    force_list(data.get('extra_required_files', [])),
                ):
                    self.symlink_into_dot_directory = (
                            self.symlink_into_dot_directory +
                            data.get('symlink_into_dot_directory', []))

                aliases = force_list(data.get('aliases', []))

                if (self.expansion_name == expansion or
                        self.expansion_name in aliases):
                    extra_argv = force_list(data.get('extra_argv', []))
                    self.argv = self.argv + extra_argv

                    extra_required_files = force_list(
                            data.get('extra_required_files', []))
                    self.required_files = (
                        self.required_files + extra_required_files
                    )

                    self.base_directories = base_directories
                    break

        self.library_path = [
            self.expand(a, dot_directory=self.dot_directory)
            for a in self.data.get('library_path', [])
        ]
        logger.debug('Library path: %r', self.library_path)

        if self.binary_only:
            assert self.dot_directory is not None
            if self.id in ('quake4', 'etqw'):
                self.warning_stamp = os.path.join(
                    self.dot_directory,
                    'confirmed-binary-only',
                )
            else:
                self.warning_stamp = os.path.join(
                    self.dot_directory,
                    'confirmed-binary-only.stamp',
                )
        else:
            self.warning_stamp = None

        if self.args.quiet:
            self.argv = self.argv + extra_argv_quiet

        logger.debug('Arguments: %r', self.argv)

    def set_id(self) -> None:
        pass

    def load_launch_data(self) -> None:
        launch_path = '%s/launch-%s.json' % (RUNTIME_BUILT, self.id)

        if os.path.exists(launch_path):
            self.data = json.load(open(launch_path, encoding='utf-8'))
        else:
            launch_path = 'launch-%s.json' % (self.id)
            zip_path = '%s/launch_scummvm.zip' % (RUNTIME_BUILT)
            with zipfile.ZipFile(zip_path, 'r') as zf:
                files = zf.namelist()
                if launch_path in files:
                    self.data = json.load(zf.open(launch_path))
                else:
                    raise RuntimeError(f"Can find launch data for {id=}")

    def expand(self, path: str, **kwargs: str | None) -> str:
        expand_tokens: dict[str, str | None] = {}
        expand_tokens.update(self.expand_tokens)
        expand_tokens.update(kwargs)
        return expand(path, **expand_tokens)

    def check_required_files(
        self,
        base_directories: list[str],
        required_files: list[str],
        warn: bool = True
    ) -> bool:
        for f in required_files:
            f = self.expand(f)
            logger.debug('looking for %s', f)
            for p in base_directories:
                logger.debug('looking for %s in %s', f, p)
                if os.path.exists(os.path.join(p, f)):
                    logger.debug('found %s in %s', f, p)
                    break
            else:
                if warn:
                    logger.warning('Data file is missing: %s' % f)
                return False
        else:
            return True

    def run_error(self, message: str) -> None:
        logger.error(message)

    def main(self) -> None:
        if self.engines:
            for e in self.engines:
                e = self.expand(e)
                if shutil.which(e) is not None:
                    self.engine = e
                    break
            else:
                self.run_error(
                    self.load_text(
                        'missing-engine.txt',
                        '\n'.join(
                            ['Game engine missing, tried:']
                            + [self.expand(e) for e in self.engines]
                        )
                    )
                )
                sys.exit(os.EX_OSFILE)

        if self.dot_directory is not None:
            os.makedirs(self.dot_directory, exist_ok=True)

        if not self.have_all_data:
            self.run_error(
                    self.load_text('missing-data.txt', 'Data files missing'))
            sys.exit(os.EX_OSFILE)

        if self.binary_only:
            assert self.warning_stamp is not None
            if (not os.path.exists(self.warning_stamp) and
               not self.args.allow_binary_only):
                self.exit_status = os.EX_NOPERM
                self.run_confirm_binary_only()
                sys.exit(self.exit_status)
                raise AssertionError('not reached')

        if len(self.languages) > 1:
            self.select_language()

        try:
            self.exec_game()
        except Exception:
            self.run_error(traceback.format_exc())
            sys.exit(self.exit_status)
        else:
            raise AssertionError('exec_game should never return')

    def flush(self) -> None:
        for f in (sys.stdout, sys.stderr):
            f.flush()

    def select_language(self) -> None:
        # don't do anything, we have no GUI
        pass

    def run_confirm_binary_only(self) -> None:
        # don't do anything, we have no GUI
        self.run_error(
            'Not running binary-only game without --allow-binary-only',
        )
        sys.exit(1)

    def run_enter_cdkey(self, cd_key_file: str) -> None:
        self.run_error(
            'Please paste your CDKEY into file %s' % cd_key_file,
        )

    def exec_game(self, _unused: Any = None) -> None:
        self.exit_status = os.EX_UNAVAILABLE

        # Copy before linking, so that the copies will suppress symlink
        # creation
        for pattern in self.data.get('copy_into_dot_directory', ()):
            pattern = self.expand(pattern)
            assert self.dot_directory is not None
            # copy from all base directories, highest priority first
            for base in self.base_directories:
                for f in glob.glob(os.path.join(base, pattern)):
                    assert f.startswith(base + '/')
                    target = os.path.join(
                        self.dot_directory,
                        f[len(base) + 1:],
                    )
                    d = os.path.dirname(target)

                    if os.path.exists(target):
                        logger.debug('Already exists: %s', target)
                        continue

                    if d:
                        logger.info('Creating directory: %s', d)
                        os.makedirs(d, exist_ok=True)

                    logger.info('Copying %s -> %s', f, target)
                    shutil.copyfile(f, target)

        for subdir in self.symlink_into_dot_directory:
            subdir = self.expand(subdir)

            for dest in self.symlink_into_dot_directory_dests:
                dest = self.expand(dest)

                if not os.path.exists(dest):
                    logger.debug('%s does not exist to set up symlinks', dest)
                    continue

                dot_subdir = os.path.join(dest, subdir)
                logger.debug(
                    'symlinking ${each base directory}/%s/** as %s/**',
                    subdir, dot_subdir,
                )
                # prune dangling symbolic links
                if os.path.exists(dot_subdir):
                    logger.debug(
                        'checking %r for dangling symlinks',
                        dot_subdir,
                    )
                    for dirpath, dirnames, filenames in os.walk(dot_subdir):
                        logger.debug(
                            'walking: %r %r %r', dirpath, dirnames, filenames,
                        )
                        for filename in filenames:
                            logger.debug(
                                'checking whether %r is a dangling symlink',
                                filename,
                            )
                            f = os.path.join(dirpath, filename)

                            if not os.path.exists(f):
                                logger.info('Removing dangling symlink %s', f)
                                os.remove(f)

                # symlink in all base directories, highest priority first
                for base in self.base_directories:
                    base_subdir = os.path.join(base, subdir)
                    logger.debug(
                        'Searching for files to link in %s',
                        base_subdir,
                    )
                    for dirpath, dirnames, filenames in os.walk(base_subdir):
                        logger.debug(
                            'walking: %r %r %r', dirpath, dirnames, filenames,
                        )
                        for filename in filenames:
                            logger.debug(
                                'ensuring that %s is symlinked in', filename,
                            )

                            f = os.path.join(dirpath, filename)
                            logger.debug('%s', f)
                            assert f.startswith(base_subdir + '/')

                            target = os.path.join(
                                dot_subdir,
                                f[len(base_subdir) + 1:],
                            )
                            d = os.path.dirname(target)

                            if os.path.exists(target):
                                logger.debug('Already exists: %s', target)
                                continue

                            if os.path.lexists(target):
                                logger.info(
                                    'Removing dangling symlink %s',
                                    target,
                                )
                                os.remove(target)

                            if d:
                                logger.info('Creating directory: %s', d)
                                os.makedirs(d, exist_ok=True)

                            logger.info('Symlinking %s -> %s', f, target)
                            os.symlink(f, target)

                for dirpath, dirnames, filenames in os.walk(base_subdir):
                    for filename in filenames:
                        f = os.path.join(dirpath, filename)

                        if not os.path.exists(f):
                            logger.info('Removing dangling symlink %s', f)
                            os.remove(f)

        if self.cdkey is not None:
            cd_key_file = os.path.join(
                    self.dot_directory,
                    self.cdkey,
                )
            if not os.path.exists(cd_key_file):
                self.exit_status = os.EX_NOPERM
                self.run_enter_cdkey(cd_key_file)
                sys.exit(self.exit_status)
                raise AssertionError('not reached')

        if self.working_directory is not None:
            os.chdir(self.working_directory)

        for p in self.base_directories:
            if os.path.isdir(p):
                base_directory = p
                break
        else:
            base_directory = None

        self.argv = [
            self.expand(a, base_directory=base_directory,
                        dot_directory=self.dot_directory)
            for a in self.argv
        ]

        if self.engine is not None:
            self.argv.insert(0, self.engine)

        environ = os.environ.copy()
        for k, v in self.extra_env.items():
            if k not in environ:
                environ[k] = str(v)

        if self.limit_gl_extensions:
            environ['MESA_EXTENSION_MAX_YEAR'] = '2002'       # Mesa
            environ['__GL_ExtensionStringVersion'] = '17700'  # Nvidia

        library_path = self.library_path[:]

        if 'LD_LIBRARY_PATH' in environ:
            library_path.append(environ['LD_LIBRARY_PATH'])

        environ['LD_LIBRARY_PATH'] = ':'.join(library_path)

        if len(self.library_preload) > 0:
            environ['LD_PRELOAD'] = ':'.join(self.library_preload)

        if self.args.print_backtrace:
            self.argv[:0] = [
                'gdb', '-return-child-result', '-batch',
                '-ex', 'run', '-ex', 'thread apply all bt full',
                '-ex', 'kill', '-ex', 'quit', '--args',
            ]
        elif self.args.debugger:
            self.argv[:0] = shlex.split(self.args.debugger)

        if self.args.debugger == 'gdb':
            self.run_error(
                'gdb requires arguments: please use --debugger="gdb --args" '
                'instead'
            )
            sys.exit(2)

        logger.debug('Executing: %r', self.argv + self.args.arguments)
        self.flush()

        if self.args.quiet:
            os.dup2(os.open(os.devnull, os.O_RDONLY), 0)
            os.dup2(os.open(os.devnull, os.O_WRONLY), 1)
            os.dup2(os.open(os.devnull, os.O_WRONLY), 2)

        os.execvpe(self.argv[0], self.argv + self.args.arguments, environ)

        raise AssertionError('os.execve should never return')

    def load_text(self, filename: str, placeholder: str) -> str:
        for f in (
            '%s.%s' % (self.id, filename),
            '%s.%s' % (self.game, filename),
            filename,
        ):
            try:
                path = os.path.join(RUNTIME_SOURCE, f)
                text = open(path).read()
            except OSError:
                pass
            else:
                text = string.Template(text).safe_substitute(
                        distro=DISTRO,
                        game=self.game,
                        id=self.id,
                        name=self.name,
                        package=self.package,
                        )
                # strip single \n
                text = text.replace('\n\n', '\r\r').replace('\n', ' ')
                text = text.replace('\r', '\n')
                return text
        else:
            return placeholder


if __name__ == '__main__':
    logging.basicConfig()
    Launcher().main()
