Skip to content

Recheck

recheck #

Module containing the Checker class.

The Checker class takes a torrent file and the path to its contents. It then iterates through every file and directory the torrent describes and compares their data to the values contained within the torrent file. Completion percentages are printed to the screen for each file and, at the end, for the torrent file as a whole.

Checker(metafile: str, path: str) #

Check a given file or directory to see if it matches a torrentfile.

Public constructor for Checker class instance.

PARAMETER DESCRIPTION
metafile

Path to “.torrent” file.

TYPE: str

path

Path where the content is located in filesystem.

TYPE: str

Example#
1
2
3
4
5
>> metafile = "/path/to/torrentfile/content_file_or_dir.torrent"
>> location = "/path/to/location"
>> os.path.exists("/path/to/location/content_file_or_dir")
Out: True
>> checker = Checker(metafile, location)

Validate data against hashes contained in .torrent file.

PARAMETER DESCRIPTION
metafile

path to .torrent file

TYPE: str

path

path to content or contents parent directory.

TYPE: str

Source code in torrentfile\recheck.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def __init__(self, metafile: str, path: str):
    """
    Validate data against hashes contained in .torrent file.

    Loads the metafile, records its name and piece length, works out the
    meta version, locates the content root and gathers the file paths.

    Parameters
    ----------
    metafile : str
        path to .torrent file
    path : str
        path to content or contents parent directory.

    Raises
    ------
    FileNotFoundError
        If `metafile` does not exist, or the content cannot be located
        from `path`.
    ArgumentError
        If `metafile` is a directory.
    """
    if not os.path.exists(metafile):
        # Name the missing file so the error is actionable; this mirrors
        # the FileNotFoundError(path) raised when content is not found.
        raise FileNotFoundError(metafile)
    if os.path.isdir(metafile):
        raise ArgumentError(
            "The <metafile> must be a .torrent file. Not a directory")
    # last_log must exist before the first log_msg call, which compares
    # each new message against it.
    self.last_log = None
    self.log_msg("Checking: %s, %s", metafile, path)
    self.metafile = metafile
    self.total = 0
    self.paths = []
    self.fileinfo = {}
    print("Extracting data from torrent file...")
    self.meta = pyben.load(metafile)
    self.info = self.meta["info"]
    self.name = self.info["name"]
    self.piece_length = self.info["piece length"]

    # No "meta version" key -> 1; with the key, the presence of "pieces"
    # alongside it -> 3, otherwise -> 2.
    if "meta version" not in self.info:
        self.meta_version = 1
    elif "pieces" in self.info:
        self.meta_version = 3
    else:
        self.meta_version = 2

    self.root = self.find_root(path)
    self.check_paths()

check_paths() #

Gather all file paths described in the torrent file.

Source code in torrentfile\recheck.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def check_paths(self):
    """
    Gather all file paths described in the torrent file.

    Fills `self.paths`, `self.fileinfo` and `self.total` from the info
    dictionary.  A top-level "length" key marks single-file content;
    otherwise the "files" list is used for meta version 1 and the
    "file tree" dictionary is walked for every other version.
    """
    info = self.info
    records = self.fileinfo

    if "length" not in info:
        # Content is more than one file.
        self.log_msg("%s points to a directory", self.root)
        if self.meta_version != 1:
            self.walk_file_tree(info["file tree"], [])
            return

        for index, entry in enumerate(info["files"]):
            self.total += entry["length"]
            location = str(self.root / os.path.join(*entry["path"]))
            records[index] = {"path": location, "length": entry["length"]}
            self.paths.append(location)
        return

    # Content is a single file.
    self.log_msg("%s points to a single file", self.root)
    size = info["length"]
    self.total = size
    self.paths.append(str(self.root))

    record = {"path": self.root, "length": size}
    records[0] = record

    if self.meta_version > 1:
        # The single file's entry sits under the torrent name in the tree.
        leaf = info["file tree"][self.name][""]
        record["pieces root"] = leaf["pieces root"]

find_root(path: str) -> str #

Check path for torrent content.

The path can be a relative or absolute filesystem path. When the content is a single file, the path may point directly to the file, or it may point to its parent directory. When the path points to a directory, the directory is checked to see if it matches the torrent’s name; if not, the directory’s contents are searched. The returned value is the path that matches the torrent’s name.

PARAMETER DESCRIPTION
path

root path to torrent content

TYPE: str

RETURNS DESCRIPTION
str

root path to content

Source code in torrentfile\recheck.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def find_root(self, path: str) -> Path:
    """
    Check path for torrent content.

    The path can be a relative or absolute filesystem path.  It may point
    directly at the content (its final component equals the torrent's
    name) or at a directory that contains an entry with the torrent's
    name.  The returned path is built from `path` as given; it is not
    resolved to an absolute path.

    Parameters
    ----------
    path : str
        root path to torrent content

    Returns
    -------
    Path
        root path to content

    Raises
    ------
    FileNotFoundError
        If `path` does not exist, or the torrent's name is neither the
        final component of `path` nor an entry inside it.
    """
    if not os.path.exists(path):
        self.log_msg("Could not locate torrent content %s.", path)
        raise FileNotFoundError(path)

    root = Path(path)
    if root.name == self.name:
        self.log_msg("Content found: %s.", str(root))
        return root

    # Only a directory can contain the content.  Without this guard a
    # path to an unrelated *file* made os.listdir raise
    # NotADirectoryError instead of the FileNotFoundError below.
    if root.is_dir() and self.name in os.listdir(root):
        return root / self.name

    self.log_msg("Could not locate torrent content in: %s", str(root))
    raise FileNotFoundError(root)

iter_hashes() -> tuple #

Produce results of comparing torrent contents piece by piece.

YIELDS DESCRIPTION
chunk

hash of data found on disk

TYPE: bytes

piece

hash of data when complete and correct

TYPE: bytes

path

path to file being hashed

TYPE: str

size

length of bytes hashed for piece

TYPE: int

Source code in torrentfile\recheck.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def iter_hashes(self) -> tuple:
    """
    Produce results of comparing torrent contents piece by piece.

    After each piece is yielded, a progress message with the processed
    and matched percentages is logged.  Once the generator is exhausted
    the overall matched percentage is stored in `self._result`.

    Yields
    ------
    chunk : bytes
        hash of data found on disk
    piece : bytes
        hash of data when complete and correct
    path : str
        path to file being hashed
    size : int
        length of bytes hashed for piece
    """

    def percent(part, whole):
        # A zero denominator (no content, or only zero-length pieces so
        # far) reports 0 instead of raising ZeroDivisionError.
        return int(part / whole * 100) if whole else 0

    matched = consumed = 0
    checker = self.piece_checker()
    for chunk, piece, path, size in checker(self):
        consumed += size
        if chunk == piece:
            matched += size
        yield chunk, piece, path, size
        self.log_msg(
            "Processed: %s%%, Matched: %s%%",
            str(percent(consumed, self.total)),
            str(percent(matched, consumed)),
        )
    self._result = (matched / consumed) * 100 if consumed > 0 else 0

log_msg(*args, level: int = logging.INFO) #

Log message msg to logger and send msg to callback hook.

PARAMETER DESCRIPTION
*args

formatting args for log message

TYPE: dict DEFAULT: ()

level

Log level for this message; default=logging.INFO

TYPE: int DEFAULT: logging.INFO

Source code in torrentfile\recheck.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def log_msg(self, *args, level: int = logging.INFO):
    """
    Format a message, log it, and forward it to the callback hook.

    The first positional argument is a %-style format string and any
    remaining arguments are its values.  A message identical to the
    previously logged one is dropped.

    Parameters
    ----------
    *args : tuple
        format string followed by its formatting values
    level : int
        Log level for this message; default=`logging.INFO`
    """
    text = args[0]
    values = args[1:]
    if len(values) == 1:
        # A lone value is applied directly rather than wrapped in a tuple.
        text = text % values[0]
    elif values:
        text = text % tuple(values)

    # Repeat log messages should be ignored.
    if text == self.last_log:
        return

    self.last_log = text
    logger.log(level, text)
    # The hook only receives INFO level messages.
    if self._hook and level == logging.INFO:
        self._hook(text)

piece_checker() #

Check individual pieces of the torrent.

RETURNS DESCRIPTION
HashChecker | FeedChecker

Individual piece hasher.

Source code in torrentfile\recheck.py
122
123
124
125
126
127
128
129
130
131
132
133
def piece_checker(self):
    """
    Select the piece checker class for this torrent's meta version.

    Returns
    -------
    HashChecker | FeedChecker
        `FeedChecker` when the meta version is 1, `HashChecker` for any
        other version.  The class itself is returned, not an instance.
    """
    return FeedChecker if self.meta_version == 1 else HashChecker

register_callback(hook) classmethod #

Register hooks from 3rd party programs to access generated info.

PARAMETER DESCRIPTION
hook

callback function for the logging feature.

TYPE: function

Source code in torrentfile\recheck.py
110
111
112
113
114
115
116
117
118
119
120
@classmethod
def register_callback(cls, hook):
    """
    Store a callback on the class for 3rd party programs to receive info.

    Parameters
    ----------
    hook : function
        callback function for the logging feature; saved as `cls._hook`.
    """
    setattr(cls, "_hook", hook)

results() #

Generate result percentage and store for future calls.

Source code in torrentfile\recheck.py
135
136
137
138
139
140
141
142
143
144
145
146
def results(self):
    """
    Run the full recheck and return the matched percentage.

    Exhausts `iter_hashes`, which stores the final percentage in
    `self._result`, then logs and returns that value.

    Returns
    -------
    int | float
        the value of `self._result` after all pieces were compared.
    """
    # Only the side effects of iterating are needed; the yielded tuples
    # are discarded rather than collected into a list nobody reads.
    for _ in self.iter_hashes():
        pass

    self.log_msg("Final result for %s recheck:  %s", self.metafile,
                 self._result)

    return self._result

walk_file_tree(tree: dict, partials: list) #

Traverse File Tree dictionary to get file details.

Extract full pathnames, length, root hash, and layer hashes for each file included in the .torrent’s file tree.

PARAMETER DESCRIPTION
tree

File Tree dict extracted from torrent file.

TYPE: dict

partials

list of intermediate pathnames.

TYPE: list

Source code in torrentfile\recheck.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def walk_file_tree(self, tree: dict, partials: list):
    """
    Recursively collect file details from a File Tree dictionary.

    Every file found is appended to `self.paths`, described in
    `self.fileinfo` (path, length and pieces root) and its length is
    added to `self.total`.

    Parameters
    ----------
    tree : dict
        File Tree dict extracted from torrent file.
    partials : list
        list of intermediate pathnames.
    """
    for name, node in tree.items():
        if "" not in node:
            # No empty-string key: this entry is a directory, descend.
            self.walk_file_tree(node, partials + [name])
            continue

        # An empty-string key marks a file; its value holds the details.
        relative = os.path.join(*partials, name)
        details = node[""]
        size = details["length"]
        # Files with a falsy (zero) length get no pieces root lookup.
        digest = details["pieces root"] if size else None
        location = str(self.root / relative)
        self.fileinfo[len(self.paths)] = {
            "path": location,
            "length": size,
            "pieces root": digest,
        }
        self.paths.append(location)
        self.total += size

FeedChecker(checker: Checker) #

Bases: ProgMixin

Validates torrent content.

Seamlessly validate torrent file contents by comparing hashes in metafile against data on disk.

PARAMETER DESCRIPTION
checker

the checker class instance.

TYPE: object

Generate hashes of piece length data from filelist contents.

Source code in torrentfile\recheck.py
327
328
329
330
331
332
333
334
335
336
337
338
def __init__(self, checker: Checker):
    """
    Prepare to generate hashes of piece length data from file contents.

    Parameters
    ----------
    checker : Checker
        the checker instance whose paths, file info, piece length and
        "pieces" string are used.
    """
    # Data taken from the owning checker.
    self.fileinfo = checker.fileinfo
    self.paths = checker.paths
    self.piece_length = checker.piece_length
    self.pieces = checker.info["pieces"]

    # Iteration state; `it` is assigned when iteration begins.
    self.it = None
    self.index = self.piece_count = 0
    self.piece_map = {}

__iter__() #

Assign iterator and return self.

Source code in torrentfile\recheck.py
340
341
342
343
344
345
def __iter__(self):
    """
    Start a fresh piece generator and return self as the iterator.
    """
    generator = self.iter_pieces()
    self.it = generator
    return self

__next__() #

Yield back result of comparison.

Source code in torrentfile\recheck.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def __next__(self):
    """
    Hash the next block of data and pair it with the expected hash.

    Returns
    -------
    tuple
        SHA1 digest of the block read, the matching slice of the
        torrent's "pieces" string, the path at the current file index,
        and the number of bytes in the block.

    Raises
    ------
    StopIteration
        When the underlying piece generator is exhausted.
    """
    try:
        block = next(self.it)
    except StopIteration as itererror:
        raise StopIteration from itererror

    digest = sha1(block).digest()  # nosec
    # Expected hashes are stored back to back, SHA1 bytes apiece.
    offset = SHA1 * self.piece_count
    expected = self.pieces[offset:offset + SHA1]
    self.piece_count += 1
    return digest, expected, self.paths[self.index], len(block)

extract(path: str, partial: bytearray) -> bytearray #

Split file paths contents into blocks of data for hash pieces.

PARAMETER DESCRIPTION
path

path to content.

TYPE: str

partial

any remaining content from last file.

TYPE: bytes

RETURNS DESCRIPTION
bytearray

Hash digest for block of .torrent contents.

Source code in torrentfile\recheck.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def extract(self, path: str, partial: bytearray) -> bytearray:
    """
    Split a file's contents into piece-sized blocks of raw data.

    This is a generator.  The blocks it yields are the bytes read from
    the file (not digests); each read is sized so that, together with
    whatever is already in `partial`, a block reaches `piece_length`.

    Parameters
    ----------
    path : str
        path to content.
    partial : bytearray
        any remaining content from last file.  It is extended in place,
        so the caller's object changes too.

    Yields
    ------
    bytearray
        block of data of at most `piece_length` bytes.

    Raises
    ------
    MissingPathError
        If `path` is not one of the known paths.
    """
    read = 0
    # Expected size of the file at the current index, per the torrent.
    length = self.fileinfo[self.index]["length"]
    # A carried buffer that is already a complete piece is not reused;
    # start from an empty buffer instead.
    partial = bytearray() if len(partial) == self.piece_length else partial
    if path not in self.paths:  # pragma: no cover
        raise MissingPathError(path)
    with open(path, "rb") as current:
        while True:
            # Despite the name this is a byte count: how many bytes are
            # still needed to complete the current piece.
            bitlength = self.piece_length - len(partial)
            part = bytearray(bitlength)
            amount = current.readinto(part)
            read += amount
            partial.extend(part[:amount])
            if amount < bitlength:
                # Short read: end of file.  The incomplete block is only
                # yielded when the file supplied exactly `length` bytes.
                if amount > 0 and read == length:
                    self.progbar.update(amount)
                    yield partial
                break
            # A complete piece was assembled; hand it out and start over.
            self.progbar.update(amount)
            yield partial
            partial = bytearray(0)
    if length != read:
        # Byte count on disk differs from the torrent's length; the rest
        # comes from _gen_padding (defined elsewhere -- presumably filler
        # for the missing bytes; confirm against its implementation).
        for pad in self._gen_padding(partial, length, read):
            yield pad

iter_pieces() #

Iterate through, and hash pieces of torrent contents.

YIELDS DESCRIPTION
piece

hash digest for block of torrent data.

TYPE: bytes

Source code in torrentfile\recheck.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def iter_pieces(self):
    """
    Iterate through the torrent's files and produce blocks to be hashed.

    Blocks shorter than `piece_length` are held back and passed on to
    the next file, except that for the last path every block produced
    from an existing file is yielded.

    Yields
    ------
    piece : bytearray
        block of torrent data (raw bytes, not a digest).
    """
    # Leftover bytes from the previous file that did not fill a piece.
    partial = bytearray()
    for i, path in enumerate(self.paths):
        total = self.fileinfo[i]["length"]
        self.progbar = self.get_progress_tracker(total, path)
        # Record which file is current so other methods can index by it.
        self.index = i
        if os.path.exists(path):
            for piece in self.extract(path, partial):
                # Yield full pieces; also yield anything from the final
                # file, since no later file can complete it.
                if (len(piece) == self.piece_length) or (i + 1 == len(
                        self.paths)):
                    yield piece
                else:
                    partial = piece

        else:
            # File is missing on disk: blocks come from _gen_padding
            # (defined elsewhere) instead of from the file.
            # NOTE(review): unlike the branch above there is no
            # last-file exception here, so a short trailing block for a
            # missing final file is kept in `partial` and never yielded
            # -- confirm this is intended.
            length = self.fileinfo[i]["length"]
            for pad in self._gen_padding(partial, length):
                if len(pad) == self.piece_length:
                    yield pad
                else:
                    partial = pad
        self.progbar.close_out()

HashChecker(checker: Checker) #

Bases: ProgMixin

Iterate through contents of meta data and verify with file contents.

PARAMETER DESCRIPTION
checker

the checker instance that maintains variables.

TYPE: Checker

Construct a HashChecker instance.

Source code in torrentfile\recheck.py
477
478
479
480
481
482
483
484
485
486
487
def __init__(self, checker: Checker):
    """
    Construct a HashChecker instance.

    Parameters
    ----------
    checker : Checker
        the checker instance that maintains the paths, file info, piece
        length and the metafile's "piece layers" dictionary.
    """
    self.checker = checker

    # Data taken from the owning checker.
    self.fileinfo = checker.fileinfo
    self.paths = checker.paths
    self.piece_length = checker.piece_length
    self.piece_layers = checker.meta["piece layers"]

    # No file is selected yet; the index is advanced before first use.
    self.index = -1
    self.current = None

Padder(length, piece_length) #

Padding class to generate padding hashes wherever needed.

PARAMETER DESCRIPTION
length

the total size of the mock file generating padding for.

piece_length

the block size that each hash represents.

TYPE: int

Construct padding class to Mock missing or incomplete files.

PARAMETER DESCRIPTION
length

size of the file

TYPE: int

piece_length

the piece length for each iteration.

TYPE: int

Source code in torrentfile\recheck.py
520
521
522
523
524
525
526
527
528
529
530
531
532
533
def __init__(self, length, piece_length):
    """
    Set up padding hashes that stand in for missing or incomplete files.

    Parameters
    ----------
    length : int
        size of the file
    piece_length : int
        the piece length for each iteration.
    """
    self.piece_length = piece_length
    self.length = length
    # Digest of one full piece of zero bytes, computed once and reused.
    zero_block = bytearray(piece_length)
    self.pad = sha256(zero_block).digest()
__iter__() #

Return self to correctly implement iterator type.

Source code in torrentfile\recheck.py
535
536
537
538
539
def __iter__(self):
    """
    Return self so the instance can be used wherever an iterator is expected.
    """
    return self  # pragma: nocover
__next__() -> bytes #

Iterate through seemingly endless sha256 hashes of zeros.

RETURNS DESCRIPTION
bytes

returns the padding

TYPE: bytes

RAISES DESCRIPTION
StopIteration
Source code in torrentfile\recheck.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def __next__(self) -> bytes:
    """
    Produce the next sha256 hash of zeros until the length is used up.

    Each call consumes up to `piece_length` bytes of the remaining
    length.

    Returns
    -------
    bytes
        digest of a full piece of zeros, or of the shorter final
        remainder.

    Raises
    ------
    StopIteration
        When no length remains.
    """
    remaining = self.length
    if remaining >= self.piece_length:
        # A whole piece is still owed: reuse the precomputed digest.
        self.length = remaining - self.piece_length
        return self.pad
    if remaining <= 0:
        raise StopIteration
    # Final, shorter-than-a-piece block of zeros.
    digest = sha256(bytearray(remaining)).digest()
    self.length = 0
    return digest

__iter__() #

Return self to implement the iterator protocol.

Source code in torrentfile\recheck.py
489
490
491
492
493
def __iter__(self):
    """
    Return self to implement the iterator protocol.

    Nothing is assigned here; this method only returns the instance.
    """
    return self

__next__() #

Provide the result of comparison.

Source code in torrentfile\recheck.py
495
496
497
498
499
500
501
502
503
504
505
506
def __next__(self):
    """
    Provide the result of the next piece comparison.

    When the current file is exhausted the checker moves on to the next
    file, and keeps moving while files produce no pieces at all, so one
    empty file cannot end the iteration while later files remain.

    Returns
    -------
    tuple
        whatever `process_current` returns for the next piece.

    Raises
    ------
    StopIteration
        When every file has been processed.
    """
    if self.current is None:
        self.next_file()
    while True:
        try:
            return self.process_current()
        except StopIteration as itererr:
            # Current file is done.  Retry with the following file, and
            # only stop once next_file reports there are none left.
            if not self.next_file():
                raise StopIteration from itererr

advance() -> tuple #

Increment the number of pieces processed for the current file.

RETURNS DESCRIPTION
tuple

the piece and size

Source code in torrentfile\recheck.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
def advance(self) -> tuple:
    """
    Take the next expected hash and account for the bytes it covers.

    Increments the piece count and reduces the remaining length of the
    current file by the size of the piece.

    Returns
    -------
    tuple
        the piece and size
    """
    # Expected hashes are stored back to back, SHA256 bytes apiece.
    offset = SHA256 * self.count
    expected = self.pieces[offset:offset + SHA256]
    self.count += 1
    # A piece covers piece_length bytes, or whatever is left if less.
    size = min(self.length, self.piece_length)
    self.length -= size
    return expected, size

next_file() -> bool #

Remove all references to processed files and prepare for the next.

RETURNS DESCRIPTION
bool

if there is a next file found

Source code in torrentfile\recheck.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def next_file(self) -> bool:
    """
    Move on to the next file, dropping references to the finished one.

    Returns
    -------
    bool
        True if a next file was set up, False once every path has been
        processed (the per-file attributes are deleted at that point).
    """
    self.index += 1

    if self.current is not None and self.index >= len(self.paths):
        # Nothing left: remove the per-file state.
        del self.current
        del self.length
        del self.root_hash
        del self.pieces
        return False

    target = self.paths[self.index]
    details = self.fileinfo[self.index]
    self.current = target
    self.length = details["length"]
    self.root_hash = details["pieces root"]
    # Files larger than one piece look their hashes up in the piece
    # layers; otherwise the pieces root is used directly.
    if self.length > self.piece_length:
        self.pieces = self.piece_layers[self.root_hash]
    else:
        self.pieces = self.root_hash
    self.progbar = self.get_progress_tracker(self.length, target)
    self.count = 0
    if not os.path.exists(target):
        # Missing file: padding hashes stand in for its contents.
        self.hasher = self.Padder(self.length, self.piece_length)
    else:
        self.hasher = FileHasher(
            target,
            self.piece_length,
            progress=2,
            progress_bar=self.progbar,
        )
    return True

process_current() -> tuple #

Gather necessary information to compare to metafile details.

RETURNS DESCRIPTION
tuple

a tuple containing the layer, piece, current path and size

RAISES DESCRIPTION
StopIteration
Source code in torrentfile\recheck.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
def process_current(self) -> tuple:
    """
    Produce the next comparison result for the current file.

    If the hasher runs out while length remains and expected hashes are
    still unread, the hasher is replaced by a Padder and the result is
    produced from that instead.

    Returns
    -------
    tuple
        a tuple containing the layer, piece, current path and size

    Raises
    ------
    StopIteration
        When the current file has nothing further to compare.
    """
    try:
        digest = next(self.hasher)
        expected, nbytes = self.advance()
        self.progbar.update(nbytes)
        return digest, expected, self.current, nbytes
    except StopIteration as err:
        unfinished = (self.length > 0
                      and self.count * SHA256 < len(self.pieces))
        if not unfinished:
            raise StopIteration from err
        # The Padder is built from the length still outstanding, so it
        # must be created before advance() reduces that length.
        self.hasher = self.Padder(self.length, self.piece_length)
        expected, nbytes = self.advance()
        digest = next(self.hasher)
        self.progbar.update(0)
        return digest, expected, self.current, nbytes