Files
anime_downloader/logic_linkkf.py
2023-01-29 21:21:50 +09:00

635 lines
23 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/02/08 3:44 PM
# @Author : yommi
# @Site :
# @File : logic_linkkf
# @Software: PyCharm
import os, sys, traceback, re, json, threading
from datetime import datetime
import copy
import random
import time
import urllib
from urllib.parse import urlparse
# third-party
import requests
from lxml import html
from urllib import parse
import urllib
import asyncio
# third-party
from flask import request, render_template, jsonify
from sqlalchemy import or_, and_, func, not_, desc
# sjva common (framework) imports
from framework import app, db, scheduler, path_data, socketio
from framework.util import Util
from framework.common.util import headers
from plugin import (
LogicModuleBase,
FfmpegQueueEntity,
FfmpegQueue,
default_route_socketio,
)
import importlib

# pip distribution names of the runtime dependencies this module relies on.
packages = ["beautifulsoup4", "requests-cache", "cloudscraper"]
# Importable module name for each distribution whose import name differs from
# its pip name; anything not listed is imported under its pip name.
_package_modules = {
    "beautifulsoup4": "bs4",
    "requests-cache": "requests_cache",
}
for _package in packages:
    try:
        # Import by *name*. A literal ``import _package`` statement looks for a
        # module called "_package", which never exists, so every dependency
        # would be re-installed on every start-up.
        importlib.import_module(_package_modules.get(_package, _package))
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so this single
        # handler covers both "not installed" and "installed but broken".
        if _package == "playwright":
            # playwright is intentionally never auto-installed from here.
            continue
        print("here")
        # Install with the pip command configured in the framework settings.
        os.system(f"{app.config['config']['pip']} install {_package}")
from tool_base import d
# package-local imports
from .plugin import P
# Logger taken from the plugin object P; used by every function in this module.
logger = P.logger
# Module-level name of this section; LogicLinkkf.__init__ assigns the same
# literal to self.name.
name = "linkkf"
# Logic module for the linkkf site: default settings, class-level shared state
# and the HTTP header templates used by the scraping helpers below.
class LogicLinkkf(LogicModuleBase):
# Default setting values. Every default is a string, including the
# boolean-like "True"/"False" flags, so readers compare against "True".
db_default = {
"linkkf_db_version": "1",
# Base URL of the site; the scraping methods build page URLs from it.
"linkkf_url": "https://linkkf.app",
"linkkf_download_path": os.path.join(path_data, P.package_name, "linkkf"),
"linkkf_auto_make_folder": "True",
"linkkf_auto_make_season_folder": "True",
"linkkf_finished_insert": "[완결]",
"linkkf_max_ffmpeg_process_count": "1",
"linkkf_order_desc": "True",
"linkkf_auto_start": "False",
# NOTE(review): presumably a cron-style schedule expression — confirm.
"linkkf_interval": "* 5 * * *",
"linkkf_auto_mode_all": "False",
"linkkf_auto_code_list": "all",
# Last analysed content code; written by process_ajax("analysis").
"linkkf_current_code": "",
"linkkf_uncompleted_auto_enqueue": "False",
"linkkf_image_url_prefix_series": "",
"linkkf_image_url_prefix_episode": "",
"linkkf_discord_notify": "True",
}
# Class-level state; everything below starts unset.
current_headers = None
# get_series_info compares this against the requested code before fetching.
current_data = None
# Filled from the linkkf_url setting by get_html on first use and sent as the
# Referer header by get_html_cloudflare.
referer = None
download_queue = None
download_thread = None
current_download_count = 0
# Directory that contains this source file.
cache_path = os.path.dirname(__file__)
# Session handed to cloudscraper (as ``sess``) by get_html_cloudflare.
session = requests.Session()
# Request header template. get_html_cloudflare overwrites "User-Agent" and
# "Referer" in place before every request, so this dict is mutated state.
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/71.0.3578.98 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
"Referer": "",
}
# Alternative header dict carrying only a User-Agent.
useragent = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, "
"like Gecko) Chrome/96.0.4664.110 Whale/3.12.129.46 Safari/537.36"
}
def __init__(self, P):
    """Initialise the base logic module and attach the socket.io routes.

    Args:
        P: the plugin object; passed on to the base class initialiser and to
            ``default_route_socketio``.
    """
    # scheduler_desc reads "linkkf automatic download" in Korean.
    super().__init__(P, "setting", scheduler_desc="linkkf 자동 다운로드")
    self.name = "linkkf"
    default_route_socketio(P, self)
def process_menu(self, sub, req):
    """Render the HTML page for the menu entry *sub*.

    The template argument dict starts as a copy of all settings
    (``P.ModelSetting.to_dict()``) with ``"sub"`` set to this module's name.

    Args:
        sub: menu entry name.
        req: the incoming request; only ``req.args`` is read.

    Returns:
        The rendered ``<package>_<module>_<sub>.html`` template for a known
        entry, otherwise the rendered ``sample.html`` fallback.
    """
    known_pages = ("setting", "queue", "category", "list", "request")
    arg = P.ModelSetting.to_dict()
    arg["sub"] = self.name

    # Unknown entries get the generic placeholder page.
    if sub not in known_pages:
        return render_template("sample.html", title="%s - %s" % (P.package_name, sub))

    if sub == "request":
        # A content code in the query string overrides the stored one.
        content_code = req.args.get("content_code")
        if content_code is not None:
            arg["linkkf_current_code"] = content_code
    elif sub == "setting":
        # The settings page also shows the scheduler state for this module's job.
        job_id = f"{self.P.package_name}_{self.name}"
        arg["scheduler"] = str(scheduler.is_include(job_id))
        arg["is_running"] = str(scheduler.is_running(job_id))

    template_name = f"{P.package_name}_{self.name}_{sub}.html"
    return render_template(template_name, arg=arg)
def process_ajax(self, sub, req):
    """Handle the AJAX command *sub* and return its JSON response.

    Implemented commands: ``analysis``, ``anime_list``, ``screen_movie_list``,
    ``search`` and ``add_queue``. The commands ``complete_list``,
    ``entity_list``, ``queue_command``, ``add_queue_checked_list``,
    ``web_list``, ``db_remove`` and ``add_whitelist`` are placeholders that do
    nothing and return ``None``.

    Args:
        sub: command name.
        req: the incoming request; parameters are read from ``req.form``.

    Returns:
        A ``jsonify`` response. When a handler raises, the error is logged and
        ``{"ret": "exception", "log": <message>}`` is returned instead of
        silently falling through with ``None``.
    """
    try:
        form = req.form
        if sub == "analysis":
            code = form["code"]
            # Remember the last analysed code so the request page can restore it.
            P.ModelSetting.set("linkkf_current_code", code)
            series = self.get_series_info(code)
            self.current_data = series
            return jsonify({"ret": "success", "data": series, "code": code})

        if sub == "anime_list":
            cate = form["type"]
            page = form["page"]
            data = self.get_anime_info(cate, page)
            return jsonify(
                {"ret": "success", "cate": cate, "page": page, "data": data}
            )

        if sub == "screen_movie_list":
            page = form["page"]
            logger.debug("request:::> %s", page)
            # The result dict is returned as-is, not wrapped in a "ret" envelope.
            return jsonify(self.get_screen_movie_info(page))

        if sub == "search":
            cate = form["type"]
            query = form["query"]
            page = form["page"]
            data = self.get_search_result(query, page, cate)
            return jsonify(
                {
                    "ret": "success",
                    "cate": cate,
                    "page": page,
                    "query": query,
                    "data": data,
                }
            )

        if sub == "add_queue":
            logger.debug("linkkf add_queue routine ===============")
            info = json.loads(form["data"])
            logger.info(f"info:: {info}")
            return jsonify({"ret": self.add(info)})

        # Remaining commands are not implemented yet; nothing is returned.
        return None
    except Exception as e:
        P.logger.error("Exception: %s", e)
        P.logger.error(traceback.format_exc())
        # Same error shape that get_anime_info reports on failure.
        return jsonify({"ret": "exception", "log": str(e)})
@staticmethod
def get_html(url: str, timeout: int = 10, cached=False):
    """Fetch *url* via ``get_html_cloudflare`` and return the page text.

    Args:
        url: address to fetch.
        timeout: forwarded to ``get_html_cloudflare``.
        cached: accepted but not used by this function.

    Returns:
        The page text, or ``None`` when anything raises (the error and its
        traceback are logged).
    """
    try:
        # Lazily seed the shared Referer from the configured site URL.
        if LogicLinkkf.referer is None:
            LogicLinkkf.referer = str(P.ModelSetting.get("linkkf_url"))
        page = LogicLinkkf.get_html_cloudflare(url, timeout)
    except Exception as exc:
        logger.error("Exception:%s", exc)
        logger.error(traceback.format_exc())
        return None
    return page
@staticmethod
def get_html_cloudflare(url: str, timeout: int = 10, cached=False):
    """Fetch *url* through a cloudscraper session and return the body as text.

    Side effects: overwrites ``"User-Agent"`` (random pick from a fixed list)
    and ``"Referer"`` (from ``LogicLinkkf.referer``) in the shared
    ``LogicLinkkf.headers`` dict, and creates ``LogicLinkkf.session`` when it
    is ``None``.

    Args:
        url: address to fetch.
        timeout: request timeout passed to the scraper's ``get``. It used to
            be ignored in favour of a hard-coded ``10``.
        cached: accepted but not used by this function.

    Returns:
        The response body decoded as UTF-8, with undecodable bytes replaced.

    Raises:
        Whatever cloudscraper/requests raise; this function does not catch.
    """
    logger.debug(f"cloudflare protection bypass {'=' * 30}")
    import cloudscraper

    user_agents_list = [
        "Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.83 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36",
    ]
    # Rotate the User-Agent on every request.
    LogicLinkkf.headers["User-Agent"] = random.choice(user_agents_list)
    LogicLinkkf.headers["Referer"] = LogicLinkkf.referer

    if LogicLinkkf.session is None:
        LogicLinkkf.session = requests.Session()

    # Reuse the shared session so cookies obtained earlier are sent again.
    scraper = cloudscraper.create_scraper(
        delay=10,
        sess=LogicLinkkf.session,
        browser={
            "custom": "linkkf",
        },
    )
    response = scraper.get(
        url,
        headers=LogicLinkkf.headers,
        timeout=timeout,
    )
    return response.content.decode("utf8", errors="replace")
def get_anime_info(self, cate, page):
    """Scrape one listing page of category *cate* and return its entries.

    Args:
        cate: one of ``"ing"``, ``"movie"``, ``"complete"``, ``"top_view"``.
        page: page number, inserted into the URL as-is.

    Returns:
        On success a dict with ``"ret": "success"``, ``"page"``,
        ``"total_page"`` (last text node of the ``wp_page`` div, or ``0``),
        ``"episode_count"`` and ``"episode"`` (list of dicts with ``link``,
        ``code``, ``title``, ``image_link``, ``chapter``).
        On any failure, including an unknown category, a dict
        ``{"ret": "exception", "log": <message>}``.
    """
    # URL path segment per category; every category shares the same markup.
    category_paths = {
        "ing": "airing",
        "movie": "ani",
        "complete": "anime-list",
        "top_view": "topview",
    }
    items_xpath = '//div[@class="myui-vodlist__box"]'
    title_xpath = './/a[@class="text-fff"]//text()'
    try:
        path = category_paths.get(cate)
        if path is None:
            # Report the real problem instead of an UnboundLocalError on `url`.
            P.logger.error("unknown category: %s", cate)
            return {"ret": "exception", "log": f"unknown category: {cate}"}

        url = f"{P.ModelSetting.get('linkkf_url')}/{path}/page/{page}"
        logger.info("url:::> %s", url)

        # NOTE(review): this sets an instance attribute, while get_html reads
        # the class attribute LogicLinkkf.referer — kept unchanged; confirm
        # whether anything outside this file relies on self.referer.
        if self.referer is None:
            self.referer = "https://linkkf.app"

        response_data = LogicLinkkf.get_html(url, timeout=10)
        if not response_data:
            # get_html returns None when the fetch itself failed.
            raise ValueError(f"empty response from {url}")

        tree = html.fromstring(response_data)
        tmp_items = tree.xpath(items_xpath)
        page_texts = tree.xpath('//div[@id="wp_page"]//text()')

        data = {"ret": "success", "page": page}
        data["total_page"] = page_texts[-1] if page_texts else 0
        data["episode_count"] = len(tmp_items)
        data["episode"] = []
        for item in tmp_items:
            link = item.xpath(".//a/@href")[0]
            chapter_texts = item.xpath("./a/span//text()")
            data["episode"].append(
                {
                    "link": link,
                    # First run of digits in the link is used as the content code.
                    "code": re.search(r"[0-9]+", link).group(),
                    "title": item.xpath(title_xpath)[0].strip(),
                    "image_link": item.xpath("./a/@data-original")[0],
                    "chapter": chapter_texts[0].strip() if chapter_texts else "",
                }
            )
        return data
    except Exception as e:
        P.logger.error("Exception:%s", e)
        P.logger.error(traceback.format_exc())
        return {"ret": "exception", "log": str(e)}
def get_series_info(self, code):
    """Scrape the series page for *code* and return its metadata and episodes.

    Args:
        code: content code; appended to the ``linkkf_url`` setting to build
            the page URL and used as prefix of every episode code.

    Returns:
        A dict that always contains ``"code"`` and ``"ret"``.
        On success ``"ret"`` is ``True`` and the dict also holds ``season``,
        ``_id``, ``title``, ``poster_url``, ``detail``, ``rate``,
        ``episode_count``, ``episode`` (one dict per episode link) and
        ``save_folder``; the result is also stored in ``self.current_data``.
        On failure ``"ret"`` is ``"error"`` and ``"log"`` holds the message.
    """
    from bs4 import BeautifulSoup

    data = {"code": code, "ret": False}
    try:
        # Return the previous result when it is for the same code and succeeded.
        # NOTE(review): this reads the class attribute, but the visible code
        # only ever assigns self.current_data, so this looks like it never
        # hits. Left unchanged on purpose — enabling it would start serving
        # stale episode lists; confirm the intended behaviour.
        if (
            LogicLinkkf.current_data is not None
            and LogicLinkkf.current_data["code"] == code
            and LogicLinkkf.current_data["ret"]
        ):
            return LogicLinkkf.current_data

        base_url = P.ModelSetting.get("linkkf_url")
        url = "%s/%s" % (base_url, code)
        logger.info(url)
        logger.debug(LogicLinkkf.headers)
        html_content = LogicLinkkf.get_html(url, cached=False)

        # NOTE(review): process-wide setting, presumably raised so parsing
        # deeply nested pages does not hit RecursionError — confirm.
        sys.setrecursionlimit(10**7)
        tree = html.fromstring(html_content)
        soup = BeautifulSoup(html_content, "html.parser")

        # --- title / season -------------------------------------------------
        try:
            raw_title = (
                tree.xpath('//div[@class="hrecipe"]/article/center/strong')[0]
                .text_content()
                .strip()
            )
        except IndexError:
            # Fallback for pages without the "hrecipe" wrapper.
            raw_title = (
                tree.xpath("//article/center/strong")[0].text_content().strip()
            )

        match = re.compile(r"(?P<season>\d+)기").search(raw_title)
        if match:
            data["season"] = match.group("season")
            # Drop only the matched "<n>기" season token. Replacing the bare
            # season digits would also eat digits that belong to the title.
            title = raw_title.replace(match.group(0), "")
        else:
            data["season"] = "1"
            title = raw_title
        data["_id"] = str(code)
        title = title.strip().replace("()", "").strip()
        data["title"] = (
            Util.change_text_for_use_filename(title).replace("OVA", "").strip()
        )

        # --- poster / detail (best effort) ----------------------------------
        try:
            data["poster_url"] = tree.xpath(
                '//div[@class="myui-content__thumb"]/a/@data-original'
            )
            detail_texts = tree.xpath('//div[@class="myui-content__detail"]/text()')
            if len(detail_texts) > 3:
                data["detail"] = [{"info": str(detail_texts[3])}]
            else:
                # Key reads "no information" in Korean.
                data["detail"] = [{"정보없음": ""}]
        except Exception as e:
            logger.error(e)
            data["detail"] = [{"정보없음": ""}]
            data["poster_url"] = None

        # Relative XPath, evaluated from the element returned by fromstring.
        data["rate"] = tree.xpath('span[@class="tag-score"]')

        # --- episode links --------------------------------------------------
        tags = soup.select("ul > u > a")
        if not tags:
            tags = soup.select("ul > a")
        logger.debug(len(tags))
        # Count the same links that are turned into episodes below.
        data["episode_count"] = str(len(tags))
        data["episode"] = []
        data["save_folder"] = data["title"]

        # Loop invariants.
        episode_no_pattern = re.compile(r"([0-9]+)화?")
        save_folder = Util.change_text_for_use_filename(data["save_folder"])
        download_root = P.ModelSetting.get("linkkf_download_path")
        # Settings are stored as strings, so compare against "True"; a plain
        # truthiness test would treat "False" as enabled.
        make_folder = P.ModelSetting.get("linkkf_auto_make_folder") == "True"
        make_season_folder = (
            P.ModelSetting.get("linkkf_auto_make_season_folder") == "True"
        )

        for t in tags:
            entity = {
                "_id": data["code"],
                "program_code": data["code"],
                "program_title": data["title"],
                "save_folder": save_folder,
                "title": t.text.strip(),
            }

            # The episode code must be unique per episode so status updates
            # land on the right item: series code + zero-padded episode number
            # taken from the start of the link text.
            m_obj = episode_no_pattern.match(entity["title"])
            if m_obj is not None:
                entity["code"] = data["code"] + m_obj.group(1).zfill(4)
            else:
                entity["code"] = data["code"]

            href = t["href"]
            if "/player" in href:
                # Site-relative player link: prefix the configured site URL
                # rather than a hard-coded host.
                entity["url"] = base_url + href
            else:
                entity["url"] = href
            entity["season"] = data["season"]

            # Save path: download root, optionally extended by a per-series
            # folder and, inside it, a per-season folder. Always set, so
            # consumers never hit a missing key.
            save_path = download_root
            if make_folder:
                save_path = os.path.join(download_root, entity["save_folder"])
                if make_season_folder:
                    save_path = os.path.join(
                        save_path, "Season %s" % int(entity["season"])
                    )
            entity["save_path"] = save_path

            entity["image"] = data["poster_url"]
            entity["filename"] = LogicLinkkf.get_filename(
                data["save_folder"], data["season"], entity["title"]
            )
            data["episode"].append(entity)

        data["ret"] = True
        self.current_data = data
        return data
    except Exception as e:
        # Single handler: IndexError is already an Exception subclass.
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        data["log"] = str(e)
        data["ret"] = "error"
        return data
@staticmethod
def get_filename(maintitle, season, title):
    """Build the download file name for one episode.

    Args:
        maintitle: series title used as the file name stem.
        season: season number (anything ``int()`` accepts).
        title: episode link text; the episode number is parsed from it.

    Returns:
        ``<maintitle>.S<ss>E<ee>.720p-LK.mp4`` when an episode number is found
        in *title*, otherwise ``<maintitle>.720p-SA.mp4``; either way passed
        through ``Util.change_text_for_use_filename``. ``None`` when anything
        raises (the error and its traceback are logged).
    """
    try:
        pattern = re.compile(
            r"(?P<title>.*?)\s?((?P<season>\d+)기)?\s?((?P<epi_no>\d+)화?)"
        )
        found = pattern.search(title)
        if not found:
            logger.debug("NOT MATCH")
            return Util.change_text_for_use_filename("%s.720p-SA.mp4" % maintitle)

        # Single-digit numbers get a leading zero.
        episode_number = int(found.group("epi_no"))
        episode_tag = ("0%s" if episode_number < 10 else "%s") % episode_number
        season_tag = ("0%s" if int(season) < 10 else "%s") % season

        filename = "%s.S%sE%s.720p-LK.mp4" % (maintitle, season_tag, episode_tag)
        return Util.change_text_for_use_filename(filename)
    except Exception as e:
        logger.error(f"Exception: {str(e)}")
        logger.error(traceback.format_exc())
        return None
class ModelLinkkfItem(db.Model):
    """Database row for one linkkf item, stored in ``<package>_linkkf_item``."""

    __tablename__ = "{package_name}_linkkf_item".format(package_name=P.package_name)
    __table_args__ = {"mysql_collate": "utf8_general_ci"}
    __bind_key__ = P.package_name

    id = db.Column(db.Integer, primary_key=True)
    created_time = db.Column(db.DateTime)
    completed_time = db.Column(db.DateTime)
    reserved = db.Column(db.JSON)
    content_code = db.Column(db.String)
    season = db.Column(db.Integer)
    episode_no = db.Column(db.Integer)
    title = db.Column(db.String)
    episode_title = db.Column(db.String)
    linkkf_va = db.Column(db.String)
    linkkf_vi = db.Column(db.String)
    linkkf_id = db.Column(db.String)
    quality = db.Column(db.String)
    filepath = db.Column(db.String)
    filename = db.Column(db.String)
    savepath = db.Column(db.String)
    video_url = db.Column(db.String)
    vtt_url = db.Column(db.String)
    thumbnail = db.Column(db.String)
    status = db.Column(db.String)
    linkkf_info = db.Column(db.JSON)

    def __init__(self, **kwargs):
        """Create a row and stamp ``created_time`` when the caller gave none.

        Replaces the misspelled ``__int__`` hook, whose body compared
        (``==``) instead of assigning and therefore never set the timestamp.
        Keyword arguments are forwarded to the default model constructor, so
        existing ``ModelLinkkfItem(column=value, ...)`` calls keep working.
        """
        super().__init__(**kwargs)
        if self.created_time is None:
            self.created_time = datetime.now()

    def __repr__(self):
        return repr(self.as_dict())

    def as_dict(self):
        """Return all columns as a dict with the timestamps formatted as text.

        ``created_time`` and ``completed_time`` become
        ``"YYYY-MM-DD HH:MM:SS"`` strings, or ``None`` when unset. Rows saved
        before the constructor fix have no ``created_time``, so it is guarded
        as well.
        """
        ret = {col.name: getattr(self, col.name) for col in self.__table__.columns}
        for key in ("created_time", "completed_time"):
            value = getattr(self, key)
            ret[key] = (
                value.strftime("%Y-%m-%d %H:%M:%S") if value is not None else None
            )
        return ret

    def save(self):
        """Add this row to the session and commit."""
        db.session.add(self)
        db.session.commit()

    @classmethod
    def get_by_id(cls, idx):
        """Return the row whose primary key is *idx*, or ``None``."""
        return db.session.query(cls).filter_by(id=idx).first()