-
Notifications
You must be signed in to change notification settings - Fork 15
/
LogFile.java
604 lines (502 loc) · 21.4 KB
/
LogFile.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
package simpledb;
import java.io.*;
import java.util.*;
import java.lang.reflect.*;
/**
LogFile implements the recovery subsystem of SimpleDb. This class is
able to write different log records as needed, but it is the
responsibility of the caller to ensure that write ahead logging and
two-phase locking discipline are followed. <p>
<u> Locking note: </u>
<p>
Many of the methods here are synchronized (to prevent concurrent log
writes from happening); many of the methods in BufferPool are also
synchronized (for similar reasons). The problem is that BufferPool writes
log records (when pages are flushed) and the log file flushes BufferPool
pages (on checkpoints and recovery). This can lead to deadlock. For
that reason, any LogFile operation that needs to access the BufferPool
must not be declared synchronized and must begin with a block like:
<p>
<pre>
synchronized (Database.getBufferPool()) {
synchronized (this) {
..
}
}
</pre>
*/
/**
<p> The format of the log file is as follows:
<ul>
<li> The first long integer of the file represents the offset of the
last written checkpoint, or -1 if there are no checkpoints
<li> All additional data in the log consists of log records. Log
records are variable length.
<li> Each log record begins with an integer type and a long integer
transaction id.
<li> Each log record ends with a long integer file offset representing
the position in the log file where the record began.
<li> There are five record types: ABORT, COMMIT, UPDATE, BEGIN, and
CHECKPOINT
<li> ABORT, COMMIT, and BEGIN records contain no additional data
<li> UPDATE records consist of two entries, a before image and an
after image. These images are serialized Page objects, and can be
accessed with the LogFile.readPageData() and LogFile.writePageData()
methods. See LogFile.print() for an example.
<li> CHECKPOINT records consist of active transactions at the time
the checkpoint was taken and their first log record on disk. The format
of the record is an integer count of the number of transactions, as well
as a long integer transaction id and a long integer first record offset
for each active transaction.
</ul>
*/
public class LogFile {

    /** Path of the log on disk; the file is replaced in place by logTruncate(). */
    final File logFile;
    /** Open handle on logFile; reopened after truncation. */
    private RandomAccessFile raf;
    Boolean recoveryUndecided; // no call to recover() and no append to log

    static final int ABORT_RECORD = 1;
    static final int COMMIT_RECORD = 2;
    static final int UPDATE_RECORD = 3;
    static final int BEGIN_RECORD = 4;
    static final int CHECKPOINT_RECORD = 5;
    static final long NO_CHECKPOINT_ID = -1;

    final static int INT_SIZE = 4;
    final static int LONG_SIZE = 8;

    /** Offset where the next appended record starts (-1 until the log is initialised). Protected by this. */
    long currentOffset = -1;
    int totalRecords = 0; // for PatchTest //protected by this

    /** Live transaction id -> offset of its first log record. Protected by this. */
    HashMap<Long,Long> tidToFirstLogRecord = new HashMap<Long,Long>();

    /** Constructor.
        Initialize and back the log file with the specified file.
        We're not sure yet whether the caller is creating a brand new DB,
        in which case we should ignore the log file, or whether the caller
        will eventually want to recover (after populating the Catalog).
        So we make this decision lazily: if someone calls recover(), then
        do it, while if someone starts adding log file entries, then first
        throw out the initial log file contents.

        @param f The log file's name
    */
    public LogFile(File f) throws IOException {
        this.logFile = f;
        raf = new RandomAccessFile(f, "rw");
        recoveryUndecided = true;

        // install shutdown hook to force cleanup on close
        // Runtime.getRuntime().addShutdownHook(new Thread() {
        // public void run() { shutdown(); }
        // });

        //XXX WARNING -- there is nothing that verifies that the specified
        // log file actually corresponds to the current catalog.
        // This could cause problems since we log tableids, which may or
        // may not match tableids in the current catalog.
    }

    /**
     * Called before appending a log record. If the caller never asked for
     * recovery, the old log contents are discarded now and a fresh header
     * (no checkpoint) is written.
     */
    void preAppend() throws IOException {
        totalRecords++;
        if (recoveryUndecided) {
            recoveryUndecided = false;
            raf.seek(0);
            raf.setLength(0);
            raf.writeLong(NO_CHECKPOINT_ID);
            raf.seek(raf.length());
            currentOffset = raf.getFilePointer();
        }
    }

    /** @return the number of preAppend() calls made so far (used by tests). */
    public synchronized int getTotalRecords() {
        return totalRecords;
    }

    /** Write an abort record to the log for the specified tid, force
        the log to disk, and perform a rollback.

        @param tid The aborting transaction.
        @throws NoSuchElementException if tid is not a live logged transaction
    */
    public void logAbort(TransactionId tid) throws IOException {
        // must have buffer pool lock before proceeding, since this
        // calls rollback
        synchronized (Database.getBufferPool()) {
            synchronized (this) {
                preAppend();
                // Roll back first: rollback only works for live transactions
                // (it needs tidToFirstLogRecord). rollback() restores the file
                // pointer, so the ABORT record below is appended at the end.
                rollback(tid);

                raf.writeInt(ABORT_RECORD);
                raf.writeLong(tid.getId());
                raf.writeLong(currentOffset);
                currentOffset = raf.getFilePointer();
                force();
                tidToFirstLogRecord.remove(tid.getId());
            }
        }
    }

    /** Write a commit record to disk for the specified tid,
        and force the log to disk.

        @param tid The committing transaction.
    */
    public synchronized void logCommit(TransactionId tid) throws IOException {
        preAppend();
        Debug.log("COMMIT " + tid.getId());
        //should we verify that this is a live transaction?

        raf.writeInt(COMMIT_RECORD);
        raf.writeLong(tid.getId());
        raf.writeLong(currentOffset);
        currentOffset = raf.getFilePointer();
        force();
        tidToFirstLogRecord.remove(tid.getId());
    }

    /** Write an UPDATE record to disk for the specified tid and page
        (with provided before and after images.)
        @param tid The transaction performing the write
        @param before The before image of the page
        @param after The after image of the page

        @see simpledb.Page#getBeforeImage
    */
    public synchronized void logWrite(TransactionId tid, Page before,
                                      Page after)
        throws IOException {
        Debug.log("WRITE, offset = " + raf.getFilePointer());
        preAppend();
        /* an update record consists of
           record type
           transaction id
           before page data (see writePageData)
           after page data
           start offset
        */
        raf.writeInt(UPDATE_RECORD);
        raf.writeLong(tid.getId());

        writePageData(raf, before);
        writePageData(raf, after);
        raf.writeLong(currentOffset);
        currentOffset = raf.getFilePointer();

        Debug.log("WRITE OFFSET = " + currentOffset);
    }

    /**
     * Serializes a page as: page class name, id class name, id int count,
     * id ints, page byte count, page bytes. readPageData() is the inverse.
     */
    void writePageData(RandomAccessFile raf, Page p) throws IOException {
        PageId pid = p.getId();
        int[] pageInfo = pid.serialize();

        raf.writeUTF(p.getClass().getName());
        raf.writeUTF(pid.getClass().getName());

        raf.writeInt(pageInfo.length);
        for (int value : pageInfo) {
            raf.writeInt(value);
        }
        byte[] pageData = p.getPageData();
        raf.writeInt(pageData.length);
        raf.write(pageData);
    }

    /**
     * Reads a page written by writePageData(), reconstructing the PageId and
     * Page reflectively via the first declared constructor of each class.
     *
     * @throws IOException on I/O failure, truncated data (EOFException), or
     *         if the logged classes cannot be instantiated (cause attached)
     */
    Page readPageData(RandomAccessFile raf) throws IOException {
        String pageClassName = raf.readUTF();
        String idClassName = raf.readUTF();

        try {
            Class<?> idClass = Class.forName(idClassName);
            Class<?> pageClass = Class.forName(pageClassName);

            Constructor<?>[] idConsts = idClass.getDeclaredConstructors();
            int numIdArgs = raf.readInt();
            Object[] idArgs = new Object[numIdArgs];
            for (int i = 0; i < numIdArgs; i++) {
                idArgs[i] = Integer.valueOf(raf.readInt());
            }
            PageId pid = (PageId) idConsts[0].newInstance(idArgs);

            Constructor<?>[] pageConsts = pageClass.getDeclaredConstructors();
            int pageSize = raf.readInt();
            byte[] pageData = new byte[pageSize];
            // read(byte[]) may return fewer bytes than requested; readFully
            // either fills the buffer or throws EOFException.
            raf.readFully(pageData);

            Object[] pageArgs = new Object[] { pid, pageData };
            return (Page) pageConsts[0].newInstance(pageArgs);
        } catch (ClassNotFoundException e) {
            throw undecodablePage(pageClassName, e);
        } catch (InstantiationException e) {
            throw undecodablePage(pageClassName, e);
        } catch (IllegalAccessException e) {
            throw undecodablePage(pageClassName, e);
        } catch (InvocationTargetException e) {
            throw undecodablePage(pageClassName, e);
        }
    }

    /** Wraps a reflective failure from readPageData, preserving the cause. */
    private static IOException undecodablePage(String pageClassName, Exception cause) {
        return new IOException("could not deserialize logged page of class " + pageClassName, cause);
    }

    /** Write a BEGIN record for the specified transaction
        @param tid The transaction that is beginning
        @throws IOException if tid has already begun
    */
    public synchronized void logXactionBegin(TransactionId tid)
        throws IOException {
        Debug.log("BEGIN");
        if (tidToFirstLogRecord.get(tid.getId()) != null) {
            System.err.printf("logXactionBegin: already began this tid\n");
            throw new IOException("double logXactionBegin()");
        }
        preAppend();
        raf.writeInt(BEGIN_RECORD);
        raf.writeLong(tid.getId());
        raf.writeLong(currentOffset);
        tidToFirstLogRecord.put(tid.getId(), currentOffset);
        currentOffset = raf.getFilePointer();

        Debug.log("BEGIN OFFSET = " + currentOffset);
    }

    /** Checkpoint the log: force it, flush all buffer pool pages, write a
        CHECKPOINT record listing the live transactions, point the header at
        it, and then truncate the log. */
    public void logCheckpoint() throws IOException {
        //make sure we have buffer pool lock before proceeding
        synchronized (Database.getBufferPool()) {
            synchronized (this) {
                preAppend();
                force();
                // Flushing may itself append UPDATE records, so the list of
                // live transactions is read only after the flush.
                Database.getBufferPool().flushAllPages();

                long startCpOffset = raf.getFilePointer();
                raf.writeInt(CHECKPOINT_RECORD);
                raf.writeLong(-1); //no tid , but leave space for convenience

                //write list of outstanding transactions
                raf.writeInt(tidToFirstLogRecord.size());
                for (Map.Entry<Long, Long> entry : tidToFirstLogRecord.entrySet()) {
                    Debug.log("WRITING CHECKPOINT TRANSACTION ID: " + entry.getKey());
                    raf.writeLong(entry.getKey());
                    raf.writeLong(entry.getValue());
                }

                //once the CP is written, make sure the CP location at the
                // beginning of the log file is updated
                long endCpOffset = raf.getFilePointer();
                raf.seek(0);
                raf.writeLong(startCpOffset);
                raf.seek(endCpOffset);
                // every record ends with the offset at which it began
                raf.writeLong(startCpOffset);
                currentOffset = raf.getFilePointer();
            }
        }

        logTruncate();
    }

    /** Truncate any unneeded portion of the log to reduce its space
        consumption. Everything before the latest checkpoint and before the
        first record of every transaction it lists as live is dropped; the
        remaining records are rewritten with rebased offsets into a temporary
        file that then replaces the log. Does nothing if there is no
        checkpoint. */
    public synchronized void logTruncate() throws IOException {
        preAppend();
        long appendOffset = raf.getFilePointer();

        raf.seek(0);
        long cpLoc = raf.readLong();
        if (cpLoc == NO_CHECKPOINT_ID) {
            // Without a checkpoint no record is known to be unneeded.
            raf.seek(appendOffset);
            return;
        }
        long minLogRecord = oldestNeededRecord(cpLoc);

        // Create the temporary copy next to the log so renameTo() stays on
        // the same filesystem.
        File newFile = new File(logFile.getAbsoluteFile().getParentFile(),
                                "logtmp" + System.currentTimeMillis());
        // New first-record offsets are applied only once the swap succeeded.
        Map<Long, Long> rebasedFirstRecords = new HashMap<Long, Long>();
        RandomAccessFile logNew = new RandomAccessFile(newFile, "rw");
        boolean copied = false;
        try {
            logNew.writeLong((cpLoc - minLogRecord) + LONG_SIZE);
            raf.seek(minLogRecord);
            copyRecordsForTruncation(logNew, minLogRecord, rebasedFirstRecords);
            copied = true;
        } finally {
            logNew.close();
            if (!copied) {
                newFile.delete();
                raf.seek(appendOffset);
            }
        }

        Debug.log("TRUNCATING LOG; WAS " + raf.length() + " BYTES ; NEW START : " + minLogRecord + " NEW LENGTH: " + (raf.length() - minLogRecord));

        raf.close();
        logFile.delete();
        if (!newFile.renameTo(logFile)) {
            throw new IOException("could not replace log " + logFile
                                  + " with truncated copy " + newFile);
        }
        raf = new RandomAccessFile(logFile, "rw");
        raf.seek(raf.length());
        currentOffset = raf.getFilePointer();
        tidToFirstLogRecord.putAll(rebasedFirstRecords);
    }

    /** Returns the smallest log offset still needed: the checkpoint at cpLoc
        or the first record of any transaction it lists as outstanding. */
    private long oldestNeededRecord(long cpLoc) throws IOException {
        raf.seek(cpLoc);
        int cpType = raf.readInt();
        raf.readLong(); // tid slot; unused for checkpoint records
        if (cpType != CHECKPOINT_RECORD) {
            throw new RuntimeException("Checkpoint pointer does not point to checkpoint record");
        }
        long oldest = cpLoc;
        int numOutstanding = raf.readInt();
        for (int i = 0; i < numOutstanding; i++) {
            raf.readLong(); // transaction id; only its offset matters here
            oldest = Math.min(oldest, raf.readLong());
        }
        return oldest;
    }

    /** Copies every record from raf's current position to EOF into dst,
        rebasing offsets by (LONG_SIZE - minLogRecord). BEGIN records of
        transactions that are still live have their new offsets recorded in
        rebasedFirstRecords; finished transactions are not resurrected. */
    private void copyRecordsForTruncation(RandomAccessFile dst, long minLogRecord,
                                          Map<Long, Long> rebasedFirstRecords)
        throws IOException {
        while (true) {
            try {
                int type = raf.readInt();
                long recordTid = raf.readLong();
                long newStart = dst.getFilePointer();

                Debug.log("NEW START = " + newStart);

                dst.writeInt(type);
                dst.writeLong(recordTid);

                switch (type) {
                case UPDATE_RECORD:
                    Page before = readPageData(raf);
                    Page after = readPageData(raf);
                    writePageData(dst, before);
                    writePageData(dst, after);
                    break;
                case CHECKPOINT_RECORD:
                    int numXactions = raf.readInt();
                    dst.writeInt(numXactions);
                    while (numXactions-- > 0) {
                        long xid = raf.readLong();
                        long xoffset = raf.readLong();
                        dst.writeLong(xid);
                        dst.writeLong((xoffset - minLogRecord) + LONG_SIZE);
                    }
                    break;
                case BEGIN_RECORD:
                    if (tidToFirstLogRecord.containsKey(recordTid)) {
                        rebasedFirstRecords.put(recordTid, newStart);
                    }
                    break;
                default:
                    // ABORT and COMMIT records carry no payload
                    break;
                }

                //all records finish with a pointer to their own start
                dst.writeLong(newStart);
                raf.readLong();
            } catch (EOFException e) {
                return;
            }
        }
    }

    /** Rollback the specified transaction, setting the state of any
        of pages it updated to their pre-updated state.  To preserve
        transaction semantics, this should not be called on
        transactions that have already committed (though this may not
        be enforced by this method.)

        The log's file pointer is restored on return, so callers may append
        immediately afterwards.

        @param tid The transaction to rollback
        @throws NoSuchElementException if tid is not a live logged transaction
    */
    public void rollback(TransactionId tid)
        throws NoSuchElementException, IOException {
        synchronized (Database.getBufferPool()) {
            synchronized (this) {
                preAppend();
                Long firstRecord = tidToFirstLogRecord.get(tid.getId());
                if (firstRecord == null) {
                    throw new NoSuchElementException("no live transaction with id " + tid.getId());
                }

                long resumeOffset = raf.getFilePointer();
                try {
                    // Collect offsets of this transaction's UPDATE records,
                    // stopping at its ABORT record if one was already written.
                    Deque<Long> updateOffsets = new ArrayDeque<Long>();
                    raf.seek(firstRecord);
                    while (raf.getFilePointer() < raf.length()) {
                        long offset = raf.getFilePointer();
                        LogRecord record = LogRecord.readNext(raf);
                        if (null == record || ((record instanceof AbortRecord) &&
                                               record.tid == tid.getId())) {
                            break;
                        }
                        if ((record instanceof UpdateRecord) && tid.getId() == record.tid) {
                            updateOffsets.push(offset);
                        }
                    }

                    // Undo in reverse log order so the oldest before image wins.
                    while (!updateOffsets.isEmpty()) {
                        raf.seek(updateOffsets.pop());
                        UpdateRecord update = (UpdateRecord) LogRecord.readNext(raf);
                        Page page = update.beforeImage;
                        Database.getCatalog().getDatabaseFile
                            (page.getId().getTableId()).writePage(page);
                        Database.getBufferPool().discardPage(page.getId());
                    }
                } finally {
                    raf.seek(resumeOffset);
                }
                // Key must be the Long id, not the TransactionId object.
                tidToFirstLogRecord.remove(tid.getId());
            }
        }
    }

    /** Shutdown the logging system, writing out whatever state
        is necessary so that start up can happen quickly (without
        extensive recovery.) Errors are reported and ignored.
    */
    public void shutdown() {
        // Take the BufferPool lock before this one, as logCheckpoint() does,
        // to respect the lock order documented on this class.
        synchronized (Database.getBufferPool()) {
            synchronized (this) {
                try {
                    logCheckpoint();  //simple way to shutdown is to write a checkpoint record
                    raf.close();
                } catch (IOException e) {
                    System.out.println("ERROR SHUTTING DOWN -- IGNORING.");
                    e.printStackTrace();
                }
            }
        }
    }

    /** Recover the database system by ensuring that the updates of
        committed transactions are installed and that the
        updates of uncommitted transactions are not installed.
        An empty (or header-less) log is initialised instead.
    */
    public void recover() throws IOException {
        synchronized (Database.getBufferPool()) {
            synchronized (this) {
                recoveryUndecided = false;

                if (raf.length() < LONG_SIZE) {
                    // Nothing to recover; write a header so later appends
                    // produce a well-formed log (preAppend() won't do it now).
                    raf.setLength(0);
                    raf.seek(0);
                    raf.writeLong(NO_CHECKPOINT_ID);
                    currentOffset = raf.getFilePointer();
                    return;
                }

                /* Start at the last checkpoint, or just after the header. */
                raf.seek(0);
                long checkpointOffset = raf.readLong();
                if (checkpointOffset >= 0) {
                    raf.seek(checkpointOffset);
                }

                /* Redo: install committed after images, replay aborts. */
                Map<Long,Set<PageId>> pageIds = new HashMap<Long,Set<PageId>>();
                Map<PageId,Page> pages = new HashMap<PageId,Page>();
                while (raf.getFilePointer() + INT_SIZE <= raf.length()) {
                    LogRecord record = LogRecord.readNext(raf);
                    if (null == record) {
                        break;
                    } else if (record instanceof CheckPointRecord) {
                        CheckPointRecord cp = (CheckPointRecord) record;
                        for (Long key : cp.keySet()) {
                            tidToFirstLogRecord.put(key, cp.getOffset(key));
                            if (!pageIds.containsKey(key)) {
                                pageIds.put(key, new HashSet<PageId>());
                            }
                        }
                    } else if (record instanceof BeginRecord) {
                        tidToFirstLogRecord.put(((BeginRecord)record).tid, record.offset);
                        pageIds.put(((BeginRecord)record).tid, new HashSet<PageId>());
                    } else if (record instanceof CommitRecord) {
                        for (PageId pageId : pageIds.get(((CommitRecord)record).tid)) {
                            Page page = pages.get(pageId);
                            Database.getCatalog().getDatabaseFile(pageId.
                                getTableId()).writePage(page);
                            page.setBeforeImage();
                        }
                        tidToFirstLogRecord.remove(record.tid);
                        pageIds.remove(record.tid);
                    } else if (record instanceof UpdateRecord) {
                        Page page = ((UpdateRecord)record).afterImage;
                        pages.put(page.getId(), page);
                        pageIds.get(((UpdateRecord)record).tid).add(page.getId());
                    } else if (record instanceof AbortRecord) {
                        long resume = raf.getFilePointer();
                        rollback(TransactionId.of(record.tid));
                        pageIds.remove(record.tid);
                        raf.seek(resume);
                    }
                }

                /* Undo: abort every transaction that never finished. */
                currentOffset = raf.getFilePointer();
                for (long tid : pageIds.keySet()) {
                    raf.seek(currentOffset);
                    raf.writeInt(ABORT_RECORD);
                    raf.writeLong(tid);
                    raf.writeLong(currentOffset);
                    force();
                    currentOffset = raf.getFilePointer();
                    rollback(TransactionId.of(tid));
                }
            }
        }
    }

    /** Print out a human readable representation of the log: the header's
        checkpoint offset followed by each record. The file pointer is
        restored afterwards. */
    public synchronized void print() throws IOException {
        long savedPosition = raf.getFilePointer();
        try {
            raf.seek(0);
            if (raf.length() >= LONG_SIZE) {
                // the header is a checkpoint offset, not a log record
                System.out.println("CHECKPOINT OFFSET: " + raf.readLong());
            }
            while (raf.getFilePointer() < raf.length()) {
                LogRecord record = LogRecord.readNext(raf);
                if (record == null) {
                    break;
                }
                System.out.println(record);
            }
        } finally {
            raf.seek(savedPosition);
        }
    }

    /** Forces all log data and metadata to the storage device. */
    public synchronized void force() throws IOException {
        raf.getChannel().force(true);
    }
}