简要说明
- // 连接代码.
- using (var client = await StartClientWithRetries())
- {
- }
从方法看, 只是一个简单允许重试的启动客户端. 追踪进去会发现关于重试逻辑的实践, Socket 编程的实践, 基于内存的消息队列的实践, 依赖注入. 再看源码的基础上, 最好能配合一些理论书籍来看. 理论指导实践, 实践反馈理论, 才是技术成长的步骤.
这篇文章只涉及 Connect 所引用方法的部分说明, 一步一步来加深理解.
本来我是打算把 orleans 研究透之后再来写一篇, 但看了一周之后, 发下 connect 里面调用了很多类, 每个类又有很多方法, 这样下去没有尽头, 到最终估计什么也写不成.
分析源码本来就是循环渐进的过程, 也是一个熟悉框架 / 原理 / 实践的过程. 直接跳过这个步骤, 必然损失良多. 所以这部分就叫开胃菜吧. 在查看 connect 过程, 会越来越接触到各种知识.
本篇暂不涉及数据持久化, 主要依赖. netcore 内置方法操纵内存实现.
您会接触到的扩展知识
扩展知识之 Timer&TimerQueue
- Timer https://msdn.microsoft.com/zh-cn/library/system.timers.timer.aspx
- Timer
在设置的间隔后生成事件, 并提供生成重复事件的选项
TimerQueue
时间队列
扩展知识之信号量
SemaphoreSlim https://msdn.microsoft.com/zh-cn/library/system.threading.semaphoreslim.aspx
SemaphoreSlim 实现
- // 信号量
- SemaphoreSlim
表示 Semaphore 的轻量级替代, 它限制了可以同时访问资源或资源池的线程数
>>Release 释放
>> Wait 等待.
信号量有两种类型: 本地信号量和命名系统信号量. 前者是应用程序的本地. 后者在整个操作系统中是可见的, 并且适用于进程间同步. 该 SemaphoreSlim 是一个轻量级替代信号量不使用 Windows 内核中的信号类. 与 Semaphore 类不同, SemaphoreSlim 类不支持命名系统信号量. 您只能将其用作本地信号量. 所述 SemaphoreSlim 类为单一的应用程序内的同步推荐的信号量.
扩展知识之 BlockingCollection
BlockingCollection 介绍 https://docs.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview
利用 BlockingCollection 实现生产者和消费者队列
BlockingCollection
为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能.
>> Take
>> Add
有这个类型,
扩展知识之 Interlocked
Interlocked https://msdn.microsoft.com/zh-cn/library/system.threading.interlocked(v=vs.110).aspx
Interlocked 为多个线程共享的变量提供原子操作.
>>Add
>>Decrement 以原子操作的形式递减指定变量的值并存储结果.
>>Increment 以原子操作的形式递增指定变量的值并存储结果
- >>Exchange
- >>CompareExchange
- >>Read
个人想法: 和 Redis 的 Increment/Decrement 类似, 部分情况下可以取代 Redis 的 increment/decrement, 提高速度.
扩展知识之 SpinWait
SpinWait https://msdn.microsoft.com/zh-cn/library/system.threading.spinwait.aspx
两阶段提交 https://docs.microsoft.com/zh-cn/dotnet/standard/threading/how-to-use-spinwait-to-implement-a-two-phase-wait-operation
- Monitor https://msdn.microsoft.com/zh-cn/library/system.threading.monitor.aspx
- SpinWait
为基于旋转的等待提供支持.
SpinWait 是一种值类型, 这意味着低级代码可以使用 SpinWait 而不必担心不必要的分配开销. SpinWait 通常不适用于普通应用程序. 在大多数情况下, 您应该使用. NET Framework 提供的同步类, 例如 Monitor
>> SpinOnce
扩展知识之 Queue&Stack
- Queue https://msdn.microsoft.com/zh-cn/library/7977ey2c(v=vs.110).aspx
- Stack https://msdn.microsoft.com/zh-cn/library/3278tedw(v=vs.110).aspx
- Queue<T>
表示先进先出的对象集合, 此类将通用队列实现为循环数组. 存储在队列 < T > 中的对象在一端插入并从另一端移除.
- >Enqueue
- >Dequeue
- >Peek
- Stack<T>
表示具有相同指定类型的实例的可变大小后进先出 (LIFO) 集合.
- >Push
- >Pop
- >PeeK
- ConcurrentQueue <T>
表示线程安全的先进先出的对象集合
ConcurrentStack <T>
表示线程安全的后进先出 (LIFO) 集合
如果需要以与存储在集合中的顺序相同的顺序访问信息, 请使用 Queue <T>. 如果需要以相反的顺序访问信息, 请使用 Stack <T>. 使用 ConcurrentQueue <T > 或 ConcurrentStack <T> 如果您需要同时从多个线程访问该集合.
扩展知识之 Task
TaskCompletionSource https://technet.microsoft.com/zh-cn/library/dd449174(v=vs.110).aspx
基于 Task 的异步模式 -- 全面介绍
TaskCompletionSource 表示未绑定到委托的 Task <TResult > 的生产者端, 通过 Task 属性提供对使用者端的访问.
扩展知识之线程安全的集合
- System.Collections.Concurrent https://msdn.microsoft.com/zh-cn/library/system.collections.concurrent(v=vs.110).aspx
- ConcurrentDictionary https://msdn.microsoft.com/en-us/library/dd287191.aspx
ConcurrentDictionary 对决 Dictionary+Locking https://www.tuicool.com/articles/vaeiie
System.Collections.Concurrent 提供了应在的地方对应的类型在使用几个线程安全的集合类 System.Collections 中和 System.Collections.Generic 命名空间, 只要多线程并发访问的集合.
但是, 通过当前集合实现的其中一个接口访问的成员 (包括扩展方法) 不保证是线程安全的, 并且可能需要由调用者同步.
ConcurrentDictionary: 表示可以由多个线程同时访问的键 / 值对的线程安全集合
对于 ConcurrentDictionary <TKey,TValue > 类上的所有其他操作, 所有这些操作都是原子操作并且是线程安全的. 唯一的例外是接受委托的方法, 即 AddOrUpdate 和 GetOrAdd. 对于字典的修改和写入操作, ConcurrentDictionary <TKey,TValue > 使用细粒度锁定来确保线程安全.(对字典的读取操作是以无锁方式执行的.)但是, 这些方法的委托在锁外部调用, 以避免在锁定下执行未知代码时可能出现的问题. 因此, 这些代理执行的代码不受操作的原子性影响.
扩展知识之网络编程
Socket 微软官方文档 https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socket(v=vs.110).aspx
Socket 博客园
Socket 类提供一组丰富的方法和属性进行网络通信
TCP 协议
- >BeginConnect
- >EndConnect
- >BeginSend
- >EndSend
- >BeginReceive
- >EndReceive
- >BeginAccept
- >EndAccept
UDP 协议
- >BeginSendTo
- >EndSendTo
- >BeginReceiveFromandEndReceiveFrom
扩展知识之线程通知:
- AutoResetEvent https://msdn.microsoft.com/zh-cn/library/system.threading.autoresetevent.aspx
- ManualResetEvent https://msdn.microsoft.com/zh-cn/library/system.threading.manualresetevent(v=vs.110).aspx
- ManualResetEventSlim https://msdn.microsoft.com/zh-cn/library/system.threading.manualreseteventslim.aspx
AutoResetEvent 允许线程通过信令相互通信. 通常, 当线程需要对资源的独占访问时, 可以使用此类.
>Set 释放线程
>WaitOne 等待线程
ManualResetEvent
通知一个或多个等待线程发生了事件
ManualResetEventSlim
当等待时间预期非常短, 并且事件未跨越进程边界时, 您可以使用此类以获得比 ManualResetEvent 更好的性能
扩展知识之依赖注入:
ActivatorUtilities https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/dependency-injection
扩展. net - 使用. netcore 进行依赖注入 https://msdn.microsoft.com/en-us/magazine/mt707534.aspx
服务可以通过两种机制来解析:
IServiceProvider
ActivatorUtilities - 允许在依赖关系注入容器中创建没有服务注册的对象. ActivatorUtilities 用于面向用户的抽象, 例如标记帮助器, MVC 控制器, SignalR 集线器和模型绑定器.
- >ActivatorUtilities.CreateInstance
- >ActivatorUtilities.GetServiceOrCreateInstance
Client 连接代码.
- // 连接代码.
- using (var client = await StartClientWithRetries())
- {
- await DoClientWork(client);
- Console.ReadKey();
- }
重点分析 StartClientWithRetries
UseLocalhostClustering 用来配置连接参数: 端口 / ClusterId/ServiceId 等. 配置一个连接本地 silo 的客户端, 也有其他类型的如: UseServiceProviderFactory,UseStaticClustering
- private static async Task<IClusterClient> StartClientWithRetries()
- {
- attempt = 0;
- IClusterClient client;
- client = new ClientBuilder()
- .UseLocalhostClustering()
- .Configure<ClusterOptions>(options =>
- {
- options.ClusterId = "dev";
- options.ServiceId = "HelloWorldApp";
- })
- .ConfigureLogging(logging => logging.AddConsole())
- .Build();
- await client.Connect(RetryFilter);
- Console.WriteLine("Client successfully connect to silo host");
- return client;
- }
- public async Task Connect(Func<Exception, Task<bool>> retryFilter = null)
- {
- this.ThrowIfDisposedOrAlreadyInitialized();
- using (await this.initLock.LockAsync().ConfigureAwait(false))
- {
- this.ThrowIfDisposedOrAlreadyInitialized();
- if (this.state == LifecycleState.Starting)
- {
- throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed.");
- }
- this.state = LifecycleState.Starting;
- if (this.runtimeClient is OutsideRuntimeClient orc) await orc.Start(retryFilter).ConfigureAwait(false);
- await this.clusterClientLifecycle.OnStart().ConfigureAwait(false);
- this.state = LifecycleState.Started;
- }
- }
- public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
- {
- // Deliberately avoid capturing the current synchronization context during startup and execute on the default scheduler.
- // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler.
- await Task.Run(() => this.StartInternal(retryFilter)).ConfigureAwait(false);
- logger.Info(ErrorCode.ProxyClient_StartDone, "{0} Started OutsideRuntimeClient with Global Client ID: {1}", BARS, CurrentActivationAddress.ToString() + ", client GUID ID:" + handshakeClientId);
- }
- private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
- {
- // Initialize the gateway list provider, since information from the cluster is required to successfully
- // initialize subsequent services.
- var initializedGatewayProvider = new[] {false};
- await ExecuteWithRetries(async () =>
- {
- if (!initializedGatewayProvider[0])
- {
- await this.gatewayListProvider.InitializeGatewayListProvider();
- initializedGatewayProvider[0] = true;
- }
- var gateways = await this.gatewayListProvider.GetGateways();
- if (gateways.Count == 0)
- {
- var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
- var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
- logger.Error(ErrorCode.GatewayManager_NoGateways, err);
- throw new SiloUnavailableException(err);
- }
- },
- retryFilter);
- var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
- transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
- transport.Start();
- CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId);
- listeningCts = new CancellationTokenSource();
- var ct = listeningCts.Token;
- listenForMessages = true;
- // Keeping this thread handling it very simple for now. Just queue task on thread pool.
- Task.Run(
- () =>
- {
- while (listenForMessages && !ct.IsCancellationRequested)
- {
- try
- {
- RunClientMessagePump(ct);
- }
- catch (Exception exc)
- {
- logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc);
- }
- }
- },
- ct).Ignore();
- await ExecuteWithRetries(
- async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
- retryFilter);
- this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
- this.logger,
- RefreshGrainTypeResolver,
- null,
- this.typeMapRefreshInterval,
- this.typeMapRefreshInterval);
- ClientStatistics.Start(transport, clientId);
- await ExecuteWithRetries(StreamingInitialize, retryFilter);
- async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry)
- {
- while (true)
- {
- try
- {
- await task();
- return;
- }
- catch (Exception exception) when (shouldRetry != null)
- {
- var retry = await shouldRetry(exception);
- if (!retry) throw;
- }
- }
- }
- }
- public ClientMessageCenter(
- IOptions<GatewayOptions> gatewayOptions,
- IOptions<ClientMessagingOptions> clientMessagingOptions,
- IPAddress localAddress,
- int gen,
- GrainId clientId,
- IGatewayListProvider gatewayListProvider,
- SerializationManager serializationManager,
- IRuntimeClient runtimeClient,
- MessageFactory messageFactory,
- IClusterConnectionStatusListener connectionStatusListener,
- ExecutorService executorService,
- ILoggerFactory loggerFactory,
- IOptions<NetworkingOptions> networkingOptions,
- IOptions<StatisticsOptions> statisticsOptions)
- {
- this.loggerFactory = loggerFactory;
- this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout;
- this.SerializationManager = serializationManager;
- this.executorService = executorService;
- lockable = new object();
- MyAddress = SiloAddress.New(new IPEndPoint(localAddress, 0), gen);
- ClientId = clientId;
- this.RuntimeClient = runtimeClient;
- this.messageFactory = messageFactory;
- this.connectionStatusListener = connectionStatusListener;
- Running = false;
- GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory);
- PendingInboundMessages = new BlockingCollection<Message>();
- gatewayConnections = new Dictionary<Uri, GatewayConnection>();
- numMessages = 0;
- grainBuckets = new WeakReference[clientMessagingOptions.Value.ClientSenderBuckets];
- logger = loggerFactory.CreateLogger<ClientMessageCenter>();
- if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Proxy grain client constructed");
- IntValueStatistic.FindOrCreate(
- StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT,
- () =>
- {
- lock (gatewayConnections)
- {
- return gatewayConnections.Values.Count(conn => conn.IsLive);
- }
- });
- statisticsLevel = statisticsOptions.Value.CollectionLevel;
- if (statisticsLevel.CollectQueueStats())
- {
- queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions);
- }
- }
- private void RunClientMessagePump(CancellationToken ct)
- {
- incomingMessagesThreadTimeTracking?.OnStartExecution();
- while (listenForMessages)
- {
- var message = transport.WaitMessage(Message.Categories.Application, ct);
- if (message == null) // if wait was cancelled
- break;
- // when we receive the first message, we update the
- // clientId for this client because it may have been modified to
- // include the cluster name
- if (!firstMessageReceived)
- {
- firstMessageReceived = true;
- if (!handshakeClientId.Equals(message.TargetGrain))
- {
- clientId = message.TargetGrain;
- transport.UpdateClientId(clientId);
- CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation);
- }
- else
- {
- clientId = handshakeClientId;
- }
- }
- switch (message.Direction)
- {
- case Message.Directions.Response:
- {
- ReceiveResponse(message);
- break;
- }
- case Message.Directions.OneWay:
- case Message.Directions.Request:
- {
- this.localObjects.Dispatch(message);
- break;
- }
- default:
- logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {message}.");
- break;
- }
- }
- incomingMessagesThreadTimeTracking?.OnStopExecution();
- }
- public void ReceiveResponse(Message response)
- {
- if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Received {0}", response);
- // ignore duplicate requests
- if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.DuplicateRequest)
- return;
- CallbackData callbackData;
- var found = callbacks.TryGetValue(response.Id, out callbackData);
- if (found)
- {
- // We need to import the RequestContext here as well.
- // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up" from task completion source into the resolved task.
- // RequestContextExtensions.Import(response.RequestContextData);
- callbackData.DoCallback(response);
- }
- else
- {
- logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message:" + response);
- }
- }
- //DoCallBack
- public void DoCallback(Message response)
- {
- if (this.IsCompleted)
- return;
- var requestStatistics = this.shared.RequestStatistics;
- lock (this)
- {
- if (this.IsCompleted)
- return;
- if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.Transient)
- {
- if (this.shared.ShouldResend(this.Message))
- {
- return;
- }
- }
- this.IsCompleted = true;
- if (requestStatistics.CollectApplicationRequestsStats)
- {
- this.stopwatch.Stop();
- }
- this.shared.Unregister(this.Message);
- }
- if (requestStatistics.CollectApplicationRequestsStats)
- {
- requestStatistics.OnAppRequestsEnd(this.stopwatch.Elapsed);
- }
- // do callback outside the CallbackData lock. Just not a good practice to hold a lock for this unrelated operation.
- this.shared.ResponseCallback(response, this.context);
- }
- //this.shared.Unregister(this.Message);
- private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData)
- {
- while (true)
- {
- try
- {
- Message message;
- lock (objectData.Messages)
- {
- if (objectData.Messages.Count == 0)
- {
- objectData.Running = false;
- break;
- }
- message = objectData.Messages.Dequeue();
- }
- if (ExpireMessageIfExpired(message, MessagingStatisticsGroup.Phase.Invoke))
- continue;
- RequestContextExtensions.Import(message.RequestContextData);
- var request = (InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager);
- var targetOb = (IAddressable)objectData.LocalObject.Target;
- object resultObject = null;
- Exception caught = null;
- try
- {
- // exceptions thrown within this scope are not considered to be thrown from user code
- // and not from runtime code.
- var resultPromise = objectData.Invoker.Invoke(targetOb, request);
- if (resultPromise != null) // it will be null for one way messages
- {
- resultObject = await resultPromise;
- }
- }
- catch (Exception exc)
- {
- // the exception needs to be reported in the log or propagated back to the caller.
- caught = exc;
- }
- if (caught != null)
- this.ReportException(message, caught);
- else if (message.Direction != Message.Directions.OneWay)
- this.SendResponseAsync(message, resultObject);
- }
- catch (Exception)
- {
- // ignore, keep looping.
- }
- }
- }
- public void SendMessage(Message msg)
- {
- GatewayConnection gatewayConnection = null;
- bool startRequired = false;
- if (!Running)
- {
- this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running");
- return;
- }
- // If there's a specific gateway specified, use it
- if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri()))
- {
- Uri addr = msg.TargetSilo.ToGatewayUri();
- lock (lockable)
- {
- if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
- {
- gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, executorService, this.loggerFactory, this.openConnectionTimeout);
- gatewayConnections[addr] = gatewayConnection;
- if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr);
- startRequired = true;
- }
- }
- }
- // For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion.
- else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered)
- {
- // Get the cached list of live gateways.
- // Pick a next gateway name in a round robin fashion.
- // See if we have a live connection to it.
- // If Yes, use it.
- // If not, create a new GatewayConnection and start it.
- // If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames.
- lock (lockable)
- {
- int msgNumber = numMessages;
- numMessages = unchecked(numMessages + 1);
- IList<Uri> gatewayNames = GatewayManager.GetLiveGateways();
- int numGateways = gatewayNames.Count;
- if (numGateways == 0)
- {
- RejectMessage(msg, "No gateways available");
- logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
- return;
- }
- Uri addr = gatewayNames[msgNumber % numGateways];
- if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
- {
- gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
- gatewayConnections[addr] = gatewayConnection;
- if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain);
- startRequired = true;
- }
- // else - Fast path - we've got a live gatewayConnection to use
- }
- }
- // Otherwise, use the buckets to ensure ordering.
- else
- {
- var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length);
- lock (lockable)
- {
- // Repeated from above, at the declaration of the grainBuckets array:
- // Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket.
- // Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used
- // if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is
- // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
- var weakRef = grainBuckets[index];
- if ((weakRef != null) && weakRef.IsAlive)
- {
- gatewayConnection = weakRef.Target as GatewayConnection;
- }
- if ((gatewayConnection == null) || !gatewayConnection.IsLive)
- {
- var addr = GatewayManager.GetLiveGateway();
- if (addr == null)
- {
- RejectMessage(msg, "No gateways available");
- logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
- return;
- }
- if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain);
- if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
- {
- gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
- gatewayConnections[addr] = gatewayConnection;
- if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index,
- msg.TargetGrain.GetHashCode().ToString("x"));
- startRequired = true;
- }
- grainBuckets[index] = new WeakReference(gatewayConnection);
- }
- }
- }
- if (startRequired)
- {
- gatewayConnection.Start();
- if (!gatewayConnection.IsLive)
- {
- // if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway.
- RejectOrResend(msg);
- return;
- }
- }
- try
- {
- gatewayConnection.QueueRequest(msg);
- if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address);
- }
- catch (InvalidOperationException)
- {
- // This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race)
- // If this happens, we reject if the message is targeted to a specific silo, or try again if not
- RejectOrResend(msg);
- }
- }
- public void Connect()
- {
- if (!MsgCenter.Running)
- {
- if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address);
- return;
- }
- // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time.
- // There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one
- // will try to reconnect. eventually do so, and then the other will try to reconnect even though it doesn't have to...
- // Hopefully the initial "if" statement will prevent that.
- lock (Lockable)
- {
- if (!IsLive)
- {
- if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address);
- return; // if the connection is already marked as dead, don't try to reconnect. It has been doomed.
- }
- for (var i = 0; i < ClientMessageCenter.CONNECT_RETRY_COUNT; i++)
- {
- try
- {
- if (Socket != null)
- {
- if (Socket.Connected)
- return;
- MarkAsDisconnected(Socket); // clean up the socket before reconnecting.
- }
- if (lastConnect != new DateTime())
- {
- // We already tried at least once in the past to connect to this GW.
- // If we are no longer connected to this GW and it is no longer in the list returned
- // from the GatewayProvider, consider directly this connection dead.
- if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address))
- break;
- // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY before reconnection tries
- var millisecondsSinceLastAttempt = DateTime.UtcNow - lastConnect;
- if (millisecondsSinceLastAttempt < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY)
- {
- var wait = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - millisecondsSinceLastAttempt;
- if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", wait, Address, i);
- Thread.Sleep(wait);
- }
- }
- lastConnect = DateTime.UtcNow;
- Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- Socket.EnableFastpath();
- SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout);
- NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket();
- MsgCenter.OnGatewayConnectionOpen();
- SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId); // Identifies this client
- Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, i);
- return;
- }
- catch (Exception ex)
- {
- Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {i} (Exception: {ex.Message})");
- MarkAsDisconnected(Socket);
- }
- }
- // Failed too many times -- give up
- MarkAsDead();
- }
- }
- protected override void Run()
- {
- try
- {
- while (!Cts.IsCancellationRequested)
- {
- int bytesRead = FillBuffer(buffer.BuildReceiveBuffer());
- if (bytesRead == 0)
- {
- continue;
- }
- buffer.UpdateReceivedData(bytesRead);
- Message msg;
- while (buffer.TryDecodeMessage(out msg))
- {
- gatewayConnection.MsgCenter.QueueIncomingMessage(msg);
- if (Log.IsEnabled(LogLevel.Trace)) Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, msg);
- }
- }
- }
- catch (Exception ex)
- {
- buffer.Reset();
- Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex);
- throw;
- }
- }
来源: https://www.cnblogs.com/fancunwei/p/9442469.html