From 547bed3173c4a2ed160338d7629c028a5854c9ee Mon Sep 17 00:00:00 2001 From: Valerie Kwek Date: Sun, 1 Dec 2024 15:24:58 -0500 Subject: [PATCH] Lab 4 --- godb/buffer_pool.go | 381 +++++++++++++++++---------------------- godb/buffer_pool_copy.go | 291 ++++++++++++++++++++++++++++++ godb/heap_file.go | 30 ++- godb/validation_test.go | 227 +++++++++++++++++++++++ 4 files changed, 709 insertions(+), 220 deletions(-) create mode 100644 godb/buffer_pool_copy.go create mode 100644 godb/validation_test.go diff --git a/godb/buffer_pool.go b/godb/buffer_pool.go index e2b2afd..42a0ff3 100644 --- a/godb/buffer_pool.go +++ b/godb/buffer_pool.go @@ -1,17 +1,11 @@ package godb -//BufferPool provides methods to cache pages that have been read from disk. -//It has a fixed capacity to limit the total amount of memory used by GoDB. -//It is also the primary way in which transactions are enforced, by using page -//level locking (you will not need to worry about this until lab3). - import ( "fmt" "sync" - "time" ) -// Permissions used to when reading / locking pages +// Permissions used when reading / locking pages type RWPerm int const ( @@ -19,81 +13,41 @@ const ( WritePerm RWPerm = iota ) -type BufferPool struct { - // TODO: some code goes here - pages map[any]Page - numPages int - currPage int - mutex sync.Mutex - pageMutexes map[any]*pageLock - dirtyPages map[TransactionID][]any - sharedPages map[TransactionID][]any - dependencyGraph map[TransactionID][]TransactionID - runningTransactions []TransactionID -} +type TransactionPhase int -type pageLock struct { - sharedLocks int - exclusiveLock TransactionID - mutex sync.Mutex - permittedTids []TransactionID +const ( + ReadPhase TransactionPhase = iota + ValidationPhase + WritePhase +) + +type BufferPool struct { + pages map[any]Page + transactionPages map[TransactionID]map[any]Page + numPages int + currPage int + mutex sync.Mutex + dirtyPages map[TransactionID][]any + sharedPages map[TransactionID][]any + runningTransactions map[TransactionID]TransactionPhase + concurrentAccessRecord map[TransactionID]map[TransactionID]map[any]RWPerm } // Create a new BufferPool with the specified number of pages func NewBufferPool(numPages int) (*BufferPool, error) { return &BufferPool{ - pages: make(map[any]Page), - numPages: numPages, - currPage: 0, - mutex: sync.Mutex{}, - pageMutexes: make(map[any]*pageLock), - dirtyPages: make(map[TransactionID][]any), - sharedPages: make(map[TransactionID][]any), - dependencyGraph: make(map[TransactionID][]TransactionID), - runningTransactions: []TransactionID{}, + pages: make(map[any]Page), + transactionPages: make(map[TransactionID]map[any]Page), + numPages: numPages, + currPage: 0, + mutex: sync.Mutex{}, + dirtyPages: make(map[TransactionID][]any), + sharedPages: make(map[TransactionID][]any), + runningTransactions: make(map[TransactionID]TransactionPhase), + concurrentAccessRecord: make(map[TransactionID]map[TransactionID]map[any]RWPerm), }, nil } -// helper function to check if transaction id is contained in list of transaction ids -func contains(list []TransactionID, value TransactionID) bool { - for _, v := range list { - if v == value { - return true - } - } - return false -} - -// helper function to remove transaction id from list of transaction ids -func remove(list []TransactionID, value TransactionID) []TransactionID { - for i, v := range list { - if v == value { - return append(list[:i], list[i+1:]...) - } - } - return list -} - -// helper function to detect cycle in dependency graph including tid using BFS -func (bp *BufferPool) cycle(tid TransactionID) bool { - seen := make(map[TransactionID]bool) - var queue []TransactionID - queue = append(queue, tid) - for len(queue) != 0 { - curr := queue[0] - queue = queue[1:] - if seen[curr] { - return true - } - seen[curr] = true - queue = append(queue, bp.dependencyGraph[tid]...) - } - return false -} - -// Testing method -- iterate through all pages in the buffer pool -// and flush them using [DBFile.flushPage]. Does not need to be thread/transaction safe. -// Mark pages as not dirty after flushing them. func (bp *BufferPool) FlushAllPages() { // TODO: some code goes here for _, page := range bp.pages { @@ -105,183 +59,188 @@ func (bp *BufferPool) FlushAllPages() { bp.currPage = 0 } -// Abort the transaction, releasing locks. Because GoDB is FORCE/NO STEAL, none -// of the pages tid has dirtied will be on disk so it is sufficient to just -// release locks to abort. You do not need to implement this for lab 1. +// helper function to find in map +func (bp *BufferPool) ExistAccess(pageKey any, tid TransactionID, dirty bool) bool { + var pages []any + if dirty { + pages = bp.dirtyPages[tid] + } else { + pages = bp.sharedPages[tid] + } + for _, key := range pages { + if key == pageKey { + return true + } + } + return false +} + +// Abort a transaction and clean up resources func (bp *BufferPool) AbortTransaction(tid TransactionID) { - // TODO: some code goes here bp.mutex.Lock() defer bp.mutex.Unlock() + // aborted transaction no longer conflicts with other concurrent transactions + for conflictTid, accessMap := range bp.concurrentAccessRecord { + if conflictTid == tid { + continue + } + if _, exists := accessMap[tid]; exists { + delete(bp.concurrentAccessRecord[conflictTid], tid) + } + } for _, dirtyKey := range bp.dirtyPages[tid] { - bp.pageMutexes[dirtyKey].exclusiveLock = 0 - bp.pageMutexes[dirtyKey].permittedTids = remove(bp.pageMutexes[dirtyKey].permittedTids, tid) delete(bp.pages, dirtyKey) } - for _, sharedKey := range bp.sharedPages[tid] { - bp.pageMutexes[sharedKey].sharedLocks -= 1 - bp.pageMutexes[sharedKey].permittedTids = remove(bp.pageMutexes[sharedKey].permittedTids, tid) - } - bp.sharedPages[tid] = []any{} - bp.dirtyPages[tid] = []any{} - delete(bp.dependencyGraph, tid) - remove(bp.runningTransactions, tid) + delete(bp.transactionPages, tid) + delete(bp.concurrentAccessRecord, tid) + delete(bp.dirtyPages, tid) + delete(bp.sharedPages, tid) + delete(bp.runningTransactions, tid) } -// Commit the transaction, releasing locks. Because GoDB is FORCE/NO STEAL, none -// of the pages tid has dirtied will be on disk, so prior to releasing locks you -// should iterate through pages and write them to disk. In GoDB lab3 we assume -// that the system will not crash while doing this, allowing us to avoid using a -// WAL. You do not need to implement this for lab 1. +// Commit a transaction with OCC validation func (bp *BufferPool) CommitTransaction(tid TransactionID) { - // TODO: some code goes here bp.mutex.Lock() - defer bp.mutex.Unlock() - for pageKey, page := range bp.pages { - for _, dirtyKey := range bp.dirtyPages[tid] { - if dirtyKey == pageKey { - dbfile := page.getFile() - dbfile.(*HeapFile).flushPage(page) - page.setDirty(0, false) + + // Validation phase + bp.runningTransactions[tid] = ValidationPhase + // Check for conflicts + for otherTid, accessMap := range bp.concurrentAccessRecord[tid] { + if otherTid == tid { + continue + } + for pageKey, accessPerm := range accessMap { + // 1) W(Ti) ∩ R(Tj) ≠ { }, and Ti does not finish writing before Tj starts + if accessPerm == WritePerm && bp.ExistAccess(pageKey, tid, false) { + bp.mutex.Unlock() + bp.AbortTransaction(tid) + return + } + // W(Ti) ∩ (W(Tj) U R(Tj)) ≠ { }, and Tj overlaps with Ti validation or write phase + if accessPerm == WritePerm && bp.ExistAccess(pageKey, tid, true) { + bp.mutex.Unlock() + bp.AbortTransaction(tid) + return } } } - for _, dirtyKey := range bp.dirtyPages[tid] { - bp.pageMutexes[dirtyKey].exclusiveLock = 0 - bp.pageMutexes[dirtyKey].permittedTids = remove(bp.pageMutexes[dirtyKey].permittedTids, tid) + + // Write phase + bp.runningTransactions[tid] = WritePhase + // Commit + for pageKey, pageCopy := range bp.transactionPages[tid] { + bp.pages[pageKey] = pageCopy + if pageCopy.isDirty() { + dbfile := pageCopy.getFile() + dbfile.(*HeapFile).flushPage(pageCopy) + pageCopy.setDirty(0, false) + } } - for _, sharedKey := range bp.sharedPages[tid] { - bp.pageMutexes[sharedKey].sharedLocks -= 1 - bp.pageMutexes[sharedKey].permittedTids = remove(bp.pageMutexes[sharedKey].permittedTids, tid) + // Add as concurrent access/potential conflict to all tids in concurrent access record + for otherTid := range bp.concurrentAccessRecord { + if otherTid != tid { + if _, exists := bp.concurrentAccessRecord[otherTid][tid]; !exists { + bp.concurrentAccessRecord[otherTid][tid] = make(map[any]RWPerm) + } + for _, pageKey := range bp.sharedPages[tid] { + bp.concurrentAccessRecord[otherTid][tid][pageKey] = ReadPerm + } + for _, pageKey := range bp.dirtyPages[tid] { + bp.concurrentAccessRecord[otherTid][tid][pageKey] = WritePerm + } + } } - bp.sharedPages[tid] = []any{} - bp.dirtyPages[tid] = []any{} - remove(bp.runningTransactions, tid) + // Clear transaction records + delete(bp.transactionPages, tid) + delete(bp.concurrentAccessRecord, tid) + delete(bp.dirtyPages, tid) + delete(bp.sharedPages, tid) + delete(bp.runningTransactions, tid) + bp.mutex.Unlock() } // Begin a new transaction. You do not need to implement this for lab 1. // // Returns an error if the transaction is already running. func (bp *BufferPool) BeginTransaction(tid TransactionID) error { - // TODO: some code goes here - for _, runningTid := range bp.runningTransactions { - if runningTid == tid { - return fmt.Errorf("transaction is already running") - } + bp.mutex.Lock() + defer bp.mutex.Unlock() + + if _, exists := bp.runningTransactions[tid]; exists { + return fmt.Errorf("transaction is already running") } + + bp.concurrentAccessRecord[tid] = make(map[TransactionID]map[any]RWPerm) + bp.runningTransactions[tid] = ReadPhase + bp.concurrentAccessRecord[tid] = make(map[TransactionID]map[any]RWPerm) + bp.transactionPages[tid] = make(map[any]Page) return nil } -// Retrieve the specified page from the specified DBFile (e.g., a HeapFile), on -// behalf of the specified transaction. If a page is not cached in the buffer pool, -// you can read it from disk uing [DBFile.readPage]. If the buffer pool is full (i.e., -// already stores numPages pages), a page should be evicted. Should not evict -// pages that are dirty, as this would violate NO STEAL. If the buffer pool is -// full of dirty pages, you should return an error. Before returning the page, -// attempt to lock it with the specified permission. If the lock is -// unavailable, should block until the lock is free. If a deadlock occurs, abort -// one of the transactions in the deadlock. For lab 1, you do not need to -// implement locking or deadlock detection. You will likely want to store a list -// of pages in the BufferPool in a map keyed by the [DBFile.pageKey]. func (bp *BufferPool) GetPage(file DBFile, pageNo int, tid TransactionID, perm RWPerm) (Page, error) { bp.mutex.Lock() + defer bp.mutex.Unlock() pageKey := file.pageKey(pageNo) - - _, exists := bp.pageMutexes[pageKey] - if !exists { - bp.pageMutexes[pageKey] = &pageLock{ - sharedLocks: 0, - exclusiveLock: 0, - mutex: sync.Mutex{}, - permittedTids: []TransactionID{}, - } - } - - _, inBuffer := bp.pages[pageKey] - - if inBuffer { - pageMutex, ok := bp.pageMutexes[pageKey] - if !ok { - return nil, fmt.Errorf("page mutex not found") - } - if perm == WritePerm { - for _, dependentTid := range pageMutex.permittedTids { - if dependentTid != tid { - bp.dependencyGraph[tid] = append(bp.dependencyGraph[tid], dependentTid) + // Check if the transaction already has a copy of the page + if _, exists := bp.transactionPages[tid][pageKey]; !exists { + // If no copy exists, check if buffer pool has space + load the original page from the file + pageKey := file.pageKey(pageNo) + _, exists := bp.pages[pageKey] + if !exists { + numDirty := 0 + for _, page := range bp.pages { + if page.isDirty() { + numDirty += 1 } } - if bp.cycle(tid) { - bp.mutex.Unlock() - bp.AbortTransaction(tid) - return nil, fmt.Errorf("aborted transaction due to deadlock") + if numDirty == bp.numPages { + return nil, fmt.Errorf("buffer is full of dirty pages") } - } - bp.mutex.Unlock() - for { - bp.mutex.Lock() - if perm == ReadPerm && (pageMutex.exclusiveLock == 0 || contains(pageMutex.permittedTids, tid)) { - pageMutex.sharedLocks += 1 - pageMutex.permittedTids = append(pageMutex.permittedTids, tid) - bp.sharedPages[tid] = append(bp.sharedPages[tid], pageKey) - break + + page, err := file.readPage(pageNo) + if err != nil { + return nil, fmt.Errorf("could not read page") } - if perm == WritePerm && ((pageMutex.exclusiveLock == 0 && pageMutex.sharedLocks == 0) || contains(pageMutex.permittedTids, tid)) { - pageMutex.exclusiveLock = 1 - pageMutex.permittedTids = append(pageMutex.permittedTids, tid) - bp.dirtyPages[tid] = append(bp.dirtyPages[tid], pageKey) - break + if len(bp.pages) == bp.numPages { + for _, page := range bp.pages { + if !page.isDirty() { + delete(bp.pages, page.getFile().pageKey(page.(*heapPage).PageNo)) + break + } + } } - bp.mutex.Unlock() - time.Sleep(1000) + bp.pages[pageKey] = page + bp.currPage++ } - page := bp.pages[pageKey] - bp.mutex.Unlock() - return page, nil - } - - numDirty := 0 - for _, page := range bp.pages { - if page.isDirty() { - numDirty += 1 + originalPage := bp.pages[pageKey] + + // Create a new Page and copy the contents of the original page for writes + hp := originalPage.(*heapPage) + tuplesCopy := make([]*Tuple, len(hp.tuples)) + copy(tuplesCopy, hp.tuples) + newPage := &heapPage{ + Desc: hp.Desc, + PageNo: hp.PageNo, + HeapF: hp.HeapF, + tuples: tuplesCopy, + IsDirty: hp.IsDirty, + numUsed: hp.numUsed, + numSlots: hp.numSlots, + emptySlots: append([]int{}, hp.emptySlots...), } - } - if numDirty == bp.numPages { - bp.mutex.Unlock() - return nil, fmt.Errorf("buffer is full of dirty pages") - } - - page, err := file.readPage(pageNo) - if err != nil { - bp.mutex.Unlock() - return nil, fmt.Errorf("could not read page") - } - if len(bp.pages) == bp.numPages { - for _, page := range bp.pages { - if !page.isDirty() { - delete(bp.pages, page.getFile().pageKey(page.(*heapPage).PageNo)) - break - } + if _, exists := bp.transactionPages[tid]; !exists { + bp.transactionPages[tid] = make(map[any]Page) } + bp.transactionPages[tid][pageKey] = newPage } - bp.pages[pageKey] = page - bp.currPage++ - bp.pageMutexes[pageKey] = &pageLock{ - sharedLocks: 0, - exclusiveLock: 0, - mutex: sync.Mutex{}, - permittedTids: []TransactionID{}, - } - pageMutex := bp.pageMutexes[pageKey] - if perm == ReadPerm { - pageMutex.sharedLocks += 1 - pageMutex.permittedTids = append(pageMutex.permittedTids, tid) - bp.sharedPages[tid] = append(bp.sharedPages[tid], pageKey) - } + + // Record access in shared pages or dirty pages data structure if perm == WritePerm { - pageMutex.exclusiveLock = 1 - pageMutex.permittedTids = append(pageMutex.permittedTids, tid) bp.dirtyPages[tid] = append(bp.dirtyPages[tid], pageKey) + } else { + bp.sharedPages[tid] = append(bp.sharedPages[tid], pageKey) } - defer bp.mutex.Unlock() - return page, nil + + return bp.transactionPages[tid][pageKey], nil } diff --git a/godb/buffer_pool_copy.go b/godb/buffer_pool_copy.go new file mode 100644 index 0000000..3a919a5 --- /dev/null +++ b/godb/buffer_pool_copy.go @@ -0,0 +1,291 @@ +package godb + +/** + +//BufferPool provides methods to cache pages that have been read from disk. +//It has a fixed capacity to limit the total amount of memory used by GoDB. +//It is also the primary way in which transactions are enforced, by using page +//level locking (you will not need to worry about this until lab3). + +import ( + "fmt" + "sync" + "time" +) + +// Permissions used to when reading / locking pages +type RWPerm int + +const ( + ReadPerm RWPerm = iota + WritePerm RWPerm = iota +) + +type BufferPool struct { + // TODO: some code goes here + pages map[any]Page + numPages int + currPage int + mutex sync.Mutex + pageMutexes map[any]*pageLock + dirtyPages map[TransactionID][]any + sharedPages map[TransactionID][]any + dependencyGraph map[TransactionID][]TransactionID + runningTransactions []TransactionID +} + +type pageLock struct { + sharedLocks int + exclusiveLock TransactionID + mutex sync.Mutex + permittedTids []TransactionID +} + +// Create a new BufferPool with the specified number of pages +func NewBufferPool(numPages int) (*BufferPool, error) { + return &BufferPool{ + pages: make(map[any]Page), + numPages: numPages, + currPage: 0, + mutex: sync.Mutex{}, + pageMutexes: make(map[any]*pageLock), + dirtyPages: make(map[TransactionID][]any), + sharedPages: make(map[TransactionID][]any), + dependencyGraph: make(map[TransactionID][]TransactionID), + runningTransactions: []TransactionID{}, + }, nil +} + +// helper function to check if transaction id is contained in list of transaction ids +func contains(list []TransactionID, value TransactionID) bool { + for _, v := range list { + if v == value { + return true + } + } + return false +} + +// helper function to remove transaction id from list of transaction ids +func remove(list []TransactionID, value TransactionID) []TransactionID { + for i, v := range list { + if v == value { + return append(list[:i], list[i+1:]...) + } + } + return list +} + +// helper function to detect cycle in dependency graph including tid using BFS +func (bp *BufferPool) cycle(tid TransactionID) bool { + seen := make(map[TransactionID]bool) + var queue []TransactionID + queue = append(queue, tid) + for len(queue) != 0 { + curr := queue[0] + queue = queue[1:] + if seen[curr] { + return true + } + seen[curr] = true + queue = append(queue, bp.dependencyGraph[tid]...) + } + return false +} + +// Testing method -- iterate through all pages in the buffer pool +// and flush them using [DBFile.flushPage]. Does not need to be thread/transaction safe. +// Mark pages as not dirty after flushing them. +func (bp *BufferPool) FlushAllPages() { + // TODO: some code goes here + for _, page := range bp.pages { + dbfile := page.getFile() + dbfile.(*HeapFile).flushPage(page) + page.setDirty(0, false) + } + bp.pages = make(map[any]Page) + bp.currPage = 0 +} + +// Abort the transaction, releasing locks. Because GoDB is FORCE/NO STEAL, none +// of the pages tid has dirtied will be on disk so it is sufficient to just +// release locks to abort. You do not need to implement this for lab 1. +func (bp *BufferPool) AbortTransaction(tid TransactionID) { + // TODO: some code goes here + bp.mutex.Lock() + defer bp.mutex.Unlock() + for _, dirtyKey := range bp.dirtyPages[tid] { + bp.pageMutexes[dirtyKey].exclusiveLock = 0 + bp.pageMutexes[dirtyKey].permittedTids = remove(bp.pageMutexes[dirtyKey].permittedTids, tid) + delete(bp.pages, dirtyKey) + } + for _, sharedKey := range bp.sharedPages[tid] { + bp.pageMutexes[sharedKey].sharedLocks -= 1 + bp.pageMutexes[sharedKey].permittedTids = remove(bp.pageMutexes[sharedKey].permittedTids, tid) + } + bp.sharedPages[tid] = []any{} + bp.dirtyPages[tid] = []any{} + delete(bp.dependencyGraph, tid) + remove(bp.runningTransactions, tid) +} + +// Commit the transaction, releasing locks. Because GoDB is FORCE/NO STEAL, none +// of the pages tid has dirtied will be on disk, so prior to releasing locks you +// should iterate through pages and write them to disk. In GoDB lab3 we assume +// that the system will not crash while doing this, allowing us to avoid using a +// WAL. You do not need to implement this for lab 1. +func (bp *BufferPool) CommitTransaction(tid TransactionID) { + // TODO: some code goes here + bp.mutex.Lock() + defer bp.mutex.Unlock() + for pageKey, page := range bp.pages { + for _, dirtyKey := range bp.dirtyPages[tid] { + if dirtyKey == pageKey { + dbfile := page.getFile() + dbfile.(*HeapFile).flushPage(page) + page.setDirty(0, false) + } + } + } + for _, dirtyKey := range bp.dirtyPages[tid] { + bp.pageMutexes[dirtyKey].exclusiveLock = 0 + bp.pageMutexes[dirtyKey].permittedTids = remove(bp.pageMutexes[dirtyKey].permittedTids, tid) + } + for _, sharedKey := range bp.sharedPages[tid] { + bp.pageMutexes[sharedKey].sharedLocks -= 1 + bp.pageMutexes[sharedKey].permittedTids = remove(bp.pageMutexes[sharedKey].permittedTids, tid) + } + bp.sharedPages[tid] = []any{} + bp.dirtyPages[tid] = []any{} + remove(bp.runningTransactions, tid) +} + +// Begin a new transaction. You do not need to implement this for lab 1. +// +// Returns an error if the transaction is already running. +func (bp *BufferPool) BeginTransaction(tid TransactionID) error { + // TODO: some code goes here + for _, runningTid := range bp.runningTransactions { + if runningTid == tid { + return fmt.Errorf("transaction is already running") + } + } + return nil +} + +// Retrieve the specified page from the specified DBFile (e.g., a HeapFile), on +// behalf of the specified transaction. If a page is not cached in the buffer pool, +// you can read it from disk uing [DBFile.readPage]. If the buffer pool is full (i.e., +// already stores numPages pages), a page should be evicted. Should not evict +// pages that are dirty, as this would violate NO STEAL. If the buffer pool is +// full of dirty pages, you should return an error. Before returning the page, +// attempt to lock it with the specified permission. If the lock is +// unavailable, should block until the lock is free. If a deadlock occurs, abort +// one of the transactions in the deadlock. For lab 1, you do not need to +// implement locking or deadlock detection. You will likely want to store a list +// of pages in the BufferPool in a map keyed by the [DBFile.pageKey]. +func (bp *BufferPool) GetPage(file DBFile, pageNo int, tid TransactionID, perm RWPerm) (Page, error) { + bp.mutex.Lock() + + pageKey := file.pageKey(pageNo) + + _, exists := bp.pageMutexes[pageKey] + if !exists { + bp.pageMutexes[pageKey] = &pageLock{ + sharedLocks: 0, + exclusiveLock: 0, + mutex: sync.Mutex{}, + permittedTids: []TransactionID{}, + } + } + + _, inBuffer := bp.pages[pageKey] + + if inBuffer { + pageMutex, ok := bp.pageMutexes[pageKey] + if !ok { + return nil, fmt.Errorf("page mutex not found") + } + if perm == WritePerm { + for _, dependentTid := range pageMutex.permittedTids { + if dependentTid != tid { + bp.dependencyGraph[tid] = append(bp.dependencyGraph[tid], dependentTid) + } + } + if bp.cycle(tid) { + bp.mutex.Unlock() + bp.AbortTransaction(tid) + return nil, fmt.Errorf("aborted transaction due to deadlock") + } + } + bp.mutex.Unlock() + for { + bp.mutex.Lock() + if perm == ReadPerm && (pageMutex.exclusiveLock == 0 || contains(pageMutex.permittedTids, tid)) { + pageMutex.sharedLocks += 1 + pageMutex.permittedTids = append(pageMutex.permittedTids, tid) + bp.sharedPages[tid] = append(bp.sharedPages[tid], pageKey) + break + } + if perm == WritePerm && ((pageMutex.exclusiveLock == 0 && pageMutex.sharedLocks == 0) || contains(pageMutex.permittedTids, tid)) { + pageMutex.exclusiveLock = 1 + pageMutex.permittedTids = append(pageMutex.permittedTids, tid) + bp.dirtyPages[tid] = append(bp.dirtyPages[tid], pageKey) + break + } + bp.mutex.Unlock() + time.Sleep(1000) + } + page := bp.pages[pageKey] + bp.mutex.Unlock() + return page, nil + } + + numDirty := 0 + for _, page := range bp.pages { + if page.isDirty() { + numDirty += 1 + } + } + if numDirty == bp.numPages { + bp.mutex.Unlock() + return nil, fmt.Errorf("buffer is full of dirty pages") + } + + page, err := file.readPage(pageNo) + if err != nil { + bp.mutex.Unlock() + return nil, fmt.Errorf("could not read page") + } + if len(bp.pages) == bp.numPages { + for _, page := range bp.pages { + if !page.isDirty() { + delete(bp.pages, page.getFile().pageKey(page.(*heapPage).PageNo)) + break + } + } + } + bp.pages[pageKey] = page + bp.currPage++ + bp.pageMutexes[pageKey] = &pageLock{ + sharedLocks: 0, + exclusiveLock: 0, + mutex: sync.Mutex{}, + permittedTids: []TransactionID{}, + } + pageMutex := bp.pageMutexes[pageKey] + if perm == ReadPerm { + pageMutex.sharedLocks += 1 + pageMutex.permittedTids = append(pageMutex.permittedTids, tid) + bp.sharedPages[tid] = append(bp.sharedPages[tid], pageKey) + } + if perm == WritePerm { + pageMutex.exclusiveLock = 1 + pageMutex.permittedTids = append(pageMutex.permittedTids, tid) + bp.dirtyPages[tid] = append(bp.dirtyPages[tid], pageKey) + } + defer bp.mutex.Unlock() + return page, nil +} + +*/ diff --git a/godb/heap_file.go b/godb/heap_file.go index d7d7a42..cdd0c89 100644 --- a/godb/heap_file.go +++ b/godb/heap_file.go @@ -48,6 +48,7 @@ func NewHeapFile(fromFile string, td *TupleDesc, bp *BufferPool) (*HeapFile, err backingFile: fromFile, Desc: *td, numPages: int(fileSize) / PageSize, + mutex: sync.Mutex{}, }, nil } @@ -73,6 +74,7 @@ func (f *HeapFile) NumPages() int { func (f *HeapFile) LoadFromCSV(file *os.File, hasHeader bool, sep string, skipLastField bool) error { scanner := bufio.NewScanner(file) cnt := 0 + tid := TransactionID(0) for scanner.Scan() { line := scanner.Text() fields := strings.Split(line, sep) @@ -110,7 +112,7 @@ func (f *HeapFile) LoadFromCSV(file *os.File, hasHeader bool, sep string, skipLa } } newT := Tuple{*f.Descriptor(), newFields, nil} - tid := NewTID() + // tid := NewTid() bp := f.bufPool f.insertTuple(&newT, tid) @@ -119,6 +121,11 @@ func (f *HeapFile) LoadFromCSV(file *os.File, hasHeader bool, sep string, skipLa bp.FlushAllPages() } + delete(f.bufPool.transactionPages, tid) + delete(f.bufPool.concurrentAccessRecord, tid) + delete(f.bufPool.dirtyPages, tid) + delete(f.bufPool.sharedPages, tid) + delete(f.bufPool.runningTransactions, tid) return nil } @@ -217,15 +224,20 @@ func (f *HeapFile) insertTuple(t *Tuple, tid TransactionID) error { } } f.bufPool.pages[f.pageKey(newPageNo)] = newHeapPage - _, exists := f.bufPool.pageMutexes[f.pageKey(newPageNo)] - if !exists { - f.bufPool.pageMutexes[f.pageKey(newPageNo)] = &pageLock{ - sharedLocks: 0, - exclusiveLock: 1, - mutex: sync.Mutex{}, - permittedTids: []TransactionID{tid}, - } + // _, exists := f.bufPool.pageMutexes[f.pageKey(newPageNo)] + // if !exists { + // f.bufPool.pageMutexes[f.pageKey(newPageNo)] = &pageLock{ + // sharedLocks: 0, + // exclusiveLock: 1, + // mutex: sync.Mutex{}, + // permittedTids: []TransactionID{tid}, + // } + // } + // f.bufPool.dirtyPages[tid] = append(f.bufPool.dirtyPages[tid], f.pageKey(newPageNo)) + if f.bufPool.transactionPages[tid] == nil { + f.bufPool.transactionPages[tid] = make(map[any]Page) } + f.bufPool.transactionPages[tid][f.pageKey(newPageNo)] = newHeapPage f.bufPool.dirtyPages[tid] = append(f.bufPool.dirtyPages[tid], f.pageKey(newPageNo)) return nil } diff --git a/godb/validation_test.go b/godb/validation_test.go new file mode 100644 index 0000000..86c744e --- /dev/null +++ b/godb/validation_test.go @@ -0,0 +1,227 @@ +package godb + +import ( + "testing" +) + +/** +* Test to construct an invalidation situation. +* tid1 writes t1 to page; tid2 writes t2 to same page; tid1 tries to commit; tid2 tries to commit +* outcome: tid1 commits, tid2 aborts + */ +func TestInvalidateWriteWrite(t *testing.T) { + _, t2, t1, _, _, _ := makeTestVars(t) + bp, hf, tid1, tid2, t2 := transactionTestSetUp(t) + + pg, _ := bp.GetPage(hf, 2, tid1, WritePerm) + heapp := pg.(*heapPage) + heapp.insertTuple(&t1) + heapp.setDirty(tid1, true) + + pg2, _ := bp.GetPage(hf, 2, tid2, WritePerm) + heapp2 := pg2.(*heapPage) + heapp2.insertTuple(&t2) + heapp2.setDirty(tid2, true) + + bp.CommitTransaction(tid1) + bp.CommitTransaction(tid2) + + bp.FlushAllPages() + + pg, _ = bp.GetPage(hf, 2, tid1, WritePerm) + heapp = pg.(*heapPage) + iter := heapp.tupleIter() + + correctCommit := false + correctAbort := true + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t1.equals(tup) { + correctCommit = correctCommit || true + } + if t2.equals(tup) { + correctAbort = false + } + } + + if !correctCommit || !correctAbort { + t.Errorf("Commit: %t, Abort: %t", correctCommit, correctAbort) + } +} + +/** +* Test to construct an invalidation situation. +* tid1 writes t1 to page; tid2 reads from same page + writes t2 to different page; tid1 tries to commit; tid2 tries to commit +* outcome: tid1 commits, tid2 aborts + */ +func TestInvalidateWriteRead(t *testing.T) { + _, t1, t2, _, _, _ := makeTestVars(t) + bp, hf, tid1, tid2, t1 := transactionTestSetUp(t) + + pg, _ := bp.GetPage(hf, 2, tid1, WritePerm) + heapp := pg.(*heapPage) + heapp.insertTuple(&t1) + heapp.setDirty(tid1, true) + + bp.GetPage(hf, 2, tid2, ReadPerm) + pg2, _ := bp.GetPage(hf, 1, tid2, WritePerm) + heapp2 := pg2.(*heapPage) + heapp2.insertTuple(&t2) + heapp2.setDirty(tid2, true) + + bp.CommitTransaction(tid1) + bp.CommitTransaction(tid2) + + bp.FlushAllPages() + + pg, _ = bp.GetPage(hf, 2, tid1, WritePerm) + heapp = pg.(*heapPage) + iter := heapp.tupleIter() + + correctCommit := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t1.equals(tup) { + correctCommit = correctCommit || true + } + } + + pg, _ = bp.GetPage(hf, 1, tid1, WritePerm) + heapp = pg.(*heapPage) + iter = heapp.tupleIter() + + correctAbort := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t2.equals(tup) { + correctAbort = false + } + } + + if !correctCommit { + t.Errorf("Commit: %t, Abort: %t", correctCommit, correctAbort) + } +} + +/** +* Test to construct a validation situation. +* tid1 reads from page + writes t1 to different page; tid2 writes t2 to same page that tid1 read from; tid1 tries to commit; tid2 tries to commit +* outcome: tid1 commits, tid2 commits + */ +func TestValidateReadWrite(t *testing.T) { + _, t1, t2, _, _, _ := makeTestVars(t) + bp, hf, tid1, tid2, t1 := transactionTestSetUp(t) + + bp.GetPage(hf, 2, tid1, ReadPerm) + pg, _ := bp.GetPage(hf, 1, tid1, WritePerm) + heapp := pg.(*heapPage) + heapp.insertTuple(&t1) + heapp.setDirty(tid1, true) + + pg2, _ := bp.GetPage(hf, 1, tid2, WritePerm) + heapp2 := pg2.(*heapPage) + heapp2.insertTuple(&t2) + heapp2.setDirty(tid2, true) + + bp.CommitTransaction(tid1) + bp.CommitTransaction(tid2) + + bp.FlushAllPages() + + pg, _ = bp.GetPage(hf, 1, tid1, WritePerm) + heapp = pg.(*heapPage) + iter := heapp.tupleIter() + + correctCommit1 := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t1.equals(tup) { + correctCommit1 = correctCommit1 || true + } + } + + pg, _ = bp.GetPage(hf, 2, tid1, WritePerm) + heapp = pg.(*heapPage) + iter = heapp.tupleIter() + + correctCommit2 := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t2.equals(tup) { + correctCommit2 = correctCommit2 || true + } + } + + if !correctCommit1 || !correctCommit2 { + t.Errorf("Commit 1: %t, Commit 2: %t", correctCommit1, correctCommit2) + } +} + +/** +* Test to construct a validation situation. +* tid1 reads from page + writes t1 to different page; tid2 reads from same page tid1 read from + writes t2 to different page than what tid1 read and wrote to; tid1 tries to commit; tid2 tries to commit +* outcome: tid1 commits, tid2 commits + */ +func TestValidateReadRead(t *testing.T) { + _, t1, t2, _, _, _ := makeTestVars(t) + bp, hf, tid1, tid2, t1 := transactionTestSetUp(t) + + bp.GetPage(hf, 2, tid1, WritePerm) + pg, _ := bp.GetPage(hf, 1, tid1, WritePerm) + heapp := pg.(*heapPage) + heapp.insertTuple(&t1) + heapp.setDirty(tid1, true) + + bp.GetPage(hf, 2, tid2, ReadPerm) + pg2, _ := bp.GetPage(hf, 0, tid2, WritePerm) + heapp2 := pg2.(*heapPage) + heapp2.insertTuple(&t2) + heapp2.setDirty(tid2, true) + + bp.CommitTransaction(tid1) + bp.CommitTransaction(tid2) + + bp.FlushAllPages() + + pg, _ = bp.GetPage(hf, 1, tid1, WritePerm) + heapp = pg.(*heapPage) + iter := heapp.tupleIter() + + correctCommit1 := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t1.equals(tup) { + correctCommit1 = correctCommit1 || true + } + } + + pg, _ = bp.GetPage(hf, 0, tid1, WritePerm) + heapp = pg.(*heapPage) + iter = heapp.tupleIter() + + correctCommit2 := false + for tup, err := iter(); tup != nil || err != nil; tup, err = iter() { + if err != nil { + t.Fatalf("Iterator error") + } + if t2.equals(tup) { + correctCommit2 = correctCommit2 || true + } + } + + if !correctCommit1 || !correctCommit2 { + t.Errorf("Commit 1: %t, Commit 2: %t", correctCommit1, correctCommit2) + } +}