Skip to main content
All Collections > Settings and Account > Vulcan API
Creating tags from a CSV file using Vulcan Cyber ExposureOS API
Creating tags from a CSV file using Vulcan Cyber ExposureOS API
Updated over 2 weeks ago

Use case example: using the Vulcan Cyber ExposureOS API to create tags on assets from an external data source.

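This Python script will create tags on images based on a CSV file containing the matching repository name. The tag reflects the image's owner according to the repository. The script reads two columns from the CSV file: `Name` (matched against the image's path in Vulcan) and `Team` (used as the tag name).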

Image_tags.py

import csv
import json
from collections import defaultdict

import requests

# region Config
# Base URL of the Vulcan instance; every API path below is appended to it.
BASE_URL = 'https://demo.vulcancyber.com'
# Directory and file name of the CSV file that maps repository names to teams.
FILE_PATH = '<FILE_PATH>'
FILE_NAME = '<FILE_NAME>.csv'
# API token sent as a Bearer token on every request.
API_TOKEN = '<API_TOKEN>'
HEADERS = {
    'Authorization': f'Bearer {API_TOKEN}',
    'Content-Type': 'application/json',
}
# endregion

# region Methods
# Seconds to wait for the Vulcan API before giving up. Without an explicit
# timeout, `requests` may block indefinitely on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 30


def __build_tag_payload(tag_name, assets_ids):
    """Return the request body shared by tag creation and tag editing."""
    return {
        'biz_group': False,
        'name': tag_name,
        'severity_score': 0,
        'tag_join_method': 'intersection',
        'tag_query': {
            'Specific Assets': {'repositories': assets_ids},
        },
    }


def __create_tag(tag_name, assets_ids):
    """Create a new tag named *tag_name* covering the given asset ids.

    Sends a POST to ``/api/v1/tags/`` and prints whether it succeeded.
    Nothing is returned; a failed request is reported but not raised.
    """
    create_tag_response = requests.post(
        f'{BASE_URL}/api/v1/tags/',
        headers=HEADERS,
        json=__build_tag_payload(tag_name, assets_ids),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if create_tag_response.ok:
        print(f"Successfully created tag {tag_name} in Vulcan.")
    else:
        print(f"Failed creating tag {tag_name}. Errors: {create_tag_response.content.decode('utf-8')}")


def __attach_assets_to_tag(tag_name, tag_id, assets_ids):
    """Update the existing tag *tag_id* so that it covers the given asset ids.

    Sends a PUT to ``/api/v1/tags/<tag_id>/`` with the full tag definition and
    prints whether it succeeded. Nothing is returned; a failed request is
    reported but not raised.
    """
    # NOTE(review): the PUT body carries only the ids passed in here, so this
    # presumably replaces the tag's previous asset list -- confirm with the API.
    edit_tag_response = requests.put(
        f'{BASE_URL}/api/v1/tags/{tag_id}/',
        headers=HEADERS,
        json=__build_tag_payload(tag_name, assets_ids),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if edit_tag_response.ok:
        print(f"Successfully edited tag {tag_name} in Vulcan.")
    else:
        print(f"Failed attaching assets to tag {tag_name}. Errors: {edit_tag_response.content.decode('utf-8')}")


def __read_teams_file():
    """Read the teams CSV file and group repository names by team.

    Returns:
        A ``defaultdict(list)`` mapping each ``Team`` value to the list of
        ``Name`` values found on its rows, in file order.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if a row lacks the ``Name`` or ``Team`` column.
    """
    tags_to_assets_ids = defaultdict(list)
    # newline='' is what the csv module requires so that newlines embedded in
    # quoted fields are parsed correctly.
    with open(f'{FILE_PATH}/{FILE_NAME}', 'r', newline='') as csvfile:
        csvreader = csv.DictReader(csvfile, delimiter=',')
        # Adapt the column names below to match your own CSV file.
        for row in csvreader:
            name = row['Name']
            team = row['Team']
            tags_to_assets_ids[team].append(name)
    return tags_to_assets_ids


def __fetch_all_tags():
    """Fetch the existing tags from Vulcan.

    Returns:
        A dict mapping tag name to tag id. It is empty when the request
        fails; the failure is printed rather than raised.
    """
    exist_tag_name_to_id = {}
    tags_response = requests.get(
        f'{BASE_URL}/api/v1/tags/',
        headers=HEADERS,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if tags_response.ok:
        print("Successfully fetched tags from Vulcan.")
        exist_tag_name_to_id = {tag['name']: tag['id'] for tag in tags_response.json()}
    else:
        print(f"Failed fetching tags from Vulcan. Errors: {tags_response.content.decode('utf-8')}")
    return exist_tag_name_to_id


def __fetch_all_images():
    """Fetch the image assets from Vulcan.

    Returns:
        A ``defaultdict(list)`` mapping each asset's ``path`` to the list of
        asset ids that share that path. It is empty when the request fails;
        the failure is printed rather than raised.
    """
    exist_image_name_to_ids = defaultdict(list)
    data = {
        'business_group_id': 0,
        # NOTE(review): only rows 0..100,000 are requested; presumably any
        # assets beyond that are silently left out -- confirm with the API.
        'end_row': 100_000,
        'filter': {},
        'first_row': 0,
        'sort_by': [],
    }
    images_response = requests.post(
        f'{BASE_URL}/api/v1/assets/repositories/',
        headers=HEADERS,
        json=data,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if images_response.ok:
        print("Successfully fetched images from Vulcan.")
        for image in images_response.json()['data']:
            exist_image_name_to_ids[image['path']].append(image['id'])
    else:
        print(f"Failed fetching images from Vulcan. Errors: {images_response.content.decode('utf-8')}")
    return exist_image_name_to_ids


def __generate_tags(exist_tag_name_to_id, tags_to_assets_names, exist_image_name_to_ids):
    """Create or update one tag per team found in the CSV file.

    Args:
        exist_tag_name_to_id: mapping of existing tag name to tag id.
        tags_to_assets_names: mapping of tag (team) name to asset names.
        exist_image_name_to_ids: mapping of asset name to its asset ids.

    Asset names unknown to Vulcan are ignored, and a team whose names match
    no asset is skipped entirely. A tag that already exists is edited;
    otherwise a new one is created.
    """
    for tag_name, assets_names in tags_to_assets_names.items():
        tag_id = exist_tag_name_to_id.get(tag_name)
        # .get() (rather than indexing) avoids inserting empty entries into
        # the defaultdict for names that Vulcan does not know.
        flat_assets_ids = [
            asset_id
            for name in assets_names
            for asset_id in exist_image_name_to_ids.get(name, [])
        ]
        if not flat_assets_ids:
            continue
        # Compare against None so that a legitimate tag id of 0 still counts
        # as an existing tag.
        if tag_id is not None:
            __attach_assets_to_tag(tag_name, tag_id, flat_assets_ids)
        else:
            __create_tag(tag_name, flat_assets_ids)
# endregion


# region Script
# Driver: fetch the current state from Vulcan, read the CSV file, then create
# or update one tag per team.
print('Started fetching vulcan tags')
exist_tag_name_to_id = __fetch_all_tags()
print(f'Fetched total of {len(exist_tag_name_to_id)} vulcan tags')

print('Started fetching vulcan images')
exist_image_name_to_ids = __fetch_all_images()
print(f'Fetched total of {len(exist_image_name_to_ids)} vulcan images')

print('Started reading teams csv')
tags_to_assets_ids = __read_teams_file()
print(f'Fetched total of {len(tags_to_assets_ids)} teams from csv file')

print('Generating teams as tags in vulcan')
__generate_tags(exist_tag_name_to_id, tags_to_assets_ids, exist_image_name_to_ids)
# NOTE(review): this message is printed even when individual API calls above
# reported a failure, because those failures are printed rather than raised.
print('Finished script with success!')
# endregion
Did this answer your question?