如果想获取一个stream所有的reader,那么必须调用这个函数:
Bitmapset *targets = GetLocalStreamReaders(relid);
如果stream下面没有reader,那么这个targets返回NULL。
我们跟到GetLocalStreamReaders里面看看
Bitmapset * GetLocalStreamReaders(Oid relid) { Bitmapset *readers = GetAllStreamReaders(relid); if (stream_targets && readers) { .....} return readers; }
中间的if不看,这个readers是通过调用GetAllStreamReaders来获取的,我们继续跟进去看看。
Bitmapset * GetAllStreamReaders(Oid relid) { HeapTuple tup = SearchSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(relid)); bool isnull; ..... raw = SysCacheGetAttr(PIPELINESTREAMRELID, tup, Anum_pipeline_stream_queries, &isnull); if (isnull) return NULL; ...... ReleaseSysCache(tup); return result; }
这段代码就很有意思了。
如果isnull直接return,而后面的ReleaseSysCaceh没有执行。
这样上面的tup就一直存在,没有释放掉。
这样会导致后面的一个断言错误。
来看看下面堆栈信息。
TRAP: FailedAssertion("!(ct->refcount == 0)", File: "catcache.c", Line: 588, PID: 3829, Query: (null))
assertion failure at:
pipeline: bgworker: worker [postgres] (ExceptionalCondition+0xaf)[0x906b0f]
pipeline: bgworker: worker [postgres] (AtEOXact_CatCache+0x1e6)[0x8eb735]
pipeline: bgworker: worker [postgres] [0x4fe75a]
pipeline: bgworker: worker [postgres] (CommitTransactionCommand+0x72)[0x4ff19c]
pipeline: bgworker: worker [postgres] (ContinuousQueryWorkerMain+0x6cd)[0x7366a1]
pipeline: bgworker: worker [postgres] [0x7343f9]
pipeline: bgworker: worker [postgres] (StartBackgroundWorker+0x2bd)[0x7427ea]
pipeline: bgworker: worker [postgres] [0x75532a]
pipeline: bgworker: worker [postgres] [0x755646]
pipeline: bgworker: worker [postgres] [0x750473]
pipeline: bgworker: worker [postgres] (PostmasterMain+0x110c)[0x74f92a]
pipeline: bgworker: worker [postgres] [0x694f85]
/lib64/libc.so.6(__libc_start_main+0xf5)[0x7fb8eb84caf5]
pipeline: bgworker: worker [postgres] [0x462e09]
我们看看catcache.c:588
582 dlist_foreach(iter, bucket) 583 { 584 CatCTup *ct; 585 586 ct = dlist_container(CatCTup, cache_elem, iter.cur); 587 Assert(ct->ct_magic == CT_MAGIC); 588 Assert(ct->refcount == 0); 589 Assert(!ct->dead); 590 }
我们看看ct->refcount的解释:
int refcount; /* number of active references */
这其实跟我修改的代码有关系,我们从上面堆栈信息分析。
ContinuousQueryWorkerMain-->CommitTransactionCommand
我在ContinuousQueryWorkerMain里面自己调用了
Bitmapset *targets = GetLocalStreamReaders(relid);
而我判断targets的时候,
if (!targets) { donothing... }
我特意看了一下官方的用法。
src/backend/pipeline/stream.c:200
if (targets == NULL) { char *name = get_rel_name(pstmt->relid); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("no continuous views are currently reading from stream %s", name), errhint("Use CREATE CONTINUOUS VIEW to create a continuous view that includes %s in its FROM clause.", name))); }
很清楚的看到,这个里面直接丢了个ERROR,
直接abort,这样就不会像我上面堆栈信息那样,后面commit就会断言异常。
话说,一个stream下面没有readers是很正常的,但是这么明显是代码有错误,该释放的没有释放。
修改如下:
src/backend/catalog/pipeline_stream.c
GetAllStreamReaders函数
if (isnull) return NULL;
修改成
if (isnull){ ReleaseSysCache(tup); return NULL; }
这样在返回的时候就直接释放了tup。
这个里面还有别的问题。后面再写。