__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

[email protected]: ~ $
# DistUpgradeQuirks.py
#
#  Copyright (c) 2004-2010 Canonical
#
#  Author: Michael Vogt <[email protected]>
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License as
#  published by the Free Software Foundation; either version 2 of the
#  License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
#  USA

import apt
import atexit
import distro_info
import glob
import logging
import os
import re
import subprocess
import pathlib
from subprocess import PIPE, Popen

from .DistUpgradeGettext import gettext as _


class DistUpgradeQuirks(object):
    """
    This class collects the various quirks handlers that can
    be hooked into to fix/work around issues that the individual
    releases have.

    The following handlers are supported:
    - PreCacheOpen: run *before* the apt cache is opened the first time
                    to set options that affect the cache
    - PostInitialUpdate: run *before* the sources.list is rewritten but
                         after an initial apt-get update
    - PreDistUpgradeCache: run *right before* the dist-upgrade is
                           calculated in the cache
    - PostDistUpgradeCache: run *after* the dist-upgrade was calculated
                            in the cache
    - StartUpgrade: before the first package gets installed (but the
                    download is finished)
    - PostUpgrade: run *after* the upgrade is finished successfully and
                   packages got installed
    - PostCleanup: run *after* the cleanup (orphaned etc) is finished
    """

    def __init__(self, controller, config):
        self.controller = controller
        self._view = controller._view
        self.config = config
        self.uname = Popen(["uname", "-r"], stdout=PIPE,
                           universal_newlines=True).communicate()[0].strip()
        self.extra_snap_space = 0
        self._poke = None
        self._snapstore_reachable = False
        self._snap_list = None
        self._did_change_font = False

    # individual quirks handler that run *before* the cache is opened
    def PreCacheOpen(self):
        """ run before the apt cache is opened the first time """
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.PreCacheOpen")
        self._add_apport_ignore_list()

    # individual quirks handler that run *after* the cache is opened
    def PostInitialUpdate(self):
        # PreCacheOpen would be better but controller.abort fails terribly
        """ run after the apt cache is opened the first time """
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.PostInitialUpdate")
        self._test_and_fail_on_tpm_fde()

        cache = self.controller.cache
        self._test_and_warn_if_ros_installed(cache)

        self._maybe_prevent_flatpak_auto_removal()

        if 'snapd' not in cache:
            logging.debug("package required for Quirk not in cache")
            return
        if cache['snapd'].is_installed and \
                (os.path.exists('/run/snapd.socket') or
                 os.path.exists('/run/snapd-snap.socket')):
            self._checkStoreConnectivity()
        # If the snap store is accessible, at the same time calculate the
        # extra size needed by to-be-installed snaps.  This also prepares
        # the snaps-to-install list for the actual upgrade.
        if self._snapstore_reachable:
            self._calculateSnapSizeRequirements()

    def PostUpgrade(self):
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.PostUpgrade")
        cache = self.controller.cache
        if 'snapd' not in cache:
            logging.debug("package required for Quirk not in cache")
            return
        if cache['snapd'].is_installed and \
                self._snap_list:
            self._replaceDebsAndSnaps()
        if 'ubuntu-desktop-raspi' in cache:
            if cache['ubuntu-desktop-raspi'].is_installed:
                self._replace_fkms_overlay()
        if 'ubuntu-server-raspi' in cache:
            if cache['ubuntu-server-raspi'].is_installed:
                self._add_kms_overlay()

    # individual quirks handler when the dpkg run is finished ---------
    def PostCleanup(self):
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        " run after cleanup "
        logging.debug("running Quirks.PostCleanup")
        self._remove_apport_ignore_list()

        # Try to refresh snaps, but ignore errors.
        try:
            subprocess.check_call(['snap', 'refresh'])
        except Exception as e:
            logging.debug(f'Failed to refresh snaps : {e}')

    # run right before the first packages get installed
    def StartUpgrade(self):
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.StartUpgrade")
        cache = self.controller.cache
        self._removeOldApportCrashes()
        self._killUpdateNotifier()
        self._pokeScreensaver()
        self._set_generic_font()
        self._disable_kdump_tools_on_install(cache)

    # individual quirks handler that run *right before* the dist-upgrade
    # is calculated in the cache
    def PreDistUpgradeCache(self):
        """ run right before calculating the dist-upgrade """
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.PreDistUpgradeCache")
        self._maybe_remove_gpg_wks_server()
        self._install_linux_sysctl_defaults()

    # individual quirks handler that run *after* the dist-upgrade was
    # calculated in the cache
    def PostDistUpgradeCache(self):
        """ run after calculating the dist-upgrade """
        # we do not run any quirks in partialUpgrade mode
        if self.controller._partialUpgrade:
            logging.info("not running quirks in partialUpgrade mode")
            return

        logging.debug("running Quirks.PostDistUpgradeCache")
        self._install_linux_metapackage()

    def _test_and_warn_if_ros_installed(self, cache):
        """
        Test and warn if ROS is installed. A given ROS release only
        supports specific Ubuntu releases, and can cause the upgrade
        to fail in an overly-cryptic manner.
        """

        # These are the root ROS 1 and 2 dependencies as of 07/27/2020
        ros_package_patterns = set()
        for package_name in (
                "catkin",
                "rosboost-cfg",
                "rosclean",
                "ros-environment",
                "ros-workspace"):
            ros_package_patterns.add(
                re.compile(r"ros-[^\-]+-%s" % package_name))

        ros_is_installed = False
        for pkg in cache:
            if ros_is_installed:
                break

            for pattern in ros_package_patterns:
                if pattern.match(pkg.name):
                    if pkg.is_installed or pkg.marked_install:
                        ros_is_installed = True
                    break

        if ros_is_installed:
            res = self._view.askYesNoQuestion(
                _("The Robot Operating System (ROS) is installed"),
                _("It appears that ROS is currently installed. Each ROS "
                  "release is very strict about the versions of Ubuntu "
                  "it supports, and Ubuntu upgrades can fail if that "
                  "guidance isn't followed. Before continuing, please "
                  "either uninstall ROS, or ensure the ROS release you "
                  "have installed supports the version of Ubuntu to "
                  "which you're upgrading.\n\n"
                  "For ROS 1 releases, refer to REP 3:\n"
                  "https://www.ros.org/reps/rep-0003.html\n\n"
                  "For ROS 2 releases, refer to REP 2000:\n"
                  "https://www.ros.org/reps/rep-2000.html\n\n"
                  "Are you sure you want to continue?"))
            if not res:
                self.controller.abort()

    def _maybe_prevent_flatpak_auto_removal(self):
        """
        If flatpak is installed, and there are either active remotes, or
        flatpak apps installed, prevent flatpak's auto-removal on upgrade.
        """
        prevent_auto_removal = False

        if "flatpak" not in self.controller.cache:
            return

        if not self.controller.cache["flatpak"].is_installed:
            return

        if not os.path.exists("/usr/bin/flatpak"):
            return

        for subcmd in ["remotes", "list"]:
            r = subprocess.run(
                    ["/usr/bin/flatpak", subcmd],
                    stdout=subprocess.PIPE
            )
            if r.stdout.decode("utf-8").strip():
                prevent_auto_removal = True
                break

        logging.debug("flatpak will{}be marked as manually installed"
                      .format(" " if prevent_auto_removal else " NOT "))

        if not prevent_auto_removal:
            return

        self.controller.cache["flatpak"].mark_auto(auto=False)

        for pkg in ("plasma-discover-backend-flatpak",
                    "gnome-software-plugin-flatpak"):
            if pkg not in self.controller.cache:
                continue

            if not self.controller.cache[pkg].is_installed:
                continue

            logging.debug("{} will be marked as manually installed"
                          .format(pkg))
            self.controller.cache[pkg].mark_auto(auto=False)

        self.controller.cache.commit(
            self._view.getAcquireProgress(),
            self._view.getInstallProgress(self.controller.cache)
        )

    def _add_apport_ignore_list(self):
        """Write the apport ignore list used while the upgrade runs.

        Each ignored binary goes on its own line of the file; the parent
        directory is created when missing. Any failure is only logged at
        debug level.
        """
        path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'
        ignored_binaries = (
            '/usr/libexec/tracker-extract-3',  # LP: #2012638
            '/usr/sbin/update-apt-xapian-index',  # LP: #2058227
        )

        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'w') as f:
                f.writelines(f'{binary}\n' for binary in ignored_binaries)
        except Exception as e:
            logging.debug(f'Failed to create {path}: {e}')

    def _remove_apport_ignore_list(self):
        """Delete the apport ignore list written for the upgrade.

        Any failure (including a missing file) is only logged at debug
        level.
        """
        path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'

        try:
            os.unlink(path)
        except Exception as e:
            logging.debug(f'Failed to remove {path}: {e}')

    def _killUpdateNotifier(self):
        """Kill update-notifier now to suppress "reboot required".

        Does nothing when /usr/bin/killall does not exist.
        """
        if not os.path.exists("/usr/bin/killall"):
            return

        logging.debug("killing update-notifier")
        subprocess.call(["killall", "-q", "update-notifier"])

    def _pokeScreensaver(self):
        if (os.path.exists("/usr/bin/xdg-screensaver") and
                os.environ.get('DISPLAY')):
            logging.debug("setup poke timer for the screensaver")
            cmd = "while true;"
            cmd += " do /usr/bin/xdg-screensaver reset >/dev/null 2>&1;"
            cmd += " sleep 30; done"
            try:
                self._poke = subprocess.Popen(cmd, shell=True)
                atexit.register(self._stopPokeScreensaver)
            except (OSError, ValueError):
                logging.exception("failed to setup screensaver poke")

    def _stopPokeScreensaver(self):
        res = False
        if self._poke is not None:
            try:
                self._poke.terminate()
                res = self._poke.wait()
            except OSError:
                logging.exception("failed to stop screensaver poke")
            self._poke = None
        return res

    def _removeOldApportCrashes(self):
        """Remove old apport crash files and whoopsie control files.

        Deletes every /var/crash entry ending in .crash, .upload or
        .uploaded. The first error stops the removal and is logged as a
        warning.
        """
        extensions = ('.crash', '.upload', '.uploaded')
        try:
            for ext in extensions:
                for stale in glob.glob("/var/crash/*%s" % ext):
                    logging.debug("removing old %s file '%s'" % (ext, stale))
                    os.unlink(stale)
        except Exception as e:
            logging.warning("error during unlink of old crash files (%s)" % e)

    def _checkStoreConnectivity(self):
        """Check for connectivity to the snap store to install snaps.

        Runs "snap debug connectivity" with LANG=C.UTF-8 and inspects its
        output:

        - a " * PASS" line sets self._snapstore_reachable to True;
        - an "unreachable" line aborts when the non-transitional lxd deb
          is installed, otherwise the user is asked whether to continue;
        - an unknown "debug" command (outdated snapd) asks the user too;
        - access denied and any other output abort the upgrade.

        If the command cannot be run at all the store is assumed to be
        unreachable and the upgrade simply continues.
        """
        proceed = False
        snap_env = os.environ.copy()
        snap_env["LANG"] = "C.UTF-8"
        try:
            probe = Popen(["snap", "debug", "connectivity"], stdout=PIPE,
                          stderr=PIPE, env=snap_env,
                          universal_newlines=True)
            stdout, stderr = probe.communicate()
        except Exception as e:
            logging.debug(
                f'Failed to check snap store connectivity, assuming none: {e}'
            )
            self._snapstore_reachable = False
            return

        if re.search(r"^ \* PASS", stdout, re.MULTILINE):
            self._snapstore_reachable = True
            return

        if re.search(r"^ \*.*unreachable", stdout, re.MULTILINE):
            # can't connect
            logging.error("No snap store connectivity")
            cache = self.controller.cache
            # epoch 1 is the transitional deb
            old_lxd_deb_installed = (
                'lxd' in cache and
                cache['lxd'].is_installed and
                not cache['lxd'].candidate.version.startswith("1:"))
            if old_lxd_deb_installed:
                logging.error("lxd is installed")
                summary = _("Connection to the Snap Store failed")
                msg = _("You have the package lxd installed but your "
                        "system is unable to reach the Snap Store. "
                        "lxd is now provided via a snap and the release "
                        "upgrade will fail if snapd is not functional. "
                        "Please make sure you're connected to the "
                        "Internet and update any firewall or proxy "
                        "settings as needed so that you can reach "
                        "api.snapcraft.io. If you are an enterprise "
                        "with a firewall setup you may want to configure "
                        "a Snap Store proxy."
                        )
                self._view.error(summary, msg)
                self.controller.abort()
            else:
                proceed = self._view.askYesNoQuestion(
                    _("Connection to Snap Store failed"),
                    _("Your system does not have a connection to the Snap "
                      "Store. For the best upgrade experience make sure "
                      "that your system can connect to api.snapcraft.io.\n"
                      "Do you still want to continue with the upgrade?")
                )
        elif 'error: unknown command' in stderr:
            # debug command not available
            logging.error("snap debug command not available")
            proceed = self._view.askYesNoQuestion(
                _("Outdated snapd package"),
                _("Your system does not have the latest version of snapd. "
                  "Please update the version of snapd on your system to "
                  "improve the upgrade experience.\n"
                  "Do you still want to continue with the upgrade?")
            )
        elif 'error: access denied' in stderr:
            # not running as root
            logging.error("Not running as root!")
        else:
            logging.error("Unhandled error connecting to the snap store.")

        if not proceed:
            self.controller.abort()

    def _calculateSnapSizeRequirements(self):
        """Add up the download size of the snaps that will be installed.

        Populates self._snap_list via _prepare_snap_replacement_data()
        and then asks the snap store API for the download size of every
        snap whose command is 'install'. Each size is added to
        self.extra_snap_space.

        The calculation is best effort: a snap whose size cannot be
        fetched or parsed is logged at debug level and skipped.
        """
        import http.client
        import json
        import urllib.request

        # first fetch the list of snap-deb replacements that will be needed
        # and store them for future reference, along with other data we'll
        # need in the process
        self._prepare_snap_replacement_data()
        # now perform direct API calls to the store, requesting size
        # information for each of the snaps needing installation
        self._view.updateStatus(_("Calculating snap size requirements"))
        for snap, snap_object in self._snap_list.items():
            if snap_object['command'] != 'install':
                continue
            action = {
                "instance-key": "upgrade-size-check",
                "action": "download",
                "snap-id": snap_object['snap-id'],
                "channel": snap_object['channel'],
            }
            data = {
                "context": [],
                "actions": [action],
            }
            req = urllib.request.Request(
                url='https://api.snapcraft.io/v2/snaps/refresh',
                data=json.dumps(data).encode('utf-8'))
            req.add_header('Snap-Device-Series', '16')
            req.add_header('Content-type', 'application/json')
            req.add_header('Snap-Device-Architecture', self.controller.arch)
            try:
                # The context manager closes the connection even when
                # reading or parsing the response fails.
                with urllib.request.urlopen(req) as response:
                    info = json.loads(response.read())
                size = int(info['results'][0]['snap']['download']['size'])
            except (IndexError, KeyError, TypeError, ValueError,
                    OSError, http.client.HTTPException):
                # OSError covers URLError/HTTPError as well as raw socket
                # failures; IndexError/TypeError cover a response that is
                # valid JSON but does not have the expected layout.
                logging.debug("Failed fetching size of snap %s" % snap)
                continue
            self.extra_snap_space += size

    def _replaceDebsAndSnaps(self):
        """Apply the prepared snap actions and obsolete the replaced debs.

        For every entry of self._snap_list the matching "snap switch",
        "snap remove" or "snap install" command is run. After a
        successful install the deb replaced by the snap, if any, is
        appended to controller.forced_obsoletes. A failing snap command
        is logged at debug level and the remaining snaps are still
        processed.
        """
        self._view.updateStatus(_("Processing snap replacements"))
        # _snap_list should be populated by the earlier
        # _calculateSnapSizeRequirements call.
        for snap, snap_object in self._snap_list.items():
            command = snap_object['command']
            # The snap name is interpolated *after* the gettext lookup:
            # formatting inside _() would look up the already formatted
            # string, which never matches a translation.
            if command == 'switch':
                status = _("switching channel for snap %s") % snap
                popenargs = ["snap", command,
                             "--channel", snap_object['channel'], snap]
            elif command == 'remove':
                status = _("removing snap %s") % snap
                popenargs = ["snap", command, snap]
            else:
                status = _("installing snap %s") % snap
                popenargs = ["snap", command,
                             "--channel", snap_object['channel'], snap]
            self._view.updateStatus(status)

            try:
                self._view.processEvents()
                # check=True turns a non-zero exit status into
                # CalledProcessError, so reaching the code after this
                # block means the command succeeded.
                subprocess.run(
                    popenargs,
                    stdout=subprocess.PIPE,
                    check=True)
                self._view.processEvents()
            except subprocess.CalledProcessError:
                logging.debug("%s of snap %s failed" % (command, snap))
                continue

            logging.debug("%s of snap %s succeeded" % (command, snap))
            if command == 'install' and snap_object['deb']:
                self.controller.forced_obsoletes.append(snap_object['deb'])

    def _is_greater_than(self, term1, term2):
        """Return True when term1 is a newer version than term2.

        Copied from ubuntu-drivers-common. When both terms look like
        "<version>-<abi>-<flavour>" the flavour is dropped before the
        comparison, which is done with apt's version_compare.
        """
        # We don't want to take the flavour into account
        flavoured = re.compile('(.+)-([0-9]+)-(.+)')
        parsed1 = flavoured.match(term1)
        parsed2 = flavoured.match(term2)
        if parsed1 and parsed2:
            term1 = '-'.join(parsed1.group(1, 2))
            term2 = '-'.join(parsed2.group(1, 2))

        logging.debug('Comparing %s with %s' % (term1, term2))
        ordering = apt.apt_pkg.version_compare(term1, term2)
        return ordering > 0

    def _get_linux_metapackage(self, cache, headers):
        """ Get the linux headers or linux metapackage
            copied from ubuntu-drivers-common
        """
        suffix = headers and '-headers' or ''
        pattern = re.compile('linux-image-(.+)-([0-9]+)-(.+)')
        source_pattern = re.compile('linux-(.+)')

        metapackage = ''
        version = ''
        for pkg in cache:
            if ('linux-image' in pkg.name and 'extra' not in pkg.name and
                    (pkg.is_installed or pkg.marked_install)):
                match = pattern.match(pkg.name)
                # Here we filter out packages such as
                # linux-generic-lts-quantal
                if match:
                    source = pkg.candidate.record['Source']
                    current_version = '%s-%s' % (match.group(1),
                                                 match.group(2))
                    # See if the current version is greater than
                    # the greatest that we've found so far
                    if self._is_greater_than(current_version,
                                             version):
                        version = current_version
                        match_source = source_pattern.match(source)
                        # Set the linux-headers metapackage
                        if '-lts-' in source and match_source:
                            # This is the case of packages such as
                            # linux-image-3.5.0-18-generic which
                            # comes from linux-lts-quantal.
                            # Therefore the linux-headers-generic
                            # metapackage would be wrong here and
                            # we should use
                            # linux-headers-generic-lts-quantal
                            # instead
                            metapackage = 'linux%s-%s-%s' % (
                                           suffix,
                                           match.group(3),
                                           match_source.group(1))
                        else:
                            # The scheme linux-headers-$flavour works
                            # well here
                            metapackage = 'linux%s-%s' % (
                                           suffix,
                                           match.group(3))
        return metapackage

    def _install_linux_metapackage(self):
        """ Ensure the linux metapackage is installed for the newest_kernel
            installed. (LP: #1509305)
        """
        cache = self.controller.cache
        linux_metapackage = self._get_linux_metapackage(cache, False)
        # Seen on errors.u.c with linux-rpi2 metapackage
        # https://errors.ubuntu.com/problem/994bf05fae85fbcd44f721495db6518f2d5a126d
        if linux_metapackage not in cache:
            logging.info("linux metapackage (%s) not available" %
                         linux_metapackage)
            return
        # install the package if it isn't installed
        if not cache[linux_metapackage].is_installed:
            logging.info("installing linux metapackage: %s" %
                         linux_metapackage)
            reason = "linux metapackage may have been accidentally uninstalled"
            cache.mark_install(linux_metapackage, reason)

    def ensure_recommends_are_installed_on_desktops(self):
        """ ensure that on a desktop install recommends are installed
            (LP: #759262)
        """
        if not self.controller.serverMode:
            if not apt.apt_pkg.config.find_b("Apt::Install-Recommends"):
                msg = "Apt::Install-Recommends was disabled,"
                msg += " enabling it just for the upgrade"
                logging.warning(msg)
                apt.apt_pkg.config.set("Apt::Install-Recommends", "1")

    def _is_deb2snap_metapkg_installed(self, deb2snap_entry):
        """ Helper function that checks if the given deb2snap entry
            has at least one metapkg which is installed on the system.
        """
        metapkg_list = deb2snap_entry.get("metapkg", None)
        if not isinstance(metapkg_list, list):
            metapkg_list = [metapkg_list]

        for metapkg in metapkg_list:
            if metapkg not in self.controller.cache:
                continue
            if metapkg and \
                    self.controller.cache[metapkg].is_installed is False:
                continue

            return True

        return False

    def _parse_deb2snap_json(self):
        import json
        seeded_snaps = {}
        unseeded_snaps = {}

        try:
            deb2snap_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'deb2snap.json'
            )
            with open(deb2snap_path, 'r') as f:
                d2s = json.loads(f.read())

            for snap in d2s["seeded"]:
                seed = d2s["seeded"][snap]
                if not self._is_deb2snap_metapkg_installed(seed):
                    continue
                deb = seed.get("deb", None)
                # Support strings like stable/ubuntu-{FROM_VERSION} in
                # deb2snap.json so that (a) we don't need to update the file
                # for every release, and (b) so that from_channel is not bound
                # to only one release.
                from_chan = seed.get(
                    'from_channel',
                    'stable/ubuntu-{FROM_VERSION}'
                ).format(
                    FROM_VERSION=self.controller.fromVersion
                )
                to_chan = seed.get(
                    'to_channel',
                    'stable/ubuntu-{TO_VERSION}'
                ).format(
                    TO_VERSION=self.controller.toVersion
                )
                force_switch = seed.get("force_switch", False)
                seeded_snaps[snap] = (deb, from_chan, to_chan, force_switch)

            for snap in d2s["unseeded"]:
                unseed = d2s["unseeded"][snap]
                deb = unseed.get("deb", None)
                if not self._is_deb2snap_metapkg_installed(unseed):
                    continue
                from_chan = seed.get(
                    'from_channel',
                    'stable/ubuntu-{FROM_VERSION}'
                ).format(
                    FROM_VERSION=self.controller.fromVersion
                )
                unseeded_snaps[snap] = (deb, from_chan)
        except Exception as e:
            logging.warning("error reading deb2snap.json file (%s)" % e)

        return seeded_snaps, unseeded_snaps

    def _prepare_snap_replacement_data(self):
        """ Helper function fetching all required info for the deb-to-snap
            migration: version strings for upgrade (from and to) and the list
            of snaps (with actions).

            Rebuilds self._snap_list from scratch as a dict that maps a
            snap name to a dict with a 'command' key ('install', 'switch'
            or 'remove') plus, depending on the command, 'channel', 'deb'
            and 'snap-id' keys. The current state is inspected by running
            "snap list", "snap info" and "snap connections".

            Returns the populated self._snap_list.
        """
        self._snap_list = {}

        seeded_snaps, unseeded_snaps = self._parse_deb2snap_json()

        # NOTE(review): this initial value is overwritten right below.
        snap_list = ''
        # list the installed snaps and add them to seeded ones
        snap_list = subprocess.Popen(["snap", "list"],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE).communicate()
        # NOTE(review): communicate() returns a (stdout, stderr) tuple, so
        # this condition is always true.
        if snap_list:
            # first line of output is a header and the last line is empty
            snaps_installed = [line.split()[0]
                               for line in snap_list[0].split('\n')[1:-1]]

            # Installed snaps that deb2snap.json does not mention are
            # handled like seeded snaps with the default channels, no
            # deb to replace and no forced channel switch.
            for snap in snaps_installed:
                if snap in seeded_snaps or snap in unseeded_snaps:
                    continue
                else:
                    seeded_snaps[snap] = (
                        None,
                        f'stable/ubuntu-{self.controller.fromVersion}',
                        f'stable/ubuntu-{self.controller.toVersion}',
                        False
                    )

        self._view.updateStatus(_("Checking for installed snaps"))
        # Seeded snaps: switch the channel of installed ones, install the
        # ones that replace an installed deb.
        for snap, props in seeded_snaps.items():
            (deb, from_channel, to_channel, force_switch) = props
            snap_object = {}
            # check to see if the snap is already installed
            snap_info = subprocess.Popen(["snap", "info", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
            self._view.processEvents()
            if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)

                # NOTE(review): from_channel is interpolated into the
                # pattern unescaped, so characters such as "." act as
                # regex metacharacters.
                if not re.search(r"^tracking:.*%s" % from_channel,
                                 snap_info[0], re.MULTILINE):
                    logging.debug("Snap %s is not tracking the release channel"
                                  % snap)

                    if not force_switch:
                        # It is not tracking the release channel, and we were
                        # not told to force so don't switch.
                        continue

                snap_object['command'] = 'switch'
            else:
                # Do not replace packages not installed
                cache = self.controller.cache
                if (deb and (deb not in cache or not cache[deb].is_installed)):
                    logging.debug("Deb package %s is not installed. Skipping "
                                  "snap package %s installation" % (deb, snap))
                    continue

                # The snap-id is stored for the later size calculation.
                match = re.search(r"snap-id:\s*(\w*)", snap_info[0])
                if not match:
                    logging.debug("Could not parse snap-id for the %s snap"
                                  % snap)
                    continue
                snap_object['command'] = 'install'
                snap_object['deb'] = deb
                snap_object['snap-id'] = match[1]
            # Both 'switch' and 'install' target the release's to_channel.
            snap_object['channel'] = to_channel
            self._snap_list[snap] = snap_object
        # Unseeded snaps: remove installed ones that track the release
        # channel, unless another snap still uses them.
        for snap, (deb, from_channel) in unseeded_snaps.items():
            snap_object = {}
            # check to see if the snap is already installed
            snap_info = subprocess.Popen(["snap", "info", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
            self._view.processEvents()
            if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is installed" % snap)
                # its not tracking the release channel so don't remove
                if not re.search(r"^tracking:.*%s" % from_channel,
                                 snap_info[0], re.MULTILINE):
                    logging.debug("Snap %s is not tracking the release channel"
                                  % snap)
                    continue

                snap_object['command'] = 'remove'

                # check if this snap is being used by any other snaps
                conns = subprocess.Popen(["snap", "connections", snap],
                                         universal_newlines=True,
                                         stdout=subprocess.PIPE).communicate()
                self._view.processEvents()

                for conn in conns[0].split('\n'):
                    # Only rows with exactly four whitespace-separated
                    # columns are considered; the second column is read
                    # as the plug and the third as the slot.
                    conn_cols = conn.split()
                    if len(conn_cols) != 4:
                        continue
                    plug = conn_cols[1]
                    slot = conn_cols[2]

                    if slot.startswith(snap + ':'):
                        plug_snap = plug.split(':')[0]
                        # A plug owned by a snap that is not itself in
                        # unseeded_snaps keeps this snap installed.
                        if plug_snap != '-' and \
                           plug_snap not in unseeded_snaps:
                            logging.debug("Snap %s is being used by %s. "
                                          "Switching it to stable track"
                                          % (snap, plug_snap))
                            snap_object['command'] = 'switch'
                            snap_object['channel'] = 'stable'
                            break

                self._snap_list[snap] = snap_object
        return self._snap_list

    def _replace_pi_boot_config(self, old_config, new_config,
                                boot_config_filename, failure_action):
        try:
            boot_backup_filename = boot_config_filename + '.distUpgrade'
            with open(boot_backup_filename, 'w', encoding='utf-8') as f:
                f.write(old_config)
        except IOError as exc:
            logging.error("unable to write boot config backup to %s: %s; %s",
                          boot_backup_filename, exc, failure_action)
            return
        try:
            with open(boot_config_filename, 'w', encoding='utf-8') as f:
                f.write(new_config)
        except IOError as exc:
            logging.error("unable to write new boot config to %s: %s; %s",
                          boot_config_filename, exc, failure_action)

    def _replace_fkms_overlay(self, boot_dir='/boot/firmware'):
        failure_action = (
            "You may need to replace the vc4-fkms-v3d overlay with "
            "vc4-kms-v3d in config.txt on your boot partition")

        try:
            boot_config_filename = os.path.join(boot_dir, 'config.txt')
            with open(boot_config_filename, 'r', encoding='utf-8') as f:
                boot_config = f.read()
        except FileNotFoundError:
            logging.error("failed to open boot configuration in %s; %s",
                          boot_config_filename, failure_action)
            return

        new_config = ''.join(
            # startswith and replace used to cope with (and preserve) any
            # trailing d-t parameters, and any use of the -pi4 suffix
            '# changed by do-release-upgrade (LP: #1923673)\n#' + line +
            line.replace('dtoverlay=vc4-fkms-v3d', 'dtoverlay=vc4-kms-v3d')
            if line.startswith('dtoverlay=vc4-fkms-v3d') else
            # camera firmware disabled due to incompatibility with "full" kms
            # overlay; without the camera firmware active it's also better to
            # disable gpu_mem leaving the default (64MB) to allow as much as
            # possible for the KMS driver
            '# disabled by do-release-upgrade (LP: #1923673)\n#' + line
            if line.startswith('gpu_mem=') or line.rstrip() == 'start_x=1' else
            line
            for line in boot_config.splitlines(keepends=True)
        )

        if new_config == boot_config:
            logging.warning("no fkms overlay or camera firmware line found "
                            "in %s", boot_config_filename)
            return
        self._replace_pi_boot_config(
            boot_config, new_config, boot_config_filename, failure_action)

    def _add_kms_overlay(self, boot_dir='/boot/firmware'):
        failure_action = (
            "You may need to add dtoverlay=vc4-kms-v3d to an [all] section "
            "in config.txt on your boot partition")
        added_lines = [
            '# added by do-release-upgrade (LP: #2065051)',
            'dtoverlay=vc4-kms-v3d',
            'disable_fw_kms_setup=1',
            '',
            '[pi3+]',
            'dtoverlay=vc4-kms-v3d,cma-128',
            '',
            '[pi02]',
            'dtoverlay=vc4-kms-v3d,cma-128',
            '',
            '[all]',
        ]
        try:
            boot_config_filename = os.path.join(boot_dir, 'config.txt')
            with open(boot_config_filename, 'r', encoding='utf-8') as f:
                boot_config = f.read()
        except FileNotFoundError:
            logging.error("failed to open boot configuration in %s; %s",
                          boot_config_filename, failure_action)
            return

        def find_insertion_point(lines):
            # Returns the zero-based index of the dtoverlay=vc4-kms-v3d line in
            # an [all] section, if one exists, or the last line of the last
            # [all] section of the file, if one does not exist
            in_all = True
            last = 0
            for index, line in enumerate(lines):
                line = line.rstrip()
                if in_all:
                    last = index
                    # startswith used to cope with any trailing dtparams
                    if line.startswith('dtoverlay=vc4-kms-v3d'):
                        return last
                    elif line.startswith('[') and line.endswith(']'):
                        in_all = line == '[all]'
                    elif line.startswith('include '):
                        # [sections] are included from includes verbatim, hence
                        # (without reading the included file) we must assume
                        # we're no longer in an [all] section
                        in_all = False
                else:
                    in_all = line == '[all]'
            return last

        def add_kms_overlay(lines):
            insert_point = find_insertion_point(lines)
            try:
                if lines[insert_point].startswith('dtoverlay=vc4-kms-v3d'):
                    return lines
            except IndexError:
                # Empty config, apparently!
                pass
            lines[insert_point:insert_point] = added_lines
            return lines

        lines = [line.rstrip() for line in boot_config.splitlines()]
        lines = add_kms_overlay(lines)
        new_config = ''.join(line + '\n' for line in lines)

        if new_config == boot_config:
            logging.warning("no addition of KMS overlay required in %s",
                            boot_config_filename)
            return
        self._replace_pi_boot_config(
            boot_config, new_config, boot_config_filename, failure_action)

    def _set_generic_font(self):
        """ Due to changes to the Ubuntu font we enable a generic font
            (in practice DejaVu or Noto) during the upgrade.
            See https://launchpad.net/bugs/2034986
        """
        temp_font = 'Sans'

        if self._did_change_font:
            return

        if self.controller.get_user_env('XDG_SESSION_TYPE', '') in ('', 'tty'):
            # Avoid running this on server systems or when the upgrade
            # is done over ssh.
            return

        if self.controller.get_user_uid() is None:
            logging.debug(
                'Cannot determine non-root UID, will not change font'
            )
            return

        schema = 'org.gnome.desktop.interface'

        desktops = self.controller.get_user_env(
            'XDG_CURRENT_DESKTOP', ''
        ).split(':')

        # Some flavors use other schemas for the desktop font.
        if 'MATE' in desktops or 'UKUI' in desktops:
            schema = 'org.mate.interface'
        elif 'X-Cinnamon' in desktops:
            schema = 'org.cinnamon.desktop.interface'

        # Some flavors lack the systemd integration needed for a
        # user service, so we create an autostart file instead.
        use_autostart = bool(
            set(['Budgie', 'LXQt', 'MATE', 'UKUI', 'X-Cinnamon', 'XFCE'])
            & set(desktops)
        )

        r = self.controller.run_as_user(
            ['/usr/bin/gsettings', 'get',
             f'{schema}', 'font-name'],
            stdout=subprocess.PIPE,
            encoding='utf-8',
        )

        (font, _, size) = r.stdout.strip('\'\n').rpartition(' ')
        font = font or 'Ubuntu'
        try:
            int(size)
        except ValueError:
            size = '11'

        logging.debug(f'Setting generic font {temp_font} {size} during the '
                      f'upgrade. Original font is {font} {size}.')

        r = self.controller.run_as_user([
            '/usr/bin/gsettings', 'set', f'{schema}',
            'font-name', f'"{temp_font} {size}"'
        ])
        if r.returncode != 0:
            logging.debug(f'Failed to change font to {temp_font} {size}')
            return

        self._did_change_font = True

        # Touch a file to indiate that the font should be restored on the next
        # boot.
        need_font_restore_file = os.path.join(
            self.controller.get_user_home(),
            '.config/upgrade-need-font-restore'
        )
        os.makedirs(os.path.dirname(need_font_restore_file), exist_ok=True)
        pathlib.Path(need_font_restore_file).touch(mode=0o666)
        os.chown(
            need_font_restore_file,
            self.controller.get_user_uid(),
            self.controller.get_user_gid(),
        )

        if use_autostart:
            autostart_file = '/etc/xdg/autostart/upgrade-restore-font.desktop'
            os.makedirs(os.path.dirname(autostart_file), exist_ok=True)
            flag = '$HOME/.config/upgrade-need-font-restore'
            with open(autostart_file, 'w') as f:
                f.write(
                    '[Desktop Entry]\n'
                    'Name=Restore font after upgrade\n'
                    'Comment=Auto-generated by ubuntu-release-upgrader\n'
                    'Type=Application\n'
                    f'Exec=sh -c \'if [ -e "{flag}" ]; then gsettings set '
                    f'{schema} font-name "{font} {size}";'
                    f'rm -f "{flag}"; fi\'\n'
                    'NoDisplay=true\n'
                )
            return

        # If we set the font back to normal before a reboot, the font will
        # still get all messed up. To allow normal usage whether the user
        # reboots immediately or not, create a service that will run only if a
        # ~/.config/upgrade-need-font-restore exists, and then remove that file
        # in ExecStart. This has the effect of creating a one-time service on
        # the next boot.
        unit_file = '/usr/lib/systemd/user/upgrade-restore-font.service'
        os.makedirs(os.path.dirname(unit_file), exist_ok=True)

        with open(unit_file, 'w') as f:
            f.write(
                '# Auto-generated by ubuntu-release-upgrader\n'
                '[Unit]\n'
                'Description=Restore font after upgrade\n'
                'After=graphical-session.target dconf.service\n'
                'ConditionPathExists=%h/.config/upgrade-need-font-restore\n'
                '\n'
                '[Service]\n'
                'Type=oneshot\n'
                'ExecStart=/usr/bin/gsettings set '
                f'{schema} font-name \'{font} {size}\'\n'
                'ExecStart=/usr/bin/rm -f '
                '%h/.config/upgrade-need-font-restore\n'
                '\n'
                '[Install]\n'
                'WantedBy=graphical-session.target\n'
            )

        self.controller.systemctl_as_user(['daemon-reload'])
        r = self.controller.systemctl_as_user(
            ['enable', os.path.basename(unit_file)]
        )

        if r.returncode != 0:
            logging.debug(f'Failed to enable {os.path.basename(unit_file)}. '
                          'Font will not be restored on reboot')

    def _maybe_remove_gpg_wks_server(self):
        """
        Prevent postfix from being unnecessarily installed, and leading to a
        debconf prompt (LP: #2060578).
        """
        # We want to use attributes of the cache that are only exposed in
        # apt_pkg.Cache, not the higher level apt.Cache. Hence we operate on
        # apt.Cache._cache.
        cache = self.controller.cache._cache

        try:
            if not cache['gpg-wks-server'].current_ver:
                # Not installed, nothing to do.
                return

            provides_mta = cache['mail-transport-agent'].provides_list
            installed_mta = [
                ver for _, _, ver in provides_mta
                if ver.parent_pkg.current_ver
            ]
        except KeyError:
            # Something wasn't in the cache, ignore.
            return

        if not any(installed_mta):
            logging.info(
                'No mail-transport-agent installed, '
                'marking gpg-wks-server for removal'
            )

            self.controller.cache['gpg-wks-server'].mark_delete(auto_fix=False)
            apt.ProblemResolver(self.controller.cache).protect(
                self.controller.cache['gpg-wks-server']
            )

    def _test_and_fail_on_tpm_fde(self):
        """
        LP: #2065229
        """
        if (
            os.path.exists('/snap/pc-kernel') and
            'ubuntu-desktop-minimal' in self.controller.cache and
            self.controller.cache['ubuntu-desktop-minimal'].is_installed
        ):
            logging.debug('Detected TPM FDE system')

            di = distro_info.UbuntuDistroInfo()
            version = di.version(self.controller.toDist) or 'next release'

            self._view.error(
                _(
                    f'Sorry, cannot upgrade this system to {version}'
                ),
                _(
                    'Upgrades for desktop systems running TPM FDE are not '
                    'currently supported. '
                    'Please see https://launchpad.net/bugs/2065229 '
                    'for more information.'

                ),
            )
            self.controller.abort()

    def _disable_kdump_tools_on_install(self, cache):
        """Disable kdump-tools if installed during upgrade."""
        if 'kdump-tools' not in cache:
            # Not installed or requested, nothing to do.
            return

        pkg = cache['kdump-tools']

        if pkg.is_installed:
            logging.info("kdump-tools already installed. Not disabling.")
            return
        elif pkg.marked_install:
            logging.info("installing kdump-tools due to upgrade. Disabling.")
            proc = subprocess.run(
                (
                    'echo "kdump-tools kdump-tools/use_kdump boolean false"'
                    ' | debconf-set-selections'
                ),
                shell=True,
            )
            ret_code = proc.returncode
            if ret_code != 0:
                logging.debug(
                    (
                         "kdump-tools debconf-set-selections "
                         f"returned: {ret_code}"
                    )
                )

    def _install_linux_sysctl_defaults(self):
        """ LP: #2089759 """

        if self.controller.fromDist != 'oracular':
            return

        if (
            'linux-sysctl-defaults' in self.controller.cache and
            not self.controller.cache['linux-sysctl-defaults'].is_installed
        ):
            logging.debug('Installing linux-sysctl-defaults')

            self.controller.cache['linux-sysctl-defaults'].mark_install(
                auto_fix=False
            )

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
DistUpgradeApport.py File 4.91 KB 0644
DistUpgradeCache.py File 50.34 KB 0644
DistUpgradeConfigParser.py File 3.33 KB 0644
DistUpgradeController.py File 91.25 KB 0644
DistUpgradeFetcher.py File 5.85 KB 0644
DistUpgradeFetcherCore.py File 9.56 KB 0644
DistUpgradeFetcherKDE.py File 8.26 KB 0644
DistUpgradeGettext.py File 2.98 KB 0644
DistUpgradeMain.py File 8.78 KB 0644
DistUpgradeQuirks.py File 46.62 KB 0644
DistUpgradeVersion.py File 21 B 0644
DistUpgradeView.py File 16.1 KB 0644
DistUpgradeViewGtk3.py File 31.96 KB 0644
DistUpgradeViewKDE.py File 37.51 KB 0644
DistUpgradeViewNonInteractive.py File 13 KB 0644
DistUpgradeViewText.py File 9.8 KB 0644
GtkProgress.py File 3.93 KB 0644
MetaRelease.py File 16.99 KB 0644
QUrlOpener.py File 2.87 KB 0644
ReleaseNotesViewer.py File 7.27 KB 0644
ReleaseNotesViewerWebkit.py File 2.52 KB 0644
SimpleGtk3builderApp.py File 2.01 KB 0644
SimpleGtkbuilderApp.py File 1.99 KB 0644
__init__.py File 0 B 0644
apt_btrfs_snapshot.py File 12.68 KB 0644
dist-upgrade.py File 129 B 0644
telemetry.py File 3.33 KB 0644
utils.py File 17.37 KB 0644
xorg_fix_proprietary.py File 3.99 KB 0644
Filemanager