Aniket
Feb 27, 2023

Azure Service Bus Messaging (Topics/Queues) for transferring data from an external PIM/ERP to Optimizely

Optimizely provides a PIM solution as a part of the DXP package. More information here: https://www.optimizely.com/product-information-management/

More often than not, clients have existing PIM and/or ERP systems that feed other systems in their organization. For example, their PIM/ERP system may serve physical stores, run reports, feed invoicing, and act as the source of truth. There are numerous blog posts on importing a catalog one time into Optimizely using the out-of-the-box Optimizely APIs.

Needless to say, as updates are made to pricing, inventory, assets, delivery charges, taxes, etc. in the ERP/PIM/DAM, we need to keep that data synchronized with the Optimizely catalog so that customers see the most up-to-date information on the website as quickly as possible.

This requires a strategy for moving content between the two systems on a regular, fault-tolerant basis. A quick solution is to use an Optimizely scheduled job to fetch the data and update it in the database, though scheduled jobs have limitations around timeouts, fault tolerance, logging, speed, resource constraints, alerting, etc.

An alternative is to use Azure Service Bus Messaging to queue up product updates from the source system (the client's PIM/ERP) and synchronize them to the Optimizely catalog on a configurable schedule. Azure Service Bus has a lot of advantages, listed below, which you can also read up on online.

Advantages:

  • Message Sessions
  • Auto-forwarding
  • Dead-lettering
  • Scheduled Delivery
  • Message deferral
  • Transactions
  • Auto-delete on idle
  • Duplicate detection
  • Geo-disaster recovery
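
As one example from the list, duplicate detection works off the MessageId: when it is enabled on a queue or topic, Service Bus drops a message whose MessageId it has already seen within the configured detection window. A sync job that may re-send the same payload can take advantage of this by deriving the id from the payload. The helper below is a minimal sketch of that idea; the class and method names are hypothetical and not part of the SDK, and it assumes the JSON is serialized the same way on every run.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class MessageIds
{
    // Derives a stable id from the payload so that re-sending the same
    // JSON produces the same MessageId (hypothetical helper, not an SDK API).
    public static string FromPayload(string json)
    {
        using (var sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(json));

            // 32 hash bytes rendered as 64 hex characters
            return BitConverter.ToString(hash).Replace("-", string.Empty);
        }
    }
}
```

The result would be assigned to the message's MessageId in place of a random GUID. A random GUID is always unique, so duplicate detection can never match it.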

You can use the Azure Service Bus .NET SDK for integration: https://learn.microsoft.com/en-us/dotnet/api/overview/azure/service-bus?preserve-view=true&view=azure-dotnet

Strategy:

We have used the following strategy for a huge B2C retail client, and it works really well.

  1. A custom C# function/console app (the extract job) deployed on Azure gets all products that have been updated in the last 'x' minutes/hours by calling a custom endpoint provided by the client
  2. This function app runs on a 'TimerTrigger' whose schedule is configurable in the Azure function app configuration. More info on function apps: https://learn.microsoft.com/en-us/azure/azure-functions/functions-create-your-first-function-visual-studio?tabs=in-process
  3. This function app is responsible for getting the data from the endpoint, serializing each item as a JSON message, and sending it to the Azure Service Bus (ASB) topic (the product extract topic)
  4. A second custom C# function app (the transload job) is subscribed to the above topic in ASB using a 'ServiceBusTrigger' (it executes every time there's a new message)
  5. This function app's job is to read the message from the topic, deserialize it, and update the product item using the Optimizely Service API
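
For step 2, the schedule passed to 'TimerTrigger' is an NCRONTAB expression with six fields, and it can also be a reference to an app setting wrapped in percent signs. The export sample in this post reads its schedule from a ScheduleExpressions class that is not shown, so the following is only a guess at what such a class might look like. The setting name TaxRatesExportSchedule is an assumption made for illustration.

```csharp
public static class ScheduleExpressions
{
    // NCRONTAB format: {second} {minute} {hour} {day} {month} {day-of-week}
    // Fires at second 0 of every 15th minute.
    public const string Every15Minutes = "0 */15 * * * *";

    // Resolved at runtime from an app setting named "TaxRatesExportSchedule"
    // (hypothetical setting name), so the schedule can be changed in the
    // function app configuration without redeploying.
    public const string TaxRatesExport = "%TaxRatesExportSchedule%";
}
```

Keeping the expression in an app setting is what makes the extract interval configurable per environment.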

Sample Code (Export Job):

namespace ClientNamespace.Export.Features.CartCheckout.TaxSync
{
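    using System;
    using System.Linq;
    using System.Net.Http;
    using System.Text; // Encoding
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus; // Message, TopicClient, RetryPolicy
    using Microsoft.Azure.WebJobs;
    using Newtonsoft.Json; // JsonConvert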
    using ClientNamespace.Export.Core.Features.CartCheckout.TaxRateSync.Models;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Services;
    using ClientNamespace.Export.Core.Features.Infrastructure.Logging;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Extensions;
    using ClientNamespace.Export.Features.Infrastructure.Azure.Services;
    using ClientNamespace.Export.Features.Infrastructure.Rfapi.Clients;
    using Serilog;
    using Serilog.Core;
    using ConnectionStringNames = ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants.ConnectionStringNames;
    using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;

    public class TaxRatesExportFunction
    {
        private const int ShortCircuit = 100_000;

        private readonly IHttpClientFactory _clientFactory;

        public TaxRatesExportFunction(IHttpClientFactory clientFactory)
        {
            _clientFactory = clientFactory;
        }

       #if !DEBUG // remove this line to run locally as a console app
        [FunctionName("TaxRatesExport")]
       #endif
        public async Task Run(
            [TimerTrigger(
                ScheduleExpressions.TaxRatesExport,
                RunOnStartup = false)]
            TimerInfo myTimer)
        {
            var log = LoglevelWrapper.WrapLogger(Log.Logger);

            try
            {
                log.Information("Starting TaxRatesExportFunction: {starttime}", DateTime.UtcNow);

                using (var topicMessageSender = new TopicMessageSender(ConnectionStringNames.ServiceBusTaxRates, TopicNames.TaxRates, log))
                {
                    // apiClient is the client's custom API client; its construction is omitted from this sample
                    var taxRates = await apiClient.TaxesAllAsync(); // custom endpoint from the client

                    var export = new TaxRateExport
                    {
                        TaxRates = taxRates
                            .Select(x => new TaxRate
                            {
                                Percentage = x.TaxRate ?? 0.000,
                                PostalCode = x.PostalCode,
                                TaxCode = x.TaxCode,
                                TaxableDelivery = x.TaxableDelivery,
                                TaxablePlatinum = x.TaxablePlatinum,
                            })
                            .ToList(),
                    };
		
                    // Send the message to the topic to be consumed by the transload function app
                    try
                    {
                        var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(export)))
                        {
                            MessageId = Guid.NewGuid().ToString(),
                            SessionId = "sessionid", // placeholder session id
                        };

                        // "connectionStringName" and "topicName" are placeholders
                        string connectionString = Environment.GetEnvironmentVariable("connectionStringName");
                        if (string.IsNullOrEmpty(connectionString))
                        {
                            connectionString = Environment.GetEnvironmentVariable($"CUSTOMCONNSTR_{"connectionStringName"}");
                        }

                        var topicClient = new TopicClient(connectionString, "topicName", RetryPolicy.Default);
                        await topicClient.SendAsync(message);
                        await topicClient.CloseAsync(); // release the connection once the message is sent
                    }
                    catch (Exception ex)
                    {
                        log.Error(ex, "Failed to send the tax rates message to the topic");
                    }

                }
            }
            catch (Exception ex)
            {
                log.Error(ex, "Unhandled exception in TaxRatesExportFunction {exception}", ex);
            }
            finally
            {
                log.Information("TaxRatesExportFunction Complete: {endtime}", DateTime.UtcNow);
            }
        }
    }
}

Sample Code (Import Job):

namespace ClientNamespace.Website.Import.Features.CartCheckout.TaxSync
{
    using System;
    using System.Net.Http;
    using System.Threading.Tasks;
    using Infrastructure.Azure.Constants;
    using Microsoft.Azure.WebJobs;
    using Newtonsoft.Json;
    using ClientNamespace.Export.Core.Features.CartCheckout.TaxRateSync.Models;
    using ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants;
    using ClientNamespace.Export.Core.Features.Infrastructure.Logging;
    using ClientNamespace.Website.Core.Features.Infrastructure.Episerver.Clients;
    using Serilog;
    using Serilog.Context;
    using ConnectionStringNames = ClientNamespace.Export.Core.Features.Infrastructure.Azure.Constants.ConnectionStringNames;

    public class TaxRatesImportFunction
    {
        private readonly IHttpClientFactory _clientFactory;

        public TaxRatesImportFunction(IHttpClientFactory clientFactory)
        {
            _clientFactory = clientFactory;
        }

        #if !DEBUG // Remove this to run locally (will be triggered when it sees a message on the topic it's subscribed to)
        [FunctionName(FunctionNames.TaxRatesImport)]
        #endif
        public async Task Run(
            [ServiceBusTrigger(
                TopicNames.TaxRates,
                SubscriptionNames.TaxRates,
                Connection = ConnectionStringNames.ServiceBusTaxRates,
                IsSessionsEnabled = true)]
            string mySbMsg)
        {
            var log = LoglevelWrapper.WrapLogger(Log.Logger);

            try
            {
                log.Information("Starting TaxRatesImportFunction: {starttime}", DateTime.UtcNow);
                log.Debug("Tax Rates Import Message: {message}", mySbMsg);

                TaxRateExport export = null;

                try
                {
                    // Get taxes from topic queue
                    export = JsonConvert.DeserializeObject<TaxRateExport>(mySbMsg);
                }
                catch (Exception ex)
                {
                    log.Error(ex, "Could not JSON deserialize tax rates message {message} with exception {exception}", mySbMsg, ex);
                }

                if (export?.TaxRates == null)
                {
                    log.Warning("Tax rates deserialized, but data was null");
                    return;
                }

                // Load taxes into Episerver
                var serviceApiClient = EpiserverApiClientFactory.Create(log, _clientFactory);
                foreach (var taxRate in export.TaxRates)
                {
                    try
                    {
                        using (LogContext.PushProperty("importtaxrate", taxRate.TaxCode))
                        {
                            // Update the taxes table (either custom endpoint or using Service API)
                            await serviceApiClient.SaveTaxRateAsync(taxRate);
                        }
                    }
                    catch (Exception ex)
                    {
                        // Don't fail the group
                        // Custom logic to handle exceptions when updating the Optimizely database goes here.
                        log.Error(ex, "Failed to save tax rate {taxcode}", taxRate.TaxCode);
                    }
                }
            }
            catch (Exception ex)
            {
                log.Error(ex, "Unhandled exception in TaxRatesImportFunction {exception}", ex);
            }
            finally
            {
                log.Information("TaxRatesImportFunction Complete: {endtime}", DateTime.UtcNow);
            }
        }
    }
}
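
One thing to keep in mind with the import sample: it catches every exception, so the function always finishes successfully and the message is completed even if some items failed. If you would rather have Service Bus redeliver the message, and dead-letter it once the subscription's maximum delivery count is exceeded, the function has to throw. The sketch below shows one way to keep the "don't fail the group" behaviour and still report the failure at the end. BatchImport and SaveAllAsync are hypothetical names, and the approach assumes that saving the same item twice is harmless, since a redelivered message is processed in full again.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchImport
{
    // Applies saveAsync to every item, remembers the failures, and throws once
    // at the end so the caller (here, the Functions host) sees the run as failed.
    public static async Task SaveAllAsync<T>(IEnumerable<T> items, Func<T, Task> saveAsync)
    {
        var failures = new List<Exception>();

        foreach (var item in items)
        {
            try
            {
                await saveAsync(item);
            }
            catch (Exception ex)
            {
                failures.Add(ex); // keep going; don't fail the group yet
            }
        }

        if (failures.Count > 0)
        {
            throw new AggregateException("One or more items failed to import", failures);
        }
    }
}
```

In the import function this would replace the foreach loop, with a call along the lines of BatchImport.SaveAllAsync(export.TaxRates, serviceApiClient.SaveTaxRateAsync), and the outer catch would need to rethrow after logging.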

As you can see, with minimal code you can create a more fault-tolerant synchronization to the Optimizely database. You can now imagine scaling this to other areas of your website. For example, we have extended this system to automate order processing: as orders come in, the serialized order object is placed on the Azure Service Bus for automated processing, all the way through to completing the order. Yes, the client's IT team needs to write some code to automate it on their side, but it has saved them hundreds of thousands of dollars in the cost of having a keying team member update each order manually.

Can you think of other ways to scale the Optimizely system to use Azure Service Bus Messaging? 

Happy coding!
