
Using UdpUnicastEventProvider in StefanOlsen.Optimizely.Events.Sockets library


I am working with this library (StefanOlsen.Optimizely.Events.Sockets) to set up a UdpUnicastEventProvider for CMS replication. However, I have an unusual scenario: I'd like to update the endpoints used by the provider dynamically at runtime. We're running in a Kubernetes environment where pods can go up and down. We'd like to implement a hosted service that periodically does DNS lookups and tells the UdpUnicastEventProvider all the IPs and ports where replication is required. Is this possible, or can endpoints only be added at startup, as shown below? Is there another path to a solution that does not involve updating the Endpoints array?

services.Configure<UdpUnicastEventProviderOptions>(options =>
{
    options.BindHost = "0.0.0.0";
    options.Port = 9100;
    options.Endpoints = Array.Empty<UdpEndpoint>();
});
services.AddUdpUnicastEventProvider();
#339837
Jul 31, 2025 23:44

Hi Todd

I am the author of the library you mention. Happy to know you find it useful.

To be honest I have not yet tested it in a Kubernetes environment. Usually for Kubernetes you would deploy a broker like Redis or MassTransit in the namespace.

The unicast provider will not work in your case, because it requires prior configuration of each pod IP. In theory you should be able to enable a multicast IP in Kubernetes with a NetworkAttachmentDefinition configuration, so you can use the UdpMulticast provider.

An alternative, which I have not yet tested, could be a broadcast provider that simply sends the messages to all nodes in a namespace.

#339839
Aug 01, 2025 12:42

Thank you, Stefan, for your swift reply. It sounds like this is not possible at the moment, which is the conclusion I was reaching myself after looking into your repo. I noticed that the UdpUnicastEventProvider constructor loops through Endpoints and links a block for each one. This is what gets registered at startup. Would it be possible to have a public method to re-link at runtime?

public UdpUnicastEventProvider(EventsServiceKnownTypesLookup knownTypesLookup, UdpUnicastEventProviderOptions options)
{
    _serializer = new DataContractSerializer(typeof(EventMessage), knownTypesLookup.KnownTypes);
    _udpClient = new UdpClient(new IPEndPoint(IPAddress.Parse(options.BindHost), options.Port));

    _sender = new BroadcastBlock<EventMessage>(msg => msg);
    foreach (var endpoint in options.Endpoints)
    {
        var actionBlock = new ActionBlock<EventMessage>(async msg =>
        {
            await SendMessageInternal(msg, endpoint.Host, endpoint.Port);
        });

        _sender.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }
}
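
For illustration, here is a minimal sketch of what such a re-link method could look like with TPL Dataflow. It relies on the fact that LinkTo returns an IDisposable, and disposing it detaches the target. Everything named here (RelinkableSender, Relink, LinkCount, the string endpoint keys, the send delegate) is hypothetical and is not part of StefanOlsen.Optimizely.Events.Sockets.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, NOT part of the library: keeps one link per endpoint key
// and lets the caller replace the set of linked targets at runtime.
public sealed class RelinkableSender<T>
{
    private readonly BroadcastBlock<T> _source = new BroadcastBlock<T>(msg => msg);
    private readonly Dictionary<string, IDisposable> _links = new Dictionary<string, IDisposable>();
    private readonly object _gate = new object();
    private readonly Func<T, string, Task> _send; // assumed send delegate (message, endpoint key)

    public RelinkableSender(Func<T, string, Task> send) => _send = send;

    public bool Post(T message) => _source.Post(message);

    public int LinkCount { get { lock (_gate) { return _links.Count; } } }

    // Unlinks endpoints that disappeared and links endpoints that are new.
    public void Relink(IReadOnlyCollection<string> endpoints)
    {
        lock (_gate)
        {
            var wanted = new HashSet<string>(endpoints);

            foreach (var key in new List<string>(_links.Keys))
            {
                if (wanted.Contains(key)) continue;
                _links[key].Dispose(); // disposing the link detaches the target block
                _links.Remove(key);
            }

            foreach (var key in wanted)
            {
                if (_links.ContainsKey(key)) continue;
                var target = new ActionBlock<T>(async msg => await _send(msg, key));
                // Note: a BroadcastBlock offers its current (last) message to a newly linked target.
                _links[key] = _source.LinkTo(target);
            }
        }
    }
}
```

One design consequence to be aware of: messages already queued in an unlinked ActionBlock may still be sent to an endpoint that no longer exists, and a freshly linked endpoint may receive the most recent message again.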
#339840
Aug 01, 2025 14:11

I am not sure that would be reliable in the long run.

Do you know if you can run multicast in your Kubernetes environment?

Or can you deploy a service IP that sends the same message to all nodes? In theory, this could be used as an endpoint IP in the unicast configuration. Then you would just add that one IP to your configuration. I believe each Optimizely node filters out messages from itself. But if it doesn't, we can add filtering in the library, so that when a node receives a message it just sent to the service IP, it will not act on it.
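
To illustrate the filtering idea, here is a minimal sketch, assuming each node has a unique identifier that travels with every message. RelayEnvelope and SelfMessageFilter are hypothetical names made up for this example; the real EventMessage type may already carry its own sender identifier, and Optimizely may already do this filtering itself.

```csharp
using System;

// Hypothetical envelope for illustration only; not a type from the library or from Optimizely.
public sealed class RelayEnvelope
{
    public RelayEnvelope(Guid senderId, byte[] payload)
    {
        SenderId = senderId;
        Payload = payload;
    }

    public Guid SenderId { get; }
    public byte[] Payload { get; }
}

// Hypothetical filter: drops messages that originate from the local node.
public sealed class SelfMessageFilter
{
    private readonly Guid _localId;

    public SelfMessageFilter(Guid localId) => _localId = localId;

    // Returns false when the message was sent by this node and came back
    // through the shared service IP.
    public bool ShouldProcess(RelayEnvelope envelope) => envelope.SenderId != _localId;
}
```

A receiver would call ShouldProcess before raising the event locally and skip the message when it returns false.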

#339842
Aug 01, 2025 14:43

Your suggestion was helpful, so we thought we'd share our solution.

In our environment we have a UI app and an API app (both Optimizely) running in each pod. We have now added a relay app to each of those pods as well. Within a pod, the UI and API exchange messages directly with each other, and the relay forwards those messages to the UI and API apps in the other pods. The relay also keeps track of the other pods' IPs as they go up and down, via a DNS lookup. We then register the UdpUnicastEventProvider in Startup with the port of the current app and add two endpoint entries: the endpoint of the relay and the endpoint of the "other" app. If we're in the UI, that's the API, and vice versa.

Also, we're not sure how big messages can get, hence the maximum UDP size used for the buffer.

Relay App

using System.Net;
using System.Net.Sockets;
using System.Net.NetworkInformation;
using System.Buffers;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

string GetRequiredEnv(string name) => Environment.GetEnvironmentVariable(name) ?? throw new Exception($"{name} is required.");

var unicastUi = ####;
var unicastApi = ####;
var unicastRelay = ####;
var unicastDns = GetRequiredEnv("UNICAST_DNS");

IPEndPoint[]? currentEndpoints = Array.Empty<IPEndPoint>();
long messageCount = 0;
// START Remove this when we know the correct buffer size
int maxPayloadSize = 0;
// END

var cts = new CancellationTokenSource();
var token = cts.Token;

app.Lifetime.ApplicationStopping.Register(() =>
{
    Console.WriteLine("Application is shutting down...");
    cts.Cancel();
});

// 65507 is the largest it could possibly be, but this is memory-inefficient. Console output will help us tune this.
const int MaxUdpSize = 65507;

// This is safe in a container, move within the loop above newIps if running elsewhere.
var localIps = NetworkInterface.GetAllNetworkInterfaces()
    .Where(ni => ni.OperationalStatus == OperationalStatus.Up)
    .SelectMany(ni => ni.GetIPProperties().UnicastAddresses)
    .Where(ua => ua.Address.AddressFamily == AddressFamily.InterNetwork)
    .Select(ua => ua.Address)
    .ToHashSet();

_ = Task.Run(async () =>
{
    // Track the last update so that we don't thrash creating IPEndPoint arrays and doing Interlocked.Exchanges
    var lastIpSet = new HashSet<IPAddress>();

    while (!token.IsCancellationRequested)
    {
        try
        {
            var newIps = (await Dns.GetHostAddressesAsync(unicastDns))
                .Where(ip => ip.AddressFamily == AddressFamily.InterNetwork && !localIps.Contains(ip))
                .ToArray();

            var newIpSet = new HashSet<IPAddress>(newIps);

            if (!newIpSet.SetEquals(lastIpSet))
            {
                Console.WriteLine($"[{DateTime.Now:G}] IP list changed:");
                Console.WriteLine($"Before: {string.Join(", ", lastIpSet.Select(ip => ip.ToString()))}");
                Console.WriteLine($"After:  {string.Join(", ", newIpSet.Select(ip => ip.ToString()))}");

                lastIpSet = newIpSet;

                var newEndpoints = new IPEndPoint[newIps.Length * 2];
                for (int i = 0; i < newIps.Length; i++)
                {
                    newEndpoints[i * 2] = new IPEndPoint(newIps[i], unicastUi);
                    newEndpoints[i * 2 + 1] = new IPEndPoint(newIps[i], unicastApi);
                }

                Interlocked.Exchange(ref currentEndpoints, newEndpoints);
            }
        }
        catch
        {
            // Dns.GetHostAddressesAsync errors when a record doesn't exist.  Headless services in kubernetes don't get created unless at least one pod is running.
            Interlocked.Exchange(ref currentEndpoints, Array.Empty<IPEndPoint>());
            lastIpSet = new HashSet<IPAddress>();
        }

        await Task.Delay(1000, token).ContinueWith(t => { });
    }
}, token);

_ = Task.Run(async () =>
{
    var env = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") ?? string.Empty;
    var loopbackEnvs = new[] { "Development", "QA", "Sandbox", "Staging", "Production" };
    var bindAddress = loopbackEnvs.Contains(env, StringComparer.OrdinalIgnoreCase) ? IPAddress.Loopback : IPAddress.Any;

    // We're using separate sockets for sending and receiving and not UdpClient (which is a wrapper for Socket with a bunch of stuff we don't need added in).
    using var recvSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
    recvSocket.Bind(new IPEndPoint(bindAddress, unicastRelay));

    using var sendSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
    // Rent-a-buffer reduces GC thrashing, though the current value I have of 65507 is too high
    var buffer = ArrayPool<byte>.Shared.Rent(MaxUdpSize);

    try
    {
        while (!token.IsCancellationRequested)
        {
            var receiveArgs = new SocketAsyncEventArgs
            {
                RemoteEndPoint = new IPEndPoint(IPAddress.Any, 0)
            };
            receiveArgs.SetBuffer(buffer, 0, buffer.Length);

            var tcs = new TaskCompletionSource<SocketAsyncEventArgs>(TaskCreationOptions.RunContinuationsAsynchronously);
            receiveArgs.Completed += (_, args) => tcs.TrySetResult(args);

            if (!recvSocket.ReceiveFromAsync(receiveArgs))
            {
                tcs.TrySetResult(receiveArgs);
            }

            var result = await Task.WhenAny(tcs.Task, Task.Delay(Timeout.Infinite, token));
            if (result != tcs.Task) break;

            Interlocked.Increment(ref messageCount);
            // START Remove this when we know the correct buffer size
            InterlockedExtensions.UpdateMax(ref maxPayloadSize, receiveArgs.BytesTransferred);
            // END

            var endpoints = Interlocked.CompareExchange(ref currentEndpoints, null, null)!;
            if (endpoints.Length == 0) continue;

            var segment = new ArraySegment<byte>(buffer, 0, receiveArgs.BytesTransferred);

            foreach (var ep in endpoints)
            {
                try
                {
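                    // Await the send: the rented buffer is reused by the next receive and must not be overwritten while a send is still pending.
                    await sendSocket.SendToAsync(segment, SocketFlags.None, ep);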
                }
                catch
                {
                    // swallow send exceptions for performance, it's udp so most errors are going to be missed anyway
                }
            }
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}, token);

_ = Task.Run(async () =>
{
    var logDelay = TimeSpan.FromSeconds(60);
    var delayEnv = Environment.GetEnvironmentVariable("UNICAST_RELAY_LOGDELAY");
    if (int.TryParse(delayEnv, out var delaySeconds) && delaySeconds > 0)
    {
        logDelay = TimeSpan.FromSeconds(delaySeconds);
    }

    while (!token.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(logDelay, token);
        }
        catch (OperationCanceledException)
        {
            break;
        }

        var eps = Interlocked.CompareExchange(ref currentEndpoints, null, null)!;
        var count = Interlocked.Exchange(ref messageCount, 0);
        // START Remove this when we know the correct buffer size
        var maxSize = Interlocked.CompareExchange(ref maxPayloadSize, 0, 0);
        // END

        Console.WriteLine($"[{DateTime.Now:G}] Current Endpoints: {string.Join(", ", eps.Select(e => e.Address.ToString()).Distinct())}");
        Console.WriteLine($"Messages received in last {logDelay.TotalSeconds} seconds: {count}");
        // START Remove this when we know the correct buffer size
        Console.WriteLine($"Largest message size since start: {maxSize + 28} bytes (includes 20B IP + 8B UDP headers)");
        // END
    }
}, token);

await app.RunAsync();

// START Remove this when we know the correct buffer size
static class InterlockedExtensions
{
    public static void UpdateMax(ref int target, int value)
    {
        int initial, computed;
        do
        {
            initial = target;
            if (value <= initial) return;
            computed = value;
        } while (Interlocked.CompareExchange(ref target, computed, initial) != initial);
    }
}
// END


Startup.cs (UI and API)

services.Configure<UdpUnicastEventProviderOptions>(opts =>
{
    opts.BindHost = "0.0.0.0";
    opts.Port = <CURRENT_APP>;
    opts.Endpoints = new[]
    {
        new UdpEndpoint { Host = "127.0.0.1", Port = <RELAY_APP> },
        new UdpEndpoint { Host = "127.0.0.1", Port = <OTHER_APP> }
    };
});
services.AddUdpUnicastEventProvider();




#339874
Aug 05, 2025 15:33
Stefan Holm Olsen - Aug 05, 2025 16:49
You are right about the max UDP payload size (64 KB). In theory everything above around 500 bytes can be fragmented. But if the datagrams are kept within a small non-routed network, the risk of data loss should be minimal.
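
As a rough worked example of where these numbers come from (assuming IPv4): the 65507-byte ceiling is the 65535-byte maximum packet size minus a 20-byte IP header and an 8-byte UDP header, and the "around 500 bytes" figure follows from the 576-byte datagram every IPv4 host must accept, minus the largest possible IP header (60 bytes) and the UDP header. The UdpSizes class and its member names are made up for this sketch.

```csharp
// Hypothetical helper for illustration; the names are not from any library.
public static class UdpSizes
{
    public const int MaxIpv4PacketSize = 65535;   // 16-bit total length field
    public const int MinIpv4HeaderSize = 20;      // IP header without options
    public const int MaxIpv4HeaderSize = 60;      // IP header with maximum options
    public const int UdpHeaderSize = 8;
    public const int MinAcceptedDatagram = 576;   // size every IPv4 host must be able to accept

    // Largest payload a single UDP datagram can carry over IPv4: 65507 bytes.
    public const int MaxPayload = MaxIpv4PacketSize - MinIpv4HeaderSize - UdpHeaderSize;

    // Payload size that any IPv4 host must be able to receive: 508 bytes.
    public const int ConservativePayload = MinAcceptedDatagram - MaxIpv4HeaderSize - UdpHeaderSize;

    // Largest payload that fits in one unfragmented packet for a given link MTU.
    public static int PayloadForMtu(int mtu) => mtu - MinIpv4HeaderSize - UdpHeaderSize;
}
```

For a typical Ethernet MTU of 1500, PayloadForMtu(1500) gives 1472 bytes; larger datagrams are split into IP fragments on such a link.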

Nice solution. I had thought about making a simple message broker app as well, but I never got around to doing it.

It might be easier to spin up a small Redis (or Garnet) instance in Kubernetes, for the pub/sub feature. I made a connector some time ago.

With the Redis connector, all pods connect to Redis and start listening for messages. No pod needs to know the other pods' addresses in advance. So it is basically the same as your relay app.

#339875
Aug 05, 2025 16:43