diff --git a/builds/win32/msvc15/engine_static.vcxproj b/builds/win32/msvc15/engine_static.vcxproj
index 67bbab0013e..22a3486a27e 100644
--- a/builds/win32/msvc15/engine_static.vcxproj
+++ b/builds/win32/msvc15/engine_static.vcxproj
@@ -69,6 +69,7 @@
+
@@ -241,6 +242,7 @@
+
diff --git a/builds/win32/msvc15/engine_static.vcxproj.filters b/builds/win32/msvc15/engine_static.vcxproj.filters
index c5e0ec0fcee..8d4e384613d 100644
--- a/builds/win32/msvc15/engine_static.vcxproj.filters
+++ b/builds/win32/msvc15/engine_static.vcxproj.filters
@@ -228,6 +228,9 @@
JRD files
+
+ JRD files
+
JRD files
@@ -719,6 +722,9 @@
Header files
+
+ Header files
+
Header files
diff --git a/src/burp/restore.epp b/src/burp/restore.epp
index 5d5a31765fb..c51fc9fe7fe 100644
--- a/src/burp/restore.epp
+++ b/src/burp/restore.epp
@@ -13503,6 +13503,8 @@ bool WriteRelationMeta::prepareBatch(BurpGlobals* tdgbl)
void WriteRelationMeta::prepareRequest(BurpGlobals* tdgbl)
{
+ const bool use_bulk_insert = (tdgbl->runtimeODS >= DB_VERSION_DDL14);
+
m_batchMode = false;
m_inMgsNum = 0;
@@ -13657,18 +13659,27 @@ void WriteRelationMeta::prepareRequest(BurpGlobals* tdgbl)
add_byte(blr, blr_loop);
add_byte(blr, blr_receive);
add_byte(blr, 0);
- if (identity_type == IDENT_TYPE_ALWAYS)
+
+ if (!use_bulk_insert)
{
- add_byte(blr, blr_store3);
- add_byte(blr, blr_store_override_system);
+ if (identity_type == IDENT_TYPE_ALWAYS)
+ {
+ add_byte(blr, blr_store3);
+ add_byte(blr, blr_store_override_system);
+ }
+ else
+ add_byte(blr, blr_store);
+
+ // Mark this store operation as bulk one
+ add_byte(blr, blr_marks);
+ add_byte(blr, 1);
+ add_byte(blr, 0x10); // must be Jrd::StatementNode::MARK_BULK_INSERT
}
else
- add_byte(blr, blr_store);
-
- // Mark this store operation as bulk one
- add_byte(blr, blr_marks);
- add_byte(blr, 1);
- add_byte(blr, 0x10); // must be Jrd::StatementNode::MARK_BULK_INSERT
+ {
+ add_byte(blr, blr_bulk_insert);
+ add_byte(blr, blr_null); // no source RSE
+ }
if (m_relation->rel_name.schema.hasData())
{
@@ -13710,8 +13721,11 @@ void WriteRelationMeta::prepareRequest(BurpGlobals* tdgbl)
}
add_byte(blr, blr_end);
- if (identity_type == IDENT_TYPE_ALWAYS)
- add_byte(blr, blr_null);
+ if (!use_bulk_insert)
+ {
+ if (identity_type == IDENT_TYPE_ALWAYS)
+ add_byte(blr, blr_null);
+ }
add_byte(blr, blr_end);
add_byte(blr, blr_eoc);
@@ -13811,7 +13825,7 @@ void WriteRelationReq::send(BurpGlobals* tdgbl, ITransaction* tran, bool lastRec
// memory when there are blobs and arrays fields - CORE-3802.
FbLocalStatus status;
- if (m_resync || m_recs % 1000 == 1)
+ if (m_resync || ((m_recs % 1000 == 1) && !tdgbl->gbl_use_auto_release_temp_blobid))
m_request->startAndSend(&status, tran, 0, 0, m_inMsg.getCount(), m_inMsg.begin());
else
m_request->send(&status, 0, 0, m_inMsg.getCount(), m_inMsg.begin());
diff --git a/src/dsql/Nodes.h b/src/dsql/Nodes.h
index 8ed8edadb2f..53255f0e283 100644
--- a/src/dsql/Nodes.h
+++ b/src/dsql/Nodes.h
@@ -1454,6 +1454,7 @@ class StmtNode : public DmlNode
{
TYPE_ASSIGNMENT,
TYPE_BLOCK,
+ TYPE_BULK_INSERT,
TYPE_COMPOUND_STMT,
TYPE_CONTINUE_LEAVE,
TYPE_CURSOR_STMT,
diff --git a/src/dsql/StmtNodes.cpp b/src/dsql/StmtNodes.cpp
index dd11eab52df..5c38ddcdce6 100644
--- a/src/dsql/StmtNodes.cpp
+++ b/src/dsql/StmtNodes.cpp
@@ -65,6 +65,7 @@
#include "../dsql/make_proto.h"
#include "../dsql/pass1_proto.h"
#include "../dsql/DsqlStatementCache.h"
+#include "../jrd/BulkInsert.h"
using namespace Firebird;
using namespace Jrd;
@@ -779,6 +780,349 @@ bool BlockNode::testAndFixupError(thread_db* tdbb, Request* request, const Excep
//--------------------
+static RegisterNode<BulkInsertNode> regBulkInsertNode({blr_bulk_insert});
+
+DmlNode* BulkInsertNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR blrOp)
+{
+ auto node = FB_NEW_POOL(pool) BulkInsertNode(pool);
+
+ if (csb->csb_blr_reader.peekByte() == blr_marks)
+ /*node->marks |= */PAR_marks(csb);
+
+ // Should be enough for cursor stability
+ AutoSetRestore autoCurrentDMLNode(&csb->csb_currentDMLNode, node);
+
+ //AutoSetRestore autoCurrentForNode(&csb->csb_currentForNode, node);
+
+ const auto rseBlr = csb->csb_blr_reader.peekByte();
+
+ switch (rseBlr)
+ {
+ case blr_null:
+ csb->csb_blr_reader.getByte();
+ break; // no RSE
+
+ case blr_rse:
+ case blr_lateral_rse:
+ case blr_singular:
+ case blr_scrollable:
+ node->rse = PAR_rse(tdbb, csb);
+ break;
+
+ default:
+ node->rse = PAR_rse(tdbb, csb, blrOp);
+ break;
+ }
+
+ const UCHAR* blrPos = csb->csb_blr_reader.getPos();
+
+ node->target = PAR_parseRecordSource(tdbb, csb);
+
+	if (!nodeIs<RelationSourceNode>(node->target) &&
+		!nodeIs<LocalTableSourceNode>(node->target))
+ {
+ csb->csb_blr_reader.setPos(blrPos);
+ PAR_syntax_error(csb, "relation source");
+ }
+
+ node->statement = PAR_parse_stmt(tdbb, csb);
+
+ return node;
+}
+
+string BulkInsertNode::internalPrint(NodePrinter& printer) const
+{
+ StmtNode::internalPrint(printer);
+
+ NODE_PRINT(printer, rse);
+ NODE_PRINT(printer, target);
+ NODE_PRINT(printer, statement);
+ NODE_PRINT(printer, cursor);
+
+ return "BulkInsertNode";
+}
+
+BulkInsertNode* BulkInsertNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
+{
+ fb_assert(false); // not implemented
+ return nullptr;
+}
+
+void BulkInsertNode::genBlr(DsqlCompilerScratch* dsqlScratch)
+{
+ fb_assert(false); // not implemented
+}
+
+BulkInsertNode* BulkInsertNode::pass1(thread_db* tdbb, CompilerScratch* csb)
+{
+ if (rse)
+ {
+ AutoSetRestore autoImplicitCursor(&csb->csb_implicit_cursor, true);
+ doPass1(tdbb, csb, rse.getAddress());
+ }
+
+	preprocessAssignments(tdbb, csb, target->getStream(), nodeAs<CompoundStmtNode>(statement), nullptr);
+ doPass1(tdbb, csb, statement.getAddress());
+
+ const StreamType stream = target->getStream();
+ auto& tail = csb->csb_rpt[stream];
+ tail.csb_flags |= csb_store;
+
+ jrd_rel* const relation = tail.csb_relation(tdbb);
+ if (relation)
+ {
+ pass1Update(tdbb, csb, relation,
+ relation->rel_triggers[TRIGGER_PRE_STORE], // unused inside pass1Update
+ stream, stream, SCL_insert, nullptr, 0, 0);
+ }
+
+ return this;
+}
+
+BulkInsertNode* BulkInsertNode::pass2(thread_db* tdbb, CompilerScratch* csb)
+{
+ if (rse)
+ {
+ AutoSetCurrentCursorId autoSetCurrentCursorId(csb);
+
+ StreamList streams;
+ streams.add(target->getStream());
+
+ StreamStateHolder stateHolder(csb, streams);
+ stateHolder.activate();
+
+ rse->pass2Rse(tdbb, csb);
+ doPass2(tdbb, csb, statement.getAddress(), this);
+
+ RecordSource* const rsb = CMP_post_rse(tdbb, csb, rse.getObject());
+
+ cursor = FB_NEW_POOL(*tdbb->getDefaultPool())
+ Cursor(csb, rsb, rse, true, line, column, "");
+
+ csb->csb_fors.add(cursor);
+ }
+ else
+ doPass2(tdbb, csb, statement.getAddress(), this);
+
+ impureOffset = csb->allocImpure();
+
+ return this;
+}
+
+const StmtNode* BulkInsertNode::execute(thread_db* tdbb, Request* request, ExeState* exeState) const
+{
+ if (request->req_operation == Request::req_evaluate)
+ {
+ if (cursor)
+ fromCursor(tdbb, request);
+ else
+ fromMessage(tdbb, request);
+
+ request->req_operation = Request::req_return;
+ }
+
+ return parentStmt;
+}
+
+void BulkInsertNode::fromCursor(thread_db* tdbb, Request* request) const
+{
+ jrd_tra* transaction = request->req_transaction;
+
+ const StreamType stream = target->getStream();
+ record_param* rpb = &request->req_rpb[stream];
+ jrd_rel* relation = rpb->rpb_relation;
+ RLCK_reserve_relation(tdbb, transaction, relation->getPermanent(), true);
+
+ const Format* format = relation->currentFormat(tdbb);
+ auto record = VIO_record(tdbb, rpb, format, tdbb->getDefaultPool());
+
+ rpb->rpb_address = record->getData();
+ rpb->rpb_length = format->fmt_length;
+ rpb->rpb_format_number = format->fmt_version;
+
+	auto compound = nodeAs<CompoundStmtNode>(statement);
+
+ HalfStaticArray toDescs(*tdbb->getDefaultPool(), compound->statements.getCount());
+ prepareTarget(tdbb, request, toDescs.begin());
+
+ cursor->open(tdbb);
+ auto bulk = transaction->getBulkInsert(tdbb, relation, true);
+ fb_assert(bulk);
+
+ Cleanup clean([&] {
+ cursor->close(tdbb);
+ transaction->finiBulkInsert(tdbb, false);
+ });
+
+ while (cursor->fetchNext(tdbb))
+ {
+ rpb->rpb_number.setValue(BOF_NUMBER);
+ record->nullify();
+ record->setTransactionNumber(transaction->tra_number);
+
+ assignValues(tdbb, request, relation, record, toDescs.begin());
+ cleanupRpb(tdbb, rpb);
+
+ bulk->putRecord(tdbb, rpb, transaction);
+ REPL_store(tdbb, rpb, transaction);
+
+ JRD_reschedule(tdbb);
+ }
+
+ transaction->finiBulkInsert(tdbb, true);
+}
+
+void BulkInsertNode::fromMessage(thread_db* tdbb, Request* request) const
+{
+ Impure* impure = request->getImpure(impureOffset);
+ jrd_tra* transaction = request->req_transaction;
+
+ const StreamType stream = target->getStream();
+ record_param* rpb = &request->req_rpb[stream];
+
+ jrd_rel* relation = rpb->rpb_relation;
+
+ const Format* format = relation->currentFormat(tdbb);
+ auto record = VIO_record(tdbb, rpb, format, tdbb->getDefaultPool());
+
+ rpb->rpb_address = record->getData();
+ rpb->rpb_length = format->fmt_length;
+ rpb->rpb_format_number = format->fmt_version;
+ rpb->rpb_number.setValue(BOF_NUMBER);
+
+ record->nullify();
+ record->setTransactionNumber(transaction->tra_number);
+
+ if (!(impure->flags & INIT_DONE))
+ {
+ RLCK_reserve_relation(tdbb, transaction, relation->getPermanent(), true);
+
+		auto compound = nodeAs<CompoundStmtNode>(statement);
+ const auto count = compound->statements.getCount();
+
+ impure->flags = INIT_DONE;
+		impure->descs = FB_NEW_POOL(*request->req_pool) Array<dsc>(*request->req_pool, count);
+
+ prepareTarget(tdbb, request, impure->descs->getBuffer(count));
+ }
+ auto bulk = transaction->getBulkInsert(tdbb, relation, true);
+ assignValues(tdbb, request, relation, record, impure->descs->begin());
+ cleanupRpb(tdbb, rpb);
+
+ bulk->putRecord(tdbb, rpb, transaction);
+ REPL_store(tdbb, rpb, transaction);
+}
+
+void BulkInsertNode::prepareTarget(thread_db* tdbb, Request* request, dsc* descs) const
+{
+	auto compound = nodeAs<CompoundStmtNode>(statement);
+	fb_assert(compound->onlyAssignments);
+
+	dsc* toDesc = descs;
+	for (auto stmt : compound->statements)
+	{
+		auto assign = nodeAs<AssignmentNode>(stmt);
+
+		// Get descriptor of target field
+		const auto* toField = nodeAs<FieldNode>(assign->asgnTo);
+ *toDesc = *EVL_assign_to(tdbb, toField);
+ toDesc++;
+ }
+}
+
+void BulkInsertNode::assignValues(thread_db* tdbb, Request* request, jrd_rel* relation, Record* record, dsc* to_desc) const
+{
+	auto compound = nodeAs<CompoundStmtNode>(statement);
+
+ // assignments
+ for (auto stmt : compound->statements)
+ {
+		auto assign = nodeAs<AssignmentNode>(stmt);
+ //EXE_assignment(tdbb, assign);
+
+ if (assign->hasLineColumn)
+ {
+ request->req_src_line = assign->line;
+ request->req_src_column = assign->column;
+ }
+
+		const FieldNode* toField = nodeAs<FieldNode>(assign->asgnTo);
+
+ fb_assert(record == request->req_rpb[toField->fieldStream].rpb_record);
+
+ dsc* from_desc = EVL_expr(tdbb, request, assign->asgnFrom);
+
+ if (!from_desc)
+ {
+ const auto relField = (*relation->rel_fields)[toField->fieldId];
+ if (relField->fld_not_null)
+ {
+ string name;
+ auto& rel_name = relation->getName();
+
+ if (!rel_name.isEmpty())
+ name.printf("%s.\"%s\"", rel_name.toQuotedString().c_str(), relField->fld_name.c_str());
+ else
+ name = relField->fld_name.toQuotedString();
+
+ if (name.isEmpty())
+ name = UNKNOWN_STRING_MARK;
+
+ // validation error for column @1, value \"@2\"
+ ERR_post(Arg::Gds(isc_not_valid) << Arg::Str(name) << Arg::Str(NULL_STRING_MARK));
+ }
+
+ record->setNull(toField->fieldId);
+ to_desc->dsc_flags |= DSC_null;
+ }
+ else
+ {
+ record->clearNull(toField->fieldId);
+
+ if (DTYPE_IS_BLOB_OR_QUAD(from_desc->dsc_dtype) || DTYPE_IS_BLOB_OR_QUAD(to_desc->dsc_dtype))
+ {
+ // ASF: Don't let MOV_move call blb::move because MOV
+ // will not pass the destination field to blb::move.
+
+ blb::move(tdbb, from_desc, to_desc, relation, record, toField->fieldId, true);
+ }
+ else if (!DSC_EQUIV(from_desc, to_desc, false))
+ {
+ MOV_move(tdbb, from_desc, to_desc);
+ }
+ else if (DTYPE_IS_TEXT(from_desc->dsc_dtype))
+ {
+ // Force slow move to properly handle the case when source string is provided with real length instead of padded length
+ MOV_move(tdbb, from_desc, to_desc);
+ }
+ else if (from_desc->dsc_dtype == dtype_short)
+ {
+ *((SSHORT*) to_desc->dsc_address) = *((SSHORT*) from_desc->dsc_address);
+ }
+ else if (from_desc->dsc_dtype == dtype_long)
+ {
+ *((SLONG*) to_desc->dsc_address) = *((SLONG*) from_desc->dsc_address);
+ }
+ else if (from_desc->dsc_dtype == dtype_int64)
+ {
+ *((SINT64*) to_desc->dsc_address) = *((SINT64*) from_desc->dsc_address);
+ }
+ else
+ {
+ memcpy(to_desc->dsc_address, from_desc->dsc_address, from_desc->dsc_length);
+ }
+
+ to_desc->dsc_flags &= ~DSC_null;
+ }
+
+ to_desc++;
+ }
+}
+
+
+//--------------------
+
+
static RegisterNode regCompoundStmtNode({blr_begin});
DmlNode* CompoundStmtNode::parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR /*blrOp*/)
diff --git a/src/dsql/StmtNodes.h b/src/dsql/StmtNodes.h
index 901224784ff..3b8ffaf20aa 100644
--- a/src/dsql/StmtNodes.h
+++ b/src/dsql/StmtNodes.h
@@ -44,6 +44,7 @@ class RecordBuffer;
class RelationSourceNode;
class SelectNode;
class GeneratorItem;
+class BulkInsert;
class ExceptionItem final : public Firebird::PermanentStorage, public Printable
@@ -181,6 +182,48 @@ class BlockNode final : public TypedNode
};
+class BulkInsertNode : public TypedNode<BulkInsertNode, StmtNode::TYPE_BULK_INSERT>
+{
+public:
+ explicit BulkInsertNode(MemoryPool& pool)
+ : TypedNode(pool)
+ {
+ }
+
+public:
+ static DmlNode* parse(thread_db* tdbb, MemoryPool& pool, CompilerScratch* csb, const UCHAR blrOp);
+
+ Firebird::string internalPrint(NodePrinter& printer) const override;
+ BulkInsertNode* dsqlPass(DsqlCompilerScratch* dsqlScratch) override;
+ void genBlr(DsqlCompilerScratch* dsqlScratch) override;
+ BulkInsertNode* pass1(thread_db* tdbb, CompilerScratch* csb) override;
+ BulkInsertNode* pass2(thread_db* tdbb, CompilerScratch* csb) override;
+ const StmtNode* execute(thread_db* tdbb, Request* request, ExeState* exeState) const override;
+
+public:
+	NestConst<RseNode> rse = nullptr;				// source RSE
+	NestConst<RecordSourceNode> target = nullptr;	// target relation
+	NestConst<StmtNode> statement = nullptr;		// assignments: field = value [, ...]
+	NestConst<const Cursor> cursor = nullptr;		// source cursor
+
+private:
+ // Impure flags
+ static constexpr int INIT_DONE = 0x01;
+
+ struct Impure
+ {
+		Firebird::Array<dsc>* descs;
+ int flags;
+ };
+
+ void fromCursor(thread_db* tdbb, Request* request) const;
+ void fromMessage(thread_db* tdbb, Request* request) const;
+
+ void prepareTarget(thread_db* tdbb, Request* request, dsc* descs) const;
+ void assignValues(thread_db* tdbb, Request* request, jrd_rel* relation, Record* record, dsc* to_desc) const;
+};
+
+
class CompoundStmtNode : public TypedNode // blr_begin
{
public:
diff --git a/src/include/firebird/impl/blr.h b/src/include/firebird/impl/blr.h
index 4fc7669a807..c9625762dd8 100644
--- a/src/include/firebird/impl/blr.h
+++ b/src/include/firebird/impl/blr.h
@@ -140,6 +140,7 @@
#define blr_receive_batch (unsigned char)32
// unused code: 33
+#define blr_bulk_insert (unsigned char)33
#define blr_add (unsigned char)34
#define blr_subtract (unsigned char)35
diff --git a/src/jrd/BulkInsert.cpp b/src/jrd/BulkInsert.cpp
new file mode 100644
index 00000000000..c3f11e6a81d
--- /dev/null
+++ b/src/jrd/BulkInsert.cpp
@@ -0,0 +1,493 @@
+/*
+ * PROGRAM: JRD Access Method
+ * MODULE: BulkInsert.cpp
+ * DESCRIPTION: Support for faster inserting of bunch of rows into table
+ *
+ * The contents of this file are subject to the Initial
+ * Developer's Public License Version 1.0 (the "License");
+ * you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
+ *
+ * Software distributed under the License is distributed AS IS,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied.
+ * See the License for the specific language governing rights
+ * and limitations under the License.
+ *
+ * The Original Code was created by Vladyslav Khorsun for the
+ * Firebird Open Source RDBMS project.
+ *
+ * Copyright (c) 2025 Vladyslav Khorsun
+ * and all contributors signed below.
+ *
+ * All Rights Reserved.
+ * Contributor(s): ______________________________________.
+ *
+ */
+
+#include "../jrd/BulkInsert.h"
+#include "../jrd/sqz.h"
+#include "../jrd/tra.h"
+#include "../jrd/cch_proto.h"
+#include "../jrd/dpm_proto.h"
+#include "../jrd/ods_proto.h"
+
+
+using namespace Firebird;
+using namespace Ods;
+
+static inline data_page* nextPage(data_page* ptr, ULONG pageSize)
+{
+	UCHAR* p = reinterpret_cast<UCHAR*>(ptr);
+ p += pageSize;
+	return reinterpret_cast<data_page*>(p);
+};
+
+namespace Jrd
+{
+
+// How many bytes per record should be reserved, see SPACE_FUDGE in dpm.epp
+constexpr unsigned RESERVE_SIZE = (ROUNDUP(RHDF_SIZE, ODS_ALIGNMENT) + sizeof(data_page::dpg_repeat));
+
+BulkInsert::BulkInsert(MemoryPool& pool, thread_db* tdbb, jrd_rel* relation) :
+ PermanentStorage(pool),
+ m_request(tdbb->getRequest())
+{
+ Database* dbb = tdbb->getDatabase();
+
+ m_primary = FB_NEW_POOL(getPool())
+ Buffer(getPool(), dbb->dbb_page_size, (dbb->dbb_flags & DBB_no_reserve) ? 0 : RESERVE_SIZE, true, relation);
+}
+
+void BulkInsert::putRecord(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
+{
+ m_primary->putRecord(tdbb, rpb, transaction);
+}
+
+RecordNumber BulkInsert::putBlob(thread_db* tdbb, blb* blob, Record* record)
+{
+ if (!m_other)
+ m_other = FB_NEW_POOL(getPool()) Buffer(getPool(), m_primary->m_pageSize, 0, false, m_primary->m_relation);
+
+ return m_other->putBlob(tdbb, blob, record);
+}
+
+void BulkInsert::flush(thread_db* tdbb)
+{
+ if (m_other)
+ m_other->flush(tdbb);
+ m_primary->flush(tdbb);
+}
+
+
+BulkInsert::Buffer::Buffer(MemoryPool& pool, ULONG pageSize, ULONG spaceReserve, bool primary, jrd_rel* relation) :
+ PermanentStorage(pool),
+ m_pageSize(pageSize),
+ m_spaceReserve(spaceReserve),
+ m_isPrimary(primary),
+ m_relation(relation),
+ m_buffer(pool),
+ m_highPages(getPool())
+{
+}
+
+void BulkInsert::Buffer::putRecord(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
+{
+ transaction->tra_flags |= TRA_write;
+
+ rpb->rpb_b_page = 0;
+ rpb->rpb_b_line = 0;
+ rpb->rpb_flags = 0;
+ rpb->rpb_transaction_nr = transaction->tra_number;
+
+ Compressor dcc(getPool(), true, true, rpb->rpb_length, rpb->rpb_address);
+ const ULONG packed = dcc.getPackedLength();
+
+ const ULONG header_size = (transaction->tra_number > MAX_ULONG) ? RHDE_SIZE : RHD_SIZE;
+ const ULONG max_data = m_pageSize - sizeof(data_page) - header_size;
+
+ if (packed > max_data)
+ {
+ // store big
+ fragmentRecord(tdbb, rpb, &dcc);
+ return;
+ }
+
+ SLONG fill = (RHDF_SIZE - header_size) - packed;
+ if (fill < 0)
+ fill = 0;
+
+ rhd* header = (rhd*) findSpace(tdbb, rpb, header_size + packed + fill);
+
+ if (auto* record = rpb->rpb_record)
+ {
+ auto& stack = record->getPrecedence();
+ while (stack.hasData())
+ m_highPages[m_index].push(stack.pop());
+ }
+
+ rpb->rpb_flags &= ~rpb_not_packed;
+
+ header->rhd_flags = rpb->rpb_flags;
+ Ods::writeTraNum(header, rpb->rpb_transaction_nr, header_size);
+ header->rhd_format = rpb->rpb_format_number;
+
+ fb_assert(rpb->rpb_b_page == 0);
+ header->rhd_b_page = rpb->rpb_b_page;
+ header->rhd_b_line = rpb->rpb_b_line;
+
+ if (!dcc.isPacked())
+ header->rhd_flags |= rhd_not_packed;
+
+ UCHAR* const data = (UCHAR*) header + header_size;
+
+ dcc.pack(rpb->rpb_address, data);
+
+ if (fill)
+ memset(data + packed, 0, fill);
+}
+
+RecordNumber BulkInsert::Buffer::putBlob(thread_db* tdbb, blb* blob, Record* record)
+{
+ //fb_assert(blob->blb_relation == m_relation);
+
+ Database* dbb = tdbb->getDatabase();
+
+ // Figure out length of blob on page. Remember that blob can either
+ // be a clump of data or a vector of page pointers.
+ USHORT length;
+ const UCHAR* q;
+ PageStack stack;
+ Array buffer;
+
+ blob->storeToPage(&length, buffer, &q, &stack);
+
+ // Locate space to store blob
+
+ record_param rpb;
+ //rpb.getWindow(tdbb).win_flags = 0; redundant.
+
+ rpb.rpb_relation = m_relation; //blob->blb_relation;
+ rpb.rpb_transaction_nr = tdbb->getTransaction()->tra_number;
+ rpb.rpb_flags = rpb_blob;
+
+ blh* header = (blh*) findSpace(tdbb, &rpb, (BLH_SIZE + length));
+
+ while (stack.hasData())
+ m_highPages[m_index].push(stack.pop());
+
+ header->blh_flags = rhd_blob;
+
+ if (blob->blb_flags & BLB_stream)
+ header->blh_flags |= rhd_stream_blob;
+
+ if (blob->getLevel())
+ {
+ header->blh_flags |= rhd_large;
+ markLarge();
+ }
+
+ blob->toPageHeader(header);
+
+ if (length)
+ memcpy(header->blh_page, q, length);
+
+ if (record)
+ {
+ RelationPages* relPages = rpb.rpb_relation->getPages(tdbb);
+ record->pushPrecedence(PageNumber(relPages->rel_pg_space_id, m_current->dpg_header.pag_pageno));
+ }
+
+ return rpb.rpb_number;
+}
+
+void BulkInsert::Buffer::fragmentRecord(thread_db* tdbb, record_param* rpb, Compressor* dcc)
+{
+ Database* dbb = tdbb->getDatabase();
+
+ // Start compression from the end.
+
+ const UCHAR* in = rpb->rpb_address + rpb->rpb_length;
+ RelationPages* relPages = rpb->rpb_relation->getPages(tdbb);
+ PageNumber prior(relPages->rel_pg_space_id, 0);
+
+ // The last fragment should have rhd header because rhd_incomplete flag won't be set for it.
+ // It's important for get_header() function which relies on rhd_incomplete flag to determine header size.
+ ULONG header_size = RHD_SIZE;
+ ULONG max_data = dbb->dbb_page_size - sizeof(data_page) - header_size;
+
+ // Fill up data pages tail first until what's left fits on a single page.
+
+ auto size = dcc->getPackedLength();
+ fb_assert(size > max_data);
+
+ do
+ {
+ // Allocate and format data page and fragment header
+
+ data_page* page = (data_page*) DPM_allocate(tdbb, &rpb->getWindow(tdbb));
+
+ page->dpg_header.pag_type = pag_data;
+ page->dpg_header.pag_flags = dpg_orphan | dpg_full;
+ page->dpg_relation = rpb->rpb_relation->getId();
+ page->dpg_count = 1;
+
+ const auto inLength = dcc->truncateTail(max_data);
+ in -= inLength;
+ size = dcc->getPackedLength();
+
+ const Compressor tailDcc(tdbb, inLength, in);
+ const auto tail_size = tailDcc.getPackedLength();
+ fb_assert(tail_size <= max_data);
+
+ // Cast to (rhdf*) but use only rhd fields for the last fragment
+ rhdf* header = (rhdf*) &page->dpg_rpt[1];
+ page->dpg_rpt[0].dpg_offset = (UCHAR*) header - (UCHAR*) page;
+ page->dpg_rpt[0].dpg_length = tail_size + header_size;
+ header->rhdf_flags = rhd_fragment;
+
+ if (prior.getPageNum())
+ {
+ // This is not the last fragment
+ header->rhdf_flags |= rhd_incomplete;
+ header->rhdf_f_page = prior.getPageNum();
+ }
+
+ if (!tailDcc.isPacked())
+ header->rhdf_flags |= rhd_not_packed;
+
+ const auto out = (UCHAR*) header + header_size;
+ tailDcc.pack(in, out);
+
+ if (prior.getPageNum())
+ CCH_precedence(tdbb, &rpb->getWindow(tdbb), prior);
+
+ CCH_RELEASE(tdbb, &rpb->getWindow(tdbb));
+ prior = rpb->getWindow(tdbb).win_page;
+
+ // Other fragments except the last one should have rhdf header
+ header_size = RHDF_SIZE;
+ max_data = dbb->dbb_page_size - sizeof(data_page) - header_size;
+ } while (size > max_data);
+
+ // What's left fits on a page. Store it somewhere.
+
+ const auto inLength = in - rpb->rpb_address;
+
+ rhdf* header = (rhdf*) findSpace(tdbb, rpb, RHDF_SIZE + size);
+
+ rpb->rpb_flags &= ~rpb_not_packed;
+
+ header->rhdf_flags = rhd_incomplete | rhd_large | rpb->rpb_flags;
+ Ods::writeTraNum(header, rpb->rpb_transaction_nr, RHDF_SIZE);
+ header->rhdf_format = rpb->rpb_format_number;
+ header->rhdf_b_page = rpb->rpb_b_page;
+ header->rhdf_b_line = rpb->rpb_b_line;
+ header->rhdf_f_page = prior.getPageNum();
+ header->rhdf_f_line = 0;
+
+ if (!dcc->isPacked())
+ header->rhdf_flags |= rhd_not_packed;
+
+ dcc->pack(rpb->rpb_address, header->rhdf_data);
+
+ markLarge();
+
+ m_highPages[m_index].push(prior);
+}
+
+void BulkInsert::Buffer::markLarge()
+{
+ m_current->dpg_header.pag_flags |= dpg_large;
+}
+
+UCHAR* BulkInsert::Buffer::findSpace(thread_db* tdbb, record_param* rpb, USHORT size)
+{
+ // record (with header) size, aligned up to ODS_ALIGNMENT
+ const ULONG aligned = ROUNDUP(size, ODS_ALIGNMENT);
+
+ // size to allocate
+ const ULONG alloc = aligned + sizeof(data_page::dpg_repeat);
+
+ // already used slots
+ const ULONG used = (m_current ? m_current->dpg_count : 0);
+
+ if (alloc + m_spaceReserve * (used + 1) > m_freeSpace)
+ {
+ if (m_current)
+ {
+ m_current->dpg_header.pag_flags |= dpg_full;
+
+ // Get next reserved page, or reserve a new set of pages.
+
+			UCHAR* const ptr = reinterpret_cast<UCHAR*>(nextPage(m_current, m_pageSize));
+ if (ptr + m_pageSize < m_buffer.end())
+ {
+				m_current = reinterpret_cast<data_page*>(ptr);
+ m_index++;
+ }
+ else
+ {
+ flush(tdbb);
+ m_current = nullptr;
+ }
+ }
+
+ if (!m_current)
+ {
+ m_current = allocatePages(tdbb);
+ m_index = 0;
+ }
+
+ m_current->dpg_header.pag_flags |= (m_isPrimary ? dpg_swept : dpg_secondary);
+ m_freeSpace = m_pageSize - sizeof(data_page) + sizeof(data_page::dpg_repeat);
+
+ m_highPages[m_index].push(PageNumber(TRANS_PAGE_SPACE, rpb->rpb_transaction_nr));
+ }
+
+ fb_assert(alloc <= m_freeSpace);
+
+ data_page::dpg_repeat* index = m_current->dpg_rpt + m_current->dpg_count;
+ index->dpg_length = size;
+
+ index->dpg_offset = (m_current->dpg_count > 0) ? index[-1].dpg_offset : m_pageSize;
+ index->dpg_offset -= aligned;
+
+ m_current->dpg_count++;
+ m_freeSpace -= alloc;
+
+ Database* dbb = tdbb->getDatabase();
+ rpb->rpb_number.setValue(((SINT64) m_current->dpg_sequence) * dbb->dbb_max_records + m_current->dpg_count - 1);
+
+	return reinterpret_cast<UCHAR*>(m_current) + index->dpg_offset;
+}
+
+data_page* BulkInsert::Buffer::allocatePages(thread_db* tdbb)
+{
+ Database* dbb = tdbb->getDatabase();
+ RelationPages* relPages = m_relation->getPages(tdbb);
+
+ WIN window(relPages->rel_pg_space_id, 0);
+
+ const auto reserved = DPM_reserve_pages(tdbb, m_relation, &window);
+
+ if (!m_pages || m_reserved != reserved)
+ {
+		m_pages = reinterpret_cast<data_page*>(m_buffer.getAlignedBuffer(m_pageSize * reserved, ODS_ALIGNMENT));
+ m_highPages.resize(reserved);
+ m_reserved = reserved;
+ }
+
+ fb_assert(m_reserved == reserved);
+
+	auto dpage = reinterpret_cast<data_page*>(window.win_buffer);
+
+ // format data pages in the buffer
+ auto ptr = m_pages;
+ for (auto i = 0; i < m_reserved; i++)
+ {
+ *ptr = *dpage;
+
+ ptr->dpg_header.pag_pageno += i;
+ ptr->dpg_sequence += i;
+
+ ptr = nextPage(ptr, m_pageSize);
+ }
+
+ CCH_RELEASE(tdbb, &window);
+
+ m_current = m_pages;
+ return m_current;
+}
+
+void BulkInsert::Buffer::flush(thread_db* tdbb)
+{
+ if (!m_current)
+ return;
+
+ Database* dbb = tdbb->getDatabase();
+ RelationPages* relPages = m_relation->getPages(tdbb);
+
+ const ULONG pp_sequence = m_current->dpg_sequence / dbb->dbb_dp_per_pp;
+
+ // copy buffered data into buffers in page cache
+ m_current = m_pages;
+ for (auto i = 0; i < m_reserved; i++)
+ {
+ if (m_current->dpg_count == 0)
+ break;
+
+		WIN dpWindow(relPages->rel_pg_space_id, m_current->dpg_header.pag_pageno);
+
+ auto dpage = CCH_FETCH(tdbb, &dpWindow, LCK_write, pag_data);
+
+ while (m_highPages[i].hasData())
+ CCH_precedence(tdbb, &dpWindow, m_highPages[i].pop());
+
+ CCH_MARK(tdbb, &dpWindow);
+
+ // don't overwrite pag_scn
+ m_current->dpg_header.pag_scn = dpage->pag_scn;
+ memcpy(dpage, m_current, m_pageSize);
+
+ CCH_RELEASE(tdbb, &dpWindow);
+
+ if (m_isPrimary)
+ tdbb->bumpStats(RecordStatType::INSERTS, m_relation->getId(), m_current->dpg_count);
+
+ m_current = nextPage(m_current, m_pageSize);
+ }
+
+	WIN ppWindow(relPages->rel_pg_space_id, (*relPages->rel_pages)[pp_sequence]);
+ pointer_page* ppage = (pointer_page*) CCH_FETCH(tdbb, &ppWindow, LCK_write, pag_pointer);
+
+ m_current = m_pages;
+ for (auto i = 0; i < m_reserved; i++)
+ {
+ if (m_current->dpg_count == 0)
+ break;
+
+ CCH_precedence(tdbb, &ppWindow, m_current->dpg_header.pag_pageno + i);
+ m_current = nextPage(m_current, m_pageSize);
+ }
+
+ m_current = m_pages;
+
+ CCH_MARK(tdbb, &ppWindow);
+
+ UCHAR* bits = (UCHAR*) (ppage->ppg_page + dbb->dbb_dp_per_pp);
+ const USHORT firstSlot = m_current->dpg_sequence % dbb->dbb_dp_per_pp;
+
+ for (USHORT slot = firstSlot; slot < firstSlot + m_reserved; slot++)
+ {
+ PPG_DP_BIT_CLEAR(bits, slot, ppg_dp_reserved);
+
+ if (m_current->dpg_count > 0)
+ {
+ PPG_DP_BIT_CLEAR(bits, slot, ppg_dp_empty);
+
+ if (m_isPrimary)
+ {
+ PPG_DP_BIT_SET(bits, slot, ppg_dp_swept);
+ PPG_DP_BIT_CLEAR(bits, slot, ppg_dp_secondary);
+ }
+ else
+ {
+ PPG_DP_BIT_CLEAR(bits, slot, ppg_dp_swept);
+ PPG_DP_BIT_SET(bits, slot, ppg_dp_secondary);
+ }
+
+ if (m_current->dpg_header.pag_flags & dpg_full)
+ PPG_DP_BIT_SET(bits, slot, ppg_dp_full);
+
+ if (m_current->dpg_header.pag_flags & dpg_large)
+ PPG_DP_BIT_SET(bits, slot, ppg_dp_large);
+ }
+
+ m_current = nextPage(m_current, m_pageSize);
+ }
+
+ CCH_RELEASE(tdbb, &ppWindow);
+}
+
+}; // namespace Jrd
diff --git a/src/jrd/BulkInsert.h b/src/jrd/BulkInsert.h
new file mode 100644
index 00000000000..067c16ab0d0
--- /dev/null
+++ b/src/jrd/BulkInsert.h
@@ -0,0 +1,107 @@
+/*
+ * PROGRAM: JRD Access Method
+ * MODULE: BulkInsert.h
+ * DESCRIPTION: Support for faster inserting of bunch of rows into table
+ *
+ * The contents of this file are subject to the Initial
+ * Developer's Public License Version 1.0 (the "License");
+ * you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl.
+ *
+ * Software distributed under the License is distributed AS IS,
+ * WITHOUT WARRANTY OF ANY KIND, either express or implied.
+ * See the License for the specific language governing rights
+ * and limitations under the License.
+ *
+ * The Original Code was created by Vladyslav Khorsun for the
+ * Firebird Open Source RDBMS project.
+ *
+ * Copyright (c) 2025 Vladyslav Khorsun
+ * and all contributors signed below.
+ *
+ * All Rights Reserved.
+ * Contributor(s): ______________________________________.
+ *
+ */
+
+#ifndef JRD_BULKINSERT
+#define JRD_BULKINSERT
+
+#include "firebird.h"
+#include "../common/classes/alloc.h"
+#include "../common/classes/array.h"
+#include "../jrd/jrd.h"
+#include "../jrd/ods.h"
+#include "../jrd/pag.h"
+#include "../jrd/RecordNumber.h"
+
+
+namespace Jrd
+{
+
+class Compressor;
+class jrd_rel;
+class jrd_tra;
+struct record_param;
+class Record;
+class Request;
+
+class BulkInsert : public Firebird::PermanentStorage
+{
+public:
+ BulkInsert(Firebird::MemoryPool& pool, thread_db* tdbb, jrd_rel* relation);
+
+ void putRecord(thread_db* tdbb, record_param* rpb, jrd_tra* transaction);
+ RecordNumber putBlob(thread_db* tdbb, blb* blob, Record* record);
+ void flush(thread_db* tdbb);
+
+ Request* getRequest() const
+ {
+ return m_request;
+ }
+
+ jrd_rel* getRelation() const
+ {
+ return m_primary->m_relation;
+ }
+
+private:
+ struct Buffer : public Firebird::PermanentStorage
+ {
+ Buffer(Firebird::MemoryPool& pool, ULONG pageSize, ULONG spaceReserve, bool primary,
+ jrd_rel* relation);
+
+ void putRecord(thread_db* tdbb, record_param* rpb, jrd_tra* transaction);
+ RecordNumber putBlob(thread_db* tdbb, blb* blob, Record* record);
+ void flush(thread_db* tdbb);
+
+ // allocate and reserve data pages
+ Ods::data_page* allocatePages(thread_db* tdbb);
+ UCHAR* findSpace(thread_db* tdbb, record_param* rpb, USHORT size);
+ void fragmentRecord(thread_db* tdbb, record_param* rpb, Compressor* dcc);
+ void markLarge();
+
+ const ULONG m_pageSize;
+ const ULONG m_spaceReserve;
+ const bool m_isPrimary;
+ jrd_rel* const m_relation;
+
+		Firebird::Array<UCHAR> m_buffer;				// buffer for data pages
+ Ods::data_page* m_pages = nullptr; // first DP in buffer
+ Ods::data_page* m_current = nullptr; // current DP to put records
+ unsigned m_index = 0; // index of the current page [0..m_reserved)
+		Firebird::ObjectsArray<PageStack> m_highPages;	// high precedence pages, per data page
+ ULONG m_freeSpace = 0; // free space on current DP
+ USHORT m_reserved = 0; // count of reserved pages
+ };
+
+ Request* const m_request; // "owner" request that will destroy this object on unwind
+
+	Firebird::AutoPtr<Buffer> m_primary;
+	Firebird::AutoPtr<Buffer> m_other;
+};
+
+}; // namespace Jrd
+
+#endif // JRD_BULKINSERT
diff --git a/src/jrd/blb.cpp b/src/jrd/blb.cpp
index f7e141a1be8..ccc50e7e1e6 100644
--- a/src/jrd/blb.cpp
+++ b/src/jrd/blb.cpp
@@ -69,6 +69,7 @@
#include "../jrd/mov_proto.h"
#include "../jrd/pag_proto.h"
#include "../jrd/scl_proto.h"
+#include "../jrd/BulkInsert.h"
#include "../common/sdl_proto.h"
#include "../common/dsc_proto.h"
#include "../common/classes/array.h"
@@ -1258,10 +1259,18 @@ void blb::move(thread_db* tdbb, dsc* from_desc, dsc* to_desc,
#ifdef CHECK_BLOB_FIELD_ACCESS_FOR_SELECT
blob->blb_fld_id = fieldId;
#endif
- if (bulk)
- blob->blb_flags |= BLB_bulk;
+ BulkInsert* bulkInsert = bulk ? transaction->getBulkInsert(tdbb, relation, false) : nullptr;
+
+ if (bulkInsert)
+ destination->set_permanent(relation->getId(), bulkInsert->putBlob(tdbb, blob, record));
+ else
+ {
+ if (bulk)
+ blob->blb_flags |= BLB_bulk;
+
+ destination->set_permanent(relation->getId(), DPM_store_blob(tdbb, blob, relation, record));
+ }
- destination->set_permanent(relation->getId(), DPM_store_blob(tdbb, blob, relation, record));
// This is the only place in the engine where blobs are materialized
// If new places appear code below should transform to common sub-routine
if (materialized_blob)
diff --git a/src/jrd/blp.h b/src/jrd/blp.h
index 797bfa94b38..df9f8ee33d4 100644
--- a/src/jrd/blp.h
+++ b/src/jrd/blp.h
@@ -58,7 +58,7 @@ static inline constexpr struct
{"minimum", two}, // 30
{"total", two},
{"receive_batch", byte_verb},
- {NULL, NULL},
+ {"bulk_insert", three},
{"add", two},
{"subtract", two},
{"multiply", two},
diff --git a/src/jrd/dpm.epp b/src/jrd/dpm.epp
index 3feddb21b9a..98e42950514 100644
--- a/src/jrd/dpm.epp
+++ b/src/jrd/dpm.epp
@@ -85,7 +85,7 @@ static void check_swept(thread_db*, record_param*);
static USHORT compress(thread_db*, data_page*);
static void delete_tail(thread_db*, rhdf*, const USHORT, USHORT);
static void fragment(thread_db*, record_param*, SSHORT, Compressor&, SSHORT, const jrd_tra*);
-static void extend_relation(thread_db*, Cached::Relation*, WIN*, const Jrd::RecordStorageType type);
+static USHORT extend_relation(thread_db*, Cached::Relation*, WIN*, const Jrd::RecordStorageType type, bool reserve);
static UCHAR* find_space(thread_db*, record_param*, SSHORT, PageStack&, Record*, const Jrd::RecordStorageType type);
static bool get_header(WIN*, USHORT, record_param*);
static pointer_page* get_pointer_page(thread_db*, RelationPermanent*, RelationPages*, WIN*, ULONG, USHORT);
@@ -1201,6 +1201,13 @@ void DPM_delete_relation_pages(Jrd::thread_db* tdbb, Jrd::RelationPermanent* rel
relPages->rel_index_root = 0;
}
+// Reserve pages for bulk insert, return count of reserved pages.
+// window points to the first reserved page locked for write.
+USHORT DPM_reserve_pages(thread_db* tdbb, jrd_rel* relation, WIN* window)
+{
+ return extend_relation(tdbb, relation->getPermanent(), window, DPM_primary, true);
+}
+
bool DPM_fetch(thread_db* tdbb, record_param* rpb, USHORT lock)
{
/**************************************
@@ -1868,6 +1875,7 @@ bool DPM_next(thread_db* tdbb, record_param* rpb, USHORT lock_type, FindNextReco
const UCHAR* bits = (UCHAR*) (ppage->ppg_page + dbb->dbb_dp_per_pp);
if (page_number && !PPG_DP_BIT_TEST(bits, slot, ppg_dp_secondary) &&
!PPG_DP_BIT_TEST(bits, slot, ppg_dp_empty) &&
+ !PPG_DP_BIT_TEST(bits, slot, ppg_dp_reserved) &&
(!sweeper || !PPG_DP_BIT_TEST(bits, slot, ppg_dp_swept)) )
{
#ifdef SUPERSERVER_V2
@@ -3138,7 +3146,8 @@ static void fragment(thread_db* tdbb,
}
-static void extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* window, const Jrd::RecordStorageType type)
+static USHORT extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* window,
+ RecordStorageType type, bool reserve)
{
/**************************************
*
@@ -3240,6 +3249,8 @@ static void extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* wi
// - next PAGES_IN_EXTENT-1 slots also empty
if ((slot % PAGES_IN_EXTENT == 0) && (ppage->ppg_count >= PAGES_IN_EXTENT || pp_sequence))
{
+ fb_assert(slot + PAGES_IN_EXTENT <= dbb->dbb_dp_per_pp);
+
cntAlloc = PAGES_IN_EXTENT;
for (USHORT i = 0; i < PAGES_IN_EXTENT; i++)
@@ -3295,6 +3306,9 @@ static void extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* wi
else
PPG_DP_BIT_SET(bits, slot, ppg_dp_secondary);
+ if (reserve)
+ PPG_DP_BIT_SET(bits, slot, ppg_dp_reserved);
+
for (unsigned i = 1; i < cntAlloc; i++)
{
fb_assert(ppage->ppg_page[slot + i] == 0);
@@ -3302,17 +3316,23 @@ static void extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* wi
PPG_DP_BIT_CLEAR(bits, slot + i, PPG_DP_ALL_BITS);
PPG_DP_BIT_SET(bits, slot + i, ppg_dp_empty);
- }
- if (type == DPM_primary)
- {
- relPages->rel_pri_data_space = pp_sequence;
- relPages->rel_sec_data_space = MIN(pp_sequence, relPages->rel_sec_data_space);
+ if (reserve)
+ PPG_DP_BIT_SET(bits, slot + i, ppg_dp_reserved);
}
- else
+
+ if (!reserve)
{
- relPages->rel_sec_data_space = pp_sequence;
- relPages->rel_pri_data_space = MIN(pp_sequence, relPages->rel_pri_data_space);
+ if (type == DPM_primary)
+ {
+ relPages->rel_pri_data_space = pp_sequence;
+ relPages->rel_sec_data_space = MIN(pp_sequence, relPages->rel_sec_data_space);
+ }
+ else
+ {
+ relPages->rel_sec_data_space = pp_sequence;
+ relPages->rel_pri_data_space = MIN(pp_sequence, relPages->rel_pri_data_space);
+ }
}
relPages->rel_data_pages += cntAlloc;
@@ -3325,6 +3345,7 @@ static void extend_relation(thread_db* tdbb, Cached::Relation* relation, WIN* wi
" extended_relation (relation %d, window_page %" ULONGFORMAT")\n",
relation->getId(), window->win_page.getPageNum());
#endif
+ return cntAlloc;
}
@@ -3447,7 +3468,7 @@ static UCHAR* find_space(thread_db* tdbb,
rpb->rpb_number.setValue(((SINT64) page->dpg_sequence) * dbb->dbb_max_records + slot);
if (record)
- record->pushPrecedence(PageNumber(DB_PAGE_SPACE, rpb->rpb_page));
+ record->pushPrecedence(rpb->getWindow(tdbb).win_page);
if (page->dpg_count == 1)
{
@@ -3703,6 +3724,9 @@ static rhd* locate_space(thread_db* tdbb,
if (PPG_DP_BIT_TEST(bits, slot, ppg_dp_full))
continue;
+ if (PPG_DP_BIT_TEST(bits, slot, ppg_dp_reserved))
+ continue;
+
// hvlad: avoid creating circle in precedence graph, if possible
if (type == DPM_secondary && lowPages.exist(dp_number))
continue;
@@ -3807,7 +3831,7 @@ static rhd* locate_space(thread_db* tdbb,
int i;
for (i = 0; i < 20; ++i)
{
- extend_relation(tdbb, relation, window, type);
+ extend_relation(tdbb, relation, window, type, false);
space = find_space(tdbb, rpb, size, stack, record, type);
if (space)
diff --git a/src/jrd/dpm_proto.h b/src/jrd/dpm_proto.h
index a882ec53548..20f9c342f41 100644
--- a/src/jrd/dpm_proto.h
+++ b/src/jrd/dpm_proto.h
@@ -64,6 +64,7 @@ void DPM_create_relation(Jrd::thread_db*, Jrd::Cached::Relation*);
ULONG DPM_data_pages(Jrd::thread_db*, Jrd::Cached::Relation*);
void DPM_delete(Jrd::thread_db*, Jrd::record_param*, ULONG);
void DPM_delete_relation(Jrd::thread_db*, Jrd::RelationPermanent*);
+USHORT DPM_reserve_pages(Jrd::thread_db*, Jrd::jrd_rel*, Jrd::win*);
bool DPM_fetch(Jrd::thread_db*, Jrd::record_param*, USHORT);
bool DPM_fetch_back(Jrd::thread_db*, Jrd::record_param*, USHORT, SSHORT);
void DPM_fetch_fragment(Jrd::thread_db*, Jrd::record_param*, USHORT);
diff --git a/src/jrd/exe.cpp b/src/jrd/exe.cpp
index 0ec9deafabd..efd4bba8a79 100644
--- a/src/jrd/exe.cpp
+++ b/src/jrd/exe.cpp
@@ -1267,6 +1267,10 @@ void EXE_unwind(thread_db* tdbb, Request* request)
}
request->req_sorts.unlinkAll();
+
+ if (request->req_transaction)
+ request->req_transaction->finiBulkInsert(tdbb, request);
+
TRA_release_request_snapshot(tdbb, request);
TRA_detach_request(request);
diff --git a/src/jrd/ods.h b/src/jrd/ods.h
index 8703ad46b17..a8a0f62b89d 100644
--- a/src/jrd/ods.h
+++ b/src/jrd/ods.h
@@ -845,6 +845,7 @@ inline constexpr UCHAR ppg_dp_large = 0x02; // Large object is on data page
inline constexpr UCHAR ppg_dp_swept = 0x04; // Sweep has nothing to do on data page
inline constexpr UCHAR ppg_dp_secondary = 0x08; // Primary record versions not stored on data page
inline constexpr UCHAR ppg_dp_empty = 0x10; // Data page is empty
+inline constexpr UCHAR ppg_dp_reserved = 0x20; // Slot is reserved for bulk insert
inline constexpr UCHAR PPG_DP_ALL_BITS = (1 << PPG_DP_BITS_NUM) - 1;
diff --git a/src/jrd/tra.cpp b/src/jrd/tra.cpp
index 259664f3f3d..7cb02106b4c 100644
--- a/src/jrd/tra.cpp
+++ b/src/jrd/tra.cpp
@@ -76,6 +76,7 @@
#include "../jrd/Collation.h"
#include "../jrd/Mapping.h"
#include "../jrd/DbCreators.h"
+#include "../jrd/BulkInsert.h"
#include "../common/os/fbsyslog.h"
#include "../jrd/Resources.h"
#include "firebird/impl/msg_helper.h"
@@ -486,6 +487,10 @@ void TRA_commit(thread_db* tdbb, jrd_tra* transaction, const bool retaining_flag
if (!(transaction->tra_flags & TRA_prepared))
DFW_perform_work(tdbb, transaction);
+ // Finish bulk insert operation, if any
+
+ transaction->finiBulkInsert(tdbb, true);
+
// Commit associated transaction in security DB
SecDbContext* secContext = transaction->getSecDbContext();
@@ -1079,6 +1084,10 @@ void TRA_prepare(thread_db* tdbb, jrd_tra* transaction, USHORT length, const UCH
DFW_perform_work(tdbb, transaction);
+ // Finish bulk insert operation, if any
+
+ transaction->finiBulkInsert(tdbb, true);
+
// Flush pages if transaction logically modified data
const jrd_tra* sysTran = tdbb->getAttachment()->getSysTransaction();
@@ -1340,6 +1349,10 @@ void TRA_rollback(thread_db* tdbb, jrd_tra* transaction, const bool retaining_fl
if (transaction->tra_flags & (TRA_prepare2 | TRA_reconnected))
MET_update_transaction(tdbb, transaction, false);
+ // Finish bulk insert operation, if any
+
+ transaction->finiBulkInsert(tdbb, false);
+
// If force flag is true, get rid of all savepoints to mark the transaction as dead
if (force_flag || (transaction->tra_flags & TRA_invalidated))
{
@@ -4213,6 +4226,42 @@ void jrd_tra::checkBlob(thread_db* tdbb, const bid* blob_id, jrd_fld* fld, bool
}
}
+BulkInsert* jrd_tra::getBulkInsert(thread_db* tdbb, jrd_rel* relation, bool create)
+{
+ if (tra_bulkInsert)
+ {
+ if (tra_bulkInsert->getRelation() == relation)
+ return tra_bulkInsert;
+
+ // only one bulk insert at a time supported currently
+ return nullptr;
+ }
+ else if (create)
+ {
+ tra_bulkInsert = FB_NEW_POOL(*tra_pool) BulkInsert(*tra_pool, tdbb, relation);
+ }
+
+ return tra_bulkInsert;
+}
+
+void jrd_tra::finiBulkInsert(thread_db* tdbb, bool commit)
+{
+ if (tra_bulkInsert)
+ {
+ tra_bulkInsert->flush(tdbb);
+
+ delete tra_bulkInsert;
+ tra_bulkInsert = nullptr;
+ }
+}
+
+void jrd_tra::finiBulkInsert(thread_db* tdbb, Request* request)
+{
+ if (tra_bulkInsert && tra_bulkInsert->getRequest() == request)
+ finiBulkInsert(tdbb, true);
+}
+
+
/// class TraceSweepEvent
TraceSweepEvent::TraceSweepEvent(thread_db* tdbb)
diff --git a/src/jrd/tra.h b/src/jrd/tra.h
index b4e55e28711..fdac2dc265d 100644
--- a/src/jrd/tra.h
+++ b/src/jrd/tra.h
@@ -69,6 +69,7 @@ class MappingList;
class DbCreatorsList;
class thread_db;
class Resources;
+class BulkInsert;
class SecDbContext
{
@@ -330,6 +331,7 @@ class jrd_tra final : public pool_alloc
MemoryPool* tra_autonomous_pool;
USHORT tra_autonomous_cnt;
static constexpr USHORT TRA_AUTONOMOUS_PER_POOL = 64;
+ BulkInsert* tra_bulkInsert = nullptr;
public:
Firebird::Array tra_dependencies;
@@ -418,6 +420,15 @@ class jrd_tra final : public pool_alloc
return tra_gen_ids;
}
+
+ // Get existing or create new BulkInsert for the relation.
+ BulkInsert* getBulkInsert(thread_db* tdbb, jrd_rel* relation, bool create);
+
+ // Finish and delete BulkInsert, if exists.
+ void finiBulkInsert(thread_db* tdbb, bool commit);
+
+ // Finish and delete BulkInsert that belongs to the request
+ void finiBulkInsert(thread_db* tdbb, Request* request);
};
// System transaction is always transaction 0.
diff --git a/src/jrd/validation.cpp b/src/jrd/validation.cpp
index 183e2489a34..88a039b789d 100644
--- a/src/jrd/validation.cpp
+++ b/src/jrd/validation.cpp
@@ -651,6 +651,13 @@ static void explain_pp_bits(const UCHAR bits, Firebird::string& names)
names.append(", ");
names.append("empty");
}
+
+ if (bits & ppg_dp_reserved)
+ {
+ if (!names.empty())
+ names.append(", ");
+ names.append("reserved");
+ }
}