Module framework.cloudhelpers.gcstorage
Source code
import os
from google.cloud import storage
def upload_directory_recursive(top_dir_path, bucket_name):
    '''Recursively uploads all the files in the given directory path to the bucket with the given name.

    Every file found under top_dir_path is uploaded to a blob whose name is the
    file's path relative to top_dir_path, written with "/" separators.

    Parameters:
        top_dir_path (str): Relative path to the directory that will be uploaded, starting with "core/"
        bucket_name (str): Name of the GCS bucket.
    '''
    # Get bucket object
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # Recursively go through the subfiles and subdirectories of the directory and upload the files
    for dir_path, _subdir_names, f_names in os.walk(top_dir_path):
        for f_name in f_names:
            file_path = os.path.join(dir_path, f_name)
            # relpath removes only the leading top_dir_path. A plain str.replace
            # would also delete the same substring deeper in the path, and would
            # miss the prefix when top_dir_path ends with a separator.
            rel_path = os.path.relpath(file_path, top_dir_path)
            # Blob names use "/" whatever separator the local OS uses
            blob_name = rel_path.replace(os.sep, '/')
            blob = storage.Blob(blob_name, bucket)
            # A separate name for the handle keeps the loop variable intact
            with open(file_path, 'rb') as file_obj:
                blob.upload_from_file(file_obj)
def delete_bucket(bucket_name):
    '''Removes the bucket with the given name, including any objects inside it, when such a bucket exists.

    Parameters:
        bucket_name (str): Name of the GCS bucket.
    '''
    client = storage.Client()
    target = client.lookup_bucket(bucket_name)
    if not target:
        # The lookup produced no bucket, so there is nothing to remove
        return
    # force=True so that the items inside the bucket are removed along with it
    target.delete(force=True)
Functions
def delete_bucket(bucket_name)
-
Deletes the bucket of the given name if it exists, even if there are objects in the bucket.
Source code
def delete_bucket(bucket_name):
    '''Deletes the bucket of the given name if it exists, even if there are objects in the bucket.

    Parameters:
        bucket_name (str): Name of the GCS bucket.
    '''
    # Look the bucket up first; the delete is only attempted when the lookup yields one
    found = storage.Client().lookup_bucket(bucket_name)
    if found:
        # Forced delete also gets rid of the items inside the bucket
        found.delete(force=True)
def upload_directory_recursive(top_dir_path, bucket_name)
-
Recursively uploads all the files in the given directory path to the bucket with the given name.
Parameters
top_dir_path
:str
- Relative path to the directory that will be uploaded, starting with "core/"
bucket_name
:str
- Name of the GCS bucket.
Source code
def upload_directory_recursive(top_dir_path, bucket_name):
    '''Recursively uploads all the files in the given directory path to the bucket with the given name.

    Blob names are the paths of the files relative to top_dir_path, written
    with "/" separators.

    Parameters:
        top_dir_path (str): Relative path to the directory that will be uploaded, starting with "core/"
        bucket_name (str): Name of the GCS bucket.
    '''
    # Get bucket object
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # Recursively go through the subfiles and subdirectories of the directory and upload the files
    for dir_path, _subdir_names, f_names in os.walk(top_dir_path):
        for f_name in f_names:
            local_path = os.path.join(dir_path, f_name)
            # Strip only the leading top_dir_path (str.replace would also remove
            # later occurrences of the same text), then normalise separators so
            # the blob name is identical on every OS.
            blob_name = os.path.relpath(local_path, top_dir_path).replace(os.sep, '/')
            blob = storage.Blob(blob_name, bucket)
            # The handle gets its own name so it does not rebind the loop variable
            with open(local_path, 'rb') as file_obj:
                blob.upload_from_file(file_obj)