Skip to content

Commit

Permalink
fix: Use PurePosixPath is URL paths to make the library windows compa…
Browse files Browse the repository at this point in the history
…tible
  • Loading branch information
Rafiot committed Feb 8, 2024
1 parent 3be25c9 commit f8306d5
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions pylookyloo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from io import BytesIO, StringIO
from typing import Optional, Dict, Any, List, Union, TypedDict, overload
from urllib.parse import urljoin, urlparse
from pathlib import Path
from pathlib import PurePosixPath, Path

import requests

Expand Down Expand Up @@ -91,22 +91,22 @@ def get_status(self, tree_uuid: str) -> Dict[str, Any]:
* 1: The capture is ready.
* 2: The capture is ongoing and will be ready soon.
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'status'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'status'))))
return r.json()

def get_capture_stats(self, tree_uuid: str) -> Dict[str, Any]:
'''Get statistics of the capture'''
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'stats'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'stats'))))
return r.json()

def get_info(self, tree_uuid: str) -> Dict[str, Any]:
'''Get information about the capture (url, timestamp, user agent)'''
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'info'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'info'))))
return r.json()

def get_comparables(self, tree_uuid: str) -> Dict[str, Any]:
'''Get comparable information from the capture'''
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'comparables'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'comparables'))))
return r.json()

def enqueue(self, url: Optional[str]=None, quiet: bool=False, # type: ignore[no-untyped-def]
Expand Down Expand Up @@ -264,7 +264,7 @@ def submit(self, *, quiet: bool=False,
def get_apikey(self, username: str, password: str) -> Dict[str, str]:
'''Get the API key for the given user.'''
to_post = {'username': username, 'password': password}
r = self.session.post(urljoin(self.root_url, str(Path('json', 'get_token'))), json=to_post)
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', 'get_token'))), json=to_post)
return r.json()

def init_apikey(self, username: Optional[str]=None, password: Optional[str]=None, apikey: Optional[str]=None) -> None:
Expand All @@ -284,7 +284,7 @@ def init_apikey(self, username: Optional[str]=None, password: Optional[str]=None

def misp_export(self, tree_uuid: str) -> Dict[str, Any]:
'''Export the capture in MISP format'''
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'misp_export'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'misp_export'))))
return r.json()

def misp_push(self, tree_uuid: str) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
Expand All @@ -293,86 +293,86 @@ def misp_push(self, tree_uuid: str) -> Union[Dict[str, Any], List[Dict[str, Any]
'''
if not self.apikey:
raise AuthError('You need to initialize the apikey to use this method (see init_apikey)')
r = self.session.get(urljoin(self.root_url, str(Path('json', tree_uuid, 'misp_push'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'misp_push'))))
return r.json()

def trigger_modules(self, tree_uuid: str, force: bool=False) -> Dict[str, Any]:
'''Trigger all the available 3rd party modules on the given capture.
:param force: Trigger the modules even if they were already triggered today.
'''
to_send = {'force': force}
r = self.session.post(urljoin(self.root_url, str(Path('json', tree_uuid, 'trigger_modules'))),
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', tree_uuid, 'trigger_modules'))),
json=to_send)
return r.json()

def rebuild_capture(self, tree_uuid: str) -> Dict[str, str]:
'''Force rebuild a capture (requires an authenticated user, use init_apikey first)'''
if not self.apikey:
raise AuthError('You need to initialize the apikey to use this method (see init_apikey)')
r = self.session.get(urljoin(self.root_url, str(Path('admin', tree_uuid, 'rebuild'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('admin', tree_uuid, 'rebuild'))))
return r.json()

def hide_capture(self, tree_uuid: str) -> Dict[str, str]:
'''Hide a capture from the index page (requires an authenticated user, use init_apikey first)'''
if not self.apikey:
raise AuthError('You need to initialize the apikey to use this method (see init_apikey)')
r = self.session.get(urljoin(self.root_url, str(Path('admin', tree_uuid, 'hide'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('admin', tree_uuid, 'hide'))))
return r.json()

def get_redirects(self, capture_uuid: str) -> Dict[str, Any]:
'''Returns the initial redirects.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', capture_uuid, 'redirects'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', capture_uuid, 'redirects'))))
return r.json()

def get_urls(self, capture_uuid: str) -> Dict[str, Any]:
'''Returns all the URLs seen during the capture.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', capture_uuid, 'urls'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', capture_uuid, 'urls'))))
return r.json()

def get_hostnames(self, capture_uuid: str) -> Dict[str, Any]:
'''Returns all the hostnames seen during the capture.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', capture_uuid, 'hostnames'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', capture_uuid, 'hostnames'))))
return r.json()

def get_screenshot(self, capture_uuid: str) -> BytesIO:
'''Returns the screenshot.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('bin', capture_uuid, 'screenshot'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('bin', capture_uuid, 'screenshot'))))
return BytesIO(r.content)

def get_data(self, capture_uuid: str) -> BytesIO:
'''Returns the downloaded data.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('bin', capture_uuid, 'data'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('bin', capture_uuid, 'data'))))
return BytesIO(r.content)

def get_cookies(self, capture_uuid: str) -> List[Dict[str, str]]:
'''Returns the complete cookies jar.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', capture_uuid, 'cookies'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', capture_uuid, 'cookies'))))
return r.json()

def get_html(self, capture_uuid: str) -> StringIO:
'''Returns the rendered HTML as it would be in the browser after the page loaded.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('tree', capture_uuid, 'html'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('tree', capture_uuid, 'html'))))
return StringIO(r.text)

def get_hashes(self, capture_uuid: str, algorithm: str='sha512', hashes_only: bool=True) -> StringIO:
Expand All @@ -384,23 +384,23 @@ def get_hashes(self, capture_uuid: str, algorithm: str='sha512', hashes_only: bo
'''
params: Dict[str, Union[str, int]] = {'algorithm': algorithm, 'hashes_only': int(hashes_only)}

r = self.session.get(urljoin(self.root_url, str(Path('json', capture_uuid, 'hashes'))), params=params)
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', capture_uuid, 'hashes'))), params=params)
return r.json()

def get_complete_capture(self, capture_uuid: str) -> BytesIO:
'''Returns a zip files that contains the screenshot, the har, the rendered HTML, and the cookies.
:param capture_uuid: UUID of the capture
'''
r = self.session.get(urljoin(self.root_url, str(Path('bin', capture_uuid, 'export'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('bin', capture_uuid, 'export'))))
return BytesIO(r.content)

def get_hash_occurrences(self, h: str) -> Dict[str, Any]:
'''Returns the base 64 body related the the hash, and a list of all the captures containing that hash.
:param h: sha512 to search
'''
r = self.session.get(urljoin(self.root_url, str(Path('json', 'hash_info', h))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', 'hash_info', h))))
return r.json()

def get_url_occurrences(self, url: str, limit: int=20, cached_captures_only: bool=True) -> Dict[str, Any]:
Expand All @@ -410,8 +410,8 @@ def get_url_occurrences(self, url: str, limit: int=20, cached_captures_only: boo
:param limit: The max amount of entries to return.
:param cached_captures_only: If False, Lookyloo will attempt to re-cache the missing captures. It might take some time.
'''
r = self.session.post(urljoin(self.root_url, str(Path('json', 'url_info'))), json={'url': url,
'limit': limit})
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', 'url_info'))),
json={'url': url, 'limit': limit})
return r.json()

def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=False, limit: int=20, cached_captures_only: bool=True) -> Dict[str, Any]:
Expand All @@ -422,23 +422,22 @@ def get_hostname_occurrences(self, hostname: str, with_urls_occurrences: bool=Fa
:param limit: The max amount of entries to return.
:param cached_captures_only: If False, Lookyloo will attempt to re-cache the missing captures. It might take some time.
'''
r = self.session.post(urljoin(self.root_url, str(Path('json', 'hostname_info'))), json={'hostname': hostname,
'with_urls_occurrences': with_urls_occurrences,
'limit': limit})
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', 'hostname_info'))),
json={'hostname': hostname, 'with_urls_occurrences': with_urls_occurrences, 'limit': limit})
return r.json()

def get_stats(self) -> Dict[str, Any]:
'''Returns all the captures contining the URL'''

r = self.session.get(urljoin(self.root_url, str(Path('json', 'stats'))))
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('json', 'stats'))))
return r.json()

def get_takedown_information(self, capture_uuid: str) -> Dict[str, Any]:
'''Returns information required to request a takedown for a capture
:param capture_uuid: UUID of the capture
'''
r = self.session.post(urljoin(self.root_url, str(Path('json', 'takedown'))),
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', 'takedown'))),
json={'capture_uuid': capture_uuid})
return r.json()

Expand All @@ -449,7 +448,7 @@ def compare_captures(self, capture_left: str, capture_right: str, /, *, compare_
:param capture_right: UUID of the capture to compare to
:param compare_settings: The settings for the comparison itself (what to ignore without marking the captures as different)
'''
r = self.session.post(urljoin(self.root_url, str(Path('json', 'compare_captures'))),
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('json', 'compare_captures'))),
json={'capture_left': capture_left,
'capture_right': capture_right,
'compare_settings': compare_settings})
Expand Down

0 comments on commit f8306d5

Please sign in to comment.