How Do I Trigger A Scheduled Query When The GA4 Daily Export Happens?

Simmer Clips #3

The question we’re going to be looking at today is inspired by our course, Query GA4 Data in Google BigQuery.

How do I run a scheduled query in BigQuery as soon as the Google Analytics 4 daily export is complete for any given day?

If you’ve ever tried to build any sort of logic on top of the Google Analytics 4 daily export to Google BigQuery, you might have noticed one very annoying thing.

The daily export schedule is erratic at best. On one day, it might land in BigQuery before noon, on another day late in the afternoon, and on some other day it might arrive past midnight your time the following day!

Add to this the fact that it looks like there can be more than one load job from the service account associated with the export, and you have something of a mess on your hands.

Today’s question in Simmer Clips deals with this dilemma. I’ll help you solve it in a more deterministic way: instead of trying to figure out a time of day when it’s “safest” to run your queries, I’ll show you how you can build a simple pipeline with a sole purpose:

Run a scheduled query as soon as the daily export job is complete.

Along the way, you’ll learn about some really cool Google Cloud Platform components, such as Pub/Sub and Cloud Functions.

Video walkthrough

If you prefer this walkthrough in video format, you can check out the video below.

Don’t forget to subscribe to the Simmer YouTube channel for more content like this.

Overview of the problem

Before we get started, let’s take a look at the problem.

Run the following query in your project.

You need to change the region specifier `region-eu` to match the Google Cloud region where your GA4 / BigQuery dataset resides.

Also, change the dataset ID in the WHERE clause to match your Google Analytics 4 export dataset ID.

```sql
SELECT
  -- creation_time is a TIMESTAMP, so format it with FORMAT_TIMESTAMP (UTC by default)
  FORMAT_TIMESTAMP('%F', creation_time) AS creation_date,
  MIN(FORMAT_TIMESTAMP('%R', creation_time)) AS creation_time_UTC,
  destination_table.table_id,
  total_bytes_processed,
  COUNT(*) AS load_jobs
FROM
  `region-eu`.INFORMATION_SCHEMA.JOBS
WHERE
  start_time > TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY))
  AND job_type = 'LOAD'
  AND destination_table.dataset_id = 'my_dataset'
GROUP BY
  1,
  3,
  4
ORDER BY
  creation_date,
  creation_time_UTC
```

When you run this query, you should see something like this:

This table lists the load jobs from Google Analytics 4 over the past 7 days. 

The main thing to focus on is the column creation_time_UTC. As you can see, during this 7-day stretch the load job times were actually quite consistent, with most of them falling within a window of “only” four hours. However, the load job for the table events_20221201 happened at 6:39 a.m. the following day. That’s messed up!

If I had a scheduled query that ran at 3 p.m. UTC every day, which based on the data above would seem like a sensible choice, it would have missed this latecomer, and I’d have a gap in my target dataset.
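To make the failure mode concrete, here is a small JavaScript sketch (the same language as the Cloud Function later in this article). The helper missedByFixedSchedule() is hypothetical, and the load times are made up rather than taken from the table above: it checks whether a load job lands after a query that runs at a fixed UTC hour on the day after the table date.

```javascript
// Hypothetical check: would a query scheduled at a fixed UTC hour on the day
// after the table date run before the GA4 load job for that table lands?
function missedByFixedSchedule(tableId, loadTimeIso, scheduleHourUtc) {
  const match = /^events_(\d{4})(\d{2})(\d{2})$/.exec(tableId);
  if (!match) throw new Error(`Unexpected table ID: ${tableId}`);
  const [year, month, day] = match.slice(1).map(Number);
  // The run that processes "yesterday" happens on the table date + 1 day.
  const scheduledRun = Date.UTC(year, month - 1, day + 1, scheduleHourUtc);
  return Date.parse(loadTimeIso) > scheduledRun;
}

// Made-up load times for illustration only
console.log(missedByFixedSchedule('events_20230524', '2023-05-25T11:00:00Z', 15)); // false
console.log(missedByFixedSchedule('events_20230524', '2023-05-26T07:10:00Z', 15)); // true
```

However late you set the fixed hour, a sufficiently late load will still slip past it, which is why the rest of this article reacts to the load job itself.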

In order to fix this, we have to:

  • Modify our scheduled query so that it only runs on demand (i.e. when manually triggered) and not with a schedule.
  • Build a trigger system that runs the scheduled query as soon as a GA4 load job happens.
  • Use the table_id from the load job to determine what is “today”.

This last point is very important. To keep the scheduled query correctly aligned in time, you can’t use the date of the export load job: if the daily job updates a table further in the past, you would write the data into the wrong daily shard.
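A minimal sketch of that rule, assuming the daily tables follow the events_YYYYMMDD naming: the hypothetical tableIdToIsoDate() reads the date from the table ID and ignores when the load job ran. The dates in the example are invented.

```javascript
// Derive the pipeline's "today" from the destination table ID (events_YYYYMMDD),
// not from the timestamp of the load job.
function tableIdToIsoDate(tableId) {
  const match = /^events_(\d{4})(\d{2})(\d{2})$/.exec(tableId);
  if (!match) {
    throw new Error(`Not a GA4 daily export table: ${tableId}`);
  }
  const [, year, month, day] = match;
  const iso = `${year}-${month}-${day}`;
  // Reject impossible dates such as events_20231340
  const parsed = new Date(Date.UTC(Number(year), Number(month) - 1, Number(day)));
  if (parsed.toISOString().slice(0, 10) !== iso) {
    throw new Error(`Invalid date in table ID: ${tableId}`);
  }
  return iso;
}

// Hypothetical example: a load job created on 2023-05-26 that updates an older table
const loadJobDate = '2023-05-26';
const tableDate = tableIdToIsoDate('events_20230523');
console.log(tableDate);                 // 2023-05-23
console.log(loadJobDate === tableDate); // false
```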

What we'll build

Here’s a rough overview of the system that we’ll build.

  1. When the BigQuery load job completes, it writes a log entry to Google Cloud Logging (this happens automatically).
  2. In Cloud Logging, we create a Logs Router sink, which sends the load job log entry to a Pub/Sub topic.
  3. A Cloud Function runs when the Pub/Sub topic receives the log entry. The function reads the destination table ID from the log entry and then starts a run of the scheduled query in Google BigQuery, setting the run time so that the query targets the date in the table ID.
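The only part of the log entry the pipeline really cares about is the destination table. Here's a trimmed-down, hypothetical log entry that keeps just the fields referenced later in the Logs Router filter and the Cloud Function, plus a getDestinationTable() helper (also hypothetical) that reads the table without crashing if the shape is different. Real entries carry many more fields, and the values here are placeholders.

```javascript
// Minimal, hypothetical LogEntry with only the fields this pipeline reads.
const sampleLogEntry = {
  protoPayload: {
    methodName: 'jobservice.jobcompleted',
    authenticationInfo: {
      principalEmail: 'firebase-measurement@system.gserviceaccount.com',
    },
    serviceData: {
      jobCompletedEvent: {
        job: {
          jobConfiguration: {
            load: {
              destinationTable: { datasetId: 'my_dataset', tableId: 'events_20230524' },
            },
          },
        },
      },
    },
  },
};

// Return the load job's destination table, or null if the entry has another shape.
function getDestinationTable(logEntry) {
  const job = logEntry?.protoPayload?.serviceData?.jobCompletedEvent?.job;
  return job?.jobConfiguration?.load?.destinationTable ?? null;
}

console.log(getDestinationTable(sampleLogEntry).tableId); // events_20230524
```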

There might be other, more elegant ways to do this. However, using this system gives us an opportunity to learn about some useful Google Cloud Platform components and services.

1. Create the scheduled query

The first thing you’ll need to do is build the scheduled query itself. The easiest way to do this is to build the query and then save it as a scheduled query using the BigQuery console user interface.

Here’s the query I’m going to use. It’s a simple flatten query, which takes some key dimensions from the GA4 export and flattens them into a user data table I can then use later for joins.

The focus of this article isn’t what I have in the query but rather how I orchestrate it. Take a look:

```sql
SELECT
  COALESCE(user_id, user_pseudo_id) AS user_id,
  (SELECT
      value.int_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'ga_session_id') AS session_id,
  traffic_source.source AS acquisition_source,
  traffic_source.medium AS acquisition_medium,
  -- First non-null session source, in event order
  ARRAY_AGG((SELECT
      value.string_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'source') IGNORE NULLS ORDER BY event_timestamp)[SAFE_OFFSET(0)] AS session_source,
  -- First non-null session medium, in event order
  ARRAY_AGG((SELECT
      value.string_value
    FROM
      UNNEST(event_params)
    WHERE
      key = 'medium') IGNORE NULLS ORDER BY event_timestamp)[SAFE_OFFSET(0)] AS session_medium,
  SUM(ecommerce.purchase_revenue) AS total_session_revenue
FROM
  `project.my_dataset.events_*`
WHERE
  -- @run_date is the date of the scheduled query's run time; read the day before it
  _table_suffix = FORMAT_DATE('%Y%m%d', DATE_SUB(@run_date, INTERVAL 1 DAY))
  AND COALESCE(user_id, user_pseudo_id) IS NOT NULL
GROUP BY
  1, 2, 3, 4
ORDER BY
  session_id ASC
```

The purpose of this query is to generate a flattened table of the GA4 export. It generates a table of users and sessions, including the original acquisition campaign of the user as well as the session source/medium (by taking the first non-null source/medium combination of any given session and applying it to the entire session).

The key part of this query is the WHERE clause. See the @run_date parameter? BigQuery populates it automatically with the date of the scheduled query’s run time, and the query uses it to select the previous day’s table.

What you’ll learn later is that we can actually manipulate this run time to match the table that the GA4 data was loaded to. This is an absolutely critical part of this process because it allows us to safely run the scheduled query on backfilled data too.

This is a sample output of the query:

When you’re happy with the query, click the SCHEDULE button in the toolbar at the top of the BigQuery console and then Create new scheduled query.

The three options you need to pay attention to in the scheduled query settings are:

  1. Set the repeats option to On-demand. We want to run this query only when triggered manually (i.e. by the Cloud Function we’ll build later).
  2. Use a variable in the Destination Table ID.
  3. Set the query to Overwrite rather than Append.

Point (2) is important. If you want the table ID to have the same YYYYMMDD suffix as the GA4 export, you can set the field to something like this:

events_{run_time-24h|"%Y%m%d"}

Here, the syntax in the curly braces subtracts 24 hours from the scheduled query’s run time and formats the result as a YYYYMMDD string. That way, if the run time of the scheduled query is May 25th, 2023, the destination table ID for the scheduled query becomes:

events_20230524

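And this will match the destination table ID of the GA4 export, too, as you’ll see when we build the Cloud Function. Because the names match, make sure the scheduled query writes to a different dataset than the GA4 export; otherwise it would overwrite the export tables.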
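To see how the source table and the destination table line up, here's a JavaScript sketch that mirrors both expressions for a given run time. It assumes @run_date is the UTC date of the run time and that the destination template is evaluated in UTC; the helper names are made up for illustration.

```javascript
// Format a Date as YYYYMMDD in UTC.
function yyyymmdd(date) {
  return date.toISOString().slice(0, 10).replace(/-/g, '');
}

// Mirrors FORMAT_DATE('%Y%m%d', DATE_SUB(@run_date, INTERVAL 1 DAY)) in the query.
function sourceSuffix(runTime) {
  // @run_date: the UTC calendar date of the run time (assumption)
  const runDate = new Date(Date.UTC(runTime.getUTCFullYear(), runTime.getUTCMonth(), runTime.getUTCDate()));
  runDate.setUTCDate(runDate.getUTCDate() - 1);
  return yyyymmdd(runDate);
}

// Mirrors the destination template events_{run_time-24h|"%Y%m%d"}.
function destinationTable(runTime) {
  return `events_${yyyymmdd(new Date(runTime.getTime() - 24 * 60 * 60 * 1000))}`;
}

const runTime = new Date(Date.UTC(2023, 4, 25, 12)); // May 25th, 2023, 12:00 UTC
console.log(sourceSuffix(runTime));     // 20230524
console.log(destinationTable(runTime)); // events_20230524
```

Both sides point at the same date, so the flattened table always gets the same suffix as the GA4 table it was built from.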

Set the query to Overwrite the destination table. This is because the source of the query is the entire GA4 daily table, so it doesn’t matter if you overwrite the destination table each time the query is run. As I mentioned earlier, the GA4 daily job can happen multiple times per day, and it’s possible for backfilling to happen, too.

Selecting the Overwrite option ensures that the flattened table is always generated from the most recent state of the source table (the GA4 daily export table).

2. Create the Logs Router

We now have our scheduled query built, waiting to be triggered with the GA4 load job.

But that’s the end of the pipeline. We need to build everything else now.

The Logs Router looks for a log entry that is generated when a GA4 daily load job is complete. We use this as the trigger event for our entire pipeline setup.

This log entry is then shipped to a Pub/Sub topic, which starts the Cloud Function.

Now, head over to Logs Router and click the CREATE SINK button at the top to create a new router sink.

  1. Give the sink a name.
  2. Choose Google Cloud Pub/Sub topic as the destination.
  3. From the list of available Pub/Sub topics, click to create a new topic. Give the topic an ID, leave all the other settings at their defaults, and click CREATE TOPIC.
  4. In Build inclusion filter, copy-paste the following code. Modify the dataset ID on line 3 to match the dataset ID of your GA4 export dataset.

```
protoPayload.methodName="jobservice.jobcompleted"
protoPayload.authenticationInfo.principalEmail="firebase-measurement@system.gserviceaccount.com"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.datasetId="my_dataset"
protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId=~"^events_\d+"
```

  5. Click CREATE SINK once you’re done.

The inclusion filter looks for logs that have the method jobservice.jobcompleted, were generated by the firebase-measurement@... service account, and describe a load into an events_ table (such as events_20230524) in your GA4 export dataset. The firebase-measurement@... account is the one that performs all the Google Analytics 4 export jobs.
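If you want to sanity-check sample log entries against this filter without waiting for a real load job, you could approximate it locally. The sketch below is not Cloud Logging's filter engine, just a hypothetical matchesInclusionFilter() that applies the same four conditions; makeEntry() is an invented helper that builds a minimal entry.

```javascript
const GA4_SERVICE_ACCOUNT = 'firebase-measurement@system.gserviceaccount.com';

// Approximates the four conditions of the inclusion filter for a parsed log entry.
function matchesInclusionFilter(entry, datasetId) {
  const payload = entry?.protoPayload;
  const job = payload?.serviceData?.jobCompletedEvent?.job;
  const table = job?.jobConfiguration?.load?.destinationTable;
  return (
    payload?.methodName === 'jobservice.jobcompleted' &&
    payload?.authenticationInfo?.principalEmail === GA4_SERVICE_ACCOUNT &&
    table?.datasetId === datasetId &&
    /^events_\d+/.test(table?.tableId ?? '')
  );
}

// Hypothetical helper that builds a minimal log entry for testing.
function makeEntry({
  principal = GA4_SERVICE_ACCOUNT,
  datasetId = 'my_dataset',
  tableId = 'events_20230524',
} = {}) {
  const destinationTable = { datasetId, tableId };
  return {
    protoPayload: {
      methodName: 'jobservice.jobcompleted',
      authenticationInfo: { principalEmail: principal },
      serviceData: {
        jobCompletedEvent: { job: { jobConfiguration: { load: { destinationTable } } } },
      },
    },
  };
}

console.log(matchesInclusionFilter(makeEntry(), 'my_dataset')); // true
console.log(matchesInclusionFilter(makeEntry({ tableId: 'events_intraday_20230524' }), 'my_dataset')); // false
```

Note how the `^events_\d+` pattern keeps intraday tables (events_intraday_YYYYMMDD) out of the pipeline.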

Here’s what the Logs Router should look like:

Just one thing remains! We need to build the Google Cloud Function that is subscribed to the Pub/Sub topic receiving these load job log entries.

3. Build the Google Cloud Function

Google Cloud Functions are serverless, well, functions that can be run on-demand in the cloud. Their purpose is to perform small, low-intensity tasks at scale.

In this case, we want the Cloud Function to subscribe to the Pub/Sub topic we created above.

We then want it to parse the log entry for the destination table details, so that we can calibrate the run time of the scheduled query to match the date of the table that the GA4 daily job created (or updated).

So, head on over to Cloud Functions and click CREATE FUNCTION to get started.

  1. Keep environment as 1st gen.
  2. Give the function a descriptive name.
  3. Choose a region for the function. This can be anything you like, but it makes sense to keep your BigQuery datasets and related Google Cloud components in the same region.
  4. Choose Cloud Pub/Sub as the trigger type.
  5. Select the Pub/Sub topic you created in the previous chapter as the trigger.
  6. Check the box Retry on failure (if the function fails with an error, this keeps retrying it for up to 7 days).
  7. Click SAVE to save the trigger settings.

This is what it should look like:

  8. Ignore the Runtime, build, connections and security settings accordion and click NEXT to continue.

In the next screen, we need to add the function code itself. This is what gets executed when the function is run.

  1. Keep Node.js as the runtime (choose the latest non-Preview version).
  2. Click package.json to edit its contents.
  3. Add the following line in the "dependencies" property:

```
"@google-cloud/bigquery-data-transfer": "^3.1.3"
```

This is what package.json should look like (the version numbers might be different when you try this solution):

  4. Select index.js from the list of files to activate the code editor.
  5. Set the Entry point field above the editor to runScheduledQuery.
  6. Copy-paste the following code into the editor.

```javascript
const bigqueryDataTransfer = require('@google-cloud/bigquery-data-transfer');

// Create the client once so that warm function instances can reuse it
const client = new bigqueryDataTransfer.v1.DataTransferServiceClient();

exports.runScheduledQuery = async (event, context) => {
  // Update configuration options
  const projectId = ''; // Google Cloud project ID of the scheduled query
  const configId = '';  // GUID at the end of the scheduled query's Resource name
  const region = '';    // Region of the destination dataset, e.g. 'eu'

  // Decode the Pub/Sub message: the log entry arrives as base64-encoded JSON
  const eventData = JSON.parse(Buffer.from(event.data, 'base64').toString());
  const destinationTableId = eventData.protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId;

  // Grab the table date (events_YYYYMMDD) and turn it into the run time for the scheduled query
  const match = /^events_(\d{4})(\d{2})(\d{2})$/.exec(destinationTableId);
  if (!match) {
    console.log(`Skipping unexpected table ID: ${destinationTableId}`);
    return;
  }
  const [year, month, day] = match.slice(1).map(Number);
  // Set the run time to noon UTC on the day after the table date so that the scheduled
  // query works with "yesterday's" data (Date.UTC rolls over months and years)
  const runTime = new Date(Date.UTC(year, month - 1, day + 1, 12));
  // Create a protocol buffer Timestamp object from this
  const requestedRunTime = bigqueryDataTransfer.protos.google.protobuf.Timestamp.fromObject({
    seconds: Math.floor(runTime.getTime() / 1000),
    nanos: (runTime.getTime() % 1000) * 1e6
  });

  const parent = client.projectLocationTransferConfigPath(projectId, region, configId);

  const request = {
    parent,
    requestedRunTime
  };

  // The client resolves to [response, request, rawResponse]; return the response
  const [response] = await client.startManualTransferRuns(request);
  return response;
};
```

  7. Change the projectId value to match the ID of the Google Cloud Platform project where you built your scheduled query.
  8. To get the values for region and configId, browse to Scheduled queries, open your scheduled query, and click the Configuration tab to view its details.
  9. The region value should be the Google Cloud region of the Destination dataset, so click through to the dataset to check it if you don’t remember what it was.
  10. configId is the GUID at the end of the Resource name of the scheduled query.

This is what my function looks like:

My destination dataset is multi-region EU, hence the 'eu' value for region. If your dataset is in a specific region, the value would be something like us-west1, for example.
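If you'd rather not pick the values out by hand, the Resource name can be split with a regular expression. This sketch assumes the name has the form projects/…/locations/…/transferConfigs/…, the same path pattern that projectLocationTransferConfigPath() builds in the function code; parseTransferConfigName() and the example name are hypothetical.

```javascript
// Split a scheduled query's Resource name into the values the Cloud Function needs.
function parseTransferConfigName(resourceName) {
  const match = /^projects\/([^/]+)\/locations\/([^/]+)\/transferConfigs\/([^/]+)$/.exec(resourceName);
  if (!match) throw new Error(`Unexpected resource name: ${resourceName}`);
  const [, project, region, configId] = match;
  return { project, region, configId };
}

// Made-up resource name for illustration
const parsed = parseTransferConfigName(
  'projects/my-project/locations/eu/transferConfigs/00000000-0000-0000-0000-000000000000'
);
console.log(parsed.region);   // eu
console.log(parsed.configId); // 00000000-0000-0000-0000-000000000000
```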

Once you’re happy, click DEPLOY at the bottom of the screen to deploy the function.

The purpose of the function is to take the log entry and parse it for the destination table ID (events_YYYYMMDD).

The date part is then extracted and turned into a protocol buffer Timestamp object, which is the format the API expects when you request a specific run time (requestedRunTime) for a manual run.

The only special thing we do is move one day forward in time, because we want the run time to be one day after the destination table ID that was updated.

This is to mirror how the GA4 daily export works – it always updates yesterday’s table (or a table further in the past).
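Here's that date arithmetic on its own, without the BigQuery Data Transfer client, so you can check it locally. It's a sketch: runTimeForTable() is a hypothetical helper that mirrors the function code, returning the run time (noon UTC on the day after the table date) and the seconds/nanos fields of a Timestamp. The example uses a New Year's Eve table to show that Date.UTC rolls the day over into the next month and year.

```javascript
// Compute the requested run time for a GA4 daily table: noon UTC on the day
// after the table date, plus the {seconds, nanos} shape of a protobuf Timestamp.
function runTimeForTable(tableId) {
  const match = /^events_(\d{4})(\d{2})(\d{2})$/.exec(tableId);
  if (!match) throw new Error(`Not a GA4 daily export table: ${tableId}`);
  const [year, month, day] = match.slice(1).map(Number);
  // Date.UTC rolls day + 1 over month and year boundaries automatically.
  const runTime = new Date(Date.UTC(year, month - 1, day + 1, 12));
  const millis = runTime.getTime();
  return {
    runTime,
    timestamp: { seconds: Math.floor(millis / 1000), nanos: (millis % 1000) * 1e6 },
  };
}

const { runTime, timestamp } = runTimeForTable('events_20221231');
console.log(runTime.toISOString()); // 2023-01-01T12:00:00.000Z
console.log(timestamp.nanos);       // 0
```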

When you click DEPLOY, it will take a moment for the function to be ready for action. Once the spinner stops spinning and you see a green checkmark, you’re good to go.

4. Test the setup

I couldn’t figure out a way to manually test the Cloud Function, as it would require generating a log entry identical to those created by the GA4 export service account.

If you have an idea how to test this with a mock log entry, please let me know in the comments!
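One partial approach, offered as an untested suggestion rather than a full test harness: build a mock 1st gen Pub/Sub event whose data field is a base64-encoded JSON log entry and feed it to the same decoding steps the function uses. This checks the parsing logic only, not the startManualTransferRuns() call, and the mock entry contains just the one field the function reads. makeMockEvent() and destinationTableIdFromEvent() are hypothetical names.

```javascript
// Build a mock Pub/Sub event whose data is a base64-encoded JSON log entry.
function makeMockEvent(tableId) {
  const logEntry = {
    protoPayload: {
      serviceData: {
        jobCompletedEvent: {
          job: { jobConfiguration: { load: { destinationTable: { tableId } } } },
        },
      },
    },
  };
  return { data: Buffer.from(JSON.stringify(logEntry)).toString('base64') };
}

// Same decoding steps as the first lines of runScheduledQuery.
function destinationTableIdFromEvent(event) {
  const eventData = JSON.parse(Buffer.from(event.data, 'base64').toString());
  return eventData.protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId;
}

console.log(destinationTableIdFromEvent(makeMockEvent('events_20230524'))); // events_20230524
```

Once a real load job has produced a log entry, you could swap the mock object for that entry's JSON to get closer to the real thing.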

There are typically 2–3 daily load jobs from GA4, so you might have to wait until later in the day for a load job to be generated.

The best way to check if your scheduled queries are running is to go to the BigQuery user interface and check the PROJECT HISTORY tab at the bottom of the screen.

Each LOAD job where the owner is firebase-measurement@... should be followed shortly by a QUERY job whose Job ID starts with scheduled_query.... If you see this, your setup is working, and you can confirm it by visiting the dataset your scheduled query writes its output to.
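If you want to check this over a longer period, you could export the job list and look for load jobs that were not followed by a scheduled query. The sketch below assumes hypothetical job records with jobId, jobType, and creationTime fields and an arbitrary 15-minute window; adjust both to your data.

```javascript
// Return LOAD jobs with no scheduled_query job created within windowMinutes after them.
function unmatchedLoads(jobs, windowMinutes = 15) {
  const windowMs = windowMinutes * 60 * 1000;
  const queries = jobs.filter((j) => j.jobType === 'QUERY' && j.jobId.startsWith('scheduled_query'));
  return jobs
    .filter((j) => j.jobType === 'LOAD')
    .filter((load) => {
      const loadTime = Date.parse(load.creationTime);
      return !queries.some((q) => {
        const delta = Date.parse(q.creationTime) - loadTime;
        return delta >= 0 && delta <= windowMs;
      });
    });
}

// Hypothetical job records (field names are illustrative, not an API shape)
const jobs = [
  { jobId: 'job_load_1', jobType: 'LOAD', creationTime: '2023-05-25T11:02:00Z' },
  { jobId: 'scheduled_query_abc', jobType: 'QUERY', creationTime: '2023-05-25T11:04:00Z' },
  { jobId: 'job_load_2', jobType: 'LOAD', creationTime: '2023-05-25T14:30:00Z' },
];

console.log(unmatchedLoads(jobs).map((j) => j.jobId)); // [ 'job_load_2' ]
```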

If you don’t see this, there’s a problem somewhere along the line, and a good first step is to visit Logs Explorer and look for error logs related to one or more of the components in your setup.

Typical culprits are an incorrectly configured Logs Router or a mistake in the Cloud Function.

Summary

It might be a good idea to double-check your setup by watching the video as well.

As far as Google Cloud Platform pipelines go, this is not a complicated setup. You actually only had to configure three things:

  • The scheduled query that you want to run on-demand,
  • the Logs Router that listens for load job complete entries,
  • and the Cloud Function that triggers with the log entry and runs the scheduled query.

Yes, you created a Pub/Sub topic during the process, but the beauty of the Google Cloud Platform is that this was embedded in the flow of setting up the pipeline itself. You never had to configure the Pub/Sub topic or its subscriptions separately; it was all done without breaking your stride.

This is why the Google Cloud Platform is such a pleasure to work with. The way it guides the user through interconnected components is way ahead of its competition (in my opinion).

I hope you found this informative, and I hope you’ve been bitten by the Google Cloud Platform bug (if you weren’t a fan already). Let me know in the comments if you have any questions about the setup!

If you’ve built this funnel some other way, I’d love to hear more about your approach, too.

11 thoughts on “How Do I Trigger A Scheduled Query When The GA4 Daily Export Happens?”

  1. Hi Simo –

    Just wanted to express my thanks for this blog post. I appreciate the knowledge sharing of multiple different aspects of GCP. Currently building a pipeline to send data from GA4/BigQuery to Redshift, and was able to extrapolate my process from the lessons in this video.

    My aspiration is to move from doing data analytics to data engineering and this is just what I needed!

    Happy Holidays from NYC 🙂

  2. Hi Simo, thanks for this nice article. I implemented the solution and it works as expected. However, I would like the destination table to be a date-partitioned table rather than a sharded table. How would you suggest to approach this challenge? Is this something that should be changed in the logic of the cloud function?

  3. Hi Simo and thx for this,
    I run the load-jobs query on my dataset, but I found something different from expected.
    In particular, it looks like everyday the load is performed three times minimum, going back in time.
    For example: “2022-12-05” (creation_date) is associated with “events_20221202”, “events_20221203” and “events_20221204” (table_ids). Is this because of the 72h post-processing of GA4 ?
    Moreover, for certain “creation_dates”, I find 6 table_ids associated with those! For example, creation_date = “2022-12-04” and table_id = “events_20221130” or “events_20221101”.
    I expected a relationship like “creation_date” associated with the previous day “table_id” only, like your example above. Thanks!

    1. Hi!

      Yes, there’s a 72h update window so historical updates are not out of the question. Per Google’s documentation, sometimes the historical updates can go even further back if they’re fixing bugs in data collection retroactively, for example. That’s why the Cloud Function gets the run time from the destination table ID and *not* from the creation_time of the daily job!

      Also, multiple loads per day is the norm – for some reason the same table gets loaded more than once with the export job.

  4. Hi! Really insightful blog post, thanks for this! I was wondering whether you also have a way of incorporating a backfill with this set-up?

    I have some tables paired with manual scheduled queries that I want to switch over to this set-up. Obviously these already contain quite some data accumulated from the past and I do not want to lose this data. Perhaps an easy task, but it does not seem that straightforward to me.

  5. A poor man’s solution (avoiding cloud functions and super precise timings) – you could maybe create a sink of the update events into a BQ table and then have your scheduled query / stored procedure handle the create/replace. This could be less scary for those unfamiliar with Node/Python or if your cloud team doesn’t want you stepping outside the “walled garden” of BigQuery. It’s definitely less flexible going this route could reduce fear factor for lazy martechies, “non-technical marketers” or disaffected data scientists. The scheduled query could be set to run at whatever frequency and while it wouldn’t be run immediately it could do a simple check every hour or so and be timely enough for most use cases. Getting those historic changes accounted for is the most critical / stressful imo…

    1. Hey Adam!

      Yeah, there are many ways to skin the cat 🙂 A slightly “richer man’s solution” would be to use Gen2 of GCF and EventArc, although that might not actually be any simpler, even if it’s somewhat more straightforward.

      But one of the key values of Simmer is to destroy this myth of the “non-technical marketer”. I believe in not sugarcoating things when the payoff for learning things the hard way outweighs other solutions. That’s one of the reasons I think this article could be valuable – it solves the problem, yes, but it also introduces the reader to a number of components in the GCP stack that can be really valuable in future data engineering work. Hopefully I manage to demystify them somewhat!

      Thanks for the feedback – your approach is sound but as you mention it does still surface the problem of historical updates and avoiding unnecessary query runs.
