Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

很多技术栈在优化过程中都会有更便捷的接入方式,或者接入demo,这次想拿apollo来对比一些例如向量数据库的部署方式,对我说的就是milvus,
apollo如果生产环境部署完全不推荐用这种方式,但是如果为了做个实验,研究下源码还是很方便的,当然前提是有docker环境
首先下载docker-compose配置文件,如果是x86环境就是这个链接,如果是m1这类的就是用这个链接,然后再下载sql文件夹
目录结构差不多是这样

然后在这个目录下执行
sudo docker-compose -f docker-compose-arm64.yml up -d
如果是非m1的话直接用
sudo docker-compose up -d
就好,因为docker-compose默认识别的文件名就是docker-compose.yml
然后看下日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
==== starting service ====
Service logging file is ./service/apollo-service.log
Application is running as root (UID 0). This is considered insecure.
Started [66]
Waiting for config service startup...
Config service started. You may visit http://localhost:8080 for service status now!
Waiting for admin service startup.
Admin service started
==== starting portal ====
Portal logging file is ./portal/apollo-portal.log
Application is running as root (UID 0). This is considered insecure.
Started [211]
Waiting for portal startup...
Portal started. You can visit http://localhost:8070 now!

就表示启动成功了,然后就可以访问后面那个地址 http://localhost:8070 进入控制台,默认用户名apollo,密码是admin
然后我们在应用中想要使用,主要是这个几个配置
第一个就是在resources目录下创建apollo-env.properties
里面是meta server的地址,比如我这边就是

1
dev.meta=http://127.0.0.1:8080

这表示是对应的spring.profiles.activedev的配置
第二步是添加pom依赖

1
2
3
4
5
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>

最后一步是在springboot的启动类添加注解

1
@EnableApolloConfig

然后就可以直接使用@Value注解使用配置的值

请求下这个接口,就可以看到对应的值

然后我们可以在控制台修改下这个值,发布

默认日志也会把这个打印出来

1
2024-08-24 19:38:30.462  INFO 57920 --- [Apollo-Config-1] c.f.a.s.p.AutoUpdateConfigChangeListener : Auto update apollo changed value successfully, new value: apolloDemoValue, key: demo, beanName: demoController, field: com.nicksxs.spbdemo.controller.DemoController.demo

对应的请求也会拿到最新的值

Java在后续版本中添加了虚拟线程,也是类似于php跟go的协程,对应操作系统的线程是在线程基础上模拟了一层子线程的逻辑,因为减少了操作系统的线程上下文切换开销,能够在常规业务场景带了比较大的性能提升,但也并非银弹,不能包治百病
首先安装下jdk 21 版本,需要用 /usr/libexec/java_home 切换下JAVA_HOME
然后在PATH中设置好

1
export JAVA_HOME=$(/usr/libexec/java_home -v 21)

首先是试一下线程版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(200);
for (int i = 0; i < 100000; i++) {
executor.submit(() -> {
try {
// 线程睡眠 10ms,可以等同于模拟业务耗时10ms
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {

}
});
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


耗时5897ms

1
2
3
4
5
6
7
8
9
10
11
12
13
14
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
// 模拟业务处理
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {

}
});
}
executor.close();
System.out.printf("totalMillis:%dms\n", System.currentTimeMillis() - start);


耗时154ms
相对来说还是能快很多的
而核心的虚拟线程实现主要来自于调度

1
2
3
4
5
6
7
8
9
10
11
12
static Thread newVirtualThread(Executor scheduler,
String name,
int characteristics,
Runnable task) {
if (ContinuationSupport.isSupported()) {
return new VirtualThread(scheduler, name, characteristics, task);
} else {
if (scheduler != null)
throw new UnsupportedOperationException();
return new BoundVirtualThread(name, characteristics, task);
}
}

在创建虚拟线程过程中,我们需要去处理调度器,初始时调度器为空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);

// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) {
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER;
}
}

this.scheduler = scheduler;
this.cont = new VThreadContinuation(this, task);
this.runContinuation = this::runContinuation;
}

而这个默认调度器就是

1
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

对应创建的就是个ForkJoinPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static ForkJoinPool createDefaultScheduler() {
ForkJoinWorkerThreadFactory factory = pool -> {
PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
return AccessController.doPrivileged(pa);
};
PrivilegedAction<ForkJoinPool> pa = () -> {
int parallelism, maxPoolSize, minRunnable;
String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
if (parallelismValue != null) {
parallelism = Integer.parseInt(parallelismValue);
} else {
parallelism = Runtime.getRuntime().availableProcessors();
}
if (maxPoolSizeValue != null) {
maxPoolSize = Integer.parseInt(maxPoolSizeValue);
parallelism = Integer.min(parallelism, maxPoolSize);
} else {
maxPoolSize = Integer.max(parallelism, 256);
}
if (minRunnableValue != null) {
minRunnable = Integer.parseInt(minRunnableValue);
} else {
minRunnable = Integer.max(parallelism / 2, 1);
}
Thread.UncaughtExceptionHandler handler = (t, e) -> { };
boolean asyncMode = true; // FIFO
return new ForkJoinPool(parallelism, factory, handler, asyncMode,
0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
};
return AccessController.doPrivileged(pa);
}

可以看到参数都是通过系统参数获取,或者用系统的cpu数量来决定并行度,主体的逻辑就是既然系统线程开销大,那我就在系统线程内部模拟一个更小颗粒度的,在线程内部进行调度的模型,以此来减少系统切换开销,只不过细节还有很多需要研究,有兴趣的可以留言探讨

最近碰到一个问题,因为一些干扰因素导致排查的时候走了一段歧路,
报错信息是

1
Exception in thread "main" com.alibaba.fastjson.JSONException: scan null error

用一个简单的demo来复现下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public enum DemoEnum {

DEMO1("demo1", "desc");
private String code;

private String desc;

DemoEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public static DemoEnum getByCode(String code) {
for (DemoEnum demoEnum : values()) {
if (demoEnum.code.equals(code)) {
return demoEnum;
}
}
return null;
}

public String getCode() {
return code;
}

public void setCode(String code) {
this.code = code;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}
}

先定义一个枚举类,然后有个demo类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FastJsonDemo implements Serializable {

private static final long serialVersionUID = 1131767138182111892L;
private String name;

private Map<DemoEnum, String> map = new HashMap<>();

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Map<DemoEnum, String> getMap() {
return map;
}

public void setMap(Map<DemoEnum, String> map) {
this.map = map;
}
}

其中map的key是上面定义的枚举类,然后在main方法中做一下序列化和反序列化

1
2
3
4
5
6
7
8
9
10
11
public class demo {

public static void main(String[] args) {
FastJsonDemo fastJsonDemo = new FastJsonDemo();
fastJsonDemo.setName("nick");
Map<DemoEnum, String> map = new HashMap<>();
map.put(DemoEnum.getByCode("code"), "null key value");
fastJsonDemo.setMap(map);
FastJsonDemo decodeFastJsonDemo = JSONObject.parseObject(JSONObject.toJSONString(fastJsonDemo), FastJsonDemo.class);
}
}

这样就会出现这个异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Exception in thread "main" com.alibaba.fastjson.JSONException: scan null error
at com.alibaba.fastjson.parser.JSONLexerBase.scanNullOrNew(JSONLexerBase.java:4531)
at com.alibaba.fastjson.parser.JSONLexerBase.nextToken(JSONLexerBase.java:154)
at com.alibaba.fastjson.parser.JSONLexerBase.nextToken(JSONLexerBase.java:358)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.parseMap(MapDeserializer.java:227)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:61)
at com.alibaba.fastjson.parser.deserializer.MapDeserializer.deserialze(MapDeserializer.java:41)
at com.alibaba.fastjson.parser.deserializer.FastjsonASMDeserializer_1_FastJsonDemo.deserialze(Unknown Source)
at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:269)
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:671)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:365)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:269)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:488)
at fastjson.demo.main(demo.java:23)

这个异常有两个原因,第一当然是在map中出现了null作为key的数据,第二就是这个key是复杂对象
在反序列化以后就会出现这个异常,因为最初出现这个异常是因为我改了另一个字符串字段,并且会反序列化成json就让我判断出现了误差
这边做一个记录。

上次我们尝试用 towheemilvus 实现了图片的向量化,那么顺势我们就能在这个基础上实现以图搜图
首先我们找一些图片,

然后我们先把他们都向量化,存储在milvus里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from towhee import AutoPipes, AutoConfig, ops
import towhee
import os
from pymilvus import MilvusClient
import json

# 1. 设置一个Milvus客户端
client = MilvusClient(
uri="http://localhost:19530"
)
insert_conf = AutoConfig.load_config('insert_milvus')
insert_conf.collection_name = 'text_image_search'


insert_pipe = AutoPipes.pipeline('insert_milvus', insert_conf)

# 创建图像嵌入管道
image_embedding_pipe = AutoPipes.pipeline('image-embedding')
files = os.listdir("./images")
for file in files:
file_path = os.path.join("./images", file)
if os.path.isfile(file_path):
embedding = image_embedding_pipe(file_path).get()[0]
insert_pipe([file_path, embedding])

然后我们找一张神仙姐姐的其他图片,先把它 embedding 后在 milvus 里进行向量检索

1
2
3
4
5
6
7
8
9
10
11
12
13
image_embedding_pipe = AutoPipes.pipeline('image-embedding')

# 生成嵌入
embedding = image_embedding_pipe('./to_search.jpg').get()[0]
print(embedding)
res = client.search(
collection_name="text_image_search",
data=[embedding],
limit=5,
search_params={"metric_type": "IP", "params": {}}
)
result = json.dumps(res, indent=4)
print(result)

我们检索出来5个结果,

可以通过distance找到距离最近的这个是id=451291280409722600
可以发现也是神仙姐姐的,只是作为参考
to_search 目标图片是

搜索的最短距离就是id对应的图片

这张图片就是神仙姐姐

之前觉得谷歌的以图搜图很厉害,现在似乎这个路径还毕竟清晰了,首先要有图片库,把它们向量化以后存储起来,然后对于目标图片也做向量化,再做检索
那么我们先来做重要的这一步,图片的向量化,因为向量化以后就跟图片没关系了,直接用前面讲到的向量的近似搜索就可以做到以图搜图了
这边我们用到了towhee工具,towhee是个机器学习的pipeline工具,可以做数据源(文件,图片,媒体,文本)–> 模型 –> 向量。
首先我们安装towhee

1
pip install towhee towhee.models

这里有第一个坑,因为有torch依赖,但是目前torch支持的python版本是最高3.9,再往上可能就有问题了
所以我们要先创建一个3.9的环境

1
conda create -n py9 python=3.9

然后我们会遇到第二个坑
就是milvus的客户端

1
pip install pymilvus==2.3.0

再早的客户端会有依赖不支持,更新的客户端会连接不上
然后再在milvus创建一个Collection

然后这个是不对的,因为我使用attu界面创建的,默认第二个字段一定得是向量字段,所以又用了上次的Java代码来创建

1
2
3
4
5
6
7
8
9
10
11
12
13
CreateCollectionReq.CollectionSchema collectionSchema = clientV2.createSchema();
// add two fileds, id and vector
Integer dim = 2048;
collectionSchema.addField(AddFieldReq.builder().fieldName("url").dataType(DataType.VarChar).build());
collectionSchema.addField(AddFieldReq.builder().fieldName("embedding").dataType(DataType.FloatVector).dimension(dim).build());
collectionSchema.addField(AddFieldReq.builder().fieldName("id").dataType(DataType.Int64).isPrimaryKey(Boolean.TRUE).autoID(Boolean.TRUE).description("id").build());

CreateCollectionReq req = CreateCollectionReq
.builder()
.collectionSchema(collectionSchema)
.collectionName("text_image_search")
.dimension(dim).build();
clientV2.createCollection(req);

并且注意字段顺序不能错,这个顺序在towhee的demo中也没地方直接修改
看一下towhee的demo
地址在这
https://towhee.io/tasks/detail/pipeline/text-image-search

1
2
3
4
5
6
7
8
9
10
11
12
13
from towhee import AutoPipes, AutoConfig

# set MilvusInsertConfig for the built-in insert_milvus pipeline
insert_conf = AutoConfig.load_config('insert_milvus')
insert_conf.collection_name = 'text_image_search'

insert_pipe = AutoPipes.pipeline('insert_milvus', insert_conf)

# generate embedding
embedding = image_embedding('./test1.png').get()[0]

# insert text and embedding into Milvus
insert_pipe(['./test1.png', embedding])

第三个坑,这并不能跑起来

1
2
3
 embedding = insert_pipe.image_embedding('./test.jpg').get()[0]
^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'RuntimePipeline' object has no attribute 'image_embedding'

因为找不到这个 image_embedding
估计是这个也是包路径变更了,但是并没有什么文档可以找到,之前LlamaIndex还有点更新指南,后来是问了Claude才知道了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from towhee import AutoPipes, AutoConfig, ops


# set MilvusInsertConfig for the built-in insert_milvus pipeline
insert_conf = AutoConfig.load_config('insert_milvus')
insert_conf.collection_name = 'text_image_search'


insert_pipe = AutoPipes.pipeline('insert_milvus', insert_conf)

# 创建图像嵌入管道
image_embedding_pipe = AutoPipes.pipeline('image-embedding')

# 生成嵌入
embedding = image_embedding_pipe('./test.jpg').get()[0]


# insert text and embedding into Milvus
insert_pipe(['./test.jpg', embedding])

需要从 AutoPipes 中把这个 image-embedding 找出来
然后就是上面说到的字段映射问题,默认是先url,再embedding字段,并且主键字段必须是autoId,不然也会缺少默认值,还是感叹下,工程化的东西还是要工程化的质量保证,否则变更都无从知晓,现在人工智能大热,大家都在追风,只是基础的软件还是要稳扎稳打,这样我们工程人员才能把它们更好的用起来,PS:娃真可爱,但真的好累

0%