Module framework.cloudhelpers.deployments
Source code
import random
import random
import os
import time
import sys
import jinja2
import google.auth
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from . import iam, gcstorage
from .. import levels
import yaml
def _read_render_config(file_name, template_args={}, load_path=[]):
' ' 'Use load_path to set jinja env loader, if there are blocks in level yaml.
Parameters:
file_name (str): Relative file path with name. example core/levels/leastprivilege/c1project/c1project.yaml.
if load_path provided, should be file name only. example c1project.yaml
template_args (dict, optional): Dictionary of arguments to use when rendering the top level configuration template using Jinja2.
Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable.
If not supplied, the top level configuration will not be treated as a template.
load_path (str): Relative path example core/levels/leastprivilege/c1project/
' ' '
if not load_path == []:
loader = jinja2.FileSystemLoader(searchpath=load_path)
env = jinja2.Environment(loader=loader)
content = env.get_template(file_name)
if not template_args == {}:
return content.render(**template_args)
else:
return content.render()
else:
with open(file_name) as f:
content = f.read()
if not template_args == {}:
return jinja2.Template(content).render(**template_args)
else:
#deployment will faild if there are jinja blocks in yaml
return content
def insert(level_path, template_files=[],
config_template_args={}, labels={}):
'''Inserts a deployment using deployment manager, importing any specified template files.
If template arguments are included, the top level configuration file will be rendered using Jinja2.
Parameters:
level_path (str): Relative path of the level from the levels/ directory
template_files (list of str, optional): List of paths of the template files that are used in the deployment configuration, starting with "core/".
The names of the templates in the configuration use the filenames of the templates, not the full paths.
config_template_args (dict, optional): Dictionary of arguments to use when rendering the top level configuration template using Jinja2.
Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable.
If not supplied, the top level configuration will not be treated as a template.
labels (dict, optional): Dictionary of key/value pairs that will be included as labels on the deployment,
and can be retrieved later using `framework.cloudhelpers.deployments.get_labels`.
Labels are the recommended way to store any information that will be necessary for level deletion.
The keyword "level" is reserved for storing the active level path.
'''
# Get current credentials from environment variables and build deployment API object
credentials, project_id = google.auth.default()
deployment_api = discovery.build(
'deploymentmanager', 'v2', credentials=credentials)
level_name = os.path.basename(level_path)
# Create request to insert deployment
request_body = {
"name": "thunder",
"target": {
"config": {
"content": _read_render_config(
f'core/levels/{level_path}/{level_name}.yaml',
template_args=config_template_args)
},
"imports": []
},
"labels": []
}
# Add imports to deployment json
for template in template_files:
request_body['target']['imports'].append(
{"name": os.path.basename(template),
"content": _read_render_config(template)})
# If schema is present in sibling directory to template, import it
schema_path = f'{os.path.dirname(template)}/schema/{os.path.basename(template)}.schema'
if os.path.exists(schema_path):
request_body['target']['imports'].append(
{"name": os.path.basename(template) + '.schema',
"content": _read_render_config(schema_path)})
# Add labels to deployment json
for key in labels.keys():
if key == 'level':
exit('The label key "level" is reserved for storing the level path of the active deployment.')
request_body['labels'].append({
"key": key,
"value": labels[key]
})
request_body['labels'].append({
"key": 'level',
"value": level_path.replace('/', '-')
})
# Send insert request then wait for operation
operation = deployment_api.deployments().insert(
project=project_id, body=request_body).execute()
op_name = operation['name']
_wait_for_operation(op_name, deployment_api,
project_id, level_path=level_path)
def patch(level_path, template_files=[],
config_template_args={}, labels={}, second_deploy = False):
' ' 'Patches a deployment using deployment manager, importing any specified template files.
If template arguments are included, the top level configuration file will be rendered using Jinja2.
Parameters:
level_path (str): Relative path of the level from the levels/ directory
template_files (list of str, optional): List of paths of the template files that are used in the deployment configuration, starting with "core/ ".
The names of the templates in the configuration use the filenames of the templates, not the full paths.
config_template_args (dict, optional): Dictionary of arguments to use when rendering the top level configuration template using Jinja2.
Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable.
If not supplied, the top level configuration will not be treated as a template.
labels (dict, optional): Dictionary of key/value pairs that will be included as labels on the deployment,
and can be retrieved later using `framework.cloudhelpers.deployments.get_labels`.
Labels are the recommended way to store any information that will be necessary for level deletion.
The keyword "level " is reserved for storing the active level path.
second_deploy (boolean): Automatically start destroy level and recreate if set to True
' ' '
# Get current credentials from environment variables and build deployment API object
credentials, project_id = google.auth.default()
deployment_api = discovery.build(
'deploymentmanager ', 'v2 ', credentials=credentials)
# get current deployment
current_depoy = deployment_api.deployments().get(
project=project_id, deployment= 'thunder ').execute()
labels = current_depoy[ 'labels ']
fingerprint = current_depoy[ 'fingerprint ']
level_name = os.path.basename(level_path)
#render patched contnent
content_patch = _read_render_config(
f '{level_name}_patch.yaml ',
template_args=config_template_args,
load_path = f 'core/levels/{level_path}/ '
)
# Create request to patch deployment
request_body = {
"name ": "thunder ",
"fingerprint ": fingerprint,
"target ": {
"config ": {
"content ": content_patch
},
"imports ": []
},
"labels ": [labels]
}
# Add imports to deployment json
for template in template_files:
request_body[ 'target '][ 'imports '].append(
{ "name ": os.path.basename(template),
"content ": _read_render_config(template)})
# If schema is present in sibling directory to template, import it
schema_path = f '{os.path.dirname(template)}/schema/{os.path.basename(template)}.schema '
if os.path.exists(schema_path):
request_body[ 'target '][ 'imports '].append(
{ "name ": os.path.basename(template) + '.schema ',
"content ": _read_render_config(schema_path)})
# Send patch request then wait for operation
try:
operation = deployment_api.deployments().patch(
project=project_id, deployment= 'thunder ', body=request_body).execute()
op_name = operation[ 'name ']
except Exception as e:
print(str(e))
if not second_deploy:
#destroy and restart deployment if error in patching operation
_wait_for_patch(op_name, deployment_api,
project_id, level_path=level_path)
else:
_wait_for_operation(op_name, deployment_api,
project_id, level_path=level_path)
def delete():
'''Deletes the active deployment.
Automatically empties and deletes any buckets in the deployment,
and deletes all IAM bindings of service accounts in the deployment.
This function should be called during level destruction
'''
_delete_resources()
# Get current credentials from environment variables and build deployment API object
credentials, project_id = google.auth.default()
deployment_api = discovery.build(
'deploymentmanager', 'v2', credentials=credentials)
# Send delete request
operation = deployment_api.deployments().delete(
project=project_id, deployment='thunder').execute()
op_name = operation['name']
_wait_for_operation(op_name, deployment_api, project_id)
def _delete_resources():
print('Deleting buckets and IAM entries')
# Get current credentials from environment variables and build deployment API object
credentials, project_id = google.auth.default()
deployment_api = discovery.build(
'deploymentmanager', 'v2', credentials=credentials)
manifest_url = deployment_api.deployments().get(
project=project_id, deployment='thunder').execute()['manifest']
manifest_name = os.path.basename(manifest_url)
manifest = deployment_api.manifests().get(deployment='thunder', project=project_id,
manifest=manifest_name).execute()
expanded_config = yaml.safe_load(manifest['expandedConfig'])
buckets = []
service_accounts = []
for resource in expanded_config['resources']:
if 'type' in resource:
if resource['type'] == 'storage.v1.bucket':
buckets.append(resource['name'])
if resource['type'] == 'iam.v1.serviceAccount':
service_accounts.append(
iam.service_account_email(resource['name']))
# Delete iam entries
if service_accounts:
iam.remove_iam_entries(service_accounts)
# Force delete buckets
for bucket_name in buckets:
gcstorage.delete_bucket(bucket_name)
def _wait_for_operation(op_name, deployment_api, project_id, level_path=None):
# Wait till operation finishes, giving updates every 5 seconds
op_done = False
t = 0
start_time = time.time()
time_string = ''
while not op_done:
time_string = f'[{int(t/60)}m {(t%60)//10}{t%10}s]'
sys.stdout.write(
f'\r{time_string} Deployment operation in progress...')
t += 5
while t < time.time()-start_time:
t += 5
time.sleep(t-(time.time()-start_time))
op_status = deployment_api.operations().get(
project=project_id,
operation=op_name).execute()['status']
op_done = (op_status == 'DONE')
sys.stdout.write(
f'\r{time_string} Deployment operation in progress... Done\n')
operation = op_status = deployment_api.operations().get(
project=project_id,
operation=op_name).execute()
if 'error' in operation and level_path:
print("\nDeployment Error:\n" + yaml.dump(operation['error']))
if 'y' == input('\nDeployment error caused deployment to fail. '
'Would you like to destroy the deployment [y] or continue [n]? [y/n] ').lower().strip()[0]:
level_module = levels.import_level(level_path)
level_module.destroy()
exit()
def _wait_for_patch(op_name, deployment_api, project_id, level_path=None):
# Wait till operation finishes, giving updates every 5 seconds
#destroy and restart deployment if error in patching operation
op_done = False
t = 0
start_time = time.time()
time_string = ''
while not op_done:
time_string = f'[{int(t/60)}m {(t%60)//10}{t%10}s]'
sys.stdout.write(
f'\r{time_string}Deployment patching in progress...')
t += 5
while t < time.time()-start_time:
t += 5
time.sleep(t-(time.time()-start_time))
op_status = deployment_api.operations().get(
project=project_id,
operation=op_name).execute()['status']
op_done = (op_status == 'DONE')
sys.stdout.write(
f'\r{time_string}Deployment patching in progress... Done\n')
operation = op_status = deployment_api.operations().get(
project=project_id,
operation=op_name).execute()
if 'error' in operation and level_path:
print("\nDeployment patching Error:\n" + yaml.dump(operation['error']))
print("\nSecond try of deploymnent")
level_module = levels.import_level(level_path)
level_module.destroy()
level_module.create(True)
def get_labels():
'''Queries the Deployment Manager API to retrieve the labels on the active level's deployment.
Returns:
dict: Dictionary of labels
'''
# Get current credentials from environment variables and build deployment API object
credentials, project_id = google.auth.default()
deployment_api = discovery.build(
'deploymentmanager', 'v2', credentials=credentials)
# Get deployment information
try:
deployment = deployment_api.deployments().get(
project=project_id,
deployment='thunder').execute()
except HttpError:
return None
# Get labels as list of k/v pairs
labels_list = deployment['labels']
# Insert all k/v pairs into python dictionary
labels_dict = {}
for label in labels_list:
labels_dict[label['key']] = label['value']
labels_dict['level'] = labels_dict['level'].replace('-', '/')
return labels_dict
def get_active_level():
'''Returns the active level path by querying the labels of the active deployment'''
labels = get_labels()
if labels:
return labels['level']
else:
return None
Functions
def delete()
-
Deletes the active deployment. Automatically empties and deletes any buckets in the deployment, and deletes all IAM bindings of service accounts in the deployment. This function should be called during level destruction
Source code
def delete(): '''Deletes the active deployment. Automatically empties and deletes any buckets in the deployment, and deletes all IAM bindings of service accounts in the deployment. This function should be called during level destruction ''' _delete_resources() # Get current credentials from environment variables and build deployment API object credentials, project_id = google.auth.default() deployment_api = discovery.build( 'deploymentmanager', 'v2', credentials=credentials) # Send delete request operation = deployment_api.deployments().delete( project=project_id, deployment='thunder').execute() op_name = operation['name'] _wait_for_operation(op_name, deployment_api, project_id)
def get_active_level()
-
Returns the active level path by querying the labels of the active deployment
Source code
def get_active_level(): '''Returns the active level path by querying the labels of the active deployment''' labels = get_labels() if labels: return labels['level'] else: return None
def get_labels()
-
Queries the Deployment Manager API to retrieve the labels on the active level's deployment.
Returns
dict
- Dictionary of labels
Source code
def get_labels(): '''Queries the Deployment Manager API to retrieve the labels on the active level's deployment. Returns: dict: Dictionary of labels ''' # Get current credentials from environment variables and build deployment API object credentials, project_id = google.auth.default() deployment_api = discovery.build( 'deploymentmanager', 'v2', credentials=credentials) # Get deployment information try: deployment = deployment_api.deployments().get( project=project_id, deployment='thunder').execute() except HttpError: return None # Get labels as list of k/v pairs labels_list = deployment['labels'] # Insert all k/v pairs into python dictionary labels_dict = {} for label in labels_list: labels_dict[label['key']] = label['value'] labels_dict['level'] = labels_dict['level'].replace('-', '/') return labels_dict
def insert(level_path, template_files=[], config_template_args={}, labels={})
-
Inserts a deployment using deployment manager, importing any specified template files. If template arguments are included, the top level configuration file will be rendered using Jinja2.
Parameters
level_path
:str
- Relative path of the level from the levels/ directory
template_files
:list
ofstr
, optional- List of paths of the template files that are used in the deployment configuration, starting with "core/". The names of the templates in the configuration use the filenames of the templates, not the full paths.
config_template_args
:dict
, optional- Dictionary of arguments to use when rendering the top level configuration template using Jinja2. Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable. If not supplied, the top level configuration will not be treated as a template.
labels
:dict
, optional- Dictionary of key/value pairs that will be included as labels on the deployment,
and can be retrieved later using
get_labels()
. Labels are the recommended way to store any information that will be necessary for level deletion. The keyword "level" is reserved for storing the active level path.
Source code
def insert(level_path, template_files=[], config_template_args={}, labels={}): '''Inserts a deployment using deployment manager, importing any specified template files. If template arguments are included, the top level configuration file will be rendered using Jinja2. Parameters: level_path (str): Relative path of the level from the levels/ directory template_files (list of str, optional): List of paths of the template files that are used in the deployment configuration, starting with "core/". The names of the templates in the configuration use the filenames of the templates, not the full paths. config_template_args (dict, optional): Dictionary of arguments to use when rendering the top level configuration template using Jinja2. Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable. If not supplied, the top level configuration will not be treated as a template. labels (dict, optional): Dictionary of key/value pairs that will be included as labels on the deployment, and can be retrieved later using `framework.cloudhelpers.deployments.get_labels`. Labels are the recommended way to store any information that will be necessary for level deletion. The keyword "level" is reserved for storing the active level path. ''' # Get current credentials from environment variables and build deployment API object credentials, project_id = google.auth.default() deployment_api = discovery.build( 'deploymentmanager', 'v2', credentials=credentials) level_name = os.path.basename(level_path) # Create request to insert deployment request_body = { "name": "thunder", "target": { "config": { "content": _read_render_config( f'core/levels/{level_path}/{level_name}.yaml', template_args=config_template_args) }, "imports": [] }, "labels": [] } # Add imports to deployment json for template in template_files: request_body['target']['imports'].append( {"name": os.path.basename(template), "content": _read_render_config(template)}) # If schema is present in sibling directory to template, import it schema_path = f'{os.path.dirname(template)}/schema/{os.path.basename(template)}.schema' if os.path.exists(schema_path): request_body['target']['imports'].append( {"name": os.path.basename(template) + '.schema', "content": _read_render_config(schema_path)}) # Add labels to deployment json for key in labels.keys(): if key == 'level': exit('The label key "level" is reserved for storing the level path of the active deployment.') request_body['labels'].append({ "key": key, "value": labels[key] }) request_body['labels'].append({ "key": 'level', "value": level_path.replace('/', '-') }) # Send insert request then wait for operation operation = deployment_api.deployments().insert( project=project_id, body=request_body).execute() op_name = operation['name'] _wait_for_operation(op_name, deployment_api, project_id, level_path=level_path)
def patch(level_path, template_files=[], config_template_args={}, labels={}, second_deploy = False)
-
Patches a deployment using deployment manager, importing any specified template files. If template arguments are included, the top level configuration file will be rendered using Jinja2.
Parameters
level_path
:str
- Relative path of the level from the levels/ directory
template_files
:list
ofstr
, optional- List of paths of the template files that are used in the deployment configuration, starting with "core/". The names of the templates in the configuration use the filenames of the templates, not the full paths.
config_template_args
:dict
, optional- Dictionary of arguments to use when rendering the top level configuration template using Jinja2. Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable. If not supplied, the top level configuration will not be treated as a template.
labels
:dict
, optional- Dictionary of key/value pairs that will be included as labels on the deployment,
and can be retrieved later using
get_labels()
. Labels are the recommended way to store any information that will be necessary for level deletion. The keyword "level" is reserved for storing the active level path. second_deploy
:boolean
- Automatically start destroy level and recreate if set to True
Source code
def patch(level_path, template_files=[], config_template_args={}, labels={},second_deploy = False): ' ' 'Patches a deployment using deployment manager, importing any specified template files. If template arguments are included, the top level configuration file will be rendered using Jinja2. Parameters: level_path (str): Relative path of the level from the levels/ directory template_files (list of str, optional): List of paths of the template files that are used in the deployment configuration, starting with "core/ ". The names of the templates in the configuration use the filenames of the templates, not the full paths. config_template_args (dict, optional): Dictionary of arguments to use when rendering the top level configuration template using Jinja2. Keys should be strings that correspond to the names of variables in the Jinja template, and each corresponding value should be the passed value of the variable. If not supplied, the top level configuration will not be treated as a template. labels (dict, optional): Dictionary of key/value pairs that will be included as labels on the deployment, and can be retrieved later using `framework.cloudhelpers.deployments.get_labels`. Labels are the recommended way to store any information that will be necessary for level deletion. The keyword "level " is reserved for storing the active level path. second_deploy (boolean): Automatically start destroy level and recreate if set to True ' ' ' # Get current credentials from environment variables and build deployment API object credentials, project_id = google.auth.default() deployment_api = discovery.build( 'deploymentmanager ', 'v2 ', credentials=credentials) # get current deployment current_depoy = deployment_api.deployments().get( project=project_id, deployment= 'thunder ').execute() labels = current_depoy[ 'labels '] fingerprint = current_depoy[ 'fingerprint '] level_name = os.path.basename(level_path) #render patched contnent content_patch = _read_render_config( f '{level_name}_patch.yaml ', template_args=config_template_args, load_path = f 'core/levels/{level_path}/ ' ) # Create request to patch deployment request_body = { "name ": "thunder ", "fingerprint ": fingerprint, "target ": { "config ": { "content ": content_patch }, "imports ": [] }, "labels ": [labels] } # Add imports to deployment json for template in template_files: request_body[ 'target '][ 'imports '].append( { "name ": os.path.basename(template), "content ": _read_render_config(template)}) # If schema is present in sibling directory to template, import it schema_path = f '{os.path.dirname(template)}/schema/{os.path.basename(template)}.schema ' if os.path.exists(schema_path): request_body[ 'target '][ 'imports '].append( { "name ": os.path.basename(template) + '.schema ', "content ": _read_render_config(schema_path)}) # Send patch request then wait for operation try: operation = deployment_api.deployments().patch( project=project_id, deployment= 'thunder ', body=request_body).execute() op_name = operation[ 'name '] except Exception as e: print(str(e)) if not second_deploy: #destroy and restart deployment if error in patching operation _wait_for_patch(op_name, deployment_api, project_id, level_path=level_path) else: _wait_for_operation(op_name, deployment_api, project_id, level_path=level_path)