import os import sys import traceback import json from datetime import datetime import hashlib import re import lxml.etree # third-party import requests from lxml import html from urllib import parse import urllib packages = ["beautifulsoup4", "requests-cache", "cloudscraper"] for package in packages: try: import package except ImportError: # main(["install", package]) os.system(f"pip install {package}") import cloudscraper # third-party from flask import request, render_template, jsonify # sjva 공용 from framework import db, scheduler, path_data, socketio from framework.util import Util from framework.common.util import headers from plugin import ( LogicModuleBase, FfmpegQueueEntity, FfmpegQueue, default_route_socketio, ) from tool_base import d # 패키지 from .plugin import P logger = P.logger # =================================================================# # 패키지 class LogicAniLife(LogicModuleBase): db_default = { "anilife_db_version": "1", "anilife_url": "https://anilife.live", "anilife_download_path": os.path.join(path_data, P.package_name, "ohli24"), "anilife_auto_make_folder": "True", "anilife_auto_make_season_folder": "True", "anilife_finished_insert": "[완결]", "anilife_max_ffmpeg_process_count": "1", "anilife_order_desc": "False", "anilife_auto_start": "False", "anilife_interval": "* 5 * * *", "anilife_auto_mode_all": "False", "anilife_auto_code_list": "all", "anilife_current_code": "", "anilife_uncompleted_auto_enqueue": "False", "anilife_image_url_prefix_series": "https://www.jetcloud.cc/series/", "anilife_image_url_prefix_episode": "https://www.jetcloud-list.cc/thumbnail/", } current_headers = None current_data = None referer = None session = requests.Session() headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)" "Chrome/71.0.3578.98 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7", "Referer": "", "Cookie": "" # "Cookie": "_ga=GA1.1.578607927.1660813724; __gads=ID=10abb8b98b6828ae-2281c943a9d500fd:T=1660813741:RT=1660813741:S=ALNI_MYU_iB2lBgSrEQUBwhKpNsToaqQ8A; sbtsck=javuwDzcOJqUyweM1OQeNGzHbjoHp7Cgw44XnPdM738c3E=; SPSI=e48379959d54a6a62cc7abdcafdb2761; SPSE=h5HfMGLJzLqzNafMD3YaOvHSC9xfh77CcWdKvexp/z5N5OsTkIiYSCudQhFffEfk/0pcOTVf0DpeV0RoNopzig==; anilife_csrf=b93b9f25a12a51cf185805ec4de7cf9d; UTGv2=h46b326af644f4ac5d0eb1502881136b3750; __gpi=UID=000008ba227e99e0:T=1660813741:RT=1660912282:S=ALNI_MaJHIVJIGpQ5nTE9lvypKQxJnn10A; DSR=SXPX8ELcRgh6N/9rNgjpQoNfaX2DRceeKYR0/ul7qTI9gApWQpZxr8jgymf/r0HsUT551vtOv2CMWpIn0Hd26A==; DCSS=89508000A76BBD939F6DDACE5BD9EB902D2212A; DGCC=Wdm; adOtr=7L4Xe58995d; spcsrf=6554fa003bf6a46dd9b7417acfacc20a; _ga_56VYJJ7FTM=GS1.1.1660912281.10.1.1660912576.0.0.0; PRLST=EO", } useragent = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, " "like Gecko) Chrome/96.0.4664.110 Whale/3.12.129.46 Safari/537.36" } def __init__(self, P): super(LogicAniLife, self).__init__(P, "setting", scheduler_desc="애니라이프 자동 다운로드") self.name = "anilife" self.queue = None default_route_socketio(P, self) @staticmethod def get_html(url, referer=None, stream=False, timeout=5): data = "" try: print("cloudflare protection bypass ==================") # return LogicAniLife.get_html_cloudflare(url) return LogicAniLife.get_html_selenium(url) # return LogicAniLife.get_html_playwright(url) # import browser_cookie3 # cj = browser_cookie3.chrome(domain_name="anilife.live") referer = "https://anilife.live/" if LogicAniLife.session is None: LogicAniLife.session = requests.session() # logger.debug('get_html :%s', url) LogicAniLife.headers["Referer"] = "" if referer is None else referer LogicAniLife.headers[ "Cookie" ] = "_ga=GA1.1.578607927.1660813724; __gads=ID=10abb8b98b6828ae-2281c943a9d500fd:T=1660813741:RT=1660813741:S=ALNI_MYU_iB2lBgSrEQUBwhKpNsToaqQ8A; sbtsck=javuwDzcOJqUyweM1OQeNGzHbjoHp7Cgw44XnPdM738c3E=; SPSI=e48379959d54a6a62cc7abdcafdb2761; SPSE=h5HfMGLJzLqzNafMD3YaOvHSC9xfh77CcWdKvexp/z5N5OsTkIiYSCudQhFffEfk/0pcOTVf0DpeV0RoNopzig==; anilife_csrf=b93b9f25a12a51cf185805ec4de7cf9d; UTGv2=h46b326af644f4ac5d0eb1502881136b3750; __gpi=UID=000008ba227e99e0:T=1660813741:RT=1660912282:S=ALNI_MaJHIVJIGpQ5nTE9lvypKQxJnn10A; DSR=SXPX8ELcRgh6N/9rNgjpQoNfaX2DRceeKYR0/ul7qTI9gApWQpZxr8jgymf/r0HsUT551vtOv2CMWpIn0Hd26A==; DCSS=89508000A76BBD939F6DDACE5BD9EB902D2212A; DGCC=Wdm; adOtr=7L4Xe58995d; spcsrf=6554fa003bf6a46dd9b7417acfacc20a; _ga_56VYJJ7FTM=GS1.1.1660912281.10.1.1660912576.0.0.0; PRLST=EO" page_content = LogicAniLife.session.get( url, headers=headers, timeout=timeout, allow_redirects=True ) data = page_content.text except Exception as e: logger.error("Exception:%s", e) logger.error(traceback.format_exc()) return data # @staticmethod # def get_html(url, cached=False): # # try: # print("cloudflare protection bypass ==================") # return LogicLinkkfYommi.get_html_cloudflare(url) # # return LogicLinkkfYommi.get_html_playwright(url) # # # # if ( # # socket.gethostbyname(socket.gethostname()) == "192.168.0.32" # # or socket.gethostbyname(socket.gethostname()) == "127.0.0.1" # # ): # # print("dev================") # # # print("test") # # # import undetected_chromedriver as uc # # # # # # driver = uc.Chrome(use_subprocess=True) # # # driver.get(url) # # # # return LogicLinkkfYommi.get_html_cloudflare(url) # # if LogicLinkkfYommi.session is None: # if cached: # logger.debug("cached===========++++++++++++") # # LogicLinkkfYommi.session = CachedSession( # os.path.join(cache_path, "linkkf_cache"), # backend="sqlite", # expire_after=300, # cache_control=True, # ) # # print(f"{cache_path}") # # print(f"cache_path:: {LogicLinkkfYommi.session.cache}") # else: # LogicLinkkfYommi.session = requests.Session() # # LogicLinkkfYommi.referer = "https://linkkf.app" # # LogicLinkkfYommi.headers["referer"] = LogicLinkkfYommi.referer # # # logger.debug( # # f"get_html()::LogicLinkkfYommi.referer = {LogicLinkkfYommi.referer}" # # ) # page = LogicLinkkfYommi.session.get(url, headers=LogicLinkkfYommi.headers) # # logger.info(f"page: {page}") # # return page.content.decode("utf8", errors="replace") # # return page.text # # return page.content # except Exception as e: # logger.error("Exception:%s", e) # logger.error(traceback.format_exc()) @staticmethod def get_html_playwright(url): from playwright.sync_api import sync_playwright import time # scraper = cloudscraper.create_scraper( # browser={"browser": "chrome", "platform": "windows", "desktop": True}, # debug=False, # # sess=LogicAniLife.session, # delay=10, # ) # # cookie_value, user_agent = scraper.get_cookie_string(url) # # logger.debug(f"cookie_value:: {cookie_value}") start = time.time() ua = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/69.0.3497.100 Safari/537.36" ) # from playwright_stealth import stealth_sync with sync_playwright() as p: browser = p.webkit.launch(headless=True) context = browser.new_context( user_agent=ua, ) LogicAniLife.referer = "https://anilife.live/" LogicAniLife.headers["Referer"] = LogicAniLife.referer # LogicAniLife.headers["Cookie"] = cookie_value print(LogicAniLife.headers) context.set_extra_http_headers(LogicAniLife.headers) page = context.new_page() def set_cookie(req): if "cookie" in req.headers: print(req.headers["cookie"]) cookie = req.headers["cookie"] page.on("request", set_cookie) # stealth_sync(page) page.goto(url, wait_until="domcontentloaded") cookies = context.cookies print(cookies) # print(page.content()) print(f"run at {time.time() - start} sec") return page.content() @staticmethod def get_html_selenium(url): from selenium.webdriver.common.by import By from selenium import webdriver from selenium_stealth import stealth import time options = webdriver.ChromeOptions() # 크롬드라이버 헤더 옵션추가 (리눅스에서 실행시 필수) options.add_argument("start-maximized") options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option("useAutomationExtension", False) # 크롬드라이버 경로 driver_path = "./bin/Darwin/chromedriver" driver = webdriver.Chrome(executable_path=driver_path, chrome_options=options) stealth( driver, languages=["en-US", "en"], vendor="Google Inc.", platform="Win32", webgl_vendor="Intel Inc.", renderer="Intel Iris OpenGL Engine", fix_hairline=True, ) driver.get(url) driver.refresh() time.sleep(1) elem = driver.find_element(By.XPATH, "//*") source_code = elem.get_attribute("outerHTML") return source_code.encode("utf-8") @staticmethod def get_html_cloudflare(url, cached=False): # scraper = cloudscraper.create_scraper( # # disableCloudflareV1=True, # # captcha={"provider": "return_response"}, # delay=10, # browser="chrome", # ) # scraper = cfscrape.create_scraper( # browser={"browser": "chrome", "platform": "android", "desktop": False} # ) # scraper = cloudscraper.create_scraper( # browser={"browser": "chrome", "platform": "windows", "mobile": False}, # debug=True, # ) # LogicAniLife.headers["referer"] = LogicAniLife.referer LogicAniLife.headers["Referer"] = "https://anilife.live/" LogicAniLife.headers[ "Cookie" ] = "_ga=GA1.1.578607927.1660813724; __gads=ID=10abb8b98b6828ae-2281c943a9d500fd:T=1660813741:RT=1660813741:S=ALNI_MYU_iB2lBgSrEQUBwhKpNsToaqQ8A; sbtsck=javuwDzcOJqUyweM1OQeNGzHbjoHp7Cgw44XnPdM738c3E=; SPSI=e48379959d54a6a62cc7abdcafdb2761; SPSE=h5HfMGLJzLqzNafMD3YaOvHSC9xfh77CcWdKvexp/z5N5OsTkIiYSCudQhFffEfk/0pcOTVf0DpeV0RoNopzig==; anilife_csrf=b93b9f25a12a51cf185805ec4de7cf9d; UTGv2=h46b326af644f4ac5d0eb1502881136b3750; __gpi=UID=000008ba227e99e0:T=1660813741:RT=1660912282:S=ALNI_MaJHIVJIGpQ5nTE9lvypKQxJnn10A; DSR=SXPX8ELcRgh6N/9rNgjpQoNfaX2DRceeKYR0/ul7qTI9gApWQpZxr8jgymf/r0HsUT551vtOv2CMWpIn0Hd26A==; DCSS=89508000A76BBD939F6DDACE5BD9EB902D2212A; DGCC=Wdm; adOtr=7L4Xe58995d; spcsrf=6554fa003bf6a46dd9b7417acfacc20a; _ga_56VYJJ7FTM=GS1.1.1660912281.10.1.1660912576.0.0.0; PRLST=EO" # logger.debug(f"headers:: {LogicAniLife.headers}") if LogicAniLife.session is None: LogicAniLife.session = requests.Session() LogicAniLife.session.headers = LogicAniLife.headers # LogicAniLife.session = requests.Session() sess = cloudscraper.create_scraper( browser={"browser": "firefox", "platform": "windows", "desktop": True}, debug=False, sess=LogicAniLife.session, delay=10, ) # print(scraper.get(url, headers=LogicAniLife.headers).content) # print(scraper.get(url).content) # return scraper.get(url, headers=LogicAniLife.headers).content print(LogicAniLife.headers) return sess.get( url, headers=LogicAniLife.session.headers, timeout=10, allow_redirects=True ).content.decode("utf8", errors="replace") @staticmethod def db_init(): pass def process_menu(self, sub, req): arg = P.ModelSetting.to_dict() arg["sub"] = self.name if sub in ["setting", "queue", "list", "category", "request"]: if sub == "setting": job_id = "%s_%s" % (self.P.package_name, self.name) arg["scheduler"] = str(scheduler.is_include(job_id)) arg["is_running"] = str(scheduler.is_running(job_id)) return render_template( "{package_name}_{module_name}_{sub}.html".format( package_name=P.package_name, module_name=self.name, sub=sub ), arg=arg, ) return render_template("sample.html", title="%s - %s" % (P.package_name, sub)) def process_ajax(self, sub, req): try: if sub == "analysis": # code = req.form['code'] logger.debug(req) code = request.form["code"] wr_id = request.form.get("wr_id", None) bo_table = request.form.get("bo_table", None) data = [] # logger.info("code::: %s", code) P.ModelSetting.set("anilife_current_code", code) data = self.get_series_info(code, wr_id, bo_table) self.current_data = data return jsonify({"ret": "success", "data": data, "code": code}) elif sub == "anime_list": data = [] cate = request.form["type"] page = request.form["page"] data = self.get_anime_info(cate, page) # self.current_data = data return jsonify( {"ret": "success", "cate": cate, "page": page, "data": data} ) elif sub == "add_queue": logger.debug(f"add_queue routine ===============") ret = {} info = json.loads(request.form["data"]) logger.info(f"info:: {info}") ret["ret"] = self.add(info) return jsonify(ret) except Exception as e: P.logger.error("Exception:%s", e) P.logger.error(traceback.format_exc()) def setting_save_after(self): if self.queue.get_max_ffmpeg_count() != P.ModelSetting.get_int( "anilife_max_ffmpeg_process_count" ): self.queue.set_max_ffmpeg_count( P.ModelSetting.get_int("anilife_max_ffmpeg_process_count") ) def scheduler_function(self): pass def plugin_load(self): self.queue = FfmpegQueue( P, P.ModelSetting.get_int("anilife_max_ffmpeg_process_count") ) self.current_data = None self.queue.queue_start() def reset_db(self): db.session.query(ModelAniLifeItem).delete() db.session.commit() return True # 시리즈 정보를 가져오는 함수 def get_series_info(self, code, wr_id, bo_table): try: if code.isdigit(): url = P.ModelSetting.get("anilife_url") + "/detail/id/" + code else: url = P.ModelSetting.get("anilife_url") + "/g/l?id=" + code logger.debug("url::: > %s", url) response_data = LogicAniLife.get_html(url, timeout=10) tree = html.fromstring(response_data) # logger.debug(response_data) main_title = tree.xpath('//div[@class="infox"]/h1/text()')[0] image = tree.xpath('//div[@class="thumb"]/img/@src')[0] des_items = tree.xpath( '//div[@class="info-content"]/div[@class="spe"]/span' ) des_items1 = ( tree.xpath('//div[@class="info-content"]/div[@class="spe"]')[0] .text_content() .strip() ) # print(des_items1) # print(len(des_items)) des = {} des_key = [ "_otit", "_dir", "_pub", "_tag", "_classifi", "_country", "_season", "_grade", "_total_chapter", "_show_time", "_release_year", "_recent_date", "_air_date", ] description_dict = { "상태": "_status", "원제": "_otit", "원작": "_org", "감독": "_dir", "각본": "_scr", "시즌": "_season", "캐릭터 디자인": "_character_design", "음악": "_sound", "제작사": "_pub", "장르": "_tag", "분류": "_classifi", "제작국가": "_country", "방영일": "_date", "등급": "_grade", "유형": "_type", "에피소드": "_total_chapter", "상영시간": "_show_time", "공식 방영일": "_release_date", "방영 시작일": "_air_date", "최근 방영일": "_recent_date", "개봉년도": "_release_year", } print(main_title) print(image) # print(des_items) list_body_li = tree.xpath('//div[@class="eplister"]/ul/li') # logger.debug(f"list_body_li:: {list_body_li}") episodes = [] vi = None for li in list_body_li: # logger.debug(li) ep_num = li.xpath('.//a/div[@class="epl-num"]/text()')[0].strip() title = li.xpath('.//a/div[@class="epl-title"]/text()')[0].strip() thumbnail = image link = li.xpath(".//a/@href")[0] date = "" m = hashlib.md5(title.encode("utf-8")) _vi = m.hexdigest() episodes.append( { "ep_num": ep_num, "title": title, "link": link, "thumbnail": image, "date": date, "day": date, "_id": title, "va": link, "_vi": _vi, "content_code": code, } ) # print(lxml.etree.tostring(des_items, method="text")) # # for idx, item in enumerate(des_items): # span = item.xpath(".//b/text()") # logger.info(f"0: {span[0]}") # key = description_dict[span[0].replace(":", "")] # logger.debug(f"key:: {key}") # try: # print(item.xpath(".//text()")[1].strip()) # des[key] = item.xpath(".//text()")[1].strip() # except IndexError: # if item.xpath(".//a"): # des[key] = item.xpath(".//a")[0] # des[key] = "" ser_description = "작품 설명 부분" des = "" des1 = "" data = { "title": main_title, "image": image, "date": "2022.01.11 00:30 (화)", "ser_description": ser_description, # "des": des, "des1": des_items1, "episode": episodes, } return data except Exception as e: P.logger.error("Exception:%s", e) P.logger.error(traceback.format_exc()) return {"ret": "exception", "log": str(e)} @staticmethod def get_real_link(url): response = requests.get(url) if response.history: print("Request was redirected") for resp in response.history: print(resp.status_code, resp.url) print("Final destination:") print(response.status_code, response.url) return response.url else: print("Request was not redirected") def get_anime_info(self, cate, page): logger.debug(f"get_anime_info() routine") logger.debug(f"cate:: {cate}") wrapper_xpath = '//div[@class="bsx"]' try: if cate == "ing": url = P.ModelSetting.get("anilife_url") wrapper_xpath = ( '//div[contains(@class, "listupd")]/*/*/div[@class="bsx"]' ) elif cate == "theater": url = ( P.ModelSetting.get("anilife_url") + "/vodtype/categorize/Movie/" + page ) wrapper_xpath = '//div[@class="bsx"]' else: url = ( P.ModelSetting.get("anilife_url") + "/vodtype/categorize/Movie/" + page ) # cate == "complete": logger.info("url:::> %s", url) data = {} response_data = LogicAniLife.get_html(url, timeout=10) print(response_data) logger.debug(f"wrapper_xath:: {wrapper_xpath}") tree = html.fromstring(response_data) tmp_items = tree.xpath(wrapper_xpath) data["anime_count"] = len(tmp_items) data["anime_list"] = [] for item in tmp_items: entity = {} entity["link"] = item.xpath(".//a/@href")[0] logger.debug(entity["link"]) p = re.compile(r"^[http?s://]+[a-zA-Z0-9-]+/[a-zA-Z0-9-_.?=]+$") print(p.match(entity["link"]) != None) if p.match(entity["link"]) is None: entity["link"] = P.ModelSetting.get("anilife_url") + entity["link"] # real_url = LogicAniLife.get_real_link(url=entity["link"]) logger.debug(entity["link"]) entity["code"] = entity["link"].split("/")[-1] entity["title"] = item.xpath(".//div[@class='tt']/text()")[0].strip() entity["image_link"] = item.xpath(".//div[@class='limit']/img/@src")[ 0 ].replace("..", P.ModelSetting.get("anilife_url")) data["ret"] = "success" data["anime_list"].append(entity) return data except Exception as e: P.logger.error("Exception:%s", e) P.logger.error(traceback.format_exc()) return {"ret": "exception", "log": str(e)} ######################################################### def add(self, episode_info): if self.is_exist(episode_info): return "queue_exist" else: db_entity = ModelAniLifeItem.get_by_anilife_id(episode_info["_id"]) logger.debug(f"db_entity():: => {db_entity}") return "enqueue_db_append" # pass def is_exist(self, info): for e in self.queue.entity_list: if e.info["_id"] == info["_id"]: return True return False class AniLifeQueueEntity(FfmpegQueueEntity): pass class ModelAniLifeItem(db.Model): __tablename__ = "{package_name}_anilife_item".format(package_name=P.package_name) __table_args__ = {"mysql_collate": "utf8_general_ci"} __bind_key__ = P.package_name id = db.Column(db.Integer, primary_key=True) created_time = db.Column(db.DateTime) completed_time = db.Column(db.DateTime) reserved = db.Column(db.JSON) content_code = db.Column(db.String) season = db.Column(db.Integer) episode_no = db.Column(db.Integer) title = db.Column(db.String) episode_title = db.Column(db.String) anilife_va = db.Column(db.String) anilife_vi = db.Column(db.String) anilife_id = db.Column(db.String) quality = db.Column(db.String) filepath = db.Column(db.String) filename = db.Column(db.String) savepath = db.Column(db.String) video_url = db.Column(db.String) vtt_url = db.Column(db.String) thumbnail = db.Column(db.String) status = db.Column(db.String) anilife_info = db.Column(db.JSON) def __init__(self): self.created_time = datetime.now() def __repr__(self): return repr(self.as_dict()) def as_dict(self): ret = {x.name: getattr(self, x.name) for x in self.__table__.columns} ret["created_time"] = self.created_time.strftime("%Y-%m-%d %H:%M:%S") ret["completed_time"] = ( self.completed_time.strftime("%Y-%m-%d %H:%M:%S") if self.completed_time is not None else None ) return ret @classmethod def get_by_id(cls, idx): return db.session.query(cls).filter_by(id=idx).first() @classmethod def get_by_anilife_id(cls, anilife_id): return db.session.query(cls).filter_by(anilife_id=anilife_id).first() def save(self): db.session.add(self) db.session.commit()