- Full API application, streamlined, de-duplication of document handling code into document_utils.py
- Added meta-data fields to DocumentVersion - Docker container to support API
This commit is contained in:
@@ -1,201 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Module for interacting with a user's youtube channel."""
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from pytube import extract, Playlist, request
|
||||
from pytube.helpers import uniqueify
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Channel(Playlist):
    """A YouTube channel, exposed as a :class:`Playlist` of its videos.

    Pagination and URL generation come from :class:`Playlist`; this class
    points ``html`` at the channel's ``/videos`` page and overrides
    ``_extract_videos`` to read the grid layout used by that page.
    """

    def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
        """Construct a :class:`Channel <Channel>`.

        :param str url:
            A valid YouTube channel URL.
        :param proxies:
            (Optional) A dictionary of proxies to use for web requests.
        """
        super().__init__(url, proxies)

        self.channel_uri = extract.channel_name(url)
        self.channel_url = f"https://www.youtube.com{self.channel_uri}"

        # One URL per channel tab
        self.videos_url = self.channel_url + '/videos'
        self.playlists_url = self.channel_url + '/playlists'
        self.community_url = self.channel_url + '/community'
        self.featured_channels_url = self.channel_url + '/channels'
        self.about_url = self.channel_url + '/about'

        # Possible future additions; each is fetched lazily by the property
        # of the same name and stays None until first accessed.
        self._playlists_html = None
        self._community_html = None
        self._featured_channels_html = None
        self._about_html = None

    @property
    def channel_name(self) -> str:
        """Get the name of the YouTube channel.

        :rtype: str
        """
        return self.initial_data['metadata']['channelMetadataRenderer']['title']

    @property
    def channel_id(self) -> str:
        """Get the ID of the YouTube channel.

        This will return the underlying ID, not the vanity URL.

        :rtype: str
        """
        return self.initial_data['metadata']['channelMetadataRenderer']['externalId']

    @property
    def vanity_url(self) -> Optional[str]:
        """Get the vanity URL of the YouTube channel.

        Returns None if it doesn't exist.

        :rtype: str
        """
        metadata = self.initial_data['metadata']['channelMetadataRenderer']
        return metadata.get('vanityChannelUrl', None)

    @property
    def html(self):
        """Get the html for the /videos page.

        The page is requested on first access and cached afterwards.

        :rtype: str
        """
        # Compare against None so an empty response is cached too instead of
        # triggering a new request on every access.
        if self._html is None:
            self._html = request.get(self.videos_url)
        return self._html

    @property
    def playlists_html(self):
        """Get the html for the /playlists page.

        Currently unused for any functionality.

        :rtype: str
        """
        if self._playlists_html is None:
            self._playlists_html = request.get(self.playlists_url)
        return self._playlists_html

    @property
    def community_html(self):
        """Get the html for the /community page.

        Currently unused for any functionality.

        :rtype: str
        """
        if self._community_html is None:
            self._community_html = request.get(self.community_url)
        return self._community_html

    @property
    def featured_channels_html(self):
        """Get the html for the /channels page.

        Currently unused for any functionality.

        :rtype: str
        """
        if self._featured_channels_html is None:
            self._featured_channels_html = request.get(
                self.featured_channels_url
            )
        return self._featured_channels_html

    @property
    def about_html(self):
        """Get the html for the /about page.

        Currently unused for any functionality.

        :rtype: str
        """
        if self._about_html is None:
            self._about_html = request.get(self.about_url)
        return self._about_html

    @staticmethod
    def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]:
        """Extracts videos from a raw json page

        :param str raw_json: Input json extracted from the page or the last
            server response
        :rtype: Tuple[List[str], Optional[str]]
        :returns: Tuple containing a list of up to 100 video watch ids and
            a continuation token, if more videos are available. Returns
            ``([], None)`` when none of the known layouts matches.
        """
        initial_data = json.loads(raw_json)

        # Known locations of the video items, tried in order.
        candidate_paths = (
            # json extracted from the html of the page
            (
                "contents", "twoColumnBrowseResultsRenderer", "tabs", 1,
                "tabRenderer", "content", "sectionListRenderer", "contents",
                0, "itemSectionRenderer", "contents", 0, "gridRenderer",
                "items",
            ),
            # json sent directly by the server in a continuation response
            (
                1, "response", "onResponseReceivedActions", 0,
                "appendContinuationItemsAction", "continuationItems",
            ),
            # continuation response that is no longer a list and no longer
            # has the "response" key
            (
                "onResponseReceivedActions", 0,
                "appendContinuationItemsAction", "continuationItems",
            ),
        )

        last_error = None
        for path in candidate_paths:
            node = initial_data
            try:
                for key in path:
                    node = node[key]
            except (KeyError, IndexError, TypeError) as err:
                last_error = err
                continue
            videos = node
            break
        else:
            # No layout matched; report only the final failure.
            logger.info(last_error)
            return [], None

        try:
            continuation = videos[-1]['continuationItemRenderer'][
                'continuationEndpoint'
            ]['continuationCommand']['token']
            # The trailing item was the continuation marker, not a video
            videos = videos[:-1]
        except (KeyError, IndexError):
            # if there is an error, no continuation is available
            continuation = None

        # only extract the video ids from the video data
        watch_paths = [
            f"/watch?v={item['gridVideoRenderer']['videoId']}"
            for item in videos
        ]
        # remove duplicates
        return uniqueify(watch_paths), continuation
|
||||
@@ -1,419 +0,0 @@
|
||||
"""Module to download a complete playlist from a youtube channel."""
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from datetime import date, datetime
|
||||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
from pytube import extract, request, YouTube
|
||||
from pytube.helpers import cache, DeferredGeneratorList, install_proxy, uniqueify
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Playlist(Sequence):
    """Load a YouTube playlist with URL

    Behaves as a read-only sequence of video URLs; the playlist page and
    its continuation pages are only requested when the data is needed.
    """

    def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
        """Construct a playlist from its URL.

        :param str url:
            URL from which the playlist id is extracted.
        :param proxies:
            (Optional) A dictionary of proxies to use for web requests.
        """
        if proxies:
            install_proxy(proxies)

        self._input_url = url

        # These need to be initialized as None for the properties.
        self._html = None
        self._ytcfg = None
        self._initial_data = None
        self._sidebar_info = None

        self._playlist_id = None

    @property
    def playlist_id(self):
        """Get the playlist id.

        :rtype: str
        """
        if self._playlist_id is None:
            self._playlist_id = extract.playlist_id(self._input_url)
        return self._playlist_id

    @property
    def playlist_url(self):
        """Get the base playlist url.

        :rtype: str
        """
        return f"https://www.youtube.com/playlist?list={self.playlist_id}"

    @property
    def html(self):
        """Get the playlist page html.

        :rtype: str
        """
        if self._html is None:
            self._html = request.get(self.playlist_url)
        return self._html

    @property
    def ytcfg(self):
        """Extract the ytcfg from the playlist page html.

        :rtype: dict
        """
        if self._ytcfg is None:
            self._ytcfg = extract.get_ytcfg(self.html)
        return self._ytcfg

    @property
    def initial_data(self):
        """Extract the initial data from the playlist page html.

        :rtype: dict
        """
        if self._initial_data is None:
            self._initial_data = extract.initial_data(self.html)
        return self._initial_data

    @property
    def sidebar_info(self):
        """Extract the sidebar info from the playlist page html.

        :rtype: dict
        """
        if self._sidebar_info is None:
            self._sidebar_info = self.initial_data['sidebar'][
                'playlistSidebarRenderer']['items']
        return self._sidebar_info

    @property
    def _primary_info(self):
        """Primary sidebar renderer holding the title, stats and description.

        :rtype: dict
        """
        return self.sidebar_info[0]['playlistSidebarPrimaryInfoRenderer']

    @property
    def yt_api_key(self):
        """Extract the INNERTUBE_API_KEY from the playlist ytcfg.

        :rtype: str
        """
        return self.ytcfg['INNERTUBE_API_KEY']

    def _paginate(
        self, until_watch_id: Optional[str] = None
    ) -> Iterable[List[str]]:
        """Parse the video links from the page source, yields the /watch?v=
        part from video link

        :param until_watch_id Optional[str]: YouTube Video watch id until
            which the playlist should be read.

        :rtype: Iterable[List[str]]
        :returns: Iterable of lists of YouTube watch ids
        """
        stop_at = f"/watch?v={until_watch_id}" if until_watch_id else None

        # The first page is embedded in the page html; every later page is
        # the raw body of a POST to the continuation endpoint.
        raw_page = json.dumps(self.initial_data)
        while True:
            videos_urls, continuation = self._extract_videos(raw_page)

            if stop_at is not None and stop_at in videos_urls:
                # Yield only what precedes the requested video, then stop
                yield videos_urls[:videos_urls.index(stop_at)]
                return
            yield videos_urls

            # Extraction from a playlist only returns 100 videos at a time.
            # A continuation means there are more, so further requests are
            # needed to gather all of them.
            if not continuation:
                return
            load_more_url, headers, data = self._build_continuation_url(
                continuation
            )
            if not (load_more_url and headers and data):
                return

            logger.debug("load more url: %s", load_more_url)
            # requesting the next page of videos with the url generated from
            # the previous page, needs to be a post
            raw_page = request.post(
                load_more_url, extra_headers=headers, data=data
            )

    def _build_continuation_url(self, continuation: str) -> Tuple[str, dict, dict]:
        """Helper method to build the url and headers required to request
        the next page of videos

        :param str continuation: Continuation extracted from the json response
            of the last page
        :rtype: Tuple[str, dict, dict]
        :returns: Tuple of an url, the required headers and the post data
            for the next http request
        """
        # was changed to this format (and post requests)
        # between 2021.03.02 and 2021.03.03
        url = (
            "https://www.youtube.com/youtubei/v1/browse?key="
            f"{self.yt_api_key}"
        )
        headers = {
            "X-YouTube-Client-Name": "1",
            "X-YouTube-Client-Version": "2.20200720.00.02",
        }
        # extra data required for post request
        data = {
            "continuation": continuation,
            "context": {
                "client": {
                    "clientName": "WEB",
                    "clientVersion": "2.20200720.00.02"
                }
            }
        }
        return url, headers, data

    @staticmethod
    def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]:
        """Extracts videos from a raw json page

        :param str raw_json: Input json extracted from the page or the last
            server response
        :rtype: Tuple[List[str], Optional[str]]
        :returns: Tuple containing a list of up to 100 video watch ids and
            a continuation token, if more videos are available. Returns
            ``([], None)`` when neither known layout matches.
        """
        initial_data = json.loads(raw_json)
        try:
            # this is the json tree structure, if the json was extracted from
            # html
            section_contents = initial_data["contents"][
                "twoColumnBrowseResultsRenderer"][
                "tabs"][0]["tabRenderer"]["content"][
                "sectionListRenderer"]["contents"]
            try:
                # Playlist without submenus
                important_content = section_contents[
                    0]["itemSectionRenderer"][
                    "contents"][0]["playlistVideoListRenderer"]
            except (KeyError, IndexError, TypeError):
                # Playlist with submenus
                important_content = section_contents[
                    1]["itemSectionRenderer"][
                    "contents"][0]["playlistVideoListRenderer"]
            videos = important_content["contents"]
        except (KeyError, IndexError, TypeError):
            try:
                # this is the json tree structure, if the json was directly
                # sent by the server in a continuation response
                # no longer a list and no longer has the "response" key
                videos = initial_data['onResponseReceivedActions'][0][
                    'appendContinuationItemsAction']['continuationItems']
            except (KeyError, IndexError, TypeError) as p:
                logger.info(p)
                return [], None

        try:
            continuation = videos[-1]['continuationItemRenderer'][
                'continuationEndpoint'
            ]['continuationCommand']['token']
            # The trailing item was the continuation marker, not a video
            videos = videos[:-1]
        except (KeyError, IndexError):
            # if there is an error, no continuation is available
            continuation = None

        # only extract the video ids from the video data
        watch_paths = [
            f"/watch?v={item['playlistVideoRenderer']['videoId']}"
            for item in videos
        ]
        # remove duplicates
        return uniqueify(watch_paths), continuation

    def trimmed(self, video_id: str) -> Iterable[str]:
        """Retrieve a list of YouTube video URLs trimmed at the given video ID

        i.e. if the playlist has video IDs 1,2,3,4 calling trimmed(3) returns
        [1,2]

        :type video_id: str
            video ID to trim the returned list of playlist URLs at
        :rtype: List[str]
        :returns:
            List of video URLs from the playlist trimmed at the given ID
        """
        for page in self._paginate(until_watch_id=video_id):
            for watch_path in page:
                yield self._video_url(watch_path)

    def url_generator(self):
        """Generator that yields video URLs.

        :Yields: Video URLs
        """
        for page in self._paginate():
            for video in page:
                yield self._video_url(video)

    @property  # type: ignore
    @cache
    def video_urls(self) -> "DeferredGeneratorList":
        """Complete links of all the videos in playlist

        :rtype: List[str]
        :returns: List of video URLs
        """
        return DeferredGeneratorList(self.url_generator())

    def videos_generator(self):
        """Generator that yields one YouTube object per video URL."""
        for url in self.video_urls:
            yield YouTube(url)

    @property
    def videos(self) -> "DeferredGeneratorList":
        """Yields YouTube objects of videos in this playlist

        :rtype: List[YouTube]
        :returns: List of YouTube
        """
        return DeferredGeneratorList(self.videos_generator())

    def __getitem__(self, i: Union[slice, int]) -> Union[str, List[str]]:
        return self.video_urls[i]

    def __len__(self) -> int:
        return len(self.video_urls)

    def __repr__(self) -> str:
        return repr(self.video_urls)

    @property
    @cache
    def last_updated(self) -> Union[date, str]:
        """Extract the date that the playlist was last updated.

        For some playlists, this will be a specific date, which is returned
        as a date object. For other playlists, this is an estimate such as
        "1 week ago". Due to the fact that this value is returned as a
        string, pytube does a best-effort parsing where possible, and returns
        the raw string where it is not possible.

        :return: Date of last playlist update where possible, else the string
            provided
        :rtype: datetime.date
        """
        last_updated_text = self._primary_info['stats'][2]['runs'][1]['text']
        try:
            date_components = last_updated_text.split()
            month = date_components[0]
            day = date_components[1].strip(',')
            year = date_components[2]
            return datetime.strptime(
                f"{month} {day:0>2} {year}", "%b %d %Y"
            ).date()
        except (IndexError, KeyError, ValueError):
            # strptime raises ValueError for relative text such as
            # "2 days ago"; fall back to the raw string as documented.
            return last_updated_text

    @property
    @cache
    def title(self) -> Optional[str]:
        """Extract playlist title

        :return: playlist title (name)
        :rtype: Optional[str]
        """
        return self._primary_info['title']['runs'][0]['text']

    @property
    def description(self) -> str:
        """Extract the playlist description.

        :return: Playlist description text
        :rtype: str
        """
        return self._primary_info['description']['simpleText']

    @property
    def length(self):
        """Extract the number of videos in the playlist.

        :return: Playlist video count
        :rtype: int
        """
        count_text = self._primary_info['stats'][0]['runs'][0]['text']
        return int(count_text.replace(',', ''))

    @property
    def views(self):
        """Extract view count for playlist.

        :return: Playlist view count
        :rtype: int
        """
        # "1,234,567 views"
        views_text = self._primary_info['stats'][1]['simpleText']
        # "1,234,567"
        count_text = views_text.split()[0]
        # "1234567"
        return int(count_text.replace(',', ''))

    @property
    def owner(self):
        """Extract the owner of the playlist.

        :return: Playlist owner name.
        :rtype: str
        """
        return self.sidebar_info[1]['playlistSidebarSecondaryInfoRenderer'][
            'videoOwner']['videoOwnerRenderer']['title']['runs'][0]['text']

    @property
    def owner_id(self):
        """Extract the channel_id of the owner of the playlist.

        :return: Playlist owner's channel ID.
        :rtype: str
        """
        return self.sidebar_info[1]['playlistSidebarSecondaryInfoRenderer'][
            'videoOwner']['videoOwnerRenderer']['title']['runs'][0][
            'navigationEndpoint']['browseEndpoint']['browseId']

    @property
    def owner_url(self):
        """Create the channel url of the owner of the playlist.

        :return: Playlist owner's channel url.
        :rtype: str
        """
        return f'https://www.youtube.com/channel/{self.owner_id}'

    @staticmethod
    def _video_url(watch_path: str):
        """Turn a ``/watch?v=...`` path into a full YouTube URL."""
        return f"https://www.youtube.com{watch_path}"
|
||||
@@ -1,225 +0,0 @@
|
||||
"""Module for interacting with YouTube search."""
|
||||
# Native python imports
|
||||
import logging
|
||||
|
||||
# Local imports
|
||||
from pytube import YouTube
|
||||
from pytube.innertube import InnerTube
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Search:
    """Run a YouTube search through the innertube API.

    The first page of results is fetched lazily by ``results``; further
    pages are appended by ``get_next_results()``.
    """

    # Renderer keys that mark a search entry which is not a plain video
    # result and is therefore skipped without a warning.
    _SKIPPED_RENDERERS = (
        # "recommended" type videos e.g. "people also watched" and
        # "popular X" that break up the search results
        'shelfRenderer',
        # auto-generated "mix" playlist results
        'radioRenderer',
        # playlist results
        'playlistRenderer',
        # channel results
        'channelRenderer',
        # 'people also searched for' results
        'horizontalCardListRenderer',
        # Can't seem to reproduce, probably related to typo fix suggestions
        'didYouMeanRenderer',
        # Seems to be the renderer used for the image shown on a no results
        # page
        'backgroundPromoRenderer',
    )

    def __init__(self, query):
        """Initialize Search object.

        :param str query:
            Search query provided by the user.
        """
        self.query = query
        self._innertube_client = InnerTube(client='WEB')

        # The first search, without a continuation, is structured differently
        # and contains completion suggestions, so we must store this separately
        self._initial_results = None

        self._results = None
        self._completion_suggestions = None

        # Used for keeping track of query continuations so that new results
        # are always returned when get_next_results() is called
        self._current_continuation = None

    @property
    def completion_suggestions(self):
        """Return query autocompletion suggestions for the query.

        :rtype: list
        :returns:
            A list of autocomplete suggestions provided by YouTube for the
            query.
        """
        if self._completion_suggestions:
            return self._completion_suggestions
        # Accessing ``results`` triggers the initial query when needed
        if self.results:
            self._completion_suggestions = self._initial_results['refinements']
        return self._completion_suggestions

    @property
    def results(self):
        """Return search results.

        On first call, will generate and return the first set of results.
        Additional results can be generated using ``.get_next_results()``.

        :rtype: list
        :returns:
            A list of YouTube objects.
        """
        # Compare against None so that an empty first page is cached too
        # instead of being re-fetched on every access.
        if self._results is not None:
            return self._results

        videos, continuation = self.fetch_and_parse()
        self._results = videos
        self._current_continuation = continuation
        return self._results

    def get_next_results(self):
        """Use the stored continuation string to fetch the next set of results.

        This method does not return the results, but instead updates the
        results property.

        :raises IndexError:
            If there is no stored continuation, i.e. no further results.
        """
        if not self._current_continuation:
            raise IndexError('No further search results are available.')

        videos, continuation = self.fetch_and_parse(self._current_continuation)
        if self._results is None:
            self._results = []
        # fetch_and_parse yields None when the page has no item section
        self._results.extend(videos or [])
        self._current_continuation = continuation

    def fetch_and_parse(self, continuation=None):
        """Fetch from the innertube API and parse the results.

        :param str continuation:
            Continuation string for fetching results.
        :rtype: tuple
        :returns:
            A tuple of a list of YouTube objects and a continuation string.
            The list is None when the response has no item section, and the
            continuation is None when there are no further results.
        """
        # Begin by executing the query and identifying the relevant sections
        # of the results
        raw_results = self.fetch_query(continuation)

        # Initial result is handled by try block, continuations by except block
        try:
            sections = raw_results['contents']['twoColumnSearchResultsRenderer'][
                'primaryContents']['sectionListRenderer']['contents']
        except KeyError:
            sections = raw_results['onResponseReceivedCommands'][0][
                'appendContinuationItemsAction']['continuationItems']

        item_renderer = None
        continuation_renderer = None
        for section in sections:
            if 'itemSectionRenderer' in section:
                item_renderer = section['itemSectionRenderer']
            if 'continuationItemRenderer' in section:
                continuation_renderer = section['continuationItemRenderer']

        # If the continuationItemRenderer doesn't exist, assume no further
        # results
        if continuation_renderer:
            next_continuation = continuation_renderer['continuationEndpoint'][
                'continuationCommand']['token']
        else:
            next_continuation = None

        # If the itemSectionRenderer doesn't exist, assume no results.
        if not item_renderer:
            return None, next_continuation

        videos = []
        for video_details in item_renderer['contents']:
            # Skip over ads
            if video_details.get('searchPyvRenderer', {}).get('ads', None):
                continue

            if any(key in video_details for key in self._SKIPPED_RENDERERS):
                continue

            if 'videoRenderer' not in video_details:
                logger.warning('Unexpected renderer encountered.')
                logger.warning('Renderer name: %s', video_details.keys())
                logger.warning('Search term: %s', self.query)
                logger.warning(
                    'Please open an issue at '
                    'https://github.com/pytube/pytube/issues '
                    'and provide this log output.'
                )
                continue

            # Extract only the details used to pre-populate the YouTube
            # object. View count, length and channel URL were parsed here
            # before but never used.
            vid_renderer = video_details['videoRenderer']
            vid_url = f"https://www.youtube.com/watch?v={vid_renderer['videoId']}"
            vid_title = vid_renderer['title']['runs'][0]['text']
            vid_channel_name = vid_renderer['ownerText']['runs'][0]['text']

            # Construct YouTube object from metadata and append to results
            vid = YouTube(vid_url)
            vid.author = vid_channel_name
            vid.title = vid_title
            videos.append(vid)

        return videos, next_continuation

    def fetch_query(self, continuation=None):
        """Fetch raw results from the innertube API.

        The first response received is also kept as the initial results,
        which ``completion_suggestions`` reads.

        :param str continuation:
            Continuation string for fetching results.
        :rtype: dict
        :returns:
            The raw json object returned by the innertube API.
        """
        query_results = self._innertube_client.search(self.query, continuation)
        if not self._initial_results:
            self._initial_results = query_results
        return query_results  # noqa:R504
|
||||
Reference in New Issue
Block a user