Bilibili Download

通过 API 下载视频

Posted by JBNRZ on 2023-02-10
Estimated Reading Time 16 Minutes
Words 2.9k In Total
Viewed Times

依据 API 收集项目 编写

Python Script

  1. 允许下载多P视频
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
import logging
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from os import path, system, unlink
from pprint import pformat
from time import sleep
from re import compile, findall
from sys import stdout
from threading import Thread
from typing import TYPE_CHECKING, Union

import loguru
from matplotlib import pyplot
from qrcode import make
from requests import get, post, head, Session
from requests.exceptions import SSLError
from rsa import PublicKey, encrypt
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait

if TYPE_CHECKING:
    # Only needed for the string annotation below; skipped at runtime.
    from loguru import Logger
# Module-wide logger object; every function in this script logs through it.
logger: "Logger" = loguru.logger


class Filter:
    """Callable log filter that applies a minimum level and relabels records.

    ``level`` may be a loguru level name or a numeric level; records whose
    level number is below it are rejected.
    """

    def __init__(self) -> None:
        self.level: Union[int, str] = "DEBUG"

    def __call__(self, record):
        """Return True when *record* should be emitted.

        As a side effect the record's ``name`` field is overwritten with the
        first dot-separated component of the fixed module label.
        """
        root_name, _, _ = "Bilibili".partition(".")
        record["name"] = root_name

        threshold = self.level
        if isinstance(threshold, str):
            # Level names are resolved to their numeric value through loguru.
            threshold = logger.level(threshold).no
        return threshold <= record["level"].no


class LoguruHandler(logging.Handler):
    """Standard-library logging handler that forwards records to the loguru logger."""

    def emit(self, record):
        """Re-emit a stdlib log record through loguru.

        :param record: the ``logging.LogRecord`` handed over by the logging module.
        """
        try:
            # Prefer the loguru level that has the same name as the stdlib level.
            level = logger.level(record.levelname).name
        except ValueError:
            # A ValueError from the lookup is handled by falling back to the numeric level.
            level = record.levelno

        # Walk back over frames whose code lives in the logging module itself,
        # bumping depth for each one, so the depth passed to loguru skips them.
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )


# Clear the logger's existing handlers before registering the sink below.
logger.remove()
# Default log-level filter (an instance of the Filter class defined above).
default_filter: Filter = Filter()
"""默认日志等级过滤器"""
# Default log line layout: timestamp, level, logger name, message.
default_format: str = (
    "<g>{time:MM-DD HH:mm:ss}</g> "
    "[<g><lvl>{level}</lvl></g>] "
    "<c><u>{name}</u></c> | "
    # "<c>{function}:{line}</c>| "
    "{message}"
)
"""默认日志格式"""
# Register stdout as the sink; level=0 lets everything through to
# default_filter, which then applies the actual threshold.
logger_id = logger.add(
    stdout,
    level=0,
    colorize=True,
    diagnose=False,
    filter=default_filter,
    format=default_format,
)

__autodoc__ = {"Filter": False, "LoguruHandler": False}
# Cached SESSDATA value; filled in by test_cookie().
cookie = ''
# check[0]: number of chunks planned by calc_divisional_range();
# check[1]: number of chunks range_download() has finished.
check = [0, 0]
# total[0]: file size in bytes as set by download();
# total[1]: running byte counter advanced by range_download().
total = [0, 0]
# Divisor used by calc_divisional_range() to derive the chunk size.
num = 2


def check_ffmpeg():
    """Abort the program when no ffmpeg executable can be found.

    The original check only looked for ``ffmpeg.exe`` in the working
    directory, although the merge step invokes ``ffmpeg`` by bare name. This
    version accepts either a local ``ffmpeg.exe`` or any ``ffmpeg`` reachable
    through PATH, so the script also works on non-Windows systems and with a
    system-wide installation.

    :return: None; exits with status 1 when ffmpeg is missing.
    """
    # Function-scope import keeps the new dependency local to this block.
    from shutil import which

    if path.exists('ffmpeg.exe') or which('ffmpeg') is not None:
        return
    logger.error('未在当前环境下发现 ffmpeg 文件...')
    exit(1)


def calc_divisional_range(filesize):
    """Split ``[0, filesize)`` into inclusive byte ranges for ranged downloads.

    The chunk size is ``filesize // num`` (module-level ``num``); the last
    range is stretched to end at ``filesize - 1``. The number of ranges is
    stored in ``check[0]`` so the caller can tell when every chunk finished.

    Edge cases that used to crash are now handled:
    * ``filesize < num`` produced a step of 0 and ``range()`` raised
      ``ValueError``; the step is now at least 1.
    * a single boundary (e.g. ``num == 1``) left ``result`` empty and
      ``result[-1]`` raised ``IndexError``; a single whole-file range is
      returned instead.
    * ``filesize <= 0`` returns an empty list.

    :param filesize: total size of the remote file in bytes.
    :return: list of ``[start, end]`` pairs, both ends inclusive.
    """
    if filesize <= 0:
        check[0] = 0
        return []

    step = max(filesize // max(num, 1), 1)
    arr = list(range(0, filesize, step))
    logger.debug(arr)

    # Consecutive boundaries form one range each; the end is inclusive.
    result = [[start, following - 1] for start, following in zip(arr, arr[1:])]
    if not result:
        result.append([0, filesize - 1])
    # The tail that does not fill a whole step is merged into the last range.
    result[-1][-1] = filesize - 1

    check[0] = len(result)
    logger.debug(result)
    return result


def range_download(save_name: str, s_pos: int, e_pos: int, url: str):
    """Download one byte range of *url* into its slot inside *save_name*.

    Changes from the original: the HTTP response is closed deterministically
    through a ``with`` block, and a non-success status now raises instead of
    having the error body written into the target file and counted as a
    finished chunk.

    :param save_name: existing file that is opened in ``rb+`` mode.
    :param s_pos: first byte of the range (also the seek offset in the file).
    :param e_pos: last byte of the range, inclusive.
    :param url: media URL to request with a ``Range`` header.
    :raises requests.HTTPError: when the server answers with an error status.
    """
    global check, total
    headers = {
        "Range": f"bytes={s_pos}-{e_pos}", "Referer": "https://www.bilibili.com",
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                      ' Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
    }
    with get(url, headers=headers, stream=True) as res:
        res.raise_for_status()
        with open(save_name, "rb+") as f:
            f.seek(s_pos)
            for chunk in res.iter_content(chunk_size=64*1024):
                if chunk:
                    f.write(chunk)
                    # Counted per nominal chunk size, not len(chunk):
                    # progress() stops once this counter passes total[0].
                    total[1] += 64 * 1024
    # Only reached when the whole range was written without an exception.
    check[1] += 1


def progress():
    """Redraw a one-line progress bar every 0.5 s until the download is over.

    Reads the shared ``total`` (size / byte counter) and ``check``
    (planned / finished chunks) lists. Besides the original stop condition
    (byte counter greater than the file size) the loop now also ends when
    every planned chunk has reported completion, and when the file size is
    not positive, which previously raised ``ZeroDivisionError``.
    """
    while total[0] >= total[1]:
        planned, finished = check[0], check[1]
        if planned and finished >= planned:
            # All ranges are done even though the byte counter never overshot.
            break
        if total[0] <= 0:
            # Nothing to measure; avoids dividing by zero.
            break
        percent = total[1] / total[0] * 100
        print('\r', end='')
        print(f'Progress {round(percent, 1)}%', chr(9619) * round(percent), end='')
        stdout.flush()
        sleep(0.5)


def download(url: str, filename: str, max_retries=None):
    """Download *url* into *filename* using parallel ranged requests.

    The file is created empty, its size is read from a HEAD request, and one
    ``range_download`` task is submitted per range from
    ``calc_divisional_range``. A ``progress`` task runs alongside. When the
    count of finished chunks differs from the planned count the whole
    download is repeated.

    Fixes over the original:
    * the file created with ``open(..., "wb")`` is closed immediately;
    * the futures are actually awaited and worker exceptions are logged
      (``as_completed`` was called but never iterated);
    * the progress task is told to stop once the range tasks are over, so a
      failed chunk can no longer keep the executor from shutting down;
    * retries use a loop instead of unbounded recursion.

    :param url: direct media URL.
    :param filename: path of the file to (re)create.
    :param max_retries: optional cap on re-download attempts; ``None`` (the
        default) keeps the original retry-until-complete behaviour.
    :raises RuntimeError: only when ``max_retries`` is set and exhausted.
    """
    global check, total
    headers = {
        'Referer': "https://www.bilibili.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
                      " Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78"
    }
    attempt = 0
    while True:
        logger.info(f'开始下载文件:{filename}')
        # Create or truncate the target; range_download() reopens it in "rb+".
        with open(filename, "wb"):
            pass
        length = int(head(url, headers=headers).headers['Content-Length'])
        total[0] = length
        logger.debug(f'文件大小:{length}')
        with ThreadPoolExecutor() as p:
            futures = []
            for s_pos, e_pos in calc_divisional_range(length):
                logger.debug(f'正在下载块: {s_pos} ~ {e_pos}')
                futures.append(p.submit(range_download, filename, s_pos, e_pos, url))
            p.submit(progress)
            for future in as_completed(futures):
                error = future.exception()
                if error is not None:
                    logger.warning(f'分块下载出错: {error!r}')
            # progress() loops while total[0] >= total[1]; push the counter
            # past the size so the reporter ends and the pool can shut down.
            total[1] = total[0] + 1
        print()
        complete = check[1] == check[0]
        check = [0, 0]
        total = [0, 0]
        if complete:
            logger.debug(f'文件 {filename} 下载完毕')
            return
        if max_retries is not None and attempt >= max_retries:
            raise RuntimeError(f'文件 {filename} 下载失败,已重试 {attempt} 次')
        attempt += 1
        logger.error('文件下载不完全,正在尝试重新下载...')


def pass_geetest(gt: str, challenge: str) -> tuple:
    """Solve the geetest captcha in a Chrome window and return its result.

    Opens the validator page, fills in ``gt`` and ``challenge``, waits (up to
    20 s per step) for the widget and for the success text, then reads the
    two result fields.

    The browser is now shut down in a ``finally`` block with ``quit()``, so a
    timeout or missing element no longer leaves a Chrome/driver process
    behind (the original only called ``close()`` on the success path).

    :param gt: geetest id from the captcha endpoint.
    :param challenge: geetest challenge from the captcha endpoint.
    :return: ``(validate, seccode)`` as read from the result inputs.
    """
    geetest_url = 'https://kuresaru.github.io/geetest-validator/'
    driver = webdriver.Chrome()
    try:
        driver.get(url=geetest_url)
        driver.find_element(By.ID, 'gt').send_keys(gt)
        driver.find_element(By.ID, 'challenge').send_keys(challenge)
        driver.find_element(By.ID, 'btn-gen').click()
        # Wait until the captcha widget has been rendered.
        WebDriverWait(driver, timeout=20).until(
            lambda d: d.find_element(By.CLASS_NAME, "geetest_success_radar_tip_content"))
        driver.find_element(By.CLASS_NAME, 'geetest_radar_tip').click()
        # Wait until the widget reports success.
        WebDriverWait(driver, timeout=20).until(
            lambda d: d.find_element(By.CLASS_NAME, "geetest_success_radar_tip_content").text == '验证成功')
        driver.find_element(By.ID, 'btn-result').click()
        validate = driver.find_element(By.XPATH, '/html/body/div[6]/input').get_attribute('value')
        seccode = driver.find_element(By.XPATH, '/html/body/div[7]/input').get_attribute('value')
    finally:
        driver.quit()
    return validate, seccode


def gt_challenge() -> tuple:
    """Request a geetest captcha and have it solved.

    Fetches the captcha parameters from the passport endpoint and passes
    ``gt`` and ``challenge`` to ``pass_geetest``.

    :return: ``(token, challenge, validate, seccode)``.
    """
    captcha_url = 'https://passport.bilibili.com/x/passport-login/captcha?source=main_web'
    payload = get(captcha_url).json()['data']
    # Read every field before the browser is launched, as the original does.
    token = payload['token']
    gt = payload['geetest']['gt']
    challenge = payload['geetest']['challenge']
    solved = pass_geetest(gt, challenge)
    validate, seccode = solved
    return token, challenge, validate, seccode


def salt_pubkey(password: str) -> str:
    """Salt and encrypt a password for the web login endpoint.

    The key endpoint supplies a hash (used as salt) and a PEM public key; the
    salt is prepended to the password, the result is RSA-encrypted and
    Base64-encoded.

    :param password: plain-text password.
    :return: Base64 text of the encrypted ``salt + password``.
    """
    salt_pubkey_url = 'https://passport.bilibili.com/x/passport-login/web/key'
    key_data = get(salt_pubkey_url).json()
    salt = key_data['data']['hash']
    pem = key_data['data']['key']
    public_key = PublicKey.load_pkcs1_openssl_pem(pem)
    salted = (salt + password).encode()
    cipher_bytes = encrypt(salted, public_key)
    return b64encode(cipher_bytes).decode()


def login_by_pwd(username: str, password: str):
    """Log in with account name and password.

    Solves a captcha first, encrypts the password with ``salt_pubkey`` and
    posts the login form.

    :param username: account name.
    :param password: plain-text password.
    :return: decoded JSON body of the login response.
    """
    login_by_pwd_url = 'https://passport.bilibili.com/x/passport-login/web/login'
    token, challenge, validate, seccode = gt_challenge()
    form = dict(
        username=username,
        password=salt_pubkey(password),
        keep=0,
        token=token,
        challenge=challenge,
        validate=validate,
        seccode=seccode,
        go_url='https://www.bilibili.com',
        source='main_web',
    )
    logger.debug(pformat(form))
    reply = post(login_by_pwd_url, data=form)
    return reply.json()


def login_by_phone(phone: int, cid=1):
    """Log in with an SMS verification code.

    Solves a captcha, asks the server to send a code, then prompts on stdin
    for the code and posts it to the SMS login endpoint.

    The code typed by the user is now sent as text. The original converted it
    with ``int()``, which drops leading zeros (``"012345"`` became ``12345``)
    and raised ``ValueError`` on stray whitespace-only or non-numeric input.

    :param phone: phone number.
    :param cid: country/region code passed through to the API, default 1.
    :return: decoded JSON of the login response, or of the send-code response
        when sending the code failed (``code`` not 0).
    """
    send_code_url = 'https://passport.bilibili.com/x/passport-login/web/sms/send'
    login_by_phone_url = 'https://passport.bilibili.com/x/passport-login/web/login/sms'
    token, challenge, validate, seccode = gt_challenge()
    data = {
        'cid': cid,
        'tel': phone,
        'source': 'main_web',
        'token': token,
        'challenge': challenge,
        'validate': validate,
        'seccode': seccode
    }
    response = post(send_code_url, data=data).json()
    logger.debug(pformat(response))
    if int(response['code']) != 0:
        # Sending failed; hand the error payload back to the caller.
        return response
    data = {
        'cid': cid,
        'tel': phone,
        'code': input("code: ").strip(),
        'source': 'main_web',
        'captcha_key': response['data']['captcha_key'],
        'go_url': 'https://www.bilibili.com',
        'keep': True
    }
    return post(login_by_phone_url, data=data).json()


def login_by_sacn():
    """Log in by scanning a QR code with the mobile app.

    Requests a QR login URL, starts a background thread running ``scan`` with
    the QR key, and shows the QR image in a pyplot window.

    :return: None.
    """
    get_qrcode_url = 'https://passport.bilibili.com/x/passport-login/web/qrcode/generate'
    qr_info = get(get_qrcode_url).json()['data']
    content = qr_info['url']
    qrcode_key = qr_info['qrcode_key']
    img = make(content)
    # The poller is started first; pyplot.show() is the last call made here.
    poller = Thread(target=scan, args=[pyplot, qrcode_key])
    poller.start()
    pyplot.imshow(img)
    pyplot.show()


def scan(plt: pyplot, key: str):
    """Poll the QR login state, then cache the session cookie.

    Status codes handled: 86090 (scanned, waiting for confirmation; the QR
    window is closed once), 86038 (QR code expired; exits), 0 (logged in).
    After a successful login ``SESSDATA`` is extracted from the returned URL,
    written to ``cookie.txt`` and verified with ``test_cookie``.

    Fixes over the original: the loop now waits one second between polls
    instead of issuing requests back to back, the cookie file is written
    through a ``with`` block (UTF-8, matching how ``test_cookie`` reads it),
    and a reply URL without ``SESSDATA`` is reported instead of raising
    ``IndexError``.

    :param plt: the pyplot module whose window shows the QR code.
    :param key: ``qrcode_key`` returned by the QR generation endpoint.
    :return: None.
    """
    scan_url = f'https://passport.bilibili.com/x/passport-login/web/qrcode/poll?qrcode_key={key}'
    times = 0
    logger.info('请使用手机 APP 扫码...')
    while True:
        response = get(scan_url).json()
        status = response['data']['code']
        if status == 86090 and times == 0:
            logger.info('已扫码,请确认登陆...')
            plt.close()
            times += 1
        elif status == 86038:
            logger.error('二维码已过期,请重新申请二维码...')
            exit(0)
        elif status == 0:
            logger.success('登录成功')
            break
        # Pause between polls so the endpoint is not hit in a tight loop.
        sleep(1)
    logger.debug(pformat(response))
    session_pattern = compile('SESSDATA=(.*?)&bili_jct')
    matches = findall(session_pattern, response['data']['url'])
    if not matches:
        logger.error('登录响应中未找到 SESSDATA...')
        return
    with open('cookie.txt', 'w', encoding='utf-8') as cookie_file:
        cookie_file.write(matches[0])
    logger.info("已缓存 cookie...")
    test_cookie()


def test_cookie():
    """Check whether the cached cookie in ``cookie.txt`` is still usable.

    When the file is missing or the API rejects the cookie, a QR login is
    started. When the cookie works, account name and VIP status are logged
    and the module-level ``cookie`` is updated.

    Fixes over the original:
    * the login paths returned ``None``, which callers then assigned to the
      global ``cookie``; the current global value is returned instead (it may
      still be empty if the QR login has not completed yet);
    * the cookie file is read through a ``with`` block and stripped of
      surrounding whitespace;
    * an unknown VIP type no longer raises ``KeyError``.

    :return: the usable SESSDATA string, or the current global ``cookie``
        after a login attempt.
    """
    global cookie
    user_space_url = 'https://api.bilibili.com/x/space/myinfo'
    if not path.exists('cookie.txt'):
        logger.error('请先登录以获取 cookie ...')
        login_by_sacn()
        return cookie

    with open('cookie.txt', 'r', encoding='utf-8') as cookie_file:
        cache = cookie_file.read().strip()
    cookies = {
        'SESSDATA': cache
    }
    vip_type = {0: "无", 1: "月度大会员", 2: "年度及以上大会员"}
    response = get(user_space_url, cookies=cookies).json()
    if response['code'] != 0:
        logger.warning('缓存 cookie 失效,请重新登录...')
        login_by_sacn()
        return cookie

    logger.debug('缓存 cookie 可用...')
    vip_code = response["data"]["vip"]["type"]
    vip = vip_type.get(vip_code, f'未知({vip_code})')
    logger.info(f'当前缓存账户:{response["data"]["name"]};VIP:{vip}')
    if vip_code == 0:
        logger.warning("当前账户非大会员账户,将无法使用更高权限的 API...")
    cookie = cache
    return cache


def search_by_word(word: str, search_type='video') -> list:
    """Search by keyword and return the raw result entries.

    A session first visits the home page to pick up site cookies, then calls
    the typed search API with the cached ``SESSDATA``. Each hit (bvid, title,
    tag, description) is written to the debug log.

    Fixes over the original:
    * the cookie wait loop re-checked the first response forever because the
      new response was never assigned; it now re-checks the fresh response
      and gives up after a few attempts;
    * keyword and type are sent through ``params`` so they are URL-encoded;
    * a reply without ``data``/``result`` yields an empty list instead of
      ``KeyError``.

    :param word: search keyword.
    :param search_type: search category understood by the API, default
        ``'video'``.
    :return: list of result dicts as returned by the API (possibly empty).
    """
    global cookie
    search_url = 'https://api.bilibili.com/x/web-interface/search/type'
    home_url = 'https://bilibili.com/'
    max_attempts = 5
    if not cookie:
        cookie = test_cookie()
    cookies = {"SESSDATA": cookie}
    s = Session()
    response = s.get(home_url)
    attempts = 1
    while 'Set-cookie' not in response.headers and attempts < max_attempts:
        response = s.get(home_url)
        attempts += 1
    if 'Set-cookie' not in response.headers:
        logger.warning('未能从主页获取 cookie,继续尝试搜索...')
    logger.debug(response.headers.get('set-cookie'))

    params = {'keyword': word, 'search_type': search_type}
    response = s.get(search_url, params=params, cookies=cookies).json()
    if response.get('code'):
        logger.warning(f'搜索请求失败: {response.get("code")} {response.get("message")}')
    result: list = (response.get('data') or {}).get('result') or []
    for item in result:
        description = item['description'].replace('\n', '\n\t')
        logger.debug(
            f"\nbvid: {item['bvid']}\n"
            f"title: {item['title']}\n"
            f"tag: {item['tag']}\n"
            f"description: \n\t{description}"
        )
    return result


def get_title(bvid: str) -> str:
    """Return the title of the video identified by *bvid*.

    On a non-zero API code the reason is logged and the program exits. Codes
    missing from the local table used to raise ``KeyError`` while building
    the log message; they are now reported with the code and the API's own
    message.

    :param bvid: BV id of the video.
    :return: the video title.
    """
    url = f'https://api.bilibili.com/x/web-interface/view?bvid={bvid}'
    cookies = {
        "SESSDATA": cookie
    }
    status = {
        0: '成功', -400: '请求错误', -403: '权限不足', -404: '无视频', 62002: '稿件不可见', 62004: '稿件审核中'
    }
    response = get(url, cookies=cookies).json()
    code = response['code']
    if code:
        reason = status.get(code, f'{code} {response.get("message", "")}')
        logger.error(f'当前视频: {bvid} {reason}')
        exit(0)
    return response['data']['title']


def get_parts_info(bvid: str) -> list:
    """Fetch the page list of a video.

    :param bvid: BV id of the video.
    :return: the ``data`` field of the pagelist response.
    """
    pagelist_reply = get('https://api.bilibili.com/x/player/pagelist', params={'bvid': bvid})
    payload = pagelist_reply.json()
    return payload['data']


def get_download_url(bvid: str, part=0) -> dict:
    """Resolve the DASH audio/video stream URLs of one part of a video.

    Sample ids listed by the original author:
        BV1Xe4y1P7xZ
        BV1eV411W7tt
        BV1rR4y1y7fG

    :param bvid: BV id of the video.
    :param part: index into the page list, default 0.
    :return: the dict produced by ``extract_url`` from the ``dash`` section.
    """
    global cookie
    if not cookie:
        cookie = test_cookie()

    page_list = get_parts_info(bvid)
    request_headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.61',
        'referer': 'https://www.bilibili.com',
        'cookies': f'SESSDATA={cookie}'
    }
    session_cookies = {"SESSDATA": cookie}
    cid = page_list[part]['cid']
    player_url = f'https://api.bilibili.com/x/player/playurl?bvid={bvid}&cid={cid}&fnver=0&fnval=2000&fourk=1'
    play_info = get(player_url, headers=request_headers, cookies=session_cookies).json()
    dash = play_info['data']['dash']
    return extract_url(dash)


def extract_url(response: dict) -> dict:
    """Reduce a ``dash`` payload to ``{kind: {quality label: url}}``.

    Quality ids are mapped to labels with a two-digit rank prefix, so that
    sorting the labels puts the best quality first. For ``audio`` and
    ``video`` the first stream seen for a quality wins.

    Robustness fixes over the original:
    * ``flac`` present but with ``audio`` set to ``None`` no longer raises
      ``TypeError``;
    * ``dolby`` or ``audio``/``video`` being ``None`` or absent is treated as
      "no streams";
    * a quality id missing from the tables gets the label ``99-<id>`` (sorted
      after every known quality) instead of raising ``KeyError``.

    :param response: the ``dash`` dict of a playurl response.
    :return: dict with the keys ``audio``, ``video``, ``dolby`` and ``flac``,
        each mapping quality labels to ``base_url`` values.
    """
    video_quality = {
        6: '12-240P', 16: '11-320P', 32: '10-640P', 64: '09-720P', 74: '08-720P 60', 80: '07-1080P', 112: '06-1080P+',
        116: '05-1080P 60', 120: '04-4K 超清', 125: '03-HDR', 126: '02-杜比视界', 127: '01-8K 超清'
    }
    audio_quality = {
        30216: '05-64K', 30232: '04-132K', 30280: '03-192K', 30250: '02-杜比全景声', 30251: '01-Hi-Res 无损'
    }

    def label_for(table, quality_id):
        # Unknown ids sort behind all known qualities thanks to the 99 prefix.
        return table.get(quality_id, f'99-{quality_id}')

    format_data = {'audio': {}, 'video': {}, 'dolby': {}, 'flac': {}}

    for audio in response.get('audio') or []:
        format_data['audio'].setdefault(label_for(audio_quality, audio['id']), audio['base_url'])
    for video in response.get('video') or []:
        format_data['video'].setdefault(label_for(video_quality, video['id']), video['base_url'])

    dolby = response.get('dolby') or {}
    if dolby.get('type', 0) != 0:
        for track in dolby.get('audio') or []:
            format_data['dolby'].setdefault(label_for(audio_quality, track['id']), track['base_url'])

    flac_audio = (response.get('flac') or {}).get('audio')
    if flac_audio:
        format_data['flac'][label_for(audio_quality, flac_audio['id'])] = flac_audio['base_url']
    return format_data


def download_video(bvid: str, part=0):
    """Download one part of a video in the best quality and merge it to MP4.

    Steps: verify ffmpeg, look up the part, resolve the stream URLs (retrying
    on ``SSLError``), download the best video and audio streams to
    ``video.m4s`` / ``audio.m4s``, merge them with ffmpeg into
    ``<bvid>-<cid>.mp4`` and delete the temporary files.

    Fixes over the original:
    * ``dolby`` was read from ``urls['audio']`` instead of ``urls['dolby']``;
    * the part index was used before it was validated, and the check let
      ``part == len(parts)`` through;
    * ffmpeg availability is verified before downloading, not after;
    * ffmpeg is run with an argument list rather than a shell string built
      from ``bvid``;
    * the temporary streams are kept when ffmpeg reports a failure.

    :param bvid: BV id of the video.
    :param part: index of the part in the page list, default 0.
    :return: None.
    """
    # Function-scope import keeps the new dependency local to this block.
    from subprocess import run

    check_ffmpeg()
    parts = get_parts_info(bvid)
    if not -len(parts) <= part < len(parts):
        logger.error('不存在当前分 P 视频...')
        exit(0)
    cid = parts[part]['cid']
    while True:
        try:
            urls = get_download_url(bvid, part)
            break
        except SSLError:
            logger.warning('访问出错,正在重连...')
    audios, videos, dolby, flac = urls['audio'], urls['video'], urls['dolby'], urls['flac']
    # Labels carry a rank prefix, so the first sorted entry is the best one.
    video_choice = sorted(videos)
    audio_choice = sorted({*audios, *dolby, *flac})
    logger.info(f'当前存在视频质量: {video_choice}')
    logger.info(f'当前存在音频质量: {audio_choice}')
    logger.info('默认下载最高质量音视频...')
    logger.info(f'开始下载视频 {bvid} {parts[part]["part"]} {video_choice[0]}')
    download(videos[video_choice[0]], 'video.m4s')
    logger.info(f'开始下载音频 {bvid} {parts[part]["part"]} {audio_choice[0]}')
    best_audio = audio_choice[0]
    for source in (audios, dolby, flac):
        if best_audio in source:
            download(source[best_audio], 'audio.m4s')
            break
    merged = run([
        'ffmpeg', '-i', 'video.m4s', '-i', 'audio.m4s',
        '-c:v', 'copy', '-c:a', 'copy', '-f', 'mp4', f'{bvid}-{cid}.mp4',
    ])
    if merged.returncode != 0:
        # Leave the downloaded streams in place so the merge can be redone.
        logger.error(f'ffmpeg 合并失败,返回码 {merged.returncode},已保留 video.m4s / audio.m4s')
        return
    unlink('video.m4s')
    unlink('audio.m4s')
    logger.success("Done...")


def main():
    """Script entry point: download the hard-coded sample video."""
    bvid = 'BV1Eb411u7Fw'
    # download() takes (url, filename) and would raise TypeError when given
    # only a BV id; download_video() is the function that accepts one.
    download_video(bvid)


if __name__ == "__main__":
    main()

如果您喜欢这篇博客或觉得它对您有帮助,欢迎发表评论。也欢迎您分享这篇博客,让更多人参与进来。如果博客中使用的图片侵犯了您的版权,请联系作者删除。谢谢!