__  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

[email protected]: ~ $
# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <[email protected]>
# Copyright 2007 Kenneth Loafman <[email protected]>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import glob
import hashlib
import inspect
import json
import logging
import os
from random import random
import re
import sys
import time

import duplicity.backend
from duplicity import (
    log,
    path,
    progress,
)
from duplicity.errors import BackendException
from testing import _runtest_dir


class BackendErrors:
    # FAIL_WITH_EXCEPTION: set to substring matched by the filename.
    FAIL_WITH_EXCEPTION = "DUP_FAIL_WITH_EXCEPTION"
    # SYSTEM_EXIT: set to substring matched by the filename.
    FAIL_SYSTEM_EXIT = "DUP_FAIL_BY_SYSTEM_EXIT"
    # WAIT_FOR_OTHER_VOLUME: set to json string: ["file_to_delay", "file_wait_for"] (substring match)
    WAIT_FOR_OTHER_VOLUME = "DUP_FAIL_WAIT_FOR_VOLUME"
    # LAST_BYTE_MISSING: set to substring matched by the filename.
    LAST_BYTE_MISSING = "DUP_FAIL_LAST_BYTE_MISSING"
    # SKIP_PUT_SILENT: Don't put file if name contains string
    SKIP_PUT_SILENT = "DUP_FAIL_SKIP_PUT_SILENT"
    # RANDOM_DELAY: max random delay in ms added to command
    DELAY_RANDOM_MS = "DUP_FAIL_DELAY_RANDOM_MS"
    # FIX_DELAY: fix delay in ms added to command
    DELAY_FIX_MS = "DUP_FAIL_DELAY_FIX_MS"


class _TestBackend(duplicity.backend.Backend):
    """Use this backend to test/create certain error situations

    errors get triggered via ENV Vars, see class BackendErrors

    """

    def __init__(self, parsed_url):
        super().__init__(parsed_url)
        log._logger.addHandler(logging.FileHandler(f"{_runtest_dir}/testbackend.log"))
        log.Warn("TestBackend is not made for production use!")
        # The URL form "file:MyFile" is not a valid duplicity target.
        if not parsed_url.path.startswith("//"):
            raise BackendException("Bad file:// path syntax.")
        self.remote_pathdir = path.Path(parsed_url.path[2:])
        try:
            os.makedirs(self.remote_pathdir.base)
        except Exception:
            pass

    @staticmethod
    def _fail_with_exception(filename):
        filename = os.fsdecode(filename)
        log.Debug(f"DUB_FAIL: Check if fail on exception for {filename}. Called by {inspect.stack()[2][3]}.")
        if os.getenv(BackendErrors.FAIL_WITH_EXCEPTION) and os.getenv(BackendErrors.FAIL_WITH_EXCEPTION) in filename:
            log.Error(f"DUB_FAIL: Force exception on file {filename}.")
            raise FileNotFoundError(f"TEST: raised exception on {filename} by intention")

    @staticmethod
    def _fail_by_sys_exit(filename):
        filename = os.fsdecode(filename)
        log.Debug(f"DUB_FAIL: Check if do sys.exit(30) for {filename}. Called by {inspect.stack()[2][3]}.")
        if os.getenv(BackendErrors.FAIL_SYSTEM_EXIT) and os.getenv(BackendErrors.FAIL_SYSTEM_EXIT) in filename:
            log.Error(f"DUB_FAIL: Force sys.exit(30) on file {filename}.")
            time.sleep(1)  # otherwise log message doesn't get printed.
            sys.exit(30)

    def _wait_for_other_volume(self, filename):
        filename = os.fsdecode(filename)
        log.Debug(f"DUB_FAIL: Check if action on {filename} shoud be delayed. Called by {inspect.stack()[2][3]}.")
        env = os.getenv(BackendErrors.WAIT_FOR_OTHER_VOLUME)
        if not env:
            return  # skip if env not set
        file_stop, file_waitfor = json.loads(env)
        timestamp = re.match(r".*\.(\d{8}T\d{6}[-+0-9:Z]*)\..*", filename).group(1)
        file_waitfor_glob = f"{os.fsdecode(self.remote_pathdir.get_canonical())}/*{timestamp}*{file_waitfor}*"
        if file_stop in filename:
            while not glob.glob(file_waitfor_glob):
                log.Error(f"DUB_FAIL: Waiting for file matiching {file_waitfor_glob}.")
                time.sleep(1)
            log.Warn(f"DUB_FAIL: {filename} written after {glob.glob(file_waitfor_glob)}")

    def _remove_last_byte(self, filename):
        filename = os.fsdecode(filename)
        log.Debug(f"DUB_FAIL: Check if {filename} shoud be truncated. Called by {inspect.stack()[2][3]}.")
        if os.getenv(BackendErrors.LAST_BYTE_MISSING) and os.getenv(BackendErrors.LAST_BYTE_MISSING) in filename:
            log.Error(f"DUB_FAIL: removing last byte from {filename}")
            with open(self.remote_pathdir.append(filename).get_canonical(), "ab") as remote_file:
                remote_file.seek(-1, os.SEEK_END)
                remote_file.truncate()

    @staticmethod
    def _skip_put_silent(filename):
        """
        retrun true if file should be skipped silently
        """
        filename = os.fsdecode(filename)
        log.Debug(f"DUB_FAIL: Check if {filename} should be skipped. Called by {inspect.stack()[2][3]}.")
        if os.getenv(BackendErrors.SKIP_PUT_SILENT) and os.getenv(BackendErrors.SKIP_PUT_SILENT) in filename:
            log.Error(f"DUB_FAIL: {filename} skipped silent.")
            return True
        return False

    @staticmethod
    def _delay():
        """
        sleep a random amount of milliseconds if ENV set
        """
        log.Debug("DUB_FAIL: Check if action should be delayed.")
        wait = 0
        if os.getenv(BackendErrors.DELAY_RANDOM_MS):
            wait += random() * float(os.getenv(BackendErrors.DELAY_RANDOM_MS)) / 1000  # type: ignore
        if os.getenv(BackendErrors.DELAY_FIX_MS):
            wait += float(os.getenv(BackendErrors.DELAY_FIX_MS)) / 1000  # type: ignore
        if wait > 0:
            log.Warn(f"DUB_FAIL: wait for {wait} sec.")
            time.sleep(wait)

    def _move(self, source_path, remote_filename):
        self._fail_with_exception(remote_filename)
        self._fail_by_sys_exit(remote_filename)
        self._wait_for_other_volume(remote_filename)
        self._delay()
        target_path = self.remote_pathdir.append(remote_filename)
        try:
            source_path.rename(target_path)
            return True
        except OSError:
            return False

    def _put(self, source_path, remote_filename):
        self._fail_with_exception(remote_filename)
        self._wait_for_other_volume(remote_filename)
        self._delay()
        if self._skip_put_silent(remote_filename):
            return
        target_path = self.remote_pathdir.append(remote_filename)
        source_path.setdata()
        source_size = source_path.getsize()
        progress.report_transfer(0, source_size)
        target_path.writefileobj(source_path.open("rb"))

        self._fail_by_sys_exit(remote_filename)  # fail after file is transferred
        self._remove_last_byte(remote_filename)

        progress.report_transfer(source_size, source_size)

    def _get(self, filename, local_path):
        self._fail_with_exception(filename)
        self._fail_by_sys_exit(filename)
        self._delay()
        source_path = self.remote_pathdir.append(filename)
        local_path.writefileobj(source_path.open("rb"))

    def _list(self):
        return self.remote_pathdir.listdir()

    def _validate(self, remote_filename, size, source_path=None, **kwargs):
        # poc to show that validate can do additional things.
        self._delay()

        self._remove_last_byte(remote_filename)

        results_str = []
        results_bool = []
        try:
            if source_path:
                target_path = self.remote_pathdir.append(remote_filename)
                target_hash = self.__hash_fileobj(target_path.open())
                source_hash = self.__hash_fileobj(source_path.open())
                if target_hash == source_hash:
                    results_str.append(f"file hash {target_hash} matches")
                    results_bool.append(True)
                else:
                    results_str.append(f"expected hash {source_hash} doesn't match file hash {target_hash}")
                    results_bool.append(False)
            if size == self._query(remote_filename)["size"]:
                results_str.append(f"file size {size}")
                results_bool.append(True)
            else:
                results_str.append(
                    f'expected size {size} and file size {self._query(remote_filename)["size"]} don\'t match'
                )
                results_bool.append(False)
        except FileNotFoundError as e:
            results_bool.append(False)
            results_str.append(f"FileNotFoundError: {e}")
        except Exception as e:
            log.FatalError(
                _("Unexpected exception while validate %s.") % os.fsdecode(remote_filename),
                log.ErrorCode.backend_validation_failed,
                extra=f"Exception: {e}",
            )
        return all(results_bool), ", ".join(results_str)

    def _delete(self, filename):
        self._fail_with_exception(filename)
        self._fail_by_sys_exit(filename)
        self._delay()
        self.remote_pathdir.append(filename).delete()

    def _delete_list(self, filenames):
        for filename in filenames:
            self._fail_with_exception(filename)
            self._fail_by_sys_exit(filename)
            self._delay()
            self.remote_pathdir.append(filename).delete()

    def _query(self, filename):
        self._fail_with_exception(filename)
        self._delay()
        self._fail_by_sys_exit(filename)
        target_file = self.remote_pathdir.append(filename)
        target_file.setdata()
        size = target_file.getsize() if target_file.exists() else -1
        return {"size": size}

    def _query_list(self, filename_list):
        return {x: self._query(x) for x in filename_list}

    def _retry_cleanup(self):
        pass

    def _close(self):
        pass

    def _error_code(self, operation, e):  # pylint: disable=unused-argument
        return log.ErrorCode.backend_error

    def __hash_file(self, filename):
        self.__hash_fileobj(open(filename, "rb"))

    def __hash_fileobj(self, fileobj):
        # TODO: Remove when py38 goes EOL
        if sys.version_info[:2] == (3, 8):
            h = hashlib.sha1()
        else:
            h = hashlib.sha1(usedforsecurity=False)
        # loop till the end of the file
        chunk = 0
        while chunk != b"":
            # read only 1024 bytes at a time
            chunk = fileobj.read(1024)
            h.update(chunk)
        fileobj.close()
        # return the hex representation of digest
        return h.hexdigest()


duplicity.backend.register_backend("fortestsonly", _TestBackend)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
pyrax_identity Folder 0755
__init__.py File 1.09 KB 0644
_cf_cloudfiles.py File 3.83 KB 0644
_cf_pyrax.py File 5.02 KB 0644
_testbackend.py File 10.95 KB 0644
adbackend.py File 16.17 KB 0644
azurebackend.py File 5.41 KB 0644
b2backend.py File 9.31 KB 0644
boxbackend.py File 6.63 KB 0644
cfbackend.py File 1.12 KB 0644
dpbxbackend.py File 19.09 KB 0644
gdocsbackend.py File 8.9 KB 0644
gdrivebackend.py File 16.28 KB 0644
giobackend.py File 7.74 KB 0644
hsibackend.py File 2.54 KB 0644
hubicbackend.py File 2.37 KB 0644
idrivedbackend.py File 18.86 KB 0644
imapbackend.py File 9.53 KB 0644
jottacloudbackend.py File 5.62 KB 0644
lftpbackend.py File 8.97 KB 0644
localbackend.py File 2.97 KB 0644
mediafirebackend.py File 4.53 KB 0644
megabackend.py File 6.93 KB 0644
megav2backend.py File 8.42 KB 0644
megav3backend.py File 9.63 KB 0644
multibackend.py File 15.39 KB 0644
ncftpbackend.py File 5.32 KB 0644
onedrivebackend.py File 15.94 KB 0644
par2backend.py File 8.78 KB 0644
pcabackend.py File 12.18 KB 0644
pydrivebackend.py File 12.72 KB 0644
rclonebackend.py File 4.64 KB 0644
rsyncbackend.py File 6.3 KB 0644
s3_boto3_backend.py File 10.25 KB 0644
slatebackend.py File 6.37 KB 0644
ssh_paramiko_backend.py File 19.21 KB 0644
ssh_pexpect_backend.py File 12.63 KB 0644
swiftbackend.py File 9.91 KB 0644
sxbackend.py File 2.26 KB 0644
tahoebackend.py File 2.5 KB 0644
webdavbackend.py File 18.2 KB 0644
xorrisobackend.py File 11.96 KB 0644
Filemanager