@@ -70,7 +70,7 @@ jsonbd_shmem_size(void)
7070 shm_toc_estimator e ;
7171 Size size ;
7272
73- Assert (jsonbd_nworkers != -1 );
73+ Assert (jsonbd_nworkers > 0 );
7474 shm_toc_initialize_estimator (& e );
7575
7676 shm_toc_estimate_chunk (& e , sizeof (jsonbd_shm_hdr ));
@@ -83,17 +83,26 @@ jsonbd_shmem_size(void)
8383 shm_toc_estimate_chunk (& e , jsonbd_get_queue_size ());
8484 }
8585
86- /* 3 keys each worker + 3 for header (header itself and two queues) */
87- shm_toc_estimate_keys (& e , MAX_JSONBD_WORKERS * 3 + 3 );
86+ /* 3 keys each worker + 3 for header (header itself, launcher and its two queues) */
87+ shm_toc_estimate_keys (& e , MAX_JSONBD_WORKERS * 3 + 4 );
8888 size = shm_toc_estimate (& e );
8989 return size ;
9090}
9191
92+ /*
93+ * Initialize worker shm block
94+ *
95+ * About keys in toc:
96+ * 0 - for header
97+ * 1..MAX_JSONBD_WORKERS - workers
98+ * MAX_JSONBD_WORKERS + 1 - launcher
99+ * MAX_JSONBD_WORKERS + 2 .. - queues
100+ */
92101static void
93102jsonbd_init_worker (shm_toc * toc , jsonbd_shm_worker * wd , int worker_num ,
94103 size_t queue_size )
95104{
96- static int mqkey = MAX_JSONBD_WORKERS + 1 ;
105+ static int mqkey = MAX_JSONBD_WORKERS + 2 ;
97106
98107 /* each worker will have two mq, for input and output */
99108 wd -> mqin = shm_mq_create (shm_toc_allocate (toc , queue_size ), queue_size );
@@ -109,7 +118,7 @@ jsonbd_init_worker(shm_toc *toc, jsonbd_shm_worker *wd, int worker_num,
109118 shm_mq_clean_sender (wd -> mqout );
110119
111120 if (worker_num )
112- shm_toc_insert (toc , i + 1 , wd );
121+ shm_toc_insert (toc , worker_num , wd );
113122
114123 shm_toc_insert (toc , mqkey ++ , wd -> mqin );
115124 shm_toc_insert (toc , mqkey ++ , wd -> mqout );
@@ -141,7 +150,7 @@ jsonbd_shmem_startup_hook(void)
141150 hdr = shm_toc_allocate (toc , sizeof (jsonbd_shm_hdr ));
142151 hdr -> workers_ready = 0 ;
143152 hdr -> launcher_sem = PGSemaphoreCreate ();
144- jsonbd_init_worker (toc , & hdr -> launcher , sizeof (Oid ));
153+ jsonbd_init_worker (toc , & hdr -> launcher , MAX_JSONBD_WORKERS + 1 , sizeof (Oid ));
145154
146155 for (i = 0 ; i < MAX_DATABASES ; i ++ )
147156 sem_init (& hdr -> workers_sem [i ], 1 , jsonbd_nworkers );
@@ -176,9 +185,8 @@ _PG_init(void)
176185
177186 if (jsonbd_nworkers )
178187 {
179- int i ;
180188 RequestAddinShmemSpace (jsonbd_shmem_size ());
181- jsonbd_register_worker_launcher ();
189+ jsonbd_register_launcher ();
182190 }
183191 else elog (LOG , "jsonbd: workers are disabled" );
184192}
@@ -281,8 +289,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
281289 shm_mq_result resmq ;
282290 shm_mq_handle * mqh ;
283291 jsonbd_shm_hdr * hdr ;
284- jsonbd_shm_worker * wd ,
285- * wd_inner ;
292+ jsonbd_shm_worker * wd ;
286293
287294 char * res ;
288295 Size reslen ;
@@ -312,7 +319,11 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
312319 for (j = i ; j < (i + jsonbd_nworkers ); j ++ )
313320 {
314321 wd = shm_toc_lookup (toc , j + 1 , false);
315- Assert (wd -> dboid == MyDatabaseId );
322+
323+ if (wd -> dboid != MyDatabaseId )
324+ /* somehow not all workers started for this database */
325+ break ;
326+
316327 if (pg_atomic_test_set_flag (& wd -> busy ))
317328 goto comm ;
318329 }
@@ -355,7 +366,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
355366 elog (ERROR , "jsonbd: workers launcher was detached" );
356367 }
357368
358- goto comm :
369+ comm :
359370 detached = false;
360371
361372 /* send data */
@@ -599,7 +610,7 @@ packJsonbValue(JsonbValue *val, int header_size, int *len)
599610
600611/* Compress jsonb using dictionary */
601612static struct varlena *
602- jsonbd_compress (AttributeCompression * ac , const struct varlena * data )
613+ jsonbd_compress (CompressionMethodOptions * cmoptions , const struct varlena * data )
603614{
604615 int size ;
605616 JsonbIteratorToken r ;
@@ -668,7 +679,7 @@ jsonbd_compress(AttributeCompression *ac, const struct varlena *data)
668679 Assert (offset == len );
669680
670681 /* retrieve or generate ids */
671- jsonbd_worker_get_key_ids (ac -> cmoptoid , buf , len , idsbuf , nkeys );
682+ jsonbd_worker_get_key_ids (cmoptions -> cmoptoid , buf , len , idsbuf , nkeys );
672683
673684 /* replace the old keys with encoded ids */
674685 for (i = 0 ; i < nkeys ; i ++ )
@@ -704,7 +715,7 @@ jsonbd_configure(Form_pg_attribute attr, List *options)
704715}
705716
706717static struct varlena *
707- jsonbd_decompress (AttributeCompression * ac , const struct varlena * data )
718+ jsonbd_decompress (CompressionMethodOptions * cmoptions , const struct varlena * data )
708719{
709720 JsonbIteratorToken r ;
710721 JsonbValue v ,
@@ -752,7 +763,7 @@ jsonbd_decompress(AttributeCompression *ac, const struct varlena *data)
752763 }
753764
754765 /* retrieve keys */
755- buf = jsonbd_worker_get_keys (ac -> cmoptoid , compression_buffers -> idsbuf , nkeys , & buflen );
766+ buf = jsonbd_worker_get_keys (cmoptions -> cmoptoid , compression_buffers -> idsbuf , nkeys , & buflen );
756767 if (buf == NULL )
757768 elog (ERROR , "jsonbd: decompression error" );
758769
0 commit comments