__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

[email protected]: ~ $
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <[email protected]>
# Copyright 2007 Kenneth Loafman <[email protected]>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""
Miscellaneous utilities.
"""

import atexit
import csv
import errno
import time

import fasteners
import multiprocessing
import json
import os
import socket
import sys
import traceback
from io import StringIO

from duplicity import config
from duplicity import log
from duplicity import dup_tarfile


def exception_traceback(limit=50):
    """
    Render the exception currently being handled as a traceback string.

    @param limit: maximum number of stack frames passed to
                  traceback.format_tb.
    @return A string in typical Python traceback layout: a header line,
            the formatted frames, and finally the exception line.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()

    # Frames first, then the "ExcType: message" line(s).
    parts = traceback.format_tb(exc_tb, limit) + traceback.format_exception_only(exc_type, exc_value)

    # Everything except the last line is left-justified to 20 columns,
    # followed by a space and the final line.
    leading = "".join(parts[:-1])
    return "Traceback (innermost last):\n" + "{:<20} {}".format(leading, parts[-1])


def escape(string):
    """Return a filename (bytes or str) as a single-quoted, escaped string for logging.

    The name is decoded with os.fsdecode, non-printable and non-ASCII
    characters are rendered with Python's "unicode-escape" codec, and any
    embedded single quote is written as \\x27 so the quoting stays unambiguous.
    """
    text = os.fsdecode(string)
    escaped = text.encode("unicode-escape", "replace").decode("utf8", "replace")
    quoted = escaped.replace("'", "\\x27")
    return f"'{quoted}'"


def uindex(index):
    """Return a printable path for an index (a tuple of path parts).

    An empty index yields "."; otherwise each part is decoded with
    os.fsdecode and the parts are joined with os.path.join.
    """
    if not index:
        return "."
    decoded_parts = [os.fsdecode(part) for part in index]
    return os.path.join(*decoded_parts)


def uexc(e):
    """Return the message of exception e as a str.

    The first argument of e that is a str is returned as is; if the first
    textual argument is bytes it is decoded with os.fsdecode, since such
    values are most likely path names in the filesystem encoding.  If no
    argument is textual, str(e) is returned.  A falsy e, or one without
    arguments, yields the empty string.
    """
    if not (e and e.args):
        return ""

    # Pick the first argument that carries text, whether decoded or not.
    textual_args = (arg for arg in e.args if isinstance(arg, (str, bytes)))
    first_text = next(textual_args, None)

    if first_text is None:
        # No textual argument at all: fall back to the whole message.
        return str(e)
    if isinstance(first_text, str):
        return first_text
    return os.fsdecode(first_text)


def maybe_ignore_errors(fn):
    """
    Call fn and return its result.

    When fn raises and the global configuration setting ignore_errors is
    true, the error is logged as a warning and None is returned instead;
    otherwise the exception propagates unchanged.

    @param fn: A callable taking no arguments.
    @return Whatever fn returns, or None if it failed and ignore_errors is true.
    """
    try:
        result = fn()
    except Exception as exc:
        if not config.ignore_errors:
            raise
        log.Warn(
            _("IGNORED_ERROR: Warning: ignoring error as requested: %s: %s") % (exc.__class__.__name__, uexc(exc))
        )
        return None
    return result


class BlackHoleList(list):
    """A list whose append() discards its argument, so it never grows that way."""

    def append(self, x):
        """Ignore x; nothing is stored."""
        return None


class FakeTarFile:
    """Stand-in for a tar file that has no members and nothing to close."""

    # Mirrors the debug attribute callers may read on a tar file object.
    debug = 0

    def __iter__(self):
        """Iterate over no members."""
        return iter(())

    def close(self):
        """Do nothing; there is no underlying resource."""
        return None


def make_tarfile(mode, fp):
    """
    Open fp as a tar file in the given mode.

    'Empty' tarfiles are often used for signatures that have not been
    filled out yet; opening one is signalled by ReadError, in which case a
    FakeTarFile (no members) is returned instead of raising.

    @param mode: mode passed to dup_tarfile.TarFile
    @param fp: file object passed to dup_tarfile.TarFile
    @return the opened tar file, or a FakeTarFile on ReadError
    """
    try:
        archive = dup_tarfile.TarFile("arbitrary", mode, fp)
    except dup_tarfile.ReadError:
        return FakeTarFile()

    # Stop TarFile from caching TarInfo objects: over the lifetime of a
    # long-lasting signature file that cache would consume a lot of memory.
    archive.members = BlackHoleList()
    return archive


def get_tarinfo_name(ti):
    """Return ti.name, with a trailing "/" guaranteed for directories.

    Python versions before 2.6 ensured directory names end with "/", while
    2.6 and later ensure they do not.  Internally the pre-2.6 convention is
    kept, so the slash is added back when missing.
    """
    name = ti.name
    missing_slash = ti.isdir() and not name.endswith(r"/")
    return f"{name}/" if missing_slash else name


def ignore_missing(fn, filename):
    """
    Call fn(filename), swallowing only the "no such file" error.

    An OSError whose errno is ENOENT is ignored; any other exception
    propagates to the caller.

    @param fn: callable
    @param filename: string
    """
    try:
        fn(filename)
    except OSError as ex:
        if ex.errno != errno.ENOENT:
            raise


def acquire_lockfile():
    """
    Take the inter-process lock guarding the archive directory.

    Stores the lock path (bytes, "lockfile" inside config.archive_dir_path)
    in config.lockpath and the fasteners InterProcessLock in config.lockfile,
    then tries to acquire it without blocking.  If the lock is already held,
    log.FatalError is called with the user_error code.
    """
    config.lockpath = os.path.join(config.archive_dir_path.name, b"lockfile")
    config.lockfile = fasteners.process_lock.InterProcessLock(config.lockpath)

    # Decode once so both messages show a readable path rather than b'...'.
    lockpath_text = os.fsdecode(config.lockpath)
    log.Log(
        _("Acquiring lockfile %s") % lockpath_text,
        log.DEBUG,
    )
    if not config.lockfile.acquire(blocking=False):
        log.FatalError(
            "Another duplicity instance is already running with this archive directory\n"
            f"If this is not the case, remove '{lockpath_text}'.",
            log.ErrorCode.user_error,
        )


@atexit.register
def release_lockfile():
    """
    Release and delete the archive-directory lockfile, if one is held.

    Registered with atexit so it also runs at interpreter shutdown.  Does
    nothing when config.lockfile is None.  Any failure while releasing or
    removing the file is logged as an error and otherwise ignored.
    """
    if config.lockfile is None:
        return

    log.Log(
        _("Releasing lockfile %s") % os.fsdecode(config.lockpath),
        log.DEBUG,
    )
    try:
        config.lockfile.release()
        os.remove(config.lockpath)
        # Mark the lock as gone so a second call becomes a no-op.
        config.lockfile = None
        config.lockpath = ""
    except Exception as e:
        log.Error(f"Could not release lockfile: {str(e)}")


def copyfileobj(infp, outfp, byte_count=-1):
    """Copy byte_count bytes from infp to outfp, or all if byte_count < 0.

    Data is transferred in blocks of at most 64 KiB.  Reading continues
    until byte_count bytes have been copied or infp reports end of file
    (an empty read), so a read that returns fewer bytes than requested
    (as pipes and sockets may do) does not cut the copy short.

    Returns the number of bytes actually written (may be less than
    byte_count if eof is found first).  Does not close either fileobj.

    @param infp: file object to read from
    @param outfp: file object to write to
    @param byte_count: number of bytes to copy; negative means "until eof"
    """
    blocksize = 64 * 1024
    bytes_written = 0
    copy_all = byte_count < 0

    while copy_all or bytes_written < byte_count:
        if copy_all:
            want = blocksize
        else:
            # Never ask for more than what is still owed to the caller.
            want = min(blocksize, byte_count - bytes_written)
        buf = infp.read(want)
        if not buf:
            # End of input reached before byte_count was satisfied.
            break
        outfp.write(buf)
        bytes_written += len(buf)

    return bytes_written


def which(program):
    """
    Return absolute path for program name.
    Returns None if program not found.

    If program contains a directory component it is checked directly;
    otherwise each directory listed in the PATH environment variable is
    searched in order.  When PATH is not set, os.defpath is searched
    instead of failing.

    @param program: program name or path (str)
    @return absolute path of an executable regular file, or None
    """

    def is_exe(fpath):
        # Only absolute paths to executable regular files qualify.
        return os.path.isfile(fpath) and os.path.isabs(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        return program if is_exe(program) else None

    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        # PATH entries are sometimes wrapped in double quotes.
        directory = directory.strip('"')
        candidate = os.path.abspath(os.path.join(directory, program))
        if is_exe(candidate):
            return candidate

    return None


def start_debugger():
    """
    Attach this process to a remote debugger when requested.

    Two setups are supported:

    - PYDEVD=vscode in the environment: debugpy listens on the first free
      port at or above 5678 (offset by the multiprocessing pool member
      number, if any) and blocks until a client attaches.
    - config.pydevd set, or any other non-empty PYDEVD value: connect to a
      PyCharm debug server via pydevd_pycharm.  The pid and port in use
      are recorded in /tmp/DEBUG_RUNNING, guarded by an inter-process
      lock, so that each new process uses the next port.

    If neither applies, nothing happens.  A missing debugger module is
    reported through log.FatalError.
    """

    def is_port_in_use(host: str, port: int) -> bool:
        """
        Returns true if a port is in use.
        """
        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            return sock.connect_ex((host, port)) == 0

    def fix_path():
        """
        In a dev environment the first sys.path entry points one level too
        deep; replace it with its parent directory.
        """
        base = sys.path.pop(0)
        base = base.split(os.path.sep)[:-1]
        base = os.path.sep.join(base)
        sys.path.insert(0, base)

    def cleanup():
        """
        Remove the state and lock files; a missing file is not an error.
        """
        for stale in ("/tmp/DEBUG_RUNNING", "/tmp/critical_section.lock"):
            # Unlink each file on its own so one failure does not leave
            # the other behind.
            try:
                os.unlink(stale)
            except OSError:
                pass

    if os.environ.get("PYDEVD", None) == "vscode":
        try:
            import debugpy  # pylint: disable=import-error
        except ImportError:
            log.FatalError(
                "Module debugpy must be available for debugging.\n"
                "Don't set 'PYDEVD=vscode'\n"
                "to avoid starting debugpy as debugger."
            )
        host = "127.0.0.1"
        pool_nr = 0
        if "multiprocessing" in sys.modules:
            try:
                # _identity is empty outside a pool worker, hence IndexError.
                pool_nr = multiprocessing.current_process()._identity[0]
                print(f"Debugger for Pool Member: {pool_nr}")
            except IndexError:
                print("Debugger not in pool process.")
        port = 5678 + pool_nr

        # Skip over ports something is already listening on.
        while is_port_in_use(host, port):
            port += 1
        print(f"Debugger start on port {port}")
        debugpy.listen(port)
        print(f"Waiting for debugger attach on port: {port}")
        debugpy.wait_for_client()
        fix_path()

    elif config.pydevd or os.environ.get("PYDEVD", None):
        try:
            import pydevd_pycharm  # pylint: disable=import-error
        except ImportError:
            log.FatalError(
                "Module pydevd_pycharm must be available for debugging.\n"
                "Remove '--pydevd' from command line and unset 'PYDEVD'\n"
                "from the environment to avoid starting the debugger."
            )

        # NOTE: this needs to be customized for your system
        debug_host = "localhost"
        debug_port = 6700

        # begin critical
        critical_section = fasteners.InterProcessLock("/tmp/critical_section.lock")
        critical_section.acquire()
        try:
            # get previous pid:port if any
            try:
                with open("/tmp/DEBUG_RUNNING", "r") as state_file:
                    debug_running = state_file.read().strip()
            except IOError:
                debug_running = ""

            # get previous pid and port; a new process takes the next port
            if debug_running:
                prev_pid, prev_port = map(int, debug_running.split(":"))
                debug_port = prev_port + 1
            else:
                prev_pid = None

            # this process already registered itself: nothing more to do
            if prev_pid == os.getpid():
                return

            # ignition
            try:
                pydevd_pycharm.settrace(
                    debug_host,
                    port=debug_port,
                    suspend=False,
                    stdoutToServer=True,
                    stderrToServer=True,
                    # patch_multiprocessing=True,
                )
                log.Info(f"Connection {debug_host}:{debug_port} accepted for debug.")
            except ConnectionRefusedError as e:
                log.Info(f"Connection {debug_host}:{debug_port} refused for debug: {str(e)}")

            # save our state in file
            with open("/tmp/DEBUG_RUNNING", "w") as state_file:
                state_file.write(f"{os.getpid()}:{debug_port}")
        finally:
            # release critical section, also on early return or error
            critical_section.release()

        # clean created files
        atexit.register(cleanup)

        fix_path()


def merge_dicts(*dict_args):
    """
    Merge any number of dictionaries into one new dict.

    The inputs are shallow-copied and left unmodified.  When the same key
    occurs more than once, the value from the later dictionary wins.
    """
    merged = {}
    for mapping in dict_args:
        merged.update(mapping)
    return merged


def csv_args_to_dict(arg):
    """
    Parse the csv-formatted string arg into a dictionary.

    The fields of each csv row are consumed two at a time as
    (key, val) pairs: "k1,v1,k2,v2" becomes {"k1": "v1", "k2": "v2"}.
    A row with an odd number of fields raises IndexError.
    """
    pairs = {}
    with StringIO(arg) as buffer:
        for fields in csv.reader(buffer):
            pairs.update((fields[pos], fields[pos + 1]) for pos in range(0, len(fields), 2))
    return pairs


class BytesEncoder(json.JSONEncoder):
    """
    JSON encoder that accepts bytes values.

    JSON has no byte type, so bytes objects are decoded to str before
    serialisation; every other type is left to the base encoder.
    """

    def default(self, obj):
        """Return a str for bytes, else defer to JSONEncoder.default."""
        if not isinstance(obj, bytes):
            return super().default(obj)
        return obj.decode()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
backends Folder 0755
__init__.py File 1.01 KB 0644
__main__.py File 3.94 KB 0644
_librsync.cpython-313-x86_64-linux-gnu.so File 19.75 KB 0644
_librsyncmodule.c File 14.17 KB 0644
argparse311.py File 94.51 KB 0644
backend.py File 28.98 KB 0644
backend_pool.py File 13.88 KB 0644
cached_ops.py File 1.23 KB 0644
cli_data.py File 34.92 KB 0644
cli_main.py File 11.43 KB 0644
cli_util.py File 14.46 KB 0644
config.py File 11.95 KB 0644
diffdir.py File 25.33 KB 0644
dup_collections.py File 48.53 KB 0644
dup_main.py File 65.38 KB 0644
dup_tarfile.py File 1.23 KB 0644
dup_temp.py File 8.17 KB 0644
dup_time.py File 10.14 KB 0644
errors.py File 2.66 KB 0644
file_naming.py File 16.33 KB 0644
filechunkio.py File 2.56 KB 0644
globmatch.py File 7.36 KB 0644
gpg.py File 17.35 KB 0644
gpginterface.py File 22.41 KB 0644
lazy.py File 14.6 KB 0644
librsync.py File 8.3 KB 0644
log.py File 15.78 KB 0644
manifest.py File 17.54 KB 0644
patchdir.py File 21.26 KB 0644
path.py File 27.92 KB 0644
progress.py File 13.5 KB 0644
robust.py File 2.67 KB 0644
selection.py File 28.74 KB 0644
statistics.py File 15.45 KB 0644
tempdir.py File 10.51 KB 0644
util.py File 11.96 KB 0644
Filemanager