- Improve annotation algorithm for Youtube (and others)
- Patch Pytube - improve OS deletion of files and writing of files - Start working on Claude - Improve template management
This commit is contained in:
419
patched_packages/pytube/contrib/playlist.py
Normal file
419
patched_packages/pytube/contrib/playlist.py
Normal file
@@ -0,0 +1,419 @@
|
||||
"""Module to download a complete playlist from a youtube channel."""
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from datetime import date, datetime
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
from pytube import extract, request, YouTube
|
||||
from pytube.helpers import cache, DeferredGeneratorList, install_proxy, uniqueify
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Playlist(Sequence):
|
||||
"""Load a YouTube playlist with URL"""
|
||||
|
||||
def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
|
||||
if proxies:
|
||||
install_proxy(proxies)
|
||||
|
||||
self._input_url = url
|
||||
|
||||
# These need to be initialized as None for the properties.
|
||||
self._html = None
|
||||
self._ytcfg = None
|
||||
self._initial_data = None
|
||||
self._sidebar_info = None
|
||||
|
||||
self._playlist_id = None
|
||||
|
||||
@property
|
||||
def playlist_id(self):
|
||||
"""Get the playlist id.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
if self._playlist_id:
|
||||
return self._playlist_id
|
||||
self._playlist_id = extract.playlist_id(self._input_url)
|
||||
return self._playlist_id
|
||||
|
||||
@property
|
||||
def playlist_url(self):
|
||||
"""Get the base playlist url.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return f"https://www.youtube.com/playlist?list={self.playlist_id}"
|
||||
|
||||
@property
|
||||
def html(self):
|
||||
"""Get the playlist page html.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
if self._html:
|
||||
return self._html
|
||||
self._html = request.get(self.playlist_url)
|
||||
return self._html
|
||||
|
||||
@property
|
||||
def ytcfg(self):
|
||||
"""Extract the ytcfg from the playlist page html.
|
||||
|
||||
:rtype: dict
|
||||
"""
|
||||
if self._ytcfg:
|
||||
return self._ytcfg
|
||||
self._ytcfg = extract.get_ytcfg(self.html)
|
||||
return self._ytcfg
|
||||
|
||||
@property
|
||||
def initial_data(self):
|
||||
"""Extract the initial data from the playlist page html.
|
||||
|
||||
:rtype: dict
|
||||
"""
|
||||
if self._initial_data:
|
||||
return self._initial_data
|
||||
else:
|
||||
self._initial_data = extract.initial_data(self.html)
|
||||
return self._initial_data
|
||||
|
||||
@property
|
||||
def sidebar_info(self):
|
||||
"""Extract the sidebar info from the playlist page html.
|
||||
|
||||
:rtype: dict
|
||||
"""
|
||||
if self._sidebar_info:
|
||||
return self._sidebar_info
|
||||
else:
|
||||
self._sidebar_info = self.initial_data['sidebar'][
|
||||
'playlistSidebarRenderer']['items']
|
||||
return self._sidebar_info
|
||||
|
||||
@property
|
||||
def yt_api_key(self):
|
||||
"""Extract the INNERTUBE_API_KEY from the playlist ytcfg.
|
||||
|
||||
:rtype: str
|
||||
"""
|
||||
return self.ytcfg['INNERTUBE_API_KEY']
|
||||
|
||||
def _paginate(
|
||||
self, until_watch_id: Optional[str] = None
|
||||
) -> Iterable[List[str]]:
|
||||
"""Parse the video links from the page source, yields the /watch?v=
|
||||
part from video link
|
||||
|
||||
:param until_watch_id Optional[str]: YouTube Video watch id until
|
||||
which the playlist should be read.
|
||||
|
||||
:rtype: Iterable[List[str]]
|
||||
:returns: Iterable of lists of YouTube watch ids
|
||||
"""
|
||||
videos_urls, continuation = self._extract_videos(
|
||||
json.dumps(extract.initial_data(self.html))
|
||||
)
|
||||
if until_watch_id:
|
||||
try:
|
||||
trim_index = videos_urls.index(f"/watch?v={until_watch_id}")
|
||||
yield videos_urls[:trim_index]
|
||||
return
|
||||
except ValueError:
|
||||
pass
|
||||
yield videos_urls
|
||||
|
||||
# Extraction from a playlist only returns 100 videos at a time
|
||||
# if self._extract_videos returns a continuation there are more
|
||||
# than 100 songs inside a playlist, so we need to add further requests
|
||||
# to gather all of them
|
||||
if continuation:
|
||||
load_more_url, headers, data = self._build_continuation_url(continuation)
|
||||
else:
|
||||
load_more_url, headers, data = None, None, None
|
||||
|
||||
while load_more_url and headers and data: # there is an url found
|
||||
logger.debug("load more url: %s", load_more_url)
|
||||
# requesting the next page of videos with the url generated from the
|
||||
# previous page, needs to be a post
|
||||
req = request.post(load_more_url, extra_headers=headers, data=data)
|
||||
# extract up to 100 songs from the page loaded
|
||||
# returns another continuation if more videos are available
|
||||
videos_urls, continuation = self._extract_videos(req)
|
||||
if until_watch_id:
|
||||
try:
|
||||
trim_index = videos_urls.index(f"/watch?v={until_watch_id}")
|
||||
yield videos_urls[:trim_index]
|
||||
return
|
||||
except ValueError:
|
||||
pass
|
||||
yield videos_urls
|
||||
|
||||
if continuation:
|
||||
load_more_url, headers, data = self._build_continuation_url(
|
||||
continuation
|
||||
)
|
||||
else:
|
||||
load_more_url, headers, data = None, None, None
|
||||
|
||||
def _build_continuation_url(self, continuation: str) -> Tuple[str, dict, dict]:
|
||||
"""Helper method to build the url and headers required to request
|
||||
the next page of videos
|
||||
|
||||
:param str continuation: Continuation extracted from the json response
|
||||
of the last page
|
||||
:rtype: Tuple[str, dict, dict]
|
||||
:returns: Tuple of an url and required headers for the next http
|
||||
request
|
||||
"""
|
||||
return (
|
||||
(
|
||||
# was changed to this format (and post requests)
|
||||
# between 2021.03.02 and 2021.03.03
|
||||
"https://www.youtube.com/youtubei/v1/browse?key="
|
||||
f"{self.yt_api_key}"
|
||||
),
|
||||
{
|
||||
"X-YouTube-Client-Name": "1",
|
||||
"X-YouTube-Client-Version": "2.20200720.00.02",
|
||||
},
|
||||
# extra data required for post request
|
||||
{
|
||||
"continuation": continuation,
|
||||
"context": {
|
||||
"client": {
|
||||
"clientName": "WEB",
|
||||
"clientVersion": "2.20200720.00.02"
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]:
|
||||
"""Extracts videos from a raw json page
|
||||
|
||||
:param str raw_json: Input json extracted from the page or the last
|
||||
server response
|
||||
:rtype: Tuple[List[str], Optional[str]]
|
||||
:returns: Tuple containing a list of up to 100 video watch ids and
|
||||
a continuation token, if more videos are available
|
||||
"""
|
||||
initial_data = json.loads(raw_json)
|
||||
try:
|
||||
# this is the json tree structure, if the json was extracted from
|
||||
# html
|
||||
section_contents = initial_data["contents"][
|
||||
"twoColumnBrowseResultsRenderer"][
|
||||
"tabs"][0]["tabRenderer"]["content"][
|
||||
"sectionListRenderer"]["contents"]
|
||||
try:
|
||||
# Playlist without submenus
|
||||
important_content = section_contents[
|
||||
0]["itemSectionRenderer"][
|
||||
"contents"][0]["playlistVideoListRenderer"]
|
||||
except (KeyError, IndexError, TypeError):
|
||||
# Playlist with submenus
|
||||
important_content = section_contents[
|
||||
1]["itemSectionRenderer"][
|
||||
"contents"][0]["playlistVideoListRenderer"]
|
||||
videos = important_content["contents"]
|
||||
except (KeyError, IndexError, TypeError):
|
||||
try:
|
||||
# this is the json tree structure, if the json was directly sent
|
||||
# by the server in a continuation response
|
||||
# no longer a list and no longer has the "response" key
|
||||
important_content = initial_data['onResponseReceivedActions'][0][
|
||||
'appendContinuationItemsAction']['continuationItems']
|
||||
videos = important_content
|
||||
except (KeyError, IndexError, TypeError) as p:
|
||||
logger.info(p)
|
||||
return [], None
|
||||
|
||||
try:
|
||||
continuation = videos[-1]['continuationItemRenderer'][
|
||||
'continuationEndpoint'
|
||||
]['continuationCommand']['token']
|
||||
videos = videos[:-1]
|
||||
except (KeyError, IndexError):
|
||||
# if there is an error, no continuation is available
|
||||
continuation = None
|
||||
|
||||
# remove duplicates
|
||||
return (
|
||||
uniqueify(
|
||||
list(
|
||||
# only extract the video ids from the video data
|
||||
map(
|
||||
lambda x: (
|
||||
f"/watch?v="
|
||||
f"{x['playlistVideoRenderer']['videoId']}"
|
||||
),
|
||||
videos
|
||||
)
|
||||
),
|
||||
),
|
||||
continuation,
|
||||
)
|
||||
|
||||
def trimmed(self, video_id: str) -> Iterable[str]:
|
||||
"""Retrieve a list of YouTube video URLs trimmed at the given video ID
|
||||
|
||||
i.e. if the playlist has video IDs 1,2,3,4 calling trimmed(3) returns
|
||||
[1,2]
|
||||
:type video_id: str
|
||||
video ID to trim the returned list of playlist URLs at
|
||||
:rtype: List[str]
|
||||
:returns:
|
||||
List of video URLs from the playlist trimmed at the given ID
|
||||
"""
|
||||
for page in self._paginate(until_watch_id=video_id):
|
||||
yield from (self._video_url(watch_path) for watch_path in page)
|
||||
|
||||
def url_generator(self):
|
||||
"""Generator that yields video URLs.
|
||||
|
||||
:Yields: Video URLs
|
||||
"""
|
||||
for page in self._paginate():
|
||||
for video in page:
|
||||
yield self._video_url(video)
|
||||
|
||||
@property # type: ignore
|
||||
@cache
|
||||
def video_urls(self) -> DeferredGeneratorList:
|
||||
"""Complete links of all the videos in playlist
|
||||
|
||||
:rtype: List[str]
|
||||
:returns: List of video URLs
|
||||
"""
|
||||
return DeferredGeneratorList(self.url_generator())
|
||||
|
||||
def videos_generator(self):
|
||||
for url in self.video_urls:
|
||||
yield YouTube(url)
|
||||
|
||||
@property
|
||||
def videos(self) -> Iterable[YouTube]:
|
||||
"""Yields YouTube objects of videos in this playlist
|
||||
|
||||
:rtype: List[YouTube]
|
||||
:returns: List of YouTube
|
||||
"""
|
||||
return DeferredGeneratorList(self.videos_generator())
|
||||
|
||||
def __getitem__(self, i: Union[slice, int]) -> Union[str, List[str]]:
|
||||
return self.video_urls[i]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.video_urls)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{repr(self.video_urls)}"
|
||||
|
||||
@property
|
||||
@cache
|
||||
def last_updated(self) -> Optional[date]:
|
||||
"""Extract the date that the playlist was last updated.
|
||||
|
||||
For some playlists, this will be a specific date, which is returned as a datetime
|
||||
object. For other playlists, this is an estimate such as "1 week ago". Due to the
|
||||
fact that this value is returned as a string, pytube does a best-effort parsing
|
||||
where possible, and returns the raw string where it is not possible.
|
||||
|
||||
:return: Date of last playlist update where possible, else the string provided
|
||||
:rtype: datetime.date
|
||||
"""
|
||||
last_updated_text = self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
|
||||
'stats'][2]['runs'][1]['text']
|
||||
try:
|
||||
date_components = last_updated_text.split()
|
||||
month = date_components[0]
|
||||
day = date_components[1].strip(',')
|
||||
year = date_components[2]
|
||||
return datetime.strptime(
|
||||
f"{month} {day:0>2} {year}", "%b %d %Y"
|
||||
).date()
|
||||
except (IndexError, KeyError):
|
||||
return last_updated_text
|
||||
|
||||
@property
|
||||
@cache
|
||||
def title(self) -> Optional[str]:
|
||||
"""Extract playlist title
|
||||
|
||||
:return: playlist title (name)
|
||||
:rtype: Optional[str]
|
||||
"""
|
||||
return self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
|
||||
'title']['runs'][0]['text']
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
|
||||
'description']['simpleText']
|
||||
|
||||
@property
|
||||
def length(self):
|
||||
"""Extract the number of videos in the playlist.
|
||||
|
||||
:return: Playlist video count
|
||||
:rtype: int
|
||||
"""
|
||||
count_text = self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
|
||||
'stats'][0]['runs'][0]['text']
|
||||
count_text = count_text.replace(',','')
|
||||
return int(count_text)
|
||||
|
||||
@property
|
||||
def views(self):
|
||||
"""Extract view count for playlist.
|
||||
|
||||
:return: Playlist view count
|
||||
:rtype: int
|
||||
"""
|
||||
# "1,234,567 views"
|
||||
views_text = self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer'][
|
||||
'stats'][1]['simpleText']
|
||||
# "1,234,567"
|
||||
count_text = views_text.split()[0]
|
||||
# "1234567"
|
||||
count_text = count_text.replace(',', '')
|
||||
return int(count_text)
|
||||
|
||||
@property
|
||||
def owner(self):
|
||||
"""Extract the owner of the playlist.
|
||||
|
||||
:return: Playlist owner name.
|
||||
:rtype: str
|
||||
"""
|
||||
return self.sidebar_info[1]['playlistSidebarSecondaryInfoRenderer'][
|
||||
'videoOwner']['videoOwnerRenderer']['title']['runs'][0]['text']
|
||||
|
||||
@property
|
||||
def owner_id(self):
|
||||
"""Extract the channel_id of the owner of the playlist.
|
||||
|
||||
:return: Playlist owner's channel ID.
|
||||
:rtype: str
|
||||
"""
|
||||
return self.sidebar_info[1]['playlistSidebarSecondaryInfoRenderer'][
|
||||
'videoOwner']['videoOwnerRenderer']['title']['runs'][0][
|
||||
'navigationEndpoint']['browseEndpoint']['browseId']
|
||||
|
||||
@property
|
||||
def owner_url(self):
|
||||
"""Create the channel url of the owner of the playlist.
|
||||
|
||||
:return: Playlist owner's channel url.
|
||||
:rtype: str
|
||||
"""
|
||||
return f'https://www.youtube.com/channel/{self.owner_id}'
|
||||
|
||||
@staticmethod
|
||||
def _video_url(watch_path: str):
|
||||
return f"https://www.youtube.com{watch_path}"
|
||||
Reference in New Issue
Block a user