GCP

GCP를 활용한 데이터 자동화(MongoDB, CloudSQL, GA, Dune)

Somaz 2023. 12. 22. 00:50
728x90
반응형

Overview

Google Cloud Platform(GCP)를 활용한 데이터 자동화 방법에 대해서 공부해보려고 한다.

 

Data Workflow

 


데이터 자동화

Daily 업무는 `오늘 날짜 기준으로 어제의 데이터`를 가져와서 구글시트에 입력한다.

따라서 Daily 업무의 모든 함수는 `이번달을 기준으로 지난달 마지막날 데이터 ~ 오늘까지의 데이터`를 조회한다.

 

Monthly 업무는 `이번달 기준으로 지난달의 데이터`를 가져와서 구글시트에 입력한다.

따라서 Monthly 업무의 모든 함수는 `이번달을 기준으로 지난달 데이터`를 조회한다.

 

GCP 서비스들을 사용하여 데이터 업무를 자동화한다.

  • BigQuery
  • Dataflow
  • Cloud Functions
  • Cloud Scheduler
  • Cloud Storage
  • Artifact Registry

 


 

DB → BigQuery

참고할 수 있는 코드는 아래의 링크에 있다.

 


 

MongoDB to BigQuery

  1. 서비스 계정을 생성하고 BigQuery 데이터셋을 설정
  2. MongoDB에서 BigQuery로 데이터를 변환하기 위해 Cloud Function을 준비하고 배포
  3. Cloud Scheduler를 사용하여 이 Cloud Function을 정기적으로 트리거
  4. Dataflow 작업을 통해 MongoDB 데이터를 BigQuery로 로드
  5. BigQuery에서 중복 데이터를 제거하기 위한 별도의 Cloud Function을 준비하고 배포
  6. 다른 Cloud Scheduler 작업을 사용하여 이 중복 제거 Cloud Function을 정기적으로 트리거

MongoDB to BigQuery

 


 

CloudSQL to BigQuery

CloudSQL은 GCP의 서비스이기 때문에 Connection을 제공해준다.

 

출처 : ChatGPT

 


 

Query 변경

  • MongoDB Query → BigQuery
  • CloudSQL Query → BigQuery

 


 

Query Example

 

아래와 같이 Query를 전부 변경해준다.

 

  • MongoDB Query
db = db.getSiblingDB("production");

db.getCollection("mongologs").aggregate([
    {
        $match: {
            $or: [
                {
                    time: {
                        $gte: ISODate(new Date(Date.UTC(new Date().getFullYear(), new Date().getMonth(), 1)).toISOString()),
                        $lt: ISODate(new Date(Date.UTC(new Date().getFullYear(), new Date().getMonth() + 1, 1)).toISOString())
                    }
                },
                {
                    time: {
                        $gte: ISODate(new Date(Date.UTC(new Date().getFullYear(), new Date().getMonth() - 1, new Date(new Date().getFullYear(), new Date().getMonth(), 0).getDate())).toISOString()),
                        $lt: ISODate(new Date(Date.UTC(new Date().getFullYear(), new Date().getMonth(), 1)).toISOString())
                    }
                }
            ],
            reason: "/user/create",
            collectionName: { $in: ["create_user_detail"] }
        }
    },
    {
        $group: {
            "_id": {
                "$dateToString": {
                    "format": "%Y-%m-%d",
                    "date": "$time"
                }
            },
            count: { $sum: 1 }
        }
    },
    {
        $sort: { _id: 1 }
    }
]);

 

 

  • BigQuery
SELECT
  FORMAT_DATE("%Y-%m-%d", PARSE_TIMESTAMP('%a %b %d %H:%M:%S UTC %Y', time)) AS date,
  COUNT(*) as count
FROM
  `mgmt-2023.mongodb_dataset.production-mongodb-internal-table`
WHERE
  reason = '/user/create'
  AND collectionName IN UNNEST(["create_user_detail"])
  AND PARSE_TIMESTAMP('%a %b %d %H:%M:%S UTC %Y', time) >= TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), MONTH)
GROUP BY
  date
ORDER BY
  date;

 

 


 

기타 설정


 

Dune API Key

Dune에 있는 쿼리들을 자동화하기 위해서는 API Key가 필요하다.

 

API Key 생성 후 확인

 

 


 

Google Sheet 권한 설정

자동화할 모든 Google Sheet에 사용하는 Service Account 권한설정을 해야한다.

 

원하는 Google Sheet에 가서 `파일 - 공유` 에 가서 권한설정을 해주면 된다.

 

 

주의! Cloud Functions와 Cloud Scheduler는 자신의 프로젝트 Service Account를 사용해야 한다.

다른 프로젝트의 Service Account를 사용해서 생성할 수 없다.


 

Example Code

예시 코드를 작성해보도록 한다. 더 많은 코드를 확인하고 싶으면 Reference에 있는 Github Repo로 가서 확인하면 된다.

 

mongodb-bigquery-workflow.tf

## Service Account ##
module "service_accounts_bigquery" {
  source       = "../../modules/service_accounts"
  project_id   = var.project
  names        = ["bigquery"]
  display_name = "bigquery"
  description  = "bigquery admin"
}

## Biguery MongoDB Dataset ##
resource "google_bigquery_dataset" "mongodb_dataset" {
  dataset_id    = "mongodb_dataset"
  friendly_name = "mongodb_dataset"
  description   = "This is a Mongodb dataset"
  location      = var.region
  labels        = local.default_labels

  access {
    role          = "OWNER"
    user_by_email = module.service_accounts_bigquery.email
  }

  access {
    role          = "OWNER"
    user_by_email = "terraform@mgmt-2023.iam.gserviceaccount.com"
  }
}

## cloudfunction source Bucket
resource "google_storage_bucket" "cloud_function_storage" {

  name                        = "mongodb-bigquery-sheet-cloud-function-storage"
  location                    = var.region
  labels                      = local.default_labels
  uniform_bucket_level_access = true
  force_destroy               = true
}

## mongodb -> bigquery table workflow
resource "null_resource" "mongodb_bigquery_zip_cloud_function" {
  depends_on = [google_bigquery_dataset.mongodb_dataset, google_storage_bucket.cloud_function_storage]

  provisioner "local-exec" {
    command = <<EOT
      cd ./mongodb-to-bigquery
      zip -r mongodb-to-bigquery.zip main.py requirements.txt
    EOT
  }

  triggers = {
    main_content_hash         = filesha256("./mongodb-to-bigquery/main.py")
    requirements_content_hash = filesha256("./mongodb-to-bigquery/requirements.txt")
  }
}

resource "google_storage_bucket_object" "mongodb_bigquery_cloudfunction_archive" {
  depends_on = [null_resource.mongodb_bigquery_zip_cloud_function]

  name   = "source/mongodb-to-bigquery.zip"
  bucket = google_storage_bucket.cloud_function_storage.name
  source = "./mongodb-to-bigquery/mongodb-to-bigquery.zip"
}

## cloud_function
resource "google_cloudfunctions_function" "mongodb_bigquery_dataflow_function" {
  depends_on = [null_resource.mongodb_bigquery_zip_cloud_function, google_storage_bucket_object.mongodb_bigquery_cloudfunction_archive]

  name                  = "mongodb-to-bigquery-dataflow"
  description           = "Function to mongodb-to-bigquery the Dataflow job"
  runtime               = "python38"
  service_account_email = module.service_accounts_bigquery.email
  docker_registry       = "ARTIFACT_REGISTRY"
  timeout               = 540

  available_memory_mb   = 512
  source_archive_bucket = google_storage_bucket.cloud_function_storage.name
  source_archive_object = google_storage_bucket_object.mongodb_bigquery_cloudfunction_archive.name
  trigger_http          = true
  entry_point           = "start_dataflow"

  environment_variables = {
    PROJECT_ID            = var.project,
    REGION                = var.region,
    SHARED_VPC            = var.shared_vpc,
    SUBNET_SHARE          = var.subnet_share,
    SERVICE_ACCOUNT_EMAIL = module.service_accounts_bigquery.email
  }
}

## cloud_scheduler
resource "google_cloud_scheduler_job" "mongodb_bigquery_job" {
  depends_on = [google_cloudfunctions_function.mongodb_bigquery_dataflow_function]

  name      = "mongodb-to-bigquery-daily-job"
  region    = var.region
  schedule  = "20 9 * * *" # Daily 09:20 AM
  time_zone = "Asia/Seoul"

  http_target {
    http_method = "POST"
    uri         = google_cloudfunctions_function.mongodb_bigquery_dataflow_function.https_trigger_url
    oidc_token {
      service_account_email = module.service_accounts_bigquery.email
    }
  }
}

 

 

main.tf

import os
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from flask import jsonify

def start_dataflow(request):
    # HTTP requests are Flask Request objects, so you can use the object's methods to analyze the request data.
    # If you want to receive data as a POST request, or if you want to use another method, modify this part appropriately.
    
    # Importing Environment Variables
    PROJECT_ID = os.environ.get('PROJECT_ID')
    REGION = os.environ.get('REGION')
    SHARED_VPC = os.environ.get('SHARED_VPC')
    SUBNET_SHARE = os.environ.get('SUBNET_SHARE')
    SERVICE_ACCOUNT_EMAIL = os.environ.get('SERVICE_ACCOUNT_EMAIL')

    # Initialize Google Cloud SDK Authentication
    credentials = GoogleCredentials.get_application_default()

    # Creating API clients for Dataflow services
    service = build('dataflow', 'v1b3', credentials=credentials)

    databases = ["production"]  # List of your MongoDB databases

    responses = []

    for database in databases:
        # Setting Dataflow Job Parameters
        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch
        # https://cloud.google.com/dataflow/docs/guides/templates/provided/mongodb-to-bigquery?hl=ko#api
        # https://console.cloud.google.com/storage/browser/_details/dataflow-templates/latest/flex/MongoDB_to_BigQuery;tab=live_object?hl=ko
        job_parameters = {
            "launchParameter": {
                "jobName": f"{database}-to-bigquery-job",
                "parameters": {
                    "mongoDbUri": f"mongodb://mongo:somaz!2023@34.11.11.111:27017", # mongodb://<DB id>:<DB Password>@<DB IP>:<DB Port>
                    "database": database,
                    "collection": "mongologs",
                    "outputTableSpec": f"{PROJECT_ID}:mongodb_dataset.{database}-mongodb-internal-table",
                    "userOption": "FLATTEN"
                },
                "environment": {
                    "tempLocation": "gs://bigquery-sheet-cloud-function-storage/tmp",
                    "network": SHARED_VPC,
                    "subnetwork": f"regions/{REGION}/subnetworks/{SUBNET_SHARE}-mgmt-b",
                    "serviceAccountEmail": SERVICE_ACCOUNT_EMAIL
                },
                "containerSpecGcsPath": 'gs://dataflow-templates/latest/flex/MongoDB_to_BigQuery'
            }
        }

    # error handling
        try:
            # Starting Dataflow Job
            request = service.projects().locations().flexTemplates().launch(
                projectId=PROJECT_ID,
                location=REGION,
                body=job_parameters
            )
            response = request.execute()
            responses.append(response)

        except Exception as e:
            print(f"Error occurred while processing {database}: {e}")
            responses.append({"database": database, "error": str(e)})

    return jsonify(responses)

 

 

requirements.txt

google-api-python-client
oauth2client
Flask<3.0,>=1.0
werkzeug

 


마무리

예시 코드와 유사한 방법으로 MongoDB, CloudSQL, GA, Dune 4가지의 리소스들의 데이터를 Query 또는 API를 사용해서 가져와 구글 시트에 자동으로 입력할 수 있게 자동화하였다.

 

 


Reference

https://github.com/somaz94/terraform-bigquery-googlesheet

https://github.com/somaz94/various-db-query-collections

Google 공식문서 : CloudSQL 연결 만들기

728x90
반응형