Iammcqwory commited on
Commit
691060c
·
verified ·
1 Parent(s): 96109e7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +39 -33
app.py CHANGED
@@ -3,7 +3,7 @@ from pytubefix import YouTube
3
  import tempfile
4
  import os
5
  from urllib.error import HTTPError
6
- from typing import Optional, List, Dict
7
  import logging
8
 
9
  # Configure logging
@@ -33,29 +33,35 @@ class VideoDownloader:
33
  st.error(f"Error accessing video: {str(e)}")
34
  return None
35
 
36
- def get_streams(self, yt: YouTube, stream_type: str = 'video') -> Dict:
37
- """Get available streams based on type"""
38
  try:
39
  if stream_type == 'video':
40
- streams = yt.streams.filter(progressive=True, file_extension='mp4').all()
41
- resolutions = sorted(list(set(stream.resolution for stream in streams if stream.resolution)), reverse=True)
42
- return {
43
- 'streams': streams,
44
- 'qualities': resolutions,
45
- 'title': yt.title
46
- }
47
- else: # audio
48
- streams = yt.streams.filter(only_audio=True).all()
49
- bitrates = sorted(list(set(stream.abr for stream in streams if stream.abr)), reverse=True)
50
- return {
51
- 'streams': streams,
52
- 'qualities': bitrates,
53
- 'title': yt.title
54
- }
 
 
 
 
 
 
55
  except Exception as e:
56
- logger.error(f"Error getting streams: {e}")
57
- st.error(f"Error getting available qualities: {str(e)}")
58
- return {'streams': [], 'qualities': [], 'title': ''}
59
 
60
  def download_content(self, stream, title: str, extension: str) -> None:
61
  """Download and provide content to user"""
@@ -109,23 +115,23 @@ def main():
109
  if url:
110
  yt = downloader.get_youtube(url)
111
  if yt:
112
- stream_data = downloader.get_streams(yt, 'video')
113
- if stream_data:
114
- st.success(f"📺 Found: {stream_data['title']}")
115
  quality = st.selectbox(
116
  "Select Video Quality:",
117
- stream_data['qualities']
118
  )
119
 
120
  selected_stream = next(
121
- (s for s in stream_data['streams'] if s.resolution == quality),
122
  None
123
  )
124
 
125
  if selected_stream and st.button("Prepare Download", type="primary"):
126
  downloader.download_content(
127
  selected_stream,
128
- stream_data['title'],
129
  'mp4'
130
  )
131
 
@@ -137,23 +143,23 @@ def main():
137
  if url:
138
  yt = downloader.get_youtube(url)
139
  if yt:
140
- stream_data = downloader.get_streams(yt, 'audio')
141
- if stream_data:
142
- st.success(f"🎵 Found: {stream_data['title']}")
143
  quality = st.selectbox(
144
  "Select Audio Quality:",
145
- stream_data['qualities']
146
  )
147
 
148
  selected_stream = next(
149
- (s for s in stream_data['streams'] if s.abr == quality),
150
  None
151
  )
152
 
153
  if selected_stream and st.button("Prepare Audio Download", type="primary"):
154
  downloader.download_content(
155
  selected_stream,
156
- stream_data['title'],
157
  'mp3'
158
  )
159
 
 
3
  import tempfile
4
  import os
5
  from urllib.error import HTTPError
6
+ from typing import Optional, List, Dict, Tuple
7
  import logging
8
 
9
  # Configure logging
 
33
  st.error(f"Error accessing video: {str(e)}")
34
  return None
35
 
36
+ def process_streams(self, yt: YouTube, stream_type: str = 'video') -> Tuple[List, List, str]:
37
+ """Process and return streams information"""
38
  try:
39
  if stream_type == 'video':
40
+ # Get all progressive MP4 streams
41
+ all_streams = yt.streams.filter(progressive=True, file_extension='mp4').all()
42
+ # Extract unique resolutions
43
+ resolutions = []
44
+ seen = set()
45
+ for stream in all_streams:
46
+ if stream.resolution and stream.resolution not in seen:
47
+ resolutions.append(stream.resolution)
48
+ seen.add(stream.resolution)
49
+ return all_streams, sorted(resolutions, reverse=True), yt.title
50
+ else:
51
+ # Get all audio streams
52
+ all_streams = yt.streams.filter(only_audio=True).all()
53
+ # Extract unique bitrates
54
+ bitrates = []
55
+ seen = set()
56
+ for stream in all_streams:
57
+ if stream.abr and stream.abr not in seen:
58
+ bitrates.append(stream.abr)
59
+ seen.add(stream.abr)
60
+ return all_streams, sorted(bitrates, reverse=True), yt.title
61
  except Exception as e:
62
+ logger.error(f"Error processing streams: {e}")
63
+ st.error(f"Error processing available qualities: {str(e)}")
64
+ return [], [], ""
65
 
66
  def download_content(self, stream, title: str, extension: str) -> None:
67
  """Download and provide content to user"""
 
115
  if url:
116
  yt = downloader.get_youtube(url)
117
  if yt:
118
+ streams, qualities, title = downloader.process_streams(yt, 'video')
119
+ if streams and qualities:
120
+ st.success(f"📺 Found: {title}")
121
  quality = st.selectbox(
122
  "Select Video Quality:",
123
+ qualities
124
  )
125
 
126
  selected_stream = next(
127
+ (s for s in streams if s.resolution == quality),
128
  None
129
  )
130
 
131
  if selected_stream and st.button("Prepare Download", type="primary"):
132
  downloader.download_content(
133
  selected_stream,
134
+ title,
135
  'mp4'
136
  )
137
 
 
143
  if url:
144
  yt = downloader.get_youtube(url)
145
  if yt:
146
+ streams, qualities, title = downloader.process_streams(yt, 'audio')
147
+ if streams and qualities:
148
+ st.success(f"🎵 Found: {title}")
149
  quality = st.selectbox(
150
  "Select Audio Quality:",
151
+ qualities
152
  )
153
 
154
  selected_stream = next(
155
+ (s for s in streams if s.abr == quality),
156
  None
157
  )
158
 
159
  if selected_stream and st.button("Prepare Audio Download", type="primary"):
160
  downloader.download_content(
161
  selected_stream,
162
+ title,
163
  'mp3'
164
  )
165