Coverage for torrentfile\commands.py: 100%

178 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-27 21:50 -0700

1#! /usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4############################################################################## 

5# Copyright (C) 2021-current alexpdev 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18############################################################################## 

19""" 

20The commands module contains the Action Commands executed by the CLI script. 

21 

22Each function pertains to a command line action/subcommand and drives specific 

23features of the application. 

24 

25Functions 

26--------- 

27- create 

28- info 

29- edit 

30- recheck 

31- magnet 

32- rebuild 

33- find_config_file 

34- parse_config_file 

35- get_magnet 

36""" 

37 

38import os 

39import sys 

40import shutil 

41import logging 

42import configparser 

43from argparse import Namespace 

44from hashlib import sha1, sha256 

45from pathlib import Path 

46from urllib.parse import quote_plus 

47 

48import pyben 

49 

50from torrentfile.edit import edit_torrent 

51from torrentfile.interactive import select_action 

52from torrentfile.rebuild import Assembler 

53from torrentfile.recheck import Checker 

54from torrentfile.torrent import TorrentAssembler, TorrentFile 

55from torrentfile.utils import ArgumentError, check_path_writable 

56 

57logger = logging.getLogger(__name__) 

58 

59 

60def find_config_file(args: Namespace) -> str: 

61 """ 

62 Locate the path to the torrentfile configuration file. 

63 

64 Parameters 

65 ---------- 

66 args : Namespace 

67 command line argument values 

68 

69 Returns 

70 ------- 

71 str 

72 path to the configuration file 

73 

74 Raises 

75 ------ 

76 FileNotFoundError 

77 raised if configuration file not found. 

78 """ 

79 path = None 

80 error_message = "Could not find configuration file." 

81 if args.config_path: 

82 if os.path.exists(args.config_path): 

83 path = args.config_path 

84 else: 

85 raise FileNotFoundError(error_message) 

86 else: 

87 filename = "torrentfile.ini" 

88 paths = [ 

89 os.path.join(os.getcwd(), filename), 

90 Path.home() / ".torrentfile" / filename, 

91 Path.home() / ".config" / ".torrentfile" / filename, 

92 ] 

93 for subpath in paths: 

94 if os.path.exists(subpath): 

95 path = subpath 

96 break 

97 if path is None: 

98 raise FileNotFoundError(error_message) 

99 return path 

100 

101 

102def parse_config_file(path: str, kwargs: dict): 

103 """ 

104 Parse configuration file for torrent setup details. 

105 

106 Parameters 

107 ---------- 

108 path : str 

109 path to configuration file 

110 kwargs : dict 

111 options from command line arguments 

112 """ 

113 config = configparser.ConfigParser() 

114 config.read(path) 

115 

116 for key, val in config["config"].items(): 

117 if key.lower() in ["announce", "http-seed", "web-seed", "tracker"]: 

118 val = [i for i in val.split("\n") if i] 

119 

120 if key.lower() == "http-seed": 

121 kwargs["httpseeds"] = val 

122 

123 elif key.lower() == "web-seed": 

124 kwargs.setdefault("url-list", []) 

125 kwargs["url-list"] = val 

126 

127 else: 

128 kwargs[key.lower()] = val 

129 

130 elif key.lower() == "piece-length": 

131 kwargs["piece_length"] = val 

132 

133 elif key.lower() == "meta-version": 

134 kwargs["meta_version"] = val 

135 

136 elif val.lower() == "true": 

137 kwargs[key.lower()] = True 

138 

139 elif val.lower() == "false": 

140 kwargs[key.lower()] = False 

141 

142 else: 

143 kwargs[key.lower()] = val 

144 

145 

146def create(args: Namespace) -> Namespace: 

147 """ 

148 Execute the create CLI sub-command to create a new torrent metafile. 

149 

150 Parameters 

151 ---------- 

152 args : Namespace 

153 positional and optional CLI arguments. 

154 

155 Returns 

156 ------- 

157 torrentfile.MetaFile 

158 object containing the path to created metafile and its contents. 

159 """ 

160 kwargs = vars(args) 

161 if args.config: 

162 path = find_config_file(args) 

163 parse_config_file(path, kwargs) # pragma: nocover 

164 

165 if args.outfile: 

166 check_path_writable(args.outfile) 

167 

168 else: # pragma: nocover 

169 samplepath = os.path.join(os.getcwd(), ".torrent") 

170 check_path_writable(samplepath) 

171 

172 logger.debug("Creating torrent from %s", args.content) 

173 if args.meta_version == "1": 

174 torrent = TorrentFile(**kwargs) 

175 

176 else: 

177 torrent = TorrentAssembler(**kwargs) 

178 outfile, meta = torrent.write() 

179 

180 if args.magnet: 

181 magnet(outfile, version=0) 

182 

183 args.torrent = torrent 

184 args.kwargs = kwargs 

185 args.outfile = outfile 

186 args.meta = meta 

187 

188 print("\nTorrent Save Path: ", os.path.abspath(str(outfile))) 

189 logger.debug("Output path: %s", str(outfile)) 

190 return args 

191 

192 

193def info(args: Namespace) -> str: 

194 """ 

195 Show torrent metafile details to user via stdout. 

196 

197 Prints full details of torrent file contents to the terminal in 

198 a clean and readable format. 

199 

200 Parameters 

201 ---------- 

202 args : dict 

203 command line arguements provided by the user. 

204 

205 Returns 

206 ------- 

207 str 

208 The output printed to the terminal. 

209 """ 

210 metafile = args.metafile 

211 meta = pyben.load(metafile) 

212 data = meta["info"] 

213 del meta["info"] 

214 

215 meta.update(data) 

216 if "private" in meta and meta["private"] == 1: 

217 meta["private"] = "True" 

218 

219 if "announce-list" in meta: 

220 lst = meta["announce-list"] 

221 meta["announce-list"] = ", ".join([j for i in lst for j in i]) 

222 

223 if "url-list" in meta: 

224 meta["url-list"] = ", ".join(meta["url-list"]) 

225 

226 if "httpseeds" in meta: 

227 meta["httpseeds"] = ", ".join(meta["httpseeds"]) 

228 

229 text = [] 

230 longest = max(len(i) for i in meta.keys()) 

231 

232 for key, val in meta.items(): 

233 if key not in ["pieces", "piece layers", "files", "file tree"]: 

234 prefix = longest - len(key) + 1 

235 string = key + (" " * prefix) + str(val) 

236 text.append(string) 

237 

238 most = max(len(i) for i in text) 

239 text = ["-" * most, "\n"] + text + ["\n", "-" * most] 

240 output = "\n".join(text) 

241 sys.stdout.write(output) 

242 sys.stdout.flush() 

243 return output 

244 

245 

246def edit(args: Namespace) -> str: 

247 """ 

248 Execute the edit CLI sub-command with provided arguments. 

249 

250 Provides functionality that can change the details of a torrentfile 

251 that preserves all of the hash piece information so as not to break 

252 the torrentfile. 

253 

254 Parameters 

255 ---------- 

256 args : Namespace 

257 positional and optional CLI arguments. 

258 

259 Returns 

260 ------- 

261 str 

262 path to edited torrent file. 

263 """ 

264 metafile = args.metafile 

265 logger.info("Editing %s Meta File", str(args.metafile)) 

266 

267 editargs = { 

268 "url-list": args.url_list, 

269 "httpseeds": args.httpseeds, 

270 "announce": args.announce, 

271 "source": args.source, 

272 "private": args.private, 

273 "comment": args.comment, 

274 } 

275 return edit_torrent(metafile, editargs) 

276 

277 

278def recheck(args: Namespace) -> str: 

279 """ 

280 Execute recheck CLI sub-command. 

281 

282 Checks the piece hashes within a pre-existing torrent file 

283 and does a piece by piece check with the contents of a file 

284 or directory for completeness and validation. 

285 

286 Parameters 

287 ---------- 

288 args : Namespace 

289 positional and optional arguments. 

290 

291 Returns 

292 ------- 

293 str 

294 The percentage of content currently saved to disk. 

295 """ 

296 metafile = args.metafile 

297 content = args.content 

298 

299 if os.path.isdir(metafile): 

300 raise ArgumentError(f"Error: Unable to parse directory {metafile}. " 

301 "Check the order of the parameters.") 

302 

303 logger.debug("Validating %s <---------------> %s contents", metafile, 

304 content) 

305 

306 msg = f"Rechecking {metafile} ...\n" 

307 halfterm = shutil.get_terminal_size().columns / 2 

308 padding = int(halfterm - (len(msg) / 2)) * " " 

309 sys.stdout.write(padding + msg) 

310 

311 checker = Checker(metafile, content) 

312 logger.debug("Completed initialization of the Checker class") 

313 result = checker.results() 

314 

315 message = f"{content} <- {result}% -> {metafile}" 

316 padding = int(halfterm - (len(message) / 2)) * " " 

317 sys.stdout.write(padding + message + "\n") 

318 sys.stdout.flush() 

319 return result 

320 

321 

322def rename(args: Namespace) -> str: 

323 """ 

324 Rename a torrent file to it's original name found in metadata. 

325 

326 Parameters 

327 ---------- 

328 args: Namespace 

329 cli arguments 

330 

331 Returns 

332 ------- 

333 str 

334 renamed file path 

335 """ 

336 target = args.target 

337 if not target or not os.path.exists(target): 

338 raise FileNotFoundError # pragma: nocover 

339 meta = pyben.load(target) 

340 name = meta["info"]["name"] 

341 parent = os.path.dirname(target) 

342 new_path = os.path.join(parent, name + ".torrent") 

343 if os.path.exists(new_path): 

344 raise FileExistsError # pragma: nocover 

345 os.rename(target, new_path) 

346 return new_path 

347 

348 

349def get_magnet(namespace: Namespace) -> str: 

350 """ 

351 Prepare option parameters for retreiving magnet URI. 

352 

353 Parameters 

354 ---------- 

355 namespace: Namespace 

356 command line argument options 

357 

358 Returns 

359 ------- 

360 str 

361 Magnet URI 

362 """ 

363 metafile = namespace.metafile 

364 version = int(namespace.meta_version) 

365 return magnet(metafile, version=version) 

366 

367 

368def magnet(metafile: str, version: int = 0) -> str: 

369 """ 

370 Create a magnet URI from a Bittorrent meta file. 

371 

372 Parameters 

373 ---------- 

374 metafile : str 

375 path to bittorrent file 

376 version: int 

377 version of bittorrent protocol [default=1] 

378 

379 Returns 

380 ------- 

381 str 

382 Magnet URI 

383 """ 

384 if not os.path.exists(metafile): 

385 raise FileNotFoundError(f"No Such File {metafile}") 

386 meta = pyben.load(metafile) 

387 info_dict = meta["info"] 

388 

389 magnet = "magnet:?" 

390 bencoded_info = pyben.dumps(info_dict) 

391 

392 v1 = False 

393 if "meta version" not in info_dict or (version in [1, 3, 0] 

394 and "pieces" in info_dict): 

395 infohash = sha1(bencoded_info).hexdigest() # nosec 

396 magnet += "xt=urn:btih:" + infohash 

397 v1 = True 

398 

399 if "meta version" in info_dict and version != 1: 

400 infohash = sha256(bencoded_info).hexdigest() 

401 if v1: 

402 magnet += "&" 

403 magnet += "xt=urn:btmh:1220" + infohash 

404 

405 magnet += "&dn=" + quote_plus(info_dict["name"]) 

406 

407 if "announce-list" in meta: 

408 announce_args = [ 

409 "&tr=" + quote_plus(url) for urllist in meta["announce-list"] 

410 for url in urllist 

411 ] 

412 elif "announce" in meta: 

413 announce_args = ["&tr=" + quote_plus(meta["announce"])] 

414 else: 

415 announce_args = [""] 

416 

417 trackers = "".join(announce_args) 

418 

419 magnet += trackers if trackers != "&tr=" else "" 

420 

421 logger.info("Created Magnet URI %s", magnet) 

422 sys.stdout.write("\n" + magnet + "\n") 

423 return magnet 

424 

425 

426def rebuild(args: Namespace) -> int: 

427 """ 

428 Attempt to rebuild a torrent based on the a torrent file. 

429 

430 Recursively look through a directory for files that belong in 

431 a given torrent file, and rebuild as much of the torrent file 

432 as possible. Currently only checks if the filename and file 

433 size are a match. 

434 

435 1. Check file hashes to improve accuracy 

436 

437 Parameters 

438 ---------- 

439 args : Namespace 

440 command line arguments including the paths neccessary 

441 

442 Returns 

443 ------- 

444 int 

445 total number of content files copied to the rebuild directory 

446 """ 

447 metafiles = args.metafiles 

448 dest = args.destination 

449 contents = args.contents 

450 for path in [*metafiles, *contents]: 

451 if not os.path.exists(path): 

452 raise FileNotFoundError(path) 

453 assembler = Assembler(metafiles, contents, dest) 

454 return assembler.assemble_torrents() 

455 

456 

457interactive = select_action # for clean import system