diff --git a/.env.example b/.env.example index c954aef..b876613 100644 --- a/.env.example +++ b/.env.example @@ -6,4 +6,7 @@ GOOGLE_API_KEY= TEMPERATURE= ## CTIBUTLER CTIBUTLER_BASE_URL= -CTIBUTLER_API_KEY= \ No newline at end of file +CTIBUTLER_API_KEY= +## VULMATCH +VULMATCH_BASE_URL= +VULMATCH_API_KEY= \ No newline at end of file diff --git a/.env.markdown b/.env.markdown index 0081dc2..079e8f0 100644 --- a/.env.markdown +++ b/.env.markdown @@ -24,4 +24,14 @@ txt2detection requires [ctibutler](https://github.com/muchdogesec/ctibutler) to * `CTIBUTLER_BASE_URL`: `'http://api.ctibutler.com'` (recommended) * If you are running CTI Butler locally, be sure to set `'http://host.docker.internal:8006/api/'` in the `.env` file otherwise you will run into networking errors. * `CTIBUTLER_BASE_URL`: - * If using `'http://api.ctibutler.com'`, [get your API key here](http://app.ctibutler.com). Can be left blank if running locally. \ No newline at end of file + * If using `'http://api.ctibutler.com'`, [get your API key here](http://app.ctibutler.com). Can be left blank if running locally. + + +## VULMATCH + +txt2detection requires [vulmatch](https://github.com/muchdogesec/vulmatch) to lookup CVE IDs + +* `VULMATCH_BASE_URL`: `'http://api.vulmatch.com'` (recommended) + * If you are running CTI Butler locally, be sure to set `'http://host.docker.internal:8005/api/'` in the `.env` file otherwise you will run into networking errors. +* `VULMATCH_BASE_URL`: + * If using `'http://api.vulmatch.com'`, [get your API key here](http://app.vulmatch.com). Can be left blank if running locally. \ No newline at end of file diff --git a/README.md b/README.md index affb352..578aabc 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ e.g. ```shell python3 txt2detection.py \ - --input_file tests/files/CVE-2024-1212.txt \ + --input_file tests/files/CVE-2024-56520.txt \ --name "lynx ransomware" \ --tlp_level green \ --labels label1,label2 \ diff --git a/tests/files/CVE-2024-1212.txt b/tests/files/CVE-2024-56520.txt similarity index 65% rename from tests/files/CVE-2024-1212.txt rename to tests/files/CVE-2024-56520.txt index dc75369..1052e40 100644 --- a/tests/files/CVE-2024-1212.txt +++ b/tests/files/CVE-2024-56520.txt @@ -1,4 +1,4 @@ -A detection rule for the potential exploitation of CVE-2024-1709 an unauthenticated command injection in Progress Kemp LoadMaster. +A detection rule for the potential exploitation of CVE-2024-56520 an unauthenticated command injection in Progress Kemp LoadMaster. It needs to look for GET requests to '/access/set' API with the parameters 'param=enableapi' and 'value=1' as well as an "Authorization" header with a base64 encoded value with an uncommon character. diff --git a/txt2detection/ai_extractor/utils.py b/txt2detection/ai_extractor/utils.py index 1062bb2..038b347 100644 --- a/txt2detection/ai_extractor/utils.py +++ b/txt2detection/ai_extractor/utils.py @@ -14,6 +14,7 @@ class Detection(BaseModel): rule: str = Field(description="The detection rule in format described, escape properly.") indicator_types: list[str] mitre_attack_ids: list[str] + cve_ids : list[str] = Field(description="CVE ID that matches the rule or is explicitly stated in the input", examples=["CVE-2019-1234", "CVE-2024-41312"]) class DetectionContainer(BaseModel): diff --git a/txt2detection/bundler.py b/txt2detection/bundler.py index a58675b..c90a473 100644 --- a/txt2detection/bundler.py +++ b/txt2detection/bundler.py @@ -230,29 +230,32 @@ def add_rule_indicator(self, detection: Detection): "pattern": detection.rule, "valid_from": self.report.created, "object_marking_refs": self.report.object_marking_refs, - "external_references": [ - dict(source_name="mitre_attack_ids", description=detection.mitre_attack_ids) - ] + # "external_references": [ + # dict(source_name="mitre_attack_ids", description=detection.mitre_attack_ids), + # dict(source_name="cve-ids", description=detection.cve_ids), + # ] } self.add_ref(indicator) for obj in self.get_attack_objects(detection.mitre_attack_ids): self.add_ref(obj) - self.add_relation(indicator, obj) + self.add_relation(indicator, obj, 'mitre-attack') + for obj in self.get_cve_objects(detection.cve_ids): + self.add_ref(obj) + self.add_relation(indicator, obj, 'nvd-cve') - def add_relation(self, indicator, attack): + def add_relation(self, indicator, target_object, type='mitre-attack'): rel = Relationship( - id="relationship--" - + str( + id="relationship--" + str( uuid.uuid5( - UUID_NAMESPACE, f"mitre-attack+{indicator['id']}+{attack['id']}" + UUID_NAMESPACE, f"{type}+{indicator['id']}+{target_object['id']}" ) ), source_ref=indicator['id'], - target_ref=attack['id'], - relationship_type="mitre-attack", + target_ref=target_object['id'], + relationship_type=type, created_by_ref=self.report.created_by_ref, - description=f"{indicator['name']} is linked to {attack['external_references'][0]['external_id']} ({attack['name']})", + description=f"{indicator['name']} is linked to {target_object['external_references'][0]['external_id']} ({target_object['name']})", created=self.report.created, modified=self.report.modified, object_marking_refs=self.report.object_marking_refs, @@ -267,14 +270,28 @@ def get_attack_objects(self, attack_ids): logger.debug(f"retrieving attack objects: {attack_ids}") endpoint = urljoin(os.environ['CTIBUTLER_BASE_URL'] + '/', f"v1/attack-enterprise/objects/?attack_id="+','.join(attack_ids)) - s = requests.Session() + headers = {} if api_key := os.environ.get('CTIBUTLER_API_KEY'): - s.headers['API-KEY'] = api_key - + headers['API-KEY'] = api_key + + return self._get_objects(endpoint, headers) + + + def get_cve_objects(self, cve_ids): + logger.debug(f"retrieving cve objects: {cve_ids}") + endpoint = urljoin(os.environ['VULMATCH_BASE_URL'] + '/', f"v1/cve/objects/?cve_id="+','.join(cve_ids)) + headers = {} + if api_key := os.environ.get('VULMATCH_API_KEY'): + headers['API-KEY'] = api_key + + return self._get_objects(endpoint, headers) + + + def _get_objects(self, endpoint, headers): data = [] page = 1 while True: - resp = s.get(endpoint, params=dict(page=page, page_size=1000)) + resp = requests.get(endpoint, params=dict(page=page, page_size=1000), headers=headers) if resp.status_code != 200: break d = resp.json()