BigQuery
Client for the Google Cloud BigQuery API.
Getting Started
To get started with sbt, add the dependencies to your project in build.sbt:
scala
libraryDependencies ++= Seq(
"com.anymindgroup" %% "zio-gcp-auth" % "latest",
"com.anymindgroup" %% "zio-gcp-bigquery-v2" % "latest"
)

Usage examples
scala
import zio.*, zio.ZIO.{logError, logInfo}
import com.anymindgroup.gcp.bigquery.v2.resources.*
import com.anymindgroup.gcp.bigquery.v2.schemas.*
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
// Authenticate with Application Default Credentials that include BigQuery scopes:
// gcloud auth application-default login \
// --scopes=openid,https://www.googleapis.com/auth/userinfo.email,https://www.googleapis.com/auth/cloud-platform
//
// Run with: sbt "examplesJVM/runMain bigquery_v2_example"
// Requires environment variable GCP_PROJECT_ID to be set.
private given JsonValueCodec[String] = JsonCodecMaker.make
object bigquery_v2_example extends ZIOAppDefault:
val datasetId = "zio_gcp_example"
val tableId = "employees"
def run =
for
projectId <- System
.env("GCP_PROJECT_ID")
.someOrFail(new RuntimeException("GCP_PROJECT_ID env var is required"))
backend <- com.anymindgroup.gcp.auth.defaultAccessTokenBackend()
// 1. Create a dataset
_ <- backend
.send(
Datasets.insert(
projectsId = projectId,
request = Dataset(
datasetReference = DatasetReference(
datasetId = datasetId,
projectId = Some(projectId),
),
description = Some("zio-gcp BigQuery example dataset"),
location = Some("US"),
),
)
)
.flatMap:
_.body match
case Right(ds) => logInfo(s"Created dataset: ${ds.datasetReference.datasetId}")
case Left(err) => logError(s"Failed to create dataset: $err")
// 2. Create a table with a schema
_ <- backend
.send(
Tables.insert(
projectsId = projectId,
datasetsId = datasetId,
request = Table(
tableReference = TableReference(
projectId = projectId,
datasetId = datasetId,
tableId = tableId,
),
description = Some("Example employees table"),
schema = Some(
TableSchema(
fields = Some(
Chunk(
TableFieldSchema(name = "id", `type` = "INTEGER", mode = Some("REQUIRED")),
TableFieldSchema(name = "name", `type` = "STRING", mode = Some("REQUIRED")),
TableFieldSchema(name = "city", `type` = "STRING", mode = Some("NULLABLE")),
TableFieldSchema(name = "score", `type` = "FLOAT64", mode = Some("NULLABLE")),
)
)
)
),
),
)
)
.flatMap:
_.body match
case Right(tbl) => logInfo(s"Created table: ${tbl.tableReference.tableId}")
case Left(err) => logError(s"Failed to create table: $err")
// 3. Insert rows via a DML INSERT query
insertQuery =
s"""INSERT INTO `$projectId.$datasetId.$tableId` (id, name, city, score)
|VALUES
| (1, 'Alice', 'Tokyo', 9.5),
| (2, 'Bob', 'Singapore', 8.2),
| (3, 'Carol', 'Bangkok', 7.8)""".stripMargin
_ <- backend
.send(
Jobs.query(
projectsId = projectId,
request = QueryRequest(
query = insertQuery,
useLegacySql = Some(false),
timeoutMs = Some(30000),
),
)
)
.flatMap:
_.body match
case Right(resp) =>
logInfo(s"Inserted rows, job complete: ${resp.jobComplete.getOrElse(false)}")
case Left(err) => logError(s"Failed to insert rows: $err")
// 4. Stream additional rows via Tabledata.insertAll
// Note: the `json` field for row data (JsonObject) is not yet generated by the codegen.
// Only the deduplication insertId is available in TableDataInsertAllRequestRows.
_ <- backend
.send(
Tabledata.insertAll(
projectsId = projectId,
datasetsId = datasetId,
tablesId = tableId,
request = TableDataInsertAllRequest(
rows = Chunk(
Some(TableDataInsertAllRequestRows(insertId = Some("row-4"))),
Some(TableDataInsertAllRequestRows(insertId = Some("row-5"))),
),
skipInvalidRows = Some(true),
),
)
)
.flatMap:
_.body match
case Right(resp) =>
val errors = resp.insertErrors.getOrElse(Chunk.empty)
if errors.isEmpty then logInfo("Streaming insert request sent successfully")
else logError(s"Streaming insert errors: $errors")
case Left(err) => logError(s"Failed to stream rows: $err")
// 5. Query the inserted data back
selectQuery =
s"SELECT id, name, city, score FROM `$projectId.$datasetId.$tableId` ORDER BY id"
_ <- backend
.send(
Jobs.query(
projectsId = projectId,
request = QueryRequest(
query = selectQuery,
useLegacySql = Some(false),
timeoutMs = Some(30000),
),
)
)
.flatMap:
_.body match
case Right(resp) =>
val resultRows = resp.rows.getOrElse(Chunk.empty)
logInfo(s"Query returned ${resultRows.size} rows:") *>
ZIO.foreachDiscard(resultRows): row =>
val cells = row.f
.getOrElse(Chunk.empty)
.map(cell => cell.v.readAs[String].getOrElse("null"))
.mkString(" | ")
logInfo(s" $cells")
case Left(err) => logError(s"Failed to query data: $err")
// 6. Clean up: delete the table and dataset
_ <- backend
.send(Tables.delete(projectsId = projectId, datasetsId = datasetId, tablesId = tableId))
.flatMap:
_.body match
case Right(_) => logInfo(s"Deleted table: $tableId")
case Left(err) => logError(s"Failed to delete table: $err")
_ <- backend
.send(Datasets.delete(projectsId = projectId, datasetsId = datasetId))
.flatMap:
_.body match
case Right(_) => logInfo(s"Deleted dataset: $datasetId")
case Left(err) => logError(s"Failed to delete dataset: $err")
yield ()