WIP: add some test_utils to experiment with #38

Open: wants to merge 4 commits into main
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ _scrapes/
 .coverage
 htmlcov/
 site/
+_test_responses/
2 changes: 1 addition & 1 deletion src/spatula/sources.py
@@ -41,7 +41,7 @@ def __init__(

     def get_response(
         self, scraper: scrapelib.Scraper
-    ) -> Optional[requests.models.Response]:
+    ) -> requests.models.Response:
         return scraper.request(
             method=self.method,
             url=self.url,
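The one-line change to sources.py narrows the declared return type of URL.get_response from Optional[requests.models.Response] to a plain Response; scrapelib's request either returns a response or raises, so callers never see None, and the CachedTestURL override added below can share the same signature. A minimal sketch of a caller relying on the narrowed type (the fetch_html helper and the example usage are illustrative, not part of this diff):

import scrapelib

from spatula.sources import URL


def fetch_html(url: str) -> str:
    # get_response is now typed as always returning a Response (scrapelib
    # raises on errors rather than returning None), so no None guard is needed
    scraper = scrapelib.Scraper()
    return URL(url).get_response(scraper).text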
119 changes: 119 additions & 0 deletions src/spatula/test_utils.py
@@ -0,0 +1,119 @@
import os
import re
import requests
import scrapelib
import warnings
from pathlib import Path
from typing import Optional, Union
from .pages import Page
from .sources import URL


class SpatulaTestError(Exception):
    pass


class CachedTestURL(URL):
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Optional[dict] = None,
        headers: Optional[dict] = None,
        verify: bool = True,
        timeout: Optional[float] = None,
        retries: Optional[int] = None,
        response: Union[requests.models.Response, str, None] = None,
    ):
        """
        Defines a resource to fetch via URL, but serves a cached or pre-supplied
        response instead of hitting the network whenever possible.

        :param url: URL to fetch
        :param method: HTTP method to use, defaults to "GET"
        :param data: POST data to include in request body.
        :param headers: dictionary of HTTP headers to set for the request.
        :param verify: bool indicating whether or not to verify SSL certificates for request, defaults to True
        :param timeout: HTTP(S) timeout in seconds
        :param retries: number of retries to make
        :param response: optional response to use instead of fetching
        """
        super().__init__(
            url=url,
            method=method,
            data=data,
            headers=headers,
            verify=verify,
            timeout=timeout,
            retries=retries,
        )
        # convert string responses to simple Response objects
        if isinstance(response, str):
            resp = requests.models.Response()
            resp.status_code = 200  # a bare Response has no status code and cannot be truth-tested
            resp._content = response.encode()
            response = resp
        self.response = response

    @classmethod
    def from_url(cls, url: Union[URL, str]) -> "CachedTestURL":
        if isinstance(url, str):
            url = URL(url)
        return cls(
            url=url.url,
            method=url.method,
            data=url.data,
            headers=url.headers,
            verify=url.verify,
            timeout=url.timeout,
            retries=url.retries,
        )

    def get_response(
        self, scraper: scrapelib.Scraper
    ) -> requests.models.Response:
        # explicit None check: a pre-supplied error response should still be returned
        if self.response is not None:
            return self.response
        path = _source_to_test_path(self)
        if path.exists():
            # replay the cached body as a successful response
            resp = requests.models.Response()
            resp.status_code = 200
            resp._content = path.read_bytes()
            return resp
        else:
            if os.environ.get("SPATULA_TEST_ALLOW_FETCH") == "1":
                warnings.warn(f"spatula test fetching {self} -> {path}")
                response = super().get_response(scraper)
                if not path.parent.exists():
                    path.parent.mkdir()
                with path.open("w") as cf:
                    cf.write(response.text)
                resp = requests.models.Response()
                resp.status_code = 200
                resp._content = response.content
                return resp
            else:
                warnings.warn("Set SPATULA_TEST_ALLOW_FETCH=1 to allow fetching.")
                raise SpatulaTestError(f"spatula test missing {self} @ {path}")

    def __repr__(self) -> str:
        return f"CachedTestURL({self.url})"


def _source_to_test_path(source: URL) -> Path:
    # TODO: do this the right way
    clean_url = re.sub(r"[:/]", "_", source.url)
    return Path(__file__).parent / "_test_responses" / clean_url


def cached_page_response(page: Page) -> Page:
    scraper = scrapelib.Scraper()
    if isinstance(page.source, URL):
        page.source = CachedTestURL.from_url(page.source)
    page._fetch_data(scraper)
    page.postprocess_response()
    return page


def cached_page_items(page: Page) -> list:
    if isinstance(page.source, URL):
        page.source = CachedTestURL.from_url(page.source)
    items = list(page.do_scrape())
    return items