Skip to content

Commit

Permalink
Implement additional User/Workspace fields (#7)
Browse files Browse the repository at this point in the history
* Implement additional User/Workspace fields

* Refactor value retrieval

* Compare datetime without microseconds

* Syntax
  • Loading branch information
lloesche authored Oct 1, 2024
1 parent 94d34fd commit 1b3943c
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 56 deletions.
3 changes: 2 additions & 1 deletion fixattiosync/attiodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def _request(
endpoint: str,
json: Optional[dict[str, Any]] = None,
params: Optional[dict[str, str]] = None,
timeout: int = 10,
) -> dict[str, Any]:
url = self.base_url + endpoint
headers = self._headers(json=bool(json))
Expand All @@ -47,7 +48,7 @@ def _request(
action_str = action_strings.get(method.upper(), f"Requesting data via {method} from")

log.debug(f"{action_str} {url}")
response = requests.request(method, url, headers=headers, json=json, params=params)
response = requests.request(method, url, headers=headers, json=json, params=params, timeout=timeout)

if response.status_code == 200:
return response.json() # type: ignore
Expand Down
114 changes: 69 additions & 45 deletions fixattiosync/attioresources.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from uuid import UUID
from typing import Optional, Self, Type, ClassVar, Any
from enum import Enum
from .logger import log


def get_latest_value(value: list[dict[str, Any]]) -> dict[str, Any]:
if value and len(value) > 0:
return value[0]
return {}
def get_nested_field(values_dict: dict[str, Any], key: str, field_path: list[str], default: Any = None) -> Any:
items = values_dict.get(key, [{}])
if items and isinstance(items, list) and len(items) > 0:
data = items[0]
for f in field_path:
if isinstance(data, dict):
data = data.get(f, default)
else:
return default
return data
return default


def optional_uuid(value: str) -> Optional[UUID]:
Expand Down Expand Up @@ -77,22 +84,13 @@ def make(cls: Type[Self], data: dict[str, Any]) -> Self:

values = data.get("values", {})

name_info = get_latest_value(values.get("name", [{}]))
name = name_info.get("value")

product_tier_info = get_latest_value(values.get("product_tier", [{}]))
product_tier = product_tier_info.get("option", {}).get("title")

status_info = get_latest_value(values.get("status", [{}]))
status = status_info.get("status", {}).get("title")

fix_workspace_id_info = get_latest_value(values.get("workspace_id", [{}]))
fix_workspace_id = optional_uuid(str(fix_workspace_id_info.get("value")))
name = get_nested_field(values, "name", ["value"])
product_tier = get_nested_field(values, "product_tier", ["option", "title"])
status = get_nested_field(values, "status", ["status", "title"])
fix_workspace_id = optional_uuid(str(get_nested_field(values, "workspace_id", ["value"])))
if fix_workspace_id is None:
log.error(f"Fix workspace ID not found for {record_id}: {data}")

cloud_account_connected_info = get_latest_value(values.get("cloud_account_connected", [{}]))
cloud_account_connected = cloud_account_connected_info.get("value")
cloud_account_connected = get_nested_field(values, "cloud_account_connected", ["value"])

cls_data = {
"id": fix_workspace_id,
Expand Down Expand Up @@ -132,19 +130,12 @@ def make(cls: Type[Self], data: dict[str, Any]) -> Self:

values = data["values"]

name_info = get_latest_value(values.get("name", [{}]))
full_name = name_info.get("full_name")
first_name = name_info.get("first_name")
last_name = name_info.get("last_name")

email_info = get_latest_value(values.get("email_addresses", [{}]))
email_address = email_info.get("email_address")

job_title_info = get_latest_value(values.get("job_title", [{}]))
job_title = job_title_info.get("value")

linkedin_info = get_latest_value(values.get("linkedin", [{}]))
linkedin = linkedin_info.get("value")
full_name = get_nested_field(values, "name", ["full_name"])
first_name = get_nested_field(values, "name", ["first_name"])
last_name = get_nested_field(values, "name", ["last_name"])
email_address = get_nested_field(values, "email_addresses", ["email_address"])
job_title = get_nested_field(values, "job_title", ["value"])
linkedin = get_nested_field(values, "linkedin", ["value"])

cls_data = {
"object_id": object_id,
Expand All @@ -159,7 +150,7 @@ def make(cls: Type[Self], data: dict[str, Any]) -> Self:
"linkedin": linkedin,
}

return cls(**cls_data) # type: ignore
return cls(**cls_data)


@dataclass
Expand All @@ -177,44 +168,72 @@ class AttioUser(AttioResource):
workspace_refs: Optional[list[UUID]] = None
person: Optional[AttioPerson] = None
workspaces: list[AttioWorkspace] = field(default_factory=list)
user_email_notifications_disabled: Optional[bool] = None
at_least_one_cloud_account_connected: Optional[bool] = None
is_main_user_in_at_least_one_workspace: Optional[bool] = None
cloud_account_connected_workspace_name: Optional[str] = None
workspace_has_subscription: Optional[bool] = None

def __eq__(self: Self, other: Any) -> bool:
if (
not hasattr(other, "id")
or not hasattr(other, "email")
or not hasattr(other, "registered_at")
or not isinstance(self.registered_at, datetime)
or not isinstance(other.registered_at, datetime)
or not hasattr(other, "workspaces")
or not isinstance(other.workspaces, list)
or not hasattr(other, "user_email_notifications_disabled")
or not hasattr(other, "at_least_one_cloud_account_connected")
or not hasattr(other, "is_main_user_in_at_least_one_workspace")
or not hasattr(other, "cloud_account_connected_workspace_name")
or not hasattr(other, "workspace_has_subscription")
):
return False
return bool(
self.id == other.id
and str(self.email).lower() == str(other.email).lower()
and self.registered_at.astimezone(timezone.utc) == other.registered_at.astimezone(timezone.utc)
and {w.id for w in self.workspaces} == {w.id for w in other.workspaces}
and self.user_email_notifications_disabled == other.user_email_notifications_disabled
and self.at_least_one_cloud_account_connected == other.at_least_one_cloud_account_connected
and self.is_main_user_in_at_least_one_workspace == other.is_main_user_in_at_least_one_workspace
and self.cloud_account_connected_workspace_name == other.cloud_account_connected_workspace_name
and self.workspace_has_subscription == other.workspace_has_subscription
)

@classmethod
def make(cls: Type[Self], data: dict[str, Any]) -> Self:
object_id = UUID(data["id"]["object_id"])
record_id = UUID(data["id"]["record_id"])
workspace_id = UUID(data["id"]["workspace_id"])
created_at = datetime.fromisoformat(data["created_at"].rstrip("Z"))
created_at = datetime.fromisoformat(data["created_at"])

values = data.get("values", {})

email_info = get_latest_value(values.get("primary_email_address", [{}]))
primary_email_address = email_info.get("email_address")

status_info = get_latest_value(values.get("status", [{}]))
status = status_info.get("status", {}).get("title")
registered_at = get_nested_field(values, "registered_at", ["value"])
if registered_at:
registered_at = datetime.fromisoformat(registered_at).replace(microsecond=0)

primary_email_address = get_nested_field(values, "primary_email_address", ["email_address"])
status = get_nested_field(values, "status", ["status", "title"])
user_id = optional_uuid(str(get_nested_field(values, "user_id", ["value"])))
person_id = optional_uuid(str(get_nested_field(values, "person", ["target_record_id"])))
user_email_notifications_disabled = get_nested_field(values, "user_email_notifications_disabled", ["value"])
at_least_one_cloud_account_connected = get_nested_field(
values, "at_least_one_cloud_account_connected", ["value"]
)
is_main_user_in_at_least_one_workspace = get_nested_field(
values, "is_main_user_in_at_least_one_workspace", ["value"]
)
cloud_account_connected_workspace_name = get_nested_field(
values, "cloud_account_connected_workspace_name", ["value"]
)
workspace_has_subscription = get_nested_field(values, "workspace_has_subscription", ["value"])

user_id_info = get_latest_value(values.get("user_id", [{}]))
user_id = optional_uuid(user_id_info["value"])
if user_id is None:
log.error(f"Fix user ID not found for {record_id}: {data}")

person_info = get_latest_value(values.get("person", [{}]))
person_id = optional_uuid(str(person_info.get("target_record_id")))

workspace_refs = None
workspace_info = values.get("workspace", [])
for workspace in workspace_info:
Expand All @@ -231,11 +250,16 @@ def make(cls: Type[Self], data: dict[str, Any]) -> Self:
"created_at": created_at,
"demo_workspace_viewed": None,
"email": primary_email_address,
"registered_at": None,
"registered_at": registered_at,
"status": status,
"user_id": user_id,
"person_id": person_id,
"workspace_refs": workspace_refs,
"user_email_notifications_disabled": user_email_notifications_disabled,
"at_least_one_cloud_account_connected": at_least_one_cloud_account_connected,
"is_main_user_in_at_least_one_workspace": is_main_user_in_at_least_one_workspace,
"cloud_account_connected_workspace_name": cloud_account_connected_workspace_name,
"workspace_has_subscription": workspace_has_subscription,
}

return cls(**cls_data)
22 changes: 15 additions & 7 deletions fixattiosync/fixdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from uuid import UUID
from argparse import ArgumentParser
from .logger import log
from .fixresources import FixUser, FixWorkspace, FixCloudAccount
from .fixresources import FixUser, FixWorkspace, FixCloudAccount, FixRoles
from typing import Optional


Expand Down Expand Up @@ -66,16 +66,21 @@ def hydrate(self) -> None:
workspace = FixWorkspace(**row)
self.__workspaces[workspace.id] = workspace
with self.conn.cursor(row_factory=dict_row) as cursor:
cursor.execute('SELECT * FROM public."organization_owners";')
cursor.execute('SELECT * FROM public."user_role_assignment";')
rows = cursor.fetchall()
for row in rows:
self.__workspaces[row["organization_id"]].owner = self.__users[row["user_id"]]
user = self.__users[row["user_id"]]
workspace = self.__workspaces[row["workspace_id"]]
roles = FixRoles(row["role_names"])
user.workspaces.append(workspace)
workspace.users.append(user)
user.workspace_roles[workspace.id] = roles
workspace.user_roles[user.id] = roles
with self.conn.cursor(row_factory=dict_row) as cursor:
cursor.execute('SELECT * FROM public."organization_members";')
cursor.execute('SELECT * FROM public."organization_owners";')
rows = cursor.fetchall()
for row in rows:
self.__workspaces[row["organization_id"]].users.append(self.__users[row["user_id"]])
self.__users[row["user_id"]].workspaces.append(self.__workspaces[row["organization_id"]])
self.__workspaces[row["organization_id"]].owner = self.__users[row["user_id"]]
with self.conn.cursor(row_factory=dict_row) as cursor:
cursor.execute('SELECT * FROM public."cloud_account";')
rows = cursor.fetchall()
Expand All @@ -84,9 +89,12 @@ def hydrate(self) -> None:
self.__cloud_accounts[cloud_account.id] = cloud_account
if cloud_account.tenant_id in self.__workspaces:
self.__workspaces[cloud_account.tenant_id].cloud_accounts.append(cloud_account)
self.__workspaces[cloud_account.tenant_id].update_status()
else:
log.error(f"Data error: cloud account {cloud_account.id} does not have a workspace")
for workspace in self.__workspaces.values():
workspace.update_info()
for user in self.__users.values():
user.update_info()
except psycopg.Error as e:
log.error(f"Error fetching data: {e}")
sys.exit(2)
Expand Down
Loading

0 comments on commit 1b3943c

Please sign in to comment.