- Patch Pytube - improve OS deletion of files and writing of files - Start working on Claude - Improve template management
508 lines
15 KiB
Python
508 lines
15 KiB
Python
"""This module is designed to interact with the innertube API.
|
|
|
|
This module is NOT intended to be used directly by end users, as each of the
|
|
interfaces returns raw results. These should instead be parsed to extract
|
|
the useful information for the end user.
|
|
"""
|
|
# Native python imports
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import time
|
|
from urllib import parse
|
|
|
|
# Local imports
|
|
from pytube import request
|
|
|
|
# YouTube on TV client secrets
|
|
_client_id = '861556708454-d6dlm3lh05idd8npek18k6be8ba3oc68.apps.googleusercontent.com'
|
|
_client_secret = 'SboVhoG9s0rNafixCSGGKXAT'
|
|
|
|
# Extracted API keys -- unclear what these are linked to.
|
|
_api_keys = [
|
|
'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8',
|
|
'AIzaSyCtkvNIR1HCEwzsqK6JuE6KqpyjusIRI30',
|
|
'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w',
|
|
'AIzaSyC8UYZpvA2eknNex0Pjid0_eTLJoDu6los',
|
|
'AIzaSyCjc_pVEDi4qsv5MtC2dMXzpIaDoRFLsxw',
|
|
'AIzaSyDHQ9ipnphqTzDqZsbtd8_Ru4_kiKVQe2k'
|
|
]
|
|
|
|
_default_clients = {
|
|
'WEB': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'WEB',
|
|
'clientVersion': '2.20200720.00.02'
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'ANDROID': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'ANDROID',
|
|
'clientVersion': '17.31.35',
|
|
'androidSdkVersion': 30
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.android.youtube/',
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'IOS': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'IOS',
|
|
'clientVersion': '17.33.2',
|
|
'deviceModel': 'iPhone14,3'
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.ios.youtube/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
|
|
'WEB_EMBED': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'WEB_EMBEDDED_PLAYER',
|
|
'clientVersion': '2.20210721.00.00',
|
|
'clientScreen': 'EMBED'
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'ANDROID_EMBED': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'ANDROID_EMBEDDED_PLAYER',
|
|
'clientVersion': '17.31.35',
|
|
'clientScreen': 'EMBED',
|
|
'androidSdkVersion': 30,
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.android.youtube/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'IOS_EMBED': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'IOS_MESSAGES_EXTENSION',
|
|
'clientVersion': '17.33.2',
|
|
'deviceModel': 'iPhone14,3'
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.ios.youtube/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
|
|
'WEB_MUSIC': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'WEB_REMIX',
|
|
'clientVersion': '1.20220727.01.00',
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'ANDROID_MUSIC': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'ANDROID_MUSIC',
|
|
'clientVersion': '5.16.51',
|
|
'androidSdkVersion': 30
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.android.apps.youtube.music/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'IOS_MUSIC': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'IOS_MUSIC',
|
|
'clientVersion': '5.21',
|
|
'deviceModel': 'iPhone14,3'
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.ios.youtubemusic/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
|
|
'WEB_CREATOR': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'WEB_CREATOR',
|
|
'clientVersion': '1.20220726.00.00',
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'ANDROID_CREATOR': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'ANDROID_CREATOR',
|
|
'clientVersion': '22.30.100',
|
|
'androidSdkVersion': 30,
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.android.apps.youtube.creator/',
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
'IOS_CREATOR': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'IOS_CREATOR',
|
|
'clientVersion': '22.33.101',
|
|
'deviceModel': 'iPhone14,3',
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'com.google.ios.ytcreator/'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
|
|
'MWEB': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'MWEB',
|
|
'clientVersion': '2.20220801.00.00',
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
|
|
'TV_EMBED': {
|
|
'context': {
|
|
'client': {
|
|
'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER',
|
|
'clientVersion': '2.0',
|
|
}
|
|
},
|
|
'header': {
|
|
'User-Agent': 'Mozilla/5.0'
|
|
},
|
|
'api_key': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8'
|
|
},
|
|
}
|
|
_token_timeout = 1800
|
|
_cache_dir = pathlib.Path(__file__).parent.resolve() / '__cache__'
|
|
_token_file = os.path.join(_cache_dir, 'tokens.json')
|
|
|
|
|
|
class InnerTube:
|
|
"""Object for interacting with the innertube API."""
|
|
def __init__(self, client='ANDROID_MUSIC', use_oauth=False, allow_cache=True):
|
|
"""Initialize an InnerTube object.
|
|
|
|
:param str client:
|
|
Client to use for the object.
|
|
Default to web because it returns the most playback types.
|
|
:param bool use_oauth:
|
|
Whether or not to authenticate to YouTube.
|
|
:param bool allow_cache:
|
|
Allows caching of oauth tokens on the machine.
|
|
"""
|
|
self.context = _default_clients[client]['context']
|
|
self.header = _default_clients[client]['header']
|
|
self.api_key = _default_clients[client]['api_key']
|
|
self.access_token = None
|
|
self.refresh_token = None
|
|
self.use_oauth = use_oauth
|
|
self.allow_cache = allow_cache
|
|
|
|
# Stored as epoch time
|
|
self.expires = None
|
|
|
|
# Try to load from file if specified
|
|
if self.use_oauth and self.allow_cache:
|
|
# Try to load from file if possible
|
|
if os.path.exists(_token_file):
|
|
with open(_token_file) as f:
|
|
data = json.load(f)
|
|
self.access_token = data['access_token']
|
|
self.refresh_token = data['refresh_token']
|
|
self.expires = data['expires']
|
|
self.refresh_bearer_token()
|
|
|
|
def cache_tokens(self):
|
|
"""Cache tokens to file if allowed."""
|
|
if not self.allow_cache:
|
|
return
|
|
|
|
data = {
|
|
'access_token': self.access_token,
|
|
'refresh_token': self.refresh_token,
|
|
'expires': self.expires
|
|
}
|
|
if not os.path.exists(_cache_dir):
|
|
os.mkdir(_cache_dir)
|
|
with open(_token_file, 'w') as f:
|
|
json.dump(data, f)
|
|
|
|
def refresh_bearer_token(self, force=False):
|
|
"""Refreshes the OAuth token if necessary.
|
|
|
|
:param bool force:
|
|
Force-refresh the bearer token.
|
|
"""
|
|
if not self.use_oauth:
|
|
return
|
|
# Skip refresh if it's not necessary and not forced
|
|
if self.expires > time.time() and not force:
|
|
return
|
|
|
|
# Subtracting 30 seconds is arbitrary to avoid potential time discrepencies
|
|
start_time = int(time.time() - 30)
|
|
data = {
|
|
'client_id': _client_id,
|
|
'client_secret': _client_secret,
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': self.refresh_token
|
|
}
|
|
response = request._execute_request(
|
|
'https://oauth2.googleapis.com/token',
|
|
'POST',
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
data=data
|
|
)
|
|
response_data = json.loads(response.read())
|
|
|
|
self.access_token = response_data['access_token']
|
|
self.expires = start_time + response_data['expires_in']
|
|
self.cache_tokens()
|
|
|
|
def fetch_bearer_token(self):
|
|
"""Fetch an OAuth token."""
|
|
# Subtracting 30 seconds is arbitrary to avoid potential time discrepencies
|
|
start_time = int(time.time() - 30)
|
|
data = {
|
|
'client_id': _client_id,
|
|
'scope': 'https://www.googleapis.com/auth/youtube'
|
|
}
|
|
response = request._execute_request(
|
|
'https://oauth2.googleapis.com/device/code',
|
|
'POST',
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
data=data
|
|
)
|
|
response_data = json.loads(response.read())
|
|
verification_url = response_data['verification_url']
|
|
user_code = response_data['user_code']
|
|
print(f'Please open {verification_url} and input code {user_code}')
|
|
input('Press enter when you have completed this step.')
|
|
|
|
data = {
|
|
'client_id': _client_id,
|
|
'client_secret': _client_secret,
|
|
'device_code': response_data['device_code'],
|
|
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code'
|
|
}
|
|
response = request._execute_request(
|
|
'https://oauth2.googleapis.com/token',
|
|
'POST',
|
|
headers={
|
|
'Content-Type': 'application/json'
|
|
},
|
|
data=data
|
|
)
|
|
response_data = json.loads(response.read())
|
|
|
|
self.access_token = response_data['access_token']
|
|
self.refresh_token = response_data['refresh_token']
|
|
self.expires = start_time + response_data['expires_in']
|
|
self.cache_tokens()
|
|
|
|
@property
|
|
def base_url(self):
|
|
"""Return the base url endpoint for the innertube API."""
|
|
return 'https://www.youtube.com/youtubei/v1'
|
|
|
|
@property
|
|
def base_data(self):
|
|
"""Return the base json data to transmit to the innertube API."""
|
|
return {
|
|
'context': self.context
|
|
}
|
|
|
|
@property
|
|
def base_params(self):
|
|
"""Return the base query parameters to transmit to the innertube API."""
|
|
return {
|
|
'key': self.api_key,
|
|
'contentCheckOk': True,
|
|
'racyCheckOk': True
|
|
}
|
|
|
|
def _call_api(self, endpoint, query, data):
|
|
"""Make a request to a given endpoint with the provided query parameters and data."""
|
|
# Remove the API key if oauth is being used.
|
|
if self.use_oauth:
|
|
del query['key']
|
|
|
|
endpoint_url = f'{endpoint}?{parse.urlencode(query)}'
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
}
|
|
# Add the bearer token if applicable
|
|
if self.use_oauth:
|
|
if self.access_token:
|
|
self.refresh_bearer_token()
|
|
headers['Authorization'] = f'Bearer {self.access_token}'
|
|
else:
|
|
self.fetch_bearer_token()
|
|
headers['Authorization'] = f'Bearer {self.access_token}'
|
|
|
|
headers.update(self.header)
|
|
|
|
response = request._execute_request(
|
|
endpoint_url,
|
|
'POST',
|
|
headers=headers,
|
|
data=data
|
|
)
|
|
return json.loads(response.read())
|
|
|
|
def browse(self):
|
|
"""Make a request to the browse endpoint.
|
|
|
|
TODO: Figure out how we can use this
|
|
"""
|
|
# endpoint = f'{self.base_url}/browse' # noqa:E800
|
|
...
|
|
# return self._call_api(endpoint, query, self.base_data) # noqa:E800
|
|
|
|
def config(self):
|
|
"""Make a request to the config endpoint.
|
|
|
|
TODO: Figure out how we can use this
|
|
"""
|
|
# endpoint = f'{self.base_url}/config' # noqa:E800
|
|
...
|
|
# return self._call_api(endpoint, query, self.base_data) # noqa:E800
|
|
|
|
def guide(self):
|
|
"""Make a request to the guide endpoint.
|
|
|
|
TODO: Figure out how we can use this
|
|
"""
|
|
# endpoint = f'{self.base_url}/guide' # noqa:E800
|
|
...
|
|
# return self._call_api(endpoint, query, self.base_data) # noqa:E800
|
|
|
|
def next(self):
|
|
"""Make a request to the next endpoint.
|
|
|
|
TODO: Figure out how we can use this
|
|
"""
|
|
# endpoint = f'{self.base_url}/next' # noqa:E800
|
|
...
|
|
# return self._call_api(endpoint, query, self.base_data) # noqa:E800
|
|
|
|
def player(self, video_id):
|
|
"""Make a request to the player endpoint.
|
|
|
|
:param str video_id:
|
|
The video id to get player info for.
|
|
:rtype: dict
|
|
:returns:
|
|
Raw player info results.
|
|
"""
|
|
endpoint = f'{self.base_url}/player'
|
|
query = {
|
|
'videoId': video_id,
|
|
}
|
|
query.update(self.base_params)
|
|
return self._call_api(endpoint, query, self.base_data)
|
|
|
|
def search(self, search_query, continuation=None):
|
|
"""Make a request to the search endpoint.
|
|
|
|
:param str search_query:
|
|
The query to search.
|
|
:rtype: dict
|
|
:returns:
|
|
Raw search query results.
|
|
"""
|
|
endpoint = f'{self.base_url}/search'
|
|
query = {
|
|
'query': search_query
|
|
}
|
|
query.update(self.base_params)
|
|
data = {}
|
|
if continuation:
|
|
data['continuation'] = continuation
|
|
data.update(self.base_data)
|
|
return self._call_api(endpoint, query, data)
|
|
|
|
def verify_age(self, video_id):
|
|
"""Make a request to the age_verify endpoint.
|
|
|
|
Notable examples of the types of video this verification step is for:
|
|
* https://www.youtube.com/watch?v=QLdAhwSBZ3w
|
|
* https://www.youtube.com/watch?v=hc0ZDaAZQT0
|
|
|
|
:param str video_id:
|
|
The video id to get player info for.
|
|
:rtype: dict
|
|
:returns:
|
|
Returns information that includes a URL for bypassing certain restrictions.
|
|
"""
|
|
endpoint = f'{self.base_url}/verify_age'
|
|
data = {
|
|
'nextEndpoint': {
|
|
'urlEndpoint': {
|
|
'url': f'/watch?v={video_id}'
|
|
}
|
|
},
|
|
'setControvercy': True
|
|
}
|
|
data.update(self.base_data)
|
|
result = self._call_api(endpoint, self.base_params, data)
|
|
return result
|
|
|
|
def get_transcript(self, video_id):
|
|
"""Make a request to the get_transcript endpoint.
|
|
|
|
This is likely related to captioning for videos, but is currently untested.
|
|
"""
|
|
endpoint = f'{self.base_url}/get_transcript'
|
|
query = {
|
|
'videoId': video_id,
|
|
}
|
|
query.update(self.base_params)
|
|
result = self._call_api(endpoint, query, self.base_data)
|
|
return result
|