Files
inflearn/logic_inflearn.py

1125 lines
42 KiB
Python
Executable File

# -*- coding: utf-8 -*-
#########################################################
# python
import os
import sys
import traceback
import logging
import threading
import time
import re
import random
import urllib
import asyncio
# import pip
# import urlparse
from urllib.parse import urlparse
import json
import aiohttp
import importlib.util
import subprocess

# pip distribution names this module needs at import time.
packages = ["beautifulsoup4"]
# Import name for every distribution whose module name differs from its pip
# name (BeautifulSoup installs as "beautifulsoup4" but is imported as "bs4").
_package_import_names = {"beautifulsoup4": "bs4"}
for package in packages:
    module_name = _package_import_names.get(package, package)
    # ``import package`` would look for a module literally called "package",
    # fail every time and re-run pip on each start-up; look the real module
    # up by name instead and install only when it is actually missing.
    if importlib.util.find_spec(module_name) is None:
        # Run pip through the current interpreter so the package lands in
        # the environment this plugin is running in.
        subprocess.call([sys.executable, "-m", "pip", "install", package])
# third-party
import requests
from lxml import html, etree
from bs4 import BeautifulSoup
from urllib import parse
# from debugger import Debugger
# from debugger1 import timerun, yommilogger
from .debugger1 import timerun, yommi_logger
# import snoop
# from snoop import spy
# sjva common (framework)
from framework import db, scheduler, path_data
from framework.job import Job
from framework.util import Util
from framework.logger import get_logger
# package
# from .plugin import package_name, logger
# from anime_downloader.logic_ohli24 import ModelOhli24Item
from .model import ModelSetting, ModelInflearn, ModelInflearnProgram
from .logic_queue import LogicQueue
#########################################################
# First component of this module's dotted import path.
package_name = __name__.split(".")[0]
# Module-level logger obtained from the framework under that name.
logger = get_logger(package_name)
class LogicInflearn(object):
    """Scraping and download-queue helpers for the Inflearn plugin.

    Every method is a static method; state shared between calls lives in the
    class attributes below.
    """

    # Default request headers sent by ``get_html``.  The two User-Agent
    # fragments are joined by implicit string concatenation, so the separating
    # space has to be part of one of the literals.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 "
        "Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
    }
    # requests.Session created on the first ``get_html`` call.
    session = None
    # URL of the previous ``get_html`` request, sent as referer on the next one.
    referer = None
    # Result dict of the most recent ``get_title_info`` call.
    current_data = None
    # Season label used when building episode file names.
    season = "1"
@staticmethod
def get_html(url):
    """Fetch *url* with the shared session and return the body as text.

    The previously requested URL is sent as the ``referer`` header and *url*
    is remembered for the next call.

    Args:
        url: Absolute URL to download.

    Returns:
        The response body decoded as UTF-8 (undecodable bytes are replaced),
        or ``None`` when the request fails; the error is logged.
    """
    try:
        if LogicInflearn.session is None:
            LogicInflearn.session = requests.Session()
        LogicInflearn.headers["referer"] = LogicInflearn.referer
        LogicInflearn.referer = url
        # Without a timeout a stalled server blocks the caller forever; use
        # the same 20 second limit as the other requests in this class.
        page = LogicInflearn.session.get(
            url, headers=LogicInflearn.headers, timeout=20
        )
        return page.content.decode("utf8", errors="replace")
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        return None
@staticmethod
def get_video_url_from_url(url, url2):
    """Resolve the stream, referer and subtitle URLs of one player page.

    Args:
        url: Episode page URL; only used in the log message for unsupported
            players.
        url2: Player page URL.  Only URLs containing ``"kfani"`` are handled.

    Returns:
        ``[video_url, referer_url, vtt_url]``.  An entry is ``None`` when it
        could not be determined.
    """
    video_url = None
    referer_url = None
    vtt_url = None
    LogicInflearn.referer = url2
    try:
        if "kfani" in url2:
            # kfani players: visit the page and pull the m3u8 address out of it.
            logger.debug("kfani routine")
            data = LogicInflearn.get_html(url2)
            regex2 = r'"([^\"]*m3u8)"|<source[^>]+src=\"([^"]+)'
            found = re.findall(regex2, data)
            if found:
                # findall() yields one tuple per match holding both groups;
                # the alternative that did not match is "" (never None), so
                # take the non-empty group instead of the last one.
                video_url = next((group for group in found[0] if group), None)
                # The subtitle track is optional: without it the video and
                # referer URLs are still valid.
                track = re.compile(
                    r"<track.+src\=\"(?P<vtt_url>.*?.vtt)"
                ).search(data)
                if track is not None:
                    vtt_url = track.group("vtt_url")
                referer_url = url2
            else:
                logger.error("video url not found in %s", url2)
        else:
            logger.error("새로운 유형의 url 발생! %s %s" % (url, url2))
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
    return [video_url, referer_url, vtt_url]
@staticmethod
def get_video_url(episode_url):
    """Find a playable video for an episode page.

    Every player listed in the page's ``<select>`` is tried in order through
    ``get_video_url_from_url``.

    Args:
        episode_url: URL of the episode page.

    Returns:
        The ``[video_url, referer_url, vtt_url]`` list of the first player
        that yields a video URL.  When no player does, the result of the
        first player is returned; ``None`` when the page lists no player or
        the page itself could not be processed (the error is logged).
    """
    try:
        url = episode_url
        data = LogicInflearn.get_html(url)
        tree = html.fromstring(data)
        url2s = [
            tag.attrib["value"]
            for tag in tree.xpath('//*[@id="body"]/div/span/center/select/option')
        ]
        first_result = None
        for url2 in url2s:
            try:
                logger.debug(f"url: {url}, url2: {url2}")
                ret = LogicInflearn.get_video_url_from_url(url, url2)
                logger.debug(f"ret::::> {ret}")
            except Exception as e:
                logger.error("Exception:%s", e)
                logger.error(traceback.format_exc())
                continue
            # get_video_url_from_url always returns a list, so "is not None"
            # cannot tell success from failure; check the video URL itself.
            if ret and ret[0]:
                return ret
            if first_result is None:
                first_result = ret
        return first_result
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        return None
@staticmethod
def apply_new_title(new_title):
    """Rename the save folder of the currently loaded program.

    The sanitised title is stored on the program's database row and on
    ``LogicInflearn.current_data``; every episode gets the new folder and a
    regenerated file name.

    Args:
        new_title: Requested folder name.

    Returns:
        The updated ``current_data`` dict on success, otherwise a dict with
        ``ret`` set to ``False`` and the reason in ``log``.
    """
    result = {}
    try:
        current = LogicInflearn.current_data
        if current is None:
            result["ret"] = False
            result["log"] = "No current data!!"
            return result

        query = db.session.query(ModelInflearnProgram)
        program = query.filter_by(programcode=current["code"]).first()

        folder = Util.change_text_for_use_filename(new_title)
        current["save_folder"] = folder
        program.save_folder = folder
        db.session.commit()

        for episode in current["episode"]:
            episode["save_folder"] = folder
            episode["filename"] = LogicInflearn.get_filename(
                current["save_folder"], current["season"], episode["title"]
            )
        return current
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        result["ret"] = False
        result["log"] = str(e)
    return result
@staticmethod
def apply_new_season(new_season):
    """Change the season number of the currently loaded program.

    The season is stored on the program's database row and on
    ``LogicInflearn.current_data``; every episode file name is regenerated.

    Args:
        new_season: New season; anything ``int()`` accepts.

    Returns:
        The updated ``current_data`` dict on success, otherwise a dict with
        ``ret`` set to ``False`` and the reason in ``log``.
    """
    result = {}
    try:
        season_no = int(new_season)
        current = LogicInflearn.current_data
        if current is None:
            result["ret"] = False
            result["log"] = "No current data!!"
            return result

        query = db.session.query(ModelInflearnProgram)
        program = query.filter_by(programcode=current["code"]).first()

        current["season"] = season_no
        program.season = season_no
        db.session.commit()

        for episode in current["episode"]:
            episode["filename"] = LogicInflearn.get_filename(
                current["save_folder"], current["season"], episode["title"]
            )
        return current
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        result["ret"] = False
        result["log"] = str(e)
    return result
@staticmethod
def add_whitelist(*args):
    """Append a program code to the ``whitelist_program`` setting.

    Args:
        *args: Optional; the first positional argument is the code to add.
            Without arguments the code of ``LogicInflearn.current_data`` is
            used.

    Returns:
        When the code was added: ``current_data`` if called without
        arguments, otherwise ``{"ret": True, "code": code}``.  When the code
        is already listed or an error occurs: a dict with ``ret`` set to
        ``False`` and the reason in ``log``.
    """
    result = {}
    logger.debug(f"args: {args}")
    try:
        use_current = len(args) == 0
        code = str(LogicInflearn.current_data["code"] if use_current else args[0])

        raw_setting = ModelSetting.get("whitelist_program")
        codes = [
            str(part.strip().replace(" ", ""))
            for part in raw_setting.replace("\n", ",").split(",")
        ]

        if code in codes:
            result["ret"] = False
            result["log"] = "이미 추가되어 있습니다."
            return result

        codes.append(code)
        # Blank entries are dropped before the list is stored again.
        joined = ",".join(item for item in codes if item != "")

        setting_row = (
            db.session.query(ModelSetting)
            .filter_by(key="whitelist_program")
            .with_for_update()
            .first()
        )
        setting_row.value = joined
        db.session.commit()

        result["ret"] = True
        result["code"] = code
        return LogicInflearn.current_data if use_current else result
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        result["ret"] = False
        result["log"] = str(e)
    return result
@staticmethod
def get_lecture_list():
    """Scrape page 1 of the "latest" curation listing.

    The parsed listing is also written to ``airing_list.json`` inside the
    configured ``download_path``.

    Returns:
        A dict with ``ret``, ``total_page``, ``episode_count`` and
        ``episode`` (one dict per item with link, code, title, image_link
        and chapter), or ``None`` when anything fails; the error is logged.
    """
    try:
        site = ModelSetting.get('inflearn_url')
        page_html = LogicInflearn.get_html(f"{site}/curation/latest?page=1")
        download_path = ModelSetting.get("download_path")

        tree = html.fromstring(page_html)
        nodes = tree.xpath('//div[@class="item"]')
        pager_texts = tree.xpath('//*[@id="wp_page"]//text()')

        data = {
            "ret": "success",
            # The last text node of the pager is taken as the page count.
            "total_page": pager_texts[-1] if pager_texts else 0,
            "episode_count": len(nodes),
            "episode": [],
        }

        for node in nodes:
            link = node.xpath(".//a/@href")[0]
            data["episode"].append(
                {
                    "link": link,
                    "code": re.search(r"[0-9]+", link).group(),
                    "title": node.xpath('.//span[@class="name-film"]//text()')[
                        0
                    ].strip(),
                    "image_link": node.xpath(
                        './/img[@class="photo"]/@data-lazy-src'
                    )[0],
                    "chapter": node.xpath(".//a/button/span//text()")[0],
                }
            )

        json_file_path = os.path.join(download_path, "airing_list.json")
        logger.debug("json_file_path:: %s", json_file_path)
        with open(json_file_path, "w") as outfile:
            json.dump(data, outfile)
        return data
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def get_search_result(query):
    """Run a site search and return the parsed result listing.

    The parsed listing is also written to ``airing_list.json`` inside the
    configured ``download_path``.

    Args:
        query: Search text; it is URL-quoted before being sent.

    Returns:
        A dict with ``ret``, ``query``, ``total_page``, ``episode_count``
        and ``episode`` (one dict per hit with link, code, title and
        image_link), or ``None`` when anything fails; the error is logged.
    """
    try:
        quoted = urllib.parse.quote(query)
        url = f"{ModelSetting.get('inflearn_url')}/?s={quoted}"
        logger.debug("search url::> %s", url)

        page_html = LogicInflearn.get_html(url)
        download_path = ModelSetting.get("download_path")

        tree = html.fromstring(page_html)
        nodes = tree.xpath('//div[@class="item"]')
        pager_texts = tree.xpath('//*[@id="wp_page"]//text()')

        data = {
            "ret": "success",
            "query": query,
            # The last text node of the pager is taken as the page count.
            "total_page": pager_texts[-1] if pager_texts else 0,
            "episode_count": len(nodes),
            "episode": [],
        }

        for node in nodes:
            link = node.xpath(".//a/@href")[0]
            data["episode"].append(
                {
                    "link": link,
                    "code": re.search(r"[0-9]+", link).group(),
                    "title": node.xpath('.//span[@class="name-film"]//text()')[
                        0
                    ].strip(),
                    "image_link": node.xpath('.//img[@class="photo"]/@src')[0],
                }
            )

        json_file_path = os.path.join(download_path, "airing_list.json")
        logger.debug("json_file_path:: %s", json_file_path)
        with open(json_file_path, "w") as outfile:
            json.dump(data, outfile)
        return data
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def get_lecture_list_info(cate, page):
    """Scrape one page of the course catalogue in the given sort order.

    Args:
        cate: Sort order; one of ``"recent"``, ``"rating"``, ``"popular"``
            or ``"seq"``.
        page: Page number to request.

    Returns:
        A dict with ``ret``, ``page``, ``total_page`` (fixed at 100),
        ``episode_count`` and ``episode`` (one dict per course card), or
        ``None`` when *cate* is not supported or anything fails; the error
        is logged.
    """
    try:
        # All supported orders share one URL pattern; anything else used to
        # end up requesting an empty URL.
        if cate not in ("recent", "rating", "popular", "seq"):
            logger.error("get_lecture_list_info(): unsupported order %r", cate)
            return None
        url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
        logger.debug(f"get_lecture_list_info():url >> {url}")
        html_content = LogicInflearn.get_html(url)
        tree = html.fromstring(html_content)
        tmp_items = tree.xpath(
            '//div[contains(@class, "courses_card_list_body")]/div'
        )
        data = {
            "ret": "success",
            "page": page,
            "total_page": 100,
            "episode_count": len(tmp_items),
            "episode": [],
        }
        for item in tmp_items:
            entity = {}
            entity["link"] = item.xpath(".//a/@href")[0]
            entity["code"] = entity["link"].split("/")[-1]
            # A path starting with "/" is evaluated from the document root
            # even when called on an element, so "/div/@data-productid"
            # never looked at this card.  Read the attribute relative to it.
            # NOTE(review): assumes data-productid sits on the card <div>
            # itself - confirm against the live markup.
            entity["_code"] = item.xpath("./@data-productid")
            entity["title"] = item.xpath('.//p[@class="course_title"]/text()')[
                0
            ].strip()
            entity["teacher"] = item.xpath('.//div[@class="instructor"]/text()')[
                0
            ].strip()
            # price and image_link stay lists of matches, as callers receive
            # them today.
            entity["price"] = item.xpath('.//div[@class="price"]/text()')
            entity["image_link"] = item.xpath('.//img[@class="swiper-lazy"]/@src')
            entity["chapter"] = entity["price"]
            data["episode"].append(entity)
        return data
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        return None
# @staticmethod
# def get_lecture_list_info(cate, page):
# try:
# url = ""
# if cate == "recent":
# # url = f"{ModelSetting.get('inflearn_url')}/curation/latest?page={page}&order={cate}"
# url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
# elif cate == "rating":
# url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
# elif cate == "popular":
# url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
# elif cate == "seq":
# url = f"{ModelSetting.get('inflearn_url')}/courses?page={page}&order={cate}"
# logger.debug(f"get_lecture_list_info():url >> {url}")
#
# html_content = LogicInflearn.get_html(url)
# # logger.debug("html_content: %s", html_content)
#
# tree = html.fromstring(html_content)
# tmp_items = tree.xpath(
# '//ul[@class="tag-courses__list e-tag-courses__list"]/li'
# )
# # logger.info("tmp_items::: %s", tmp_items)
#
# data = {
# "ret": "success",
# "page": page,
# "total_page": 100,
# "episode_count": len(tmp_items),
# "episode": [],
# }
#
# for item in tmp_items:
# entity = {}
# entity["link"] = item.xpath(".//a/@href")[0]
# entity["code"] = entity["link"].split("/")[-1]
# entity["_code"] = item.attrib["data-id"]
# # logger.debug(item)
# # entity["code"] = 1
# entity["title"] = item.xpath('.//div[@class="info__basic"]//h3/text()')[
# 0
# ].strip()
# entity["teacher"] = item.xpath(
# './/div[@class="info__basic"]//h4/text()'
# )[0].strip()
# entity["price"] = item.xpath(
# './/div[@class="course-card__price"]/dd/text()'
# )[0].strip()
# entity["image_link"] = item.xpath('.//img[@class="swiper-lazy"]/@src')
# entity["chapter"] = entity["price"]
# # entity["chapter"] = item.xpath(".//a/button/span//text()")[0]
# # logger.info('entity:::', entity['title'])
# data["episode"].append(entity)
#
# # json_file_path = os.path.join(download_path, "airing_list.json")
# # logger.debug("json_file_path:: %s", json_file_path)
#
# # with open(json_file_path, "w") as outfile:
# # json.dump(data, outfile)
# # logger.debug("data:: %s", data)
#
# return data
#
# except Exception as e:
# logger.error("Exception:%s", e)
# logger.error(traceback.format_exc())
@staticmethod
def get_screen_movie_info(page):
    """Scrape one page of the ``/ani/page/<page>`` listing.

    The parsed listing is also written to ``airing_list.json`` inside the
    configured ``download_path``.

    Args:
        page: Page number to request.

    Returns:
        A dict with ``ret``, ``page``, ``episode_count`` and ``episode``
        (one dict per item with link, code, title and image_link), or
        ``None`` when anything fails; the error is logged.
    """
    try:
        site = ModelSetting.get('inflearn_url')
        page_html = LogicInflearn.get_html(f"{site}/ani/page/{page}")
        download_path = ModelSetting.get("download_path")

        nodes = html.fromstring(page_html).xpath('//div[@class="item"]')
        data = {
            "ret": "success",
            "page": page,
            "episode_count": len(nodes),
            "episode": [],
        }

        for node in nodes:
            link = node.xpath(".//a/@href")[0]
            data["episode"].append(
                {
                    "link": link,
                    "code": re.search(r"[0-9]+", link).group(),
                    "title": node.xpath('.//span[@class="name-film"]//text()')[
                        0
                    ].strip(),
                    "image_link": node.xpath(
                        './/img[@class="photo"]/@data-lazy-src'
                    )[0],
                }
            )

        json_file_path = os.path.join(download_path, "airing_list.json")
        logger.debug("json_file_path:: %s", json_file_path)
        with open(json_file_path, "w") as outfile:
            json.dump(data, outfile)
        return data
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
@yommi_logger(logging_type="debug")
def get_title_info_old(code):
    """Older variant of ``get_title_info``: load a course and its lectures.

    Differs from ``get_title_info`` in that it raises the interpreter's
    recursion limit before parsing and reads the ``href`` of every
    curriculum unit (not just the first) before opening the first one.

    Args:
        code: Course slug as used in ``<inflearn_url>/course/<code>``.

    Returns:
        A dict with ``code``, ``ret``, ``poster_url``, ``title``,
        ``save_folder``, ``season`` and ``episode`` (one dict per lecture).
        On success ``ret`` keeps its initial value ``False``; on failure
        ``ret`` is ``"error"`` and ``log`` holds the message.  The result is
        also stored in ``LogicInflearn.current_data`` on success.
    """
    # Created before the ``try`` so the error handler can always report
    # through it, even when the failure happens on the very first statement.
    data = {"code": code, "ret": False}
    try:
        url = "%s/course/%s" % (ModelSetting.get("inflearn_url"), parse.quote(code))
        logger.info(url)
        html_content = LogicInflearn.get_html(url)
        sys.setrecursionlimit(10**7)
        soup = BeautifulSoup(html_content, "html.parser")
        data["poster_url"] = soup.select_one(
            "div.cd-header__thumbnail-cover div img"
        )["src"]
        data["title"] = soup.select_one("div.cd-header__title").text
        data["save_folder"] = data["title"]
        data["season"] = "1"

        curriculum_content = soup.find_all("a", {"class": "cd-accordion__unit"})
        preview_path = [elem["href"] for elem in curriculum_content]

        # Open the viewer page of the first curriculum unit; it lists every
        # lecture of the course.
        base_url = "https://www.inflearn.com"
        url = base_url + parse.quote(preview_path[0])
        logger.debug(f"url::::: {url}")
        res_data = requests.get(url, timeout=20)
        if res_data.url != url:
            # A redirect means the viewer page is not reachable as requested.
            raise requests.TooManyRedirects()
        res_data.raise_for_status()
        soup = BeautifulSoup(res_data.text, "html.parser")
        items = soup.find_all("div", attrs={"class": "unit-el"})

        program = (
            db.session.query(ModelInflearnProgram)
            .filter_by(programcode=code)
            .first()
        )
        if program is None:
            program = ModelInflearnProgram(data)
            db.session.add(program)
            db.session.commit()
        else:
            # A stored program overrides the scraped folder and season.
            data["save_folder"] = program.save_folder
            data["season"] = program.season

        # Loop-invariant values, looked up once.
        save_folder = Util.change_text_for_use_filename(data["save_folder"])
        download_path = ModelSetting.get("download_path")
        make_program_folder = ModelSetting.get("auto_make_folder") == "True"
        make_season_folder = ModelSetting.get("inflearn_auto_make_season_folder")

        episodes = []
        for idx, item in enumerate(items):
            logger.debug("idx:: %s", idx)
            data_id = item["data-id"]
            title = item.find("div", attrs={"class": "title"}).get_text()
            runtime_tag = item.find("span", {"class": "runtime"})
            run_time = runtime_tag.get_text() if runtime_tag is not None else ""
            api_url = f"{base_url}/api/course/{code}/lecture/{data_id}"

            LogicInflearn.season = "1"
            m3u8_info = LogicInflearn.getM3u8_info(
                api_url, LogicInflearn.season, idx
            )
            logger.debug(m3u8_info)

            episode = {"season": "1", "save_folder": save_folder}
            if make_program_folder:
                episode["save_path"] = os.path.join(download_path, save_folder)
                if make_season_folder:
                    episode["save_path"] = os.path.join(
                        episode["save_path"], "Season %s" % int(episode["season"])
                    )
            episode["title"] = title
            episode["data_id"] = data_id
            episode["item_id"] = m3u8_info["data_id"]
            episode["code"] = episode["item_id"]
            episode["run_time"] = run_time
            episode["api_url"] = api_url
            episode["name"] = m3u8_info["name"]
            episode["filename"] = m3u8_info["filename"]
            episode["url"] = m3u8_info["hlsUrl"]
            episode["size"] = m3u8_info["size"]
            episodes.append(episode)

        data["episode"] = episodes
        LogicInflearn.current_data = data
        return data
    except Exception as e:
        # One handler is enough: IndexError is a subclass of Exception, so a
        # separate handler placed after this one could never run.
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        data["log"] = str(e)
        data["ret"] = "error"
        return data
@staticmethod
@yommi_logger(logging_type="debug")
def get_title_info(code):
    """Load a course page and the download info of each of its lectures.

    The course page supplies poster and title; the viewer page of the first
    curriculum unit supplies the lecture list; ``getM3u8_info`` is called
    once per lecture.  A ``ModelInflearnProgram`` row is created for unknown
    courses; for known ones its stored folder and season are used.

    Args:
        code: Course slug as used in ``<inflearn_url>/course/<code>``.

    Returns:
        A dict with ``code``, ``ret``, ``poster_url``, ``title``,
        ``save_folder``, ``season`` and ``episode`` (one dict per lecture).
        On success ``ret`` keeps its initial value ``False``; on failure
        ``ret`` is ``"error"`` and ``log`` holds the message.  The result is
        also stored in ``LogicInflearn.current_data`` on success.
    """
    # Created before the ``try`` so the error handler can always report
    # through it, even when the failure happens on the very first statement.
    data = {"code": code, "ret": False}
    try:
        url = "%s/course/%s" % (ModelSetting.get("inflearn_url"), parse.quote(code))
        logger.info(url)
        html_content = LogicInflearn.get_html(url)
        soup = BeautifulSoup(html_content, "html.parser")
        data["poster_url"] = soup.select_one(
            "div.cd-header__thumbnail-cover div img"
        )["src"]
        data["title"] = soup.select_one("div.cd-header__title").text
        data["save_folder"] = data["title"]
        data["season"] = "1"

        curriculum_content = soup.find_all("a", {"class": "cd-accordion__unit"})
        # Open the viewer page of the first curriculum unit; it lists every
        # lecture of the course.  An empty curriculum raises IndexError here
        # and is reported through the handler below.
        first_item = curriculum_content[0]["href"]
        base_url = "https://www.inflearn.com"
        url = base_url + parse.quote(first_item)
        logger.debug(f"url::::: {url}")
        res_data = requests.get(url, timeout=20)
        if res_data.url != url:
            # A redirect means the viewer page is not reachable as requested.
            raise requests.TooManyRedirects()
        res_data.raise_for_status()
        soup = BeautifulSoup(res_data.text, "html.parser")
        items = soup.find_all("div", attrs={"class": "unit-el"})

        program = (
            db.session.query(ModelInflearnProgram)
            .filter_by(programcode=code)
            .first()
        )
        if program is None:
            program = ModelInflearnProgram(data)
            db.session.add(program)
            db.session.commit()
        else:
            # A stored program overrides the scraped folder and season.
            data["save_folder"] = program.save_folder
            data["season"] = program.season

        # Loop-invariant values, looked up once.
        save_folder = Util.change_text_for_use_filename(data["save_folder"])
        download_path = ModelSetting.get("download_path")
        make_program_folder = ModelSetting.get("auto_make_folder") == "True"
        make_season_folder = ModelSetting.get("inflearn_auto_make_season_folder")

        episodes = []
        for idx, item in enumerate(items):
            logger.debug("idx:: %s", idx)
            data_id = item["data-id"]
            title = item.find("div", attrs={"class": "title"}).get_text()
            runtime_tag = item.find("span", {"class": "runtime"})
            run_time = runtime_tag.get_text() if runtime_tag is not None else ""
            api_url = f"{base_url}/api/course/{code}/lecture/{data_id}"

            LogicInflearn.season = "1"
            m3u8_info = LogicInflearn.getM3u8_info(
                api_url, LogicInflearn.season, idx
            )
            logger.debug(m3u8_info)

            episode = {"season": "1", "save_folder": save_folder}
            if make_program_folder:
                episode["save_path"] = os.path.join(download_path, save_folder)
                if make_season_folder:
                    episode["save_path"] = os.path.join(
                        episode["save_path"], "Season %s" % int(episode["season"])
                    )
            episode["title"] = title
            episode["data_id"] = data_id
            episode["item_id"] = m3u8_info["data_id"]
            episode["code"] = episode["item_id"]
            episode["run_time"] = run_time
            episode["api_url"] = api_url
            episode["name"] = m3u8_info["name"]
            episode["filename"] = m3u8_info["filename"]
            episode["url"] = m3u8_info["hlsUrl"]
            episode["size"] = m3u8_info["size"]
            episodes.append(episode)

        data["episode"] = episodes
        LogicInflearn.current_data = data
        return data
    except Exception as e:
        # One handler is enough: IndexError is a subclass of Exception, so a
        # separate handler placed after this one could never run.
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
        data["log"] = str(e)
        data["ret"] = "error"
        return data
@staticmethod
def getM3u8_info(url, season, idx):
    """Fetch a lecture's API document and extract its download details.

    Each field is read independently, so a document that lacks one part
    (for example a unit without a video) still yields the parts it has.

    Args:
        url: Lecture API URL; fetched through ``getHtml``.
        season: Season used in the generated file name (zero-padded to 2).
        idx: Index of the lecture, used as episode number (zero-padded to 3).

    Returns:
        A dict with ``data_id``, ``title``, ``name``, ``hlsUrl``, ``size``,
        ``duration`` and ``filename``.  Fields missing from the response are
        empty strings.

    Raises:
        Whatever ``getHtml`` or the JSON decoding raises.
    """

    def dig(node, *keys):
        """Follow *keys* through nested dicts; ``None`` if a level is missing."""
        for key in keys:
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        return node

    info = {
        "data_id": "",
        "title": "",
        "name": "",
        "hlsUrl": "",
        "size": "",
        "duration": "",
        "filename": "",
    }
    res_data = LogicInflearn.getHtml(url, "json").json()
    logger.info(f"getM3u8_info::url => {url}")

    course_id = dig(res_data, "course", "id")
    if course_id is not None:
        info["data_id"] = course_id

    unit_title = dig(res_data, "course", "_", "current_unit", "title")
    if unit_title is not None:
        info["title"] = unit_title

    name = dig(res_data, "newBOX", "video", "name")
    if name is not None:
        info["name"] = name
        title = info["title"]
        # str() lets callers pass the season as an int as well as a string.
        info["filename"] = (
            f"{title}.{name.split('.')[0]}.S{str(season).zfill(2)}"
            f".E{str(idx).zfill(3)}.{name.split('.')[-1]}"
        )

    vod_info = dig(res_data, "newBOX", "video", "vod_info")
    if isinstance(vod_info, dict) and vod_info.get("hlsUrl") is not None:
        info["hlsUrl"] = vod_info["hlsUrl"]
        info["size"] = vod_info.get("size", "")
        info["duration"] = vod_info.get("duration", "")

    return info
@staticmethod
def getHtml(url, header):
    """GET *url* (scheme, host and quoted path only) and return the response.

    The query string and fragment of *url* are not sent.

    Args:
        url: URL to fetch.
        header: Unused; kept so existing callers keep working.

    Returns:
        The ``requests.Response`` object.

    Raises:
        requests.TooManyRedirects: The final URL differs from the requested
            one, i.e. the request was redirected.
        requests.HTTPError: The server answered with an error status.
        requests.Timeout: No answer within 20 seconds.
    """
    o = parse.urlparse(url)
    tmp_url = f"{o.scheme}://{o.netloc}{parse.quote(o.path)}"
    # Without a timeout a stalled server blocks the caller forever; use the
    # same 20 second limit as the other direct requests in this class.
    resData = requests.get(tmp_url, timeout=20)
    if resData.url != tmp_url:
        # A redirect means the resource is not available as requested.
        raise requests.TooManyRedirects()
    resData.raise_for_status()
    return resData
@staticmethod
def get_filename(maintitle, season, title):
    """Build the file name for one episode.

    The episode number is the first run of digits the pattern finds in
    *title*.  With a number the name is ``<maintitle>.S<ss>E<ee>.720p-LK.mp4``;
    without one it is ``<maintitle>.720p-SA.mp4``.  The result is passed
    through ``Util.change_text_for_use_filename``.

    Args:
        maintitle: Program title used as file name prefix.
        season: Season number (int or numeric string).
        title: Episode title to take the episode number from.

    Returns:
        The file name, or ``None`` if an error occurs (it is logged).
    """
    try:
        pattern = re.compile(
            r"(?P<title>.*?)\s?((?P<season>\d+)기)?\s?((?P<epi_no>\d+)화?)"
        )
        found = pattern.search(title)
        if not found:
            logger.debug("NOT MATCH")
            name = "%s.720p-SA.mp4" % maintitle
        else:
            # Both numbers are padded to at least two digits.
            episode_part = "%02d" % int(found.group("epi_no"))
            season_template = "0%s" if int(season) < 10 else "%s"
            season_part = season_template % season
            name = "%s.S%sE%s.720p-LK.mp4" % (maintitle, season_part, episode_part)
        return Util.change_text_for_use_filename(name)
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def get_info_by_code(code):
    """Look up one episode of the currently loaded program.

    Args:
        code: Value compared against each episode's ``data_id``.

    Returns:
        The first matching episode dict, or ``None`` when nothing is loaded,
        nothing matches or an error occurs (it is logged).
    """
    logger.info(f"get_info_by_code: {code}")
    try:
        current = LogicInflearn.current_data
        if current is None:
            return None
        return next(
            (episode for episode in current["episode"] if episode["data_id"] == code),
            None,
        )
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def scheduler_function():
    """Queue every not-yet-downloaded episode of the whitelisted programs.

    For each code in the ``whitelist_program`` setting the completed
    downloads are read from the database, the program is loaded with
    ``get_title_info`` and every episode whose code is not among the
    completed ones is handed to ``LogicQueue.add_queue``.  A program that
    fails is logged and skipped; the remaining programs are still processed.
    """
    try:
        logger.debug("Inflearn scheduler_function start..")
        whitelist_program = ModelSetting.get("whitelist_program") or ""
        cleaned = (
            x.strip().replace(" ", "")
            for x in whitelist_program.replace("\n", ",").split(",")
        )
        # Blank entries (empty setting, trailing commas) are not programs.
        whitelist_programs = [code for code in cleaned if code]
        logger.debug(f"whitelist_programs: {whitelist_programs}")
        for code in whitelist_programs:
            try:
                logger.info("auto download start : %s", code)
                downloaded = (
                    db.session.query(ModelInflearn)
                    .filter(ModelInflearn.completed.is_(True))
                    .filter_by(programcode=code)
                    .with_for_update()
                    .all()
                )
                dl_codes = [dl.episodecode for dl in downloaded]
                logger.info("downloaded codes :%s", dl_codes)
                data = LogicInflearn.get_title_info(code)
                # On failure get_title_info returns a dict without "episode".
                episodes = data.get("episode") if data else None
                if episodes is None:
                    logger.error("no episode list for %s: %s", code, data)
                    continue
                for episode in episodes:
                    e_code = episode["code"]
                    if e_code not in dl_codes:
                        logger.info("Logic Queue added :%s", e_code)
                        LogicQueue.add_queue(episode)
            except Exception as e:
                # Keep going: one broken program must not stop the others.
                logger.error("Exception:%s", e)
                logger.error(traceback.format_exc())
            logger.debug("========================================")
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def download(form):
    """Hand a submitted download form to the queue.

    Args:
        form: Object with a ``to_dict()`` method; the resulting dict is
            passed to ``LogicQueue.add_youtube_queue``.

    Returns:
        ``{"ret": <value returned by the queue>}``, or ``None`` if an error
        occurs (it is logged).
    """
    try:
        logger.debug("download call")
        logger.debug(form)
        queued = LogicQueue.add_youtube_queue(form.to_dict())
        logger.debug("add_queue : tmp >> %s", queued)
        return {"ret": queued}
    except Exception as e:
        logger.error("Exception:%s", e)
        logger.error(traceback.format_exc())
@staticmethod
def reset_db() -> bool:
    """Delete every ``ModelInflearn`` row and commit.

    Returns:
        Always ``True``.
    """
    all_rows = db.session.query(ModelInflearn)
    all_rows.delete()
    db.session.commit()
    return True
@staticmethod
def get_excel_info(path_dir=None):
    """List the entries of the spreadsheet directory.

    Args:
        path_dir: Directory to list.  Defaults to the path this method has
            always used, so existing callers are unaffected; pass a path to
            run on a machine where that location does not exist.

    Returns:
        The entry names as returned by ``os.listdir``.

    Raises:
        OSError: The directory does not exist or cannot be read.
    """
    if path_dir is None:
        path_dir = "/WD/Users/yommi/Work/fastapi/app/inflearn_xlsx"
    return os.listdir(path_dir)