微服务系列之服务注册发现 Consul

2023-02-12,,,,

1.为什么需要服务注册发现

  微服务架构中,服务于服务之间内部通信必不可少,比如A服务调用B服务,起初我们的做法是,A服务从配置文件中拿到B服务的IP、端口地址,进行访问,本身是没什么问题的,但是随着业务的复杂性越来越高,会遇到一个最蛋疼的问题,服务A可能依赖很多其他服务,这样就要维护好多个服务的地址,如果某个服务的负载地址换了,服务A就要去跟着更改对应的地址配置,这一看可能也没什么,当服务依赖之间的复杂度在非常高的时候,成批的服务由于某种原因更换地址时,噩梦就来了。

  解决上述问题的办法很简单,服务名肯定是不会变化的,当我们服务之间调用时候,知道对方服务名称就能对此通信就解决了,所以就需要一个中间件来实现如下功能:

    服务启动时,将该服务的地址或者其所在集群的负载地址注册到中间件中;
    当服务A要调用服务B时候,通过服务B的名称,从中间件拉取一个健康的服务B地址,或者服务B的集群负载地址;
    这个中间件要具备很高的可用性、可靠性;

      

2.Consul

  Consul是一款基于Go语言开发的支持多数据中心,分布式高可用的服务注册发现的中间件,支持注册服务的健康检查,并且自带管理后台便于查看维护Consul集群。官方网站

上图是官方的架构图,可见Consul几大核心概念如下:

    Server就是Consul的核心服务端,用来持久化大量服务的注册,并且对client代理提供基于RPC的查询访问,考虑consul的高可用性,官方给出的集群解决方案,最少由3个server组成,进行主从模式,基于RAFT算法来实现强一致性同步数据,这里其中一个server为Leader-server,主要做的事情就是同步服务注册信息的工作,由其他server来提供查询相应与监测工作。
    Client可以理解为是Server集群的代理,我们实际的服务,其实就是通过Client集群来注册与拉取目标服务列表,而Client本身并不做任何持久化工作,Client的主要用来承接大量服务的请求转发和对注册服务进行健康检测(周期可指定)。
    DataCenter数据中心,每一个Consul集群就是一个数据中心,这个我们用到的也不深,我们生产目前就是一个数据中心。

网上讨论很多的问题是Consul官方给出的集群架构很奇怪,有Server集群了,完全可以满足业务了,为什么还要来一层Client呢?我个人的理解如下,但不一定保证对或者完全,大家可以自己去深入思考下:

    由于Server的主要工作是持久化存储,与提供查询,并保证高可用性主从同步消耗资源的工作,而对于注册服务的主动健康检测也是非常非常重要,所以这里边把职责拆分,利用无状态的client代理集群来做这件事;
    对于大型微服务架构来说,拉取目标服务列表的查询动作会很多,Client代理可以进行适当的聚合查询甚至利用缓存来减少对Server层的性能开销;

3.基于docker的consul集群搭建

  本文采用3Server+2Client的方式搭建

    相关参数解释:
    agent:启动agent进程,必须开启,Agent的工作是维护成员关系信息、注册服务、健康检查、响应查询等等;
    -server:启动server模式;
    -bootstrap-expect:简单意思,就是server集群需要最少多少可以正常选举leader;
    -ui:运行web管理界面;
    -node:节点名称,集群中必须唯一;
    -client:绑定客户端接口地址,0.0.0.0标识所有地址都可以访问;
    -datacenter:数据中心名称;
    -join:表示加入某一个集群;

1)启动server1

docker run -d -p 8501:8500 -v /e/docker/consul/data/server1:/consul/data -v /e/docker/consul/conf/server1:/consul/config -e CONSUL_BIND_INTERFACE='eth0' --privileged=true --name=consul_server_1 consul agent -server -bootstrap-expect=3 -ui -node=node_consul_server_1 -client='0.0.0.0' -data-dir /consul/data -config-dir /consul/config -datacenter=dc1

2)查询一下server1的容器ip

docker exec consinspect -f '{{.NetworkSettings.IPAddress}}' consul_server_1

3)启动server2

docker run -d -p 8502:8500 -v /e/docker/consul/data/server2:/consul/data -v /e/docker/consul/conf/server2:/consul/config -e CONSUL_BIND_INTERFACE='eth0' --privileged=true --name=consul_server_2 consul agent -server -ui -node=consul_server_2 -client='0.0.0.0' -data-dir /consul/data -config-dir /consul/config -datacenter=dc1 -join 172.17.0.5

4)启动server4

docker run -d -p 8503:8500 --restart=always -v /e/docker/consul/data/server3:/consul/data -v /e/docker/consul/conf/server3:/consul/config -e CONSUL_BIND_INTERFACE='eth0' --privileged=true --name=consul_server_3 consul agent -server -ui -node=consul_server_3 -client='0.0.0.0' -data-dir /consul/data -config-dir /consul/config -datacenter=dc1 -join 172.17.0.5

5)列出server集群

docker exec consexec consul_server_1 consul operator raft list-peers

6)启动client1

docker run -d -p 8504:8500 -v /e/docker/consul/conf/client1:/consul/config -e CONSUL_BIND_INTERFACE='eth0' --name=consul_client_1 consul agent -node=consul_client_1 -client='0.0.0.0' -config-dir /consul/config -datacenter=dc1 -join 172.17.0.5

7)启动client2

docker run -d -p 8505:8500 -v /e/docker/consul/conf/client1:/consul/config -e CONSUL_BIND_INTERFACE='eth0' --name=consul_client_2 consul agent -node=consul_client_2 -client='0.0.0.0' -config-dir /consul/config -datacenter=dc1 -join 172.17.0.5

都运行起来后,运行docker conscontainer ls -a

全部运行成功

8)运行管理后台,输入http://xxxxxx:8501/ui/dc1/nodes

标识所有节点正常。

4. .net core使用consul

代码里面用到了apollo配置中心

    服务注册,新建.net core 3.1项目,nuget添加Consul,核心代码如下
/// <summary>
/// 注册Consul,使用在startup中Configure最后面添加app.RegisterConsul(lifetime,configuration),只能使用内网IP注册,如果需要联调请使用ConsulRegister注册
/// </summary>
/// <param name="app"></param>
/// <param name="lifetime"></param>
/// <param name="configuration">配置文件</param>
/// <param name="serviceName">服务名称</param>
/// <param name="servicePort">服务端口</param>
/// <param name="serviceHealthPath">服务健康检查地址 默认/api/healthcheck</param>
/// <param name="runtimeEnv">服务执行环境,默认apollo["Meta:Env"]</param>
/// <param name="tagList">标签列表</param>
/// <returns></returns>
public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IHostApplicationLifetime lifetime, IConfiguration configuration, string serviceName = "", int servicePort = 0, string serviceHealthPath = "", string runtimeEnv = "", List<string> tagList = null)
{
Random rnd = new Random(); string[] clientIps = configuration["Consul:ClientRegisterAddress"].Split(","); var serviceAddress = System.Net.Dns.GetHostEntry(System.Net.Dns.GetHostName()).AddressList.FirstOrDefault(p => p.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)?.ToString(); foreach (var clientIp in clientIps)
{
//创建Consul客户端
ConsulClient consulClient = new ConsulClient(c =>
{
//请求注册的 Consul 地址(你要注册到哪个consul服务就填写哪个)
c.Address = new Uri($"http://{clientIp}/");
}); //服务名称
serviceName = string.IsNullOrEmpty(serviceName.Trim()) ? configuration["Consul:ServiceName"] : serviceName.Trim();
if (string.IsNullOrEmpty(serviceName))
{
serviceName = $"core-service-{Guid.NewGuid()}";
} if (servicePort == 0)
{
//服务接口
var servicePortStr = configuration["Consul:ServicePort"];
if (string.IsNullOrEmpty(servicePortStr))
{
servicePortStr = "5000";
}
servicePort = Convert.ToInt32(servicePortStr);
} //健康检查地址
serviceHealthPath = string.IsNullOrEmpty(serviceHealthPath.Trim()) ? configuration["Consul:ServiceHealthPath"] : serviceHealthPath.Trim();
if (string.IsNullOrEmpty(serviceHealthPath))
{
serviceHealthPath = "/api/healthcheck";
} var httpCheck = new AgentServiceCheck()
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册,在遇到异常后关闭自身服务通道,失败后多久移除
Interval = TimeSpan.FromSeconds(15),//健康检查时间间隔,或者称为心跳间隔 间隔15s一次
HTTP = $"http://{serviceAddress}:{servicePort}{serviceHealthPath}",
Timeout = TimeSpan.FromSeconds(3) //多少秒为超时
}; List<string> tags = new List<string>(); runtimeEnv = string.IsNullOrEmpty(runtimeEnv.Trim()) ? configuration["Meta:Env"] : runtimeEnv.Trim();
if (!string.IsNullOrEmpty(runtimeEnv))
{
tags.Add(runtimeEnv);
tags.Add($"group={runtimeEnv}");
} if (tagList != null && tagList.Count > 0)
{
foreach (var item in tagList)
{
tags.Add(item);
}
}
var registration = new AgentServiceRegistration()
{
//ID = Guid.NewGuid().ToString(),
ID = $"{serviceName}-{serviceAddress.Replace(".", "-")}-{servicePort}",
Name = serviceName, // 服务名
Address = serviceAddress, // 服务绑定IP
Port = servicePort, // 服务绑定端口
Tags = tags.ToArray(),//添加 urlprefix-/servicename 格式的 tag 标签,以便 Fabio 识别
Checks = new[] { httpCheck }
}; //服务启动时注册,内部实现其实就是使用 Consul API 进行注册(HttpClient发起)
consulClient.Agent.ServiceRegister(registration).Wait();
//服务停止时取消注册
lifetime.ApplicationStopping.Register(() =>
{
consulClient.Agent.ServiceDeregister(registration.ID).Wait();
});
}
return app;
}
}
app.RegisterConsul(lifetime, Configuration, Configuration["Consul:ServiceName:MyDockerApi"]);

服务启动后,注册成功后如下几张图

2.服务发现,核心代码如下

/// <summary>
/// 获取服务其中一个
/// </summary>
/// <returns></returns>
private async Task<Tuple<bool, string>> GetConsulServiceUrl(string serviceName, string tagSign)
{
Random rnd = new Random();
string[] ips = _configuration["Consul:ClientRegisterAddress"].Split(",");
var consulAddress = ips[rnd.Next(ips.Length)];
_logger.LogInformation($"Consul服务->{serviceName},地址->{consulAddress}"); var consuleClient = new ConsulClient(consulConfig =>
{
consulConfig.Address = new Uri($"http://{consulAddress}/");
});
var result = new List<string>(); //获取健康的服务
var queryResult = await consuleClient.Health.Service(serviceName, string.Empty, true); var healthList = queryResult.Response.ToList(); _logger.LogInformation($"Consul服务->{serviceName},地址->{consulAddress},查询到可用ip"); if (!string.IsNullOrEmpty(tagSign))
{
healthList = queryResult.Response.Where(p => p.Service.Tags.Contains(tagSign)).ToList();
}
foreach (var serviceEntry in healthList)
{
result.Add($"http://{serviceEntry.Service.Address}:{serviceEntry.Service.Port}");
} if (result.Count > 0)
{
string consulCacheKey = $"cache_{_consulCacheKey}{serviceName}"; CacheHelper.CacheInsertAddMinutes(consulCacheKey, result, 600); return await ServiceRequestHost(result, serviceName); }
else
{
return new Tuple<bool, string>(false, "未找到服务");
}
}
/// <summary>
/// consul服务Get请求
/// </summary>
/// <param name="httpClient">http请求</param>
/// <param name="requestPath">请求的地址</param>
/// <param name="serviceName">服务名称</param>
/// <param name="serviceTag">服务标志</param>
/// <param name="dicParam">url参数</param>
/// <param name="isNeedLocalDebug">是否需要本机调试,如果需要,请在consul_service_ip配置服务IP,命名是服务名称-host 例:accountbook-service-host</param>
/// <returns></returns>
private async Task<HttpResponseMessage> ConsulHttpClientGetByHttpResponseMessage(HttpClient httpClient, string serviceName, string serviceTag, string requestPath, Dictionary<string, string> dicParam, bool isNeedLocalDebug = false)
{
_logger.LogInformation($"Consul服务get请求参数{serviceName},{requestPath},结果为HttpResponseMessage"); if (!requestPath.StartsWith("/"))
{
requestPath = "/" + requestPath;
} int n = 0;
again:
var serviceHost = await GetHost(serviceName, serviceTag, isNeedLocalDebug); if (serviceHost.Item1)
{
try
{
if (dicParam != null && dicParam.Count > 0)
{
if (requestPath.Contains("?"))
{
requestPath = requestPath + "&" + string.Join("&", dicParam.Select(x => $"{x.Key}={x.Value}"));
}
else
{
requestPath = requestPath + "?" + string.Join("&", dicParam.Select(x => $"{x.Key}={x.Value}"));
}
} string httpGetUrl = $"{serviceHost.Item2}{requestPath}"; var response = await httpClient.GetAsync(httpGetUrl); if (!response.IsSuccessStatusCode)
{
n = n + 1;
if (n < 2)
{
await ConsulCacheRemove(serviceName);
goto again;
}
} return response;
}
catch (Exception ex)
{
_logger.LogError($"{JsonConvert.SerializeObject(serviceHost)}-{serviceName}-异常{JsonConvert.SerializeObject(ex)}");
n = n + 1;
if (n < 2)
{
await ConsulCacheRemove(serviceName);
goto again;
}
return null;
}
}
else
{
_logger.LogInformation($"{serviceName}-{JsonConvert.SerializeObject(serviceHost)}");
return null;
}
}
 /// <summary>
/// 通过consul调用服务
/// </summary>
/// <returns></returns>
[HttpGet("getby/consul")]
public async Task<IActionResult> GetByConsul()
{
var serverHost = configuration["Consul:ServiceName:MyDockerApi"];
var response = await consulServiceProxy.ConsulGetByHttpResponseMessage(serverHost, "", "api/demo/get/configs", null);
var res = "";
if (response.IsSuccessStatusCode)
res = await response.Content.ReadAsStringAsync();
return Ok($"通过consul请求结果:{res}");
} [HttpGet("get/configs")]
public async Task<IActionResult> GetConfigs()
{
var res = "";
//获取公共命名空间下的配置
var publicConfig = configuration["env"];
//获取该私有项目下的配置
var privateConfig = configuration["myConfig1"];
res = $"公共:{publicConfig},私有:{privateConfig}";
return Ok(res); }

我这边懒了,就用了1个服务,自己注册,自己发现自己。

.net core集成consul大概就是这个样子了

5.问题

    调试问题,我这个使用的腾讯云,并且基于docker部署的,由于注册时候是注册的docker ip,我本机的网络并没有与腾讯云打通,所以我只能部署到云上面请求,正常情况下,公司里边运维会打通腾讯云你们的测试环境集群网络与你们公司本地环境的网络,这样就可以调试了,但是也可能人家会隔离你们的本地网络与docker的直接通信,这时候运维会提供该服务的docker单点对外IP或者集群的对外负载IP,用于你们本机调试。我们公司就是这样目前,当然代码里需要一些判断。

    注册docker ip还是该服务的负载ip,都可以,我们就是注册的docker ip所以在服务发现的时候,会拉取一个服务列表,通过轮询算法,来取其中一个。

微服务系列之服务注册发现 Consul的相关教程结束。

《微服务系列之服务注册发现 Consul.doc》

下载本文的Word格式文档,以方便收藏与打印。