Update 2020-07-15

简介

由于上个版本依赖 N_m3u8DL-CLI,而 N_m3u8DL-CLI 只能在 Windows 平台使用,所以我又重写了一份不依赖它的脚本。

环境

  • Windows 10
  • Python 3.7
  • Pycharm 2020.1

食用方法

请安装以下包

  • pip install m3u8
  • pip install pycryptodome(安装后进入 Python 安装目录,如 C:\python37,在 \Lib\site-packages 目录下找到 crypto 目录,并将其重命名为 Crypto;若该目录已经是 Crypto 则无需改动)
  • pip install natsort
  • pip install dataclasses(仅 Python 3.6 需要,Python 3.7 起已内置)
  • pip install requests
  • concurrent.futures 和 glob 属于 Python 标准库,无需使用 pip 安装

打开全网影片搜索,打开某一集(期)的播放页面,复制地址

运行程序,输入地址,根据提示可选择下载的集/期数。

代码

项目地址:Github

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import logging
import os
import random
import re
import time
import urllib
import urllib.request
from urllib.parse import urljoin

import m3u8
import requests
from glob import iglob

from natsort import natsorted
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
# pip3 install pycryptodome
from Crypto.Cipher import AES

# Pool of browser User-Agent strings. Callers pick one entry with
# random.choice() so that requests do not always carry the same agent.
uapools = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36 Edg/80.0.361.50",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:60.0) Gecko/20100101 Firefox/60.0",
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.5",
    "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E) QQBrowser/6.9.11079.201",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0)",
]


class FindM3U8URL(object):
    """Scrape an episode play page and collect the m3u8 address of episodes.

    Usage: create an instance, assign ``index_url`` (the address of any
    episode's play page) and call :meth:`run`.
    """

    # Prefix that the scraped relative episode links are appended to.
    BASE_PLAY_URL = "http://lab.liumingye.cn/vodplay/"

    def __init__(self):
        # Assign a real value: a bare ``self.index_url: str`` annotation does
        # not create the attribute.
        self.index_url: str = ""
        self.resultOfCurPageUrl = []
        self.resultOfCurPageName = []
        self.resultName = []

    @staticmethod
    def _first_match(pattern, text, what):
        """Return group 1 of the first *pattern* match in *text*.

        Raises:
            ValueError: if the pattern does not match; *what* names the
                missing item in the error message.
        """
        found = re.search(pattern, text, re.S)
        if found is None:
            raise ValueError("could not find " + what)
        return found.group(1)

    @staticmethod
    def _fetch(url):
        """Download *url* with the installed opener and return it as text."""
        # The context manager closes the HTTP response; the timeout keeps a
        # stalled server from blocking the script forever.
        with urllib.request.urlopen(url, timeout=60) as response:
            return response.read().decode("utf-8", "ignore")

    @staticmethod
    def _ask_number(prompt, low, high):
        """Prompt until the user enters an integer in ``low..high``."""
        while True:
            try:
                value = int(input(prompt))
            except ValueError:
                value = None
            if value is not None and low <= value <= high:
                return value
            print(f"请输入 {low} 到 {high} 之间的整数")

    def log(self):
        """Send log records to ``log/<timestamp>.log`` under the current directory."""
        log_dir = "log"
        # os.path.join instead of hard-coded backslashes, so the path is also
        # valid on non-Windows systems.
        os.makedirs(log_dir, exist_ok=True)
        timenow = time.strftime("%Y_%m_%d_%H_%M_%S")
        logfilename = os.path.join(log_dir, timenow + ".log")
        logging.basicConfig(filename=logfilename, format='%(levelname)s:%(asctime)s:%(message)s', level=logging.INFO,
                            datefmt='[%d/%b/%Y %H:%M:%S]')

    def searchByUrl(self):
        """Normalise ``index_url`` and store the show prefix in ``urlOut``.

        Raises:
            ValueError: if ``index_url`` is not a ``/vodplay/<id>-<n>.html``
                style address.
        """
        # Test for a real ".html" suffix; a plain substring test also fires
        # on ids that merely contain the letters "html".
        if not re.search(r'\.html(?:[?#]|$)', self.index_url):
            self.index_url = self.index_url + ".html"
        self.urlOut = self._first_match(
            r'/vodplay/(.*?)-[0-9]+\.html', self.index_url,
            "a /vodplay/ episode id in " + repr(self.index_url))
        logging.info("urlOut:" + self.urlOut)

    def randomUA(self):
        """Install a global urllib opener that sends a random User-Agent."""
        opener = urllib.request.build_opener()
        this_ua = random.choice(uapools)
        ua = ("User-Agent", this_ua)
        self.uaForN = "User-Agent:" + this_ua
        opener.addheaders = [ua]
        urllib.request.install_opener(opener)
        logging.info("ua:" + str(this_ua))

    def searchAllUrl(self):
        """Collect the link suffix and the title of every episode on the page."""
        self.randomUA()
        dataOfFirstPage = self._fetch(self.index_url)
        # urlOut comes from user input, so escape it before embedding it in
        # a regular expression.
        prefix = '<a href="/vodplay/' + re.escape(self.urlOut)
        patOfEveryPage = prefix + '(.*?)">.*?</a></li>'
        patOfName = prefix + '.*?>(.*?)</a></li>'

        self.resultOfPage = re.compile(patOfEveryPage, re.S).findall(dataOfFirstPage)
        self.resultOfName = re.compile(patOfName, re.S).findall(dataOfFirstPage)

    def findName(self):
        """Read the show title (``vod_name``) from the index page."""
        self.randomUA()
        dataOfFirstPage = self._fetch(self.index_url)
        self.resultName = self._first_match(
            "vod_name='(.*?)',", dataOfFirstPage, "vod_name on " + self.index_url)

    def findM3U8(self):
        """Visit each selected episode page and record its m3u8 URL and title."""
        # NOTE(review): the scheme is dropped and "http:" is prepended below,
        # so https addresses are rewritten to http, as in the original code.
        patOfM3U8 = 'url":"https?:(.*?)","url_next'
        self.randomUA()
        for i in range(self.startindex - 1, self.endindex):
            curPageUrl = self.BASE_PLAY_URL + self.urlOut + self.resultOfPage[i]
            dataOfCurPage = self._fetch(curPageUrl)
            resultOfCurPage = self._first_match(
                patOfM3U8, dataOfCurPage, "an m3u8 address on " + curPageUrl)
            # The page embeds the URL as JSON, where "/" is written as "\/".
            self.resultOfCurPageUrl.append("http:" + resultOfCurPage.replace('\\/', '/'))
            self.resultOfCurPageName.append(self.resultOfName[i])

    def downloadM3U8(self):
        """Ask the user which episode range (1-based, inclusive) to download.

        Raises:
            ValueError: if no episode link was found on the index page.
        """
        total = len(self.resultOfPage)
        print("共有" + str(total) + "集/期")
        logging.info("共有" + str(total) + "集/期")
        if total == 0:
            raise ValueError("no episode links were found on " + self.index_url)
        # Out-of-range answers are asked again: 0 would otherwise wrap around
        # to the last episode and a too-large value would raise IndexError.
        self.startindex = self._ask_number("请输入从第几集开始下载:", 1, total)
        logging.info(f'从第 {self.startindex} 集/期开始下载')
        self.endindex = self._ask_number("请输入下载到第几集:", self.startindex, total)
        logging.info(f'下载到第 {self.endindex} 集/期')

    def run(self):
        """Run the whole scrape.

        Returns:
            A tuple ``(show_title, m3u8_urls, episode_titles)`` for the
            episode range chosen by the user.
        """
        self.log()
        self.searchByUrl()
        self.searchAllUrl()
        self.downloadM3U8()
        self.findName()
        self.findM3U8()
        return self.resultName, self.resultOfCurPageUrl, self.resultOfCurPageName


@dataclass
class DownLoadM3U8(object):
    """Download every segment of an m3u8 playlist and merge them into one file.

    Attributes:
        m3u8_url: address of the playlist (master or media playlist).
        file_name: output file name; a timestamped name is used when empty.
    """
    m3u8_url: str
    file_name: str

    def __post_init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        if not self.file_name:
            self.file_name = time.strftime("%Y_%m_%d_%H_%M_%S") + '_index.mp4'
        # get_key() sends a request that carries self.UserAgent, so the agent
        # has to exist before the playlist and its key are loaded.
        self.randomUA()
        self._load_playlist()

    def _load_playlist(self):
        """(Re)load ``m3u8_url`` and refresh the decryption state for it."""
        self.m3u8_obj = m3u8.load(self.m3u8_url)
        self.cryptor = self.get_key()

    def randomUA(self):
        """Pick a random User-Agent for the requests made by this object."""
        self.UserAgent = random.choice(uapools)

    @staticmethod
    def _parse_iv(text):
        """Convert an ``IV=0x...`` playlist attribute into 16 raw bytes."""
        digits = text.strip()
        if digits[:2].lower() == "0x":
            digits = digits[2:]
        return bytes.fromhex(digits.zfill(32))

    def _segment_iv(self, index):
        """Return the CBC initialisation vector for the segment at *index*.

        A declared IV applies to every segment. Without one, the HLS
        specification uses the segment's media sequence number as a
        big-endian 128-bit value.
        """
        if self._key_iv is not None:
            return self._key_iv
        first_sequence = getattr(self.m3u8_obj, "media_sequence", None) or 0
        return (first_sequence + index).to_bytes(16, "big")

    def _new_cipher(self, index):
        """Create a fresh AES-CBC cipher for the segment at *index*."""
        return AES.new(self._key_bytes, AES.MODE_CBC, self._segment_iv(index))

    def get_key(self):
        """Fetch the playlist's decryption key, if the playlist is encrypted.

        The key bytes and the declared IV are stored on the instance so that
        a separate cipher can be built for every segment.

        Returns:
            An AES-CBC cipher when the playlist declares a key, else ``None``.
            Callers only need to test the result against ``None``.
        """
        self._key_bytes = None
        self._key_iv = None
        keys = self.m3u8_obj.keys
        if not (keys and keys[0]):
            return None
        key = keys[0]
        res = requests.get(key.absolute_uri, headers={'User-Agent': self.UserAgent}, timeout=60)
        res.raise_for_status()
        self._key_bytes = res.content
        declared_iv = getattr(key, "iv", None)
        if declared_iv:
            self._key_iv = self._parse_iv(declared_iv)
        return self._new_cipher(0)

    def get_ts_url(self):
        """Yield the absolute URL of every segment in playlist order."""
        for seg in self.m3u8_obj.segments:
            yield urljoin(self.m3u8_obj.base_uri, seg.uri)

    def download_ts(self, url_info):
        """Download one segment and write it to disk, decrypting if needed.

        Args:
            url_info: ``[url, ts_name]`` or ``[url, ts_name, index]``. *index*
                is the segment's position in the playlist and is used to
                derive the IV when the playlist declares none; it defaults
                to 0.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        url, ts_name, *extra = url_info
        index = extra[0] if extra else 0
        print(f'download {ts_name} from {url} ')
        logging.info(f'download {ts_name} from {url} ')
        res = requests.get(url, headers={'User-Agent': self.UserAgent}, timeout=60)
        res.raise_for_status()
        payload = res.content
        if self.cryptor is not None:
            # A CBC cipher is stateful, so one shared object would chain
            # unrelated segments together and is not safe across the worker
            # threads. Build a new one for each segment.
            payload = self._new_cipher(index).decrypt(payload)
        # Open the file only after the data is ready, so a failed download
        # does not leave an empty segment behind.
        with open(ts_name, 'wb') as fp:
            fp.write(payload)

    def download_all_ts(self):
        """Download all segments on the thread pool and wait for them.

        Raises:
            Exception: the first error raised by any segment download.
        """
        futures = [
            self.thread_pool.submit(self.download_ts, [ts_url, f'{index}.ts', index])
            for index, ts_url in enumerate(self.get_ts_url())
        ]
        self.thread_pool.shutdown()
        # Surface worker failures; otherwise a missing segment would be
        # silently skipped and the merged file would be truncated.
        for future in futures:
            future.result()

    def run(self):
        """Download the playlist's segments and merge them into ``file_name``.

        Raises:
            ValueError: if the playlist contains no segments.
        """
        self.randomUA()
        # A master playlist only lists variants; follow the first one to
        # reach the media playlist that holds the segments.
        if self.m3u8_obj.playlists and self.m3u8_obj.data.get("playlists"):
            self.m3u8_url = urljoin(self.m3u8_obj.base_uri, self.m3u8_obj.data.get("playlists")[0]["uri"])
            self._load_playlist()
        if not self.m3u8_obj.segments or not self.m3u8_obj.files:
            raise ValueError("m3u8数据不正确,请检查")
        ts_path = '*.ts'
        try:
            self.download_all_ts()
            print("Download ts files completed")
            logging.info("Download ts files completed")
            with open(self.file_name, 'wb') as fn:
                # Natural sort keeps 10.ts after 9.ts instead of after 1.ts.
                for ts in natsorted(iglob(ts_path)):
                    with open(ts, 'rb') as ft:
                        fn.write(ft.read())
        finally:
            # Always remove the temporary segments, so leftovers from a
            # failed run are not merged into the next download.
            for ts in iglob(ts_path):
                os.remove(ts)
            if os.path.exists("key.key"):
                os.remove("key.key")
        print(f'合并 {self.file_name} 完成')
        logging.info(f'合并 {self.file_name} 完成')


# Script entry point: ask for a play-page address, let the user choose an
# episode range, then download each chosen episode into Download/<show title>/.
if __name__ == '__main__':
    index_url: str = input("Please input index url:")
    finder = FindM3U8URL()
    finder.index_url = index_url
    prename, urllist, namelist = finder.run()

    # Titles are scraped from HTML and may contain characters that are not
    # valid in file names (a "/" would be read as a directory separator).
    unsafe_chars = re.compile(r'[\\/:*?"<>|]')

    target_dir = os.path.join('Download', unsafe_chars.sub('_', str(prename)))
    os.makedirs(target_dir, exist_ok=True)
    # Segments and the merged file are written to the current directory.
    os.chdir(target_dir)

    for url, episode_name in zip(urllist, namelist):
        save_name = unsafe_chars.sub('_', str(prename) + episode_name) + '.mp4'
        if os.path.exists(save_name):
            print(f'{save_name} 已存在,跳过')
            continue
        start = time.time()
        print("正在从" + url + "下载" + save_name)
        logging.info("正在从" + url + "下载" + save_name)
        print("请耐心等待!")
        print("目录下会生成很多.ts文件,不用担心,下载完成后会自动删除")
        print("正在下载,请稍后!")
        downloader = DownLoadM3U8(url, save_name)
        downloader.run()
        end = time.time()
        # Elapsed time is reported only for episodes that were downloaded.
        print(f'下载 {save_name} 共耗时 {end - start} 秒')
        logging.info(f'下载 {save_name} 共耗时 {end - start} 秒')
    print("感谢使用,再见!")
    logging.info("感谢使用,再见!")

致谢