Coverage for torrentfile\mixins.py: 100%
107 statements
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
« prev ^ index » next coverage.py v7.3.0, created at 2023-08-27 21:50 -0700
1#! /usr/bin/python3
2# -*- coding: utf-8 -*-
4##############################################################################
5# Copyright (C) 2021-current alexpdev
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18##############################################################################
19"""
20Collection of classes that can be used as Mixins with other base classes.
22Classes such as TorrentFile, TorrentFilev2, and all Hasher classes can use the
23progress bar mixin. And any class is eligible to use the callback mixin.
24"""
25import os
26import sys
27import math
28import time
29import shutil
30from pathlib import Path
32from torrentfile.utils import debug_is_on, green
class CbMixin:
    """
    Mixin that gives a class a replaceable callback hook.

    The hook starts out as a no-op; ``set_callback`` swaps in a real function.
    """

    @classmethod
    def cb(cls, *args, **kwargs):
        """Accept any arguments and do nothing (the default hook)."""
        return None

    @classmethod
    def set_callback(cls, func):
        """
        Install a callback on the class in place of the default hook.

        Parameters
        ----------
        func : Callable
            the function stored as the class's ``cb`` attribute
        """
        setattr(cls, "cb", func)  # pragma: nocover
class ProgressBar:
    """
    Terminal progress bar that tracks an accumulating amount.

    Parameters
    ----------
    total : int
        the amount at which the bar is drawn completely full.
    title : str
        label printed to the left of the bar.
    length : int
        number of characters the bar itself occupies.
    unit : str
        label for the amount being counted; ``"bytes"`` is rescaled to
        KiB, MiB or GiB depending on the size of ``total``.
    start : int
        column at which the bar begins; the title is padded or cut to fit.
    """

    # (lower bound on total, divisor, unit label), checked largest first.
    _BYTE_SCALES = (
        (1_000_000_000, 2**30, "GiB"),
        (1_000_000, 1048576, "MiB"),
        (10000, 1024, "KiB"),
    )

    def __init__(self, total: int, title: str, length: int, unit: str,
                 start: int):
        """
        Build the progress bar and precompute its static prefix and suffix.
        """
        # Return value is discarded; presumably called for a side effect
        # inside torrentfile.utils -- confirm there.
        debug_is_on()
        self.fill = chr(9608)
        self.empty = chr(9617)
        self.state = 0
        self.total = total
        self.length = length
        self.start = start
        self.unit = unit or ""
        self.show_total = total
        if unit == "bytes":
            # Pick the largest scale whose bound the total exceeds; totals
            # of 10000 or less keep the plain "bytes" label.
            for floor, divisor, label in self._BYTE_SCALES:
                if self.total > floor:
                    self.show_total = self.total / divisor
                    self.unit = label
                    break
        self.suffix = f"/{self.show_total:.02f} {self.unit}"
        label_text = str(title)
        if len(label_text) > start:
            label_text = label_text[:start - 1]  # pragma: nocover
        # ljust pads with spaces up to ``start`` and leaves longer text as is.
        self.prefix = label_text.ljust(start)

    def get_progress(self) -> str:
        """
        Render the bar and the current amount.

        Returns
        -------
        str :
            the bar between ``|`` delimiters followed by the current state
            expressed in the display unit with two decimals
        """
        if self.state >= self.total:
            filled = self.length
        else:
            filled = math.ceil((self.state / self.total) * self.length)
        remaining = self.length - filled
        bar = (self.fill * filled) + (self.empty * remaining)
        pieces = ["|", green(bar), "| "]
        shown = self.state
        # Scale the running state the same way the total was scaled.
        for _, divisor, label in self._BYTE_SCALES:
            if self.unit == label:
                shown = self.state / divisor
                break
        pieces.append(f"{shown:.02f}")
        return "".join(pieces)

    @staticmethod
    def close_out():
        """
        Finish the current progress bar line.

        Flushes stdout and then writes a newline, so the drawn bar stays on
        screen and later output starts on the following line.
        """
        stream = sys.stdout
        stream.flush()
        stream.write("\n")

    def update(self, val: int):
        """
        Advance the bar and redraw it.

        Parameters
        ----------
        val : int
            the amount to add to the current state.
        """
        self.state += val
        rendered = self.get_progress()
        # The trailing carriage return lets the next redraw overwrite
        # this line.
        sys.stdout.write(f"{self.prefix}{rendered}{self.suffix}\r")
        sys.stdout.flush()

    @classmethod
    def new(cls, total: int, path: str, length: int = 50, unit: str = "bytes"):
        """
        Create a progress bar sized to the terminal for the given path.

        Parameters
        ----------
        total : int
            the total amount of units accumulating towards.
        path : str
            path used as the title of the bar.
        length : int
            the requested number of characters for the bar itself.
        unit : str
            the text representation of the value being measured.

        Returns
        -------
        ProgressBar
            an instance of ``cls`` laid out for the current terminal width
        """
        columns = shutil.get_terminal_size().columns
        half = columns // 2
        title = path
        if len(str(title)) >= half:
            # Drop leading path components until the rest fits in half
            # the terminal width.
            parts = list(Path(title).parts)
            while parts and len("//".join(parts)) > half:
                parts.pop(0)
            title = (os.path.join(*parts)
                     if parts else os.path.basename(path))  # pragma: nocover
        length = min(length, half)
        start = columns - int(length * 1.5)
        return cls(total, title, length, unit, start)
class ProgMixin:
    """
    Mixin that hands out progress trackers.

    Displays progress of hashing individual files, useful when hashing
    really big files.
    """

    class NoProg:
        """Silent placeholder handed out when the requested total is negative."""

        @staticmethod
        def update(value):
            """
            Echo the value back without drawing anything.

            Parameters
            ----------
            value: int
                the amount that would have been added to a progress bar

            Returns
            -------
            int :
                the same object that was passed in
            """
            return value

    def get_progress_tracker(self, total: int, message: str):
        """Return a tracker object for the caller to drive.

        Parameters
        ----------
        total: int
            total size to track; a negative value selects the silent
            placeholder
        message: str
            prompt message for what is being tracked

        Returns
        -------
        ProgressBar or NoProg
            a ``ProgressBar`` built by ``ProgressBar.new`` when ``total`` is
            zero or more, otherwise a ``NoProg`` instance
        """
        return (self.NoProg() if total < 0
                else ProgressBar.new(total, message))
def waiting(msg: str, flag: list, timeout: int = 20):
    """
    Show an animated loading line until a flag is set or time runs out.

    A frame is drawn roughly every 0.16 seconds. The loop ends as soon as
    ``flag`` is non-empty or more than ``timeout`` seconds have elapsed.
    Elapsed time is measured with a monotonic clock so that system clock
    adjustments cannot shorten or extend the wait.

    Parameters
    ----------
    msg : str
        Message string printed before the animated bar
    flag : list
        The loop exits once this list contains at least one item
    timeout : int
        max number of seconds to keep animating.
    """
    started = time.monotonic()
    frames = [chr(code) for code in range(9617, 9620)]
    block = chr(9619)
    # Room left for the bar after the message and a fixed 20-column margin;
    # clamped so that a narrow terminal simply yields an empty track.
    track = max(0, shutil.get_terminal_size().columns - len(msg) - 20)
    filled = frame = 0

    def output(text: str):
        """
        Write text to stdout and flush immediately.

        Parameters
        ----------
        text : str
            output message
        """
        sys.stdout.write(text)
        sys.stdout.flush()

    output("\n")
    time.sleep(0.16)
    while not flag:
        time.sleep(0.16)
        line = (block * filled) + frames[frame] + (" " * (track - filled))
        # Carriage return so the next frame overwrites this one.
        output(f"{msg}: {line}\r")
        frame = (frame + 1) % len(frames)
        # Grow the bar one cell per frame and restart once it hits the edge.
        filled = filled + 1 if filled < track else 0
        if time.monotonic() - started > timeout:
            break
    output("\n")