Skip to content

Commit

Permalink
Detect CVE ids (#13)
Browse files Browse the repository at this point in the history
* add cve-id detection and lookup via vulmatch #11

* updating cve for testing

---------

Co-authored-by: Fadl <[email protected]>
  • Loading branch information
himynamesdave and fqrious authored Jan 10, 2025
1 parent c0ec885 commit f928abd
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 19 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ GOOGLE_API_KEY=
TEMPERATURE=
## CTIBUTLER
CTIBUTLER_BASE_URL=
CTIBUTLER_API_KEY=
CTIBUTLER_API_KEY=
## VULMATCH
VULMATCH_BASE_URL=
VULMATCH_API_KEY=
12 changes: 11 additions & 1 deletion .env.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ txt2detection requires [ctibutler](https://github.com/muchdogesec/ctibutler) to
* `CTIBUTLER_BASE_URL`: `'http://api.ctibutler.com'` (recommended)
* If you are running CTI Butler locally, be sure to set `'http://host.docker.internal:8006/api/'` in the `.env` file otherwise you will run into networking errors.
* `CTIBUTLER_BASE_URL`:
* If using `'http://api.ctibutler.com'`, [get your API key here](http://app.ctibutler.com). Can be left blank if running locally.
* If using `'http://api.ctibutler.com'`, [get your API key here](http://app.ctibutler.com). Can be left blank if running locally.


## VULMATCH

txt2detection requires [vulmatch](https://github.com/muchdogesec/vulmatch) to lookup CVE IDs

* `VULMATCH_BASE_URL`: `'http://api.vulmatch.com'` (recommended)
* If you are running CTI Butler locally, be sure to set `'http://host.docker.internal:8005/api/'` in the `.env` file otherwise you will run into networking errors.
* `VULMATCH_BASE_URL`:
* If using `'http://api.vulmatch.com'`, [get your API key here](http://app.vulmatch.com). Can be left blank if running locally.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ e.g.

```shell
python3 txt2detection.py \
--input_file tests/files/CVE-2024-1212.txt \
--input_file tests/files/CVE-2024-56520.txt \
--name "lynx ransomware" \
--tlp_level green \
--labels label1,label2 \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
A detection rule for the potential exploitation of CVE-2024-1709 an unauthenticated command injection in Progress Kemp LoadMaster.
A detection rule for the potential exploitation of CVE-2024-56520 an unauthenticated command injection in Progress Kemp LoadMaster.

It needs to look for GET requests to '/access/set' API with the parameters 'param=enableapi' and 'value=1' as well as an "Authorization" header with a base64 encoded value with an uncommon character.

Expand Down
1 change: 1 addition & 0 deletions txt2detection/ai_extractor/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Detection(BaseModel):
rule: str = Field(description="The detection rule in format described, escape properly.")
indicator_types: list[str]
mitre_attack_ids: list[str]
cve_ids : list[str] = Field(description="CVE ID that matches the rule or is explicitly stated in the input", examples=["CVE-2019-1234", "CVE-2024-41312"])


class DetectionContainer(BaseModel):
Expand Down
47 changes: 32 additions & 15 deletions txt2detection/bundler.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,29 +230,32 @@ def add_rule_indicator(self, detection: Detection):
"pattern": detection.rule,
"valid_from": self.report.created,
"object_marking_refs": self.report.object_marking_refs,
"external_references": [
dict(source_name="mitre_attack_ids", description=detection.mitre_attack_ids)
]
# "external_references": [
# dict(source_name="mitre_attack_ids", description=detection.mitre_attack_ids),
# dict(source_name="cve-ids", description=detection.cve_ids),
# ]
}
self.add_ref(indicator)
for obj in self.get_attack_objects(detection.mitre_attack_ids):
self.add_ref(obj)
self.add_relation(indicator, obj)
self.add_relation(indicator, obj, 'mitre-attack')

for obj in self.get_cve_objects(detection.cve_ids):
self.add_ref(obj)
self.add_relation(indicator, obj, 'nvd-cve')

def add_relation(self, indicator, attack):
def add_relation(self, indicator, target_object, type='mitre-attack'):
rel = Relationship(
id="relationship--"
+ str(
id="relationship--" + str(
uuid.uuid5(
UUID_NAMESPACE, f"mitre-attack+{indicator['id']}+{attack['id']}"
UUID_NAMESPACE, f"{type}+{indicator['id']}+{target_object['id']}"
)
),
source_ref=indicator['id'],
target_ref=attack['id'],
relationship_type="mitre-attack",
target_ref=target_object['id'],
relationship_type=type,
created_by_ref=self.report.created_by_ref,
description=f"{indicator['name']} is linked to {attack['external_references'][0]['external_id']} ({attack['name']})",
description=f"{indicator['name']} is linked to {target_object['external_references'][0]['external_id']} ({target_object['name']})",
created=self.report.created,
modified=self.report.modified,
object_marking_refs=self.report.object_marking_refs,
Expand All @@ -267,14 +270,28 @@ def get_attack_objects(self, attack_ids):
logger.debug(f"retrieving attack objects: {attack_ids}")
endpoint = urljoin(os.environ['CTIBUTLER_BASE_URL'] + '/', f"v1/attack-enterprise/objects/?attack_id="+','.join(attack_ids))

s = requests.Session()
headers = {}
if api_key := os.environ.get('CTIBUTLER_API_KEY'):
s.headers['API-KEY'] = api_key

headers['API-KEY'] = api_key

return self._get_objects(endpoint, headers)


def get_cve_objects(self, cve_ids):
logger.debug(f"retrieving cve objects: {cve_ids}")
endpoint = urljoin(os.environ['VULMATCH_BASE_URL'] + '/', f"v1/cve/objects/?cve_id="+','.join(cve_ids))
headers = {}
if api_key := os.environ.get('VULMATCH_API_KEY'):
headers['API-KEY'] = api_key

return self._get_objects(endpoint, headers)


def _get_objects(self, endpoint, headers):
data = []
page = 1
while True:
resp = s.get(endpoint, params=dict(page=page, page_size=1000))
resp = requests.get(endpoint, params=dict(page=page, page_size=1000), headers=headers)
if resp.status_code != 200:
break
d = resp.json()
Expand Down

0 comments on commit f928abd

Please sign in to comment.