How to replace azure-storage
Continuation of the previous task.
from azure.storage.blob import BlockBlobService, ContentSettings, PublicAccess
BlockBlobService(
account_name=cluster_config["config"]["AZURE_STORAGE_ACCOUNT"],
account_key=cluster_config["config"]["AZURE_STORAGE_KEY"],
)
storage.create_container(container_name, public_access=PublicAccess.Blob)
storage.create_blob_from_bytes(
container_name, blob_name, content, content_settings=ContentSettings(content_type=content_type)
block_blob_service.create_blob_from_stream(
folder_name, filename, content, content_settings=ContentSettings(content_type=mimetype)
)
from azure.storage.blob import ContainerClient, ContentSettings, PublicAccess
account_name = cluster_config["config"]["AZURE_STORAGE_ACCOUNT"]
account_key = cluster_config["config"]["AZURE_STORAGE_KEY"]
account_url = f"https://{account_name}.blob.core.windows.net"
container_clients.append(
ContainerClient(account_url=account_url, credential=account_key, container_name=container_name)
if not container_client.exists():
container_client.create_container(public_access=PublicAccess.Blob) # exception, or overwrite true
container_client.get_blob_client(blob_name).upload_blob(
content, content_settings=ContentSettings(content_type=content_type), overwrite=True
If your code looks like this:
import httplib2
from oauth2client.client import GoogleCredentials, OAuth2Credentials
from oauth2client.service_account import ServiceAccountCredentials
credentials = OAuth2Credentials.from_json(credentials_json)
# to get list of domains
service = discovery.build("admin", "directory_v1", credentials=credentials)
request = service.domains().list(customer="my_customer")
result = request.execute()
# to get user signature
scopes = ["https://www.googleapis.com/auth/gmail.settings.basic"]
credentials = ServiceAccountCredentials.from_json_keyfile_name("google_config.json", scopes=scopes)
credentials = credentials.create_delegated(email)
service = discovery.build("gmail", "v1", credentials=credentials)
request = service.users().settings().sendAs().get(userId=email, sendAsEmail=email)
signature = request.execute()["signature"]
# to refresh
if credentials.access_token_expired:
credentials.refresh(httplib2.Http())
You only need to replace the credentials objects with their google-auth equivalents.
from datetime import datetime
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
credentials_dict = json.loads(credentials_json) # credentials_json is the JSON string previously saved by oauth2client
credentials = Credentials(
token=credentials_dict["access_token"],
refresh_token=credentials_dict["refresh_token"],
token_uri=credentials_dict["token_uri"],
client_id=credentials_dict["client_id"],
client_secret=credentials_dict["client_secret"],
scopes=credentials_dict["scopes"],
expiry=datetime.fromisoformat(credentials_dict["token_expiry"][:-1])
)
# to get list of domains - the same
service = discovery.build("admin", "directory_v1", credentials=credentials)
request = service.domains().list(customer="my_customer")
result = request.execute()
# to get user signature - almost the same
scopes = ["https://www.googleapis.com/auth/gmail.settings.basic"]
credentials = service_account.Credentials.from_service_account_file("google_config.json", scopes=scopes)
credentials = credentials.with_subject(email) # with_subject instead of create_delegated
service = discovery.build("gmail", "v1", credentials=credentials)
request = service.users().settings().sendAs().get(userId=email, sendAsEmail=email)
signature = request.execute()["signature"]
# to refresh
if credentials.expired:
credentials.refresh(Request()) # Request() instead of httplib2.Http()
Authorization flow
Main steps with oauth2client:
import httplib2
from oauth2client.client import HttpAccessTokenRefreshError, OAuth2Credentials, OAuth2WebServerFlow
# init flow
scopes = [
"https://www.googleapis.com/auth/admin.directory.user.readonly",
"https://www.googleapis.com/auth/admin.directory.group.readonly",
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
"https://www.googleapis.com/auth/admin.directory.userschema.readonly",
]
flow = OAuth2WebServerFlow(
client_id=GOOGLE_CLIENT_ID,
client_secret=GOOGLE_CLIENT_SECRET,
scope=scopes,
redirect_uri=url_for("authorisation.google", _external=True),
access_type="offline",
prompt="consent",
# 1. redirect to google
...
auth_uri = flow.step1_get_authorize_url()
return redirect(auth_uri)
...
# 2. get credentials in the authorisation.google route after the redirect from Google
...
auth_code = request.args.get("code")
credentials = flow.step2_exchange(auth_code)
...
With google_auth_oauthlib the flow is mostly the same:
from google.auth.exceptions import RefreshError
from google_auth_oauthlib.flow import Flow
# init flow
scopes = [
"https://www.googleapis.com/auth/admin.directory.user.readonly",
"https://www.googleapis.com/auth/admin.directory.group.readonly",
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
"https://www.googleapis.com/auth/admin.directory.userschema.readonly",
]
redirect_uri = url_for("authorisation.google", _external=True)
flow = Flow.from_client_config(
{
"web": {
"client_id": GOOGLE_CLIENT_ID,
"client_secret": GOOGLE_CLIENT_SECRET,
"redirect_uris": [redirect_uri],
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
}
},
scopes=scopes,
)
flow.redirect_uri = redirect_uri
# 1. redirect to google
...
authorization_url, state = flow.authorization_url(access_type="offline", prompt="consent")
return redirect(authorization_url)
...
# 2. get credentials in the authorisation.google route after the redirect from Google
...
authorization_response = request.url
flow.fetch_token(authorization_response=authorization_response)
credentials = flow.credentials
...
To test it locally without HTTPS, set the environment variable OAUTHLIB_INSECURE_TRANSPORT=1 (for development only; never in production).