import requests import json class Aria2Download: def __init__(self): self.api = "http://localhost:6800/jsonrpc" # 消息id,aria2会原样返回这个id,可以自动生成也可以用其他唯一标识 self.id = "QXJpYU5nXzE2NzUxMzUwMDFfMC42Mzc0MDA5MTc2NjAzNDM=" def addUri(self, url, path, file=None, proxy=None): """ 添加任务 :param url: 文件下载地址 :param path: 文件保存路径 :param file: 文件保存名称 :param proxy: 代{过}{滤}理地址 :return: """ data = { "id": self.id, "jsonrpc": "2.0", "method": "aria2.addUri", "params": [[url], {"dir": path, "out": file, "all-proxy": proxy}] } req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("addUri", return_json) return return_json def getGlobalStat(self): """ 获取全部下载信息 :return: """ data = { "jsonrpc": "2.0", "method": "aria2.getGlobalStat", "id": self.id } req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("getGlobalStat", return_json) return return_json def tellActive(self): """ 正在下载 :return: """ data = { "jsonrpc": "2.0", "method": "aria2.tellActive", "id": self.id, "params": [ ["gid", "totalLength", "completedLength", "uploadSpeed", "downloadSpeed", "connections", "numSeeders", "seeder", "status", "errorCode", "verifiedLength", "verifyIntegrityPending", "files", "bittorrent", "infoHash"]] } req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("getGlobalStat", return_json) return return_json def tellWaiting(self): """ 正在等待 :return: """ data = {"jsonrpc": "2.0", "method": "aria2.tellWaiting", "id": self.id, "params": [0, 1000, ["gid", "totalLength", "completedLength", "uploadSpeed", "downloadSpeed", "connections", "numSeeders", "seeder", "status", "errorCode", "verifiedLength", "verifyIntegrityPending"] ] } req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() print("tellWaiting", return_json) return return_json def tellStopped(self): """ 已完成/已停止 :return: """ data = {"jsonrpc": "2.0", "method": "aria2.tellStopped", "id": self.id, "params": [-1, 1000, ["gid", "totalLength", "completedLength", "uploadSpeed", "downloadSpeed", "connections", "numSeeders", "seeder", "status", "errorCode", "verifiedLength", "verifyIntegrityPending"]] } req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("tellStopped", return_json) return return_json def tellStatus(self, gid): """ 任务状态 :param gid: 任务ID :return: """ data = {"jsonrpc": "2.0", "method": "aria2.tellStatus", "id": self.id, "params": [gid]} req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("tellWaiting", return_json) return return_json def removeDownloadResult(self, gid): """ 删除下载结束的任务 :param gid: 任务ID :return: """ data = {"jsonrpc": "2.0", "method": "aria2.removeDownloadResult", "id": self.id, "params": [gid]} req = requests.post(url=self.api, data=json.dumps(data)) return_json = req.json() req.close() # print("removeDownloadResult", return_json) return return_json
python教程
Aria2的API接口进行批量下载Python源码
python教程
51源码
2024-06-27
共人阅读
版权声明:文章搜集于网络,如有侵权请联系本站,转载请说明出处:https://www.51yma.cn/jiaocheng/python/1461.html
文章来源:
下一篇: 返回列表
热门推荐
-
01python中condition条件变量的作用 500
-
02Python自动爬取轻壁纸网站脚本 177
-
03python每日自动批量推送URL到百度站长工具 125
-
04python使用VS接收数据 83
-
05Day2:列表与字典 79