What hurts more, the pain of hard work or the pain of regret?

0%

看完了村上春树的《1Q84》,这应该是第五本看的他的书了,继 跑步,挪威的森林,刺杀骑士团长,海边的卡夫卡之后,不是其中最长的,好像是海边的卡夫卡还是刺杀骑士团长比较长一点,都是在微信读书上看的,比较方便,最开始在上面看的是高晓松的《鱼羊野史》,不知道为啥取这个名字,但是还是满吸引我的,不过由于去年的种种,没有很多心思把它看完,而且本身的组织形式就是比较松散的,看到哪算哪,其实一些野史部分是我比较喜欢,有些谈到人物的就不太有兴趣,而且类似于大祥哥吃的东西,反正都是哇,怎么这么好吃,嗯,太爱(niu)你(bi)了,高晓松就是这个人是我最喜欢的 xxx 家,我也没去细究过他有没有说重复过,反正是不太爱,后来因为这书还一度对战争史有了浓厚的兴趣,然而事实告诉我,大部头的战争史,其实正史我是真的啃不下去,我可能只对其中 10%的内容感兴趣,不过终于也在今年把它看完了,好像高晓松的晓说也最终季了,貌似其中讲朝鲜战争的还被和谐了,看样子是说出了一些故事(truth)。

本来只是想把 《1Q84》的读后感写下,现在觉得还是把这篇当成我今年的读书总结吧,不过先从《1Q84》说起。

严格来讲,这不是很书面化的读后感,可能我想写的也只是像聊天一样的说下我读过的书,包括的技术博客其实也是类似的,以后或许会转变,但是目前水平如此吧,写多了可能会变好,也可能不会。

开始正文吧,这书有点类似于海边的卡夫卡,一开始是通过两条故事线,穿插着叙述,一条是青豆的,不算是个职业杀手的女杀手,要去解决一个经常家暴的斯文败类,穿着描述得比较性感吧,杀人方式是通过比较长的细针,从脖子后面一个精巧的位置插入,可以造成是未知原因死亡的假象,可能会推断成心梗之类的,这里有个前置的细节,就是青豆是乘坐一辆很高级的出租车,内饰什么的都非常有质感,有点不像一辆出租车,然后车里放了一首比较小众的歌,雅纳切克的《小交响曲》,但是青豆知道它,这跟后面的情节也有些许关系,这是女主人公青豆的出场;相应的男主的出场印象不是太深刻,男主叫天吾,是个不知名的作家,跟一个叫小松的编辑有比较好的关系,虽然天吾还没有拿到比较有分量的奖项,但是小松很看好他,也让他帮忙审校一个新作家奖的投稿文章,虽然天吾自身还没获得过这个奖,天吾还有个正式工作,是当数学老师,天吾在学生时代是个数学天才,但后面有对文学产生了兴趣,文学还不足以养活自己,靠着教课还是能保持温饱;

接下来是正式故事的起点了,就是小松收到了一部小说投稿,名叫《空气蛹》,是个叫深绘里的女孩子投的稿,小松对他赋予了很高的评价,这里好像记岔了,好像是天吾对这部小说很有好感,但是小松比较怀疑,然后小松看了之后也有了浓厚的兴趣,这里就是开端了,小松想让天吾来重写润色这部《空气蛹》,因为故事本身很有分量,但是描写手法叙事方式等都很拙劣,而天吾正好擅长这个,小松对天吾的评价是,描写技巧无可挑剔,就是故事主体的火花还没际遇迸发,需要一个导火索,这个就可以类比我们程序员,很多比较初中级的程序员主要擅长在原来的代码上修修改改或者给他分配个小功能,比较高级的程序员就需要能做一些项目的架构设计,核心的技术方案设计,以前我也觉得写文档这个比较无聊,但是当一个项目真的比较庞大,复杂的时候,整体和核心部分的架构设计和方案还是需要有文档沉淀的,不然别人不知道没法接受,自己过段时间也会忘记。

对于小松的这个建议,他的初衷是想搅一搅这个死气沉沉套路颇深的文坛,因为本身《空气蛹》这部小说的内容很吸引人,小松想通过天吾的润色补充让这部小说冲击新人奖,有种恶作剧的意图,天吾对此表示很多担心和顾虑,小松的这个建议其实也是一种文学作假,有两方面的担心,一方面是原作者深绘里是否同意如此操作,一方面是外界如果发现了这个事实会有什么样的后果,但是小松表示不用担心,前一步由小松牵线,让天吾跟原作者深绘里当面沟通这个代写是否被允许,结果当然是被允许了,这里有了对深绘里的初步描写,按我的理解是比较仙的感觉,然后语言沟通有些吃力,或者说有她自己的特色,当面沟通时貌似是让深绘里回去再考虑下,然后后面再由天吾去深绘里寄宿的戎野老师家沟通具体的细节。

2019年12月18日23:37:19 更新
去到戎野老师家之后,天吾知道了关于深绘里的一些事情,深绘里的父亲与戎野老师应该是老友,深绘里的父亲在当初成立了一个叫”先驱”的公社,一个独立运行的社会组织,以运营农场作为物资来源,追求更为松散的共同体,即不过分激进地公有制,进行松散的共同生活,承认私有财产,简而言之就是这样一个能稳定存活下来的独立社会组织,但是随着稳定运行,内部的激进派和稳健派开始出现分歧,不可磨合,后来两派就分裂了,深绘里的父亲,深田保留在了稳健派,但是此时其实深田保内心是矛盾的,以为一开始其实是他倡导的独立革命才组织起了这群人,然而现在他又认清了现实社会已经不太相信能通过革命来独立的可能性,后来激进派便开始越加封闭,而且进行军事训练和思想教育,而后这个先驱的激进派别便有了新的名字”黎明”,深绘里也是在此时从先驱逃离来投靠戎野老师
暂时先写到这,未完待续~

今天看了一下 redis 分布式锁 redlock 的实现,简单记录下,

加锁

原先我对 redis 锁的概念就是加锁使用 setnx,解锁使用 lua 脚本,但是 setnx 具体是啥,lua 脚本是啥不是很清楚
首先简单思考下这个问题,首先为啥不是先 get 一下 key 存不存在,然后再 set 一个 key value,因为加锁这个操作我们是要保证两点,一个是不能中途被打断,也就是说要原子性,如果是先 get 一下 key,如果不存在再 set 值的话,那就不是原子操作了;第二个是可不可以直接 set 值呢,显然不行,锁要保证唯一性,有且只能有一个线程或者其他应用单位获得该锁,正好 setnx 给了我们这种原子命令

然后是 setnx 的键和值分别是啥,键比较容易想到是要锁住的资源,比如 user_id, 这里有个我自己之前比较容易陷进去的误区,但是这个误区后
面再说,这里其实是把user_id 作为要锁住的资源,在我获得锁的时候别的线程不允许操作,以此保证业务的正确性,不会被多个线程同时修改,确定了键,再来看看值是啥,其实原先我认为值是啥都没关系,我只要锁住了,光键就够我用了,但是考虑下多个线程的问题,如果我这个线程加了锁,然后我因为 gc 停顿等原因卡死了,这个时候redis 的锁或者说就是 redis 的缓存已经过期了,这时候另一个线程获得锁成功,然后我这个线程又活过来了,然后我就仍然认为我拿着锁,我去对数据进行修改或者释放锁,是不是就出现问题了,所以是不是我们还需要一个东西来区分这个锁是哪个线程加的,所以我们可以将值设置成为一个线程独有识别的值,至少在相对长的一段时间内不会重复。

上面其实还有两个问题,一个是当 gc 超时时,我这个线程如何知道我手里的锁已经过期了,一种方法是我在加好锁之后就维护一个超时时间,这里其实还有个问题,不过跟第二个问题相关,就一起说了,就是设置超时时间,有些对于不是锁的 redis 缓存操作可以是先设置好值,然后在设置过期时间,那么这就又有上面说到的不是原子性的问题,那么就需要在同一条指令里把超时时间也设置了,幸好 redis 提供了这种支持

1
SET resource_name my_random_value NX PX 30000

这里借鉴一下解释下,resource_name就是 key,代表要锁住的东西,my_random_value就是识别我这个线程的,NX代表只有在不存在的时候才设置,然后PX 30000表示超时时间是 30秒自动过期

PS:记录下我原先有的一个误区,是不是要用 key 来区分加锁的线程,这样只有一个用处,就是自身线程可以识别是否是自己加的锁,但是最大的问题是别的线程不知道,其实这个用户的出发点是我在担心前面提过的一个问题,就是当 gc 停顿后,我要去判断当前的这个锁是否是我加的,还有就是当释放锁的时候,如果保证不会错误释放了其他线程加的锁,但是这样附带很多其他问题,最大的就是其他线程怎么知道能不能加这个锁。

解锁

当线程在锁过期之前就处理完了业务逻辑,那就可以提前释放这个锁,那么提前释放要怎么操作,直接del key显然是不行的,因为这样就是我前面想用线程随机值加资源名作为锁的初衷,我不能去释放别的线程加的锁,那么我要怎么办呢,先 get 一下看是不是我的?那又变成非原子的操作了,幸好redis 也考虑到了这个问题,给了lua 脚本来操作这种

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

这里的KEYS[1]就是前面加锁的resource_name,ARGV[1]就是线程的随机值my_random_value

多节点

前面说的其实是单节点 redis 作为分布式锁的情况,那么当我们的 redis 有多节点的情况呢,如果多节点下处于加锁或者解锁或者锁有效情况下
redis 的某个节点宕掉了怎么办,这里就有一些需要思考的地方,是否单独搞一个单节点的 redis作为分布式锁专用的,但是如果这个单节点的挂了呢,还有就是成本问题,所以我们需要一个多节点的分布式锁方案
这里就引出了开头说到的redlock,这个可是 redis的作者写的, 他的加锁过程是分以下几步去做这个事情

  • 获取当前时间(毫秒数)。
  • 按顺序依次向N个Redis节点执行获取锁的操作。这个获取操作跟前面基于单Redis节点的获取锁的过程相同,包含随机字符串my_random_value,也包含过期时间(比如PX 30000,即锁的有效时间)。为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(time out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个Redis节点获取锁失败以后,应该立即尝试下一个Redis节点。这里的失败,应该包含任何类型的失败,比如该Redis节点不可用,或者该Redis节点上的锁已经被其它客户端持有(注:Redlock原文中这里只提到了Redis节点不可用的情况,但也应该包含其它的失败情况)。
  • 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。如果客户端从大多数Redis节点(>= N/2+1)成功获取到了锁,并且获取锁总共消耗的时间没有超过锁的有效时间(lock validity time),那么这时客户端才认为最终获取锁成功;否则,认为最终获取锁失败。
  • 如果最终获取锁成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
  • 如果最终获取锁失败了(可能由于获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis节点发起释放锁的操作(即前面介绍的Redis Lua脚本)。
    释放锁的过程比较简单:客户端向所有Redis节点发起释放锁的操作,不管这些节点当时在获取锁的时候成功与否。这里为什么要向所有的节点发送释放锁的操作呢,这里是因为有部分的节点的失败原因可能是加锁时阻塞,加锁成功的结果没有及时返回,所以为了防止这种情况还是需要向所有发起这个释放锁的操作。
    初步记录就先到这。

对 Java 的 gc 实现比较感兴趣,原先一般都是看周志明的书,但其实并没有讲具体的 gc 源码,而是把整个思路和流程讲解了一下
特别是 G1 的具体实现
一般对 G1 的理解其实就是把原先整块的新生代老年代分成了以 region 为单位的小块内存,简而言之,就是原先对新生代老年代的收集会涉及到整个代的堆内存空间,而G1 把它变成了更细致的小块内存
这带来了一个很明显的好处和一个很明显的坏处,好处是内存收集可以更灵活,耗时会变短,但整个收集的处理复杂度就变高了
目前看了一点点关于 G1 收集的预期时间相关的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
HeapWord* G1CollectedHeap::do_collection_pause(size_t word_size,
uint gc_count_before,
bool* succeeded,
GCCause::Cause gc_cause) {
assert_heap_not_locked_and_not_at_safepoint();
VM_G1CollectForAllocation op(word_size,
gc_count_before,
gc_cause,
false, /* should_initiate_conc_mark */
g1_policy()->max_pause_time_ms());
VMThread::execute(&op);

HeapWord* result = op.result();
bool ret_succeeded = op.prologue_succeeded() && op.pause_succeeded();
assert(result == NULL || ret_succeeded,
"the result should be NULL if the VM did not succeed");
*succeeded = ret_succeeded;

assert_heap_not_locked();
return result;
}

这里就是收集时需要停顿的,其中VMThread::execute(&op);是具体执行的,真正执行的是VM_G1CollectForAllocation::doit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
void VM_G1CollectForAllocation::doit() {
G1CollectedHeap* g1h = G1CollectedHeap::heap();
assert(!_should_initiate_conc_mark || g1h->should_do_concurrent_full_gc(_gc_cause),
"only a GC locker, a System.gc(), stats update, whitebox, or a hum allocation induced GC should start a cycle");

if (_word_size > 0) {
// An allocation has been requested. So, try to do that first.
_result = g1h->attempt_allocation_at_safepoint(_word_size,
false /* expect_null_cur_alloc_region */);
if (_result != NULL) {
// If we can successfully allocate before we actually do the
// pause then we will consider this pause successful.
_pause_succeeded = true;
return;
}
}

GCCauseSetter x(g1h, _gc_cause);
if (_should_initiate_conc_mark) {
// It's safer to read old_marking_cycles_completed() here, given
// that noone else will be updating it concurrently. Since we'll
// only need it if we're initiating a marking cycle, no point in
// setting it earlier.
_old_marking_cycles_completed_before = g1h->old_marking_cycles_completed();

// At this point we are supposed to start a concurrent cycle. We
// will do so if one is not already in progress.
bool res = g1h->g1_policy()->force_initial_mark_if_outside_cycle(_gc_cause);

// The above routine returns true if we were able to force the
// next GC pause to be an initial mark; it returns false if a
// marking cycle is already in progress.
//
// If a marking cycle is already in progress just return and skip the
// pause below - if the reason for requesting this initial mark pause
// was due to a System.gc() then the requesting thread should block in
// doit_epilogue() until the marking cycle is complete.
//
// If this initial mark pause was requested as part of a humongous
// allocation then we know that the marking cycle must just have
// been started by another thread (possibly also allocating a humongous
// object) as there was no active marking cycle when the requesting
// thread checked before calling collect() in
// attempt_allocation_humongous(). Retrying the GC, in this case,
// will cause the requesting thread to spin inside collect() until the
// just started marking cycle is complete - which may be a while. So
// we do NOT retry the GC.
if (!res) {
assert(_word_size == 0, "Concurrent Full GC/Humongous Object IM shouldn't be allocating");
if (_gc_cause != GCCause::_g1_humongous_allocation) {
_should_retry_gc = true;
}
return;
}
}

// Try a partial collection of some kind.
_pause_succeeded = g1h->do_collection_pause_at_safepoint(_target_pause_time_ms);

if (_pause_succeeded) {
if (_word_size > 0) {
// An allocation had been requested. Do it, eventually trying a stronger
// kind of GC.
_result = g1h->satisfy_failed_allocation(_word_size, &_pause_succeeded);
} else {
bool should_upgrade_to_full = !g1h->should_do_concurrent_full_gc(_gc_cause) &&
!g1h->has_regions_left_for_allocation();
if (should_upgrade_to_full) {
// There has been a request to perform a GC to free some space. We have no
// information on how much memory has been asked for. In case there are
// absolutely no regions left to allocate into, do a maximally compacting full GC.
log_info(gc, ergo)("Attempting maximally compacting collection");
_pause_succeeded = g1h->do_full_collection(false, /* explicit gc */
true /* clear_all_soft_refs */);
}
}
guarantee(_pause_succeeded, "Elevated collections during the safepoint must always succeed.");
} else {
assert(_result == NULL, "invariant");
// The only reason for the pause to not be successful is that, the GC locker is
// active (or has become active since the prologue was executed). In this case
// we should retry the pause after waiting for the GC locker to become inactive.
_should_retry_gc = true;
}
}

这里可以看到核心的是G1CollectedHeap::do_collection_pause_at_safepoint这个方法,它带上了目标暂停时间的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
G1CollectedHeap::do_collection_pause_at_safepoint(double target_pause_time_ms) {
assert_at_safepoint_on_vm_thread();
guarantee(!is_gc_active(), "collection is not reentrant");

if (GCLocker::check_active_before_gc()) {
return false;
}

_gc_timer_stw->register_gc_start();

GCIdMark gc_id_mark;
_gc_tracer_stw->report_gc_start(gc_cause(), _gc_timer_stw->gc_start());

SvcGCMarker sgcm(SvcGCMarker::MINOR);
ResourceMark rm;

g1_policy()->note_gc_start();

wait_for_root_region_scanning();

print_heap_before_gc();
print_heap_regions();
trace_heap_before_gc(_gc_tracer_stw);

_verifier->verify_region_sets_optional();
_verifier->verify_dirty_young_regions();

// We should not be doing initial mark unless the conc mark thread is running
if (!_cm_thread->should_terminate()) {
// This call will decide whether this pause is an initial-mark
// pause. If it is, in_initial_mark_gc() will return true
// for the duration of this pause.
g1_policy()->decide_on_conc_mark_initiation();
}

// We do not allow initial-mark to be piggy-backed on a mixed GC.
assert(!collector_state()->in_initial_mark_gc() ||
collector_state()->in_young_only_phase(), "sanity");

// We also do not allow mixed GCs during marking.
assert(!collector_state()->mark_or_rebuild_in_progress() || collector_state()->in_young_only_phase(), "sanity");

// Record whether this pause is an initial mark. When the current
// thread has completed its logging output and it's safe to signal
// the CM thread, the flag's value in the policy has been reset.
bool should_start_conc_mark = collector_state()->in_initial_mark_gc();

// Inner scope for scope based logging, timers, and stats collection
{
EvacuationInfo evacuation_info;

if (collector_state()->in_initial_mark_gc()) {
// We are about to start a marking cycle, so we increment the
// full collection counter.
increment_old_marking_cycles_started();
_cm->gc_tracer_cm()->set_gc_cause(gc_cause());
}

_gc_tracer_stw->report_yc_type(collector_state()->yc_type());

GCTraceCPUTime tcpu;

G1HeapVerifier::G1VerifyType verify_type;
FormatBuffer<> gc_string("Pause Young ");
if (collector_state()->in_initial_mark_gc()) {
gc_string.append("(Concurrent Start)");
verify_type = G1HeapVerifier::G1VerifyConcurrentStart;
} else if (collector_state()->in_young_only_phase()) {
if (collector_state()->in_young_gc_before_mixed()) {
gc_string.append("(Prepare Mixed)");
} else {
gc_string.append("(Normal)");
}
verify_type = G1HeapVerifier::G1VerifyYoungNormal;
} else {
gc_string.append("(Mixed)");
verify_type = G1HeapVerifier::G1VerifyMixed;
}
GCTraceTime(Info, gc) tm(gc_string, NULL, gc_cause(), true);

uint active_workers = AdaptiveSizePolicy::calc_active_workers(workers()->total_workers(),
workers()->active_workers(),
Threads::number_of_non_daemon_threads());
active_workers = workers()->update_active_workers(active_workers);
log_info(gc,task)("Using %u workers of %u for evacuation", active_workers, workers()->total_workers());

TraceCollectorStats tcs(g1mm()->incremental_collection_counters());
TraceMemoryManagerStats tms(&_memory_manager, gc_cause(),
collector_state()->yc_type() == Mixed /* allMemoryPoolsAffected */);

G1HeapTransition heap_transition(this);
size_t heap_used_bytes_before_gc = used();

// Don't dynamically change the number of GC threads this early. A value of
// 0 is used to indicate serial work. When parallel work is done,
// it will be set.

{ // Call to jvmpi::post_class_unload_events must occur outside of active GC
IsGCActiveMark x;

gc_prologue(false);

if (VerifyRememberedSets) {
log_info(gc, verify)("[Verifying RemSets before GC]");
VerifyRegionRemSetClosure v_cl;
heap_region_iterate(&v_cl);
}

_verifier->verify_before_gc(verify_type);

_verifier->check_bitmaps("GC Start");

#if COMPILER2_OR_JVMCI
DerivedPointerTable::clear();
#endif

// Please see comment in g1CollectedHeap.hpp and
// G1CollectedHeap::ref_processing_init() to see how
// reference processing currently works in G1.

// Enable discovery in the STW reference processor
_ref_processor_stw->enable_discovery();

{
// We want to temporarily turn off discovery by the
// CM ref processor, if necessary, and turn it back on
// on again later if we do. Using a scoped
// NoRefDiscovery object will do this.
NoRefDiscovery no_cm_discovery(_ref_processor_cm);

// Forget the current alloc region (we might even choose it to be part
// of the collection set!).
_allocator->release_mutator_alloc_region();

// This timing is only used by the ergonomics to handle our pause target.
// It is unclear why this should not include the full pause. We will
// investigate this in CR 7178365.
//
// Preserving the old comment here if that helps the investigation:
//
// The elapsed time induced by the start time below deliberately elides
// the possible verification above.
double sample_start_time_sec = os::elapsedTime();

g1_policy()->record_collection_pause_start(sample_start_time_sec);

if (collector_state()->in_initial_mark_gc()) {
concurrent_mark()->pre_initial_mark();
}

g1_policy()->finalize_collection_set(target_pause_time_ms, &_survivor);

evacuation_info.set_collectionset_regions(collection_set()->region_length());

// Make sure the remembered sets are up to date. This needs to be
// done before register_humongous_regions_with_cset(), because the
// remembered sets are used there to choose eager reclaim candidates.
// If the remembered sets are not up to date we might miss some
// entries that need to be handled.
g1_rem_set()->cleanupHRRS();

register_humongous_regions_with_cset();

assert(_verifier->check_cset_fast_test(), "Inconsistency in the InCSetState table.");

// We call this after finalize_cset() to
// ensure that the CSet has been finalized.
_cm->verify_no_cset_oops();

if (_hr_printer.is_active()) {
G1PrintCollectionSetClosure cl(&_hr_printer);
_collection_set.iterate(&cl);
}

// Initialize the GC alloc regions.
_allocator->init_gc_alloc_regions(evacuation_info);

G1ParScanThreadStateSet per_thread_states(this, workers()->active_workers(), collection_set()->young_region_length());
pre_evacuate_collection_set();

// Actually do the work...
evacuate_collection_set(&per_thread_states);

post_evacuate_collection_set(evacuation_info, &per_thread_states);

const size_t* surviving_young_words = per_thread_states.surviving_young_words();
free_collection_set(&_collection_set, evacuation_info, surviving_young_words);

eagerly_reclaim_humongous_regions();

record_obj_copy_mem_stats();
_survivor_evac_stats.adjust_desired_plab_sz();
_old_evac_stats.adjust_desired_plab_sz();

double start = os::elapsedTime();
start_new_collection_set();
g1_policy()->phase_times()->record_start_new_cset_time_ms((os::elapsedTime() - start) * 1000.0);

if (evacuation_failed()) {
set_used(recalculate_used());
if (_archive_allocator != NULL) {
_archive_allocator->clear_used();
}
for (uint i = 0; i < ParallelGCThreads; i++) {
if (_evacuation_failed_info_array[i].has_failed()) {
_gc_tracer_stw->report_evacuation_failed(_evacuation_failed_info_array[i]);
}
}
} else {
// The "used" of the the collection set have already been subtracted
// when they were freed. Add in the bytes evacuated.
increase_used(g1_policy()->bytes_copied_during_gc());
}

if (collector_state()->in_initial_mark_gc()) {
// We have to do this before we notify the CM threads that
// they can start working to make sure that all the
// appropriate initialization is done on the CM object.
concurrent_mark()->post_initial_mark();
// Note that we don't actually trigger the CM thread at
// this point. We do that later when we're sure that
// the current thread has completed its logging output.
}

allocate_dummy_regions();

_allocator->init_mutator_alloc_region();

{
size_t expand_bytes = _heap_sizing_policy->expansion_amount();
if (expand_bytes > 0) {
size_t bytes_before = capacity();
// No need for an ergo logging here,
// expansion_amount() does this when it returns a value > 0.
double expand_ms;
if (!expand(expand_bytes, _workers, &expand_ms)) {
// We failed to expand the heap. Cannot do anything about it.
}
g1_policy()->phase_times()->record_expand_heap_time(expand_ms);
}
}

// We redo the verification but now wrt to the new CSet which
// has just got initialized after the previous CSet was freed.
_cm->verify_no_cset_oops();

// This timing is only used by the ergonomics to handle our pause target.
// It is unclear why this should not include the full pause. We will
// investigate this in CR 7178365.
double sample_end_time_sec = os::elapsedTime();
double pause_time_ms = (sample_end_time_sec - sample_start_time_sec) * MILLIUNITS;
size_t total_cards_scanned = g1_policy()->phase_times()->sum_thread_work_items(G1GCPhaseTimes::ScanRS, G1GCPhaseTimes::ScanRSScannedCards);
g1_policy()->record_collection_pause_end(pause_time_ms, total_cards_scanned, heap_used_bytes_before_gc);

evacuation_info.set_collectionset_used_before(collection_set()->bytes_used_before());
evacuation_info.set_bytes_copied(g1_policy()->bytes_copied_during_gc());

if (VerifyRememberedSets) {
log_info(gc, verify)("[Verifying RemSets after GC]");
VerifyRegionRemSetClosure v_cl;
heap_region_iterate(&v_cl);
}

_verifier->verify_after_gc(verify_type);
_verifier->check_bitmaps("GC End");

assert(!_ref_processor_stw->discovery_enabled(), "Postcondition");
_ref_processor_stw->verify_no_references_recorded();

// CM reference discovery will be re-enabled if necessary.
}

#ifdef TRACESPINNING
ParallelTaskTerminator::print_termination_counts();
#endif

gc_epilogue(false);
}

// Print the remainder of the GC log output.
if (evacuation_failed()) {
log_info(gc)("To-space exhausted");
}

g1_policy()->print_phases();
heap_transition.print();

// It is not yet to safe to tell the concurrent mark to
// start as we have some optional output below. We don't want the
// output from the concurrent mark thread interfering with this
// logging output either.

_hrm.verify_optional();
_verifier->verify_region_sets_optional();

TASKQUEUE_STATS_ONLY(print_taskqueue_stats());
TASKQUEUE_STATS_ONLY(reset_taskqueue_stats());

print_heap_after_gc();
print_heap_regions();
trace_heap_after_gc(_gc_tracer_stw);

// We must call G1MonitoringSupport::update_sizes() in the same scoping level
// as an active TraceMemoryManagerStats object (i.e. before the destructor for the
// TraceMemoryManagerStats is called) so that the G1 memory pools are updated
// before any GC notifications are raised.
g1mm()->update_sizes();

_gc_tracer_stw->report_evacuation_info(&evacuation_info);
_gc_tracer_stw->report_tenuring_threshold(_g1_policy->tenuring_threshold());
_gc_timer_stw->register_gc_end();
_gc_tracer_stw->report_gc_end(_gc_timer_stw->gc_end(), _gc_timer_stw->time_partitions());
}
// It should now be safe to tell the concurrent mark thread to start
// without its logging output interfering with the logging output
// that came from the pause.

if (should_start_conc_mark) {
// CAUTION: after the doConcurrentMark() call below,
// the concurrent marking thread(s) could be running
// concurrently with us. Make sure that anything after
// this point does not assume that we are the only GC thread
// running. Note: of course, the actual marking work will
// not start until the safepoint itself is released in
// SuspendibleThreadSet::desynchronize().
do_concurrent_mark();
}

return true;
}

往下走就是这一步G1Policy::finalize_collection_set,去处理新生代和老年代

1
2
3
4
void G1Policy::finalize_collection_set(double target_pause_time_ms, G1SurvivorRegions* survivor) {
double time_remaining_ms = _collection_set->finalize_young_part(target_pause_time_ms, survivor);
_collection_set->finalize_old_part(time_remaining_ms);
}

这里分别调用了两个方法,可以看到剩余时间是往下传的,来看一下具体的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
double G1CollectionSet::finalize_young_part(double target_pause_time_ms, G1SurvivorRegions* survivors) {
double young_start_time_sec = os::elapsedTime();

finalize_incremental_building();

guarantee(target_pause_time_ms > 0.0,
"target_pause_time_ms = %1.6lf should be positive", target_pause_time_ms);

size_t pending_cards = _policy->pending_cards();
double base_time_ms = _policy->predict_base_elapsed_time_ms(pending_cards);
double time_remaining_ms = MAX2(target_pause_time_ms - base_time_ms, 0.0);

log_trace(gc, ergo, cset)("Start choosing CSet. pending cards: " SIZE_FORMAT " predicted base time: %1.2fms remaining time: %1.2fms target pause time: %1.2fms",
pending_cards, base_time_ms, time_remaining_ms, target_pause_time_ms);

// The young list is laid with the survivor regions from the previous
// pause are appended to the RHS of the young list, i.e.
// [Newly Young Regions ++ Survivors from last pause].

uint survivor_region_length = survivors->length();
uint eden_region_length = _g1h->eden_regions_count();
init_region_lengths(eden_region_length, survivor_region_length);

verify_young_cset_indices();

// Clear the fields that point to the survivor list - they are all young now.
survivors->convert_to_eden();

_bytes_used_before = _inc_bytes_used_before;
time_remaining_ms = MAX2(time_remaining_ms - _inc_predicted_elapsed_time_ms, 0.0);

log_trace(gc, ergo, cset)("Add young regions to CSet. eden: %u regions, survivors: %u regions, predicted young region time: %1.2fms, target pause time: %1.2fms",
eden_region_length, survivor_region_length, _inc_predicted_elapsed_time_ms, target_pause_time_ms);

// The number of recorded young regions is the incremental
// collection set's current size
set_recorded_rs_lengths(_inc_recorded_rs_lengths);

double young_end_time_sec = os::elapsedTime();
phase_times()->record_young_cset_choice_time_ms((young_end_time_sec - young_start_time_sec) * 1000.0);

return time_remaining_ms;
}

下面是老年代的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
void G1CollectionSet::finalize_old_part(double time_remaining_ms) {
double non_young_start_time_sec = os::elapsedTime();
double predicted_old_time_ms = 0.0;

if (collector_state()->in_mixed_phase()) {
cset_chooser()->verify();
const uint min_old_cset_length = _policy->calc_min_old_cset_length();
const uint max_old_cset_length = _policy->calc_max_old_cset_length();

uint expensive_region_num = 0;
bool check_time_remaining = _policy->adaptive_young_list_length();

HeapRegion* hr = cset_chooser()->peek();
while (hr != NULL) {
if (old_region_length() >= max_old_cset_length) {
// Added maximum number of old regions to the CSet.
log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached max). old %u regions, max %u regions",
old_region_length(), max_old_cset_length);
break;
}

// Stop adding regions if the remaining reclaimable space is
// not above G1HeapWastePercent.
size_t reclaimable_bytes = cset_chooser()->remaining_reclaimable_bytes();
double reclaimable_percent = _policy->reclaimable_bytes_percent(reclaimable_bytes);
double threshold = (double) G1HeapWastePercent;
if (reclaimable_percent <= threshold) {
// We've added enough old regions that the amount of uncollected
// reclaimable space is at or below the waste threshold. Stop
// adding old regions to the CSet.
log_debug(gc, ergo, cset)("Finish adding old regions to CSet (reclaimable percentage not over threshold). "
"old %u regions, max %u regions, reclaimable: " SIZE_FORMAT "B (%1.2f%%) threshold: " UINTX_FORMAT "%%",
old_region_length(), max_old_cset_length, reclaimable_bytes, reclaimable_percent, G1HeapWastePercent);
break;
}

double predicted_time_ms = predict_region_elapsed_time_ms(hr);
if (check_time_remaining) {
if (predicted_time_ms > time_remaining_ms) {
// Too expensive for the current CSet.

if (old_region_length() >= min_old_cset_length) {
// We have added the minimum number of old regions to the CSet,
// we are done with this CSet.
log_debug(gc, ergo, cset)("Finish adding old regions to CSet (predicted time is too high). "
"predicted time: %1.2fms, remaining time: %1.2fms old %u regions, min %u regions",
predicted_time_ms, time_remaining_ms, old_region_length(), min_old_cset_length);
break;
}

// We'll add it anyway given that we haven't reached the
// minimum number of old regions.
expensive_region_num += 1;
}
} else {
if (old_region_length() >= min_old_cset_length) {
// In the non-auto-tuning case, we'll finish adding regions
// to the CSet if we reach the minimum.

log_debug(gc, ergo, cset)("Finish adding old regions to CSet (old CSet region num reached min). old %u regions, min %u regions",
old_region_length(), min_old_cset_length);
break;
}
}

// We will add this region to the CSet.
time_remaining_ms = MAX2(time_remaining_ms - predicted_time_ms, 0.0);
predicted_old_time_ms += predicted_time_ms;
cset_chooser()->pop(); // already have region via peek()
_g1h->old_set_remove(hr);
add_old_region(hr);

hr = cset_chooser()->peek();
}
if (hr == NULL) {
log_debug(gc, ergo, cset)("Finish adding old regions to CSet (candidate old regions not available)");
}

if (expensive_region_num > 0) {
// We print the information once here at the end, predicated on
// whether we added any apparently expensive regions or not, to
// avoid generating output per region.
log_debug(gc, ergo, cset)("Added expensive regions to CSet (old CSet region num not reached min)."
"old: %u regions, expensive: %u regions, min: %u regions, remaining time: %1.2fms",
old_region_length(), expensive_region_num, min_old_cset_length, time_remaining_ms);
}

cset_chooser()->verify();
}

stop_incremental_building();

log_debug(gc, ergo, cset)("Finish choosing CSet. old: %u regions, predicted old region time: %1.2fms, time remaining: %1.2f",
old_region_length(), predicted_old_time_ms, time_remaining_ms);

double non_young_end_time_sec = os::elapsedTime();
phase_times()->record_non_young_cset_choice_time_ms((non_young_end_time_sec - non_young_start_time_sec) * 1000.0);

QuickSort::sort(_collection_set_regions, _collection_set_cur_length, compare_region_idx, true);
}

上面第三行是个判断,当前是否是 mixed 回收阶段,如果不是的话其实是没有老年代什么事的,所以可以看到代码基本是从这个 if 判断
if (collector_state()->in_mixed_phase()) {开始往下走的
先写到这,偏向于做笔记用,有错轻拍

最近看了大神的 AQS 的文章,之前总是断断续续地看一点,每次都知难而退,下次看又从头开始,昨天总算硬着头皮看完了第一部分
首先 AQS 只要有这些属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

大概了解了 aqs 底层的双向等待队列,
结构是这样的

每个 node 里面主要是的代码结构也比较简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;

// ======== 下面的几个int常量是给waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 代码此线程取消了争抢这个锁
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 本文不分析condition,所以略过吧,下一篇文章会介绍这个
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 同样的不分析,略过吧
static final int PROPAGATE = -3;
// =====================================================


// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 这个就是线程本尊
volatile Thread thread;

}

其实可以主要关注这个 waitStatus 因为这个是后面的节点给前面的节点设置的,等于-1 的时候代表后面有节点等待,需要去唤醒,
这里使用了一个变种的 CLH 队列实现,CLH 队列相关内容可以查看这篇 自旋锁、排队自旋锁、MCS锁、CLH锁

目前公司要对一些新的产品功能做灰度测试,因为在后端业务代码层面添加判断比较麻烦,所以想在nginx上做点手脚,就想到了openresty
前后也踩了不少坑,这边先写一点

首先是日志
error_log logs/error.log debug;
需要nginx开启日志的debug才能看到日志

使用 lua_code_cache off即可, 另外注意只有使用 content_by_lua_file 才会生效

1
2
3
4
5
6
7
8
http {
lua_code_cache off;
}

location ~* /(\d+-.*)/api/orgunits/load_all(.*) {
default_type 'application/json;charset=utf-8';
content_by_lua_file /data/projects/xxx/current/lua/controller/load_data.lua;
}

使用lua给nginx请求response头添加内容可以用这个

1
ngx.header['response'] = 'header'

使用总结

后续:

  1. 一开始在本地环境的时候使用content_by_lua_file只关注了头,后来发到测试环境发现请求内容都没代理转发到后端服务上
    网上查了下发现content_by_lua_file是将请求的所有内容包括response都用这里面的lua脚本生成了,content这个词就表示是请求内容
    后来改成了access_by_lua_file就正常了,只是要去获取请求内容和修改响应头,并不是要完整的接管请求
  1. 后来又碰到了一个坑是nginx有个client_body_buffer_size的配置参数,nginx在32位和64位系统里有8K和16K两个默认值,当请求内容大于这两个值的时候,会把请求内容放到临时文件里,这个时候openresty里的ngx.req.get_post_args()就会报“failed to get post args: requesty body in temp file not supported”这个错误,将client_body_buffer_size这个参数配置调大一点就好了

  2. 还有就是lua的异常捕获,网上看一般是用pcall和xpcall来进行保护调用,因为问题主要出在cjson的decode,这里有两个解决方案,一个就是将cjson.decode使用pcall封装,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    local decode = require("cjson").decode

    function json_decode( str )
    local ok, t = pcall(decode, str)
    if not ok then
    return nil
    end

    return t
    end

    这个是使用了pcall,称为保护调用,会在内部错误后返回两个参数,第一个是false,第二个是错误信息
    还有一种是使用cjson.safe包

    1
    2
    3
    4
    5
    6
    7
    local json = require("cjson.safe")
    local str = [[ {"key:"value"} ]]

    local t = json.decode(str)
    if t then
    ngx.say(" --> ", type(t))
    end

    cjson.safe包会在解析失败的时候返回nil

  3. 还有一个是redis链接时如果host使用的是域名的话会提示“failed to connect: no resolver defined to resolve “redis.xxxxxx.com””,这里需要使用nginx的resolver指令,
    resolver 8.8.8.8 valid=3600s;

  4. 还有一点补充下
    就是业务在使用redis的时候使用了db的特性,所以在lua访问redis的时候也需要执行db,这里lua的redis库也支持了这个特性,可以使用instance:select(config:get(‘db’))来切换db

  5. 性能优化tips
    建议是尽量少使用阶段钩子,例如content_by_lua_file,*_by_lua

  6. 发现一个不错的openresty站点
    地址

初识ambari

ambari是一个大数据平台的管理工具,包含了hadoop, yarn, hive, hbase, spark等大数据的基础架构和工具,简化了数据平台的搭建,之前只是在同事搭建好平台后的一些使用,这次有机会从头开始用ambari来搭建一个测试的数据平台,过程中也踩到不少坑,简单记录下。

简单过程

  • 第一个坑
    在刚开始是按照官网的指南,用maven构建,因为GFW的原因,导致反复失败等待,也就是这个guide,因为对maven不熟悉导致有些按图索骥,浪费了很多时间,之后才知道可以直接加repo用yum安装,然而用yum安装马上就出现了第二个坑。
  • 第二个坑
    因为在线的repo还是因为网络原因很慢很慢,用proxychains勉强把ambari-server本身安装好了,ambari.repo将这个放进/etc/yum.repos.d/路径下,然后yum update && yum install ambari-server安装即可,如果有条件就用proxychains走下代理。
  • 第三步
    安装好ambari-server后先执行ambari-server setup做一些初始化设置,其中包含了JDK路径的设置,数据库设置,设置好就OK了,然后执行ambari-server start启动服务,这里有个小插曲,因为ambari-server涉及到这么多服务,所以管理控制监控之类的模块是必不可少的,这部分可以在ambari-server的web ui界面安装,也可以命令行提前安装,这部分被称为HDF Management Pack,运行```ambari-server install-mpack \
  • -mpack=http://public-repo-1.hortonworks.com/HDF/centos7/2.x/updates/2.1.4.0/tars/hdf_ambari_mp/hdf-ambari-mpack-2.1.4.0-5.tar.gz \
  • -purge \
  • -verbose```
    安装,当然这个压缩包可以下载之后指到本地路径安装,然后就可以重启ambari-server

rabbitmq 介绍

接触了一下rabbitmq,原来在选型的时候是在rabbitmq跟kafka之间做选择,网上搜了一下之后发现kafka的优势在于吞吐量,而rabbitmq相对注重可靠性,因为应用在im上,需要保证消息不能丢失所以就暂时选定rabbitmq,
Message Queue的需求由来已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB)。 TIB被电信和通讯公司采用,路透社收购了Teknekron公司。之后,IBM开发了MQSeries,微软开发了Microsoft Message Queue(MSMQ)。这些商业MQ供应商的问题是厂商锁定,价格高昂。2001年,Java Message queuing试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。
RabbitMQ采用Erlang语言开发。Erlang语言由Ericson设计,专门为开发concurrent和distribution系统的一种语言,在电信领域使用广泛。OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件/库/工具,如mnesia/SASL,极大方便了Erlang应用的开发。OTP就类似于Python语言中众多的module,用户借助这些module可以很方便的开发应用。
于是2004年,摩根大通和iMatrix开始着手Advanced Message Queuing Protocol (AMQP)开放标准的开发。2006年,AMQP规范发布。2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ 1.0 发布。所有主要的编程语言均有与代理接口通讯的客户端库。

简单的使用经验

通俗的理解

这里介绍下其中的一些概念,connection表示和队列服务器的连接,一般情况下是tcp连接, channel表示通道,可以在一个连接上建立多个通道,这里主要是节省了tcp连接握手的成本,exchange可以理解成一个路由器,将消息推送给对应的队列queue,其实是像一个订阅的模式。

集群经验

rabbitmqctl stop这个是关闭rabbitmq,在搭建集群时候先关闭服务,然后使用rabbitmq-server -detached静默启动,这时候使用rabbitmqctl cluster_status查看集群状态,因为还没将节点加入集群,所以只能看到类似

1
2
3
4
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.

然后就可以把当前节点加入集群,

1
2
3
4
5
6
7
rabbit2$ rabbitmqctl stop_app #这个stop_app与stop的区别是前者停的是rabbitmq应用,保留erlang节点,
#后者是停止了rabbitmq和erlang节点
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1 #这里可以用--ram指定将当前节点作为内存节点加入集群
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

其他可以参考官方文档

一些坑

消息丢失

这里碰到过一个坑,对于使用exchange来做消息路由的,会有一个情况,就是在routing_key没被订阅的时候,会将该条找不到路由对应的queue的消息丢掉What happens if we break our contract and send a message with one or four words, like "orange" or "quick.orange.male.rabbit"? Well, these messages won't match any bindings and will be lost.对应链接,而当使用空的exchange时,会保留消息,当出现消费者的时候就可以将收到之前生产者所推送的消息对应链接,这里就是用了空的exchange。

集群搭建

集群搭建的时候有个erlang vm生成的random cookie,这个是用来做集群之间认证的,相同的cookie才能连接,但是如果通过vim打开复制后在其他几点新建文件写入会多一个换行,导致集群建立是报错,所以这里最好使用scp等传输命令直接传输cookie文件,同时要注意下cookie的文件权限。
另外在集群搭建的时候如果更改过hostname,那么要把rabbitmq的数据库删除,否则启动后会马上挂掉

spark 的一些粗浅使用经验

工作中学习使用了一下Spark做数据分析,主要是用spark的python接口,首先是pyspark.SparkContext(appName=xxx),这是初始化一个Spark应用实例或者说会话,不能重复,
返回的实例句柄就可以调用textFile(path)读取文本文件,这里的文本文件可以是HDFS上的文本文件,也可以普通文本文件,但是需要在Spark的所有集群上都存在,否则会
读取失败,parallelize则可以将python生成的集合数据读取后转换成rdd(A Resilient Distributed Dataset (RDD),一种spark下的基本抽象数据集),基于这个RDD就可以做
数据的流式计算,例如map reduce,在Spark中可以非常方便地实现

简单的mapreduce word count示例

1
2
3
4
textFile = sc.parallelize([(1,1), (2,1), (3,1), (4,1), (5,1),(1,1), (2,1), (3,1), (4,1), (5,1)])
data = textFile.reduceByKey(lambda x, y: x + y).collect()
for _ in data:
print(_)

结果

1
2
3
4
5
(3, 2)
(1, 2)
(4, 2)
(2, 2)
(5, 2)

PHP抽象类和接口

  • 抽象类与接口
  • 抽象类内可以包含非抽象函数,即可实现函数
  • 抽象类内必须包含至少一个抽象方法,抽象类和接口均不能实例化
  • 抽象类可以设置访问级别,接口默认都是public
  • 类可以实现多个接口但不能继承多个抽象类
  • 类必须实现抽象类和接口里的抽象方法,不一定要实现抽象类的非抽象方法
  • 接口内不能定义变量,但是可以定义常量

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
interface int1{
const INTER1 = 111;
function inter1();
}
interface int2{
const INTER1 = 222;
function inter2();
}
abstract class abst1{
public function abstr1(){
echo 1111;
}
abstract function abstra1(){
echo 'ahahahha';
}
}
abstract class abst2{
public function abstr2(){
echo 1111;
}
abstract function abstra2();
}
class normal1 extends abst1{
protected function abstr2(){
echo 222;
}
}

result

1
2
3
PHP Fatal error:  Abstract function abst1::abstra1() cannot contain body in new.php on line 17

Fatal error: Abstract function abst1::abstra1() cannot contain body in php on line 17

problem

Given a sorted integer array without duplicates, return the summary of its ranges.

For example, given [0,1,2,4,5,7], return ["0->2","4->5","7"].

题解

每一个区间的起点nums[i]加上j是否等于nums[i+j]
参考

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Solution {
public:
vector<string> summaryRanges(vector<int>& nums) {
int i = 0, j = 1, n;
vector<string> res;
n = nums.size();
while(i < n){
j = 1;
while(j < n && nums[i+j] - nums[i] == j) j++;
res.push_back(j <= 1 ? to_string(nums[i]) : to_string(nums[i]) + "->" + to_string(nums[i + j - 1]));
i += j;
}
return res;
}
};