# anime_downloader/logic_anilife.py
import os
import sys
import traceback
import json
import hashlib
import re
import urllib
from datetime import datetime
from urllib import parse

# third-party
import requests
import lxml.etree
from lxml import html
# Install missing third-party packages at import time.
# The PyPI name and the import name can differ (e.g. beautifulsoup4 -> bs4).
import importlib

packages = {
    "beautifulsoup4": "bs4",
    "requests-cache": "requests_cache",
    "cloudscraper": "cloudscraper",
}
for pip_name, module_name in packages.items():
    try:
        importlib.import_module(module_name)
    except ImportError:
        os.system(f"{sys.executable} -m pip install {pip_name}")
import cloudscraper
# third-party
from flask import request, render_template, jsonify
# SJVA common
from framework import db, scheduler, path_data, socketio
from framework.util import Util
from framework.common.util import headers
from plugin import (
LogicModuleBase,
FfmpegQueueEntity,
FfmpegQueue,
default_route_socketio,
)
from tool_base import d
# package
from .plugin import P
logger = P.logger
# =================================================================#
# package
class LogicAniLife(LogicModuleBase):
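    """Download module for anilife.live: scrapes series and episode listings,
    serves the SJVA web UI routes, and manages the FfmpegQueue used for downloads."""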
db_default = {
"anilife_db_version": "1",
"anilife_url": "https://anilife.live",
"anilife_download_path": os.path.join(path_data, P.package_name, "ohli24"),
"anilife_auto_make_folder": "True",
"anilife_auto_make_season_folder": "True",
"anilife_finished_insert": "[완결]",
"anilife_max_ffmpeg_process_count": "1",
"anilife_order_desc": "False",
"anilife_auto_start": "False",
"anilife_interval": "* 5 * * *",
"anilife_auto_mode_all": "False",
"anilife_auto_code_list": "all",
"anilife_current_code": "",
"anilife_uncompleted_auto_enqueue": "False",
"anilife_image_url_prefix_series": "https://www.jetcloud.cc/series/",
"anilife_image_url_prefix_episode": "https://www.jetcloud-list.cc/thumbnail/",
}
current_headers = None
current_data = None
referer = None
session = requests.Session()
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/71.0.3578.98 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
        "Referer": "",
        "Cookie": "",
    }
useragent = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, "
"like Gecko) Chrome/96.0.4664.110 Whale/3.12.129.46 Safari/537.36"
}
def __init__(self, P):
super(LogicAniLife, self).__init__(P, "setting", scheduler_desc="애니라이프 자동 다운로드")
self.name = "anilife"
self.queue = None
default_route_socketio(P, self)
    @staticmethod
    def get_html(url, referer=None, stream=False, timeout=5):
        data = ""
        try:
            print("cloudflare protection bypass ==================")
            # return LogicAniLife.get_html_cloudflare(url)
            return LogicAniLife.get_html_selenium(url)
            # return LogicAniLife.get_html_playwright(url)

            # NOTE: the plain requests fallback below is unreachable while the
            # Selenium call above is active; kept for reference.
            if referer is None:
                referer = "https://anilife.live/"
            if LogicAniLife.session is None:
                LogicAniLife.session = requests.session()
            LogicAniLife.headers["Referer"] = referer
            page_content = LogicAniLife.session.get(
                url, headers=LogicAniLife.headers, timeout=timeout, allow_redirects=True
            )
            data = page_content.text
        except Exception as e:
            logger.error("Exception:%s", e)
            logger.error(traceback.format_exc())
        return data
@staticmethod
def get_html_playwright(url):
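        """Fetch page HTML with headless Playwright (WebKit); an alternative
        Cloudflare bypass path, currently not used by get_html()."""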
from playwright.sync_api import sync_playwright
import time
# scraper = cloudscraper.create_scraper(
# browser={"browser": "chrome", "platform": "windows", "desktop": True},
# debug=False,
# # sess=LogicAniLife.session,
# delay=10,
# )
#
# cookie_value, user_agent = scraper.get_cookie_string(url)
#
# logger.debug(f"cookie_value:: {cookie_value}")
start = time.time()
ua = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/69.0.3497.100 Safari/537.36"
)
# from playwright_stealth import stealth_sync
with sync_playwright() as p:
browser = p.webkit.launch(headless=True)
context = browser.new_context(
user_agent=ua,
)
LogicAniLife.referer = "https://anilife.live/"
LogicAniLife.headers["Referer"] = LogicAniLife.referer
# LogicAniLife.headers["Cookie"] = cookie_value
print(LogicAniLife.headers)
context.set_extra_http_headers(LogicAniLife.headers)
page = context.new_page()
def set_cookie(req):
if "cookie" in req.headers:
print(req.headers["cookie"])
cookie = req.headers["cookie"]
page.on("request", set_cookie)
# stealth_sync(page)
page.goto(url, wait_until="domcontentloaded")
            cookies = context.cookies()
print(cookies)
# print(page.content())
print(f"run at {time.time() - start} sec")
return page.content()
@staticmethod
def get_html_selenium(url):
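        """Fetch page HTML with a stealth-patched headless Chrome via Selenium;
        this is the path get_html() actually uses to get around Cloudflare."""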
from selenium.webdriver.common.by import By
from selenium import webdriver
from selenium_stealth import stealth
import time
options = webdriver.ChromeOptions()
        # headless Chrome options (required when running on Linux)
options.add_argument("start-maximized")
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
        # chromedriver path (bundled macOS binary; adjust for the host platform)
driver_path = "./bin/Darwin/chromedriver"
driver = webdriver.Chrome(executable_path=driver_path, chrome_options=options)
stealth(
driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
        driver.get(url)
        driver.refresh()
        time.sleep(1)
        elem = driver.find_element(By.XPATH, "//*")
        source_code = elem.get_attribute("outerHTML")
        driver.quit()
        return source_code.encode("utf-8")
@staticmethod
def get_html_cloudflare(url, cached=False):
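        """Fetch page HTML through cloudscraper on top of the shared requests
        session; kept as an alternative to the Selenium path in get_html()."""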
# scraper = cloudscraper.create_scraper(
# # disableCloudflareV1=True,
# # captcha={"provider": "return_response"},
# delay=10,
# browser="chrome",
# )
# scraper = cfscrape.create_scraper(
# browser={"browser": "chrome", "platform": "android", "desktop": False}
# )
# scraper = cloudscraper.create_scraper(
# browser={"browser": "chrome", "platform": "windows", "mobile": False},
# debug=True,
# )
# LogicAniLife.headers["referer"] = LogicAniLife.referer
LogicAniLife.headers["Referer"] = "https://anilife.live/"
LogicAniLife.headers[
"Cookie"
] = "_ga=GA1.1.578607927.1660813724; __gads=ID=10abb8b98b6828ae-2281c943a9d500fd:T=1660813741:RT=1660813741:S=ALNI_MYU_iB2lBgSrEQUBwhKpNsToaqQ8A; sbtsck=javuwDzcOJqUyweM1OQeNGzHbjoHp7Cgw44XnPdM738c3E=; SPSI=e48379959d54a6a62cc7abdcafdb2761; SPSE=h5HfMGLJzLqzNafMD3YaOvHSC9xfh77CcWdKvexp/z5N5OsTkIiYSCudQhFffEfk/0pcOTVf0DpeV0RoNopzig==; anilife_csrf=b93b9f25a12a51cf185805ec4de7cf9d; UTGv2=h46b326af644f4ac5d0eb1502881136b3750; __gpi=UID=000008ba227e99e0:T=1660813741:RT=1660912282:S=ALNI_MaJHIVJIGpQ5nTE9lvypKQxJnn10A; DSR=SXPX8ELcRgh6N/9rNgjpQoNfaX2DRceeKYR0/ul7qTI9gApWQpZxr8jgymf/r0HsUT551vtOv2CMWpIn0Hd26A==; DCSS=89508000A76BBD939F6DDACE5BD9EB902D2212A; DGCC=Wdm; adOtr=7L4Xe58995d; spcsrf=6554fa003bf6a46dd9b7417acfacc20a; _ga_56VYJJ7FTM=GS1.1.1660912281.10.1.1660912576.0.0.0; PRLST=EO"
# logger.debug(f"headers:: {LogicAniLife.headers}")
        if LogicAniLife.session is None:
            LogicAniLife.session = requests.Session()
        # The class-level session is created eagerly, so attach the headers
        # unconditionally rather than only when the session is first built.
        LogicAniLife.session.headers = LogicAniLife.headers
# LogicAniLife.session = requests.Session()
sess = cloudscraper.create_scraper(
browser={"browser": "firefox", "platform": "windows", "desktop": True},
debug=False,
sess=LogicAniLife.session,
delay=10,
)
# print(scraper.get(url, headers=LogicAniLife.headers).content)
# print(scraper.get(url).content)
# return scraper.get(url, headers=LogicAniLife.headers).content
print(LogicAniLife.headers)
return sess.get(
url, headers=LogicAniLife.session.headers, timeout=10, allow_redirects=True
).content.decode("utf8", errors="replace")
@staticmethod
def db_init():
pass
def process_menu(self, sub, req):
arg = P.ModelSetting.to_dict()
arg["sub"] = self.name
if sub in ["setting", "queue", "list", "category", "request"]:
if sub == "setting":
job_id = "%s_%s" % (self.P.package_name, self.name)
arg["scheduler"] = str(scheduler.is_include(job_id))
arg["is_running"] = str(scheduler.is_running(job_id))
return render_template(
"{package_name}_{module_name}_{sub}.html".format(
package_name=P.package_name, module_name=self.name, sub=sub
),
arg=arg,
)
return render_template("sample.html", title="%s - %s" % (P.package_name, sub))
def process_ajax(self, sub, req):
try:
if sub == "analysis":
# code = req.form['code']
logger.debug(req)
code = request.form["code"]
wr_id = request.form.get("wr_id", None)
bo_table = request.form.get("bo_table", None)
data = []
# logger.info("code::: %s", code)
P.ModelSetting.set("anilife_current_code", code)
data = self.get_series_info(code, wr_id, bo_table)
self.current_data = data
return jsonify({"ret": "success", "data": data, "code": code})
elif sub == "anime_list":
data = []
cate = request.form["type"]
page = request.form["page"]
data = self.get_anime_info(cate, page)
# self.current_data = data
return jsonify(
{"ret": "success", "cate": cate, "page": page, "data": data}
)
elif sub == "add_queue":
logger.debug(f"add_queue routine ===============")
ret = {}
info = json.loads(request.form["data"])
logger.info(f"info:: {info}")
ret["ret"] = self.add(info)
return jsonify(ret)
        except Exception as e:
            P.logger.error("Exception:%s", e)
            P.logger.error(traceback.format_exc())
            return jsonify({"ret": "exception", "log": str(e)})
def setting_save_after(self):
if self.queue.get_max_ffmpeg_count() != P.ModelSetting.get_int(
"anilife_max_ffmpeg_process_count"
):
self.queue.set_max_ffmpeg_count(
P.ModelSetting.get_int("anilife_max_ffmpeg_process_count")
)
def scheduler_function(self):
pass
def plugin_load(self):
self.queue = FfmpegQueue(
P, P.ModelSetting.get_int("anilife_max_ffmpeg_process_count")
)
self.current_data = None
self.queue.queue_start()
def reset_db(self):
db.session.query(ModelAniLifeItem).delete()
db.session.commit()
return True
    # Fetch series (detail page) information
def get_series_info(self, code, wr_id, bo_table):
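        """Scrape a series detail page (title, poster, metadata block and the
        episode list) and return the dict consumed by the 'analysis' AJAX call."""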
try:
if code.isdigit():
url = P.ModelSetting.get("anilife_url") + "/detail/id/" + code
else:
url = P.ModelSetting.get("anilife_url") + "/g/l?id=" + code
logger.debug("url::: > %s", url)
response_data = LogicAniLife.get_html(url, timeout=10)
tree = html.fromstring(response_data)
# logger.debug(response_data)
main_title = tree.xpath('//div[@class="infox"]/h1/text()')[0]
image = tree.xpath('//div[@class="thumb"]/img/@src')[0]
des_items = tree.xpath(
'//div[@class="info-content"]/div[@class="spe"]/span'
)
des_items1 = (
tree.xpath('//div[@class="info-content"]/div[@class="spe"]')[0]
.text_content()
.strip()
)
# print(des_items1)
# print(len(des_items))
des = {}
des_key = [
"_otit",
"_dir",
"_pub",
"_tag",
"_classifi",
"_country",
"_season",
"_grade",
"_total_chapter",
"_show_time",
"_release_year",
"_recent_date",
"_air_date",
]
description_dict = {
"상태": "_status",
"원제": "_otit",
"원작": "_org",
"감독": "_dir",
"각본": "_scr",
"시즌": "_season",
"캐릭터 디자인": "_character_design",
"음악": "_sound",
"제작사": "_pub",
"장르": "_tag",
"분류": "_classifi",
"제작국가": "_country",
"방영일": "_date",
"등급": "_grade",
"유형": "_type",
"에피소드": "_total_chapter",
"상영시간": "_show_time",
"공식 방영일": "_release_date",
"방영 시작일": "_air_date",
"최근 방영일": "_recent_date",
"개봉년도": "_release_year",
}
print(main_title)
print(image)
# print(des_items)
list_body_li = tree.xpath('//div[@class="eplister"]/ul/li')
# logger.debug(f"list_body_li:: {list_body_li}")
episodes = []
vi = None
for li in list_body_li:
# logger.debug(li)
ep_num = li.xpath('.//a/div[@class="epl-num"]/text()')[0].strip()
title = li.xpath('.//a/div[@class="epl-title"]/text()')[0].strip()
thumbnail = image
link = li.xpath(".//a/@href")[0]
date = ""
m = hashlib.md5(title.encode("utf-8"))
_vi = m.hexdigest()
episodes.append(
{
"ep_num": ep_num,
"title": title,
"link": link,
"thumbnail": image,
"date": date,
"day": date,
"_id": title,
"va": link,
"_vi": _vi,
"content_code": code,
}
)
# print(lxml.etree.tostring(des_items, method="text"))
#
# for idx, item in enumerate(des_items):
# span = item.xpath(".//b/text()")
# logger.info(f"0: {span[0]}")
# key = description_dict[span[0].replace(":", "")]
# logger.debug(f"key:: {key}")
# try:
# print(item.xpath(".//text()")[1].strip())
# des[key] = item.xpath(".//text()")[1].strip()
# except IndexError:
# if item.xpath(".//a"):
# des[key] = item.xpath(".//a")[0]
# des[key] = ""
ser_description = "작품 설명 부분"
des = ""
des1 = ""
data = {
"title": main_title,
"image": image,
"date": "2022.01.11 00:30 (화)",
"ser_description": ser_description,
# "des": des,
"des1": des_items1,
"episode": episodes,
}
return data
except Exception as e:
P.logger.error("Exception:%s", e)
P.logger.error(traceback.format_exc())
return {"ret": "exception", "log": str(e)}
@staticmethod
def get_real_link(url):
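        """Follow redirects for the given URL and return the final location."""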
response = requests.get(url)
if response.history:
print("Request was redirected")
for resp in response.history:
print(resp.status_code, resp.url)
print("Final destination:")
print(response.status_code, response.url)
return response.url
        else:
            print("Request was not redirected")
            return response.url
def get_anime_info(self, cate, page):
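        """Scrape an anime listing page ('ing', 'theater' or completed) and
        return {'ret', 'anime_count', 'anime_list'} for the web UI."""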
logger.debug(f"get_anime_info() routine")
logger.debug(f"cate:: {cate}")
wrapper_xpath = '//div[@class="bsx"]'
try:
if cate == "ing":
url = P.ModelSetting.get("anilife_url")
wrapper_xpath = (
'//div[contains(@class, "listupd")]/*/*/div[@class="bsx"]'
)
elif cate == "theater":
url = (
P.ModelSetting.get("anilife_url")
+ "/vodtype/categorize/Movie/"
+ page
)
wrapper_xpath = '//div[@class="bsx"]'
            else:
                # cate == "complete" (currently reuses the Movie listing URL)
                url = (
                    P.ModelSetting.get("anilife_url")
                    + "/vodtype/categorize/Movie/"
                    + page
                )
logger.info("url:::> %s", url)
data = {}
response_data = LogicAniLife.get_html(url, timeout=10)
print(response_data)
logger.debug(f"wrapper_xath:: {wrapper_xpath}")
tree = html.fromstring(response_data)
tmp_items = tree.xpath(wrapper_xpath)
data["anime_count"] = len(tmp_items)
data["anime_list"] = []
for item in tmp_items:
entity = {}
entity["link"] = item.xpath(".//a/@href")[0]
logger.debug(entity["link"])
                # Prepend the site base URL when the scraped link is relative.
                p = re.compile(r"^https?://[A-Za-z0-9.-]+/[A-Za-z0-9_.?=/-]+$")
                if p.match(entity["link"]) is None:
entity["link"] = P.ModelSetting.get("anilife_url") + entity["link"]
# real_url = LogicAniLife.get_real_link(url=entity["link"])
logger.debug(entity["link"])
entity["code"] = entity["link"].split("/")[-1]
entity["title"] = item.xpath(".//div[@class='tt']/text()")[0].strip()
entity["image_link"] = item.xpath(".//div[@class='limit']/img/@src")[
0
].replace("..", P.ModelSetting.get("anilife_url"))
data["ret"] = "success"
data["anime_list"].append(entity)
return data
except Exception as e:
P.logger.error("Exception:%s", e)
P.logger.error(traceback.format_exc())
return {"ret": "exception", "log": str(e)}
#########################################################
def add(self, episode_info):
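        """Queue an episode for download. Currently this only checks for a
        duplicate queue entry and looks up the DB row; the actual enqueue of an
        AniLifeQueueEntity is not implemented yet."""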
if self.is_exist(episode_info):
return "queue_exist"
else:
db_entity = ModelAniLifeItem.get_by_anilife_id(episode_info["_id"])
logger.debug(f"db_entity():: => {db_entity}")
return "enqueue_db_append"
# pass
def is_exist(self, info):
for e in self.queue.entity_list:
if e.info["_id"] == info["_id"]:
return True
return False
class AniLifeQueueEntity(FfmpegQueueEntity):
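    """FfmpegQueueEntity for a single AniLife episode; site-specific download
    steps are not implemented yet."""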
pass
class ModelAniLifeItem(db.Model):
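    """DB row describing one AniLife episode download and its status."""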
__tablename__ = "{package_name}_anilife_item".format(package_name=P.package_name)
__table_args__ = {"mysql_collate": "utf8_general_ci"}
__bind_key__ = P.package_name
id = db.Column(db.Integer, primary_key=True)
created_time = db.Column(db.DateTime)
completed_time = db.Column(db.DateTime)
reserved = db.Column(db.JSON)
content_code = db.Column(db.String)
season = db.Column(db.Integer)
episode_no = db.Column(db.Integer)
title = db.Column(db.String)
episode_title = db.Column(db.String)
anilife_va = db.Column(db.String)
anilife_vi = db.Column(db.String)
anilife_id = db.Column(db.String)
quality = db.Column(db.String)
filepath = db.Column(db.String)
filename = db.Column(db.String)
savepath = db.Column(db.String)
video_url = db.Column(db.String)
vtt_url = db.Column(db.String)
thumbnail = db.Column(db.String)
status = db.Column(db.String)
anilife_info = db.Column(db.JSON)
def __init__(self):
self.created_time = datetime.now()
def __repr__(self):
return repr(self.as_dict())
def as_dict(self):
ret = {x.name: getattr(self, x.name) for x in self.__table__.columns}
ret["created_time"] = self.created_time.strftime("%Y-%m-%d %H:%M:%S")
ret["completed_time"] = (
self.completed_time.strftime("%Y-%m-%d %H:%M:%S")
if self.completed_time is not None
else None
)
return ret
@classmethod
def get_by_id(cls, idx):
return db.session.query(cls).filter_by(id=idx).first()
@classmethod
def get_by_anilife_id(cls, anilife_id):
return db.session.query(cls).filter_by(anilife_id=anilife_id).first()
def save(self):
db.session.add(self)
db.session.commit()