406 строки
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
		
			Исполняемый файл
		
	
	
	
	
			
		
		
	
	
			406 строки
		
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
		
			Исполняемый файл
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Generate DISARM MISP galaxy
 | |
|     see also https://github.com/MISP/misp-galaxy
 | |
| 
 | |
| Author: Christophe Vandeplas
 | |
| License: AGPL-3
 | |
| """
 | |
| 
 | |
| from generate_DISARM_pages import Disarm
 | |
| import json
 | |
| import uuid
 | |
| import os
 | |
| 
 | |
| 
 | |
# Metadata copied verbatim into every generated galaxy and cluster file.
DISARM_DESCRIPTION = "DISARM is a framework designed for describing and understanding disinformation incidents."
DISARM_CATEGORY = "disarm"
DISARM_AUTHORS = ["DISARM Project"]
DISARM_SOURCE = "https://github.com/DISARMFoundation/DISARMframeworks"
# Namespace UUID: every galaxy, cluster and entry UUID is derived from it
# with uuid.uuid5, so changing it changes every generated identifier.
CORE_UUID = "9d6bd9d2-2cd3-4900-b61a-06cd64df3996"
 | |
| 
 | |
| 
 | |
class DisarmGalaxy:
    """Build the MISP galaxy and cluster JSON files for the DISARM framework.

    Four galaxy types are handled: techniques, countermeasures, detections
    and actortypes. For each of them a ``galaxies/disarm-<type>.json`` and a
    ``clusters/disarm-<type>.json`` file is written below ``out_path``.

    Every UUID is derived with ``uuid.uuid5`` from ``CORE_UUID``, so
    regenerating the files yields the same identifiers.

    NOTE(review): DataFrame rows are addressed by column position
    (``row[0]`` is used as the entry id, ``row[1]`` as the entry name, other
    positions differ per sheet). These indices are assumed to match the column
    order of the frames built by ``Disarm`` - confirm when the sheets change.
    """

    def __init__(self, out_path=os.path.join('..', '..', 'misp-galaxy')):
        """Load the DISARM data and remember where to write the output.

        Args:
            out_path: directory that holds the ``galaxies`` and ``clusters``
                sub-directories. They must already exist; nothing here
                creates them.
        """
        self.disarm = Disarm()
        self.out_path = out_path

        # Each name here must have matching generate_<name>_galaxy and
        # generate_<name>_clusters methods (looked up by name below).
        self.galaxy_types = ['techniques', 'countermeasures', 'detections', 'actortypes']

    def generate_all_galaxies(self):
        """Write the galaxy definition file of every galaxy type."""
        for galaxy_type in self.galaxy_types:
            getattr(self, f'generate_{galaxy_type}_galaxy')()  # also saves the files

    def generate_all_clusters(self):
        """Write the cluster file of every galaxy type."""
        for galaxy_type in self.galaxy_types:
            getattr(self, f'generate_{galaxy_type}_clusters')()

    def write_json_file(self, fname, file_data):
        """Serialise ``file_data`` as indented, key-sorted JSON into ``fname``.

        The file is always written as UTF-8: ``ensure_ascii=False`` emits
        non-ASCII characters unescaped, so relying on the platform default
        encoding could corrupt them or raise ``UnicodeEncodeError``.

        Args:
            fname: path of the file to (over)write.
            file_data: JSON-serialisable object.
        """
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(file_data, f, indent=2, sort_keys=True, ensure_ascii=False)
            f.write('\n')

    # ------------------------------------------------------------------
    # Private helpers shared by the generators
    # ------------------------------------------------------------------

    @staticmethod
    def _entry_uuid(name):
        """Return the deterministic UUID string for ``name`` under ``CORE_UUID``."""
        return str(uuid.uuid5(uuid.UUID(CORE_UUID), name))

    @staticmethod
    def _labels(lookup):
        """Return the values of an id -> label mapping as strings, in mapping order."""
        return [f'{label}' for _, label in lookup.items()]

    @staticmethod
    def _unique_rows(df):
        """Yield the rows of ``df``, skipping rows whose name (column 1) was already seen.

        ``df.values`` is evaluated once instead of once per cell access, and
        a set keeps the duplicate check constant-time.
        """
        seen_names = set()
        for row in df.values:
            name = row[1]
            if name in seen_names:  # remove duplicates
                continue
            seen_names.add(name)
            yield row

    @staticmethod
    def _kill_chain_entries(candidates, ignore_missing=False):
        """Build ``'<scope>:<label>'`` kill-chain strings.

        Args:
            candidates: iterable of ``(scope, lookup, key)`` tuples, where
                ``lookup[key]`` gives the label.
            ignore_missing: when true, a key missing from its lookup is
                skipped on its own; otherwise the ``KeyError`` propagates.

        Returns:
            The entries whose label is truthy, in candidate order.
        """
        entries = []
        for scope, lookup, key in candidates:
            try:
                label = lookup[key]
            except KeyError:
                if ignore_missing:
                    continue
                raise
            if label:
                entries.append(f'{scope}:{label}')
        return entries

    def _relations(self, cross_table, match_column, entry_id, target_column, relation_type):
        """Return the ``related`` entries linking ``entry_id`` to another object type.

        Rows of ``cross_table`` whose ``match_column`` equals ``entry_id``
        are selected and ordered by ``target_column``, so the output order
        is stable between runs.
        """
        matching = cross_table[cross_table[match_column] == entry_id]
        return [
            {'dest-uuid': self._entry_uuid(row[target_column]), 'type': relation_type}
            for _, row in matching.sort_values(target_column).iterrows()
        ]

    def _cluster_value(self, entry_id, name, description, kill_chain, related, pages_dir):
        """Assemble one cluster value; ``pages_dir`` is the generated_pages sub-directory."""
        return {
            'uuid': self._entry_uuid(entry_id),
            'value': name,
            'description': description,
            'meta': {
                'external_id': entry_id,
                'kill_chain': kill_chain,
                'refs': [
                    f'https://github.com/DISARMFoundation/DISARMframeworks/blob/main/generated_pages/{pages_dir}/{entry_id}.md'
                ]
            },
            'related': related
        }

    def _write_galaxy(self, galaxy_type, name, icon, kill_chain_order):
        """Write ``galaxies/disarm-<galaxy_type>.json``."""
        galaxy = {'name': name,
                  'type': f'disarm-{galaxy_type}',
                  'description': DISARM_DESCRIPTION,
                  'uuid': self._entry_uuid(f'disarm-galaxy-{galaxy_type}'),
                  'version': 1,
                  'icon': icon,
                  'namespace': 'disarm',
                  'kill_chain_order': kill_chain_order}
        self.write_json_file(os.path.join(self.out_path, 'galaxies', f'disarm-{galaxy_type}.json'), galaxy)

    def _write_cluster(self, galaxy_type, name, values):
        """Write ``clusters/disarm-<galaxy_type>.json`` with values sorted by external id."""
        cluster = {'authors': DISARM_AUTHORS,
                   'category': DISARM_CATEGORY,
                   'description': DISARM_DESCRIPTION,
                   'name': name,
                   'source': DISARM_SOURCE,
                   'type': f'disarm-{galaxy_type}',
                   'uuid': self._entry_uuid(f'disarm-cluster-{galaxy_type}'),
                   'values': sorted(values, key=lambda x: x['meta']['external_id']),
                   'version': 1}
        self.write_json_file(os.path.join(self.out_path, 'clusters', f'disarm-{galaxy_type}.json'), cluster)

    # ------------------------------------------------------------------
    # Techniques
    # ------------------------------------------------------------------

    def generate_techniques_galaxy(self):
        """Write the techniques galaxy; its kill chain is the list of tactics."""
        self._write_galaxy('techniques', 'Techniques', 'map', {
            'tactics': self._labels(self.disarm.tactics)
        })

    def generate_techniques_clusters(self):
        """Write the techniques cluster, related to countermeasures and detections.

        Raises:
            KeyError: if a technique refers to a tactic id that is not in
                ``self.disarm.tactics``.
        """
        galaxy_type = 'techniques'
        values = []
        for row in self._unique_rows(self.disarm.df_techniques):
            entry_id = row[0]
            kill_chain = [f'tactics:{self.disarm.tactics[row[3]]}']
            # Countermeasures relations
            # mitigated-by would be cleaner, but does not exist as relationship type
            related = self._relations(self.disarm.cross_counterid_techniqueid,
                                      'technique_id', entry_id, 'disarm_id', 'blocked-by')
            # Detections relations
            related += self._relations(self.disarm.cross_detectionid_techniqueid,
                                       'technique_id', entry_id, 'disarm_id', 'detected-by')
            values.append(self._cluster_value(entry_id, row[1], row[4], kill_chain, related, galaxy_type))

        self._write_cluster(galaxy_type, 'Techniques', values)

    # ------------------------------------------------------------------
    # Countermeasures
    # ------------------------------------------------------------------

    def generate_countermeasures_galaxy(self):
        """Write the countermeasures galaxy (tactics, response types, metatechniques)."""
        self._write_galaxy('countermeasures', 'Countermeasures', 'shield-alt', {
            'tactics': self._labels(self.disarm.tactics),
            'responsetypes': self._labels(self.disarm.responsetypes),
            'metatechniques': self._labels(self.disarm.metatechniques)
        })

    def generate_countermeasures_clusters(self):
        """Write the countermeasures cluster, related to techniques and actor types.

        Raises:
            KeyError: if a countermeasure refers to a tactic, response type
                or metatechnique id missing from the matching lookup.
        """
        galaxy_type = 'countermeasures'
        values = []
        for row in self._unique_rows(self.disarm.df_counters):
            entry_id = row[0]
            kill_chain = self._kill_chain_entries([
                ('tactics', self.disarm.tactics, row[15]),
                ('responsetypes', self.disarm.responsetypes, row[10]),
                ('metatechniques', self.disarm.metatechniques, row[17]),
            ])

            # Techniques relations
            # mitigated would be cleaner, but mitigated-by does not exist as relationship type
            related = self._relations(self.disarm.cross_counterid_techniqueid,
                                      'disarm_id', entry_id, 'technique_id', 'blocks')
            # Actortype relations
            # mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
            related += self._relations(self.disarm.cross_counterid_actortypeid,
                                       'disarm_id', entry_id, 'actortype_id', 'affected-by')
            # The generated pages live under "counters", not "countermeasures".
            values.append(self._cluster_value(entry_id, row[1], row[3], kill_chain, related, 'counters'))

        self._write_cluster(galaxy_type, 'Countermeasures', values)

    # ------------------------------------------------------------------
    # Detections
    # ------------------------------------------------------------------

    def generate_detections_galaxy(self):
        """Write the detections galaxy (tactics and response types).

        Metatechniques are left out: the detection rows carry no
        metatechnique id to classify the cluster values with.
        """
        self._write_galaxy('detections', 'Detections', 'bell', {
            'tactics': self._labels(self.disarm.tactics),
            'responsetypes': self._labels(self.disarm.responsetypes)
        })

    def generate_detections_clusters(self):
        """Write the detections cluster, related to techniques and actor types.

        Unknown tactic or response type ids are skipped rather than raised.
        """
        galaxy_type = 'detections'
        values = []
        for row in self._unique_rows(self.disarm.df_detections):
            entry_id = row[0]
            # Metatechnique ID is not in the array, so only two scopes apply.
            kill_chain = self._kill_chain_entries([
                ('tactics', self.disarm.tactics, row[14]),
                ('responsetypes', self.disarm.responsetypes, row[10]),
            ], ignore_missing=True)

            # Techniques relations
            related = self._relations(self.disarm.cross_detectionid_techniqueid,
                                      'disarm_id', entry_id, 'technique_id', 'detects')
            # Actortypes relations
            # mitigated-by would be cleaner, but mitigated-by does not exist as relationship type
            related += self._relations(self.disarm.cross_detectionid_actortypeid,
                                       'disarm_id', entry_id, 'actortype_id', 'detected-by')
            values.append(self._cluster_value(entry_id, row[1], row[3], kill_chain, related, galaxy_type))

        self._write_cluster(galaxy_type, 'Detections', values)

    # ------------------------------------------------------------------
    # Actor types
    # ------------------------------------------------------------------

    def generate_actortypes_galaxy(self):
        """Write the actor types galaxy; its kill chain is the list of sectors."""
        self._write_galaxy('actortypes', 'Actor Types', 'user-secret', {
            'sectors': self._labels(self.disarm.sectors)
        })

    def generate_actortypes_clusters(self):
        """Write the actor types cluster, related to countermeasures and detections.

        Column 3 holds a comma-separated list of sector ids. Each id is
        resolved on its own, so an unknown id is skipped without discarding
        the valid ids that follow it in the same cell.
        """
        galaxy_type = 'actortypes'
        values = []
        for row in self._unique_rows(self.disarm.df_actortypes):
            entry_id = row[0]
            kill_chain = self._kill_chain_entries(
                [('sectors', self.disarm.sectors, sector.strip()) for sector in row[3].split(',')],
                ignore_missing=True)

            # Countermeasures relations
            related = self._relations(self.disarm.cross_counterid_actortypeid,
                                      'actortype_id', entry_id, 'disarm_id', 'affects')
            # Detections relations
            related += self._relations(self.disarm.cross_detectionid_actortypeid,
                                       'actortype_id', entry_id, 'disarm_id', 'detects')
            values.append(self._cluster_value(entry_id, row[1], row[2], kill_chain, related, galaxy_type))

        self._write_cluster(galaxy_type, 'Actor Types', values)
 | |
| 
 | |
| 
 | |
def main():
    """Write every DISARM galaxy file, then every DISARM cluster file."""
    generator = DisarmGalaxy()
    for build_step in (generator.generate_all_galaxies, generator.generate_all_clusters):
        build_step()
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    main()
    # Remind the maintainer of the manual follow-up steps.
    for reminder in (
        "All done, please look at the delta, and update the version number if needed.",
        "After that do ./jq_all_the_things.sh, commit, and then ./validate_all.sh.",
    ):
        print(reminder)
 | 
