大家好,又见面了,我是你们的朋友全栈君。
import os
import re
import shutil
import threading
import urllib.parse
import urllib.request

import requests
from Crypto.Cipher import AES

from src.Pacho.moviePa.tsdownload import aes_decode


class m3u8down(object):
    """Download every media segment listed in an HLS (.m3u8) media playlist.

    Typical use: construct, call ``to_ts()`` to parse the playlist, call
    ``threads()`` to build one worker thread per missing segment, then start
    and join the threads stored in ``tsthreads``.
    """

    # Attempts per segment before the last error is propagated.
    MAX_RETRIES = 10
    # Network timeout (seconds) for segment downloads.
    TIMEOUT = 120

    def __init__(self, url, listheaders, dicheaders,
                 down_path='D:/workspace/download/Mp4'):
        """Store the playlist location and the request headers.

        :param url: address of the index.m3u8 playlist
        :param listheaders: headers as a list of ``(name, value)`` tuples,
            used for the urllib-based (unencrypted) download path
        :param dicheaders: headers as a dict, used for ``requests`` calls
        :param down_path: directory the ``.ts`` segments are written to
        """
        self.url = url  # address of index.m3u8
        self.headers = listheaders
        self.header = dicheaders
        self.ts_parts = []  # segment URLs, in playlist order
        self.down_path = down_path
        self.tsthreads = []  # unstarted worker threads built by threads()
        self.key = None  # raw AES key bytes, or None when not encrypted

    def aes_decode(self, data, key):
        """Decrypt ``data`` with AES in CBC mode.

        :param key: key bytes (16 or 32 bytes, generally a multiple of 16)
        :param data: the bytes to decrypt
        :return: the decrypted bytes with trailing NUL bytes removed
        """
        # NOTE(review): the key is also passed as the third argument (the IV
        # for CBC). RFC 8216 derives the IV from the EXT-X-KEY IV attribute or
        # the media sequence number - confirm this is intended.
        cryptor = AES.new(key, AES.MODE_CBC, key)
        plain_text = cryptor.decrypt(data)
        # NOTE(review): this strips trailing NUL bytes only; it does not
        # remove PKCS#7 padding - confirm against the streams being handled.
        return plain_text.rstrip(b'\0')

    def to_ts(self):
        """Fetch the playlist at ``self.url`` and collect its segment URLs.

        :raises ValueError: if the response does not contain ``#EXTM3U``
        """
        requests.packages.urllib3.disable_warnings()
        content_all = requests.get(self.url, verify=False, timeout=200).text
        if "#EXTM3U" not in content_all:
            raise ValueError("非M3U8的链接")
        # EXT-X-VERSION is optional in a playlist, so parsing is not gated on
        # it. splitlines() also copes with CRLF line endings.
        self.get_tsurls(self.url, content_all.splitlines())

    def get_tsurls(self, m3u8url, lines):
        """Parse playlist lines into ``self.ts_parts`` and load the AES key.

        :param m3u8url: URL of the playlist, used as the base for relative URIs
        :param lines: the playlist content split into lines
        """
        expect_segment = False
        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith("#"):
                if "EXTINF" in line:
                    # The next URI line belongs to this segment; other tags
                    # may appear in between.
                    expect_segment = True
                elif "#EXT-X-KEY" in line:
                    # e.g. #EXT-X-KEY:METHOD=AES-128,URI="encryption.key"
                    # METHOD=NONE carries no URI and is skipped.
                    uri = re.search(r'URI="([^"]*)"', line)
                    if uri:
                        key_url = urllib.parse.urljoin(m3u8url, uri.group(1))
                        response = requests.get(url=key_url, timeout=120,
                                                headers=self.header)
                        response.raise_for_status()
                        self.key = response.content  # raw key bytes
                continue
            if expect_segment:
                # Resolve 'x.ts', '/a/b/x.ts' and absolute URLs alike
                # relative to the playlist URL.
                self.ts_parts.append(urllib.parse.urljoin(m3u8url, line))
                expect_segment = False

    def load_ts(self, ts_url, files, count):
        """Download one segment (thread target) and report completion.

        :param ts_url: URL of the segment
        :param files: destination file path
        :param count: 0-based index of the segment in ``self.ts_parts``
        """
        if self.key:
            self.auto_keydown(ts_url, files, self.header, self.key)
        else:
            self.auto_down(ts_url, files, self.headers)
        # count is 0-based; show a 1-based ordinal to the user.
        print('第 %d/%d 个文件下载完成, 下载地址是%s'
              % (count + 1, len(self.ts_parts), ts_url))

    def _retry(self, attempt_once):
        """Call ``attempt_once()`` until it succeeds or attempts run out.

        The exception from the final failed attempt is re-raised.
        """
        attempts = max(1, self.MAX_RETRIES)
        for attempt in range(1, attempts + 1):
            try:
                return attempt_once()
            except Exception:
                # Retry boundary: any failure of a single attempt is retried,
                # but the last one is not swallowed.
                if attempt >= attempts:
                    raise

    def _fetch_plain(self, url, filename, headers):
        """Download ``url`` to ``filename`` once, without decryption."""
        opener = urllib.request.build_opener()  # private opener, no global state
        opener.addheaders = headers
        part = filename + ".part"
        with opener.open(url, timeout=self.TIMEOUT) as response, \
                open(part, "wb") as out:
            shutil.copyfileobj(response, out)
        # Publish only complete files so a later run never skips a partial one.
        os.replace(part, filename)

    def _fetch_encrypted(self, url, filename, headers, key):
        """Download ``url`` once, decrypt it and write it to ``filename``."""
        response = requests.get(url=url, timeout=self.TIMEOUT, headers=headers)
        response.raise_for_status()  # do not "decrypt" an HTTP error page
        # NOTE(review): this calls the module-level aes_decode imported at the
        # top of the file, not self.aes_decode - confirm which is intended.
        data = aes_decode(response.content, key)
        part = filename + ".part"
        with open(part, "wb") as out:  # overwrite, never append on retry
            out.write(data)
        os.replace(part, filename)

    def auto_down(self, url, filename, headers):
        """Download an unencrypted segment, retrying on failure.

        :param url: segment URL
        :param filename: destination file path
        :param headers: list of ``(name, value)`` header tuples
        :raises Exception: the last error once ``MAX_RETRIES`` attempts failed
        """
        self._retry(lambda: self._fetch_plain(url, filename, headers))

    def auto_keydown(self, url, filename, headers, key):
        """Download and decrypt an encrypted segment, retrying on failure.

        :param url: segment URL
        :param filename: destination file path
        :param headers: dict of request headers
        :param key: AES key bytes
        :raises Exception: the last error once ``MAX_RETRIES`` attempts failed
        """
        self._retry(lambda: self._fetch_encrypted(url, filename, headers, key))

    def threads(self):
        """Create (but do not start) one thread per segment still missing.

        Segments whose file already exists and is non-empty are skipped;
        empty leftover files are deleted and downloaded again.
        """
        os.makedirs(self.down_path, exist_ok=True)
        for i, ts_url in enumerate(self.ts_parts):
            files = self.down_path + '/' + 'tsm{:0>5}.ts'.format(i)
            if os.path.exists(files):
                if os.path.getsize(files):
                    continue  # already downloaded
                os.remove(files)  # remove the empty file
                print("删除空字节视频文件", files.rsplit("/", 1)[-1])
            t = threading.Thread(target=self.load_ts, args=(ts_url, files, i))
            self.tsthreads.append(t)


def main():
    """Download all segments of the configured playlist."""
    # This is m3u8 url
    url = 'https://www.XXXXX.com/20200612/jDCLCWyb/1500kb/hls/index.m3u8'
    hd = [('User-Agent',
           'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1941.0 Safari/537.36')]
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"}
    m3u8 = m3u8down(url, hd, headers)
    m3u8.to_ts()
    m3u8.threads()
    for th in m3u8.tsthreads:
        th.start()
    for th in m3u8.tsthreads:
        th.join()
    print("{:-^20}".format("下载结束"))


if __name__ == '__main__':
    main()
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161206.html 原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...