-
Notifications
You must be signed in to change notification settings - Fork 3
/
connection_management.py
151 lines (134 loc) · 5.74 KB
/
connection_management.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import feedparser
import asyncio
import aiohttp
import constants
import urllib
import subprocess
from aiohttp_socks import ProxyConnector
import parser_classes
from aiohttp_socks import ProxyType
import time
import secrets
import os
import shutil
# manages socks5 auths used for Tor stream isolation
class CircuitManager:
    """Rotate over a pool of SOCKS5 credentials used for Tor stream isolation.

    A pool of ``nCircuits`` random (username, password) pairs is generated
    lazily on the first call to :meth:`getAuth` and regenerated whenever it is
    older than ``ttl`` seconds. Successive calls cycle through the pool so
    that the same credentials are not reused for every request.
    """

    def __init__(self, nCircuits = 15, ttl = 600):
        """Store the pool size and lifetime; no credentials are generated yet.

        Args:
            nCircuits: number of credential pairs kept in the pool (>= 1).
            ttl: lifetime of a pool in seconds before it is regenerated.

        Raises:
            ValueError: if ``nCircuits`` is smaller than 1.
        """
        if nCircuits < 1:
            # getAuth() indexes the pool modulo nCircuits, so an empty pool
            # could never yield a credential.
            raise ValueError(f"nCircuits must be at least 1, got {nCircuits}")
        self.ttl = ttl
        # Honour the caller's pool size (it used to be hard-coded to 15).
        self.nCircuits = nCircuits
        # Rotation counter, incremented on every getAuth() call.
        self.i = 0
        # An expiry time of 0 forces pool generation on the first getAuth().
        self.expiryTime = 0
        self.circuitAuths = []

    def initiateCircuitAuths(self):
        """Replace the pool with ``nCircuits`` freshly generated credentials."""
        self.circuitAuths = [generateNewSocks5Auth() for _ in range(self.nCircuits)]

    def getAuth(self):
        """Return the next (username, password) pair from the pool."""
        # If the ttl is over, regenerate the credential pool.
        if self.expiryTime < time.time():
            self.initiateCircuitAuths()
            self.expiryTime = time.time() + self.ttl
        # Cycle over the pool so the same circuit is not used all the time.
        self.i += 1
        return self.circuitAuths[self.i % self.nCircuits]
# use this function to generate new socks5 authentication (for tor stream
# isolation)
def generateNewSocks5Auth(userNameLen = 30, passwordLen = 30):
    """Return a random ``(username, password)`` pair of alphanumeric strings.

    Characters are drawn with ``secrets.SystemRandom`` from ASCII letters and
    digits; ``userNameLen`` and ``passwordLen`` set the length of each string.
    """
    alphaNumeric = "QWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm1234567890"
    systemRandom = secrets.SystemRandom()

    def randomToken(length):
        # One independent draw from the alphabet per character.
        return "".join(systemRandom.choice(alphaNumeric) for _ in range(length))

    # The username is generated first, then the password.
    return randomToken(userNameLen), randomToken(passwordLen)
# use this function to get content (typically hypertext or xml) using HTTP from YouTube
async def getHttpContent(url, useTor, semaphore, auth=None, contentType='text'):
    """Fetch ``url`` over HTTP and return the response body.

    Args:
        url: address to request.
        useTor: when true, route the request through the SOCKS5 proxy at
            127.0.0.1:9050 with remote DNS resolution.
        semaphore: asyncio semaphore limiting concurrent requests; a slot is
            held for the whole request and always released afterwards.
        auth: optional ``(username, password)`` pair for the SOCKS5 proxy;
            only used when ``useTor`` is true.
        contentType: ``'text'`` to return the decoded body, ``'bytes'`` to
            return the raw body.

    Returns:
        The response body as ``str`` or ``bytes`` depending on ``contentType``.

    Raises:
        ValueError: if ``contentType`` is neither ``'text'`` nor ``'bytes'``.
    """
    # Reject a bad contentType up front, before any request is made or a
    # semaphore slot is taken.
    if contentType not in ('text', 'bytes'):
        raise ValueError(f"unknown content type: {contentType}")
    # This cookie lets us avoid the YouTube consent page
    cookies = {'CONSENT': 'YES+'}
    headers = {'Accept-Language': 'en-US'}
    # ``async with`` releases the slot even when the request raises, so a
    # failed download can no longer leak a semaphore slot.
    async with semaphore:
        if useTor:
            username, password = auth if auth is not None else (None, None)
            connector = ProxyConnector(proxy_type=ProxyType.SOCKS5, host="127.0.0.1",
                    port=9050, username=username, password=password, rdns=True)
        else:
            connector = None
        async with aiohttp.ClientSession(connector=connector, cookies=cookies,
                headers=headers) as session:
            async with session.get(url) as response:
                if contentType == 'text':
                    return await response.text()
                return await response.read()
# if you have a channel id, you can use this function to get the rss address
def getRssAddressFromChannelId(channelId):
    """Build the YouTube RSS feed URL for the channel with id ``channelId``."""
    feedEndpoint = "https://www.youtube.com/feeds/videos.xml"
    return f"{feedEndpoint}?channel_id={channelId}"
# use this function to get a list of query results from searching for a channel
# results are of the type ChannelQueryObject
async def getChannelQueryResults(query, useTor=False, auth=None):
    """Search YouTube for channels matching ``query``.

    Args:
        query: free-text search string; it is percent-encoded before use.
        useTor: forwarded to ``getHttpContent``.
        auth: forwarded to ``getHttpContent``.

    Returns:
        The ``resultList`` collected by ``parser_classes.ChannelQueryParser``
        after it has been fed the result page.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    # NOTE(review): the ``sp`` value is presumably the filter that restricts
    # results to channels - confirm.
    url = 'https://youtube.com/results?search_query=' + quote(query) + \
            '&sp=EgIQAg%253D%253D'
    semaphore = asyncio.Semaphore(constants.MAX_CONNECTIONS)
    # Await the coroutine directly; wrapping it in a task that is awaited
    # immediately adds nothing.
    htmlContent = await getHttpContent(url, useTor=useTor, semaphore=semaphore,
            auth=auth)
    parser = parser_classes.ChannelQueryParser()
    parser.feed(htmlContent)
    return parser.resultList
# use this function to get a list of query results from searching for a video
# results are of the type VideoQueryObject
async def getVideoQueryResults(query, useTor=False, auth=None):
    """Search YouTube for videos matching ``query``.

    Args:
        query: free-text search string; it is percent-encoded before use.
        useTor: forwarded to ``getHttpContent``.
        auth: forwarded to ``getHttpContent``.

    Returns:
        The ``resultList`` collected by ``parser_classes.VideoQueryParser``
        after it has been fed the result page.
    """
    # Import the submodule explicitly: a bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` has been loaded.
    from urllib.parse import quote

    # NOTE(review): the ``sp`` value is presumably the filter that restricts
    # results to videos - confirm.
    url = 'https://youtube.com/results?search_query=' + quote(query) + \
            '&sp=EgIQAQ%253D%253D'
    semaphore = asyncio.Semaphore(constants.MAX_CONNECTIONS)
    # Await the coroutine directly; wrapping it in a task that is awaited
    # immediately adds nothing.
    htmlContent = await getHttpContent(url, semaphore=semaphore, useTor=useTor,
            auth=auth)
    parser = parser_classes.VideoQueryParser()
    parser.feed(htmlContent)
    return parser.resultList
# use this function to get rss entries from channel id
async def getRssEntriesFromChannelId(channelId, semaphore, useTor=False, auth=None):
    """Download a channel's RSS feed and return its parsed entries.

    The feed address is derived from ``channelId``, fetched through
    ``getHttpContent`` (which receives ``semaphore``, ``useTor`` and ``auth``),
    and parsed with ``feedparser``; the ``'entries'`` item of the parse result
    is returned.
    """
    feedUrl = getRssAddressFromChannelId(channelId)
    fetchCoroutine = getHttpContent(feedUrl, useTor, semaphore=semaphore, auth=auth)
    # Run the download as its own task, then wait for it to finish.
    feedXml = await asyncio.create_task(fetchCoroutine)
    parsedFeed = feedparser.parse(feedXml)
    return parsedFeed['entries']
# use this function to open a YouTube video url in mpv
def openUrlInMpv(url, useTor=False, maxResolution=1080, circuitManager = None):
    """Play ``url`` in mpv and block until the player exits.

    Args:
        url: video address handed to mpv.
        useTor: when true, mpv is launched through ``torsocks`` using
            credentials obtained from ``circuitManager.getAuth()``.
        maxResolution: upper bound on the video height requested through
            mpv's ``--ytdl-format`` option.
        circuitManager: object providing ``getAuth()``; required when
            ``useTor`` is true.

    Returns:
        True if mpv exited with status 0; False otherwise, including when
        playback is interrupted with Ctrl-C.

    Raises:
        ValueError: if ``useTor`` is true but no ``circuitManager`` is given.
    """
    if useTor and circuitManager is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError("circuitManager is required when useTor is True")
    # Stays None until the player has actually been started, so the interrupt
    # handler below never touches an unbound name.
    mpvProcess = None
    try:
        command = []
        if useTor:
            username, password = circuitManager.getAuth()
            command += ['torsocks', '-u', username, '-p', password]
        # ``<=?`` accepts any height up to maxResolution (or an unknown
        # height); a plain ``=?`` would only match that exact height.
        command += ['mpv',
                f'--ytdl-format=bestvideo[height<=?{maxResolution}]+bestaudio/best',
                url]
        mpvProcess = subprocess.Popen(command, stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT)
        returnCode = mpvProcess.wait()
    except KeyboardInterrupt:
        # Ctrl-C: stop the player if it is running and report failure.
        if mpvProcess is not None:
            mpvProcess.kill()
            mpvProcess.wait()
        return False
    return returnCode == 0
# use this function to get the data we care about from the entries found by the RSS parser
def getRelevantDictFromFeedParserDict(feedparserDict):
    """Reduce one parsed RSS entry to the fields this program keeps.

    Copies ``id``, ``link`` and ``title`` unchanged, stores the URL of the
    first ``media_thumbnail`` item under ``thumbnail``, and marks the entry
    as not yet seen.
    """
    relevant = {key: feedparserDict[key] for key in ('id', 'link', 'title')}
    firstThumbnail = feedparserDict['media_thumbnail'][0]
    relevant['thumbnail'] = firstThumbnail['url']
    # New entries always start out unseen.
    relevant['seen'] = False
    return relevant