surfiniaburger committed
Commit 603bc54 · 1 Parent(s): 78dccad
Files changed (1)
  1. bigquery_uploader.py +67 -16
bigquery_uploader.py CHANGED
@@ -5,6 +5,7 @@ from google.cloud.exceptions import NotFound
 import uuid
 from datetime import datetime
 
+# --- Configuration: Set your project, dataset, and table details here ---
 PROJECT_ID = "gem-creation"
 DATASET_ID = "aura_mind_glow_data"
 TABLE_ID = "farm_analysis"
@@ -13,10 +14,10 @@ def get_bigquery_client():
     """Returns an authenticated BigQuery client."""
     try:
         client = bigquery.Client(project=PROJECT_ID)
-        print("Successfully authenticated with BigQuery.")
+        print("✅ Successfully authenticated with BigQuery.")
         return client
     except Exception as e:
-        print(f"Error authenticating with BigQuery: {e}")
+        print(f"❌ Error authenticating with BigQuery: {e}")
         return None
 
 def create_dataset_if_not_exists(client):
@@ -24,13 +25,13 @@ def create_dataset_if_not_exists(client):
     dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
     try:
         client.get_dataset(dataset_id)  # Make an API request.
-        print(f"Dataset {dataset_id} already exists.")
+        print(f"ℹ️ Dataset {dataset_id} already exists.")
     except NotFound:
-        print(f"Dataset {dataset_id} is not found. Creating dataset...")
+        print(f"🟡 Dataset {dataset_id} not found. Creating dataset...")
         dataset = bigquery.Dataset(dataset_id)
-        dataset.location = "US"
+        dataset.location = "US"  # You can change the location if needed
         dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
-        print(f"Created dataset {client.project}.{dataset.dataset_id}")
+        print(f"✅ Created dataset {client.project}.{dataset.dataset_id}")
 
 
 def create_table_if_not_exists(client):
@@ -38,9 +39,9 @@ def create_table_if_not_exists(client):
     table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
     try:
         client.get_table(table_id)  # Make an API request.
-        print(f"Table {table_id} already exists.")
+        print(f"ℹ️ Table {table_id} already exists.")
     except NotFound:
-        print(f"Table {table_id} is not found. Creating table...")
+        print(f"🟡 Table {table_id} not found. Creating table...")
         schema = [
             bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
             bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
@@ -58,13 +59,13 @@ def create_table_if_not_exists(client):
         ]
         table = bigquery.Table(table_id, schema=schema)
         table = client.create_table(table)  # Make an API request.
-        print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
+        print(f"✅ Created table {table.project}.{table.dataset_id}.{table.table_id}")
 
 def upload_diagnosis_to_bigquery(diagnosis_data: dict):
-    """Uploads a single diagnosis record to BigQuery."""
+    """Uploads a single diagnosis record (from a dictionary) to BigQuery."""
     client = get_bigquery_client()
     if client is None:
-        print("BigQuery client not available. Cannot upload diagnosis.")
+        print("❌ BigQuery client not available. Cannot upload diagnosis.")
         return "BigQuery client not available."
 
     create_dataset_if_not_exists(client)
@@ -72,7 +73,6 @@ def upload_diagnosis_to_bigquery(diagnosis_data: dict):
 
     table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
 
-    # Add required fields if not present
     if "analysis_id" not in diagnosis_data:
         diagnosis_data["analysis_id"] = str(uuid.uuid4())
     if "timestamp" not in diagnosis_data:
@@ -81,9 +81,60 @@ def upload_diagnosis_to_bigquery(diagnosis_data: dict):
     rows_to_insert = [diagnosis_data]
 
     errors = client.insert_rows_json(table_id, rows_to_insert)
-    if errors == []:
-        print(f"Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully to BigQuery.")
+    if not errors:
+        print(f"✅ Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully.")
         return "Diagnosis uploaded successfully."
     else:
-        print(f"Encountered errors while inserting diagnosis record: {errors}")
-        return f"Error uploading diagnosis: {errors}"
+        print(f"❌ Encountered errors while inserting diagnosis record: {errors}")
+        return f"Error uploading diagnosis: {errors}"
+
+
+def upload_csv_to_bigquery(csv_file_path: str):
+    """
+    Uploads the contents of a CSV file to the specified BigQuery table.
+
+    Args:
+        csv_file_path (str): The local path to the CSV file.
+    """
+    client = get_bigquery_client()
+    if client is None:
+        print("❌ BigQuery client not available. Cannot upload CSV.")
+        return
+
+    create_dataset_if_not_exists(client)
+    create_table_if_not_exists(client)
+
+    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
+
+    # Configure the load job
+    job_config = bigquery.LoadJobConfig(
+        source_format=bigquery.SourceFormat.CSV,
+        skip_leading_rows=1,  # Skip the header row
+        # We REMOVE autodetect=True. The job will now use the table's existing schema.
+        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
+    )
+
+    print(f"🚀 Starting CSV upload from '{csv_file_path}' to table '{table_id}'...")
+
+    try:
+        with open(csv_file_path, "rb") as source_file:
+            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)
+
+        load_job.result()  # Wait for the job to complete
+
+        destination_table = client.get_table(table_id)
+        # To get the number of rows uploaded in this job, we look at the job's output statistics
+        rows_uploaded = load_job.output_rows
+        print(f"✅ Job finished. Loaded {rows_uploaded} new rows. The table '{table_id}' now has a total of {destination_table.num_rows} rows.")
+        return "CSV upload successful."
+    except Exception as e:
+        print(f"❌ An error occurred during the CSV upload: {e}")
+        return f"Error during CSV upload: {e}"
+
+
+if __name__ == "__main__":
+    csv_file_to_upload = "farm_analysis_data.csv"
+
+    print("--- Running BigQuery CSV Uploader Test ---")
+    upload_csv_to_bigquery(csv_file_to_upload)
+    print("--- Test complete ---")