ts 视频下载[通俗易懂]

ts 视频下载[通俗易懂]importurllib.requestimportrequests,os,threadingfromCrypto.CipherimportAESfromsrc.Pacho.moviePa.tsdownloadimportaes_decodeclassm3u8down(object):def__init__(self,url,listheaders,dicheaders):self.url=url#这里的url是index.m3.

大家好,又见面了,我是你们的朋友全栈君。

import urllib.request
import requests, os, threading
from Crypto.Cipher import AES
from src.Pacho.moviePa.tsdownload import aes_decode

class m3u8down(object):
    def __init__(self, url, listheaders, dicheaders):
        self.url = url  # 这里的url是index.m3u8地址
        self.headers = listheaders
        self.header = dicheaders
        self.ts_parts = []
        self.down_path = 'D:/workspace/download/Mp4'
        self.tsthreads = []
        self.key = None

    def aes_decode(self, data, key):
        """AES解密
        :param key:  密钥(16.32)一般16的倍数
        :param data:  要解密的数据
        :return:  处理好的数据
        """
        cryptor = AES.new(key, AES.MODE_CBC, key)
        plain_text = cryptor.decrypt(data)
        return plain_text.rstrip(b'\0')  # .decode("utf-8")

    def to_ts(self):
        requests.packages.urllib3.disable_warnings()
        content_all = requests.get(self.url, verify=False, timeout=200).text
        if "#EXTM3U" not in content_all:
            raise BaseException("非M3U8的链接")
        if "EXT-X-VERSION" in content_all:
            file_line = content_all.split("\n")
            # print(file_line)
            self.get_tsurls(self.url, file_line)

    def get_tsurls(self, m3u8url, lines):
        for index, line in enumerate(lines):  # m3u8文件中有ts,获取ts地址并添加索引
            if "EXTINF" in line:  # 找ts地址
                if "/" not in lines[index + 1]:  # 判断.ts是否是路径 'DjbgADY7468014.ts' or '/20181221/.../VRYKBY4319009.ts'
                    ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1]  # 拼出ts片段的URL
                else:
                    ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1].rsplit("/", 1)[-1]  # 拼出ts片段的URL
                self.ts_parts.append(ts_url)
            if "#EXT-X-KEY" in line:
                # #EXT-X-KEY:METHOD=AES-128,URI="encryption.key"
                key_url = m3u8url.rsplit("/", 1)[0] + "/" + line.split('"')[1]
                self.key = requests.get(url=key_url, timeout=120, headers=self.header).content  # 获取秘钥

    def load_ts(self, ts_url, files, count):
        if self.key:
            self.auto_keydown(ts_url, files, self.header, self.key)
        else:
            self.auto_down(ts_url, files, self.headers)
        print('第 %d/%d 个文件下载完成, 下载地址是%s' % (count, len(self.ts_parts), ts_url))
        count += 1


    def auto_down(self, url, filename, headers):  # 下载失败后,自调用从新下载
        try:
            opener = urllib.request.build_opener()  # 创建opener对象
            opener.addheaders = self.headers
            urllib.request.install_opener(opener)  # 将opener对象装入urllib.request
            urllib.request.urlretrieve(url, filename)
        except Exception as ex:
            # print(ex.args, url)
            return self.auto_down(url, filename, headers)

    def auto_keydown(self, url, filename, headers, key):  # 下载失败后,自调用从新下载
        try:
            response = requests.get(url=url, timeout=120, headers=headers)
            with open(filename, 'ab+') as f:
                data = aes_decode(response.content, key)
                f.write(data)
                f.close()
        except Exception as ex:
            # print(ex.args, url)
            return self.auto_down(url, filename, headers, key)

    def threads(self):
        for i in range(len(self.ts_parts)):
            files = self.down_path + '/' + 'tsm{:0>5}.ts'.format(i)
            if os.path.exists(files):  # 判断文件是已下载,且文件大小变为空。是则结束本次循环,继续循环
                sz = os.path.getsize(files)
                if not sz:
                    os.remove(files)  # 删除空文件
                    print("删除空字节视频文件", files.rsplit("/", 1)[-1])
                else:
                    continue
            t = threading.Thread(target=self.load_ts, args=(self.ts_parts[i], files, i))
            self.tsthreads.append(t)

def main():  # This is m3u8 url
    url = 'https://www.XXXXX.com/20200612/jDCLCWyb/1500kb/hls/index.m3u8'

    hd = [('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1941.0 Safari/537.36')]
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
        "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive",
        "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"} 

    m3u8 = m3u8down(url, hd, headers)
    m3u8.to_ts()
    m3u8.threads()

    for th in m3u8.tsthreads:
        th.start()

    for th in m3u8.tsthreads:
        th.join()

    print("{:-^20}".format("下载结束"))

if __name__ == '__main__':
    main()
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/161206.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号