Coverage for torrentfile\mixins.py: 100%

107 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-08-27 21:50 -0700

1#! /usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4############################################################################## 

5# Copyright (C) 2021-current alexpdev 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); 

8# you may not use this file except in compliance with the License. 

9# You may obtain a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, 

15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

16# See the License for the specific language governing permissions and 

17# limitations under the License. 

18############################################################################## 

19""" 

20Collection of classes that can be used as Mixins with other base classes. 

21 

22Classes such as TorrentFile, TorrentFilev2, and all Hasher classes can use the 

23progress bar mixin. And any class is eligible to use the callback mixin. 

24""" 

25import os 

26import sys 

27import math 

28import time 

29import shutil 

30from pathlib import Path 

31 

32from torrentfile.utils import debug_is_on, green 

33 

34 

class CbMixin:
    """
    Mixin that gives a class a replaceable callback hook named ``cb``.
    """

    @classmethod
    def cb(cls, *args, **kwargs):
        """Default hook: accept any arguments and do nothing."""
        return None

    @classmethod
    def set_callback(cls, func):
        """
        Install a callback as the ``cb`` hook of the class.

        The callable is stored on the class exactly as given, replacing the
        default do-nothing hook for that class.

        Parameters
        ----------
        func : Callable
            the callback function
        """
        setattr(cls, "cb", func)  # pragma: nocover

55 

56 

class ProgressBar:
    """
    Holds the state and details of the terminal progress bars.

    Parameters
    ----------
    total : int
        the total amount to be accumulated.
    title : str
        the subject of the progress tracker
    length : int
        the width of the progress bar
    unit : str
        the text representation incremented
    start : int
        column where the progress bar should be drawn
    """

    def __init__(self, total: int, title: str, length: int, unit: str,
                 start: int):
        """
        Construct the progress bar object and store state of its properties.

        When ``unit`` is ``"bytes"`` the displayed total is scaled to KiB,
        MiB or GiB depending on its magnitude; the title is truncated or
        padded with spaces so the bar begins at column ``start``.
        """
        # Return value is discarded; presumably called for a side effect
        # inside torrentfile.utils -- confirm against that module.
        debug_is_on()
        self.total = total
        self.start = start
        self.length = length
        self.fill = chr(9608)  # full block character
        self.empty = chr(9617)  # light shade character
        self.state = 0
        self.unit = unit
        self.show_total = total
        if not unit:
            self.unit = ""  # pragma: nocover
        elif unit == "bytes":
            # Pick the largest unit that fits; smaller totals stay in bytes.
            if self.total > 1_000_000_000:
                self.show_total = self.total / (2**30)
                self.unit = "GiB"
            elif self.total > 1_000_000:
                self.show_total = self.total / 1048576
                self.unit = "MiB"
            elif self.total > 10000:
                self.show_total = self.total / 1024
                self.unit = "KiB"
        self.suffix = f"/{self.show_total:.02f} {self.unit}"
        title = str(title)
        if len(title) > start:
            # Leave one column of padding between the title and the bar.
            title = title[:start - 1]  # pragma: nocover
        padding = (start - len(title)) * " "
        self.prefix = "".join([title, padding])

    def get_progress(self) -> str:
        """
        Build the bar characters followed by the current progress value.

        Returns
        -------
        str :
            the progress bar characters
        """
        if self.state >= self.total:
            # Also covers total == 0, which avoids dividing by zero below.
            fill = self.length
        else:
            fill = math.ceil((self.state / self.total) * self.length)
        empty = self.length - fill
        contents = (self.fill * fill) + (self.empty * empty)
        pbar = ["|", green(contents), "| "]
        # Scale the running value with the same divisor used for the total.
        if self.unit == "GiB":
            state = self.state / (2**30)
        elif self.unit == "MiB":
            state = self.state / 1048576
        elif self.unit == "KiB":
            state = self.state / 1024
        else:
            state = self.state
        pbar.append(f"{state:.02f}")
        return "".join(pbar)

    @staticmethod
    def close_out():
        """
        Finalize the last bits of progress bar.

        Move the terminal to the next line, leaving the finished progress
        bar in place so the next one is drawn below it.
        """
        # Write first, then flush, so the newline itself is pushed out even
        # when stdout is block-buffered (e.g. redirected to a pipe or file).
        sys.stdout.write("\n")
        sys.stdout.flush()

    def update(self, val: int):
        """
        Update progress bar.

        Using the value provided, increment the progress bar by that value
        and redraw it on the current terminal line.

        Parameters
        ----------
        val : int
            the number of bytes count the progress bar should increase.
        """
        self.state += val
        pbar = self.get_progress()
        # The trailing carriage return makes the next redraw overwrite this
        # line instead of starting a new one.
        output = f"{self.prefix}{pbar}{self.suffix}\r"
        sys.stdout.write(output)
        sys.stdout.flush()

    @classmethod
    def new(cls, total: int, path: str, length: int = 50, unit: str = "bytes"):
        """
        Generate a new progress bar for the given file path.

        Parameters
        ----------
        total : int
            the total amount of units accumulating towards.
        path : str
            path to file being hashed.
        length : int
            the number of characters of the actual progress bar.
        unit : str
            the text representation of the value being measured.

        Returns
        -------
        ProgressBar
            instance sized to the current terminal width
        """
        title = path
        width = shutil.get_terminal_size().columns
        if len(str(title)) >= width // 2:
            # Drop leading path components until the remainder fits in half
            # the terminal width; "//" is used only to measure the length.
            parts = list(Path(title).parts)
            while (len("//".join(parts)) > (width // 2)) and (len(parts) > 0):
                del parts[0]
            if parts:
                title = os.path.join(*parts)
            else:
                title = os.path.basename(path)  # pragma: nocover
        length = min(length, width // 2)
        start = width - int(length * 1.5)
        return cls(total, title, length, unit, start)

192 

193 

class ProgMixin:
    """
    Progress bar mixin class.

    Displays progress of hashing individual files, useful when hashing
    really big files.
    """

    class NoProg:
        """
        Stand-in object returned when no progress bar should be drawn.

        Provides the same ``update`` and ``close_out`` methods as
        ProgressBar so callers can use either object the same way.
        """

        @staticmethod
        def update(value):
            """
            Return the value.

            Parameters
            ----------
            value: int
                the input and output

            Returns
            -------
            int :
                same as input
            """
            return value

        @staticmethod
        def close_out():
            """Do nothing; there is no bar on screen to finalize."""

    def get_progress_tracker(self, total: int, message: str):
        """Return the progress bar object for external management.

        Parameters
        ----------
        total: int
            total size to track; a negative value disables the display
        message: str
            prompt message for what is being tracked

        Returns
        -------
        ProgressBar or NoProg
            a ProgressBar instance, or a NoProg stand-in when total < 0
        """
        if total < 0:
            return self.NoProg()
        return ProgressBar.new(total, message)

240 

241 

def waiting(msg: str, flag: list, timeout: int = 20):
    """
    Animate a loading indicator until the flag list is filled.

    Parameters
    ----------
    msg : str
        Message string printed before the animated bar
    flag : list
        The animation stops as soon as this list is no longer empty
    timeout : int
        max amount of time, in seconds, to keep animating.
    """
    started = time.time()
    shades = [chr(code) for code in range(9617, 9620)]
    block = chr(9619)
    room = shutil.get_terminal_size().columns - len(msg) - 20
    done = 0
    frame = 0

    def emit(text: str):
        """
        Write text to stdout and flush it immediately.

        Parameters
        ----------
        text : str
            output message
        """
        sys.stdout.write(text)
        sys.stdout.flush()

    emit("\n")
    time.sleep(0.16)
    while len(flag) < 1:
        time.sleep(0.16)
        bar = "".join([block * done, shades[frame], " " * (room - done)])
        emit(f"{msg}: {bar}\r")
        # Cycle through the shade characters for the leading cell.
        frame = (frame + 1) % len(shades)
        # Grow the filled part by one cell, wrapping once the row is full.
        if done < room:
            done += 1
        else:
            done = 0
        if time.time() - started > timeout:
            break
    emit("\n")