Skip to content

BigQuery

Client for Google Cloud BigQuery API.

Getting Started

To get started with sbt, add the dependencies to your project in build.sbt (replace "latest" with the latest released version):

scala
libraryDependencies ++= Seq(
  "com.anymindgroup" %% "zio-gcp-auth" % "latest",
  "com.anymindgroup" %% "zio-gcp-bigquery-v2" % "latest"
)

Usage examples

scala
import zio.*, zio.ZIO.{logError, logInfo}
import com.anymindgroup.gcp.bigquery.v2.resources.*
import com.anymindgroup.gcp.bigquery.v2.schemas.*
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker

// Authenticate with Application Default Credentials that include BigQuery scopes:
// gcloud auth application-default login \
//   --scopes=openid,https://www.googleapis.com/auth/userinfo.email,https://www.googleapis.com/auth/cloud-platform
//
// Run with: sbt "examplesJVM/runMain bigquery_v2_example"
// Requires environment variable GCP_PROJECT_ID to be set.
// File-level codec for reading raw JSON values as plain strings —
// presumably picked up by `readAs[String]` when printing query-result
// cells in the example below (NOTE(review): confirm resolution).
private given JsonValueCodec[String] = JsonCodecMaker.make

// End-to-end BigQuery usage example: create a dataset and a table, insert
// rows (via DML and the streaming API), query them back, then clean up.
object bigquery_v2_example extends ZIOAppDefault:
  // Names of the example dataset/table; created in steps 1-2, deleted in step 6.
  val datasetId = "zio_gcp_example"
  val tableId   = "employees"

  // Each step pattern-matches on the response body and only *logs* success or
  // failure, so subsequent steps (including the cleanup in step 6) still run
  // after an API-level error. NOTE(review): a failure of the `send` effect
  // itself (e.g. a network error) would still short-circuit the whole
  // for-comprehension and skip cleanup.
  def run =
    for
      projectId <- System
                     .env("GCP_PROJECT_ID")
                     .someOrFail(new RuntimeException("GCP_PROJECT_ID env var is required"))
      // HTTP backend that authenticates requests using Application Default
      // Credentials (see the gcloud command in the header comment above).
      backend <- com.anymindgroup.gcp.auth.defaultAccessTokenBackend()

      // 1. Create a dataset
      _ <- backend
             .send(
               Datasets.insert(
                 projectsId = projectId,
                 request = Dataset(
                   datasetReference = DatasetReference(
                     datasetId = datasetId,
                     projectId = Some(projectId),
                   ),
                   description = Some("zio-gcp BigQuery example dataset"),
                   location = Some("US"),
                 ),
               )
             )
             .flatMap:
               _.body match
                 case Right(ds) => logInfo(s"Created dataset: ${ds.datasetReference.datasetId}")
                 case Left(err) => logError(s"Failed to create dataset: $err")

      // 2. Create a table with a schema
      _ <- backend
             .send(
               Tables.insert(
                 projectsId = projectId,
                 datasetsId = datasetId,
                 request = Table(
                   tableReference = TableReference(
                     projectId = projectId,
                     datasetId = datasetId,
                     tableId = tableId,
                   ),
                   description = Some("Example employees table"),
                   schema = Some(
                     TableSchema(
                       fields = Some(
                         Chunk(
                           TableFieldSchema(name = "id", `type` = "INTEGER", mode = Some("REQUIRED")),
                           TableFieldSchema(name = "name", `type` = "STRING", mode = Some("REQUIRED")),
                           TableFieldSchema(name = "city", `type` = "STRING", mode = Some("NULLABLE")),
                           TableFieldSchema(name = "score", `type` = "FLOAT64", mode = Some("NULLABLE")),
                         )
                       )
                     )
                   ),
                 ),
               )
             )
             .flatMap:
               _.body match
                 case Right(tbl) => logInfo(s"Created table: ${tbl.tableReference.tableId}")
                 case Left(err)  => logError(s"Failed to create table: $err")

      // 3. Insert rows via a DML INSERT query
      insertQuery =
        s"""INSERT INTO `$projectId.$datasetId.$tableId` (id, name, city, score)
           |VALUES
           |  (1, 'Alice',  'Tokyo',     9.5),
           |  (2, 'Bob',    'Singapore', 8.2),
           |  (3, 'Carol',  'Bangkok',   7.8)""".stripMargin
      _ <- backend
             .send(
               Jobs.query(
                 projectsId = projectId,
                 request = QueryRequest(
                   query = insertQuery,
                   // Use GoogleSQL (standard SQL) rather than legacy SQL.
                   useLegacySql = Some(false),
                   // Max time (ms) the synchronous query call waits for the
                   // job to complete before returning jobComplete = false.
                   timeoutMs = Some(30000),
                 ),
               )
             )
             .flatMap:
               _.body match
                 case Right(resp) =>
                   logInfo(s"Inserted rows, job complete: ${resp.jobComplete.getOrElse(false)}")
                 case Left(err) => logError(s"Failed to insert rows: $err")

      // 4. Stream additional rows via Tabledata.insertAll
      //    Note: the `json` field for row data (JsonObject) is not yet generated by the codegen.
      //    Only the deduplication insertId is available in TableDataInsertAllRequestRows.
      _ <- backend
             .send(
               Tabledata.insertAll(
                 projectsId = projectId,
                 datasetsId = datasetId,
                 tablesId = tableId,
                 request = TableDataInsertAllRequest(
                   rows = Chunk(
                     Some(TableDataInsertAllRequestRows(insertId = Some("row-4"))),
                     Some(TableDataInsertAllRequestRows(insertId = Some("row-5"))),
                   ),
                   // Skip rows that fail validation instead of rejecting the
                   // entire request.
                   skipInvalidRows = Some(true),
                 ),
               )
             )
             .flatMap:
               _.body match
                 case Right(resp) =>
                   // A successful response can still carry per-row errors;
                   // surface them explicitly.
                   val errors = resp.insertErrors.getOrElse(Chunk.empty)
                   if errors.isEmpty then logInfo("Streaming insert request sent successfully")
                   else logError(s"Streaming insert errors: $errors")
                 case Left(err) => logError(s"Failed to stream rows: $err")

      // 5. Query the inserted data back
      selectQuery =
        s"SELECT id, name, city, score FROM `$projectId.$datasetId.$tableId` ORDER BY id"
      _ <- backend
             .send(
               Jobs.query(
                 projectsId = projectId,
                 request = QueryRequest(
                   query = selectQuery,
                   useLegacySql = Some(false),
                   timeoutMs = Some(30000),
                 ),
               )
             )
             .flatMap:
               _.body match
                 case Right(resp) =>
                   // `rows` is absent when the job produced no result rows.
                   val resultRows = resp.rows.getOrElse(Chunk.empty)
                   logInfo(s"Query returned ${resultRows.size} rows:") *>
                     ZIO.foreachDiscard(resultRows): row =>
                       // Render each row's cells (`row.f`) pipe-separated;
                       // `readAs[String]` decodes the raw cell value —
                       // presumably via the file-level JsonValueCodec[String]
                       // given (NOTE(review): confirm).
                       val cells = row.f
                         .getOrElse(Chunk.empty)
                         .map(cell => cell.v.readAs[String].getOrElse("null"))
                         .mkString(" | ")
                       logInfo(s"  $cells")
                 case Left(err) => logError(s"Failed to query data: $err")

      // 6. Clean up: delete the table and dataset
      // (Table must be deleted before its enclosing dataset.)
      _ <- backend
             .send(Tables.delete(projectsId = projectId, datasetsId = datasetId, tablesId = tableId))
             .flatMap:
               _.body match
                 case Right(_)  => logInfo(s"Deleted table: $tableId")
                 case Left(err) => logError(s"Failed to delete table: $err")
      _ <- backend
             .send(Datasets.delete(projectsId = projectId, datasetsId = datasetId))
             .flatMap:
               _.body match
                 case Right(_)  => logInfo(s"Deleted dataset: $datasetId")
                 case Left(err) => logError(s"Failed to delete dataset: $err")
    yield ()