# DistUpgradeQuirks.py
#
# Copyright (c) 2004-2010 Canonical
#
# Author: Michael Vogt <[email protected]>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
import apt
import atexit
import distro_info
import glob
import logging
import os
import re
import subprocess
import pathlib
from subprocess import PIPE, Popen
from .DistUpgradeGettext import gettext as _
class DistUpgradeQuirks(object):
"""
This class collects the various quirks handlers that can
be hooked into to fix/work around issues that the individual
releases have.
The following handlers are supported:
- PreCacheOpen: run *before* the apt cache is opened the first time
to set options that affect the cache
- PostInitialUpdate: run *before* the sources.list is rewritten but
after an initial apt-get update
- PreDistUpgradeCache: run *right before* the dist-upgrade is
calculated in the cache
- PostDistUpgradeCache: run *after* the dist-upgrade was calculated
in the cache
- StartUpgrade: before the first package gets installed (but the
download is finished)
- PostUpgrade: run *after* the upgrade is finished successfully and
packages got installed
- PostCleanup: run *after* the cleanup (orphaned etc) is finished
"""
def __init__(self, controller, config):
self.controller = controller
self._view = controller._view
self.config = config
self.uname = Popen(["uname", "-r"], stdout=PIPE,
universal_newlines=True).communicate()[0].strip()
self.extra_snap_space = 0
self._poke = None
self._snapstore_reachable = False
self._snap_list = None
self._did_change_font = False
# individual quirks handlers that run *before* the cache is opened
def PreCacheOpen(self):
""" run before the apt cache is opened the first time """
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.PreCacheOpen")
self._add_apport_ignore_list()
# individual quirks handlers that run *after* the cache is opened
def PostInitialUpdate(self):
# PreCacheOpen would be better but controller.abort fails terribly
""" run after the apt cache is opened the first time """
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.PostInitialUpdate")
self._test_and_fail_on_tpm_fde()
cache = self.controller.cache
self._test_and_warn_if_ros_installed(cache)
self._maybe_prevent_flatpak_auto_removal()
if 'snapd' not in cache:
logging.debug("package required for Quirk not in cache")
return
if cache['snapd'].is_installed and \
(os.path.exists('/run/snapd.socket') or
os.path.exists('/run/snapd-snap.socket')):
self._checkStoreConnectivity()
# If the snap store is accessible, at the same time calculate the
# extra size needed by to-be-installed snaps. This also prepares
# the snaps-to-install list for the actual upgrade.
if self._snapstore_reachable:
self._calculateSnapSizeRequirements()
def PostUpgrade(self):
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.PostUpgrade")
cache = self.controller.cache
if 'snapd' not in cache:
logging.debug("package required for Quirk not in cache")
return
if cache['snapd'].is_installed and \
self._snap_list:
self._replaceDebsAndSnaps()
if 'ubuntu-desktop-raspi' in cache:
if cache['ubuntu-desktop-raspi'].is_installed:
self._replace_fkms_overlay()
if 'ubuntu-server-raspi' in cache:
if cache['ubuntu-server-raspi'].is_installed:
self._add_kms_overlay()
# individual quirks handler that runs when the dpkg run is finished ---------
def PostCleanup(self):
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
" run after cleanup "
logging.debug("running Quirks.PostCleanup")
self._remove_apport_ignore_list()
# Try to refresh snaps, but ignore errors.
try:
subprocess.check_call(['snap', 'refresh'])
except Exception as e:
logging.debug(f'Failed to refresh snaps : {e}')
# run right before the first packages get installed
def StartUpgrade(self):
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.StartUpgrade")
cache = self.controller.cache
self._removeOldApportCrashes()
self._killUpdateNotifier()
self._pokeScreensaver()
self._set_generic_font()
self._disable_kdump_tools_on_install(cache)
# individual quirks handler that run *right before* the dist-upgrade
# is calculated in the cache
def PreDistUpgradeCache(self):
""" run right before calculating the dist-upgrade """
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.PreDistUpgradeCache")
self._maybe_remove_gpg_wks_server()
self._install_linux_sysctl_defaults()
# individual quirks handler that run *after* the dist-upgrade was
# calculated in the cache
def PostDistUpgradeCache(self):
""" run after calculating the dist-upgrade """
# we do not run any quirks in partialUpgrade mode
if self.controller._partialUpgrade:
logging.info("not running quirks in partialUpgrade mode")
return
logging.debug("running Quirks.PostDistUpgradeCache")
self._install_linux_metapackage()
def _test_and_warn_if_ros_installed(self, cache):
"""
Test and warn if ROS is installed. A given ROS release only
supports specific Ubuntu releases, and can cause the upgrade
to fail in an overly-cryptic manner.
"""
# These are the root ROS 1 and 2 dependencies as of 07/27/2020
ros_package_patterns = set()
for package_name in (
"catkin",
"rosboost-cfg",
"rosclean",
"ros-environment",
"ros-workspace"):
ros_package_patterns.add(
re.compile(r"ros-[^\-]+-%s" % package_name))
ros_is_installed = False
for pkg in cache:
if ros_is_installed:
break
for pattern in ros_package_patterns:
if pattern.match(pkg.name):
if pkg.is_installed or pkg.marked_install:
ros_is_installed = True
break
if ros_is_installed:
res = self._view.askYesNoQuestion(
_("The Robot Operating System (ROS) is installed"),
_("It appears that ROS is currently installed. Each ROS "
"release is very strict about the versions of Ubuntu "
"it supports, and Ubuntu upgrades can fail if that "
"guidance isn't followed. Before continuing, please "
"either uninstall ROS, or ensure the ROS release you "
"have installed supports the version of Ubuntu to "
"which you're upgrading.\n\n"
"For ROS 1 releases, refer to REP 3:\n"
"https://www.ros.org/reps/rep-0003.html\n\n"
"For ROS 2 releases, refer to REP 2000:\n"
"https://www.ros.org/reps/rep-2000.html\n\n"
"Are you sure you want to continue?"))
if not res:
self.controller.abort()
def _maybe_prevent_flatpak_auto_removal(self):
"""
If flatpak is installed, and there are either active remotes, or
flatpak apps installed, prevent flatpak's auto-removal on upgrade.
"""
prevent_auto_removal = False
if "flatpak" not in self.controller.cache:
return
if not self.controller.cache["flatpak"].is_installed:
return
if not os.path.exists("/usr/bin/flatpak"):
return
for subcmd in ["remotes", "list"]:
r = subprocess.run(
["/usr/bin/flatpak", subcmd],
stdout=subprocess.PIPE
)
if r.stdout.decode("utf-8").strip():
prevent_auto_removal = True
break
logging.debug("flatpak will{}be marked as manually installed"
.format(" " if prevent_auto_removal else " NOT "))
if not prevent_auto_removal:
return
self.controller.cache["flatpak"].mark_auto(auto=False)
for pkg in ("plasma-discover-backend-flatpak",
"gnome-software-plugin-flatpak"):
if pkg not in self.controller.cache:
continue
if not self.controller.cache[pkg].is_installed:
continue
logging.debug("{} will be marked as manually installed"
.format(pkg))
self.controller.cache[pkg].mark_auto(auto=False)
self.controller.cache.commit(
self._view.getAcquireProgress(),
self._view.getInstallProgress(self.controller.cache)
)
def _add_apport_ignore_list(self):
    """ write a temporary apport blacklist with the executables whose
        crashes are ignored; failures are only logged
    """
    path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'
    ignored_executables = (
        '/usr/libexec/tracker-extract-3',  # LP: #2012638
        '/usr/sbin/update-apt-xapian-index',  # LP: #2058227
    )
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as blacklist:
            for executable in ignored_executables:
                blacklist.write(f'{executable}\n')
    except Exception as e:
        logging.debug(f'Failed to create {path}: {e}')
def _remove_apport_ignore_list(self):
    """ delete the temporary apport blacklist again; failures are
        only logged
    """
    path = '/etc/apport/blacklist.d/upgrade-quirks-ignore-list'
    try:
        os.remove(path)
    except Exception as err:
        logging.debug(f'Failed to remove {path}: {err}')
def _killUpdateNotifier(self):
    "kill update-notifier"
    if not os.path.exists("/usr/bin/killall"):
        return
    # kill update-notifier now to suppress reboot required
    logging.debug("killing update-notifier")
    subprocess.call(["killall", "-q", "update-notifier"])
def _pokeScreensaver(self):
    """ start a background shell loop that resets the screensaver
        every 30 seconds; it is stopped again at interpreter exit
    """
    if not os.path.exists("/usr/bin/xdg-screensaver"):
        return
    if not os.environ.get('DISPLAY'):
        return
    logging.debug("setup poke timer for the screensaver")
    cmd = " ".join((
        "while true;",
        "do /usr/bin/xdg-screensaver reset >/dev/null 2>&1;",
        "sleep 30; done",
    ))
    try:
        self._poke = subprocess.Popen(cmd, shell=True)
        atexit.register(self._stopPokeScreensaver)
    except (OSError, ValueError):
        logging.exception("failed to setup screensaver poke")
def _stopPokeScreensaver(self):
res = False
if self._poke is not None:
try:
self._poke.terminate()
res = self._poke.wait()
except OSError:
logging.exception("failed to stop screensaver poke")
self._poke = None
return res
def _removeOldApportCrashes(self):
    " remove old apport crash files and whoopsie control files "
    stale_extensions = ('.crash', '.upload', '.uploaded')
    try:
        for ext in stale_extensions:
            stale_files = glob.glob("/var/crash/*%s" % ext)
            for stale in stale_files:
                logging.debug("removing old %s file '%s'" % (ext, stale))
                os.unlink(stale)
    except Exception as e:
        logging.warning("error during unlink of old crash files (%s)" % e)
def _checkStoreConnectivity(self):
    """ check for connectivity to the snap store to install snaps

    Runs "snap debug connectivity" and inspects its output:
    - a " * PASS" line sets self._snapstore_reachable to True
    - an "unreachable" line shows an error and aborts if the
      non-transitional lxd deb is installed, otherwise the user is
      asked whether to continue
    - an unknown "debug" command asks the user whether to continue
      with an outdated snapd
    - any other outcome aborts the upgrade
    If the command cannot be run at all the store is assumed to be
    unreachable and the method returns without aborting.
    """
    # res holds the user's decision; it stays False unless the user
    # explicitly agrees to continue
    res = False
    snap_env = os.environ.copy()
    # force untranslated output so the matching below works
    snap_env["LANG"] = "C.UTF-8"
    try:
        # connected is the (stdout, stderr) pair of the snap command
        connected = Popen(["snap", "debug", "connectivity"], stdout=PIPE,
                          stderr=PIPE, env=snap_env,
                          universal_newlines=True).communicate()
    except Exception as e:
        logging.debug(
            f'Failed to check snap store connectivity, assuming none: {e}'
        )
        self._snapstore_reachable = False
        return
    if re.search(r"^ \* PASS", connected[0], re.MULTILINE):
        self._snapstore_reachable = True
        return
    # can't connect
    elif re.search(r"^ \*.*unreachable", connected[0], re.MULTILINE):
        logging.error("No snap store connectivity")
        old_lxd_deb_installed = False
        cache = self.controller.cache
        if 'lxd' in cache:
            # epoch 1 is the transitional deb
            if cache['lxd'].is_installed and not \
                    cache['lxd'].candidate.version.startswith("1:"):
                logging.error("lxd is installed")
                old_lxd_deb_installed = True
        if old_lxd_deb_installed:
            summary = _("Connection to the Snap Store failed")
            msg = _("You have the package lxd installed but your "
                    "system is unable to reach the Snap Store. "
                    "lxd is now provided via a snap and the release "
                    "upgrade will fail if snapd is not functional. "
                    "Please make sure you're connected to the "
                    "Internet and update any firewall or proxy "
                    "settings as needed so that you can reach "
                    "api.snapcraft.io. If you are an enterprise "
                    "with a firewall setup you may want to configure "
                    "a Snap Store proxy."
                    )
            self._view.error(summary, msg)
            self.controller.abort()
        else:
            res = self._view.askYesNoQuestion(
                _("Connection to Snap Store failed"),
                _("Your system does not have a connection to the Snap "
                  "Store. For the best upgrade experience make sure "
                  "that your system can connect to api.snapcraft.io.\n"
                  "Do you still want to continue with the upgrade?")
            )
    # debug command not available
    elif 'error: unknown command' in connected[1]:
        logging.error("snap debug command not available")
        res = self._view.askYesNoQuestion(
            _("Outdated snapd package"),
            _("Your system does not have the latest version of snapd. "
              "Please update the version of snapd on your system to "
              "improve the upgrade experience.\n"
              "Do you still want to continue with the upgrade?")
        )
    # not running as root
    elif 'error: access denied' in connected[1]:
        res = False
        logging.error("Not running as root!")
    else:
        logging.error("Unhandled error connecting to the snap store.")
    # every path that did not return above ends here; abort unless
    # the user chose to continue
    if not res:
        self.controller.abort()
def _calculateSnapSizeRequirements(self):
    """ add up the download size of all snaps that will be installed

    Prepares the snap replacement data first, then asks the snap store
    for the download size of every snap whose command is 'install' and
    adds it to self.extra_snap_space. This is best effort: a snap whose
    size cannot be fetched or parsed is skipped with a debug message.
    """
    import json
    import urllib.request
    # first fetch the list of snap-deb replacements that will be needed
    # and store them for future reference, along with other data we'll
    # need in the process
    self._prepare_snap_replacement_data()
    # now perform direct API calls to the store, requesting size
    # information for each of the snaps needing installation
    self._view.updateStatus(_("Calculating snap size requirements"))
    for snap, snap_object in self._snap_list.items():
        if snap_object['command'] != 'install':
            continue
        action = {
            "instance-key": "upgrade-size-check",
            "action": "download",
            "snap-id": snap_object['snap-id'],
            "channel": snap_object['channel'],
        }
        data = {
            "context": [],
            "actions": [action],
        }
        req = urllib.request.Request(
            url='https://api.snapcraft.io/v2/snaps/refresh',
            data=json.dumps(data).encode('utf-8'))
        req.add_header('Snap-Device-Series', '16')
        req.add_header('Content-type', 'application/json')
        req.add_header('Snap-Device-Architecture', self.controller.arch)
        try:
            # close the connection as soon as the reply has been read
            with urllib.request.urlopen(req) as response:
                info = json.loads(response.read())
            size = int(info['results'][0]['snap']['download']['size'])
        except (KeyError, IndexError, TypeError, ValueError, OSError):
            # OSError covers URLError as well as lower level socket
            # errors; IndexError/TypeError cover replies that do not
            # have the expected shape (e.g. an empty result list)
            logging.debug("Failed fetching size of snap %s" % snap)
            continue
        self.extra_snap_space += size
def _replaceDebsAndSnaps(self):
    """ install a snap and mark its corresponding package for removal

    Walks self._snap_list and runs the 'switch', 'remove' or 'install'
    snap command recorded for each entry. A failing command is logged
    and skipped. After a successful install the replaced deb (if any)
    is appended to the controller's forced_obsoletes.
    """
    self._view.updateStatus(_("Processing snap replacements"))
    # _snap_list should be populated by the earlier
    # _calculateSnapSizeRequirements call.
    for snap, snap_object in self._snap_list.items():
        command = snap_object['command']
        # The snap name is interpolated *after* the gettext lookup so
        # that the untranslated template is what gets looked up.
        if command == 'switch':
            channel = snap_object['channel']
            self._view.updateStatus(
                _("switching channel for snap %s") % snap
            )
            popenargs = ["snap", command, "--channel", channel, snap]
        elif command == 'remove':
            self._view.updateStatus(_("removing snap %s") % snap)
            popenargs = ["snap", command, snap]
        else:
            self._view.updateStatus(_("installing snap %s") % snap)
            popenargs = ["snap", command,
                         "--channel", snap_object['channel'], snap]
        try:
            self._view.processEvents()
            subprocess.run(
                popenargs,
                stdout=subprocess.PIPE,
                check=True)
            self._view.processEvents()
        except (subprocess.CalledProcessError, OSError):
            # OSError: the snap binary could not be executed at all
            logging.debug("%s of snap %s failed" % (command, snap))
            continue
        # check=True guarantees a zero exit status at this point
        logging.debug("%s of snap %s succeeded" % (command, snap))
        if command == 'install' and snap_object['deb']:
            self.controller.forced_obsoletes.append(snap_object['deb'])
def _is_greater_than(self, term1, term2):
    """ copied from ubuntu-drivers common

    Returns True when term1 is a greater version than term2. If both
    terms look like <version>-<abi>-<flavour>, the flavour is dropped
    before comparing.
    """
    flavoured = re.compile('(.+)-([0-9]+)-(.+)')
    first = flavoured.match(term1)
    second = flavoured.match(term2)
    if first and second:
        # We don't want to take into account the flavour
        term1 = '-'.join(first.group(1, 2))
        term2 = '-'.join(second.group(1, 2))
    logging.debug('Comparing %s with %s' % (term1, term2))
    return apt.apt_pkg.version_compare(term1, term2) > 0
def _get_linux_metapackage(self, cache, headers):
""" Get the linux headers or linux metapackage
copied from ubuntu-drivers-common
"""
suffix = headers and '-headers' or ''
pattern = re.compile('linux-image-(.+)-([0-9]+)-(.+)')
source_pattern = re.compile('linux-(.+)')
metapackage = ''
version = ''
for pkg in cache:
if ('linux-image' in pkg.name and 'extra' not in pkg.name and
(pkg.is_installed or pkg.marked_install)):
match = pattern.match(pkg.name)
# Here we filter out packages such as
# linux-generic-lts-quantal
if match:
source = pkg.candidate.record['Source']
current_version = '%s-%s' % (match.group(1),
match.group(2))
# See if the current version is greater than
# the greatest that we've found so far
if self._is_greater_than(current_version,
version):
version = current_version
match_source = source_pattern.match(source)
# Set the linux-headers metapackage
if '-lts-' in source and match_source:
# This is the case of packages such as
# linux-image-3.5.0-18-generic which
# comes from linux-lts-quantal.
# Therefore the linux-headers-generic
# metapackage would be wrong here and
# we should use
# linux-headers-generic-lts-quantal
# instead
metapackage = 'linux%s-%s-%s' % (
suffix,
match.group(3),
match_source.group(1))
else:
# The scheme linux-headers-$flavour works
# well here
metapackage = 'linux%s-%s' % (
suffix,
match.group(3))
return metapackage
def _install_linux_metapackage(self):
""" Ensure the linux metapackage is installed for the newest_kernel
installed. (LP: #1509305)
"""
cache = self.controller.cache
linux_metapackage = self._get_linux_metapackage(cache, False)
# Seen on errors.u.c with linux-rpi2 metapackage
# https://errors.ubuntu.com/problem/994bf05fae85fbcd44f721495db6518f2d5a126d
if linux_metapackage not in cache:
logging.info("linux metapackage (%s) not available" %
linux_metapackage)
return
# install the package if it isn't installed
if not cache[linux_metapackage].is_installed:
logging.info("installing linux metapackage: %s" %
linux_metapackage)
reason = "linux metapackage may have been accidentally uninstalled"
cache.mark_install(linux_metapackage, reason)
def ensure_recommends_are_installed_on_desktops(self):
""" ensure that on a desktop install recommends are installed
(LP: #759262)
"""
if not self.controller.serverMode:
if not apt.apt_pkg.config.find_b("Apt::Install-Recommends"):
msg = "Apt::Install-Recommends was disabled,"
msg += " enabling it just for the upgrade"
logging.warning(msg)
apt.apt_pkg.config.set("Apt::Install-Recommends", "1")
def _is_deb2snap_metapkg_installed(self, deb2snap_entry):
""" Helper function that checks if the given deb2snap entry
has at least one metapkg which is installed on the system.
"""
metapkg_list = deb2snap_entry.get("metapkg", None)
if not isinstance(metapkg_list, list):
metapkg_list = [metapkg_list]
for metapkg in metapkg_list:
if metapkg not in self.controller.cache:
continue
if metapkg and \
self.controller.cache[metapkg].is_installed is False:
continue
return True
return False
def _parse_deb2snap_json(self):
import json
seeded_snaps = {}
unseeded_snaps = {}
try:
deb2snap_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'deb2snap.json'
)
with open(deb2snap_path, 'r') as f:
d2s = json.loads(f.read())
for snap in d2s["seeded"]:
seed = d2s["seeded"][snap]
if not self._is_deb2snap_metapkg_installed(seed):
continue
deb = seed.get("deb", None)
# Support strings like stable/ubuntu-{FROM_VERSION} in
# deb2snap.json so that (a) we don't need to update the file
# for every release, and (b) so that from_channel is not bound
# to only one release.
from_chan = seed.get(
'from_channel',
'stable/ubuntu-{FROM_VERSION}'
).format(
FROM_VERSION=self.controller.fromVersion
)
to_chan = seed.get(
'to_channel',
'stable/ubuntu-{TO_VERSION}'
).format(
TO_VERSION=self.controller.toVersion
)
force_switch = seed.get("force_switch", False)
seeded_snaps[snap] = (deb, from_chan, to_chan, force_switch)
for snap in d2s["unseeded"]:
unseed = d2s["unseeded"][snap]
deb = unseed.get("deb", None)
if not self._is_deb2snap_metapkg_installed(unseed):
continue
from_chan = seed.get(
'from_channel',
'stable/ubuntu-{FROM_VERSION}'
).format(
FROM_VERSION=self.controller.fromVersion
)
unseeded_snaps[snap] = (deb, from_chan)
except Exception as e:
logging.warning("error reading deb2snap.json file (%s)" % e)
return seeded_snaps, unseeded_snaps
def _prepare_snap_replacement_data(self):
    """ Helper function fetching all required info for the deb-to-snap
        migration: version strings for upgrade (from and to) and the list
        of snaps (with actions).

    Fills and returns self._snap_list, a dict keyed by snap name whose
    values are dicts with a 'command' key ('install', 'switch' or
    'remove') plus, depending on the command, 'channel', 'deb' and
    'snap-id'.
    """
    self._snap_list = {}
    seeded_snaps, unseeded_snaps = self._parse_deb2snap_json()
    snap_list = ''
    # list the installed snaps and add them to seeded ones
    snap_list = subprocess.Popen(["snap", "list"],
                                 universal_newlines=True,
                                 stdout=subprocess.PIPE).communicate()
    # NOTE(review): communicate() returns a (stdout, stderr) tuple, so
    # this condition looks like it is always true - confirm intent
    if snap_list:
        # first line of output is a header and the last line is empty
        snaps_installed = [line.split()[0]
                           for line in snap_list[0].split('\n')[1:-1]]
        for snap in snaps_installed:
            if snap in seeded_snaps or snap in unseeded_snaps:
                continue
            else:
                # installed snaps unknown to deb2snap.json are handled
                # like seeded ones on the default release channels,
                # without a deb and without forcing a switch
                seeded_snaps[snap] = (
                    None,
                    f'stable/ubuntu-{self.controller.fromVersion}',
                    f'stable/ubuntu-{self.controller.toVersion}',
                    False
                )
    self._view.updateStatus(_("Checking for installed snaps"))
    for snap, props in seeded_snaps.items():
        (deb, from_channel, to_channel, force_switch) = props
        snap_object = {}
        # check to see if the snap is already installed
        snap_info = subprocess.Popen(["snap", "info", snap],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE).communicate()
        self._view.processEvents()
        if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
            logging.debug("Snap %s is installed" % snap)
            # NOTE(review): from_channel is used as a regex fragment
            # without escaping, so e.g. a "." matches any character
            if not re.search(r"^tracking:.*%s" % from_channel,
                             snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is not tracking the release channel"
                              % snap)
                if not force_switch:
                    # It is not tracking the release channel, and we were
                    # not told to force so don't switch.
                    continue
            snap_object['command'] = 'switch'
        else:
            # Do not replace packages not installed
            cache = self.controller.cache
            if (deb and (deb not in cache or not cache[deb].is_installed)):
                logging.debug("Deb package %s is not installed. Skipping "
                              "snap package %s installation" % (deb, snap))
                continue
            match = re.search(r"snap-id:\s*(\w*)", snap_info[0])
            if not match:
                logging.debug("Could not parse snap-id for the %s snap"
                              % snap)
                continue
            snap_object['command'] = 'install'
            snap_object['deb'] = deb
            snap_object['snap-id'] = match[1]
        # both 'switch' and 'install' target the new release channel
        snap_object['channel'] = to_channel
        self._snap_list[snap] = snap_object
    for snap, (deb, from_channel) in unseeded_snaps.items():
        snap_object = {}
        # check to see if the snap is already installed
        snap_info = subprocess.Popen(["snap", "info", snap],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE).communicate()
        self._view.processEvents()
        if re.search(r"^installed: ", snap_info[0], re.MULTILINE):
            logging.debug("Snap %s is installed" % snap)
            # its not tracking the release channel so don't remove
            if not re.search(r"^tracking:.*%s" % from_channel,
                             snap_info[0], re.MULTILINE):
                logging.debug("Snap %s is not tracking the release channel"
                              % snap)
                continue
            snap_object['command'] = 'remove'
            # check if this snap is being used by any other snaps
            conns = subprocess.Popen(["snap", "connections", snap],
                                     universal_newlines=True,
                                     stdout=subprocess.PIPE).communicate()
            self._view.processEvents()
            for conn in conns[0].split('\n'):
                conn_cols = conn.split()
                # only rows with exactly four columns are considered;
                # columns 1 and 2 are read as plug and slot
                if len(conn_cols) != 4:
                    continue
                plug = conn_cols[1]
                slot = conn_cols[2]
                if slot.startswith(snap + ':'):
                    plug_snap = plug.split(':')[0]
                    if plug_snap != '-' and \
                       plug_snap not in unseeded_snaps:
                        logging.debug("Snap %s is being used by %s. "
                                      "Switching it to stable track"
                                      % (snap, plug_snap))
                        snap_object['command'] = 'switch'
                        snap_object['channel'] = 'stable'
                        break
            self._snap_list[snap] = snap_object
    return self._snap_list
def _replace_pi_boot_config(self, old_config, new_config,
boot_config_filename, failure_action):
try:
boot_backup_filename = boot_config_filename + '.distUpgrade'
with open(boot_backup_filename, 'w', encoding='utf-8') as f:
f.write(old_config)
except IOError as exc:
logging.error("unable to write boot config backup to %s: %s; %s",
boot_backup_filename, exc, failure_action)
return
try:
with open(boot_config_filename, 'w', encoding='utf-8') as f:
f.write(new_config)
except IOError as exc:
logging.error("unable to write new boot config to %s: %s; %s",
boot_config_filename, exc, failure_action)
def _replace_fkms_overlay(self, boot_dir='/boot/firmware'):
failure_action = (
"You may need to replace the vc4-fkms-v3d overlay with "
"vc4-kms-v3d in config.txt on your boot partition")
try:
boot_config_filename = os.path.join(boot_dir, 'config.txt')
with open(boot_config_filename, 'r', encoding='utf-8') as f:
boot_config = f.read()
except FileNotFoundError:
logging.error("failed to open boot configuration in %s; %s",
boot_config_filename, failure_action)
return
new_config = ''.join(
# startswith and replace used to cope with (and preserve) any
# trailing d-t parameters, and any use of the -pi4 suffix
'# changed by do-release-upgrade (LP: #1923673)\n#' + line +
line.replace('dtoverlay=vc4-fkms-v3d', 'dtoverlay=vc4-kms-v3d')
if line.startswith('dtoverlay=vc4-fkms-v3d') else
# camera firmware disabled due to incompatibility with "full" kms
# overlay; without the camera firmware active it's also better to
# disable gpu_mem leaving the default (64MB) to allow as much as
# possible for the KMS driver
'# disabled by do-release-upgrade (LP: #1923673)\n#' + line
if line.startswith('gpu_mem=') or line.rstrip() == 'start_x=1' else
line
for line in boot_config.splitlines(keepends=True)
)
if new_config == boot_config:
logging.warning("no fkms overlay or camera firmware line found "
"in %s", boot_config_filename)
return
self._replace_pi_boot_config(
boot_config, new_config, boot_config_filename, failure_action)
def _add_kms_overlay(self, boot_dir='/boot/firmware'):
    """ make sure config.txt in boot_dir enables the vc4-kms-v3d overlay
        (LP: #2065051)

    If no dtoverlay=vc4-kms-v3d line is found in a section treated as
    [all], the lines in added_lines are inserted. If the resulting text
    equals the file content the file is left alone; otherwise it is
    replaced via _replace_pi_boot_config.
    """
    failure_action = (
        "You may need to add dtoverlay=vc4-kms-v3d to an [all] section "
        "in config.txt on your boot partition")
    # the block ends with [all] so that whatever follows the insertion
    # point is in an [all] section again
    added_lines = [
        '# added by do-release-upgrade (LP: #2065051)',
        'dtoverlay=vc4-kms-v3d',
        'disable_fw_kms_setup=1',
        '',
        '[pi3+]',
        'dtoverlay=vc4-kms-v3d,cma-128',
        '',
        '[pi02]',
        'dtoverlay=vc4-kms-v3d,cma-128',
        '',
        '[all]',
    ]
    try:
        boot_config_filename = os.path.join(boot_dir, 'config.txt')
        with open(boot_config_filename, 'r', encoding='utf-8') as f:
            boot_config = f.read()
    except FileNotFoundError:
        logging.error("failed to open boot configuration in %s; %s",
                      boot_config_filename, failure_action)
        return

    def find_insertion_point(lines):
        # Returns the zero-based index of the dtoverlay=vc4-kms-v3d line in
        # an [all] section, if one exists, or the last line of the last
        # [all] section of the file, if one does not exist
        # the start of the file counts as being in an [all] section
        in_all = True
        last = 0
        for index, line in enumerate(lines):
            line = line.rstrip()
            if in_all:
                last = index
                # startswith used to cope with any trailing dtparams
                if line.startswith('dtoverlay=vc4-kms-v3d'):
                    return last
                elif line.startswith('[') and line.endswith(']'):
                    in_all = line == '[all]'
                elif line.startswith('include '):
                    # [sections] are included from includes verbatim, hence
                    # (without reading the included file) we must assume
                    # we're no longer in an [all] section
                    in_all = False
            else:
                in_all = line == '[all]'
        return last

    def add_kms_overlay(lines):
        # inserts added_lines in front of the insertion point (modifying
        # lines in place) unless that line already is the overlay line
        insert_point = find_insertion_point(lines)
        try:
            if lines[insert_point].startswith('dtoverlay=vc4-kms-v3d'):
                return lines
        except IndexError:
            # Empty config, apparently!
            pass
        lines[insert_point:insert_point] = added_lines
        return lines
    lines = [line.rstrip() for line in boot_config.splitlines()]
    lines = add_kms_overlay(lines)
    # every line is written back with a plain "\n" terminator
    new_config = ''.join(line + '\n' for line in lines)
    if new_config == boot_config:
        logging.warning("no addition of KMS overlay required in %s",
                        boot_config_filename)
        return
    self._replace_pi_boot_config(
        boot_config, new_config, boot_config_filename, failure_action)
def _set_generic_font(self):
    """ Due to changes to the Ubuntu font we enable a generic font
        (in practice DejaVu or Noto) during the upgrade.
        See https://launchpad.net/bugs/2034986

    The original font is restored later, either by an XDG autostart
    entry or by a systemd user service written here; both only act
    while ~/.config/upgrade-need-font-restore exists.
    """
    temp_font = 'Sans'
    # only change the font once per upgrade run
    if self._did_change_font:
        return
    if self.controller.get_user_env('XDG_SESSION_TYPE', '') in ('', 'tty'):
        # Avoid running this on server systems or when the upgrade
        # is done over ssh.
        return
    if self.controller.get_user_uid() is None:
        logging.debug(
            'Cannot determine non-root UID, will not change font'
        )
        return
    schema = 'org.gnome.desktop.interface'
    desktops = self.controller.get_user_env(
        'XDG_CURRENT_DESKTOP', ''
    ).split(':')
    # Some flavors use other schemas for the desktop font.
    if 'MATE' in desktops or 'UKUI' in desktops:
        schema = 'org.mate.interface'
    elif 'X-Cinnamon' in desktops:
        schema = 'org.cinnamon.desktop.interface'
    # Some flavors lack the systemd integration needed for a
    # user service, so we create an autostart file instead.
    use_autostart = bool(
        set(['Budgie', 'LXQt', 'MATE', 'UKUI', 'X-Cinnamon', 'XFCE'])
        & set(desktops)
    )
    r = self.controller.run_as_user(
        ['/usr/bin/gsettings', 'get',
         f'{schema}', 'font-name'],
        stdout=subprocess.PIPE,
        encoding='utf-8',
    )
    # Split the setting into font name and size at the last space.
    # Note that "_" is a throwaway local here and shadows the gettext
    # alias inside this method.
    (font, _, size) = r.stdout.strip('\'\n').rpartition(' ')
    font = font or 'Ubuntu'
    # fall back to size 11 if the size part is not an integer
    try:
        int(size)
    except ValueError:
        size = '11'
    logging.debug(f'Setting generic font {temp_font} {size} during the '
                  f'upgrade. Original font is {font} {size}.')
    r = self.controller.run_as_user([
        '/usr/bin/gsettings', 'set', f'{schema}',
        'font-name', f'"{temp_font} {size}"'
    ])
    if r.returncode != 0:
        logging.debug(f'Failed to change font to {temp_font} {size}')
        return
    self._did_change_font = True
    # Touch a file to indicate that the font should be restored on the
    # next boot.
    need_font_restore_file = os.path.join(
        self.controller.get_user_home(),
        '.config/upgrade-need-font-restore'
    )
    os.makedirs(os.path.dirname(need_font_restore_file), exist_ok=True)
    pathlib.Path(need_font_restore_file).touch(mode=0o666)
    # hand the flag file over to the user
    os.chown(
        need_font_restore_file,
        self.controller.get_user_uid(),
        self.controller.get_user_gid(),
    )
    if use_autostart:
        autostart_file = '/etc/xdg/autostart/upgrade-restore-font.desktop'
        os.makedirs(os.path.dirname(autostart_file), exist_ok=True)
        flag = '$HOME/.config/upgrade-need-font-restore'
        # the entry restores the font and removes the flag file, but
        # only if the flag file exists
        with open(autostart_file, 'w') as f:
            f.write(
                '[Desktop Entry]\n'
                'Name=Restore font after upgrade\n'
                'Comment=Auto-generated by ubuntu-release-upgrader\n'
                'Type=Application\n'
                f'Exec=sh -c \'if [ -e "{flag}" ]; then gsettings set '
                f'{schema} font-name "{font} {size}";'
                f'rm -f "{flag}"; fi\'\n'
                'NoDisplay=true\n'
            )
        return
    # If we set the font back to normal before a reboot, the font will
    # still get all messed up. To allow normal usage whether the user
    # reboots immediately or not, create a service that will run only if a
    # ~/.config/upgrade-need-font-restore exists, and then remove that file
    # in ExecStart. This has the effect of creating a one-time service on
    # the next boot.
    unit_file = '/usr/lib/systemd/user/upgrade-restore-font.service'
    os.makedirs(os.path.dirname(unit_file), exist_ok=True)
    with open(unit_file, 'w') as f:
        f.write(
            '# Auto-generated by ubuntu-release-upgrader\n'
            '[Unit]\n'
            'Description=Restore font after upgrade\n'
            'After=graphical-session.target dconf.service\n'
            'ConditionPathExists=%h/.config/upgrade-need-font-restore\n'
            '\n'
            '[Service]\n'
            'Type=oneshot\n'
            'ExecStart=/usr/bin/gsettings set '
            f'{schema} font-name \'{font} {size}\'\n'
            'ExecStart=/usr/bin/rm -f '
            '%h/.config/upgrade-need-font-restore\n'
            '\n'
            '[Install]\n'
            'WantedBy=graphical-session.target\n'
        )
    self.controller.systemctl_as_user(['daemon-reload'])
    r = self.controller.systemctl_as_user(
        ['enable', os.path.basename(unit_file)]
    )
    if r.returncode != 0:
        logging.debug(f'Failed to enable {os.path.basename(unit_file)}. '
                      'Font will not be restored on reboot')
def _maybe_remove_gpg_wks_server(self):
"""
Prevent postfix from being unnecessarily installed, and leading to a
debconf prompt (LP: #2060578).
"""
# We want to use attributes of the cache that are only exposed in
# apt_pkg.Cache, not the higher level apt.Cache. Hence we operate on
# apt.Cache._cache.
cache = self.controller.cache._cache
try:
if not cache['gpg-wks-server'].current_ver:
# Not installed, nothing to do.
return
provides_mta = cache['mail-transport-agent'].provides_list
installed_mta = [
ver for _, _, ver in provides_mta
if ver.parent_pkg.current_ver
]
except KeyError:
# Something wasn't in the cache, ignore.
return
if not any(installed_mta):
logging.info(
'No mail-transport-agent installed, '
'marking gpg-wks-server for removal'
)
self.controller.cache['gpg-wks-server'].mark_delete(auto_fix=False)
apt.ProblemResolver(self.controller.cache).protect(
self.controller.cache['gpg-wks-server']
)
def _test_and_fail_on_tpm_fde(self):
"""
LP: #2065229
"""
if (
os.path.exists('/snap/pc-kernel') and
'ubuntu-desktop-minimal' in self.controller.cache and
self.controller.cache['ubuntu-desktop-minimal'].is_installed
):
logging.debug('Detected TPM FDE system')
di = distro_info.UbuntuDistroInfo()
version = di.version(self.controller.toDist) or 'next release'
self._view.error(
_(
f'Sorry, cannot upgrade this system to {version}'
),
_(
'Upgrades for desktop systems running TPM FDE are not '
'currently supported. '
'Please see https://launchpad.net/bugs/2065229 '
'for more information.'
),
)
self.controller.abort()
def _disable_kdump_tools_on_install(self, cache):
"""Disable kdump-tools if installed during upgrade."""
if 'kdump-tools' not in cache:
# Not installed or requested, nothing to do.
return
pkg = cache['kdump-tools']
if pkg.is_installed:
logging.info("kdump-tools already installed. Not disabling.")
return
elif pkg.marked_install:
logging.info("installing kdump-tools due to upgrade. Disabling.")
proc = subprocess.run(
(
'echo "kdump-tools kdump-tools/use_kdump boolean false"'
' | debconf-set-selections'
),
shell=True,
)
ret_code = proc.returncode
if ret_code != 0:
logging.debug(
(
"kdump-tools debconf-set-selections "
f"returned: {ret_code}"
)
)
def _install_linux_sysctl_defaults(self):
""" LP: #2089759 """
if self.controller.fromDist != 'oracular':
return
if (
'linux-sysctl-defaults' in self.controller.cache and
not self.controller.cache['linux-sysctl-defaults'].is_installed
):
logging.debug('Installing linux-sysctl-defaults')
self.controller.cache['linux-sysctl-defaults'].mark_install(
auto_fix=False
)