'''
Manage AMITT metadata

Create a page for each of the AMITT objects, if it doesn't already exist.
If it does exist, update the metadata on it, and preserve any hand-created
notes below the metadata area in it.

* todo: add all framework comments to the repo issues list
'''

import html
import os

import numpy as np
import pandas as pd


# NOTE(review): the page skeleton and the checkbox toggle script below were
# rebuilt from what the generator visibly needs (a title, a heading, one
# checkbox per technique and one list item per technique). Confirm against
# the published matrix_to_message.html before relying on the exact markup.
_MESSAGE_PAGE_HEAD = '''<!DOCTYPE html>
<html>
<head>
<title>AMITT</title>
</head>
<body>

<script>
function handleTechniqueClick(box) {
  var checkBox = document.getElementById(box + "check");
  var text = document.getElementById(box + "text");
  text.style.display = checkBox.checked ? "list-item" : "none";
}
</script>

<h1>AMITT</h1>

<table border="1">
'''

_MESSAGE_PAGE_TAIL = '''
</body>
</html>
'''


class Amitt:
    '''Load the AMITT master spreadsheet and generate the framework pages.

    Attributes set by ``__init__``:
        phases, tactics, techniques, tasks, incidents: one DataFrame per
            worksheet of the same name (``tactics`` gains a ``techniques``
            column listing the technique ids that belong to each tactic).
        it: incident/technique cross-table built by
            ``create_incident_technique_crosstable``.
        phasedict, tacdict, techdict: ``{id: name}`` lookups.
        ngridrows, ngridcols, grid: dimensions and contents of the display
            grid built by ``create_display_grid``.
    '''

    # Marker separating generated metadata (above) from hand-written notes
    # (below) in every datasheet.
    _WARNTEXT = 'DO NOT EDIT ABOVE THIS LINE - PLEASE ADD NOTES BELOW'

    def __init__(self, infile='../AMITT_MASTER_DATA/AMITT_TTPs_MASTER.xlsx'):
        '''Read every worksheet of ``infile`` and build the lookup tables.

        Args:
            infile: path to the master Excel workbook. It must contain the
                sheets ``phases``, ``tactics``, ``techniques``, ``tasks``,
                ``incidents`` and ``incidenttechniques``.
        '''
        # Load metadata from file; the context manager releases the workbook
        # handle as soon as every sheet has been parsed.
        with pd.ExcelFile(infile) as xlsx:
            metadata = {name: xlsx.parse(name) for name in xlsx.sheet_names}

        # Create individual tables and dictionaries
        self.phases = metadata['phases']
        self.techniques = metadata['techniques']
        self.tasks = metadata['tasks']
        self.incidents = metadata['incidents']
        # Needs self.incidents and self.techniques, so it must come after them.
        self.it = self.create_incident_technique_crosstable(
            metadata['incidenttechniques'])

        # One row per tactic holding the list of its technique ids.
        tactechs = (
            self.techniques.groupby('tactic')['id']
            .apply(list)
            .reset_index()
            .rename({'id': 'techniques'}, axis=1)
        )
        # Left merge keeps tactics without techniques; fillna('') marks them
        # with an empty string instead of NaN.
        self.tactics = (
            metadata['tactics']
            .merge(tactechs, left_on='id', right_on='tactic', how='left')
            .fillna('')
            .drop('tactic', axis=1)
        )

        self.phasedict = self.make_object_dict(self.phases)
        self.tacdict = self.make_object_dict(self.tactics)
        self.techdict = self.make_object_dict(self.techniques)

        # Two header rows (phase, tactic) plus the longest technique column.
        # default=0 keeps a workbook without techniques from raising.
        self.ngridrows = max(tactechs['techniques'].apply(len), default=0) + 2
        self.ngridcols = len(self.tactics)
        self.grid = self.create_display_grid()

    def create_incident_technique_crosstable(self, it_metadata):
        '''Generate the full cross-table between incidents and techniques.

        Args:
            it_metadata: DataFrame with (at least) the columns ``id``,
                ``name``, ``incident`` and ``techniques``, where
                ``techniques`` is a comma-separated string of technique ids.

        Returns:
            DataFrame with one row per (incident-technique record, technique)
            pair and the columns ``id``, ``name``, ``id_incident``,
            ``name_incident``, ``id_technique`` and ``name_technique``.
        '''
        # set_index returns a new frame, so the caller's table is not mutated.
        it = it_metadata.set_index('id', drop=False)

        # Explode the comma-separated technique lists into one row each,
        # still keyed by the record id.
        exploded = (
            it['techniques'].str.split(',')
            .apply(lambda x: pd.Series(x))
            .stack()
            .reset_index(level=1, drop=True)
            .to_frame('technique')
            .reset_index()
        )
        it = exploded.merge(it.drop('id', axis=1).reset_index())
        it = it.drop('techniques', axis=1)

        # Attach incident and technique names; the suffixes keep the record's
        # own id/name unsuffixed.
        it = it.merge(self.incidents[['id', 'name']],
                      left_on='incident', right_on='id',
                      suffixes=('', '_incident')).drop('incident', axis=1)
        it = it.merge(self.techniques[['id', 'name']],
                      left_on='technique', right_on='id',
                      suffixes=('', '_technique')).drop('technique', axis=1)
        return it

    def make_object_dict(self, df):
        '''Return an ``{id: name}`` dict built from ``df``'s id/name columns.'''
        return dict(zip(df['id'], df['name']))

    def create_display_grid(self, tofile=True):
        '''Create the master grid that all the framework visuals are made from.

        The grid has one column per tactic. Row 0 holds the tactic's phase id,
        row 1 the tactic id, and the rows below hold its technique ids; unused
        cells are empty strings.

        Args:
            tofile: when true, also save the grid to
                ``../matrices/matrix_arr.csv``.

        Returns:
            The grid as a list of ``ngridrows`` lists of ``ngridcols`` strings.
        '''
        grid = [[''] * self.ngridcols for _ in range(self.ngridrows)]

        # Enumerate positions rather than trusting the DataFrame index labels,
        # which are only 0..n-1 when the frame has a default index.
        for col, (_, tactic) in enumerate(self.tactics.iterrows()):
            grid[0][col] = tactic['phase']
            grid[1][col] = tactic['id']
            techniques = tactic['techniques']
            if not techniques:
                # Tactics without techniques carry '' after the fillna.
                continue
            for offset, technique in enumerate(techniques):
                grid[offset + 2][col] = technique

        # Save grid to file
        if tofile:
            matrixdir = '../matrices'
            os.makedirs(matrixdir, exist_ok=True)
            pd.DataFrame(grid).to_csv(
                matrixdir + '/matrix_arr.csv', index=False, header=False)
        return grid

    def create_incidentstring(self, techniqueid):
        '''Return a markdown table of the incidents that use ``techniqueid``.

        Each incident gets one row; the second column joins every description
        recorded for that incident/technique pair. Missing descriptions are
        skipped.
        '''
        header = (
            '\n'
            '| Incident | Descriptions given for this incident |\n'
            '| -------- | -------------------- |\n'
        )
        rowfmt = '| [{0} {1}](../incidents/{0}.md) | {2} |\n'

        its = self.it[self.it['id_technique'] == techniqueid]
        incidents = (
            its[['id_incident', 'name_incident']]
            .drop_duplicates()
            .sort_values('id_incident')
        )
        rows = []
        for _, row in incidents.iterrows():
            descriptions = its.loc[
                its['id_incident'] == row['id_incident'], 'name']
            # dropna/astype keep a blank description from breaking the join.
            techstring = ', '.join(descriptions.dropna().astype(str).to_list())
            rows.append(rowfmt.format(
                row['id_incident'], row['name_incident'], techstring))
        return header + ''.join(rows)

    def create_techstring(self, incidentid):
        '''Return a markdown table of the techniques seen in ``incidentid``.'''
        header = (
            '\n'
            '| Technique | Description given for this incident |\n'
            '| --------- | ------------------------- |\n'
        )
        rowfmt = '| [{0} {1}](../techniques/{0}.md) | {2} {3} |\n'

        techlist = self.it[self.it['id_incident'] == incidentid]
        rows = [
            rowfmt.format(row['id_technique'], row['name_technique'],
                          row['id'], row['name'])
            for _, row in techlist.sort_values('id_technique').iterrows()
        ]
        return header + ''.join(rows)

    def create_taskstring(self, tacticid):
        '''Return a markdown table of the tasks that belong to ``tacticid``.'''
        header = (
            '\n'
            '| Task |\n'
            '| ---- |\n'
        )
        rowfmt = '| [{0} {1}](../tasks/{0}.md) |\n'

        tasklist = self.tasks[self.tasks['tactic'] == tacticid]
        rows = [
            rowfmt.format(row['id'], row['name'])
            for _, row in tasklist.sort_values('id').iterrows()
        ]
        return header + ''.join(rows)

    def create_techtacstring(self, tacticid):
        '''Return a markdown table of the techniques under ``tacticid``.'''
        header = (
            '\n'
            '| Technique |\n'
            '| --------- |\n'
        )
        rowfmt = '| [{0} {1}](../techniques/{0}.md) |\n'

        techlist = self.techniques[self.techniques['tactic'] == tacticid]
        rows = [
            rowfmt.format(row['id'], row['name'])
            for _, row in techlist.sort_values('id').iterrows()
        ]
        return header + ''.join(rows)

    def _split_user_notes(self, filetext, datafile):
        '''Split an existing datasheet at the warning marker.

        Returns:
            ``(oldmetatext, usertext)``: everything up to and including the
            marker, and everything after it. When the marker is missing the
            whole file is treated as user notes and ``oldmetatext`` is ``''``.
        '''
        warnpos = filetext.find(self._WARNTEXT)
        if warnpos == -1:
            print('no warning text found in {}: adding to file'.format(datafile))
            return '', filetext
        cut = warnpos + len(self._WARNTEXT)
        return filetext[:cut], filetext[cut:]

    def _render_metadata(self, entity, row, template):
        '''Fill the entity's markdown template with the values from ``row``.

        Raises:
            ValueError: if ``entity`` is not one of the five known kinds.
        '''
        if entity == 'phase':
            return template.format(
                id=row['id'], name=row['name'], summary=row['summary'])
        if entity == 'tactic':
            return template.format(
                id=row['id'], name=row['name'], phase=row['phase'],
                summary=row['summary'],
                tasks=self.create_taskstring(row['id']),
                techniques=self.create_techtacstring(row['id']))
        if entity == 'task':
            return template.format(
                id=row['id'], name=row['name'], tactic=row['tactic'],
                summary=row['summary'])
        if entity == 'technique':
            return template.format(
                id=row['id'], name=row['name'], tactic=row['tactic'],
                summary=row['summary'],
                incidents=self.create_incidentstring(row['id']))
        if entity == 'incident':
            return template.format(
                id=row['id'], name=row['name'], type=row['type'],
                summary=row['summary'], yearstarted=row['Year Started'],
                fromcountry=row['From country'], tocountry=row['To country'],
                foundvia=row['Found via'], dateadded=row['When added'],
                techniques=self.create_techstring(row['id']))
        raise ValueError('unknown entity type: {}'.format(entity))

    def generate_datasheets(self):
        '''Create or refresh one markdown datasheet per named object.

        For each entity kind the template ``template_<entity>.md`` is read
        from the current directory and a page is written to
        ``../<entity>s/<id>.md``. Text below the warning marker in an existing
        page is preserved; a page is only rewritten when its generated part
        has changed. Files are read and written as UTF-8.
        '''
        metadata = {
            'phase': self.phases,
            'tactic': self.tactics,
            'technique': self.techniques,
            'task': self.tasks,
            'incident': self.incidents,
        }

        for entity, df in metadata.items():
            entities = entity + 's'
            os.makedirs('../{}'.format(entities), exist_ok=True)

            with open('template_{}.md'.format(entity), encoding='utf-8') as f:
                template = f.read()

            # Rows without a name are placeholders and get no page.
            for _, row in df[df['name'].notnull()].iterrows():
                # First read in the file - if it exists - and grab everything
                # below the "do not edit above this line" marker. It is
                # written back out below the new metadata.
                datafile = '../{}/{}.md'.format(entities, row['id'])
                oldmetatext, usertext = '', ''
                if os.path.exists(datafile):
                    with open(datafile, encoding='utf-8') as f:
                        oldmetatext, usertext = self._split_user_notes(
                            f.read(), datafile)

                # Now populate datafiles with new metadata plus old userdata
                metatext = self._render_metadata(entity, row, template)

                # Skip the write when the generated part is unchanged, so
                # untouched pages keep their timestamps.
                if metatext + self._WARNTEXT != oldmetatext:
                    print('Updating {}'.format(datafile))
                    with open(datafile, 'w', encoding='utf-8') as f:
                        f.write(metatext + self._WARNTEXT + usertext)

    @staticmethod
    def _link_cell(folder, ident, names):
        '''Return a table cell linking to ``<folder>/<ident>.md``.'''
        return '<td><a href="{0}/{1}.md">{1} {2}</a></td>\n'.format(
            folder,
            html.escape(str(ident)),
            html.escape(str(names[ident])))

    def write_grid_markdown(self, outfile='../matrix.md'):
        '''Write an HTML-table version of the framework grid to ``outfile``.

        Needs ``phasedict``, ``tacdict``, ``techdict`` and ``grid``. Every
        phase, tactic and technique cell links to its datasheet.

        NOTE(review): the table markup was rebuilt around the ``{0} {1}``
        placeholders; link targets follow the ``../<entity>s/<id>.md`` layout
        used by ``generate_datasheets``. Confirm against the published page.
        '''
        parts = ['# AMITT Latest Framework:\n\n<table border="1">\n']

        parts.append('<tr>\n')
        for col in range(self.ngridcols):
            parts.append(self._link_cell(
                'phases', self.grid[0][col], self.phasedict))
        parts.append('</tr>\n')

        parts.append('<tr>\n')
        for col in range(self.ngridcols):
            parts.append(self._link_cell(
                'tactics', self.grid[1][col], self.tacdict))
        parts.append('</tr>\n')

        for row in range(2, self.ngridrows):
            parts.append('<tr>\n')
            for col in range(self.ngridcols):
                techid = self.grid[row][col]
                if techid == '':
                    parts.append('<td> </td>\n')
                else:
                    parts.append(self._link_cell(
                        'techniques', techid, self.techdict))
            parts.append('</tr>\n')
        parts.append('</table>\n')

        with open(outfile, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))
        print('updated {}'.format(outfile))

    def write_incidentlist_markdown(self, outfile='../incidents.md'):
        '''Write an HTML-table version of the incident list to ``outfile``.

        One row per named incident; the id cell links to the incident's
        datasheet.

        NOTE(review): the table markup was rebuilt around the ``{}`` / ``{0}``
        placeholders; confirm against the published page.
        '''
        cols = ['name', 'type', 'Year Started', 'From country',
                'To country', 'Found via']

        parts = ['# AMITT Incidents:\n\n<table border="1">\n<tr>\n']
        parts.append('<th>{}</th>\n'.format('id'))
        for col in cols:
            parts.append('<th>{}</th>\n'.format(html.escape(col)))
        parts.append('</tr>\n')

        named = self.incidents[self.incidents['name'].notnull()]
        for _, row in named.iterrows():
            parts.append('<tr>\n')
            parts.append('<td><a href="incidents/{0}.md">{0}</a></td>\n'.format(
                html.escape(str(row['id']))))
            for col in cols:
                parts.append('<td>{}</td>\n'.format(html.escape(str(row[col]))))
            parts.append('</tr>\n')
        parts.append('</table>\n')

        with open(outfile, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))
        print('updated {}'.format(outfile))

    def write_grid_message_generator(self, outfile='../matrix_to_message.html'):
        '''Write a clickable HTML version of the matrix grid to ``outfile``.

        Every technique cell carries a checkbox, and every technique also gets
        an entry in a list below the table.

        NOTE(review): the cell markup and the show/hide behaviour of the list
        entries were rebuilt; confirm against the published page.
        '''
        parts = [_MESSAGE_PAGE_HEAD]

        parts.append('<tr>\n')
        for col in range(self.ngridcols):
            phaseid = self.grid[0][col]
            parts.append('<td>{0} {1}</td>\n'.format(
                html.escape(str(phaseid)),
                html.escape(str(self.phasedict[phaseid]))))
        parts.append('</tr>\n')

        parts.append('<tr>\n')
        for col in range(self.ngridcols):
            tacid = self.grid[1][col]
            parts.append('<td>{0} {1}</td>\n'.format(
                html.escape(str(tacid)),
                html.escape(str(self.tacdict[tacid]))))
        parts.append('</tr>\n')

        cellfmt = ('<td id="{0}">{0} {1}<input type="checkbox" id="{0}check" '
                   'onclick="handleTechniqueClick(\'{0}\')"></td>\n')
        itemfmt = '<li id="{0}text" style="display:none">{0}: {1}</li>\n'

        items = []
        for row in range(2, self.ngridrows):
            parts.append('<tr>\n')
            for col in range(self.ngridcols):
                techid = self.grid[row][col]
                if techid == '':
                    parts.append('<td> </td>\n')
                    continue
                safe_id = html.escape(str(techid))
                safe_name = html.escape(str(self.techdict[techid]))
                parts.append(cellfmt.format(safe_id, safe_name))
                items.append(itemfmt.format(safe_id, safe_name))
            parts.append('</tr>\n')
        parts.append('</table>\n<hr>\n')

        parts.append('<ul>\n{}</ul>\n'.format(''.join(items)))
        parts.append(_MESSAGE_PAGE_TAIL)

        with open(outfile, 'w', encoding='utf-8') as f:
            f.write(''.join(parts))
        print('updated {}'.format(outfile))

    def print_technique_incidents(self):
        '''Print, for every technique, the table of incidents that use it.'''
        for id_technique in self.techniques['id'].to_list():
            print('{}\n{}'.format(
                id_technique, self.create_incidentstring(id_technique)))

    def print_incident_techniques(self):
        '''Print, for every incident, the table of techniques seen in it.'''
        for id_incident in self.incidents['id'].to_list():
            print('{}\n{}'.format(
                id_incident, self.create_techstring(id_incident)))

    def generate_datafiles(self):
        '''Regenerate every datasheet, the matrix pages and the incident list.'''
        self.generate_datasheets()
        self.write_grid_markdown()
        self.write_incidentlist_markdown()
        self.write_grid_message_generator()


def main():
    '''Load the master workbook from its default path and rebuild all pages.'''
    amitt = Amitt()
    amitt.generate_datafiles()


if __name__ == "__main__":
    main()