How Do I Trigger A Scheduled Query When The GA4 Daily Export Happens?

Simmer Clips #3

The question we’re going to be looking at today is inspired by our course, Query GA4 Data in Google BigQuery.

How do I run a scheduled query in BigQuery as soon as the Google Analytics 4 daily export is complete for any given day?

If you’ve ever tried to build any sort of logic on top of the Google Analytics 4 daily export to Google BigQuery, you might have noticed one very annoying thing.

The daily export schedule is erratic at best. On one day, it might land in BigQuery before noon, on another day late in the afternoon, and on some other day it might arrive past midnight your time the following day!

Add to this the fact that it looks like there can be more than one load job from the service account associated with the export, and you have something of a mess on your hands.

Today’s question in Simmer Clips deals with this dilemma. I’ll help you solve it in a more deterministic way: instead of trying to figure out a time of day when it’s “safest” to run your queries, I’ll show you how you can build a simple pipeline with a sole purpose:

Run a scheduled query as soon as the daily export job is complete.

Along the way, you’ll learn about some really cool Google Cloud Platform components, such as Pub/Sub and Cloud Functions.

Video walkthrough

If you prefer this walkthrough in video format, you can check out the video below.

Don’t forget to subscribe to the Simmer YouTube channel for more content like this.

Overview of the problem

Before we get started, let’s take a look at the problem.

Run the following query in your project.

You need to change the region specifier `region-eu` to whatever matches the Google Cloud region where your GA4 / BigQuery dataset resides.

Also, change the dataset ID in the WHERE clause to match your Google Analytics 4 export dataset ID.

SELECT
  FORMAT_DATE('%F', (SELECT creation_time)) AS creation_date,
  MIN(FORMAT_DATE('%R', (SELECT creation_time))) AS creation_time_UTC,
  destination_table.table_id,
  total_bytes_processed,
  COUNT(*) AS load_jobs
FROM
  `region-eu`.INFORMATION_SCHEMA.JOBS
WHERE
  start_time > TIMESTAMP(DATE_SUB(current_date, INTERVAL 7 day))
  AND job_type = 'LOAD'
  AND destination_table.dataset_id = 'my_dataset'
GROUP BY
  1,
  3,
  4
ORDER BY
  creation_date,
  creation_time_UTC

When you run this query, you should see something like this:

This table lists the load jobs from Google Analytics 4 over the past 7 days.

The main thing to focus on is the creation_time_UTC column. As you can see, during this 7-day stretch the load job times were actually quite consistent, with “only” about four hours of variation between most of them. However, the load job for the table events_20221201 happened at 6:39 a.m. the following day. That’s messed up!

If I had a scheduled query that ran at 3 p.m. UTC every day, which based on the data above would be quite a good decision, it would have missed this latecomer and I’d have a gap in my target dataset.

In order to fix this, we have to:

  • Modify our scheduled query so that it only runs on demand (i.e. when manually triggered) and not on a schedule.
  • Build a trigger system that runs the scheduled query as soon as a GA4 load job happens.
  • Use the table_id from the load job to determine what “today” is.

This last point is very important. For the scheduled query to be correctly aligned in time, we can’t use the date of the export load job itself. If the daily job updated a table further in the past, we would otherwise place the data in the wrong daily shard.

What we'll build

Here’s a rough overview of the system that we’ll build.

  1. When the BigQuery load job completes, it generates a log entry in Google Cloud Logging (this happens automatically).
  2. In Cloud Logging, we build a Logs Router, which sends the load job log entry into a Pub/Sub topic.
  3. A Cloud Function is run when the Pub/Sub topic receives the log. The Cloud Function checks the destination table ID from the log entry, and then it runs the scheduled query in Google BigQuery, setting the target date to match that of the table ID.

There might be other, more elegant ways to do this. However, using this system gives us an opportunity to learn about some useful Google Cloud Platform components and services.

1. Create the scheduled query

The first thing you’ll need to do is build the scheduled query itself. The easiest way to do this is to build the query and then save it as a scheduled query using the BigQuery console user interface.

Here’s the query I’m going to use. It’s a simple flatten query, which takes some key dimensions from the GA4 export and flattens them into a user data table I can then use later for joins.

The focus of this article isn’t what I have in the query but rather how I orchestrate it. Take a look:

SELECT
  COALESCE(user_id, user_pseudo_id) AS user_id,
  (SELECT
      value.int_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'ga_session_id') AS session_id,
  traffic_source.source AS acquisition_source,
  traffic_source.medium AS acquisition_medium,
  ARRAY_AGG((SELECT
      value.string_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'source') IGNORE NULLS)[OFFSET(0)] AS session_source,
  ARRAY_AGG((SELECT
      value.string_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'medium') IGNORE NULLS)[OFFSET(0)] AS session_medium,
  SUM(ecommerce.purchase_revenue) AS total_session_revenue
FROM
  `project.my_dataset.events_*`
WHERE
  _table_suffix = FORMAT_DATE('%Y%m%d', DATE_SUB(@run_date, INTERVAL 1 DAY))
  AND COALESCE(user_id, user_pseudo_id) IS NOT NULL
GROUP BY
  1, 2, 3, 4
ORDER BY
  session_id ASC

The purpose of this query is to generate a flattened table of the GA4 export. It generates a table of users and sessions, including the user’s original acquisition source and medium as well as the session source/medium (by taking the first non-null source/medium combination of any given session and applying it to the entire session).

The key part of this query is in the WHERE clause. See the @run_date parameter? That’s automatically populated with the run date of the scheduled query. For example, if the query runs on May 25th, 2023, @run_date resolves to 2023-05-25, and the WHERE clause targets the events_20230524 table.

What you’ll learn later is that we can actually manipulate this run time to match the table that the GA4 data was loaded to. This is an absolutely critical part of this process because it allows us to safely run the scheduled query on backfilled data, too.

This is a sample output of the query:

When you’re happy with the query, click the SCHEDULE button in the BigQuery console toolbar and then Create new scheduled query.

The three options you need to pay attention to in the scheduled query settings are:

  1. Set the Repeats option to On-demand. We want to run this query only when triggered manually (i.e. by the Cloud Function we’ll build later).
  2. Use a variable in the Destination table ID.
  3. Set the query to Overwrite rather than Append.

Point (2) is important. If you want the table ID to have a YYYYMMDD suffix similar to the GA4 export, you can set the field to something like this:

events_{run_time-24h|"%Y%m%d"}

Here, the syntax in the curly braces basically takes the day before the scheduled query run time and generates a YYYYMMDD string from it. That way, if the run time of the scheduled query is May 25th, 2023, the destination table ID for the scheduled query becomes:

events_20230524

And this will match the destination table ID of the GA4 export, too, as you’ll see when we build the Cloud Function.

Set the query to Overwrite the destination table. This is because the source of the query is the entire GA4 daily table, so it doesn’t matter if you overwrite the destination table each time the query is run. As I mentioned earlier, the GA4 daily job can happen multiple times per day, and it’s possible for backfilling to happen, too.

Having the Overwrite option selected ensures the flattened table is always generated from the most recent state of the source table (the GA4 daily export table).

2. Create the Logs Router

We now have our scheduled query built, waiting to be triggered by the GA4 load job.

But that’s the end of the pipeline. We need to build everything else now.

The Logs Router looks for a log entry that is generated when a GA4 daily load job is complete. We use this as the trigger event for our entire pipeline setup.

This log entry is then shipped to a Pub/Sub topic, which starts the Cloud Function.

Now, head over to Logs Router and click the CREATE SINK button at the top to create a new router sink.

  1. Give the sink a name.
  2. Choose Google Cloud Pub/Sub topic as the destination.
  3. From the list of available Pub/Sub topics, click to create a new topic. Give the topic an ID, leave all the other settings at their defaults, and click CREATE TOPIC.
  4. In the Build inclusion filter field, copy-paste the following code. Modify the dataset ID on line 3 to match the dataset ID of your GA4 export dataset.
protoPayload.methodName="jobservice.jobcompleted"
protoPayload.authenticationInfo.principalEmail="firebase-measurement@system.gserviceaccount.com"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="my_dataset"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId=~"^events_\d+"
  5. Click CREATE SINK once you’re done.

The inclusion filter looks for logs that have the method jobservice.jobcompleted and were specifically generated by the firebase-measurement@... service account. This is the account that performs all the Google Analytics 4 export jobs. The last two lines restrict the match to load jobs that write into your GA4 export dataset and into the daily events_YYYYMMDD tables (the regular expression deliberately does not match the events_intraday_ tables).

Here’s what the Logs Router should look like:

Just one thing remains! We need to build the Google Cloud Function that is subscribed to the Pub/Sub topic receiving these load job log entries.

3. Build the Google Cloud Function

Google Cloud Functions are serverless, well, functions that can be run on-demand in the cloud. Their purpose is to perform small, low-intensity tasks at scale.

In this case, we want the Cloud Function to subscribe to the Pub/Sub topic we created above.

We then want it to parse the log entry for the destination table details, so that we can calibrate the run time of the scheduled query to match the date of the table that the GA4 daily job created (or updated).

So, head on over to Cloud Functions and click CREATE FUNCTION to get started.

  1. Keep the environment as 1st gen.
  2. Give the function a descriptive name.
  3. Choose a region for the function. This can be anything you like, but it makes sense to keep your BigQuery datasets and related Google Cloud components in the same region.
  4. Choose Cloud Pub/Sub as the trigger type.
  5. Select the Pub/Sub topic you created in the previous chapter as the trigger.
  6. Check the box Retry on failure (this retries the Cloud Function for up to 7 days if it fails with an error).
  7. Click SAVE to save the trigger settings.

This is what it should look like:

  8. Ignore the Runtime, build, connections and security settings accordion and click NEXT to continue.

In the next screen, we need to add the function code itself. This is what gets executed when the function is run.

  1. Keep Node.js as the runtime (choose the latest non-Preview version).
  2. Click package.json to edit its contents.
  3. Add the following line in the "dependencies" property:

"@google-cloud/bigquery-data-transfer": "^3.1.3"

This is what package.json should look like (the version numbers might be different when you try this solution):
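As a rough sketch, the edited file should contain something along these lines; the scaffolded name and version fields, and the exact dependency version, will likely differ in your project:

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "@google-cloud/bigquery-data-transfer": "^3.1.3"
  }
}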

  4. Select index.js from the list of files to activate the code editor.
  5. Edit the Entry point field at the top of the editor to be runScheduledQuery.
  6. Copy-paste the following code into the editor.
const bigqueryDataTransfer = require('@google-cloud/bigquery-data-transfer');

exports.runScheduledQuery = async (event, context) => {
  // Update configuration options
  const projectId = '';
  const configId = '';
  const region = '';

  // Load the log data from the buffer
  const eventData = JSON.parse(Buffer.from(event.data, 'base64').toString());
  const destinationTableId = eventData.protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId;

  // Grab the table date and turn it into the run time for the scheduled query
  const tableTime = destinationTableId.replace('events_', '');
  const year = tableTime.substring(0, 4),
        month = tableTime.substring(4, 6),
        day = tableTime.substring(6, 8);

  // Set the run time to the day after the table date so that the scheduled query works with "yesterday's" data
  const runTime = new Date(Date.UTC(year, month - 1, parseInt(day) + 1, 12));

  // Create a protocol buffer Timestamp object from this
  const requestedRunTime = bigqueryDataTransfer.protos.google.protobuf.Timestamp.fromObject({
    seconds: runTime / 1000,
    nanos: (runTime % 1000) * 1e6
  });

  const client = new bigqueryDataTransfer.v1.DataTransferServiceClient();
  const parent = client.projectLocationTransferConfigPath(projectId, region, configId);

  const request = {
    parent,
    requestedRunTime
  };

  const response = await client.startManualTransferRuns(request);
  return response;
};
  7. Change the projectId value to match the Google Cloud Platform project ID where you built your scheduled query.
  8. To get values for region and configId, browse to your scheduled queries, open your scheduled query, and click the Configuration tab to view its details.
  9. The region value should be the Google Cloud region of the destination dataset, so click through to the dataset to check if you don’t remember what it was.
  10. configId is the GUID at the end of the Resource name of the scheduled query (the last segment of the resource path).

This is what my function looks like:

My destination dataset is multi-region EU, hence the 'eu' value for region. If your dataset is in a specific region, the value would be something like us-west1, for example.

Once you’re happy, click DEPLOY at the bottom of the screen to deploy the function.

The purpose of the function is to take the log entry and parse it for the destination table ID (events_YYYYMMDD).

The date part is then extracted, and a new protocol buffer Timestamp object is created from that date, as this is required if you want to manually set the run time of the query.

The only special thing we do is move one day forward in time, because we want the run time to be one day after the date in the destination table ID that was updated.

This is to mirror how the GA4 daily export works: it always updates yesterday’s table (or a table further in the past).
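To make the date arithmetic concrete, here is a small trace of the same logic for a hypothetical destination table ID. This is an illustration only, not part of the deployed function:

// Illustration only: the same date arithmetic as the Cloud Function above,
// traced for a hypothetical destination table ID.
const destinationTableId = 'events_20230524';

const tableTime = destinationTableId.replace('events_', ''); // '20230524'
const year = tableTime.substring(0, 4);  // '2023'
const month = tableTime.substring(4, 6); // '05'
const day = tableTime.substring(6, 8);   // '24'

// One day after the table date, at 12:00 UTC
const runTime = new Date(Date.UTC(year, month - 1, parseInt(day) + 1, 12));
console.log(runTime.toISOString()); // 2023-05-25T12:00:00.000Z

// With this requested run time, the scheduled query sees @run_date = 2023-05-25,
// so DATE_SUB(@run_date, INTERVAL 1 DAY) resolves to 2023-05-24 and the query
// reads events_20230524, the very table the GA4 load job just (re)wrote.
// The destination template events_{run_time-24h|"%Y%m%d"} resolves to
// events_20230524 as well, keeping the shards aligned.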

When you click DEPLOY, it will take a moment for the function to be ready for action. Once the spinner stops spinning and you see a green checkmark, you’re good to go.

4. Test the setup

I couldn’t figure out a way to manually test the Cloud Function, as it would require generating a log entry identical to those created by the GA4 export service account.

If you have an idea how to test this with a mock log entry, please let me know in the comments!
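One untested idea is to publish a mock message to the Pub/Sub topic yourself, containing just the fields the function reads. Note that this would start a real manual run of the scheduled query for the date in the mock table ID. The project ID, topic name, and table ID below are placeholders, and the sketch assumes a recent version of the @google-cloud/pubsub client:

// Untested sketch: publish a trimmed, mock "job completed" log entry to the
// Pub/Sub topic so that the Cloud Function fires without waiting for GA4.
const { PubSub } = require('@google-cloud/pubsub');

async function publishMockLoadJobEntry() {
  const pubsub = new PubSub({ projectId: 'my-project' }); // placeholder project ID

  // Only the fields the Cloud Function actually reads; a real LogEntry has many more.
  const mockLogEntry = {
    protoPayload: {
      serviceData: {
        jobCompletedEvent: {
          job: {
            jobConfiguration: {
              load: {
                destinationTable: { tableId: 'events_20230524' } // placeholder table
              }
            }
          }
        }
      }
    }
  };

  const messageId = await pubsub
    .topic('my-topic') // placeholder: the topic attached to the Logs Router sink
    .publishMessage({ data: Buffer.from(JSON.stringify(mockLogEntry)) });

  console.log(`Published mock message ${messageId}`);
}

publishMockLoadJobEntry().catch(console.error);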

If you’d rather wait for the real thing, there are typically 2–3 daily load jobs from GA4, so you’ll likely need to wait until some time in the afternoon for a load job to be generated.

The best way to check if your scheduled queries are running is to go to the BigQuery user interface and check the PROJECT HISTORY tab at the bottom of the screen.

Each LOAD job where the owner is firebase-measurement@... should be followed shortly by a QUERY job with a Job ID that starts with scheduled_query.... If you see this, your setup is working, and you can verify it by visiting the dataset your scheduled query writes its output to.

If you don’t see this, there’s a problem somewhere along the line, and a good first step is to visit Logs Explorer and look for error logs related to one or more of the components in your setup.

Typical culprits are an incorrectly configured Logs Router or a mistake in the Cloud Function.

Summary

It might be a good idea to double-check your setup by watching the video as well.

As far as Google Cloud Platform pipelines go, this is not a complicated setup. You actually only had to configure three things:

  • The scheduled query that you want to run on-demand,
  • the Logs Router that listens for load job complete entries,
  • and the Cloud Function that triggers with the log entry and runs the scheduled query.

Yes, you created a Pub/Sub topic during the process, but the beauty of the Google Cloud Platform is that this was embedded in the flow of setting up the pipeline itself. You never had to configure the Pub/Sub topic or its subscriptions separately; it was all done without breaking your stride.

This is why the Google Cloud Platform is such a pleasure to work with. The way it guides the user through interconnected components is way ahead of its competition (in my opinion).

I hope you found this informative, and I hope you’ve been bitten by the Google Cloud Platform bug (if you weren’t a fan already). Let me know in the comments if you have any questions about the setup!

If you’ve built this pipeline some other way, I’d love to hear more about your approach, too.

11 thoughts on “How Do I Trigger A Scheduled Query When The GA4 Daily Export Happens?”

  1. Hi Simo –

    Just wanted to express my thanks for this blog post. I appreciate the knowledge sharing of multiple different aspects of GCP. Currently building a pipeline to send data from GA4/BigQuery to Redshift, and was able to extrapolate my process from the lessons in this video.

    My aspiration is to move from doing data analytics to data engineering and this is just what I needed!

    Happy Holidays from NYC 🙂

  2. Hi Simo, thanks for this nice article. I implemented the solution and it works as expected. However, I would like the destination table to be a date-partitioned table rather than a sharded table. How would you suggest approaching this challenge? Is this something that should be changed in the logic of the cloud function?

  3. Hi Simo and thx for this,
    I ran the load-jobs query on my dataset, but I found something different from what I expected.
    In particular, it looks like every day the load is performed at least three times, going back in time.
    For example: “2022-12-05” (creation_date) is associated with “events_20221202”, “events_20221203” and “events_20221204” (table_ids). Is this because of the 72h post-processing of GA4?
    Moreover, for certain creation_dates, I find 6 table_ids associated with them! For example, creation_date = “2022-12-04” and table_id = “events_20221130” or “events_20221101”.
    I expected a relationship like “creation_date” associated with the previous day’s “table_id” only, like your example above. Thanks!

    1. Hi!

      Yes, there’s a 72h update window so historical updates are not out of the question. Per Google’s documentation, sometimes the historical updates can go even further back if they’re fixing bugs in data collection retroactively, for example. That’s why the Cloud Function gets the run time from the destination table ID and *not* from the creation_time of the daily job!

      Also, multiple loads per day is the norm – for some reason the same table gets loaded more than once with the export job.

  4. Hi! Really insightful blog post, thanks for this! I was wondering whether you also have a way of incorporating a backfill with this set-up?

    I have some tables paired with manual scheduled queries that I want to switch over to this set-up. Obviously these already contain quite some data accumulated from the past and I do not want to lose this data. Perhaps an easy task, but it does not seem that straightforward to me.

  5. A poor man’s solution (avoiding Cloud Functions and super-precise timings): you could maybe create a sink of the update events into a BQ table and then have your scheduled query / stored procedure handle the create/replace. This could be less scary for those unfamiliar with Node/Python, or if your cloud team doesn’t want you stepping outside the “walled garden” of BigQuery. It’s definitely less flexible, but going this route could reduce the fear factor for lazy martechies, “non-technical marketers” or disaffected data scientists. The scheduled query could be set to run at whatever frequency, and while it wouldn’t run immediately, it could do a simple check every hour or so and be timely enough for most use cases. Getting those historic changes accounted for is the most critical / stressful part imo…

    1. Hey Adam!

      Yeah, there are many ways to skin the cat 🙂 A slightly “richer man’s solution” would be to use Gen2 of GCF and EventArc, although that might not actually be any simpler, even if it’s somewhat more straightforward.

      But one of the key values of Simmer is to destroy this myth of the “non-technical marketer”. I believe in not sugarcoating things when the payoff for learning things the hard way outweighs other solutions. That’s one of the reasons I think this article could be valuable – it solves the problem, yes, but it also introduces the reader to a number of components in the GCP stack that can be really valuable in future data engineering work. Hopefully I manage to demystify them somewhat!

      Thanks for the feedback – your approach is sound, but as you mention it does still surface the problem of historical updates and avoiding unnecessary query runs.
