[SOLVED] Mass Set Location

Linux specific questions, problems.
Post Reply
rockhazard

[SOLVED] Mass Set Location

Post by rockhazard »

Is there a way to relocate a range of torrents all at once? I've had to migrate my files to new hard drives and change the partition and directory scheme. I need a way to insert a string in the path of all the downloaded file locations, instead of using "Set location..." one at a time, which would be tedious.
Example:

Code: Select all

# old paths
/old/path/to/downloads/torrent1
/old/path/to/downloads/torrent2
# new paths
/new/path/to/downloads/torrent1
/new/path/to/downloads/torrent2
I don't see a way in the preferences, torrent options, etc, but maybe I can directly edit the database or something?

SOLUTION: jslay's handy script
Please read the original thread for insights/instructions.
The latest version of the script causes a runaway backup process that builds a huge backup file, so I disabled the script's backup functions and did a manual backup of BT_backup. I also altered it to point to the flatpak's BT_backup location. For many, this won't work, so just uncomment line 31 and comment line 32. This worked for me, but as always, backup your files before using this:

Code: Select all

#!/usr/bin/env python3

# script to batch change the save paths of torrents in qBittorrent BT_backup.
# credit: jslay
# flatpak mod by rockhazard 08/01/2021
# NO WARRANTIES stated or implied for any use; use at your own risk
# link to original script: https://qbforums.shiki.hu/viewtopic.php?f=14&t=4976&p=33204&hilit=Bulk+Change+Save+Path+Script#p33204

import os
import re
import sys
import logging
import zipfile

from datetime import datetime


logger = logging.getLogger(__name__)


class QBTBatchMove(object):
    def __init__(self, bt_backup_path=None):
        if bt_backup_path is None:
            logger.debug('Discovering BT_backup path...')
            if sys.platform.startswith('win32'):
                logger.debug('Windows System')
                bt_backup_path = os.path.join(os.getenv('localappdata'), 'qBittorrent\\BT_backup')
            elif sys.platform.startswith('linux'):
                logger.debug('Linux System')
                # default linux path
                # bt_backup_path = os.path.join(os.getenv('HOME'), '.local/share/data/qBittorrent/BT_backup')
                # flatpak
                bt_backup_path = os.path.join(os.getenv('HOME'), '.var/app/org.qbittorrent.qBittorrent/data/data/qBittorrent/BT_backup')
        if not os.path.exists(bt_backup_path) or not os.path.isdir(bt_backup_path):
            raise NotADirectoryError(bt_backup_path)
        logger.debug('BT_backup Path: %s' % bt_backup_path)
        self.bt_backup_path = bt_backup_path
        self.discovered_files = None

    def run(self, existing_path, new_path, create_backup=False):
        """
        Perform Batch Processing of path changes.
        :param existing_path: Existing path to look for
        :type existing_path: str
        :param new_path: New Path to replace with
        :type new_path: str
        :param create_backup: Create a backup of the file before modifying?
        :type create_backup: bool
        """
        if create_backup:
            backup_filename = 'fastresume_backup' + datetime.now().strftime('%Y%m%d%H%M%S') + '.zip'
            self.backup_folder(self.bt_backup_path,
                               os.path.join(os.path.dirname(self.bt_backup_path), backup_filename))
        if sys.platform.startswith('win32') and not existing_path.endswith('\\'):
            existing_path += '\\'
        elif sys.platform.startswith('linux') and not existing_path.endswith('/'):
            existing_path += '/'
        if sys.platform.startswith('win32') and not new_path.endswith('\\'):
            new_path += '\\'
        elif sys.platform.startswith('linux') and not new_path.endswith('/'):
            new_path += '/'
        logger.info('Searching for .fastresume files with path %s ...' % existing_path)
        self.discovered_files = self.discover_relevant_fast_resume(self.bt_backup_path,
                                                                   existing_path)
        if not self.discovered_files:
            raise ValueError('Found no .fastresume files with existing path %s' % existing_path)
        logger.info('Found %s Files.' % len(self.discovered_files))
        logger.debug('Discovered FastResumes: %s' % self.discovered_files)
        for file in self.discovered_files:
            logger.info('Updating File %s...' % file.file_path)
            new_save_path = file.save_path.replace(bytes(existing_path, 'utf-8'),
                                                   bytes(new_path, 'utf-8'))
            file.set_save_path(new_save_path.decode('utf-8'),
                               save_file=False,
                               create_backup=False)
            new_qbt_save_path = file.qbt_save_path.replace(bytes(existing_path, 'utf-8'),
                                                           bytes(new_path, 'utf-8'))
            file.set_qbt_save_path(new_qbt_save_path.decode('utf-8'),
                                   save_file=False,
                                   create_backup=False)
            file.save()
            logger.info('File (%s) Updated!' % file.file_path)

    @staticmethod
    def discover_relevant_fast_resume(bt_backup_path, existing_path):
        """
        Find .fastresume files that contain the existing path.
        :param bt_backup_path: Path to BT_backup folder
        :type bt_backup_path: str
        :param existing_path: The existing path to look for
        :type existing_path: str
        :return: List of FastResume Objects
        :rtype: list[FastResume]
        """
        existing_path = bytes(existing_path, 'utf-8')
        fast_resume_files = [FastResume(os.path.join(bt_backup_path, file))
                             for file in os.listdir(bt_backup_path)
                             if file.endswith('.fastresume')]
        fast_resume_files = [file for file in fast_resume_files if existing_path in file.save_path
                             or existing_path in file.qbt_save_path]
        return fast_resume_files

    @staticmethod
    def backup_folder(folder_path, archive_path):
        logger.info('Creating Archive %s ...' % archive_path)
        with zipfile.ZipFile(archive_path, 'w') as archive:
            for file in os.listdir(folder_path):
                archive.write(os.path.join(folder_path, file))
        logger.info('Done!')


class FastResume(object):
    save_path_pattern = br'save_path(\d+)'
    qBt_save_path_pattern = br'qBt-savePath(\d+)'

    def __init__(self, file_path):
        self.file_path = os.path.realpath(file_path)
        if not os.path.exists(self.file_path) or not os.path.isfile(self.file_path):
            raise FileNotFoundError(self.file_path)
        self._save_path = None
        self._qbt_save_path = None
        self._pattern_save_path = re.compile(self.save_path_pattern)
        self._pattern_qBt_save_path = re.compile(self.qBt_save_path_pattern)
        self._data = None
        self._load_data()

    def _load_data(self):
        with open(self.file_path, 'rb') as f:
            self._data = f.read()
        save_path_matches = self._pattern_save_path.search(self._data)
        qbt_matches = self._pattern_qBt_save_path.search(self._data)
        if save_path_matches is None:
            raise ValueError('Unable to determine \'save_path\'')
        if qbt_matches is None:
            raise ValueError('Unable to determine \'qBt-savePath\'')
        self._save_path = self._data[save_path_matches.end() + 1:
                                     save_path_matches.end() + 1 + int(save_path_matches.group(1))]
        self._qbt_save_path = self._data[qbt_matches.end() + 1:
                                         qbt_matches.end() + 1 + int(qbt_matches.group(1))]

    @property
    def save_path(self):
        return self._save_path

    @property
    def qbt_save_path(self):
        return self._qbt_save_path

    def set_save_path(self, path, save_file=True, create_backup=False):
        if sys.platform.startswith('win32') and not path.endswith('\\'):
            path += '\\'
        elif sys.platform.startswith('linux') and not path.endswith('/'):
            path += '/'
        if create_backup:
            today = datetime.now()
            file_name = '%s.%s.%s' % (self.file_path,
                                      today.strftime('%Y%m%d%H%M%S'),
                                      'bkup')
            self.save(file_name)
        self._data = self._data.replace(
            b'save_path' + bytes(str(len(self._save_path)), 'utf-8') + b':' + self._save_path,
            b'save_path' + bytes(str(len(path)), 'utf-8') + b':' + bytes(path, 'utf-8')
        )
        self._save_path = bytes(path, 'utf-8')
        if save_file:
            self.save()

    def set_qbt_save_path(self, path, save_file=True, create_backup=False):
        if sys.platform.startswith('win32') and not path.endswith('\\'):
            path += '\\'
        elif sys.platform.startswith('linux') and not path.endswith('/'):
            path += '/'
        if create_backup:
            today = datetime.now()
            file_name = '%s.%s.%s' % (self.file_path,
                                      today.strftime('%Y%m%d%H%M%S'),
                                      'bkup')
            self.save(file_name)
        self._data = self._data.replace(
            b'qBt-savePath' + bytes(str(len(self._qbt_save_path)), 'utf-8') + b':' + self._qbt_save_path,
            b'qBt-savePath' + bytes(str(len(path)), 'utf-8') + b':' + bytes(path, 'utf-8')
        )
        self._qbt_save_path = bytes(path, 'utf-8')
        if save_file:
            self.save()

    def set_save_paths(self, path, save_file=True, create_backup=False):
        # when create_backup=True, recursion may cause runaway process/huge file
        if create_backup:
            today = datetime.now()
            file_name = '%s.%s.%s' % (self.file_path,
                                      today.strftime('%Y%m%d%H%M%S'),
                                      'bkup')
            self.save(file_name)
        self.set_save_path(path, save_file=False, create_backup=False)
        self.set_qbt_save_path(path, save_file=False, create_backup=False)
        if save_file:
            self.save()

    def save(self, file_name=None):
        if file_name is None:
            file_name = self.file_path
        logger.info('Saving File %s...' % file_name)
        with open(file_name, 'wb') as f:
            f.write(self._data)


if __name__ == '__main__':
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
    logger.setLevel('INFO')
    qbm = QBTBatchMove()
    # bt_backup = input('BT_Backup Path (%s): ' % qbm.bt_backup_path)
    # if bt_backup != '':
        qbm.bt_backup_path = bt_backup
    ep = input('Existing Path: ')
    np = input('New Path: ')
    qbm.run(ep, np)
Post Reply