mirror of
https://github.com/peter-tanner/Blackboard-marks.git
synced 2024-11-30 11:40:16 +08:00
Download feedback in comments, and format code
This commit is contained in:
parent
e3ed2765d6
commit
234cd31bf4
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -3,4 +3,6 @@ tmp/
|
||||||
__pycache__
|
__pycache__
|
||||||
chromedriver*
|
chromedriver*
|
||||||
test*
|
test*
|
||||||
.vscode/
|
.vscode/
|
||||||
|
*.7z
|
||||||
|
*.tar
|
144
main.py
144
main.py
|
@ -27,31 +27,37 @@ from selenium.common.exceptions import ElementNotInteractableException
|
||||||
testing = False
|
testing = False
|
||||||
try:
|
try:
|
||||||
testing = True
|
testing = True
|
||||||
from utils.test import get_etc
|
from utils.test import get_etc
|
||||||
except:
|
except:
|
||||||
def get_etc(*args): return False
|
def get_etc(*args): return False
|
||||||
|
|
||||||
|
|
||||||
# stupid bug
|
# stupid bug
|
||||||
def click_the_fing_button(driver,button):
|
def click_the_fing_button(driver, button):
|
||||||
try:
|
try:
|
||||||
ActionChains(driver).move_to_element(button)
|
ActionChains(driver).move_to_element(button)
|
||||||
ActionChains(driver).click(button).perform()
|
ActionChains(driver).click(button).perform()
|
||||||
WebDriverWait(driver,2).until(EC.number_of_windows_to_be(2))
|
WebDriverWait(driver, 2).until(EC.number_of_windows_to_be(2))
|
||||||
except:
|
except:
|
||||||
driver.set_window_size(1024, 768) # hack to wake selenium up when it doesn't want to click the button!
|
# hack to wake selenium up when it doesn't want to click the button!
|
||||||
click_the_fing_button(driver,button)
|
driver.set_window_size(1024, 768)
|
||||||
|
click_the_fing_button(driver, button)
|
||||||
driver.maximize_window()
|
driver.maximize_window()
|
||||||
|
|
||||||
# You can probably replace this with a recursive method like in blackboard scraper but tbh i just want to get this script done so i can stop working for once.
|
# You can probably replace this with a recursive method like in blackboard scraper but tbh i just want to get this script done so i can stop working for once.
|
||||||
def scrape_further(driver,path,session):
|
|
||||||
|
|
||||||
|
def scrape_further(driver, path, session):
|
||||||
# attempts for bb-held tests
|
# attempts for bb-held tests
|
||||||
attempts = driver.find_elements(By.XPATH, "//a[starts-with(@href, '/webapps/assessment')]")
|
attempts = driver.find_elements(
|
||||||
attempts = [ x.get_attribute('href') for x in attempts ]
|
By.XPATH, "//a[starts-with(@href, '/webapps/assessment')]")
|
||||||
|
attempts = [x.get_attribute('href') for x in attempts]
|
||||||
for i, attempt in enumerate(attempts):
|
for i, attempt in enumerate(attempts):
|
||||||
name = "attempt_"+str(i)+"_["+parse_qs(urlparse(attempt).query)['attempt_id'][0]+"]"
|
name = "attempt_" + \
|
||||||
attempt = re.sub("^"+BASE_URL,"",attempt)
|
str(i)+"_["+parse_qs(urlparse(attempt).query)['attempt_id'][0]+"]"
|
||||||
|
attempt = re.sub("^"+BASE_URL, "", attempt)
|
||||||
driver.execute_script("window.open('"+BASE_URL+attempt+"')")
|
driver.execute_script("window.open('"+BASE_URL+attempt+"')")
|
||||||
WebDriverWait(driver,10).until(EC.number_of_windows_to_be(3))
|
WebDriverWait(driver, 10).until(EC.number_of_windows_to_be(3))
|
||||||
driver.switch_to.window(driver.window_handles[2])
|
driver.switch_to.window(driver.window_handles[2])
|
||||||
save_html(path, name, driver.page_source)
|
save_html(path, name, driver.page_source)
|
||||||
if testing:
|
if testing:
|
||||||
|
@ -59,25 +65,36 @@ def scrape_further(driver,path,session):
|
||||||
driver.close()
|
driver.close()
|
||||||
driver.switch_to.window(driver.window_handles[1])
|
driver.switch_to.window(driver.window_handles[1])
|
||||||
|
|
||||||
# submission file for assignment
|
# Comments may contain feedback links
|
||||||
request_stack = RequestStack(session)
|
request_stack = RequestStack(session)
|
||||||
attempts = driver.find_elements(By.XPATH, "//a[starts-with(@href, '/webapps/assignment/download')]")
|
etc_files = driver.find_elements(
|
||||||
attempts = [ x.get_attribute('href') for x in attempts ]
|
By.XPATH, "//a[contains(@href, '/bbcswebdav')]")
|
||||||
|
etc_files = [x.get_attribute('href') for x in etc_files]
|
||||||
|
for i, item in enumerate(etc_files):
|
||||||
|
if (not item is None) and ("bbcswebdav" in item):
|
||||||
|
request_stack.add_file(item, path)
|
||||||
|
|
||||||
|
# submission file for assignment
|
||||||
|
attempts = driver.find_elements(
|
||||||
|
By.XPATH, "//a[starts-with(@href, '/webapps/assignment/download')]")
|
||||||
|
attempts = [x.get_attribute('href') for x in attempts]
|
||||||
for i, attempt in enumerate(attempts):
|
for i, attempt in enumerate(attempts):
|
||||||
request_stack.add_file(attempt,path)
|
request_stack.add_file(attempt, path)
|
||||||
|
|
||||||
get_feedback = False
|
get_feedback = False
|
||||||
try:
|
try:
|
||||||
# download button causes a tab to appear quickly, download, then disappear
|
# download button causes a tab to appear quickly, download, then disappear
|
||||||
# need to capture the url to get the metadata and dl to the correct location
|
# need to capture the url to get the metadata and dl to the correct location
|
||||||
# can't be arsed to figure out how the pspdfkit js that executes this download works.
|
# can't be arsed to figure out how the pspdfkit js that executes this download works.
|
||||||
SwitchToIFrame(driver, (By.XPATH, "//iframe[@class='docviewer_iframe_embed']"))
|
SwitchToIFrame(
|
||||||
|
driver, (By.XPATH, "//iframe[@class='docviewer_iframe_embed']"))
|
||||||
SwitchToIFrame(driver, (By.XPATH, "//iframe[@title='PSPDFKit']"))
|
SwitchToIFrame(driver, (By.XPATH, "//iframe[@title='PSPDFKit']"))
|
||||||
get_feedback = True
|
get_feedback = True
|
||||||
except:
|
except:
|
||||||
print("No feedback to download")
|
print("No feedback to download")
|
||||||
if get_feedback:
|
if get_feedback:
|
||||||
dl_button = WaitClickable(driver,(By.XPATH, "//button[contains(@class,'PSPDFKit-Toolbar-Button PSPDFKit-Tool-Button')][@title='Download']"))
|
dl_button = WaitClickable(
|
||||||
|
driver, (By.XPATH, "//button[contains(@class,'PSPDFKit-Toolbar-Button PSPDFKit-Tool-Button')][@title='Download']"))
|
||||||
dl_button.click()
|
dl_button.click()
|
||||||
download_file(path)
|
download_file(path)
|
||||||
request_stack.download_all()
|
request_stack.download_all()
|
||||||
|
@ -86,34 +103,37 @@ def scrape_further(driver,path,session):
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description='Automated microsoft SSO login.')
|
parser = argparse.ArgumentParser(description='Automated microsoft SSO login.')
|
||||||
# parser.add_argument("-p", "--password", help="Automatically use provided password", default="")
|
# parser.add_argument("-p", "--password", help="Automatically use provided password", default="")
|
||||||
parser.add_argument("-u", "--username", help="Automatically use provided userID", default="")
|
parser.add_argument("-u", "--username",
|
||||||
|
help="Automatically use provided userID", default="")
|
||||||
|
|
||||||
path = ['grades']
|
path = ['grades']
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
CAPABILITIES = DesiredCapabilities.CHROME
|
CAPABILITIES = DesiredCapabilities.CHROME
|
||||||
CAPABILITIES['goog:loggingPrefs'] = {
|
CAPABILITIES['goog:loggingPrefs'] = {
|
||||||
'performance' : 'ALL',
|
'performance': 'ALL',
|
||||||
}
|
}
|
||||||
|
|
||||||
for f in os.listdir(DL_DIR):
|
for f in os.listdir(DL_DIR):
|
||||||
os.remove(Path(DL_DIR).joinpath(f))
|
os.remove(Path(DL_DIR).joinpath(f))
|
||||||
prefs = {
|
prefs = {
|
||||||
"profile.default_content_settings.popups": 0,
|
"profile.default_content_settings.popups": 0,
|
||||||
"download.default_directory": DL_DIR,
|
"download.default_directory": DL_DIR,
|
||||||
"directory_upgrade": True
|
"directory_upgrade": True
|
||||||
}
|
}
|
||||||
OPTIONS = Options()
|
OPTIONS = Options()
|
||||||
|
OPTIONS.add_argument('--no-sandbox')
|
||||||
|
OPTIONS.add_argument('--disable-dev-shm-usage')
|
||||||
OPTIONS.add_experimental_option("prefs", prefs)
|
OPTIONS.add_experimental_option("prefs", prefs)
|
||||||
# OPTIONS.add_argument("--headless")
|
# OPTIONS.add_argument("--headless")
|
||||||
driver = webdriver.Chrome(
|
driver = webdriver.Chrome(
|
||||||
executable_path='chromedriver.exe',
|
executable_path='chromedriver.exe',
|
||||||
desired_capabilities=CAPABILITIES,
|
desired_capabilities=CAPABILITIES,
|
||||||
options=OPTIONS
|
options=OPTIONS
|
||||||
)
|
)
|
||||||
driver.maximize_window()
|
driver.maximize_window()
|
||||||
|
|
||||||
cookies = login(args, driver) # do Login.
|
cookies = login(args, driver) # do Login.
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
for cookie in cookies:
|
for cookie in cookies:
|
||||||
session.cookies.set(cookie["name"], cookie["value"])
|
session.cookies.set(cookie["name"], cookie["value"])
|
||||||
|
@ -121,13 +141,15 @@ for cookie in cookies:
|
||||||
# need to load this page JUST to remove the tos warning so it doesn't fuck up everything down the line.
|
# need to load this page JUST to remove the tos warning so it doesn't fuck up everything down the line.
|
||||||
driver.get(BASE_URL+"/webapps/gradebook/do/student/viewCourses")
|
driver.get(BASE_URL+"/webapps/gradebook/do/student/viewCourses")
|
||||||
try:
|
try:
|
||||||
WaitClickable(driver,(By.CLASS_NAME, "button-1")).click()
|
WaitClickable(driver, (By.CLASS_NAME, "button-1")).click()
|
||||||
except:
|
except:
|
||||||
print("no tos warning - skipped")
|
print("no tos warning - skipped")
|
||||||
|
|
||||||
driver.get(BASE_URL+"/webapps/streamViewer/streamViewer?cmd=view&streamName=mygrades")
|
driver.get(
|
||||||
|
BASE_URL+"/webapps/streamViewer/streamViewer?cmd=view&streamName=mygrades")
|
||||||
save_html(sep.join(path), 'entrypoint', driver.page_source)
|
save_html(sep.join(path), 'entrypoint', driver.page_source)
|
||||||
|
|
||||||
|
WaitClickable(driver, (By.ID, "left_stream_mygrades"))
|
||||||
# get courseIDs
|
# get courseIDs
|
||||||
courses = driver.find_element(By.ID, "left_stream_mygrades")\
|
courses = driver.find_element(By.ID, "left_stream_mygrades")\
|
||||||
.find_elements(By.XPATH, "//div[@role='tab']")
|
.find_elements(By.XPATH, "//div[@role='tab']")
|
||||||
|
@ -137,15 +159,17 @@ for i, course_results in enumerate(courses):
|
||||||
course_results = courses[i]
|
course_results = courses[i]
|
||||||
ActionChains(driver).move_to_element(course_results).perform()
|
ActionChains(driver).move_to_element(course_results).perform()
|
||||||
course_url = course_results.get_attribute("bb:rhs")
|
course_url = course_results.get_attribute("bb:rhs")
|
||||||
course_name = course_results.find_elements(By.XPATH, "//span[@class='stream_area_name']")[i].text
|
course_name = course_results.find_elements(
|
||||||
course_name += " ["+parse_qs(urlparse(course_url).query)['course_id'][0]+"]"
|
By.XPATH, "//span[@class='stream_area_name']")[i].text
|
||||||
|
course_name += " [" + \
|
||||||
|
parse_qs(urlparse(course_url).query)['course_id'][0]+"]"
|
||||||
course_details.append({
|
course_details.append({
|
||||||
'name': course_name,
|
'name': course_name,
|
||||||
'url' : course_url
|
'url': course_url
|
||||||
})
|
})
|
||||||
|
|
||||||
for i, course in enumerate(course_details):
|
for i, course in enumerate(course_details):
|
||||||
path.append(course['name']) # course name
|
path.append(course['name']) # course name
|
||||||
print(course['name'])
|
print(course['name'])
|
||||||
driver.get(BASE_URL+course['url'])
|
driver.get(BASE_URL+course['url'])
|
||||||
|
|
||||||
|
@ -155,8 +179,8 @@ for i, course in enumerate(course_details):
|
||||||
}
|
}
|
||||||
""")
|
""")
|
||||||
|
|
||||||
WaitClickable(driver,(By.XPATH,"//a[@value='A']")).click()
|
WaitClickable(driver, (By.XPATH, "//a[@value='A']")).click()
|
||||||
WaitClickable(driver,(By.XPATH,"//a[@value='A']")).click()
|
WaitClickable(driver, (By.XPATH, "//a[@value='A']")).click()
|
||||||
|
|
||||||
table = driver.find_elements(By.XPATH, "//div[@id='grades_wrapper']/div")
|
table = driver.find_elements(By.XPATH, "//div[@id='grades_wrapper']/div")
|
||||||
|
|
||||||
|
@ -167,20 +191,23 @@ for i, course in enumerate(course_details):
|
||||||
assignment_name = None
|
assignment_name = None
|
||||||
information_link = False
|
information_link = False
|
||||||
try:
|
try:
|
||||||
block = assignment.find_element(By.XPATH, "./div[@class='cell gradable']/a[@onclick]")
|
block = assignment.find_element(
|
||||||
|
By.XPATH, "./div[@class='cell gradable']/a[@onclick]")
|
||||||
information_link = True
|
information_link = True
|
||||||
except:
|
except:
|
||||||
block = assignment.find_element(By.XPATH, "./div[@class='cell gradable']")
|
block = assignment.find_element(
|
||||||
assignment_name = get_assignment_name(driver,block)
|
By.XPATH, "./div[@class='cell gradable']")
|
||||||
|
assignment_name = get_assignment_name(driver, block)
|
||||||
path.append(assignment_name)
|
path.append(assignment_name)
|
||||||
# download information if it exists.
|
# download information if it exists.
|
||||||
if information_link:
|
if information_link:
|
||||||
try:
|
try:
|
||||||
ActionChains(driver).move_to_element(block).click(block).perform()
|
ActionChains(driver).move_to_element(
|
||||||
|
block).click(block).perform()
|
||||||
print("Switched "+assignment_name)
|
print("Switched "+assignment_name)
|
||||||
WebDriverWait(driver,10).until(EC.number_of_windows_to_be(2))
|
WebDriverWait(driver, 10).until(EC.number_of_windows_to_be(2))
|
||||||
driver.switch_to.window(driver.window_handles[1])
|
driver.switch_to.window(driver.window_handles[1])
|
||||||
save_html(sep.join(path),"information",driver.page_source)
|
save_html(sep.join(path), "information", driver.page_source)
|
||||||
scrape_further(driver, sep.join(path), session)
|
scrape_further(driver, sep.join(path), session)
|
||||||
driver.close()
|
driver.close()
|
||||||
driver.switch_to.window(driver.window_handles[0])
|
driver.switch_to.window(driver.window_handles[0])
|
||||||
|
@ -190,31 +217,36 @@ for i, course in enumerate(course_details):
|
||||||
for button in buttons:
|
for button in buttons:
|
||||||
action = button.get_attribute("onclick")
|
action = button.get_attribute("onclick")
|
||||||
if action != None and "showInLightBox" not in action:
|
if action != None and "showInLightBox" not in action:
|
||||||
click_the_fing_button(driver,button)
|
click_the_fing_button(driver, button)
|
||||||
driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
|
driver.execute_script(
|
||||||
|
"window.scrollTo(0, document.body.scrollHeight)")
|
||||||
driver.switch_to.window(driver.window_handles[1])
|
driver.switch_to.window(driver.window_handles[1])
|
||||||
WaitDiv(driver, (By.CLASS_NAME, "rubricControlContainer"))
|
WaitDiv(driver, (By.CLASS_NAME, "rubricControlContainer"))
|
||||||
save_html(sep.join(path),"rubric",driver.page_source)
|
save_html(sep.join(path), "rubric", driver.page_source)
|
||||||
driver.find_element(By.XPATH, "//li[@id='listViewTab']/a").click()
|
driver.find_element(
|
||||||
|
By.XPATH, "//li[@id='listViewTab']/a").click()
|
||||||
WaitDiv(driver, (By.CLASS_NAME, "rubricGradingList"))
|
WaitDiv(driver, (By.CLASS_NAME, "rubricGradingList"))
|
||||||
save_html(sep.join(path),"list",driver.page_source)
|
save_html(sep.join(path), "list", driver.page_source)
|
||||||
detailed_buttons = driver.find_elements(By.XPATH, "//div[@class='u_controlsWrapper']/input")
|
detailed_buttons = driver.find_elements(
|
||||||
|
By.XPATH, "//div[@class='u_controlsWrapper']/input")
|
||||||
detailed_buttons[1].click()
|
detailed_buttons[1].click()
|
||||||
detailed_buttons[0].click()
|
detailed_buttons[0].click()
|
||||||
save_html(sep.join(path),"list_detailed",driver.page_source)
|
save_html(sep.join(path), "list_detailed", driver.page_source)
|
||||||
driver.close()
|
driver.close()
|
||||||
driver.switch_to.window(driver.window_handles[0])
|
driver.switch_to.window(driver.window_handles[0])
|
||||||
path.pop()
|
path.pop()
|
||||||
save_html(sep.join(path), path[0], driver.page_source)
|
save_html(sep.join(path), path[0], driver.page_source)
|
||||||
WaitClickable(driver,(By.XPATH,"//a[@value='S']")).click()
|
WaitClickable(driver, (By.XPATH, "//a[@value='S']")).click()
|
||||||
save_html(sep.join(path),"submitted",driver.page_source)
|
save_html(sep.join(path), "submitted", driver.page_source)
|
||||||
try:
|
try:
|
||||||
WaitClickable(driver,(By.XPATH,"//div[@id='submissionReceipts']//a")).click()
|
WaitClickable(
|
||||||
WaitClickable(driver,(By.XPATH,"//div[@id='listContainer_itemcount']//a[@class='pagelink']")).click()
|
driver, (By.XPATH, "//div[@id='submissionReceipts']//a")).click()
|
||||||
|
WaitClickable(
|
||||||
|
driver, (By.XPATH, "//div[@id='listContainer_itemcount']//a[@class='pagelink']")).click()
|
||||||
except:
|
except:
|
||||||
print('No items?')
|
print('No items?')
|
||||||
save_html(sep.join(path),"receipts",driver.page_source)
|
save_html(sep.join(path), "receipts", driver.page_source)
|
||||||
path.pop()
|
path.pop()
|
||||||
|
|
||||||
|
|
||||||
driver.quit()
|
driver.quit()
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
# https://stackoverflow.com/a/49375740
|
# https://stackoverflow.com/a/49375740
|
||||||
import os, sys
|
import os
|
||||||
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
|
import sys
|
||||||
|
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
|
||||||
|
|
|
@ -6,6 +6,7 @@ import shutil
|
||||||
import csv
|
import csv
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
def convert_filename(name, hash):
|
def convert_filename(name, hash):
|
||||||
_name = name.split('.')
|
_name = name.split('.')
|
||||||
if len(_name) > 1:
|
if len(_name) > 1:
|
||||||
|
@ -14,30 +15,33 @@ def convert_filename(name, hash):
|
||||||
_name[0] += ("["+hash+"]")
|
_name[0] += ("["+hash+"]")
|
||||||
return '.'.join(_name)
|
return '.'.join(_name)
|
||||||
|
|
||||||
|
|
||||||
class RequestStack:
|
class RequestStack:
|
||||||
def __init__(self,token):
|
def __init__(self, token):
|
||||||
self.request_stack = []
|
self.request_stack = []
|
||||||
self.token = token
|
self.token = token
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
def add_file(self,url,path):
|
def add_file(self, url, path):
|
||||||
self.request_stack.append(Asset(url,path))
|
self.request_stack.append(Asset(url, path))
|
||||||
|
|
||||||
def download_all(self):
|
def download_all(self):
|
||||||
for file in self.request_stack:
|
for file in self.request_stack:
|
||||||
print(f"\tDownloading {file.url}")
|
print(f"\tDownloading {file.url}")
|
||||||
file.download(self.token)
|
file.download(self.token)
|
||||||
|
|
||||||
|
|
||||||
class Asset:
|
class Asset:
|
||||||
def __init__(self,url,path):
|
def __init__(self, url, path):
|
||||||
self.path = Path(path)
|
self.path = Path(path)
|
||||||
self.url = re.sub("^"+BASE_URL,"",url)
|
self.url = re.sub("^"+BASE_URL, "", url)
|
||||||
# self.file_id = re.findall('file_id=(.+)&',url)
|
# self.file_id = re.findall('file_id=(.+)&',url)
|
||||||
self.path.mkdir(parents=True, exist_ok=True)
|
self.path.mkdir(parents=True, exist_ok=True)
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
def download(self,session):
|
def download(self, session):
|
||||||
response = session.get(BASE_URL+self.url, stream=True, allow_redirects=False)
|
response = session.get(
|
||||||
|
BASE_URL+self.url, stream=True, allow_redirects=False)
|
||||||
headers = response.headers
|
headers = response.headers
|
||||||
if response.status_code == 302 and len(headers['location']) > 0:
|
if response.status_code == 302 and len(headers['location']) > 0:
|
||||||
Asset(headers['location'], self.path).download(session)
|
Asset(headers['location'], self.path).download(session)
|
||||||
|
@ -45,24 +49,28 @@ class Asset:
|
||||||
elif response.status_code != 200:
|
elif response.status_code != 200:
|
||||||
print("[!] Error "+str(response.status_code))
|
print("[!] Error "+str(response.status_code))
|
||||||
return response.status_code
|
return response.status_code
|
||||||
headers = { x:re.sub(r'^"*|"*?$', '', headers.get(x)) for x in headers } # ewww regex
|
headers = {x: re.sub(r'^"*|"*?$', '', headers.get(x))
|
||||||
|
for x in headers} # ewww regex
|
||||||
if 'Content-Disposition' in headers.keys():
|
if 'Content-Disposition' in headers.keys():
|
||||||
self.original_filename = re.findall('filename="(.+)"', headers['Content-Disposition'])[0]
|
self.original_filename = re.findall(
|
||||||
|
'filename="(.+)"', headers['Content-Disposition'])[0]
|
||||||
else:
|
else:
|
||||||
self.original_filename = re.sub(".*/","",self.url)
|
self.original_filename = re.sub(".*/", "", self.url)
|
||||||
self.etag_hash = hashlib.md5(headers['ETag'].encode()).hexdigest()
|
self.etag_hash = hashlib.md5(headers['ETag'].encode()).hexdigest()
|
||||||
self.filename = convert_filename(self.original_filename, self.etag_hash[0:6])
|
self.filename = convert_filename(
|
||||||
|
self.original_filename, self.etag_hash[0:6])
|
||||||
|
|
||||||
with open(self.path.joinpath(self.filename), 'wb') as f:
|
with open(self.path.joinpath(self.filename), 'wb') as f:
|
||||||
shutil.copyfileobj(response.raw, f)
|
shutil.copyfileobj(response.raw, f)
|
||||||
self.write_metadata(headers)
|
self.write_metadata(headers)
|
||||||
|
|
||||||
def write_metadata(self,headers):
|
def write_metadata(self, headers):
|
||||||
metacsv = [
|
metacsv = [
|
||||||
["original_filename", self.original_filename],
|
["original_filename", self.original_filename],
|
||||||
["readable_filename", self.filename],
|
["readable_filename", self.filename],
|
||||||
["url", self.url],
|
["url", self.url],
|
||||||
["pathhash", hashlib.md5(self.url.encode()).hexdigest()],
|
["pathhash", hashlib.md5(
|
||||||
|
self.url.encode()).hexdigest()],
|
||||||
["etag", headers['ETag']],
|
["etag", headers['ETag']],
|
||||||
["etaghash", self.etag_hash],
|
["etaghash", self.etag_hash],
|
||||||
["last-modified", headers["Last-Modified"]],
|
["last-modified", headers["Last-Modified"]],
|
||||||
|
@ -73,4 +81,4 @@ class Asset:
|
||||||
csvpath.mkdir(parents=True, exist_ok=True)
|
csvpath.mkdir(parents=True, exist_ok=True)
|
||||||
with open(csvpath.joinpath(self.filename+"__metadata.csv"), "w", newline="") as f:
|
with open(csvpath.joinpath(self.filename+"__metadata.csv"), "w", newline="") as f:
|
||||||
writer = csv.writer(f)
|
writer = csv.writer(f)
|
||||||
writer.writerows(metacsv)
|
writer.writerows(metacsv)
|
||||||
|
|
|
@ -9,6 +9,7 @@ from constants.constants import BASE_URL
|
||||||
import re
|
import re
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
def login(args, driver):
|
def login(args, driver):
|
||||||
driver.get(BASE_URL)
|
driver.get(BASE_URL)
|
||||||
USERNAME = args.username
|
USERNAME = args.username
|
||||||
|
@ -19,25 +20,27 @@ def login(args, driver):
|
||||||
print('Password: ')
|
print('Password: ')
|
||||||
PASSWORD = getpass('')
|
PASSWORD = getpass('')
|
||||||
|
|
||||||
WaitClickable(driver,Selectors.BOX_USERNAME).send_keys(USERNAME)
|
WaitClickable(driver, Selectors.BOX_USERNAME).send_keys(USERNAME)
|
||||||
WaitClickable(driver,Selectors.BUTTON_NEXT).click()
|
WaitClickable(driver, Selectors.BUTTON_NEXT).click()
|
||||||
print('Entered username.')
|
print('Entered username.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
WaitClickable(driver,Selectors.BOX_PASSWORD).send_keys(PASSWORD)
|
WaitClickable(driver, Selectors.BOX_PASSWORD).send_keys(PASSWORD)
|
||||||
WaitClickable(driver,Selectors.BUTTON_NEXT).click()
|
WaitClickable(driver, Selectors.BUTTON_NEXT).click()
|
||||||
print('Entered password.')
|
print('Entered password.')
|
||||||
except:
|
except:
|
||||||
print(WebDriverWait(driver, 1).until(EC.visibility_of_element_located(Selectors.DIV_USERERROR)).text)
|
print(WebDriverWait(driver, 1).until(
|
||||||
|
EC.visibility_of_element_located(Selectors.DIV_USERERROR)).text)
|
||||||
driver.quit()
|
driver.quit()
|
||||||
exit(2)
|
exit(2)
|
||||||
|
|
||||||
WaitClickable(driver,Selectors.BUTTON_DENY).click()
|
WaitClickable(driver, Selectors.BUTTON_DENY).click()
|
||||||
# WaitClickable(driver,BUTTON_NEXT).click() #IF you want to remember credentials, switch these comments
|
# WaitClickable(driver,BUTTON_NEXT).click() #IF you want to remember credentials, switch these comments
|
||||||
|
|
||||||
cookie = driver.get_cookies()
|
cookie = driver.get_cookies()
|
||||||
if not cookie == None: return cookie
|
if not cookie == None:
|
||||||
|
return cookie
|
||||||
|
|
||||||
print('Could not get auth cookie - Invalid ID or password?', file=sys.stderr)
|
print('Could not get auth cookie - Invalid ID or password?', file=sys.stderr)
|
||||||
driver.quit()
|
driver.quit()
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
from selenium.webdriver.common.by import By
|
from selenium.webdriver.common.by import By
|
||||||
|
|
||||||
|
|
||||||
class Selectors:
|
class Selectors:
|
||||||
# Microsoft login
|
# Microsoft login
|
||||||
BOX_USERNAME = (By.ID, "i0116")
|
BOX_USERNAME = (By.ID, "i0116")
|
||||||
|
@ -7,4 +8,4 @@ class Selectors:
|
||||||
DIV_USERERROR = (By.ID, 'usernameError')
|
DIV_USERERROR = (By.ID, 'usernameError')
|
||||||
BUTTON_NEXT = (By.ID, "idSIButton9")
|
BUTTON_NEXT = (By.ID, "idSIButton9")
|
||||||
BUTTON_DENY = (By.ID, "idBtn_Back")
|
BUTTON_DENY = (By.ID, "idBtn_Back")
|
||||||
# Selectors for grades
|
# Selectors for grades
|
||||||
|
|
|
@ -7,41 +7,41 @@ import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
|
|
||||||
def friendly_filename(name):
|
def friendly_filename(name):
|
||||||
name = friendly_dirname(name)
|
name = friendly_dirname(name)
|
||||||
return re.sub("[\\\/]",'',name)
|
return re.sub("[\\\/]", '', name)
|
||||||
|
|
||||||
|
|
||||||
def friendly_dirname(name):
|
def friendly_dirname(name):
|
||||||
#.gsub(/[^\w\s_-]+/, '')
|
# .gsub(/[^\w\s_-]+/, '')
|
||||||
# .gsub(/\s+/, '_')
|
# .gsub(/\s+/, '_')
|
||||||
# pipeline:
|
# pipeline:
|
||||||
name = re.sub("[\x00-\x1f]",'',name)
|
name = re.sub("[\x00-\x1f]", '', name)
|
||||||
name = re.sub("[\:\<\>\"\|\?\*]",'',name)
|
name = re.sub("[\:\<\>\"\|\?\*]", '', name)
|
||||||
name = re.sub("(^|\b\s)\s+($|\s?\b)", '\\1\\2', name)
|
name = re.sub("(^|\b\s)\s+($|\s?\b)", '\\1\\2', name)
|
||||||
return name.strip()
|
return name.strip()
|
||||||
|
|
||||||
|
|
||||||
def get_assignment_name(driver,block):
|
def get_assignment_name(driver, block):
|
||||||
s = friendly_filename(get_text_excluding_children(driver,block))
|
s = friendly_filename(get_text_excluding_children(driver, block))
|
||||||
print("Assesment: "+s)
|
print("Assesment: "+s)
|
||||||
return s
|
return s
|
||||||
|
|
||||||
def save_html(dir,filename,page_source):
|
|
||||||
|
def save_html(dir, filename, page_source):
|
||||||
dir = pathlib.Path(friendly_dirname(dir))
|
dir = pathlib.Path(friendly_dirname(dir))
|
||||||
dir.mkdir(parents=True, exist_ok=True)
|
dir.mkdir(parents=True, exist_ok=True)
|
||||||
file = dir.joinpath(friendly_filename(filename)+".html")
|
file = dir.joinpath(friendly_filename(filename)+".html")
|
||||||
with open(file, "w", encoding="utf-8") as f:
|
with open(file, "w", encoding="utf-8") as f:
|
||||||
f.write(page_source)
|
f.write(page_source)
|
||||||
|
|
||||||
# Why is it so hard to just get the url of a single tab...
|
# NOTE: Switching to a "download" tab causes issues so we must use the built-in
|
||||||
# def get_fast_dl(driver,button):
|
# download in Chrome, which does not have etag or metadata information.
|
||||||
# windows = len(driver.window_handles)
|
# Files are using annotate-au.foundations.blackboard.com and not bbcswebdav system
|
||||||
# return
|
|
||||||
|
|
||||||
# Because selenium seems to fuck up the url switching to a "download" tab,
|
|
||||||
# I have to use the inbuilt download in chrome :(. That also means no etag/metadata
|
|
||||||
# but to be honest it's using annotate-au.foundations.blackboard.com and not bbcswebdav system
|
|
||||||
# so the tag may not exist in the first place.
|
# so the tag may not exist in the first place.
|
||||||
|
|
||||||
|
|
||||||
def download_file(dest):
|
def download_file(dest):
|
||||||
d = Path(DL_DIR)
|
d = Path(DL_DIR)
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
@ -56,10 +56,10 @@ def download_file(dest):
|
||||||
else:
|
else:
|
||||||
_dest = Path(dest).joinpath("MARKED__"+f)
|
_dest = Path(dest).joinpath("MARKED__"+f)
|
||||||
try:
|
try:
|
||||||
shutil.move(d.joinpath(f),_dest)
|
shutil.move(d.joinpath(f), _dest)
|
||||||
except shutil.SameFileError:
|
except shutil.SameFileError:
|
||||||
os.remove(_dest)
|
os.remove(_dest)
|
||||||
shutil.move(d.joinpath(f),_dest)
|
shutil.move(d.joinpath(f), _dest)
|
||||||
|
|
||||||
if len(os.listdir(d)) == 0:
|
if len(os.listdir(d)) == 0:
|
||||||
downloading = False
|
downloading = False
|
||||||
|
@ -71,4 +71,4 @@ def get_text_excluding_children(driver, element):
|
||||||
return jQuery(arguments[0]).contents().filter(function() {
|
return jQuery(arguments[0]).contents().filter(function() {
|
||||||
return this.nodeType == Node.TEXT_NODE;
|
return this.nodeType == Node.TEXT_NODE;
|
||||||
}).text();
|
}).text();
|
||||||
""", element)
|
""", element)
|
||||||
|
|
|
@ -2,6 +2,15 @@ from selenium.webdriver.support.wait import WebDriverWait
|
||||||
from selenium.webdriver.support import expected_conditions as EC
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
timeout = 5
|
timeout = 5
|
||||||
# find_element_safe = lambda name,timeout=30:WebDriverWait(driver, timeout).until(lambda x: x.find_element(By.ID, name))
|
# find_element_safe = lambda name,timeout=30:WebDriverWait(driver, timeout).until(lambda x: x.find_element(By.ID, name))
|
||||||
WaitClickable = lambda driver,locator:WebDriverWait(driver, timeout).until(EC.element_to_be_clickable(locator))
|
|
||||||
WaitDiv = lambda driver,locator:WebDriverWait(driver, timeout).until(EC.presence_of_element_located(locator))
|
|
||||||
SwitchToIFrame = lambda driver,locator:WebDriverWait(driver, timeout).until(EC.frame_to_be_available_and_switch_to_it(locator))
|
def WaitClickable(driver, locator): return WebDriverWait(
|
||||||
|
driver, timeout).until(EC.element_to_be_clickable(locator))
|
||||||
|
|
||||||
|
|
||||||
|
def WaitDiv(driver, locator): return WebDriverWait(
|
||||||
|
driver, timeout).until(EC.presence_of_element_located(locator))
|
||||||
|
|
||||||
|
|
||||||
|
def SwitchToIFrame(driver, locator): return WebDriverWait(
|
||||||
|
driver, timeout).until(EC.frame_to_be_available_and_switch_to_it(locator))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user