Andy in the Cloud

From BBC Basic to Force.com and beyond…

Apex Process Orchestration and Monitoring with Platform Events


When it comes to implementing asynchronous workloads in Apex, developers have a number of options, such as Batch Apex and Queueable, each of which can be triggered by user or system actions. This blog focuses on some of the more advanced aspects of implementing async workloads using Platform Events and Apex.

In comparison to other approaches, implementing asynchronous workloads using Platform Events offers two unique features. The first helps you dynamically calibrate and manage resources based on data volumes to stay within limits, while the second provides automatic retry capabilities when errors occur. Lastly, I want to highlight an approach I used to add some custom realtime telemetry to the workload using Platform Events.

Side Note: Before getting into the details, the goal of this blog is not to say one async option is better than another, rather to highlight the above features further so you can better consider your options. I also include a short comparison at the end of this blog with Batch Apex.

Business Scenario

Let’s imagine the following scenario to help illustrate the use of the features described below:

  • Business Process
    Imagine that your business is processing Invoice generation on the platform and that the Orders that drive this arrive and are updated constantly.
  • Continuous Processing
    In order to avoid backlogs or spikes of invoices being processed, you want to maintain a continuous flow of the overall process. For this you create a Platform Event called Generate Invoice. This event can easily be sent by admins or declarative builders, who have perhaps set up some rules on the Orders object using Process Builder.
  • Resource Management
    Orders arrive in all shapes and sizes, meaning the processing required to generate Invoices can also vary when you consider variables such as number of order lines, product regional discounts, currencies and tax rules etc. Processing each one at a time per execution context is an obvious way to maximize use of available resources and is certainly an option, but if resources allow, processing multiple invoices in one execution context is more efficient.

Below is what the Generate Invoice Platform Event looks like. It simply has a reference to the Order Id (though it could equally reference an External Id on the Order object).

For the purposes of this blog we are not focusing on how the events are sent / published. There are many ways to send events: you can publish them using programmatic APIs on or off platform, or using one of the platform’s declarative tools. For this blog we will just use a basic Apex snippet to generate the events, as shown below.

List<GenerateInvoice__e> events = new List<GenerateInvoice__e>();
for(Order order : 
       [select Id from Order 
          where Invoiced__c != true 
          order by OrderNumber asc]) {
   events.add(new GenerateInvoice__e(OrderId__c = order.Id));
}
EventBus.publish(events);        
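
EventBus.publish returns one Database.SaveResult per event, so a publisher can check whether each event was accepted. The following is a minimal sketch of that check, assuming the same GenerateInvoice__e event and Invoiced__c field as the snippet above. The class and method names (GenerateInvoicePublisher, publishForUninvoicedOrders) are hypothetical and only here for illustration.

```apex
public class GenerateInvoicePublisher {

    // Publishes one event per uninvoiced Order and returns the number
    // of events the platform reported as failed to publish.
    // (Hypothetical helper, not part of the original post.)
    public static Integer publishForUninvoicedOrders() {
        List<GenerateInvoice__e> events = new List<GenerateInvoice__e>();
        for (Order ord : [SELECT Id FROM Order
                          WHERE Invoiced__c != true
                          ORDER BY OrderNumber ASC]) {
            events.add(new GenerateInvoice__e(OrderId__c = ord.Id));
        }

        Integer failures = 0;
        List<Database.SaveResult> results = EventBus.publish(events);
        for (Database.SaveResult result : results) {
            if (!result.isSuccess()) {
                failures++;
                // Log each error returned for this event
                for (Database.Error err : result.getErrors()) {
                    System.debug(LoggingLevel.ERROR,
                        err.getStatusCode() + ': ' + err.getMessage());
                }
            }
        }
        return failures;
    }
}
```

A caller could run `Integer failed = GenerateInvoicePublisher.publishForUninvoicedOrders();` from a button, a scheduled job or Anonymous Apex and act on a non-zero result.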

Here is a basic Apex handler for the above Platform Event that delegates the processing to another Apex class:

trigger GenerateInvoiceSubscriber on GenerateInvoice__e (after insert) {
    Set<Id> orderIds = new Set<Id>();
    for (GenerateInvoice__e event : Trigger.New) {
        orderIds.add(event.OrderId__c);
    }
    OrderService.generateInvoices(orderIds);
}
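
Because a handler like the one above runs asynchronously, a unit test has to ask the test event bus to deliver the event before asserting anything. The sketch below shows one way to do that with Test.getEventBus().deliver(). It assumes that OrderService.generateInvoices sets Invoiced__c to true on processed Orders (the post does not show that class) and that an Order can be created in your org with just the fields shown. The test class name is hypothetical.

```apex
@IsTest
private class GenerateInvoiceSubscriberTest {

    @IsTest
    static void deliversEventToSubscriber() {
        // Minimal Order setup; a real org may require more fields
        Account acct = new Account(Name = 'Test Account');
        insert acct;
        Order ord = new Order(
            AccountId = acct.Id,
            EffectiveDate = Date.today(),
            Status = 'Draft');
        insert ord;

        Test.startTest();
        Database.SaveResult result =
            EventBus.publish(new GenerateInvoice__e(OrderId__c = ord.Id));
        // Deliver the published event now so the trigger runs
        Test.getEventBus().deliver();
        Test.stopTest();

        System.assert(result.isSuccess(), 'Event should publish');

        // Assumption: OrderService marks processed Orders as invoiced
        ord = [SELECT Invoiced__c FROM Order WHERE Id = :ord.Id];
        System.assertEquals(true, ord.Invoiced__c,
            'Order should be invoiced by the subscriber');
    }
}
```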

Processing Chunks of Events and Handling Retries

The following diagram highlights how a more advanced version of the above Apex handler can be used to optimally work within the limits to process chunks of Orders based on their size/complexity and also retry those that result in some errors along the way.

In order to orchestrate things this way you need to use some Apex APIs in your handler logic to let the platform know a few things. At the end of this blog I also share how I added telemetry to better visualize this, along with a video. So don’t worry at this juncture if it’s not 100% clear how what you are seeing below is possible, just keep reading and watching!

Controlling how many Events are passed to your handler

Imagine the above code snippet published 1000 events. The platform docs state that it can pass up to a maximum of 2000 events to an Apex event handler at once, meaning the handler above could be invoked just once with all 1000 events. If you have been on the platform a while you will know that 200 (not 2000) is the common number used to express the minimum number of records you should use when testing Apex Triggers and as a general bulkification best practice. So why 2000 in the case of platform event handlers? Well, the main aim of the platform is to drain the Platform Event message queue quickly, and so it attempts to give the handler as much as possible, just in case it can process it.

As we have set out in our scenario above, Orders can be quite variable in nature. Thus, while a batch of 1000 orders with only a few order lines each might be possible within the execution limits, include a few orders in that batch with hundreds or a few thousand line items and it’s more likely you will hit CPU or heap limits. Fortunately, unlike Batch Apex, you get to control the size of each individual chunk. This is done by effectively giving some of the block of 1000 events passed to your handler back to the platform, which passes them to a separate handler invocation where the limits are reset.

Below is some basic code that illustrates how you might go about pre-scanning the Orders to determine complexity (by number of lines) and thus dynamically calibrating how many of the events your code can really process within the limits. The orderIds set that is passed to the service class is rebuilt with only the orders that can be processed. The key part here is the use of the setResumeCheckpoint method, which tells the platform where to resume from after this handler has completed its processing.

trigger GenerateInvoiceSubscriber on GenerateInvoice__e (after insert) { 

    // Determine number overall order lines to process 
    //   vs maximum within limits (could be config)
    Integer maxLines = 40000;
    Set<Id> orderIds = new Set<Id>();
    for (GenerateInvoice__e event : Trigger.New) {
        orderIds.add(event.OrderId__c);
    }
    Map<Id, Integer> lineCountByOrderId = 
        new OrdersSelector().selectLineCountById(orderIds);

    // Bulkify events passed to the OrderService
    orderIds = new Set<Id>();
    Integer lineCount = 0;
    for (GenerateInvoice__e event : Trigger.New) {
        orderIds.add(event.OrderId__c);
        EventBus.TriggerContext.currentContext().setResumeCheckpoint(event.ReplayId);
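        // Treat Orders missing from the map as having no lines
        Integer orderLines = lineCountByOrderId.get(event.OrderId__c);
        lineCount = lineCount + (orderLines == null ? 0 : orderLines);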
        if(lineCount>maxLines) { 
            break;
        }
    }

    OrderService.generateInvoices(orderIds);
}

You can read more about this approach in the formal documentation here.
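
The OrdersSelector class used by the handler is not shown in this blog. As a rough sketch only, and assuming the standard OrderItem object holds the order lines, its selectLineCountById method might be implemented with an aggregate query like this. The body is an assumption for illustration, not the code behind the demo.

```apex
public class OrdersSelector {

    // Returns the number of order lines for each given Order Id.
    // Orders without lines map to 0, so callers never get a null count.
    public Map<Id, Integer> selectLineCountById(Set<Id> orderIds) {
        Map<Id, Integer> lineCountByOrderId = new Map<Id, Integer>();
        for (Id orderId : orderIds) {
            lineCountByOrderId.put(orderId, 0);
        }
        // One aggregate row per Order that has at least one line
        for (AggregateResult row : [
                SELECT OrderId, COUNT(Id) lineCount
                FROM OrderItem
                WHERE OrderId IN :orderIds
                GROUP BY OrderId]) {
            lineCountByOrderId.put(
                (Id) row.get('OrderId'),
                (Integer) row.get('lineCount'));
        }
        return lineCountByOrderId;
    }
}
```

Keep in mind that the rows counted by an aggregate query still count towards the SOQL query row limit, so for very large orders a stored roll-up of the line count on the Order itself may be a better fit.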

Implementing Retry Logic

There are a number of reasons errors can occur when your handler code is running. For errors that represent transient situations, such as timeouts and row locks, you would normally have to ask the user to retry (via email or notification) or utilize a dynamically scheduled job to retry. With Platform Event handlers in Apex, when the system RetryableException is thrown the platform will automatically retry a batch of events after a period of time, up to 9 times (the batch sizes may vary between attempts). It is generally recommended that you do not let your code retry more than 6 times, since when the maximum is reached the platform deactivates the handler/trigger.

The following code is a basic illustration of how to use this facility and track the number of retries before reaching the max. In this example, if the soft maximum is reached the events are effectively just lost. If needed, you can instead write them to a staging custom object for resubmission, or simply have some code such as the publishing snippet above scan for unprocessed Orders and resubmit events.

    // Invoke OrderService, support retries
    try {
        OrderService.generateInvoices(orderIds);
    } catch (Exception e) {
        // Only retry so many times, before giving up (thus avoid disabling the trigger)
        if (EventBus.TriggerContext.currentContext().retries < 6) {
            throw new EventBus.RetryableException(e.getMessage());
        }
        // In this case it's ok to let the events drain away... 
        //   since new events for unprocessed Orders can always be re-generated
    }    
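
If losing the events is not acceptable, the staging option mentioned above could look something like the following sketch. The custom object FailedInvoiceRequest__c and its fields Order__c and Error__c are hypothetical names invented for this example, as is the GenerateInvoiceStaging class. You would need to create equivalents in your own org.

```apex
public class GenerateInvoiceStaging {

    // Records Orders whose events were given up on, for later resubmission.
    // FailedInvoiceRequest__c, Order__c and Error__c are hypothetical names.
    public static void stage(Set<Id> orderIds, String errorMessage) {
        // Keep the message within a 255 character text field
        String error = errorMessage == null
            ? null : errorMessage.abbreviate(255);
        List<FailedInvoiceRequest__c> staged =
            new List<FailedInvoiceRequest__c>();
        for (Id orderId : orderIds) {
            staged.add(new FailedInvoiceRequest__c(
                Order__c = orderId,
                Error__c = error));
        }
        insert staged;
    }
}
```

In the catch block above, a call such as `GenerateInvoiceStaging.stage(orderIds, e.getMessage());` placed after the retry check would run only once the soft maximum has been reached. Note that limit exceptions cannot be caught in Apex, so this path only covers errors that reach the catch block.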

Using Platform Events to monitor activity

I used Platform Events to publish telemetry about the execution of the above handlers, by creating another Platform Event called Subscriber Telemetry and using a Lightning Web Component to monitor the events in realtime. Because Platform Events can be configured with the “Publish Immediately” setting, they are published outside the standard Apex transaction and are sent even if an error occurs.

To publish to this event I simply added the following snippet of code to the top of my handler.

// Emit telemetry
EventBus.publish(
    new SubscriberTelemetry__e(
        Topic__c = 'GenerateInvoice__e', 
        ApexTrigger__c = 'GenerateInvoiceSubscriber',
        Position__c = 
           [select Position from EventBusSubscriber
              where Topic = 'GenerateInvoice__e'][0].Position,
        BatchSize__c = Trigger.new.size(),
        Retries__c = EventBus.TriggerContext.currentContext().retries,
        LastError__c = EventBus.TriggerContext.currentContext().lastError));

The following video shows me clicking a button to publish a batch of 1000 events, then monitoring the effects on my chunking logic and retry logic. The video actually includes me fixing some data errors in order to highlight the retry capabilities. The errors shown are contrived by some deliberately bad code to illustrate the retry logic, hence the fix to the Order records looks a bit odd, so please ignore that. Finally note that the platform chose to send my handler 83 events first then larger chunks thereafter, but in other tests I got 1000 events in the first chunk.

Batch Apex vs Platform Events

Batch Apex also provides a means to orchestrate the sequential processing of records in chunks, so I thought I would end here with a summary of some of the other differences. As you can see, one of the key ones to consider is the user identity the code runs as. This is not impossible to work around in the platform event handler case, but requires some coding to explicitly set the OwnerId field on records if that information is important to you. Overall though, I do feel that Platform Events offer some useful options for switching to a more continuous mode of operation vs batch. So long as you’re aware of the differences, this might be a good fit for you.

Side Note: For Apex Queueable handlers you will soon have the option to implement so-called Transaction Finalizers that allow you to implement retry or logging logic.

7 thoughts on “Apex Process Orchestration and Monitoring with Platform Events”

  1. Awesome blog! Learned a lot from this pattern. Thanks!

  2. Hi Andrew, great post.
    Do we have a GitHub repo for this use case? I would like to test this.

  3. Hi Andy, thanks for your blog. It really helps to understand how to use the retry (as a developer).
    I’m currently trying to prepare a webinar on “how to use them as an admin or a dev” but I’m struggling with bulkification within ProcessBuilder.

    If I create a batch of 5 platform events, it seems that ProcessBuilder is only able to process the first one and skips the 4 other ones. Have you tried or experienced this?
    My other question is regarding the 50K limit of CometD client connections. While using an LWC, have you tried to see what would happen if you process more than 50000 PEs? I’m afraid that you’ll hit the hidden 50K limit (I mean hidden as I can’t find it in the limits displayed by Salesforce).

    Thanks if you have any information to share regarding this subject.

    Fabrice

  4. Great learning from this article. However, I have a question. EventBus.TriggerContext.currentContext().setResumeCheckpoint is set at the last event in chunk #2. When the retry logic fires, does it pick up from the last event in chunk #2 or all the events in chunk #2?
    I am assuming from the demo it’s the whole of chunk #2. However, could you explain more about how setting the resume point to the last event of the chunk picks up the whole chunk?

    Thanks
    Nikhil
