Coverage for torrentfile\rebuild.py: 100%
251 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4##############################################################################
5# Copyright (C) 2021-current alexpdev
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18##############################################################################
"""
Classes and functions for the rebuild or reassemble subcommand.

Re-assemble a torrent into the proper directory structure as indicated by a
torrent meta file, and validate the contents of each file along the
way. Displays a progress bar for each torrent.
"""
26import os
27import math
28import logging
29from hashlib import sha1
30from pathlib import Path
32import pyben
34from torrentfile.hasher import HasherV2
35from torrentfile.mixins import CbMixin, ProgMixin
36from torrentfile.utils import copypath
# Size in bytes of a single SHA1 digest; the code below slices the v1
# ``pieces`` byte string in steps of this many bytes.
SHA1 = 20

# Logger shared by everything in this module.
logger = logging.getLogger(__name__)
class PathNode:
    """
    Information about one file, or one slice of a file, inside a torrent.

    A node records where the file lives inside the torrent layout and which
    byte range of the file contributes to a particular piece.
    """

    def __init__(
        self,
        start: int = None,
        stop: int = None,
        full: str = None,
        filename: str = None,
        path: str = None,
        length: int = None,
    ):
        """
        Hold file information that contributes to the contents of torrent.

        Parameters
        ----------
        start : int, optional
            byte offset inside the file where the slice begins; ``None``
            is treated as the beginning of the file, by default None
        stop : int, optional
            byte offset inside the file where the slice ends; ``-1`` or
            ``None`` means "read through the end of the file",
            by default None
        full : str, optional
            full path, by default None
        filename : str, optional
            filename, by default None
        path : str, optional
            parent path, by default None
        length : int, optional
            size, by default None
        """
        self.path = path
        self.start = start
        self.stop = stop
        self.length = length
        self.filename = filename
        self.full = full

    def get_part(self, path: str) -> bytes:
        """
        Extract the part of the file needed to complete the hash.

        Parameters
        ----------
        path : str
            filesystem path location of file.

        Returns
        -------
        bytes
            part of the file's contents
        """
        # ``start`` and ``stop`` default to None in ``__init__``; normalise
        # them here so a node built with defaults reads the whole file
        # instead of failing on ``None - None`` arithmetic.
        begin = self.start or 0
        with open(path, "rb") as fd:
            if begin:
                fd.seek(begin)
            if self.stop is None or self.stop == -1:
                # Sentinel: take everything from ``begin`` to end of file.
                return fd.read()
            return fd.read(self.stop - begin)

    def __len__(self) -> int:
        """
        Return size of the file.

        Returns
        -------
        int
            total size (the ``length`` value given at construction)
        """
        return self.length
class PieceNode:
    """
    One SHA1 piece of a torrent together with the file slices that feed it.
    """

    def __init__(self, piece: bytes):
        """
        Remember the expected digest and start with no contributing files.

        Parameters
        ----------
        piece : bytes
            SHA1 hash bytes
        """
        self.piece = piece
        self.paths = []
        self.result = None
        self.dest = None

    def append(self, pathnode: "PathNode"):
        """
        Register another file slice as contributing to this piece.

        Parameters
        ----------
        pathnode : PathNode
            the pathnode
        """
        self.paths.append(pathnode)

    def _find_matches(self, filemap: dict, paths: list, data: bytes) -> bool:
        """
        Try candidate files for each remaining slice until the hash matches.

        The method consumes ``paths`` front to back. For the first slice it
        tries every same-sized candidate listed in ``filemap``, appends the
        slice's bytes to ``data`` and recurses on the rest. Once no slices
        remain, the accumulated bytes are hashed and compared to the piece.

        Parameters
        ----------
        filemap : dict
            maps a filename to a list of ``(location, size)`` tuples
        paths : list
            pathnodes that still have to be resolved
        data : bytes
            bytes gathered so far from earlier slices

        Returns
        -------
        bool
            True when a combination of candidates reproduces the piece hash
        """
        if len(paths) == 0:
            # Nothing left to resolve: compare the digest of what we have.
            return sha1(data).digest() == self.piece  # nosec
        head, rest = paths[0], paths[1:]
        if head.filename not in filemap:
            return False  # pragma: nocover
        for location, size in filemap[head.filename]:
            if size == len(head):
                chunk = head.get_part(location)
                if self._find_matches(filemap, rest, data + chunk):
                    # The copy happens while the recursion unwinds, so it
                    # only runs for candidates on the matching combination.
                    target = os.path.join(self.dest, head.full)
                    copypath(location, target)
                    return True
        return False

    def find_matches(self, filemap: dict, dest: str) -> bool:
        """
        Resolve every slice of this piece and store the outcome.

        Parameters
        ----------
        filemap : dict
            filename and details
        dest : str
            target destination path

        Returns
        -------
        bool
            success status (also saved on ``self.result``)
        """
        self.dest = dest
        outcome = self._find_matches(filemap, list(self.paths), b"")
        self.result = outcome
        return outcome
class Metadata(CbMixin, ProgMixin):
    """
    Class containing the metadata contents of a torrent file.
    """

    def __init__(self, path: str):
        """
        Construct metadata object for torrent info.

        Parameters
        ----------
        path : str
            path to the .torrent file.
        """
        self.path = os.path.abspath(path)
        self.name = None
        self.piece_length = 1
        self.meta_version = 1
        self.pieces = b""
        self.piece_nodes = []
        self.length = 0
        self.files = []
        self.filenames = set()
        # Always defined; extract() flips it for single-file v1 torrents.
        self.is_file = False
        self.extract()
        if self.meta_version == 2:
            # _match_v2 advances the progress bar once per file entry, so
            # count entries rather than unique basenames (two files with
            # the same name in different directories are two entries).
            self.num_pieces = len(self.files)
        else:
            self.num_pieces = math.ceil(len(self.pieces) / SHA1)

    def extract(self):
        """
        Decode and extract information for the .torrent file.

        Populates ``name``, ``piece_length``, ``meta_version``, ``pieces``,
        ``files``, ``filenames`` and ``length`` from the ``info`` dictionary.
        """
        meta = pyben.load(self.path)
        info = meta["info"]
        self.piece_length = info["piece length"]
        self.name = info["name"]
        self.meta_version = info.get("meta version", 1)
        self.pieces = info.get("pieces", bytes())
        if self.meta_version == 2:
            self._parse_tree(info["file tree"], [self.name])
        elif "length" in info:
            # Single-file layout: the torrent name is the file itself.
            self.length += info["length"]
            self.is_file = True
            self.filenames.add(info["name"])
            self.files.append({
                "path": Path(self.name).parent,
                "filename": self.name,
                "full": self.name,
                "length": self.length,
            })
        elif "files" in info:
            # Multi-file layout: every entry is relative to the torrent name.
            for f in info["files"]:
                path = f["path"]
                full = os.path.join(self.name, *path)
                self.files.append({
                    "path": Path(full).parent,
                    "filename": path[-1],
                    "full": full,
                    "length": f["length"],
                })
                self.length += f["length"]
                self.filenames.add(path[-1])

    def _map_pieces(self):
        """
        Create PathNode and PieceNode details for each piece in the torrent.

        Walks the files in order and assigns to every piece the byte ranges
        of the file(s) that make up its ``piece_length`` bytes. ``remainder``
        carries the number of bytes of the current file left over for the
        next piece.
        """
        # Start from scratch so that calling rebuild() more than once does
        # not accumulate duplicate nodes.
        self.piece_nodes = []
        total_pieces = len(self.pieces) // SHA1
        remainder = file_index = 0
        current = {}
        for i in range(total_pieces):
            begin = SHA1 * i
            piece = PieceNode(self.pieces[begin:begin + SHA1])
            target = self.piece_length
            if remainder:
                # Continue the file that the previous piece left unfinished.
                start = current["length"] - remainder
                if remainder <= target:
                    # The rest of the file fits in this piece (``<=`` so an
                    # exact fit also moves on to the next file).
                    stop = -1
                    target -= remainder
                    remainder = 0
                    file_index += 1
                else:
                    stop = start + target
                    remainder -= target
                    target = 0
                piece.append(PathNode(start=start, stop=stop, **current))
            while target > 0 and file_index < len(self.files):
                current = self.files[file_index]
                size = current["length"]
                if size <= target:
                    # Whole file is consumed by this piece; an exact fit
                    # must advance as well, otherwise the next piece would
                    # map this same file again from offset 0.
                    stop = -1
                    target -= size
                    file_index += 1
                else:
                    stop = target
                    remainder = size - target
                    target = 0
                piece.append(PathNode(start=0, stop=stop, **current))
            self.piece_nodes.append(piece)

    def _parse_tree(self, tree: dict, partials: list):
        """
        Parse the file tree dictionary of the torrent metafile.

        A value containing the empty-string key is treated as a file entry
        (its ``length`` and ``pieces root`` are read); any other value is
        treated as a directory and parsed recursively.

        Parameters
        ----------
        tree : dict
            the dictionary representation of a file tree.
        partials : list
            list of paths leading up to the current key value.
        """
        for key, val in tree.items():
            if "" in val:
                self.filenames.add(key)
                path = Path(os.path.join(*partials))
                full = Path(os.path.join(path, key))
                length = val[""]["length"]
                root = val[""]["pieces root"]
                self.files.append({
                    "path": path,
                    "full": full,
                    "filename": key,
                    "length": length,
                    "root": root,
                })
                self.length += length
            else:
                self._parse_tree(val, partials + [key])

    def _match_v1(self, filemap: dict, dest: str):
        """
        Check each of the nodes against the filemap dictionary for matches.

        Parameters
        ----------
        filemap : dict
            filenames and filesystem information
        dest : str
            target destination path
        """
        self._map_pieces()
        # Track copied files by their full torrent-relative path. Tracking
        # by parent directory would wrongly skip pieces belonging to other
        # files that merely share a directory with a copied file.
        copied = set()
        for piece_node in self.piece_nodes:
            paths = piece_node.paths
            if len(paths) == 1 and paths[0].full in copied:
                # Piece lies entirely inside a file that is already copied.
                self._update()
                continue
            if piece_node.find_matches(filemap, dest):
                for pathnode in paths:
                    if pathnode.full not in copied:
                        copied.add(pathnode.full)
                        dest_path = os.path.join(dest, pathnode.full)
                        self._update()
                        self.cb(pathnode.full, dest_path, self.num_pieces)

    def _match_v2(self, filemap: dict, dest: str):
        """
        Rebuild method for torrent v2 files.

        For every file entry, candidates with the same name and size are
        hashed with ``HasherV2`` and the first one whose root equals the
        entry's ``pieces root`` is copied to the destination.

        Parameters
        ----------
        filemap : dict
            filesystem information
        dest : str
            destination path
        """
        for entry in self.files:
            filename = entry["filename"]
            length = entry["length"]
            if filename not in filemap:
                continue  # pragma: nocover
            for path, size in filemap[filename]:
                if size != length:
                    continue
                hasher = HasherV2(path, self.piece_length, True)
                if entry["root"] == hasher.root:
                    dest_path = os.path.join(dest, entry["full"])
                    # Copy the matched file found on disk; entry["path"] is
                    # only the torrent-relative parent directory.
                    copypath(path, dest_path)
                    self._update()
                    self.cb(path, dest_path, self.num_pieces)
                    break

    def rebuild(self, filemap: dict, dest: str):
        """
        Rebuild torrent file contents from filemap at dest.

        Searches through the contents of the meta file and compares filenames
        with those in the filemap dict, and if found checks their contents,
        and copies them to the destination path.

        Parameters
        ----------
        filemap : dict
            filesystem information
        dest : str
            destination path
        """
        self._prog = None
        if self.meta_version == 2:
            self._match_v2(filemap, dest)
        else:
            self._match_v1(filemap, dest)
        # The progress bar is created lazily by _update(); only close it if
        # at least one update happened.
        if self._prog is not None:
            self.progbar.close_out()

    def _update(self):
        """Create the progress bar on first use, then advance it one step."""
        if self._prog is None:
            self._prog = True
            self.progbar = self.get_progress_tracker(self.num_pieces,
                                                     self.name)
        self.progbar.update(1)
class Assembler(CbMixin):
    """
    Does most of the work in attempting the structure of torrentfiles.

    Requires three paths as arguments.
    - torrent metafile or directory containing multiple meta files
    - directory containing the contents of meta file
    - directory where torrents will be re-assembled
    """

    def __init__(self, metafiles: list, contents: list, dest: str):
        """
        Reassemble given torrent file from given cli arguments.

        Rebuild metafiles and contents into their original directory
        structure as much as possible in the destination directory.

        Parameters
        ----------
        metafiles : list
            paths to torrent metafiles and/or directories containing
            torrent metafiles.
        contents : list
            paths to content or directories containing content that
            belongs to the torrent files.
        dest: str
            path to the directory where rebuild will take place.
        """
        Metadata.set_callback(self._callback)
        self.counter = 0
        self._lastlog = None
        self.contents = contents
        self.dest = dest
        self.meta_paths = metafiles
        self.metafiles = self._get_metafiles()
        # Only index files whose names appear in at least one metafile.
        filenames = set()
        for meta in self.metafiles:
            filenames |= meta.filenames
        self.filemap = _index_contents(self.contents, filenames)

    def _callback(self, filename: str, dest: str, num_pieces: int):
        """
        Run the callback functions associated with Mixin for copied files.

        Increments the copy counter and logs the match, suppressing a
        message identical to the one logged immediately before.

        Parameters
        ----------
        filename : str
            filename
        dest : str
            destination path
        num_pieces : int
            number of hash pieces
        """
        self.counter += 1
        message = f"Matched:{num_pieces} {filename} -> {dest}"
        if message != self._lastlog:
            self._lastlog = message
            logger.debug(message)

    def assemble_torrents(self):
        """
        Assemble collection of torrent files into original structure.

        Returns
        -------
        int
            number of files copied
        """
        for metafile in self.metafiles:
            logger.info("#%s Searching contents for %s", self.counter,
                        metafile.name)
            self.rebuild(metafile)
        return self.counter

    def rebuild(self, metafile: Metadata) -> None:
        """
        Build the torrent file structure from contents of directory.

        Traverse contents dir and compare discovered files
        with files listed in torrent metadata and copy
        the matches to the destination directory respecting folder
        structures along the way.

        Parameters
        ----------
        metafile : Metadata
            parsed torrent metafile to rebuild
        """
        metafile.rebuild(self.filemap, self.dest)

    def _iter_files(self, path: str) -> list:
        """
        Iterate through metafiles directory creating Metadata objects.

        Parameters
        ----------
        path : str
            fs path

        Returns
        -------
        list
            list of Metadata Object
        """
        metafiles = []
        for filename in os.listdir(path):
            if not filename.lower().endswith(".torrent"):
                continue
            metapath = os.path.join(path, filename)
            try:
                meta = Metadata(metapath)
            except ValueError:  # pragma: nocover
                # Skip metafiles that cannot be parsed. The counter tracks
                # copied files, so it must not be touched here.
                logger.warning("Skipping unreadable metafile %s", metapath)
            else:
                metafiles.append(meta)
        return metafiles

    def _get_metafiles(self) -> list:
        """
        Collect all .torrent meta files from given directory or file.

        Paths that do not exist, and files without a ``.torrent``
        extension, are ignored.

        Returns
        -------
        list
            metafile objects
        """
        metafiles = []
        for path in self.meta_paths:
            if os.path.exists(path):
                if os.path.isdir(path):
                    metafiles += self._iter_files(path)
                elif path.lower().endswith(".torrent"):
                    meta = Metadata(path)
                    metafiles.append(meta)
        return metafiles
554def _index_contents(contents: list, filenames: set) -> dict:
555 """
556 Collect all of the filenames and their respective paths.
558 Parameters
559 ----------
560 contents : list
561 paths to traverse looking for filenames
562 filenames : set
563 set of filenames to look for
565 Returns
566 -------
567 dict
568 all filenames and their respective paths.
569 """
570 mapping = {}
571 for dirpath in contents:
572 mapped = _index_content(dirpath, filenames)
573 for key, value in mapped.items():
574 mapping.setdefault(key, [])
575 mapping[key].extend(value)
576 return mapping
579def _index_content(root: str, filenames: set) -> dict:
580 """
581 Collect filenames from directory or file.
583 Parameters
584 ----------
585 root : str
586 path to search for filenames
587 filenames : set
588 set of filenames to search for
590 Returns
591 -------
592 dict
593 filenames and their respective paths
594 """
595 filemap = {}
596 if os.path.isfile(root):
597 name = os.path.basename(root)
598 if name in filenames:
599 size = os.path.getsize(root)
600 filemap.setdefault(name, [])
601 filemap[name].append((root, size))
602 return filemap
603 if os.path.isdir(root):
604 for path in os.listdir(root):
605 fullpath = os.path.join(root, path)
606 resultmap = _index_content(fullpath, filenames)
607 for key, value in resultmap.items():
608 filemap.setdefault(key, [])
609 filemap[key].extend(value)
610 return filemap