Coverage for torrentfile\utils.py: 100%
133 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4##############################################################################
5# Copyright (C) 2021-current alexpdev
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18##############################################################################
19"""
20Utility functions and classes used throughout package.
21"""
23import os
24import math
25import ctypes
26import shutil
27import platform
28from pathlib import Path
# On Windows, switch the console attached to stdout into a mode that
# interprets ANSI escape sequences, so the codes emitted by `colored` render
# as colors instead of raw text.
if platform.system() == "Windows":  # pragma: nocover
    kernel32 = ctypes.windll.kernel32
    # Per the Win32 console API: -11 is STD_OUTPUT_HANDLE, and mode 7 is
    # ENABLE_PROCESSED_OUTPUT (0x1) | ENABLE_WRAP_AT_EOL_OUTPUT (0x2)
    # | ENABLE_VIRTUAL_TERMINAL_PROCESSING (0x4).
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
class Memo:
    """
    Memoize cache.

    Wraps a callable taking a single path argument and stores each result
    in a dict keyed by that path. A stored result is only reused while the
    path still exists on the filesystem.

    Parameters
    ----------
    func : Callable
        The results of this callable will be cached.
    """

    def __init__(self, func):
        """
        Construct cache.

        Parameters
        ----------
        func : Callable
            The callable being wrapped.
        """
        # Imported locally so this class does not depend on a module-level
        # import being present.
        import functools

        # Copy __name__, __doc__, etc. from the wrapped callable so that a
        # function decorated with @Memo keeps its own metadata instead of
        # exposing this class's docstring.
        functools.update_wrapper(self, func)
        self.func = func
        # Number of calls answered from the cache.
        self.counter = 0
        # Maps path -> result of func(path).
        self.cache = {}

    def __call__(self, path: str):
        """
        Invoke each time memo function is executed.

        Parameters
        ----------
        path : str
            The relative or absolute path being used as key in cache dict.

        Returns
        -------
        Any :
            The results of calling the function with path.
        """
        # A cached entry is ignored once its path no longer exists; the
        # wrapped function is called again and decides how to handle it.
        if path in self.cache and os.path.exists(path):
            self.counter += 1
            return self.cache[path]
        result = self.func(path)
        self.cache[path] = result
        return result
class MissingPathError(Exception):
    """
    Path parameter is required to specify target content.

    Creating a .torrent file with no contents seems rather silly.

    Parameters
    ----------
    message : str
        Message for user (optional).
    """

    def __init__(self, message: str = None):
        """
        Raise when creating a meta file without specifying target content.

        The `message` argument is optional extra detail appended to the
        standard text; the combined text is stored on ``self.message`` and
        passed to the Exception base class.
        """
        self.message = "Path arguement is missing and required"
        # Only append the detail when one was supplied, so a bare
        # ``raise MissingPathError`` does not end in a literal "None".
        if message is not None:
            self.message = f"{self.message} {str(message)}"
        # Hand the formatted text to Exception so str(err) and tracebacks
        # show it rather than the raw argument.
        super().__init__(self.message)
class PieceLengthValueError(Exception):
    """
    Piece Length parameter must equal a perfect power of 2.

    Parameters
    ----------
    message : str
        Message for user (optional).
    """

    def __init__(self, message: str = None):
        """
        Raise when creating a meta file with incorrect piece length value.

        The `message` argument is optional detail (typically the rejected
        value) appended to the standard text; the combined text is stored
        on ``self.message`` and passed to the Exception base class.
        """
        self.message = "Incorrect value for piece length"
        # Only append the detail when one was supplied, so a bare
        # ``raise PieceLengthValueError`` does not end in a literal "None".
        if message is not None:
            self.message = f"{self.message}: {str(message)}"
        # Hand the formatted text to Exception so str(err) and tracebacks
        # show it rather than the raw argument.
        super().__init__(self.message)
class ArgumentError(Exception):
    """
    Signal that command line arguments are mistyped or do not match.
    """
SUFFIXES = ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]


def humanize_bytes(amount: int) -> str:
    """
    Render a byte count as a short human readable string.

    Parameters
    ----------
    amount : int
        total number of bytes.

    Returns
    -------
    str
        the amount expressed in Bytes or in the largest fitting binary
        unit from SUFFIXES, with one decimal place for the latter.
    """
    size = float(amount)
    magnitude = abs(size)
    if magnitude == 1:
        return f"{size} Byte"
    if magnitude < 1024:
        return f"{size} Bytes"
    # Advance to the first unit whose upper bound exceeds the magnitude;
    # values beyond the table stay on the last (largest) suffix.
    index = 0
    last = len(SUFFIXES) - 1
    while index < last and magnitude >= 1024 ** (index + 2):
        index += 1
    # The signed size is scaled here, so negative amounts keep their sign.
    scaled = 1024 * size / 1024 ** (index + 2)
    return f"{scaled:.1f} {SUFFIXES[index]}"
def normalize_piece_length(piece_length: int) -> int:
    """
    Verify input piece_length is valid and convert accordingly.

    Values from 14 through 25 are treated as exponents and converted to
    ``2**piece_length``. Larger values are accepted as-is only when they
    are an exact power of two. Everything else is rejected.

    Parameters
    ----------
    piece_length : int | str
        The piece length provided by user.

    Returns
    -------
    int
        normalized piece length.

    Raises
    ------
    PieceLengthValueError :
        Piece length is improper value.
    """
    if isinstance(piece_length, str):
        # isdecimal() only admits characters int() can parse; isnumeric()
        # also admits e.g. superscripts and fractions, for which int()
        # raises ValueError instead of the documented error.
        if not piece_length.isdecimal():
            raise PieceLengthValueError(piece_length)
        piece_length = int(piece_length)

    if 13 < piece_length < 26:
        return 2**piece_length

    if piece_length >= 26:
        # Compare against an exact integer power: evaluating
        # 2**math.log2(x) in floating point can round back to x even when
        # x is not a power of two.
        exponent = round(math.log2(piece_length))
        if 2**exponent == piece_length:
            return piece_length

    # Always report the rejected value.
    raise PieceLengthValueError(piece_length)
def get_piece_length(size: int) -> int:
    """
    Pick a piece length for content of the given total size.

    Parameters
    ----------
    size : int
        Combined size of all files included in the .torrent file.

    Returns
    -------
    int
        The smallest power of two from 2**14 upward that splits the content
        into at most 1000 pieces, capped at 2**24.
    """
    for exponent in range(14, 24):
        if size / (1 << exponent) <= 1000:
            return 1 << exponent
    return 1 << 24
@Memo
def filelist_total(pathstring: str) -> tuple:
    """
    Collect the total size and file list for an existing path.

    Results are memoized per path by the Memo decorator.

    Parameters
    ----------
    pathstring : str
        An existing filesystem path.

    Returns
    -------
    Tuple[int, List] :
        int - sum of sizes for all files collected
        list - all file paths found at or beneath the path

    Raises
    ------
    MissingPathError
        File could not be found.
    """
    if not os.path.exists(pathstring):
        # Include the offending path so the error identifies what is missing.
        raise MissingPathError(pathstring)
    return _filelist_total(Path(pathstring))
def _filelist_total(path: os.PathLike) -> tuple:
    """
    Walk a file or directory and gather sizes and file paths.

    Parameters
    ----------
    path : os.PathLike
        Path object for a file or the base of a directory tree.

    Returns
    -------
    Tuple[int, List] :
        int - combined size of every file found
        list - sorted paths of every file found
    """
    if path.is_file():
        return os.path.getsize(path), [str(path)]
    size_sum, collected = 0, []
    if path.is_dir():
        for entry in path.iterdir():
            # Recurse through the memoized public entry point.
            entry_size, entry_files = filelist_total(entry)
            size_sum += entry_size
            collected += entry_files
    collected.sort()
    return size_sum, collected
def path_size(path: str) -> int:
    """
    Sum the sizes of every file found at or beneath path.

    Parameters
    ----------
    path : str
        path to target file or directory.

    Returns
    -------
    int
        total size of files.
    """
    return filelist_total(path)[0]
def get_file_list(path: str) -> list:
    """
    List every file found at or beneath path, in sorted order.

    Parameters
    ----------
    path : str
        target file or directory.

    Returns
    -------
    list :
        sorted list of file paths.
    """
    return filelist_total(path)[1]
def path_stat(path: str) -> tuple:
    """
    Gather the file list, total size and piece length for a path.

    Parameters
    ----------
    path : str
        The path to start calculating from.

    Returns
    -------
    Tuple[list, int, int] :
        list - all files found at or beneath the path
        int - combined size of those files
        int - piece length computed from the combined size
    """
    size, files = filelist_total(path)
    return (files, size, get_piece_length(size))
def path_piece_length(path: str) -> int:
    """
    Work out the piece length for the contents of path.

    Parameters
    ----------
    path : str
        The path to a file or directory and its contents.

    Returns
    -------
    int
        The piece length computed from the total size of the contents.
    """
    return get_piece_length(path_size(path))
def next_power_2(value: int) -> int:
    """
    Round value up to the nearest perfect power of 2.

    Parameters
    ----------
    value : int
        integer to round up.

    Returns
    -------
    int
        value itself when it is already a power of 2, otherwise the
        smallest power of 2 above it; 1 for zero or negative input.
    """
    if value <= 0:
        return 1
    if value & (value - 1) == 0:
        return value
    # For a positive non-power of two, the bit length is the exponent of
    # the next power of two.
    return 1 << value.bit_length()
def copypath(source: str, dest: str) -> None:
    """
    Copy the file located at source to dest.

    If one or more directory paths don't exist in dest, they will be created.
    If dest already exists and is at least as large as source, nothing is
    copied, however if dest is smaller than source, dest will be overwritten.
    Nothing happens when source does not exist.

    Parameters
    ----------
    source : str
        path to source file
    dest : str
        path to target destination
    """
    if not os.path.exists(source):
        return
    if (os.path.exists(dest)
            and os.path.getsize(source) <= os.path.getsize(dest)):
        return
    # Create every missing parent directory in one call; exist_ok avoids
    # failing when a directory appears between the check and the creation.
    Path(dest).parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(source, dest)
def toggle_debug_mode(switch_on: bool):
    """
    Record the debug mode setting in the process environment.

    Parameters
    ----------
    switch_on : bool
        truthy to mark debug mode as on, falsy to mark it as off
    """
    if switch_on:
        state = "ON"
    else:
        state = "OFF"
    os.environ["TORRENTFILE_DEBUG"] = state
def debug_is_on() -> bool:
    """
    Return True if debug mode is on in environment variables.

    Returns
    -------
    bool
        True only when TORRENTFILE_DEBUG is set to "ON"; False when it has
        any other value or is not set at all.
    """
    # .get() so an environment where the variable was never set reads as
    # "off" instead of raising KeyError.
    return os.environ.get("TORRENTFILE_DEBUG") == "ON"
def check_path_writable(path: str) -> bool:
    """
    Test if output path is writable.

    The check opens the path for appending. A file created by the check is
    removed again; a file that already existed is left in place untouched.
    When path ends with a path separator, a ".torrent" file inside that
    directory is used as the probe.

    Parameters
    ----------
    path : str
        file system path string

    Returns
    -------
    bool
        True if writeable, otherwise raises PermissionError

    Raises
    ------
    PermissionError
        The path could not be opened for writing.
    """
    try:
        if path.endswith(("\\", "/")):
            path = os.path.join(path, ".torrent")
        # Remember whether the probe will create the file, so that a
        # pre-existing file is never deleted by a mere writability check.
        existed = os.path.exists(path)
        # Append mode leaves any existing content unchanged.
        with open(path, "ab") as _:
            pass
        if not existed:
            os.remove(path)
    except PermissionError as err:  # pragma: nocover
        directory = os.path.dirname(path)
        message = f"Target directory is not writeable {directory}"
        raise PermissionError(message) from err
    return True
def green(string: str) -> str:
    """
    Wrap string in the terminal formatting code 92 (bright green).
    """
    return colored(string=string, key=92)
def colored(string: str, key: int) -> str:
    """
    Wrap string in an ANSI escape sequence selected by key, then reset.
    """
    return "\033[{}m{}\033[0m".format(key, string)