- Patch Pytube - improve OS deletion of files and writing of files - Start working on Claude - Improve template management
202 lines
6.4 KiB
Python
202 lines
6.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Module for interacting with a user's youtube channel."""
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
from pytube import extract, Playlist, request
|
|
from pytube.helpers import uniqueify
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Channel(Playlist):
|
|
def __init__(self, url: str, proxies: Optional[Dict[str, str]] = None):
|
|
"""Construct a :class:`Channel <Channel>`.
|
|
|
|
:param str url:
|
|
A valid YouTube channel URL.
|
|
:param proxies:
|
|
(Optional) A dictionary of proxies to use for web requests.
|
|
"""
|
|
super().__init__(url, proxies)
|
|
|
|
self.channel_uri = extract.channel_name(url)
|
|
|
|
self.channel_url = (
|
|
f"https://www.youtube.com{self.channel_uri}"
|
|
)
|
|
|
|
self.videos_url = self.channel_url + '/videos'
|
|
self.playlists_url = self.channel_url + '/playlists'
|
|
self.community_url = self.channel_url + '/community'
|
|
self.featured_channels_url = self.channel_url + '/channels'
|
|
self.about_url = self.channel_url + '/about'
|
|
|
|
# Possible future additions
|
|
self._playlists_html = None
|
|
self._community_html = None
|
|
self._featured_channels_html = None
|
|
self._about_html = None
|
|
|
|
@property
|
|
def channel_name(self):
|
|
"""Get the name of the YouTube channel.
|
|
|
|
:rtype: str
|
|
"""
|
|
return self.initial_data['metadata']['channelMetadataRenderer']['title']
|
|
|
|
@property
|
|
def channel_id(self):
|
|
"""Get the ID of the YouTube channel.
|
|
|
|
This will return the underlying ID, not the vanity URL.
|
|
|
|
:rtype: str
|
|
"""
|
|
return self.initial_data['metadata']['channelMetadataRenderer']['externalId']
|
|
|
|
@property
|
|
def vanity_url(self):
|
|
"""Get the vanity URL of the YouTube channel.
|
|
|
|
Returns None if it doesn't exist.
|
|
|
|
:rtype: str
|
|
"""
|
|
return self.initial_data['metadata']['channelMetadataRenderer'].get('vanityChannelUrl', None) # noqa:E501
|
|
|
|
@property
|
|
def html(self):
|
|
"""Get the html for the /videos page.
|
|
|
|
:rtype: str
|
|
"""
|
|
if self._html:
|
|
return self._html
|
|
self._html = request.get(self.videos_url)
|
|
return self._html
|
|
|
|
@property
|
|
def playlists_html(self):
|
|
"""Get the html for the /playlists page.
|
|
|
|
Currently unused for any functionality.
|
|
|
|
:rtype: str
|
|
"""
|
|
if self._playlists_html:
|
|
return self._playlists_html
|
|
else:
|
|
self._playlists_html = request.get(self.playlists_url)
|
|
return self._playlists_html
|
|
|
|
@property
|
|
def community_html(self):
|
|
"""Get the html for the /community page.
|
|
|
|
Currently unused for any functionality.
|
|
|
|
:rtype: str
|
|
"""
|
|
if self._community_html:
|
|
return self._community_html
|
|
else:
|
|
self._community_html = request.get(self.community_url)
|
|
return self._community_html
|
|
|
|
@property
|
|
def featured_channels_html(self):
|
|
"""Get the html for the /channels page.
|
|
|
|
Currently unused for any functionality.
|
|
|
|
:rtype: str
|
|
"""
|
|
if self._featured_channels_html:
|
|
return self._featured_channels_html
|
|
else:
|
|
self._featured_channels_html = request.get(self.featured_channels_url)
|
|
return self._featured_channels_html
|
|
|
|
@property
|
|
def about_html(self):
|
|
"""Get the html for the /about page.
|
|
|
|
Currently unused for any functionality.
|
|
|
|
:rtype: str
|
|
"""
|
|
if self._about_html:
|
|
return self._about_html
|
|
else:
|
|
self._about_html = request.get(self.about_url)
|
|
return self._about_html
|
|
|
|
@staticmethod
|
|
def _extract_videos(raw_json: str) -> Tuple[List[str], Optional[str]]:
|
|
"""Extracts videos from a raw json page
|
|
|
|
:param str raw_json: Input json extracted from the page or the last
|
|
server response
|
|
:rtype: Tuple[List[str], Optional[str]]
|
|
:returns: Tuple containing a list of up to 100 video watch ids and
|
|
a continuation token, if more videos are available
|
|
"""
|
|
initial_data = json.loads(raw_json)
|
|
# this is the json tree structure, if the json was extracted from
|
|
# html
|
|
try:
|
|
videos = initial_data["contents"][
|
|
"twoColumnBrowseResultsRenderer"][
|
|
"tabs"][1]["tabRenderer"]["content"][
|
|
"sectionListRenderer"]["contents"][0][
|
|
"itemSectionRenderer"]["contents"][0][
|
|
"gridRenderer"]["items"]
|
|
except (KeyError, IndexError, TypeError):
|
|
try:
|
|
# this is the json tree structure, if the json was directly sent
|
|
# by the server in a continuation response
|
|
important_content = initial_data[1]['response']['onResponseReceivedActions'][
|
|
0
|
|
]['appendContinuationItemsAction']['continuationItems']
|
|
videos = important_content
|
|
except (KeyError, IndexError, TypeError):
|
|
try:
|
|
# this is the json tree structure, if the json was directly sent
|
|
# by the server in a continuation response
|
|
# no longer a list and no longer has the "response" key
|
|
important_content = initial_data['onResponseReceivedActions'][0][
|
|
'appendContinuationItemsAction']['continuationItems']
|
|
videos = important_content
|
|
except (KeyError, IndexError, TypeError) as p:
|
|
logger.info(p)
|
|
return [], None
|
|
|
|
try:
|
|
continuation = videos[-1]['continuationItemRenderer'][
|
|
'continuationEndpoint'
|
|
]['continuationCommand']['token']
|
|
videos = videos[:-1]
|
|
except (KeyError, IndexError):
|
|
# if there is an error, no continuation is available
|
|
continuation = None
|
|
|
|
# remove duplicates
|
|
return (
|
|
uniqueify(
|
|
list(
|
|
# only extract the video ids from the video data
|
|
map(
|
|
lambda x: (
|
|
f"/watch?v="
|
|
f"{x['gridVideoRenderer']['videoId']}"
|
|
),
|
|
videos
|
|
)
|
|
),
|
|
),
|
|
continuation,
|
|
)
|