Coverage for torrentfile\utils.py: 100%

133 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-27 21:50 -0700

1#! /usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4############################################################################## 

5# Copyright (C) 2021-current alexpdev 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18############################################################################## 

19""" 

20Utility functions and classes used throughout package. 

21""" 

22 

23import os 

24import math 

25import ctypes 

26import shutil 

27import platform 

28from pathlib import Path 

29 

30if platform.system() == "Windows": # pragma: nocover 

31 kernel32 = ctypes.windll.kernel32 

32 kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) 

33 

34 

35class Memo: 

36 """ 

37 Memoize cache. 

38 

39 Parameters 

40 ---------- 

41 func : Callable 

42 The results of this callable will be cached. 

43 """ 

44 

45 def __init__(self, func): 

46 """ 

47 Construct cache. 

48 """ 

49 self.func = func 

50 self.counter = 0 

51 self.cache = {} 

52 

53 def __call__(self, path: str): 

54 """ 

55 Invoke each time memo function is executed. 

56 

57 Parameters 

58 ---------- 

59 path : str 

60 The relative or absolute path being used as key in cache dict. 

61 

62 Returns 

63 ------- 

64 Any : 

65 The results of calling the function with path. 

66 """ 

67 if path in self.cache and os.path.exists(path): 

68 self.counter += 1 

69 return self.cache[path] 

70 result = self.func(path) 

71 self.cache[path] = result 

72 return result 

73 

74 

75class MissingPathError(Exception): 

76 """ 

77 Path parameter is required to specify target content. 

78 

79 Creating a .torrent file with no contents seems rather silly. 

80 

81 Parameters 

82 ---------- 

83 message : str 

84 Message for user (optional). 

85 """ 

86 

87 def __init__(self, message: str = None): 

88 """ 

89 Raise when creating a meta file without specifying target content. 

90 

91 The `message` argument is a message to pass to Exception base class. 

92 """ 

93 self.message = f"Path arguement is missing and required {str(message)}" 

94 super().__init__(message) 

95 

96 

97class PieceLengthValueError(Exception): 

98 """ 

99 Piece Length parameter must equal a perfect power of 2. 

100 

101 Parameters 

102 ---------- 

103 message : str 

104 Message for user (optional). 

105 """ 

106 

107 def __init__(self, message: str = None): 

108 """ 

109 Raise when creating a meta file with incorrect piece length value. 

110 

111 The `message` argument is a message to pass to Exception base class. 

112 """ 

113 self.message = f"Incorrect value for piece length: {str(message)}" 

114 super().__init__(message) 

115 

116 

117class ArgumentError(Exception): 

118 """ 

119 Exception for mismatched or mistyped CLI arguments. 

120 """ 

121 

122 

123SUFFIXES = ["KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"] 

124 

125 

126def humanize_bytes(amount: int) -> str: 

127 """ 

128 Convert integer into human readable memory sized denomination. 

129 

130 Parameters 

131 ---------- 

132 amount : int 

133 total number of bytes. 

134 

135 Returns 

136 ------- 

137 str 

138 human readable representation of the given amount of bytes. 

139 """ 

140 base = 1024 

141 amount = float(amount) 

142 value = abs(amount) 

143 if value == 1: 

144 return f"{amount} Byte" 

145 if value < base: 

146 return f"{amount} Bytes" 

147 for i, s in enumerate(SUFFIXES): 

148 unit = base**(i + 2) 

149 if value < unit: 

150 break 

151 value = base * amount / unit 

152 return f"{value:.1f} {s}" 

153 

154 

155def normalize_piece_length(piece_length: int) -> int: 

156 """ 

157 Verify input piece_length is valid and convert accordingly. 

158 

159 Parameters 

160 ---------- 

161 piece_length : int | str 

162 The piece length provided by user. 

163 

164 Returns 

165 ------- 

166 int 

167 normalized piece length. 

168 

169 Raises 

170 ------ 

171 PieceLengthValueError : 

172 Piece length is improper value. 

173 """ 

174 if isinstance(piece_length, str): 

175 if piece_length.isnumeric(): 

176 piece_length = int(piece_length) 

177 else: 

178 raise PieceLengthValueError(piece_length) 

179 

180 if piece_length > (1 << 14): 

181 if 2**math.log2(piece_length) == piece_length: 

182 return piece_length 

183 raise PieceLengthValueError(piece_length) 

184 

185 if 13 < piece_length < 26: 

186 return 2**piece_length 

187 if piece_length <= 13: 

188 raise PieceLengthValueError(piece_length) 

189 

190 log = int(math.log2(piece_length)) 

191 if 2**log == piece_length: 

192 return piece_length 

193 raise PieceLengthValueError 

194 

195 

196def get_piece_length(size: int) -> int: 

197 """ 

198 Calculate the ideal piece length for bittorrent data. 

199 

200 Parameters 

201 ---------- 

202 size : int 

203 Total bits of all files incluided in .torrent file. 

204 

205 Returns 

206 ------- 

207 int 

208 Ideal piece length. 

209 """ 

210 exp = 14 

211 while size / (2**exp) > 1000 and exp < 24: 

212 exp += 1 

213 return 2**exp 

214 

215 

216@Memo 

217def filelist_total(pathstring: str) -> os.PathLike: 

218 """ 

219 Perform error checking and format conversion to os.PathLike. 

220 

221 Parameters 

222 ---------- 

223 pathstring : str 

224 An existing filesystem path. 

225 

226 Returns 

227 ------- 

228 os.PathLike 

229 Input path converted to bytes format. 

230 

231 Raises 

232 ------ 

233 MissingPathError 

234 File could not be found. 

235 """ 

236 if os.path.exists(pathstring): 

237 path = Path(pathstring) 

238 return _filelist_total(path) 

239 raise MissingPathError 

240 

241 

242def _filelist_total(path: os.PathLike) -> tuple: 

243 """ 

244 Recursively search directory tree for files. 

245 

246 Parameters 

247 ---------- 

248 path : str 

249 Path to file or directory base 

250 

251 Returns 

252 ------- 

253 Tuple[int, List] : 

254 int - sum of sizes for all files collected 

255 list - all file paths within directory tree 

256 """ 

257 if path.is_file(): 

258 file_size = os.path.getsize(path) 

259 return file_size, [str(path)] 

260 total = 0 

261 filelist = [] 

262 if path.is_dir(): 

263 for item in path.iterdir(): 

264 size, paths = filelist_total(item) 

265 total += size 

266 filelist.extend(paths) 

267 return total, sorted(filelist) 

268 

269 

270def path_size(path: str) -> int: 

271 """ 

272 Return the total size of all files in path recursively. 

273 

274 Parameters 

275 ---------- 

276 path : str 

277 path to target file or directory. 

278 

279 Returns 

280 ------- 

281 int 

282 total size of files. 

283 """ 

284 total_size, _ = filelist_total(path) 

285 return total_size 

286 

287 

288def get_file_list(path: str) -> list: 

289 """ 

290 Return a sorted list of file paths contained in directory. 

291 

292 Parameters 

293 ---------- 

294 path : str 

295 target file or directory. 

296 

297 Returns 

298 ------- 

299 list : 

300 sorted list of file paths. 

301 """ 

302 _, filelist = filelist_total(path) 

303 return filelist 

304 

305 

306def path_stat(path: str) -> tuple: 

307 """ 

308 Calculate directory statistics. 

309 

310 Parameters 

311 ---------- 

312 path : str 

313 The path to start calculating from. 

314 

315 Returns 

316 ------- 

317 Tuple[list, int, int] : 

318 list - List of all files contained in Directory 

319 int - Total sum of bytes from all contents of dir 

320 int - The size of pieces of the torrent contents. 

321 """ 

322 total_size, filelist = filelist_total(path) 

323 piece_length = get_piece_length(total_size) 

324 return (filelist, total_size, piece_length) 

325 

326 

327def path_piece_length(path: str) -> int: 

328 """ 

329 Calculate piece length for input path and contents. 

330 

331 Parameters 

332 ---------- 

333 path : str 

334 The absolute path to directory and contents. 

335 

336 Returns 

337 ------- 

338 int 

339 The size of pieces of torrent content. 

340 """ 

341 psize = path_size(path) 

342 return get_piece_length(psize) 

343 

344 

345def next_power_2(value: int) -> int: 

346 """ 

347 Calculate the next perfect power of 2 equal to or greater than value. 

348 

349 Parameters 

350 ---------- 

351 value : int 

352 integer value that is less than some perfect power of 2. 

353 

354 Returns 

355 ------- 

356 int 

357 The next power of 2 greater than value, or value if already power of 2. 

358 """ 

359 if not value & (value - 1) and value: 

360 return value 

361 start = 1 

362 while start < value: 

363 start <<= 1 

364 return start 

365 

366 

367def copypath(source: str, dest: str) -> None: 

368 """ 

369 Copy the file located at source to dest. 

370 

371 If one or more directory paths don't exist in dest, they will be created. 

372 If dest already exists and dest and source are the same size, it will be 

373 ignored, however if dest is smaller than source, dest will be overwritten. 

374 

375 Parameters 

376 ---------- 

377 source : str 

378 path to source file 

379 dest : str 

380 path to target destination 

381 """ 

382 if not os.path.exists(source) or (os.path.exists(dest) 

383 and os.path.getsize(source) 

384 <= os.path.getsize(dest)): 

385 return 

386 path_parts = Path(dest).parts 

387 if len(path_parts) > 1: 

388 root = path_parts[0] 

389 path_parts = path_parts[1:-1] 

390 if not os.path.exists(root): 

391 os.mkdir(root) # pragma: nocover 

392 for part in path_parts: 

393 path = os.path.join(root, part) 

394 if not os.path.exists(path): 

395 os.mkdir(path) 

396 root = path 

397 shutil.copy(source, dest) 

398 

399 

400def toggle_debug_mode(switch_on: bool): 

401 """ 

402 Switch the environment variable debug indicator on or off. 

403 

404 Parameters 

405 ---------- 

406 switch_on : bool 

407 if true turn debug mode on otherwise off 

408 """ 

409 os.environ["TORRENTFILE_DEBUG"] = "ON" if switch_on else "OFF" 

410 

411 

412def debug_is_on() -> bool: 

413 """ 

414 Return True if debug mode is on in environment variables. 

415 

416 Returns 

417 ------- 

418 bool 

419 is debug mode on 

420 """ 

421 return os.environ["TORRENTFILE_DEBUG"] == "ON" 

422 

423 

424def check_path_writable(path: str) -> bool: 

425 """ 

426 Test if output path is writable. 

427 

428 Parameters 

429 ---------- 

430 path : str 

431 file system path string 

432 

433 Returns 

434 ------- 

435 bool 

436 True if writeable, otherwise raises PermissionError 

437 """ 

438 try: 

439 if path.endswith("\\") or path.endswith("/"): 

440 path = os.path.join(path, ".torrent") 

441 with open(path, "ab") as _: 

442 pass 

443 os.remove(path) 

444 except PermissionError as err: # pragma: nocover 

445 directory = os.path.dirname(path) 

446 message = f"Target directory is not writeable {directory}" 

447 raise PermissionError(message) from err 

448 return True 

449 

450 

451def green(string: str) -> str: 

452 """ 

453 Output terminal content in green color. 

454 """ 

455 return colored(string, 92) 

456 

457 

458def colored(string: str, key: int) -> str: 

459 """ 

460 Output terminal content with formatting. 

461 """ 

462 return f"\033[{key}m{string}\033[0m"