How to replace azure-storage

Continuation of the previous task.


from azure.storage.blob import BlockBlobService, ContentSettings, PublicAccess
# Legacy azure-storage API: a single account-level service object is used for
# both container management and blob uploads.
storage = BlockBlobService(
    account_name=cluster_config["config"]["AZURE_STORAGE_ACCOUNT"],
    account_key=cluster_config["config"]["AZURE_STORAGE_KEY"],
)
storage.create_container(container_name, public_access=PublicAccess.Blob)

# upload from bytes
storage.create_blob_from_bytes(
    container_name, blob_name, content, content_settings=ContentSettings(content_type=content_type)
)

# upload from a stream (block_blob_service is another BlockBlobService instance)
block_blob_service.create_blob_from_stream(
    folder_name, filename, content, content_settings=ContentSettings(content_type=mimetype)
)

from azure.storage.blob import ContainerClient, ContentSettings, PublicAccess
# azure-storage-blob v12 API: clients are scoped to one container (and one blob)
# instead of a single account-level service object.
account_name = cluster_config["config"]["AZURE_STORAGE_ACCOUNT"]
account_key = cluster_config["config"]["AZURE_STORAGE_KEY"]
account_url = f"https://{account_name}.blob.core.windows.net"
container_client = ContainerClient(
    account_url=account_url, credential=account_key, container_name=container_name
)
container_clients.append(container_client)

# create_container() raises if the container already exists, so check first
if not container_client.exists():
    container_client.create_container(public_access=PublicAccess.Blob)

# overwrite=True replaces an existing blob; without it upload_blob() raises
# when a blob with the same name is already present
container_client.get_blob_client(blob_name).upload_blob(
    content, content_settings=ContentSettings(content_type=content_type), overwrite=True
)

If your code looks like this:

import httplib2
from oauth2client.client import GoogleCredentials, OAuth2Credentials
from oauth2client.service_account import ServiceAccountCredentials

credentials = OAuth2Credentials.from_json(credentials_json)

# to get list of domains
service = discovery.build("admin", "directory_v1", credentials=credentials)
request = service.domains().list(customer="my_customer")
result = request.execute()

# to get user signature
scopes = ["https://www.googleapis.com/auth/gmail.settings.basic"]
credentials = ServiceAccountCredentials.from_json_keyfile_name("google_config.json", scopes=scopes)
credentials = credentials.create_delegated(email)
service = discovery.build("gmail", "v1", credentials=credentials)
request = service.users().settings().sendAs().get(userId=email, sendAsEmail=email)
signature = request.execute()["signature"]

# to refresh
if credentials.access_token_expired:
    credentials.refresh(httplib2.Http())

You only need to replace the credentials with the version from google-auth.

import json
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials

# credentials_json holds the credentials previously serialized by oauth2client
credentials_dict = json.loads(credentials_json)
# Use the imported Credentials class directly: the bare name `google` is never
# imported, so google.oauth2.credentials.Credentials would raise NameError.
credentials = Credentials(
    token=credentials_dict["access_token"],
    refresh_token=credentials_dict["refresh_token"],
    token_uri=credentials_dict["token_uri"],
    client_id=credentials_dict["client_id"],
    client_secret=credentials_dict["client_secret"],
    scopes=credentials_dict["scopes"],
    # [:-1] drops the last character of the stored expiry string (presumably
    # the trailing "Z" of a UTC timestamp) so that fromisoformat() can parse it
    expiry=datetime.fromisoformat(credentials_dict["token_expiry"][:-1]),
)

# to get list of domains - the same
service = discovery.build("admin", "directory_v1", credentials=credentials)
request = service.domains().list(customer="my_customer")
result = request.execute()

# to get user signature - almost the same
scopes = ["https://www.googleapis.com/auth/gmail.settings.basic"]
credentials = service_account.Credentials.from_service_account_file("google_config.json", scopes=scopes)
credentials = credentials.with_subject(email)  # with_subject instead of create_delegated
# pass the delegated credentials built above to the Gmail service
service = discovery.build("gmail", "v1", credentials=credentials)
request = service.users().settings().sendAs().get(userId=email, sendAsEmail=email)
signature = request.execute()["signature"]

# to refresh
if credentials.expired:
    credentials.refresh(Request())  # Request() instead of httplib2.Http()

Authorization flow

Main steps with oauth2client:

import httplib2
from oauth2client.client import HttpAccessTokenRefreshError, OAuth2Credentials, OAuth2WebServerFlow

# init flow
scopes = [
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.readonly",
    "https://www.googleapis.com/auth/admin.directory.domain.readonly",
    "https://www.googleapis.com/auth/admin.directory.userschema.readonly",
]

flow = OAuth2WebServerFlow(
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    scope=scopes,  # the list defined above
    redirect_uri=url_for("authorisation.google", _external=True),
    access_type="offline",
    prompt="consent",
)

# 1. redirect to google
...
auth_uri = flow.step1_get_authorize_url()
return redirect(auth_uri)
...

# 2. get credentials in the authorisation.google route after the redirect from google
...
auth_code = request.args.get("code")
credentials = flow.step2_exchange(auth_code)
...

With google_auth_oauthlib it’s mostly the same too:

from google.auth.exceptions import RefreshError
from google_auth_oauthlib.flow import Flow

# init flow
scopes = [
    "https://www.googleapis.com/auth/admin.directory.user.readonly",
    "https://www.googleapis.com/auth/admin.directory.group.readonly",
    "https://www.googleapis.com/auth/admin.directory.domain.readonly",
    "https://www.googleapis.com/auth/admin.directory.userschema.readonly",
]

redirect_uri = url_for("authorisation.google", _external=True)
flow = Flow.from_client_config(
    {
        "web": {
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "redirect_uris": [redirect_uri],
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://accounts.google.com/o/oauth2/token",
        }
    },
    scopes=scopes,
)
flow.redirect_uri = redirect_uri

# 1. redirect to google
...
authorization_url, state = flow.authorization_url(access_type="offline", prompt="consent")
return redirect(authorization_url)
...

# 2. get credentials in the authorisation.google route after the redirect from google
...
authorization_response = request.url
flow.fetch_token(authorization_response=authorization_response)
credentials = flow.credentials
...

To test it without HTTPS, you’ll need to set the environment variable OAUTHLIB_INSECURE_TRANSPORT=1.

comments powered by Disqus