"""Use-case example: using the Vulcan API to create tags on assets from an external data source.

This Python script creates tags on images based on a CSV file that contains the matching
repository names. Each tag reflects the owner of the image according to its repository.

File: Image_tags.py
"""
import csv
import json
from collections import defaultdict

import requests

# region Config
# Base URL of the Vulcan instance; every API path below is appended to it.
BASE_URL = 'https://demo.vulcancyber.com'
# Directory and file name of the teams CSV (read as f'{FILE_PATH}/{FILE_NAME}').
# Replace the placeholders before running.
FILE_PATH = '<FILE_PATH>'
FILE_NAME = '<FILE_NAME>.csv'
# API token sent as a bearer token on every request. Replace the placeholder.
API_TOKEN = '<API_TOKEN>'
# Headers attached to every request made by this script.
HEADERS = {
    'Authorization': f'Bearer {API_TOKEN}',
    'Content-Type': 'application/json',
}
# endregion


# region Methods
def __build_tag_payload(tag_name, assets_ids):
    """Build the JSON body shared by the create-tag and edit-tag requests.

    Args:
        tag_name: Name of the tag.
        assets_ids: Asset IDs placed under
            ``tag_query['Specific Assets']['repositories']``.

    Returns:
        A dict suitable for the ``json=`` argument of ``requests``.
    """
    return {
        'biz_group': False,
        'name': tag_name,
        'severity_score': 0,
        'tag_join_method': 'intersection',
        'tag_query': {
            'Specific Assets': {'repositories': assets_ids},
        },
    }


def __create_tag(tag_name, assets_ids):
    """Create a new tag via ``POST /api/v1/tags/`` and print the outcome.

    A non-OK HTTP response is only reported on stdout; nothing is returned.

    Args:
        tag_name: Name of the tag to create.
        assets_ids: Asset IDs to put in the tag's query.
    """
    data = __build_tag_payload(tag_name, assets_ids)
    create_tag_response = requests.post(f'{BASE_URL}/api/v1/tags/', headers=HEADERS, json=data)
    if create_tag_response.ok:
        print(f"Successfully created tag {tag_name} in Vulcan.")
    else:
        print(f"Failed creating tag {tag_name}. Errors: {create_tag_response.content.decode('utf-8')}")


def __attach_assets_to_tag(tag_name, tag_id, assets_ids):
    """Update an existing tag via ``PUT /api/v1/tags/{tag_id}/`` and print the outcome.

    The request body is the same payload used for creation, so the tag's
    query is sent in full with the given asset IDs. A non-OK HTTP response
    is only reported on stdout; nothing is returned.

    Args:
        tag_name: Name of the tag being edited.
        tag_id: ID of the existing tag, used in the request URL.
        assets_ids: Asset IDs to put in the tag's query.
    """
    data = __build_tag_payload(tag_name, assets_ids)
    edit_tag_response = requests.put(f'{BASE_URL}/api/v1/tags/{tag_id}/', headers=HEADERS, json=data)
    if edit_tag_response.ok:
        print(f"Successfully edited tag {tag_name} in Vulcan.")
    else:
        print(f"Failed attaching assets to tag {tag_name}. Errors: {edit_tag_response.content.decode('utf-8')}")


def __read_teams_file():
    """Read the teams CSV and group the ``Name`` values by ``Team``.

    The file at ``FILE_PATH/FILE_NAME`` must be comma-delimited with a header
    row containing ``Name`` and ``Team`` columns.

    Returns:
        A ``defaultdict(list)`` mapping each team to the names listed for it,
        in file order.

    Raises:
        KeyError: If a row lacks the ``Name`` or ``Team`` column.
    """
    tags_to_assets_ids = defaultdict(list)
    # newline='' lets the csv module do its own newline handling, as its
    # documentation requires for files passed to a reader.
    with open(f'{FILE_PATH}/{FILE_NAME}', 'r', newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',')
        for row in csvreader:
            # Adapt these two column names to your own CSV layout.
            name = row['Name']
            team = row['Team']
            tags_to_assets_ids[team].append(name)
    return tags_to_assets_ids


def __fetch_all_tags():
    """Fetch the existing tags via ``GET /api/v1/tags/``.

    Returns:
        A dict mapping tag name to tag ID. It is empty when the request
        fails, so a caller cannot tell "no tags" from "fetch failed" except
        through the printed message.
    """
    exist_tag_name_to_id = {}
    tags_response = requests.get(f'{BASE_URL}/api/v1/tags/', headers=HEADERS)
    if tags_response.ok:
        print("Successfully fetched tags from Vulcan.")
        res_json = tags_response.json()
        exist_tag_name_to_id = {tag['name']: tag['id'] for tag in res_json}
    else:
        print(f"Failed fetching tags from Vulcan. Errors: {tags_response.content.decode('utf-8')}")
    return exist_tag_name_to_id


def __fetch_all_images():
    """Fetch images via ``POST /api/v1/assets/repositories/``.

    A single request is made for rows 0 to 100,000; no further pages are
    requested.

    Returns:
        A ``defaultdict(list)`` mapping each image ``path`` to the list of
        IDs that share it. It is empty when the request fails.
    """
    exist_image_name_to_ids = defaultdict(list)
    data = {
        'business_group_id': 0,
        'end_row': 100_000,
        'filter': {},
        'first_row': 0,
        'sort_by': [],
    }
    images_response = requests.post(f'{BASE_URL}/api/v1/assets/repositories/', headers=HEADERS, json=data)
    if images_response.ok:
        print("Successfully fetched images from Vulcan.")
        res_json = images_response.json()
        for image in res_json['data']:
            exist_image_name_to_ids[image['path']].append(image['id'])
    else:
        print(f"Failed fetching images from Vulcan. Errors: {images_response.content.decode('utf-8')}")
    return exist_image_name_to_ids


def __generate_tags(exist_tag_name_to_id, tags_to_assets_names, exist_image_name_to_ids):
    """Create or update one tag per team using the images known to Vulcan.

    Teams for which none of the listed names matches a known image are
    skipped without any API call.

    Args:
        exist_tag_name_to_id: Mapping of existing tag name to tag ID.
        tags_to_assets_names: Mapping of tag (team) name to asset names.
        exist_image_name_to_ids: Mapping of image name to a list of IDs.
    """
    for tag_name, assets_names in tags_to_assets_names.items():
        # .get() instead of [] so a defaultdict is not populated with empty
        # entries for unknown names; "or ()" skips missing and empty lists.
        flat_assets_ids = [
            asset_id
            for name in assets_names
            for asset_id in (exist_image_name_to_ids.get(name) or ())
        ]
        if not flat_assets_ids:
            continue
        tag_id = exist_tag_name_to_id.get(tag_name)
        # Compare with None explicitly: a falsy ID such as 0 still denotes
        # an existing tag and must be edited rather than created again.
        if tag_id is not None:
            __attach_assets_to_tag(tag_name, tag_id, flat_assets_ids)
        else:
            __create_tag(tag_name, flat_assets_ids)
# endregion


# region Script
# Driver: fetch the current tags and images, read the teams CSV, then create
# or update one tag per team. Progress is reported on stdout.
print('Started fetching vulcan tags')
exist_tag_name_to_id = __fetch_all_tags()
print(f'Fetched total of {len(exist_tag_name_to_id)} vulcan tags')

print('Started fetching vulcan images')
exist_image_name_to_ids = __fetch_all_images()
print(f'Fetched total of {len(exist_image_name_to_ids)} vulcan images')

print('Started reading teams csv')
tags_to_assets_ids = __read_teams_file()
print(f'Fetched total of {len(tags_to_assets_ids)} teams from csv file')

print('Generating teams as tags in vulcan')
__generate_tags(exist_tag_name_to_id, tags_to_assets_ids, exist_image_name_to_ids)
# NOTE(review): this line is printed unconditionally; failed API calls are
# only visible in the messages printed earlier.
print('Finished script with success!')
# endregion